Skip to content

Commit 027cad2

Browse files
aalkinktf
authored andcommitted
Propagate dangling edges context to init context and delay algo loading
1 parent a5f88b7 commit 027cad2

File tree

5 files changed

+37
-46
lines changed

5 files changed

+37
-46
lines changed

Framework/AnalysisSupport/src/AODReaderHelpers.cxx

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,12 @@ struct Buildable {
7979

8080
} // namespace
8181

82-
AlgorithmSpec AODReaderHelpers::indexBuilderCallback(ConfigContext const& ctx)
82+
AlgorithmSpec AODReaderHelpers::indexBuilderCallback(ConfigContext const& /*ctx*/)
8383
{
84-
auto& ac = ctx.services().get<DanglingEdgesContext>();
85-
return AlgorithmSpec::InitCallback{[requested = ac.requestedIDXs](InitContext& /*ic*/) {
84+
return AlgorithmSpec::InitCallback{[](InitContext& ic) {
85+
auto const& requested = ic.services().get<DanglingEdgesContext>().requestedIDXs;
8686
std::vector<Buildable> buildables;
87-
for (auto& i : requested) {
87+
for (auto const& i : requested) {
8888
buildables.emplace_back(i);
8989
}
9090
std::vector<Builder> builders;
@@ -181,12 +181,12 @@ struct Spawnable {
181181

182182
} // namespace
183183

184-
AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(ConfigContext const& ctx)
184+
AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(ConfigContext const& /*ctx*/)
185185
{
186-
auto& ac = ctx.services().get<DanglingEdgesContext>();
187-
return AlgorithmSpec::InitCallback{[requested = ac.spawnerInputs](InitContext& /*ic*/) {
186+
return AlgorithmSpec::InitCallback{[](InitContext& ic) {
187+
auto const& requested = ic.services().get<DanglingEdgesContext>().spawnerInputs;
188188
std::vector<Spawnable> spawnables;
189-
for (auto& i : requested) {
189+
for (auto const& i : requested) {
190190
spawnables.emplace_back(i);
191191
}
192192
std::vector<Spawner> spawners;

Framework/AnalysisSupport/src/AODReaderHelpers.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ namespace o2::framework::readers
2020

2121
struct AODReaderHelpers {
2222
static AlgorithmSpec rootFileReaderCallback();
23-
static AlgorithmSpec aodSpawnerCallback(ConfigContext const& ctx);
24-
static AlgorithmSpec indexBuilderCallback(ConfigContext const& ctx);
23+
static AlgorithmSpec aodSpawnerCallback(ConfigContext const& /*ctx*/);
24+
static AlgorithmSpec indexBuilderCallback(ConfigContext const& /*ctx*/);
2525
};
2626

2727
} // namespace o2::framework::readers

Framework/Core/src/ArrowSupport.cxx

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "Framework/ArrowContext.h"
1414
#include "Framework/ArrowTableSlicingCache.h"
1515
#include "Framework/DataProcessor.h"
16+
#include "Framework/CommonDataProcessors.h"
1617
#include "Framework/DataProcessingStats.h"
1718
#include "Framework/ServiceRegistry.h"
1819
#include "Framework/ConfigContext.h"
@@ -609,9 +610,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
609610
// recreate inputs and outputs
610611
builder->inputs.clear();
611612
builder->outputs.clear();
612-
// replace AlgorithmSpec
613-
// FIXME: it should be made more generic, so it does not need replacement...
614-
builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx); // readers::AODReaderHelpers::indexBuilderCallback(ctx);
613+
614+
// load real AlgorithmSpec before deployment
615+
builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx);
615616
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.requestedIDXs, dec.requestedAODs, dec.requestedDYNs, *builder);
616617
}
617618

@@ -634,10 +635,10 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
634635
// recreate inputs and outputs
635636
spawner->outputs.clear();
636637
spawner->inputs.clear();
637-
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, *spawner);
638-
// replace AlgorithmSpec
639-
// FIXME: it should be made more generic, so it does not need replacement...
638+
639+
// load real AlgorithmSpec before deployment
640640
spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx);
641+
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, *spawner);
641642
}
642643

643644
if (analysisCCDB != workflow.end()) {
@@ -654,8 +655,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
654655
// recreate inputs and outputs
655656
analysisCCDB->outputs.clear();
656657
analysisCCDB->inputs.clear();
657-
// replace AlgorithmSpec
658-
// FIXME: it should be made more generic, so it does not need replacement...
658+
// load real AlgorithmSpec before deployment
659659
// FIXME how can I make the lookup depend on DYN tables as well??
660660
analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
661661
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB);
@@ -682,6 +682,10 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
682682
if (reader->outputs.empty()) {
683683
// nothing to read
684684
workflow.erase(reader);
685+
} else {
686+
// load reader algorithm before deployment
687+
auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx);
688+
reader->algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo);
685689
}
686690
}
687691

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ int defaultConditionQueryRateMultiplier()
156156

