From 81d98b3da584862b7c33aaa1545d60833ae67ff7 Mon Sep 17 00:00:00 2001 From: Narciso Date: Thu, 1 Nov 2018 17:38:42 -0600 Subject: [PATCH 1/3] Implement off/unsubscribe --- README.md | 8 ++++ lib/node-redis-pubsub.js | 80 +++++++++++++++++++++++++++++++++------- test/redisQueue.test.js | 49 +++++++++++++----------- 3 files changed, 102 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index ef9245b..1ae77b6 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,14 @@ nrp.on('mydata:sync', function(myData) { }); ``` +### off / unsubscribe + +```javascript +nrp.off('channel', handler, function(){ + // handler is unsubscribe; +}); +``` + ### Unsubscribe ```javascript diff --git a/lib/node-redis-pubsub.js b/lib/node-redis-pubsub.js index a119115..0d6ae9d 100644 --- a/lib/node-redis-pubsub.js +++ b/lib/node-redis-pubsub.js @@ -1,11 +1,17 @@ "use strict"; var redis = require('redis'); +var Pool = {}; + +Pool.of = function(namespace){ + (Pool[namespace]) || (Pool[namespace] = { handlers : [], pmessageHandlers : [] }); + return Pool[namespace]; +}; function initClient(options) { var auth = options.auth; var redisUrl = options.url; var client; - + options.port = options.port || 6379; // 6379 is Redis' default options.host = options.host || '127.0.0.1'; @@ -18,7 +24,7 @@ function initClient(options) { if(auth){ client.auth(auth); } - + return client; } @@ -33,9 +39,9 @@ function initClient(options) { */ function NodeRedisPubsub(options){ if (!(this instanceof NodeRedisPubsub)){ return new NodeRedisPubsub(options); } - + options || (options = {}); - + // accept connections / clients having the same interface as node_redis clients // Need to create two Redis clients as one cannot be both in receiver and emitter mode // I wonder why that is, by the way ... @@ -44,16 +50,16 @@ function NodeRedisPubsub(options){ } else { this.emitter = initClient(options); } - + if(options.receiver) { this.receiver = options.receiver; } else { this.receiver = initClient(options); this.receiver.setMaxListeners(0); } - + delete options.url; - + this.prefix = options.scope ? options.scope + ':' : ''; } @@ -72,9 +78,10 @@ NodeRedisPubsub.prototype.getRedisClient = function(){ * */ NodeRedisPubsub.prototype.on = NodeRedisPubsub.prototype.subscribe = function(channel, handler, callback){ - if(!callback) - callback = function(){}; + if(!callback){ callback = function(){}; } + var self = this; + var pool = Pool.of(self.prefix + channel); if(channel === "error"){ self.errorHandler = handler; @@ -85,7 +92,7 @@ NodeRedisPubsub.prototype.on = NodeRedisPubsub.prototype.subscribe = function(ch } var pmessageHandler = function(pattern, _channel, message){ - if(self.prefix + channel === pattern){ + if(self.prefix + channel === pattern){ var jsonmsg = message; try{ jsonmsg = JSON.parse(message); @@ -93,23 +100,70 @@ NodeRedisPubsub.prototype.on = NodeRedisPubsub.prototype.subscribe = function(ch if(typeof self.errorHandler === 'function'){ return self.errorHandler("Invalid JSON received! Channel: " + self.prefix + channel + " Message: " + message); } - } - return handler(jsonmsg, _channel); + } + return handler(jsonmsg, _channel); } }; this.receiver.on('pmessage', pmessageHandler); + pool.handlers.push(handler); + pool.pmessageHandlers.push(pmessageHandler); this.receiver.psubscribe(this.prefix + channel, callback); var removeListener = function(callback){ - self.receiver.removeListener('pmessage', pmessageHandler); + self.receiver.removeListener('pmessage', pool.pmessageHandler); return self.receiver.punsubscribe(self.prefix + channel, callback); }; return removeListener; }; +/** + * Unsubscribe to a channel + * @param {String} channel The channel to unsubscribe from, can be a pattern e.g. 'user.*' + * @param {Function} handler Function to unsubscribe + * @param {Function} cb Optional callback to call once the handler is unsubscribed. + * + */ +NodeRedisPubsub.prototype.off = NodeRedisPubsub.prototype.unsubscribe = function(channel, handler, callback){ + (callback) || (callback = function(){}); + + var self = this; + var namespace = self.prefix + channel; + var pool = Pool.of(namespace); + + pool.pmessageHandlers.forEach( pmessageHandler => { + self.receiver.removeListener('pmessage', pmessageHandler); + }); + + self.receiver.punsubscribe(namespace, function(){ + + // No hanlder was provided + if(!handler || typeof handler !== 'function'){ + pool.handlers = []; // Clear pool + return callback(); // notify + } + + var index = pool.handlers.indexOf(handler); + if(index > -1){ pool.handlers.splice(index, 1); } // Remove from pool + if(!pool.handlers.length){ return callback(); } // Nothing to subscribe + + var done = after(pool.handlers.length, callback); + + // Re assign past handlers + pool.handlers.forEach(function(hd){ + self.on(namespace, hd, done); + }); + }); + + function after(times, func) { + return function() { + if (--times < 1) { return func.apply(this, arguments); } + }; + }; +}; + /** * Emit an event * @param {String} channel Channel on which to emit the message diff --git a/test/redisQueue.test.js b/test/redisQueue.test.js index fb7bb4d..6f92d01 100644 --- a/test/redisQueue.test.js +++ b/test/redisQueue.test.js @@ -44,7 +44,7 @@ describe('Node Redis Pubsub', function () { rq.emit('a test', { first: 'First message' , second: 'Second message' }); }); - + after(function(){ rq.end(); }); @@ -95,27 +95,32 @@ describe('Node Redis Pubsub', function () { }); }); - // should re-write this test - // it('Should have the ability to unsubscribe', function (done) { - // var rq = new NodeRedisPubsub(); - // var called = false; - - // rq.should.have.property('off'); - // rq.on('a test', function (data){ - // called = true; - // }, function(){ - // rq.off('a test'); - // rq.emit('a test', { }); - // }); - - // setTimeout(function(){ - // called.should.be.false; - // done(); - // }, 10); + it('Should have the ability to unsubscribe', function (done) { + var rq = new NodeRedisPubsub(); + var channel = 'test channel'; + var called = { A : false, B : false }; + + rq.should.have.property('off'); + rq.should.have.property('unsubscribe'); + + var handlerA = function(msg){ called.A = true; }; + var handlerB = function(msg){ called.B = true; }; + + rq.on(channel, handlerA, function(){ // Subscribe Handler A + rq.on(channel, handlerB, function(){ // Subscribe Handler B + rq.off(channel, handlerA, function(){ // Unsubscribe Handler A + rq.emit(channel, {}); // Emit a message + setTimeout(function(){ + called.A.should.be.false; // Handler A was NOT called + called.B.should.be.true; // Handler B was called + done(); + }, 0); + }); + }); + }); - // }); + }); - it('Should gracefully handle invalid JSON message data', function (done) { var rq = new NodeRedisPubsub(conf); @@ -138,7 +143,7 @@ describe('Node Redis Pubsub', function () { rq.end(); }); }); - + it('Should be able to handle non JSON message data', function(done) { var rq = new NodeRedisPubsub(conf); @@ -182,6 +187,6 @@ describe('Node Redis Pubsub', function () { }); - + }); From d17a43bcbaaac8c1d416f444e87a80e415e1c73d Mon Sep 17 00:00:00 2001 From: Narciso Date: Thu, 1 Nov 2018 17:42:25 -0600 Subject: [PATCH 2/3] Add more documentation on off --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1ae77b6..d2d0987 100644 --- a/README.md +++ b/README.md @@ -106,8 +106,15 @@ nrp.on('mydata:sync', function(myData) { ### off / unsubscribe ```javascript -nrp.off('channel', handler, function(){ +function handlerA(){} +function handlerB(){} + +nrp.on('channel', handlerA); +nrp.on('channel', handlerB); + +nrp.off('channel', handlerB, function(){ // handler is unsubscribe; + nrp.emit('channel', { message }); // Only handlerA gets triggered }); ``` From a73789466db6c93824f3fd453b078b10d8b84367 Mon Sep 17 00:00:00 2001 From: Narciso Date: Mon, 5 Nov 2018 12:21:40 -0600 Subject: [PATCH 3/3] Clear pool on end and quit --- lib/node-redis-pubsub.js | 19 ++++++---------- lib/pool.js | 48 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 12 deletions(-) create mode 100644 lib/pool.js diff --git a/lib/node-redis-pubsub.js b/lib/node-redis-pubsub.js index 0d6ae9d..6fec8cc 100644 --- a/lib/node-redis-pubsub.js +++ b/lib/node-redis-pubsub.js @@ -1,11 +1,6 @@ "use strict"; var redis = require('redis'); -var Pool = {}; - -Pool.of = function(namespace){ - (Pool[namespace]) || (Pool[namespace] = { handlers : [], pmessageHandlers : [] }); - return Pool[namespace]; -}; +var Pool = require('./pool'); function initClient(options) { var auth = options.auth; @@ -107,8 +102,7 @@ NodeRedisPubsub.prototype.on = NodeRedisPubsub.prototype.subscribe = function(ch this.receiver.on('pmessage', pmessageHandler); - pool.handlers.push(handler); - pool.pmessageHandlers.push(pmessageHandler); + pool.add(handler, pmessageHandler); this.receiver.psubscribe(this.prefix + channel, callback); var removeListener = function(callback){ @@ -141,12 +135,11 @@ NodeRedisPubsub.prototype.off = NodeRedisPubsub.prototype.unsubscribe = function // No hanlder was provided if(!handler || typeof handler !== 'function'){ - pool.handlers = []; // Clear pool - return callback(); // notify + pool.flush(); // Clear pool + return callback(); // notify } - var index = pool.handlers.indexOf(handler); - if(index > -1){ pool.handlers.splice(index, 1); } // Remove from pool + pool.remove(handler); if(!pool.handlers.length){ return callback(); } // Nothing to subscribe var done = after(pool.handlers.length, callback); @@ -179,6 +172,7 @@ NodeRedisPubsub.prototype.emit = NodeRedisPubsub.prototype.publish = function (c NodeRedisPubsub.prototype.quit = function() { this.emitter.quit(); this.receiver.quit(); + Pool.flushAll(); }; /** @@ -187,6 +181,7 @@ NodeRedisPubsub.prototype.quit = function() { NodeRedisPubsub.prototype.end = function() { this.emitter.end(true); this.receiver.end(true); + Pool.flushAll(); }; module.exports = NodeRedisPubsub; diff --git a/lib/pool.js b/lib/pool.js new file mode 100644 index 0000000..1712c04 --- /dev/null +++ b/lib/pool.js @@ -0,0 +1,48 @@ +var Pool = {}; + +// NameSpace Pool definition +function NSP(){ + this.handlers = []; + this.pmessageHandlers = []; +} + +// Add a new henalder to the namespace pool +NSP.prototype.add = function(hanlder, pmessageHandler){ + this.handlers.push(hanlder); + this.pmessageHandlers.push(pmessageHandler); +}; + +// Remove a hendler from namespace pool +NSP.prototype.remove = function(handler){ + this.pmessageHandlers = []; + var index = this.handlers.indexOf(handler); + if(index > -1){ this.handlers.splice(index, 1); } // Remove from pool +}; + +// Clear current namespace pool +NSP.prototype.flush = function(){ + this.handlers = []; + this.pmessageHandlers = []; +}; + +/* + * Wrap Pool on a specific NameSpace + * */ +Pool.of = function(namespace){ + if(typeof Pool[namespace] === 'function'){ namespace = '_' + namespace; } // Internal pool Method + (Pool[namespace]) || (Pool[namespace] = new NSP()); + return Pool[namespace]; +}; + +/* + * Flush all handlers from Pool + * */ +Pool.flushAll = function(){ + for (var nsp in Pool) { + if (!Pool.hasOwnProperty(nsp) || typeof Pool[nsp] == 'function') continue; + Pool[nsp].flush(); + delete Pool[nsp]; + } +}; + +module.exports = Pool;