Skip to content

Commit c990996

Browse files
authored
DPL Analysis: Use dangling edges context in more places (#14953)
1 parent dd66913 commit c990996

File tree

5 files changed

+45
-45
lines changed

5 files changed

+45
-45
lines changed

Framework/AnalysisSupport/src/AODWriterHelpers.cxx

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,13 @@ const static std::unordered_map<OutputObjHandlingPolicy, std::string> ROOTfileNa
6262

6363
AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
6464
{
65-
auto& ac = ctx.services().get<DanglingEdgesContext>();
6665
auto dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
6766
int compressionLevel = 505;
6867
if (ctx.options().hasOption("aod-writer-compression")) {
6968
compressionLevel = ctx.options().get<int>("aod-writer-compression");
7069
}
71-
return AlgorithmSpec{[dod, outputInputs = ac.outputsInputsAOD, compressionLevel](InitContext& ic) -> std::function<void(ProcessingContext&)> {
70+
return AlgorithmSpec{[dod, compressionLevel](InitContext& ic) -> std::function<void(ProcessingContext&)> {
71+
auto outputInputs = ic.services().get<DanglingEdgesContext>().outputsInputsAOD;
7272
LOGP(debug, "======== getGlobalAODSink::Init ==========");
7373

7474
// find out if any table needs to be saved
@@ -241,14 +241,13 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
241241
};
242242
}
243243

244-
AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
244+
AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& /*ctx*/)
245245
{
246-
using namespace monitoring;
247-
auto& ac = ctx.services().get<DanglingEdgesContext>();
248-
auto tskmap = ac.outTskMap;
249-
auto objmap = ac.outObjHistMap;
250-
251-
return AlgorithmSpec{[objmap, tskmap](InitContext& ic) -> std::function<void(ProcessingContext&)> {
246+
return AlgorithmSpec{[](InitContext& ic) -> std::function<void(ProcessingContext&)> {
247+
using namespace monitoring;
248+
auto& dec = ic.services().get<DanglingEdgesContext>();
249+
auto tskmap = dec.outTskMap;
250+
auto objmap = dec.outObjHistMap;
252251
auto& callbacks = ic.services().get<CallbackService>();
253252
auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();
254253

@@ -278,7 +277,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
278277

279278
callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
280279
return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
281-
auto mergePart = [&inputObjects, &objmap, &tskmap, &pc](DataRef const& ref) {
280+
auto mergePart = [&inputObjects, &objmap, &tskmap](DataRef const& ref) {
282281
O2_SIGNPOST_ID_GENERATE(hid, histogram_registry);
283282
O2_SIGNPOST_START(histogram_registry, hid, "mergePart", "Merging histogram");
284283
if (!ref.header) {
@@ -474,7 +473,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
474473
};
475474
O2_SIGNPOST_ID_GENERATE(rid, histogram_registry);
476475
O2_SIGNPOST_START(histogram_registry, rid, "processParts", "Start merging %zu parts received together.", pc.inputs().getNofParts(0));
477-
for (int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) {
476+
for (auto pi = 0U; pi < pc.inputs().getNofParts(0); ++pi) {
478477
mergePart(pc.inputs().get("x", pi));
479478
}
480479
O2_SIGNPOST_END(histogram_registry, rid, "processParts", "Done histograms in multipart message.");

Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -67,38 +67,39 @@ void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::Outpu
6767
}
6868
} // namespace
6969

70-
AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
70+
AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
7171
{
72-
auto& ac = ctx.services().get<DanglingEdgesContext>();
73-
std::vector<std::shared_ptr<arrow::Schema>> schemas;
74-
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
72+
return adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, InitContext& ic) {
73+
auto& dec = ic.services().get<DanglingEdgesContext>();
74+
std::vector<std::shared_ptr<arrow::Schema>> schemas;
75+
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
7576

76-
for (auto& input : ac.analysisCCDBInputs) {
77-
std::vector<std::shared_ptr<arrow::Field>> fields;
78-
schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
79-
schemaMetadata->Append("outputBinding", input.binding);
77+
for (auto& input : dec.analysisCCDBInputs) {
78+
std::vector<std::shared_ptr<arrow::Field>> fields;
79+
schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
80+
schemaMetadata->Append("outputBinding", input.binding);
8081

81-
for (auto& m : input.metadata) {
82-
// Save the list of input tables
83-
if (m.name.starts_with("input:")) {
84-
auto name = m.name.substr(6);
85-
schemaMetadata->Append("sourceTable", name);
86-
schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
87-
continue;
88-
}
89-
// Ignore the non ccdb: entries
90-
if (!m.name.starts_with("ccdb:")) {
91-
continue;
82+
for (auto& m : input.metadata) {
83+
// Save the list of input tables
84+
if (m.name.starts_with("input:")) {
85+
auto name = m.name.substr(6);
86+
schemaMetadata->Append("sourceTable", name);
87+
schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
88+
continue;
89+
}
90+
// Ignore the non ccdb: entries
91+
if (!m.name.starts_with("ccdb:")) {
92+
continue;
93+
}
94+
// Create the schema of the output
95+
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
96+
metadata->Append("url", m.defaultValue.asString());
97+
auto columnName = m.name.substr(strlen("ccdb:"));
98+
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
9299
}
93-
// Create the schema of the output
94-
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
95-
metadata->Append("url", m.defaultValue.asString());
96-
auto columnName = m.name.substr(strlen("ccdb:"));
97-
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
100+
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
98101
}
99-
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
100-
}
101-
return adaptStateful([schemas](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) {
102+
102103
std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
103104
CCDBFetcherHelper::initialiseHelper(*helper, options);
104105
std::unordered_map<std::string, int> bindings;
@@ -129,11 +130,11 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
129130
int outputRouteIndex = bindings.at(outRouteDesc);
130131
auto& spec = helper->routes[outputRouteIndex].matcher;
131132
std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
132-
for (auto& _ : schema->fields()) {
133+
for (auto const& _ : schema->fields()) {
133134
builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
134135
}
135136

136-
for (size_t ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
137+
for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
137138
std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
138139
auto const* timestamps = chunk->data()->GetValuesSafe<size_t>(1);
139140

Framework/CCDBSupport/src/AnalysisCCDBHelpers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace o2::framework
1717
{
1818

1919
struct AnalysisCCDBHelpers {
20-
static AlgorithmSpec fetchFromCCDB(ConfigContext const& ctx);
20+
static AlgorithmSpec fetchFromCCDB(ConfigContext const&);
2121
};
2222

2323
} // namespace o2::framework

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
521521
std::vector<ExpressionInfo> expressionInfos;
522522

523523
/// make sure options and configurables are set before expression infos are created
524-
homogeneous_apply_refs([&options, &hash](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
524+
homogeneous_apply_refs([&options](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
525525
/// extract conditions and append them as inputs
526526
homogeneous_apply_refs([&inputs](auto& element) { return analysis_task_parsers::appendCondition(inputs, element); }, *task.get());
527527

@@ -620,7 +620,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
620620
}
621621
// reset pre-slice for the next dataframe
622622
auto slices = pc.services().get<ArrowTableSlicingCache>();
623-
homogeneous_apply_refs([&pc, &slices](auto& element) {
623+
homogeneous_apply_refs([&slices](auto& element) {
624624
return analysis_task_parsers::updateSliceInfo(element, slices);
625625
},
626626
*(task.get()));

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ std::shared_ptr<DataOutputDirector> AnalysisSupportHelpers::getDataOutputDirecto
9898
if (!keepString.empty()) {
9999
dod->reset();
100100
std::string d("dangling");
101-
if (d.find(keepString) == 0) {
101+
if (d.starts_with(keepString) == 0) {
102102
// use the dangling outputs
103103
std::vector<InputSpec> danglingOutputs;
104104
for (auto ii = 0u; ii < OutputsInputs.size(); ii++) {
@@ -144,7 +144,7 @@ void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector<OutputSpec>
144144
sinks::append_to{publisher.outputs}; // append them to the publisher outputs
145145

146146
std::vector<InputSpec> additionalInputs;
147-
for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) {
147+
for (auto const& input : requestedSpecials | views::filter_not_matching(providedSpecials)) {
148148
input.metadata |
149149
views::filter_string_params_with("input:") |
150150
views::params_to_input_specs() |

0 commit comments

Comments
 (0)