Skip to content

Commit 7aa1bbc

Browse files
authored
DPL Analysis: Rework table input record extraction (#14944)
1 parent 7245d49 commit 7aa1bbc

26 files changed

+286
-103
lines changed

Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -98,29 +98,6 @@ using o2::monitoring::tags::Value;
9898

9999
namespace o2::framework::readers
100100
{
101-
auto setEOSCallback(InitContext& ic)
102-
{
103-
ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
104-
[](EndOfStreamContext& eosc) {
105-
auto& control = eosc.services().get<ControlService>();
106-
control.endOfStream();
107-
control.readyToQuit(QuitRequest::Me);
108-
});
109-
}
110-
111-
template <typename O>
112-
static inline auto extractTypedOriginal(ProcessingContext& pc)
113-
{
114-
/// FIXME: this should be done in invokeProcess() as some of the originals may be compound tables
115-
return O{pc.inputs().get<TableConsumer>(aod::MetadataTrait<O>::metadata::tableLabel())->asArrowTable()};
116-
}
117-
118-
template <typename... Os>
119-
static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
120-
{
121-
return std::make_tuple(extractTypedOriginal<Os>(pc)...);
122-
}
123-
124101
AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const& ctx)
125102
{
126103
// aod-parent-base-path-replacement is now a workflow option, so it needs to be

Framework/AnalysisSupport/src/AODReaderHelpers.cxx

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "Framework/DataProcessingHelpers.h"
1919
#include "Framework/AlgorithmSpec.h"
2020
#include "Framework/DataSpecUtils.h"
21+
#include "Framework/DataSpecViews.h"
2122
#include "Framework/ConfigContext.h"
2223
#include "Framework/DanglingEdgesContext.h"
2324

@@ -29,6 +30,7 @@ struct Buildable {
2930
bool exclusive = false;
3031
std::string binding;
3132
std::vector<std::string> labels;
33+
std::vector<framework::ConcreteDataMatcher> matchers;
3234
header::DataOrigin origin;
3335
header::DataDescription description;
3436
header::DataHeader::SubSpecificationType version;
@@ -52,6 +54,7 @@ struct Buildable {
5254

5355
for (auto const& r : records) {
5456
labels.emplace_back(r.label);
57+
matchers.emplace_back(r.matcher);
5558
}
5659
outputSchema = std::make_shared<arrow::Schema>([](std::vector<o2::soa::IndexRecord> const& recs) {
5760
std::vector<std::shared_ptr<arrow::Field>> fields;
@@ -68,6 +71,7 @@ struct Buildable {
6871
return {
6972
exclusive,
7073
labels,
74+
matchers,
7175
records,
7276
outputSchema,
7377
origin,
@@ -105,6 +109,7 @@ namespace
105109
struct Spawnable {
106110
std::string binding;
107111
std::vector<std::string> labels;
112+
std::vector<framework::ConcreteDataMatcher> matchers;
108113
std::vector<expressions::Projector> projectors;
109114
std::vector<std::shared_ptr<gandiva::Expression>> expressions;
110115
std::shared_ptr<arrow::Schema> outputSchema;
@@ -132,14 +137,17 @@ struct Spawnable {
132137
o2::framework::addLabelToSchema(outputSchema, binding.c_str());
133138

134139
std::vector<std::shared_ptr<arrow::Schema>> schemas;
135-
for (auto& i : spec.metadata) {
136-
if (i.name.starts_with("input-schema:")) {
137-
labels.emplace_back(i.name.substr(13));
138-
iws.clear();
139-
auto json = i.defaultValue.get<std::string>();
140-
iws.str(json);
141-
schemas.emplace_back(ArrowJSONHelpers::read(iws));
142-
}
140+
for (auto const& i : spec.metadata | views::filter_string_params_starts_with("input-schema:")) {
141+
labels.emplace_back(i.name.substr(13));
142+
iws.clear();
143+
auto json = i.defaultValue.get<std::string>();
144+
iws.str(json);
145+
schemas.emplace_back(ArrowJSONHelpers::read(iws));
146+
}
147+
for (auto const& i : spec.metadata | views::filter_string_params_starts_with("input:") | std::ranges::views::transform([](auto const& param) {
148+
return DataSpecUtils::fromMetadataString(param.defaultValue.template get<std::string>());
149+
})) {
150+
matchers.emplace_back(std::get<ConcreteDataMatcher>(i.matcher));
143151
}
144152

145153
std::vector<std::shared_ptr<arrow::Field>> fields;
@@ -169,6 +177,7 @@ struct Spawnable {
169177
return {
170178
binding,
171179
labels,
180+
matchers,
172181
expressions,
173182
makeProjector(),
174183
outputSchema,

Framework/AnalysisSupport/src/AODWriterHelpers.cxx

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,12 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
185185
}
186186

187187
// get the TableConsumer and corresponding arrow table
188-
auto msg = pc.inputs().get(ref.spec->binding);
189-
if (msg.header == nullptr) {
188+
if (ref.header == nullptr) {
190189
LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec));
191190
continue;
192191
}
193-
auto s = pc.inputs().get<TableConsumer>(ref.spec->binding);
194-
auto table = s->asArrowTable();
192+
193+
auto table = pc.inputs().get<TableConsumer>(std::get<ConcreteDataMatcher>(ref.spec->matcher))->asArrowTable();
195194
if (!table->Validate().ok()) {
196195
LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName);
197196
continue;

Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
8383
if (m.name.starts_with("input:")) {
8484
auto name = m.name.substr(6);
8585
schemaMetadata->Append("sourceTable", name);
86+
schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
8687
continue;
8788
}
8889
// Ignore the non ccdb: entries
@@ -109,13 +110,13 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
109110
for (auto& schema : schemas) {
110111
std::vector<CCDBFetcherHelper::FetchOp> ops;
111112
auto inputBinding = *schema->metadata()->Get("sourceTable");
113+
auto inputMatcher = DataSpecUtils::fromString(*schema->metadata()->Get("sourceMatcher"));
112114
auto outRouteDesc = *schema->metadata()->Get("outputRoute");
113115
std::string outBinding = *schema->metadata()->Get("outputBinding");
114116
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
115117
"Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
116118
outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
117-
auto ref = inputs.get<TableConsumer>(inputBinding);
118-
auto table = ref->asArrowTable();
119+
auto table = inputs.get<TableConsumer>(inputMatcher)->asArrowTable();
119120
// FIXME: make the fTimestamp column configurable.
120121
auto timestampColumn = table->GetColumnByName("fTimestamp");
121122
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",

Framework/Core/include/Framework/ASoA.h

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#ifndef O2_FRAMEWORK_ASOA_H_
1313
#define O2_FRAMEWORK_ASOA_H_
1414

15+
#include "Framework/ConcreteDataMatcher.h"
1516
#include "Framework/Pack.h" // IWYU pragma: export
1617
#include "Framework/FunctionalHelpers.h" // IWYU pragma: export
1718
#include "Headers/DataHeader.h" // IWYU pragma: export
@@ -375,6 +376,12 @@ consteval const char* signature()
375376
return o2::aod::Hash<R.desc_hash>::str;
376377
}
377378

379+
template <soa::TableRef R>
380+
constexpr framework::ConcreteDataMatcher matcher()
381+
{
382+
return {origin<R>(), description(signature<R>()), R.version};
383+
}
384+
378385
/// hash identification concepts
379386
template <typename T>
380387
concept is_aod_hash = requires(T t) { t.hash; t.str; };
@@ -1393,6 +1400,12 @@ static constexpr std::pair<bool, std::string> hasKey(std::string const& key)
13931400
return {hasColumnForKey(typename aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::columns{}, key), aod::label<ref>()};
13941401
}
13951402

1403+
template <TableRef ref>
1404+
static constexpr std::pair<bool, framework::ConcreteDataMatcher> hasKeyM(std::string const& key)
1405+
{
1406+
return {hasColumnForKey(typename aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::columns{}, key), aod::matcher<ref>()};
1407+
}
1408+
13961409
template <typename... C>
13971410
static constexpr auto haveKey(framework::pack<C...>, std::string const& key)
13981411
{
@@ -1427,6 +1440,31 @@ static constexpr std::string getLabelFromTypeForKey(std::string const& key)
14271440
O2_BUILTIN_UNREACHABLE();
14281441
}
14291442

1443+
template <with_originals T, bool OPT = false>
1444+
static constexpr framework::ConcreteDataMatcher getMatcherFromTypeForKey(std::string const& key)
1445+
{
1446+
if constexpr (T::originals.size() == 1) {
1447+
auto locate = hasKeyM<T::originals[0]>(key);
1448+
if (locate.first) {
1449+
return locate.second;
1450+
}
1451+
} else {
1452+
auto locate = [&]<size_t... Is>(std::index_sequence<Is...>) {
1453+
return std::vector{hasKeyM<T::originals[Is]>(key)...};
1454+
}(std::make_index_sequence<T::originals.size()>{});
1455+
auto it = std::find_if(locate.begin(), locate.end(), [](auto const& x) { return x.first; });
1456+
if (it != locate.end()) {
1457+
return it->second;
1458+
}
1459+
}
1460+
if constexpr (!OPT) {
1461+
notFoundColumn(getLabelFromType<std::decay_t<T>>().data(), key.data());
1462+
} else {
1463+
return framework::ConcreteDataMatcher{header::DataOrigin{"AOD"}, header::DataDescription{"[MISSING]"}, 0};
1464+
}
1465+
O2_BUILTIN_UNREACHABLE();
1466+
}
1467+
14301468
template <typename B, typename... C>
14311469
consteval static bool hasIndexTo(framework::pack<C...>&&)
14321470
{
@@ -1477,15 +1515,18 @@ struct PreslicePolicyGeneral : public PreslicePolicyBase {
14771515
std::span<const int64_t> getSliceFor(int value) const;
14781516
};
14791517

1480-
template <typename T, typename Policy, bool OPT = false>
1518+
template <typename T>
1519+
concept is_preslice_policy = std::derived_from<T, PreslicePolicyBase>;
1520+
1521+
template <typename T, is_preslice_policy Policy, bool OPT = false>
14811522
struct PresliceBase : public Policy {
14821523
constexpr static bool optional = OPT;
14831524
using target_t = T;
14841525
using policy_t = Policy;
14851526
const std::string binding;
14861527

14871528
PresliceBase(expressions::BindingNode index_)
1488-
: Policy{PreslicePolicyBase{{o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name})}, Entry(o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name}), std::string{index_.name})}, {}}
1529+
: Policy{PreslicePolicyBase{{o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name})}, Entry(o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name}), o2::soa::getMatcherFromTypeForKey<T, OPT>(std::string{index_.name}), std::string{index_.name})}, {}}
14891530
{
14901531
}
14911532

@@ -1520,7 +1561,11 @@ template <typename T>
15201561
using PresliceOptional = PresliceBase<T, PreslicePolicySorted, true>;
15211562

15221563
template <typename T>
1523-
concept is_preslice = std::derived_from<T, PreslicePolicyBase>;
1564+
concept is_preslice = std::derived_from<T, PreslicePolicyBase>&&
1565+
requires(T)
1566+
{
1567+
T::optional;
1568+
};
15241569

15251570
/// Can be user to group together a number of Preslice declaration
15261571
/// to avoid the limit of 100 data members per task
@@ -1667,10 +1712,10 @@ auto doFilteredSliceBy(T const* table, o2::framework::PresliceBase<C, framework:
16671712
return prepareFilteredSlice(table, slice, offset);
16681713
}
16691714

1670-
template <typename T>
1715+
template <soa::is_table T>
16711716
auto doSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
16721717
{
1673-
auto localCache = cache.ptr->getCacheFor({o2::soa::getLabelFromTypeForKey<T>(node.name), node.name});
1718+
auto localCache = cache.ptr->getCacheFor({"", o2::soa::getMatcherFromTypeForKey<T>(node.name), node.name});
16741719
auto [offset, count] = localCache.getSliceFor(value);
16751720
auto t = typename T::self_t({table->asArrowTable()->Slice(static_cast<uint64_t>(offset), count)}, static_cast<uint64_t>(offset));
16761721
if (t.tableSize() != 0) {
@@ -1679,19 +1724,19 @@ auto doSliceByCached(T const* table, framework::expressions::BindingNode const&
16791724
return t;
16801725
}
16811726

1682-
template <typename T>
1727+
template <soa::is_filtered_table T>
16831728
auto doFilteredSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
16841729
{
1685-
auto localCache = cache.ptr->getCacheFor({o2::soa::getLabelFromTypeForKey<T>(node.name), node.name});
1730+
auto localCache = cache.ptr->getCacheFor({"", o2::soa::getMatcherFromTypeForKey<T>(node.name), node.name});
16861731
auto [offset, count] = localCache.getSliceFor(value);
16871732
auto slice = table->asArrowTable()->Slice(static_cast<uint64_t>(offset), count);
16881733
return prepareFilteredSlice(table, slice, offset);
16891734
}
16901735

1691-
template <typename T>
1736+
template <soa::is_table T>
16921737
auto doSliceByCachedUnsorted(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
16931738
{
1694-
auto localCache = cache.ptr->getCacheUnsortedFor({o2::soa::getLabelFromTypeForKey<T>(node.name), node.name});
1739+
auto localCache = cache.ptr->getCacheUnsortedFor({"", o2::soa::getMatcherFromTypeForKey<T>(node.name), node.name});
16951740
if constexpr (soa::is_filtered_table<T>) {
16961741
auto t = typename T::self_t({table->asArrowTable()}, localCache.getSliceFor(value));
16971742
if (t.tableSize() != 0) {

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ namespace o2::soa
3030
{
3131
struct IndexRecord {
3232
std::string label;
33+
framework::ConcreteDataMatcher matcher;
3334
std::string columnLabel;
3435
IndexKind kind;
3536
int pos;
@@ -142,6 +143,7 @@ std::vector<std::shared_ptr<arrow::Table>> extractSources(ProcessingContext& pc,
142143
struct Spawner {
143144
std::string binding;
144145
std::vector<std::string> labels;
146+
std::vector<framework::ConcreteDataMatcher> matchers;
145147
std::vector<std::shared_ptr<gandiva::Expression>> expressions;
146148
std::shared_ptr<gandiva::Projector> projector = nullptr;
147149
std::shared_ptr<arrow::Schema> schema = nullptr;
@@ -157,6 +159,7 @@ struct Spawner {
157159
struct Builder {
158160
bool exclusive;
159161
std::vector<std::string> labels;
162+
std::vector<framework::ConcreteDataMatcher> matchers;
160163
std::vector<o2::soa::IndexRecord> records;
161164
std::shared_ptr<arrow::Schema> outputSchema;
162165
header::DataOrigin origin;
@@ -258,9 +261,9 @@ inline constexpr auto getIndexMapping()
258261
([&idx]<TableRef ref, typename C>() mutable {
259262
constexpr auto pos = o2::aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::template getIndexPosToKey<Key>();
260263
if constexpr (pos == -1) {
261-
idx.emplace_back(o2::aod::label<ref>(), C::columnLabel(), IndexKind::IdxSelf, pos);
264+
idx.emplace_back(o2::aod::label<ref>(), o2::aod::matcher<ref>(), C::columnLabel(), IndexKind::IdxSelf, pos);
262265
} else {
263-
idx.emplace_back(o2::aod::label<ref>(), C::columnLabel(), getIndexKind<typename C::type>(), pos);
266+
idx.emplace_back(o2::aod::label<ref>(), o2::aod::matcher<ref>(), C::columnLabel(), getIndexKind<typename C::type>(), pos);
264267
}
265268
}.template operator()<refs[Is], typename framework::pack_element_t<Is, indices>>(),
266269
...);

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ template <size_t N, std::array<soa::TableRef, N> refs>
3838
static inline auto extractOriginals(ProcessingContext& pc)
3939
{
4040
return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
41-
return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
41+
return {pc.inputs().get<TableConsumer>(o2::aod::matcher<refs[Is]>())->asArrowTable()...};
4242
}(std::make_index_sequence<refs.size()>());
4343
}
4444
} // namespace
@@ -151,7 +151,7 @@ template <typename T>
151151
concept with_base_table = requires { T::base_specs(); };
152152

153153
template <with_base_table T>
154-
bool requestInputs(std::vector<InputSpec>& inputs, T const& entity)
154+
bool requestInputs(std::vector<InputSpec>& inputs, T const& /*entity*/)
155155
{
156156
auto base_specs = T::base_specs();
157157
for (auto base_spec : base_specs) {
@@ -586,7 +586,7 @@ bool registerCache(T& preslice, Cache& bsks, Cache&)
586586
return true;
587587
}
588588
}
589-
auto locate = std::find_if(bsks.begin(), bsks.end(), [&](auto const& entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
589+
auto locate = std::find(bsks.begin(), bsks.end(), preslice.getBindingKey());
590590
if (locate == bsks.end()) {
591591
bsks.emplace_back(preslice.getBindingKey());
592592
} else if (locate->enabled == false) {
@@ -604,7 +604,7 @@ bool registerCache(T& preslice, Cache&, Cache& bsksU)
604604
return true;
605605
}
606606
}
607-
auto locate = std::find_if(bsksU.begin(), bsksU.end(), [&](auto const& entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
607+
auto locate = std::find(bsksU.begin(), bsksU.end(), preslice.getBindingKey());
608608
if (locate == bsksU.end()) {
609609
bsksU.emplace_back(preslice.getBindingKey());
610610
} else if (locate->enabled == false) {

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,11 @@ struct AnalysisDataProcessorBuilder {
7575
auto key = std::string{"fIndex"} + o2::framework::cutString(soa::getLabelFromType<std::decay_t<G>>());
7676
([&bk, &bku, &key, enabled]() mutable {
7777
if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
78-
auto binding = soa::getLabelFromTypeForKey<std::decay_t<As>>(key);
78+
Entry e{soa::getLabelFromTypeForKey<std::decay_t<As>>(key), soa::getMatcherFromTypeForKey<std::decay_t<As>>(key), key, enabled};
7979
if constexpr (o2::soa::is_smallgroups<std::decay_t<As>>) {
80-
framework::updatePairList(bku, binding, key, enabled);
80+
framework::updatePairList(bku, e);
8181
} else {
82-
framework::updatePairList(bk, binding, key, enabled);
82+
framework::updatePairList(bk, e);
8383
}
8484
}
8585
}(),
@@ -214,7 +214,7 @@ struct AnalysisDataProcessorBuilder {
214214
template <soa::TableRef R>
215215
static auto extractTableFromRecord(InputRecord& record)
216216
{
217-
auto table = record.get<TableConsumer>(o2::aod::label<R>())->asArrowTable();
217+
auto table = record.get<TableConsumer>(o2::aod::matcher<R>())->asArrowTable();
218218
if (table->num_rows() == 0) {
219219
table = makeEmptyTable<R>();
220220
}

0 commit comments

Comments
 (0)