Skip to content

Commit a7145a6

Browse files
committed
[occ] Fix large cfg payloads cut off because of gRPC slices
1 parent df7d824 commit a7145a6

File tree

4 files changed

+54
-38
lines changed

4 files changed

+54
-38
lines changed

occ/plugin/OccFMQCommon.cxx

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ std::tuple<OccLite::nopb::TransitionResponse, ::grpc::Status> doTransition(fair:
7777
auto onDeviceStateChange = [&](fair::mq::PluginServices::DeviceState reachedState) {
7878
// CONFIGURE arguments must be pushed during InitializingDevice
7979
if (reachedState == fair::mq::PluginServices::DeviceState::InitializingDevice) {
80+
OLOG(debug) << "INITIALIZING_DEVICE reached, pushing configuration parameters";
8081

8182
// FIXME: workaround which special cases a stoi for certain properties
8283
// which must be pushed as int.
@@ -92,38 +93,42 @@ std::tuple<OccLite::nopb::TransitionResponse, ::grpc::Status> doTransition(fair:
9293
for (auto it = arguments.cbegin(); it != arguments.cend(); ++it) {
9394
std::string key = it->key;
9495
std::string value = it->value;
95-
if (boost::starts_with(key, "chans.")) {
96-
std::vector<std::string> split;
97-
boost::split(split, key, std::bind(std::equal_to<>(), '.', std::placeholders::_1));
98-
if (std::find(intKeys.begin(), intKeys.end(), split.back()) != intKeys.end()) {
99-
auto intValue = std::stoi(value);
100-
m_pluginServices->SetProperty(key, intValue);
101-
OLOG(debug) << "SetProperty(chan int) called " << key << ":" << intValue;
102-
}
103-
else {
96+
try {
97+
if (boost::starts_with(key, "chans.")) {
98+
std::vector<std::string> split;
99+
boost::split(split, key, std::bind(std::equal_to<>(), '.', std::placeholders::_1));
100+
if (std::find(intKeys.begin(), intKeys.end(), split.back()) != intKeys.end()) {
101+
auto intValue = std::stoi(value);
102+
m_pluginServices->SetProperty(key, intValue);
103+
OLOG(debug) << "SetProperty(chan int) called " << key << ":" << intValue;
104+
} else {
105+
m_pluginServices->SetProperty(key, value);
106+
OLOG(debug) << "SetProperty(chan string) called " << key << ":" << value;
107+
}
108+
} else if (boost::starts_with(key, "__ptree__:")) {
109+
// we need to ptreefy whatever payload we got under this kind of key, on a best-effort basis
110+
auto[newKey, newValue] = propMapEntryToPtree(key, value);
111+
if (newKey ==
112+
key) { // Means something went wrong and the called function already printed out the message
113+
continue;
114+
}
115+
116+
m_pluginServices->SetProperty(newKey, newValue);
117+
OLOG(debug) << "SetProperty(ptree) called " << newKey << ":" << value;
118+
} else { // default case, 1 k-v ==> 1 SetProperty
104119
m_pluginServices->SetProperty(key, value);
105-
OLOG(debug) << "SetProperty(chan string) called " << key << ":" << value;
106-
}
107-
}
108-
else if (boost::starts_with(key, "__ptree__:")) {
109-
// we need to ptreefy whatever payload we got under this kind of key, on a best-effort basis
110-
auto [newKey, newValue] = propMapEntryToPtree(key, value);
111-
if (newKey == key) { // Means something went wrong and the called function already printed out the message
112-
continue;
120+
OLOG(debug) << "SetProperty(string) called " << key << ":" << value;
113121
}
114-
115-
m_pluginServices->SetProperty(newKey, newValue);
116-
OLOG(debug) << "SetProperty(ptree) called " << newKey << ":" << value;
117122
}
118-
else { // default case, 1 k-v ==> 1 SetProperty
119-
m_pluginServices->SetProperty(key, value);
120-
OLOG(debug) << "SetProperty(string) called " << key << ":" << value;
123+
catch (std::runtime_error &e) {
124+
OLOG(warning) << "SetProperty call failed for key " + key + " with reason: " << e.what();
121125
}
122126
}
123127
auto chInfo = m_pluginServices->GetPropertiesAsStringStartingWith("chans.");
124128
for (auto it = chInfo.cbegin(); it != chInfo.cend(); ++it) {
125-
OLOG(debug) << "Written chan cfg: " << it->first << ":" << it->second;
129+
OLOG(debug) << "Written chan cfg: " << it->first << ": " << it->second;
126130
}
131+
OLOG(debug) << "INITIALIZING_DEVICE configuration push DONE";
127132
}
128133

129134
std::unique_lock<std::mutex> lk(cv_mu);

occ/plugin/OccLiteServer.cxx

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ ::grpc::Status OccLite::Service::GetState(::grpc::ServerContext* context,
7979
(void) context;
8080
(void) request;
8181

82+
OLOG(info) << "Incoming GetState request: " << request->JsonMessage::Serialize();
83+
8284
auto state = fair::mq::PluginServices::ToStr(m_pluginServices->GetCurrentDeviceState());
8385
pid_t pid = getpid();
8486

@@ -93,16 +95,19 @@ ::grpc::Status OccLite::Service::Transition(::grpc::ServerContext* context,
9395
const OccLite::nopb::TransitionRequest* request,
9496
OccLite::nopb::TransitionResponse* response)
9597
{
96-
OLOG(info) << "Incoming Transition request:" << request->JsonMessage::Serialize();
98+
OLOG(info) << "Incoming Transition request: " << request->JsonMessage::Serialize();
9799

98100
auto transitionOutcome = doTransition(m_pluginServices, *request);
99101
::grpc::Status grpcStatus = std::get<1>(transitionOutcome);
100102
if (!grpcStatus.ok()) {
101-
return grpcStatus;
103+
OLOG(error) << "Transition failed with error: " << grpcStatus.error_code() << " " << grpcStatus.error_message() << " " << grpcStatus.error_details();
104+
return grpc::Status::CANCELLED;
102105
}
103106

104107
auto nopbResponse = std::get<0>(transitionOutcome);
105108
*response = nopbResponse;
109+
OLOG(info) << "Transition response: " << response->state << "ok: " << response->ok;
110+
106111
return grpc::Status::OK;
107112
}
108113

occ/plugin/litestructs/JsonMessage.cxx

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include "JsonMessage.h"
2626

27+
2728
std::string OccLite::nopb::JsonMessage::Serialize() const
2829
{
2930
rapidjson::StringBuffer ss;
@@ -59,20 +60,29 @@ bool OccLite::nopb::JsonMessage::Deserialize(::grpc::ByteBuffer* byte_buffer)
5960
{
6061
auto slices = new std::vector<::grpc::Slice>;
6162
auto status = byte_buffer->Dump(slices);
62-
if (slices->size() != 1) return false;
6363

64-
auto rawSlice = (*slices)[0].c_slice();
65-
std::string str = grpc::StringFromCopiedSlice(rawSlice);
66-
::grpc::g_core_codegen_interface->grpc_slice_unref(rawSlice);
67-
OLOG(info) << "Deserializing Message:\n" << str;
64+
if (!status.ok()) {
65+
OLOG(info) << "Cannot dump JsonMessage slices, error code " << status.error_code() << " " << status.error_message() << " " << status.error_details();
66+
return false;
67+
}
68+
69+
std::stringstream ss;
70+
for (auto sl = slices->cbegin(); sl != slices->cend(); sl++) {
71+
auto rawSlice = sl->c_slice();
72+
std::string str = grpc::StringFromCopiedSlice(rawSlice);
73+
ss << str;
74+
::grpc::g_core_codegen_interface->grpc_slice_unref(rawSlice);
75+
}
76+
77+
OLOG(info) << "Deserialized JsonMessage: " << ss.str();
6878

69-
return Deserialize(str);
79+
return Deserialize(ss.str());
7080
}
7181

7282
::grpc::ByteBuffer* OccLite::nopb::JsonMessage::SerializeToByteBuffer() const
7383
{
7484
std::string str = Serialize();
75-
OLOG(info) << "Serialized Message:\n" << str;
85+
OLOG(info) << "Serialized JsonMessage: " << str;
7686

7787
// grpc::string = std::string
7888
// We build a Slice(grpc::string) and we add it to the ByteBuffer

occ/plugin/litestructs/Transition.cxx

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,8 @@ bool OccLite::nopb::TransitionRequest::Serialize(rapidjson::Writer<rapidjson::St
4141

4242
bool OccLite::nopb::TransitionRequest::Deserialize(const rapidjson::Value& obj)
4343
{
44-
OLOG(info) << "Deserializing TransitionRequest";
45-
4644
srcState = obj["srcState"].GetString();
4745
transitionEvent = obj["transitionEvent"].GetString();
48-
OLOG(info) << "state and transitionEvent ok";
4946

5047
if (obj.HasMember("arguments")) {
5148
auto array = obj["arguments"].GetArray();
@@ -59,8 +56,7 @@ bool OccLite::nopb::TransitionRequest::Deserialize(const rapidjson::Value& obj)
5956
arguments.push_back(*ce);
6057
}
6158
}
62-
OLOG(info) << "Deserialized TransitionRequest:";
63-
OLOG(info) << JsonMessage::Serialize();
59+
OLOG(info) << "Deserialized TransitionRequest: " << JsonMessage::Serialize();
6460

6561
return true;
6662
}

0 commit comments

Comments
 (0)