@@ -221,129 +221,101 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
221221 }
222222}
223223
224- static auto toBeForwardedHeader = [](void* header) -> bool {
225- // If is now possible that the record is not complete when
226- // we forward it, because of a custom completion policy.
227- // this means that we need to skip the empty entries in the
228- // record for being forwarded.
229- if (header == nullptr) {
230- return false;
231- }
232- auto dh = o2::header::get<header::DataHeader*>(header);
233- if (!dh) {
234- return false;
235- }
236- bool retval = !o2::header::get<SourceInfoHeader*>(header) &&
237- !o2::header::get<DomainInfoHeader*>(header) &&
238- o2::header::get<DataProcessingHeader*>(header);
239- // DataHeader is there. Complain if we have unexpected headers present / missing
240- if (!retval) {
241- LOGP(error, "Dropping data because of malformed header structure");
242- }
243- return retval;
244- };
245-
246- static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
247- FairMQDeviceProxy& proxy,
248- std::unique_ptr<fair::mq::Message>& header,
249- std::unique_ptr<fair::mq::Message>& payload,
250- size_t total,
251- bool consume) {
252- if (header.get() == nullptr) {
253- // Missing an header is not an error anymore.
254- // it simply means that we did not receive the
255- // given input, but we were asked to
256- // consume existing, so we skip it.
257- return false;
258- }
259- if (payload.get() == nullptr && consume == true) {
260- // If the payload is not there, it means we already
261- // processed it with ConsumeExisiting. Therefore we
262- // need to do something only if this is the last consume.
263- header.reset(nullptr);
264- return false;
265- }
266-
267- auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
268- if (fdph == nullptr) {
269- LOG(error) << "Data is missing DataProcessingHeader";
270- return false;
271- }
272- auto fdh = o2::header::get<header::DataHeader*>(header->GetData());
273- if (fdh == nullptr) {
274- LOG(error) << "Data is missing DataHeader";
275- return false;
276- }
277-
278- // We need to find the forward route only for the first
279- // part of a split payload. All the others will use the same.
280- // but always check if we have a sequence of multiple payloads
281- if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
282- proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
283- }
284- return cachedForwardingChoices.empty() == false;
285- };
286-
287- std::vector<fair::mq::Parts> DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
288- TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
224+ auto DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot,
225+ std::vector<MessageSet>& currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice,
226+ const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
289227{
290228 // we collect all messages per forward in a map and send them together
291229 std::vector<fair::mq::Parts> forwardedParts;
292230 forwardedParts.resize(proxy.getNumForwards());
293- std::vector<ChannelIndex> cachedForwardingChoices {};
231+ std::vector<ChannelIndex> forwardingChoices {};
294232 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
295233 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
296- slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
234+ slot.index, oldestTimeslice.timeslice.value, copyByDefault ? "with copy" : "", copyByDefault && consume ? " and " : "", consume ? "with consume" : "");
297235
298236 for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
299237 auto& messageSet = currentSetOfInputs[ii];
300- // In case the messageSet is empty, there is nothing to be done.
301- if (messageSet.size() == 0) {
302- continue;
303- }
304- if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
305- continue;
306- }
307- cachedForwardingChoices.clear();
308238
309- for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
310- auto& messageSet = currentSetOfInputs[ii];
239+ for (size_t pi = 0; pi < messageSet.size(); ++pi) {
311240 auto& header = messageSet.header(pi);
241+
242+ // If is now possible that the record is not complete when
243+ // we forward it, because of a custom completion policy.
244+ // this means that we need to skip the empty entries in the
245+ // record for being forwarded.
246+ if (header->GetData() == nullptr) {
247+ continue;
248+ }
249+ auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
250+ if (dih) {
251+ continue;
252+ }
253+ auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
254+ if (sih) {
255+ continue;
256+ }
257+
258+ auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
259+ auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
260+
261+ if (dph == nullptr || dh == nullptr) {
262+ // Complain only if this is not an out-of-band message
263+ LOGP(error, "Data is missing {}{}{}",
264+ dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
265+ continue;
266+ }
267+
312268 auto& payload = messageSet.payload(pi);
313- auto total = messageSet.getNumberOfPayloads(pi);
314269
315- if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
270+ if (payload.get() == nullptr && consume == true) {
271+ // If the payload is not there, it means we already
272+ // processed it with ConsumeExisiting. Therefore we
273+ // need to do something only if this is the last consume.
274+ header.reset(nullptr);
316275 continue;
317276 }
318277
319- // In case of more than one forward route, we need to copy the message.
320- // This will eventually use the same mamory if running with the same backend.
321- if (cachedForwardingChoices.size() > 1) {
322- copy = true;
278+ // We need to find the forward route only for the first
279+ // part of a split payload. All the others will use the same.
280+ // Therefore, we reset and recompute the forwarding choice:
281+ //
282+ // - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
283+ // which is actually always created and handled together
284+ // - If the message is not a multipart (splitPayloadParts 0) or has only one part
285+ // - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
286+ // we will already use the same choice in the for loop below.
287+ if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) {
288+ forwardingChoices.clear();
289+ proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
323290 }
324- auto* dh = o2::header::get<header::DataHeader*>(header->GetData());
325- auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
326291
327- if (copy) {
328- for (auto& cachedForwardingChoice : cachedForwardingChoices) {
292+ if (forwardingChoices.empty()) {
293+ // Nothing to forward go to the next messageset
294+ continue;
295+ }
296+
297+ // In case of more than one forward route, we need to copy the message.
298+ // This will eventually use the same memory if running with the same backend.
299+ if (copyByDefault || forwardingChoices.size()) {
300+ for (auto& choice : forwardingChoices) {
329301 auto&& newHeader = header->GetTransport()->CreateMessage();
330302 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
331- fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice .value);
303+ fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice .value);
332304 newHeader->Copy(*header);
333- forwardedParts[cachedForwardingChoice .value].AddPart(std::move(newHeader));
305+ forwardedParts[choice .value].AddPart(std::move(newHeader));
334306
335307 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
336308 auto&& newPayload = header->GetTransport()->CreateMessage();
337309 newPayload->Copy(*messageSet.payload(pi, payloadIndex));
338- forwardedParts[cachedForwardingChoice .value].AddPart(std::move(newPayload));
310+ forwardedParts[choice .value].AddPart(std::move(newPayload));
339311 }
340312 }
341313 } else {
342314 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
343- fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices .back().value);
344- forwardedParts[cachedForwardingChoices .back().value].AddPart(std::move(messageSet.header(pi)));
315+ fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices .back().value);
316+ forwardedParts[forwardingChoices .back().value].AddPart(std::move(messageSet.header(pi)));
345317 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
346- forwardedParts[cachedForwardingChoices .back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
318+ forwardedParts[forwardingChoices .back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
347319 }
348320 }
349321 }
0 commit comments