@@ -617,6 +617,20 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
617617 O2_SIGNPOST_END (forwarding, sid, " forwardInputs" , " Forwarding done" );
618618};
619619
620+ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
621+ TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true ) {
622+ auto & proxy = registry.get <FairMQDeviceProxy>();
623+
624+ O2_SIGNPOST_ID_GENERATE (sid, forwarding);
625+ O2_SIGNPOST_START (forwarding, sid, " forwardInputs" , " Cleaning up slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s" ,
626+ slot.index , oldestTimeslice.timeslice .value , copy ? " with copy" : " " , copy && consume ? " and " : " " , consume ? " with consume" : " " );
627+ // Always copy them, because we do not want to actually send them.
628+ // We merely need the side effect of the consume, if applicable.
629+ auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet (proxy, currentSetOfInputs, true , consume);
630+
631+ O2_SIGNPOST_END (forwarding, sid, " forwardInputs" , " Forwarding done" );
632+ };
633+
620634extern volatile int region_read_global_dummy_variable;
621635volatile int region_read_global_dummy_variable;
622636
@@ -1680,6 +1694,51 @@ struct WaitBackpressurePolicy {
16801694 }
16811695};
16821696
1697+ auto forwardOnInsertion (ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>& messages) -> void
1698+ {
1699+ O2_SIGNPOST_ID_GENERATE (sid, forwarding);
1700+
1701+ auto & spec = ref.get <DeviceSpec const >();
1702+ auto & context = ref.get <DataProcessorContext>();
1703+ if (!context.canForwardEarly || spec.forwards .empty ()) {
1704+ O2_SIGNPOST_EVENT_EMIT (device, sid, " device" , " Early forwardinding not enabled / needed." );
1705+ return ;
1706+ }
1707+
1708+ O2_SIGNPOST_EVENT_EMIT (device, sid, " device" , " Early forwardinding before injecting data into relayer." );
1709+ auto & timesliceIndex = ref.get <TimesliceIndex>();
1710+ auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput ();
1711+
1712+ auto & proxy = ref.get <FairMQDeviceProxy>();
1713+
1714+ O2_SIGNPOST_START (forwarding, sid, " forwardInputs" ,
1715+ " Starting forwarding for incoming messages with oldestTimeslice %zu with copy" ,
1716+ oldestTimeslice.timeslice .value );
1717+ std::vector<fair::mq::Parts> forwardedParts (proxy.getNumForwardChannels ());
1718+ DataProcessingHelpers::routeForwardedMessages (proxy, messages, forwardedParts, true , false );
1719+
1720+ for (int fi = 0 ; fi < proxy.getNumForwardChannels (); fi++) {
1721+ if (forwardedParts[fi].Size () == 0 ) {
1722+ continue ;
1723+ }
1724+ ForwardChannelInfo info = proxy.getForwardChannelInfo (ChannelIndex{fi});
1725+ auto & parts = forwardedParts[fi];
1726+ if (info.policy == nullptr ) {
1727+ O2_SIGNPOST_EVENT_EMIT_ERROR (forwarding, sid, " forwardInputs" , " Forwarding to %{public}s %d has no policy." , info.name .c_str (), fi);
1728+ continue ;
1729+ }
1730+ O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding to %{public}s %d" , info.name .c_str (), fi);
1731+ info.policy ->forward (parts, ChannelIndex{fi}, ref);
1732+ }
1733+ auto & asyncQueue = ref.get <AsyncQueue>();
1734+ auto & decongestion = ref.get <DecongestionService>();
1735+ O2_SIGNPOST_ID_GENERATE (aid, async_queue);
1736+ O2_SIGNPOST_EVENT_EMIT (async_queue, aid, " forwardInputs" , " Queuing forwarding oldestPossible %zu" , oldestTimeslice.timeslice .value );
1737+ AsyncQueueHelpers::post (asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice , .id = decongestion.oldestPossibleTimesliceTask , .debounce = -1 , .callback = decongestionCallbackLate}
1738+ .user <DecongestionContext>({.ref = ref, .oldestTimeslice = oldestTimeslice}));
1739+ O2_SIGNPOST_END (forwarding, sid, " forwardInputs" , " Forwarding done" );
1740+ };
1741+
16831742// / This is the inner loop of our framework. The actual implementation
16841743// / is divided in two parts. In the first one we define a set of lambdas
16851744// / which describe what is actually going to happen, hiding all the state
@@ -1854,12 +1913,13 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
18541913 VariableContextHelpers::getTimeslice (variables);
18551914 forwardInputs (ref, slot, dropped, oldestOutputInfo, false , true );
18561915 };
1916+
18571917 auto relayed = relayer.relay (parts.At (headerIndex)->GetData (),
18581918 &parts.At (headerIndex),
18591919 input,
18601920 nMessages,
18611921 nPayloadsPerHeader,
1862- nullptr ,
1922+ forwardOnInsertion ,
18631923 onDrop);
18641924 switch (relayed.type ) {
18651925 case DataRelayer::RelayChoice::Type::Backpressured:
@@ -2274,9 +2334,16 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22742334 bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;
22752335
22762336 if (context.canForwardEarly && hasForwards && consumeSomething) {
2277- O2_SIGNPOST_EVENT_EMIT (device, aid, " device" , " Early forwainding: %{public}s." , fmt::format (" {}" , action.op ).c_str ());
2337+ // We used to do fowarding here, however we now do it much earlier.
2338+ // We still need to clean the inputs which were already consumed
2339+ // via ConsumeExisting and which still have an header to hold the slot.
2340+ // FIXME: do we? This should really happen when we do the forwarding on
2341+ // insertion, because otherwise we lose the relevant information on how to
2342+ // navigate the set of headers. We could actually rely on the messageset index,
2343+ // is that the right thing to do though?
2344+ O2_SIGNPOST_EVENT_EMIT (device, aid, " device" , " cleaning early forwarding: %{public}s." , fmt::format (" {}" , action.op ).c_str ());
22782345 auto & timesliceIndex = ref.get <TimesliceIndex>();
2279- forwardInputs (ref, action.slot , currentSetOfInputs, timesliceIndex.getOldestPossibleOutput (), true , action.op == CompletionPolicy::CompletionOp::Consume);
2346+ cleanEarlyForward (ref, action.slot , currentSetOfInputs, timesliceIndex.getOldestPossibleOutput (), true , action.op == CompletionPolicy::CompletionOp::Consume);
22802347 }
22812348 markInputsAsDone (action.slot );
22822349
0 commit comments