Skip to content
This repository was archived by the owner on Nov 26, 2024. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 83 additions & 40 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
var childProcess = require('child_process'),
events = require('events'),
os = require('os'),
events = require('events'),
os = require('os'),
util = require('util');


Expand All @@ -12,12 +12,12 @@ WorkerState = {
module.exports.WorkerState = WorkerState;

/**
* Encapsulates a worker process. Manages state and notifying listeners of
* Encapsulates a worker process. Manages state and notifying listeners of
* changes in state.
*/
function Worker(workerScript) {
this.process = childProcess.fork(workerScript);
this.pid = this.process.pid;
this.pid = this.process.pid;
this.status = WorkerState.STARTING;

this.process.once('message', this.onReady.bind(this));
Expand All @@ -38,7 +38,7 @@ Worker.prototype.onMessage = function (callback, message) {
callback(message);

this.status = WorkerState.READY;
this.emit('ready', this);
this.emit('ready', this);
};

Worker.prototype.send = function (message, callback) {
Expand All @@ -51,67 +51,110 @@ Worker.prototype.send = function (message, callback) {
module.exports.Worker = Worker;


function WorkQueue(workerScript, nWorkers) {
function WorkQueue(workerScript, nWorkers, options) {
this.workers = [];
this.queue = [];

var self = this;
function fork() {
var worker = new Worker(workerScript);

worker.on('ready', self._run.bind(self));

worker.process.on('exit', function (code, signal) {
if (code !== 0) { // Code will be non-zero if process dies suddenly
console.warn('Worker process ' + worker.pid + ' died. Respawning...');
for (var i = 0; i < self.workers.length; i++) {
if (self.workers[i].pid === worker.pid) {
self.workers.splice(i, 1); // Remove dead worker from pool.
}
}
fork(); // FTW!
}
});

self.workers.push(worker);
}
this.options = options || {};
this.workerScript = workerScript;

nWorkers = nWorkers || os.cpus().length;
console.log('Starting ' + nWorkers + ' workers..');
for (var i = 0; i < nWorkers; i++) {
fork();
this.fork(workerScript);
}
}

WorkQueue.prototype.fork = function() {
var self = this;

var worker = new Worker(this.workerScript);

worker.on('ready', self._run.bind(self));

worker.process.on('exit', function (code, signal) {
if (code !== 0) { // Code will be non-zero if process dies suddenly
console.warn('Worker process ' + worker.pid + ' died with code ' + code + '. Respawning...');
for (var i = 0; i < self.workers.length; i++) {
if (self.workers[i].pid === worker.pid) {
self.workers.splice(i, 1); // Remove dead worker from pool.
}
}
self.fork(this.workerScript); // FTW!
} else {
console.warn("Dead on purpose, code: " + code);
}
});

self.workers.push(worker);
}

/**
* Enqueue a task for a worker process to handle. A task can be any type of var,
* as long your worker script knows what to do with it.
*/
WorkQueue.prototype.enqueue = function (task, callback) {
this.queue.push({ task: task, callback: callback });
WorkQueue.prototype.enqueue = function (task, timeout, callback) {
if(!callback) {
callback = timeout;
timeout = null;
}

this.queue.push({ task: task, callback: callback, timeout: timeout });
process.nextTick(this._run.bind(this));
};

WorkQueue.prototype._run = function (worker) {
WorkQueue.prototype._run = function (worker) {
if (this.queue.length === 0) {
return; // nothing to do
}
}

if (!worker) {
// Find the first available worker.
for (var i = 0; i < this.workers.length; i++) {
if (!worker) {
// Find the first available worker.
for (var i = 0; i < this.workers.length; i++) {
if (this.workers[i].status === WorkerState.READY) {
worker = this.workers[i];
worker = this.workers[i];
break;
}
}
}

if (!worker) {
return; // there are no workers available to handle requests. Leave queue as is.
if (!worker) {
if(this.options.autoexpand) {
// if maxWorkers is set, make sure we don't exceed it
if(this.options.maxWorkers && this.workers.length > this.options.maxWorkers) {
return;
}

console.warn("Ran out of workers, forking another");
this.fork(this.workerScript);

return;
} else {
return; // there are no workers available to handle requests. Leave queue as is.
}
}

var queued = this.queue.shift();
var callback = null;
var self = this;

if(queued.timeout) {
var timeoutId = null;

callback = function() {
clearTimeout(timeoutId);

queued.callback.apply(this, arguments);
};

timeoutId = setTimeout(function() {
worker.process.kill('SIGINT');

callback.call(this, (self.options.timeoutResult || {}));
}, queued.timeout);
} else {
callback = queued.callback;
}

var queued = this.queue.shift();
worker.send(queued.task, queued.callback);
worker.send(queued.task, callback);
};
module.exports.WorkQueue = WorkQueue;