From 6ed9a8b1b62071a3b1cabafd146b4b0b90a7a61b Mon Sep 17 00:00:00 2001 From: wink Date: Tue, 10 Aug 2010 14:52:56 -0500 Subject: [PATCH 01/26] modified to dispatch queued queries --- lib/mongodb.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/mongodb.js b/lib/mongodb.js index 05375ee..17c4084 100644 --- a/lib/mongodb.js +++ b/lib/mongodb.js @@ -92,6 +92,7 @@ MongoDB.prototype.addQuery = function(callback, ns, query, fields, limit, skip ) if (limit) q.push(limit); if (skip) q.push(skip); this.queries.push(q); + this.dispatch(); } MongoDB.prototype.dispatch = function() { From fab6b43c49d389458f525330a6749c4fc47a1af0 Mon Sep 17 00:00:00 2001 From: wink Date: Wed, 11 Aug 2010 14:40:08 -0500 Subject: [PATCH 02/26] firing 'close' event by calling reallyClose() on connection error --- src/mongo.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/mongo.cc b/src/mongo.cc index 815322b..b9f1b8d 100644 --- a/src/mongo.cc +++ b/src/mongo.cc @@ -19,7 +19,7 @@ extern "C" { } #include "bson.h" -#define DEBUGMODE 0 +#define DEBUGMODE 1 #define pdebug(...) do{if(DEBUGMODE)printf(__VA_ARGS__);}while(0) const int chunk_size(4094); @@ -153,6 +153,7 @@ class Connection : public node::EventEmitter { Ref(); StartWriteWatcher(); + StartReadWatcher(); } void Close() { @@ -385,6 +386,7 @@ class Connection : public node::EventEmitter { else if (readbuflen <= 0) { // socket problem? pdebug("length error on read %d errno = %d\n", readbuflen, errno); + reallyClose(); } else { tmp = static_cast(new char[buflen+readbuflen]); @@ -703,6 +705,7 @@ class Connection : public node::EventEmitter { if (!conn->connected) { StopReadWatcher(); StopWriteWatcher(); + reallyClose(); return; }; pdebug("event %d %d\n", conn->connected, close ? 1 : 0); From f23aab72128b6932158b270da9390a64e2f81de0 Mon Sep 17 00:00:00 2001 From: wink Date: Wed, 11 Aug 2010 14:41:34 -0500 Subject: [PATCH 03/26] turning off debug :> --- src/mongo.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mongo.cc b/src/mongo.cc index b9f1b8d..002a1b2 100644 --- a/src/mongo.cc +++ b/src/mongo.cc @@ -19,7 +19,7 @@ extern "C" { } #include "bson.h" -#define DEBUGMODE 1 +#define DEBUGMODE 0 #define pdebug(...) do{if(DEBUGMODE)printf(__VA_ARGS__);}while(0) const int chunk_size(4094); From 5a81924296b90981def46647a08b6f008a41c5d4 Mon Sep 17 00:00:00 2001 From: wink Date: Wed, 11 Aug 2010 14:52:58 -0500 Subject: [PATCH 04/26] resetting close state so the next event doesn't do anything silly --- src/mongo.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mongo.cc b/src/mongo.cc index 002a1b2..47e856b 100644 --- a/src/mongo.cc +++ b/src/mongo.cc @@ -387,6 +387,7 @@ class Connection : public node::EventEmitter { // socket problem? pdebug("length error on read %d errno = %d\n", readbuflen, errno); reallyClose(); + close = false; } else { tmp = static_cast(new char[buflen+readbuflen]); @@ -706,6 +707,7 @@ class Connection : public node::EventEmitter { StopReadWatcher(); StopWriteWatcher(); reallyClose(); + close = false; return; }; pdebug("event %d %d\n", conn->connected, close ? 1 : 0); From 8819bd6421f6350d2c9bc64426f97a5c92cee4f0 Mon Sep 17 00:00:00 2001 From: wink Date: Sun, 15 Aug 2010 17:56:16 -0500 Subject: [PATCH 05/26] adding upsert capability --- lib/mongodb.js | 9 +++++++-- src/mongo.cc | 8 +++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/lib/mongodb.js b/lib/mongodb.js index 17c4084..ced163e 100644 --- a/lib/mongodb.js +++ b/lib/mongodb.js @@ -18,8 +18,13 @@ Collection.prototype.insert = function(obj) { this.mongo.connection.insert(this.ns, obj); } -Collection.prototype.update = function(cond, obj) { - this.mongo.connection.update(this.ns, cond, obj); +Collection.prototype.update = function(cond, obj, options) { + var db_upsert = 0; + var db_multi_update = 0; + db_upsert = options != null && options['upsert'] != null ? (options['upsert'] == true ? 1 : 0) : db_upsert; + db_multi_update = options != null && options['multi'] != null ? (options['multi'] == true ? 1 : 0) : db_multi_update; + flags = parseInt(db_multi_update.toString() + db_upsert.toString(), 2); + this.mongo.connection.update(this.ns, cond, obj, flags); } Collection.prototype.remove = function(query) { diff --git a/src/mongo.cc b/src/mongo.cc index 47e856b..83e8d91 100644 --- a/src/mongo.cc +++ b/src/mongo.cc @@ -646,6 +646,7 @@ class Connection : public node::EventEmitter { bson cond; bson obj; + int flags = 0; if (args.Length() > 1 && !args[1]->IsUndefined()) { Local query(args[1]->ToObject()); @@ -663,7 +664,12 @@ class Connection : public node::EventEmitter { bson_empty(&obj); } - connection->Update(*ns, cond, obj); + if (args.Length() > 3 && !args[3]->IsUndefined()) { + Local jsflags = args[3]->ToInteger(); + flags = jsflags->Value(); + } + + connection->Update(*ns, cond, obj, flags); bson_destroy(&cond); bson_destroy(&obj); From f1d13877c5d4e382f16af6f5df05a842b882241c Mon Sep 17 00:00:00 2001 From: wink Date: Mon, 16 Aug 2010 11:41:36 -0500 Subject: [PATCH 06/26] beefing up find() as it would fail to return large data sets, now also supports limit and skip as -> find(query, fields, callback, limit, skip) --- lib/mongodb.js | 12 +- src/mongo.cc | 1294 ++++++++++++++++++++++++------------------------ 2 files changed, 663 insertions(+), 643 deletions(-) diff --git a/lib/mongodb.js b/lib/mongodb.js index ced163e..32fbfb5 100644 --- a/lib/mongodb.js +++ b/lib/mongodb.js @@ -8,8 +8,8 @@ function Collection(mongo, db, name) { this.name = name; } -Collection.prototype.find = function(query, fields, callback) { - this.mongo.addQuery(callback, this.ns, query, fields); +Collection.prototype.find = function(query, fields, callback, limit, skip) { + this.mongo.addQuery(callback, this.ns, query, fields, limit, skip); } jjj = JSON.stringify @@ -92,10 +92,10 @@ MongoDB.prototype.close = function() { MongoDB.prototype.addQuery = function(callback, ns, query, fields, limit, skip ) { var q = [ callback, ns ]; - if (query) q.push(query); - if (fields) q.push(fields); - if (limit) q.push(limit); - if (skip) q.push(skip); + if (query) q.push(query); else q.push({}); + if (fields) q.push(fields); else q.push({}); + if (limit) q.push(limit); else q.push(0); + if (skip) q.push(skip); else q.push(0); this.queries.push(q); this.dispatch(); } diff --git a/src/mongo.cc b/src/mongo.cc index 83e8d91..f63f9cb 100644 --- a/src/mongo.cc +++ b/src/mongo.cc @@ -12,10 +12,10 @@ #include extern "C" { - #define MONGO_HAVE_STDINT - #include - #include - #include +#define MONGO_HAVE_STDINT +#include +#include +#include } #include "bson.h" @@ -28,777 +28,797 @@ const int headerSize(sizeof(mongo_header) + sizeof(mongo_reply_fields)); using namespace v8; void setNonBlocking(int sock) { - int sockflags = fcntl(sock, F_GETFL, 0); - fcntl(sock, F_SETFL, sockflags | O_NONBLOCK); + int sockflags = fcntl(sock, F_GETFL, 0); + fcntl(sock, F_SETFL, sockflags | O_NONBLOCK); } class Connection : public node::EventEmitter { - public: +public: - static void - Initialize (Handle target) { - HandleScope scope; + static void + Initialize (Handle target) { + HandleScope scope; - Local t = FunctionTemplate::New(Connection::New); + Local t = FunctionTemplate::New(Connection::New); - t->Inherit(node::EventEmitter::constructor_template); - t->InstanceTemplate()->SetInternalFieldCount(1); + t->Inherit(node::EventEmitter::constructor_template); + t->InstanceTemplate()->SetInternalFieldCount(1); - NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); - NODE_SET_PROTOTYPE_METHOD(t, "close", Close); - NODE_SET_PROTOTYPE_METHOD(t, "find", Find); - NODE_SET_PROTOTYPE_METHOD(t, "insert", Insert); - NODE_SET_PROTOTYPE_METHOD(t, "update", Update); - NODE_SET_PROTOTYPE_METHOD(t, "remove", Remove); + NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); + NODE_SET_PROTOTYPE_METHOD(t, "close", Close); + NODE_SET_PROTOTYPE_METHOD(t, "find", Find); + NODE_SET_PROTOTYPE_METHOD(t, "insert", Insert); + NODE_SET_PROTOTYPE_METHOD(t, "update", Update); + NODE_SET_PROTOTYPE_METHOD(t, "remove", Remove); - target->Set(String::NewSymbol("Connection"), t->GetFunction()); - } + target->Set(String::NewSymbol("Connection"), t->GetFunction()); + } - void StartReadWatcher() { - pdebug("*** Starting read watcher\n"); - ev_io_start(EV_DEFAULT_ &read_watcher); - } + void StartReadWatcher() { + pdebug("*** Starting read watcher\n"); + ev_io_start(EV_DEFAULT_ &read_watcher); + } - void StopReadWatcher() { - pdebug("*** Stopping read watcher\n"); - ev_io_stop(EV_DEFAULT_ &read_watcher); - } + void StopReadWatcher() { + pdebug("*** Stopping read watcher\n"); + ev_io_stop(EV_DEFAULT_ &read_watcher); + } - void StartWriteWatcher() { - pdebug("*** Starting write watcher\n"); - ev_io_start(EV_DEFAULT_ &write_watcher); - } + void StartWriteWatcher() { + pdebug("*** Starting write watcher\n"); + ev_io_start(EV_DEFAULT_ &write_watcher); + } - void StopWriteWatcher() { - pdebug("*** Stopping write watcher\n"); - ev_io_stop(EV_DEFAULT_ &write_watcher); - } + void StopWriteWatcher() { + pdebug("*** Stopping write watcher\n"); + ev_io_stop(EV_DEFAULT_ &write_watcher); + } - void StartConnectWatcher() { - pdebug("*** Starting connect watcher\n"); - ev_io_start(EV_DEFAULT_ &connect_watcher); - } + void StartConnectWatcher() { + pdebug("*** Starting connect watcher\n"); + ev_io_start(EV_DEFAULT_ &connect_watcher); + } - void StopConnectWatcher() { - pdebug("*** Stopping connect watcher\n"); - ev_io_stop(EV_DEFAULT_ &connect_watcher); - } + void StopConnectWatcher() { + pdebug("*** Stopping connect watcher\n"); + ev_io_stop(EV_DEFAULT_ &connect_watcher); + } - void CreateConnection(mongo_connection_options *options) { + void CreateConnection(mongo_connection_options *options) { // MONGO_INIT_EXCEPTION(&conn->exception); - conn->left_opts = (mongo_connection_options *)bson_malloc(sizeof(mongo_connection_options)); - conn->right_opts = NULL; + conn->left_opts = (mongo_connection_options *)bson_malloc(sizeof(mongo_connection_options)); + conn->right_opts = NULL; - if ( options ){ - memcpy( conn->left_opts , options , sizeof( mongo_connection_options ) ); - } else { - strcpy( conn->left_opts->host , "127.0.0.1" ); - conn->left_opts->port = 27017; - } - - MongoCreateSocket(); + if ( options ){ + memcpy( conn->left_opts , options , sizeof( mongo_connection_options ) ); + } else { + strcpy( conn->left_opts->host , "127.0.0.1" ); + conn->left_opts->port = 27017; } - void MongoCreateSocket() { - conn->sock = 0; - conn->connected = 0; + MongoCreateSocket(); + } - memset(conn->sa.sin_zero, 0, sizeof(conn->sa.sin_zero)); - conn->sa.sin_family = AF_INET; - conn->sa.sin_port = htons(conn->left_opts->port); - conn->sa.sin_addr.s_addr = inet_addr(conn->left_opts->host); - conn->addressSize = sizeof(conn->sa); + void MongoCreateSocket() { + conn->sock = 0; + conn->connected = 0; - conn->sock = socket( AF_INET, SOCK_STREAM, 0 ); - if (conn->sock <= 0){ - //return mongo_conn_no_socket; - // throw exception here? - } + memset(conn->sa.sin_zero, 0, sizeof(conn->sa.sin_zero)); + conn->sa.sin_family = AF_INET; + conn->sa.sin_port = htons(conn->left_opts->port); + conn->sa.sin_addr.s_addr = inet_addr(conn->left_opts->host); + conn->addressSize = sizeof(conn->sa); - setNonBlocking(conn->sock); - int res = connect(conn->sock, (struct sockaddr*) &conn->sa, conn->addressSize); + conn->sock = socket( AF_INET, SOCK_STREAM, 0 ); + if (conn->sock <= 0){ + //return mongo_conn_no_socket; + // throw exception here? + } - // make sure we've gotten a non-blocking connection - assert(res < 0); - assert(errno == EINPROGRESS); + setNonBlocking(conn->sock); + int res = connect(conn->sock, (struct sockaddr*) &conn->sa, conn->addressSize); - ev_io_set(&connect_watcher, conn->sock, EV_WRITE); - StartConnectWatcher(); - } + // make sure we've gotten a non-blocking connection + assert(res < 0); + assert(errno == EINPROGRESS); - void Connected() { - StopConnectWatcher(); - setsockopt( conn->sock, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one) ); + ev_io_set(&connect_watcher, conn->sock, EV_WRITE); + StartConnectWatcher(); + } - conn->connected = 1; + void Connected() { + StopConnectWatcher(); + setsockopt( conn->sock, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one) ); - Emit(String::New("connection"), 0, NULL); - } + conn->connected = 1; - void Connect(const char *host, const int32_t port) { - HandleScope scope; + Emit(String::New("connection"), 0, NULL); + } - mongo_connection_options opts; - memcpy(opts.host, host, strlen(host)+1); - opts.host[strlen(host)] = '\0'; - opts.port = port; + void Connect(const char *host, const int32_t port) { + HandleScope scope; - CreateConnection(&opts); + mongo_connection_options opts; + memcpy(opts.host, host, strlen(host)+1); + opts.host[strlen(host)] = '\0'; + opts.port = port; - setNonBlocking(conn->sock); + CreateConnection(&opts); - ev_io_set(&read_watcher, conn->sock, EV_READ); - ev_io_set(&write_watcher, conn->sock, EV_WRITE); + setNonBlocking(conn->sock); - Ref(); - StartWriteWatcher(); - StartReadWatcher(); - } + ev_io_set(&read_watcher, conn->sock, EV_READ); + ev_io_set(&write_watcher, conn->sock, EV_WRITE); - void Close() { - pdebug("--- in Close()\n"); - HandleScope scope; - close = true; - } + Ref(); + StartWriteWatcher(); + StartReadWatcher(); + } - void reallyClose() { - HandleScope scope; - StopWriteWatcher(); - StopReadWatcher(); + void Close() { + pdebug("--- in Close()\n"); + HandleScope scope; + close = true; + } - if (writebuf) { - delete [] writebuf; - writebuf = NULL; - writebuflen = 0; - } + void reallyClose() { + HandleScope scope; + StopWriteWatcher(); + StopReadWatcher(); - if (buf) { - delete [] buf; - buf = NULL; - buflen = 0; - } + if (writebuf) { + delete [] writebuf; + writebuf = NULL; + writebuflen = 0; + } + + if (buf) { + delete [] buf; + buf = NULL; + buflen = 0; + } - buf = writebuf = NULL; + buf = writebuf = NULL; - mongo_destroy(conn); + mongo_destroy(conn); - Emit(String::New("close"), 0, NULL); + Emit(String::New("close"), 0, NULL); - Unref(); - } + Unref(); + } - void CheckBuffer() { - if (buflen < headerSize) return; + void CheckBuffer() { + if (buflen < headerSize) return; - mongo_header reply_head; - mongo_reply_fields reply_fields; + //mongo_header reply_head; + //mongo_reply_fields reply_fields; + if(!expectedlen) + { memcpy(&reply_head, bufptr, sizeof(reply_head)); bufptr += sizeof(reply_head); memcpy(&reply_fields, bufptr, sizeof(reply_fields)); bufptr += sizeof(reply_fields); + bson_little_endian32(&expectedlen, &reply_head.len); + } + + if (expectedlen-buflen == 0) { + // we've gotten the full response + //expectedlen = 0; + ParseReply(reply_head, reply_fields); + + delete [] buf; + buf = bufptr = NULL; + buflen = 0; + expectedlen = 0; + StopReadWatcher(); + StartWriteWatcher(); + } else + StartReadWatcher(); + } + + void ParseReply(mongo_header reply_head, mongo_reply_fields reply_fields) { + HandleScope scope; - int len; - bson_little_endian32(&len, &reply_head.len); - - if (len-buflen == 0) { - // we've gotten the full response - ParseReply(reply_head, reply_fields); + int len;// = expectedlen; + bson_little_endian32(&len, &reply_head.len); - delete [] buf; - buf = bufptr = NULL; - buflen = 0; + char replybuf[len]; - StopReadWatcher(); - StartWriteWatcher(); - } - } + mongo_reply *reply = reinterpret_cast(replybuf); - void ParseReply(mongo_header reply_head, mongo_reply_fields reply_fields) { - HandleScope scope; + reply->head.len = len; + bson_little_endian32(&reply->head.id, &reply_head.id); + bson_little_endian32(&reply->head.responseTo, &reply_head.responseTo); + bson_little_endian32(&reply->head.op, &reply_head.op); - int len; - bson_little_endian32(&len, &reply_head.len); + bson_little_endian32(&reply->fields.flag, &reply_fields.flag); + bson_little_endian64(&reply->fields.cursorID, &reply_fields.cursorID); + bson_little_endian32(&reply->fields.start, &reply_fields.start); + bson_little_endian32(&reply->fields.num, &reply_fields.num); - char replybuf[len]; + memcpy(&reply->objs, bufptr, len-headerSize); - mongo_reply *reply = reinterpret_cast(replybuf); + cursor->mm = reply; + cursor->current.data = NULL; - reply->head.len = len; - bson_little_endian32(&reply->head.id, &reply_head.id); - bson_little_endian32(&reply->head.responseTo, &reply_head.responseTo); - bson_little_endian32(&reply->head.op, &reply_head.op); - - bson_little_endian32(&reply->fields.flag, &reply_fields.flag); - bson_little_endian64(&reply->fields.cursorID, &reply_fields.cursorID); - bson_little_endian32(&reply->fields.start, &reply_fields.start); - bson_little_endian32(&reply->fields.num, &reply_fields.num); + for (int i = results->Length(); AdvanceCursor(); i++){ + Local val = decodeObjectStr(cursor->current.data); + results->Set(Integer::New(i), val); + } + // if this is the last cursor + if ( (!cursor->mm || ! reply_fields.cursorID) || reply_fields.num == limit) + { + pdebug("finished, forwarding results\n"); + FreeCursor(); + get_more = false; + EmitResults(); + results.Dispose(); + results.Clear(); + results = Persistent::New(Array::New()); + } + } - memcpy(&reply->objs, bufptr, len-headerSize); + void FreeCursor() { + free((void*)cursor->ns); + free(cursor); + cursor = NULL; + } - cursor->mm = reply; - cursor->current.data = NULL; + void EmitResults() { + Emit(String::New("result"), 1, reinterpret_cast *>(&results)); + } - for (int i = results->Length(); AdvanceCursor(); i++){ - Local val = decodeObjectStr(cursor->current.data); - results->Set(Integer::New(i), val); - } + bool AdvanceCursor() { + char* bson_addr; - // if this is the last cursor - if (!cursor->mm || ! reply_fields.cursorID) { - FreeCursor(); - get_more = false; - EmitResults(); - results.Dispose(); - results.Clear(); - results = Persistent::New(Array::New()); - } - } + /* no data */ + if (!cursor->mm || cursor->mm->fields.num == 0) + return false; - void FreeCursor() { - free((void*)cursor->ns); - free(cursor); - cursor = NULL; + /* first */ + if (cursor->current.data == NULL){ + bson_init(&cursor->current, &cursor->mm->objs, 0); + return true; } - void EmitResults() { - Emit(String::New("result"), 1, reinterpret_cast *>(&results)); - } + // new cursor position + bson_addr = cursor->current.data + bson_size(&cursor->current); - bool AdvanceCursor() { - char* bson_addr; + if (bson_addr >= ((char*)cursor->mm + cursor->mm->head.len)){ + // current cursor is out of data + get_more = true; - /* no data */ - if (!cursor->mm || cursor->mm->fields.num == 0) - return false; + // indicate that this is the last result + return false; + } else { + // advance cursor by one object + bson_init(&cursor->current, bson_addr, 0); - /* first */ - if (cursor->current.data == NULL){ - bson_init(&cursor->current, &cursor->mm->objs, 0); - return true; - } + return true; + } + return false; + } - // new cursor position - bson_addr = cursor->current.data + bson_size(&cursor->current); + void BufferMessageToSend(mongo_message *mm) { + mongo_header head; + bson_little_endian32(&head.len, &mm->head.len); + bson_little_endian32(&head.id, &mm->head.id); + bson_little_endian32(&head.responseTo, &mm->head.responseTo); + bson_little_endian32(&head.op, &mm->head.op); - if (bson_addr >= ((char*)cursor->mm + cursor->mm->head.len)){ - // current cursor is out of data - get_more = true; + int size = mm->head.len; + pdebug("buffering message of size %d\n", size); - // indicate that this is the last result - return false; - } else { - // advance cursor by one object - bson_init(&cursor->current, bson_addr, 0); + char *tmp = new char[writebuflen+size]; - return true; - } - return false; + if (writebuf) { + memcpy(tmp, writebuf, writebuflen); } - void BufferMessageToSend(mongo_message *mm) { - mongo_header head; - bson_little_endian32(&head.len, &mm->head.len); - bson_little_endian32(&head.id, &mm->head.id); - bson_little_endian32(&head.responseTo, &mm->head.responseTo); - bson_little_endian32(&head.op, &mm->head.op); + memcpy(tmp+writebuflen, &head, sizeof(head)); + memcpy(tmp+writebuflen+sizeof(head), &mm->data, size-sizeof(head)); + free(mm); + + int ptrdelta = writebufptr - writebuf; - int size = mm->head.len; - pdebug("buffering message of size %d\n", size); + if (writebuf) { + delete [] writebuf; + } - char *tmp = new char[writebuflen+size]; + writebuflen = writebuflen + size; + writebuf = tmp; + writebufptr = tmp + ptrdelta; + pdebug("write buf is of size %d\n", writebuflen); + pdebug("est lenRem = %d\n", writebuflen-ptrdelta); + pdebug("wbuf diff = %d\n", ptrdelta); + StartWriteWatcher(); + } - if (writebuf) { - memcpy(tmp, writebuf, writebuflen); - } + void WriteSendBuffer() { + pdebug("going to write buffer\n"); - memcpy(tmp+writebuflen, &head, sizeof(head)); - memcpy(tmp+writebuflen+sizeof(head), &mm->data, size-sizeof(head)); - free(mm); + int sock = conn->sock; + int lenRemaining = writebuflen-(writebufptr-writebuf); - int ptrdelta = writebufptr - writebuf; + pdebug("remaining: %d\n", lenRemaining); + while (lenRemaining) { + pdebug("trying to write %d\n", lenRemaining); + int sent = write(sock, writebufptr, lenRemaining); + pdebug("write = %d\n", sent); + if (sent == -1) { + if (errno == EAGAIN) { + // we need to set the write watcher again and continue + // later + pdebug("EAGAIN\n"); - if (writebuf) { - delete [] writebuf; + StartWriteWatcher(); + return; } + else { + pdebug("errorno was %d\n", errno); + } + } + writebufptr += sent; + lenRemaining -= sent; + } + if (!lenRemaining) { + delete [] writebuf; + writebufptr = writebuf = NULL; + writebuflen = 0; + } + pdebug("done! write buf is of size %d\n", writebuflen); + pdebug("done! est lenRem = %d\n", writebuflen-(writebufptr-writebuf)); + pdebug("done! wbuf diff = %d\n", (writebufptr-writebuf)); + StopWriteWatcher(); + } + + void ConsumeInput() { + char *tmp; + char readbuf[chunk_size]; + int32_t readbuflen; + + for (;;) { + readbuflen = read(conn->sock, readbuf, chunk_size); + pdebug("reading %d bytes current: %d expected: %d\n", readbuflen, buflen, expectedlen); + + if (readbuflen == -1 && errno == EAGAIN) { + // no more input to consume + pdebug("len == -1 && errno == EAGAIN\n"); + return; + } + else if (readbuflen <= 0) { + // socket problem? + pdebug("length error on read %d errno = %d\n", readbuflen, errno); + reallyClose(); + close = false; + } + else { + tmp = static_cast(new char[buflen+readbuflen]); + memset(tmp, 0, buflen+readbuflen); - writebuflen = writebuflen + size; - writebuf = tmp; - writebufptr = tmp + ptrdelta; - pdebug("write buf is of size %d\n", writebuflen); - pdebug("est lenRem = %d\n", writebuflen-ptrdelta); - pdebug("wbuf diff = %d\n", ptrdelta); - StartWriteWatcher(); - } - - void WriteSendBuffer() { - pdebug("going to write buffer\n"); - - int sock = conn->sock; - int lenRemaining = writebuflen-(writebufptr-writebuf); - - pdebug("remaining: %d\n", lenRemaining); - while (lenRemaining) { - pdebug("trying to write %d\n", lenRemaining); - int sent = write(sock, writebufptr, lenRemaining); - pdebug("write = %d\n", sent); - if (sent == -1) { - if (errno == EAGAIN) { - // we need to set the write watcher again and continue - // later - pdebug("EAGAIN\n"); - - StartWriteWatcher(); - return; - } - else { - pdebug("errorno was %d\n", errno); - } - } - writebufptr += sent; - lenRemaining -= sent; + if (buf) { + memcpy(tmp, buf, buflen); } - if (!lenRemaining) { - delete [] writebuf; - writebufptr = writebuf = NULL; - writebuflen = 0; + memcpy(tmp+buflen, readbuf, readbuflen); + if (buf) { + delete [] buf; } - pdebug("done! write buf is of size %d\n", writebuflen); - pdebug("done! est lenRem = %d\n", writebuflen-(writebufptr-writebuf)); - pdebug("done! wbuf diff = %d\n", (writebufptr-writebuf)); - StopWriteWatcher(); + buflen = buflen + readbuflen; + bufptr = tmp + (bufptr - buf); + buf = tmp; + break; + } } + } - void ConsumeInput() { - char *tmp; - char readbuf[chunk_size]; - int32_t readbuflen; - - for (;;) { - readbuflen = read(conn->sock, readbuf, chunk_size); - - if (readbuflen == -1 && errno == EAGAIN) { - // no more input to consume - pdebug("len == -1 && errno == EAGAIN\n"); - return; - } - else if (readbuflen <= 0) { - // socket problem? - pdebug("length error on read %d errno = %d\n", readbuflen, errno); - reallyClose(); - close = false; - } - else { - tmp = static_cast(new char[buflen+readbuflen]); - memset(tmp, 0, buflen+readbuflen); - - if (buf) { - memcpy(tmp, buf, buflen); - } - memcpy(tmp+buflen, readbuf, readbuflen); - if (buf) { - delete [] buf; - } - buflen = buflen + readbuflen; - bufptr = tmp + (bufptr - buf); - buf = tmp; - break; - } - } - } + void RequestMore() { + HandleScope scope; - void RequestMore() { - HandleScope scope; - - char* data; - int sl = strlen(cursor->ns)+1; - mongo_message * mm = mongo_message_create(16 /*header*/ - +4 /*ZERO*/ - +sl - +4 /*numToReturn*/ - +8 /*cursorID*/ - , 0, 0, mongo_op_get_more); - data = &mm->data; - data = mongo_data_append32(data, &zero); - data = mongo_data_append(data, cursor->ns, sl); - data = mongo_data_append32(data, &zero); - data = mongo_data_append64(data, &(cursor->mm->fields.cursorID)); - - BufferMessageToSend(mm); - } + char* data; + int sl = strlen(cursor->ns)+1; + mongo_message * mm = mongo_message_create(16 /*header*/ + +4 /*ZERO*/ + +sl + +4 /*numToReturn*/ + +8 /*cursorID*/ + , 0, 0, mongo_op_get_more); + data = &mm->data; + data = mongo_data_append32(data, &zero); + data = mongo_data_append(data, cursor->ns, sl); + data = mongo_data_append32(data, &zero); + data = mongo_data_append64(data, &(cursor->mm->fields.cursorID)); + + BufferMessageToSend(mm); + pdebug("asking for more data\n"); + } + + + bool Find(const char *ns, bson *query=0, bson *query_fields=0, + int nToReturn=0, int nToSkip=0, int options=0) { + StartReadWatcher(); + assert(!close); + cursor = static_cast( + bson_malloc(sizeof(mongo_cursor))); + limit = nToReturn; + skip = nToSkip; + + int sl = strlen(ns)+1; + cursor->ns = static_cast(bson_malloc(sl)); + + memcpy(static_cast(const_cast(cursor->ns)), ns, sl); + cursor->conn = conn; + + char * data; + mongo_message * mm = mongo_message_create( 16 + /* header */ + 4 + /* options */ + sl + /* ns */ + 4 + 4 + /* skip,return */ + bson_size( query ) + + bson_size( query_fields ) , + 0 , 0 , mongo_op_query ); + + data = &mm->data; + data = mongo_data_append32(data, &options); + data = mongo_data_append(data, ns, strlen(ns)+ 1); + data = mongo_data_append32(data, &nToSkip); + data = mongo_data_append32(data, &nToReturn); + data = mongo_data_append(data, query->data, bson_size(query)); + if (query_fields) + data = mongo_data_append(data, query_fields->data, bson_size(query_fields)); + + bson_fatal_msg((data == ((char*)mm) + mm->head.len), "query building fail!"); + + BufferMessageToSend(mm); + } + + void Insert(const char *ns, bson obj) { + char * data; + mongo_message *mm = mongo_message_create( 16 /* header */ + + 4 /* ZERO */ + + strlen(ns) + + 1 + bson_size(&obj) + , 0, 0, mongo_op_insert); + + data = &mm->data; + data = mongo_data_append32(data, &zero); + data = mongo_data_append(data, ns, strlen(ns) + 1); + data = mongo_data_append(data, obj.data, bson_size(&obj)); + + BufferMessageToSend(mm); + } + + void Remove(const char *ns, bson cond) { + char * data; + mongo_message * mm = mongo_message_create( 16 /* header */ + + 4 /* ZERO */ + + strlen(ns) + 1 + + 4 /* ZERO */ + + bson_size(&cond) + , 0 , 0 , mongo_op_delete ); + + data = &mm->data; + data = mongo_data_append32(data, &zero); + data = mongo_data_append(data, ns, strlen(ns) + 1); + data = mongo_data_append32(data, &zero); + data = mongo_data_append(data, cond.data, bson_size(&cond)); + BufferMessageToSend(mm); + } + + void Update(const char *ns, bson cond, bson op, int flags=0) { + char * data; + mongo_message * mm = mongo_message_create( 16 /* header */ + + 4 /* ZERO */ + + strlen(ns) + 1 + + 4 /* flags */ + + bson_size(&cond) + + bson_size(&op) + , 0 , 0 , mongo_op_update ); + + data = &mm->data; + data = mongo_data_append32(data, &zero); + data = mongo_data_append(data, ns, strlen(ns) + 1); + data = mongo_data_append32(data, &flags); + data = mongo_data_append(data, cond.data, bson_size(&cond)); + data = mongo_data_append(data, op.data, bson_size(&op)); + + BufferMessageToSend(mm); + } + +protected: + + static Handle + New(const Arguments& args) { + HandleScope scope; + // XXX where should this be deleted? + Connection *connection = new Connection(); + connection->Wrap(args.This()); + return args.This(); + } - bool Find(const char *ns, bson *query=0, bson *query_fields=0, - int nToReturn=0, int nToSkip=0, int options=0) { - StartReadWatcher(); - assert(!close); - cursor = static_cast( - bson_malloc(sizeof(mongo_cursor))); - - int sl = strlen(ns)+1; - cursor->ns = static_cast(bson_malloc(sl)); - - memcpy(static_cast(const_cast(cursor->ns)), ns, sl); - cursor->conn = conn; - - char * data; - mongo_message * mm = mongo_message_create( 16 + /* header */ - 4 + /* options */ - sl + /* ns */ - 4 + 4 + /* skip,return */ - bson_size( query ) + - bson_size( query_fields ) , - 0 , 0 , mongo_op_query ); - - data = &mm->data; - data = mongo_data_append32(data, &options); - data = mongo_data_append(data, ns, strlen(ns)+ 1); - data = mongo_data_append32(data, &nToSkip); - data = mongo_data_append32(data, &nToReturn); - data = mongo_data_append(data, query->data, bson_size(query)); - if (query_fields) - data = mongo_data_append(data, query_fields->data, bson_size(query_fields)); - - bson_fatal_msg((data == ((char*)mm) + mm->head.len), "query building fail!"); - - BufferMessageToSend(mm); - } + ~Connection() { + } - void Insert(const char *ns, bson obj) { - char * data; - mongo_message *mm = mongo_message_create( 16 /* header */ - + 4 /* ZERO */ - + strlen(ns) - + 1 + bson_size(&obj) - , 0, 0, mongo_op_insert); + Connection() : node::EventEmitter() { + HandleScope scope; + results = Persistent::New(Array::New()); + + close = false; + cursor = false; + get_more = false; + buflen = writebuflen = 0; + buf = bufptr = writebuf = writebufptr = NULL; + expectedlen = 0; + limit = 0; + skip = 0; + + ev_init(&read_watcher, io_event); + read_watcher.data = this; + ev_init(&write_watcher, io_event); + write_watcher.data = this; + ev_init(&connect_watcher, connect_event); + connect_watcher.data = this; + } + + static Handle + Connect(const Arguments &args) { + HandleScope scope; + Connection *connection = ObjectWrap::Unwrap(args.This()); - data = &mm->data; - data = mongo_data_append32(data, &zero); - data = mongo_data_append(data, ns, strlen(ns) + 1); - data = mongo_data_append(data, obj.data, bson_size(&obj)); + // XXX check args.Length + String::Utf8Value host(args[0]->ToString()); + connection->Connect(*host, args[1]->Int32Value()); - BufferMessageToSend(mm); - } + return Undefined(); + } - void Remove(const char *ns, bson cond) { - char * data; - mongo_message * mm = mongo_message_create( 16 /* header */ - + 4 /* ZERO */ - + strlen(ns) + 1 - + 4 /* ZERO */ - + bson_size(&cond) - , 0 , 0 , mongo_op_delete ); - - data = &mm->data; - data = mongo_data_append32(data, &zero); - data = mongo_data_append(data, ns, strlen(ns) + 1); - data = mongo_data_append32(data, &zero); - data = mongo_data_append(data, cond.data, bson_size(&cond)); - BufferMessageToSend(mm); - } - - void Update(const char *ns, bson cond, bson op, int flags=0) { - char * data; - mongo_message * mm = mongo_message_create( 16 /* header */ - + 4 /* ZERO */ - + strlen(ns) + 1 - + 4 /* flags */ - + bson_size(&cond) - + bson_size(&op) - , 0 , 0 , mongo_op_update ); - - data = &mm->data; - data = mongo_data_append32(data, &zero); - data = mongo_data_append(data, ns, strlen(ns) + 1); - data = mongo_data_append32(data, &flags); - data = mongo_data_append(data, cond.data, bson_size(&cond)); - data = mongo_data_append(data, op.data, bson_size(&op)); - - BufferMessageToSend(mm); - } + static Handle + Close(const Arguments &args) { + HandleScope scope; + Connection *connection = ObjectWrap::Unwrap(args.This()); - protected: + connection->Close(); - static Handle - New(const Arguments& args) { - HandleScope scope; + return Undefined(); + } - // XXX where should this be deleted? - Connection *connection = new Connection(); - connection->Wrap(args.This()); - return args.This(); - } + static Handle + Find(const Arguments &args) { + HandleScope scope; + Connection *connection = ObjectWrap::Unwrap(args.This()); - ~Connection() { - } + // TODO assert ns != undefined (args.Length > 0) + String::Utf8Value ns(args[0]->ToString()); - Connection() : node::EventEmitter() { - HandleScope scope; - results = Persistent::New(Array::New()); + bson query_bson; + bson query_fields_bson; + int nToReturn(0), nToSkip(0); - close = false; - cursor = false; - get_more = false; - buflen = writebuflen = 0; - buf = bufptr = writebuf = writebufptr = NULL; - - ev_init(&read_watcher, io_event); - read_watcher.data = this; - ev_init(&write_watcher, io_event); - write_watcher.data = this; - ev_init(&connect_watcher, connect_event); - connect_watcher.data = this; + if (args.Length() > 1 && !args[1]->IsUndefined()) { + Local query(args[1]->ToObject()); + query_bson = encodeObject(query); } - - static Handle - Connect(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); - - // XXX check args.Length - String::Utf8Value host(args[0]->ToString()); - connection->Connect(*host, args[1]->Int32Value()); - - return Undefined(); + else { + bson_empty(&query_bson); } - static Handle - Close(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); - - connection->Close(); - - return Undefined(); + if (args.Length() > 2 && !args[2]->IsUndefined()) { + Local query_fields(args[2]->ToObject()); + query_fields_bson = encodeObject(query_fields); + } + else { + bson_empty(&query_fields_bson); } - static Handle - Find(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); - - // TODO assert ns != undefined (args.Length > 0) - String::Utf8Value ns(args[0]->ToString()); - - bson query_bson; - bson query_fields_bson; - int nToReturn(0), nToSkip(0); - - if (args.Length() > 1 && !args[1]->IsUndefined()) { - Local query(args[1]->ToObject()); - query_bson = encodeObject(query); - } - else { - bson_empty(&query_bson); - } - - if (args.Length() > 2 && !args[2]->IsUndefined()) { - Local query_fields(args[2]->ToObject()); - query_fields_bson = encodeObject(query_fields); - } - else { - bson_empty(&query_fields_bson); - } - - if (args.Length() > 3 && !args[3]->IsUndefined()) { - nToReturn = args[3]->Int32Value(); - } - - if (args.Length() > 4 && !args[4]->IsUndefined()) { - nToSkip = args[4]->Int32Value(); - } - - connection->Find(*ns, &query_bson, &query_fields_bson, nToReturn, nToSkip); + if (args.Length() > 3 && !args[3]->IsUndefined()) { + nToReturn = args[3]->Int32Value(); + } - bson_destroy(&query_bson); - bson_destroy(&query_fields_bson); - return Undefined(); + if (args.Length() > 4 && !args[4]->IsUndefined()) { + nToSkip = args[4]->Int32Value(); } - static Handle - Insert(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); + connection->Find(*ns, &query_bson, &query_fields_bson, nToReturn, nToSkip); - String::Utf8Value ns(args[0]->ToString()); - // TODO assert ns != undefined (args.Length > 0) + bson_destroy(&query_bson); + bson_destroy(&query_fields_bson); + return Undefined(); + } - bson obj; + static Handle + Insert(const Arguments &args) { + HandleScope scope; + Connection *connection = ObjectWrap::Unwrap(args.This()); - // XXX check args > 1 - Local query(args[1]->ToObject()); - obj = encodeObject(query); + String::Utf8Value ns(args[0]->ToString()); + // TODO assert ns != undefined (args.Length > 0) - connection->Insert(*ns, obj); + bson obj; - bson_destroy(&obj); - return Undefined(); - } + // XXX check args > 1 + Local query(args[1]->ToObject()); + obj = encodeObject(query); - static Handle - Update(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); + connection->Insert(*ns, obj); - String::Utf8Value ns(args[0]->ToString()); - // TODO assert ns != undefined (args.Length > 0) + bson_destroy(&obj); + return Undefined(); + } - bson cond; - bson obj; - int flags = 0; + static Handle + Update(const Arguments &args) { + HandleScope scope; + Connection *connection = ObjectWrap::Unwrap(args.This()); - if (args.Length() > 1 && !args[1]->IsUndefined()) { - Local query(args[1]->ToObject()); - cond = encodeObject(query); - } - else { - bson_empty(&cond); - } + String::Utf8Value ns(args[0]->ToString()); + // TODO assert ns != undefined (args.Length > 0) - if (args.Length() > 2 && !args[2]->IsUndefined()) { - Local query(args[2]->ToObject()); - obj = encodeObject(query); - } - else { - bson_empty(&obj); - } + bson cond; + bson obj; + int flags = 0; - if (args.Length() > 3 && !args[3]->IsUndefined()) { - Local jsflags = args[3]->ToInteger(); - flags = jsflags->Value(); - } - - connection->Update(*ns, cond, obj, flags); + if (args.Length() > 1 && !args[1]->IsUndefined()) { + Local query(args[1]->ToObject()); + cond = encodeObject(query); + } + else { + bson_empty(&cond); + } - bson_destroy(&cond); - bson_destroy(&obj); - return Undefined(); + if (args.Length() > 2 && !args[2]->IsUndefined()) { + Local query(args[2]->ToObject()); + obj = encodeObject(query); + } + else { + bson_empty(&obj); } - static Handle - Remove(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); - if (!args[0]->IsString()) { - return ThrowException( - Exception::Error( - String::New("ns must be specified"))); - } - String::Utf8Value ns(args[0]->ToString()); + if (args.Length() > 3 && !args[3]->IsUndefined()) { + Local jsflags = args[3]->ToInteger(); + flags = jsflags->Value(); + } + connection->Update(*ns, cond, obj, flags); - bson cond; - if (args.Length() > 1 && args[1]->IsObject()) { - Local query(args[1]->ToObject()); - cond = encodeObject(query); - } - else if (args.Length() > 1 && args[1]->IsUndefined()) { - bson_empty(&cond); - } - else if (args.Length() > 1 && !args[1]->IsObject()) { - return ThrowException( - Exception::Error( - String::New("Condition must be an object"))); - } + bson_destroy(&cond); + bson_destroy(&obj); + return Undefined(); + } - connection->Remove(*ns, cond); + static Handle + Remove(const Arguments &args) { + HandleScope scope; + Connection *connection = ObjectWrap::Unwrap(args.This()); + if (!args[0]->IsString()) { + return ThrowException( + Exception::Error( + String::New("ns must be specified"))); + } + String::Utf8Value ns(args[0]->ToString()); + + + bson cond; + if (args.Length() > 1 && args[1]->IsObject()) { + Local query(args[1]->ToObject()); + cond = encodeObject(query); + } + else if (args.Length() > 1 && args[1]->IsUndefined()) { + bson_empty(&cond); + } + else if (args.Length() > 1 && !args[1]->IsObject()) { + return ThrowException( + Exception::Error( + String::New("Condition must be an object"))); + } + + connection->Remove(*ns, cond); + + bson_destroy(&cond); + return Undefined(); + } + + void Event(EV_P_ ev_io *w, int revents) { + if (!conn->connected) { + StopReadWatcher(); + StopWriteWatcher(); + reallyClose(); + close = false; + return; + }; + pdebug("event %d %d\n", conn->connected, close ? 1 : 0); + if (revents & EV_READ) { + pdebug("!!! got a read event\n"); + StopReadWatcher(); + ConsumeInput(); + CheckBuffer(); + } + if (revents & EV_WRITE) { + pdebug("!!! got a write event\n"); + pdebug("!!! writebuflen = %d\n", writebuflen); + if (writebuflen) { + pdebug("things to write\n"); + WriteSendBuffer(); + } + else { + StopWriteWatcher(); + } - bson_destroy(&cond); - return Undefined(); + if (get_more) { + StopWriteWatcher(); + RequestMore(); + StartReadWatcher(); + get_more = false; + } + else { + Emit(String::New("ready"), 0, NULL); + } } - - void Event(EV_P_ ev_io *w, int revents) { - if (!conn->connected) { - StopReadWatcher(); - StopWriteWatcher(); - reallyClose(); - close = false; - return; - }; - pdebug("event %d %d\n", conn->connected, close ? 1 : 0); - if (revents & EV_READ) { - pdebug("!!! got a read event\n"); - StopReadWatcher(); - ConsumeInput(); - CheckBuffer(); - } - if (revents & EV_WRITE) { - pdebug("!!! got a write event\n"); - pdebug("!!! writebuflen = %d\n", writebuflen); - if (writebuflen) { - pdebug("things to write\n"); - WriteSendBuffer(); - } - else { - StopWriteWatcher(); - } - - if (get_more) { - RequestMore(); - } - else { - Emit(String::New("ready"), 0, NULL); - } - } - if (close) { - pdebug("!!! really closing %d\n", close); - reallyClose(); - close = false; - } - if (revents & EV_ERROR) { - pdebug("!!! got an error event\n"); - } + if (close) { + pdebug("!!! really closing %d\n", close); + reallyClose(); + close = false; + } + if (revents & EV_ERROR) { + pdebug("!!! got an error event\n"); } + } - private: +private: - static void - connect_event(EV_P_ ev_io *w, int revents) { - pdebug("!!! got a connect event\n"); - Connection *connection = static_cast(w->data); - connection->Connected(); - } + static void + connect_event(EV_P_ ev_io *w, int revents) { + pdebug("!!! got a connect event\n"); + Connection *connection = static_cast(w->data); + connection->Connected(); + } - static void - io_event (EV_P_ ev_io *w, int revents) { - Connection *connection = static_cast(w->data); - connection->Event(w, revents); - } + static void + io_event (EV_P_ ev_io *w, int revents) { + Connection *connection = static_cast(w->data); + connection->Event(w, revents); + } - mongo_connection conn[1]; + mongo_connection conn[1]; - // states - bool get_more; - bool close; + // states + bool get_more; + bool close; - mongo_cursor *cursor; + mongo_cursor *cursor; + int limit; + int skip; - Persistent results; + Persistent results; - char *buf; - char *bufptr; - int buflen; + char *buf; + char *bufptr; + int buflen; + int expectedlen; + mongo_header reply_head; + mongo_reply_fields reply_fields; - char *writebuf; - char *writebufptr; - int writebuflen; - ev_io read_watcher; - ev_io write_watcher; - ev_io connect_watcher; + char *writebuf; + char *writebufptr; + int writebuflen; + + ev_io read_watcher; + ev_io write_watcher; + ev_io connect_watcher; }; extern "C" void init (Handle target) { - HandleScope scope; - - target->Set( - String::New("encode"), - FunctionTemplate::New(encode)->GetFunction()); - target->Set( - String::New("decode"), - FunctionTemplate::New(decode)->GetFunction()); - ObjectID::Initialize(target); - Connection::Initialize(target); + HandleScope scope; + + target->Set( + String::New("encode"), + FunctionTemplate::New(encode)->GetFunction()); + target->Set( + String::New("decode"), + FunctionTemplate::New(decode)->GetFunction()); + ObjectID::Initialize(target); + Connection::Initialize(target); } From 723e8964facdf197a45fc3e74a0ab6b5783bbfec Mon Sep 17 00:00:00 2001 From: wink Date: Mon, 16 Aug 2010 15:42:11 -0500 Subject: [PATCH 07/26] janky sort support --- lib/mongodb.js | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/mongodb.js b/lib/mongodb.js index 32fbfb5..a7daa45 100644 --- a/lib/mongodb.js +++ b/lib/mongodb.js @@ -8,8 +8,15 @@ function Collection(mongo, db, name) { this.name = name; } -Collection.prototype.find = function(query, fields, callback, limit, skip) { - this.mongo.addQuery(callback, this.ns, query, fields, limit, skip); +Collection.prototype.find = function(query, fields, callback, limit, skip, sort) { + var cmd = { + "$query" : query ? query : {}, + } + + if(sort) + cmd.$orderby = sort; + //sys.puts(sys.inspect(cmd)); + this.mongo.addQuery(callback, this.ns, cmd, fields, limit, skip); } jjj = JSON.stringify From 9868e82e49bf511fa96e29fe8764c9d93da1c190 Mon Sep 17 00:00:00 2001 From: wink Date: Mon, 16 Aug 2010 15:49:28 -0500 Subject: [PATCH 08/26] using bentruyman's fix for dispatching queries that miss the ready event as it doesnt blindly dispatch them :> --- lib/mongodb.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/mongodb.js b/lib/mongodb.js index a7daa45..91efb10 100644 --- a/lib/mongodb.js +++ b/lib/mongodb.js @@ -64,10 +64,12 @@ function MongoDB() { var self = this; this.connection.addListener("close", function () { + this.isReady = false; self.emit("close"); }); this.connection.addListener("ready", function () { + this.isReady = true; self.dispatch(); }); @@ -104,7 +106,8 @@ MongoDB.prototype.addQuery = function(callback, ns, query, fields, limit, skip ) if (limit) q.push(limit); else q.push(0); if (skip) q.push(skip); else q.push(0); this.queries.push(q); - this.dispatch(); + if(this.isReady) + this.dispatch(); } MongoDB.prototype.dispatch = function() { From 42a0b3715474d3903519a5a8a9814765572b7891 Mon Sep 17 00:00:00 2001 From: wink Date: Wed, 18 Aug 2010 14:58:36 -0500 Subject: [PATCH 09/26] adding getLastError(function(error) { // error }) --- lib/mongodb.js | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/lib/mongodb.js b/lib/mongodb.js index 91efb10..5de000d 100644 --- a/lib/mongodb.js +++ b/lib/mongodb.js @@ -8,6 +8,20 @@ function Collection(mongo, db, name) { this.name = name; } +Collection.prototype.getLastError = function(callback) +{ + ns = this.db + ".$cmd"; + var cmd = { + "query": {getlasterror: 1} + }; + + this.find_one(cmd, {}, ns, function (result) { + callback(result); + }); + +} + + Collection.prototype.find = function(query, fields, callback, limit, skip, sort) { var cmd = { "$query" : query ? query : {}, @@ -19,8 +33,6 @@ Collection.prototype.find = function(query, fields, callback, limit, skip, sort) this.mongo.addQuery(callback, this.ns, cmd, fields, limit, skip); } -jjj = JSON.stringify - Collection.prototype.insert = function(obj) { this.mongo.connection.insert(this.ns, obj); } From d4735bbbbc7731efd1a5dc6cf7868944e1a9ab19 Mon Sep 17 00:00:00 2001 From: wink Date: Fri, 24 Sep 2010 14:57:55 -0500 Subject: [PATCH 10/26] preparing new commit --- src/bson.cc | 370 ----------------------- src/bson.h | 41 --- src/mongo.cc | 824 --------------------------------------------------- 3 files changed, 1235 deletions(-) delete mode 100644 src/bson.cc delete mode 100644 src/bson.h delete mode 100644 src/mongo.cc diff --git a/src/bson.cc b/src/bson.cc deleted file mode 100644 index 59f8b13..0000000 --- a/src/bson.cc +++ /dev/null @@ -1,370 +0,0 @@ -#include -#include -#include -#include -extern "C" { - #define MONGO_HAVE_STDINT - #include -} - -#include "bson.h" - -using namespace std; -using namespace v8; - -Persistent ObjectID::constructor_template; - -void ObjectID::Initialize(Handle target) { - HandleScope scope; - - Local t = FunctionTemplate::New(ObjectID::New); - constructor_template = Persistent::New(t); - constructor_template->InstanceTemplate()->SetInternalFieldCount(1); - constructor_template->SetClassName(String::NewSymbol("ObjectID")); - - NODE_SET_PROTOTYPE_METHOD(ObjectID::constructor_template, "toString", ObjectID::ToString); - - target->Set(String::NewSymbol("ObjectID"), constructor_template->GetFunction()); -} - -Handle ObjectID::New(const Arguments &args) { - HandleScope scope; - - if (args.Length() < 1 - || !args[0]->IsString() - || (args[0]->IsString() - && args[0]->ToString()->Length() != 24)) { - return ThrowException(Exception::Error( - String::New("Argument must be 24 character hex string"))); - } - - String::Utf8Value hex(args[0]->ToString()); - - // XXX where should this be deleted? - ObjectID *o = new ObjectID((const char *) *hex); - o->Wrap(args.This()); - return args.This(); -} - -void ObjectID::str(char *str) { - bson_oid_to_string(&oid, str); -} - -Handle -ObjectID::ToString(const Arguments &args) { - ObjectID *o = ObjectWrap::Unwrap(args.This()); - - HandleScope scope; - char hex[25]; - o->str(hex); - return String::New(hex); -} - -const char * -ToCString(const String::Utf8Value& value) { - return *value ? *value : ""; -} - -inline void -encodeString(bson_buffer *bb, const char *name, const Local element) { - String::Utf8Value v(element); - const char *value(ToCString(v)); - bson_append_string(bb, name, value); -} - -inline void -encodeNumber(bson_buffer *bb, const char *name, const Local element) { - double value(element->NumberValue()); - bson_append_double(bb, name, value); -} - -inline void -encodeInteger(bson_buffer *bb, const char *name, const Local element) { - int value(element->NumberValue()); - bson_append_int(bb, name, value); -} - -inline void -encodeBoolean(bson_buffer *bb, const char *name, const Local element) { - bool value(element->IsTrue()); - bson_append_bool(bb, name, value); -} - -void -encodeObjectID(bson_buffer *bb, const char *name, const Local element) { - // get at the delicious wrapped object centre - Local obj = element->ToObject(); - assert(!obj.IsEmpty()); - assert(obj->InternalFieldCount() > 0); - ObjectID *o = static_cast(Handle::Cast( - obj->GetInternalField(0))->Value()); - bson_oid_t oid; - char oid_hex[25]; - o->str(oid_hex); - bson_oid_from_string(&oid, oid_hex); - bson_append_oid(bb, name, &oid); -} - -void -encodeArray(bson_buffer *bb, const char *name, const Local element) { - Local a = Array::Cast(*element); - bson_buffer *arr = bson_append_start_array(bb, name); - - for (int i = 0, l=a->Length(); i < l; i++) { - Local val = a->Get(Number::New(i)); - stringstream keybuf; - string keyval; - keybuf << i << endl; - keybuf >> keyval; - - if (val->IsString()) { - encodeString(arr, keyval.c_str(), val); - } - else if (val->IsInt32()) { - encodeInteger(arr, keyval.c_str(), val); - } - else if (val->IsNumber()) { - encodeNumber(arr, keyval.c_str(), val); - } - else if (val->IsBoolean()) { - encodeBoolean(arr, keyval.c_str(), val); - } - else if (val->IsArray()) { - encodeArray(arr, keyval.c_str(), val); - } - else if (val->IsObject()) { - bson bson(encodeObject(val)); - bson_append_bson(arr, keyval.c_str(), &bson); - bson_destroy(&bson); - } - } - bson_append_finish_object(arr); -} - -bson encodeObject(const Local element) { - HandleScope scope; - bson_buffer bb; - bson_buffer_init(&bb); - - Local object = element->ToObject(); - Local properties = object->GetPropertyNames(); - - for (int i = 0; i < properties->Length(); i++) { - // get the property name and value - Local prop_name = properties->Get(Integer::New(i)); - Local prop_val = object->Get(prop_name->ToString()); - - // convert the property name to a c string - String::Utf8Value n(prop_name); - const char *pname = ToCString(n); - - // append property using appropriate appender - if (prop_val->IsString()) { - encodeString(&bb, pname, prop_val); - } - else if (prop_val->IsInt32()) { - encodeInteger(&bb, pname, prop_val); - } - else if (prop_val->IsNumber()) { - encodeNumber(&bb, pname, prop_val); - } - else if (prop_val->IsBoolean()) { - encodeBoolean(&bb, pname, prop_val); - } - else if (prop_val->IsArray()) { - encodeArray(&bb, pname, prop_val); - } - else if (prop_val->IsObject() - && ObjectID::constructor_template->HasInstance(prop_val)) { - encodeObjectID(&bb, pname, prop_val); - } - else if (prop_val->IsObject()) { - bson bson(encodeObject(prop_val)); - bson_append_bson(&bb, pname, &bson); - bson_destroy(&bson); - } - } - - bson bson; - bson_from_buffer(&bson, &bb); - - return bson; -} - -Handle -encode(const Arguments &args) { - // TODO assert args.length > 0 - // TODO assert args.type == Object - HandleScope scope; - - bson bson(encodeObject(args[0])); - Handle ret = node::Encode(bson.data, bson_size(&bson), node::BINARY); - bson_destroy(&bson); - return ret; -} - -// Decoding functions - -Handle -decodeString(bson_iterator *i) { - HandleScope scope; - const char *val = bson_iterator_string(i); - Local str = String::New(val); - return scope.Close(str); -} - -Handle -decodeObject(bson_iterator *i) { - HandleScope scope; - bson bson; - bson_iterator_subobject(i, &bson); - Handle sub = decodeObjectStr(bson.data); - return scope.Close(sub); -} - -Handle -decodeObjectID(bson_iterator *i) { - HandleScope scope; - char hex_oid[25]; - bson_oid_t *oid = bson_iterator_oid(i); - bson_oid_to_string(oid, hex_oid); - Handle argv[1]; - argv[0] = String::New(hex_oid); - - Handle obj = - ObjectID::constructor_template->GetFunction()->NewInstance(1, argv); - - return scope.Close(obj); -} - -Handle -decodeDouble(bson_iterator *i) { - HandleScope scope; - double val = bson_iterator_double_raw(i); - Local obj = Number::New(val); - return scope.Close(obj); -} - -Handle -decodeInteger(bson_iterator *i) { - HandleScope scope; - double val = bson_iterator_int_raw(i); - Local obj = Integer::New(val); - return scope.Close(obj); -} - -Handle -decodeBool(bson_iterator *i) { - HandleScope scope; - bson_bool_t val = bson_iterator_bool(i); - Handle obj = Boolean::New(val); - return scope.Close(obj); -} - -Handle -decodeArray(bson_iterator *it) { - HandleScope scope; - bson_iterator sub; - bson_iterator_subiterator(it, &sub); - Local obj = Array::New(); - - for (int i = 0; bson_iterator_next(&sub); i++) { - bson_type type = bson_iterator_type(&sub); - - const char *key = bson_iterator_key(&sub); - - switch (type) { - case bson_string: - obj->Set(Number::New(i), decodeString(&sub)); - break; - case bson_array: - obj->Set(Number::New(i), decodeArray(&sub)); - break; - case bson_object: - obj->Set(Number::New(i), decodeObject(&sub)); - break; - case bson_oid: - obj->Set(Number::New(i), decodeObjectID(&sub)); - break; - case bson_double: - obj->Set(Number::New(i), decodeDouble(&sub)); - break; - case bson_int: - obj->Set(Number::New(i), decodeInteger(&sub)); - break; - case bson_bool: - obj->Set(Number::New(i), decodeBool(&sub)); - break; - } - } - - return scope.Close(obj); -} - -Local -decodeObjectIterator(bson_iterator *it) { - HandleScope scope; - Local obj = Object::New(); - while (bson_iterator_next(it)) { - bson_type type = bson_iterator_type(it); - const char *key = bson_iterator_key(it); - - switch (type) { - case bson_string: - obj->Set(String::New(key), decodeString(it)); - break; - - case bson_array: - obj->Set(String::New(key), decodeArray(it)); - break; - - case bson_object: - obj->Set(String::New(key), decodeObject(it)); - break; - - case bson_oid: - obj->Set(String::New(key), decodeObjectID(it)); - break; - - case bson_double: - obj->Set(String::New(key), decodeDouble(it)); - break; - - case bson_int: - obj->Set(String::New(key), decodeInteger(it)); - break; - - case bson_bool: - obj->Set(String::New(key), decodeBool(it)); - break; - } - } - - return scope.Close(obj); -} - -Local -decodeObjectStr(const char *buf) { - HandleScope scope; - - bson_iterator it; - bson_iterator_init(&it, buf); - - Handle obj = decodeObjectIterator(&it); - return scope.Close(obj); -} - -Handle -decodeObject(const Local str) { - HandleScope scope; - size_t buflen = str->ToString()->Length(); - char buf[buflen]; - node::DecodeWrite(buf, buflen, str, node::BINARY); - return decodeObjectStr(buf); -} - -Handle -decode(const Arguments &args) { - HandleScope scope; - return decodeObject(args[0]); -} diff --git a/src/bson.h b/src/bson.h deleted file mode 100644 index a02c5c6..0000000 --- a/src/bson.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef NODE_BSON_H -#define NODE_BSON_H -#include -#include -#include - -using namespace v8; - -class ObjectID : public node::ObjectWrap { - public: - static v8::Persistent constructor_template; - - static void Initialize(Handle target); - - ObjectID() : ObjectWrap() {} - ~ObjectID() {} - - static Handle ToString(const Arguments &args); - - bson_oid_t get() { return oid; } - void str(char *); - protected: - static Handle New(const Arguments& args); - - ObjectID(const char *hex) : node::ObjectWrap() { - bson_oid_from_string(&oid, hex); - } - - private: - - bson_oid_t oid; -}; - -// v8 wrappers -Handle encode(const Arguments &args); -Handle decode(const Arguments &args); - -v8::Local decodeObjectStr(const char *); -bson encodeObject(const v8::Local element); - -#endif diff --git a/src/mongo.cc b/src/mongo.cc deleted file mode 100644 index f63f9cb..0000000 --- a/src/mongo.cc +++ /dev/null @@ -1,824 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#include - -#include -#include -#include - -extern "C" { -#define MONGO_HAVE_STDINT -#include -#include -#include -} -#include "bson.h" - -#define DEBUGMODE 0 -#define pdebug(...) do{if(DEBUGMODE)printf(__VA_ARGS__);}while(0) - -const int chunk_size(4094); -const int headerSize(sizeof(mongo_header) + sizeof(mongo_reply_fields)); - -using namespace v8; - -void setNonBlocking(int sock) { - int sockflags = fcntl(sock, F_GETFL, 0); - fcntl(sock, F_SETFL, sockflags | O_NONBLOCK); -} - -class Connection : public node::EventEmitter { -public: - - static void - Initialize (Handle target) { - HandleScope scope; - - Local t = FunctionTemplate::New(Connection::New); - - t->Inherit(node::EventEmitter::constructor_template); - t->InstanceTemplate()->SetInternalFieldCount(1); - - NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); - NODE_SET_PROTOTYPE_METHOD(t, "close", Close); - NODE_SET_PROTOTYPE_METHOD(t, "find", Find); - NODE_SET_PROTOTYPE_METHOD(t, "insert", Insert); - NODE_SET_PROTOTYPE_METHOD(t, "update", Update); - NODE_SET_PROTOTYPE_METHOD(t, "remove", Remove); - - target->Set(String::NewSymbol("Connection"), t->GetFunction()); - } - - void StartReadWatcher() { - pdebug("*** Starting read watcher\n"); - ev_io_start(EV_DEFAULT_ &read_watcher); - } - - void StopReadWatcher() { - pdebug("*** Stopping read watcher\n"); - ev_io_stop(EV_DEFAULT_ &read_watcher); - } - - void StartWriteWatcher() { - pdebug("*** Starting write watcher\n"); - ev_io_start(EV_DEFAULT_ &write_watcher); - } - - void StopWriteWatcher() { - pdebug("*** Stopping write watcher\n"); - ev_io_stop(EV_DEFAULT_ &write_watcher); - } - - void StartConnectWatcher() { - pdebug("*** Starting connect watcher\n"); - ev_io_start(EV_DEFAULT_ &connect_watcher); - } - - void StopConnectWatcher() { - pdebug("*** Stopping connect watcher\n"); - ev_io_stop(EV_DEFAULT_ &connect_watcher); - } - - void CreateConnection(mongo_connection_options *options) { - // MONGO_INIT_EXCEPTION(&conn->exception); - - conn->left_opts = (mongo_connection_options *)bson_malloc(sizeof(mongo_connection_options)); - conn->right_opts = NULL; - - if ( options ){ - memcpy( conn->left_opts , options , sizeof( mongo_connection_options ) ); - } else { - strcpy( conn->left_opts->host , "127.0.0.1" ); - conn->left_opts->port = 27017; - } - - MongoCreateSocket(); - } - - void MongoCreateSocket() { - conn->sock = 0; - conn->connected = 0; - - memset(conn->sa.sin_zero, 0, sizeof(conn->sa.sin_zero)); - conn->sa.sin_family = AF_INET; - conn->sa.sin_port = htons(conn->left_opts->port); - conn->sa.sin_addr.s_addr = inet_addr(conn->left_opts->host); - conn->addressSize = sizeof(conn->sa); - - conn->sock = socket( AF_INET, SOCK_STREAM, 0 ); - if (conn->sock <= 0){ - //return mongo_conn_no_socket; - // throw exception here? - } - - setNonBlocking(conn->sock); - int res = connect(conn->sock, (struct sockaddr*) &conn->sa, conn->addressSize); - - // make sure we've gotten a non-blocking connection - assert(res < 0); - assert(errno == EINPROGRESS); - - ev_io_set(&connect_watcher, conn->sock, EV_WRITE); - StartConnectWatcher(); - } - - void Connected() { - StopConnectWatcher(); - setsockopt( conn->sock, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one) ); - - conn->connected = 1; - - Emit(String::New("connection"), 0, NULL); - } - - void Connect(const char *host, const int32_t port) { - HandleScope scope; - - mongo_connection_options opts; - memcpy(opts.host, host, strlen(host)+1); - opts.host[strlen(host)] = '\0'; - opts.port = port; - - CreateConnection(&opts); - - setNonBlocking(conn->sock); - - ev_io_set(&read_watcher, conn->sock, EV_READ); - ev_io_set(&write_watcher, conn->sock, EV_WRITE); - - Ref(); - StartWriteWatcher(); - StartReadWatcher(); - } - - void Close() { - pdebug("--- in Close()\n"); - HandleScope scope; - close = true; - } - - void reallyClose() { - HandleScope scope; - StopWriteWatcher(); - StopReadWatcher(); - - if (writebuf) { - delete [] writebuf; - writebuf = NULL; - writebuflen = 0; - } - - if (buf) { - delete [] buf; - buf = NULL; - buflen = 0; - } - - buf = writebuf = NULL; - - mongo_destroy(conn); - - Emit(String::New("close"), 0, NULL); - - Unref(); - } - - void CheckBuffer() { - if (buflen < headerSize) return; - - //mongo_header reply_head; - //mongo_reply_fields reply_fields; - - if(!expectedlen) - { - memcpy(&reply_head, bufptr, sizeof(reply_head)); - bufptr += sizeof(reply_head); - memcpy(&reply_fields, bufptr, sizeof(reply_fields)); - bufptr += sizeof(reply_fields); - bson_little_endian32(&expectedlen, &reply_head.len); - } - - if (expectedlen-buflen == 0) { - // we've gotten the full response - //expectedlen = 0; - ParseReply(reply_head, reply_fields); - - delete [] buf; - buf = bufptr = NULL; - buflen = 0; - expectedlen = 0; - StopReadWatcher(); - StartWriteWatcher(); - } else - StartReadWatcher(); - } - - void ParseReply(mongo_header reply_head, mongo_reply_fields reply_fields) { - HandleScope scope; - - int len;// = expectedlen; - bson_little_endian32(&len, &reply_head.len); - - char replybuf[len]; - - mongo_reply *reply = reinterpret_cast(replybuf); - - reply->head.len = len; - bson_little_endian32(&reply->head.id, &reply_head.id); - bson_little_endian32(&reply->head.responseTo, &reply_head.responseTo); - bson_little_endian32(&reply->head.op, &reply_head.op); - - bson_little_endian32(&reply->fields.flag, &reply_fields.flag); - bson_little_endian64(&reply->fields.cursorID, &reply_fields.cursorID); - bson_little_endian32(&reply->fields.start, &reply_fields.start); - bson_little_endian32(&reply->fields.num, &reply_fields.num); - - memcpy(&reply->objs, bufptr, len-headerSize); - - cursor->mm = reply; - cursor->current.data = NULL; - - for (int i = results->Length(); AdvanceCursor(); i++){ - Local val = decodeObjectStr(cursor->current.data); - results->Set(Integer::New(i), val); - } - // if this is the last cursor - if ( (!cursor->mm || ! reply_fields.cursorID) || reply_fields.num == limit) - { - pdebug("finished, forwarding results\n"); - FreeCursor(); - get_more = false; - EmitResults(); - results.Dispose(); - results.Clear(); - results = Persistent::New(Array::New()); - } - } - - void FreeCursor() { - free((void*)cursor->ns); - free(cursor); - cursor = NULL; - } - - void EmitResults() { - Emit(String::New("result"), 1, reinterpret_cast *>(&results)); - } - - bool AdvanceCursor() { - char* bson_addr; - - /* no data */ - if (!cursor->mm || cursor->mm->fields.num == 0) - return false; - - /* first */ - if (cursor->current.data == NULL){ - bson_init(&cursor->current, &cursor->mm->objs, 0); - return true; - } - - // new cursor position - bson_addr = cursor->current.data + bson_size(&cursor->current); - - if (bson_addr >= ((char*)cursor->mm + cursor->mm->head.len)){ - // current cursor is out of data - get_more = true; - - // indicate that this is the last result - return false; - } else { - // advance cursor by one object - bson_init(&cursor->current, bson_addr, 0); - - return true; - } - return false; - } - - void BufferMessageToSend(mongo_message *mm) { - mongo_header head; - bson_little_endian32(&head.len, &mm->head.len); - bson_little_endian32(&head.id, &mm->head.id); - bson_little_endian32(&head.responseTo, &mm->head.responseTo); - bson_little_endian32(&head.op, &mm->head.op); - - int size = mm->head.len; - pdebug("buffering message of size %d\n", size); - - char *tmp = new char[writebuflen+size]; - - if (writebuf) { - memcpy(tmp, writebuf, writebuflen); - } - - memcpy(tmp+writebuflen, &head, sizeof(head)); - memcpy(tmp+writebuflen+sizeof(head), &mm->data, size-sizeof(head)); - free(mm); - - int ptrdelta = writebufptr - writebuf; - - if (writebuf) { - delete [] writebuf; - } - - writebuflen = writebuflen + size; - writebuf = tmp; - writebufptr = tmp + ptrdelta; - pdebug("write buf is of size %d\n", writebuflen); - pdebug("est lenRem = %d\n", writebuflen-ptrdelta); - pdebug("wbuf diff = %d\n", ptrdelta); - StartWriteWatcher(); - } - - void WriteSendBuffer() { - pdebug("going to write buffer\n"); - - int sock = conn->sock; - int lenRemaining = writebuflen-(writebufptr-writebuf); - - pdebug("remaining: %d\n", lenRemaining); - while (lenRemaining) { - pdebug("trying to write %d\n", lenRemaining); - int sent = write(sock, writebufptr, lenRemaining); - pdebug("write = %d\n", sent); - if (sent == -1) { - if (errno == EAGAIN) { - // we need to set the write watcher again and continue - // later - pdebug("EAGAIN\n"); - - StartWriteWatcher(); - return; - } - else { - pdebug("errorno was %d\n", errno); - } - } - writebufptr += sent; - lenRemaining -= sent; - } - if (!lenRemaining) { - delete [] writebuf; - writebufptr = writebuf = NULL; - writebuflen = 0; - } - pdebug("done! write buf is of size %d\n", writebuflen); - pdebug("done! est lenRem = %d\n", writebuflen-(writebufptr-writebuf)); - pdebug("done! wbuf diff = %d\n", (writebufptr-writebuf)); - StopWriteWatcher(); - } - - void ConsumeInput() { - char *tmp; - char readbuf[chunk_size]; - int32_t readbuflen; - - for (;;) { - readbuflen = read(conn->sock, readbuf, chunk_size); - pdebug("reading %d bytes current: %d expected: %d\n", readbuflen, buflen, expectedlen); - - if (readbuflen == -1 && errno == EAGAIN) { - // no more input to consume - pdebug("len == -1 && errno == EAGAIN\n"); - return; - } - else if (readbuflen <= 0) { - // socket problem? - pdebug("length error on read %d errno = %d\n", readbuflen, errno); - reallyClose(); - close = false; - } - else { - tmp = static_cast(new char[buflen+readbuflen]); - memset(tmp, 0, buflen+readbuflen); - - if (buf) { - memcpy(tmp, buf, buflen); - } - memcpy(tmp+buflen, readbuf, readbuflen); - if (buf) { - delete [] buf; - } - buflen = buflen + readbuflen; - bufptr = tmp + (bufptr - buf); - buf = tmp; - break; - } - } - } - - void RequestMore() { - HandleScope scope; - - char* data; - int sl = strlen(cursor->ns)+1; - mongo_message * mm = mongo_message_create(16 /*header*/ - +4 /*ZERO*/ - +sl - +4 /*numToReturn*/ - +8 /*cursorID*/ - , 0, 0, mongo_op_get_more); - data = &mm->data; - data = mongo_data_append32(data, &zero); - data = mongo_data_append(data, cursor->ns, sl); - data = mongo_data_append32(data, &zero); - data = mongo_data_append64(data, &(cursor->mm->fields.cursorID)); - - BufferMessageToSend(mm); - pdebug("asking for more data\n"); - } - - - bool Find(const char *ns, bson *query=0, bson *query_fields=0, - int nToReturn=0, int nToSkip=0, int options=0) { - StartReadWatcher(); - assert(!close); - cursor = static_cast( - bson_malloc(sizeof(mongo_cursor))); - limit = nToReturn; - skip = nToSkip; - - int sl = strlen(ns)+1; - cursor->ns = static_cast(bson_malloc(sl)); - - memcpy(static_cast(const_cast(cursor->ns)), ns, sl); - cursor->conn = conn; - - char * data; - mongo_message * mm = mongo_message_create( 16 + /* header */ - 4 + /* options */ - sl + /* ns */ - 4 + 4 + /* skip,return */ - bson_size( query ) + - bson_size( query_fields ) , - 0 , 0 , mongo_op_query ); - - data = &mm->data; - data = mongo_data_append32(data, &options); - data = mongo_data_append(data, ns, strlen(ns)+ 1); - data = mongo_data_append32(data, &nToSkip); - data = mongo_data_append32(data, &nToReturn); - data = mongo_data_append(data, query->data, bson_size(query)); - if (query_fields) - data = mongo_data_append(data, query_fields->data, bson_size(query_fields)); - - bson_fatal_msg((data == ((char*)mm) + mm->head.len), "query building fail!"); - - BufferMessageToSend(mm); - } - - void Insert(const char *ns, bson obj) { - char * data; - mongo_message *mm = mongo_message_create( 16 /* header */ - + 4 /* ZERO */ - + strlen(ns) - + 1 + bson_size(&obj) - , 0, 0, mongo_op_insert); - - data = &mm->data; - data = mongo_data_append32(data, &zero); - data = mongo_data_append(data, ns, strlen(ns) + 1); - data = mongo_data_append(data, obj.data, bson_size(&obj)); - - BufferMessageToSend(mm); - } - - void Remove(const char *ns, bson cond) { - char * data; - mongo_message * mm = mongo_message_create( 16 /* header */ - + 4 /* ZERO */ - + strlen(ns) + 1 - + 4 /* ZERO */ - + bson_size(&cond) - , 0 , 0 , mongo_op_delete ); - - data = &mm->data; - data = mongo_data_append32(data, &zero); - data = mongo_data_append(data, ns, strlen(ns) + 1); - data = mongo_data_append32(data, &zero); - data = mongo_data_append(data, cond.data, bson_size(&cond)); - BufferMessageToSend(mm); - } - - void Update(const char *ns, bson cond, bson op, int flags=0) { - char * data; - mongo_message * mm = mongo_message_create( 16 /* header */ - + 4 /* ZERO */ - + strlen(ns) + 1 - + 4 /* flags */ - + bson_size(&cond) - + bson_size(&op) - , 0 , 0 , mongo_op_update ); - - data = &mm->data; - data = mongo_data_append32(data, &zero); - data = mongo_data_append(data, ns, strlen(ns) + 1); - data = mongo_data_append32(data, &flags); - data = mongo_data_append(data, cond.data, bson_size(&cond)); - data = mongo_data_append(data, op.data, bson_size(&op)); - - BufferMessageToSend(mm); - } - -protected: - - static Handle - New(const Arguments& args) { - HandleScope scope; - - // XXX where should this be deleted? - Connection *connection = new Connection(); - connection->Wrap(args.This()); - return args.This(); - } - - ~Connection() { - } - - Connection() : node::EventEmitter() { - HandleScope scope; - results = Persistent::New(Array::New()); - - close = false; - cursor = false; - get_more = false; - buflen = writebuflen = 0; - buf = bufptr = writebuf = writebufptr = NULL; - expectedlen = 0; - limit = 0; - skip = 0; - - ev_init(&read_watcher, io_event); - read_watcher.data = this; - ev_init(&write_watcher, io_event); - write_watcher.data = this; - ev_init(&connect_watcher, connect_event); - connect_watcher.data = this; - } - - static Handle - Connect(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); - - // XXX check args.Length - String::Utf8Value host(args[0]->ToString()); - connection->Connect(*host, args[1]->Int32Value()); - - return Undefined(); - } - - static Handle - Close(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); - - connection->Close(); - - return Undefined(); - } - - static Handle - Find(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); - - // TODO assert ns != undefined (args.Length > 0) - String::Utf8Value ns(args[0]->ToString()); - - bson query_bson; - bson query_fields_bson; - int nToReturn(0), nToSkip(0); - - if (args.Length() > 1 && !args[1]->IsUndefined()) { - Local query(args[1]->ToObject()); - query_bson = encodeObject(query); - } - else { - bson_empty(&query_bson); - } - - if (args.Length() > 2 && !args[2]->IsUndefined()) { - Local query_fields(args[2]->ToObject()); - query_fields_bson = encodeObject(query_fields); - } - else { - bson_empty(&query_fields_bson); - } - - if (args.Length() > 3 && !args[3]->IsUndefined()) { - nToReturn = args[3]->Int32Value(); - } - - if (args.Length() > 4 && !args[4]->IsUndefined()) { - nToSkip = args[4]->Int32Value(); - } - - connection->Find(*ns, &query_bson, &query_fields_bson, nToReturn, nToSkip); - - bson_destroy(&query_bson); - bson_destroy(&query_fields_bson); - return Undefined(); - } - - static Handle - Insert(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); - - String::Utf8Value ns(args[0]->ToString()); - // TODO assert ns != undefined (args.Length > 0) - - bson obj; - - // XXX check args > 1 - Local query(args[1]->ToObject()); - obj = encodeObject(query); - - connection->Insert(*ns, obj); - - bson_destroy(&obj); - return Undefined(); - } - - static Handle - Update(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); - - String::Utf8Value ns(args[0]->ToString()); - // TODO assert ns != undefined (args.Length > 0) - - bson cond; - bson obj; - int flags = 0; - - if (args.Length() > 1 && !args[1]->IsUndefined()) { - Local query(args[1]->ToObject()); - cond = encodeObject(query); - } - else { - bson_empty(&cond); - } - - if (args.Length() > 2 && !args[2]->IsUndefined()) { - Local query(args[2]->ToObject()); - obj = encodeObject(query); - } - else { - bson_empty(&obj); - } - - if (args.Length() > 3 && !args[3]->IsUndefined()) { - Local jsflags = args[3]->ToInteger(); - flags = jsflags->Value(); - } - - connection->Update(*ns, cond, obj, flags); - - bson_destroy(&cond); - bson_destroy(&obj); - return Undefined(); - } - - static Handle - Remove(const Arguments &args) { - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.This()); - if (!args[0]->IsString()) { - return ThrowException( - Exception::Error( - String::New("ns must be specified"))); - } - String::Utf8Value ns(args[0]->ToString()); - - - bson cond; - if (args.Length() > 1 && args[1]->IsObject()) { - Local query(args[1]->ToObject()); - cond = encodeObject(query); - } - else if (args.Length() > 1 && args[1]->IsUndefined()) { - bson_empty(&cond); - } - else if (args.Length() > 1 && !args[1]->IsObject()) { - return ThrowException( - Exception::Error( - String::New("Condition must be an object"))); - } - - connection->Remove(*ns, cond); - - bson_destroy(&cond); - return Undefined(); - } - - void Event(EV_P_ ev_io *w, int revents) { - if (!conn->connected) { - StopReadWatcher(); - StopWriteWatcher(); - reallyClose(); - close = false; - return; - }; - pdebug("event %d %d\n", conn->connected, close ? 1 : 0); - if (revents & EV_READ) { - pdebug("!!! got a read event\n"); - StopReadWatcher(); - ConsumeInput(); - CheckBuffer(); - } - if (revents & EV_WRITE) { - pdebug("!!! got a write event\n"); - pdebug("!!! writebuflen = %d\n", writebuflen); - if (writebuflen) { - pdebug("things to write\n"); - WriteSendBuffer(); - } - else { - StopWriteWatcher(); - } - - if (get_more) { - StopWriteWatcher(); - RequestMore(); - StartReadWatcher(); - get_more = false; - } - else { - Emit(String::New("ready"), 0, NULL); - } - } - if (close) { - pdebug("!!! really closing %d\n", close); - reallyClose(); - close = false; - } - if (revents & EV_ERROR) { - pdebug("!!! got an error event\n"); - } - } - -private: - - static void - connect_event(EV_P_ ev_io *w, int revents) { - pdebug("!!! got a connect event\n"); - Connection *connection = static_cast(w->data); - connection->Connected(); - } - - static void - io_event (EV_P_ ev_io *w, int revents) { - Connection *connection = static_cast(w->data); - connection->Event(w, revents); - } - - mongo_connection conn[1]; - - // states - bool get_more; - bool close; - - mongo_cursor *cursor; - int limit; - int skip; - - Persistent results; - - char *buf; - char *bufptr; - int buflen; - int expectedlen; - mongo_header reply_head; - mongo_reply_fields reply_fields; - - - char *writebuf; - char *writebufptr; - int writebuflen; - - ev_io read_watcher; - ev_io write_watcher; - ev_io connect_watcher; -}; - -extern "C" void -init (Handle target) { - HandleScope scope; - - target->Set( - String::New("encode"), - FunctionTemplate::New(encode)->GetFunction()); - target->Set( - String::New("decode"), - FunctionTemplate::New(decode)->GetFunction()); - ObjectID::Initialize(target); - Connection::Initialize(target); -} From 3ae3265a0bac73279be1e7c7223835ae1b2a5bd4 Mon Sep 17 00:00:00 2001 From: wink Date: Fri, 24 Sep 2010 14:59:17 -0500 Subject: [PATCH 11/26] rewrite of the binding backend --- src/Connection.cpp | 382 ++++++++++++++++++++++++++++++++++++++++ src/Connection.h | 57 ++++++ src/MongoConnection.cpp | 322 +++++++++++++++++++++++++++++++++ src/MongoConnection.h | 89 ++++++++++ src/bson.cpp | 370 ++++++++++++++++++++++++++++++++++++++ src/bson.h | 41 +++++ 6 files changed, 1261 insertions(+) create mode 100644 src/Connection.cpp create mode 100644 src/Connection.h create mode 100644 src/MongoConnection.cpp create mode 100644 src/MongoConnection.h create mode 100644 src/bson.cpp create mode 100644 src/bson.h diff --git a/src/Connection.cpp b/src/Connection.cpp new file mode 100644 index 0000000..f8aea74 --- /dev/null +++ b/src/Connection.cpp @@ -0,0 +1,382 @@ +#include +#include "Connection.h" + +extern "C" { +#define MONGO_HAVE_STDINT +#include +#include +#include +} + + +using namespace v8; +using namespace node; + +void Connection::Initialize(Handle target) +{ + HandleScope scope; + + Local t = FunctionTemplate::New(Connection::New); + + t->Inherit(node::EventEmitter::constructor_template); + t->InstanceTemplate()->SetInternalFieldCount(1); + + NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); + NODE_SET_PROTOTYPE_METHOD(t, "close", Close); + NODE_SET_PROTOTYPE_METHOD(t, "find", Find); + NODE_SET_PROTOTYPE_METHOD(t, "insert", Insert); + NODE_SET_PROTOTYPE_METHOD(t, "update", Update); + NODE_SET_PROTOTYPE_METHOD(t, "remove", Remove); + target->Set(String::NewSymbol("Connection"), t->GetFunction()); + scope.Close(Undefined()); +} + +Handle Connection::New(const Arguments &args) +{ + HandleScope scope; + Connection *mongo = new Connection(); + mongo->Wrap(args.This()); + mongo->m_results= Persistent::New(Array::New()); + mongo->m_gettingMore = false; + return args.This(); +} + +Handle Connection::Connect(const Arguments &args) +{ + Connection *connection = ObjectWrap::Unwrap(args.This()); + HandleScope scope; + + String::Utf8Value conninfo(args[0]->ToString()); + //Number::New(args[1]->ToNumber()); + connection->connect(*conninfo, 27017); + connection->Ref(); + return scope.Close(Undefined()); +} + +void Connection::onConnected() +{ + pdebug("got connection!\n"); + Emit(String::New("connection"), 0, NULL); +} + +void Connection::onReady() +{ + Emit(String::New("ready"), 0, NULL); +} + +void Connection::onClose() +{ + Emit(String::New("close"), 0, NULL); +} + +void Connection::onResults(MongoMessage *message) +{ + HandleScope scope; + mongo_reply *reply = reinterpret_cast(message->messageBuf); + mongo_cursor cursor; + + cursor.mm = reply; + cursor.current.data = NULL; + + if(!m_gettingMore) + { + m_results.Dispose(); + m_results = Persistent::New(Array::New()); + } + + int start = m_results->Length(); + pdebug("starting at rec: %d adding %d for a total of %d\n", m_results->Length(), reply->fields.num, start+reply->fields.num); + + for(int i = m_results->Length(); i < reply->fields.num+start; i++) + { + mongo_cursor_next(&cursor); + Local val = decodeObjectStr(cursor.current.data); + m_results->Set(Integer::New(i), val); + m_recCount = i+1; + + if(m_recLimit > 0 && m_recCount == m_recLimit) // got all we were asked for + { + pdebug("hit limit, bailing %d\n", m_recCount); + + if(reply->fields.cursorID) // still more data in cursor even though we dont want it, release it + { + pdebug("killing cursor\n"); + mongo_message * mm = mongo_message_create(16 /*header*/ + +4 /*ZERO*/ + +4 /*numCursors*/ + +8 /*cursorID*/ + , 0, 0, mongo_op_kill_cursors); + char* data = &mm->data; + data = mongo_data_append32(data, &zero); + data = mongo_data_append32(data, &one); + data = mongo_data_append64(data, &reply->fields.cursorID); + WriteMessage(mm); + } + + reply->fields.cursorID = 0; // easy way to abort gathering more data + break; + } + } + + if(reply->fields.cursorID && m_recCount < m_recLimit) // get more data + { + pdebug("need to get more data\n"); + m_gettingMore = true; + unsigned int count = m_recLimit-m_recCount; + mongo_connection* conn = cursor.conn; + char* data; + int sl = strlen(m_cursor->ns)+1; + mongo_message * mm = mongo_message_create(16 /*header*/ + +4 /*ZERO*/ + +sl + +4 /*numToReturn*/ + +8 /*cursorID*/ + , 0, 0, mongo_op_get_more); + data = &mm->data; + data = mongo_data_append32(data, &zero); + data = mongo_data_append(data, m_cursor->ns, sl); + data = mongo_data_append32(data, &count); + data = mongo_data_append64(data, &reply->fields.cursorID); + WriteMessage(mm); + scope.Close(Undefined()); + return; + } + + + Emit(String::New("result"), 1, reinterpret_cast *>(&m_results)); + + // clean up and reset state + m_results.Clear(); + m_results.Dispose(); + m_results = Persistent::New(Array::New()); + m_gettingMore = false; + free((void *)m_cursor->ns); + free(m_cursor); + scope.Close(Undefined()); +} + +Handle Connection::Insert(const Arguments &args) +{ + HandleScope scope; + Connection *connection = ObjectWrap::Unwrap(args.This()); + + String::Utf8Value ns(args[0]->ToString()); + // TODO assert ns != undefined (args.Length > 0) + bson obj; + + // XXX check args > 1 + Local query(args[1]->ToObject()); + obj = encodeObject(query); + + char * data; + mongo_message *mm = mongo_message_create( 16 /* header */ + + 4 /* ZERO */ + + strlen(*ns) + + 1 + bson_size(&obj) + , 0, 0, mongo_op_insert); + + data = &mm->data; + data = mongo_data_append32(data, &zero); + data = mongo_data_append(data, *ns, strlen(*ns) + 1); + data = mongo_data_append(data, obj.data, bson_size(&obj)); + + connection->WriteMessage(mm); + + bson_destroy(&obj); + return scope.Close(Undefined()); +} + +Handle Connection::Update(const Arguments &args) { + HandleScope scope; + Connection *connection = ObjectWrap::Unwrap(args.This()); + + String::Utf8Value ns(args[0]->ToString()); + + bson cond; + bson obj; + int flags = 0; + + if (args.Length() > 1 && !args[1]->IsUndefined()) { + Local query(args[1]->ToObject()); + cond = encodeObject(query); + } + else { + bson_empty(&cond); + } + + if (args.Length() > 2 && !args[2]->IsUndefined()) { + Local query(args[2]->ToObject()); + obj = encodeObject(query); + } + else { + bson_empty(&obj); + } + + if (args.Length() > 3 && !args[3]->IsUndefined()) { + Local jsflags = args[3]->ToInteger(); + flags = jsflags->Value(); + } + + //connection->Update(*ns, cond, obj, flags); + char * data; + mongo_message * mm = mongo_message_create( 16 /* header */ + + 4 /* ZERO */ + + strlen(*ns) + 1 + + 4 /* flags */ + + bson_size(&cond) + + bson_size(&obj) + , 0 , 0 , mongo_op_update ); + + data = &mm->data; + data = mongo_data_append32(data, &zero); + data = mongo_data_append(data, *ns, strlen(*ns) + 1); + data = mongo_data_append32(data, &flags); + data = mongo_data_append(data, cond.data, bson_size(&cond)); + data = mongo_data_append(data, obj.data, bson_size(&obj)); + connection->WriteMessage(mm); + + + bson_destroy(&cond); + bson_destroy(&obj); + return scope.Close(Undefined()); +} +Handle Connection::Close(const Arguments &args) +{ + HandleScope scope; + Connection *connection = ObjectWrap::Unwrap(args.This()); + + connection->disconnect(); + + return scope.Close(Undefined()); + +} +Handle Connection::Remove(const Arguments &args) { + HandleScope scope; + Connection *connection = ObjectWrap::Unwrap(args.This()); + if (!args[0]->IsString()) { + return ThrowException(Exception::Error(String::New("ns must be specified"))); + } + String::Utf8Value ns(args[0]->ToString()); + + + bson cond; + if (args.Length() > 1 && args[1]->IsObject()) { + Local query(args[1]->ToObject()); + cond = encodeObject(query); + } + else if (args.Length() > 1 && args[1]->IsUndefined()) { + bson_empty(&cond); + } + else if (args.Length() > 1 && !args[1]->IsObject()) { + return ThrowException(Exception::Error(String::New("Condition must be an object"))); + } + + //connection->Remove(*ns, cond); + char * data; + mongo_message * mm = mongo_message_create( 16 /* header */ + + 4 /* ZERO */ + + strlen(*ns) + 1 + + 4 /* ZERO */ + + bson_size(&cond) + , 0 , 0 , mongo_op_delete ); + + data = &mm->data; + data = mongo_data_append32(data, &zero); + data = mongo_data_append(data, *ns, strlen(*ns) + 1); + data = mongo_data_append32(data, &zero); + data = mongo_data_append(data, cond.data, bson_size(&cond)); + connection->WriteMessage(mm); + + bson_destroy(&cond); + return scope.Close(Undefined()); +} + + + +Handle Connection::Find(const Arguments &args) +{ + HandleScope scope; + Connection *connection = ObjectWrap::Unwrap(args.This()); + + // TODO assert ns != undefined (args.Length > 0) + String::Utf8Value ns(args[0]->ToString()); + + bson query_bson; + bson query_fields_bson; + int nToReturn(0), nToSkip(0); + + if (args.Length() > 1 && !args[1]->IsUndefined()) { + Local query(args[1]->ToObject()); + query_bson = encodeObject(query); + } + else { + bson_empty(&query_bson); + } + + if (args.Length() > 2 && !args[2]->IsUndefined()) { + Local query_fields(args[2]->ToObject()); + query_fields_bson = encodeObject(query_fields); + } + else { + bson_empty(&query_fields_bson); + } + + if (args.Length() > 3 && !args[3]->IsUndefined()) { + nToReturn = args[3]->Int32Value(); + } + + if (args.Length() > 4 && !args[4]->IsUndefined()) { + nToSkip = args[4]->Int32Value(); + } + + pdebug("find on: %s\n", *ns); + + mongo_cursor *cursor = static_cast( + bson_malloc(sizeof(mongo_cursor))); + + if(nToReturn > 0) // track limit so cursor can advance until necessary + connection->m_recLimit = nToReturn; + + int sl = strlen(*ns)+1; + cursor->ns = static_cast(bson_malloc(sl)); + + memcpy(static_cast(const_cast(cursor->ns)), *ns, sl); + cursor->conn = connection->m_connection; + + char * data; + mongo_message * mm = mongo_message_create( 16 + /* header */ + 4 + /* options */ + sl + /* ns */ + 4 + 4 + /* skip,return */ + bson_size( &query_bson ) + + bson_size( &query_fields_bson ) , + 0 , 0 , mongo_op_query ); + int options = 0; + data = &mm->data; + data = mongo_data_append32(data, &options); + data = mongo_data_append(data, *ns, strlen(*ns)+ 1); + data = mongo_data_append32(data, &nToSkip); + data = mongo_data_append32(data, &nToReturn); + data = mongo_data_append(data, query_bson.data, bson_size(&query_bson)); + //if (query_fields_bson) + data = mongo_data_append(data, query_fields_bson.data, bson_size(&query_fields_bson)); + + bson_fatal_msg((data == ((char*)mm) + mm->head.len), "query building fail!"); + + + connection->WriteMessage(mm); + connection->m_cursor = cursor; + + bson_destroy(&query_bson); + bson_destroy(&query_fields_bson); + + return scope.Close(Undefined()); +} + +extern "C" +void init( Handle target ) { + HandleScope scope; + Connection::Initialize( target ); + ObjectID::Initialize(target); + scope.Close(Undefined()); +} diff --git a/src/Connection.h b/src/Connection.h new file mode 100644 index 0000000..d0a2137 --- /dev/null +++ b/src/Connection.h @@ -0,0 +1,57 @@ +#ifndef _mongo_h_ +#define _mongo_h_ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "MongoConnection.h" + +extern "C" { +#define MONGO_HAVE_STDINT +#include +#include +#include +} +#include "bson.h" + +using namespace v8; + +class Connection : public node::EventEmitter, MongoConnection +{ + public: + // v8 interfaces + static void Initialize(Handle target); + static Handle New(const Arguments &args); + static Handle Connect(const Arguments &args); + static Handle Close(const Arguments &args); + static Handle Find(const Arguments &args); + static Handle Insert(const Arguments &args); + static Handle Update(const Arguments &args); + static Handle Remove(const Arguments &args); + + protected: + Connection() : EventEmitter() { }; + + // inherited events + void onConnected(); + void onReady(); + void onResults(MongoMessage *message); + void onClose(); + + Persistent m_results; + //Handle results; + bool m_gettingMore; + unsigned int m_recCount; + unsigned int m_recLimit; + mongo_cursor *m_cursor; +}; +#endif diff --git a/src/MongoConnection.cpp b/src/MongoConnection.cpp new file mode 100644 index 0000000..60e5154 --- /dev/null +++ b/src/MongoConnection.cpp @@ -0,0 +1,322 @@ +#include "MongoConnection.h" + +#define chunksize 8092 + +MongoConnection::MongoConnection() +{ + m_connected = false; + + ev_init(&read_watcher, IOEventProxy); + read_watcher.data = this; // stash this reference for use in static callback + ev_init(&write_watcher, IOEventProxy); + write_watcher.data = this; + ev_init(&connect_watcher, ConnectEventProxy); + connect_watcher.data = this; + + memset(&m_inboundBuffer, 0, sizeof(MongoMessage)); + memset(&m_outboundBuffer, 0, sizeof(MongoMessage)); + pdebug("created\n"); +} + +MongoConnection::~MongoConnection() +{ + disconnect(); + // clean up m_connection + // clean up m_currentMessage +} + +void MongoConnection::connect(const char *host, const int32_t port) +{ + pdebug("connecting\n"); + mongo_connection_options opts; + memcpy(opts.host, host, strlen(host)+1); + opts.host[strlen(host)] = '\0'; + opts.port = port; + + m_connection->left_opts = (mongo_connection_options *)bson_malloc(sizeof(mongo_connection_options)); + m_connection->right_opts = NULL; + + if ( strlen(opts.host) > 0 ) // ghetto + { + memcpy( m_connection->left_opts , &opts , sizeof( mongo_connection_options ) ); + } + else + { + strcpy( m_connection->left_opts->host , "127.0.0.1" ); + m_connection->left_opts->port = 27017; + } + + m_connection->sock = 0; + m_connection->connected = 0; + + memset(m_connection->sa.sin_zero, 0, sizeof(m_connection->sa.sin_zero)); + m_connection->sa.sin_family = AF_INET; + m_connection->sa.sin_port = htons(m_connection->left_opts->port); + m_connection->sa.sin_addr.s_addr = inet_addr(m_connection->left_opts->host); + m_connection->addressSize = sizeof(m_connection->sa); + + m_connection->sock = socket( AF_INET, SOCK_STREAM, 0 ); + + if(m_connection->sock <= 0) + { + // onerror + } + + int sockflags = fcntl(m_connection->sock, F_GETFL, 0); + fcntl(m_connection->sock, F_SETFL, sockflags | O_NONBLOCK); + int res = ::connect(m_connection->sock, (struct sockaddr*) &m_connection->sa, m_connection->addressSize); + + ev_io_set(&connect_watcher, m_connection->sock, EV_WRITE); + ConnectWatcher(true); + + ev_io_set(&read_watcher, m_connection->sock, EV_READ); + ev_io_set(&write_watcher, m_connection->sock, EV_WRITE); + ReadWatcher(true); + //WriteWatcher(true); +} + +void MongoConnection::disconnect() +{ + ReadWatcher(false); + WriteWatcher(false); + free(m_outboundBuffer.messageBuf); + free(m_inboundBuffer.messageBuf); + memset(&m_outboundBuffer, 0, sizeof(MongoMessage)); + memset(&m_inboundBuffer, 0, sizeof(MongoMessage)); + mongo_destroy(m_connection); + onClose(); +} + +bool MongoConnection::isConnected() +{ + return m_connected; +} + +void MongoConnection::onConnected() +{ + pdebug("!!!got my connection!\n"); +} + +void MongoConnection::onClose() +{ + pdebug("!!! closing connection\n"); +} + +void MongoConnection::onResults(MongoMessage *message) +{ + pdebug("!!!results\n"); +} + +void MongoConnection::onReady() +{ + pdebug("!!!ready!\n"); +} +void MongoConnection::ReadWatcher(bool state) +{ + if(state) + ev_io_start(EV_DEFAULT_ &read_watcher); + else + ev_io_stop(EV_DEFAULT_ &read_watcher); +} + +void MongoConnection::WriteWatcher(bool state) +{ + if(state) + ev_io_start(EV_DEFAULT_ &write_watcher); + else + ev_io_stop(EV_DEFAULT_ &write_watcher); +} + + +void MongoConnection::ConnectWatcher(bool state) +{ + if(state) + ev_io_start(EV_DEFAULT_ &connect_watcher); + else + ev_io_stop(EV_DEFAULT_ &connect_watcher); +} + +void MongoConnection::ConnectEvent(int revents) +{ + ConnectWatcher(false); + int flag = 1; + setsockopt( m_connection->sock, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag) ); // disable nagle, not sure if needed + + m_connection->connected = 1; + onConnected(); + onReady(); + ReadWatcher(true); + // WriteWatcher(true); +} + +void MongoConnection::ReadData() +{ + char readbuf[chunksize]; + + while(true) + { + // read the largest chunk we can + int len = read(m_connection->sock, readbuf, chunksize); + if(len == -1) + { + if(errno == EAGAIN) + { + //pdebug("EGAIN on read, try later\n"); + return; + } + else + { + // TODO disconnect and clean up + disconnect(); + return; + } + } + + pdebug("realloc: %p\t%d bytes -- %d + %d\n", m_inboundBuffer.messageBuf, m_inboundBuffer.messageLen+len, m_inboundBuffer.messageLen, len); + m_inboundBuffer.messageBuf = (char *) realloc(m_inboundBuffer.messageBuf, m_inboundBuffer.bufferLen+len); + m_inboundBuffer.index = m_inboundBuffer.messageBuf + m_inboundBuffer.bufferLen; + memcpy(m_inboundBuffer.index, readbuf, len); + + if(!m_inboundBuffer.messageLen && len > sizeof(mongo_header)+sizeof(mongo_reply_fields)) // set up header struct so we can watch for completed messages + { + memcpy(&m_inboundBuffer.messageHeader, m_inboundBuffer.index, sizeof(mongo_header)); + m_inboundBuffer.index += sizeof(mongo_header); + memcpy(&m_inboundBuffer.messageReply, m_inboundBuffer.index, sizeof(mongo_reply_fields)); + m_inboundBuffer.index += sizeof(mongo_reply_fields); + bson_little_endian32(&m_inboundBuffer.messageLen, &m_inboundBuffer.messageHeader.len); + pdebug("initial read, expecting %d bytes\n", m_inboundBuffer.messageLen); + } + + //pdebug("read: %d (%d) expected: %d left: %d\n", len, m_inboundBuffer.bufferLen, m_inboundBuffer.messageLen, m_inboundBuffer.messageLen - m_inboundBuffer.bufferLen); + m_inboundBuffer.bufferLen += len; + m_inboundBuffer.index += len; // hmm. + + if(m_inboundBuffer.bufferLen == m_inboundBuffer.messageLen) // complete message! + { + // get appropriate data structures + mongo_reply *reply = reinterpret_cast(m_inboundBuffer.messageBuf); + + + onResults(&m_inboundBuffer); + free(m_inboundBuffer.messageBuf); + memset(&m_inboundBuffer, 0, sizeof(MongoMessage)); + } + } + +} + +void MongoConnection::WriteMessage(mongo_message *message) +{ + mongo_header head; + bson_little_endian32(&head.len, &message->head.len); + bson_little_endian32(&head.id, &message->head.id); + bson_little_endian32(&head.responseTo, &message->head.responseTo); + bson_little_endian32(&head.op, &message->head.op); + + int len = message->head.len; + pdebug("adding %d bytes to outbound buffer (%d)\n", len, m_outboundBuffer.messageLen); + + int currentOffset = m_outboundBuffer.index - m_outboundBuffer.messageBuf; + pdebug("realloc: %p\t%d bytes\n", m_outboundBuffer.messageBuf, m_outboundBuffer.messageLen+len); + m_outboundBuffer.messageBuf = (char *) realloc(m_outboundBuffer.messageBuf, m_outboundBuffer.messageLen+len); + + m_outboundBuffer.index = m_outboundBuffer.messageBuf + currentOffset; // adjust write ptr in case of relocation + + // copy new data to the end of the buffer + //memcpy(m_outboundBuffer.index, &head, sizeof(head)); + //memcpy(m_outboundBuffer.index+sizeof(head), &message->data, len-sizeof(head)); + memcpy(m_outboundBuffer.messageBuf+m_outboundBuffer.messageLen, &head, sizeof(head)); + memcpy(m_outboundBuffer.messageBuf+m_outboundBuffer.messageLen+sizeof(head), &message->data, len-sizeof(head)); + + m_outboundBuffer.messageLen += len; + + free(message); // clean up message since it's been queued + WriteWatcher(true); +} + +void MongoConnection::WriteData() +{ + WriteWatcher(false); + pdebug("%d bytes to left write\n", m_outboundBuffer.messageLen); + + int bytes = write(m_connection->sock, m_outboundBuffer.index, m_outboundBuffer.messageLen); + pdebug("wrote: %d bytes [%d - %d]\n", bytes, m_outboundBuffer.index-m_outboundBuffer.messageBuf, m_outboundBuffer.index-m_outboundBuffer.messageBuf+bytes); + if(bytes == -1) + { + if(errno == EAGAIN) + { + WriteWatcher(true); + return; + } + // TODO disconnect me + } + + m_outboundBuffer.index += bytes; + m_outboundBuffer.messageLen -= bytes; + + if(m_outboundBuffer.messageLen == 0) // done sending, clean up + { + pdebug("done!\n"); + free(m_outboundBuffer.messageBuf); + memset(&m_outboundBuffer, 0 , sizeof(MongoMessage)); + WriteWatcher(false); + } + else + WriteWatcher(true); // not done writing yet +} + +void MongoConnection::IOEvent(int revents) +{ + if(!m_connection->connected) + { + ReadWatcher(false); + WriteWatcher(false); + // TODO + //disconnect(); + //onClose(); + } + //pdebug("event %d %d\n", m_connection->connected, revents); + + if(revents & EV_READ) + { + //pdebug("read event\n"); + ReadData(); + // TODO + // read data + } + + if(revents & EV_WRITE) + { + pdebug("write event\n"); + if(m_outboundBuffer.messageBuf) + WriteData(); + else + { + WriteWatcher(false); + ReadWatcher(true); + onReady(); + } + // TODO + // write data + } + + if(revents & EV_ERROR) + { + pdebug("error event\n"); + // TODO + // handle error, probably tear down and emit onClose + } +} + +void MongoConnection::ConnectEventProxy(EV_P_ ev_io *w, int revents) +{ + pdebug("got a connect event\n"); + MongoConnection *connection = static_cast(w->data); + connection->ConnectEvent(revents); +} + +void MongoConnection::IOEventProxy(EV_P_ ev_io *w, int revents) +{ + MongoConnection *connection = static_cast(w->data); + connection->IOEvent(revents); +} diff --git a/src/MongoConnection.h b/src/MongoConnection.h new file mode 100644 index 0000000..7517dd1 --- /dev/null +++ b/src/MongoConnection.h @@ -0,0 +1,89 @@ +#ifndef _mongoconnection_h_ +#define _mongoconnection_h_ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +extern "C" { +#define MONGO_HAVE_STDINT +#include +#include +#include +} +#include "bson.h" + +#define DEBUGMODE 0 +#define pdebug(...) do{if(DEBUGMODE)printf(__VA_ARGS__);}while(0) + + +typedef struct { + char *messageBuf; + mongo_header messageHeader; + mongo_reply_fields messageReply; + unsigned int messageLen; + unsigned int bufferLen; + char *index; +} MongoMessage; + +class MongoConnection +{ +public: + MongoConnection(); + ~MongoConnection(); + + void connect(const char *host, const int32_t port); + void disconnect(); + + bool isConnected(); + + // (TODO: soon to be) virtual functions for inheriting classes to implement + virtual void onConnected(); + virtual void onResults(MongoMessage *message); + virtual void onReady(); + virtual void onClose(); + + // IO + void WriteMessage(mongo_message *message); + + mongo_connection m_connection[1]; + private: + bool m_connected; + + // ev watchers + ev_io read_watcher; + ev_io write_watcher; + ev_io connect_watcher; + + void ReadWatcher(bool state); + void WriteWatcher(bool state); + void ConnectWatcher(bool state); + + // ev interfaces + static void ConnectEventProxy(EV_P_ ev_io *w, int revents); + static void IOEventProxy(EV_P_ ev_io *w, int revents); + + // instance event handlers + void IOEvent(int revents); + void ConnectEvent(int revents); + + + // current buffers + MongoMessage m_inboundBuffer; + MongoMessage m_outboundBuffer; + + // internal IO + void WriteData(); + void ReadData(); +}; + +#endif diff --git a/src/bson.cpp b/src/bson.cpp new file mode 100644 index 0000000..8cea779 --- /dev/null +++ b/src/bson.cpp @@ -0,0 +1,370 @@ +#include +#include +#include +#include +extern "C" { + #define MONGO_HAVE_STDINT + #include +} + +#include "bson.h" + +using namespace std; +using namespace v8; + +Persistent ObjectID::constructor_template; + +void ObjectID::Initialize(Handle target) { + HandleScope scope; + + Local t = FunctionTemplate::New(ObjectID::New); + constructor_template = Persistent::New(t); + constructor_template->InstanceTemplate()->SetInternalFieldCount(1); + constructor_template->SetClassName(String::NewSymbol("ObjectID")); + + NODE_SET_PROTOTYPE_METHOD(ObjectID::constructor_template, "toString", ObjectID::ToString); + + target->Set(String::NewSymbol("ObjectID"), constructor_template->GetFunction()); +} + +Handle ObjectID::New(const Arguments &args) { + HandleScope scope; + + if (args.Length() < 1 + || !args[0]->IsString() + || (args[0]->IsString() + && args[0]->ToString()->Length() != 24)) { + return ThrowException(Exception::Error( + String::New("Argument must be 24 character hex string"))); + } + + String::Utf8Value hex(args[0]->ToString()); + + // XXX where should this be deleted? + ObjectID *o = new ObjectID((const char *) *hex); + o->Wrap(args.This()); + return args.This(); +} + +void ObjectID::str(char *str) { + bson_oid_to_string(&oid, str); +} + +Handle +ObjectID::ToString(const Arguments &args) { + ObjectID *o = ObjectWrap::Unwrap(args.This()); + + HandleScope scope; + char hex[25]; + o->str(hex); + return String::New(hex); +} + +const char * +ToCString(const String::Utf8Value& value) { + return *value ? *value : ""; +} + +inline void +encodeString(bson_buffer *bb, const char *name, const Local element) { + String::Utf8Value v(element); + const char *value(ToCString(v)); + bson_append_string(bb, name, value); +} + +inline void +encodeNumber(bson_buffer *bb, const char *name, const Local element) { + double value(element->NumberValue()); + bson_append_double(bb, name, value); +} + +inline void +encodeInteger(bson_buffer *bb, const char *name, const Local element) { + int value(element->NumberValue()); + bson_append_int(bb, name, value); +} + +inline void +encodeBoolean(bson_buffer *bb, const char *name, const Local element) { + bool value(element->IsTrue()); + bson_append_bool(bb, name, value); +} + +void +encodeObjectID(bson_buffer *bb, const char *name, const Local element) { + // get at the delicious wrapped object centre + Local obj = element->ToObject(); + assert(!obj.IsEmpty()); + assert(obj->InternalFieldCount() > 0); + ObjectID *o = static_cast(Handle::Cast( + obj->GetInternalField(0))->Value()); + bson_oid_t oid; + char oid_hex[25]; + o->str(oid_hex); + bson_oid_from_string(&oid, oid_hex); + bson_append_oid(bb, name, &oid); +} + +void +encodeArray(bson_buffer *bb, const char *name, const Local element) { + Local a = Array::Cast(*element); + bson_buffer *arr = bson_append_start_array(bb, name); + + for (int i = 0, l=a->Length(); i < l; i++) { + Local val = a->Get(Number::New(i)); + stringstream keybuf; + string keyval; + keybuf << i << endl; + keybuf >> keyval; + + if (val->IsString()) { + encodeString(arr, keyval.c_str(), val); + } + else if (val->IsInt32()) { + encodeInteger(arr, keyval.c_str(), val); + } + else if (val->IsNumber()) { + encodeNumber(arr, keyval.c_str(), val); + } + else if (val->IsBoolean()) { + encodeBoolean(arr, keyval.c_str(), val); + } + else if (val->IsArray()) { + encodeArray(arr, keyval.c_str(), val); + } + else if (val->IsObject()) { + bson bson(encodeObject(val)); + bson_append_bson(arr, keyval.c_str(), &bson); + bson_destroy(&bson); + } + } + bson_append_finish_object(arr); +} + +bson encodeObject(const Local element) { + HandleScope scope; + bson_buffer bb; + bson_buffer_init(&bb); + + Local object = element->ToObject(); + Local properties = object->GetPropertyNames(); + + for (int i = 0; i < properties->Length(); i++) { + // get the property name and value + Local prop_name = properties->Get(Integer::New(i)); + Local prop_val = object->Get(prop_name->ToString()); + + // convert the property name to a c string + String::Utf8Value n(prop_name); + const char *pname = ToCString(n); + + // append property using appropriate appender + if (prop_val->IsString()) { + encodeString(&bb, pname, prop_val); + } + else if (prop_val->IsInt32()) { + encodeInteger(&bb, pname, prop_val); + } + else if (prop_val->IsNumber()) { + encodeNumber(&bb, pname, prop_val); + } + else if (prop_val->IsBoolean()) { + encodeBoolean(&bb, pname, prop_val); + } + else if (prop_val->IsArray()) { + encodeArray(&bb, pname, prop_val); + } + else if (prop_val->IsObject() + && ObjectID::constructor_template->HasInstance(prop_val)) { + encodeObjectID(&bb, pname, prop_val); + } + else if (prop_val->IsObject()) { + bson bson(encodeObject(prop_val)); + bson_append_bson(&bb, pname, &bson); + bson_destroy(&bson); + } + } + + bson bson; + bson_from_buffer(&bson, &bb); + + return bson; +} + +Handle +encode(const Arguments &args) { + // TODO assert args.length > 0 + // TODO assert args.type == Object + HandleScope scope; + + bson bson(encodeObject(args[0])); + Handle ret = node::Encode(bson.data, bson_size(&bson), node::BINARY); + bson_destroy(&bson); + return ret; +} + +// Decoding functions + +Handle +decodeString(bson_iterator *i) { + HandleScope scope; + const char *val = bson_iterator_string(i); + Local str = String::New(val); + return scope.Close(str); +} + +Handle +decodeObject(bson_iterator *i) { + HandleScope scope; + bson bson; + bson_iterator_subobject(i, &bson); + Handle sub = decodeObjectStr(bson.data); + return scope.Close(sub); +} + +Handle +decodeObjectID(bson_iterator *i) { + HandleScope scope; + char hex_oid[25]; + bson_oid_t *oid = bson_iterator_oid(i); + bson_oid_to_string(oid, hex_oid); + Handle argv[1]; + argv[0] = String::New(hex_oid); + + Handle obj = + ObjectID::constructor_template->GetFunction()->NewInstance(1, argv); + + return scope.Close(obj); +} + +Handle +decodeDouble(bson_iterator *i) { + HandleScope scope; + double val = bson_iterator_double_raw(i); + Local obj = Number::New(val); + return scope.Close(obj); +} + +Handle +decodeInteger(bson_iterator *i) { + HandleScope scope; + double val = bson_iterator_int_raw(i); + Local obj = Integer::New(val); + return scope.Close(obj); +} + +Handle +decodeBool(bson_iterator *i) { + HandleScope scope; + bson_bool_t val = bson_iterator_bool(i); + Handle obj = Boolean::New(val); + return scope.Close(obj); +} + +Handle +decodeArray(bson_iterator *it) { + HandleScope scope; + bson_iterator sub; + bson_iterator_subiterator(it, &sub); + Local obj = Array::New(); + + for (int i = 0; bson_iterator_next(&sub); i++) { + bson_type type = bson_iterator_type(&sub); + + const char *key = bson_iterator_key(&sub); + + switch (type) { + case bson_string: + obj->Set(Number::New(i), decodeString(&sub)); + break; + case bson_array: + obj->Set(Number::New(i), decodeArray(&sub)); + break; + case bson_object: + obj->Set(Number::New(i), decodeObject(&sub)); + break; + case bson_oid: + obj->Set(Number::New(i), decodeObjectID(&sub)); + break; + case bson_double: + obj->Set(Number::New(i), decodeDouble(&sub)); + break; + case bson_int: + obj->Set(Number::New(i), decodeInteger(&sub)); + break; + case bson_bool: + obj->Set(Number::New(i), decodeBool(&sub)); + break; + } + } + + return scope.Close(obj); +} + +Local +decodeObjectIterator(bson_iterator *it) { + HandleScope scope; + Local obj = Object::New(); + while (bson_iterator_next(it)) { + bson_type type = bson_iterator_type(it); + const char *key = bson_iterator_key(it); + + switch (type) { + case bson_string: + obj->Set(String::New(key), decodeString(it)); + break; + + case bson_array: + obj->Set(String::New(key), decodeArray(it)); + break; + + case bson_object: + obj->Set(String::New(key), decodeObject(it)); + break; + + case bson_oid: + obj->Set(String::New(key), decodeObjectID(it)); + break; + + case bson_double: + obj->Set(String::New(key), decodeDouble(it)); + break; + + case bson_int: + obj->Set(String::New(key), decodeInteger(it)); + break; + + case bson_bool: + obj->Set(String::New(key), decodeBool(it)); + break; + } + } + + return scope.Close(obj); +} + +Local +decodeObjectStr(const char *buf) { + HandleScope scope; + + bson_iterator it; + bson_iterator_init(&it, buf); + + Handle obj = decodeObjectIterator(&it); + return scope.Close(obj); +} + +Handle +decodeObject(const Local str) { + HandleScope scope; + size_t buflen = str->ToString()->Length(); + char buf[buflen]; + node::DecodeWrite(buf, buflen, str, node::BINARY); + return decodeObjectStr(buf); +} + +Handle +decode(const Arguments &args) { + HandleScope scope; + return decodeObject(args[0]); +} diff --git a/src/bson.h b/src/bson.h new file mode 100644 index 0000000..a02c5c6 --- /dev/null +++ b/src/bson.h @@ -0,0 +1,41 @@ +#ifndef NODE_BSON_H +#define NODE_BSON_H +#include +#include +#include + +using namespace v8; + +class ObjectID : public node::ObjectWrap { + public: + static v8::Persistent constructor_template; + + static void Initialize(Handle target); + + ObjectID() : ObjectWrap() {} + ~ObjectID() {} + + static Handle ToString(const Arguments &args); + + bson_oid_t get() { return oid; } + void str(char *); + protected: + static Handle New(const Arguments& args); + + ObjectID(const char *hex) : node::ObjectWrap() { + bson_oid_from_string(&oid, hex); + } + + private: + + bson_oid_t oid; +}; + +// v8 wrappers +Handle encode(const Arguments &args); +Handle decode(const Arguments &args); + +v8::Local decodeObjectStr(const char *); +bson encodeObject(const v8::Local element); + +#endif From f85fba349be70ad200c958d175f115df9fff5c1e Mon Sep 17 00:00:00 2001 From: wink Date: Fri, 24 Sep 2010 15:02:13 -0500 Subject: [PATCH 12/26] update build script --- wscript | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/wscript b/wscript index ece2b56..4e2e290 100644 --- a/wscript +++ b/wscript @@ -14,6 +14,8 @@ def configure(conf): conf.check_tool('compiler_cxx') conf.check_tool('node_addon') + #conf.check(lib='mongoc', uselib_store='MONGO',libpath='./mongo-c-driver/') + conf.env.append_value("LIBPATH_BSON", abspath("./mongo-c-driver/")) conf.env.append_value("LIB_BSON", "bson") conf.env.append_value("CPPPATH_BSON", abspath("./mongo-c-driver/src")) @@ -26,7 +28,8 @@ def build(bld): mongo = bld.new_task_gen('cxx', 'shlib', 'node_addon') mongo.cxxflags = "-g" mongo.target = 'mongo' - mongo.source = "src/mongo.cc src/bson.cc" + mongo.source = "src/Connection.cpp src/MongoConnection.cpp src/bson.cpp" + #mongo.source = "src/Mongo.cpp" mongo.uselib = "MONGO BSON" def shutdown(): From 11b006530b72b1883b23fc9c2a387b9cee0981de Mon Sep 17 00:00:00 2001 From: wink Date: Fri, 24 Sep 2010 15:05:24 -0500 Subject: [PATCH 13/26] updating readme some --- README.md | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 01d179e..dd8d706 100644 --- a/README.md +++ b/README.md @@ -97,11 +97,15 @@ Installation - Make sure you have git installed. - ./update-mongo-c-driver.sh - node-waf configure build -- ./run-tests.sh +- ./run-tests.sh <-- the tests may or may not work, i'll update them as i get time BUGS ---- +The entire binding backend has been rewritten with the exception of the bson parsing. Please report any bugs to me instead of bugging orlandov about them as they're likely my fault. + + +--- This package is EXPERIMENTAL, with emphasis on MENTAL. I am working on this in my spare time to learn the Node, v8 and MongoDB API's. @@ -127,4 +131,5 @@ SEE ALSO AUTHOR ------ -Orlando Vazquez (ovazquez@gmail.com) +Lee Smith (notwink@gmail.com) +Orlando Vazquez (ovazquez@gmail.com) <- Thanks for the bson parsing :) From 197db0dfdea2c9acaf89281b6640717b5692bd60 Mon Sep 17 00:00:00 2001 From: wink Date: Fri, 24 Sep 2010 15:13:01 -0500 Subject: [PATCH 14/26] tidying up a little --- src/Connection.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Connection.cpp b/src/Connection.cpp index f8aea74..d91dd25 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -38,7 +38,7 @@ Handle Connection::New(const Arguments &args) mongo->Wrap(args.This()); mongo->m_results= Persistent::New(Array::New()); mongo->m_gettingMore = false; - return args.This(); + return scope.Close(args.This()); } Handle Connection::Connect(const Arguments &args) From f5d04e8c0128959e152b774c8b307aca32d7ff7b Mon Sep 17 00:00:00 2001 From: wink Date: Fri, 24 Sep 2010 16:00:44 -0500 Subject: [PATCH 15/26] woops, hard coded port, fixed --- src/Connection.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Connection.cpp b/src/Connection.cpp index d91dd25..42734ce 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -47,8 +47,9 @@ Handle Connection::Connect(const Arguments &args) HandleScope scope; String::Utf8Value conninfo(args[0]->ToString()); - //Number::New(args[1]->ToNumber()); - connection->connect(*conninfo, 27017); + int port = args[1]->ToNumber()->Value(); + if(!port) port = 27017; + connection->connect(*conninfo, port); connection->Ref(); return scope.Close(Undefined()); } From f2fbf32045dace724b3b67d269348c3dffa22eaf Mon Sep 17 00:00:00 2001 From: wink Date: Mon, 27 Sep 2010 08:43:16 -0500 Subject: [PATCH 16/26] removed some TODOs by tearing down the connection on some errors --- src/MongoConnection.cpp | 16 +++++----------- src/MongoConnection.h | 1 - wscript | 2 +- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/MongoConnection.cpp b/src/MongoConnection.cpp index 60e5154..ea35f59 100644 --- a/src/MongoConnection.cpp +++ b/src/MongoConnection.cpp @@ -83,6 +83,8 @@ void MongoConnection::disconnect() free(m_inboundBuffer.messageBuf); memset(&m_outboundBuffer, 0, sizeof(MongoMessage)); memset(&m_inboundBuffer, 0, sizeof(MongoMessage)); + close(m_connection->sock); + m_connected = false; mongo_destroy(m_connection); onClose(); } @@ -166,7 +168,6 @@ void MongoConnection::ReadData() } else { - // TODO disconnect and clean up disconnect(); return; } @@ -248,7 +249,7 @@ void MongoConnection::WriteData() WriteWatcher(true); return; } - // TODO disconnect me + disconnect(); } m_outboundBuffer.index += bytes; @@ -271,9 +272,7 @@ void MongoConnection::IOEvent(int revents) { ReadWatcher(false); WriteWatcher(false); - // TODO - //disconnect(); - //onClose(); + disconnect(); } //pdebug("event %d %d\n", m_connection->connected, revents); @@ -281,8 +280,6 @@ void MongoConnection::IOEvent(int revents) { //pdebug("read event\n"); ReadData(); - // TODO - // read data } if(revents & EV_WRITE) @@ -296,15 +293,12 @@ void MongoConnection::IOEvent(int revents) ReadWatcher(true); onReady(); } - // TODO - // write data } if(revents & EV_ERROR) { pdebug("error event\n"); - // TODO - // handle error, probably tear down and emit onClose + disconnect(); } } diff --git a/src/MongoConnection.h b/src/MongoConnection.h index 7517dd1..86366b6 100644 --- a/src/MongoConnection.h +++ b/src/MongoConnection.h @@ -46,7 +46,6 @@ class MongoConnection bool isConnected(); - // (TODO: soon to be) virtual functions for inheriting classes to implement virtual void onConnected(); virtual void onResults(MongoMessage *message); virtual void onReady(); diff --git a/wscript b/wscript index 4e2e290..4a82a2d 100644 --- a/wscript +++ b/wscript @@ -26,7 +26,7 @@ def configure(conf): def build(bld): mongo = bld.new_task_gen('cxx', 'shlib', 'node_addon') - mongo.cxxflags = "-g" + mongo.cxxflags = "-O3" mongo.target = 'mongo' mongo.source = "src/Connection.cpp src/MongoConnection.cpp src/bson.cpp" #mongo.source = "src/Mongo.cpp" From fc1c462c0c69d50e45e898943cf5e0ca326b353a Mon Sep 17 00:00:00 2001 From: wink Date: Mon, 27 Sep 2010 11:33:48 -0500 Subject: [PATCH 17/26] adding authentication support....and fixing a bug in the process. for authentication, just add username/password to the object passed into connect. if username is found, authentication will be attempted. the bug was caused when a .find (or equivalent) was issued from within a callback from another find...was leaking a cursor, should be plugged. --- lib/mongodb.js | 89 ++++++++++++++++++++++++++++++++++------------ src/Connection.cpp | 14 +++++--- wscript | 2 +- 3 files changed, 76 insertions(+), 29 deletions(-) diff --git a/lib/mongodb.js b/lib/mongodb.js index 5de000d..c6bd99f 100644 --- a/lib/mongodb.js +++ b/lib/mongodb.js @@ -1,5 +1,6 @@ -var sys = require("sys"), - mongo = require("../lib/mongo"); +var sys = require("sys"); +var mongo = require("../lib/mongo"); +var crypto = require("crypto"); function Collection(mongo, db, name) { this.mongo = mongo; @@ -19,23 +20,22 @@ Collection.prototype.getLastError = function(callback) callback(result); }); -} - +}; Collection.prototype.find = function(query, fields, callback, limit, skip, sort) { var cmd = { - "$query" : query ? query : {}, - } + "$query" : query ? query : {} + }; if(sort) cmd.$orderby = sort; - //sys.puts(sys.inspect(cmd)); + this.mongo.addQuery(callback, this.ns, cmd, fields, limit, skip); -} +}; Collection.prototype.insert = function(obj) { this.mongo.connection.insert(this.ns, obj); -} +}; Collection.prototype.update = function(cond, obj, options) { var db_upsert = 0; @@ -44,30 +44,30 @@ Collection.prototype.update = function(cond, obj, options) { db_multi_update = options != null && options['multi'] != null ? (options['multi'] == true ? 1 : 0) : db_multi_update; flags = parseInt(db_multi_update.toString() + db_upsert.toString(), 2); this.mongo.connection.update(this.ns, cond, obj, flags); -} +}; Collection.prototype.remove = function(query) { this.mongo.connection.remove(this.ns, query); -} +}; Collection.prototype.find_one = function(query, fields, ns, callback) { this.mongo.addQuery(function (results) { // XXX what if result.Length < 1 callback(results[0]); }, ns || this.ns, query, fields, 1); -} +}; Collection.prototype.count = function(query, callback) { ns = this.db + ".$cmd"; var cmd = { "count": this.name, "query": query - } + }; this.find_one(cmd, {}, ns, function (result) { callback(result.n); }); -} +}; function MongoDB() { this.myID = Math.random(); @@ -86,13 +86,55 @@ function MongoDB() { }); this.connection.addListener("connection", function () { + if(self.username) // if we see a username, authenticate + { + var ns = self.db + ".$cmd"; + var cmd = { + "query": { getnonce: 1} + }; + + self.addQuery(function (results) + { + var nonce = results[0].nonce; + var authcmd = {}; + authcmd.authenticate = 1; + authcmd.user = self.username; + authcmd.nonce = nonce; + + var hash = crypto.createHash('md5'); + hash.update(self.username + ":mongo:" + self.password); + var pwd = hash.digest('hex'); + + var hash = crypto.createHash('md5'); + hash.update(nonce + self.username + pwd); + authcmd.key = hash.digest('hex'); + + var ns = self.db + ".$cmd"; + var cmd = { + "query": authcmd + }; + self.addQuery(function(results) + { + if(results[0].ok == 1) + { + self.isReady = true; + self.emit("connection", self); + } + else + sys.puts("authentication error!"); + }, ns, cmd, {}, 1); + self.dispatch(); + }, ns, cmd, {}, 1); + self.dispatch(); + return; + } self.emit("connection", self); }); this.connection.addListener("result", function(result) { var callback = self.currentQuery[0]; - callback(result); self.currentQuery = null; + callback(result); }); } @@ -103,13 +145,14 @@ MongoDB.prototype.connect = function(args) { this.hostname = args.hostname || "127.0.0.1"; this.port = args.port || 27017; this.db = args.db; - + this.username = args.username; + this.password = args.password; this.connection.connect(this.hostname, this.port); -} +}; MongoDB.prototype.close = function() { this.connection.close(); -} +}; MongoDB.prototype.addQuery = function(callback, ns, query, fields, limit, skip ) { var q = [ callback, ns ]; @@ -120,17 +163,17 @@ MongoDB.prototype.addQuery = function(callback, ns, query, fields, limit, skip ) this.queries.push(q); if(this.isReady) this.dispatch(); -} +}; MongoDB.prototype.dispatch = function() { if (this.currentQuery || !this.queries.length) return; this.currentQuery = this.queries.shift(); this.connection.find.apply(this.connection, this.currentQuery.slice(1)); -} +}; MongoDB.prototype.getCollection = function(name) { return new Collection(this, this.db, name); -} +}; MongoDB.prototype.getCollections = function(callback) { this.addQuery(function (results) { @@ -143,7 +186,7 @@ MongoDB.prototype.getCollections = function(callback) { callback(collections); }, this.db + ".system.namespaces"); -} +}; exports.MongoDB = MongoDB; -exports.ObjectID = mongo.ObjectID +exports.ObjectID = mongo.ObjectID; diff --git a/src/Connection.cpp b/src/Connection.cpp index 42734ce..7cc9ff8 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -143,6 +143,13 @@ void Connection::onResults(MongoMessage *message) return; } + if(m_cursor) + { + // free cursor now in case the results callback issues another find causing a leak + free((void *)m_cursor->ns); + free(m_cursor); + m_cursor = NULL; + } Emit(String::New("result"), 1, reinterpret_cast *>(&m_results)); @@ -151,8 +158,7 @@ void Connection::onResults(MongoMessage *message) m_results.Dispose(); m_results = Persistent::New(Array::New()); m_gettingMore = false; - free((void *)m_cursor->ns); - free(m_cursor); + scope.Close(Undefined()); } @@ -332,8 +338,7 @@ Handle Connection::Find(const Arguments &args) pdebug("find on: %s\n", *ns); - mongo_cursor *cursor = static_cast( - bson_malloc(sizeof(mongo_cursor))); + mongo_cursor *cursor = static_cast(bson_malloc(sizeof(mongo_cursor))); if(nToReturn > 0) // track limit so cursor can advance until necessary connection->m_recLimit = nToReturn; @@ -364,7 +369,6 @@ Handle Connection::Find(const Arguments &args) bson_fatal_msg((data == ((char*)mm) + mm->head.len), "query building fail!"); - connection->WriteMessage(mm); connection->m_cursor = cursor; diff --git a/wscript b/wscript index 4a82a2d..4e2e290 100644 --- a/wscript +++ b/wscript @@ -26,7 +26,7 @@ def configure(conf): def build(bld): mongo = bld.new_task_gen('cxx', 'shlib', 'node_addon') - mongo.cxxflags = "-O3" + mongo.cxxflags = "-g" mongo.target = 'mongo' mongo.source = "src/Connection.cpp src/MongoConnection.cpp src/bson.cpp" #mongo.source = "src/Mongo.cpp" From 32b562784ac0b66af21f7015f680c1fc9410236b Mon Sep 17 00:00:00 2001 From: wink Date: Mon, 27 Sep 2010 12:27:13 -0500 Subject: [PATCH 18/26] no real changes --- src/Connection.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Connection.cpp b/src/Connection.cpp index 7cc9ff8..264a060 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -142,7 +142,7 @@ void Connection::onResults(MongoMessage *message) scope.Close(Undefined()); return; } - + if(m_cursor) { // free cursor now in case the results callback issues another find causing a leak @@ -150,7 +150,7 @@ void Connection::onResults(MongoMessage *message) free(m_cursor); m_cursor = NULL; } - + Emit(String::New("result"), 1, reinterpret_cast *>(&m_results)); // clean up and reset state From 2895b9f3a229c3b5be44bfb6cab79681956bb9eb Mon Sep 17 00:00:00 2001 From: wink Date: Mon, 27 Sep 2010 12:31:50 -0500 Subject: [PATCH 19/26] needed to potentially dispatch after results to handle weird query nesting --- lib/mongodb.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/mongodb.js b/lib/mongodb.js index c6bd99f..35cd54f 100644 --- a/lib/mongodb.js +++ b/lib/mongodb.js @@ -135,6 +135,7 @@ function MongoDB() { var callback = self.currentQuery[0]; self.currentQuery = null; callback(result); + self.dispatch(); }); } From f4c76eb2b08cbbaaddd3f28d90d69eb39fbcb69f Mon Sep 17 00:00:00 2001 From: wink Date: Mon, 27 Sep 2010 14:54:01 -0500 Subject: [PATCH 20/26] adding note --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index dd8d706..35dab31 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,8 @@ BUGS The entire binding backend has been rewritten with the exception of the bson parsing. Please report any bugs to me instead of bugging orlandov about them as they're likely my fault. +- need to compartmentalize limit/skip so that embedded finds dont inherit those from the parents + --- This package is EXPERIMENTAL, with emphasis on MENTAL. I am working on this in my spare time to learn the Node, v8 and MongoDB API's. From 085d52305b4de908c9e081df21b7b899393d980d Mon Sep 17 00:00:00 2001 From: wink Date: Mon, 27 Sep 2010 15:13:08 -0500 Subject: [PATCH 21/26] silly bug, was only writing limits > 0, so once a limit was set it was stuck... according to the mongo docs, a limit of 0 is no limit at all... g2g --- README.md | 2 -- src/Connection.cpp | 4 +++- src/MongoConnection.cpp | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 35dab31..dd8d706 100644 --- a/README.md +++ b/README.md @@ -105,8 +105,6 @@ BUGS The entire binding backend has been rewritten with the exception of the bson parsing. Please report any bugs to me instead of bugging orlandov about them as they're likely my fault. -- need to compartmentalize limit/skip so that embedded finds dont inherit those from the parents - --- This package is EXPERIMENTAL, with emphasis on MENTAL. I am working on this in my spare time to learn the Node, v8 and MongoDB API's. diff --git a/src/Connection.cpp b/src/Connection.cpp index 264a060..05ca8fb 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -79,6 +79,8 @@ void Connection::onResults(MongoMessage *message) cursor.mm = reply; cursor.current.data = NULL; + pdebug("%llu:\t limit: %d\n", reply->fields.cursorID, m_recLimit); + if(!m_gettingMore) { m_results.Dispose(); @@ -340,7 +342,7 @@ Handle Connection::Find(const Arguments &args) mongo_cursor *cursor = static_cast(bson_malloc(sizeof(mongo_cursor))); - if(nToReturn > 0) // track limit so cursor can advance until necessary + //if(nToReturn > 0) // track limit so cursor can advance until necessary connection->m_recLimit = nToReturn; int sl = strlen(*ns)+1; diff --git a/src/MongoConnection.cpp b/src/MongoConnection.cpp index ea35f59..bb829d3 100644 --- a/src/MongoConnection.cpp +++ b/src/MongoConnection.cpp @@ -173,7 +173,7 @@ void MongoConnection::ReadData() } } - pdebug("realloc: %p\t%d bytes -- %d + %d\n", m_inboundBuffer.messageBuf, m_inboundBuffer.messageLen+len, m_inboundBuffer.messageLen, len); + //pdebug("realloc: %p\t%d bytes -- %d + %d\n", m_inboundBuffer.messageBuf, m_inboundBuffer.messageLen+len, m_inboundBuffer.messageLen, len); m_inboundBuffer.messageBuf = (char *) realloc(m_inboundBuffer.messageBuf, m_inboundBuffer.bufferLen+len); m_inboundBuffer.index = m_inboundBuffer.messageBuf + m_inboundBuffer.bufferLen; memcpy(m_inboundBuffer.index, readbuf, len); @@ -218,7 +218,7 @@ void MongoConnection::WriteMessage(mongo_message *message) pdebug("adding %d bytes to outbound buffer (%d)\n", len, m_outboundBuffer.messageLen); int currentOffset = m_outboundBuffer.index - m_outboundBuffer.messageBuf; - pdebug("realloc: %p\t%d bytes\n", m_outboundBuffer.messageBuf, m_outboundBuffer.messageLen+len); + //pdebug("realloc: %p\t%d bytes\n", m_outboundBuffer.messageBuf, m_outboundBuffer.messageLen+len); m_outboundBuffer.messageBuf = (char *) realloc(m_outboundBuffer.messageBuf, m_outboundBuffer.messageLen+len); m_outboundBuffer.index = m_outboundBuffer.messageBuf + currentOffset; // adjust write ptr in case of relocation From 34a04ed3cdd2198b52a29448de8198b2c639d116 Mon Sep 17 00:00:00 2001 From: wink Date: Mon, 27 Sep 2010 17:42:45 -0500 Subject: [PATCH 22/26] some performance tuning, need to ask how to handle the problem of too many small reads the driver is now on par with the php driver as far as large datasets go, the native driver fails due to running out of memory --- src/Connection.cpp | 16 +++++++++++----- src/MongoConnection.cpp | 4 +++- wscript | 2 +- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/Connection.cpp b/src/Connection.cpp index 05ca8fb..8c1ee63 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -38,6 +38,8 @@ Handle Connection::New(const Arguments &args) mongo->Wrap(args.This()); mongo->m_results= Persistent::New(Array::New()); mongo->m_gettingMore = false; + mongo->m_recLimit = 0; + mongo->m_recCount = 0; return scope.Close(args.This()); } @@ -121,11 +123,15 @@ void Connection::onResults(MongoMessage *message) } } - if(reply->fields.cursorID && m_recCount < m_recLimit) // get more data + if(reply->fields.cursorID)// && m_recCount < m_recLimit) // get more data { - pdebug("need to get more data\n"); m_gettingMore = true; - unsigned int count = m_recLimit-m_recCount; + unsigned int count = 0; + + if(m_recLimit > 0) + count = m_recLimit-m_recCount; + + pdebug("need to get more data %d\n", count); mongo_connection* conn = cursor.conn; char* data; int sl = strlen(m_cursor->ns)+1; @@ -338,12 +344,12 @@ Handle Connection::Find(const Arguments &args) nToSkip = args[4]->Int32Value(); } - pdebug("find on: %s\n", *ns); + pdebug("find on: %s (%d)\n", *ns, nToReturn); mongo_cursor *cursor = static_cast(bson_malloc(sizeof(mongo_cursor))); //if(nToReturn > 0) // track limit so cursor can advance until necessary - connection->m_recLimit = nToReturn; + connection->m_recLimit = nToReturn; int sl = strlen(*ns)+1; cursor->ns = static_cast(bson_malloc(sl)); diff --git a/src/MongoConnection.cpp b/src/MongoConnection.cpp index bb829d3..d68287b 100644 --- a/src/MongoConnection.cpp +++ b/src/MongoConnection.cpp @@ -1,6 +1,7 @@ #include "MongoConnection.h" -#define chunksize 8092 +//#define chunksize 16384 +#define chunksize 8192 MongoConnection::MongoConnection() { @@ -158,6 +159,7 @@ void MongoConnection::ReadData() while(true) { // read the largest chunk we can + usleep(100); int len = read(m_connection->sock, readbuf, chunksize); if(len == -1) { diff --git a/wscript b/wscript index 4e2e290..4a82a2d 100644 --- a/wscript +++ b/wscript @@ -26,7 +26,7 @@ def configure(conf): def build(bld): mongo = bld.new_task_gen('cxx', 'shlib', 'node_addon') - mongo.cxxflags = "-g" + mongo.cxxflags = "-O3" mongo.target = 'mongo' mongo.source = "src/Connection.cpp src/MongoConnection.cpp src/bson.cpp" #mongo.source = "src/Mongo.cpp" From 2775be1ad3acaf2c6d2955c6f3e770dad4778ce8 Mon Sep 17 00:00:00 2001 From: wink Date: Tue, 28 Sep 2010 09:37:06 -0500 Subject: [PATCH 23/26] adding a drained event to writes to allow a loop of inserts to know when it can properly tear down the connection --- lib/mongodb.js | 4 ++++ src/Connection.cpp | 4 ++++ src/Connection.h | 1 + src/MongoConnection.cpp | 10 ++++++++-- src/MongoConnection.h | 1 + wscript | 2 +- 6 files changed, 19 insertions(+), 3 deletions(-) diff --git a/lib/mongodb.js b/lib/mongodb.js index 35cd54f..edf525c 100644 --- a/lib/mongodb.js +++ b/lib/mongodb.js @@ -80,6 +80,10 @@ function MongoDB() { self.emit("close"); }); + this.connection.addListener("drained", function() { + self.emit("drained"); + }); + this.connection.addListener("ready", function () { this.isReady = true; self.dispatch(); diff --git a/src/Connection.cpp b/src/Connection.cpp index 8c1ee63..ccd35cf 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -72,6 +72,10 @@ void Connection::onClose() Emit(String::New("close"), 0, NULL); } +void Connection::onDrained() +{ + Emit(String::New("drained"), 0, NULL); +} void Connection::onResults(MongoMessage *message) { HandleScope scope; diff --git a/src/Connection.h b/src/Connection.h index d0a2137..a973be8 100644 --- a/src/Connection.h +++ b/src/Connection.h @@ -46,6 +46,7 @@ class Connection : public node::EventEmitter, MongoConnection void onReady(); void onResults(MongoMessage *message); void onClose(); + void onDrained(); Persistent m_results; //Handle results; diff --git a/src/MongoConnection.cpp b/src/MongoConnection.cpp index d68287b..eb3effa 100644 --- a/src/MongoConnection.cpp +++ b/src/MongoConnection.cpp @@ -80,8 +80,8 @@ void MongoConnection::disconnect() { ReadWatcher(false); WriteWatcher(false); - free(m_outboundBuffer.messageBuf); - free(m_inboundBuffer.messageBuf); + if(m_outboundBuffer.messageBuf) free(m_outboundBuffer.messageBuf); + if(m_inboundBuffer.messageBuf) free(m_inboundBuffer.messageBuf); memset(&m_outboundBuffer, 0, sizeof(MongoMessage)); memset(&m_inboundBuffer, 0, sizeof(MongoMessage)); close(m_connection->sock); @@ -105,6 +105,11 @@ void MongoConnection::onClose() pdebug("!!! closing connection\n"); } +void MongoConnection::onDrained() +{ + pdebug("!!! outbound drained\n"); +} + void MongoConnection::onResults(MongoMessage *message) { pdebug("!!!results\n"); @@ -263,6 +268,7 @@ void MongoConnection::WriteData() free(m_outboundBuffer.messageBuf); memset(&m_outboundBuffer, 0 , sizeof(MongoMessage)); WriteWatcher(false); + onDrained(); } else WriteWatcher(true); // not done writing yet diff --git a/src/MongoConnection.h b/src/MongoConnection.h index 86366b6..887cff8 100644 --- a/src/MongoConnection.h +++ b/src/MongoConnection.h @@ -50,6 +50,7 @@ class MongoConnection virtual void onResults(MongoMessage *message); virtual void onReady(); virtual void onClose(); + virtual void onDrained(); // IO void WriteMessage(mongo_message *message); diff --git a/wscript b/wscript index 4a82a2d..4e2e290 100644 --- a/wscript +++ b/wscript @@ -26,7 +26,7 @@ def configure(conf): def build(bld): mongo = bld.new_task_gen('cxx', 'shlib', 'node_addon') - mongo.cxxflags = "-O3" + mongo.cxxflags = "-g" mongo.target = 'mongo' mongo.source = "src/Connection.cpp src/MongoConnection.cpp src/bson.cpp" #mongo.source = "src/Mongo.cpp" From 6de8b1815250f653f80a8380dd9812fcd053427a Mon Sep 17 00:00:00 2001 From: wink Date: Tue, 28 Sep 2010 09:43:19 -0500 Subject: [PATCH 24/26] drained event may need some more thought, but it works for the limited scope i needed it... play with some more use cases when time permits for(var i = 0; i < 10000; i++) { collection.insert({ count: i}); } mongo.addListener("drained", function() { mongo.removeAllListeners("drained"); mongo.close(); }); ugly, but its a reasonable tell that closing the connection isnt going to result in inserts being dropped by the socket at least. --- src/MongoConnection.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/MongoConnection.cpp b/src/MongoConnection.cpp index eb3effa..c557bb7 100644 --- a/src/MongoConnection.cpp +++ b/src/MongoConnection.cpp @@ -164,7 +164,7 @@ void MongoConnection::ReadData() while(true) { // read the largest chunk we can - usleep(100); + usleep(100); // this is dirty but drastically increases performance on large result sets int len = read(m_connection->sock, readbuf, chunksize); if(len == -1) { From 9fed89771594ca0f9c85cf6d60c13efdbfba0be2 Mon Sep 17 00:00:00 2001 From: wink Date: Fri, 12 Aug 2011 13:18:28 -0500 Subject: [PATCH 25/26] off by one, woops --- src/MongoConnection.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/MongoConnection.cpp b/src/MongoConnection.cpp index c557bb7..765bae5 100644 --- a/src/MongoConnection.cpp +++ b/src/MongoConnection.cpp @@ -179,13 +179,13 @@ void MongoConnection::ReadData() return; } } - + pdebug("read %d bytes need %d\n", len, sizeof(mongo_header)+sizeof(mongo_reply_fields)); //pdebug("realloc: %p\t%d bytes -- %d + %d\n", m_inboundBuffer.messageBuf, m_inboundBuffer.messageLen+len, m_inboundBuffer.messageLen, len); m_inboundBuffer.messageBuf = (char *) realloc(m_inboundBuffer.messageBuf, m_inboundBuffer.bufferLen+len); m_inboundBuffer.index = m_inboundBuffer.messageBuf + m_inboundBuffer.bufferLen; memcpy(m_inboundBuffer.index, readbuf, len); - - if(!m_inboundBuffer.messageLen && len > sizeof(mongo_header)+sizeof(mongo_reply_fields)) // set up header struct so we can watch for completed messages + + if(!m_inboundBuffer.messageLen && len >= sizeof(mongo_header)+sizeof(mongo_reply_fields)) // set up header struct so we can watch for completed messages { memcpy(&m_inboundBuffer.messageHeader, m_inboundBuffer.index, sizeof(mongo_header)); m_inboundBuffer.index += sizeof(mongo_header); From 3627415dff5d38580c2d8dd2c608387291803fc9 Mon Sep 17 00:00:00 2001 From: wink Date: Fri, 12 Aug 2011 13:20:10 -0500 Subject: [PATCH 26/26] dispatching in edge cases as well as just cleaning up some (potential) scoping mess --- lib/mongodb.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/mongodb.js b/lib/mongodb.js index edf525c..688f969 100644 --- a/lib/mongodb.js +++ b/lib/mongodb.js @@ -29,7 +29,6 @@ Collection.prototype.find = function(query, fields, callback, limit, skip, sort) if(sort) cmd.$orderby = sort; - this.mongo.addQuery(callback, this.ns, cmd, fields, limit, skip); }; @@ -76,7 +75,7 @@ function MongoDB() { var self = this; this.connection.addListener("close", function () { - this.isReady = false; + self.isReady = false; self.emit("close"); }); @@ -85,7 +84,7 @@ function MongoDB() { }); this.connection.addListener("ready", function () { - this.isReady = true; + self.isReady = true; self.dispatch(); }); @@ -132,6 +131,7 @@ function MongoDB() { self.dispatch(); return; } + self.dispatch(); self.emit("connection", self); });