Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 94 additions & 94 deletions lib/node-redis-pubsub.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"use strict";
var redis = require('redis');
const redis = require('redis')
const noop = () => {}

/**
* Create a new NodeRedisPubsub instance that can subscribe to channels and publish messages
Expand All @@ -8,112 +8,112 @@ var redis = require('redis');
* scope - Optional, two NodeRedisPubsubs with different scopes will not share messages
* url - Optional, a correctly formed redis connection url
*/
function NodeRedisPubsub(options){
if (!(this instanceof NodeRedisPubsub)){ return new NodeRedisPubsub(options); }

if(!options)
options = {};

var auth = options.auth;
var redisUrl = options.url;

options.port = options.port || 6379; // 6379 is Redis' default
options.host = options.host || '127.0.0.1';

// Need to create two Redis clients as one cannot be both in receiver and emitter mode
// I wonder why that is, by the way ...
if (!redisUrl) {
this.emitter = redis.createClient(options);
this.receiver = redis.createClient(options);
} else {
delete options.url;
this.emitter = redis.createClient(redisUrl, options);
this.receiver = redis.createClient(redisUrl, options);
class NodeRedisPubsub {
constructor(opts) {
const options = Object.assign({}, { port: 6379, host: '127.0.0.1' }, opts)
const { auth = null, redisUrl = null } = options

if (!redisUrl) {
this.emitter = redis.createClient(options)
this.receiver = redis.createClient(options)
} else {
delete options.url
this.emitter = redis.createClient(redisUrl, options)
this.receiver = redis.createClient(redisUrl, options)
}

if (auth) {
this.emitter.auth(auth)
this.receiver.auth(auth)
}

this.receiver.setMaxListeners(0)
this.prefix = options.scope ? options.scope + ':' : ''
}

if(auth){
this.emitter.auth(auth);
this.receiver.auth(auth);
/**
* Return the emitter object to be used as a regular redis client to save resources.
*/
getRedisClient() {
return this.emitter
}

this.receiver.setMaxListeners(0);
this.prefix = options.scope ? options.scope + ':' : '';
}
/**
* Subscribe to a channel
* @param {String} channel The channel to subscribe to, can be a pattern e.g. 'user.*'
* @param {Function} handler Function to call with the received message.
* @param {Function} cb Optional callback to call once the handler is registered.
*
*/
on(channel, handler, callback = noop) {
if (channel === 'error') {
this.errorHandler = handler
this.emitter.on('error', handler)
this.receiver.on('error', handler)
callback()
return
}

/**
* Return the emitter object to be used as a regular redis client to save resources.
*/
NodeRedisPubsub.prototype.getRedisClient = function(){
return this.emitter;
};
const fullChannel = `${this.prefix}${channel}`

/**
* Subscribe to a channel
* @param {String} channel The channel to subscribe to, can be a pattern e.g. 'user.*'
* @param {Function} handler Function to call with the received message.
* @param {Function} cb Optional callback to call once the handler is registered.
*
*/
NodeRedisPubsub.prototype.on = NodeRedisPubsub.prototype.subscribe = function(channel, handler, callback){
if(!callback)
callback = function(){};
var self = this;

if(channel === "error"){
self.errorHandler = handler;
this.emitter.on("error", handler);
this.receiver.on("error", handler);
callback();
return;
}
const pmessageHandler = (pattern, channel, message) => {
if (fullChannel !== pattern) return

var pmessageHandler = function(pattern, _channel, message){
if(self.prefix + channel === pattern){
try{
return handler(JSON.parse(message), _channel);
} catch (ex){
if(typeof self.errorHandler === 'function'){
return self.errorHandler("Invalid JSON received! Channel: " + self.prefix + channel + " Message: " + message);
try {
return handler(JSON.parse(message), channel)
} catch (ex) {
if (typeof this.errorHandler === 'function') {
return this.errorHandler(`Invalid JSON received! Channel: ${fullChannel} Message: ${message}`)
}
}
}
}
};

this.receiver.on('pmessage', pmessageHandler);
this.receiver.on('pmessage', pmessageHandler)
this.receiver.psubscribe(fullChannel, callback)

this.receiver.psubscribe(this.prefix + channel, callback);
const removeListener = callback => {
this.receiver.removeListener('pmessage', pmessageHandler)
return this.receiver.punsubscribe(fullChannel, callback)
}

var removeListener = function(callback){
self.receiver.removeListener('pmessage', pmessageHandler);
return self.receiver.punsubscribe(self.prefix + channel, callback);
};
return removeListener
}

return removeListener;
};
subscribe(channel, handler, callback = noop) {
return this.on(channel, handler, callback)
}

/**
* Emit an event
* @param {String} channel Channel on which to emit the message
* @param {Object} message
*/
NodeRedisPubsub.prototype.emit = NodeRedisPubsub.prototype.publish = function (channel, message) {
return this.emitter.publish(this.prefix + channel, JSON.stringify(message));
};
off(channel, callback = noop) {
return this.receiver.punsubscribe(`${this.prefix}${channel}`, callback)
}

/**
* Safely close the redis connections 'soon'
*/
NodeRedisPubsub.prototype.quit = function() {
this.emitter.quit();
this.receiver.quit();
};
/**
* Emit an event
* @param {String} channel Channel on which to emit the message
* @param {Object} message
*/
emit(channel, message) {
return this.emitter.publish(`${this.prefix}${channel}`, JSON.stringify(message))
}

/**
* Dangerously close the redis connections immediately
*/
NodeRedisPubsub.prototype.end = function() {
this.emitter.end();
this.receiver.end();
};
publish(channel, message) {
return this.emit(channel, message)
}

/**
* Safely close the redis connections 'soon'
*/
quit() {
this.emitter.quit()
this.receiver.quit()
}

module.exports = NodeRedisPubsub;
/**
* Dangerously close the redis connections immediately
*/
end() {
this.emitter.end()
this.receiver.end()
}
}
module.exports = NodeRedisPubsub
53 changes: 21 additions & 32 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,40 +1,29 @@
{
"name": "node-redis-pubsub",
"version": "1.0.4",

"name" : "node-redis-pubsub" ,
"version" : "1.0.3" ,

"author" : {
"name" : "Louis Chatriot" ,
"email" : "louis.chatriot@gmail.com"
"author": {
"name": "Louis Chatriot",
"email": "louis.chatriot@gmail.com"
},

"contributors" : [
"Louis Chatriot" ,
"omarmohamed" ,
"Martin Saint-Macary" ,
"Narciso Guillen"
],
"contributors": ["Louis Chatriot", "omarmohamed", "Martin Saint-Macary", "Narciso Guillen", "notVitaliy"],

"description" : "Redis PubSub client for Node",
"description": "Redis PubSub client for Node",

"keywords" : [
"redis" ,
"pubsub" ,
"node" ,
"simple"
],
"keywords": ["redis", "pubsub", "node", "simple"],

"homepage" : "https://github.com/louischatriot/node-redis-pubsub",
"homepage": "https://github.com/louischatriot/node-redis-pubsub",

"dependencies" : {
"redis" : "^2.7.1"
"dependencies": {
"redis": "^2.8.0"
},

"devDependencies" : {
"chai" : "^3.5.0",
"mocha" : "*",
"should" : "*",
"sinon" : "^1.14.1",
"devDependencies": {
"chai": "^3.5.0",
"mocha": "*",
"should": "*",
"sinon": "^1.14.1",
"sinon-chai": "^2.7.0"
},

Expand All @@ -44,14 +33,14 @@
},

"engines": {
"node": "*"
"node": ">=6"
},

"main" : "index",
"main": "index",

"scripts" : {
"test" : "make test"
"scripts": {
"test": "make test"
},

"licence" : "mit"
"licence": "mit"
}
Loading