From 0f480ffbe8feec60c60e9687375d6741a5ad7e0f Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Thu, 4 Sep 2025 11:09:32 +0800 Subject: [PATCH 01/11] fixes aggregated changelog result following, fix hybrid aggr --- cmake/autogenerated_versions.txt | 2 +- .../Aggregator/MemoryAggregator/MemoryAggregatedDataVariants.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmake/autogenerated_versions.txt b/cmake/autogenerated_versions.txt index 84c0b27434..b0a0328001 100644 --- a/cmake/autogenerated_versions.txt +++ b/cmake/autogenerated_versions.txt @@ -2,7 +2,7 @@ # NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION, # only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes. -SET(VERSION_REVISION 136) +SET(VERSION_REVISION 140) SET(VERSION_MAJOR 3) SET(VERSION_MINOR 0) SET(VERSION_PATCH 5) diff --git a/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregatedDataVariants.h b/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregatedDataVariants.h index 638f47b916..bce8e4a744 100644 --- a/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregatedDataVariants.h +++ b/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregatedDataVariants.h @@ -613,7 +613,7 @@ SERDE struct MemoryAggregatedDataVariants final : public IAggregatedDataVariants /// Existed versions: /// STATE V3 - REVISION 14 (Add updates tracking state) - /// STATE V4 - REVISION 130 (Track updates for retract state) + /// STATE V4 - REVISION 140 (Track updates for retract state) static constexpr VersionType version = 4; }; From 267159ec5955d4ed0e1f527c8155a401a2c80ed9 Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Thu, 4 Sep 2025 19:27:58 +0800 Subject: [PATCH 02/11] add negate() for unique_exact --- .../Streaming/AggregateFunctionUniq.h | 10 ++++ ...e_exact_and_unique_exact_changelog_kv.yaml | 50 ++++++++++++++++++- 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/src/AggregateFunctions/Streaming/AggregateFunctionUniq.h b/src/AggregateFunctions/Streaming/AggregateFunctionUniq.h index a91b50bdfe..4ca6b47da2 100644 --- a/src/AggregateFunctions/Streaming/AggregateFunctionUniq.h +++ b/src/AggregateFunctions/Streaming/AggregateFunctionUniq.h @@ -381,6 +381,11 @@ class AggregateFunctionUniq final : public IAggregateFunctionDataHelper::add(this->data(place), columns, num_args, row_num); } + void ALWAYS_INLINE negate(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override + { + detail::Adder::negate(this->data(place), columns, num_args, row_num); + } + void ALWAYS_INLINE addBatchSinglePlace( size_t row_begin, size_t row_end, @@ -520,6 +525,11 @@ class AggregateFunctionUniqVariadic final : public IAggregateFunctionDataHelper< detail::Adder::add(this->data(place), columns, num_args, row_num); } + void negate(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override + { + detail::Adder::negate(this->data(place), columns, num_args, row_num); + } + void addBatchSinglePlace( size_t row_begin, size_t row_end, diff --git a/tests/cluster/smoke/0013_changelog_stream/06_aggr_func_unique_exact_and_unique_exact_changelog_kv.yaml b/tests/cluster/smoke/0013_changelog_stream/06_aggr_func_unique_exact_and_unique_exact_changelog_kv.yaml index 20c141fb35..3993827d18 100644 --- a/tests/cluster/smoke/0013_changelog_stream/06_aggr_func_unique_exact_and_unique_exact_changelog_kv.yaml +++ 
b/tests/cluster/smoke/0013_changelog_stream/06_aggr_func_unique_exact_and_unique_exact_changelog_kv.yaml @@ -1,6 +1,6 @@ tags: - changelog_kv_aggr -description: aggregation function unique_exact and unique_exact changelog_kv +description: aggregation function unique and unique_exact changelog_kv cluster: - p1k1 - p3k1 @@ -56,6 +56,42 @@ steps: - name: unique_exact_if(id, val, val > 1) type: uint64 + - name: "1406-2" + type: stream + query: | + select id, unique(id), unique(to_string(id)), unique(id, val), + unique_if(id, id > 2), unique_if(to_string(id), id > 2), unique_if(id, val, val > 1), + unique_exact(val), unique_exact(to_string(val)), unique_exact(id, val), + unique_exact_if(id, id > 2), unique_exact_if(to_string(id), id > 2), unique_exact_if(id, val, val > 1) + from test14_changelog_kv_stream group by id order by id emit on update; + schema: + - name: id + type: int32 + - name: unique(id) + type: uint64 + - name: unique(to_string(id)) + type: uint64 + - name: unique(id, val) + type: uint64 + - name: unique_if(id, id > 2) + type: uint64 + - name: unique_if(to_string(id), id > 2) + type: uint64 + - name: unique_if(id, val, val > 1) + type: uint64 + - name: unique_exact(val) + type: uint64 + - name: unique_exact(to_string(val)) + type: uint64 + - name: unique_exact(id, val) + type: uint64 + - name: unique_exact_if(id, id > 2) + type: uint64 + - name: unique_exact_if(to_string(id), id > 2) + type: uint64 + - name: unique_exact_if(id, val, val > 1) + type: uint64 + - type: wait time: 2 @@ -79,3 +115,15 @@ steps: - ["5", "5", "6", "3", "3", "6", "6", "6", "6", "3", "3", "6"] - ["4", "4", "5", "3", "3", "5", "5", "5", "5", "3", "3", "5"] - ["3", "3", "4", "2", "2", "4", "4", "4", "4", "2", "2", "4"] + + - type: check + target_name: "1406-2" + expected_result: + - [1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1] + - [2, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1] + - [3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] + - [1, 1, 1, 2, 0, 0, 2, 2, 2, 2, 0, 0, 2] + - [4, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] + - [5, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] + - [2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + - [3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] From c27ac02a9e32119b613b51e0d30eff074ef65da2 Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Fri, 5 Sep 2025 04:33:19 +0800 Subject: [PATCH 03/11] skip access unused disks during dropping stream * partial ported from https://github.com/ClickHouse/ClickHouse/pull/84710 * ignore no_such_file_or_directory during dropAllData --- src/Interpreters/DatabaseCatalog.cpp | 22 +++++++++++++++++++++- src/Storages/MergeTree/MergeTreeData.cpp | 4 ++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index e5410822a8..8e18bc489f 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -949,11 +950,30 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table) table.table->drop(); } + /// Check if we are interested in a particular disk + /// or it is better to bypass it e.g. 
to avoid interactions with a remote storage + auto is_disk_eligible_for_search = [this](DiskPtr disk, std::shared_ptr storage) + { + bool is_disk_eligible = !disk->isReadOnly(); + + /// Disk is not actually used by MergeTree table + if (is_disk_eligible && storage && !storage->getStoragePolicy()->tryGetVolumeIndexByDiskName(disk->getName()).has_value()) + { + /// proton: starts. Missing SearchOrphanedPartsDisks part from https://github.com/ClickHouse/ClickHouse/pull/84710 + is_disk_eligible = false; + /// proton: ends. + } + + LOG_TRACE(log, "is disk {} eligible for search: {}", disk->getName(), is_disk_eligible); + return is_disk_eligible; + }; + /// Even if table is not loaded, try remove its data from disks. for (const auto & [disk_name, disk] : getContext()->getDisksMap()) { String data_path = "store/" + getPathForUUID(table.table_id.uuid); - if (!disk->exists(data_path) || disk->isReadOnly()) + auto table_merge_tree = std::dynamic_pointer_cast(table.table); + if (!is_disk_eligible_for_search(disk, table_merge_tree) || !disk->exists(data_path)) continue; LOG_INFO(log, "Removing data directory {} of dropped table {} from disk {}", data_path, table.table_id.getNameForLogs(), disk_name); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index f551e32355..7bf810e6b1 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -2713,10 +2713,10 @@ void MergeTreeData::dropAllData() if (disk->exists(fs::path(relative_data_path) / MOVING_DIR_NAME)) disk->removeRecursive(fs::path(relative_data_path) / MOVING_DIR_NAME); - MergeTreeWriteAheadLog::dropAllWriteAheadLogs(disk, relative_data_path); - try { + MergeTreeWriteAheadLog::dropAllWriteAheadLogs(disk, relative_data_path); + if (!disk->isDirectoryEmpty(relative_data_path) && supportsReplication() && disk->supportZeroCopyReplication() && settings_ptr->allow_remote_fs_zero_copy_replication) From 0efe39a495d3cd2a428faf2c979fcd9123977433 Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Mon, 8 Sep 2025 14:37:58 +0800 Subject: [PATCH 04/11] update aggregates of mv query --- .../HybridAggregatedDataVariants.cpp | 32 ++++++++--- .../HybridAggregator/HybridAggregator.h | 8 ++- .../HybridAggregator_States.cpp | 6 +- .../MemoryAggregatedDataVariants.cpp | 17 ++++-- .../MemoryAggregator/MemoryAggregator.h | 3 +- .../MemoryAggregator_State.cpp | 6 +- .../0_stateless/99020_alter_view.reference | 11 ++++ .../0_stateless/99020_alter_view.sql | 56 +++++++++++++++++-- 8 files changed, 114 insertions(+), 25 deletions(-) diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregatedDataVariants.cpp b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregatedDataVariants.cpp index 3b9b0121f8..8aca3bba73 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregatedDataVariants.cpp +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregatedDataVariants.cpp @@ -169,11 +169,16 @@ void HybridAggregatedDataVariants::deserialize(ReadBuffer & rb, const IAggregato hybrid_aggregator.initStates(*this); /// [aggregates_size] was added in state V3 + size_t recovered_aggregates_size = hybrid_aggregator.getParams()->aggregates_size; if (recovered_version >= 3) { - size_t recovered_aggregates_size = 0; readVarUInt(recovered_aggregates_size, rb); - /// TODO: supports for adding aggregates + if (recovered_aggregates_size > hybrid_aggregator.getParams()->aggregates_size) + throw 
Exception( ErrorCodes::RECOVER_CHECKPOINT_FAILED, "Failed to recover aggregation checkpoint. Number of aggregation functions is not compatible, checkpointed={}, current={}", recovered_aggregates_size, hybrid_aggregator.getParams()->aggregates_size); } bool is_without_key_type = false; @@ -330,11 +335,21 @@ void HybridAggregatedDataVariants::read(RocksHandlerPtr handler, const IAggregat chassert(empty()); hybrid_aggregator.initStates(*this); + std::optional old_aggregates_size; if (recovered_version >= 3) { size_t recovered_aggregates_size = 0; handler->get("aggregates_size", recovered_aggregates_size); - /// TODO: supports for adding aggregates + if (recovered_aggregates_size > hybrid_aggregator.getParams()->aggregates_size) + throw Exception( + ErrorCodes::RECOVER_CHECKPOINT_FAILED, + "Failed to recover aggregation checkpoint. Number of aggregation functions is not compatible, checkpointed={}, current={}", + recovered_aggregates_size, + hybrid_aggregator.getParams()->aggregates_size); + + /// Supports adding new aggregates + if (recovered_aggregates_size < hybrid_aggregator.getParams()->aggregates_size) + old_aggregates_size = recovered_aggregates_size; } std::string_view without_key_state; @@ -347,7 +362,7 @@ void HybridAggregatedDataVariants::read(RocksHandlerPtr handler, const IAggregat type()); ReadBufferFromString rb(without_key_state); - hybrid_aggregator.deserializeAggregateStates(without_key.get(), rb, recovered_version); + hybrid_aggregator.deserializeAggregateStates(without_key.get(), rb, recovered_version, old_aggregates_size); std::string_view without_key_retracts_state; if (handler->tryGet("without_key_retracts", without_key_retracts_state)) @@ -356,7 +371,7 @@ void HybridAggregatedDataVariants::read(RocksHandlerPtr handler, const IAggregat initWithoutKeyRetractStates(hybrid_aggregator.totalSizeOfAggregatedStates(), hybrid_aggregator.alignOfAggregatedStates()); ReadBufferFromString rb2(without_key_retracts_state); - hybrid_aggregator.deserializeAggregateStates(without_key_retracts.get(), rb2, recovered_version); + hybrid_aggregator.deserializeAggregateStates(without_key_retracts.get(), rb2, recovered_version, old_aggregates_size); } } else @@ -389,9 +404,10 @@ void HybridAggregatedDataVariants::read(RocksHandlerPtr handler, const IAggregat } std::function old_value_deserializer; - if (recovered_version <= 2) - old_value_deserializer = [&hybrid_aggregator, recovered_version](void * data, ReadBuffer & rb) { - hybrid_aggregator.deserializeAggregateStates(reinterpret_cast(data), rb, recovered_version); + /// Deserialize old aggregate states + if (recovered_version <= 2 || old_aggregates_size) + old_value_deserializer = [&hybrid_aggregator, recovered_version, old_aggregates_size](void * data, ReadBuffer & rb) { + hybrid_aggregator.deserializeAggregateStates(reinterpret_cast(data), rb, recovered_version, old_aggregates_size); return ErrorCodes::OK; }; diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h index be18d1bcdb..58332af41d 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h @@ -197,7 +197,13 @@ class HybridAggregator final : public IAggregator void createAggregateStates(AggregateDataPtr aggregate_data) const; void destroyAggregateStates(AggregateDataPtr aggregate_data) const; void serializeAggregateStates(ConstAggregateDataPtr place, 
WriteBuffer & wb) const; - void deserializeAggregateStates(AggregateDataPtr place, ReadBuffer & rb, VersionType version = HybridAggregatedDataVariants::version) const; + + /// \param version and \param old_aggregates_size are used for deserialization of old aggregate states + void deserializeAggregateStates( + AggregateDataPtr place, + ReadBuffer & rb, + VersionType version = HybridAggregatedDataVariants::version, + std::optional old_aggregates_size = {}) const; void saveAggregateStatesToRetract(AggregateDataPtr place) const; diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_States.cpp b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_States.cpp index 7990938c01..51eb9b76ff 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_States.cpp +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_States.cpp @@ -137,7 +137,7 @@ void HybridAggregator::serializeAggregateStates(ConstAggregateDataPtr place, DB: aggregate_functions[i]->serialize(place + offsets_of_aggregate_states[i], wb); } -void HybridAggregator::deserializeAggregateStates(AggregateDataPtr place, ReadBuffer & rb, VersionType version) const +void HybridAggregator::deserializeAggregateStates(AggregateDataPtr place, ReadBuffer & rb, VersionType version, std::optional old_aggregates_size) const { chassert(place); @@ -154,7 +154,9 @@ void HybridAggregator::deserializeAggregateStates(AggregateDataPtr place, ReadBu if (trackingStateTime()) TrackingTime::deserialize(place + tracking_time_offset, rb); - for (size_t i = 0; i < params->aggregates_size; ++i) + auto aggregates_size = old_aggregates_size.value_or(params->aggregates_size); + chassert(aggregates_size <= params->aggregates_size); + for (size_t i = 0; i < aggregates_size; ++i) aggregate_functions[i]->deserialize(place + offsets_of_aggregate_states[i], rb, std::nullopt, /*arena=*/nullptr); /// FIXME, arena } } diff --git a/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregatedDataVariants.cpp b/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregatedDataVariants.cpp index 72e02980bf..c07020c389 100644 --- a/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregatedDataVariants.cpp +++ b/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregatedDataVariants.cpp @@ -144,7 +144,7 @@ void MemoryAggregatedDataVariants::serialize(WriteBuffer & wb, const IAggregator writeIntBinary(static_cast(memory_aggregator.params->tracking_updates_type), wb); - writeVarUInt(memory_aggregator.params->aggregates.size(), wb); /// Added in V4 + writeVarUInt(memory_aggregator.params->aggregates_size, wb); /// Added in V4 auto state_serializer = [&memory_aggregator](auto place, auto & wb_) { chassert(place); @@ -251,14 +251,19 @@ void MemoryAggregatedDataVariants::deserialize(ReadBuffer & rb, const IAggregato magic_enum::enum_name(memory_aggregator.params->tracking_updates_type)); /// [aggregates_size] was added in V4 + size_t recovered_aggregates_size = memory_aggregator.params->aggregates_size; if (recovered_version >= 4) { - size_t recovered_aggregates_size = 0; readVarUInt(recovered_aggregates_size, rb); - /// TODO: supports for adding aggregates + if (recovered_aggregates_size > memory_aggregator.params->aggregates_size) + throw Exception( + ErrorCodes::RECOVER_CHECKPOINT_FAILED, + "Failed to recover aggregation checkpoint. 
Number of aggregation functions is not compatible, checkpointed={}, current={}", recovered_aggregates_size, memory_aggregator.params->aggregates_size); } - auto state_deserializer = [this, &memory_aggregator, recovered_version](auto & place, auto & rb_, Arena *) { + auto state_deserializer = [this, &memory_aggregator, recovered_version, recovered_aggregates_size](auto & place, auto & rb_, Arena *) { place = nullptr; /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. auto * aggregate_data = aggregates_pool->alignedAlloc(memory_aggregator.totalSizeOfAggregatedStates(), memory_aggregator.alignOfAggregatedStates()); @@ -288,7 +293,7 @@ void MemoryAggregatedDataVariants::deserialize(ReadBuffer & rb, const IAggregato /// whether add() or negate(), ensure it is not empty TrackingUpdates::data(retract_place).updates = std::numeric_limits::max(); - memory_aggregator.deserializeAggregateStates(retract_place, rb_, retract_pool.get()); + memory_aggregator.deserializeAggregateStates(retract_place, rb_, retract_pool.get(), recovered_aggregates_size); } break; } @@ -301,7 +306,7 @@ void MemoryAggregatedDataVariants::deserialize(ReadBuffer & rb, const IAggregato break; } - memory_aggregator.deserializeAggregateStates(place, rb_, aggregates_pool); + memory_aggregator.deserializeAggregateStates(place, rb_, aggregates_pool, recovered_aggregates_size); }; /// [aggr-func-state-without-key] diff --git a/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator.h b/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator.h index b01c3f0650..8eb7a227e5 100644 --- a/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator.h +++ b/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator.h @@ -295,7 +295,8 @@ class MemoryAggregator final : public IAggregator void destroyAggregateStates(AggregateDataPtr & place) const; void serializeAggregateStates(const AggregateDataPtr & place, WriteBuffer & wb) const; - void deserializeAggregateStates(AggregateDataPtr & place, ReadBuffer & rb, Arena * arena) const; + /// \param old_aggregates_size is used for deserialization of old aggregate states + void deserializeAggregateStates(AggregateDataPtr & place, ReadBuffer & rb, Arena * arena, std::optional old_aggregates_size = {}) const; /// \return true means execution must be aborted, false means normal bool checkAndProcessResult(MemoryAggregatedDataVariants & result) const; diff --git a/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator_State.cpp b/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator_State.cpp index 898993f9b3..272a081f09 100644 --- a/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator_State.cpp +++ b/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator_State.cpp @@ -29,11 +29,13 @@ void MemoryAggregator::serializeAggregateStates(const AggregateDataPtr & place, aggregate_functions[i]->serialize(place + offsets_of_aggregate_states[i], wb); } -void MemoryAggregator::deserializeAggregateStates(AggregateDataPtr & place, ReadBuffer & rb, Arena * arena) const +void MemoryAggregator::deserializeAggregateStates(AggregateDataPtr & place, ReadBuffer & rb, Arena * arena, std::optional old_aggregates_size) const { chassert(place); - for (size_t i = 0; i < params->aggregates_size; ++i) + auto aggregates_size = old_aggregates_size.value_or(params->aggregates_size); + chassert(aggregates_size <= params->aggregates_size); + for 
(size_t i = 0; i < aggregates_size; ++i) aggregate_functions[i]->deserialize(place + offsets_of_aggregate_states[i], rb, std::nullopt, arena); } diff --git a/tests/queries_ported/0_stateless/99020_alter_view.reference b/tests/queries_ported/0_stateless/99020_alter_view.reference index ce00c92e2f..e959097be0 100644 --- a/tests/queries_ported/0_stateless/99020_alter_view.reference +++ b/tests/queries_ported/0_stateless/99020_alter_view.reference @@ -4,3 +4,14 @@ -1 3 0 3 -1 +=== ALTER VIEW MODIFY QUERY (update aggregates) === +memory: +1 0 +2 1 +3 3 +4 5 +hybrid: +1 0 +2 1 +3 3 +4 5 diff --git a/tests/queries_ported/0_stateless/99020_alter_view.sql b/tests/queries_ported/0_stateless/99020_alter_view.sql index 8941e20825..7c9e1ef801 100644 --- a/tests/queries_ported/0_stateless/99020_alter_view.sql +++ b/tests/queries_ported/0_stateless/99020_alter_view.sql @@ -51,8 +51,54 @@ SELECT * except _tp_time from table(99020_view); SELECT * except _tp_time from table(99020_mv) order by _tp_time; SELECT * except _tp_time from table(99020_mv_with_target) order by _tp_time; -DROP VIEW 99020_view; -DROP VIEW 99020_mv; -DROP VIEW 99020_mv_with_target; -DROP STREAM 99020_stream; -DROP STREAM 99020_target_stream; +DROP VIEW IF EXISTS 99020_view; +DROP VIEW IF EXISTS 99020_mv; +DROP VIEW IF EXISTS 99020_mv_with_target; +DROP VIEW IF EXISTS 99020_mv_with_target_hybrid; +DROP STREAM IF EXISTS 99020_stream; +DROP STREAM IF EXISTS 99020_target_stream; +DROP STREAM IF EXISTS 99020_target_stream_hybrid; + +SELECT '=== ALTER VIEW MODIFY QUERY (update aggregates) ==='; +CREATE STREAM 99020_stream (a int, b int); +CREATE STREAM 99020_target_stream (res int, res2 int); +CREATE STREAM 99020_target_stream_hybrid (res int, res2 int); +CREATE MATERIALIZED VIEW 99020_mv_with_target INTO 99020_target_stream AS SELECT count() as res FROM 99020_stream emit on update; +CREATE MATERIALIZED VIEW 99020_mv_with_target_hybrid INTO 99020_target_stream_hybrid AS SELECT count() as res FROM 99020_stream emit on update settings default_hash_table='hybrid'; +SELECT sleep(2) FORMAT Null; +insert into 99020_stream(a, b) values (1, 2); +SELECT sleep(1) FORMAT Null; + +--- add aggregate `sum(a)` +ALTER VIEW 99020_mv_with_target MODIFY QUERY SELECT count() as res, sum(a) as res2 FROM 99020_stream emit on update; +ALTER VIEW 99020_mv_with_target_hybrid MODIFY QUERY SELECT count() as res, sum(a) as res2 FROM 99020_stream emit on update settings default_hash_table='hybrid'; +SELECT sleep(2) FORMAT Null; +insert into 99020_stream(a, b) values (1, 2); +SELECT sleep(1) FORMAT Null; + +--- remove aggregate `sum(a)` (FAILED) +ALTER VIEW 99020_mv_with_target MODIFY QUERY SELECT count() as res FROM 99020_stream emit on update; +ALTER VIEW 99020_mv_with_target_hybrid MODIFY QUERY SELECT count() as res FROM 99020_stream emit on update settings default_hash_table='hybrid'; +SELECT sleep(2) FORMAT Null; +insert into 99020_stream(a, b) values (1, 2); +SELECT sleep(1) FORMAT Null; + +--- replace aggregate `sum(a)` with `sum(b)` (TRICK) +--- remove aggregate `sum(a)` (TRICK) and add aggregate `sum(b)` +ALTER VIEW 99020_mv_with_target MODIFY QUERY SELECT count() as res, sum(b) as deleted, sum(b) as res2 FROM 99020_stream emit on update; +ALTER VIEW 99020_mv_with_target_hybrid MODIFY QUERY SELECT count() as res, sum(b) as deleted, sum(b) as res2 FROM 99020_stream emit on update settings default_hash_table='hybrid'; +SYSTEM recover materialized view 99020_mv_with_target; --- need to manually recover it since the previous alter request has aborted the mv. 
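+--- (aggregate states are recovered by position: the placeholder `deleted` keeps the old `sum(a)` slot
+--- occupied, while the trailing `sum(b)` starts fresh as a newly added aggregate)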
+SYSTEM recover materialized view 99020_mv_with_target_hybrid; +SELECT sleep(2) FORMAT Null; +insert into 99020_stream(a, b) values (1, 2); +SELECT sleep(2) FORMAT Null; +SELECT 'memory:'; +SELECT * except _tp_time from table(99020_mv_with_target) order by _tp_time; +SELECT 'hybrid:'; +SELECT * except _tp_time from table(99020_mv_with_target_hybrid) order by _tp_time; + +DROP VIEW IF EXISTS 99020_mv_with_target; +DROP VIEW IF EXISTS 99020_mv_with_target_hybrid; +DROP STREAM IF EXISTS 99020_stream; +DROP STREAM IF EXISTS 99020_target_stream; +DROP STREAM IF EXISTS 99020_target_stream_hybrid; From 7074550eaa94471c4d7cea8ab82565d6bdb15548 Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Thu, 9 Oct 2025 23:38:19 +0800 Subject: [PATCH 05/11] fix read remote loop --- src/Databases/DatabaseOrdinary.cpp | 2 +- src/Storages/MatView/StorageMaterializedView_ReadWrite.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Databases/DatabaseOrdinary.cpp b/src/Databases/DatabaseOrdinary.cpp index 13c45220a0..df89113357 100644 --- a/src/Databases/DatabaseOrdinary.cpp +++ b/src/Databases/DatabaseOrdinary.cpp @@ -302,7 +302,7 @@ void DatabaseOrdinary::loadTableFromMetadata(ContextMutablePtr local_context, co } catch (const Exception & ex) { - LOG_INFO(log, "Failed to load table, error={} ddl={}", ex.message(), queryToString(create_query, true)); + LOG_ERROR(log, "Failed to load table, error={} ddl={}", ex.message(), queryToString(create_query, true)); } } diff --git a/src/Storages/MatView/StorageMaterializedView_ReadWrite.cpp b/src/Storages/MatView/StorageMaterializedView_ReadWrite.cpp index 988fb0f23a..9179f4e862 100644 --- a/src/Storages/MatView/StorageMaterializedView_ReadWrite.cpp +++ b/src/Storages/MatView/StorageMaterializedView_ReadWrite.cpp @@ -28,8 +28,8 @@ void StorageMaterializedView::read( { /// The behavior of querying a MV is same as querying the underlying stream` - /// Read remote storage - if (isVirtualStorage()) + /// Read remote inner storage + if (usesInnerStorage() && isVirtualStorage()) { readRemote(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage); return; From 0ca1b3effd27475f914b11e4cd0e88e3f60bb24f Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Mon, 13 Oct 2025 23:22:54 +0800 Subject: [PATCH 06/11] Merge pull request #81360 from ClickHouse/chesema-mute-expected-404 mute scary logs when 404 is expected response --- .../MetadataStorageFromPlainObjectStorage.cpp | 4 +- src/IO/Expect404ResponseScope.cpp | 29 +++++++++++ src/IO/Expect404ResponseScope.h | 23 +++++++++ src/IO/S3/AWSLogger.cpp | 49 +++++++++++++++++++ src/IO/S3/Client.cpp | 5 +- src/IO/S3/PocoHTTPClient.cpp | 13 ++++- src/IO/S3/getObjectInfo.cpp | 8 +++ 7 files changed, 126 insertions(+), 5 deletions(-) create mode 100644 src/IO/Expect404ResponseScope.cpp create mode 100644 src/IO/Expect404ResponseScope.h diff --git a/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp b/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp index 92f2508c3a..7fa81c843a 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp +++ b/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp @@ -1,10 +1,12 @@ -#include "MetadataStorageFromPlainObjectStorage.h" +#include + #include #include #include #include #include +#include #include #include diff --git a/src/IO/Expect404ResponseScope.cpp b/src/IO/Expect404ResponseScope.cpp new file mode 
100644 index 0000000000..2fc121f0fb --- /dev/null +++ b/src/IO/Expect404ResponseScope.cpp @@ -0,0 +1,29 @@ +#include "Expect404ResponseScope.h" + +#include + +namespace DB +{ + +thread_local size_t expected_404_scope_count = 0; + +Expect404ResponseScope::Expect404ResponseScope() + : initial_thread_id(std::this_thread::get_id()) +{ + ++expected_404_scope_count; +} + +Expect404ResponseScope::~Expect404ResponseScope() +{ + // check that instance is destroyed in the same thread + chassert(initial_thread_id == std::this_thread::get_id()); + chassert(expected_404_scope_count); + --expected_404_scope_count; +} + +bool Expect404ResponseScope::is404Expected() +{ + return expected_404_scope_count != 0; +} + +} diff --git a/src/IO/Expect404ResponseScope.h b/src/IO/Expect404ResponseScope.h new file mode 100644 index 0000000000..f7df4a08b7 --- /dev/null +++ b/src/IO/Expect404ResponseScope.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +namespace DB +{ + +// Inside the inner scope +// Remote storage response with PATH_NOT_FOUND error is considered as expected +// No error logs are written, no profile events about errors are incremented +class Expect404ResponseScope +{ +public: + Expect404ResponseScope(); + ~Expect404ResponseScope(); + + static bool is404Expected(); + +private: + std::thread::id initial_thread_id; +}; + +} diff --git a/src/IO/S3/AWSLogger.cpp b/src/IO/S3/AWSLogger.cpp index f6d0cad1e7..989ebf910c 100644 --- a/src/IO/S3/AWSLogger.cpp +++ b/src/IO/S3/AWSLogger.cpp @@ -1,4 +1,7 @@ #include +#include +#include +#include #if USE_AWS_MSK_IAM || USE_AWS_S3 /// proton: updated @@ -54,13 +57,59 @@ Aws::Utils::Logging::LogLevel AWSLogger::GetLogLevel() const return Aws::Utils::Logging::LogLevel::Info; } +namespace +{ +/// This function helps to avoid reading the whole str when strlen is called +bool startsWith(const char * str, const char * prefix) +{ + while (*prefix && *str == *prefix) + { + ++str; + ++prefix; + } + return *prefix == 0; +} + +bool is404Muted(const char * message) +{ + /// This is the way, how to mute scary logs from `AWSXMLClient::BuildAWSError` + /// about 404 when 404 is the expected response + if (!Expect404ResponseScope::is404Expected()) + return false; + + static const char * prefix_str = "HTTP response code: "; + static const size_t prefix_len = strlen(prefix_str); + + if (!startsWith(message, prefix_str)) + return false; + + const char * code_str = message + prefix_len; + size_t code_len = 3; + + // check that strlen(code_str) >= code_len + for (size_t i = 0; i < code_len; ++i) + if (!code_str[i]) + return false; + + UInt64 code = 0; + if (!tryParse(code, code_str, code_len)) + return false; + + return code == Poco::Net::HTTPResponse::HTTP_NOT_FOUND; +} +} + void AWSLogger::Log(Aws::Utils::Logging::LogLevel log_level, const char * tag, const char * format_str, ...) // NOLINT { + if (is404Muted(format_str)) + return; callLogImpl(log_level, tag, format_str); /// FIXME. Variadic arguments? 
} void AWSLogger::LogStream(Aws::Utils::Logging::LogLevel log_level, const char * tag, const Aws::OStringStream & message_stream) { + if (is404Muted(message_stream.str().c_str())) + return; callLogImpl(log_level, tag, message_stream.str().c_str()); } diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index 351043def9..4e55cba3cf 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -400,12 +401,12 @@ Model::UploadPartCopyOutcome Client::UploadPartCopy(const UploadPartCopyRequest Model::DeleteObjectOutcome Client::DeleteObject(const DeleteObjectRequest & request) const { - return doRequest(request, [this](const Model::DeleteObjectRequest & req) { return DeleteObject(req); }); + return doRequest(request, [this](const Model::DeleteObjectRequest & req) { Expect404ResponseScope scope; return DeleteObject(req); }); } Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & request) const { - return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); }); + return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { Expect404ResponseScope scope; return DeleteObjects(req); }); } Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest & request) const diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index 10a9c67878..3f21558a4d 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -504,8 +505,16 @@ void PocoHTTPClient::makeRequestInternalImpl( int status_code = static_cast(poco_response.getStatus()); - if (enable_s3_requests_logging) - LOG_TEST(log, "Response status: {}, {}", status_code, poco_response.getReason()); + if (status_code >= SUCCESS_RESPONSE_MIN && status_code <= SUCCESS_RESPONSE_MAX) + { + if (enable_s3_requests_logging) + LOG_TEST(log, "Response status: {}, {}", status_code, poco_response.getReason()); + } + else if (Poco::Net::HTTPResponse::HTTP_NOT_FOUND != status_code || !Expect404ResponseScope::is404Expected()) + { + /// Error statuses are more important so we show them even if `enable_s3_requests_logging == false`. 
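+            /// An expected 404 is the exception: callers such as objectExists() and DeleteObject(s)
+            /// announce it via Expect404ResponseScope, and the branch condition above skips this log.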
+ LOG_ERROR(log, "Response status: {}, {} HttpMethod={} URI={}", status_code, poco_response.getReason(), request.GetMethod(), request.GetURIString()); /// proton : updates + } if (poco_response.getStatus() == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT) { diff --git a/src/IO/S3/getObjectInfo.cpp b/src/IO/S3/getObjectInfo.cpp index e848229517..765c83c1c2 100644 --- a/src/IO/S3/getObjectInfo.cpp +++ b/src/IO/S3/getObjectInfo.cpp @@ -1,4 +1,6 @@ +#include #include +#include #if USE_AWS_S3 @@ -78,6 +80,10 @@ ObjectInfo getObjectInfo( bool for_disk_s3, bool throw_on_error) { + std::optional scope; // 404 is not an error + if (!throw_on_error) + scope.emplace(); + auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, with_metadata, for_disk_s3); if (object_info) { @@ -112,6 +118,8 @@ bool objectExists( const S3Settings::RequestSettings & request_settings, bool for_disk_s3) { + Expect404ResponseScope scope; // 404 is not an error + auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {}, for_disk_s3); if (object_info) return true; From fa09ebf981ac38b976883e8476d3fa6cc6d89bf2 Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Tue, 14 Oct 2025 14:20:43 +0800 Subject: [PATCH 07/11] honor hybrid hash table in stream join table --- src/Interpreters/ExpressionAnalyzer.cpp | 60 +++++++++++++++++-- .../Streaming/HashJoin/ConcurrentHashJoin.cpp | 30 ++++++++++ .../Streaming/HashJoin/ConcurrentHashJoin.h | 6 ++ .../Streaming/HashJoin/HashJoin.cpp | 4 +- .../Streaming/HashJoin/HashJoin.h | 2 +- .../HashJoin/HybridHashJoin/HashIndex.cpp | 60 +++++++++++++++++++ .../HashJoin/HybridHashJoin/HashIndex.h | 2 + .../HybridHashJoin/HybridHashJoin.cpp | 4 +- .../HybridHashJoin_IndexJoin.cpp | 9 ++- .../Streaming/HashJoin/IHashJoin.h | 1 + .../MemoryHashJoin/MemoryHashJoin.cpp | 20 +------ .../Transforms/JoiningTransform.cpp | 36 +++++++++-- src/Processors/Transforms/JoiningTransform.h | 2 +- ...roup_by_primary_key_stateful_function.yaml | 8 ++- ...08_stream_join_hybrid_hash_table.reference | 20 +++++++ .../99108_stream_join_hybrid_hash_table.sql | 46 ++++++++++++++ 16 files changed, 273 insertions(+), 37 deletions(-) create mode 100644 tests/queries_ported/0_stateless/99108_stream_join_hybrid_hash_table.reference create mode 100644 tests/queries_ported/0_stateless/99108_stream_join_hybrid_hash_table.sql diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index f19122e551..8d00fa2661 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1259,7 +1259,11 @@ JoinPtr SelectQueryExpressionAnalyzer::appendJoin( std::shared_ptr tryKeyValueJoin(std::shared_ptr analyzed_join, const Block & right_sample_block); static std::shared_ptr chooseJoinAlgorithm( - std::shared_ptr analyzed_join, const ColumnsWithTypeAndName & left_sample_columns, std::unique_ptr & joined_plan, ContextPtr context) + std::shared_ptr analyzed_join, + const ColumnsWithTypeAndName & left_sample_columns, + std::unique_ptr & joined_plan, + bool streaming, + ContextPtr context) { const auto & settings = context->getSettings(); @@ -1300,9 +1304,55 @@ static std::shared_ptr chooseJoinAlgorithm( analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PARALLEL_HASH)) { tried_algorithms.push_back(toString(JoinAlgorithm::HASH)); - if (analyzed_join->allowParallelHashJoin()) - return std::make_shared(context, analyzed_join, settings.max_threads, 
right_sample_block); - return std::make_shared(analyzed_join, right_sample_block); + + /// proton: starts. Enable hybrid hash table at right side for case `Stream join Table` + if (streaming && settings.default_hash_join.value == HashJoinType::Hybrid) + { + const auto & tables = analyzed_join->getTablesWithColumns(); + assert(tables.size() == 2); + + /// In order to reuse `HybridHashJoin`, simulate enrichment join via Append join table(Append) + auto left_join_stream_desc = std::make_shared( + tables[0], + Block{}, + Streaming::DataStreamSemantic::Append, + settings.keep_versions, + settings.join_latency_threshold, + settings.join_quiesce_threshold_ms); + + Streaming::DataStreamSemanticEx right_stream_semantic{Streaming::DataStreamSemantic::Append}; + right_stream_semantic.streaming = false; + auto right_join_stream_desc = std::make_shared( + tables[1], + right_sample_block, + right_stream_semantic, + settings.keep_versions, + settings.join_latency_threshold, + settings.join_quiesce_threshold_ms); + + if (analyzed_join->allowParallelHashJoin()) + return std::make_shared( + analyzed_join, + settings.max_threads, + std::move(left_join_stream_desc), + std::move(right_join_stream_desc), + context->getSpillDirForCurrentQuery("join"), + settings.max_hot_keys); + + return std::make_shared( + analyzed_join, + std::move(left_join_stream_desc), + std::move(right_join_stream_desc), + context->getSpillDirForCurrentQuery("join"), + settings.max_hot_keys); + } + else + { + if (analyzed_join->allowParallelHashJoin()) + return std::make_shared(context, analyzed_join, settings.max_threads, right_sample_block); + return std::make_shared(analyzed_join, right_sample_block); + } + /// proton: ends. } if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE)) @@ -1572,7 +1622,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeJoin( if (syntax->streaming && joined_plan->isStreaming()) join = chooseJoinAlgorithmStreaming(analyzed_join); else - join = chooseJoinAlgorithm(analyzed_join, left_columns, joined_plan, getContext()); + join = chooseJoinAlgorithm(analyzed_join, left_columns, joined_plan, syntax->streaming, getContext()); /// proton : ends return join; diff --git a/src/Interpreters/Streaming/HashJoin/ConcurrentHashJoin.cpp b/src/Interpreters/Streaming/HashJoin/ConcurrentHashJoin.cpp index ece18bfc81..40c5fe0019 100644 --- a/src/Interpreters/Streaming/HashJoin/ConcurrentHashJoin.cpp +++ b/src/Interpreters/Streaming/HashJoin/ConcurrentHashJoin.cpp @@ -133,6 +133,11 @@ void ConcurrentHashJoin::transformHeader(Block & header) hash_joins[0]->data->transformHeader(header); } +const Block & ConcurrentHashJoin::getOutputHeader() const +{ + return hash_joins[0]->data->getOutputHeader(); +} + void ConcurrentHashJoin::insertRightBlock(Block right_block) { auto dispatched_blocks = dispatchBlock(right_key_column_positions, std::move(right_block)); @@ -214,6 +219,9 @@ void ConcurrentHashJoin::joinLeftBlock(Block & left_block) } left_block = concatenateBlocks(joined_blocks); + /// If there is no joined block, return an empty block as a heartbeat + if (!left_block) + left_block = getOutputHeader().cloneEmpty(); } template @@ -283,6 +291,10 @@ Block ConcurrentHashJoin::insertBlockAndJoin(Block & block) } block = concatenateBlocks(joined_blocks); + /// If there is no joined block, return an empty block as a heartbeat + if (!block) + block = getOutputHeader().cloneEmpty(); + return concatenateBlocks(retracted_blocks); } @@ -353,6 +365,10 @@ std::vector ConcurrentHashJoin::insertBlockToRangeBucketAndJoin(Block blo 
} } + /// If there is no joined block, return an empty block as a heartbeat + if (joined_results.empty()) + joined_results.emplace_back(getOutputHeader().cloneEmpty()); + return joined_results; } @@ -382,6 +398,20 @@ void ConcurrentHashJoin::checkTypesOfKeys(const Block & block) const hash_joins[0]->data->checkTypesOfKeys(block); } +void ConcurrentHashJoin::setTotals(const Block & block) +{ + if (block) + { + std::lock_guard lock(totals_mutex); + totals = block; + } +} + +const Block & ConcurrentHashJoin::getTotals() const +{ + return totals; +} + size_t ConcurrentHashJoin::getTotalRowCount() const { size_t res = 0; diff --git a/src/Interpreters/Streaming/HashJoin/ConcurrentHashJoin.h b/src/Interpreters/Streaming/HashJoin/ConcurrentHashJoin.h index bb22ffffc1..da7f2275c7 100644 --- a/src/Interpreters/Streaming/HashJoin/ConcurrentHashJoin.h +++ b/src/Interpreters/Streaming/HashJoin/ConcurrentHashJoin.h @@ -48,6 +48,7 @@ class ConcurrentHashJoin final : public IHashJoin void postInit(const Block & left_header, const Block & output_header_, UInt64 join_max_cached_bytes_) override; void transformHeader(Block & header) override; + const Block & getOutputHeader() const override; /// For non-bidirectional hash join void insertRightBlock(Block right_block) override; @@ -75,6 +76,8 @@ class ConcurrentHashJoin final : public IHashJoin const TableJoin & getTableJoin() const override { return *table_join; } void checkTypesOfKeys(const Block & block) const override; + void setTotals(const Block & block) override; + const Block & getTotals() const override; size_t getTotalRowCount() const override; size_t getTotalByteCount() const override; bool alwaysReturnsEmptySet() const override; @@ -132,6 +135,9 @@ class ConcurrentHashJoin final : public IHashJoin std::vector> hash_joins; size_t num_used_hash_joins; /// Actual number of used hash joins + std::mutex totals_mutex; + Block totals; + std::vector left_key_column_positions; std::vector right_key_column_positions; diff --git a/src/Interpreters/Streaming/HashJoin/HashJoin.cpp b/src/Interpreters/Streaming/HashJoin/HashJoin.cpp index af6da60be5..a84484d9ce 100644 --- a/src/Interpreters/Streaming/HashJoin/HashJoin.cpp +++ b/src/Interpreters/Streaming/HashJoin/HashJoin.cpp @@ -215,9 +215,11 @@ void HashJoin::init() /// SELECT * FROM left_append_only INNER ASOF JOIN right_append_only ON left_append_only.key = right_append_only.key AND left_append_only.timestamp < right_append_only.timestamp SETTINGS keep_versions=3; /// SELECT * FROM left_append_only INNER LATEST JOIN right_append_only ON left_append_only.key = right_append_only.key; /// `ASOF` keeps multiple versions and `LATEST` only keeps the latest version for the join key + /// Also for `Stream join Table` auto data_enrichment_join = (left_join_ctx.join_stream_desc->data_stream_semantic == DataStreamSemantic::Append && right_join_ctx.join_stream_desc->data_stream_semantic == DataStreamSemantic::Changelog) - || streaming_strictness == Strictness::Asof || (streaming_strictness == Strictness::Latest && streaming_kind != Kind::Full); + || streaming_strictness == Strictness::Asof || (streaming_strictness == Strictness::Latest && streaming_kind != Kind::Full) + || !right_join_ctx.join_stream_desc->data_stream_semantic.streaming; bidirectional_hash_join = !data_enrichment_join; diff --git a/src/Interpreters/Streaming/HashJoin/HashJoin.h b/src/Interpreters/Streaming/HashJoin/HashJoin.h index aaa5887b08..b2a3e07fa3 100644 --- a/src/Interpreters/Streaming/HashJoin/HashJoin.h +++ 
b/src/Interpreters/Streaming/HashJoin/HashJoin.h @@ -46,7 +46,7 @@ class HashJoin : public IHashJoin UInt64 keepVersions() const { return right_join_ctx.join_stream_desc->keep_versions; } - const Block & getOutputHeader() const { return output_header; } + const Block & getOutputHeader() const override { return output_header; } JoinKind getKind() const { return kind; } JoinStrictness getStrictness() const { return strictness; } diff --git a/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HashIndex.cpp b/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HashIndex.cpp index 31901e4497..3df5bfbed9 100644 --- a/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HashIndex.cpp +++ b/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HashIndex.cpp @@ -228,4 +228,64 @@ std::vector HashIndex::assignDataBlockToRangeBuckets(Blo return bucket_assigned_blocks; } + +size_t HashIndex::approximateCount() const +{ + size_t count = 0; + auto collect_count = [&](const HybridHashJoinMapsVariants & index) { + std::vector maps_vector; + maps_vector.reserve(index.size()); + for (size_t i = 0; i < index.size(); ++i) + maps_vector.push_back(&index[i]); + + hybridJoinDispatch( + join->getStreamingKind(), + join->getStreamingStrictness(), + maps_vector, + [&](auto /*kind_*/, auto /*strictness_*/, const auto & maps_vector_) { + for (const auto & map : maps_vector_) + count += map->table.approximateCount(); + }); + }; + + { + std::scoped_lock lock(mutex); + collect_count(*current_hash_index); + + for (const auto & [_, index_with_timestamps] : range_bucket_hash_indexes) + collect_count(*index_with_timestamps.index); + } + + return count; +} + +size_t HashIndex::getBufferSizeInBytes() const +{ + size_t bytes = 0; + auto collect_bytes = [&](const HybridHashJoinMapsVariants & index) { + std::vector maps_vector; + maps_vector.reserve(index.size()); + for (size_t i = 0; i < index.size(); ++i) + maps_vector.push_back(&index[i]); + + hybridJoinDispatch( + join->getStreamingKind(), + join->getStreamingStrictness(), + maps_vector, + [&](auto /*kind_*/, auto /*strictness_*/, const auto & maps_vector_) { + for (const auto & map : maps_vector_) + bytes += map->table.getBufferSizeInBytes(); + }); + }; + + { + std::scoped_lock lock(mutex); + collect_bytes(*current_hash_index); + + for (const auto & [_, index_with_timestamps] : range_bucket_hash_indexes) + collect_bytes(*index_with_timestamps.index); + } + + return bytes; +} } diff --git a/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HashIndex.h b/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HashIndex.h index b8d4f0a8f4..cededb63ac 100644 --- a/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HashIndex.h +++ b/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HashIndex.h @@ -85,6 +85,8 @@ SERDE struct HashIndex } const CachedBlockMetrics & getJoinMetrics() const { return metrics; } + size_t approximateCount() const; + size_t getBufferSizeInBytes() const; void serialize(WriteBuffer & wb, VersionType version) const; void deserialize(ReadBuffer & rb, VersionType version); diff --git a/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HybridHashJoin.cpp b/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HybridHashJoin.cpp index 21721ec663..ac785f8d07 100644 --- a/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HybridHashJoin.cpp +++ b/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HybridHashJoin.cpp @@ -212,13 +212,13 @@ void HybridHashJoin::cancel() size_t HybridHashJoin::getTotalRowCount() const { - return 0; + return right_data.index->approximateCount() + 
left_data.index->approximateCount(); } /// Sum size in bytes of all buffers, used for JOIN maps and for all memory pools. size_t HybridHashJoin::getTotalByteCount() const { - return 0; + return right_data.index->getBufferSizeInBytes() + left_data.index->getBufferSizeInBytes(); } bool HybridHashJoin::alwaysReturnsEmptySet() const diff --git a/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HybridHashJoin_IndexJoin.cpp b/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HybridHashJoin_IndexJoin.cpp index ad14729374..138beac471 100644 --- a/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HybridHashJoin_IndexJoin.cpp +++ b/src/Interpreters/Streaming/HashJoin/HybridHashJoin/HybridHashJoin_IndexJoin.cpp @@ -1061,7 +1061,7 @@ void HybridHashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed) /// Use left_block to join right hash table void HybridHashJoin::joinLeftBlock(Block & left_block) { - chassert(!bidirectional_hash_join && !range_bidirectional_hash_join && !emit_changelog); + chassert(!bidirectional_hash_join && !range_bidirectional_hash_join); /// SELECT * FROM append_only [INNER | LEFT | RIGHT | FULL] JOIN versioned_kv /// SELECT * FROM append_only ASOF JOIN versioned_kv @@ -1354,7 +1354,8 @@ void HybridHashJoin::transformToOutputBlock(Block & joined_block) const /// Please note we didn't reorder columns according to output header if block is empty to save some cpu cycles /// Caller shall check if the retracted block is empty and avoid pushing this empty block downstream since /// this empty block's structure probably doesn't match the output header - if (!joined_block.rows()) + /// Also for stream join hybrid table + if (!joined_block.rows() || !right_data.join_ctx.join_stream_desc->data_stream_semantic.streaming) return; if constexpr (is_left_block) @@ -1424,6 +1425,10 @@ void HybridHashJoin::transformHeader(Block & header) else joinLeftBlock(header); + /// Doesn't handle _tp_delta for stream join hybrid table + if (!right_data.join_ctx.join_stream_desc->data_stream_semantic.streaming) + return; + /// Remove internal left/right delta column { std::set delta_pos; diff --git a/src/Interpreters/Streaming/HashJoin/IHashJoin.h b/src/Interpreters/Streaming/HashJoin/IHashJoin.h index da2fdcc17e..f3bd4ae3cb 100644 --- a/src/Interpreters/Streaming/HashJoin/IHashJoin.h +++ b/src/Interpreters/Streaming/HashJoin/IHashJoin.h @@ -15,6 +15,7 @@ class IHashJoin : public IJoin virtual void postInit(const Block & left_header, const Block & output_header_, UInt64 join_max_cached_bytes_) = 0; virtual void transformHeader(Block & header) = 0; + virtual const Block & getOutputHeader() const = 0; /// For non-bidirectional hash join virtual void insertRightBlock(Block right_block) = 0; diff --git a/src/Interpreters/Streaming/HashJoin/MemoryHashJoin/MemoryHashJoin.cpp b/src/Interpreters/Streaming/HashJoin/MemoryHashJoin/MemoryHashJoin.cpp index effbf24623..a04ed61d3b 100644 --- a/src/Interpreters/Streaming/HashJoin/MemoryHashJoin/MemoryHashJoin.cpp +++ b/src/Interpreters/Streaming/HashJoin/MemoryHashJoin/MemoryHashJoin.cpp @@ -821,28 +821,12 @@ bool MemoryHashJoin::alwaysReturnsEmptySet() const size_t MemoryHashJoin::getTotalRowCount() const { - size_t res = 0; - /// FIXME - // for (const auto & map : right_data.buffered_data->maps) - // { - // joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { res += map_.getTotalRowCount(right_data.buffered_data->type); }); - // } - - return res; + return right_data.buffered_data->getJoinMetrics().total_rows + 
left_data.buffered_data->getJoinMetrics().total_rows; } size_t MemoryHashJoin::getTotalByteCount() const { - size_t res = 0; - /// FIXME - - // for (const auto & map : right_data.buffered_data->maps) - // { - // joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { res += map_.getTotalByteCountImpl(right_data.buffered_data->type); }); - // } - // res += right_data.buffered_data->pool.size(); - - return res; + return right_data.buffered_data->getJoinMetrics().totalBytes() + left_data.buffered_data->getJoinMetrics().totalBytes(); } bool MemoryHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/) diff --git a/src/Processors/Transforms/JoiningTransform.cpp b/src/Processors/Transforms/JoiningTransform.cpp index 1a16a672a3..8a58a7122a 100644 --- a/src/Processors/Transforms/JoiningTransform.cpp +++ b/src/Processors/Transforms/JoiningTransform.cpp @@ -4,6 +4,10 @@ #include +/// proton: starts. +#include +/// proton: ends. + namespace DB { @@ -15,10 +19,19 @@ namespace ErrorCodes Block JoiningTransform::transformHeader(Block header, const JoinPtr & join) { LOG_DEBUG(getLogger("JoiningTransform"), "Before join block: '{}'", header.dumpStructure()); - join->checkTypesOfKeys(header); - join->initialize(header); - ExtraBlockPtr tmp; - join->joinBlock(header, tmp); + /// proton: starts. Add for join hybrid hash table + if (auto hash_join = std::dynamic_pointer_cast(join)) + { + hash_join->transformHeader(header); + } + else + { + join->checkTypesOfKeys(header); + join->initialize(header); + ExtraBlockPtr tmp; + join->joinBlock(header, tmp); + } + /// proton: ends. LOG_DEBUG(getLogger("JoiningTransform"), "After join block: '{}'", header.dumpStructure()); return header; } @@ -40,6 +53,11 @@ JoiningTransform::JoiningTransform( { if (!join->isFilled()) inputs.emplace_back(Block(), this); // Wait for FillingRightJoinSideTransform + + /// proton: starts. Add for join hybrid hash table + if (auto hash_join = std::dynamic_pointer_cast(join)) + hash_join->postInit(input_header, output_header, /*join_max_cached_bytes=*/std::numeric_limits::max()); + /// proton: ends. } JoiningTransform::~JoiningTransform() = default; @@ -231,6 +249,16 @@ FillingRightJoinSideTransform::FillingRightJoinSideTransform(Block input_header, , join(std::move(join_)) {} +/// proton: starts. +String FillingRightJoinSideTransform::getName() const +{ + if (auto hash_join = std::dynamic_pointer_cast(join)) + return fmt::format("FillingRightJoinSide({})", hash_join->type()); + else + return "FillingRightJoinSide"; +} +/// proton: ends. 
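+/// The type-qualified name (for example "FillingRightJoinSide(Hybrid)", assuming the hash join's type()
+/// reports its hash-table kind) makes memory and hybrid builds distinguishable in pipeline introspection.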
+ InputPort * FillingRightJoinSideTransform::addTotalsPort() { if (inputs.size() > 1) diff --git a/src/Processors/Transforms/JoiningTransform.h b/src/Processors/Transforms/JoiningTransform.h index dd70ebd1d3..bef411e115 100644 --- a/src/Processors/Transforms/JoiningTransform.h +++ b/src/Processors/Transforms/JoiningTransform.h @@ -95,7 +95,7 @@ class FillingRightJoinSideTransform : public IProcessor { public: FillingRightJoinSideTransform(Block input_header, JoinPtr join_); - String getName() const override { return "FillingRightJoinSide"; } + String getName() const override; InputPort * addTotalsPort(); diff --git a/tests/cluster/smoke/0013_changelog_stream6/04_substream_global_aggregation_partition_by_primary_key_group_by_primary_key_stateful_function.yaml b/tests/cluster/smoke/0013_changelog_stream6/04_substream_global_aggregation_partition_by_primary_key_group_by_primary_key_stateful_function.yaml index a165baf752..9e4424fb7f 100644 --- a/tests/cluster/smoke/0013_changelog_stream6/04_substream_global_aggregation_partition_by_primary_key_group_by_primary_key_stateful_function.yaml +++ b/tests/cluster/smoke/0013_changelog_stream6/04_substream_global_aggregation_partition_by_primary_key_group_by_primary_key_stateful_function.yaml @@ -23,7 +23,7 @@ steps: type: stream query: | select max(val), min(val), avg(val), id from changelog(test14_substream_6, id) - partition by id group by id; + partition by id group by id emit periodic 1ms; schema: - name: max(val) type: int32 @@ -67,6 +67,8 @@ steps: - [1, 1, 1, 2] - [3, 3, 3, 3] - [5, 5, 5, 1] - - [14, 14, 14, 2] - - [10, 10, 10, 1] + - [8, 8, 8, 2] - [9, 9, 9, 3] + - [11, 11, 11, 2] + - [10, 10, 10, 1] + - [14, 14, 14, 2] diff --git a/tests/queries_ported/0_stateless/99108_stream_join_hybrid_hash_table.reference b/tests/queries_ported/0_stateless/99108_stream_join_hybrid_hash_table.reference new file mode 100644 index 0000000000..91a1aa9da8 --- /dev/null +++ b/tests/queries_ported/0_stateless/99108_stream_join_hybrid_hash_table.reference @@ -0,0 +1,20 @@ +====== Stream join hybrid hash table ====== +1 a1 1 a1 +2 a2 2 a2 +3 a3 3 a3 +4 a4 4 a4 +5 a5 5 a5 +1 aa1 1 a1 +--- +5 +5 +====== Concurrent stream join hybrid hash table ====== +1 a1 1 a1 +2 a2 2 a2 +3 a3 3 a3 +4 a4 4 a4 +5 a5 5 a5 +1 aa1 1 a1 +--- +5 +5 diff --git a/tests/queries_ported/0_stateless/99108_stream_join_hybrid_hash_table.sql b/tests/queries_ported/0_stateless/99108_stream_join_hybrid_hash_table.sql new file mode 100644 index 0000000000..cef515e3d2 --- /dev/null +++ b/tests/queries_ported/0_stateless/99108_stream_join_hybrid_hash_table.sql @@ -0,0 +1,46 @@ +drop view if exists 99108_mv; +drop view if exists 99108_mv2; +drop view if exists 99108_mv3; +drop view if exists 99108_mv4; +drop stream if exists 99108_kv; +create stream 99108_kv(i int, v string) primary key i settings mode='versioned_kv',flush_threshold_count=1; + +insert into 99108_kv(i, v) values(1, 'a1')(2, 'a2')(3, 'a3')(4, 'a4')(5, 'a5'); + +--- stream join hybrid hash table +create materialized view 99108_mv as select a.i, a.v, b.i, b.v from 99108_kv as a join table(99108_kv) as b on a.i = b.i settings default_hash_join='hybrid', max_hot_keys=2 STORAGE_SETTINGS flush_threshold_count=1; +create materialized view 99108_mv2 as select count() as cnt from 99108_kv as a join table(99108_kv) as b on a.i = b.i emit periodic 1s settings default_hash_join='hybrid', max_hot_keys=2 STORAGE_SETTINGS flush_threshold_count=1; +--- Concurrent stream join hybrid hash table +create materialized view 99108_mv3 as select a.i, a.v, b.i, 
b.v from 99108_kv as a join table(99108_kv) as b on a.i = b.i settings default_hash_join='hybrid', max_hot_keys=2, join_algorithm='parallel_hash', max_threads=8 STORAGE_SETTINGS flush_threshold_count=1; +create materialized view 99108_mv4 as select count() as cnt from 99108_kv as a join table(99108_kv) as b on a.i = b.i emit periodic 1s settings default_hash_join='hybrid', max_hot_keys=2, join_algorithm='parallel_hash', max_threads=8 STORAGE_SETTINGS flush_threshold_count=1; + +select sleep(2) format Null; + +--- Validate checkpoint +system pause materialized view 99108_mv; +system pause materialized view 99108_mv2; +system pause materialized view 99108_mv3; +system pause materialized view 99108_mv4; +system resume materialized view 99108_mv; +system resume materialized view 99108_mv2; +system resume materialized view 99108_mv3; +system resume materialized view 99108_mv4; + +select sleep(2) format Null; +insert into 99108_kv(i, v) values(1, 'aa1'); + +select sleep(3) format Null; +select '====== Stream join hybrid hash table ======'; +select * except _tp_time from table(99108_mv) order by _tp_sn, i; +select '---'; +select * except _tp_time from table(99108_mv2) order by _tp_sn; +select '====== Concurrent stream join hybrid hash table ======'; +select * except _tp_time from table(99108_mv3) order by _tp_sn, i; +select '---'; +select * except _tp_time from table(99108_mv4) order by _tp_sn; + +drop view if exists 99108_mv; +drop view if exists 99108_mv2; +drop view if exists 99108_mv3; +drop view if exists 99108_mv4; +drop stream if exists 99108_kv; From 6ab53053d02e791cf5acc914abd83b7f85b60831 Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Fri, 17 Oct 2025 11:24:22 +0800 Subject: [PATCH 08/11] fix streaming distinct processing --- src/AggregateFunctions/IAggregateFunction.h | 6 ++- .../Streaming/AggregateFunctionDistinct.h | 47 ++++++++++++------- .../AggregateFunctionDistinctRetract.h | 21 ++++++--- .../HybridAggregator_Execute.cpp | 2 +- .../Streaming/Aggregator/IAggregator.cpp | 9 ++++ .../Streaming/Aggregator/IAggregator.h | 4 ++ .../MemoryAggregator_ExecuteAndRetract.cpp | 4 +- ...changelog_kv_stream_with_single_shard.yaml | 13 +++-- 8 files changed, 72 insertions(+), 34 deletions(-) diff --git a/src/AggregateFunctions/IAggregateFunction.h b/src/AggregateFunctions/IAggregateFunction.h index 5ed811cc32..d8b5f54206 100644 --- a/src/AggregateFunctions/IAggregateFunction.h +++ b/src/AggregateFunctions/IAggregateFunction.h @@ -189,8 +189,10 @@ class IAggregateFunction : public std::enable_shared_from_this(&rhs); - auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place); - if (find_place == merged_places.end()) - merged_places.emplace_back(merged_place); + if (move_rhs) + { + uintptr_t merged_place = reinterpret_cast(&rhs); + auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place); + if (find_place == merged_places.end()) + merged_places.emplace_back(merged_place); + } } void serialize(WriteBuffer & buf) const @@ -181,7 +184,7 @@ struct AggregateFunctionDistinctGenericData bool use_extra_data = false; - void merge(const Self & rhs, Arena * arena) + void merge(const Self & rhs, Arena * arena, bool move_rhs) { Set::LookupResult it; bool inserted; @@ -216,10 +219,13 @@ struct AggregateFunctionDistinctGenericData set.merge(rhs.set); - uintptr_t merged_place = reinterpret_cast(&rhs); - auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place); - if (find_place == 
merged_places.end()) - merged_places.emplace_back(merged_place); + if (move_rhs) + { + uintptr_t merged_place = reinterpret_cast(&rhs); + auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place); + if (find_place == merged_places.end()) + merged_places.emplace_back(merged_place); + } } void serialize(WriteBuffer & buf) const @@ -382,7 +388,7 @@ struct AggregateFunctionDistinctGenericDataWithoutArena return argument_columns; } - void merge(const AggregateFunctionDistinctGenericDataWithoutArena & rhs, Arena *) + void merge(const AggregateFunctionDistinctGenericDataWithoutArena & rhs, Arena *, bool move_rhs) { /// Deduplicate owned extra data based on rhs, also make sure it doesn't exist in rhs extra data for (auto next = extra_data_since_last_finalize.begin(); next != extra_data_since_last_finalize.end();) @@ -412,10 +418,13 @@ struct AggregateFunctionDistinctGenericDataWithoutArena set.insert(rhs.set.begin(), rhs.set.end()); - uintptr_t merged_place = reinterpret_cast(&rhs); - auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place); - if (find_place == merged_places.end()) - merged_places.emplace_back(merged_place); + if (move_rhs) + { + uintptr_t merged_place = reinterpret_cast(&rhs); + auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place); + if (find_place == merged_places.end()) + merged_places.emplace_back(merged_place); + } } void serialize(WriteBuffer & buf) const @@ -488,7 +497,13 @@ class AggregateFunctionDistinct : public IAggregateFunctionDataHelperdata(place).merge(this->data(rhs), arena); + this->data(place).merge(this->data(rhs), arena, /*move_rhs=*/true); + nested_func->merge(getNestedPlace(place), getNestedPlace(rhs), arena); + } + + void copy(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + this->data(place).merge(this->data(rhs), arena, /*move_rhs=*/false); nested_func->merge(getNestedPlace(place), getNestedPlace(rhs), arena); } diff --git a/src/AggregateFunctions/Streaming/AggregateFunctionDistinctRetract.h b/src/AggregateFunctions/Streaming/AggregateFunctionDistinctRetract.h index 64a802ceef..6d63c65a9f 100644 --- a/src/AggregateFunctions/Streaming/AggregateFunctionDistinctRetract.h +++ b/src/AggregateFunctions/Streaming/AggregateFunctionDistinctRetract.h @@ -33,7 +33,7 @@ struct AggregateFunctionDistinctRetractGenericData AggregateFunctionDistinctRetractGenericData() : map(INTERNAL_MAP_SIZE) { } - void merge(const Self & rhs) + void merge(const Self & rhs, bool move_rhs) { for (auto next = extra_data_since_last_finalize.begin(); next != extra_data_since_last_finalize.end();) { @@ -68,10 +68,13 @@ struct AggregateFunctionDistinctRetractGenericData map.merge(rhs.map); - uintptr_t merged_place = reinterpret_cast(&rhs); - auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place); - if (find_place == merged_places.end()) - merged_places.emplace_back(merged_place); + if (move_rhs) + { + uintptr_t merged_place = reinterpret_cast(&rhs); + auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place); + if (find_place == merged_places.end()) + merged_places.emplace_back(merged_place); + } } void serialize(WriteBuffer & buf) const @@ -209,10 +212,16 @@ class AggregateFunctionDistinctRetract : public IAggregateFunctionDataHelperdata(place).merge(this->data(rhs)); + this->data(place).merge(this->data(rhs), /*move_rhs=*/true); nested_func->merge(getNestedPlace(place), getNestedPlace(rhs), 
arena); } + void copy(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + this->data(place).merge(this->data(rhs), /*move_rhs=*/false); + nested_func->copy(getNestedPlace(place), getNestedPlace(rhs), arena); + } + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override { this->data(place).serialize(buf); diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp index ae1b9e7bf5..b113c2366b 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp @@ -182,7 +182,7 @@ template if (retracts_emplace_result.isInserted() && !TrackingCount::empty(src_place + tracking_count_offset)) { auto * dst_place = static_cast(retracts_emplace_result.getMutableMapped()); - mergeAggregateStates(dst_place, src_place, /*arena=*/nullptr); + copyAggregateStates(dst_place, src_place, /*arena=*/nullptr); TrackingCount::merge(dst_place + tracking_count_offset, src_place + tracking_count_offset); } } diff --git a/src/Interpreters/Streaming/Aggregator/IAggregator.cpp b/src/Interpreters/Streaming/Aggregator/IAggregator.cpp index 844bd846e8..7277dbd737 100644 --- a/src/Interpreters/Streaming/Aggregator/IAggregator.cpp +++ b/src/Interpreters/Streaming/Aggregator/IAggregator.cpp @@ -240,6 +240,15 @@ ALWAYS_INLINE void IAggregator::mergeAggregateStates( } } +ALWAYS_INLINE void IAggregator::copyAggregateStates(AggregateDataPtr dst, ConstAggregateDataPtr src, Arena * arena) const +{ + chassert(src); + chassert(dst); + + for (size_t i = 0; i < params->aggregates_size; ++i) + aggregate_functions[i]->copy(dst + offsets_of_aggregate_states[i], src + offsets_of_aggregate_states[i], arena); +} + ALWAYS_INLINE void IAggregator::doDestroyAggregateStates(AggregateDataPtr place) const { if (!all_aggregates_has_trivial_destructor && place) diff --git a/src/Interpreters/Streaming/Aggregator/IAggregator.h b/src/Interpreters/Streaming/Aggregator/IAggregator.h index 437e7dae87..d2df214947 100644 --- a/src/Interpreters/Streaming/Aggregator/IAggregator.h +++ b/src/Interpreters/Streaming/Aggregator/IAggregator.h @@ -165,9 +165,13 @@ class IAggregator PaddedPODArray & places, OutputBlockColumns && out_cols, Arena * arena, bool use_compiled_functions) const; Block insertResultsIntoColumns(PaddedPODArray & places, OutputBlockColumns && out_cols, Arena * arena) const; + /// Merging state for finalization void mergeAggregateStates( AggregateDataPtr dst, ConstAggregateDataPtr src, Arena * arena, bool skip_compiled_aggregate_functions = false) const; + /// Copying state for retraction + void copyAggregateStates(AggregateDataPtr dst, ConstAggregateDataPtr src, Arena * arena) const; + void doDestroyAggregateStates(AggregateDataPtr place) const; private: diff --git a/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator_ExecuteAndRetract.cpp b/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator_ExecuteAndRetract.cpp index 1db13a23da..8c3720460e 100644 --- a/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator_ExecuteAndRetract.cpp +++ b/src/Interpreters/Streaming/Aggregator/MemoryAggregator/MemoryAggregator_ExecuteAndRetract.cpp @@ -57,7 +57,7 @@ std::pair MemoryAggregator::executeAndRetractOnBlock( retract_data = aggregate_data; if 
(!TrackingUpdatesWithRetract::empty(result.without_key)) { - mergeAggregateStates(retract_data, result.without_key, result.retract_pool.get()); + copyAggregateStates(retract_data, result.without_key, result.retract_pool.get()); TrackingUpdatesWithRetract::merge(retract_data, result.without_key); } } @@ -148,7 +148,7 @@ bool MemoryAggregator::executeAndRetractImpl( retract_data = tmp_retract; if (!TrackingUpdatesWithRetract::empty(aggregate_data)) { - mergeAggregateStates(retract_data, aggregate_data, result.retract_pool.get()); + copyAggregateStates(retract_data, aggregate_data, result.retract_pool.get()); TrackingUpdatesWithRetract::merge(retract_data, aggregate_data); } } diff --git a/tests/cluster/smoke/0013_changelog_stream13/0_global_aggr_on_changelog_kv_stream_with_single_shard.yaml b/tests/cluster/smoke/0013_changelog_stream13/0_global_aggr_on_changelog_kv_stream_with_single_shard.yaml index c54ea954ac..15acf2b681 100644 --- a/tests/cluster/smoke/0013_changelog_stream13/0_global_aggr_on_changelog_kv_stream_with_single_shard.yaml +++ b/tests/cluster/smoke/0013_changelog_stream13/0_global_aggr_on_changelog_kv_stream_with_single_shard.yaml @@ -5,7 +5,6 @@ tags: - single shard - checkpoint - emit changelog -- bug # https://github.com/timeplus-io/proton-enterprise/issues/10548 description: sum_distinct and count_distinct for changelog_kv stream with single shard. cluster: - p1k1 @@ -24,7 +23,7 @@ steps: - name: '12181627214' type: stream query_id: '12181627214' - query: subscribe to select sum_distinct(val), sum(val), count_distinct(val), count(val), _tp_delta from v_12181627 emit changelog settings checkpoint_interval=1; + query: subscribe to select sum_distinct(val), sum(val), count_distinct(val), count(val), _tp_delta from v_12181627 emit changelog periodic 1ms settings checkpoint_interval=1; schema: - name: sum_distinct(val) type: int64 @@ -69,27 +68,27 @@ steps: - type: query sql: insert into v_12181627(id, val) values (4, 40)(5, 40); - type: wait - time: 2 + time: 1 - type: query sql: insert into v_12181627(id, val, _tp_delta) values (4, 40, -1)(50, 40, -1); - type: wait - time: 2 + time: 1 - type: query sql: insert into v_12181627(id, val) values (4, 40)(5, 50); - type: wait - time: 2 + time: 1 - type: query sql: insert into v_12181627(id, val, _tp_delta) values (4, 40, -1)(50, 50, -1); - type: wait - time: 2 + time: 1 - type: query sql: kill query where query_id='12181627214' sync - type: wait - time: 3 + time: 2 - type: query sql: unsubscribe to '12181627214' From 31fd50014daa563c786ffcc41c00b3f7e93933c3 Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Sat, 18 Oct 2025 23:37:25 +0800 Subject: [PATCH 09/11] fix hybrid multishard aggr --- .../TimeBucketHybridHashTable.h | 2 + .../HybridAggregator/HybridAggregator.h | 14 +- .../HybridAggregator_Convert.h | 260 +++++++++++------- .../HybridAggregator_Execute.cpp | 2 +- .../HybridAggregator_WithoutKey.cpp | 68 ++--- .../99103_aggr_with_emit_changelog.reference | 15 +- .../99103_aggr_with_emit_changelog.sql | 54 ++-- 7 files changed, 237 insertions(+), 178 deletions(-) diff --git a/src/Common/HybridHashTable/TimeBucketHybridHashTable.h b/src/Common/HybridHashTable/TimeBucketHybridHashTable.h index a3bc1de729..0b024c8ae6 100644 --- a/src/Common/HybridHashTable/TimeBucketHybridHashTable.h +++ b/src/Common/HybridHashTable/TimeBucketHybridHashTable.h @@ -248,6 +248,8 @@ class TimeBucketHybridHashTable return metrics; } + bool empty() const { return approximateCount() == 0; } + size_t 
approximateCount() const { size_t estimated_keys = 0; diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h index 58332af41d..c5685839da 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h @@ -101,10 +101,10 @@ class HybridAggregator final : public IAggregator HybridAggregatedDataVariants & result, size_t row_begin, size_t row_end, - AggregateFunctionInstruction * aggregate_instructions) const; + AggregateFunctionInstruction * aggregate_instructions, + bool tracking_retracts) const; - BlocksList convertToBlocksWithoutKey(HybridAggregatedDataVariants & data_variants, bool merged_variants, bool final_) const; - BlocksList convertToBlocksWithoutKeyForRetractsMerged(HybridAggregatedDataVariants & data_variants, bool final_) const; + BlocksList convertToBlocksWithoutKey(HybridAggregatedDataVariants & data_variants, bool final_) const; BlocksList convertToBlocksWithoutKeyForRetracts(HybridAggregatedDataVariants & data_variants, bool final_) const; Block doConvertOnePlace(AggregateDataPtr data, const Block & res_header, bool final_) const; @@ -227,12 +227,8 @@ class HybridAggregator final : public IAggregator template void mergeRetracts( - Table & dst, - Table * dst_retracts, - const std::vector & srcs, - const std::vector
<Table *> & src_updates,
-        const std::vector<Table *>
& src_retracts,
-        Arena & arena) const;
+        Table & dst, Table * dst_retracts, const std::vector<Table *>
& srcs, const std::vector<Table *>
& src_retracts, Arena & arena) + const; private: friend struct HybridAggregatedDataVariants; diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Convert.h b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Convert.h index 2f5d51797b..087bb47999 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Convert.h +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Convert.h @@ -53,7 +53,7 @@ HybridAggregator::convertToBlocks(IAggregatedDataVariants & variants, size_t /*m } case HybridHashType::WithoutKey: { - blocks = convertToBlocksWithoutKey(data_variants, /*merged_variants=*/false, /*final=*/true); + blocks = convertToBlocksWithoutKey(data_variants, /*final=*/true); break; } @@ -174,20 +174,44 @@ BlocksList HybridAggregator::convertToBlocksForAll(Table & table) const if (params->group_by != IAggregatorParams::GroupBy::UserDefined) { - table.forBatchValue( - std::min(max_block_size, table.getConfig().max_hot_key_count), - [&](const KeyGetter::KeyType & key, auto value, bool flush) { - KeyGetter::insertKeyIntoColumns(key, out_cols.raw_key_columns, key_sizes_ref); - places.emplace_back(static_cast(value.getMapped())); - - /// If reached max block size, finalize the block and start a new one - if (flush) - { - res.emplace_back(insertResultsIntoColumns(places, std::move(out_cols), /*arena=*/nullptr)); - init_out_cols(); - } - }, - done_callback); + if (trackingStateCount()) + { + table.forBatchValue( + std::min(max_block_size, table.getConfig().max_hot_key_count), + [&](const KeyGetter::KeyType & key, auto value, bool flush) { + auto mapped = static_cast(value.getMapped()); + if (!TrackingCount::empty(mapped)) + { + KeyGetter::insertKeyIntoColumns(key, out_cols.raw_key_columns, key_sizes_ref); + places.emplace_back(mapped); + } + + /// If reached max block size, finalize the block and start a new one + if (flush) + { + res.emplace_back(insertResultsIntoColumns(places, std::move(out_cols), /*arena=*/nullptr)); + init_out_cols(); + } + }, + done_callback); + } + else + { + table.forBatchValue( + std::min(max_block_size, table.getConfig().max_hot_key_count), + [&](const KeyGetter::KeyType & key, auto value, bool flush) { + KeyGetter::insertKeyIntoColumns(key, out_cols.raw_key_columns, key_sizes_ref); + places.emplace_back(static_cast(value.getMapped())); + + /// If reached max block size, finalize the block and start a new one + if (flush) + { + res.emplace_back(insertResultsIntoColumns(places, std::move(out_cols), /*arena=*/nullptr)); + init_out_cols(); + } + }, + done_callback); + } } else { @@ -485,7 +509,7 @@ BlocksList HybridAggregator::mergeAndConvertToBlocks( initStates(result); mergeWithoutKey(result, many_data_variants, merge_arena); - return convertToBlocksWithoutKey(result, /*merged_variants=*/true, /*final=*/true); + return convertToBlocksWithoutKey(result, /*final=*/true); } #define M(NAME, IS_TWO_LEVEL) \ @@ -589,8 +613,6 @@ void HybridAggregator::mergeWithoutKey( case TrackingUpdatesType::None: [[fallthrough]]; case TrackingUpdatesType::Updates: - [[fallthrough]]; - case TrackingUpdatesType::UpdatesWithRetract: { for (auto & data_variants : many_data_variants) { @@ -600,19 +622,55 @@ void HybridAggregator::mergeWithoutKey( continue; mergeAggregateStates(result.without_key.get(), src_variants->without_key.get(), &arena); + } + break; + } + case TrackingUpdatesType::UpdatesWithRetract: + { + auto has_retract = std::ranges::any_of(many_data_variants, [](const auto & 
data_variants) { + chassert(data_variants->aggregatorType() == AggregatorType::Hybrid); + return static_cast(data_variants.get())->without_key_retracts != nullptr; + }); + + for (auto & data_variants : many_data_variants) + { + auto * src_variants = static_cast(data_variants.get()); + if (!src_variants->without_key) + continue; + + /// 1) Merge current state + mergeAggregateStates(result.without_key.get(), src_variants->without_key.get(), &arena); + TrackingCount::merge( + result.without_key.get() + tracking_count_offset, src_variants->without_key.get() + tracking_count_offset); + + /// 2) Merge retract state + if (!has_retract) + continue; + + if (!result.without_key_retracts) + result.initWithoutKeyRetractStates(total_size_of_aggregate_states, align_aggregate_states); if (src_variants->without_key_retracts) { - if (!result.without_key_retracts) - result.initWithoutKeyRetractStates(total_size_of_aggregate_states, align_aggregate_states); - - mergeAggregateStates(result.without_key_retracts.get(), src_variants->without_key_retracts.get(), &arena); + /// 2.1) The key was added/updated in some shard (has retract state or empty retract state), merge with the retracted state, then reset it + if (!TrackingCount::empty(src_variants->without_key_retracts.get())) + { + mergeAggregateStates(result.without_key_retracts.get(), src_variants->without_key_retracts.get(), &arena); + TrackingCount::merge( + result.without_key_retracts.get() + tracking_count_offset, + src_variants->without_key_retracts.get() + tracking_count_offset); + } - /// After merge the retract state to result, clean it up src_variants->resetRetractWithoutKey(); } + else + { + /// 2.2) The key is not found in some shards (no retract state), merge with current state + mergeAggregateStates(result.without_key_retracts.get(), src_variants->without_key.get(), &arena); + TrackingCount::merge( + result.without_key_retracts.get() + tracking_count_offset, src_variants->without_key.get() + tracking_count_offset); + } } - break; } } @@ -642,7 +700,7 @@ ALWAYS_INLINE void HybridAggregator::merge( } case TrackingUpdatesType::UpdatesWithRetract: { - mergeRetracts(dst, dst_retracts, srcs, src_updates, src_retracts, arena); + mergeRetracts(dst, dst_retracts, srcs, src_retracts, arena); break; } } @@ -656,15 +714,34 @@ void HybridAggregator::mergeNormal(Table & dst, const std::vector
& src assert(src != nullptr); /// FIXME, batch - src->forEachKeyValue([&](const Table::KeyType & key, auto value) { - auto src_mapped = static_cast(value.getMapped()); - auto emplace_result = dst.emplaceKey(key, /*disable_spill=*/false); - if (emplace_result.hasError()) - throw Exception::createRuntime(emplace_result.errorCode(), emplace_result.errorString()); - - auto dst_mapped = static_cast(emplace_result.getMutableMapped()); - mergeAggregateStates(dst_mapped, src_mapped, &arena); - }); + if (trackingStateCount()) + { + src->forEachKeyValue([&](const Table::KeyType & key, auto value) { + auto src_mapped = static_cast(value.getMapped()); + if (TrackingCount::empty(src_mapped + tracking_count_offset)) + return; + + auto emplace_result = dst.emplaceKey(key, /*disable_spill=*/false); + if (emplace_result.hasError()) + throw Exception::createRuntime(emplace_result.errorCode(), emplace_result.errorString()); + + auto dst_mapped = static_cast(emplace_result.getMutableMapped()); + mergeAggregateStates(dst_mapped, src_mapped, &arena); + TrackingCount::merge(dst_mapped + tracking_count_offset, src_mapped + tracking_count_offset); + }); + } + else + { + src->forEachKeyValue([&](const Table::KeyType & key, auto value) { + auto src_mapped = static_cast(value.getMapped()); + auto emplace_result = dst.emplaceKey(key, /*disable_spill=*/false); + if (emplace_result.hasError()) + throw Exception::createRuntime(emplace_result.errorCode(), emplace_result.errorString()); + + auto dst_mapped = static_cast(emplace_result.getMutableMapped()); + mergeAggregateStates(dst_mapped, src_mapped, &arena); + }); + } } } @@ -699,87 +776,80 @@ void HybridAggregator::mergeUpdates( template void HybridAggregator::mergeRetracts( - Table & dst, - Table * dst_retracts, - const std::vector
<Table *> & srcs,
-    const std::vector<Table *>
& src_updates,
-    const std::vector<Table *>
& src_retracts,
-    Arena & arena) const
+    Table & dst, Table * dst_retracts, const std::vector<Table *>
& srcs, const std::vector<Table *>
& src_retracts, Arena & arena) const { - /// First, merge all updated retracts to dst_retracts and save new retracts - for (size_t i = 0, num_srcs = srcs.size(); i < num_srcs; ++i) + chassert(dst_retracts); + /// First, collect all retracted keys (including new keys) to dst_retracts + /// For example: + /// (thread) (thread-2) (thread-3) + /// key-1 retract non-retract non-retract + /// key-2 non-retract retract non-retract + /// key-3 non-retract non-retract non-retract + /// The collected keys: [key-1, key-2] + for (auto * src_retract : src_retracts) { - assert(srcs[i]); - if (!src_retracts[i]) + if (!src_retract) continue; - src_updates[i]->forEachKey([&](const KeyGetter::KeyType & key) { - auto find_result = srcs[i]->findKey(key, /*disable_spill=*/true); - if (find_result.hasError()) - throw Exception::createRuntime(find_result.errcode, find_result.errorString()); - - if (!find_result.isFound()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Updated key is not found in source hash table"); - - auto src_retracts_emplace_result = src_retracts[i]->emplaceKey(key, /*disable_spill=*/true); - if (src_retracts_emplace_result.hasError()) - throw Exception::createRuntime(src_retracts_emplace_result.errorCode(), src_retracts_emplace_result.errorString()); - - auto src_place = static_cast(find_result.getMapped()); - auto src_retract = static_cast(src_retracts_emplace_result.getMutableMapped()); - if (src_retracts_emplace_result.isInserted()) - { - /// If retract is empty, save the aggregate state first for next retract - mergeAggregateStates(src_retract, src_place, &arena); - } - else - { - auto dst_retracts_emplace_result = dst_retracts->emplaceKey(key, /*disable_spill=*/true); - if (dst_retracts_emplace_result.hasError()) - throw Exception::createRuntime(dst_retracts_emplace_result.errorCode(), dst_retracts_emplace_result.errorString()); - - auto dst_place = static_cast(dst_retracts_emplace_result.getMutableMapped()); - mergeAggregateStates(dst_place, src_retract, &arena); - - /// After merge src_retract, override it with current aggregate states - destroyAggregateStates(src_retract); - std::memset(src_retract, 0, total_size_of_aggregate_states); - createAggregateStates(src_retract); - mergeAggregateStates(src_retract, src_place, &arena); - } + src_retract->forEachKey([&](const KeyGetter::KeyType & key) { + auto dst_retracts_emplace_result = dst_retracts->emplaceKey(key, /*disable_spill=*/false); + if (dst_retracts_emplace_result.hasError()) + throw Exception::createRuntime(dst_retracts_emplace_result.errorCode(), dst_retracts_emplace_result.errorString()); }); - - srcs[i]->spillIfNecessary(); - src_retracts[i]->spillIfNecessary(); - dst_retracts->spillIfNecessary(); } - /// Second, merge all current aggregate states to dst - for (size_t i = 0, num_srcs = srcs.size(); i < num_srcs; ++i) - { - if (!src_updates[i]) - continue; + /// Second, merge current/retracted aggregate state of collected keys to dst/dst_retracts + /// A special case: If there is no collected key, it means it is the first conversion (Retract is not enabled yet), + /// merge all current state as normal + if (dst_retracts->empty()) + return mergeNormal(dst, srcs, arena); + + dst_retracts->forEachKeyValue([&](const KeyGetter::KeyType & key, auto retract_value) { + for (auto [src, src_retract] : std::views::zip(srcs, src_retracts)) + { + if (!src) + continue; - src_updates[i]->forEachKey([&](const KeyGetter::KeyType & key) { - auto find_result = srcs[i]->findKey(key, /*disable_spill=*/true); + auto find_result = src->findKey(key, 
/*disable_spill=*/false); if (find_result.hasError()) throw Exception::createRuntime(find_result.errcode, find_result.errorString()); if (!find_result.isFound()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Updates key is not found in source table"); + continue; - auto emplace_result = dst.emplaceKey(key, /*disable_spill=*/true); + /// 1) Merge current state to dst + auto emplace_result = dst.emplaceKey(key, /*disable_spill=*/false); if (emplace_result.hasError()) throw Exception::createRuntime(emplace_result.errorCode(), emplace_result.errorString()); auto src_mapped = static_cast(find_result.getMapped()); auto dst_mapped = static_cast(emplace_result.getMutableMapped()); mergeAggregateStates(dst_mapped, src_mapped, &arena); - }); + TrackingCount::merge(dst_mapped + tracking_count_offset, src_mapped + tracking_count_offset); - srcs[i]->spillIfNecessary(); - dst.spillIfNecessary(); - } + /// 2) Merge retracted state to dst_retracts + /// 2.1) The key was updated in some shard (has retract data), merge with the retracted state + auto dst_retract = static_cast(retract_value.getMutableMapped()); + if (src_retract) + { + auto retract_find_result = src_retract->findKey(key, /*disable_spill=*/false); + if (retract_find_result.hasError()) + throw Exception::createRuntime(retract_find_result.errcode, retract_find_result.errorString()); + + if (retract_find_result.isFound()) + { + auto src_retract_mapped = static_cast(retract_find_result.getMapped()); + mergeAggregateStates(dst_retract, src_retract_mapped, &arena); + TrackingCount::merge(dst_retract + tracking_count_offset, src_retract_mapped + tracking_count_offset); + return; + } + } + + /// 2.2) The key is not found in some shards (no retract data), merge with current state + mergeAggregateStates(dst_retract, src_mapped, &arena); + TrackingCount::merge(dst_retract + tracking_count_offset, src_mapped + tracking_count_offset); + } + }); } } diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp index b113c2366b..2c15eb23c9 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp @@ -73,7 +73,7 @@ std::pair HybridAggregator::doExecuteOnBlock( } case HybridHashType::WithoutKey: { - need_finalization = executeWithoutKeyImpl(result, row_begin, row_end, aggregate_functions_instructions.data()); + need_finalization = executeWithoutKeyImpl(result, row_begin, row_end, aggregate_functions_instructions.data(), tracking_retracts); break; } diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_WithoutKey.cpp b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_WithoutKey.cpp index 06ef78454a..c88eeb3839 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_WithoutKey.cpp +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_WithoutKey.cpp @@ -1,14 +1,29 @@ -#include #include +#include + +#include namespace DB::Streaming { [[nodiscard]] bool HybridAggregator::executeWithoutKeyImpl( - HybridAggregatedDataVariants & result, size_t row_begin, size_t row_end, AggregateFunctionInstruction * aggregate_instructions) const + HybridAggregatedDataVariants & result, + size_t row_begin, + size_t row_end, + AggregateFunctionInstruction * aggregate_instructions, + bool tracking_retracts) const { auto * place = 
result.without_key.get(); + /// Save current state for retraction before adding values + if (tracking_retracts && !result.without_key_retracts) + { + result.initWithoutKeyRetractStates(total_size_of_aggregate_states, align_aggregate_states); + auto * dst_place = result.without_key_retracts.get(); + copyAggregateStates(dst_place, place, /*arena=*/nullptr); + TrackingCount::merge(dst_place + tracking_count_offset, place + tracking_count_offset); + } + /// Adding values bool should_finalize = false; for (size_t i = 0, func_size = aggregate_functions.size(); i < func_size; ++i) @@ -43,49 +58,23 @@ namespace DB::Streaming } } + if (trackingStateCount()) + TrackingCount::addBatchSinglePlace( + row_begin, row_end, place + tracking_count_offset, aggregate_instructions ? aggregate_instructions->delta_column : nullptr); + return should_finalize; } /// \param merged_variants if \param data_variants is merged which means it is temporary used to convert to blocks -BlocksList -HybridAggregator::convertToBlocksWithoutKey(HybridAggregatedDataVariants & data_variants, bool merged_variants, bool final_) const +BlocksList HybridAggregator::convertToBlocksWithoutKey(HybridAggregatedDataVariants & data_variants, bool final_) const { if (params->tracking_updates_type == TrackingUpdatesType::UpdatesWithRetract) - { - if (merged_variants) - return convertToBlocksWithoutKeyForRetractsMerged(data_variants, final_); - else - return convertToBlocksWithoutKeyForRetracts(data_variants, final_); - } + return convertToBlocksWithoutKeyForRetracts(data_variants, final_); auto res_header = params->getHeader(input_header, final_); return BlocksList{doConvertOnePlace(data_variants.without_key.get(), res_header, final_)}; } -BlocksList HybridAggregator::convertToBlocksWithoutKeyForRetractsMerged(HybridAggregatedDataVariants & data_variants, bool final_) const -{ - assert(params->tracking_updates_type == TrackingUpdatesType::UpdatesWithRetract); - - auto res_header = params->getHeader(input_header, final_); - BlocksList blocks; - if (data_variants.without_key_retracts) - { - blocks.push_back(doConvertOnePlace(data_variants.without_key_retracts.get(), res_header, final_)); - auto retract_delta_col = ColumnInt8::create(blocks.back().rows(), static_cast(-1)); - auto delta_col_type = DataTypeFactory::instance().get(TypeIndex::Int8); - blocks.back().insert(ColumnWithTypeAndName{std::move(retract_delta_col), std::move(delta_col_type), "_tp_delta"}); - } - - blocks.push_back(doConvertOnePlace(data_variants.without_key.get(), res_header, final_)); - { - auto delta_col = ColumnInt8::create(blocks.back().rows(), static_cast(1)); - auto delta_col_type = DataTypeFactory::instance().get(TypeIndex::Int8); - blocks.back().insert(ColumnWithTypeAndName{std::move(delta_col), std::move(delta_col_type), "_tp_delta"}); - } - - return blocks; -} - BlocksList HybridAggregator::convertToBlocksWithoutKeyForRetracts(HybridAggregatedDataVariants & data_variants, bool final_) const { assert(params->tracking_updates_type == TrackingUpdatesType::UpdatesWithRetract); @@ -98,16 +87,7 @@ BlocksList HybridAggregator::convertToBlocksWithoutKeyForRetracts(HybridAggregat auto retract_delta_col = ColumnInt8::create(blocks.back().rows(), static_cast(-1)); auto delta_col_type = DataTypeFactory::instance().get(TypeIndex::Int8); blocks.back().insert(ColumnWithTypeAndName{std::move(retract_delta_col), std::move(delta_col_type), "_tp_delta"}); - /// overwrite with current aggregate states - destroyAggregateStates(data_variants.without_key_retracts.get()); - 
createAggregateStates(data_variants.without_key_retracts.get()); - mergeAggregateStates(data_variants.without_key_retracts.get(), data_variants.without_key.get(), /*arena=*/nullptr); - } - else - { - /// Save the aggregate states for retraction - data_variants.initWithoutKeyRetractStates(total_size_of_aggregate_states, align_aggregate_states); - mergeAggregateStates(data_variants.without_key_retracts.get(), data_variants.without_key.get(), /*arena=*/nullptr); + data_variants.resetRetractWithoutKey(); } blocks.push_back(doConvertOnePlace(data_variants.without_key.get(), res_header, final_)); diff --git a/tests/queries_ported/0_stateless/99103_aggr_with_emit_changelog.reference b/tests/queries_ported/0_stateless/99103_aggr_with_emit_changelog.reference index 86dea20ec1..41d55c852d 100644 --- a/tests/queries_ported/0_stateless/99103_aggr_with_emit_changelog.reference +++ b/tests/queries_ported/0_stateless/99103_aggr_with_emit_changelog.reference @@ -27,5 +27,16 @@ === Multi-shard aggr with emit changelog results === >>> Pure memory aggr without key: ❌ (FIXME) >>> Pure memory aggr: ❌ (FIXME) ->>> Hybrid aggr without key: ❌ (FIXME) ->>> Hybrid aggr: ❌ (FIXME) +>>> Hybrid aggr without key: +1 1 1 1 +1 1 1 -1 +1 1 2 1 +1 1 2 -1 +1 2 5 1 +>>> Hybrid aggr: +1 k1 k1 1 1 +1 k1 k1 1 -1 +1 k1 k2 2 1 +1 k1 k2 2 -1 +1 k1 k3 3 1 +2 t1 t2 2 1 diff --git a/tests/queries_ported/0_stateless/99103_aggr_with_emit_changelog.sql b/tests/queries_ported/0_stateless/99103_aggr_with_emit_changelog.sql index f6e5cf2a85..b60fea3b4e 100644 --- a/tests/queries_ported/0_stateless/99103_aggr_with_emit_changelog.sql +++ b/tests/queries_ported/0_stateless/99103_aggr_with_emit_changelog.sql @@ -43,47 +43,47 @@ select * except _tp_time from table(99103_mv4) order by _tp_sn, i; select ''; select '=== Multi-shard aggr with emit changelog results ==='; --- drop view if exists 99103_mv; --- drop view if exists 99103_mv2; --- drop view if exists 99103_mv3; --- drop view if exists 99103_mv4; --- drop stream if exists 99103_stream; --- create stream 99103_stream(i int, k string) settings shards=3, flush_threshold_count=1; +drop view if exists 99103_mv; +drop view if exists 99103_mv2; +drop view if exists 99103_mv3; +drop view if exists 99103_mv4; +drop stream if exists 99103_stream; +create stream 99103_stream(i int, k string) settings shards=3, flush_threshold_count=1; -- create materialized view 99103_mv as select min(i), max(i), count(), _tp_delta from 99103_stream emit changelog periodic 3s STORAGE_SETTINGS flush_threshold_count=1; -- create materialized view 99103_mv2 as select i, min(k), max(k), count(), _tp_delta from 99103_stream group by i emit changelog periodic 3s STORAGE_SETTINGS flush_threshold_count=1; --- create materialized view 99103_mv3 as select min(i), max(i), count(), _tp_delta from 99103_stream emit changelog periodic 3s settings default_hash_table='hybrid',max_hot_keys=2 STORAGE_SETTINGS flush_threshold_count=1; --- create materialized view 99103_mv4 as select i, min(k), max(k), count(), _tp_delta from 99103_stream group by i emit changelog periodic 3s settings default_hash_table='hybrid',max_hot_keys=2 STORAGE_SETTINGS flush_threshold_count=1; +create materialized view 99103_mv3 as select min(i), max(i), count(), _tp_delta from 99103_stream emit changelog periodic 3s settings default_hash_table='hybrid',max_hot_keys=2 STORAGE_SETTINGS flush_threshold_count=1; +create materialized view 99103_mv4 as select i, min(k), max(k), count(), _tp_delta from 99103_stream group by i emit changelog periodic 3s settings 
default_hash_table='hybrid',max_hot_keys=2 STORAGE_SETTINGS flush_threshold_count=1; --- select sleep(1) format Null; --- insert into 99103_stream(i, k) values (1, 'k1'); --- select sleep(3) format Null; --- insert into 99103_stream(i, k) values (1, 'k2'); +select sleep(1) format Null; +insert into 99103_stream(i, k) values (1, 'k1'); +select sleep(3) format Null; +insert into 99103_stream(i, k) values (1, 'k2'); -- Validate checkpoint -- system pause materialized view 99103_mv; -- system pause materialized view 99103_mv2; --- system pause materialized view 99103_mv3; --- system pause materialized view 99103_mv4; +system pause materialized view 99103_mv3; +system pause materialized view 99103_mv4; -- system resume materialized view 99103_mv; -- system resume materialized view 99103_mv2; --- system resume materialized view 99103_mv3; --- system resume materialized view 99103_mv4; --- select sleep(1) format Null; --- select sleep(3) format Null; --- insert into 99103_stream(i, k) values (1, 'k3')(2, 't1')(2, 't2'); --- select sleep(3) format Null; --- select sleep(1) format Null; +system resume materialized view 99103_mv3; +system resume materialized view 99103_mv4; +select sleep(1) format Null; +select sleep(3) format Null; +insert into 99103_stream(i, k) values (1, 'k3')(2, 't1')(2, 't2'); +select sleep(3) format Null; +select sleep(1) format Null; select '>>> Pure memory aggr without key: ❌ (FIXME)'; -- select * except _tp_time from table(99103_mv) order by _tp_sn; --- bug issue: https://github.com/timeplus-io/proton-enterprise/issues/10137 select '>>> Pure memory aggr: ❌ (FIXME)'; -- select * except _tp_time from table(99103_mv2) order by _tp_sn, i; -select '>>> Hybrid aggr without key: ❌ (FIXME)'; --- select * except _tp_time from table(99103_mv3) order by _tp_sn; -select '>>> Hybrid aggr: ❌ (FIXME)'; --- select * except _tp_time from table(99103_mv4) order by _tp_sn, i; +select '>>> Hybrid aggr without key:'; +select * except _tp_time from table(99103_mv3) order by _tp_sn; +select '>>> Hybrid aggr:'; +select * except _tp_time from table(99103_mv4) order by _tp_sn, i; -drop view if exists 99103_mv; -drop view if exists 99103_mv2; +-- drop view if exists 99103_mv; +-- drop view if exists 99103_mv2; drop view if exists 99103_mv3; drop view if exists 99103_mv4; drop stream if exists 99103_stream; From b36a104d5f02129fc0525cc011a3f4a7558d4e22 Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Fri, 24 Oct 2025 04:27:37 +0800 Subject: [PATCH 10/11] remove updates table for emit changelog --- .../HybridAggregator/HybridAggregator.h | 2 +- .../HybridAggregator_Convert.h | 62 +++++++++++-------- .../HybridAggregator_Execute.cpp | 17 +++-- .../HybridAggregator_States.cpp | 7 ++- 4 files changed, 49 insertions(+), 39 deletions(-) diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h index c5685839da..4ae93af3c6 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator.h @@ -157,7 +157,7 @@ class HybridAggregator final : public IAggregator BlocksList convertToBlocksForUpdates(Table & table, Table * updates, bool clear_updates) const; template - BlocksList convertToBlocksForRetracts(Table & table, Table * retracts, Table * updates) const; + BlocksList convertToBlocksForRetracts(Table & table, Table * retracts) const; template BlocksList 
parallelConvertToBlocks( diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Convert.h b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Convert.h index 087bb47999..7430ea5bb3 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Convert.h +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Convert.h @@ -135,7 +135,7 @@ ALWAYS_INLINE BlocksList HybridAggregator::convertToBlocksImpl(Table & table, Ta case TrackingUpdatesType::Updates: return convertToBlocksForUpdates(table, updates, clear_updates); case TrackingUpdatesType::UpdatesWithRetract: - return convertToBlocksForRetracts(table, updates, retracts); + return convertToBlocksForRetracts(table, retracts); case TrackingUpdatesType::None: return convertToBlocksForAll(table); } @@ -353,9 +353,23 @@ BlocksList HybridAggregator::convertToBlocksForUpdates(Table & table, Table * up } template -BlocksList HybridAggregator::convertToBlocksForRetracts(Table & table, Table * updates, Table * retracts) const +BlocksList HybridAggregator::convertToBlocksForRetracts(Table & table, Table * retracts) const { - assert(updates && retracts); + chassert(retracts); + + /// A special case: If there is no retract key, it means it is the first conversion (Retract is not enabled yet), + /// convert all current state as normal + if (retracts->empty()) + { + auto blocks = convertToBlocksForAll(table); + for (auto & block : blocks) + { + auto delta_col = ColumnInt8::create(block.rows(), static_cast(1)); + auto delta_col_type = DataTypeFactory::instance().get(TypeIndex::Int8); + block.insert(ColumnWithTypeAndName{std::move(delta_col), delta_col_type, ProtonConsts::RESERVED_DELTA_FLAG}); + } + return blocks; + } auto rows = table.approximateCount(); /// +1 for nullKeyData, if `data` doesn't have it - not a problem, just some memory for one excessive row will be preallocated @@ -389,7 +403,7 @@ BlocksList HybridAggregator::convertToBlocksForRetracts(Table & table, Table * u BlocksList blocks; - auto do_retract = [&](const KeyGetter::KeyType & key) { + auto do_retract = [&](const KeyGetter::KeyType & key, auto & retract_value) { auto find_result = table.findKey(key, /*disable_spill=*/true); if (find_result.hasError()) throw Exception::createRuntime(find_result.errcode, find_result.errorString()); @@ -397,19 +411,12 @@ BlocksList HybridAggregator::convertToBlocksForRetracts(Table & table, Table * u if (!find_result.isFound()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Updated key is not found in source hash table"); - auto retracts_find_result = retracts->findKey(key, /*disable_spill=*/true); - if (retracts_find_result.hasError()) - throw Exception::createRuntime(retracts_find_result.errcode, retracts_find_result.errorString()); - - if (retracts_find_result.isFound()) + auto retract = static_cast(retract_value.getMapped()); + if (!TrackingCount::empty(retract + tracking_count_offset)) [[likely]] { - auto retract = static_cast(retracts_find_result.getMapped()); - if (!TrackingCount::empty(retract + tracking_count_offset)) [[likely]] - { - /// Retract row - KeyGetter::insertKeyIntoColumns(key, retract_out_cols.raw_key_columns, key_sizes_ref); - retract_places.push_back(retract); - } + /// Retract row + KeyGetter::insertKeyIntoColumns(key, retract_out_cols.raw_key_columns, key_sizes_ref); + retract_places.push_back(retract); } /// return source mapped value @@ -421,19 +428,19 @@ BlocksList HybridAggregator::convertToBlocksForRetracts(Table & table, Table * u { 
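         /// Emit the pending retracted rows as their own block, tagged _tp_delta = -1 just
         /// below; the matching current-state rows follow in a separate +1 block after this if.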
blocks.emplace_back(insertResultsIntoColumns(retract_places, std::move(retract_out_cols), /*arena=*/nullptr)); auto retract_delta_col = ColumnInt8::create(retract_places.size(), static_cast(-1)); - blocks.back().insert(ColumnWithTypeAndName{std::move(retract_delta_col), delta_col_type, "_tp_delta"}); + blocks.back().insert(ColumnWithTypeAndName{std::move(retract_delta_col), delta_col_type, ProtonConsts::RESERVED_DELTA_FLAG}); blocks.back().setRetract(); } blocks.emplace_back(insertResultsIntoColumns(places, std::move(out_cols), /*arena=*/nullptr)); auto delta_col = ColumnInt8::create(places.size(), static_cast(1)); - blocks.back().insert(ColumnWithTypeAndName{std::move(delta_col), delta_col_type, "_tp_delta"}); + blocks.back().insert(ColumnWithTypeAndName{std::move(delta_col), delta_col_type, ProtonConsts::RESERVED_DELTA_FLAG}); table.spillIfNecessary(places.size()); }; - auto insert_columns = [&](const KeyGetter::KeyType & key) { - auto place = do_retract(key); + auto insert_columns = [&](const KeyGetter::KeyType & key, auto retract_value, bool flush) { + auto place = do_retract(key, retract_value); if (!TrackingCount::empty(place + tracking_count_offset)) [[likely]] { /// Regular row @@ -447,22 +454,23 @@ BlocksList HybridAggregator::convertToBlocksForRetracts(Table & table, Table * u } /// If reached max block size, finalize the block and start a new one - if (places.size() >= max_block_size) + if (flush || places.size() >= max_block_size) { do_insert_columns(); init_out_cols(); } }; - auto errcode = updates->forEachKey(insert_columns); + auto done_callback = [&]() { + if (!places.empty()) + do_insert_columns(); + }; + + auto errcode = retracts->forBatchValue(max_block_size, insert_columns, done_callback); if (errcode != ErrorCodes::OK) throw Exception(errcode, "Failed to convert aggregate states to blocks, error_message'{}'", ErrorCodes::getName(errcode)); - if (!places.empty()) - do_insert_columns(); - - /// After conversion, we need clear updates and retracts for next round of emit - updates->clear(); + /// After conversion, we need clear retracts for next round of emit retracts->clear(); return blocks; diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp index 2c15eb23c9..8dc2f59c1f 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_Execute.cpp @@ -158,15 +158,6 @@ template for (size_t row = row_begin; auto & emplace_result : emplace_results.results) places[row++] = static_cast(emplace_result.getMutableMapped()); - if (updates) - { - auto updates_emplace_results = new_keys ? updates->emplaceNewKeys(keys) : updates->emplaceKeys(keys); - if (updates_emplace_results.hasError()) - throw Exception::createRuntime(updates_emplace_results.errorCode(), updates_emplace_results.errorString()); - - assert(updates_emplace_results.results.size() == rows); - } - if (retracts) { auto retracts_emplace_results = new_keys ? retracts->emplaceNewKeys(keys) : retracts->emplaceKeys(keys); @@ -187,6 +178,14 @@ template } } } + else if (updates) + { + auto updates_emplace_results = new_keys ? 
updates->emplaceNewKeys(keys) : updates->emplaceKeys(keys); + if (updates_emplace_results.hasError()) + throw Exception::createRuntime(updates_emplace_results.errorCode(), updates_emplace_results.errorString()); + + chassert(updates_emplace_results.results.size() == rows); + } } #if USE_EMBEDDED_COMPILER diff --git a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_States.cpp b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_States.cpp index 51eb9b76ff..c5cd4848f0 100644 --- a/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_States.cpp +++ b/src/Interpreters/Streaming/Aggregator/HybridAggregator/HybridAggregator_States.cpp @@ -46,16 +46,19 @@ void HybridAggregator::initStates(HybridAggregatedDataVariants & result) const { case TrackingUpdatesType::UpdatesWithRetract: { + init_table(result.table, "main"); init_table(result.retracts, "retracts"); - [[fallthrough]]; + break; } case TrackingUpdatesType::Updates: { + init_table(result.table, "main"); + auto config = getSubConfig(result.getID(), "changes"); config.installNoOpCallbacks(); config.validate(); result.updates.init(method_chosen, std::move(config), result.key_sizes, logger, bucket_key_offset); - [[fallthrough]]; + break; } case TrackingUpdatesType::None: { From d809be0ee39ee09d5680ab0eb19d1ad696494583 Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Mon, 27 Oct 2025 17:04:28 +0800 Subject: [PATCH 11/11] fix grok pattern - TIME --- programs/server/grok-patterns | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/programs/server/grok-patterns b/programs/server/grok-patterns index 4af1229176..b2a74b2cde 100644 --- a/programs/server/grok-patterns +++ b/programs/server/grok-patterns @@ -59,7 +59,7 @@ HOUR (?:2[0123]|[01]?[0-9]) MINUTE (?:[0-5][0-9]) # '60' is a leap second in most time standards and thus is valid. SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?) -TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9]) +TIME %{HOUR}:%{MINUTE}(?::%{SECOND}) # datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it) DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR} DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
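A note on the TIME fix above: the dropped leading assertion `(?!<[0-9])` reads like a mistyped negative lookbehind `(?<![0-9])`. As written it is a negative lookahead for the literal characters `<0` through `<9`, which virtually never occur in a log line, so it never enforced the intended "not preceded by a digit" boundary. Below is a minimal sketch of that behavior; it uses C++ std::regex purely for illustration (an assumption, since the server's actual grok engine is not part of this patch). ECMAScript std::regex also rejects lookbehind outright, which is one more argument for dropping the assertions entirely, as this commit does.

#include <iostream>
#include <regex>
#include <string>

int main()
{
    /// Simplified expansion of %{HOUR}:%{MINUTE}, with the removed assertion kept in front.
    /// "(?!<[0-9])" only rejects a match that is immediately followed by '<' plus a digit,
    /// so it never stopped TIME from matching right after another digit.
    std::regex old_time(R"((?!<[0-9])(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]))");
    std::regex new_time(R"((?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]))");

    std::string line = "id 4212:30";  /// "12:30" sits inside a longer digit run

    std::smatch m;
    if (std::regex_search(line, m, old_time))
        std::cout << "old pattern matches: " << m.str() << '\n';  /// prints "12:30" anyway
    if (std::regex_search(line, m, new_time))
        std::cout << "new pattern matches: " << m.str() << '\n';  /// prints "12:30"
}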