Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 65 additions & 6 deletions src/Handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,24 @@ bool Handler::preHandleRequest(Request* req, const String& key)
case Command::Scan:
{
auto sp = mProxy->serverPool();
unsigned long cursor = atol(key.data());
int groupIdx = cursor & Const::ServGroupMask;
// Use 128-bit integer to handle large cursor values
__uint128_t cursor = 0;
const char* cursorStr = key.data();
// Manually parse string to 128-bit integer
while (*cursorStr >= '0' && *cursorStr <= '9') {
cursor = cursor * 10 + (*cursorStr - '0');
cursorStr++;
}

// Debug log: cursor received from client
char cursorBuf[64];
Util::uint128ToString(cursor, cursorBuf);

int groupIdx = (unsigned long)(cursor & Const::ServGroupMask);
auto g = sp->getGroup(groupIdx);
if (!g) {
logDebug("h %d SCAN cursor from client: %s (invalid group %d)",
id(), cursorBuf, groupIdx);
directResponse(req, Response::InvalidScanCursor);
return true;
}
Expand All @@ -661,12 +675,26 @@ bool Handler::preHandleRequest(Request* req, const String& key)
serv = g->getMaster();
}
if (!serv) {
logDebug("h %d SCAN cursor from client: %s (no server available)",
id(), cursorBuf);
directResponse(req, Response::ScanEnd);
return true;
}
if (ConnectConnection* s = getConnectConnection(req, serv)) {
if (cursor != 0) {
req->adjustScanCursor(cursor >> Const::ServGroupBits);
__uint128_t actualCursor = cursor >> Const::ServGroupBits;

// Debug log: cursor to be sent to backend server
char actualCursorBuf[64];
Util::uint128ToString(actualCursor, actualCursorBuf);

logDebug("h %d SCAN cursor from client: %s, group: %d, cursor to server: %s",
id(), cursorBuf, groupIdx, actualCursorBuf);

req->adjustScanCursor(actualCursor);
} else {
logDebug("h %d SCAN cursor from client: 0, group: %d, cursor to server: 0",
id(), groupIdx);
}
handleRequest(req, s);
} else {
Expand Down Expand Up @@ -814,21 +842,52 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res)
} else if (req->type() == Command::Scan && s && res->type() == Reply::Array) {
SegmentStr<64> str(res->body());
if (const char* p = strchr(str.data() + sizeof("*2\r\n$"), '\n')) {
long cursor = atol(p + 1);
// Use 128-bit integer to handle large cursor values (Kvrocks may return cursor close to 64-bit limit)
__uint128_t cursor = 0;
const char* cursorStr = p + 1;
const char* cursorStart = cursorStr;
while (*cursorStr >= '0' && *cursorStr <= '9') {
cursor = cursor * 10 + (*cursorStr - '0');
cursorStr++;
}

// Debug log: cursor received from backend server
char serverCursorBuf[64];
int serverCursorLen = cursorStr - cursorStart;
if (serverCursorLen > 0 && serverCursorLen < 64) {
memcpy(serverCursorBuf, cursorStart, serverCursorLen);
serverCursorBuf[serverCursorLen] = '\0';
} else {
strcpy(serverCursorBuf, "0");
}

auto g = s->server()->group();
int currentGroupId = g ? g->id() : -1;

if (cursor != 0 || (g = sp->getGroup(g->id() + 1)) != nullptr) {
// Use 128-bit integer for left shift, will not overflow
cursor <<= Const::ServGroupBits;
cursor |= g->id();
if ((p = strchr(p, '*')) != nullptr) {
char buf[32];
int n = snprintf(buf, sizeof(buf), "%ld", cursor);
// Convert 128-bit integer to string
char buf[64]; // 128-bit needs at most 39 decimal digits
int n = Util::uint128ToString(cursor, buf);

// Debug log: cursor to be sent to client
logDebug("h %d SCAN cursor from server: %s, group: %d, cursor to client: %s",
id(), serverCursorBuf, g->id(), buf);

res->head().fset(nullptr,
"*2\r\n"
"$%d\r\n"
"%s\r\n",
n, buf);
res->body().cut(p - str.data());
}
} else {
// Scan completed, return 0 to client
logDebug("h %d SCAN cursor from server: %s, group: %d, cursor to client: 0 (scan complete)",
id(), serverCursorBuf, currentGroupId);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/Request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,10 @@ void Request::setSentinelSlaves(const String& master)
master.length(), master.length(), master.data());
}

void Request::adjustScanCursor(long cursor)
void Request::adjustScanCursor(__uint128_t cursor)
{
char buf[32];
int n = snprintf(buf, sizeof(buf), "%ld", cursor);
char buf[64]; // 128-bit integer needs at most 39 decimal digits
int n = Util::uint128ToString(cursor, buf);
if (mHead.empty()) {
SegmentStr<64> str(mReq);
const char* p = strchr(str.data(), '$');
Expand Down
2 changes: 1 addition & 1 deletion src/Request.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Request :
void setSentinels(const String& master);
void setSentinelGetMaster(const String& master);
void setSentinelSlaves(const String& master);
void adjustScanCursor(long cursor);
void adjustScanCursor(__uint128_t cursor);
void follow(Request* leader);
void setResponse(Response* res);
bool send(Socket* s);
Expand Down
25 changes: 25 additions & 0 deletions src/Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,31 @@ namespace Util
{
return duration_cast<microseconds>(steady_clock::now().time_since_epoch()).count();
}

// Convert 128-bit unsigned integer to string
// Returns the length of the converted string (excluding null terminator)
// The buffer must be at least 40 bytes (39 digits + null terminator for max __uint128_t value)
inline int uint128ToString(__uint128_t value, char* buf)
{
int n = 0;
if (value == 0) {
buf[n++] = '0';
} else {
char temp[64];
int tempLen = 0;
__uint128_t v = value;
while (v > 0) {
temp[tempLen++] = '0' + (v % 10);
v /= 10;
}
// Reverse digits (extracted from low to high)
for (int i = tempLen - 1; i >= 0; i--) {
buf[n++] = temp[i];
}
}
buf[n] = '\0';
return n;
}
};

#endif