diff --git a/lib/index.js b/lib/index.js index 2f1cbe5..52f1310 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,6 +1,6 @@ var childProcess = require('child_process'), - events = require('events'), - os = require('os'), + events = require('events'), + os = require('os'), util = require('util'); @@ -12,12 +12,12 @@ WorkerState = { module.exports.WorkerState = WorkerState; /** - * Encapsulates a worker process. Manages state and notifying listeners of + * Encapsulates a worker process. Manages state and notifying listeners of * changes in state. */ function Worker(workerScript) { this.process = childProcess.fork(workerScript); - this.pid = this.process.pid; + this.pid = this.process.pid; this.status = WorkerState.STARTING; this.process.once('message', this.onReady.bind(this)); @@ -38,7 +38,7 @@ Worker.prototype.onMessage = function (callback, message) { callback(message); this.status = WorkerState.READY; - this.emit('ready', this); + this.emit('ready', this); }; Worker.prototype.send = function (message, callback) { @@ -51,67 +51,110 @@ Worker.prototype.send = function (message, callback) { module.exports.Worker = Worker; -function WorkQueue(workerScript, nWorkers) { +function WorkQueue(workerScript, nWorkers, options) { this.workers = []; this.queue = []; - - var self = this; - function fork() { - var worker = new Worker(workerScript); - - worker.on('ready', self._run.bind(self)); - - worker.process.on('exit', function (code, signal) { - if (code !== 0) { // Code will be non-zero if process dies suddenly - console.warn('Worker process ' + worker.pid + ' died. Respawning...'); - for (var i = 0; i < self.workers.length; i++) { - if (self.workers[i].pid === worker.pid) { - self.workers.splice(i, 1); // Remove dead worker from pool. - } - } - fork(); // FTW! - } - }); - - self.workers.push(worker); - } + this.options = options || {}; + this.workerScript = workerScript; nWorkers = nWorkers || os.cpus().length; console.log('Starting ' + nWorkers + ' workers..'); for (var i = 0; i < nWorkers; i++) { - fork(); + this.fork(workerScript); } } +WorkQueue.prototype.fork = function() { + var self = this; + + var worker = new Worker(this.workerScript); + + worker.on('ready', self._run.bind(self)); + + worker.process.on('exit', function (code, signal) { + if (code !== 0) { // Code will be non-zero if process dies suddenly + console.warn('Worker process ' + worker.pid + ' died with code ' + code + '. Respawning...'); + for (var i = 0; i < self.workers.length; i++) { + if (self.workers[i].pid === worker.pid) { + self.workers.splice(i, 1); // Remove dead worker from pool. + } + } + self.fork(this.workerScript); // FTW! + } else { + console.warn("Dead on purpose, code: " + code); + } + }); + + self.workers.push(worker); +} + /** * Enqueue a task for a worker process to handle. A task can be any type of var, * as long your worker script knows what to do with it. */ -WorkQueue.prototype.enqueue = function (task, callback) { - this.queue.push({ task: task, callback: callback }); +WorkQueue.prototype.enqueue = function (task, timeout, callback) { + if(!callback) { + callback = timeout; + timeout = null; + } + + this.queue.push({ task: task, callback: callback, timeout: timeout }); process.nextTick(this._run.bind(this)); }; -WorkQueue.prototype._run = function (worker) { +WorkQueue.prototype._run = function (worker) { if (this.queue.length === 0) { return; // nothing to do - } + } - if (!worker) { - // Find the first available worker. - for (var i = 0; i < this.workers.length; i++) { + if (!worker) { + // Find the first available worker. + for (var i = 0; i < this.workers.length; i++) { if (this.workers[i].status === WorkerState.READY) { - worker = this.workers[i]; + worker = this.workers[i]; break; } } } - if (!worker) { - return; // there are no workers available to handle requests. Leave queue as is. + if (!worker) { + if(this.options.autoexpand) { + // if maxWorkers is set, make sure we don't exceed it + if(this.options.maxWorkers && this.workers.length > this.options.maxWorkers) { + return; + } + + console.warn("Ran out of workers, forking another"); + this.fork(this.workerScript); + + return; + } else { + return; // there are no workers available to handle requests. Leave queue as is. + } + } + + var queued = this.queue.shift(); + var callback = null; + var self = this; + + if(queued.timeout) { + var timeoutId = null; + + callback = function() { + clearTimeout(timeoutId); + + queued.callback.apply(this, arguments); + }; + + timeoutId = setTimeout(function() { + worker.process.kill('SIGINT'); + + callback.call(this, (self.options.timeoutResult || {})); + }, queued.timeout); + } else { + callback = queued.callback; } - var queued = this.queue.shift(); - worker.send(queued.task, queued.callback); + worker.send(queued.task, callback); }; module.exports.WorkQueue = WorkQueue;