Skip to content
This repository was archived by the owner on Jul 3, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ $ npm install nsq.js
except fo the framer:

```
$ DEBUG=nsq*,-nsq:framer node test
$ DEBUG=nsq*,-nsq:framer npm test

nsq:reader connect nsqd 0.0.0.0:4150 events/ingestion [5] +0ms
nsq:connection connect: 0.0.0.0:4150 V2 +0ms
Expand Down Expand Up @@ -151,17 +151,17 @@ Close the writer's connection(s) and fire the optional [fn] when completed.

A single message.

### Message#finish()
### Message#finish([fn])

Mark message as complete.
Mark message as complete..

### Message#requeue([delay])
### Message#requeue([delay], [fn])

Re-queue the message immediately, or with the
given `delay` in milliseconds, or a string such
as "5s", "10m" etc.

### Message#touch()
### Message#touch([fn])

Reset the message's timeout, increasing the length
of time before NSQD considers it timed out.
Expand Down
8 changes: 4 additions & 4 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ machine:

dependencies:
pre:
- wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.3.6.linux-amd64.go1.5.1.tar.gz
- tar xvzf nsq-0.3.6.linux-amd64.go1.5.1.tar.gz
- cp nsq-0.3.6.linux-amd64.go1.5.1/bin/* .
- wget https://github.com/nsqio/nsq/releases/download/v0.3.8/nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
- tar xvzf nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
- cp nsq-0.3.8.linux-amd64.go1.6.2/bin/* .
- ./nsqlookupd:
background: true
- ./nsqd --lookupd-tcp-address=127.0.0.1:4160:
background: true
cache_directories:
- nsq-0.3.6.linux-amd64.go1.5.1
- nsq-0.3.8.linux-amd64.go1.6.2
override:
- npm install
36 changes: 24 additions & 12 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,17 @@ Connection.prototype.ready = function(n){
* @api private
*/

Connection.prototype.finish = function(id){
Connection.prototype.finish = function(id, fn){
assertValidMessageId(id);
var fn = this.onerror;
var self = this;
if (!this._ready) return fn(new Error('cannot finish, connection not ready'));
this.command('FIN', [id], function(){
if (!this._ready) {
var err = new Error('cannot finish, connection not ready');
if (fn) fn(err);
return this.onerror(err);
}
this.command('FIN', [id], function(err){
self.emit('finish', id);
if (fn) fn(err);
--self.inFlight;
});
};
Expand All @@ -361,13 +365,17 @@ Connection.prototype.finish = function(id){
* @api private
*/

Connection.prototype.requeue = function(id, timeout){
Connection.prototype.requeue = function(id, timeout, fn){
assertValidMessageId(id);
var fn = this.onerror;
var self = this;
if (!this._ready) return fn(new Error('cannot requeue, connection not ready'));
this.command('REQ', [id, timeout || 0], function(){
if (!this._ready) {
var err = new Error('cannot requeue, connection not ready');
if (fn) fn(err);
return this.onerror(err);
}
this.command('REQ', [id, timeout || 0], function(err){
self.emit('requeue', id);
if (fn) fn(err);
--self.inFlight;
});
};
Expand All @@ -379,13 +387,17 @@ Connection.prototype.requeue = function(id, timeout){
* @api private
*/

Connection.prototype.touch = function(id){
Connection.prototype.touch = function(id, fn){
assertValidMessageId(id);
var fn = this.onerror;
var self = this;
if (!this._ready) return fn(new Error('cannot touch, connection not ready'));
this.command('TOUCH', [id], function(){
if (!this._ready) {
var err = new Error('cannot touch, connection not ready');
if (fn) fn(err);
return this.onerror(err);
}
this.command('TOUCH', [id], function(err){
self.emit('touch', id);
if (fn) fn(err);
});
};

Expand Down
15 changes: 9 additions & 6 deletions lib/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ function Message(body, conn) {
/**
* Mark the message as finished.
*
* @param {Function} [fn]
* @api public
*/

Message.prototype.finish = function(){
Message.prototype.finish = function(fn){
this.responded = true;
this.conn.finish(this.id);
this.conn.finish(this.id, fn);
this.trace('message:finish', { msg: this });
};

Expand Down Expand Up @@ -77,25 +78,27 @@ Message.prototype.timedout = function(){
* Re-queue the message with optional `delay`.
*
* @param {Number|String} [delay]
* @param {Function} [fn]
* @api public
*/

Message.prototype.requeue = function(delay){
Message.prototype.requeue = function(delay, fn){
if ('string' == typeof delay) delay = ms(delay);
this.responded = true;
this.conn.requeue(this.id, delay);
this.conn.requeue(this.id, delay, fn);
this.trace('message:requeue', { msg: this });
};

/**
* Touch the message.
*
* @param {Function} [fn]
* @api public
*/

Message.prototype.touch = function(){
Message.prototype.touch = function(fn){
this.lastTouch = Date.now();
this.conn.touch(this.id);
this.conn.touch(this.id, fn);
this.trace('message:touch', { msg: this });
};

Expand Down
18 changes: 9 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@
"queue"
],
"dependencies": {
"backo": "~1.0.1",
"backo": "~1.1.0",
"bignum": "~0.12.5",
"debug": "~0.7.4",
"ms": "~0.6.2",
"node-int64": "~0.3.0",
"nsq-lookup": "~1.0.0",
"debug": "~2.2.0",
"ms": "~0.7.1",
"node-int64": "~0.4.0",
"nsq-lookup": "~1.0.2",
"set-component": "~1.0.0"
},
"devDependencies": {
"bytes": "~0.3.0",
"jstrace": "~0.1.0",
"bytes": "~2.4.0",
"jstrace": "~0.3.0",
"lodash.get": "^4.4.2",
"mocha": "*",
"should": "*",
"superagent": "~0.17.0",
"uid": "0.0.2"
"superagent": "~2.3.0"
},
"license": "MIT"
}
24 changes: 8 additions & 16 deletions test/acceptance/connection.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@

require('./hooks');

var Connection = require('../../lib/connection');
var assert = require('assert');
var utils = require('../utils');
var uid = require('uid');

describe('Connection', function(){
var topic = uid();
afterEach(function(done){
utils.deleteTopic(topic, function(){
topic = uid();
done();
});
})

it('should identify on connect', function(done){
var conn = new Connection;

Expand All @@ -31,17 +23,17 @@ describe('Connection', function(){
var sub = new Connection;

pub.on('ready', function(){
pub.publish(topic, 'something');
pub.publish('test', 'something');
});

sub.on('ready', function(){
sub.subscribe(topic, 'tailer');
sub.subscribe('test', 'tailer');
sub.ready(5);
});

sub.on('message', function(msg){
msg.finish();
done();
sub.close(done);
});

pub.connect();
Expand All @@ -52,7 +44,7 @@ describe('Connection', function(){
var conn = new Connection;

conn.on('ready', function(){
conn.subscribe(topic, 'tailer', function(err){
conn.subscribe('test', 'tailer', function(err){
assert(!err);
conn.close(done);
});
Expand All @@ -68,7 +60,7 @@ describe('Connection', function(){
conn.on('error', function(){});
conn.on('ready', function(){
conn.sock.destroy();
conn.publish(topic, 'something', function(err){
conn.publish('test', 'something', function(err){
called++;
});
assert.equal(called, 1);
Expand All @@ -94,7 +86,7 @@ describe('Connection', function(){
conn.connect();
conn.on('ready', function(){
conn.end();
conn.publish(topic, 'stuff');
conn.publish('test', 'stuff');

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'test' should be a variable

conn.on('error', done);
conn.on('end', done);
});
Expand Down
17 changes: 17 additions & 0 deletions test/acceptance/hooks.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

var assert = require('assert');
var get = require('lodash.get');
var utils = require('../utils');

beforeEach(function(done){
utils.deleteTopic('test', done);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

})

afterEach(function(done){
utils.stats('test', 'reader', function(err, stats){
if (err) return done(err);
var state = get(stats.body.data, 'topics[0].channels[0].clients[0].state');
assert(state != 3, 'client in subscribed state');
done();
})
});
Loading