From e93c53e53cceb154f2ec580a379623a4b6f1e212 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Tue, 28 Oct 2025 16:16:13 +0300 Subject: [PATCH 1/7] fix(tiering): Basic stash backpressure --- src/server/string_family.cc | 14 ++++++++++++-- src/server/tiered_storage.cc | 37 +++++++++++++++++++++++++++++++----- src/server/tiered_storage.h | 7 +++++-- 3 files changed, 49 insertions(+), 9 deletions(-) diff --git a/src/server/string_family.cc b/src/server/string_family.cc index a80c8af8ef7e..66e76e2a969f 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -84,6 +84,7 @@ class SetCmd { uint32_t memcache_flags = 0; uint64_t expire_after_ms = 0; // Relative value based on now. 0 means no expiration. optional* prev_val = nullptr; // if set, previous value will be stored if found + optional>* backpressure = nullptr; constexpr bool IsConditionalSet() const { return flags & SET_IF_NOTEXIST || flags & SET_IF_EXISTS; @@ -935,8 +936,11 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string EngineShard* shard = op_args_.shard; // Currently we always try to offload, but Stash may ignore it, if disk I/O is overloaded. - if (auto* ts = shard->tiered_storage(); ts) - ts->TryStash(op_args_.db_cntx.db_index, key, pv); + if (auto* ts = shard->tiered_storage(); ts) { + auto bp = ts->TryStash(op_args_.db_cntx.db_index, key, pv); + if (bp && params.backpressure) + *params.backpressure = std::move(*bp); + } if (explicit_journal_ && op_args_.shard->journal()) { RecordJournal(params, key, value); @@ -1056,11 +1060,17 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) { if (sparams.flags & SetCmd::SET_GET) sparams.prev_val = &prev; + optional> backpressure; + sparams.backpressure = &backpressure; + OpStatus result = SetGeneric(sparams, key, value, cmnd_cntx); if (result == OpStatus::WRONG_TYPE) { return builder->SendError(kWrongTypeErr); } + if (backpressure) + std::move(backpressure)->GetFor(100ms); + if (sparams.flags & SetCmd::SET_GET) { return GetReplies{cmnd_cntx.rb}.Send(std::move(prev)); } diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index b1af91ff80b5..f3adcec14e49 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -85,6 +85,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Clear IO pending flag for entry void ClearIoPending(OpManager::KeyRef key) { + FlagBackpressure(key, false); if (auto pv = Find(key); pv) { pv->SetStashPending(false); stats_.total_cancels++; @@ -143,6 +144,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Find entry by key in db_slice and store external segment in place of original value. // Update memory stats void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) { + FlagBackpressure(key, true); if (auto* pv = Find(key); pv) { auto* stats = GetDbTableStats(key.first); @@ -169,6 +171,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { SetExternal({sub_dbid, sub_key}, sub_segment); } + void FlagBackpressure(OpManager::KeyRef id, bool result) { + const auto& [dbid, key] = id; + if (auto it = ts_->backpressure_.find({dbid, string{key}}); it != ts_->backpressure_.end()) { + it->second.Resolve(result); + ts_->backpressure_.erase(it); + } + } + struct { uint64_t total_stashes = 0, total_cancels = 0, total_fetches = 0; uint64_t total_defrags = 0; @@ -309,6 +319,8 @@ error_code TieredStorage::Open(string_view base_path) { } void TieredStorage::Close() { + for (auto& [_, f] : backpressure_) + f.Resolve(false); op_manager_->Close(); } @@ -350,9 +362,10 @@ template TieredStorage::TResult TieredStorage::Modify( DbIndex dbid, std::string_view key, const PrimeValue& value, std::function modf); -bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) { +std::optional> TieredStorage::TryStash(DbIndex dbid, string_view key, + PrimeValue* value) { if (!ShouldStash(*value)) - return false; + return {}; // This invariant should always hold because ShouldStash tests for IoPending flag. CHECK(!bins_->IsPending(dbid, key)); @@ -361,7 +374,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) { // with a lot of underutilized disk space. if (op_manager_->GetStats().pending_stash_cnt >= config_.write_depth_limit) { ++stats_.stash_overflow_cnt; - return false; + return {}; } StringOrView raw_string = value->GetRawString(); @@ -375,15 +388,24 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) { } else if (auto bin = bins_->Stash(dbid, key, raw_string.view()); bin) { id = bin->first; ec = op_manager_->Stash(id, bin->second); + } else { + return {}; // Silently added to bin } if (ec) { LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately" << ec.message(); visit([this](auto id) { op_manager_->ClearIoPending(id); }, id); - return false; + return {}; + } + + // Throttle stashes over the offload boundary + if (ShouldOffload()) { + util::fb2::Future fut; + backpressure_[{dbid, string{key}}] = fut; + return fut; } - return true; + return {}; } void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) { @@ -405,6 +427,11 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) { void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) { DCHECK(value->HasStashPending()); + + // If any previous write was happening, it has been cancelled + if (auto node = backpressure_.extract({dbid, string{key}}); !node.empty()) + std::move(node.mapped()).Resolve(false); + if (OccupiesWholePages(value->Size())) { op_manager_->Delete(KeyRef(dbid, key)); } else if (auto bin = bins_->Delete(dbid, key); bin) { diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 1d79ee3c5bdd..41457badf48f 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -60,8 +60,9 @@ class TieredStorage { std::function modf); // Stash value. Sets IO_PENDING flag and unsets it on error or when finished - // Returns true if item was scheduled for stashing. - bool TryStash(DbIndex dbid, std::string_view key, PrimeValue* value); + // Returns opional backpressure. + std::optional> TryStash(DbIndex dbid, std::string_view key, + PrimeValue* value); // Delete value, must be offloaded (external type) void Delete(DbIndex dbid, PrimeValue* value); @@ -105,6 +106,8 @@ class TieredStorage { PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off + absl::flat_hash_map, ::util::fb2::Future> backpressure_; + std::unique_ptr op_manager_; std::unique_ptr bins_; From 4f1283f07a82c5b93c1d158f669d64a4628b9baa Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Wed, 29 Oct 2025 10:32:15 +0300 Subject: [PATCH 2/7] fixes Signed-off-by: Vladislav Oleshko --- src/server/string_family.cc | 9 +++++--- src/server/tiered_storage.cc | 22 +++++++------------ src/server/tiered_storage.h | 10 +++++---- src/server/tiering/entry_map.h | 37 ++++++++++++++++++++++++++++++++ src/server/tiering/small_bins.cc | 7 ++---- src/server/tiering/small_bins.h | 7 +++--- 6 files changed, 62 insertions(+), 30 deletions(-) create mode 100644 src/server/tiering/entry_map.h diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 66e76e2a969f..9c77df046c0d 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -936,8 +936,9 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string EngineShard* shard = op_args_.shard; // Currently we always try to offload, but Stash may ignore it, if disk I/O is overloaded. + // If we are beyound the offloading threshold, TryStash might return a backpressure future. if (auto* ts = shard->tiered_storage(); ts) { - auto bp = ts->TryStash(op_args_.db_cntx.db_index, key, pv); + auto bp = ts->TryStash(op_args_.db_cntx.db_index, key, pv, true); if (bp && params.backpressure) *params.backpressure = std::move(*bp); } @@ -1068,8 +1069,10 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) { return builder->SendError(kWrongTypeErr); } - if (backpressure) - std::move(backpressure)->GetFor(100ms); + // If backpressure was provided, wait with reasonable limit (to avoid client deadlocking). + if (backpressure) { + std::move(backpressure)->GetFor(10ms); + } if (sparams.flags & SetCmd::SET_GET) { return GetReplies{cmnd_cntx.rb}.Send(std::move(prev)); diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index f3adcec14e49..e6230b84f2f1 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -172,11 +172,8 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { } void FlagBackpressure(OpManager::KeyRef id, bool result) { - const auto& [dbid, key] = id; - if (auto it = ts_->backpressure_.find({dbid, string{key}}); it != ts_->backpressure_.end()) { - it->second.Resolve(result); - ts_->backpressure_.erase(it); - } + if (auto node = ts_->stash_backpressure_.extract(id); !node.empty()) + node.mapped().Resolve(result); } struct { @@ -319,7 +316,7 @@ error_code TieredStorage::Open(string_view base_path) { } void TieredStorage::Close() { - for (auto& [_, f] : backpressure_) + for (auto& [_, f] : stash_backpressure_) f.Resolve(false); op_manager_->Close(); } @@ -363,7 +360,7 @@ template TieredStorage::TResult TieredStorage::Modify( std::function modf); std::optional> TieredStorage::TryStash(DbIndex dbid, string_view key, - PrimeValue* value) { + PrimeValue* value, bool provide_bp) { if (!ShouldStash(*value)) return {}; @@ -398,12 +395,9 @@ std::optional> TieredStorage::TryStash(DbIndex dbid, str return {}; } - // Throttle stashes over the offload boundary - if (ShouldOffload()) { - util::fb2::Future fut; - backpressure_[{dbid, string{key}}] = fut; - return fut; - } + // If we are in the active offloading phase, throttle stashes by providing backpressure future + if (provide_bp && ShouldOffload()) + return stash_backpressure_[{dbid, string{key}}]; return {}; } @@ -429,7 +423,7 @@ void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* DCHECK(value->HasStashPending()); // If any previous write was happening, it has been cancelled - if (auto node = backpressure_.extract({dbid, string{key}}); !node.empty()) + if (auto node = stash_backpressure_.extract(make_pair(dbid, key)); !node.empty()) std::move(node.mapped()).Resolve(false); if (OccupiesWholePages(value->Size())) { diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 41457badf48f..744d4cb8eeb9 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -11,6 +11,7 @@ #include "server/common.h" #include "server/table.h" #include "server/tiering/common.h" +#include "server/tiering/entry_map.h" #include "server/tx_base.h" #include "util/fibers/future.h" @@ -59,10 +60,10 @@ class TieredStorage { TResult Modify(DbIndex dbid, std::string_view key, const PrimeValue& value, std::function modf); - // Stash value. Sets IO_PENDING flag and unsets it on error or when finished - // Returns opional backpressure. + // Stash value. Sets IO_PENDING flag and unsets it on error or when finished. + // Returns optional future for backpressure if `provide_bp` is set and conditons are met. std::optional> TryStash(DbIndex dbid, std::string_view key, - PrimeValue* value); + PrimeValue* value, bool provide_bp = false); // Delete value, must be offloaded (external type) void Delete(DbIndex dbid, PrimeValue* value); @@ -106,7 +107,8 @@ class TieredStorage { PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off - absl::flat_hash_map, ::util::fb2::Future> backpressure_; + // Stash operations waiting for completion to throttle + tiering::EntryMap<::util::fb2::Future> stash_backpressure_; std::unique_ptr op_manager_; std::unique_ptr bins_; diff --git a/src/server/tiering/entry_map.h b/src/server/tiering/entry_map.h new file mode 100644 index 000000000000..b91f03150190 --- /dev/null +++ b/src/server/tiering/entry_map.h @@ -0,0 +1,37 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include + +#include "server/tx_base.h" + +namespace dfly::tiering { + +namespace detail { +struct Hasher { + using is_transparent = void; + template size_t operator()(const std::pair& p) const { + return absl::Hash(p)(); + } +}; + +struct Eq { + using is_transparent = void; + template + bool operator()(const std::pair& l, const std::pair& r) const { + return l == r; + } +}; +} // namespace detail + +// Map of key (db index, string key) -> T with heterogeneous lookup +template +using EntryMap = + absl::flat_hash_map, T, detail::Hasher, detail::Eq>; + +} // namespace dfly::tiering diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc index 50a800186985..df1dd8a33c64 100644 --- a/src/server/tiering/small_bins.cc +++ b/src/server/tiering/small_bins.cc @@ -122,10 +122,7 @@ std::vector> SmallBins::ReportStashAborted(BinId } std::optional SmallBins::Delete(DbIndex dbid, std::string_view key) { - std::pair key_pair{dbid, key}; - auto it = current_bin_.find(key_pair); - - if (it != current_bin_.end()) { + if (auto it = current_bin_.find(make_pair(dbid, key)); it != current_bin_.end()) { size_t stashed_size = StashedValueSize(it->second); DCHECK_GE(current_bin_bytes_, stashed_size); @@ -135,7 +132,7 @@ std::optional SmallBins::Delete(DbIndex dbid, std::string_view } for (auto& [id, keys] : pending_bins_) { - if (keys.erase(key_pair)) + if (keys.erase(make_pair(dbid, key))) return keys.empty() ? std::make_optional(id) : std::nullopt; } return std::nullopt; diff --git a/src/server/tiering/small_bins.h b/src/server/tiering/small_bins.h index d2f198f6e491..849f1291c9d3 100644 --- a/src/server/tiering/small_bins.h +++ b/src/server/tiering/small_bins.h @@ -11,6 +11,7 @@ #include #include "server/tiering/disk_storage.h" +#include "server/tiering/entry_map.h" namespace dfly::tiering { @@ -84,12 +85,10 @@ class SmallBins { BinId last_bin_id_ = 0; unsigned current_bin_bytes_ = 0; - absl::flat_hash_map, std::string> current_bin_; + tiering::EntryMap current_bin_; // Pending stashes, their keys and value sizes - absl::flat_hash_map /* key*/, DiskSegment>> - pending_bins_; + absl::flat_hash_map> pending_bins_; // Map of bins that were stashed and should be deleted when number of entries reaches 0 absl::flat_hash_map stashed_bins_; From a549dded2238d23f51f2c7640f23c0ed6a24f907 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Wed, 29 Oct 2025 10:54:18 +0300 Subject: [PATCH 3/7] more fixes Signed-off-by: Vladislav Oleshko --- src/server/tiering/entry_map.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/server/tiering/entry_map.h b/src/server/tiering/entry_map.h index b91f03150190..ec8a7feae633 100644 --- a/src/server/tiering/entry_map.h +++ b/src/server/tiering/entry_map.h @@ -16,7 +16,7 @@ namespace detail { struct Hasher { using is_transparent = void; template size_t operator()(const std::pair& p) const { - return absl::Hash(p)(); + return absl::HashOf(p); } }; @@ -24,7 +24,9 @@ struct Eq { using is_transparent = void; template bool operator()(const std::pair& l, const std::pair& r) const { - return l == r; + const auto& [i1, s1] = l; + const auto& [i2, s2] = r; + return i1 == i2 && s1 == s2; } }; } // namespace detail From d44bc8bdcef63706650152ad0d67ee68726fbed7 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Thu, 30 Oct 2025 11:16:38 +0300 Subject: [PATCH 4/7] small fixes --- src/server/tiered_storage.cc | 7 ++++--- tests/dragonfly/tiering_test.py | 6 +++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index d4b2e69bb497..633246763c0f 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -85,7 +85,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Clear IO pending flag for entry void ClearIoPending(OpManager::KeyRef key) { - FlagBackpressure(key, false); + UnblockBackpressure(key, false); if (auto pv = Find(key); pv) { pv->SetStashPending(false); stats_.total_cancels++; @@ -144,7 +144,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Find entry by key in db_slice and store external segment in place of original value. // Update memory stats void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) { - FlagBackpressure(key, true); + UnblockBackpressure(key, true); if (auto* pv = Find(key); pv) { auto* stats = GetDbTableStats(key.first); @@ -171,7 +171,8 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { SetExternal({sub_dbid, sub_key}, sub_segment); } - void FlagBackpressure(OpManager::KeyRef id, bool result) { + // If any backpressure (throttling) is active, notify that the operation finished + void UnblockBackpressure(OpManager::KeyRef id, bool result) { if (auto node = ts_->stash_backpressure_.extract(id); !node.empty()) node.mapped().Resolve(result); } diff --git a/tests/dragonfly/tiering_test.py b/tests/dragonfly/tiering_test.py index 00111b710374..63cdcc110037 100644 --- a/tests/dragonfly/tiering_test.py +++ b/tests/dragonfly/tiering_test.py @@ -98,9 +98,9 @@ async def run(sub_ops): "tiered_prefix": "/tmp/tiered/backing_master", "maxmemory": "2.0G", "cache_mode": True, - "tiered_offload_threshold": "0.9", + "tiered_offload_threshold": "0.6", "tiered_upload_threshold": "0.2", - "tiered_storage_write_depth": 100, + "tiered_storage_write_depth": 1000, } ) async def test_full_sync(async_client: aioredis.Redis, df_factory: DflyInstanceFactory): @@ -109,7 +109,7 @@ async def test_full_sync(async_client: aioredis.Redis, df_factory: DflyInstanceF cache_mode=True, maxmemory="2.0G", tiered_prefix="/tmp/tiered/backing_replica", - tiered_offload_threshold="0.8", + tiered_offload_threshold="0.5", tiered_storage_write_depth=1000, ) replica.start() From 622705daac7c64227f88c33bcfd3a556e0560459 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 2 Nov 2025 08:05:09 +0200 Subject: [PATCH 5/7] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Roman Gershman --- src/server/string_family.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 9c77df046c0d..3bd825a41944 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -936,7 +936,7 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string EngineShard* shard = op_args_.shard; // Currently we always try to offload, but Stash may ignore it, if disk I/O is overloaded. - // If we are beyound the offloading threshold, TryStash might return a backpressure future. + // If we are beyond the offloading threshold, TryStash might return a backpressure future. if (auto* ts = shard->tiered_storage(); ts) { auto bp = ts->TryStash(op_args_.db_cntx.db_index, key, pv, true); if (bp && params.backpressure) From 49d0a7635b86ce90c3b0ebc5fdc13e89c8f49ec0 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 2 Nov 2025 08:06:45 +0200 Subject: [PATCH 6/7] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Roman Gershman --- src/server/tiered_storage.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 744d4cb8eeb9..903e1c625da8 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -61,7 +61,7 @@ class TieredStorage { std::function modf); // Stash value. Sets IO_PENDING flag and unsets it on error or when finished. - // Returns optional future for backpressure if `provide_bp` is set and conditons are met. + // Returns optional future for backpressure if `provide_bp` is set and conditions are met. std::optional> TryStash(DbIndex dbid, std::string_view key, PrimeValue* value, bool provide_bp = false); From 0eda36eb31278c8f581f47f5a3df52eca43b90d3 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Sun, 2 Nov 2025 13:21:23 +0300 Subject: [PATCH 7/7] Make timeout 5ms --- src/server/string_family.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 3bd825a41944..51cb8175b01e 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -1071,7 +1071,7 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) { // If backpressure was provided, wait with reasonable limit (to avoid client deadlocking). if (backpressure) { - std::move(backpressure)->GetFor(10ms); + std::move(backpressure)->GetFor(5ms); } if (sparams.flags & SetCmd::SET_GET) {