diff --git a/README.md b/README.md index ef9245b..d2d0987 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,21 @@ nrp.on('mydata:sync', function(myData) { }); ``` +### off / unsubscribe + +```javascript +function handlerA(){} +function handlerB(){} + +nrp.on('channel', handlerA); +nrp.on('channel', handlerB); + +nrp.off('channel', handlerB, function(){ + // handler is unsubscribe; + nrp.emit('channel', { message }); // Only handlerA gets triggered +}); +``` + ### Unsubscribe ```javascript diff --git a/lib/node-redis-pubsub.js b/lib/node-redis-pubsub.js index a119115..6fec8cc 100644 --- a/lib/node-redis-pubsub.js +++ b/lib/node-redis-pubsub.js @@ -1,11 +1,12 @@ "use strict"; var redis = require('redis'); +var Pool = require('./pool'); function initClient(options) { var auth = options.auth; var redisUrl = options.url; var client; - + options.port = options.port || 6379; // 6379 is Redis' default options.host = options.host || '127.0.0.1'; @@ -18,7 +19,7 @@ function initClient(options) { if(auth){ client.auth(auth); } - + return client; } @@ -33,9 +34,9 @@ function initClient(options) { */ function NodeRedisPubsub(options){ if (!(this instanceof NodeRedisPubsub)){ return new NodeRedisPubsub(options); } - + options || (options = {}); - + // accept connections / clients having the same interface as node_redis clients // Need to create two Redis clients as one cannot be both in receiver and emitter mode // I wonder why that is, by the way ... @@ -44,16 +45,16 @@ function NodeRedisPubsub(options){ } else { this.emitter = initClient(options); } - + if(options.receiver) { this.receiver = options.receiver; } else { this.receiver = initClient(options); this.receiver.setMaxListeners(0); } - + delete options.url; - + this.prefix = options.scope ? options.scope + ':' : ''; } @@ -72,9 +73,10 @@ NodeRedisPubsub.prototype.getRedisClient = function(){ * */ NodeRedisPubsub.prototype.on = NodeRedisPubsub.prototype.subscribe = function(channel, handler, callback){ - if(!callback) - callback = function(){}; + if(!callback){ callback = function(){}; } + var self = this; + var pool = Pool.of(self.prefix + channel); if(channel === "error"){ self.errorHandler = handler; @@ -85,7 +87,7 @@ NodeRedisPubsub.prototype.on = NodeRedisPubsub.prototype.subscribe = function(ch } var pmessageHandler = function(pattern, _channel, message){ - if(self.prefix + channel === pattern){ + if(self.prefix + channel === pattern){ var jsonmsg = message; try{ jsonmsg = JSON.parse(message); @@ -93,23 +95,68 @@ NodeRedisPubsub.prototype.on = NodeRedisPubsub.prototype.subscribe = function(ch if(typeof self.errorHandler === 'function'){ return self.errorHandler("Invalid JSON received! Channel: " + self.prefix + channel + " Message: " + message); } - } - return handler(jsonmsg, _channel); + } + return handler(jsonmsg, _channel); } }; this.receiver.on('pmessage', pmessageHandler); + pool.add(handler, pmessageHandler); this.receiver.psubscribe(this.prefix + channel, callback); var removeListener = function(callback){ - self.receiver.removeListener('pmessage', pmessageHandler); + self.receiver.removeListener('pmessage', pool.pmessageHandler); return self.receiver.punsubscribe(self.prefix + channel, callback); }; return removeListener; }; +/** + * Unsubscribe to a channel + * @param {String} channel The channel to unsubscribe from, can be a pattern e.g. 'user.*' + * @param {Function} handler Function to unsubscribe + * @param {Function} cb Optional callback to call once the handler is unsubscribed. + * + */ +NodeRedisPubsub.prototype.off = NodeRedisPubsub.prototype.unsubscribe = function(channel, handler, callback){ + (callback) || (callback = function(){}); + + var self = this; + var namespace = self.prefix + channel; + var pool = Pool.of(namespace); + + pool.pmessageHandlers.forEach( pmessageHandler => { + self.receiver.removeListener('pmessage', pmessageHandler); + }); + + self.receiver.punsubscribe(namespace, function(){ + + // No hanlder was provided + if(!handler || typeof handler !== 'function'){ + pool.flush(); // Clear pool + return callback(); // notify + } + + pool.remove(handler); + if(!pool.handlers.length){ return callback(); } // Nothing to subscribe + + var done = after(pool.handlers.length, callback); + + // Re assign past handlers + pool.handlers.forEach(function(hd){ + self.on(namespace, hd, done); + }); + }); + + function after(times, func) { + return function() { + if (--times < 1) { return func.apply(this, arguments); } + }; + }; +}; + /** * Emit an event * @param {String} channel Channel on which to emit the message @@ -125,6 +172,7 @@ NodeRedisPubsub.prototype.emit = NodeRedisPubsub.prototype.publish = function (c NodeRedisPubsub.prototype.quit = function() { this.emitter.quit(); this.receiver.quit(); + Pool.flushAll(); }; /** @@ -133,6 +181,7 @@ NodeRedisPubsub.prototype.quit = function() { NodeRedisPubsub.prototype.end = function() { this.emitter.end(true); this.receiver.end(true); + Pool.flushAll(); }; module.exports = NodeRedisPubsub; diff --git a/lib/pool.js b/lib/pool.js new file mode 100644 index 0000000..1712c04 --- /dev/null +++ b/lib/pool.js @@ -0,0 +1,48 @@ +var Pool = {}; + +// NameSpace Pool definition +function NSP(){ + this.handlers = []; + this.pmessageHandlers = []; +} + +// Add a new henalder to the namespace pool +NSP.prototype.add = function(hanlder, pmessageHandler){ + this.handlers.push(hanlder); + this.pmessageHandlers.push(pmessageHandler); +}; + +// Remove a hendler from namespace pool +NSP.prototype.remove = function(handler){ + this.pmessageHandlers = []; + var index = this.handlers.indexOf(handler); + if(index > -1){ this.handlers.splice(index, 1); } // Remove from pool +}; + +// Clear current namespace pool +NSP.prototype.flush = function(){ + this.handlers = []; + this.pmessageHandlers = []; +}; + +/* + * Wrap Pool on a specific NameSpace + * */ +Pool.of = function(namespace){ + if(typeof Pool[namespace] === 'function'){ namespace = '_' + namespace; } // Internal pool Method + (Pool[namespace]) || (Pool[namespace] = new NSP()); + return Pool[namespace]; +}; + +/* + * Flush all handlers from Pool + * */ +Pool.flushAll = function(){ + for (var nsp in Pool) { + if (!Pool.hasOwnProperty(nsp) || typeof Pool[nsp] == 'function') continue; + Pool[nsp].flush(); + delete Pool[nsp]; + } +}; + +module.exports = Pool; diff --git a/test/redisQueue.test.js b/test/redisQueue.test.js index fb7bb4d..6f92d01 100644 --- a/test/redisQueue.test.js +++ b/test/redisQueue.test.js @@ -44,7 +44,7 @@ describe('Node Redis Pubsub', function () { rq.emit('a test', { first: 'First message' , second: 'Second message' }); }); - + after(function(){ rq.end(); }); @@ -95,27 +95,32 @@ describe('Node Redis Pubsub', function () { }); }); - // should re-write this test - // it('Should have the ability to unsubscribe', function (done) { - // var rq = new NodeRedisPubsub(); - // var called = false; - - // rq.should.have.property('off'); - // rq.on('a test', function (data){ - // called = true; - // }, function(){ - // rq.off('a test'); - // rq.emit('a test', { }); - // }); - - // setTimeout(function(){ - // called.should.be.false; - // done(); - // }, 10); + it('Should have the ability to unsubscribe', function (done) { + var rq = new NodeRedisPubsub(); + var channel = 'test channel'; + var called = { A : false, B : false }; + + rq.should.have.property('off'); + rq.should.have.property('unsubscribe'); + + var handlerA = function(msg){ called.A = true; }; + var handlerB = function(msg){ called.B = true; }; + + rq.on(channel, handlerA, function(){ // Subscribe Handler A + rq.on(channel, handlerB, function(){ // Subscribe Handler B + rq.off(channel, handlerA, function(){ // Unsubscribe Handler A + rq.emit(channel, {}); // Emit a message + setTimeout(function(){ + called.A.should.be.false; // Handler A was NOT called + called.B.should.be.true; // Handler B was called + done(); + }, 0); + }); + }); + }); - // }); + }); - it('Should gracefully handle invalid JSON message data', function (done) { var rq = new NodeRedisPubsub(conf); @@ -138,7 +143,7 @@ describe('Node Redis Pubsub', function () { rq.end(); }); }); - + it('Should be able to handle non JSON message data', function(done) { var rq = new NodeRedisPubsub(conf); @@ -182,6 +187,6 @@ describe('Node Redis Pubsub', function () { }); - + });