diff --git a/src/server/string_family.cc b/src/server/string_family.cc index a80c8af8ef7e..51cb8175b01e 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -84,6 +84,7 @@ class SetCmd { uint32_t memcache_flags = 0; uint64_t expire_after_ms = 0; // Relative value based on now. 0 means no expiration. optional* prev_val = nullptr; // if set, previous value will be stored if found + optional>* backpressure = nullptr; constexpr bool IsConditionalSet() const { return flags & SET_IF_NOTEXIST || flags & SET_IF_EXISTS; @@ -935,8 +936,12 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string EngineShard* shard = op_args_.shard; // Currently we always try to offload, but Stash may ignore it, if disk I/O is overloaded. - if (auto* ts = shard->tiered_storage(); ts) - ts->TryStash(op_args_.db_cntx.db_index, key, pv); + // If we are beyond the offloading threshold, TryStash might return a backpressure future. + if (auto* ts = shard->tiered_storage(); ts) { + auto bp = ts->TryStash(op_args_.db_cntx.db_index, key, pv, true); + if (bp && params.backpressure) + *params.backpressure = std::move(*bp); + } if (explicit_journal_ && op_args_.shard->journal()) { RecordJournal(params, key, value); @@ -1056,11 +1061,19 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) { if (sparams.flags & SetCmd::SET_GET) sparams.prev_val = &prev; + optional> backpressure; + sparams.backpressure = &backpressure; + OpStatus result = SetGeneric(sparams, key, value, cmnd_cntx); if (result == OpStatus::WRONG_TYPE) { return builder->SendError(kWrongTypeErr); } + // If backpressure was provided, wait with reasonable limit (to avoid client deadlocking). + if (backpressure) { + std::move(backpressure)->GetFor(5ms); + } + if (sparams.flags & SetCmd::SET_GET) { return GetReplies{cmnd_cntx.rb}.Send(std::move(prev)); } diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 58db6480952a..633246763c0f 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -85,6 +85,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Clear IO pending flag for entry void ClearIoPending(OpManager::KeyRef key) { + UnblockBackpressure(key, false); if (auto pv = Find(key); pv) { pv->SetStashPending(false); stats_.total_cancels++; @@ -143,6 +144,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Find entry by key in db_slice and store external segment in place of original value. // Update memory stats void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) { + UnblockBackpressure(key, true); if (auto* pv = Find(key); pv) { auto* stats = GetDbTableStats(key.first); @@ -169,6 +171,12 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { SetExternal({sub_dbid, sub_key}, sub_segment); } + // If any backpressure (throttling) is active, notify that the operation finished + void UnblockBackpressure(OpManager::KeyRef id, bool result) { + if (auto node = ts_->stash_backpressure_.extract(id); !node.empty()) + node.mapped().Resolve(result); + } + struct { uint64_t total_stashes = 0, total_cancels = 0, total_fetches = 0; uint64_t total_defrags = 0; @@ -309,6 +317,8 @@ error_code TieredStorage::Open(string_view base_path) { } void TieredStorage::Close() { + for (auto& [_, f] : stash_backpressure_) + f.Resolve(false); op_manager_->Close(); } @@ -350,9 +360,10 @@ template TieredStorage::TResult TieredStorage::Modify( DbIndex dbid, std::string_view key, const PrimeValue& value, std::function modf); -bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) { +std::optional> TieredStorage::TryStash(DbIndex dbid, string_view key, + PrimeValue* value, bool provide_bp) { if (!ShouldStash(*value)) - return false; + return {}; // This invariant should always hold because ShouldStash tests for IoPending flag. CHECK(!bins_->IsPending(dbid, key)); @@ -361,7 +372,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) { // with a lot of underutilized disk space. if (op_manager_->GetStats().pending_stash_cnt >= config_.write_depth_limit) { ++stats_.stash_overflow_cnt; - return false; + return {}; } StringOrView raw_string = value->GetRawString(); @@ -375,15 +386,21 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) { } else if (auto bin = bins_->Stash(dbid, key, raw_string.view()); bin) { id = bin->first; ec = op_manager_->Stash(id, bin->second); + } else { + return {}; // Silently added to bin } if (ec) { LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately" << ec.message(); visit([this](auto id) { op_manager_->ClearIoPending(id); }, id); - return false; + return {}; } - return true; + // If we are in the active offloading phase, throttle stashes by providing backpressure future + if (provide_bp && ShouldOffload()) + return stash_backpressure_[{dbid, string{key}}]; + + return {}; } void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) { @@ -405,6 +422,11 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) { void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) { DCHECK(value->HasStashPending()); + + // If any previous write was happening, it has been cancelled + if (auto node = stash_backpressure_.extract(make_pair(dbid, key)); !node.empty()) + std::move(node.mapped()).Resolve(false); + if (OccupiesWholePages(value->Size())) { op_manager_->Delete(KeyRef(dbid, key)); } else if (auto bin = bins_->Delete(dbid, key); bin) { diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 1d79ee3c5bdd..903e1c625da8 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -11,6 +11,7 @@ #include "server/common.h" #include "server/table.h" #include "server/tiering/common.h" +#include "server/tiering/entry_map.h" #include "server/tx_base.h" #include "util/fibers/future.h" @@ -59,9 +60,10 @@ class TieredStorage { TResult Modify(DbIndex dbid, std::string_view key, const PrimeValue& value, std::function modf); - // Stash value. Sets IO_PENDING flag and unsets it on error or when finished - // Returns true if item was scheduled for stashing. - bool TryStash(DbIndex dbid, std::string_view key, PrimeValue* value); + // Stash value. Sets IO_PENDING flag and unsets it on error or when finished. + // Returns optional future for backpressure if `provide_bp` is set and conditions are met. + std::optional> TryStash(DbIndex dbid, std::string_view key, + PrimeValue* value, bool provide_bp = false); // Delete value, must be offloaded (external type) void Delete(DbIndex dbid, PrimeValue* value); @@ -105,6 +107,9 @@ class TieredStorage { PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off + // Stash operations waiting for completion to throttle + tiering::EntryMap<::util::fb2::Future> stash_backpressure_; + std::unique_ptr op_manager_; std::unique_ptr bins_; diff --git a/src/server/tiering/entry_map.h b/src/server/tiering/entry_map.h new file mode 100644 index 000000000000..ec8a7feae633 --- /dev/null +++ b/src/server/tiering/entry_map.h @@ -0,0 +1,39 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include + +#include "server/tx_base.h" + +namespace dfly::tiering { + +namespace detail { +struct Hasher { + using is_transparent = void; + template size_t operator()(const std::pair& p) const { + return absl::HashOf(p); + } +}; + +struct Eq { + using is_transparent = void; + template + bool operator()(const std::pair& l, const std::pair& r) const { + const auto& [i1, s1] = l; + const auto& [i2, s2] = r; + return i1 == i2 && s1 == s2; + } +}; +} // namespace detail + +// Map of key (db index, string key) -> T with heterogeneous lookup +template +using EntryMap = + absl::flat_hash_map, T, detail::Hasher, detail::Eq>; + +} // namespace dfly::tiering diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc index 50a800186985..df1dd8a33c64 100644 --- a/src/server/tiering/small_bins.cc +++ b/src/server/tiering/small_bins.cc @@ -122,10 +122,7 @@ std::vector> SmallBins::ReportStashAborted(BinId } std::optional SmallBins::Delete(DbIndex dbid, std::string_view key) { - std::pair key_pair{dbid, key}; - auto it = current_bin_.find(key_pair); - - if (it != current_bin_.end()) { + if (auto it = current_bin_.find(make_pair(dbid, key)); it != current_bin_.end()) { size_t stashed_size = StashedValueSize(it->second); DCHECK_GE(current_bin_bytes_, stashed_size); @@ -135,7 +132,7 @@ std::optional SmallBins::Delete(DbIndex dbid, std::string_view } for (auto& [id, keys] : pending_bins_) { - if (keys.erase(key_pair)) + if (keys.erase(make_pair(dbid, key))) return keys.empty() ? std::make_optional(id) : std::nullopt; } return std::nullopt; diff --git a/src/server/tiering/small_bins.h b/src/server/tiering/small_bins.h index d2f198f6e491..849f1291c9d3 100644 --- a/src/server/tiering/small_bins.h +++ b/src/server/tiering/small_bins.h @@ -11,6 +11,7 @@ #include #include "server/tiering/disk_storage.h" +#include "server/tiering/entry_map.h" namespace dfly::tiering { @@ -84,12 +85,10 @@ class SmallBins { BinId last_bin_id_ = 0; unsigned current_bin_bytes_ = 0; - absl::flat_hash_map, std::string> current_bin_; + tiering::EntryMap current_bin_; // Pending stashes, their keys and value sizes - absl::flat_hash_map /* key*/, DiskSegment>> - pending_bins_; + absl::flat_hash_map> pending_bins_; // Map of bins that were stashed and should be deleted when number of entries reaches 0 absl::flat_hash_map stashed_bins_; diff --git a/tests/dragonfly/tiering_test.py b/tests/dragonfly/tiering_test.py index 00111b710374..63cdcc110037 100644 --- a/tests/dragonfly/tiering_test.py +++ b/tests/dragonfly/tiering_test.py @@ -98,9 +98,9 @@ async def run(sub_ops): "tiered_prefix": "/tmp/tiered/backing_master", "maxmemory": "2.0G", "cache_mode": True, - "tiered_offload_threshold": "0.9", + "tiered_offload_threshold": "0.6", "tiered_upload_threshold": "0.2", - "tiered_storage_write_depth": 100, + "tiered_storage_write_depth": 1000, } ) async def test_full_sync(async_client: aioredis.Redis, df_factory: DflyInstanceFactory): @@ -109,7 +109,7 @@ async def test_full_sync(async_client: aioredis.Redis, df_factory: DflyInstanceF cache_mode=True, maxmemory="2.0G", tiered_prefix="/tmp/tiered/backing_replica", - tiered_offload_threshold="0.8", + tiered_offload_threshold="0.5", tiered_storage_write_depth=1000, ) replica.start()