Skip to content

Commit 23eaaa8

Browse files
committed
DPL: earlier forwarding
This anticipates the forwarding to the earliest possible moment, i.e. when we are about to insert the messages in a slot. This is the earliest moment we can guarantee messages will be seen only once.
1 parent cad5746 commit 23eaaa8

File tree

3 files changed

+49
-14
lines changed

3 files changed

+49
-14
lines changed

Framework/Core/include/Framework/DataProcessingContext.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ struct ServiceRegistry;
2323
struct DataAllocator;
2424
struct DataProcessorSpec;
2525

26+
enum struct ForwardPolicy {
27+
AtInjection,
28+
AtCompletionPolicySatisified,
29+
AfterProcessing
30+
};
31+
2632
struct DataProcessorContext {
2733
DataProcessorContext(DataProcessorContext const&) = delete;
2834
DataProcessorContext() = default;
@@ -122,7 +128,7 @@ struct DataProcessorContext {
122128
mutable std::vector<ServicePreLoopHandle> preLoopHandles;
123129

124130
/// Wether or not the associated DataProcessor can forward things early
125-
bool canForwardEarly = true;
131+
ForwardPolicy forwardPolicy = ForwardPolicy::AtInjection;
126132
bool isSink = false;
127133
bool balancingInputs = true;
128134

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,36 +1050,51 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
10501050
};
10511051
}
10521052

1053-
auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> bool {
1053+
auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> ForwardPolicy {
1054+
ForwardPolicy defaultEarlyForwardPolicy = getenv("DPL_OLD_EARLY_FORWARD") ? ForwardPolicy::AtCompletionPolicySatisified : ForwardPolicy::AtInjection;
1055+
10541056
/// We must make sure there is no optional
10551057
/// if we want to optimize the forwarding
1056-
bool canForwardEarly = (spec.forwards.empty() == false) && deviceContext.processingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1058+
ForwardPolicy forwardPolicy = defaultEarlyForwardPolicy;
1059+
if (spec.forwards.empty() == false) {
1060+
switch (deviceContext.processingPolicies.earlyForward) {
1061+
case o2::framework::EarlyForwardPolicy::NEVER:
1062+
forwardPolicy = ForwardPolicy::AfterProcessing;
1063+
break;
1064+
case o2::framework::EarlyForwardPolicy::ALWAYS:
1065+
forwardPolicy = defaultEarlyForwardPolicy;
1066+
break;
1067+
case o2::framework::EarlyForwardPolicy::NORAW:
1068+
forwardPolicy = defaultEarlyForwardPolicy;
1069+
break;
1070+
}
1071+
}
10571072
bool onlyConditions = true;
10581073
bool overriddenEarlyForward = false;
10591074
for (auto& forwarded : spec.forwards) {
10601075
if (forwarded.matcher.lifetime != Lifetime::Condition) {
10611076
onlyConditions = false;
10621077
}
10631078
if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && deviceContext.processingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
1064-
context.canForwardEarly = false;
1079+
forwardPolicy = ForwardPolicy::AfterProcessing;
10651080
overriddenEarlyForward = true;
10661081
LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
10671082
break;
10681083
}
10691084
if (forwarded.matcher.lifetime == Lifetime::Optional) {
1070-
context.canForwardEarly = false;
1085+
forwardPolicy = ForwardPolicy::AfterProcessing;
10711086
overriddenEarlyForward = true;
10721087
LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
10731088
break;
10741089
}
10751090
}
10761091
if (!overriddenEarlyForward && onlyConditions) {
1077-
context.canForwardEarly = true;
1092+
forwardPolicy = defaultEarlyForwardPolicy;
10781093
LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
10791094
}
1080-
return canForwardEarly;
1095+
return forwardPolicy;
10811096
};
1082-
context.canForwardEarly = decideEarlyForward();
1097+
context.forwardPolicy = decideEarlyForward();
10831098
}
10841099

10851100
void DataProcessingDevice::PreRun()
@@ -1700,7 +1715,7 @@ auto forwardOnInsertion(ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>
17001715

17011716
auto& spec = ref.get<DeviceSpec const>();
17021717
auto& context = ref.get<DataProcessorContext>();
1703-
if (!context.canForwardEarly || spec.forwards.empty()) {
1718+
if (context.forwardPolicy == ForwardPolicy::AfterProcessing || spec.forwards.empty()) {
17041719
O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding not enabled / needed.");
17051720
return;
17061721
}
@@ -1858,7 +1873,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
18581873
stats.updateStats({(int)ProcessingStatsId::ERROR_COUNT, DataProcessingStats::Op::Add, 1});
18591874
};
18601875

1861-
auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
1876+
auto handleValidMessages = [&info, ref, &reportError, &context](std::vector<InputInfo> const& inputInfos) {
18621877
auto& relayer = ref.get<DataRelayer>();
18631878
auto& state = ref.get<DeviceState>();
18641879
static WaitBackpressurePolicy policy;
@@ -1919,7 +1934,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
19191934
input,
19201935
nMessages,
19211936
nPayloadsPerHeader,
1922-
nullptr,
1937+
context.forwardPolicy == ForwardPolicy::AtInjection ? forwardOnInsertion : nullptr,
19231938
onDrop);
19241939
switch (relayed.type) {
19251940
case DataRelayer::RelayChoice::Type::Backpressured:
@@ -2333,11 +2348,23 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
23332348
bool hasForwards = spec.forwards.empty() == false;
23342349
bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;
23352350

2336-
if (context.canForwardEarly && hasForwards && consumeSomething) {
2337-
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
2351+
if (context.forwardPolicy == ForwardPolicy::AtCompletionPolicySatisified && hasForwards && consumeSomething) {
2352+
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwarding: %{public}s.", fmt::format("{}", action.op).c_str());
23382353
auto& timesliceIndex = ref.get<TimesliceIndex>();
23392354
forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2355+
} else if (context.forwardPolicy == ForwardPolicy::AtInjection && hasForwards && consumeSomething) {
2356+
// We used to do fowarding here, however we now do it much earlier.
2357+
// We still need to clean the inputs which were already consumed
2358+
// via ConsumeExisting and which still have an header to hold the slot.
2359+
// FIXME: do we? This should really happen when we do the forwarding on
2360+
// insertion, because otherwise we lose the relevant information on how to
2361+
// navigate the set of headers. We could actually rely on the messageset index,
2362+
// is that the right thing to do though?
2363+
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "cleaning early forwarding: %{public}s.", fmt::format("{}", action.op).c_str());
2364+
auto& timesliceIndex = ref.get<TimesliceIndex>();
2365+
cleanEarlyForward(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
23402366
}
2367+
23412368
markInputsAsDone(action.slot);
23422369

23432370
uint64_t tStart = uv_hrtime();
@@ -2456,7 +2483,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
24562483
context.postDispatchingCallbacks(processContext);
24572484
ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
24582485
}
2459-
if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
2486+
if ((context.forwardPolicy == ForwardPolicy::AfterProcessing) && hasForwards && consumeSomething) {
24602487
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
24612488
auto& timesliceIndex = ref.get<TimesliceIndex>();
24622489
forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);

Framework/Core/src/DataRelayer.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,8 @@ DataRelayer::RelayChoice
521521
continue;
522522
}
523523
auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
524+
// Notice this will split [(header, payload), (header, payload)] multiparts
525+
// in N different subParts for the message spec.
524526
target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1);
525527
mi += nPayloads;
526528
saved += nPayloads;

0 commit comments

Comments
 (0)