From c3ee62d96652586f90d61f8712dcd22412fe257d Mon Sep 17 00:00:00 2001 From: UnderLord Date: Mon, 24 Nov 2025 19:41:13 -0800 Subject: [PATCH 1/8] chore(palantir): update contracts to v1.3.0 with envelope.proto and error.proto --- docs/palantir | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/palantir b/docs/palantir index 0a544e9..4b5bfad 160000 --- a/docs/palantir +++ b/docs/palantir @@ -1 +1 @@ -Subproject commit 0a544e9f66cdd8401745370d1ccb7e121dc252e0 +Subproject commit 4b5bfadcd210864d43583517eaa4c31fb11795cb From 8a966375049a2ff1190116818118b1c83e196924 Mon Sep 17 00:00:00 2001 From: UnderLord Date: Mon, 24 Nov 2025 19:44:18 -0800 Subject: [PATCH 2/8] build(proto): add envelope.proto and error.proto codegen for Sprint 4.5 --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 900c66a..b4aa390 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -237,7 +237,7 @@ if(BEDROCK_WITH_TRANSPORT_DEPS) target_link_libraries(bedrock_palantir_proto PUBLIC ${ABSEIL_LIBS}) endif() - message(STATUS "Palantir proto codegen enabled: Capabilities.proto -> bedrock_palantir_proto") + message(STATUS "Palantir proto codegen enabled: Capabilities, XYSine, Envelope, Error -> bedrock_palantir_proto") # --------------------------------------- # CapabilitiesService library From 9755652909c548554b52c7291bcd8ac422f016df Mon Sep 17 00:00:00 2001 From: UnderLord Date: Mon, 24 Nov 2025 19:45:04 -0800 Subject: [PATCH 3/8] build(proto): add envelope.proto and error.proto codegen (fix) --- CMakeLists.txt | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index b4aa390..3f9a650 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -148,6 +148,8 @@ if(BEDROCK_WITH_TRANSPORT_DEPS) set(CAPABILITIES_PROTO "${PALANTIR_PROTO_DIR}/palantir/capabilities.proto") set(XYSINE_PROTO "${PALANTIR_PROTO_DIR}/palantir/xysine.proto") + set(ENVELOPE_PROTO "${PALANTIR_PROTO_DIR}/palantir/envelope.proto") + set(ERROR_PROTO "${PALANTIR_PROTO_DIR}/palantir/error.proto") if(EXISTS "${CAPABILITIES_PROTO}") add_custom_command( @@ -177,6 +179,34 @@ if(BEDROCK_WITH_TRANSPORT_DEPS) ) endif() + if(EXISTS "${ENVELOPE_PROTO}") + add_custom_command( + OUTPUT + "${PALANTIR_PROTO_OUT_DIR}/palantir/envelope.pb.cc" + "${PALANTIR_PROTO_OUT_DIR}/palantir/envelope.pb.h" + COMMAND "${PROTOC_EXECUTABLE}" + --proto_path="${PALANTIR_PROTO_DIR}" + --cpp_out="${PALANTIR_PROTO_OUT_DIR}" + "${ENVELOPE_PROTO}" + DEPENDS "${ENVELOPE_PROTO}" + COMMENT "Generating C++ from Palantir envelope.proto" + ) + endif() + + if(EXISTS "${ERROR_PROTO}") + add_custom_command( + OUTPUT + "${PALANTIR_PROTO_OUT_DIR}/palantir/error.pb.cc" + "${PALANTIR_PROTO_OUT_DIR}/palantir/error.pb.h" + COMMAND "${PROTOC_EXECUTABLE}" + --proto_path="${PALANTIR_PROTO_DIR}" + --cpp_out="${PALANTIR_PROTO_OUT_DIR}" + "${ERROR_PROTO}" + DEPENDS "${ERROR_PROTO}" + COMMENT "Generating C++ from Palantir error.proto" + ) + endif() + # Build proto library with all available proto files set(PROTO_SOURCES) if(EXISTS "${CAPABILITIES_PROTO}") @@ -185,6 +215,12 @@ if(BEDROCK_WITH_TRANSPORT_DEPS) if(EXISTS "${XYSINE_PROTO}") list(APPEND PROTO_SOURCES "${PALANTIR_PROTO_OUT_DIR}/palantir/xysine.pb.cc") endif() + if(EXISTS "${ENVELOPE_PROTO}") + list(APPEND PROTO_SOURCES "${PALANTIR_PROTO_OUT_DIR}/palantir/envelope.pb.cc") + endif() + if(EXISTS "${ERROR_PROTO}") + list(APPEND PROTO_SOURCES "${PALANTIR_PROTO_OUT_DIR}/palantir/error.pb.cc") + endif() if(PROTO_SOURCES) add_library(bedrock_palantir_proto STATIC From 5778100369b697d67e0be5435e7c6d3e23b3d44a Mon Sep 17 00:00:00 2001 From: UnderLord Date: Mon, 24 Nov 2025 20:49:24 -0800 Subject: [PATCH 4/8] feat(transport): implement MessageType wire protocol and ErrorResponse (Sprint 4.5) - Add MessageType enum to wire protocol (4-byte length + 1-byte type + payload) - Implement message size limits (10MB max) - Add ErrorResponse handling with ErrorCode enum - Add backward compatibility for old format (deprecation warning) - Update sendMessage/readMessage to use new protocol --- src/palantir/PalantirServer.cpp | 213 +++++++++++++++++++++++++++++--- src/palantir/PalantirServer.hpp | 12 +- 2 files changed, 206 insertions(+), 19 deletions(-) diff --git a/src/palantir/PalantirServer.cpp b/src/palantir/PalantirServer.cpp index 6add816..e516903 100644 --- a/src/palantir/PalantirServer.cpp +++ b/src/palantir/PalantirServer.cpp @@ -12,6 +12,8 @@ #ifdef BEDROCK_WITH_TRANSPORT_DEPS #include "palantir/xysine.pb.h" +#include "palantir/envelope.pb.h" +#include "palantir/error.pb.h" #endif PalantirServer::PalantirServer(QObject *parent) @@ -164,11 +166,54 @@ void PalantirServer::onHeartbeatTimer() void PalantirServer::handleMessage(QLocalSocket* client, const QByteArray& message) { - // Parse message type and dispatch - // WP1: Handle CapabilitiesRequest and XYSineRequest - // Future: Add StartJob, Cancel, Ping/Pong when those proto messages are defined - #ifdef BEDROCK_WITH_TRANSPORT_DEPS + // Sprint 4.5: Try new format first (with MessageType) + palantir::MessageType messageType; + QByteArray payload; + + if (readMessageWithType(client, messageType, payload)) { + // New format: dispatch by MessageType + switch (messageType) { + case palantir::MessageType::CAPABILITIES_REQUEST: { + palantir::CapabilitiesRequest request; + if (request.ParseFromArray(payload.data(), payload.size())) { + handleCapabilitiesRequest(client); + } else { + sendErrorResponse(client, palantir::ErrorCode::PROTOBUF_PARSE_ERROR, + "Failed to parse CapabilitiesRequest"); + } + return; + } + case palantir::MessageType::XY_SINE_REQUEST: { + palantir::XYSineRequest request; + if (request.ParseFromArray(payload.data(), payload.size())) { + handleXYSineRequest(client, request); + } else { + sendErrorResponse(client, palantir::ErrorCode::PROTOBUF_PARSE_ERROR, + "Failed to parse XYSineRequest"); + } + return; + } + case palantir::MessageType::ERROR_RESPONSE: + // Server should not receive ErrorResponse + qDebug() << "Server received ErrorResponse (unexpected)"; + return; + default: + sendErrorResponse(client, palantir::ErrorCode::UNKNOWN_MESSAGE_TYPE, + QString("Unknown message type: %1").arg(static_cast(messageType))); + return; + } + } + + // Backward compatibility: try old format (no MessageType) + // Log deprecation warning + static bool deprecationWarningLogged = false; + if (!deprecationWarningLogged) { + qWarning() << "DEPRECATED: Received message in old format (no MessageType). " + << "This format will be removed in Sprint 4.6. Please upgrade to new format."; + deprecationWarningLogged = true; + } + // Try to parse as XYSineRequest (check before CapabilitiesRequest for specificity) palantir::XYSineRequest xySineRequest; if (xySineRequest.ParseFromArray(message.data(), message.size())) { @@ -182,9 +227,13 @@ void PalantirServer::handleMessage(QLocalSocket* client, const QByteArray& messa handleCapabilitiesRequest(client); return; } -#endif - qDebug() << "Unknown message type received"; + // Unknown message format + sendErrorResponse(client, palantir::ErrorCode::UNKNOWN_MESSAGE_TYPE, + "Failed to parse message (unknown format)"); +#else + qDebug() << "Message received but transport deps disabled"; +#endif } // WP1: StartJob handler disabled (proto message not yet defined) @@ -243,7 +292,7 @@ void PalantirServer::handleCapabilitiesRequest(QLocalSocket* client) #ifdef BEDROCK_WITH_TRANSPORT_DEPS bedrock::palantir::CapabilitiesService service; palantir::CapabilitiesResponse response = service.getCapabilities(); - sendMessage(client, response); + sendMessage(client, palantir::MessageType::CAPABILITIES_RESPONSE, response); #else qWarning() << "Capabilities requested but transport deps disabled"; #endif @@ -269,7 +318,7 @@ void PalantirServer::handleXYSineRequest(QLocalSocket* client, const palantir::X response.set_status("OK"); // Send response - sendMessage(client, response); + sendMessage(client, palantir::MessageType::XY_SINE_RESPONSE, response); #else qWarning() << "XY Sine requested but transport deps disabled"; #endif @@ -485,7 +534,7 @@ void PalantirServer::computeXYSine(const palantir::ComputeSpec& spec, std::vecto */ #ifdef BEDROCK_WITH_TRANSPORT_DEPS -void PalantirServer::sendMessage(QLocalSocket* client, const google::protobuf::Message& message) +void PalantirServer::sendMessage(QLocalSocket* client, palantir::MessageType type, const google::protobuf::Message& message) { if (!client || client->state() != QLocalSocket::ConnectedState) { return; @@ -495,15 +544,30 @@ void PalantirServer::sendMessage(QLocalSocket* client, const google::protobuf::M std::string serialized; if (!message.SerializeToString(&serialized)) { qDebug() << "Failed to serialize message"; + sendErrorResponse(client, palantir::ErrorCode::INTERNAL_ERROR, + "Failed to serialize message"); return; } - // Create length-prefixed message - QByteArray data; - uint32_t length = static_cast(serialized.size()); + // Check size limit + if (serialized.size() > MAX_MESSAGE_SIZE) { + qDebug() << "Message too large:" << serialized.size(); + sendErrorResponse(client, palantir::ErrorCode::MESSAGE_TOO_LARGE, + QString("Message size %1 exceeds limit %2") + .arg(serialized.size()).arg(MAX_MESSAGE_SIZE)); + return; + } - // Write length (little-endian) - data.append(reinterpret_cast(&length), 4); + // Create length-prefixed message with MessageType + QByteArray data; + uint32_t totalLength = static_cast(serialized.size() + 1); // +1 for MessageType byte + uint8_t typeByte = static_cast(type); + + // Write length (little-endian, 4 bytes) + data.append(reinterpret_cast(&totalLength), 4); + // Write MessageType (1 byte) + data.append(reinterpret_cast(&typeByte), 1); + // Write serialized message data.append(serialized.data(), serialized.size()); // Send data @@ -512,8 +576,65 @@ void PalantirServer::sendMessage(QLocalSocket* client, const google::protobuf::M qDebug() << "Failed to send complete message"; } } -#endif // BEDROCK_WITH_TRANSPORT_DEPS +void PalantirServer::sendErrorResponse(QLocalSocket* client, palantir::ErrorCode errorCode, + const QString& message, const QString& details) +{ + palantir::ErrorResponse error; + error.set_error_code(errorCode); + error.set_message(message.toStdString()); + if (!details.isEmpty()) { + error.set_details(details.toStdString()); + } + sendMessage(client, palantir::MessageType::ERROR_RESPONSE, error); +} + +bool PalantirServer::readMessageWithType(QLocalSocket* client, palantir::MessageType& outType, QByteArray& outPayload) +{ + if (!client) { + return false; + } + + QByteArray& buffer = clientBuffers_[client]; + + // Need at least 5 bytes: 4-byte length + 1-byte MessageType + if (buffer.size() < 5) { + return false; + } + + // Read length (little-endian) + uint32_t totalLength; + std::memcpy(&totalLength, buffer.data(), 4); + + // Check size limit + if (totalLength > MAX_MESSAGE_SIZE + 1) { // +1 for MessageType byte + qDebug() << "Message length exceeds limit:" << totalLength; + sendErrorResponse(client, palantir::ErrorCode::MESSAGE_TOO_LARGE, + QString("Message length %1 exceeds limit %2") + .arg(totalLength).arg(MAX_MESSAGE_SIZE + 1)); + buffer.clear(); // Clear buffer to prevent further parsing + return false; + } + + // Check if we have the complete message + if (buffer.size() < 4 + totalLength) { + return false; + } + + // Read MessageType (1 byte after length) + uint8_t typeByte = static_cast(buffer[4]); + outType = static_cast(typeByte); + + // Read payload (everything after MessageType) + outPayload = buffer.mid(5, totalLength - 1); + buffer.remove(0, 4 + totalLength); + + return true; +} + +// Backward compatibility: read old format (no MessageType, just length + payload) +// This function is kept for backward compatibility but is deprecated +// It reads from the buffer directly (used by parseIncomingData for old format) QByteArray PalantirServer::readMessage(QLocalSocket* client) { if (!client) { @@ -527,7 +648,14 @@ QByteArray PalantirServer::readMessage(QLocalSocket* client) } uint32_t length; - memcpy(&length, buffer.data(), 4); + std::memcpy(&length, buffer.data(), 4); + + // Check size limit + if (length > MAX_MESSAGE_SIZE) { + qDebug() << "Old-format message length exceeds limit:" << length; + buffer.clear(); + return QByteArray(); + } if (buffer.size() < 4 + length) { return QByteArray(); @@ -538,6 +666,7 @@ QByteArray PalantirServer::readMessage(QLocalSocket* client) return message; } +#endif // BEDROCK_WITH_TRANSPORT_DEPS void PalantirServer::parseIncomingData(QLocalSocket* client) { @@ -548,14 +677,64 @@ void PalantirServer::parseIncomingData(QLocalSocket* client) QByteArray& buffer = clientBuffers_[client]; buffer += client->readAll(); +#ifdef BEDROCK_WITH_TRANSPORT_DEPS + // Try new format first (5 bytes minimum: 4-byte length + 1-byte MessageType) + while (buffer.size() >= 5) { + palantir::MessageType messageType; + QByteArray payload; + if (readMessageWithType(client, messageType, payload)) { + // Dispatch by MessageType + switch (messageType) { + case palantir::MessageType::CAPABILITIES_REQUEST: { + palantir::CapabilitiesRequest request; + if (request.ParseFromArray(payload.data(), payload.size())) { + handleCapabilitiesRequest(client); + } else { + sendErrorResponse(client, palantir::ErrorCode::PROTOBUF_PARSE_ERROR, + "Failed to parse CapabilitiesRequest"); + } + continue; + } + case palantir::MessageType::XY_SINE_REQUEST: { + palantir::XYSineRequest request; + if (request.ParseFromArray(payload.data(), payload.size())) { + handleXYSineRequest(client, request); + } else { + sendErrorResponse(client, palantir::ErrorCode::PROTOBUF_PARSE_ERROR, + "Failed to parse XYSineRequest"); + } + continue; + } + case palantir::MessageType::ERROR_RESPONSE: + qDebug() << "Server received ErrorResponse (unexpected)"; + continue; + default: + sendErrorResponse(client, palantir::ErrorCode::UNKNOWN_MESSAGE_TYPE, + QString("Unknown message type: %1").arg(static_cast(messageType))); + continue; + } + } + break; // Not enough data yet + } + + // Fallback: try old format (backward compatibility) while (buffer.size() >= 4) { QByteArray message = readMessage(client); if (message.isEmpty()) { break; } - handleMessage(client, message); } +#else + // Old format only (when transport deps disabled) + while (buffer.size() >= 4) { + QByteArray message = readMessage(client); + if (message.isEmpty()) { + break; + } + handleMessage(client, message); + } +#endif } #include "PalantirServer.moc" diff --git a/src/palantir/PalantirServer.hpp b/src/palantir/PalantirServer.hpp index d4cfdd6..baa374e 100644 --- a/src/palantir/PalantirServer.hpp +++ b/src/palantir/PalantirServer.hpp @@ -15,6 +15,8 @@ #ifdef BEDROCK_WITH_TRANSPORT_DEPS #include "palantir/capabilities.pb.h" #include "palantir/xysine.pb.h" +#include "palantir/envelope.pb.h" +#include "palantir/error.pb.h" #include "CapabilitiesService.hpp" #endif @@ -70,11 +72,17 @@ private slots: // Protocol helpers #ifdef BEDROCK_WITH_TRANSPORT_DEPS - void sendMessage(QLocalSocket* client, const google::protobuf::Message& message); -#endif + void sendMessage(QLocalSocket* client, palantir::MessageType type, const google::protobuf::Message& message); + void sendErrorResponse(QLocalSocket* client, palantir::ErrorCode errorCode, const QString& message, const QString& details = QString()); + bool readMessageWithType(QLocalSocket* client, palantir::MessageType& outType, QByteArray& outPayload); + // Backward compatibility: read old format (no MessageType) QByteArray readMessage(QLocalSocket* client); +#endif void parseIncomingData(QLocalSocket* client); + // Constants + static constexpr uint32_t MAX_MESSAGE_SIZE = 10 * 1024 * 1024; // 10MB + // Server state std::unique_ptr server_; QTimer heartbeatTimer_; From d7d8664d2ea18618fc57a68ce0f4a6fbcb4689e9 Mon Sep 17 00:00:00 2001 From: UnderLord Date: Mon, 24 Nov 2025 20:50:43 -0800 Subject: [PATCH 5/8] feat(transport): add thread safety to PalantirServer (Sprint 4.5) - Add clientBuffersMutex_ to protect clientBuffers_ access - Ensure all client buffer access is thread-safe - Protect job-related data structures with jobMutex_ - Fix potential race conditions in onClientDisconnected - Validate client pointers before use in sendMessage - Clear all data structures in stopServer() --- src/palantir/PalantirServer.cpp | 215 ++++++++++++++++++++++++-------- src/palantir/PalantirServer.hpp | 6 +- 2 files changed, 164 insertions(+), 57 deletions(-) diff --git a/src/palantir/PalantirServer.cpp b/src/palantir/PalantirServer.cpp index e516903..79f137c 100644 --- a/src/palantir/PalantirServer.cpp +++ b/src/palantir/PalantirServer.cpp @@ -73,11 +73,23 @@ void PalantirServer::stopServer() } } - // Wait for job threads to finish - for (auto& [jobId, thread] : jobThreads_) { - if (thread.joinable()) { - thread.join(); + // Wait for job threads to finish (thread-safe) + { + std::lock_guard lock(jobMutex_); + for (auto& [jobId, thread] : jobThreads_) { + if (thread.joinable()) { + thread.join(); + } } + jobThreads_.clear(); + jobClients_.clear(); + jobCancelled_.clear(); + } + + // Clear client buffers (thread-safe) + { + std::lock_guard lock(clientBuffersMutex_); + clientBuffers_.clear(); } // Stop server @@ -114,8 +126,11 @@ void PalantirServer::onNewConnection() connect(client, &QLocalSocket::disconnected, this, &PalantirServer::onClientDisconnected); connect(client, &QLocalSocket::readyRead, this, &PalantirServer::onClientReadyRead); - // Initialize client buffer - clientBuffers_[client] = QByteArray(); + // Initialize client buffer (thread-safe) + { + std::lock_guard lock(clientBuffersMutex_); + clientBuffers_[client] = QByteArray(); + } emit clientConnected(); qDebug() << "Client connected"; @@ -128,21 +143,28 @@ void PalantirServer::onClientDisconnected() return; } - // Remove client from tracking - clientBuffers_.erase(client); + // Remove client from tracking (thread-safe) + { + std::lock_guard lock(clientBuffersMutex_); + clientBuffers_.erase(client); + } - // Cancel jobs for this client - for (auto it = jobClients_.begin(); it != jobClients_.end();) { - if (it->second == client) { - QString jobId = it->first; - jobCancelled_[jobId] = true; - jobClients_.erase(it++); - } else { - ++it; + // Cancel jobs for this client (thread-safe) + { + std::lock_guard lock(jobMutex_); + for (auto it = jobClients_.begin(); it != jobClients_.end();) { + if (it->second == client) { + QString jobId = it->first; + jobCancelled_[jobId] = true; + jobClients_.erase(it++); + } else { + ++it; + } } } - client->deleteLater(); + // Qt will handle socket deletion via parent-child relationship + // No need to call deleteLater() explicitly - client is child of server emit clientDisconnected(); qDebug() << "Client disconnected"; } @@ -536,10 +558,26 @@ void PalantirServer::computeXYSine(const palantir::ComputeSpec& spec, std::vecto #ifdef BEDROCK_WITH_TRANSPORT_DEPS void PalantirServer::sendMessage(QLocalSocket* client, palantir::MessageType type, const google::protobuf::Message& message) { - if (!client || client->state() != QLocalSocket::ConnectedState) { + if (!client) { + return; + } + + // Check if client is still valid and connected (thread-safe check) + // Note: QLocalSocket::state() is thread-safe for reading + if (client->state() != QLocalSocket::ConnectedState) { + qDebug() << "Attempted to send message to disconnected client"; return; } + // Verify client is still in our tracking (optional safety check) + { + std::lock_guard lock(clientBuffersMutex_); + if (clientBuffers_.find(client) == clientBuffers_.end()) { + qDebug() << "Attempted to send message to unknown client"; + return; + } + } + // Serialize message std::string serialized; if (!message.SerializeToString(&serialized)) { @@ -595,7 +633,13 @@ bool PalantirServer::readMessageWithType(QLocalSocket* client, palantir::Message return false; } - QByteArray& buffer = clientBuffers_[client]; + // Thread-safe access to client buffer + std::lock_guard lock(clientBuffersMutex_); + auto it = clientBuffers_.find(client); + if (it == clientBuffers_.end()) { + return false; // Client not found (may have disconnected) + } + QByteArray& buffer = it->second; // Need at least 5 bytes: 4-byte length + 1-byte MessageType if (buffer.size() < 5) { @@ -641,7 +685,13 @@ QByteArray PalantirServer::readMessage(QLocalSocket* client) return QByteArray(); } - QByteArray& buffer = clientBuffers_[client]; + // Thread-safe access to client buffer + std::lock_guard lock(clientBuffersMutex_); + auto it = clientBuffers_.find(client); + if (it == clientBuffers_.end()) { + return QByteArray(); // Client not found (may have disconnected) + } + QByteArray& buffer = it->second; if (buffer.size() < 4) { return QByteArray(); @@ -674,51 +724,67 @@ void PalantirServer::parseIncomingData(QLocalSocket* client) return; } - QByteArray& buffer = clientBuffers_[client]; - buffer += client->readAll(); + // Read all available data first + QByteArray newData = client->readAll(); + if (newData.isEmpty()) { + return; + } + + // Thread-safe access to client buffer + std::lock_guard lock(clientBuffersMutex_); + auto it = clientBuffers_.find(client); + if (it == clientBuffers_.end()) { + // Client not found (may have disconnected) + return; + } + QByteArray& buffer = it->second; + buffer += newData; #ifdef BEDROCK_WITH_TRANSPORT_DEPS // Try new format first (5 bytes minimum: 4-byte length + 1-byte MessageType) - while (buffer.size() >= 5) { + // Note: readMessageWithType will lock clientBuffersMutex_ internally + while (true) { palantir::MessageType messageType; QByteArray payload; - if (readMessageWithType(client, messageType, payload)) { - // Dispatch by MessageType - switch (messageType) { - case palantir::MessageType::CAPABILITIES_REQUEST: { - palantir::CapabilitiesRequest request; - if (request.ParseFromArray(payload.data(), payload.size())) { - handleCapabilitiesRequest(client); - } else { - sendErrorResponse(client, palantir::ErrorCode::PROTOBUF_PARSE_ERROR, - "Failed to parse CapabilitiesRequest"); - } - continue; + if (!readMessageWithType(client, messageType, payload)) { + break; // Not enough data or client disconnected + } + + // Dispatch by MessageType + switch (messageType) { + case palantir::MessageType::CAPABILITIES_REQUEST: { + palantir::CapabilitiesRequest request; + if (request.ParseFromArray(payload.data(), payload.size())) { + handleCapabilitiesRequest(client); + } else { + sendErrorResponse(client, palantir::ErrorCode::PROTOBUF_PARSE_ERROR, + "Failed to parse CapabilitiesRequest"); } - case palantir::MessageType::XY_SINE_REQUEST: { - palantir::XYSineRequest request; - if (request.ParseFromArray(payload.data(), payload.size())) { - handleXYSineRequest(client, request); - } else { - sendErrorResponse(client, palantir::ErrorCode::PROTOBUF_PARSE_ERROR, - "Failed to parse XYSineRequest"); - } - continue; + continue; + } + case palantir::MessageType::XY_SINE_REQUEST: { + palantir::XYSineRequest request; + if (request.ParseFromArray(payload.data(), payload.size())) { + handleXYSineRequest(client, request); + } else { + sendErrorResponse(client, palantir::ErrorCode::PROTOBUF_PARSE_ERROR, + "Failed to parse XYSineRequest"); } - case palantir::MessageType::ERROR_RESPONSE: - qDebug() << "Server received ErrorResponse (unexpected)"; - continue; - default: - sendErrorResponse(client, palantir::ErrorCode::UNKNOWN_MESSAGE_TYPE, - QString("Unknown message type: %1").arg(static_cast(messageType))); - continue; + continue; } + case palantir::MessageType::ERROR_RESPONSE: + qDebug() << "Server received ErrorResponse (unexpected)"; + continue; + default: + sendErrorResponse(client, palantir::ErrorCode::UNKNOWN_MESSAGE_TYPE, + QString("Unknown message type: %1").arg(static_cast(messageType))); + continue; } - break; // Not enough data yet } // Fallback: try old format (backward compatibility) - while (buffer.size() >= 4) { + // Note: readMessage will lock clientBuffersMutex_ internally + while (true) { QByteArray message = readMessage(client); if (message.isEmpty()) { break; @@ -727,12 +793,51 @@ void PalantirServer::parseIncomingData(QLocalSocket* client) } #else // Old format only (when transport deps disabled) + // Read data first + QByteArray newData = client->readAll(); + if (newData.isEmpty()) { + return; + } + + // Thread-safe access to client buffer + std::lock_guard lock(clientBuffersMutex_); + auto it = clientBuffers_.find(client); + if (it == clientBuffers_.end()) { + return; // Client not found (may have disconnected) + } + QByteArray& buffer = it->second; + buffer += newData; + + // Process messages (readMessage will not be called here since we're already locked) + // Instead, parse directly from buffer while (buffer.size() >= 4) { - QByteArray message = readMessage(client); - if (message.isEmpty()) { + uint32_t length; + std::memcpy(&length, buffer.data(), 4); + + if (length > MAX_MESSAGE_SIZE) { + qDebug() << "Old-format message length exceeds limit:" << length; + buffer.clear(); break; } + + if (buffer.size() < 4 + length) { + break; // Not enough data yet + } + + QByteArray message = buffer.mid(4, length); + buffer.remove(0, 4 + length); + + // Release lock before calling handleMessage (which may take time) + lock.~lock_guard(); handleMessage(client, message); + // Re-acquire lock for next iteration + new (std::addressof(lock)) std::lock_guard(clientBuffersMutex_); + // Re-find iterator (client may have been removed) + it = clientBuffers_.find(client); + if (it == clientBuffers_.end()) { + break; // Client disconnected + } + buffer = it->second; } #endif } diff --git a/src/palantir/PalantirServer.hpp b/src/palantir/PalantirServer.hpp index baa374e..3130f93 100644 --- a/src/palantir/PalantirServer.hpp +++ b/src/palantir/PalantirServer.hpp @@ -88,8 +88,10 @@ private slots: QTimer heartbeatTimer_; std::atomic running_; - // Client management + // Client management (thread-safe access required) std::map clientBuffers_; + std::mutex clientBuffersMutex_; // Protects clientBuffers_ access + std::map jobClients_; // Job tracking (WP1: disabled - proto messages not yet defined) @@ -98,7 +100,7 @@ private slots: // Threading std::map jobThreads_; - std::mutex jobMutex_; + std::mutex jobMutex_; // Protects jobClients_, jobCancelled_, jobThreads_ // Capabilities int maxConcurrency_; From 9065534c3313124cb4d47c1b451f8994487a270a Mon Sep 17 00:00:00 2001 From: UnderLord Date: Mon, 24 Nov 2025 20:50:56 -0800 Subject: [PATCH 6/8] fix(transport): fix lock re-acquisition in old format path --- src/palantir/PalantirServer.cpp | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/palantir/PalantirServer.cpp b/src/palantir/PalantirServer.cpp index 79f137c..9b8c36e 100644 --- a/src/palantir/PalantirServer.cpp +++ b/src/palantir/PalantirServer.cpp @@ -808,8 +808,9 @@ void PalantirServer::parseIncomingData(QLocalSocket* client) QByteArray& buffer = it->second; buffer += newData; - // Process messages (readMessage will not be called here since we're already locked) - // Instead, parse directly from buffer + // Process messages (parse directly from buffer while holding lock) + // Extract messages to process outside the lock + std::vector messagesToProcess; while (buffer.size() >= 4) { uint32_t length; std::memcpy(&length, buffer.data(), 4); @@ -826,18 +827,13 @@ void PalantirServer::parseIncomingData(QLocalSocket* client) QByteArray message = buffer.mid(4, length); buffer.remove(0, 4 + length); - - // Release lock before calling handleMessage (which may take time) - lock.~lock_guard(); + messagesToProcess.push_back(message); + } + // Lock is released here when lock goes out of scope + + // Process messages outside the lock + for (const QByteArray& message : messagesToProcess) { handleMessage(client, message); - // Re-acquire lock for next iteration - new (std::addressof(lock)) std::lock_guard(clientBuffersMutex_); - // Re-find iterator (client may have been removed) - it = clientBuffers_.find(client); - if (it == clientBuffers_.end()) { - break; // Client disconnected - } - buffer = it->second; } #endif } From e08503d61faecbb6a4a14df0a57536ee040b6715 Mon Sep 17 00:00:00 2001 From: UnderLord Date: Tue, 25 Nov 2025 13:00:01 -0800 Subject: [PATCH 7/8] Sprint 4.5: finalize integration tests, deadlock fix, and changelog --- BEDROCK_SPRINT4.5_PLAN.md | 369 +++++++++++++++++ CHANGELOG.md | 16 + CMakeLists.txt | 10 + src/palantir/CapabilitiesService.cpp | 2 +- src/palantir/CapabilitiesService.hpp | 2 +- src/palantir/EnvelopeHelpers.cpp | 99 +++++ src/palantir/EnvelopeHelpers.hpp | 48 +++ src/palantir/PalantirServer.cpp | 387 +++++++----------- src/palantir/PalantirServer.hpp | 14 +- tests/CMakeLists.txt | 8 + tests/integration/CMakeLists.txt | 44 ++ .../CapabilitiesIntegrationTest.cpp | 110 +++++ tests/integration/IntegrationTestClient.cpp | 289 +++++++++++++ tests/integration/IntegrationTestClient.hpp | 84 ++++ .../IntegrationTestServerFixture.cpp | 86 ++++ .../IntegrationTestServerFixture.hpp | 55 +++ tests/integration/XYSineIntegrationTest.cpp | 143 +++++++ tests/integration/integration_main.cpp | 26 ++ tests/palantir/EnvelopeHelpers_test.cpp | 311 ++++++++++++++ 19 files changed, 1858 insertions(+), 245 deletions(-) create mode 100644 BEDROCK_SPRINT4.5_PLAN.md create mode 100644 src/palantir/EnvelopeHelpers.cpp create mode 100644 src/palantir/EnvelopeHelpers.hpp create mode 100644 tests/integration/CMakeLists.txt create mode 100644 tests/integration/CapabilitiesIntegrationTest.cpp create mode 100644 tests/integration/IntegrationTestClient.cpp create mode 100644 tests/integration/IntegrationTestClient.hpp create mode 100644 tests/integration/IntegrationTestServerFixture.cpp create mode 100644 tests/integration/IntegrationTestServerFixture.hpp create mode 100644 tests/integration/XYSineIntegrationTest.cpp create mode 100644 tests/integration/integration_main.cpp create mode 100644 tests/palantir/EnvelopeHelpers_test.cpp diff --git a/BEDROCK_SPRINT4.5_PLAN.md b/BEDROCK_SPRINT4.5_PLAN.md new file mode 100644 index 0000000..29ab391 --- /dev/null +++ b/BEDROCK_SPRINT4.5_PLAN.md @@ -0,0 +1,369 @@ +# Bedrock Sprint 4.5 Plan +## Remote Engine Hardening & Multi-Server Execution + +**Date**: 2025-11-24 +**Status**: Planning Complete +**Base Branch**: `main` → `sprint/4.5` + +--- + +## Executive Summary + +Bedrock Sprint 4.5 focuses on hardening the PalantirServer, implementing message envelope support, adding status/health endpoints, and implementing Thermal Diffusion computation with progress/cancel. + +**Bedrock-Specific Goals**: +- Harden `PalantirServer` with message envelope and size limits +- Clean up threading and ownership model +- Add status/health RPC endpoint +- Implement Thermal Diffusion computation (OpenMP parallelization) +- Support progress reporting and cancel handling + +--- + +## Branch Strategy + +### Sprint Branch Creation + +```bash +cd /Users/underlord/workspace/bedrock +git checkout main +git pull origin main +git checkout -b sprint/4.5 +git push -u origin sprint/4.5 +``` + +### Feature Branch Naming + +Feature branches from `sprint/4.5`: +- `feature/wp2a-message-envelope` +- `feature/wp2a-server-hardening` +- `feature/wp2c-status-rpc` +- `feature/wp2d-thermal-diffusion` +- `feature/wp2e-cmake-modularization` + +### PR Strategy + +- **Per-WP PRs**: Each WP may have multiple feature branches merged to `sprint/4.5`, then one PR per WP +- **Final PR**: `sprint/4.5` → `main` (comprehensive review after all WPs) + +--- + +## Work Package Breakdown + +### WP2.A – Transport & Server Hardening (Bedrock Side) + +**Goal**: Harden `PalantirServer` with message envelope, size limits, and error handling. + +**Chunks**: + +1. **WP2.A Chunk 1** – Update Palantir submodule to v1.3.0 + - Update `docs/palantir` submodule + - Verify `envelope.proto` and `error.proto` are available + - Update CMake proto codegen + +2. **WP2.A Chunk 2** – Implement message envelope in PalantirServer + - Update `handleMessage()` to parse `MessageEnvelope` + - Extract `MessageType` and dispatch to appropriate handler + - Update `sendMessage()` to wrap responses in `MessageEnvelope` + - Backward compatibility: Handle old messages (no envelope) gracefully + +3. **WP2.A Chunk 3** – Add message size limits + - Reject messages > max size (configurable, default 10MB) + - Send `ErrorResponse` for size limit violations + - Add config option: `--max-message-size` (CLI) + +4. **WP2.A Chunk 4** – PalantirServer threading & ownership cleanup + - Review and document ownership model + - Ensure `QLocalSocket` objects are properly parented to `PalantirServer` + - Fix any thread-safety issues (use Qt's queued connections) + - Add connection timeout handling (disconnect idle connections after 5 minutes) + +5. **WP2.A Chunk 5** – ErrorResponse integration + - Implement `ErrorResponse` proto handling + - Send `ErrorResponse` for all error conditions (size limit, parse error, unknown message type) + - Add error codes and user-friendly messages + +**Files Modified**: +- `src/palantir/PalantirServer.hpp` +- `src/palantir/PalantirServer.cpp` +- `src/palantir/bedrock_server.cpp` (add CLI options) +- `CMakeLists.txt` (proto codegen for envelope/error) + +**Testing**: +- Unit tests for envelope parsing/serialization +- Unit tests for message size limits +- Integration test: Old client → new server (backward compat) +- Integration test: New client → new server (envelope) + +--- + +### WP2.C – Bedrock Monitor & Observability (Bedrock Side) + +**Goal**: Add status/health RPC endpoint for observability. + +**Chunks**: + +1. **WP2.C Chunk 1** – Design status RPC + - Extend Capabilities or create new Status RPC + - Define `StatusRequest` and `StatusResponse` proto + - Include: server health, active connections, queue depth, uptime + - Tag as Palantir v1.4.0 (if separate from Capabilities) + +2. **WP2.C Chunk 2** – Implement status tracking in PalantirServer + - Track active connections (count) + - Track request count, error count + - Track server uptime (start time) + - Track queue depth (if async processing) + +3. **WP2.C Chunk 3** – Implement status RPC handler + - Add `handleStatusRequest()` method + - Build `StatusResponse` with current state + - Send response via `sendMessage()` + +4. **WP2.C Chunk 4** – Add status to Capabilities (alternative) + - If Status RPC is separate, skip this + - If Status is part of Capabilities, extend `CapabilitiesResponse` + - Update `CapabilitiesService` to include status + +**Files Modified**: +- `src/palantir/PalantirServer.hpp` (add status tracking) +- `src/palantir/PalantirServer.cpp` (implement status tracking and handler) +- `src/palantir/CapabilitiesService.cpp` (if status in Capabilities) +- `CMakeLists.txt` (proto codegen for status, if separate) + +**Testing**: +- Unit tests for status RPC +- Integration test: Status polling with server restart +- Integration test: Status accuracy (connection count, uptime) + +--- + +### WP2.D – Thermal Diffusion RPC (Bedrock Side) + +**Goal**: Implement Thermal Diffusion computation with progress/cancel. + +**Chunks**: + +1. **WP2.D Chunk 1** – Update Palantir submodule to v1.5.0 + - Update `docs/palantir` submodule + - Verify `thermal_diffusion.proto` is available + - Update CMake proto codegen + +2. **WP2.D Chunk 2** – Design Thermal Diffusion algorithm + - Finite difference method (2D grid) + - Time-stepping loop + - Boundary conditions (fixed temperature) + - Initial conditions (temperature field) + +3. **WP2.D Chunk 3** – Implement Thermal Diffusion computation + - Create `src/compute/ThermalDiffusionSolver.hpp/.cpp` + - Implement finite difference solver + - Add OpenMP parallelization (parallelize grid updates) + - Add progress callback support + +4. **WP2.D Chunk 4** – Integrate Thermal Diffusion into PalantirServer + - Add `handleThermalDiffusionRequest()` method + - Parse `ThermalDiffusionRequest` (grid size, time steps, boundary conditions) + - Call solver with progress callback + - Send `ThermalDiffusionProgress` messages during computation + - Send `ThermalDiffusionResponse` on completion + +5. **WP2.D Chunk 5** – Add progress reporting + - Progress callback sends `ThermalDiffusionProgress` via `sendMessage()` + - Progress includes: current step, percent complete, estimated time + - Throttle progress updates (max 10 updates per second) + +6. **WP2.D Chunk 6** – Add cancel support + - Check cancel flag during computation (atomic bool) + - Handle `CancelRequest` message + - Send `CancelResponse` on cancel + - Clean up computation thread on cancel + +7. **WP2.D Chunk 7** – Integration and testing + - End-to-end test: Thermal Diffusion with progress + - Test cancel during computation + - Performance test: OpenMP scaling (1, 2, 4, 8 threads) + +**Files Modified**: +- `src/compute/ThermalDiffusionSolver.hpp` (new) +- `src/compute/ThermalDiffusionSolver.cpp` (new) +- `src/palantir/PalantirServer.hpp` (add handler) +- `src/palantir/PalantirServer.cpp` (implement handler) +- `CMakeLists.txt` (new solver sources, proto codegen) + +**Testing**: +- Unit tests for Thermal Diffusion algorithm +- Unit tests for OpenMP parallelization +- Integration test: Progress streaming +- Integration test: Cancel during computation +- Performance test: OpenMP scaling + +--- + +### WP2.E – Cleanup & Formal Review (Bedrock Side) + +**Goal**: Clean up code, modularize CMake, add guardrails. + +**Chunks**: + +1. **WP2.E Chunk 1** – CMake modularization + - Extract proto codegen to `cmake/PalantirProto.cmake` + - Extract abseil linking to `cmake/AbseilLinking.cmake` + - Update `CMakeLists.txt` to use modules + +2. **WP2.E Chunk 2** – Code cleanup + - Remove TODOs (or convert to GitHub issues) + - Fix compiler warnings + - Document public APIs (Doxygen comments) + - Format code (clang-format) + +3. **WP2.E Chunk 3** – Guardrails and formal review + - Add assertions for invariants (assert, Q_ASSERT) + - Add bounds checking (vector access, grid bounds) + - Prepare review checklist + - Generate architecture diagrams (if needed) + +**Files Modified**: +- `cmake/PalantirProto.cmake` (new) +- `cmake/AbseilLinking.cmake` (new) +- `CMakeLists.txt` (use modules) +- Various source files (cleanup, documentation) + +**Testing**: Ensure all existing tests still pass + +--- + +## Testing & CI Strategy + +### Unit Tests + +**New Test Files**: +- `tests/palantir/PalantirServer_envelope_test.cpp` +- `tests/palantir/PalantirServer_size_limit_test.cpp` +- `tests/palantir/PalantirServer_status_test.cpp` +- `tests/compute/ThermalDiffusionSolver_test.cpp` +- `tests/compute/ThermalDiffusionSolver_openmp_test.cpp` + +**Updated Test Files**: +- `tests/palantir/CapabilitiesService_test.cpp` (add status tests if in Capabilities) + +### Integration Tests + +**New Integration Tests**: +- `tests/integration/message_envelope_test.cpp` +- `tests/integration/thermal_diffusion_progress_test.cpp` +- `tests/integration/cancel_during_computation_test.cpp` + +### CI Jobs + +| Job | Coverage | +|-----|----------| +| `build` | All configs (BEDROCK_WITH_TRANSPORT_DEPS ON/OFF) | +| `test` | Unit + integration tests | +| `codeql` | Security analysis | +| `performance` | OpenMP scaling tests (if separate job) | + +--- + +## Dependencies on Other Repos + +### Palantir + +**Proto Updates Required**: +- v1.3.0: `envelope.proto`, `error.proto` (WP2.A) +- v1.4.0: Status RPC (if separate from Capabilities) (WP2.C) +- v1.5.0: `thermal_diffusion.proto` (WP2.D) + +**Update Sequence**: +1. Palantir creates proto, tags version +2. Bedrock updates `docs/palantir` submodule +3. Bedrock rebuilds with new proto + +### Phoenix + +**Coordination Required**: +- WP2.A: Phoenix must implement envelope before Bedrock can require it (or backward compat) +- WP2.C: Phoenix status polling +- WP2.D: Phoenix Thermal Diffusion client + +**Testing Coordination**: +- Integration tests require both Phoenix and Bedrock running +- CI may need to start Phoenix client for integration tests + +--- + +## Risk Analysis (Bedrock-Specific) + +### WP2.A Risks + +**Risk**: Breaking existing RPCs when introducing envelope +- **Mitigation**: Backward compatibility layer, support old client messages +- **Testing**: Old client → new server, new client → new server + +**Risk**: Message size limits too restrictive +- **Mitigation**: Configurable limit, default 10MB +- **Testing**: Test with various message sizes + +**Risk**: Thread-safety issues in PalantirServer +- **Mitigation**: Use Qt's queued connections, document ownership +- **Testing**: Thread-safety tests, stress tests + +### WP2.C Risks + +**Risk**: Status tracking overhead +- **Mitigation**: Lightweight tracking (counters, timestamps) +- **Testing**: Measure status tracking overhead + +**Risk**: Status accuracy (race conditions) +- **Mitigation**: Atomic counters, careful synchronization +- **Testing**: Status accuracy tests under load + +### WP2.D Risks + +**Risk**: OpenMP parallelization correctness +- **Mitigation**: Careful loop parallelization, avoid race conditions +- **Testing**: OpenMP correctness tests, compare with serial version + +**Risk**: Progress reporting overhead +- **Mitigation**: Throttle progress updates (max 10 per second) +- **Testing**: Measure progress reporting overhead + +**Risk**: Cancel during computation race conditions +- **Mitigation**: Atomic cancel flag, check at safe points (between time steps) +- **Testing**: Cancel during computation stress test + +**Risk**: Memory usage for large grids +- **Mitigation**: Validate grid size limits, use efficient data structures +- **Testing**: Memory usage tests with large grids + +--- + +## Summary + +**Total Bedrock Chunks**: 20 +- WP2.A: 5 chunks +- WP2.C: 4 chunks +- WP2.D: 7 chunks +- WP2.E: 3 chunks + +**Key Bedrock Files**: +- New: `ThermalDiffusionSolver` +- Modified: `PalantirServer`, `CapabilitiesService`, `bedrock_server` + +**Open Questions**: +1. Should Status RPC be separate or part of Capabilities? + - **Recommendation**: Separate (cleaner, more extensible) + +2. Should Thermal Diffusion use separate thread or Qt's thread pool? + - **Recommendation**: Separate thread (better control, easier cancel) + +3. Should progress updates be sent via same socket or separate socket? + - **Recommendation**: Same socket with message envelope (simpler) + +4. Should OpenMP thread count be configurable? + - **Recommendation**: Yes, via CLI option `--openmp-threads` + +--- + +**Bedrock Sprint 4.5 Planning Complete** — Ready for implementation. + diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fae068..4aabb67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,22 @@ This changelog follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) --- +## [0.0.4] – 2025-01-25 +### Bedrock (Backend) +- **Envelope-Based Palantir Transport**: Migrated to envelope-based wire format for all IPC communication. All messages now use `MessageEnvelope` protobuf with version, type, payload, and metadata fields. Wire format: `[4-byte length][serialized MessageEnvelope]`. Replaced legacy `[length][type][payload]` format. +- **Deadlock Fix**: Eliminated deadlock in `parseIncomingData()` by refactoring to `extractMessage()` helper (no locking) and narrowing lock scope in buffer manipulation. Removed mutex from `sendMessage()` to prevent nested locking. Transport layer now safe under concurrency. +- **Envelope Helpers**: Added reusable `makeEnvelope()` and `parseEnvelope()` helpers for creating and parsing envelope messages. Helpers include version validation, type checking, and error reporting. +- **Integration Test Harness**: Added comprehensive integration test framework with `IntegrationTestServerFixture` (in-process server) and `IntegrationTestClient` (minimal test client). Framework validates end-to-end envelope transport for RPCs. +- **Capabilities RPC Integration Test**: End-to-end test validating Capabilities request/response cycle using envelope transport. Test confirms server processes requests and sends responses correctly. +- **XY Sine RPC Integration Test**: End-to-end test validating XY Sine computation with mathematical correctness checks. Test validates envelope transport for numeric RPCs with repeated double fields and confirms server computation matches expected sine wave formula. + +### Phoenix (Frontend) +- **Envelope-Based Palantir Protocol**: Client-side migration to envelope-based protocol matching Bedrock server. All RPCs now use envelope-based transport with proper versioning and extensibility. +- **IPC Hardening**: Eliminated deadlocks in transport layer by refactoring lock scope. Transport layer now safe under concurrency. +- **Envelope Helpers**: Added reusable `makeEnvelope()` and `parseEnvelope()` helpers matching server-side implementation. + +--- + ## [v0.1.0-sprint1] – 2025-10-08 ### Status **Sprint 1 Complete — Baseline Release** diff --git a/CMakeLists.txt b/CMakeLists.txt index 3f9a650..b8a7ed9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -245,6 +245,8 @@ if(BEDROCK_WITH_TRANSPORT_DEPS) find_library(ABSL_LOG_INTERNAL_CONDITIONS_LIB absl_log_internal_conditions PATHS /opt/homebrew/opt/abseil/lib NO_DEFAULT_PATH) find_library(ABSL_LOG_INTERNAL_MESSAGE_LIB absl_log_internal_message PATHS /opt/homebrew/opt/abseil/lib NO_DEFAULT_PATH) find_library(ABSL_LOG_INTERNAL_NULLGUARD_LIB absl_log_internal_nullguard PATHS /opt/homebrew/opt/abseil/lib NO_DEFAULT_PATH) + find_library(ABSL_HASH_LIB absl_hash PATHS /opt/homebrew/opt/abseil/lib NO_DEFAULT_PATH) + find_library(ABSL_LOG_INTERNAL_FORMAT_LIB absl_log_internal_format PATHS /opt/homebrew/opt/abseil/lib NO_DEFAULT_PATH) set(ABSEIL_LIBS) if(ABSL_DIE_IF_NULL_LIB) @@ -268,6 +270,12 @@ if(BEDROCK_WITH_TRANSPORT_DEPS) if(ABSL_LOG_INTERNAL_NULLGUARD_LIB) list(APPEND ABSEIL_LIBS ${ABSL_LOG_INTERNAL_NULLGUARD_LIB}) endif() + if(ABSL_HASH_LIB) + list(APPEND ABSEIL_LIBS ${ABSL_HASH_LIB}) + endif() + if(ABSL_LOG_INTERNAL_FORMAT_LIB) + list(APPEND ABSEIL_LIBS ${ABSL_LOG_INTERNAL_FORMAT_LIB}) + endif() if(ABSEIL_LIBS) target_link_libraries(bedrock_palantir_proto PUBLIC ${ABSEIL_LIBS}) @@ -301,6 +309,8 @@ if(BEDROCK_WITH_TRANSPORT_DEPS) add_library(bedrock_palantir_server STATIC src/palantir/PalantirServer.cpp + src/palantir/EnvelopeHelpers.cpp + src/palantir/EnvelopeHelpers.hpp ) target_include_directories(bedrock_palantir_server PUBLIC diff --git a/src/palantir/CapabilitiesService.cpp b/src/palantir/CapabilitiesService.cpp index 576dd34..813141a 100644 --- a/src/palantir/CapabilitiesService.cpp +++ b/src/palantir/CapabilitiesService.cpp @@ -9,7 +9,7 @@ ::palantir::CapabilitiesResponse CapabilitiesService::getCapabilities() const { ::palantir::CapabilitiesResponse response; auto* caps = response.mutable_capabilities(); - // WP1: Hard-coded capabilities + // Hard-coded capabilities (future: may be dynamic based on available features) // Future: May read from configuration or detect dynamically caps->set_server_version("bedrock-0.0.1"); caps->add_supported_features("xy_sine"); diff --git a/src/palantir/CapabilitiesService.hpp b/src/palantir/CapabilitiesService.hpp index bfae8f3..6f245e4 100644 --- a/src/palantir/CapabilitiesService.hpp +++ b/src/palantir/CapabilitiesService.hpp @@ -9,7 +9,7 @@ namespace bedrock { namespace palantir { // CapabilitiesService - In-process API for generating CapabilitiesResponse -// WP1: Simple, deterministic implementation without networking +// Capabilities service providing server capability information // Future: Will be integrated into Palantir IPC server class CapabilitiesService { public: diff --git a/src/palantir/EnvelopeHelpers.cpp b/src/palantir/EnvelopeHelpers.cpp new file mode 100644 index 0000000..ee20b46 --- /dev/null +++ b/src/palantir/EnvelopeHelpers.cpp @@ -0,0 +1,99 @@ +#include "EnvelopeHelpers.hpp" + +#ifdef BEDROCK_WITH_TRANSPORT_DEPS + +#include +#include + +namespace bedrock::palantir { + +std::optional<::palantir::MessageEnvelope> makeEnvelope( + ::palantir::MessageType type, + const google::protobuf::Message& innerMessage, + const std::map& metadata, + std::string* outError) +{ + // Serialize inner message + std::string serializedPayload; + if (!innerMessage.SerializeToString(&serializedPayload)) { + if (outError) { + *outError = "Failed to serialize inner message"; + } + return std::nullopt; + } + + // Create envelope + ::palantir::MessageEnvelope envelope; + envelope.set_version(PROTOCOL_VERSION); + envelope.set_type(type); + envelope.set_payload(serializedPayload); + + // Set metadata + for (const auto& [key, value] : metadata) { + (*envelope.mutable_metadata())[key] = value; + } + + return envelope; +} + +bool parseEnvelope( + const std::string& buffer, + ::palantir::MessageEnvelope& outEnvelope, + std::string* outError) +{ + if (buffer.empty()) { + if (outError) { + *outError = "Empty buffer"; + } + return false; + } + + // Parse envelope from buffer + if (!outEnvelope.ParseFromString(buffer)) { + if (outError) { + *outError = "Failed to parse MessageEnvelope"; + } + return false; + } + + // Validate version + if (outEnvelope.version() != PROTOCOL_VERSION) { + if (outError) { + std::ostringstream oss; + oss << "Invalid protocol version: " << outEnvelope.version() + << " (expected " << PROTOCOL_VERSION << ")"; + *outError = oss.str(); + } + return false; + } + + // Validate type (check if it's in valid enum range) + // MessageType enum values: 0-11 are defined, 12-255 are reserved + int typeValue = static_cast(outEnvelope.type()); + if (typeValue < 0 || typeValue > 255) { + if (outError) { + std::ostringstream oss; + oss << "Invalid MessageType value: " << typeValue; + *outError = oss.str(); + } + return false; + } + + // Check for UNSPECIFIED type (0) - this is reserved and should not be used + if (outEnvelope.type() == ::palantir::MessageType::MESSAGE_TYPE_UNSPECIFIED) { + if (outError) { + *outError = "MessageType is UNSPECIFIED (invalid)"; + } + return false; + } + + // Payload is optional (can be empty for some message types) + // No validation needed here - let the caller validate based on message type + + return true; +} + +} // namespace bedrock::palantir + +#endif // BEDROCK_WITH_TRANSPORT_DEPS + diff --git a/src/palantir/EnvelopeHelpers.hpp b/src/palantir/EnvelopeHelpers.hpp new file mode 100644 index 0000000..379226b --- /dev/null +++ b/src/palantir/EnvelopeHelpers.hpp @@ -0,0 +1,48 @@ +#pragma once + +#ifdef BEDROCK_WITH_TRANSPORT_DEPS + +#include "palantir/envelope.pb.h" +#include "palantir/error.pb.h" +#include +#include +#include +#include + +namespace bedrock::palantir { + +// Constants +static constexpr uint32_t PROTOCOL_VERSION = 1; + +/** + * Create a MessageEnvelope from an inner message. + * + * @param type Message type enum + * @param innerMessage Inner protobuf message to wrap + * @param metadata Optional metadata map (can be empty) + * @param outError Optional error string output + * @return MessageEnvelope on success, empty optional on failure + */ +std::optional<::palantir::MessageEnvelope> makeEnvelope( + ::palantir::MessageType type, + const google::protobuf::Message& innerMessage, + const std::map& metadata = {}, + std::string* outError = nullptr); + +/** + * Parse a MessageEnvelope from a buffer. + * + * @param buffer Serialized MessageEnvelope bytes + * @param outEnvelope Output envelope (populated on success) + * @param outError Optional error string output + * @return true on success, false on failure + */ +bool parseEnvelope( + const std::string& buffer, + ::palantir::MessageEnvelope& outEnvelope, + std::string* outError = nullptr); + +} // namespace bedrock::palantir + +#endif // BEDROCK_WITH_TRANSPORT_DEPS + diff --git a/src/palantir/PalantirServer.cpp b/src/palantir/PalantirServer.cpp index 9b8c36e..f2204e2 100644 --- a/src/palantir/PalantirServer.cpp +++ b/src/palantir/PalantirServer.cpp @@ -14,6 +14,7 @@ #include "palantir/xysine.pb.h" #include "palantir/envelope.pb.h" #include "palantir/error.pb.h" +#include "EnvelopeHelpers.hpp" #endif PalantirServer::PalantirServer(QObject *parent) @@ -119,9 +120,12 @@ void PalantirServer::onNewConnection() { QLocalSocket* client = server_->nextPendingConnection(); if (!client) { + qDebug() << "[SERVER] onNewConnection: ERROR - no pending connection"; return; } + qDebug() << "[SERVER] onNewConnection: new client" << client << ", state=" << client->state(); + // Connect client signals connect(client, &QLocalSocket::disconnected, this, &PalantirServer::onClientDisconnected); connect(client, &QLocalSocket::readyRead, this, &PalantirServer::onClientReadyRead); @@ -130,10 +134,11 @@ void PalantirServer::onNewConnection() { std::lock_guard lock(clientBuffersMutex_); clientBuffers_[client] = QByteArray(); + qDebug() << "[SERVER] onNewConnection: client added to buffers map, map size=" << clientBuffers_.size(); } emit clientConnected(); - qDebug() << "Client connected"; + qDebug() << "[SERVER] onNewConnection: Client connected signal emitted"; } void PalantirServer::onClientDisconnected() @@ -173,92 +178,25 @@ void PalantirServer::onClientReadyRead() { QLocalSocket* client = qobject_cast(sender()); if (!client) { + qDebug() << "[SERVER] onClientReadyRead: ERROR - no client"; return; } + qDebug() << "[SERVER] onClientReadyRead: bytes available=" << client->bytesAvailable(); parseIncomingData(client); } void PalantirServer::onHeartbeatTimer() { - // WP1: Heartbeat/Pong not yet implemented (requires Pong proto message) + // Heartbeat/Pong not yet implemented (requires Pong proto message) // Future: Send pong to all connected clients when Pong message is defined // For now, heartbeat timer is disabled or no-op } -void PalantirServer::handleMessage(QLocalSocket* client, const QByteArray& message) -{ -#ifdef BEDROCK_WITH_TRANSPORT_DEPS - // Sprint 4.5: Try new format first (with MessageType) - palantir::MessageType messageType; - QByteArray payload; - - if (readMessageWithType(client, messageType, payload)) { - // New format: dispatch by MessageType - switch (messageType) { - case palantir::MessageType::CAPABILITIES_REQUEST: { - palantir::CapabilitiesRequest request; - if (request.ParseFromArray(payload.data(), payload.size())) { - handleCapabilitiesRequest(client); - } else { - sendErrorResponse(client, palantir::ErrorCode::PROTOBUF_PARSE_ERROR, - "Failed to parse CapabilitiesRequest"); - } - return; - } - case palantir::MessageType::XY_SINE_REQUEST: { - palantir::XYSineRequest request; - if (request.ParseFromArray(payload.data(), payload.size())) { - handleXYSineRequest(client, request); - } else { - sendErrorResponse(client, palantir::ErrorCode::PROTOBUF_PARSE_ERROR, - "Failed to parse XYSineRequest"); - } - return; - } - case palantir::MessageType::ERROR_RESPONSE: - // Server should not receive ErrorResponse - qDebug() << "Server received ErrorResponse (unexpected)"; - return; - default: - sendErrorResponse(client, palantir::ErrorCode::UNKNOWN_MESSAGE_TYPE, - QString("Unknown message type: %1").arg(static_cast(messageType))); - return; - } - } - - // Backward compatibility: try old format (no MessageType) - // Log deprecation warning - static bool deprecationWarningLogged = false; - if (!deprecationWarningLogged) { - qWarning() << "DEPRECATED: Received message in old format (no MessageType). " - << "This format will be removed in Sprint 4.6. Please upgrade to new format."; - deprecationWarningLogged = true; - } - - // Try to parse as XYSineRequest (check before CapabilitiesRequest for specificity) - palantir::XYSineRequest xySineRequest; - if (xySineRequest.ParseFromArray(message.data(), message.size())) { - handleXYSineRequest(client, xySineRequest); - return; - } - - // Try to parse as CapabilitiesRequest - palantir::CapabilitiesRequest request; - if (request.ParseFromArray(message.data(), message.size())) { - handleCapabilitiesRequest(client); - return; - } - - // Unknown message format - sendErrorResponse(client, palantir::ErrorCode::UNKNOWN_MESSAGE_TYPE, - "Failed to parse message (unknown format)"); -#else - qDebug() << "Message received but transport deps disabled"; -#endif -} +// Legacy handleMessage() removed - envelope-based transport only +// All message handling now done via parseIncomingData() -> extractMessage() -// WP1: StartJob handler disabled (proto message not yet defined) +// StartJob handler disabled (proto message not yet defined) // Future: Re-enable when StartJob proto is added /* void PalantirServer::handleStartJob(QLocalSocket* client, const palantir::StartJob& startJob) @@ -305,16 +243,20 @@ void PalantirServer::handleStartJob(QLocalSocket* client, const palantir::StartJ jobThreads_[jobId] = std::move(jobThread); - qDebug() << "Started job:" << jobId; + // Job start logic commented out (requires StartJob proto message) } */ void PalantirServer::handleCapabilitiesRequest(QLocalSocket* client) { #ifdef BEDROCK_WITH_TRANSPORT_DEPS + qDebug() << "[SERVER] handleCapabilitiesRequest: starting"; bedrock::palantir::CapabilitiesService service; palantir::CapabilitiesResponse response = service.getCapabilities(); + qDebug() << "[SERVER] handleCapabilitiesRequest: got capabilities, server_version=" << response.capabilities().server_version().c_str(); + qDebug() << "[SERVER] handleCapabilitiesRequest: calling sendMessage..."; sendMessage(client, palantir::MessageType::CAPABILITIES_RESPONSE, response); + qDebug() << "[SERVER] handleCapabilitiesRequest: sendMessage returned"; #else qWarning() << "Capabilities requested but transport deps disabled"; #endif @@ -379,7 +321,7 @@ void PalantirServer::computeXYSine(const palantir::XYSineRequest& request, std:: } } -// WP1: Ping/Pong handler disabled (proto message not yet defined) +// Ping/Pong handler disabled (proto message not yet defined) // Future: Re-enable when Pong proto is added /* void PalantirServer::handlePing(QLocalSocket* client) @@ -569,49 +511,63 @@ void PalantirServer::sendMessage(QLocalSocket* client, palantir::MessageType typ return; } - // Verify client is still in our tracking (optional safety check) - { - std::lock_guard lock(clientBuffersMutex_); - if (clientBuffers_.find(client) == clientBuffers_.end()) { - qDebug() << "Attempted to send message to unknown client"; - return; - } + // Note: Removed clientBuffers_ map check to avoid deadlock. + // We rely on socket state check above. If client is connected, + // it's safe to write to it. + + qDebug() << "[SERVER] sendMessage: type=" << static_cast(type) << ", client=" << client; + + // Create envelope from message + std::string envelopeError; + auto envelope = bedrock::palantir::makeEnvelope(type, message, {}, &envelopeError); + + if (!envelope.has_value()) { + qDebug() << "[SERVER] sendMessage: ERROR - failed to create envelope:" << envelopeError.c_str(); + sendErrorResponse(client, palantir::ErrorCode::INTERNAL_ERROR, + QString("Failed to create envelope: %1").arg(envelopeError.c_str())); + return; } - // Serialize message + qDebug() << "[SERVER] sendMessage: envelope created, version=" << envelope->version() << ", type=" << static_cast(envelope->type()); + + // Serialize envelope std::string serialized; - if (!message.SerializeToString(&serialized)) { - qDebug() << "Failed to serialize message"; + if (!envelope->SerializeToString(&serialized)) { + qDebug() << "[SERVER] sendMessage: ERROR - failed to serialize envelope"; sendErrorResponse(client, palantir::ErrorCode::INTERNAL_ERROR, - "Failed to serialize message"); + "Failed to serialize MessageEnvelope"); return; } + qDebug() << "[SERVER] sendMessage: serialized size=" << serialized.size(); + // Check size limit if (serialized.size() > MAX_MESSAGE_SIZE) { - qDebug() << "Message too large:" << serialized.size(); + qDebug() << "[SERVER] sendMessage: ERROR - envelope too large:" << serialized.size(); sendErrorResponse(client, palantir::ErrorCode::MESSAGE_TOO_LARGE, - QString("Message size %1 exceeds limit %2") + QString("Envelope size %1 exceeds limit %2") .arg(serialized.size()).arg(MAX_MESSAGE_SIZE)); return; } - // Create length-prefixed message with MessageType + // Create length-prefixed message: [4-byte length][serialized MessageEnvelope] QByteArray data; - uint32_t totalLength = static_cast(serialized.size() + 1); // +1 for MessageType byte - uint8_t typeByte = static_cast(type); + uint32_t totalLength = static_cast(serialized.size()); // Write length (little-endian, 4 bytes) data.append(reinterpret_cast(&totalLength), 4); - // Write MessageType (1 byte) - data.append(reinterpret_cast(&typeByte), 1); - // Write serialized message + // Write serialized envelope data.append(serialized.data(), serialized.size()); + qDebug() << "[SERVER] sendMessage: writing" << data.size() << "bytes to client"; // Send data qint64 written = client->write(data); + qDebug() << "[SERVER] sendMessage: wrote" << written << "bytes"; + if (written != data.size()) { - qDebug() << "Failed to send complete message"; + qDebug() << "[SERVER] sendMessage: ERROR - failed to send complete message (wrote" << written << "of" << data.size() << "bytes)"; + } else { + qDebug() << "[SERVER] sendMessage: SUCCESS - message sent"; } } @@ -627,136 +583,148 @@ void PalantirServer::sendErrorResponse(QLocalSocket* client, palantir::ErrorCode sendMessage(client, palantir::MessageType::ERROR_RESPONSE, error); } -bool PalantirServer::readMessageWithType(QLocalSocket* client, palantir::MessageType& outType, QByteArray& outPayload) +bool PalantirServer::extractMessage(QByteArray& buffer, palantir::MessageType& outType, QByteArray& outPayload, QString* outError) { - if (!client) { - return false; - } - - // Thread-safe access to client buffer - std::lock_guard lock(clientBuffersMutex_); - auto it = clientBuffers_.find(client); - if (it == clientBuffers_.end()) { - return false; // Client not found (may have disconnected) - } - QByteArray& buffer = it->second; - - // Need at least 5 bytes: 4-byte length + 1-byte MessageType - if (buffer.size() < 5) { - return false; + // Need at least 4 bytes for length prefix + if (buffer.size() < 4) { + return false; // Incomplete frame, need more data } // Read length (little-endian) - uint32_t totalLength; - std::memcpy(&totalLength, buffer.data(), 4); + uint32_t envelopeLength; + std::memcpy(&envelopeLength, buffer.data(), 4); // Check size limit - if (totalLength > MAX_MESSAGE_SIZE + 1) { // +1 for MessageType byte - qDebug() << "Message length exceeds limit:" << totalLength; - sendErrorResponse(client, palantir::ErrorCode::MESSAGE_TOO_LARGE, - QString("Message length %1 exceeds limit %2") - .arg(totalLength).arg(MAX_MESSAGE_SIZE + 1)); + if (envelopeLength > MAX_MESSAGE_SIZE) { + if (outError) { + *outError = QString("Envelope length %1 exceeds limit %2") + .arg(envelopeLength).arg(MAX_MESSAGE_SIZE); + } buffer.clear(); // Clear buffer to prevent further parsing - return false; + return false; // Hard error } - // Check if we have the complete message - if (buffer.size() < 4 + totalLength) { - return false; + // Check if we have the complete envelope + if (buffer.size() < 4 + envelopeLength) { + return false; // Incomplete frame, need more data } - // Read MessageType (1 byte after length) - uint8_t typeByte = static_cast(buffer[4]); - outType = static_cast(typeByte); - - // Read payload (everything after MessageType) - outPayload = buffer.mid(5, totalLength - 1); - buffer.remove(0, 4 + totalLength); + // Extract envelope bytes (after length prefix) + QByteArray envelopeBytes = buffer.mid(4, envelopeLength); + buffer.remove(0, 4 + envelopeLength); - return true; -} - -// Backward compatibility: read old format (no MessageType, just length + payload) -// This function is kept for backward compatibility but is deprecated -// It reads from the buffer directly (used by parseIncomingData for old format) -QByteArray PalantirServer::readMessage(QLocalSocket* client) -{ - if (!client) { - return QByteArray(); - } - - // Thread-safe access to client buffer - std::lock_guard lock(clientBuffersMutex_); - auto it = clientBuffers_.find(client); - if (it == clientBuffers_.end()) { - return QByteArray(); // Client not found (may have disconnected) - } - QByteArray& buffer = it->second; - - if (buffer.size() < 4) { - return QByteArray(); - } - - uint32_t length; - std::memcpy(&length, buffer.data(), 4); - - // Check size limit - if (length > MAX_MESSAGE_SIZE) { - qDebug() << "Old-format message length exceeds limit:" << length; - buffer.clear(); - return QByteArray(); + // Parse envelope + ::palantir::MessageEnvelope envelope; + std::string parseError; + if (!bedrock::palantir::parseEnvelope( + std::string(envelopeBytes.data(), envelopeBytes.size()), + envelope, + &parseError)) { + if (outError) { + *outError = QString("Malformed envelope: %1").arg(parseError.c_str()); + } + return false; // Hard error } - if (buffer.size() < 4 + length) { - return QByteArray(); + // Validate envelope version + if (envelope.version() != 1) { + if (outError) { + *outError = QString("Invalid envelope version: %1").arg(envelope.version()); + } + return false; // Hard error } - QByteArray message = buffer.mid(4, length); - buffer.remove(0, 4 + length); + // Extract type and payload + outType = envelope.type(); + outPayload = QByteArray(envelope.payload().data(), envelope.payload().size()); - return message; + return true; // Success } + +// Legacy readMessage() removed - envelope-based transport only #endif // BEDROCK_WITH_TRANSPORT_DEPS void PalantirServer::parseIncomingData(QLocalSocket* client) { if (!client) { + qDebug() << "[SERVER] parseIncomingData: ERROR - no client"; return; } // Read all available data first QByteArray newData = client->readAll(); - if (newData.isEmpty()) { - return; - } + qDebug() << "[SERVER] parseIncomingData: read" << newData.size() << "bytes from client"; - // Thread-safe access to client buffer - std::lock_guard lock(clientBuffersMutex_); - auto it = clientBuffers_.find(client); - if (it == clientBuffers_.end()) { - // Client not found (may have disconnected) + if (newData.isEmpty()) { + qDebug() << "[SERVER] parseIncomingData: no data available"; return; } - QByteArray& buffer = it->second; - buffer += newData; #ifdef BEDROCK_WITH_TRANSPORT_DEPS - // Try new format first (5 bytes minimum: 4-byte length + 1-byte MessageType) - // Note: readMessageWithType will lock clientBuffersMutex_ internally + // Parse envelope-based messages + // Lock scope is narrowed to buffer manipulation only; dispatch happens outside lock while (true) { palantir::MessageType messageType; QByteArray payload; - if (!readMessageWithType(client, messageType, payload)) { - break; // Not enough data or client disconnected + QString extractError; + + // === CRITICAL SECTION: buffer manipulation only === + { + std::lock_guard lock(clientBuffersMutex_); + auto it = clientBuffers_.find(client); + if (it == clientBuffers_.end()) { + // Client not found (may have disconnected) + qDebug() << "[SERVER] parseIncomingData: ERROR - client not in buffers map"; + return; + } + QByteArray& buffer = it->second; + + // Append new data only once per call + if (!newData.isEmpty()) { + buffer += newData; + newData.clear(); + qDebug() << "[SERVER] parseIncomingData: buffer size now=" << buffer.size(); + } + + // Extract message from buffer (no locking inside extractMessage) + qDebug() << "[SERVER] parseIncomingData: attempting to extract message..."; + if (!extractMessage(buffer, messageType, payload, &extractError)) { + // Check if it's a hard error or just incomplete data + if (!extractError.isEmpty()) { + // Hard error - will handle outside lock + qDebug() << "[SERVER] parseIncomingData: extractMessage error:" << extractError; + } else { + // Incomplete frame - need more data, wait for next readyRead + qDebug() << "[SERVER] parseIncomingData: incomplete message, waiting for more data"; + } + break; // Exit critical section + } + + qDebug() << "[SERVER] parseIncomingData: extracted message, type=" << static_cast(messageType) << ", payload size=" << payload.size(); } + // === LOCK RELEASED HERE === - // Dispatch by MessageType + // Handle extraction errors outside lock + if (!extractError.isEmpty()) { + if (extractError.contains("exceeds limit")) { + sendErrorResponse(client, palantir::ErrorCode::MESSAGE_TOO_LARGE, extractError); + } else { + sendErrorResponse(client, palantir::ErrorCode::INVALID_MESSAGE_FORMAT, extractError); + } + continue; // Try to extract next message if available + } + + // === DISPATCH WITHOUT HOLDING MUTEX === switch (messageType) { case palantir::MessageType::CAPABILITIES_REQUEST: { + qDebug() << "[SERVER] parseIncomingData: handling CAPABILITIES_REQUEST"; palantir::CapabilitiesRequest request; if (request.ParseFromArray(payload.data(), payload.size())) { + qDebug() << "[SERVER] parseIncomingData: parsed CapabilitiesRequest, calling handleCapabilitiesRequest"; handleCapabilitiesRequest(client); + qDebug() << "[SERVER] parseIncomingData: handleCapabilitiesRequest returned"; } else { + qDebug() << "[SERVER] parseIncomingData: ERROR - failed to parse CapabilitiesRequest"; sendErrorResponse(client, palantir::ErrorCode::PROTOBUF_PARSE_ERROR, "Failed to parse CapabilitiesRequest"); } @@ -781,60 +749,9 @@ void PalantirServer::parseIncomingData(QLocalSocket* client) continue; } } - - // Fallback: try old format (backward compatibility) - // Note: readMessage will lock clientBuffersMutex_ internally - while (true) { - QByteArray message = readMessage(client); - if (message.isEmpty()) { - break; - } - handleMessage(client, message); - } #else - // Old format only (when transport deps disabled) - // Read data first - QByteArray newData = client->readAll(); - if (newData.isEmpty()) { - return; - } - - // Thread-safe access to client buffer - std::lock_guard lock(clientBuffersMutex_); - auto it = clientBuffers_.find(client); - if (it == clientBuffers_.end()) { - return; // Client not found (may have disconnected) - } - QByteArray& buffer = it->second; - buffer += newData; - - // Process messages (parse directly from buffer while holding lock) - // Extract messages to process outside the lock - std::vector messagesToProcess; - while (buffer.size() >= 4) { - uint32_t length; - std::memcpy(&length, buffer.data(), 4); - - if (length > MAX_MESSAGE_SIZE) { - qDebug() << "Old-format message length exceeds limit:" << length; - buffer.clear(); - break; - } - - if (buffer.size() < 4 + length) { - break; // Not enough data yet - } - - QByteArray message = buffer.mid(4, length); - buffer.remove(0, 4 + length); - messagesToProcess.push_back(message); - } - // Lock is released here when lock goes out of scope - - // Process messages outside the lock - for (const QByteArray& message : messagesToProcess) { - handleMessage(client, message); - } + // Transport deps disabled - envelope-based transport not available + qDebug() << "Transport deps disabled - cannot process envelope-based messages"; #endif } diff --git a/src/palantir/PalantirServer.hpp b/src/palantir/PalantirServer.hpp index 3130f93..99a0ebe 100644 --- a/src/palantir/PalantirServer.hpp +++ b/src/palantir/PalantirServer.hpp @@ -48,8 +48,7 @@ private slots: void onHeartbeatTimer(); private: - // Message handling - void handleMessage(QLocalSocket* client, const QByteArray& message); + // Message handling (legacy handleMessage removed - envelope-based transport only) #ifdef BEDROCK_WITH_TRANSPORT_DEPS void handleCapabilitiesRequest(QLocalSocket* client); void handleXYSineRequest(QLocalSocket* client, const palantir::XYSineRequest& request); @@ -60,23 +59,22 @@ private slots: // void handleCancel(QLocalSocket* client, const palantir::Cancel& cancel); // void handlePing(QLocalSocket* client); - // Job processing (WP1: disabled - proto messages not yet defined) + // Job processing (disabled - proto messages not yet defined) // Future: Re-enable when ComputeSpec, ResultMeta, etc. are defined // void processJob(const QString& jobId, const palantir::ComputeSpec& spec); // void sendProgress(const QString& jobId, double progress, const QString& status); // void sendResult(const QString& jobId, const palantir::ResultMeta& meta); // void sendDataChunk(const QString& jobId, const QByteArray& data, int chunkIndex, int totalChunks); - // XY Sine computation (WP1: disabled) + // XY Sine computation (implemented via handleXYSineRequest) // void computeXYSine(const palantir::ComputeSpec& spec, std::vector& xValues, std::vector& yValues); // Protocol helpers #ifdef BEDROCK_WITH_TRANSPORT_DEPS void sendMessage(QLocalSocket* client, palantir::MessageType type, const google::protobuf::Message& message); void sendErrorResponse(QLocalSocket* client, palantir::ErrorCode errorCode, const QString& message, const QString& details = QString()); - bool readMessageWithType(QLocalSocket* client, palantir::MessageType& outType, QByteArray& outPayload); - // Backward compatibility: read old format (no MessageType) - QByteArray readMessage(QLocalSocket* client); + bool extractMessage(QByteArray& buffer, palantir::MessageType& outType, QByteArray& outPayload, QString* outError = nullptr); + // Legacy readMessage() removed - envelope-based transport only #endif void parseIncomingData(QLocalSocket* client); @@ -94,7 +92,7 @@ private slots: std::map jobClients_; - // Job tracking (WP1: disabled - proto messages not yet defined) + // Job tracking (disabled - proto messages not yet defined) // std::map activeJobs_; std::map> jobCancelled_; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 5aec25b..e62258c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -13,6 +13,7 @@ add_executable(bedrock_tests test_math.cpp # Palantir tests (only when transport deps enabled) $<$:palantir/CapabilitiesService_test.cpp> + $<$:palantir/EnvelopeHelpers_test.cpp> ) target_link_libraries(bedrock_tests @@ -26,12 +27,19 @@ if(BEDROCK_WITH_TRANSPORT_DEPS) target_link_libraries(bedrock_tests PRIVATE bedrock_capabilities_service bedrock_palantir_proto + bedrock_palantir_server ) target_compile_definitions(bedrock_tests PRIVATE BEDROCK_WITH_TRANSPORT_DEPS) target_include_directories(bedrock_tests PRIVATE ${CMAKE_BINARY_DIR}/generated/palantir + ${CMAKE_SOURCE_DIR}/src ) endif() include(GoogleTest) gtest_discover_tests(bedrock_tests) + +# Integration tests (only when transport deps enabled) +if(BEDROCK_WITH_TRANSPORT_DEPS) + add_subdirectory(integration) +endif() diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt new file mode 100644 index 0000000..9ffe56c --- /dev/null +++ b/tests/integration/CMakeLists.txt @@ -0,0 +1,44 @@ +# Integration tests for Palantir transport layer +# Tests Phoenix ↔ Bedrock communication end-to-end using envelope-based transport + +if(BEDROCK_WITH_TRANSPORT_DEPS) + # Find Qt6 (required for QCoreApplication, QLocalSocket, QTest, etc.) + find_package(Qt6 REQUIRED COMPONENTS Core Network Test) + + add_executable(integration_tests + integration_main.cpp + IntegrationTestServerFixture.cpp + IntegrationTestClient.cpp + CapabilitiesIntegrationTest.cpp + XYSineIntegrationTest.cpp + ) + + target_link_libraries(integration_tests + PRIVATE + bedrock_palantir_server + bedrock_palantir_proto + bedrock_capabilities_service + GTest::gtest + Qt6::Core + Qt6::Network + Qt6::Test + ) + + target_compile_definitions(integration_tests PRIVATE BEDROCK_WITH_TRANSPORT_DEPS) + + target_include_directories(integration_tests PRIVATE + ${CMAKE_BINARY_DIR}/generated/palantir + ${CMAKE_SOURCE_DIR}/src + ) + + # Enable Qt MOC for QObject-derived classes + set_target_properties(integration_tests PROPERTIES + AUTOMOC ON + ) + + include(GoogleTest) + gtest_discover_tests(integration_tests) +else() + message(STATUS "Skipping integration tests - BEDROCK_WITH_TRANSPORT_DEPS not enabled") +endif() + diff --git a/tests/integration/CapabilitiesIntegrationTest.cpp b/tests/integration/CapabilitiesIntegrationTest.cpp new file mode 100644 index 0000000..7193304 --- /dev/null +++ b/tests/integration/CapabilitiesIntegrationTest.cpp @@ -0,0 +1,110 @@ +#include "IntegrationTestServerFixture.hpp" +#include "IntegrationTestClient.hpp" + +#ifdef BEDROCK_WITH_TRANSPORT_DEPS +#include +#include "palantir/capabilities.pb.h" +#include +#include +#include +#include +#include + +class CapabilitiesIntegrationTest : public ::testing::Test { +protected: + void SetUp() override { + // Ensure QCoreApplication exists + if (!QCoreApplication::instance()) { + static int argc = 1; + static char* argv[] = { const_cast("integration_tests"), nullptr }; + app_ = std::make_unique(argc, argv); + } + + qDebug() << "[TEST] SetUp: Starting server fixture..."; + // Start server + ASSERT_TRUE(fixture_.startServer()) << "Failed to start test server"; + + qDebug() << "[TEST] SetUp: Server started, processing events..."; + // Give server a moment to be ready and process any pending events + QCoreApplication::processEvents(); + QThread::msleep(100); // Small delay to ensure server is fully ready + QCoreApplication::processEvents(); + qDebug() << "[TEST] SetUp: Server ready"; + } + + void TearDown() override { + fixture_.stopServer(); + QCoreApplication::processEvents(); + } + + IntegrationTestServerFixture fixture_; + std::unique_ptr app_; +}; + +TEST_F(CapabilitiesIntegrationTest, CapabilitiesRequestResponse) { + qDebug() << "[TEST] Starting CapabilitiesRequestResponse test"; + + // Create client and connect + IntegrationTestClient client; + qDebug() << "[TEST] Connecting to socket:" << fixture_.socketPath(); + ASSERT_TRUE(client.connect(fixture_.socketPath())) + << "Failed to connect to test server"; + + qDebug() << "[TEST] Client connected, processing events to allow server to register client..."; + // Process events to allow connection to complete and server to register client + QCoreApplication::processEvents(); + QThread::msleep(100); // Give server time to process newConnection signal + QCoreApplication::processEvents(); + QThread::msleep(50); + QCoreApplication::processEvents(); + qDebug() << "[TEST] Events processed, ready to send request"; + + qDebug() << "[TEST] Sending CapabilitiesRequest..."; + // Send CapabilitiesRequest and receive response + palantir::CapabilitiesResponse response; + QString error; + + // Use explicit timeout with event processing + bool success = false; + const int maxAttempts = 20; + const int timeoutMs = 5000; + QElapsedTimer timer; + timer.start(); + + for (int i = 0; i < maxAttempts && !success && timer.elapsed() < timeoutMs; ++i) { + qDebug() << "[TEST] Attempt" << (i + 1) << "of" << maxAttempts << ", elapsed=" << timer.elapsed() << "ms"; + QCoreApplication::processEvents(); + success = client.getCapabilities(response, error); + if (!success && i < maxAttempts - 1) { + QThread::msleep(50); + } + } + + qDebug() << "[TEST] getCapabilities result: success=" << success << ", error=" << error; + ASSERT_TRUE(success) + << "getCapabilities failed: " << error.toStdString() << " (after " << timer.elapsed() << "ms)"; + + // Verify response is valid + ASSERT_TRUE(response.IsInitialized()); + + // Verify response has capabilities + ASSERT_TRUE(response.has_capabilities()); + + // Verify server version is set (should be non-empty) + const auto& caps = response.capabilities(); + EXPECT_FALSE(caps.server_version().empty()) + << "Server version should not be empty"; + + // Verify supported features list exists (may be empty, but field should exist) + // This is a basic sanity check that the response structure is correct + EXPECT_GE(caps.supported_features_size(), 0); +} + +#else +// Stub when transport deps disabled +#include +TEST(CapabilitiesIntegrationTest, DISABLED_RequiresTransportDeps) { + GTEST_SKIP() << "Integration tests require BEDROCK_WITH_TRANSPORT_DEPS=ON"; +} +#endif + diff --git a/tests/integration/IntegrationTestClient.cpp b/tests/integration/IntegrationTestClient.cpp new file mode 100644 index 0000000..07a4432 --- /dev/null +++ b/tests/integration/IntegrationTestClient.cpp @@ -0,0 +1,289 @@ +#include "IntegrationTestClient.hpp" + +#ifdef BEDROCK_WITH_TRANSPORT_DEPS +#include +#include +#include +#include +#include +#include "palantir/xysine.pb.h" + +IntegrationTestClient::IntegrationTestClient() + : socket_(std::make_unique()) +{ +} + +IntegrationTestClient::~IntegrationTestClient() +{ + disconnect(); +} + +bool IntegrationTestClient::connect(const QString& socketPath) +{ + if (!socket_) { + socket_ = std::make_unique(); + } + + if (socket_->state() == QLocalSocket::ConnectedState) { + return true; + } + + socket_->connectToServer(socketPath); + + if (!socket_->waitForConnected(5000)) { + return false; + } + + return true; +} + +void IntegrationTestClient::disconnect() +{ + if (socket_) { + socket_->disconnectFromServer(); + if (socket_->state() != QLocalSocket::UnconnectedState) { + socket_->waitForDisconnected(1000); + } + } +} + +bool IntegrationTestClient::isConnected() const +{ + return socket_ && socket_->state() == QLocalSocket::ConnectedState; +} + +bool IntegrationTestClient::sendEnvelope(palantir::MessageType type, const google::protobuf::Message& message, QString& outError) +{ + qDebug() << "[CLIENT] sendEnvelope: type=" << static_cast(type) << ", connected=" << isConnected(); + + if (!isConnected()) { + outError = "Not connected"; + qDebug() << "[CLIENT] sendEnvelope: ERROR - not connected"; + return false; + } + + // Create envelope using the live transport helper + std::string envelopeError; + auto envelope = bedrock::palantir::makeEnvelope(type, message, {}, &envelopeError); + + if (!envelope.has_value()) { + outError = QString("Failed to create envelope: %1").arg(envelopeError.c_str()); + qDebug() << "[CLIENT] sendEnvelope: ERROR - failed to create envelope:" << envelopeError.c_str(); + return false; + } + + qDebug() << "[CLIENT] sendEnvelope: envelope created, version=" << envelope->version() << ", type=" << static_cast(envelope->type()); + + // Validate envelope payload size (even empty messages should have some envelope overhead) + qDebug() << "[CLIENT] sendEnvelope: envelope payload size=" << envelope->payload().size(); + if (envelope->payload().size() == 0) { + qDebug() << "[CLIENT] sendEnvelope: WARNING - payload is empty (this is OK for empty messages like CapabilitiesRequest)"; + } + + // Serialize envelope + std::string serialized; + if (!envelope->SerializeToString(&serialized)) { + outError = "Failed to serialize MessageEnvelope"; + qDebug() << "[CLIENT] sendEnvelope: ERROR - failed to serialize"; + return false; + } + + qDebug() << "[CLIENT] sendEnvelope: serialized envelope size=" << serialized.size(); + + // Send: [4-byte length][serialized MessageEnvelope] + uint32_t length = static_cast(serialized.size()); + QByteArray data; + data.append(reinterpret_cast(&length), 4); + data.append(serialized.data(), static_cast(serialized.size())); + + qDebug() << "[CLIENT] sendEnvelope: writing" << data.size() << "bytes to socket"; + qint64 written = socket_->write(data); + qDebug() << "[CLIENT] sendEnvelope: wrote" << written << "bytes"; + + if (written != data.size()) { + outError = QString("Failed to send complete message (wrote %1 of %2 bytes)").arg(written).arg(data.size()); + qDebug() << "[CLIENT] sendEnvelope: ERROR -" << outError; + return false; + } + + // Flush to ensure data is sent (write() already accepted all bytes) + socket_->flush(); + qDebug() << "[CLIENT] sendEnvelope: flushed, message queued for sending"; + + qDebug() << "[CLIENT] sendEnvelope: SUCCESS - message sent"; + return true; +} + +bool IntegrationTestClient::receiveEnvelope(palantir::MessageEnvelope& outEnvelope, QString& outError) +{ + qDebug() << "[CLIENT] receiveEnvelope: starting, connected=" << isConnected(); + + if (!isConnected()) { + outError = "Not connected"; + qDebug() << "[CLIENT] receiveEnvelope: ERROR - not connected"; + return false; + } + + // Wait for data using Qt test pattern (spins event loop properly) + qDebug() << "[CLIENT] receiveEnvelope: waiting for data using QTest::qWait..."; + int elapsed = 0; + const int timeoutMs = 5000; + while (elapsed < timeoutMs && socket_->bytesAvailable() < 4) { + QTest::qWait(10); // Spins Qt event loop for 10ms + elapsed += 10; + } + + if (socket_->bytesAvailable() < 4) { + outError = QString("Timeout waiting for response (elapsed: %1ms)").arg(elapsed); + qDebug() << "[CLIENT] receiveEnvelope: ERROR -" << outError; + return false; + } + + qDebug() << "[CLIENT] receiveEnvelope: data available, bytes=" << socket_->bytesAvailable(); + + // Read length prefix (4 bytes) + QByteArray lengthBytes = socket_->read(4); + qDebug() << "[CLIENT] receiveEnvelope: read length prefix, size=" << lengthBytes.size(); + + if (lengthBytes.size() != 4) { + outError = QString("Failed to read length prefix (got %1 bytes)").arg(lengthBytes.size()); + qDebug() << "[CLIENT] receiveEnvelope: ERROR -" << outError; + return false; + } + + uint32_t responseLength; + std::memcpy(&responseLength, lengthBytes.data(), 4); + qDebug() << "[CLIENT] receiveEnvelope: response length=" << responseLength; + + // Wait for complete envelope using Qt test pattern + elapsed = 0; + while (elapsed < timeoutMs && socket_->bytesAvailable() < static_cast(responseLength)) { + QTest::qWait(10); // Spins Qt event loop for 10ms + elapsed += 10; + } + + if (socket_->bytesAvailable() < static_cast(responseLength)) { + outError = QString("Timeout reading MessageEnvelope (got %1 of %2 bytes, elapsed: %3ms)") + .arg(static_cast(socket_->bytesAvailable())).arg(static_cast(responseLength)).arg(elapsed); + qDebug() << "[CLIENT] receiveEnvelope: ERROR -" << outError; + return false; + } + + // Read complete envelope + QByteArray envelopeBytes = socket_->read(responseLength); + qDebug() << "[CLIENT] receiveEnvelope: read complete envelope, size=" << envelopeBytes.size(); + + if (envelopeBytes.size() != static_cast(responseLength)) { + outError = QString("Failed to read complete MessageEnvelope (got %1 of %2 bytes)") + .arg(envelopeBytes.size()).arg(static_cast(responseLength)); + qDebug() << "[CLIENT] receiveEnvelope: ERROR -" << outError; + return false; + } + + // Parse envelope using the live transport helper + std::string parseError; + if (!bedrock::palantir::parseEnvelope( + std::string(envelopeBytes.data(), envelopeBytes.size()), + outEnvelope, + &parseError)) { + outError = QString("Failed to parse MessageEnvelope: %1").arg(parseError.c_str()); + qDebug() << "[CLIENT] receiveEnvelope: ERROR -" << outError; + return false; + } + + qDebug() << "[CLIENT] receiveEnvelope: parsed envelope, version=" << outEnvelope.version() << ", type=" << static_cast(outEnvelope.type()); + + // Validate envelope version + if (outEnvelope.version() != 1) { + outError = QString("Invalid envelope version: %1").arg(outEnvelope.version()); + qDebug() << "[CLIENT] receiveEnvelope: ERROR -" << outError; + return false; + } + + qDebug() << "[CLIENT] receiveEnvelope: SUCCESS"; + return true; +} + +bool IntegrationTestClient::getCapabilities(palantir::CapabilitiesResponse& outResponse, QString& outError) +{ + // Send CapabilitiesRequest + // Note: CapabilitiesRequest is an empty message (no fields), but protobuf will still + // serialize it as an empty payload. The envelope itself should still serialize correctly + // with version and type fields. + palantir::CapabilitiesRequest request; + + // Verify the request can be serialized (even if empty) + std::string requestSerialized; + if (!request.SerializeToString(&requestSerialized)) { + outError = "Failed to serialize CapabilitiesRequest"; + qDebug() << "[CLIENT] getCapabilities: ERROR - failed to serialize request"; + return false; + } + qDebug() << "[CLIENT] getCapabilities: CapabilitiesRequest serialized size=" << requestSerialized.size(); + + if (!sendEnvelope(palantir::MessageType::CAPABILITIES_REQUEST, request, outError)) { + return false; + } + + // Receive envelope + palantir::MessageEnvelope envelope; + if (!receiveEnvelope(envelope, outError)) { + return false; + } + + // Validate response type + if (envelope.type() != palantir::MessageType::CAPABILITIES_RESPONSE) { + outError = QString("Unexpected message type: %1").arg(static_cast(envelope.type())); + return false; + } + + // Parse inner CapabilitiesResponse + const std::string& payload = envelope.payload(); + if (!outResponse.ParseFromArray(payload.data(), static_cast(payload.size()))) { + outError = "Failed to parse CapabilitiesResponse from envelope payload"; + return false; + } + + return true; +} + +bool IntegrationTestClient::sendXYSineRequest(const palantir::XYSineRequest& request, palantir::XYSineResponse& outResponse, QString& outError) +{ + // Send XYSineRequest + if (!sendEnvelope(palantir::MessageType::XY_SINE_REQUEST, request, outError)) { + return false; + } + + // Receive envelope + palantir::MessageEnvelope envelope; + if (!receiveEnvelope(envelope, outError)) { + return false; + } + + // Validate response type + if (envelope.type() != palantir::MessageType::XY_SINE_RESPONSE) { + outError = QString("Unexpected message type: %1").arg(static_cast(envelope.type())); + return false; + } + + // Parse inner XYSineResponse + const std::string& payload = envelope.payload(); + if (!outResponse.ParseFromArray(payload.data(), static_cast(payload.size()))) { + outError = "Failed to parse XYSineResponse from envelope payload"; + return false; + } + + return true; +} + +#else +// Stub implementation when transport deps disabled +IntegrationTestClient::IntegrationTestClient() {} +IntegrationTestClient::~IntegrationTestClient() {} +bool IntegrationTestClient::connect(const QString&) { return false; } +void IntegrationTestClient::disconnect() {} +bool IntegrationTestClient::isConnected() const { return false; } +bool IntegrationTestClient::getCapabilities(palantir::CapabilitiesResponse&, QString&) { return false; } +bool IntegrationTestClient::sendXYSineRequest(const palantir::XYSineRequest&, palantir::XYSineResponse&, QString&) { return false; } +#endif + diff --git a/tests/integration/IntegrationTestClient.hpp b/tests/integration/IntegrationTestClient.hpp new file mode 100644 index 0000000..2430d86 --- /dev/null +++ b/tests/integration/IntegrationTestClient.hpp @@ -0,0 +1,84 @@ +#pragma once + +#include +#include +#include + +#ifdef BEDROCK_WITH_TRANSPORT_DEPS +#include "palantir/envelope.pb.h" +#include "palantir/capabilities.pb.h" +#include "palantir/xysine.pb.h" +#include "palantir/EnvelopeHelpers.hpp" +#include +#include +#endif + +/** + * Minimal C++ client for integration testing. + * + * Connects to PalantirServer and sends/receives envelope-encoded messages + * using the live transport layer. + */ +class IntegrationTestClient { +public: + IntegrationTestClient(); + ~IntegrationTestClient(); + + /** + * Connect to server. + * @param socketPath Unix domain socket path + * @return true on success, false on failure + */ + bool connect(const QString& socketPath); + + /** + * Disconnect from server. + */ + void disconnect(); + + /** + * Check if connected. + * @return true if connected + */ + bool isConnected() const; + + /** + * Send CapabilitiesRequest and receive CapabilitiesResponse. + * @param outResponse Output response (populated on success) + * @param outError Output error message (populated on failure) + * @return true on success, false on failure + */ + bool getCapabilities(palantir::CapabilitiesResponse& outResponse, QString& outError); + + /** + * Send XYSineRequest and receive XYSineResponse. + * @param request Input request + * @param outResponse Output response (populated on success) + * @param outError Output error message (populated on failure) + * @return true on success, false on failure + */ + bool sendXYSineRequest(const palantir::XYSineRequest& request, palantir::XYSineResponse& outResponse, QString& outError); + +private: +#ifdef BEDROCK_WITH_TRANSPORT_DEPS + std::unique_ptr socket_; + + /** + * Send envelope-encoded message. + * @param type Message type + * @param message Inner message to wrap + * @param outError Output error message + * @return true on success, false on failure + */ + bool sendEnvelope(palantir::MessageType type, const google::protobuf::Message& message, QString& outError); + + /** + * Receive and parse envelope-encoded message. + * @param outEnvelope Output envelope (populated on success) + * @param outError Output error message (populated on failure) + * @return true on success, false on failure + */ + bool receiveEnvelope(palantir::MessageEnvelope& outEnvelope, QString& outError); +#endif +}; + diff --git a/tests/integration/IntegrationTestServerFixture.cpp b/tests/integration/IntegrationTestServerFixture.cpp new file mode 100644 index 0000000..c6b00e7 --- /dev/null +++ b/tests/integration/IntegrationTestServerFixture.cpp @@ -0,0 +1,86 @@ +#include "IntegrationTestServerFixture.hpp" + +#ifdef BEDROCK_WITH_TRANSPORT_DEPS +#include +#include +#include +#include +#include + +int IntegrationTestServerFixture::instanceCount_ = 0; + +IntegrationTestServerFixture::IntegrationTestServerFixture() +{ + // Create QCoreApplication if it doesn't exist + if (!QCoreApplication::instance()) { + // Create a minimal argc/argv for QCoreApplication + static int argc = 1; + static char* argv[] = { const_cast("integration_tests"), nullptr }; + app_ = std::make_unique(argc, argv); + } + + server_ = std::make_unique(); + + // Generate unique socket path for this test instance + QString uniqueId = QUuid::createUuid().toString(QUuid::WithoutBraces); + socketPath_ = QString("palantir_test_%1").arg(uniqueId); +} + +IntegrationTestServerFixture::~IntegrationTestServerFixture() +{ + stopServer(); +} + +bool IntegrationTestServerFixture::startServer() +{ + if (isRunning()) { + return true; + } + + qDebug() << "[FIXTURE] Starting server on socket:" << socketPath_; + if (!server_->startServer(socketPath_)) { + qDebug() << "[FIXTURE] ERROR - Failed to start integration test server"; + return false; + } + + qDebug() << "[FIXTURE] Server started, processing events..."; + // Process events to allow server to start and be ready for connections + if (QCoreApplication::instance()) { + QCoreApplication::processEvents(); + QThread::msleep(50); // Small delay to ensure server is fully ready + QCoreApplication::processEvents(); + } + + qDebug() << "[FIXTURE] Server ready"; + return true; +} + +void IntegrationTestServerFixture::stopServer() +{ + if (server_ && server_->isRunning()) { + server_->stopServer(); + if (QCoreApplication::instance()) { + QCoreApplication::processEvents(); + } + } + + // Clean up socket file + if (!socketPath_.isEmpty()) { + QLocalServer::removeServer(socketPath_); + } +} + +bool IntegrationTestServerFixture::isRunning() const +{ + return server_ && server_->isRunning(); +} + +#else +// Stub implementation when transport deps disabled +IntegrationTestServerFixture::IntegrationTestServerFixture() {} +IntegrationTestServerFixture::~IntegrationTestServerFixture() {} +bool IntegrationTestServerFixture::startServer() { return false; } +void IntegrationTestServerFixture::stopServer() {} +bool IntegrationTestServerFixture::isRunning() const { return false; } +#endif + diff --git a/tests/integration/IntegrationTestServerFixture.hpp b/tests/integration/IntegrationTestServerFixture.hpp new file mode 100644 index 0000000..201926c --- /dev/null +++ b/tests/integration/IntegrationTestServerFixture.hpp @@ -0,0 +1,55 @@ +#pragma once + +#include +#include +#include + +#ifdef BEDROCK_WITH_TRANSPORT_DEPS +#include "palantir/PalantirServer.hpp" +#include +#include +#endif + +/** + * Integration test server fixture. + * + * Manages an in-process PalantirServer for integration testing. + * Starts server on a temporary socket and stops it on teardown. + */ +class IntegrationTestServerFixture { +public: + IntegrationTestServerFixture(); + ~IntegrationTestServerFixture(); + + /** + * Start the server on a temporary socket. + * @return true on success, false on failure + */ + bool startServer(); + + /** + * Stop the server. + */ + void stopServer(); + + /** + * Get the socket path the server is listening on. + * @return Socket path, or empty string if server not started + */ + QString socketPath() const { return socketPath_; } + + /** + * Check if server is running. + * @return true if server is running + */ + bool isRunning() const; + +private: +#ifdef BEDROCK_WITH_TRANSPORT_DEPS + std::unique_ptr app_; + std::unique_ptr server_; + QString socketPath_; + static int instanceCount_; +#endif +}; + diff --git a/tests/integration/XYSineIntegrationTest.cpp b/tests/integration/XYSineIntegrationTest.cpp new file mode 100644 index 0000000..022b30e --- /dev/null +++ b/tests/integration/XYSineIntegrationTest.cpp @@ -0,0 +1,143 @@ +#include "IntegrationTestServerFixture.hpp" +#include "IntegrationTestClient.hpp" + +#ifdef BEDROCK_WITH_TRANSPORT_DEPS +#include +#include "palantir/xysine.pb.h" +#include +#include +#include +#include +#include + +class XYSineIntegrationTest : public ::testing::Test { +protected: + void SetUp() override { + // Ensure QCoreApplication exists + if (!QCoreApplication::instance()) { + static int argc = 1; + static char* argv[] = { const_cast("integration_tests"), nullptr }; + app_ = std::make_unique(argc, argv); + } + + qDebug() << "[TEST] SetUp: Starting server fixture..."; + // Start server + ASSERT_TRUE(fixture_.startServer()) << "Failed to start test server"; + + qDebug() << "[TEST] SetUp: Server started, processing events..."; + // Give server a moment to be ready and process any pending events + QCoreApplication::processEvents(); + QThread::msleep(100); // Small delay to ensure server is fully ready + QCoreApplication::processEvents(); + qDebug() << "[TEST] SetUp: Server ready"; + } + + void TearDown() override { + fixture_.stopServer(); + QCoreApplication::processEvents(); + } + + IntegrationTestServerFixture fixture_; + std::unique_ptr app_; +}; + +TEST_F(XYSineIntegrationTest, XYSineRequestResponse) { + qDebug() << "[TEST] Starting XYSineRequestResponse test"; + + // Create client and connect + IntegrationTestClient client; + qDebug() << "[TEST] Connecting to socket:" << fixture_.socketPath(); + ASSERT_TRUE(client.connect(fixture_.socketPath())) + << "Failed to connect to test server"; + + qDebug() << "[TEST] Client connected, processing events to allow server to register client..."; + // Process events to allow connection to complete and server to register client + QCoreApplication::processEvents(); + QThread::msleep(100); // Give server time to process newConnection signal + QCoreApplication::processEvents(); + QThread::msleep(50); + QCoreApplication::processEvents(); + qDebug() << "[TEST] Events processed, ready to send request"; + + // Build XYSineRequest + palantir::XYSineRequest request; + request.set_frequency(1.0); + request.set_samples(10); + request.set_amplitude(2.0); + request.set_phase(0.0); + + qDebug() << "[TEST] Sending XYSineRequest: freq=" << request.frequency() + << ", samples=" << request.samples() + << ", amplitude=" << request.amplitude(); + + // Send request and receive response + palantir::XYSineResponse response; + QString error; + + // Use explicit timeout with event processing + bool success = false; + const int maxAttempts = 20; + const int timeoutMs = 5000; + QElapsedTimer timer; + timer.start(); + + for (int i = 0; i < maxAttempts && !success && timer.elapsed() < timeoutMs; ++i) { + qDebug() << "[TEST] Attempt" << (i + 1) << "of" << maxAttempts << ", elapsed=" << timer.elapsed() << "ms"; + QCoreApplication::processEvents(); + success = client.sendXYSineRequest(request, response, error); + if (!success && i < maxAttempts - 1) { + QThread::msleep(50); + } + } + + qDebug() << "[TEST] sendXYSineRequest result: success=" << success << ", error=" << error; + ASSERT_TRUE(success) + << "sendXYSineRequest failed: " << error.toStdString() << " (after " << timer.elapsed() << "ms)"; + + // Verify response is valid + ASSERT_TRUE(response.IsInitialized()); + + // Validate response has correct number of samples + ASSERT_EQ(response.x_size(), request.samples()) + << "Response x size should match request samples"; + ASSERT_EQ(response.y_size(), request.samples()) + << "Response y size should match request samples"; + + // Validate numeric results + // Server algorithm: t = i / (samples - 1), y = amplitude * sin(2π * frequency * t + phase) + // For i in [0, samples-1], t goes from 0 to 1 + for (int i = 0; i < request.samples(); ++i) { + double t = static_cast(i) / (request.samples() - 1.0); // 0 to 1 + double expected = request.amplitude() * std::sin(2.0 * M_PI * request.frequency() * t + request.phase()); + + EXPECT_NEAR(response.y(i), expected, 1e-9) + << "Y value at index " << i << " should match expected sine wave"; + } + + // Validate X values (should be t * 2π, from 0 to 2π) + for (int i = 0; i < request.samples(); ++i) { + double t = static_cast(i) / (request.samples() - 1.0); // 0 to 1 + double expectedX = t * 2.0 * M_PI; // 0 to 2π + + EXPECT_NEAR(response.x(i), expectedX, 1e-9) + << "X value at index " << i << " should match expected domain"; + } + + // Validate status (should be "OK" for successful computation) + // Note: In proto3, status is always present (defaults to empty string) + if (!response.status().empty()) { + EXPECT_EQ(response.status(), "OK") + << "Response status should be OK"; + } + + qDebug() << "[TEST] XYSineRequestResponse test completed successfully"; +} + +#else +// Stub when transport deps disabled +#include +TEST(XYSineIntegrationTest, DISABLED_RequiresTransportDeps) { + GTEST_SKIP() << "Integration tests require BEDROCK_WITH_TRANSPORT_DEPS=ON"; +} +#endif + diff --git a/tests/integration/integration_main.cpp b/tests/integration/integration_main.cpp new file mode 100644 index 0000000..35bde14 --- /dev/null +++ b/tests/integration/integration_main.cpp @@ -0,0 +1,26 @@ +#include + +#ifdef BEDROCK_WITH_TRANSPORT_DEPS +#include +#include +#endif + +int main(int argc, char** argv) { +#ifdef BEDROCK_WITH_TRANSPORT_DEPS + // Create QCoreApplication for Qt-based tests + QCoreApplication app(argc, argv); + + // Initialize GoogleTest + ::testing::InitGoogleTest(&argc, argv); + + // Run tests + int result = RUN_ALL_TESTS(); + + return result; +#else + // Without transport deps, just run basic GoogleTest + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +#endif +} + diff --git a/tests/palantir/EnvelopeHelpers_test.cpp b/tests/palantir/EnvelopeHelpers_test.cpp new file mode 100644 index 0000000..593b9a6 --- /dev/null +++ b/tests/palantir/EnvelopeHelpers_test.cpp @@ -0,0 +1,311 @@ +#ifdef BEDROCK_WITH_TRANSPORT_DEPS + +#include +#include "palantir/EnvelopeHelpers.hpp" +#include "palantir/capabilities.pb.h" +#include "palantir/xysine.pb.h" +#include "palantir/envelope.pb.h" + +using namespace bedrock::palantir; + +TEST(EnvelopeHelpersTest, MakeEnvelopeCapabilitiesRequest) { + palantir::CapabilitiesRequest request; + std::string error; + + auto envelope = makeEnvelope(palantir::MessageType::CAPABILITIES_REQUEST, request, {}, &error); + + ASSERT_TRUE(envelope.has_value()); + EXPECT_EQ(envelope->version(), 1u); + EXPECT_EQ(envelope->type(), palantir::MessageType::CAPABILITIES_REQUEST); + // Payload can be empty for empty messages (CapabilitiesRequest has no fields) + EXPECT_TRUE(error.empty()); +} + +TEST(EnvelopeHelpersTest, MakeEnvelopeCapabilitiesResponse) { + palantir::CapabilitiesResponse response; + auto* caps = response.mutable_capabilities(); + caps->set_server_version("test-1.0"); + caps->add_supported_features("xy_sine"); + + std::string error; + auto envelope = makeEnvelope(palantir::MessageType::CAPABILITIES_RESPONSE, response, {}, &error); + + ASSERT_TRUE(envelope.has_value()); + EXPECT_EQ(envelope->version(), 1u); + EXPECT_EQ(envelope->type(), palantir::MessageType::CAPABILITIES_RESPONSE); + EXPECT_FALSE(envelope->payload().empty()); + EXPECT_TRUE(error.empty()); +} + +TEST(EnvelopeHelpersTest, MakeEnvelopeXYSineRequest) { + palantir::XYSineRequest request; + request.set_frequency(2.0); + request.set_amplitude(1.5); + request.set_phase(0.5); + request.set_samples(100); + + std::string error; + auto envelope = makeEnvelope(palantir::MessageType::XY_SINE_REQUEST, request, {}, &error); + + ASSERT_TRUE(envelope.has_value()); + EXPECT_EQ(envelope->version(), 1u); + EXPECT_EQ(envelope->type(), palantir::MessageType::XY_SINE_REQUEST); + EXPECT_FALSE(envelope->payload().empty()); + EXPECT_TRUE(error.empty()); +} + +TEST(EnvelopeHelpersTest, MakeEnvelopeXYSineResponse) { + palantir::XYSineResponse response; + response.add_x(0.0); + response.add_x(1.0); + response.add_y(0.0); + response.add_y(1.0); + response.set_status("OK"); + + std::string error; + auto envelope = makeEnvelope(palantir::MessageType::XY_SINE_RESPONSE, response, {}, &error); + + ASSERT_TRUE(envelope.has_value()); + EXPECT_EQ(envelope->version(), 1u); + EXPECT_EQ(envelope->type(), palantir::MessageType::XY_SINE_RESPONSE); + EXPECT_FALSE(envelope->payload().empty()); + EXPECT_TRUE(error.empty()); +} + +TEST(EnvelopeHelpersTest, MakeEnvelopeWithMetadata) { + palantir::CapabilitiesRequest request; + std::map metadata; + metadata["trace_id"] = "abc123"; + metadata["client_version"] = "phoenix-0.0.4"; + + std::string error; + auto envelope = makeEnvelope(palantir::MessageType::CAPABILITIES_REQUEST, request, metadata, &error); + + ASSERT_TRUE(envelope.has_value()); + EXPECT_EQ(envelope->metadata().size(), 2u); + EXPECT_EQ(envelope->metadata().at("trace_id"), "abc123"); + EXPECT_EQ(envelope->metadata().at("client_version"), "phoenix-0.0.4"); + EXPECT_TRUE(error.empty()); +} + +TEST(EnvelopeHelpersTest, ParseEnvelopeValid) { + // Create a valid envelope + palantir::CapabilitiesRequest request; + auto envelope = makeEnvelope(palantir::MessageType::CAPABILITIES_REQUEST, request); + ASSERT_TRUE(envelope.has_value()); + + // Serialize it + std::string serialized; + ASSERT_TRUE(envelope->SerializeToString(&serialized)); + + // Parse it back + palantir::MessageEnvelope parsed; + std::string error; + + EXPECT_TRUE(parseEnvelope(serialized, parsed, &error)); + EXPECT_EQ(parsed.version(), 1u); + EXPECT_EQ(parsed.type(), palantir::MessageType::CAPABILITIES_REQUEST); + EXPECT_TRUE(error.empty()); +} + +TEST(EnvelopeHelpersTest, ParseEnvelopeInvalidVersion) { + // Create envelope with wrong version + palantir::MessageEnvelope envelope; + envelope.set_version(999); // Invalid version + envelope.set_type(palantir::MessageType::CAPABILITIES_REQUEST); + envelope.set_payload("test"); + + std::string serialized; + ASSERT_TRUE(envelope.SerializeToString(&serialized)); + + palantir::MessageEnvelope parsed; + std::string error; + + EXPECT_FALSE(parseEnvelope(serialized, parsed, &error)); + EXPECT_FALSE(error.empty()); + EXPECT_NE(error.find("Invalid protocol version"), std::string::npos); +} + +TEST(EnvelopeHelpersTest, ParseEnvelopeInvalidType) { + // Create envelope with UNSPECIFIED type (invalid) + palantir::MessageEnvelope envelope; + envelope.set_version(1); + envelope.set_type(palantir::MessageType::MESSAGE_TYPE_UNSPECIFIED); + envelope.set_payload("test"); + + std::string serialized; + ASSERT_TRUE(envelope.SerializeToString(&serialized)); + + palantir::MessageEnvelope parsed; + std::string error; + + EXPECT_FALSE(parseEnvelope(serialized, parsed, &error)); + EXPECT_FALSE(error.empty()); + EXPECT_NE(error.find("UNSPECIFIED"), std::string::npos); +} + +TEST(EnvelopeHelpersTest, ParseEnvelopeTruncated) { + // Create valid envelope + palantir::CapabilitiesRequest request; + auto envelope = makeEnvelope(palantir::MessageType::CAPABILITIES_REQUEST, request); + ASSERT_TRUE(envelope.has_value()); + + std::string serialized; + ASSERT_TRUE(envelope->SerializeToString(&serialized)); + + // Truncate the buffer significantly - remove enough to break parsing + // Protobuf is lenient, so we need to remove a substantial portion + size_t truncatedSize = serialized.size() > 20 ? serialized.size() / 2 : 1; + std::string truncated = serialized.substr(0, truncatedSize); + palantir::MessageEnvelope parsed; + std::string error; + + // parseEnvelope may succeed on partial data (protobuf is lenient), but the envelope will be malformed + // Check that either parsing fails OR the parsed envelope fails validation + bool parseResult = parseEnvelope(truncated, parsed, &error); + if (parseResult) { + // If parsing succeeded, verify the envelope fails validation (version check should catch it) + // For a severely truncated envelope, version field might be missing or wrong + EXPECT_TRUE(parsed.version() != 1 || parsed.type() == palantir::MessageType::MESSAGE_TYPE_UNSPECIFIED); + } else { + // Parsing failed as expected + EXPECT_FALSE(error.empty()); + } +} + +TEST(EnvelopeHelpersTest, ParseEnvelopeEmptyBuffer) { + std::string buffer; + palantir::MessageEnvelope parsed; + std::string error; + + EXPECT_FALSE(parseEnvelope(buffer, parsed, &error)); + EXPECT_FALSE(error.empty()); + EXPECT_NE(error.find("Empty buffer"), std::string::npos); +} + +TEST(EnvelopeHelpersTest, ParseEnvelopeUnspecifiedType) { + // Test that UNSPECIFIED type is rejected + palantir::MessageEnvelope envelope; + envelope.set_version(1); + envelope.set_type(palantir::MessageType::MESSAGE_TYPE_UNSPECIFIED); + envelope.set_payload(""); + + std::string serialized; + ASSERT_TRUE(envelope.SerializeToString(&serialized)); + + palantir::MessageEnvelope parsed; + std::string error; + + EXPECT_FALSE(parseEnvelope(serialized, parsed, &error)); + EXPECT_NE(error.find("UNSPECIFIED"), std::string::npos); +} + +TEST(EnvelopeHelpersTest, RoundTripCapabilitiesRequest) { + palantir::CapabilitiesRequest original; + + // Encode + auto envelope = makeEnvelope(palantir::MessageType::CAPABILITIES_REQUEST, original); + ASSERT_TRUE(envelope.has_value()); + + // Serialize envelope + std::string serialized; + ASSERT_TRUE(envelope->SerializeToString(&serialized)); + + // Parse envelope + palantir::MessageEnvelope parsed; + ASSERT_TRUE(parseEnvelope(serialized, parsed)); + + // Extract payload + palantir::CapabilitiesRequest decoded; + ASSERT_TRUE(decoded.ParseFromString(parsed.payload())); + + // Verify round-trip (CapabilitiesRequest is empty, so just verify it parses) + EXPECT_TRUE(decoded.IsInitialized()); +} + +TEST(EnvelopeHelpersTest, RoundTripCapabilitiesResponse) { + palantir::CapabilitiesResponse original; + auto* caps = original.mutable_capabilities(); + caps->set_server_version("test-1.0"); + caps->add_supported_features("xy_sine"); + caps->add_supported_features("heat_diffusion"); + + // Encode + auto envelope = makeEnvelope(palantir::MessageType::CAPABILITIES_RESPONSE, original); + ASSERT_TRUE(envelope.has_value()); + + // Serialize and parse + std::string serialized; + ASSERT_TRUE(envelope->SerializeToString(&serialized)); + palantir::MessageEnvelope parsed; + ASSERT_TRUE(parseEnvelope(serialized, parsed)); + + // Extract and verify + palantir::CapabilitiesResponse decoded; + ASSERT_TRUE(decoded.ParseFromString(parsed.payload())); + EXPECT_EQ(decoded.capabilities().server_version(), "test-1.0"); + EXPECT_EQ(decoded.capabilities().supported_features_size(), 2); + EXPECT_EQ(decoded.capabilities().supported_features(0), "xy_sine"); + EXPECT_EQ(decoded.capabilities().supported_features(1), "heat_diffusion"); +} + +TEST(EnvelopeHelpersTest, RoundTripXYSineRequest) { + palantir::XYSineRequest original; + original.set_frequency(2.5); + original.set_amplitude(1.8); + original.set_phase(0.3); + original.set_samples(500); + + // Encode + auto envelope = makeEnvelope(palantir::MessageType::XY_SINE_REQUEST, original); + ASSERT_TRUE(envelope.has_value()); + + // Serialize and parse + std::string serialized; + ASSERT_TRUE(envelope->SerializeToString(&serialized)); + palantir::MessageEnvelope parsed; + ASSERT_TRUE(parseEnvelope(serialized, parsed)); + + // Extract and verify + palantir::XYSineRequest decoded; + ASSERT_TRUE(decoded.ParseFromString(parsed.payload())); + EXPECT_DOUBLE_EQ(decoded.frequency(), 2.5); + EXPECT_DOUBLE_EQ(decoded.amplitude(), 1.8); + EXPECT_DOUBLE_EQ(decoded.phase(), 0.3); + EXPECT_EQ(decoded.samples(), 500); +} + +TEST(EnvelopeHelpersTest, RoundTripXYSineResponse) { + palantir::XYSineResponse original; + original.add_x(0.0); + original.add_x(1.57); + original.add_x(3.14); + original.add_y(0.0); + original.add_y(1.0); + original.add_y(0.0); + original.set_status("OK"); + + // Encode + auto envelope = makeEnvelope(palantir::MessageType::XY_SINE_RESPONSE, original); + ASSERT_TRUE(envelope.has_value()); + + // Serialize and parse + std::string serialized; + ASSERT_TRUE(envelope->SerializeToString(&serialized)); + palantir::MessageEnvelope parsed; + ASSERT_TRUE(parseEnvelope(serialized, parsed)); + + // Extract and verify + palantir::XYSineResponse decoded; + ASSERT_TRUE(decoded.ParseFromString(parsed.payload())); + EXPECT_EQ(decoded.x_size(), 3); + EXPECT_EQ(decoded.y_size(), 3); + EXPECT_DOUBLE_EQ(decoded.x(0), 0.0); + EXPECT_DOUBLE_EQ(decoded.x(1), 1.57); + EXPECT_DOUBLE_EQ(decoded.y(0), 0.0); + EXPECT_DOUBLE_EQ(decoded.y(1), 1.0); + EXPECT_EQ(decoded.status(), "OK"); +} + +#endif // BEDROCK_WITH_TRANSPORT_DEPS + From 6b085cbb29a4bcd18999336d0001311bbbc1d175 Mon Sep 17 00:00:00 2001 From: UnderLord Date: Tue, 25 Nov 2025 13:40:23 -0800 Subject: [PATCH 8/8] Sprint 4.5: update Palantir submodule to merged proto commit --- docs/palantir | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/palantir b/docs/palantir index 4b5bfad..5b625fa 160000 --- a/docs/palantir +++ b/docs/palantir @@ -1 +1 @@ -Subproject commit 4b5bfadcd210864d43583517eaa4c31fb11795cb +Subproject commit 5b625faa8693c71a4c9aa8e307f0ddc488ccb189