Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
"db/blob/blob_file_cache.cc",
"db/blob/blob_file_completion_callback.cc",
"db/blob/blob_file_garbage.cc",
"db/blob/blob_file_meta.cc",
"db/blob/blob_file_partition_manager.cc",
"db/blob/blob_file_reader.cc",
"db/blob/blob_garbage_meter.cc",
"db/blob/blob_log_format.cc",
"db/blob/blob_log_sequential_reader.cc",
"db/blob/blob_log_writer.cc",
"db/blob/blob_source.cc",
"db/blob/blob_write_batch_transformer.cc",
"db/blob/orphan_blob_file_resolver.cc",
"db/blob/prefetch_buffer_collection.cc",
"db/builder.cc",
"db/c.cc",
Expand Down Expand Up @@ -4804,6 +4808,12 @@ cpp_unittest_wrapper(name="db_blob_corruption_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_blob_direct_write_test",
srcs=["db/blob/db_blob_direct_write_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_blob_index_test",
srcs=["db/blob/db_blob_index_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -707,14 +707,18 @@ set(SOURCES
db/blob/blob_file_addition.cc
db/blob/blob_file_builder.cc
db/blob/blob_file_cache.cc
db/blob/blob_file_completion_callback.cc
db/blob/blob_file_garbage.cc
db/blob/blob_file_meta.cc
db/blob/blob_file_partition_manager.cc
db/blob/blob_file_reader.cc
db/blob/blob_garbage_meter.cc
db/blob/blob_log_format.cc
db/blob/blob_log_sequential_reader.cc
db/blob/blob_log_writer.cc
db/blob/blob_source.cc
db/blob/blob_write_batch_transformer.cc
db/blob/orphan_blob_file_resolver.cc
db/blob/prefetch_buffer_collection.cc
db/builder.cc
db/c.cc
Expand Down Expand Up @@ -1387,6 +1391,7 @@ if(WITH_TESTS)
db/blob/blob_source_test.cc
db/blob/db_blob_basic_test.cc
db/blob/db_blob_compaction_test.cc
db/blob/db_blob_direct_write_test.cc
db/blob/db_blob_corruption_test.cc
db/blob/db_blob_index_test.cc
db/column_family_test.cc
Expand Down
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ PARALLEL_TEST = $(filter-out $(NON_PARALLEL_TEST), $(ROCKSDBTESTS_SUBSET))
TESTS_PLATFORM_DEPENDENT := \
db_basic_test \
db_blob_basic_test \
db_blob_direct_write_test \
db_encryption_test \
external_sst_file_basic_test \
auto_roll_logger_test \
Expand Down Expand Up @@ -1048,6 +1049,7 @@ ifneq ($(PLATFORM), OS_AIX)
$(PYTHON) tools/check_all_python.py
ifndef ASSERT_STATUS_CHECKED # not yet working with these tests
$(PYTHON) tools/ldb_test.py
$(PYTHON) tools/db_crashtest_test.py
sh tools/rocksdb_dump_test.sh
endif
endif
Expand All @@ -1065,6 +1067,10 @@ check_some: $(ROCKSDBTESTS_SUBSET)
ldb_tests: ldb
$(PYTHON) tools/ldb_test.py

.PHONY: db_crashtest_tests
db_crashtest_tests:
$(PYTHON) tools/db_crashtest_test.py

include crash_test.mk

asan_check: clean
Expand Down Expand Up @@ -1444,6 +1450,9 @@ db_blob_basic_test: $(OBJ_DIR)/db/blob/db_blob_basic_test.o $(TEST_LIBRARY) $(LI
db_blob_compaction_test: $(OBJ_DIR)/db/blob/db_blob_compaction_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_blob_direct_write_test: $(OBJ_DIR)/db/blob/db_blob_direct_write_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_readonly_with_timestamp_test: $(OBJ_DIR)/db/db_readonly_with_timestamp_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
27 changes: 19 additions & 8 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include "db/arena_wrapped_db_iter.h"

#include "db/blob/blob_file_cache.h"
#include "db/column_family.h"
#include "memory/arena.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
Expand Down Expand Up @@ -44,18 +46,21 @@ void ArenaWrappedDBIter::Init(
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t version_number,
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
bool expose_blob_index, bool allow_refresh, ReadOnlyMemTable* active_mem) {
bool expose_blob_index, bool allow_refresh, ReadOnlyMemTable* active_mem,
BlobFileCache* blob_file_cache,
BlobFilePartitionManager* blob_partition_mgr) {
read_options_ = read_options;
if (!CheckFSFeatureSupport(env->GetFileSystem().get(),
FSSupportedOps::kAsyncIO)) {
read_options_.async_io = false;
}
read_options_.total_order_seek |= ioptions.prefix_seek_opt_in_only;

db_iter_ = DBIter::NewIter(
env, read_options_, ioptions, mutable_cf_options,
ioptions.user_comparator, /*internal_iter=*/nullptr, version, sequence,
read_callback, active_mem, cfh, expose_blob_index, &arena_);
db_iter_ = DBIter::NewIter(env, read_options_, ioptions, mutable_cf_options,
ioptions.user_comparator,
/*internal_iter=*/nullptr, version, sequence,
read_callback, active_mem, cfh, expose_blob_index,
&arena_, blob_file_cache, blob_partition_mgr);

sv_number_ = version_number;
allow_refresh_ = allow_refresh;
Expand Down Expand Up @@ -164,9 +169,13 @@ void ArenaWrappedDBIter::DoRefresh(const Snapshot* snapshot,
if (read_callback_) {
read_callback_->Refresh(read_seq);
}
// Obtain blob_partition_manager from CFD so refreshed iterators can
// still resolve unflushed write-path blob values.
BlobFilePartitionManager* blob_partition_mgr = cfd->blob_partition_manager();
Init(env, read_options_, cfd->ioptions(), sv->mutable_cf_options, sv->current,
read_seq, sv->version_number, read_callback_, cfh_, expose_blob_index_,
allow_refresh_, allow_mark_memtable_for_flush_ ? sv->mem : nullptr);
allow_refresh_, allow_mark_memtable_for_flush_ ? sv->mem : nullptr,
cfd->blob_file_cache(), blob_partition_mgr);

InternalIterator* internal_iter = db_impl->NewInternalIterator(
read_options_, cfd, sv, &arena_, read_seq,
Expand Down Expand Up @@ -254,13 +263,15 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
SuperVersion* sv, const SequenceNumber& sequence,
ReadCallback* read_callback, DBImpl* db_impl, bool expose_blob_index,
bool allow_refresh, bool allow_mark_memtable_for_flush) {
bool allow_refresh, bool allow_mark_memtable_for_flush,
BlobFilePartitionManager* blob_partition_mgr) {
ArenaWrappedDBIter* db_iter = new ArenaWrappedDBIter();
db_iter->Init(env, read_options, cfh->cfd()->ioptions(),
sv->mutable_cf_options, sv->current, sequence,
sv->version_number, read_callback, cfh, expose_blob_index,
allow_refresh,
allow_mark_memtable_for_flush ? sv->mem : nullptr);
allow_mark_memtable_for_flush ? sv->mem : nullptr,
cfh->cfd()->blob_file_cache(), blob_partition_mgr);
if (cfh != nullptr && allow_refresh) {
db_iter->StoreRefreshInfo(cfh, read_callback, expose_blob_index);
}
Expand Down
7 changes: 5 additions & 2 deletions db/arena_wrapped_db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ class ArenaWrappedDBIter : public Iterator {
const SequenceNumber& sequence, uint64_t version_number,
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
bool expose_blob_index, bool allow_refresh,
ReadOnlyMemTable* active_mem);
ReadOnlyMemTable* active_mem,
BlobFileCache* blob_file_cache = nullptr,
BlobFilePartitionManager* blob_partition_mgr = nullptr);

// Store some parameters so we can refresh the iterator at a later point
// with these same params
Expand Down Expand Up @@ -144,5 +146,6 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
SuperVersion* sv, const SequenceNumber& sequence,
ReadCallback* read_callback, DBImpl* db_impl, bool expose_blob_index,
bool allow_refresh, bool allow_mark_memtable_for_flush);
bool allow_refresh, bool allow_mark_memtable_for_flush,
BlobFilePartitionManager* blob_partition_mgr = nullptr);
} // namespace ROCKSDB_NAMESPACE
36 changes: 32 additions & 4 deletions db/blob/blob_file_addition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ namespace ROCKSDB_NAMESPACE {
enum BlobFileAddition::CustomFieldTags : uint32_t {
kEndMarker,

kPhysicalFileSize,

// Add forward compatible fields here

/////////////////////////////////////////////////////////////////////
Expand All @@ -41,6 +43,13 @@ void BlobFileAddition::EncodeTo(std::string* output) const {
// CustomFieldTags above) followed by a length prefixed slice. Unknown custom
// fields will be ignored during decoding unless they're in the forward
// incompatible range.
if (file_size_ != 0 && file_size_ != DefaultFileSize(total_blob_bytes_)) {
std::string encoded_file_size;
PutVarint64(&encoded_file_size, file_size_);

PutVarint32(output, kPhysicalFileSize);
PutLengthPrefixedSlice(output, Slice(encoded_file_size));
}

TEST_SYNC_POINT_CALLBACK("BlobFileAddition::EncodeTo::CustomFields", output);

Expand Down Expand Up @@ -73,6 +82,8 @@ Status BlobFileAddition::DecodeFrom(Slice* input) {
return Status::Corruption(class_name, "Error decoding checksum value");
}
checksum_value_ = checksum_value.ToString();
file_size_ = ResolveFileSize(blob_file_number_, total_blob_bytes_,
/*file_size=*/0);

while (true) {
uint32_t custom_field_tag = 0;
Expand All @@ -94,6 +105,21 @@ Status BlobFileAddition::DecodeFrom(Slice* input) {
return Status::Corruption(class_name,
"Error decoding custom field value");
}

switch (custom_field_tag) {
case kPhysicalFileSize: {
uint64_t file_size = 0;
if (!GetVarint64(&custom_field_value, &file_size) ||
!custom_field_value.empty()) {
return Status::Corruption(class_name, "Error decoding file size");
}
file_size_ =
ResolveFileSize(blob_file_number_, total_blob_bytes_, file_size);
break;
}
default:
break;
}
}

return Status::OK();
Expand Down Expand Up @@ -122,7 +148,8 @@ bool operator==(const BlobFileAddition& lhs, const BlobFileAddition& rhs) {
lhs.GetTotalBlobCount() == rhs.GetTotalBlobCount() &&
lhs.GetTotalBlobBytes() == rhs.GetTotalBlobBytes() &&
lhs.GetChecksumMethod() == rhs.GetChecksumMethod() &&
lhs.GetChecksumValue() == rhs.GetChecksumValue();
lhs.GetChecksumValue() == rhs.GetChecksumValue() &&
lhs.GetFileSize() == rhs.GetFileSize();
}

bool operator!=(const BlobFileAddition& lhs, const BlobFileAddition& rhs) {
Expand All @@ -134,6 +161,7 @@ std::ostream& operator<<(std::ostream& os,
os << "blob_file_number: " << blob_file_addition.GetBlobFileNumber()
<< " total_blob_count: " << blob_file_addition.GetTotalBlobCount()
<< " total_blob_bytes: " << blob_file_addition.GetTotalBlobBytes()
<< " file_size: " << blob_file_addition.GetFileSize()
<< " checksum_method: " << blob_file_addition.GetChecksumMethod()
<< " checksum_value: "
<< Slice(blob_file_addition.GetChecksumValue()).ToString(/* hex */ true);
Expand All @@ -145,9 +173,9 @@ JSONWriter& operator<<(JSONWriter& jw,
const BlobFileAddition& blob_file_addition) {
jw << "BlobFileNumber" << blob_file_addition.GetBlobFileNumber()
<< "TotalBlobCount" << blob_file_addition.GetTotalBlobCount()
<< "TotalBlobBytes" << blob_file_addition.GetTotalBlobBytes()
<< "ChecksumMethod" << blob_file_addition.GetChecksumMethod()
<< "ChecksumValue"
<< "TotalBlobBytes" << blob_file_addition.GetTotalBlobBytes() << "FileSize"
<< blob_file_addition.GetFileSize() << "ChecksumMethod"
<< blob_file_addition.GetChecksumMethod() << "ChecksumValue"
<< Slice(blob_file_addition.GetChecksumValue()).ToString(/* hex */ true);

return jw;
Expand Down
26 changes: 24 additions & 2 deletions db/blob/blob_file_addition.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <string>

#include "db/blob/blob_constants.h"
#include "db/blob/blob_log_format.h"
#include "rocksdb/rocksdb_namespace.h"

namespace ROCKSDB_NAMESPACE {
Expand All @@ -25,12 +26,14 @@ class BlobFileAddition {

BlobFileAddition(uint64_t blob_file_number, uint64_t total_blob_count,
uint64_t total_blob_bytes, std::string checksum_method,
std::string checksum_value)
std::string checksum_value, uint64_t file_size = 0)
: blob_file_number_(blob_file_number),
total_blob_count_(total_blob_count),
total_blob_bytes_(total_blob_bytes),
checksum_method_(std::move(checksum_method)),
checksum_value_(std::move(checksum_value)) {
checksum_value_(std::move(checksum_value)),
file_size_(
ResolveFileSize(blob_file_number, total_blob_bytes, file_size)) {
assert(checksum_method_.empty() == checksum_value_.empty());
}

Expand All @@ -39,6 +42,7 @@ class BlobFileAddition {
uint64_t GetTotalBlobBytes() const { return total_blob_bytes_; }
const std::string& GetChecksumMethod() const { return checksum_method_; }
const std::string& GetChecksumValue() const { return checksum_value_; }
uint64_t GetFileSize() const { return file_size_; }

void EncodeTo(std::string* output) const;
Status DecodeFrom(Slice* input);
Expand All @@ -49,11 +53,29 @@ class BlobFileAddition {
private:
enum CustomFieldTags : uint32_t;

// Returns the sum of BlobLogHeader::kSize, total_blob_bytes, and
// BlobLogFooter::kSize. This is the size used when no explicit file size is
// supplied for a valid blob file.
static uint64_t DefaultFileSize(uint64_t total_blob_bytes) {
  // Fixed per-file overhead: one log header plus one log footer.
  const uint64_t framing_bytes = BlobLogHeader::kSize + BlobLogFooter::kSize;
  return framing_bytes + total_blob_bytes;
}

// Chooses the file size to record for a blob file:
//   - a non-zero file_size is returned unchanged;
//   - otherwise, kInvalidBlobFileNumber yields 0;
//   - otherwise, the result is DefaultFileSize(total_blob_bytes).
static uint64_t ResolveFileSize(uint64_t blob_file_number,
                                uint64_t total_blob_bytes,
                                uint64_t file_size) {
  const bool has_explicit_size = (file_size != 0);
  if (has_explicit_size) {
    return file_size;
  }

  // No explicit size: an invalid file number has no size to report.
  if (blob_file_number == kInvalidBlobFileNumber) {
    return 0;
  }

  return DefaultFileSize(total_blob_bytes);
}

uint64_t blob_file_number_ = kInvalidBlobFileNumber;
uint64_t total_blob_count_ = 0;
uint64_t total_blob_bytes_ = 0;
std::string checksum_method_;
std::string checksum_value_;
// Physical sealed file size. This can exceed the logical blob bytes when a
// direct-write file contains orphaned records that remain on disk.
uint64_t file_size_ = 0;
};

bool operator==(const BlobFileAddition& lhs, const BlobFileAddition& rhs);
Expand Down
23 changes: 23 additions & 0 deletions db/blob/blob_file_addition_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ TEST_F(BlobFileAdditionTest, Empty) {
ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(), 0);
ASSERT_TRUE(blob_file_addition.GetChecksumMethod().empty());
ASSERT_TRUE(blob_file_addition.GetChecksumValue().empty());
ASSERT_EQ(blob_file_addition.GetFileSize(), 0);

TestEncodeDecode(blob_file_addition);
}
Expand All @@ -59,6 +60,28 @@ TEST_F(BlobFileAdditionTest, NonEmpty) {
ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(), total_blob_bytes);
ASSERT_EQ(blob_file_addition.GetChecksumMethod(), checksum_method);
ASSERT_EQ(blob_file_addition.GetChecksumValue(), checksum_value);
ASSERT_EQ(blob_file_addition.GetFileSize(),
total_blob_bytes + BlobLogHeader::kSize + BlobLogFooter::kSize);

TestEncodeDecode(blob_file_addition);
}

// Checks that an explicitly supplied file size, here 128 bytes larger than
// total_blob_bytes + BlobLogHeader::kSize + BlobLogFooter::kSize, is what
// GetFileSize() reports, and then runs the fixture's TestEncodeDecode check
// on the same object.
TEST_F(BlobFileAdditionTest, NonDefaultFileSize) {
  constexpr uint64_t kBlobFileNumber = 124;
  constexpr uint64_t kTotalBlobCount = 2;
  constexpr uint64_t kTotalBlobBytes = 123456;

  // Bytes on top of the header + blobs + footer size.
  constexpr uint64_t kSurplusBytes = 128;
  constexpr uint64_t kFileSize = kTotalBlobBytes + BlobLogHeader::kSize +
                                 BlobLogFooter::kSize + kSurplusBytes;

  const std::string method("SHA1");
  const std::string digest(
      "\xbd\xb7\xf3\x4a\x59\xdf\xa1\x59\x2c\xe7\xf5\x2e\x99\xf9\x8c\x57\x0c\x52"
      "\x5c\xbd");

  BlobFileAddition addition(kBlobFileNumber, kTotalBlobCount, kTotalBlobBytes,
                            method, digest, kFileSize);

  ASSERT_EQ(addition.GetFileSize(), kFileSize);

  TestEncodeDecode(addition);
}
Expand Down
Loading
Loading