diff --git a/lib/node-redis-pubsub.js b/lib/node-redis-pubsub.js index 460d740..4615e96 100644 --- a/lib/node-redis-pubsub.js +++ b/lib/node-redis-pubsub.js @@ -38,6 +38,7 @@ function NodeRedisPubsub(options){ this.receiver.setMaxListeners(0); this.prefix = options.scope ? options.scope + ':' : ''; + this.channelRefCountMap = new ChannelRefCountMap(); } /** @@ -80,12 +81,14 @@ NodeRedisPubsub.prototype.on = NodeRedisPubsub.prototype.subscribe = function(ch }; this.receiver.on('pmessage', pmessageHandler); - this.receiver.psubscribe(this.prefix + channel, callback); + this.channelRefCountMap.increment(this.prefix + channel); var removeListener = function(callback){ self.receiver.removeListener('pmessage', pmessageHandler); - return self.receiver.punsubscribe(self.prefix + channel, callback); + if (self.channelRefCountMap.decrement(self.prefix + channel)){ + return self.receiver.punsubscribe(self.prefix + channel, callback); + } }; return removeListener; @@ -116,4 +119,44 @@ NodeRedisPubsub.prototype.end = function() { this.receiver.end(); }; + +/** + * Create a map that stores how many listeners we have for each pattern we subscribed on. + */ +function ChannelRefCountMap(){ + this.map = {}; +} + +/** + * Increment by 1 the ref count of that channel + * @param {String} channel Channel of which to increment the ref count + */ +ChannelRefCountMap.prototype.increment = function(channel){ + if (!this.map[channel]){ + this.map[channel] = 1; + } else { + this.map[channel] = this.map[channel] + 1; + } +}; + +/** + * Decrement by 1 the ref count of that channel + * @param {String} channel Channel of which to decrement the ref count + * @return {bool} true if the ref count has reached zero, false otherwise. + */ +ChannelRefCountMap.prototype.decrement = function(channel){ + if (!this.map[channel]){ + throw new Error("Trying to decrement ref count of channel '" + channel + "' more times than it was incremented."); + } + + this.map[channel] = this.map[channel] - 1; + + if (this.map[channel] === 0){ + delete this.map[channel]; + return true; + } else { + return false; + } +}; + module.exports = NodeRedisPubsub;