Skip to content

Commit 69cc24e

Browse files
committed
DPL: earlier forwarding
This anticipates the forwarding to the earliest possible moment, i.e. when we are about to insert the messages in a slot. This is the earliest moment we can guarantee messages will be seen only once.
1 parent cad5746 commit 69cc24e

File tree

3 files changed

+45
-13
lines changed

3 files changed

+45
-13
lines changed

Framework/Core/include/Framework/DataProcessingContext.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ struct ServiceRegistry;
2323
struct DataAllocator;
2424
struct DataProcessorSpec;
2525

26+
enum struct ForwardPolicy {
27+
AtInjection,
28+
AtCompletionPolicySatisified,
29+
AfterProcessing
30+
};
31+
2632
struct DataProcessorContext {
2733
DataProcessorContext(DataProcessorContext const&) = delete;
2834
DataProcessorContext() = default;
@@ -122,7 +128,7 @@ struct DataProcessorContext {
122128
mutable std::vector<ServicePreLoopHandle> preLoopHandles;
123129

124130
/// Wether or not the associated DataProcessor can forward things early
125-
bool canForwardEarly = true;
131+
ForwardPolicy forwardPolicy = ForwardPolicy::AtInjection;
126132
bool isSink = false;
127133
bool balancingInputs = true;
128134

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,36 +1050,48 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
10501050
};
10511051
}
10521052

1053-
auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> bool {
1053+
auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> ForwardPolicy {
1054+
ForwardPolicy defaultEarlyForwardPolicy = getenv("DPL_OLD_EARLY_FORWARD") ? ForwardPolicy::AtCompletionPolicySatisified : ForwardPolicy::AtInjection;
1055+
10541056
/// We must make sure there is no optional
10551057
/// if we want to optimize the forwarding
1056-
bool canForwardEarly = (spec.forwards.empty() == false) && deviceContext.processingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1058+
ForwardPolicy forwardPolicy = defaultEarlyForwardPolicy;
1059+
if (spec.forwards.empty() == false) {
1060+
switch (deviceContext.processingPolicies.earlyForward) {
1061+
case o2::framework::EarlyForwardPolicy::NEVER:
1062+
forwardPolicy = ForwardPolicy::AfterProcessing;
1063+
case o2::framework::EarlyForwardPolicy::ALWAYS:
1064+
forwardPolicy = defaultEarlyForwardPolicy;
1065+
case o2::framework::EarlyForwardPolicy::NORAW:
1066+
forwardPolicy = defaultEarlyForwardPolicy;
1067+
};
1068+
}
10571069
bool onlyConditions = true;
10581070
bool overriddenEarlyForward = false;
10591071
for (auto& forwarded : spec.forwards) {
10601072
if (forwarded.matcher.lifetime != Lifetime::Condition) {
10611073
onlyConditions = false;
10621074
}
10631075
if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && deviceContext.processingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
1064-
context.canForwardEarly = false;
1076+
forwardPolicy = ForwardPolicy::AfterProcessing;
10651077
overriddenEarlyForward = true;
10661078
LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
10671079
break;
10681080
}
10691081
if (forwarded.matcher.lifetime == Lifetime::Optional) {
1070-
context.canForwardEarly = false;
1082+
forwardPolicy = ForwardPolicy::AfterProcessing;
10711083
overriddenEarlyForward = true;
10721084
LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
10731085
break;
10741086
}
10751087
}
10761088
if (!overriddenEarlyForward && onlyConditions) {
1077-
context.canForwardEarly = true;
1089+
forwardPolicy = defaultEarlyForwardPolicy;
10781090
LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
10791091
}
1080-
return canForwardEarly;
1092+
return forwardPolicy;
10811093
};
1082-
context.canForwardEarly = decideEarlyForward();
1094+
context.forwardPolicy = decideEarlyForward();
10831095
}
10841096

10851097
void DataProcessingDevice::PreRun()
@@ -1700,7 +1712,7 @@ auto forwardOnInsertion(ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>
17001712

17011713
auto& spec = ref.get<DeviceSpec const>();
17021714
auto& context = ref.get<DataProcessorContext>();
1703-
if (!context.canForwardEarly || spec.forwards.empty()) {
1715+
if (context.forwardPolicy == ForwardPolicy::AfterProcessing || spec.forwards.empty()) {
17041716
O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding not enabled / needed.");
17051717
return;
17061718
}
@@ -1919,7 +1931,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
19191931
input,
19201932
nMessages,
19211933
nPayloadsPerHeader,
1922-
nullptr,
1934+
context.forwardPolicy == ForwardPolicy::AtInjection ? forwardOnInsertion : nullptr,
19231935
onDrop);
19241936
switch (relayed.type) {
19251937
case DataRelayer::RelayChoice::Type::Backpressured:
@@ -2333,11 +2345,23 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
23332345
bool hasForwards = spec.forwards.empty() == false;
23342346
bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;
23352347

2336-
if (context.canForwardEarly && hasForwards && consumeSomething) {
2337-
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
2348+
if (context.forwardPolicy == ForwardPolicy::AtCompletionPolicySatisified && hasForwards && consumeSomething) {
2349+
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwarding: %{public}s.", fmt::format("{}", action.op).c_str());
23382350
auto& timesliceIndex = ref.get<TimesliceIndex>();
23392351
forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2352+
} else if (context.forwardPolicy == ForwardPolicy::AtInjection && hasForwards && consumeSomething) {
2353+
// We used to do fowarding here, however we now do it much earlier.
2354+
// We still need to clean the inputs which were already consumed
2355+
// via ConsumeExisting and which still have an header to hold the slot.
2356+
// FIXME: do we? This should really happen when we do the forwarding on
2357+
// insertion, because otherwise we lose the relevant information on how to
2358+
// navigate the set of headers. We could actually rely on the messageset index,
2359+
// is that the right thing to do though?
2360+
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "cleaning early forwarding: %{public}s.", fmt::format("{}", action.op).c_str());
2361+
auto& timesliceIndex = ref.get<TimesliceIndex>();
2362+
cleanEarlyForward(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
23402363
}
2364+
23412365
markInputsAsDone(action.slot);
23422366

23432367
uint64_t tStart = uv_hrtime();
@@ -2456,7 +2480,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
24562480
context.postDispatchingCallbacks(processContext);
24572481
ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
24582482
}
2459-
if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
2483+
if ((context.forwardPolicy == ForwardPolicy::AfterProcessing) && hasForwards && consumeSomething) {
24602484
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
24612485
auto& timesliceIndex = ref.get<TimesliceIndex>();
24622486
forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);

Framework/Core/src/DataRelayer.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,8 @@ DataRelayer::RelayChoice
521521
continue;
522522
}
523523
auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
524+
// Notice this will split [(header, payload), (header, payload)] multiparts
525+
// in N different subParts for the message spec.
524526
target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1);
525527
mi += nPayloads;
526528
saved += nPayloads;

0 commit comments

Comments
 (0)