Skip to content

Commit e2f3588

Browse files
committed
DPL: add callback when inserting in the slot
1 parent 0ee51a0 commit e2f3588

File tree

3 files changed

+13
-2
lines changed

3 files changed

+13
-2
lines changed

Framework/Core/include/Framework/DataRelayer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ class DataRelayer
114114

115115
using OnDropCallback = std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo info)>;
116116

117+
// Callback for when some messages are about to be owned by the the DataRelayer
118+
using OnInsertionCallback = std::function<void(ServiceRegistryRef&, std::span<fair::mq::MessagePtr>&)>;
119+
117120
/// Prune all the pending entries in the cache.
118121
void prunePending(OnDropCallback);
119122
/// Prune the cache for a given slot
@@ -135,6 +138,7 @@ class DataRelayer
135138
InputInfo const& info,
136139
size_t nMessages,
137140
size_t nPayloads = 1,
141+
OnInsertionCallback onInsertion = nullptr,
138142
OnDropCallback onDrop = nullptr);
139143

140144
/// This is to set the oldest possible @a timeslice this relayer can

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1859,6 +1859,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
18591859
input,
18601860
nMessages,
18611861
nPayloadsPerHeader,
1862+
nullptr,
18621863
onDrop);
18631864
switch (relayed.type) {
18641865
case DataRelayer::RelayChoice::Type::Backpressured:

Framework/Core/src/DataRelayer.cxx

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,8 @@ DataRelayer::RelayChoice
436436
InputInfo const& info,
437437
size_t nMessages,
438438
size_t nPayloads,
439-
std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo)> onDrop)
439+
OnInsertionCallback onInsertion,
440+
OnDropCallback onDrop)
440441
{
441442
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
442443
DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
@@ -482,6 +483,7 @@ DataRelayer::RelayChoice
482483
&messages,
483484
&nMessages,
484485
&nPayloads,
486+
&onInsertion,
485487
&cache = mCache,
486488
&services = mContext,
487489
numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
@@ -512,7 +514,11 @@ DataRelayer::RelayChoice
512514
mi += nPayloads;
513515
continue;
514516
}
515-
target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);
517+
auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
518+
if (onInsertion) {
519+
onInsertion(services, span);
520+
}
521+
target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1);
516522
mi += nPayloads;
517523
saved += nPayloads;
518524
}

0 commit comments

Comments
 (0)