Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1874,7 +1874,23 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
unsigned hash_cnt;
string filter_data;
SET_OR_UNEXPECT(LoadLen(nullptr), hash_cnt);
SET_OR_UNEXPECT(FetchGenericString(), filter_data);

unsigned total_size = 0;
SET_OR_UNEXPECT(LoadLen(nullptr), total_size);

filter_data.resize(total_size);
size_t offset = 0;
while (offset < total_size) {
unsigned chunk_size = 0;
SET_OR_UNEXPECT(LoadLen(nullptr), chunk_size);
error_code ec = FetchBuf(chunk_size, filter_data.data() + offset);
if (ec) {
return make_unexpected(ec);
}

offset += chunk_size;
}

size_t bit_len = filter_data.size() * 8;
if (!is_power2(bit_len)) { // must be power of two
return Unexpected(errc::rdb_file_corrupted);
Expand Down
11 changes: 9 additions & 2 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -623,11 +623,18 @@ std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) {
RETURN_ON_ERR(SaveLen(sbf->hashfunc_cnt(i)));

string_view blob = sbf->data(i);
RETURN_ON_ERR(SaveString(blob));
size_t num_chunks = (blob.size() + kFilterChunkSize - 1) / kFilterChunkSize;
RETURN_ON_ERR(SaveLen(blob.size()));

for (size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) {
size_t offset = chunk_idx * kFilterChunkSize;
size_t chunk_len = std::min(kFilterChunkSize, blob.size() - offset);
RETURN_ON_ERR(SaveString(blob.substr(offset, chunk_len)));
}

FlushState flush_state = FlushState::kFlushMidEntry;
if ((i + 1) == sbf->num_filters())
flush_state = FlushState::kFlushEndEntry;

FlushIfNeeded(flush_state);
}

Expand Down
2 changes: 2 additions & 0 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ extern "C" {
#include "server/journal/types.h"
#include "server/table.h"

// Maximum number of bytes written per chunk (64 MiB) when serializing a bloom
// filter's backing blob; blobs larger than this are split into multiple
// length-prefixed chunks on save and reassembled on load.
constexpr size_t kFilterChunkSize = 1ULL << 26;

typedef struct rax rax;
typedef struct streamCG streamCG;
typedef struct quicklistNode quicklistNode;
Expand Down
27 changes: 27 additions & 0 deletions src/server/rdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,33 @@ TEST_F(RdbTest, SBF) {
EXPECT_THAT(Run({"BF.EXISTS", "k", "1"}), IntArg(1));
}

// Verifies that an SBF whose filter blob exceeds the serialization chunk size
// survives an RDB save/load round trip: the serializer splits the blob into
// chunks and the loader must reassemble them into the original filter.
TEST_F(RdbTest, SBFLargeFilterChunking) {
  max_memory_limit = 200000000;

  // Using this set of parameters for the BF.RESERVE command resulted in a
  // filter size large enough to require chunking (> 64 MB).
  const double error_rate = 0.001;
  const size_t capacity = 50'000'000;
  const size_t num_items = 100;

  size_t collisions = 0;

  Run({"BF.RESERVE", "large_key", std::to_string(error_rate), std::to_string(capacity)});
  for (size_t i = 0; i < num_items; i++) {
    auto res = Run({"BF.ADD", "large_key", absl::StrCat("item", i)});
    // Guard the optional before dereferencing: if BF.ADD fails, produce a
    // clean test failure instead of undefined behavior on a null optional.
    auto added = res.GetInt();
    ASSERT_TRUE(added.has_value());
    if (*added == 0)
      collisions++;
  }
  // Observed false-positive rate across the inserts must stay under the
  // reserved error rate.
  EXPECT_LT(static_cast<double>(collisions) / num_items, error_rate);

  // Round-trip through RDB serialization; type must survive the reload.
  Run({"debug", "reload"});
  EXPECT_EQ(Run({"type", "large_key"}), "MBbloom--");

  // Every previously added item must still be reported present.
  for (size_t i = 0; i < num_items; i++) {
    EXPECT_THAT(Run({"BF.EXISTS", "large_key", absl::StrCat("item", i)}), IntArg(1));
  }
}

TEST_F(RdbTest, RestoreSearchIndexNameStartingWithColon) {
// Create an index with a name that starts with ':' and add a sample document
EXPECT_EQ(Run({"FT.CREATE", ":Order:index", "ON", "HASH", "PREFIX", "1", ":Order:", "SCHEMA",
Expand Down
Loading