@@ -617,6 +617,20 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
617617 O2_SIGNPOST_END (forwarding, sid, " forwardInputs" , " Forwarding done" );
618618};
619619
620+ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
621+ TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true ) {
622+ auto & proxy = registry.get <FairMQDeviceProxy>();
623+
624+ O2_SIGNPOST_ID_GENERATE (sid, forwarding);
625+ O2_SIGNPOST_START (forwarding, sid, " forwardInputs" , " Cleaning up slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s" ,
626+ slot.index , oldestTimeslice.timeslice .value , copy ? " with copy" : " " , copy && consume ? " and " : " " , consume ? " with consume" : " " );
627+ // Always copy them, because we do not want to actually send them.
628+ // We merely need the side effect of the consume, if applicable.
629+ auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet (proxy, currentSetOfInputs, true , consume);
630+
631+ O2_SIGNPOST_END (forwarding, sid, " forwardInputs" , " Forwarding done" );
632+ };
633+
620634extern volatile int region_read_global_dummy_variable;
621635volatile int region_read_global_dummy_variable;
622636
@@ -1680,6 +1694,53 @@ struct WaitBackpressurePolicy {
16801694 }
16811695};
16821696
1697+ auto forwardOnInsertion (ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>& messages) -> void
1698+ {
1699+ O2_LOG_ENABLE (forwarding);
1700+ O2_SIGNPOST_ID_GENERATE (sid, forwarding);
1701+
1702+ auto & spec = ref.get <DeviceSpec const >();
1703+ auto & context = ref.get <DataProcessorContext>();
1704+ if (!context.canForwardEarly || spec.forwards .empty ()) {
1705+ O2_SIGNPOST_EVENT_EMIT (device, sid, " device" , " Early forwardinding not enabled / needed." );
1706+ return ;
1707+ }
1708+
1709+ O2_SIGNPOST_EVENT_EMIT (device, sid, " device" , " Early forwardinding before injecting data into relayer." );
1710+ auto & timesliceIndex = ref.get <TimesliceIndex>();
1711+ auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput ();
1712+
1713+ auto & proxy = ref.get <FairMQDeviceProxy>();
1714+
1715+ O2_SIGNPOST_START (forwarding, sid, " forwardInputs" ,
1716+ " Starting forwarding for incoming messages with oldestTimeslice %zu with copy" ,
1717+ oldestTimeslice.timeslice .value );
1718+ std::vector<fair::mq::Parts> forwardedParts (proxy.getNumForwardChannels ());
1719+ DataProcessingHelpers::routeForwardedMessages (proxy, messages, forwardedParts, true , false );
1720+
1721+ for (int fi = 0 ; fi < proxy.getNumForwardChannels (); fi++) {
1722+ if (forwardedParts[fi].Size () == 0 ) {
1723+ continue ;
1724+ }
1725+ ForwardChannelInfo info = proxy.getForwardChannelInfo (ChannelIndex{fi});
1726+ auto & parts = forwardedParts[fi];
1727+ if (info.policy == nullptr ) {
1728+ O2_SIGNPOST_EVENT_EMIT_ERROR (forwarding, sid, " forwardInputs" , " Forwarding to %{public}s %d has no policy." , info.name .c_str (), fi);
1729+ continue ;
1730+ }
1731+ O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding to %{public}s %d" , info.name .c_str (), fi);
1732+ info.policy ->forward (parts, ChannelIndex{fi}, ref);
1733+ }
1734+ auto & asyncQueue = ref.get <AsyncQueue>();
1735+ auto & decongestion = ref.get <DecongestionService>();
1736+ O2_SIGNPOST_ID_GENERATE (aid, async_queue);
1737+ O2_SIGNPOST_EVENT_EMIT (async_queue, aid, " forwardInputs" , " Queuing forwarding oldestPossible %zu" , oldestTimeslice.timeslice .value );
1738+ AsyncQueueHelpers::post (asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice , .id = decongestion.oldestPossibleTimesliceTask , .debounce = -1 , .callback = decongestionCallbackLate}
1739+ .user <DecongestionContext>({.ref = ref, .oldestTimeslice = oldestTimeslice}));
1740+ O2_SIGNPOST_END (forwarding, sid, " forwardInputs" , " Forwarding done" );
1741+ O2_LOG_DISABLE (forwarding);
1742+ };
1743+
16831744// / This is the inner loop of our framework. The actual implementation
16841745// / is divided in two parts. In the first one we define a set of lambdas
16851746// / which describe what is actually going to happen, hiding all the state
@@ -1854,12 +1915,13 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
18541915 VariableContextHelpers::getTimeslice (variables);
18551916 forwardInputs (ref, slot, dropped, oldestOutputInfo, false , true );
18561917 };
1918+
18571919 auto relayed = relayer.relay (parts.At (headerIndex)->GetData (),
18581920 &parts.At (headerIndex),
18591921 input,
18601922 nMessages,
18611923 nPayloadsPerHeader,
1862- nullptr ,
1924+ forwardOnInsertion ,
18631925 onDrop);
18641926 switch (relayed.type ) {
18651927 case DataRelayer::RelayChoice::Type::Backpressured:
@@ -2274,9 +2336,16 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22742336 bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;
22752337
22762338 if (context.canForwardEarly && hasForwards && consumeSomething) {
2277- O2_SIGNPOST_EVENT_EMIT (device, aid, " device" , " Early forwainding: %{public}s." , fmt::format (" {}" , action.op ).c_str ());
2339+ // We used to do forwarding here, however we now do it much earlier.
2340+ // We still need to clean the inputs which were already consumed
2341+ // via ConsumeExisting and which still have a header to hold the slot.
2342+ // FIXME: do we? This should really happen when we do the forwarding on
2343+ // insertion, because otherwise we lose the relevant information on how to
2344+ // navigate the set of headers. We could actually rely on the messageset index,
2345+ // is that the right thing to do though?
2346+ O2_SIGNPOST_EVENT_EMIT (device, aid, " device" , " cleaning early forwarding: %{public}s." , fmt::format (" {}" , action.op ).c_str ());
22782347 auto & timesliceIndex = ref.get <TimesliceIndex>();
2279- forwardInputs (ref, action.slot , currentSetOfInputs, timesliceIndex.getOldestPossibleOutput (), true , action.op == CompletionPolicy::CompletionOp::Consume);
2348+ cleanEarlyForward (ref, action.slot , currentSetOfInputs, timesliceIndex.getOldestPossibleOutput (), true , action.op == CompletionPolicy::CompletionOp::Consume);
22802349 }
22812350 markInputsAsDone (action.slot );
22822351
0 commit comments