Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,21 @@ nrp.on('mydata:sync', function(myData) {
});
```

### off / unsubscribe

```javascript
function handlerA(){}
function handlerB(){}

nrp.on('channel', handlerA);
nrp.on('channel', handlerB);

nrp.off('channel', handlerB, function(){
// handler is unsubscribe;
nrp.emit('channel', { message }); // Only handlerA gets triggered
});
```

### Unsubscribe

```javascript
Expand Down
75 changes: 62 additions & 13 deletions lib/node-redis-pubsub.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"use strict";
var redis = require('redis');
var Pool = require('./pool');

function initClient(options) {
var auth = options.auth;
var redisUrl = options.url;
var client;

options.port = options.port || 6379; // 6379 is Redis' default
options.host = options.host || '127.0.0.1';

Expand All @@ -18,7 +19,7 @@ function initClient(options) {
if(auth){
client.auth(auth);
}

return client;
}

Expand All @@ -33,9 +34,9 @@ function initClient(options) {
*/
function NodeRedisPubsub(options){
if (!(this instanceof NodeRedisPubsub)){ return new NodeRedisPubsub(options); }

options || (options = {});

// accept connections / clients having the same interface as node_redis clients
// Need to create two Redis clients as one cannot be both in receiver and emitter mode
// I wonder why that is, by the way ...
Expand All @@ -44,16 +45,16 @@ function NodeRedisPubsub(options){
} else {
this.emitter = initClient(options);
}

if(options.receiver) {
this.receiver = options.receiver;
} else {
this.receiver = initClient(options);
this.receiver.setMaxListeners(0);
}

delete options.url;

this.prefix = options.scope ? options.scope + ':' : '';
}

Expand All @@ -72,9 +73,10 @@ NodeRedisPubsub.prototype.getRedisClient = function(){
*
*/
NodeRedisPubsub.prototype.on = NodeRedisPubsub.prototype.subscribe = function(channel, handler, callback){
if(!callback)
callback = function(){};
if(!callback){ callback = function(){}; }

var self = this;
var pool = Pool.of(self.prefix + channel);

if(channel === "error"){
self.errorHandler = handler;
Expand All @@ -85,31 +87,76 @@ NodeRedisPubsub.prototype.on = NodeRedisPubsub.prototype.subscribe = function(ch
}

var pmessageHandler = function(pattern, _channel, message){
if(self.prefix + channel === pattern){
if(self.prefix + channel === pattern){
var jsonmsg = message;
try{
jsonmsg = JSON.parse(message);
} catch (ex){
if(typeof self.errorHandler === 'function'){
return self.errorHandler("Invalid JSON received! Channel: " + self.prefix + channel + " Message: " + message);
}
}
return handler(jsonmsg, _channel);
}
return handler(jsonmsg, _channel);
}
};

this.receiver.on('pmessage', pmessageHandler);

pool.add(handler, pmessageHandler);
this.receiver.psubscribe(this.prefix + channel, callback);

var removeListener = function(callback){
self.receiver.removeListener('pmessage', pmessageHandler);
self.receiver.removeListener('pmessage', pool.pmessageHandler);
return self.receiver.punsubscribe(self.prefix + channel, callback);
};

return removeListener;
};

/**
* Unsubscribe to a channel
* @param {String} channel The channel to unsubscribe from, can be a pattern e.g. 'user.*'
* @param {Function} handler Function to unsubscribe
* @param {Function} cb Optional callback to call once the handler is unsubscribed.
*
*/
NodeRedisPubsub.prototype.off = NodeRedisPubsub.prototype.unsubscribe = function(channel, handler, callback){
(callback) || (callback = function(){});

var self = this;
var namespace = self.prefix + channel;
var pool = Pool.of(namespace);

pool.pmessageHandlers.forEach( pmessageHandler => {
self.receiver.removeListener('pmessage', pmessageHandler);
});

self.receiver.punsubscribe(namespace, function(){

// No hanlder was provided
if(!handler || typeof handler !== 'function'){
pool.flush(); // Clear pool
return callback(); // notify
}

pool.remove(handler);
if(!pool.handlers.length){ return callback(); } // Nothing to subscribe

var done = after(pool.handlers.length, callback);

// Re assign past handlers
pool.handlers.forEach(function(hd){
self.on(namespace, hd, done);
});
});

function after(times, func) {
return function() {
if (--times < 1) { return func.apply(this, arguments); }
};
};
};

/**
* Emit an event
* @param {String} channel Channel on which to emit the message
Expand All @@ -125,6 +172,7 @@ NodeRedisPubsub.prototype.emit = NodeRedisPubsub.prototype.publish = function (c
NodeRedisPubsub.prototype.quit = function() {
this.emitter.quit();
this.receiver.quit();
Pool.flushAll();
};

/**
Expand All @@ -133,6 +181,7 @@ NodeRedisPubsub.prototype.quit = function() {
NodeRedisPubsub.prototype.end = function() {
this.emitter.end(true);
this.receiver.end(true);
Pool.flushAll();
};

module.exports = NodeRedisPubsub;
48 changes: 48 additions & 0 deletions lib/pool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
var Pool = {};

// NameSpace Pool definition
function NSP(){
this.handlers = [];
this.pmessageHandlers = [];
}

// Add a new henalder to the namespace pool
NSP.prototype.add = function(hanlder, pmessageHandler){
this.handlers.push(hanlder);
this.pmessageHandlers.push(pmessageHandler);
};

// Remove a hendler from namespace pool
NSP.prototype.remove = function(handler){
this.pmessageHandlers = [];
var index = this.handlers.indexOf(handler);
if(index > -1){ this.handlers.splice(index, 1); } // Remove from pool
};

// Clear current namespace pool
NSP.prototype.flush = function(){
this.handlers = [];
this.pmessageHandlers = [];
};

/*
* Wrap Pool on a specific NameSpace
* */
Pool.of = function(namespace){
if(typeof Pool[namespace] === 'function'){ namespace = '_' + namespace; } // Internal pool Method
(Pool[namespace]) || (Pool[namespace] = new NSP());
return Pool[namespace];
};

/*
* Flush all handlers from Pool
* */
Pool.flushAll = function(){
for (var nsp in Pool) {
if (!Pool.hasOwnProperty(nsp) || typeof Pool[nsp] == 'function') continue;
Pool[nsp].flush();
delete Pool[nsp];
}
};

module.exports = Pool;
49 changes: 27 additions & 22 deletions test/redisQueue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe('Node Redis Pubsub', function () {
rq.emit('a test', { first: 'First message'
, second: 'Second message' });
});

after(function(){
rq.end();
});
Expand Down Expand Up @@ -95,27 +95,32 @@ describe('Node Redis Pubsub', function () {
});
});

// should re-write this test
// it('Should have the ability to unsubscribe', function (done) {
// var rq = new NodeRedisPubsub();
// var called = false;

// rq.should.have.property('off');
// rq.on('a test', function (data){
// called = true;
// }, function(){
// rq.off('a test');
// rq.emit('a test', { });
// });

// setTimeout(function(){
// called.should.be.false;
// done();
// }, 10);
it('Should have the ability to unsubscribe', function (done) {
var rq = new NodeRedisPubsub();
var channel = 'test channel';
var called = { A : false, B : false };

rq.should.have.property('off');
rq.should.have.property('unsubscribe');

var handlerA = function(msg){ called.A = true; };
var handlerB = function(msg){ called.B = true; };

rq.on(channel, handlerA, function(){ // Subscribe Handler A
rq.on(channel, handlerB, function(){ // Subscribe Handler B
rq.off(channel, handlerA, function(){ // Unsubscribe Handler A
rq.emit(channel, {}); // Emit a message
setTimeout(function(){
called.A.should.be.false; // Handler A was NOT called
called.B.should.be.true; // Handler B was called
done();
}, 0);
});
});
});

// });
});


it('Should gracefully handle invalid JSON message data', function (done) {
var rq = new NodeRedisPubsub(conf);

Expand All @@ -138,7 +143,7 @@ describe('Node Redis Pubsub', function () {
rq.end();
});
});

it('Should be able to handle non JSON message data', function(done) {
var rq = new NodeRedisPubsub(conf);

Expand Down Expand Up @@ -182,6 +187,6 @@ describe('Node Redis Pubsub', function () {

});



});