From 1235c780243d332d6eed2c37322dd4d647f810b2 Mon Sep 17 00:00:00 2001 From: Derek Mansen Date: Wed, 6 Mar 2013 20:17:55 -0500 Subject: [PATCH 1/3] Add timeout and autoexpand support --- lib/index.js | 116 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 76 insertions(+), 40 deletions(-) diff --git a/lib/index.js b/lib/index.js index 2f1cbe5..9bbaecb 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,6 +1,6 @@ var childProcess = require('child_process'), - events = require('events'), - os = require('os'), + events = require('events'), + os = require('os'), util = require('util'); @@ -12,12 +12,12 @@ WorkerState = { module.exports.WorkerState = WorkerState; /** - * Encapsulates a worker process. Manages state and notifying listeners of + * Encapsulates a worker process. Manages state and notifying listeners of * changes in state. */ function Worker(workerScript) { this.process = childProcess.fork(workerScript); - this.pid = this.process.pid; + this.pid = this.process.pid; this.status = WorkerState.STARTING; this.process.once('message', this.onReady.bind(this)); @@ -38,7 +38,7 @@ Worker.prototype.onMessage = function (callback, message) { callback(message); this.status = WorkerState.READY; - this.emit('ready', this); + this.emit('ready', this); }; Worker.prototype.send = function (message, callback) { @@ -51,67 +51,103 @@ Worker.prototype.send = function (message, callback) { module.exports.Worker = Worker; -function WorkQueue(workerScript, nWorkers) { +function WorkQueue(workerScript, nWorkers, options) { this.workers = []; this.queue = []; - - var self = this; - function fork() { - var worker = new Worker(workerScript); - - worker.on('ready', self._run.bind(self)); - - worker.process.on('exit', function (code, signal) { - if (code !== 0) { // Code will be non-zero if process dies suddenly - console.warn('Worker process ' + worker.pid + ' died. Respawning...'); - for (var i = 0; i < self.workers.length; i++) { - if (self.workers[i].pid === worker.pid) { - self.workers.splice(i, 1); // Remove dead worker from pool. - } - } - fork(); // FTW! - } - }); - - self.workers.push(worker); - } + this.options = options || {}; + this.workerScript = workerScript; nWorkers = nWorkers || os.cpus().length; console.log('Starting ' + nWorkers + ' workers..'); for (var i = 0; i < nWorkers; i++) { - fork(); + this.fork(workerScript); } } +WorkQueue.prototype.fork = function() { + var self = this; + + var worker = new Worker(this.workerScript); + + worker.on('ready', self._run.bind(self)); + + worker.process.on('exit', function (code, signal) { + if (code !== 0) { // Code will be non-zero if process dies suddenly + console.warn('Worker process ' + worker.pid + ' died with code ' + code + '. Respawning...'); + for (var i = 0; i < self.workers.length; i++) { + if (self.workers[i].pid === worker.pid) { + self.workers.splice(i, 1); // Remove dead worker from pool. + } + } + self.fork(this.workerScript); // FTW! + } else { + console.warn("Dead on purpose, code: " + code); + } + }); + + self.workers.push(worker); +} + /** * Enqueue a task for a worker process to handle. A task can be any type of var, * as long your worker script knows what to do with it. */ -WorkQueue.prototype.enqueue = function (task, callback) { - this.queue.push({ task: task, callback: callback }); +WorkQueue.prototype.enqueue = function (task, timeout, callback) { + if(!callback) { + callback = timeout; + timeout = null; + } + + this.queue.push({ task: task, callback: callback, timeout: timeout }); process.nextTick(this._run.bind(this)); }; -WorkQueue.prototype._run = function (worker) { +WorkQueue.prototype._run = function (worker) { if (this.queue.length === 0) { return; // nothing to do - } + } - if (!worker) { - // Find the first available worker. - for (var i = 0; i < this.workers.length; i++) { + if (!worker) { + // Find the first available worker. + for (var i = 0; i < this.workers.length; i++) { if (this.workers[i].status === WorkerState.READY) { - worker = this.workers[i]; + worker = this.workers[i]; break; } } } - if (!worker) { - return; // there are no workers available to handle requests. Leave queue as is. + if (!worker) { + if(this.options.autoexpand) { + // if max_workers is set, make sure we don't exceed it + if(this.options.max_workers && this.workers.length > this.options.max_workers) return; + + console.warn("Ran out of workers, forking another"); + this.fork(this.workerScript); + process.nextTick(this._run.bind(this)); + return; + } else { + return; // there are no workers available to handle requests. Leave queue as is. + } + } + + var queued = this.queue.shift(); + var callback = null; + + if(queued.timeout) { + var timeoutId = setTimeout(function() { + worker.process.emit('exit', 1); + }, queued.timeout); + + callback = function() { + clearTimeout(timeoutId); + + queued.callback.apply(this, arguments); + }; + } else { + callback = queued.callback; } - var queued = this.queue.shift(); - worker.send(queued.task, queued.callback); + worker.send(queued.task, callback); }; module.exports.WorkQueue = WorkQueue; From 9232ab91cfce6d4973db32a2e7086e9061edb435 Mon Sep 17 00:00:00 2001 From: Derek Mansen Date: Thu, 7 Mar 2013 09:05:47 -0500 Subject: [PATCH 2/3] Fix up the timeout mechanism --- lib/index.js | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/lib/index.js b/lib/index.js index 9bbaecb..0b6ad41 100644 --- a/lib/index.js +++ b/lib/index.js @@ -119,12 +119,14 @@ WorkQueue.prototype._run = function (worker) { if (!worker) { if(this.options.autoexpand) { - // if max_workers is set, make sure we don't exceed it - if(this.options.max_workers && this.workers.length > this.options.max_workers) return; + // if maxWorkers is set, make sure we don't exceed it + if(this.options.maxWorkers && this.workers.length > this.options.maxWorkers) { + return; + } console.warn("Ran out of workers, forking another"); this.fork(this.workerScript); - process.nextTick(this._run.bind(this)); + return; } else { return; // there are no workers available to handle requests. Leave queue as is. @@ -133,17 +135,20 @@ WorkQueue.prototype._run = function (worker) { var queued = this.queue.shift(); var callback = null; + var self = this; if(queued.timeout) { - var timeoutId = setTimeout(function() { - worker.process.emit('exit', 1); - }, queued.timeout); - callback = function() { clearTimeout(timeoutId); queued.callback.apply(this, arguments); }; + + var timeoutId = setTimeout(function() { + worker.process.kill('SIGINT'); + + callback.call(this, (self.options.timeoutResult || {})); + }, queued.timeout); } else { callback = queued.callback; } From 0e09401c524e2d7e7e214edbfd861f0eb8d477f6 Mon Sep 17 00:00:00 2001 From: Derek Mansen Date: Thu, 7 Mar 2013 09:13:04 -0500 Subject: [PATCH 3/3] Be explicit about where timeoutId is created --- lib/index.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/index.js b/lib/index.js index 0b6ad41..52f1310 100644 --- a/lib/index.js +++ b/lib/index.js @@ -138,13 +138,15 @@ WorkQueue.prototype._run = function (worker) { var self = this; if(queued.timeout) { + var timeoutId = null; + callback = function() { clearTimeout(timeoutId); queued.callback.apply(this, arguments); }; - var timeoutId = setTimeout(function() { + timeoutId = setTimeout(function() { worker.process.kill('SIGINT'); callback.call(this, (self.options.timeoutResult || {}));