Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class SetCmd {
uint32_t memcache_flags = 0;
uint64_t expire_after_ms = 0; // Relative value based on now. 0 means no expiration.
optional<StringResult>* prev_val = nullptr; // if set, previous value will be stored if found
optional<util::fb2::Future<bool>>* backpressure = nullptr;

constexpr bool IsConditionalSet() const {
return flags & SET_IF_NOTEXIST || flags & SET_IF_EXISTS;
Expand Down Expand Up @@ -935,8 +936,12 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string
EngineShard* shard = op_args_.shard;

// Currently we always try to offload, but Stash may ignore it, if disk I/O is overloaded.
if (auto* ts = shard->tiered_storage(); ts)
ts->TryStash(op_args_.db_cntx.db_index, key, pv);
// If we are beyond the offloading threshold, TryStash might return a backpressure future.
if (auto* ts = shard->tiered_storage(); ts) {
auto bp = ts->TryStash(op_args_.db_cntx.db_index, key, pv, true);
if (bp && params.backpressure)
*params.backpressure = std::move(*bp);
}

if (explicit_journal_ && op_args_.shard->journal()) {
RecordJournal(params, key, value);
Expand Down Expand Up @@ -1056,11 +1061,19 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) {
if (sparams.flags & SetCmd::SET_GET)
sparams.prev_val = &prev;

optional<util::fb2::Future<bool>> backpressure;
sparams.backpressure = &backpressure;

OpStatus result = SetGeneric(sparams, key, value, cmnd_cntx);
if (result == OpStatus::WRONG_TYPE) {
return builder->SendError(kWrongTypeErr);
}

// If backpressure was provided, wait with reasonable limit (to avoid client deadlocking).
if (backpressure) {
std::move(backpressure)->GetFor(5ms);
}

if (sparams.flags & SetCmd::SET_GET) {
return GetReplies{cmnd_cntx.rb}.Send(std::move(prev));
}
Expand Down
32 changes: 27 additions & 5 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {

// Clear IO pending flag for entry
void ClearIoPending(OpManager::KeyRef key) {
UnblockBackpressure(key, false);
if (auto pv = Find(key); pv) {
pv->SetStashPending(false);
stats_.total_cancels++;
Expand Down Expand Up @@ -143,6 +144,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
// Find entry by key in db_slice and store external segment in place of original value.
// Update memory stats
void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
UnblockBackpressure(key, true);
if (auto* pv = Find(key); pv) {
auto* stats = GetDbTableStats(key.first);

Expand All @@ -169,6 +171,12 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
SetExternal({sub_dbid, sub_key}, sub_segment);
}

// If any backpressure (throttling) is active, notify that the operation finished
void UnblockBackpressure(OpManager::KeyRef id, bool result) {
if (auto node = ts_->stash_backpressure_.extract(id); !node.empty())
node.mapped().Resolve(result);
}

struct {
uint64_t total_stashes = 0, total_cancels = 0, total_fetches = 0;
uint64_t total_defrags = 0;
Expand Down Expand Up @@ -309,6 +317,8 @@ error_code TieredStorage::Open(string_view base_path) {
}

void TieredStorage::Close() {
for (auto& [_, f] : stash_backpressure_)
f.Resolve(false);
op_manager_->Close();
}

Expand Down Expand Up @@ -350,9 +360,10 @@ template TieredStorage::TResult<size_t> TieredStorage::Modify(
DbIndex dbid, std::string_view key, const PrimeValue& value,
std::function<size_t(std::string*)> modf);

bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, string_view key,
PrimeValue* value, bool provide_bp) {
if (!ShouldStash(*value))
return false;
return {};

// This invariant should always hold because ShouldStash tests for IoPending flag.
CHECK(!bins_->IsPending(dbid, key));
Expand All @@ -361,7 +372,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
// with a lot of underutilized disk space.
if (op_manager_->GetStats().pending_stash_cnt >= config_.write_depth_limit) {
++stats_.stash_overflow_cnt;
return false;
return {};
}

StringOrView raw_string = value->GetRawString();
Expand All @@ -375,15 +386,21 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
} else if (auto bin = bins_->Stash(dbid, key, raw_string.view()); bin) {
id = bin->first;
ec = op_manager_->Stash(id, bin->second);
} else {
return {}; // Silently added to bin
}

if (ec) {
LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately" << ec.message();
visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
return false;
return {};
}

return true;
// If we are in the active offloading phase, throttle stashes by providing backpressure future
if (provide_bp && ShouldOffload())
return stash_backpressure_[{dbid, string{key}}];

return {};
}

void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
Expand All @@ -405,6 +422,11 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {

void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
DCHECK(value->HasStashPending());

// If any previous write was happening, it has been cancelled
if (auto node = stash_backpressure_.extract(make_pair(dbid, key)); !node.empty())
std::move(node.mapped()).Resolve(false);

if (OccupiesWholePages(value->Size())) {
op_manager_->Delete(KeyRef(dbid, key));
} else if (auto bin = bins_->Delete(dbid, key); bin) {
Expand Down
11 changes: 8 additions & 3 deletions src/server/tiered_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "server/common.h"
#include "server/table.h"
#include "server/tiering/common.h"
#include "server/tiering/entry_map.h"
#include "server/tx_base.h"
#include "util/fibers/future.h"

Expand Down Expand Up @@ -59,9 +60,10 @@ class TieredStorage {
TResult<T> Modify(DbIndex dbid, std::string_view key, const PrimeValue& value,
std::function<T(std::string*)> modf);

// Stash value. Sets IO_PENDING flag and unsets it on error or when finished
// Returns true if item was scheduled for stashing.
bool TryStash(DbIndex dbid, std::string_view key, PrimeValue* value);
// Stash value. Sets IO_PENDING flag and unsets it on error or when finished.
// Returns optional future for backpressure if `provide_bp` is set and conditions are met.
std::optional<util::fb2::Future<bool>> TryStash(DbIndex dbid, std::string_view key,
PrimeValue* value, bool provide_bp = false);

// Delete value, must be offloaded (external type)
void Delete(DbIndex dbid, PrimeValue* value);
Expand Down Expand Up @@ -105,6 +107,9 @@ class TieredStorage {

PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off

// Stash operations waiting for completion to throttle
tiering::EntryMap<::util::fb2::Future<bool>> stash_backpressure_;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either a future or... a blocking counter? I don't see how we can easily use something more lightweight


std::unique_ptr<ShardOpManager> op_manager_;
std::unique_ptr<tiering::SmallBins> bins_;

Expand Down
39 changes: 39 additions & 0 deletions src/server/tiering/entry_map.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2025, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <absl/container/flat_hash_map.h>

#include <string>

#include "server/tx_base.h"

namespace dfly::tiering {

namespace detail {
struct Hasher {
using is_transparent = void;
template <typename S> size_t operator()(const std::pair<DbIndex, S>& p) const {
return absl::HashOf(p);
}
};

struct Eq {
using is_transparent = void;
template <typename S1, typename S2>
bool operator()(const std::pair<DbIndex, S1>& l, const std::pair<DbIndex, S2>& r) const {
const auto& [i1, s1] = l;
const auto& [i2, s2] = r;
return i1 == i2 && s1 == s2;
}
};
} // namespace detail

// Map of key (db index, string key) -> T with heterogeneous lookup
template <typename T>
using EntryMap =
absl::flat_hash_map<std::pair<DbIndex, std::string>, T, detail::Hasher, detail::Eq>;

} // namespace dfly::tiering
7 changes: 2 additions & 5 deletions src/server/tiering/small_bins.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,7 @@ std::vector<std::pair<DbIndex, std::string>> SmallBins::ReportStashAborted(BinId
}

std::optional<SmallBins::BinId> SmallBins::Delete(DbIndex dbid, std::string_view key) {
std::pair<DbIndex, std::string> key_pair{dbid, key};
auto it = current_bin_.find(key_pair);

if (it != current_bin_.end()) {
if (auto it = current_bin_.find(make_pair(dbid, key)); it != current_bin_.end()) {
size_t stashed_size = StashedValueSize(it->second);
DCHECK_GE(current_bin_bytes_, stashed_size);

Expand All @@ -135,7 +132,7 @@ std::optional<SmallBins::BinId> SmallBins::Delete(DbIndex dbid, std::string_view
}

for (auto& [id, keys] : pending_bins_) {
if (keys.erase(key_pair))
if (keys.erase(make_pair(dbid, key)))
return keys.empty() ? std::make_optional(id) : std::nullopt;
}
return std::nullopt;
Expand Down
7 changes: 3 additions & 4 deletions src/server/tiering/small_bins.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <vector>

#include "server/tiering/disk_storage.h"
#include "server/tiering/entry_map.h"

namespace dfly::tiering {

Expand Down Expand Up @@ -84,12 +85,10 @@ class SmallBins {
BinId last_bin_id_ = 0;

unsigned current_bin_bytes_ = 0;
absl::flat_hash_map<std::pair<DbIndex, std::string>, std::string> current_bin_;
tiering::EntryMap<std::string> current_bin_;

// Pending stashes, their keys and value sizes
absl::flat_hash_map<unsigned /* id */,
absl::flat_hash_map<std::pair<DbIndex, std::string> /* key*/, DiskSegment>>
pending_bins_;
absl::flat_hash_map<unsigned /* id */, tiering::EntryMap<DiskSegment>> pending_bins_;

// Map of bins that were stashed and should be deleted when number of entries reaches 0
absl::flat_hash_map<size_t /*offset*/, StashInfo> stashed_bins_;
Expand Down
6 changes: 3 additions & 3 deletions tests/dragonfly/tiering_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ async def run(sub_ops):
"tiered_prefix": "/tmp/tiered/backing_master",
"maxmemory": "2.0G",
"cache_mode": True,
"tiered_offload_threshold": "0.9",
"tiered_offload_threshold": "0.6",
"tiered_upload_threshold": "0.2",
"tiered_storage_write_depth": 100,
"tiered_storage_write_depth": 1000,
}
)
async def test_full_sync(async_client: aioredis.Redis, df_factory: DflyInstanceFactory):
Expand All @@ -109,7 +109,7 @@ async def test_full_sync(async_client: aioredis.Redis, df_factory: DflyInstanceF
cache_mode=True,
maxmemory="2.0G",
tiered_prefix="/tmp/tiered/backing_replica",
tiered_offload_threshold="0.8",
tiered_offload_threshold="0.5",
tiered_storage_write_depth=1000,
)
replica.start()
Expand Down
Loading