157157
void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx)
158158
{
159+
int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
159160
DataProcessorSpec ccdbBackend{
160161
.name = "internal-dpl-ccdb-backend",
161162
.outputs = {},
@@ -230,23 +231,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
230231
ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
231232
.requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")};
232233

233-
// AOD reader can be rate limited
234-
int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
235-
std::string rateLimitingChannelConfigInput;
236-
std::string rateLimitingChannelConfigOutput;
237-
bool internalRateLimiting = false;
238-
239-
// In case we have rate-limiting requested, any device without an input will get one on the special
240-
// "DPL/RATE" message.
241-
if (rateLimitingIPCID >= 0) {
242-
rateLimitingChannelConfigInput = fmt::format("name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
243-
ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
244-
rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
245-
ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
246-
internalRateLimiting = true;
247-
aodReader.options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, rateLimitingChannelConfigInput, {"how many timeframes can be in flight at the same time"}});
248-
}
249-
250234
ctx.services().registerService(ServiceRegistryHelpers::handleForService<DanglingEdgesContext>(new DanglingEdgesContext));
251235
auto& dec = ctx.services().get<DanglingEdgesContext>();
252236

@@ -274,7 +258,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
274258
// A timeframeSink consumes timeframes without creating new
275259
// timeframe data.
276260
bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
277-
if (std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid")) != -1) {
261+
if (rateLimitingIPCID != -1) {
278262
if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) {
279263
O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
280264
uint32_t hash = runtime_hash(processor.name.c_str());
@@ -384,7 +368,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
384368
"internal-dpl-aod-index-builder",
385369
{},
386370
{},
387-
PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx), // readers::AODReaderHelpers::indexBuilderCallback(ctx),
371+
AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
388372
{}};
389373
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.requestedIDXs, dec.requestedAODs, dec.requestedDYNs, indexBuilder);
390374

@@ -400,7 +384,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
400384
"internal-dpl-aod-spawner",
401385
{},
402386
{},
403-
PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx), // readers::AODReaderHelpers::aodSpawnerCallback(ctx),
387+
AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
404388
{}};
405389
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, aodSpawner);
406390
AnalysisSupportHelpers::addMissingOutputsToReader(dec.providedAODs, dec.requestedAODs, aodReader);
@@ -431,13 +415,11 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
431415
auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
432416
if (mctracks2aod == workflow.end()) {
433417
// add normal reader
434-
auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx);
435-
aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo);
436418
aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
437419
aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
438420
} else {
439-
// AODs are being injected on-the-fly, add dummy reader
440-
auto algo = AlgorithmSpec{
421+
// AODs are being injected on-the-fly, add error-handler reader
422+
aodReader.algorithm = AlgorithmSpec{
441423
adaptStateful(
442424
[outputs = aodReader.outputs](DeviceSpec const&) {
443425
LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
@@ -448,7 +430,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
448430
// to ensure the output type for adaptStateful
449431
return adaptStateless([](DataAllocator&) {});
450432
})};
451-
aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo);
452433
}
453434
auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
454435
timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
@@ -533,9 +514,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
533514

534515
// add the Analysys CCDB backend which reads CCDB objects using a provided table
535516
if (analysisCCDBBackend.outputs.empty() == false) {
536-
// add normal reader
537-
auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
538-
analysisCCDBBackend.algorithm = algo;
539517
extraSpecs.push_back(analysisCCDBBackend);
540518
}
541519

@@ -637,6 +615,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
637615
extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
638616
} else {
639617
O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");
618+
std::string rateLimitingChannelConfigOutput;
619+
if (rateLimitingIPCID != -1) {
620+
rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
621+
}
640622
extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput));
641623
}
642624
}

Framework/Core/src/runDataProcessing.cxx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
1111
#include <memory>
12+
#include "Framework/DanglingEdgesContext.h"
1213
#include "Framework/TopologyPolicyHelpers.h"
1314
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
1415
#include <stdexcept>
@@ -1016,6 +1017,7 @@ void doDefaultWorkflowTerminationHook()
10161017
}
10171018

10181019
int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
1020+
DanglingEdgesContext& danglingEdgesContext,
10191021
RunningWorkflowInfo const& runningWorkflow,
10201022
RunningDeviceRef ref,
10211023
DriverConfig const& driverConfig,
@@ -1078,6 +1080,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
10781080
&spec,
10791081
&quotaEvaluator,
10801082
&serviceRegistry,
1083+
&danglingEdgesContext,
10811084
&deviceState,
10821085
&deviceProxy,
10831086
&processingPolicies,
@@ -1101,6 +1104,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
11011104
serviceRef.registerService(ServiceRegistryHelpers::handleForService<RunningWorkflowInfo const>(&runningWorkflow));
11021105
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceContext>(deviceContext.get()));
11031106
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
1107+
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DanglingEdgesContext>(&danglingEdgesContext));
11041108

11051109
auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry);
11061110

@@ -1953,6 +1957,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
19531957
if (runningWorkflow.devices[di].id == frameworkId) {
19541958
return doChild(driverInfo.argc, driverInfo.argv,
19551959
serviceRegistry,
1960+
driverInfo.configContext->services().get<DanglingEdgesContext>(),
19561961
runningWorkflow, ref,
19571962
driverConfig,
19581963
driverInfo.processingPolicies,

0 commit comments

Comments
 (0)