Skip to content

Commit feb8854

Browse files
committed
DPL: introduce function to do forwarding on insertion
1 parent 9961552 commit feb8854

File tree

1 file changed

+28
-27
lines changed

1 file changed

+28
-27
lines changed

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1680,6 +1680,33 @@ struct WaitBackpressurePolicy {
16801680
}
16811681
};
16821682

1683+
auto forwardOnInsertion(ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>& messages) -> void
1684+
{
1685+
O2_LOG_ENABLE(forwarding);
1686+
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
1687+
auto& timesliceIndex = ref.get<TimesliceIndex>();
1688+
auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput();
1689+
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for incoming data with oldestTimeslice %zu with copy.",
1690+
oldestTimeslice.timeslice.value);
1691+
auto& proxy = ref.get<FairMQDeviceProxy>();
1692+
std::vector<fair::mq::Parts> forwardedParts;
1693+
DataProcessingHelpers::routeForwardedMessages(proxy, messages, forwardedParts, true, false);
1694+
1695+
for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
1696+
if (forwardedParts[fi].Size() == 0) {
1697+
continue;
1698+
}
1699+
ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
1700+
auto& parts = forwardedParts[fi];
1701+
if (info.policy == nullptr) {
1702+
O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
1703+
continue;
1704+
}
1705+
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
1706+
info.policy->forward(parts, ChannelIndex{fi}, ref);
1707+
}
1708+
};
1709+
16831710
/// This is the inner loop of our framework. The actual implementation
16841711
/// is divided in two parts. In the first one we define a set of lambdas
16851712
/// which describe what is actually going to happen, hiding all the state
@@ -1859,7 +1886,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
18591886
input,
18601887
nMessages,
18611888
nPayloadsPerHeader,
1862-
nullptr,
1889+
forwardOnInsertion,
18631890
onDrop);
18641891
switch (relayed.type) {
18651892
case DataRelayer::RelayChoice::Type::Backpressured:
@@ -2274,34 +2301,8 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22742301
bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;
22752302

22762303
if (context.canForwardEarly && hasForwards && consumeSomething) {
2277-
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
22782304
auto& timesliceIndex = ref.get<TimesliceIndex>();
22792305
auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput();
2280-
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
2281-
bool consume = action.op == CompletionPolicy::CompletionOp::Consume;
2282-
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
2283-
action.slot.index, oldestTimeslice.timeslice.value, "with copy", consume ? " and " : "", consume ? "with consume" : "");
2284-
auto& proxy = ref.get<FairMQDeviceProxy>();
2285-
for (auto &messageSet : currentSetOfInputs) {
2286-
std::vector<fair::mq::Parts> forwardedParts;
2287-
auto span = std::span(messageSet.messages);
2288-
DataProcessingHelpers::routeForwardedMessages(proxy, span, forwardedParts, true, consume);
2289-
2290-
for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
2291-
if (forwardedParts[fi].Size() == 0) {
2292-
continue;
2293-
}
2294-
ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
2295-
auto& parts = forwardedParts[fi];
2296-
if (info.policy == nullptr) {
2297-
O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
2298-
continue;
2299-
}
2300-
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
2301-
info.policy->forward(parts, ChannelIndex{fi}, ref);
2302-
}
2303-
}
2304-
23052306
auto& asyncQueue = ref.get<AsyncQueue>();
23062307
auto& decongestion = ref.get<DecongestionService>();
23072308
O2_SIGNPOST_ID_GENERATE(aid, async_queue);

0 commit comments

Comments
 (0)