Skip to content

Commit c562ccb

Browse files
committed
DPL: add helper to clean consumed headers
1 parent ea23c37 commit c562ccb

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ struct DataProcessingHelpers {
5555
/// Helper to route messages for forwarding
5656
static std::vector<fair::mq::Parts> routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
5757
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume);
58+
static void cleanForwardedMessageSet(std::vector<MessageSet>& currentSetOfInputs);
5859
};
5960
} // namespace o2::framework
6061
#endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,46 @@ std::vector<fair::mq::Parts> DataProcessingHelpers::routeForwardedMessages(FairM
349349
}
350350
}
351351
return forwardedParts;
352-
};
352+
}
353+
354+
void DataProcessingHelpers::cleanForwardedMessageSet(std::vector<MessageSet>& currentSetOfInputs)
355+
{
356+
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
357+
auto messages = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
358+
size_t pi = 0;
359+
while (pi < messages.size()) {
360+
auto& header = messages[pi];
361+
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
362+
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
363+
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
364+
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
365+
if (header->GetData() == nullptr || sih || dih || dph == nullptr || dh == nullptr) {
366+
pi += 2;
367+
continue;
368+
}
369+
370+
// At least one payload.
371+
auto& payload = messages[pi + 1];
372+
373+
if (payload.get() == nullptr) {
374+
// If the payload is not there, it means we already
375+
// processed it with ConsumeExisiting. Therefore we
376+
// need to do something only if this is the last consume.
377+
header.reset(nullptr);
378+
}
379+
380+
// Calculate the number of messages which should be handled together
381+
// all in one go.
382+
if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
383+
// Sequence of (header, payload[0], ... , payload[splitPayloadParts - 2]) pairs belonging together.
384+
pi += dh->splitPayloadParts + 1;
385+
} else {
386+
// Sequence of splitPayloadParts (header, payload) pairs belonging together.
387+
// In case splitPayloadParts = 0, we consider this as a single message pair
388+
pi += (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
389+
}
390+
}
391+
}
392+
}
353393

354394
} // namespace o2::framework

0 commit comments

Comments
 (0)