From 2e207158d20c616ed37f69b8f363eff614658120 Mon Sep 17 00:00:00 2001 From: Ali Volkan ATLI Date: Mon, 2 Nov 2015 14:38:35 +0200 Subject: [PATCH 01/10] Update Connection.h On success, the number of bytes read function is returned zero indicates end of peer connection --- lib/src/Connection.h | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/src/Connection.h b/lib/src/Connection.h index 8c7d26a..e9237f0 100644 --- a/lib/src/Connection.h +++ b/lib/src/Connection.h @@ -42,6 +42,7 @@ class Connection static const int SOCKET_UNINITIALIZED = -1; static const int OPEN_CONNECTION_ERROR = -1; static const int READ_ERROR = -1; + static const int END_OF_CONNECTION_ERROR = 0; static const int WRITE_ERROR = -1; Connection(std::string host, int port); From dfb0e4859cdf693ecaeb51ef7ed16e14728405c2 Mon Sep 17 00:00:00 2001 From: Ali Volkan ATLI Date: Mon, 2 Nov 2015 14:48:10 +0200 Subject: [PATCH 02/10] Update Connection.cc recv function: On success, the number of bytes read function is returned zero indicates end of peer connection send function: Requests not to send SIGPIPE on errors on stream oriented sockets when the other end breaks the connection. The EPIPE error is still returned. --- lib/src/Connection.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/src/Connection.cc b/lib/src/Connection.cc index 598e479..4cc730e 100644 --- a/lib/src/Connection.cc +++ b/lib/src/Connection.cc @@ -43,6 +43,7 @@ const int Connection::DEFAULT_BUFFER_SIZE; const int Connection::SOCKET_UNINITIALIZED; const int Connection::OPEN_CONNECTION_ERROR; const int Connection::READ_ERROR; +const int Connection::END_OF_CONNECTION_ERROR; const int Connection::WRITE_ERROR; Connection::Connection(string host, int port) @@ -141,7 +142,7 @@ int Connection::read(int numBytes, unsigned char* buffer) while (numBytesReceived < numBytes) { int rcvd = (int)::recv(this->socketFd, p, (size_t)(numBytes-numBytesReceived), flags); - if (rcvd == READ_ERROR) { E("Connection::read():error:" << strerror(errno) << "\n"); break; } + if (rcvd == READ_ERROR || rcvd == END_OF_CONNECTION_ERROR) {E("Connection::read():error:" << strerror(errno) << "\n"); break; } p += rcvd; numBytesReceived += rcvd; D(cout.flush() << "--------------Connection::read(" << numBytes << "):read " << rcvd << " bytes\n";) @@ -155,7 +156,10 @@ int Connection::write(int numBytes, unsigned char* buffer) { D(cout.flush() << "--------------Connection::write(" << numBytes << ")\n";) - int flags = 0; + // MSG_NOSIGNAL (since Linux 2.2) + // The local end has been "shut down" on a connection oriented socket. In this case the process will also receive a SIGPIPE unless MSG_NOSIGNAL is set. + + int flags = MSG_NOSIGNAL; int numBytesSent = (int)::send(this->socketFd, (const void*)buffer, (ssize_t)numBytes, flags); if (numBytesSent == WRITE_ERROR) { E("Connection::write():error:" << strerror(errno) << "\n"); } D(cout.flush() << "--------------Connection::write(" << numBytes << "):wrote " << numBytesSent << "bytes\n";) From 2756ea048beceb57d43671c1415a84eda177753b Mon Sep 17 00:00:00 2001 From: Ali Volkan ATLI Date: Mon, 2 Nov 2015 14:57:12 +0200 Subject: [PATCH 03/10] Update Client.cc When we have an error on the connection between pairs, we need to delete the socket conn before try to connect again. --- lib/src/Client.cc | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/lib/src/Client.cc b/lib/src/Client.cc index 7da175d..c61f9d9 100644 --- a/lib/src/Client.cc +++ b/lib/src/Client.cc @@ -84,11 +84,7 @@ ResponseClass *Client::apiCall(RequestClass *request) if (!this->prepareConnection()) { E("Client::apiCall():unable to create connection"); return NULL; } int status = this->sendRequest(request); - if (status == Connection::WRITE_ERROR) - { - E("Client::apiCall():sendRequest() error:" << strerror(errno) << "\n"); - return NULL; - } + if (status == Connection::WRITE_ERROR) { E("Client::apiCall():sendRequest() error:" << strerror(errno) << "\n"); return NULL; } D(cout.flush() << "Client::apiCall:" << typeid(RequestClass).name() << " sent:\n" << *request;) @@ -112,7 +108,11 @@ int Client::sendRequest(Request *request) unsigned char *buffer = request->toWireFormat(); int numBytesSent = this->connection->write(request->size(), buffer); - if (numBytesSent == Connection::WRITE_ERROR) { E("Client::sendRequest():write error:" << strerror(errno) << "\n"); return numBytesSent; } + if (numBytesSent == Connection::WRITE_ERROR) { + E("Client::sendRequest():write error:" << strerror(errno) << "\n"); + delete this->connection; this->connection = NULL; + return numBytesSent; + } D(cout.flush() << "Client::sendRequest():request sent:numBytes:" << numBytesSent << "\n";) return numBytesSent; } @@ -126,13 +126,21 @@ ResponseClass *Client::receiveResponse() int netValueSize = -1; int numBytesReceived = this->connection->read(sizeof(int), (unsigned char *)(&netValueSize)); - if (numBytesReceived == Connection::READ_ERROR) { E("Client::receiveResponse():read error on size:" << strerror(errno) << "\n"); return NULL; } + if (numBytesReceived == Connection::READ_ERROR || numBytesReceived == Connection::END_OF_CONNECTION_ERROR) { + E("Client::receiveResponse():read error on size:" << strerror(errno) << "\n"); + delete this->connection; this->connection = NULL; + return NULL; + } int hostValueSize = ntohl(netValueSize); D(cout.flush() << "Client::receiveResponse():incoming response:size:" << hostValueSize << "\n";) unsigned char *buffer = new unsigned char[hostValueSize+sizeof(int)]; // add space for int32 size memcpy(buffer, &netValueSize, sizeof(int)); numBytesReceived = this->connection->read(hostValueSize, buffer + sizeof(int)); - if (numBytesReceived == Connection::READ_ERROR) { E("Client::receiveResponse():read error on body:" << strerror(errno) << "\n"); return NULL; } + if (numBytesReceived == Connection::READ_ERROR || numBytesReceived == Connection::END_OF_CONNECTION_ERROR) { + E("Client::receiveResponse():read error on body:" << strerror(errno) << "\n"); + delete this->connection; this->connection = NULL; + return NULL; + } return new ResponseClass(buffer, true); // true specfies delete buffer on ~Response() } From 464b724fbe02f3f9d0b1b70b668f9dac5a597999 Mon Sep 17 00:00:00 2001 From: Ali Volkan ATLI Date: Mon, 2 Nov 2015 14:59:55 +0200 Subject: [PATCH 04/10] Update Client.cc --- lib/src/Client.cc | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/src/Client.cc b/lib/src/Client.cc index c61f9d9..cc04a55 100644 --- a/lib/src/Client.cc +++ b/lib/src/Client.cc @@ -84,7 +84,11 @@ ResponseClass *Client::apiCall(RequestClass *request) if (!this->prepareConnection()) { E("Client::apiCall():unable to create connection"); return NULL; } int status = this->sendRequest(request); - if (status == Connection::WRITE_ERROR) { E("Client::apiCall():sendRequest() error:" << strerror(errno) << "\n"); return NULL; } + if (status == Connection::WRITE_ERROR) + { + E("Client::apiCall():sendRequest() error:" << strerror(errno) << "\n"); + return NULL; + } D(cout.flush() << "Client::apiCall:" << typeid(RequestClass).name() << " sent:\n" << *request;) @@ -126,7 +130,8 @@ ResponseClass *Client::receiveResponse() int netValueSize = -1; int numBytesReceived = this->connection->read(sizeof(int), (unsigned char *)(&netValueSize)); - if (numBytesReceived == Connection::READ_ERROR || numBytesReceived == Connection::END_OF_CONNECTION_ERROR) { + if (numBytesReceived == Connection::READ_ERROR || numBytesReceived == Connection::END_OF_CONNECTION_ERROR) + { E("Client::receiveResponse():read error on size:" << strerror(errno) << "\n"); delete this->connection; this->connection = NULL; return NULL; @@ -136,7 +141,8 @@ ResponseClass *Client::receiveResponse() unsigned char *buffer = new unsigned char[hostValueSize+sizeof(int)]; // add space for int32 size memcpy(buffer, &netValueSize, sizeof(int)); numBytesReceived = this->connection->read(hostValueSize, buffer + sizeof(int)); - if (numBytesReceived == Connection::READ_ERROR || numBytesReceived == Connection::END_OF_CONNECTION_ERROR) { + if (numBytesReceived == Connection::READ_ERROR || numBytesReceived == Connection::END_OF_CONNECTION_ERROR) + { E("Client::receiveResponse():read error on body:" << strerror(errno) << "\n"); delete this->connection; this->connection = NULL; return NULL; From 8f2664d783b0352514bcdbe38495c0436ae81b06 Mon Sep 17 00:00:00 2001 From: Ali Volkan ATLI Date: Mon, 2 Nov 2015 15:33:08 +0200 Subject: [PATCH 05/10] Update Request.cc --- lib/src/Request.cc | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/src/Request.cc b/lib/src/Request.cc index cc114f9..6648fcb 100644 --- a/lib/src/Request.cc +++ b/lib/src/Request.cc @@ -58,6 +58,17 @@ Request::Request(short int apiKey, short int apiVersion, int correlationId, stri this->clientId = clientId; } +Request::Request(short int apiKey, short int apiVersion, int correlationId, string clientId, long bufferSize) : RequestOrResponse(bufferSize) +{ + D(cout.flush() << "--------------Request(params)\n";) + + this->apiKey = apiKey; + this->apiVersion = apiVersion; + this->correlationId = correlationId; + this->clientId = clientId; +} + + unsigned char* Request::toWireFormat(bool updatePacketSize) { unsigned char* buffer = this->RequestOrResponse::toWireFormat(false); From a608c131dfbbb49d2153ad5ef46f903f6851ab04 Mon Sep 17 00:00:00 2001 From: Ali Volkan ATLI Date: Mon, 2 Nov 2015 15:33:41 +0200 Subject: [PATCH 06/10] Update Request.h --- lib/src/Request.h | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/src/Request.h b/lib/src/Request.h index e6efba8..e1d2fb9 100644 --- a/lib/src/Request.h +++ b/lib/src/Request.h @@ -42,6 +42,7 @@ class Request : public RequestOrResponse Request(unsigned char *buffer, bool releaseBuffer = false); Request(short int apiKey, short int apiVersion, int correlationId, std::string clientId); + Request(short int apiKey, short int apiVersion, int correlationId, std::string clientId, long bufferSize); unsigned char* toWireFormat(bool updatePacketSize = true); int getWireFormatSize(bool includePacketSize = true); From a40f027b0c686372e44880769c89e2ce1b7f00c1 Mon Sep 17 00:00:00 2001 From: Ali Volkan ATLI Date: Mon, 2 Nov 2015 15:34:16 +0200 Subject: [PATCH 07/10] Update RequestOrResponse.cc --- lib/src/RequestOrResponse.cc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/src/RequestOrResponse.cc b/lib/src/RequestOrResponse.cc index f5d6aaf..fb00d49 100644 --- a/lib/src/RequestOrResponse.cc +++ b/lib/src/RequestOrResponse.cc @@ -48,6 +48,13 @@ RequestOrResponse::RequestOrResponse() : WireFormatter() D(cout.flush() << "--------------RequestOrResponse(params)\n";) } +RequestOrResponse::RequestOrResponse(long bufferSize) : WireFormatter() +{ + this->packet = new Packet(bufferSize); + + D(cout.flush() << "--------------RequestOrResponse(params)\n";) +} + RequestOrResponse::~RequestOrResponse() { delete this->packet; From 32301dded406a21acbc9411221028ec5518d6a5a Mon Sep 17 00:00:00 2001 From: Ali Volkan ATLI Date: Mon, 2 Nov 2015 15:34:53 +0200 Subject: [PATCH 08/10] Update RequestOrResponse.h --- lib/src/RequestOrResponse.h | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/src/RequestOrResponse.h b/lib/src/RequestOrResponse.h index 54221ef..a661197 100644 --- a/lib/src/RequestOrResponse.h +++ b/lib/src/RequestOrResponse.h @@ -39,6 +39,7 @@ class RequestOrResponse : public WireFormatter, public PacketWriter public: RequestOrResponse(); + RequestOrResponse(long bufferSize); RequestOrResponse(unsigned char *buffer, bool releaseBuffer = false); ~RequestOrResponse(); From 895a6de5e2f783caf0729db8b189ae8de93e0fb7 Mon Sep 17 00:00:00 2001 From: Ali Volkan ATLI Date: Wed, 4 Nov 2015 15:39:24 +0200 Subject: [PATCH 09/10] Update ProduceRequest.h see Issue #4 https://github.com/adobe-research/libkafka/issues/4 --- lib/src/produce/ProduceRequest.h | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/src/produce/ProduceRequest.h b/lib/src/produce/ProduceRequest.h index b0d8403..876808f 100644 --- a/lib/src/produce/ProduceRequest.h +++ b/lib/src/produce/ProduceRequest.h @@ -48,6 +48,7 @@ class ProduceRequest : public Request ProduceRequest(unsigned char *buffer, bool releaseBuffer = false); ProduceRequest(int correlationId, std::string clientId, short int requiredAcks, int timeout, int produceTopicArraySize, TopicNameBlock **produceTopicArray, bool releaseArrays = false); + ProduceRequest(int correlationId, std::string clientId, short int requiredAcks, int timeout, int produceTopicArraySize, TopicNameBlock **produceTopicArray, long bufferSize, bool releaseArrays = false); ~ProduceRequest(); unsigned char* toWireFormat(bool updatePacketSize = true); From 42e6bcff0097afa8716adfd2f7ee44ef7e00542d Mon Sep 17 00:00:00 2001 From: Ali Volkan ATLI Date: Wed, 4 Nov 2015 15:43:16 +0200 Subject: [PATCH 10/10] Update ProduceRequest.cc See issue #4 --- lib/src/produce/ProduceRequest.cc | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib/src/produce/ProduceRequest.cc b/lib/src/produce/ProduceRequest.cc index 99c6f12..524981b 100644 --- a/lib/src/produce/ProduceRequest.cc +++ b/lib/src/produce/ProduceRequest.cc @@ -61,6 +61,18 @@ namespace LibKafka { this->produceTopicArray = produceTopicArray; this->releaseArrays = releaseArrays; } + + ProduceRequest::ProduceRequest(int correlationId, std::string clientId, short int requiredAcks, int timeout, int produceTopicArraySize, TopicNameBlock** produceTopicArray, long bufferSize, bool releaseArrays) : Request(ApiConstants::PRODUCE_REQUEST_KEY, ApiConstants::API_VERSION, correlationId, clientId, bufferSize) + { + D(cout.flush() << "--------------ProduceRequest(params)\n";) + + this->requiredAcks = requiredAcks; + this->timeout = timeout; + this->produceTopicArraySize = produceTopicArraySize; + this->produceTopicArray = produceTopicArray; + this->releaseArrays = releaseArrays; + } + ProduceRequest::~ProduceRequest() {