Skip to content

Commit 9961552

Browse files
committed
DPL: split early forward function so that it does not depend on MessageSet
1 parent 1afdd6c commit 9961552

File tree

1 file changed

+33
-1
lines changed

1 file changed

+33
-1
lines changed

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2276,7 +2276,39 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22762276
if (context.canForwardEarly && hasForwards && consumeSomething) {
22772277
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
22782278
auto& timesliceIndex = ref.get<TimesliceIndex>();
2279-
forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2279+
auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput();
2280+
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
2281+
bool consume = action.op == CompletionPolicy::CompletionOp::Consume;
2282+
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
2283+
action.slot.index, oldestTimeslice.timeslice.value, "with copy", consume ? " and " : "", consume ? "with consume" : "");
2284+
auto& proxy = ref.get<FairMQDeviceProxy>();
2285+
for (auto &messageSet : currentSetOfInputs) {
2286+
std::vector<fair::mq::Parts> forwardedParts;
2287+
auto span = std::span(messageSet.messages);
2288+
DataProcessingHelpers::routeForwardedMessages(proxy, span, forwardedParts, true, consume);
2289+
2290+
for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
2291+
if (forwardedParts[fi].Size() == 0) {
2292+
continue;
2293+
}
2294+
ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
2295+
auto& parts = forwardedParts[fi];
2296+
if (info.policy == nullptr) {
2297+
O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
2298+
continue;
2299+
}
2300+
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
2301+
info.policy->forward(parts, ChannelIndex{fi}, ref);
2302+
}
2303+
}
2304+
2305+
auto& asyncQueue = ref.get<AsyncQueue>();
2306+
auto& decongestion = ref.get<DecongestionService>();
2307+
O2_SIGNPOST_ID_GENERATE(aid, async_queue);
2308+
O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
2309+
AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
2310+
.user<DecongestionContext>({.ref = ref, .oldestTimeslice = oldestTimeslice}));
2311+
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
22802312
}
22812313
markInputsAsDone(action.slot);
22822314

0 commit comments

Comments
 (0)