diff --git a/src/TaskWorker.js b/src/TaskWorker.js new file mode 100644 index 0000000..18c739c --- /dev/null +++ b/src/TaskWorker.js @@ -0,0 +1,145 @@ +'use strict' + +const Worker = require('tiny-worker') +const log = require('./log').log + +const MAXIMUM_BACKLOG = 20 + +/** A web worker with a promise-oriented message-call interface. */ +class TaskWorker { + /** + * Create a new TaskWorker. + * @param {string} scriptPath - A path to a JS file to execute. + */ + constructor(scriptPath) { + this.scriptPath = scriptPath + this.backlog = 0 + this.pending = new Map() + this.messageId = 0 + this.lastStarted = null + this.dead = false + this.worker = null + this.start() + } + + /** + * Send a message to this TaskWorker. + * @param {map} message - An object to send to the worker. + * @return {Promise} + */ + send(message) { + if (this.backlog > MAXIMUM_BACKLOG) { + throw new Error('backlog-exceeded') + } + + if (!this.worker) { + throw new Error('Worker not running') + } + + return new Promise((resolve, reject) => { + const messageId = this.messageId + this.messageId += 1 + this.backlog += 1 + + this.worker.postMessage({message: message, messageId: messageId}) + this.pending.set(messageId, [resolve, reject]) + }) + } + + /** + * Handler for messages received from the worker. + * @private + * @param {MessageEvent} event + * @return {Promise} + */ + onmessage(event) { + const pair = this.pending.get(event.data.messageId) + if (!pair) { + log.error(`Got unknown message ID ${event.data.messageId}`) + return + } + + this.backlog -= 1 + this.pending.delete(event.data.messageId) + const [resolve, reject] = pair + if (event.data.error) { + reject(new Error(event.data.error)) + return + } + + resolve(event.data) + } + + /** + * Start the worker process. + * @return {number} + */ + start() { + // Do nothing if the child is still running + if (this.worker && this.worker.child.connected) { + return this.worker.child.pid + } + + // If we died within the past hour, don't restart. Something is wrong + if (this.lastStarted && ((new Date()) - this.lastStarted) < TaskWorker.MIN_RESTART_INTERVAL) { + this.dead = true + } + + if (this.dead) { + return -1 + } + + const worker = new Worker(this.scriptPath) + worker.onmessage = this.onmessage.bind(this) + worker.child.addListener('exit', (code, signal) => { + log.warning(`Worker exited: code=${code} signal=${signal}`) + this.stop() + + // Don't restart if graceful or due to SIGINT + if (code === 0 || signal === 'SIGINT') { + return + } + + // Wait a random interval up to a minute before restarting + // This might help prevent a thundering herd problem + const randomFactor = ( + TaskWorker.MAX_RESTART_TIMEOUT - TaskWorker.MIN_RESTART_TIMEOUT) + + TaskWorker.MIN_RESTART_TIMEOUT + setTimeout(() => this.start(), (Math.random() * randomFactor)) + }) + + + this.stop() + this.worker = worker + + this.lastStarted = new Date() + return this.worker.child.pid + } + + stop() { + for (const pair of this.pending.values()) { + pair[1](new Error('Worker terminated')) + } + + this.backlog = 0 + this.pending.clear() + this.messageId = 0 + + if (this.worker && this.worker.child.connected) { + this.worker.terminate() + } + + this.worker = null + } +} + +// Configurable knobs +// If a restart happens less than this number of ms from the last restart, flag the worker as dead +// Default: 1 hour +TaskWorker.MIN_RESTART_INTERVAL = 1000 * 60 * 60 + +// We wait a random amount of time before restarting a stopped worker. Default: 1-10 seconds +TaskWorker.MIN_RESTART_TIMEOUT = 1000 +TaskWorker.MAX_RESTART_TIMEOUT = 1000 * 9 + +exports.TaskWorker = TaskWorker diff --git a/src/index.js b/src/index.js index 42ed585..13fa6c6 100755 --- a/src/index.js +++ b/src/index.js @@ -12,11 +12,11 @@ const util = require('util') const zlib = require('zlib') const Pool = require('./pool.js').Pool +const TaskWorker = require('./TaskWorker.js').TaskWorker +const log = require('./log.js').log const dive = require('dive') const iltorb = require('iltorb') -const Logger = require('basic-logger') const S3 = require('aws-sdk/clients/s3') -const Worker = require('tiny-worker') process.title = 'marian' @@ -24,17 +24,12 @@ const MAXIMUM_QUERY_LENGTH = 100 // If a worker's backlog rises above this threshold, reject the request. // This prevents the server from getting bogged down for unbounded periods of time. -const MAXIMUM_BACKLOG = 20 const WARNING_BACKLOG = 15 const STANDARD_HEADERS = { 'X-Content-Type-Options': 'nosniff' } -const log = new Logger({ - showTimestamp: true, -}) - /** * Find an acceptable compression format for the client, and return a compressed * version of the content if possible. Otherwise return the original input text. @@ -80,66 +75,6 @@ function checkMethod(req, res, method) { return true } -/** A web worker with a promise-oriented message-call interface. */ -class TaskWorker { - /** - * Create a new TaskWorker. - * @param {string} scriptPath - A path to a JS file to execute. - */ - constructor(scriptPath) { - this.worker = new Worker(scriptPath) - this.worker.onmessage = this.onmessage.bind(this) - - this.backlog = 0 - this.pending = new Map() - this.messageId = 0 - } - - /** - * Send a message to this TaskWorker. - * @param {map} message - An object to send to the worker. - * @return {Promise} - */ - send(message) { - if (this.backlog > MAXIMUM_BACKLOG) { - throw new Error('backlog-exceeded') - } - - return new Promise((resolve, reject) => { - const messageId = this.messageId - this.messageId += 1 - this.backlog += 1 - - this.worker.postMessage({message: message, messageId: messageId}) - this.pending.set(messageId, [resolve, reject]) - }) - } - - /** - * Handler for messages received from the worker. - * @private - * @param {MessageEvent} event - * @return {Promise} - */ - onmessage(event) { - const pair = this.pending.get(event.data.messageId) - if (!pair) { - log.error(`Got unknown message ID ${event.data.messageId}`) - return - } - - this.backlog -= 1 - this.pending.delete(event.data.messageId) - const [resolve, reject] = pair - if (event.data.error) { - reject(new Error(event.data.error)) - return - } - - resolve(event.data) - } -} - class Index { constructor(manifestSource) { this.manifestSource = manifestSource @@ -150,10 +85,11 @@ class Index { this.currentlyIndexing = false const MAX_WORKERS = parseInt(process.env.MAX_WORKERS) || 2 - this.workers = new Pool(Math.min(os.cpus().length, MAX_WORKERS), () => new TaskWorker(pathModule.join(__dirname, 'worker-searcher.js'))) + const nWorkers = Math.min(os.cpus().length, MAX_WORKERS) + this.workers = new Pool(nWorkers, () => new TaskWorker(pathModule.join(__dirname, 'worker-searcher.js'))) // Suspend all of our workers until we have an index - for (const worker of this.workers.pool) { + for (const worker of this.workers) { this.workers.suspend(worker) } } @@ -282,7 +218,7 @@ class Index { this.errors = [] setTimeout(async () => { - for (const worker of this.workers.pool) { + for (const worker of this.workers) { this.workers.suspend(worker) try { await worker.send({sync: manifests}) @@ -396,9 +332,15 @@ class Marian { body = await compress(req, headers, body) // If all workers are overloaded, return 503 - let statusCode = 200 - if (status.workers.filter((n) => n <= WARNING_BACKLOG).length === 0) { - statusCode = 503 + // If a worker is dead, return 500 + let statusCode = 503 + for (const workerState of status.workers) { + if (workerState === 'd') { + statusCode = 500 + break + } else if (workerState <= WARNING_BACKLOG) { + statusCode = 200 + } } res.writeHead(statusCode, headers) @@ -468,8 +410,6 @@ class Marian { } async function main() { - Logger.setLevel('info', true) - const server = new Marian(process.argv[2]) server.start(8080) } diff --git a/src/log.js b/src/log.js new file mode 100644 index 0000000..935942d --- /dev/null +++ b/src/log.js @@ -0,0 +1,8 @@ +'use strict' + +const Logger = require('basic-logger') +Logger.setLevel('info', true) + +exports.log = new Logger({ + showTimestamp: true, +}) diff --git a/src/pool.js b/src/pool.js index e1de882..b66521e 100644 --- a/src/pool.js +++ b/src/pool.js @@ -47,6 +47,10 @@ class Pool { getStatus() { return this.pool.map((worker) => { + if (worker.dead) { + return 'd' + } + if (!this.suspended.has(worker)) { return worker.backlog } @@ -54,6 +58,10 @@ class Pool { return 's' }) } + + [Symbol.iterator]() { + return this.pool.values() + } } exports.Pool = Pool diff --git a/test/test_pool.js b/test/test_pool.js index d6455c6..ed2f08b 100644 --- a/test/test_pool.js +++ b/test/test_pool.js @@ -40,7 +40,7 @@ describe('Pool', () => { }) it('Should throw if no elements are available', () => { - for (const worker of pool.pool) { + for (const worker of pool) { pool.suspend(worker) } diff --git a/test/test_taskworker.js b/test/test_taskworker.js new file mode 100644 index 0000000..8c12666 --- /dev/null +++ b/test/test_taskworker.js @@ -0,0 +1,43 @@ +/* eslint-env node, mocha */ +'use strict' + +const assert = require('assert') +const TaskWorker = require('../src/TaskWorker.js').TaskWorker +TaskWorker.MIN_RESTART_INTERVAL = 200 +TaskWorker.MIN_RESTART_TIMEOUT = 10 +TaskWorker.MAX_RESTART_TIMEOUT = 10 + +function promiseTimeout(time) { + return new Promise(resolve => setTimeout(resolve, time)) +} + +describe('TaskWorker', function() { + this.slow(1000) + + const workerPath = 'test/worker.js' + const worker = new TaskWorker(workerPath) + + it('Should work', async () => { + assert.equal((await worker.send('ping')).message, 'pong') + assert.equal((await worker.send('ping')).message, 'pong') + }) + + it('Should restart and reject stale requests', async () => { + await promiseTimeout(200) + await assert.rejects(async () => await worker.send('die'), new Error('Worker terminated')) + await promiseTimeout(50) + assert.equal((await worker.send('ping')).message, 'pong') + }) + + it('Should avoid restarting too much', async () => { + assert.strictEqual(worker.dead, false) + await assert.rejects(async () => await worker.send('die'), new Error('Worker terminated')) + await promiseTimeout(10) + await assert.rejects(async () => await worker.send('ping'), new Error('Worker not running')) + assert.strictEqual(worker.dead, true) + }) + + after(() => { + worker.stop() + }) +}) diff --git a/test/worker.js b/test/worker.js new file mode 100644 index 0000000..ed62f2c --- /dev/null +++ b/test/worker.js @@ -0,0 +1,12 @@ +'use strict' + +self.onmessage = (ev) => { + const message = ev.data.message + const messageId = ev.data.messageId + + if (message === 'ping') { + self.postMessage({message: 'pong', messageId: messageId}) + } else if (message === 'die') { + process.exit(1) + } +}