Skip to content

Commit dec4c31

Browse files
committed
use matchers to extract tables
1 parent 4fcbd49 commit dec4c31

16 files changed

+138
-70
lines changed

Framework/AnalysisSupport/src/AODReaderHelpers.cxx

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "Framework/DataProcessingHelpers.h"
1919
#include "Framework/AlgorithmSpec.h"
2020
#include "Framework/DataSpecUtils.h"
21+
#include "Framework/DataSpecViews.h"
2122
#include "Framework/ConfigContext.h"
2223
#include "Framework/DanglingEdgesContext.h"
2324

@@ -29,6 +30,7 @@ struct Buildable {
2930
bool exclusive = false;
3031
std::string binding;
3132
std::vector<std::string> labels;
33+
std::vector<framework::ConcreteDataMatcher> matchers;
3234
header::DataOrigin origin;
3335
header::DataDescription description;
3436
header::DataHeader::SubSpecificationType version;
@@ -52,6 +54,7 @@ struct Buildable {
5254

5355
for (auto const& r : records) {
5456
labels.emplace_back(r.label);
57+
matchers.emplace_back(r.matcher);
5558
}
5659
outputSchema = std::make_shared<arrow::Schema>([](std::vector<o2::soa::IndexRecord> const& recs) {
5760
std::vector<std::shared_ptr<arrow::Field>> fields;
@@ -68,6 +71,7 @@ struct Buildable {
6871
return {
6972
exclusive,
7073
labels,
74+
matchers,
7175
records,
7276
outputSchema,
7377
origin,
@@ -105,6 +109,7 @@ namespace
105109
struct Spawnable {
106110
std::string binding;
107111
std::vector<std::string> labels;
112+
std::vector<framework::ConcreteDataMatcher> matchers;
108113
std::vector<expressions::Projector> projectors;
109114
std::vector<std::shared_ptr<gandiva::Expression>> expressions;
110115
std::shared_ptr<arrow::Schema> outputSchema;
@@ -132,14 +137,17 @@ struct Spawnable {
132137
o2::framework::addLabelToSchema(outputSchema, binding.c_str());
133138

134139
std::vector<std::shared_ptr<arrow::Schema>> schemas;
135-
for (auto& i : spec.metadata) {
136-
if (i.name.starts_with("input-schema:")) {
137-
labels.emplace_back(i.name.substr(13));
138-
iws.clear();
139-
auto json = i.defaultValue.get<std::string>();
140-
iws.str(json);
141-
schemas.emplace_back(ArrowJSONHelpers::read(iws));
142-
}
140+
for (auto const& i : spec.metadata | views::filter_string_params_starts_with("input-schema:")) {
141+
labels.emplace_back(i.name.substr(13));
142+
iws.clear();
143+
auto json = i.defaultValue.get<std::string>();
144+
iws.str(json);
145+
schemas.emplace_back(ArrowJSONHelpers::read(iws));
146+
}
147+
for (auto const& i : spec.metadata | views::filter_string_params_starts_with("input:") | std::ranges::views::transform([](auto const& param){
148+
return DataSpecUtils::fromMetadataString(param.defaultValue.template get<std::string>());
149+
})) {
150+
matchers.emplace_back(std::get<ConcreteDataMatcher>(i.matcher));
143151
}
144152

145153
std::vector<std::shared_ptr<arrow::Field>> fields;
@@ -169,6 +177,7 @@ struct Spawnable {
169177
return {
170178
binding,
171179
labels,
180+
matchers,
172181
expressions,
173182
makeProjector(),
174183
outputSchema,

Framework/AnalysisSupport/src/AODWriterHelpers.cxx

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,12 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
185185
}
186186

187187
// get the TableConsumer and corresponding arrow table
188-
auto msg = pc.inputs().get(ref.spec->binding);
189-
if (msg.header == nullptr) {
188+
if (ref.header == nullptr) {
190189
LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec));
191190
continue;
192191
}
193-
auto s = pc.inputs().get<TableConsumer>(ref.spec->binding);
194-
auto table = s->asArrowTable();
192+
193+
auto table = pc.inputs().get<TableConsumer>(std::get<ConcreteDataMatcher>(ref.spec->matcher))->asArrowTable();
195194
if (!table->Validate().ok()) {
196195
LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName);
197196
continue;

Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
8383
if (m.name.starts_with("input:")) {
8484
auto name = m.name.substr(6);
8585
schemaMetadata->Append("sourceTable", name);
86+
schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
8687
continue;
8788
}
8889
// Ignore the non ccdb: entries
@@ -109,13 +110,13 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
109110
for (auto& schema : schemas) {
110111
std::vector<CCDBFetcherHelper::FetchOp> ops;
111112
auto inputBinding = *schema->metadata()->Get("sourceTable");
113+
auto inputMatcher = DataSpecUtils::fromString(*schema->metadata()->Get("sourceMatcher"));
112114
auto outRouteDesc = *schema->metadata()->Get("outputRoute");
113115
std::string outBinding = *schema->metadata()->Get("outputBinding");
114116
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
115117
"Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
116118
outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
117-
auto ref = inputs.get<TableConsumer>(inputBinding);
118-
auto table = ref->asArrowTable();
119+
auto table = inputs.get<TableConsumer>(inputMatcher)->asArrowTable();
119120
// FIXME: make the fTimestamp column configurable.
120121
auto timestampColumn = table->GetColumnByName("fTimestamp");
121122
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",

Framework/Core/include/Framework/ASoA.h

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ consteval const char* signature()
379379
template <soa::TableRef R>
380380
constexpr framework::ConcreteDataMatcher matcher()
381381
{
382-
return {origin<R>(), header::DataDescription{description_str(signature<R>())}, R.version};
382+
return {origin<R>(), description(signature<R>()), R.version};
383383
}
384384

385385
/// hash identification concepts
@@ -1400,6 +1400,12 @@ static constexpr std::pair<bool, std::string> hasKey(std::string const& key)
14001400
return {hasColumnForKey(typename aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::columns{}, key), aod::label<ref>()};
14011401
}
14021402

1403+
template <TableRef ref>
1404+
static constexpr std::pair<bool, framework::ConcreteDataMatcher> hasKeyM(std::string const& key)
1405+
{
1406+
return {hasColumnForKey(typename aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::columns{}, key), aod::matcher<ref>()};
1407+
}
1408+
14031409
template <typename... C>
14041410
static constexpr auto haveKey(framework::pack<C...>, std::string const& key)
14051411
{
@@ -1434,6 +1440,31 @@ static constexpr std::string getLabelFromTypeForKey(std::string const& key)
14341440
O2_BUILTIN_UNREACHABLE();
14351441
}
14361442

1443+
template <with_originals T, bool OPT = false>
1444+
static constexpr framework::ConcreteDataMatcher getMatcherFromTypeForKey(std::string const& key)
1445+
{
1446+
if constexpr (T::originals.size() == 1) {
1447+
auto locate = hasKeyM<T::originals[0]>(key);
1448+
if (locate.first) {
1449+
return locate.second;
1450+
}
1451+
} else {
1452+
auto locate = [&]<size_t... Is>(std::index_sequence<Is...>) {
1453+
return std::vector{hasKeyM<T::originals[Is]>(key)...};
1454+
}(std::make_index_sequence<T::originals.size()>{});
1455+
auto it = std::find_if(locate.begin(), locate.end(), [](auto const& x) { return x.first; });
1456+
if (it != locate.end()) {
1457+
return it->second;
1458+
}
1459+
}
1460+
if constexpr (!OPT) {
1461+
notFoundColumn(getLabelFromType<std::decay_t<T>>().data(), key.data());
1462+
} else {
1463+
return framework::ConcreteDataMatcher{header::DataOrigin{"AOD"}, header::DataDescription{"[MISSING]"}, 0};
1464+
}
1465+
O2_BUILTIN_UNREACHABLE();
1466+
}
1467+
14371468
template <typename B, typename... C>
14381469
consteval static bool hasIndexTo(framework::pack<C...>&&)
14391470
{
@@ -1484,15 +1515,18 @@ struct PreslicePolicyGeneral : public PreslicePolicyBase {
14841515
std::span<const int64_t> getSliceFor(int value) const;
14851516
};
14861517

1487-
template <typename T, typename Policy, bool OPT = false>
1518+
template <typename T>
1519+
concept is_preslice_policy = std::derived_from<T, PreslicePolicyBase>;
1520+
1521+
template <typename T, is_preslice_policy Policy, bool OPT = false>
14881522
struct PresliceBase : public Policy {
14891523
constexpr static bool optional = OPT;
14901524
using target_t = T;
14911525
using policy_t = Policy;
14921526
const std::string binding;
14931527

14941528
PresliceBase(expressions::BindingNode index_)
1495-
: Policy{PreslicePolicyBase{{o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name})}, Entry(o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name}), std::string{index_.name})}, {}}
1529+
: Policy{PreslicePolicyBase{{o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name})}, Entry(o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name}), o2::soa::getMatcherFromTypeForKey<T, OPT>(std::string{index_.name}), std::string{index_.name})}, {}}
14961530
{
14971531
}
14981532

@@ -1527,7 +1561,7 @@ template <typename T>
15271561
using PresliceOptional = PresliceBase<T, PreslicePolicySorted, true>;
15281562

15291563
template <typename T>
1530-
concept is_preslice = std::derived_from<T, PreslicePolicyBase>;
1564+
concept is_preslice = std::derived_from<T, PreslicePolicyBase> && requires(T) { T::optional; };
15311565

15321566
/// Can be user to group together a number of Preslice declaration
15331567
/// to avoid the limit of 100 data members per task
@@ -1674,10 +1708,10 @@ auto doFilteredSliceBy(T const* table, o2::framework::PresliceBase<C, framework:
16741708
return prepareFilteredSlice(table, slice, offset);
16751709
}
16761710

1677-
template <typename T>
1711+
template <soa::is_table T>
16781712
auto doSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
16791713
{
1680-
auto localCache = cache.ptr->getCacheFor({o2::soa::getLabelFromTypeForKey<T>(node.name), node.name});
1714+
auto localCache = cache.ptr->getCacheFor({"", o2::soa::getMatcherFromTypeForKey<T>(node.name), node.name});
16811715
auto [offset, count] = localCache.getSliceFor(value);
16821716
auto t = typename T::self_t({table->asArrowTable()->Slice(static_cast<uint64_t>(offset), count)}, static_cast<uint64_t>(offset));
16831717
if (t.tableSize() != 0) {
@@ -1686,19 +1720,19 @@ auto doSliceByCached(T const* table, framework::expressions::BindingNode const&
16861720
return t;
16871721
}
16881722

1689-
template <typename T>
1723+
template <soa::is_filtered_table T>
16901724
auto doFilteredSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
16911725
{
1692-
auto localCache = cache.ptr->getCacheFor({o2::soa::getLabelFromTypeForKey<T>(node.name), node.name});
1726+
auto localCache = cache.ptr->getCacheFor({"", o2::soa::getMatcherFromTypeForKey<T>(node.name), node.name});
16931727
auto [offset, count] = localCache.getSliceFor(value);
16941728
auto slice = table->asArrowTable()->Slice(static_cast<uint64_t>(offset), count);
16951729
return prepareFilteredSlice(table, slice, offset);
16961730
}
16971731

1698-
template <typename T>
1732+
template <soa::is_table T>
16991733
auto doSliceByCachedUnsorted(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
17001734
{
1701-
auto localCache = cache.ptr->getCacheUnsortedFor({o2::soa::getLabelFromTypeForKey<T>(node.name), node.name});
1735+
auto localCache = cache.ptr->getCacheUnsortedFor({"", o2::soa::getMatcherFromTypeForKey<T>(node.name), node.name});
17021736
if constexpr (soa::is_filtered_table<T>) {
17031737
auto t = typename T::self_t({table->asArrowTable()}, localCache.getSliceFor(value));
17041738
if (t.tableSize() != 0) {

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ namespace o2::soa
3030
{
3131
struct IndexRecord {
3232
std::string label;
33+
framework::ConcreteDataMatcher matcher;
3334
std::string columnLabel;
3435
IndexKind kind;
3536
int pos;
@@ -142,6 +143,7 @@ std::vector<std::shared_ptr<arrow::Table>> extractSources(ProcessingContext& pc,
142143
struct Spawner {
143144
std::string binding;
144145
std::vector<std::string> labels;
146+
std::vector<framework::ConcreteDataMatcher> matchers;
145147
std::vector<std::shared_ptr<gandiva::Expression>> expressions;
146148
std::shared_ptr<gandiva::Projector> projector = nullptr;
147149
std::shared_ptr<arrow::Schema> schema = nullptr;
@@ -157,6 +159,7 @@ struct Spawner {
157159
struct Builder {
158160
bool exclusive;
159161
std::vector<std::string> labels;
162+
std::vector<framework::ConcreteDataMatcher> matchers;
160163
std::vector<o2::soa::IndexRecord> records;
161164
std::shared_ptr<arrow::Schema> outputSchema;
162165
header::DataOrigin origin;
@@ -258,9 +261,9 @@ inline constexpr auto getIndexMapping()
258261
([&idx]<TableRef ref, typename C>() mutable {
259262
constexpr auto pos = o2::aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::template getIndexPosToKey<Key>();
260263
if constexpr (pos == -1) {
261-
idx.emplace_back(o2::aod::label<ref>(), C::columnLabel(), IndexKind::IdxSelf, pos);
264+
idx.emplace_back(o2::aod::label<ref>(), o2::aod::matcher<ref>(), C::columnLabel(), IndexKind::IdxSelf, pos);
262265
} else {
263-
idx.emplace_back(o2::aod::label<ref>(), C::columnLabel(), getIndexKind<typename C::type>(), pos);
266+
idx.emplace_back(o2::aod::label<ref>(), o2::aod::matcher<ref>(), C::columnLabel(), getIndexKind<typename C::type>(), pos);
264267
}
265268
}.template operator()<refs[Is], typename framework::pack_element_t<Is, indices>>(),
266269
...);

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ template <size_t N, std::array<soa::TableRef, N> refs>
3838
static inline auto extractOriginals(ProcessingContext& pc)
3939
{
4040
return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
41-
return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
41+
return {pc.inputs().get<TableConsumer>(o2::aod::matcher<refs[Is]>())->asArrowTable()...};
4242
}(std::make_index_sequence<refs.size()>());
4343
}
4444
} // namespace
@@ -151,7 +151,7 @@ template <typename T>
151151
concept with_base_table = requires { T::base_specs(); };
152152

153153
template <with_base_table T>
154-
bool requestInputs(std::vector<InputSpec>& inputs, T const& entity)
154+
bool requestInputs(std::vector<InputSpec>& inputs, T const& /*entity*/)
155155
{
156156
auto base_specs = T::base_specs();
157157
for (auto base_spec : base_specs) {
@@ -586,7 +586,7 @@ bool registerCache(T& preslice, Cache& bsks, Cache&)
586586
return true;
587587
}
588588
}
589-
auto locate = std::find_if(bsks.begin(), bsks.end(), [&](auto const& entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
589+
auto locate = std::find(bsks.begin(), bsks.end(), preslice.getBindingKey());
590590
if (locate == bsks.end()) {
591591
bsks.emplace_back(preslice.getBindingKey());
592592
} else if (locate->enabled == false) {
@@ -604,7 +604,7 @@ bool registerCache(T& preslice, Cache&, Cache& bsksU)
604604
return true;
605605
}
606606
}
607-
auto locate = std::find_if(bsksU.begin(), bsksU.end(), [&](auto const& entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
607+
auto locate = std::find(bsksU.begin(), bsksU.end(), preslice.getBindingKey());
608608
if (locate == bsksU.end()) {
609609
bsksU.emplace_back(preslice.getBindingKey());
610610
} else if (locate->enabled == false) {

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,11 @@ struct AnalysisDataProcessorBuilder {
7575
auto key = std::string{"fIndex"} + o2::framework::cutString(soa::getLabelFromType<std::decay_t<G>>());
7676
([&bk, &bku, &key, enabled]() mutable {
7777
if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
78-
auto binding = soa::getLabelFromTypeForKey<std::decay_t<As>>(key);
78+
Entry e{soa::getLabelFromTypeForKey<std::decay_t<As>>(key), soa::getMatcherFromTypeForKey<std::decay_t<As>>(key), key, enabled};
7979
if constexpr (o2::soa::is_smallgroups<std::decay_t<As>>) {
80-
framework::updatePairList(bku, binding, key, enabled);
80+
framework::updatePairList(bku, e);
8181
} else {
82-
framework::updatePairList(bk, binding, key, enabled);
82+
framework::updatePairList(bk, e);
8383
}
8484
}
8585
}(),

Framework/Core/include/Framework/ArrowTableSlicingCache.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#ifndef ARROWTABLESLICINGCACHE_H
1313
#define ARROWTABLESLICINGCACHE_H
1414

15+
#include "Framework/ConcreteDataMatcher.h"
1516
#include "Framework/ServiceHandle.h"
1617
#include <arrow/array.h>
1718
#include <gsl/span>
@@ -36,20 +37,28 @@ struct SliceInfoUnsortedPtr {
3637

3738
struct Entry {
3839
std::string binding;
40+
ConcreteDataMatcher matcher;
3941
std::string key;
4042
bool enabled;
4143

42-
Entry(std::string b, std::string k, bool e = true)
44+
Entry(std::string b, ConcreteDataMatcher m, std::string k, bool e = true)
4345
: binding{b},
46+
matcher{m},
4447
key{k},
4548
enabled{e}
4649
{
4750
}
51+
52+
friend bool operator==(Entry const& lhs, Entry const& rhs)
53+
{
54+
return (lhs.matcher == rhs.matcher) &&
55+
(lhs.key == rhs.key);
56+
}
4857
};
4958

5059
using Cache = std::vector<Entry>;
5160

52-
void updatePairList(Cache& list, std::string const& binding, std::string const& key, bool enabled);
61+
void updatePairList(Cache& list, Entry& entry);
5362

5463
struct ArrowTableSlicingCacheDef {
5564
constexpr static ServiceKind service_kind = ServiceKind::Global;

Framework/Core/include/Framework/GroupSlicer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ struct GroupSlicer {
5555
{
5656
constexpr auto index = framework::has_type_at_v<std::decay_t<T>>(associated_pack_t{});
5757
auto binding = o2::soa::getLabelFromTypeForKey<std::decay_t<T>>(mIndexColumnName);
58-
auto bk = Entry(binding, mIndexColumnName);
58+
auto bk = Entry(binding, o2::soa::getMatcherFromTypeForKey<std::decay_t<T>>(mIndexColumnName), mIndexColumnName);
5959
if constexpr (!o2::soa::is_smallgroups<std::decay_t<T>>) {
6060
if (table.size() == 0) {
6161
return;

0 commit comments

Comments
 (0)