Add blob direct write with partitioned blob files#14457

Draft
xingbowang wants to merge 2 commits into facebook:main from xingbowang:2026_03_04_blob_memtable_partition

Conversation


@xingbowang xingbowang commented Mar 12, 2026

Summary

Add a new blob direct write feature with partitioned blob files that writes blob values directly to blob files during Put(), bypassing both WAL and memtable for large values. Only the small (~30 byte) BlobIndex pointer is stored in WAL and memtable. This reduces WAL write amplification, memtable memory usage, and blob write lock contention for large-value workloads.

Motivation

With standard blob separation, full blob values are first written to WAL, then stored in the memtable, and only separated into blob files during flush. For workloads with large values (e.g., 4KB–1MB), this means the WAL and memtable carry the full value payload even though it will eventually be stored separately. This wastes WAL bandwidth, inflates memtable memory, and adds unnecessary write amplification.

Additionally, the existing blob file write path uses a single blob file writer per column family, which becomes a serialization bottleneck under concurrent write workloads. Partitioned blob files address this by spreading writes across multiple independent blob files, each with its own lock, enabling true parallel blob I/O from multiple writer threads.

Design

Write Path

  • DBImpl::Put() fast path: For single-key puts where the value exceeds min_blob_size, the blob is written directly to a blob file and a BlobIndex-only WriteBatch is constructed, avoiding full value serialization entirely.
  • DBImpl::WriteImpl() batch path: For multi-key WriteBatch operations, a BlobWriteBatchTransformer iterates the batch, writes qualifying values to blob files, and replaces them with BlobIndex entries before the batch enters WAL/memtable.
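As a rough illustration of the fast-path decision, a minimal sketch follows; the helper name and the `>= min_blob_size` threshold semantics are assumptions for illustration, not PR code (the real logic lives in DBImpl::Put()/WriteImpl()).

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical ~30-byte pointer stored in WAL/memtable instead of the value.
struct BlobIndex {
  uint64_t file_number = 0;
  uint64_t offset = 0;
  uint64_t size = 0;
};

// Decide whether a value should bypass WAL/memtable and go straight to a
// blob file. Assumes the same size threshold as standard blob separation.
bool ShouldDirectWrite(bool enable_blob_direct_write, uint64_t min_blob_size,
                       const std::string& value) {
  return enable_blob_direct_write && value.size() >= min_blob_size;
}
```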

BlobFilePartitionManager

A new BlobFilePartitionManager manages partitioned blob files for concurrent writes:

  • Partitioned writes: Multiple blob file partitions (configurable via blob_direct_write_partitions) each with their own mutex, reducing lock contention for concurrent writers.
  • Deferred flush mode (blob_direct_write_buffer_size > 0): Zero-copy buffering where Slice references point directly into the WriteBatch buffer. Background threads flush to disk in batches, amortizing syscall overhead. Includes backpressure with stall watermarks.
  • Sync mode (blob_direct_write_buffer_size = 0): Immediate write-through for maximum durability.
  • Pluggable partition strategy: Custom BlobFilePartitionStrategy interface for key/value-aware partition assignment (default: round-robin).
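The default round-robin strategy could look roughly like the sketch below; the interface mirrors the BlobFilePartitionStrategy described above, but the exact signatures are assumptions.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <string>

// Assumed shape of the pluggable strategy interface.
class BlobFilePartitionStrategy {
 public:
  virtual ~BlobFilePartitionStrategy() = default;
  // May be called concurrently; implementations must be thread-safe.
  virtual uint32_t SelectPartition(const std::string& key,
                                   const std::string& value,
                                   uint32_t num_partitions) const = 0;
};

// Default strategy: rotate through partitions with an atomic counter.
class RoundRobinStrategy : public BlobFilePartitionStrategy {
 public:
  uint32_t SelectPartition(const std::string& /*key*/,
                           const std::string& /*value*/,
                           uint32_t num_partitions) const override {
    // Modulo keeps the result in [0, num_partitions).
    return counter_.fetch_add(1, std::memory_order_relaxed) % num_partitions;
  }

 private:
  mutable std::atomic<uint32_t> counter_{0};
};
```

The mutable atomic member lets a const SelectPartition() keep internal state while remaining safe under concurrent writers, matching the per-partition-mutex design above.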

Flush Integration

  • On memtable flush, BlobFilePartitionManager::SealAllPartitions() finalizes open blob files and injects BlobFileAddition entries into the flush VersionEdit, so blob files are registered in the MANIFEST atomically with the flush SST.
  • Handles mempurge: if a flush is switched to mempurge, sealed blob file additions are returned to the partition manager for the next flush.

Crash Recovery

  • Orphan blob file recovery in DBImpl::Open(): Scans for blob files not registered in the MANIFEST (e.g., from crashes before flush), reads their headers to determine column family, validates records, and registers them via VersionEdit. Runs regardless of current enable_blob_direct_write setting to handle DBs previously opened with the feature.
  • WAL replay produces BlobIndex entries pointing to these recovered blob files, ensuring no data loss.
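Conceptually, the orphan scan reduces to a set difference between blob files found on disk and blob files registered in the MANIFEST; this toy sketch (all names hypothetical) illustrates the idea.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <set>
#include <vector>

// Blob files present on disk but absent from the MANIFEST are orphan
// candidates that DB::Open() would validate and register via VersionEdit.
std::vector<uint64_t> FindOrphanBlobFiles(
    const std::set<uint64_t>& on_disk_blob_files,
    const std::set<uint64_t>& manifest_registered) {
  std::vector<uint64_t> orphans;
  std::set_difference(on_disk_blob_files.begin(), on_disk_blob_files.end(),
                      manifest_registered.begin(), manifest_registered.end(),
                      std::back_inserter(orphans));
  return orphans;
}
```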

Read Path

  • DBIter and ArenaWrappedDBIter extended to resolve BlobIndex entries from direct-write blob files.
  • Deferred flush mode includes a 4-tier read fallback: pending records → in-flight records → BlobFileCache → blob file read.
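A toy model of the 4-tier fallback, with in-memory maps standing in for the real pending buffers, BlobFileCache, and blob file reads (all names and the map-based stand-ins are illustrative assumptions):

```cpp
#include <cassert>
#include <cstdint>
#include <initializer_list>
#include <map>
#include <string>

using BlobKey = uint64_t;  // stand-in for (file_number, offset)

struct ReadTiers {
  std::map<BlobKey, std::string> pending;    // tier 1: unflushed buffer
  std::map<BlobKey, std::string> in_flight;  // tier 2: being flushed
  std::map<BlobKey, std::string> cache;      // tier 3: BlobFileCache
  std::map<BlobKey, std::string> on_disk;    // tier 4: blob file read
};

// Probe each tier in order; the first hit wins, so a value still sitting
// in the deferred-flush buffer shadows any on-disk copy.
bool ResolveBlob(const ReadTiers& t, BlobKey k, std::string* out) {
  for (const auto* tier : {&t.pending, &t.in_flight, &t.cache, &t.on_disk}) {
    auto it = tier->find(k);
    if (it != tier->end()) {
      *out = it->second;
      return true;
    }
  }
  return false;
}
```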

New Options

  • enable_blob_direct_write (bool, default: false) — master switch
  • blob_direct_write_partitions (uint32, default: 1) — number of concurrent blob file partitions
  • blob_direct_write_buffer_size (uint64, default: 4MB) — per-partition write buffer; 0 = sync mode
  • blob_direct_write_use_direct_io (bool, default: false) — O_DIRECT for blob writes
  • blob_direct_write_flush_interval_ms (uint64, default: 0) — periodic background flush interval
  • blob_direct_write_partition_strategy (shared_ptr, default: round-robin)
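A hedged sketch of the new options and their documented defaults, using an illustrative stand-in struct (the real options live in RocksDB's options classes, and their exact placement may differ):

```cpp
#include <cassert>
#include <cstdint>

// Stand-in mirroring the option names and defaults listed above.
struct BlobDirectWriteOptions {
  bool enable_blob_direct_write = false;      // master switch
  uint32_t blob_direct_write_partitions = 1;  // concurrent partitions
  uint64_t blob_direct_write_buffer_size = 4u << 20;  // 4MB; 0 = sync mode
  bool blob_direct_write_use_direct_io = false;       // O_DIRECT writes
  uint64_t blob_direct_write_flush_interval_ms = 0;   // 0 = disabled
};
```

Per the design above, setting blob_direct_write_partitions > 1 spreads writes across independently locked blob files, and buffer_size = 0 trades throughput for immediate durability.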

New Statistics

  • BLOB_DB_DIRECT_WRITE_COUNT — number of blobs written via direct write
  • BLOB_DB_DIRECT_WRITE_BYTES — bytes written via direct write
  • BLOB_DB_DIRECT_WRITE_STALL_COUNT — writer stalls due to backpressure
  • BLOB_DB_COMPRESSION_MICROS — blob compression timing

Testing

  • 61 new tests in db_blob_direct_write_test.cc covering: basic put/get, multi-get, concurrent writers, compression (with Snappy availability checks), crash recovery, orphan recovery, WAL recovery, snapshot isolation, transactions (including 2PC), backpressure, multiple column families, file rotation, statistics, event listeners, file checksums, direct I/O, sync/deferred flush modes, and error injection.
  • db_stress and db_crashtest.py integration for continuous randomized testing.
  • All existing blob tests updated to coexist with the new code paths.
  • Full make check passes (39,454 tests, 0 failures).

New Files

  • db/blob/blob_file_partition_manager.cc/.h — core partition manager (~1,700 lines)
  • db/blob/blob_write_batch_transformer.cc/.h — WriteBatch transformation logic
  • db/blob/db_blob_direct_write_test.cc — comprehensive test suite (~2,000 lines)
  • db/blob/blob_file_completion_callback.cc — SstFileManager and EventListener integration

@meta-cla meta-cla bot added the CLA Signed label Mar 12, 2026

github-actions bot commented Mar 12, 2026

⚠️ clang-tidy: 20 warning(s) on changed lines

Completed in 1772.4s.

Summary by check

Check Count
bugprone-unused-return-value 1
cppcoreguidelines-pro-type-member-init 1
google-readability-avoid-underscore-in-googletest-name 1
performance-inefficient-vector-operation 2
performance-unnecessary-copy-initialization 1
readability-braces-around-statements 14
Total 20

Details

db/blob/blob_file_cache.cc (1 warning(s))
db/blob/blob_file_cache.cc:187:3: warning: the value returned by this function should not be disregarded; neglecting it may lead to errors [bugprone-unused-return-value]
db/blob/blob_file_cache_test.cc (1 warning(s))
db/blob/blob_file_cache_test.cc:199:27: warning: avoid using "_" in test name "InsertBlobFileReader_PopulatesCache" according to Googletest FAQ [google-readability-avoid-underscore-in-googletest-name]
db/blob/blob_file_partition_manager.cc (6 warning(s))
db/blob/blob_file_partition_manager.cc:802:28: warning: statement should be inside braces [readability-braces-around-statements]
db/blob/blob_file_partition_manager.cc:805:23: warning: statement should be inside braces [readability-braces-around-statements]
db/blob/blob_file_partition_manager.cc:1282:32: warning: statement should be inside braces [readability-braces-around-statements]
db/blob/blob_file_partition_manager.cc:1287:34: warning: statement should be inside braces [readability-braces-around-statements]
db/blob/blob_file_partition_manager.cc:1295:21: warning: statement should be inside braces [readability-braces-around-statements]
db/blob/blob_file_partition_manager.cc:1323:34: warning: statement should be inside braces [readability-braces-around-statements]
db/blob/db_blob_direct_write_test.cc (7 warning(s))
db/blob/db_blob_direct_write_test.cc:75:7: warning: uninitialized record type: 'info' [cppcoreguidelines-pro-type-member-init]
db/blob/db_blob_direct_write_test.cc:1004:21: warning: the const qualified variable 'orphan_blob_path' is copy-constructed from a const reference; consider making it a const reference [performance-unnecessary-copy-initialization]
db/blob/db_blob_direct_write_test.cc:4971:28: warning: statement should be inside braces [readability-braces-around-statements]
db/blob/db_blob_direct_write_test.cc:5011:28: warning: statement should be inside braces [readability-braces-around-statements]
db/blob/db_blob_direct_write_test.cc:5121:28: warning: statement should be inside braces [readability-braces-around-statements]
db/blob/db_blob_direct_write_test.cc:5349:5: warning: 'emplace_back' is called inside a loop; consider pre-allocating the container capacity before the loop [performance-inefficient-vector-operation]
db/blob/db_blob_direct_write_test.cc:6155:5: warning: 'push_back' is called inside a loop; consider pre-allocating the container capacity before the loop [performance-inefficient-vector-operation]
db/db_impl/db_impl.cc (1 warning(s))
db/db_impl/db_impl.cc:595:14: warning: statement should be inside braces [readability-braces-around-statements]
db/db_impl/db_impl_compaction_flush.cc (4 warning(s))
db/db_impl/db_impl_compaction_flush.cc:674:16: warning: statement should be inside braces [readability-braces-around-statements]
db/db_impl/db_impl_compaction_flush.cc:709:19: warning: statement should be inside braces [readability-braces-around-statements]
db/db_impl/db_impl_compaction_flush.cc:924:47: warning: statement should be inside braces [readability-braces-around-statements]
db/db_impl/db_impl_compaction_flush.cc:926:14: warning: statement should be inside braces [readability-braces-around-statements]

@xingbowang xingbowang force-pushed the 2026_03_04_blob_memtable_partition branch from f83ed92 to 6987d8b on March 19, 2026 16:22
@xingbowang xingbowang changed the title from "2026 03 04 blob memtable partition" to "Add blob direct write: bypass WAL and memtable for large blob values" Mar 19, 2026
@xingbowang xingbowang changed the title from "Add blob direct write: bypass WAL and memtable for large blob values" to "Blob direct write with partitions" Mar 19, 2026
@xingbowang xingbowang changed the title from "Blob direct write with partitions" to "Add blob direct write with partitioned blob files" Mar 19, 2026
@xingbowang xingbowang force-pushed the 2026_03_04_blob_memtable_partition branch 2 times, most recently from e44da5c to a3af35b on March 19, 2026 19:13
// Public interface for blob file partition assignment.
// Users can implement custom strategies to control which partition
// a blob is written to, based on key and value content.
// Used with the blob direct write feature (enable_blob_direct_write).

Could the partition feature work without the blob direct write feature? Theoretically, it should be able to: during flush, it would allocate multiple blob files for writes and seal all of them at the end.


[AI Bot]: Good point. Updated the class comment to note that the partition strategy is general-purpose and also applicable during flush (allocating multiple blob files to writers), not just direct write. Will be in the next push.

virtual ~BlobFilePartitionStrategy() = default;

// Select a partition index for the given key and value.
// Returns a value in [0, num_partitions). Out-of-range values are

Maybe we should just allow any uint32_t to be returned from this function, and internally take it modulo num_partitions. This would allow us to adjust num_partitions at runtime if needed.


[AI Bot]: Good idea. Changed the API contract: SelectPartition can now return any uint32_t, and the caller applies % num_partitions internally. Removed the InvalidArgument bounds-check in WriteBlob. The num_partitions parameter is kept as a hint. This decouples implementations from the partition count, enabling runtime adjustment. Note: num_partitions_ itself is const uint32_t — immutable after DB open. Will be in the next push.
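The revised contract can be sketched as follows; the hash-based strategy is an illustrative assumption, and only the caller-side modulo reflects the API change described above.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>

// A strategy may now return any uint32_t, decoupled from partition count.
uint32_t HashStrategy(const std::string& key) {
  return static_cast<uint32_t>(std::hash<std::string>{}(key));
}

// The caller wraps the unbounded result into [0, num_partitions),
// so num_partitions can change without touching strategy implementations.
uint32_t AssignPartition(const std::string& key, uint32_t num_partitions) {
  return HashStrategy(key) % num_partitions;
}
```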

Comment on lines +44 to +45
// Const: implementations needing internal state (e.g., round-robin
// counters) should use mutable or std::atomic members.

Would multiple BlobFilePartitionStrategy instances be created? Why couldn't we allow it to hold some state, accessed by the application through a mutex or similar?


Yes, only one BlobFilePartitionStrategy instance is created and shared across all writer threads for the entire lifetime of the DB (from open to close). It's passed as a shared_ptr into the single BlobFilePartitionManager at DB::Open() time.

This is why the thread-safety requirement on SelectPartition() matters — users can use any thread-safe mechanism they want (mutex, atomics, concurrent data structures, etc.). Removed the prescriptive comment about mutable/atomic — the const qualifier on the method and the thread-safety note are sufficient.

const Slice& key, const WideColumns& columns);

// PutEntity with pre-serialized entity (for pass-through in transformers).
static Status PutEntityRaw(WriteBatch* batch, uint32_t column_family_id,

Why do we need this?


[AI Bot]: Added detailed comment. PutEntityRaw exists because the WriteBatch::Handler::PutEntityCF callback receives the entity already serialized as a Slice. Using regular PutEntity (which takes WideColumns) would require a redundant deserialize-then-reserialize round-trip. PutEntityRaw accepts the pre-serialized Slice directly.

Note: wide column entities are currently passed through unchanged (no per-column blob separation). Wide column V2 integration with blob direct write will be done in a separate follow-up PR. Will be in the next push.

static size_t GetFirstOffset(WriteBatch* batch);

static Slice Contents(const WriteBatch* batch) { return Slice(batch->rep_); }
static std::string& Rep(WriteBatch* batch) { return batch->rep_; }

Exposing rep_, an internal implementation detail, seems like a bad idea. Why do we need this? Is there a way to design the API more safely, say by locking it down to a very specific operation?


[AI Bot]: Removed. Rep() was added but never actually used. All internal code accesses rep_ through the existing friend class mechanism or via Contents() (read-only Slice). No need to expose a mutable reference. Will be in the next push.

Comment on lines +1186 to +1187
// NOTE: The partition manager is shared across all column families.
// When multiple CFs enable this feature with different settings,

This is wrong. We should have this configuration per column family. Same for the other configurations in this new feature.


[AI Bot]: Agreed — these configurations should be per-column-family. Currently the partition manager is shared across CFs and settings are aggregated via max(). The architecture note in BlobFilePartitionManager.h already flags this: "Per-CF partition managers would be more consistent with RocksDB's per-CF blob file model." Will address making configs truly per-CF. Will be addressed in the next commit.

Comment on lines +1209 to +1212
// CRASH SAFETY: When buffer_size > 0 and sync=false, buffered blob
// records may be lost on crash even if the WAL survives. WAL replay
// will produce BlobIndex entries pointing to unwritten blob data.
// Use sync=true or buffer_size=0 to avoid this window.

  1. Could you also add a comment on how this work with blob_direct_write_flush_interval_ms?
  2. Could you clarify what happens to those blob index during WAL replay, would they get discarded?


[AI Bot]: 1. Will add a comment explaining the interaction: when both buffer_size > 0 and flush_interval_ms > 0, the buffer is flushed whichever comes first — buffer full OR interval elapsed. The interval acts as a latency bound ensuring data reaches disk even under low write rates.

  2. During WAL replay after a crash with buffer_size > 0 and sync=false: the WAL may contain BlobIndex entries whose blob data was still buffered (not flushed to disk). On recovery, orphan blob file recovery in DB::Open() scans for unregistered blob files and validates their records. BlobIndex entries pointing to missing/incomplete blob data will result in read errors (the blob offset/size won't be found in the recovered file). This is the crash safety tradeoff documented in the CRASH SAFETY comment. Use sync=true or buffer_size=0 to avoid this window. Will add clearer documentation. Will be addressed in the next commit.
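The buffer-full-or-interval-elapsed trigger described in point 1 can be modeled with a small hypothetical helper (not PR code):

```cpp
#include <cassert>
#include <cstdint>

// Flush when the buffer fills OR the periodic interval elapses,
// whichever comes first. interval_ms == 0 disables the periodic trigger.
bool ShouldFlushBuffer(uint64_t buffered_bytes, uint64_t buffer_size,
                       uint64_t ms_since_last_flush, uint64_t interval_ms) {
  const bool buffer_full = buffered_bytes >= buffer_size;
  const bool interval_hit =
      interval_ms > 0 && ms_since_last_flush >= interval_ms;
  return buffer_full || interval_hit;
}
```

The interval thus acts as a latency bound: even under a low write rate, buffered blob records reach disk within roughly flush_interval_ms.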


For 2, it looks like the current behavior is:

WAL replay reconstructs the memtable normally. It sees kTypeBlobIndex entries and inserts them as-is — WAL replay doesn't validate that the referenced blob data exists. The BlobIndex entries are structurally valid (correct format, valid file_number/offset/size), so they pass all memtable insertion checks.

Could you add BlobIndex validation during WAL replay to check whether the blob survived the crash? If not, throw the entry away. This improves the recovery consistency of the data.


[AI Bot]: Will add BlobIndex validation during WAL replay — when a kTypeBlobIndex entry is replayed, validate that the referenced blob file exists and the offset/size are within bounds. Invalid references should be logged as warnings (or errors depending on severity). Will be addressed in the next commit.
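The proposed validation amounts to an existence check plus a bounds check; a toy sketch under assumed names:

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Stand-in for the fields a kTypeBlobIndex entry references.
struct BlobRef {
  uint64_t file_number;
  uint64_t offset;
  uint64_t size;
};

// Validate a replayed BlobIndex against recovered blob file sizes:
// the referenced file must exist and offset+size must lie within it.
bool ValidateBlobIndex(const BlobRef& ref,
                       const std::map<uint64_t, uint64_t>& blob_file_sizes) {
  auto it = blob_file_sizes.find(ref.file_number);
  if (it == blob_file_sizes.end()) {
    return false;  // referenced blob file missing after crash
  }
  return ref.offset + ref.size <= it->second;  // record within file bounds
}
```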

// Only used when enable_blob_direct_write = true.
//
// Default: false
bool blob_direct_write_use_direct_io = false;

Do we need a new flag to control the behavior independently? Why not just use the existing direct_io option value in RocksDB and update its comment?


[AI Bot]: Good point. The existing use_direct_io_for_flush_and_compaction in DBOptions controls direct I/O for flush/compaction blob file writes. Blob direct write files serve a similar role (write-path blob files). We could reuse that option instead of adding a separate flag. The rationale for a separate flag was that direct-write blob files have different access patterns (small buffered writes vs large sequential flush writes), but the simplicity of reusing the existing option is compelling. Will investigate removing blob_direct_write_use_direct_io and reusing use_direct_io_for_flush_and_compaction. Will be addressed in the next commit.

// blob_direct_write_buffer_size > 0.
//
// Default: 0 (disabled)
uint64_t blob_direct_write_flush_interval_ms = 0;

I believe this is only useful when the WAL is enabled. If so, similar to the above: is there an existing RocksDB configuration for a WAL flush interval whose value we could reuse (just updating its comment) instead of adding a new configuration?


[AI Bot]: This flush interval is specific to blob direct write buffers — it controls how often the in-memory blob record buffer is flushed to the blob file on disk. It is not related to WAL flushing. There is no existing RocksDB configuration that serves this purpose. The closest is wal_bytes_per_sync (periodic WAL sync), but that controls WAL syncing, not blob buffer flushing. The blob buffer flush is needed because the deferred flush model holds blob records in memory and the interval prevents them from sitting there indefinitely under low write rates. That said, per your earlier comment, we should reconsider whether this option is truly needed as a separate config or if the buffer high-water mark + backpressure is sufficient. Will be addressed in the next commit.

// Custom partition strategy for blob direct writes. Controls which
// partition a blob is assigned to based on key and value content.
// If nullptr, uses the default round-robin strategy.
// Only used when enable_blob_direct_write = true.

Not true. When enable_blob_direct_write is false, we could still use this during flush.


[AI Bot]: Correct — the partition strategy should also apply during flush when partitioned blob files are enabled without direct write. Will update the comment to remove the "Only used when enable_blob_direct_write = true" restriction. The strategy applies whenever partitioned blob files are active, regardless of the write path. Will be addressed in the next commit.


@xingbowang xingbowang left a comment


Addressing review feedback

return save.commit();
}

Status WriteBatchInternal::PutEntityRaw(WriteBatch* b,

As discussed, it seems this could be replaced with a simpler solution.


[AI Bot]: Agreed. Since the transformer just passes wide column entities through unchanged (no blob separation for entities yet), we could avoid PutEntityRaw entirely by using a simpler approach — e.g., directly copying the raw bytes from the input batch to the output batch for entity entries, or using the existing PutEntity with a single default-column WideColumns containing the pre-serialized entity. Will investigate the simplest solution and address in the next commit.

mutable_blob_file_metas_.emplace(
blob_file_number, MutableBlobFileMetaData(std::move(shared_meta)));

// Link existing SSTs that reference this blob file (via

Does this PR validate that blob files are GCed properly in unit tests?
Please check that both active and passive GC are properly tested in unit tests with partitioned direct blob write.


[AI Bot]: Partially. The existing db_blob_compaction_test.cc has a parameterized DBBlobCompactionTestWithDirectWrite variant that runs all compaction tests (including CompactionReadaheadGarbageCollection which tests active GC with enable_blob_garbage_collection=true and blob_garbage_collection_age_cutoff=1.0) with direct write enabled.

However, there is no dedicated test in db_blob_direct_write_test.cc that explicitly validates:

  1. Active GC (compaction with enable_blob_garbage_collection) properly reclaims direct-write blob files
  2. Passive GC (obsolete blob files cleaned up after all referencing SSTs are deleted)
  3. That the SST-to-blob-file linkage (oldest_blob_file_number) is correctly set for direct-write blobs

The DeleteAndReput test does a CompactRange after overwrites but does not verify blob file count/garbage metrics. Will add dedicated GC validation tests. Will be addressed in the next commit.

Comment on lines +1354 to +1356
// added blob files (from orphan recovery) that may not have SST links
// yet. This is a consequence of not registering blob files in MANIFEST
// at creation time.

Is there a unit test validating this?


[AI Bot]: The orphan recovery path is tested indirectly via DBBlobDirectWriteTest.WALRecovery, RecoveryAfterFlush, RecoveryWithoutFlush, RecoveryWithRotation, RecoveryWithRotationNoFlush, and MultiCFOrphanRecovery — these crash/reopen scenarios exercise the code path in DB::Open() that registers orphan blob files, which then flows through VersionBuilder::ApplyBlobFileAddition and SaveTo (including this bypass).

However, there is no isolated unit test in version_builder_test.cc that directly validates the has_new_blob_files bypass logic (adding blob file additions without linked SSTs and verifying they survive SaveTo). Will add one. Will be addressed in the next commit.


flush_scheduler_.Clear();
trim_history_scheduler_.Clear();

// Seal any open blob partition files. The blob files were already

Instead of waiting until flush to seal the blob file, we could seal it through a background process the moment its memtable becomes immutable. This could reduce the amount of data loss during crash recovery.


[AI Bot]: Good idea. Currently blob files are sealed during flush (in FlushMemTableToOutputFile, via SealAllPartitions), which means the window between the memtable becoming immutable and the actual flush completing leaves blob data in unsealed files.

Sealing eagerly when a memtable switches to immutable (e.g., via SwitchMemtable or the FlushScheduler) would reduce the crash recovery window — sealed blob files have complete headers/footers and checksums, making orphan recovery more reliable. It would also allow the sealed blob file additions to be registered in the MANIFEST sooner.

The main consideration is that sealing triggers file close + sync I/O, so doing it on the write path (during memtable switch) could add latency. A background thread that seals asynchronously after the memtable becomes immutable would avoid this. Will investigate this optimization. Will be addressed in a follow-up commit.

const Version* version, const SequenceNumber& sequence,
ReadCallback* read_callback, ReadOnlyMemTable* active_mem,
ColumnFamilyHandleImpl* cfh = nullptr, bool expose_blob_index = false,
Arena* arena = nullptr, BlobFileCache* blob_file_cache = nullptr,

When the partitioned blob file flush is buffered, read-after-write consistency requires the blob cache. What if the blob cache is not set, i.e., a null pointer? Will the code handle it properly? Does that mean it will always flush?


[AI Bot]: Good question. When blob cache is null (not configured), the deferred flush buffer still provides read-after-write consistency via GetPendingBlobValue() — this is Tier 1 in ResolveBlobDirectWriteIndex. The pending record lookup is an in-memory hash map check, independent of blob cache. Once the buffer is flushed to disk, the blob becomes readable via the normal blob file read path (Tier 2/3). So blob cache is not required for correctness — it is an optimization for repeated reads. Without blob cache, reads after flush go through Version::GetBlob() or direct BlobFileCache reads. The code handles null blob_cache by skipping cache prepopulation and cache lookups.


void BlobFilePartitionManager::DumpTimingStats() const {}

Status BlobFilePartitionManager::ResolveBlobDirectWriteIndex(

Could we short-circuit this check with the max sealed blob file number, tracking the max sealed blob file number whenever a flush completes?


[AI Bot]: Good optimization idea. We could maintain a max_sealed_blob_file_number_ atomic that gets updated after each successful flush/seal. Then in ResolveBlobDirectWriteIndex, if blob_idx.file_number() <= max_sealed_blob_file_number_, we know the file has been sealed and registered in the version, so we can skip the pending record check (Tier 1) and go directly to Version::GetBlob() (Tier 2). This would eliminate the RWMutex read lock + hash map lookup for the common case where blobs are already on disk. Will investigate adding this optimization. Will be addressed in the next commit.
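The proposed short-circuit can be sketched with a monotonic atomic maximum (illustrative, not PR code):

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Highest blob file number known to be sealed and registered.
std::atomic<uint64_t> max_sealed_blob_file_number{0};

// Called after each successful seal/flush; monotonic CAS-style update so
// out-of-order completions never move the watermark backwards.
void RecordSealed(uint64_t file_number) {
  uint64_t cur = max_sealed_blob_file_number.load(std::memory_order_relaxed);
  while (cur < file_number &&
         !max_sealed_blob_file_number.compare_exchange_weak(
             cur, file_number, std::memory_order_relaxed)) {
  }
}

// If the blob's file number is at or below the watermark, the file is on
// disk and in the version, so the Tier 1 pending lookup can be skipped.
bool CanSkipPendingLookup(uint64_t blob_file_number) {
  return blob_file_number <=
         max_sealed_blob_file_number.load(std::memory_order_relaxed);
}
```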

const BlobIndex& blob_idx, const Version* version,
BlobFileCache* blob_file_cache, BlobFilePartitionManager* partition_mgr,
PinnableSlice* blob_value) {
// Tier 1: Check unflushed pending records (deferred flush mode).

Do we need this? I remember we have already added the pending flush to the blob cache, haven't we?


[AI Bot]: Good question. Yes, we do prepopulate the blob cache with the uncompressed value at write time (when prepopulate_blob_cache = kFlushOnly), which covers the hot-read-after-write case. However, this ResolveBlobDirectWriteIndex read path is needed for a different scenario: when a reader issues a Get() for a key whose blob is still in the deferred flush buffer (not yet written to disk). The blob cache prepopulation happens with the uncompressed value, but the pending records hold compressed data that needs decompression. The tiered fallback handles: (1) pending/unflushed records, (2) version-based reads for sealed files, (3) BlobFileCache for unsealed files not in the version. If blob cache always has the value, tier 1 would be skipped via cache hit before reaching here. But cache eviction or missing prepopulation means we still need the pending record fallback.


Should we check blob cache first, as it is faster? See analysis here.

Read path order
The current order in ResolveBlobDirectWriteIndex is:

Tier 1: Unflushed pending buffer — GetPendingBlobValue()
Tier 2: Version::GetBlob() — which internally calls BlobSource::GetBlob(), which checks blob cache first, then reads from disk
Tier 3: Direct BlobFileCache read (unsealed files)
Tier 4: Retry pending buffer
So no, the blob cache is NOT checked first. The pending buffer is checked before the blob cache. The order is: pending buffer → blob cache → disk.

Performance comparison
GetPendingBlobValue cost (Tier 1):

- ReadLock on file_partition_mutex_ (pthread rwlock acquire/release)
- Hash lookup in file_to_partition_ (unordered_map)
- MutexLock on per-partition mutex
- Hash lookup in pending_index (unordered_map)
- If found: string copy of the value (raw_value = *it->second.data)
- If compressed: full decompression

Blob cache lookup cost (inside Tier 2 via BlobSource):

- Compute CacheKey (hash of db_id + session_id + file_number + offset)
- LRU cache Lookup() (sharded hash table, one shard mutex)
- If found: pin the cached entry (ref-count increment, no copy)

The blob cache path is more efficient when the value is present:

- 1 mutex (cache shard) vs 2 mutexes (rwlock + partition mutex)
- Zero-copy pin vs string copy + potential decompression
- Cache lookup is a well-optimized hot path in RocksDB
The pending buffer is checked first because it's needed for correctness — a value that hasn't been flushed to disk yet won't be readable through any other tier. But when prepopulate_blob_cache is enabled, the value exists in both the pending buffer and the blob cache simultaneously. In that scenario, checking the blob cache first would be faster (one lock, zero-copy) and the pending buffer check is wasted work.

A potential optimization when blob cache prepopulation is enabled: check the blob cache before the pending buffer, and only fall back to the pending buffer on cache miss (eviction). This would make the common case (cache hit) faster at the cost of a slightly slower rare case (cache miss requiring two lookups instead of one).
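The proposed reordering amounts to a chain of fallbacks where the cheapest tier is tried first. A self-contained sketch of that pattern — the `Tier` functions below are stand-ins for the blob cache, pending buffer, and disk lookups, not RocksDB API:

```cpp
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Each tier either produces the value or reports a miss. Tiers are tried
// in order; the first hit wins, so later (more expensive) tiers are only
// touched on a miss.
using Tier = std::function<std::optional<std::string>()>;

std::optional<std::string> ResolveWithTiers(const std::vector<Tier>& tiers) {
  for (const auto& tier : tiers) {
    if (auto v = tier()) {
      return v;  // first tier with the value wins
    }
  }
  return std::nullopt;  // all tiers missed
}
```

With cache-first ordering, a cache hit never pays the pending-buffer rwlock; the pending buffer remains the correctness backstop on eviction.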


[AI Bot]: Good point. Will reorder ResolveBlobDirectWriteIndex to check blob cache (via Version::GetBlob which checks cache internally) before the pending buffer check, since cache lookup is faster than mutex + hash map. Will be addressed in the next commit.

result.blob_direct_write_partitions);
result.blob_direct_write_partitions = 64;
}
constexpr uint64_t kMaxBufferSize = 256ULL * 1024 * 1024; // 256MB

This is too much, let's do 16MB.


[AI Bot]: Agreed. Will change the max buffer size cap from 256MB to 16MB. Will be addressed in the next commit.


Let's increase the max to 64MB, just for high-latency remote network storage devices.


[AI Bot]: Will change max buffer cap to 64MB (was set to 16MB, reviewer wants 64MB for high-latency remote storage). Will be addressed in the next commit.

}

PERF_TIMER_GUARD(write_pre_and_post_process_time);


Please don't randomly add empty lines to code blocks that are not otherwise changed.


[AI Bot]: Will remove the spurious empty line. Will be addressed in the next commit.


// Fast path for blob direct write: write blob value directly to blob file
// and build a WriteBatch with only the ~30 byte BlobIndex entry.
// This avoids serializing the full value into WriteBatch rep_ (saves a 4KB

4KB is too specific.


[AI Bot]: Will remove the specific "4KB" reference from the comment — the actual savings depend on value size. Will be addressed in the next commit.

// and build a WriteBatch with only the ~30 byte BlobIndex entry.
// This avoids serializing the full value into WriteBatch rep_ (saves a 4KB
// memcpy) and skips TransformBatch in WriteImpl (saves iteration overhead).
if (blob_partition_manager_ != nullptr && !o.disableWAL) {

Direct blob write should work with or without WAL.


[AI Bot]: Agreed. The !o.disableWAL check is wrong — blob direct write should work regardless of WAL setting. Without WAL, there is more data loss risk on crash (blob files written but no WAL to replay BlobIndex entries), but that is acceptable for some workloads. Will remove the disableWAL guard from both the Put fast path and the WriteImpl batch path. Will be addressed in the next commit.

// path which builds a BlobIndex-only batch directly).
std::optional<WriteBatch> transformed_batch_storage;
if (my_batch != nullptr && blob_partition_manager_ != nullptr &&
my_batch->HasPut() && !write_options.disableWAL) {

Again, direct blob write should work with or without WAL. Without WAL, there is just more data loss on crash, which is acceptable for some workloads.


[AI Bot]: Same as above — will remove the !write_options.disableWAL check from WriteImpl. Blob direct write works with or without WAL. Will be addressed in the next commit.

Comment on lines +575 to +590
if (!blob_s.ok()) {
  return blob_s;
}
if (transformed) {
  if (!blob_partition_manager_->IsDeferredFlushMode()) {
    blob_s = blob_partition_manager_->FlushAllOpenFiles(write_options);
    if (!blob_s.ok()) {
      return blob_s;
    }
  }
  my_batch = &*transformed_batch_storage;
  if (write_options.sync) {
    blob_s = blob_partition_manager_->SyncAllOpenFiles(write_options);
    if (!blob_s.ok()) {
      return blob_s;
    }

If any of these steps goes wrong here, could there be a memory leak? Are the resources cleaned up properly? How does the caller handle this?

There are no memory leaks on any of these error paths. All C++ objects in play use RAII:

- transformed_batch_storage (std::optional) — stack-allocated, destructor runs automatically on return
- blob_s (Status) — stack-allocated
- settings_provider (lambda) — stack-allocated

The stack unwinds cleanly on every early return.

However, there's a more subtle concern: orphaned blob data. Consider the failure sequence:

1. Lines 572-574 — TransformBatch succeeds: blob values have been written to open blob files, and the transformed batch contains BlobIndex entries referencing them.
2. Line 580 — FlushAllOpenFiles fails, or line 587 — SyncAllOpenFiles fails.
3. The function returns an error — the WriteBatch with BlobIndex entries is never committed to WAL/memtable.
Now you have blob data sitting in open blob files that no key references. Those bytes are effectively wasted space. But this isn't a persistent problem because:

- The blob files are still open and tracked by the partition manager.
- When the files are eventually sealed (on next successful flush, shutdown, etc.), those unreferenced blob records become dead weight in the blob file but don't cause correctness issues — no BlobIndex points to them, so they're simply never read.
- Compaction/GC will eventually reclaim the space when the blob file is garbage collected.
Similarly, if TransformBatch itself partially fails on line 575 (some blobs written, then an error on a later entry), the same orphaned-data situation applies.

So: no memory leaks, no correctness bugs, but a minor space amplification until GC cleans up the unreferenced blob records.

For orphaned blob data, is the total data counter updated correctly? How is it updated? If it is not updated correctly, then garbage tracking may have a bug and would not be able to do GC correctly, right?


You're raising an excellent point. Let me walk through the GC mechanism.

How blob GC tracks garbage: RocksDB's blob GC uses a differential approach via BlobGarbageMeter. During compaction, it counts blob references going into the compaction (inflow) vs. coming out (outflow). The difference is recorded as AddBlobFileGarbage(file_number, garbage_count, garbage_bytes) in the VersionEdit. These garbage counters are accumulated in SharedBlobFileMetaData alongside the total_blob_count/total_blob_bytes that were set when the blob file was registered via BlobFileAddition.

The force-GC threshold check (version_set.cc line 4181):

version_set.cc
Lines 4181-4184
if (sum_garbage_blob_bytes <
    blob_garbage_collection_force_threshold * sum_total_blob_bytes) {
  return;
}
This compares accumulated garbage_blob_bytes against total_blob_bytes.

The problem with orphaned blob data: When TransformBatch succeeds but the write fails to commit (lines 575-590 error paths), the blob data is written and the partition's blob_count/total_blob_bytes are incremented (line 874-878). When the blob file is eventually sealed, these inflated counters flow into the BlobFileAddition and become total_blob_bytes in the MANIFEST.

However, since no SST ever contains a BlobIndex pointing to these orphaned records:

- No SST links to them — linked_ssts won't include any SST referencing these blobs, so they won't be seen during compaction inflow/outflow tracking.
- Garbage is never recorded — BlobGarbageMeter only sees blob references that flow through compaction. Orphaned records were never in any SST, so they never appear as inflow, and thus never get counted as garbage.
- The ratio garbage_bytes / total_blob_bytes is artificially low — total_blob_bytes is inflated by the orphaned data, but garbage_bytes doesn't account for it.
So yes, you've identified a real accounting issue: orphaned blob records inflate total_blob_bytes without a corresponding path to ever increment garbage_blob_bytes, making the garbage ratio appear lower than it actually is. This could delay GC triggering. The blob file would still eventually be collected (once all real references are compacted away and the file has no linked_ssts), but the force-threshold heuristic would be less accurate.

That said, this is a narrow edge case in practice — it only occurs when blob writes succeed but the subsequent flush/sync in WriteImpl fails, which is an I/O error scenario where the system likely has bigger problems.


[AI Bot]: Excellent analysis. You are correct on all points.

Memory safety: No leaks — all objects are RAII/stack-allocated and clean up on return.

Orphaned blob data GC accounting: This is a real issue. When TransformBatch succeeds but the write fails to commit, the orphaned blob records inflate total_blob_bytes in BlobFileAddition without any corresponding garbage_blob_bytes increment (since no SST ever referenced them, BlobGarbageMeter never sees them in compaction inflow/outflow). This makes the garbage_bytes / total_blob_bytes ratio artificially low, potentially delaying force-GC triggering.

Possible mitigations:

  1. Track uncommitted blob writes: Maintain a counter of bytes written but not committed. On write failure, subtract from partition counters before seal. This keeps total_blob_bytes accurate.
  2. Rollback on failure: On TransformBatch success but write failure, mark the blob records as "rolled back" so they are excluded from BlobFileAddition counts. This is more complex since the records are already in the blob file — would need a tombstone/skip mechanism.
  3. Accept the inaccuracy: As noted, this is a narrow edge case (I/O error during flush/sync after successful blob write), and the blob file will still be GC-ed eventually when all real references are compacted away.

Option 1 seems most practical. Will add a TODO comment noting this accounting gap and consider implementing the counter approach. Will be addressed in the next commit.
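A sketch of option 1, under the assumption that per-partition counters hold uncommitted writes in a pending bucket that graduates to the committed totals only once the batch commits. `PartitionCounters` and its methods are hypothetical names, not the PR's code:

```cpp
#include <cstdint>
#include <mutex>

// Hypothetical per-partition accounting: bytes/records written to the blob
// file but not yet committed to WAL/memtable stay in a pending bucket.
// Commit graduates them into the totals; failure rolls them back so
// total_blob_bytes in the eventual BlobFileAddition stays accurate.
class PartitionCounters {
 public:
  void OnBlobWritten(uint64_t bytes) {
    std::lock_guard<std::mutex> g(mu_);
    pending_bytes_ += bytes;
    pending_count_ += 1;
  }
  void OnBatchCommitted() {
    std::lock_guard<std::mutex> g(mu_);
    total_bytes_ += pending_bytes_;
    total_count_ += pending_count_;
    pending_bytes_ = pending_count_ = 0;
  }
  void OnBatchFailed() {
    // Orphaned records: exclude them from the sealed totals.
    std::lock_guard<std::mutex> g(mu_);
    pending_bytes_ = pending_count_ = 0;
  }
  uint64_t total_bytes() const {
    std::lock_guard<std::mutex> g(mu_);
    return total_bytes_;
  }

 private:
  mutable std::mutex mu_;
  uint64_t pending_bytes_ = 0, pending_count_ = 0;
  uint64_t total_bytes_ = 0, total_count_ = 0;
};
```

Note the rollback only fixes the accounting — the orphaned bytes remain physically in the blob file until GC reclaims them.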


Good point on write-batch abort, or transaction abort. We should implement option 1: track uncommitted blob writes. It is very important to make sure total_blob_bytes is accurate.

These are critical correctness issues. Please add unit tests after the fix that verify this thoroughly, with all edge cases covered.


[AI Bot]: Agreed — tracking uncommitted blob writes is critical for accurate total_blob_bytes. With the new WAL-replay-based recovery redesign, this becomes less urgent (recovery re-creates blob files with accurate counts), but for the runtime path (write failure mid-batch), we still need to track and rollback blob counts. Will implement as part of the recovery redesign. Will be addressed in the next commit.

// and sealed-but-unregistered blob files must be recovered to prevent
// data loss when WAL replay creates BlobIndex entries pointing to them.
if (s.ok()) {
{

We don't need these nested braces here.


[AI Bot]: Will remove the unnecessary double bracket. Will be addressed in the next commit.

} else {
persist_options_status.PermitUncheckedError();
}
// Discover and register blob files from blob direct write that aren't

For the new recovery logic added for orphan blob files below, does the PR add enough unit tests to cover every if/else branch? Make sure all of them are covered properly.


[AI Bot]: Will audit all if/else branches in the orphan recovery logic and add tests for uncovered paths. Current tests (WALRecovery, RecoveryAfterFlush, RecoveryWithoutFlush, RecoveryWithRotation, MultiCFOrphanRecovery, OrphanBlobFileSurvivesSaveTo) cover the main paths but may miss edge cases like corrupt headers, zero-size files, or files already registered. Will add comprehensive branch coverage tests. Will be addressed in the next commit.

continue;
}

// Read blob file header to determine the correct column family.

Given that the new recovery logic is always executed, what would happen to the normal BlobDB feature when the new feature is not enabled? Let's say a blob file was half-written during flush. Before this change, the blob file is discarded. After this change, the half-written blob file will be added to the CF. Will it be GC-ed properly? Do we have a unit test for this?

Good question. Let me walk through the scenario step by step.

Scenario: Normal BlobDB (direct write disabled), crash during flush leaves a half-written blob file on disk.

What happens with the new orphan recovery code:

1. On DB::Open(), the recovery code finds the half-written .blob file not in the MANIFEST.
2. It has a valid header (normal BlobDB blob files use the same BlobLogHeader).
3. It's registered via LogAndApply with a scanned blob_count and total_blob_bytes.
4. Now the blob file is in the MANIFEST with zero linked SSTs — because in normal BlobDB, the memtable has regular Put entries (not BlobIndex), so the next flush creates entirely new blob files.
Will it be GC'd? There's a safety net in VersionBuilder::AddBlobFileIfNeeded (line 1328-1331):

version_builder.cc
Lines 1328-1331
if (linked_ssts.empty() &&
    meta->GetGarbageBlobCount() >= meta->GetTotalBlobCount()) {
  return;
}
This drops a blob file from the version when it has zero linked SSTs AND garbage >= total. But for a freshly registered orphan file, garbage_blob_count is 0 and total_blob_count is > 0. So this condition is not met — the file stays in the version.

The problems:

Assertion failure in debug builds: ComputeFilesMarkedForForcedBlobGC at line 4165 has assert(!linked_ssts.empty()) on the oldest blob file. If this orphaned file happens to be the oldest, it crashes in debug mode.

Permanent space leak in release builds: The blob file has zero linked SSTs, so no compaction can ever produce AddBlobFileGarbage entries for it. The GC condition (garbage >= total) is never satisfied. The file persists in the MANIFEST forever.

No test for this: There's no test that exercises normal BlobDB with a crash during flush after this change.

This looks like a real regression. The orphan recovery code should have a way to distinguish direct-write blob files from normal BlobDB blob files, or at the very least, should only register orphan files if there are BlobIndex entries in the WAL/memtable that reference them. Without that, any half-written blob file from a normal flush crash gets permanently stuck.
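To see why the orphaned file gets stuck, the quoted drop condition can be modeled as a tiny stand-alone predicate (a simplified model for illustration, not RocksDB's actual code):

```cpp
#include <cstddef>
#include <cstdint>

// Simplified model of the VersionBuilder drop condition quoted above:
// a blob file is dropped only when it has no linked SSTs AND every one of
// its records has been accounted as garbage.
bool CanDropBlobFile(std::size_t linked_ssts, uint64_t garbage_count,
                     uint64_t total_count) {
  return linked_ssts == 0 && garbage_count >= total_count;
}
```

An orphan registered with total_count > 0 has no linked SSTs, but compaction never records garbage for it, so garbage_count stays 0 and the drop condition is never satisfied.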


[AI Bot]: Critical point. The current recovery logic runs unconditionally, which means a half-written blob file from a normal flush crash (no direct write) would be picked up by orphan recovery and potentially registered with incorrect record counts. This could cause correctness issues. We should either: (1) only run orphan recovery when enable_blob_direct_write was previously enabled (track this in MANIFEST or a marker file), or (2) add validation that skips files with incomplete records/missing footer in the recovery scan. Option 2 is safer since it handles the case where the feature was enabled then disabled. Will add proper validation and a unit test for this scenario. Will be addressed in the next commit.

Comment on lines +2753 to +2754
fs_s = impl->immutable_db_options_.fs->NewRandomAccessFile(
blob_path, FileOptions(), &rfile, nullptr);

It would be nice if we could create one FSRandomAccessFile for reading the file content and reuse it, instead of creating one sequential file at line 2711 and one random access file here.


[AI Bot]: Good suggestion. Will refactor to use a single FSRandomAccessFile for both the header read (currently sequential at line 2711) and the record scanning (random access at line 2754). This avoids opening the file twice. Will be addressed in the next commit.

Comment on lines +2758 to +2767
fs_s = rfile->Read(file_size - BlobLogFooter::kSize,
                   BlobLogFooter::kSize, IOOptions(),
                   &footer_slice, &footer_buf[0], nullptr);
if (fs_s.ok() && footer_slice.size() == BlobLogFooter::kSize) {
  BlobLogFooter footer;
  has_footer = footer.DecodeFrom(footer_slice).ok();
  if (has_footer) {
    footer_blob_count = footer.blob_count;
  }
}

This logic is not right. The blob value could contain arbitrary user data. What if that data matches the footer format by accident? It would cause data corruption. We should not parse it this way. Instead, we should scan from the beginning of the file, parsing one blob record at a time following the blob file format. E.g. it would parse the following fields sequentially, validating their checksums along the way.

+-----------------------------+
| Record X                    |
+-----------------------------+
| key length (Fixed64)        |
| value length (Fixed64)      |
| expiration (Fixed64)        |
| header CRC (Fixed32)        |
| blob CRC (Fixed32)          |
| key (key_len)               |
| value (value_len)           |
+-----------------------------+

With this being said, maybe we should reuse FSSequentialFile created above, instead of creating a new FSRandomAccessFile.


In this way, total_blob_bytes would be aggregated correctly as the file is scanned through.


[AI Bot]: You are absolutely right — this is a correctness bug. Parsing from the end looking for a footer is unsafe because user blob data could accidentally match the footer format. The correct approach is to scan from the beginning of the file after the header, parsing blob records sequentially (key length, key, value length, value, CRC) and validating checksums along the way. This gives us an accurate record count and total bytes, and naturally handles truncated files (the scan stops at the first incomplete/corrupt record). Will rewrite the orphan recovery to use forward sequential scanning with checksum validation. Will be addressed in the next commit.
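A simplified, self-contained sketch of that forward scan over the record layout quoted above. CRC validation is stubbed out here, and the fixed-width decoding assumes a little-endian host (as RocksDB's Fixed64 encoding does); the real scan must verify both CRCs before trusting a record:

```cpp
#include <cstdint>
#include <cstring>
#include <string>

// Result of scanning a blob file body (the bytes after the file header).
struct ScanResult {
  uint64_t blob_count = 0;
  uint64_t total_blob_bytes = 0;
};

static uint64_t DecodeFixed64LE(const char* p) {
  uint64_t v;
  std::memcpy(&v, p, sizeof(v));
  return v;  // assumes little-endian host
}

ScanResult ScanBlobRecords(const std::string& data) {
  // Per-record header: 8 (key len) + 8 (value len) + 8 (expiration)
  // + 4 (header CRC) + 4 (blob CRC) = 32 bytes.
  constexpr uint64_t kRecordHeader = 32;
  ScanResult r;
  uint64_t pos = 0;
  while (pos + kRecordHeader <= data.size()) {
    uint64_t key_len = DecodeFixed64LE(data.data() + pos);
    uint64_t val_len = DecodeFixed64LE(data.data() + pos + 8);
    if (key_len > data.size() || val_len > data.size()) {
      break;  // implausible lengths: treat as corruption, stop scanning
    }
    uint64_t record_size = kRecordHeader + key_len + val_len;
    if (pos + record_size > data.size()) {
      break;  // truncated tail from a crash mid-write: stop scanning
    }
    // Real implementation: validate header CRC and blob CRC here before
    // counting the record.
    r.blob_count += 1;
    r.total_blob_bytes += val_len;
    pos += record_size;
  }
  return r;
}
```

The scan naturally stops at the first incomplete record, so a crash-truncated file yields accurate counts for the records that were fully written.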


[AI Bot]: Confirmed — the forward sequential scan now aggregates total_blob_bytes as each record is parsed and validated. This is already implemented in Batch A.

s = blob_partition_manager_->SealAllPartitions(write_options,
&write_path_additions);
if (s.ok() && !write_path_additions.empty()) {
jobs[0]->AddExternalBlobFileAdditions(std::move(write_path_additions));

As discussed before, this needs to be fixed. We need a per-CF blob_partition_manager_ so that the blob files are added to the right CF.


[AI Bot]: Agreed. CC round 2 already refactored to per-CF BlobFilePartitionManager (stored in ColumnFamilyData). The FlushMemTableToOutputFile and AtomicFlush paths now seal the flushing CFs manager. This change is in the current working tree and will be in the next commit.

Comment on lines +2716 to +2733
// Resolve blob index from immutable memtable if needed.
if (s.ok() && is_blob_index && is_blob_ptr == &is_blob_index &&
    get_impl_options.value) {
  BlobIndex blob_idx;
  s = blob_idx.DecodeFrom(*(get_impl_options.value->GetSelf()));
  if (s.ok()) {
    get_impl_options.value->Reset();
    BlobFileCache* blob_cache =
        blob_partition_manager_ ? static_cast<ColumnFamilyHandleImpl*>(
                                      get_impl_options.column_family)
                                      ->cfd()
                                      ->blob_file_cache()
                                : nullptr;
    s = ResolveBlobIndexForWritePath(
        read_options, key, blob_idx, sv->current, blob_cache,
        blob_partition_manager_.get(), get_impl_options.value);
  }
  is_blob_index = false;

This code block looks exactly the same as the one from 2683 to 2700. Could we extract a common function or lambda to dedupe this and make it easier to maintain?


[AI Bot]: Will extract a common helper function/lambda to deduplicate the blob index resolution logic at lines 2683-2700 and 2716-2733. Will be addressed in the next commit.

} else {
lookup_current = false;
}


There are a bunch of testing gaps for Get and MultiGet. Please add unit tests to cover them.

Get path gaps:

Immutable memtable read — No test explicitly reads from the immutable memtable (switch memtable via TEST_FlushMemTable with wait=false or sync point to pause flush, then Get). The resolve block at lines 2716-2733 is only implicitly covered if a flush happens to be in progress during other tests.

GetEntity / wide column read — The resolve logic at lines 2684-2685 checks get_impl_options.value but doesn't handle get_impl_options.columns. If GetEntity is called on a key with a BlobIndex in the memtable, the blob index won't be resolved. No test covers this.

GetMergeOperands — Not tested with blob direct write. The code at lines 2740-2757 uses is_blob_index = nullptr for the merge operand path, so blob indices wouldn't be detected there.

Deferred flush pending records read — The 4-tier fallback (GetPendingBlobValue → Version → BlobFileCache → retry) is only indirectly exercised by ConcurrentReadersAndWriters. No isolated test verifies each tier individually.

BlobIndex decode corruption — No test verifies that a corrupted BlobIndex in the memtable returns a proper error status through Get.

MultiGet path gaps:

Immutable memtable MultiGet — As noted, no test reads from immutable memtable via MultiGet.

Mixed source MultiGet — No test where some keys are in memtable and others in SST, verifying both resolve paths work together in a single MultiGet call.

Multi-CF MultiGet — No MultiGet test spanning multiple column families with blob direct write.

MultiGet decode corruption — No test for DecodeFrom failure at line 3497 in the MultiGet path.

MultiGet with GetEntity — Same as point 2 but for the batched path.

The most concerning gap is #2 (GetEntity/wide columns) — that's not just a missing test, it's likely a missing code path entirely. The resolve blocks only handle get_impl_options.value, not get_impl_options.columns.


[AI Bot]: Will add comprehensive tests covering the identified gaps:

  1. Immutable memtable read — switch memtable without waiting for flush, then Get
  2. GetEntity / wide column read — verify resolve logic for wide columns with direct write
  3. MultiGet blob resolution — test MultiGet with mix of blob and non-blob keys from direct write
  4. Error path coverage — inject errors in blob file read to verify error propagation

Will be addressed in the next commit.

// db_impl_open.cc reconciles unregistered blob files. This creates file
// number gaps and relies entirely on orphan recovery for crash consistency.
// Consider registering blob files in the MANIFEST at creation time with
// a "pending" state to eliminate the need for orphan recovery.

Let's remove this comment; this is too complicated to implement.


[AI Bot]: Will remove the "Consider registering blob files in the MANIFEST at creation time with a pending state" comment. Will be addressed in the next commit.

// complexity: BG thread pool, pending/in-flight record tracking, 4-tier
// read fallback, and backpressure logic. For large values (64KB+), the
// per-record syscall overhead is proportionally small. The sync-only path
// (buffer_size=0) is approximately 15 lines.

"15 lines" is too specific; just say it is significantly simpler.


[AI Bot]: Will change to "significantly simpler". Will be addressed in the next commit.

// - Read path checks pending records for unflushed data
//
// The deferred flush model (~500+ lines) provides significant syscall
// reduction for small values (e.g., 250x for 4KB values) but adds

250x and 4KB are too specific; just remove them.


[AI Bot]: Will remove the specific numbers. Will be addressed in the next commit.

Comment on lines +113 to +122
CompressionType blob_compression_type = kNoCompression,
uint64_t buffer_size = 0, bool use_direct_io = false,
uint64_t flush_interval_ms = 0,
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
FileChecksumGenFactory* file_checksum_gen_factory = nullptr,
const FileTypeSet& checksum_handoff_file_types = {},
BlobFileCompletionCallback* blob_callback = nullptr,
const std::string& db_id = "", const std::string& db_session_id = "",
Logger* info_log = nullptr);

Let's remove the default values for the arguments. If they are only used in tests, just pass them in the tests.


[AI Bot]: Will remove all default argument values from the BlobFilePartitionManager constructor and update all call sites (production and test) to pass values explicitly. Will be addressed in the next commit.

Status SealAllPartitions(const WriteOptions& write_options,
std::vector<BlobFileAddition>* additions);

void TakeCompletedBlobFileAdditions(std::vector<BlobFileAddition>* additions);

Add a comment for the function


[AI Bot]: Will add a doc comment for TakeCompletedBlobFileAdditions. Will be addressed in the next commit.


uint64_t blob_direct_write_buffer_size;

uint64_t blob_direct_write_flush_interval_ms;

We need something similar for sync as well.


[AI Bot]: Will add a bool blob_direct_write_sync (or reuse use_fsync from ImmutableDBOptions) to control whether blob direct write files are synced after each flush. Will be addressed in the next commit.


It is OK. Let's leave this as it is; we can address it in another PR.

Comment on lines +192 to +193
// WARNING: Dynamic option changes via SetOptions() are NOT reflected here.
// Affected options: enable_blob_direct_write, min_blob_size,

I think some of these settings should support dynamic updates, such as:
uint64_t min_blob_size = 0;
CompressionType compression_type = kNoCompression;


[AI Bot]: Agreed. min_blob_size and compression_type are per-CF mutable options that should support dynamic update via SetOptions(). Currently the cached settings snapshot is only built at DB open. To support dynamic updates, we need to hook into the SetOptions() callback and call UpdateCachedSettings() when these change. The partition manager already has the UpdateCachedSettings mechanism — we just need to wire it to the options change path. Will be addressed in the next commit.

Comment on lines +225 to +226
// Dump per-operation timing breakdown to stderr (for benchmarking).
void DumpTimingStats() const;

Clean this up


[AI Bot]: Will clean up. Will be addressed in the next commit.


private:
// A pending blob record waiting to be flushed to disk.
// Owns the key and value data. Use std::list so that pointers into

should be deque


[AI Bot]: Will change from list to deque. Will be addressed in the next commit.

Comment on lines +293 to +299
// Per-partition pending blob index for O(1) read-path lookup by
// (file_number, blob_offset). Protected by this partition's mutex,
// eliminating the global serialization point that a shared index would
// create across all partitions.
std::unordered_map<PendingBlobKey, PendingBlobValueEntry,
PendingBlobKeyHash>
pending_index;

This is probably the most tricky part of this new PR. Please see the analysis below; refine the code's documentation to improve clarity, and remove the stale function references and abandoned design.

PR 14457: pending_index Concurrency Analysis

Overview

The pending_index in BlobFilePartitionManager::Partition is a per-partition
hash map that enables O(1) read-path lookups for blob values that have been
buffered in memory but not yet flushed to disk (deferred flush mode). It maps
(file_number, blob_offset) to a raw const std::string* pointer into
PendingRecord::value.

The most subtle aspect of this design is that pending_index entries
intentionally outlive their source container during mutex-free background
I/O. This document traces the full lifecycle and identifies documentation gaps.

Pending Index Entry Lifecycle

Phase 1: Creation (under partition mutex)

WriteBlob() acquires the partition mutex and calls WriteBlobDeferred():

// blob_file_partition_manager.cc:648-657
partition->pending_records.push_back(
    {std::move(key_copy_), std::move(value_copy_), fn, *blob_offset});

partition->pending_index[{fn, *blob_offset}] = {
    &partition->pending_records.back().value, partition->compression};

At this point:

  • The PendingRecord is owned by the partition's pending_records deque.
  • The pending_index entry stores a raw pointer to the record's value string.
  • std::deque::push_back does not invalidate pointers to existing elements,
    so previously-created index entries remain valid.

Phase 2: Background Flush — Move Under Mutex, I/O Without It

FlushPendingRecords() (called by the BG thread) does a three-step dance:

Step 1 (mutex HELD):
  - Move pending_records → local `records` variable (O(1) deque move)
  - Clear pending_records
  - Grab raw pointer to writer
  - DO NOT remove pending_index entries

Step 2 (mutex RELEASED):
  - FlushRecordsToDisk() — actual disk I/O using local `records`
  - OS flush

Step 3 (mutex RE-ACQUIRED via RemoveFromPendingIndex):
  - Remove all flushed entries from pending_index
  - Signal pending_cv

The critical invariant: During Step 2, pending_index entries still exist
and their const std::string* pointers are valid. This works because:

  1. std::deque move construction transfers internal block pointers — element
    addresses remain stable. The C++ standard guarantees that references,
    pointers, and iterators to elements of the moved-from container remain
    valid and now refer to elements in the new container.

  2. The local records variable owns the PendingRecord objects and their
    value strings. It is not destroyed until after RemoveFromPendingIndex
    completes.

  3. This means concurrent readers (via GetPendingBlobValue) can still find
    and copy values from the index while the BG thread is doing I/O.

Phase 3: Read Path (under partition mutex)

GetPendingBlobValue() looks up in the index and copies the string:

// blob_file_partition_manager.cc:720-728
{
  MutexLock lock(&partition->mutex);
  auto it = partition->pending_index.find({file_number, offset});
  if (it == partition->pending_index.end()) {
    return Status::NotFound();
  }
  raw_value = *it->second.data;     // copies the string
  compression = it->second.compression;
}

The copy is essential: after releasing the mutex, the BG thread could complete
its flush and destroy the local records variable, freeing the string data.

Phase 4: Cleanup (under partition mutex)

After I/O completes, RemoveFromPendingIndex re-acquires the partition mutex
and erases all flushed entries:

// blob_file_partition_manager.cc:686-690
void BlobFilePartitionManager::RemoveFromPendingIndex(
    Partition* partition, const std::deque<PendingRecord>& records) {
  MutexLock lock(&partition->mutex);
  RemoveFromPendingIndexLocked(partition, records);
}

Only after this returns does the local records destructor run, freeing string
data. The ordering is guaranteed by sequential execution within the BG thread.

The Race That Tier 4 Covers

The 4-tier read fallback in ResolveBlobDirectWriteIndex is:

  1. Pending records (unflushed deferred data in partition manager)
  2. Version::GetBlob (standard path for registered blob files)
  3. BlobFileCache (direct read for unregistered files)
  4. Retry pending records (safety net for BG flush race)

Tier 4 exists because of this specific race:

Time ──────────────────────────────────────────────────────►

BG Thread:  ... FlushRecordsToDisk ...  RemoveFromPendingIndex  (data on disk)
                                              │
Reader:                          Tier 1 ──────┤──── Tier 2 ──── Tier 3
                                 (miss!)      │     (miss!)      (miss!)
                                              │
                                     Index entry removed,
                                     but file not yet readable
                                     via Version or BlobFileCache

Between RemoveFromPendingIndex (entry gone from index) and the blob file
becoming readable through BlobFileCache or Version::GetBlob, there is a
window where all three tiers return NotFound. Tier 4 retries the pending
index as a safety net — by the time the retry executes, either the data has
landed on disk (Tier 2/3 would succeed on a subsequent attempt) or new pending
records have appeared.

Note: Tier 4 may not fully close this race if the file is flushed to disk but
not yet OS-visible. In practice this is mitigated by the OS flush in
FlushPendingRecords happening before RemoveFromPendingIndex.

Documentation Gaps in the Current Code

1. No Consolidated Lifecycle Documentation

The full lifecycle (create → survive deque move → valid during BG I/O → remove
after flush → data freed) is spread across 4 functions in 2 files. No single
comment ties them together.

Existing comments (scattered):

  • Header line 287: "Uses std::deque so that push_back does not invalidate
    pointers" (covers creation, not the move)
  • Header line 293: "Protected by this partition's mutex" (correct but
    incomplete — doesn't mention the outliving pattern)
  • Impl line 959: "Records remain visible via pending_index until
    RemoveFromPendingIndex is called after flush" (closest to a lifecycle
    comment, but doesn't explain why the pointers remain valid)

2. Deque Move Stability Not Documented

The design relies on std::deque move construction preserving element
addresses. This C++ standard guarantee is non-obvious and never stated in the
code. Someone unfamiliar with this guarantee could reasonably conclude the
pointers become dangling after the move.

3. Read-Side Copy Requirement Not Explained

GetPendingBlobValue copies the string under the mutex (raw_value = *it->second.data), but there is no comment explaining why a copy (rather than
a reference or Slice) is required. The reason: the BG thread may free the
data after the reader releases the mutex.

4. Tier 4 Race Not Explained

The 4-tier list in the header (line 212) says "safety net for BG flush race"
but does not describe the specific race condition (the gap between pending
index removal and disk readability).

5. Stale Header Comment

The class-level comment (header lines 87-91) describes a "zero-copy" model
with Slice references into WriteBatch buffer and AdoptBatchBuffer().
The actual implementation pre-copies key and value into std::string outside
the mutex (impl lines 836-841). AdoptBatchBuffer appears only in this
comment, never in actual code. This suggests the design was revised but the
comment was not updated.

6. Inaccurate Container Reference

The PendingRecord struct comment (header line 230) says "Use std::list" but
the actual container is std::deque. Stale from an earlier iteration.

Suggested Fix

Add a consolidated block comment above the pending_index field in the header,
or above FlushPendingRecords in the implementation, describing:

  1. The full entry lifecycle across write/flush/read/cleanup
  2. The deque move stability guarantee that makes it safe
  3. Why readers must copy data under the mutex
  4. The specific race that Tier 4 covers

Fix the stale header comments (lines 87-91 "zero-copy" / "AdoptBatchBuffer",
line 230 "std::list").


[AI Bot]: Excellent analysis — all 6 documentation gaps are valid. Will address all of them:

  1. Consolidated lifecycle comment — will add a block comment above pending_index documenting the full create → survive deque move → valid during BG I/O → remove after flush → data freed lifecycle.

  2. Deque move stability — will document the C++ standard guarantee that std::deque move construction preserves element addresses (pointers/references to elements of the moved-from container remain valid in the new container).

  3. Read-side copy requirement — will add comment in GetPendingBlobValue explaining why it copies the string (BG thread may free data after mutex release).

  4. Tier 4 race — will expand the ResolveBlobDirectWriteIndex header comment to describe the specific race window between pending index removal and disk readability.

  5. Stale "zero-copy" / "AdoptBatchBuffer" comment — will fix lines 87-91. The actual implementation pre-copies key/value into std::string outside the mutex. AdoptBatchBuffer was from an earlier design and never implemented. Will update to describe the actual pre-copy approach.

  6. "std::list" → "std::deque" — will fix line 230 stale container reference.

Will be addressed in the next commit.

Comment on lines +312 to +315
void RemoveFromPendingIndex(Partition* partition,
const std::deque<PendingRecord>& records);
void RemoveFromPendingIndexLocked(Partition* partition,
const std::deque<PendingRecord>& records);

Please add some comments. Make sure all new functions have comments, except the ones that override a parent class method.


[AI Bot]: Will add doc comments to all new functions in blob_file_partition_manager.h that are missing them (private helper functions like RemoveFromPendingIndex, AddFilePartitionMapping, ResetPartitionState, etc.). Will be addressed in the next commit.

Comment on lines +365 to +388
const uint32_t num_partitions_;
std::shared_ptr<BlobFilePartitionStrategy> strategy_;
FileNumberAllocator file_number_allocator_;
FileSystem* fs_;
SystemClock* clock_;
Statistics* statistics_;
FileOptions file_options_;
std::string db_path_;
uint64_t blob_file_size_;
bool use_fsync_;
uint64_t buffer_size_;
uint64_t high_water_mark_;
uint64_t flush_interval_us_; // Periodic flush interval in microseconds.

CompressionType blob_compression_type_;

std::shared_ptr<IOTracer> io_tracer_;
std::vector<std::shared_ptr<EventListener>> listeners_;
FileChecksumGenFactory* file_checksum_gen_factory_;
FileTypeSet checksum_handoff_file_types_;
BlobFileCompletionCallback* blob_callback_;
std::string db_id_;
std::string db_session_id_;
Logger* info_log_;

Add documents for non-trivial member variables to explain how they are used.


Same for the other ones.


[AI Bot]: Will add doc comments for all non-trivial member variables (strategy_, file_number_allocator_, high_water_mark_, flush_interval_us_, blob_compression_type_, etc.). Will be addressed in the next commit.


[AI Bot]: Will ensure ALL non-trivial member variables have doc comments, not just the ones listed. Will be addressed in the next commit.

Logger* info_log_;

std::vector<std::unique_ptr<Partition>> partitions_;
// RCU-based settings cache: readers load the raw pointer (zero overhead),

This is confusing. Why do we need RCU? An earlier comment mentioned that we don't dynamically update these settings. However, as discussed earlier, we do want to dynamically update some of the options. Please adjust this accordingly.


[AI Bot]: Good point. With per-CF partition managers, the RCU-based SettingsMap is overkill — each manager only serves one CF. Will simplify to a direct BlobDirectWriteSettings member (protected by mutex or atomic) instead of the atomic pointer + copy-on-write map. The dynamic update path (SetOptions -> UpdateCachedSettings) will update this simpler member directly. Will be addressed in the next commit.

Comment on lines +403 to +404
// Uses port::RWMutex (pthread_rwlock_t) instead of std::shared_mutex
// for TSAN compatibility (libc++ std::shared_mutex may lack TSAN

Remove this detail; it is not useful.


[AI Bot]: Will remove the unnecessary detail. Will be addressed in the next commit.

// Uses port::RWMutex (pthread_rwlock_t) instead of std::shared_mutex
// for TSAN compatibility (libc++ std::shared_mutex may lack TSAN
// annotations).
using FilePartitionMap = std::unordered_map<uint64_t, uint32_t>;

A using-alias for a trivial unordered_map seems like overkill.


[AI Bot]: Will inline the type instead of using a using alias. Will be addressed in the next commit.

Comment on lines +419 to +424
bool bg_stop_{false};
bool bg_seal_in_progress_{false}; // Prevents BG threads from picking up new
// work during SealAllPartitions.
port::CondVar bg_seal_done_cv_; // Signaled when seal completes.
Status bg_status_; // First error from background thread.
std::atomic<bool> bg_has_error_{false}; // Lock-free check for bg_status_.

PR 14457: Synchronization Simplification Analysis

Current Synchronization Inventory

The BlobFilePartitionManager uses 8 mutexes/RWMutexes, 4 condition
variables, and 4+ atomics:

| # | Primitive | Type | Purpose |
|---|-----------|------|---------|
| 1 | partition->mutex | per-partition Mutex | Core partition state: writer, pending_records, pending_index, counters, completed_files |
| 2 | partition->pending_cv | per-partition CondVar | Backpressure: wake stalled writers after flush drains pending_bytes |
| 3 | bg_mutex_ | global Mutex | BG thread pool coordination: ready_queue_, stop/seal flags, per-partition bg_queue/bg_in_flight |
| 4 | bg_cv_ | global CondVar | Wake BG threads when new work is submitted |
| 5 | bg_drain_cv_ | global CondVar | Wake SealAllPartitions when in-flight BG work completes |
| 6 | bg_seal_done_cv_ | global CondVar | Wake BG threads when seal finishes and they can resume |
| 7 | file_partition_mutex_ | global RWMutex | Protects file_to_partition_ map (file→partition routing for reads) |
| 8 | completed_files_mutex_ | global Mutex | Dead code — declared but never locked anywhere |
| 9 | settings_write_mutex_ | global std::mutex | RCU copy-on-write for settings updates |
| 10 | pending_bytes | per-partition atomic | Backpressure threshold check (lock-free) |
| 11 | flush_queued | per-partition atomic | Dedup flush submissions to BG thread |
| 12 | blobs_written_since_seal_ | global atomic | Fast-path skip in SealAllPartitions |
| 13 | bg_has_error_ | global atomic | Lock-free BG error check on write path |
| 14 | cached_settings_ | global atomic ptr | RCU settings read path (lock-free) |

Plus associated state: retired_settings_ vector, ready_queue_ deque,
bg_threads_ vector, bg_stop_, bg_seal_in_progress_, bg_status_,
per-partition bg_queue, bg_in_flight.

Simplification Opportunities

1. Remove completed_files_mutex_ — dead code

What: completed_files_mutex_ is declared at line 410 of the header but
never locked anywhere in the .cc file. The completed_files vector on each
partition is already protected by partition->mutex.

Evidence: grep -r completed_files_mutex_ returns exactly one hit — the
declaration.

Impact: Eliminates 1 mutex. Zero risk — it's unused.

Effort: Trivial — delete the declaration.


2. Remove the RCU settings apparatus

What: The cached_settings_ atomic pointer, settings_write_mutex_,
and retired_settings_ vector implement an RCU (Read-Copy-Update) pattern
for per-CF blob direct write settings.

Why it's unnecessary: UpdateCachedSettings() is only called during
DBImpl::Open(), which is single-threaded. The comment at line 188 explicitly
states "Called only during DB open (single-threaded, no concurrent readers)."
Furthermore, the WARNING at lines 192-198 explicitly says dynamic option
changes via SetOptions() are NOT reflected — the DB must be restarted.

Since the map is built once during init and never modified, the full RCU
apparatus (atomic pointer, copy-on-write mutex, retired pointer list) is
unnecessary. A simple std::unique_ptr<const SettingsMap> set once during
open would suffice. The happens-before relationship from thread creation
guarantees visibility to all writer threads spawned after Open() returns.

Eliminates: 1 mutex (settings_write_mutex_), 1 atomic
(cached_settings_), retired_settings_ vector (memory leak of intermediate
maps from init loop).

Impact: Slightly simpler reader path (plain pointer dereference vs atomic
load), eliminates N-1 leaked SettingsMap allocations from init loop (where
N = number of column families).

Effort: Trivial — build complete map in Open(), store as unique_ptr.


3. Eliminate file_partition_mutex_ by encoding partition index in BlobIndex

What: The file_to_partition_ map + file_partition_mutex_ (RWMutex)
exists solely so GetPendingBlobValue() can route a (file_number, offset)
lookup to the correct partition's pending_index.

Current read path:

  1. Acquire file_partition_mutex_ read lock
  2. Look up file_number in file_to_partition_ hash map → partition_idx
  3. Release read lock
  4. Acquire partition->mutex
  5. Look up in pending_index

Proposed: Encode the partition_idx (a few bits — max partitions is small)
into the BlobIndex alongside file_number, offset, and size. The read
path becomes:

  1. Extract partition_idx from BlobIndex (no lock, no map)
  2. Acquire partition->mutex
  3. Look up in pending_index

Eliminates: 1 RWMutex (file_partition_mutex_), 1 hash map
(file_to_partition_), AddFilePartitionMapping()/
RemoveFilePartitionMapping() calls on every file open/close.

Trade-off: Requires a BlobIndex format change (adding a small field).
This is a new feature so there's no backwards compatibility concern for the
direct-write BlobIndex variant. The standard BlobIndex used by flush-based
blob separation is unchanged.

Alternative (no format change): Since partition count is small (typically
1-16), scan all partitions' pending_index under their respective mutexes.
For N ≤ 16, this linear scan is likely faster than the RWMutex + hash map
overhead. However, this acquires up to N mutexes per read, which is worse
under contention.

Effort: Moderate — BlobIndex encoding change + read path update.


4. Collapse 3 BG condition variables into 1

What: bg_cv_, bg_drain_cv_, and bg_seal_done_cv_ are three separate
condition variables on the same bg_mutex_. They exist to avoid spurious
wakeups:

  • bg_cv_: signals BG threads that new work is available
  • bg_drain_cv_: signals DrainBackgroundWork() that in-flight work finished
  • bg_seal_done_cv_: signals BG threads that SealAllPartitions finished

Why they can be merged: All three protect conditions on the same mutex.
Each waiter already re-checks its condition in a loop (standard CV pattern).
Spurious wakeups cause a re-check and re-wait — correct behavior, just
slightly less efficient.

The paths using bg_drain_cv_ and bg_seal_done_cv_ are cold:

  • DrainBackgroundWork runs once per memtable flush
  • bg_seal_done_cv_ wait happens only during the brief seal window

Occasional spurious wakeups on cold paths have negligible performance impact.

Proposed: Replace all three with a single bg_cv_. All Signal/Wait
calls use the same CV. Each waiter's loop condition ensures correctness.

Eliminates: 2 condition variables, reduces cognitive overhead of tracking
which CV to signal in each scenario.

Trade-off: More SignalAll() wakeups reaching waiters whose condition
hasn't changed. For the seal path (cold, once per flush), this is negligible.

Effort: Easy — mechanical replacement.


5. Replace custom BG thread pool with RocksDB's Env::Schedule

What: The BlobFilePartitionManager implements a custom thread pool:

  • bg_threads_ vector of std::thread
  • BackgroundIOLoop() (~120 lines) with work stealing from ready_queue_
  • bg_mutex_ + 3 CVs for coordination
  • Per-partition bg_queue, bg_in_flight, flush_queued
  • bg_stop_, bg_seal_in_progress_ flags
  • Shutdown logic in destructor

RocksDB already has Env::Schedule() with priority levels
(Priority::BOTTOM, Priority::LOW, etc.) and Env::WaitForJobs().

Proposed design:

  • Each flush/seal becomes a lambda scheduled via Env::Schedule(Priority::BOTTOM, ...)
  • Backpressure: writer submits flush and waits on pending_cv (unchanged)
  • Drain before seal: use an atomic in-flight counter + CV, or Env::WaitForJobs()
  • Seal coordination: pause scheduling (don't submit new work), drain, seal inline

Eliminates: bg_threads_, bg_mutex_, bg_cv_, bg_drain_cv_,
bg_seal_done_cv_, ready_queue_, bg_stop_, bg_seal_in_progress_,
bg_status_, bg_has_error_, per-partition bg_queue, bg_in_flight,
flush_queued, BackgroundIOLoop(), SubmitSeal(), SubmitFlush(),
DrainBackgroundWork() — approximately 200+ lines and 1 global mutex +
3 CVs + 2 atomics
.

Trade-off:

  • Less control over thread-to-partition affinity (but the current code doesn't
    do this anyway — any BG thread can process any partition).
  • Env::Schedule callbacks share the thread pool with compaction/flush
    background jobs. Using Priority::BOTTOM minimizes interference, but under
    heavy compaction load, blob flushes could be delayed.
  • Seal coordination becomes slightly different: instead of bg_seal_in_progress_
    flag, just stop submitting new work and wait for in-flight callbacks to
    complete via an atomic counter.

Effort: Moderate — requires reworking BG scheduling but eliminates
significant complexity.


Rejected Simplifications

Item 2: Remove RCU settings — KEEP AS-IS

Although UpdateCachedSettings() is currently only called during single-threaded
DB open, the RCU pattern is reasonable forward-looking design. If dynamic
SetOptions() support for blob direct write settings is ever added, the
infrastructure is already in place. Removing it now only to re-add it later
would be unnecessary churn. The overhead is minimal (one atomic load per
Put()) and the code is self-contained.

Item 3: Encode partition index in BlobIndex — DO NOT DO THIS

This would modify the BlobIndex serialization format, which is a persistent
on-disk format. Blob indices are stored in the WAL, memtable, and SST files,
and must remain readable across RocksDB versions. Changing the format for an
internal synchronization optimization creates format compatibility concerns.
Even though blob direct write is a new feature, the BlobIndex encoding is
shared with the existing flush-based blob separation path and should not be
forked. The file_partition_mutex_ is a read-mostly RWMutex on a small map —
the overhead is already lightweight and not worth a format change.

Summary

| # | Simplification | Eliminates | Performance impact | Effort | Decision |
|---|----------------|------------|--------------------|--------|----------|
| 1 | Remove completed_files_mutex_ | 1 dead mutex | None | Trivial | DO |
| 2 | Remove RCU settings | 1 mutex, 1 atomic | None | Trivial | KEEP — future-proofs dynamic SetOptions() |
| 3 | Encode partition in BlobIndex | 1 RWMutex, 1 map | Read-path improvement | Moderate | REJECT — on-disk format change not warranted |
| 4 | Collapse BG CVs to 1 | 2 condition variables | Negligible spurious wakeups | Easy | DO |
| 5 | Use Env::Schedule | ~200 lines, 1 mutex, 3 CVs, 2 atomics | Same or better | Moderate | DO |

Recommended priority:

  1. Item 1 is a free win — no risk, trivial effort (dead code removal).
  2. Item 4 is easy and reduces the mental model significantly.
  3. Item 5 is the highest-leverage structural change — it eliminates the most
    code and the most complex synchronization (the custom BG thread pool).

After accepted simplifications (items 1, 4, 5):

  • Per-partition: mutex + pending_cv (essential for deferred flush)
  • Global: Env::Schedule replaces custom thread pool (or 1 bg_mutex_ + 1 CV
    if keeping custom pool with collapsed CVs)
  • Retained as-is: file_partition_mutex_ (lightweight RWMutex), RCU settings
    cache (future-proofing), pending_bytes / blobs_written_since_seal_ atomics
  • Eliminated: 1 dead mutex, 2 condition variables, ~200 lines of custom thread
    pool code

This reduces the synchronization surface from 14 primitives to approximately
8-9, while preserving the deferred flush model, forward compatibility for
dynamic options, and the on-disk BlobIndex format.


6. Documentation: Synchronization Protocol Needs Comprehensive Docs

Independent of code simplifications, the existing synchronization protocol is
insufficiently documented. The concurrency design is correct but extremely
tricky to understand — the invariants, lock ordering, and data ownership
transfers are spread across multiple functions in two files with no centralized
explanation. A new contributor (or reviewer) must read ~500 lines of
interleaved code and mentally reconstruct the protocol.

What needs to be documented

A. Consolidated synchronization overview at the class level. The header
comment (lines 70-101) describes the feature at a high level but says nothing
about which locks protect which state, the lock ordering discipline ("never
nest partition->mutex and bg_mutex_"), or the overall coordination model
between writers, BG threads, and seal.

Suggested: add a block comment in the header above the private: section
listing:

  • Every lock and what it protects
  • The lock ordering rule (no nesting between partition->mutex and bg_mutex_)
  • The three operation flows (write, flush, seal) and which locks each acquires

B. pending_index lifecycle. As detailed in pending_index_analysis.md,
the most subtle aspect — pending_index entries intentionally outliving their
source deque during mutex-free BG I/O — has no consolidated documentation.
The four phases (create under mutex → survive deque move → valid during BG
I/O → remove after flush) should be described in a single block comment,
along with:

  • The C++ guarantee that std::deque move preserves element addresses
  • Why readers must copy data under the mutex (BG thread may free data after
    mutex release)
  • The specific race that Tier 4 of the read fallback covers

C. SealAllPartitions coordination protocol. The two-phase seal
(capture under per-partition mutex → I/O without mutex) with BG thread
pausing (bg_seal_in_progress_) is the most complex coordination sequence.
A step-by-step comment at the top of SealAllPartitions should explain:

  • Why bg_seal_in_progress_ is needed (prevent BG threads from racing with
    seal on the same BlobLogWriter)
  • Why DrainBackgroundWork ignores queued work when bg_seal_in_progress_
    is set (SealAllPartitions handles it inline; without this, deadlock)
  • How writers interact during seal (they can open new files and continue)

D. Backpressure protocol. The interaction between pending_bytes
(atomic), buffer_size_ / high_water_mark_ thresholds, SubmitFlush,
pending_cv (timed wait + re-submit loop), and BG thread flush → decrement
→ signal should be documented as a coherent protocol, not scattered across
WriteBlob and FlushRecordsToDisk.

E. Fix stale comments. Several comments describe abandoned designs:

  • Header lines 87-91: "zero-copy" / "AdoptBatchBuffer()" — the actual
    implementation pre-copies key and value into std::string. There is no
    AdoptBatchBuffer anywhere in the codebase.
  • Header line 230: "Use std::list" — the actual container is std::deque.
  • Header lines 391-396: "RCU-based settings cache" — correctly describes the
    mechanism, but doesn't mention that the writer path is only called during
    single-threaded DB open, making the RCU pattern unnecessary.

Why this matters

The deferred flush path is ~500 lines of concurrent code with multiple
interleaving operations. Without clear documentation:

  • Code reviewers must reconstruct the synchronization protocol from scratch,
    increasing review time and risk of missed bugs.
  • Future modifications (e.g., adding dynamic SetOptions() support) are
    likely to introduce races because the invariants are implicit.
  • The bg_seal_in_progress_ / DrainBackgroundWork interaction already
    required a deadlock fix (see comment at lines 579-588 of the .cc file),
    indicating the protocol is error-prone even for the original author.


[AI Bot]: Excellent analysis. Will implement the recommended simplifications:

  1. Remove completed_files_mutex_ — dead code, trivial
  2. Collapse 3 BG CVs into 1 — easy, reduces mental model
  3. Replace custom BG thread pool with Env::Schedule — highest leverage, eliminates ~200 lines

And the documentation improvements:

  • A. Consolidated synchronization overview at class level
  • B. pending_index lifecycle (already partially done in Batch D)
  • C. SealAllPartitions coordination protocol
  • D. Backpressure protocol documentation
  • E. Fix remaining stale comments

Agreed on keeping RCU settings (item 2) and rejecting BlobIndex format change (item 3).

Will be addressed in the next commit.

// changeable via SetOptions() (registered without kMutable flag).
// Requires DB reopen to change. The structural options (partitions,
// buffer_size, etc.) are in ImmutableCFOptions.
bool enable_blob_direct_write;

No, we should not put it here. We should put it in ImmutableCFOptions.


[AI Bot]: Will move enable_blob_direct_write from MutableCFOptions to ImmutableCFOptions. Will update all references accordingly. Will be addressed in the next commit.

Comment on lines +1242 to +1243
// Used when enable_blob_direct_write = true (write path) and also
// during flush when partitioned blob files are enabled.

This is confusing. Remove this.


[AI Bot]: Will remove the flush reference from the partition strategy option comment. Will be addressed in the next commit.

Comment on lines +29 to +30
// Used with the blob direct write feature (enable_blob_direct_write)
// and also applicable during flush when partitioned blob files are enabled.

This is confusing. Remove this.


[AI Bot]: Will remove the "also applicable during flush" part from the BlobFilePartitionStrategy class comment. Will be addressed in the next commit.

Comment on lines 1179 to +1182
// DURABILITY: When WriteOptions::sync is true, blob files are synced
// before WAL write. When sync is false, both blob and WAL data are
// buffered in OS cache.
// buffered in OS cache. The sync method (fsync vs fdatasync) is
// controlled by DBOptions::use_fsync, shared with the rest of the DB.

Looks like the batch API is handled properly, but not the Put API. Please fix this; see the analysis below.

Here's the complete sync-on-write flow. There are two code paths:

Batch path (WriteImpl, lines 610-617 in db_impl_write.cc):

After TransformBatch writes blob values to partition managers, if write_options.sync is true:

db_impl_write.cc
Lines 610-617
if (write_options.sync) {
  for (auto* mgr : used_managers) {
    blob_s = mgr->SyncAllOpenFiles(write_options);
    if (!blob_s.ok()) {
      return blob_s;
    }
  }
}
SyncAllOpenFiles (lines 1146-1166 in blob_file_partition_manager.cc) does two things:

  1. In deferred mode (buffer_size_ > 0): submits flushes for all partitions and drains all background work — ensuring buffered records are written to the OS page cache
  2. Then for every partition with an open writer, calls partition->writer->Sync(write_options), which does a real fsync/fdatasync

blob_file_partition_manager.cc
Lines 1146-1166
Status BlobFilePartitionManager::SyncAllOpenFiles(
    const WriteOptions& write_options) {
  if (buffer_size_ > 0) {
    for (auto& partition : partitions_) {
      SubmitFlush(partition.get());
    }
    DrainBackgroundWork();
    // ... check bg_status_ ...
  }
  for (auto& partition : partitions_) {
    MutexLock lock(&partition->mutex);
    if (partition->writer) {
      Status s = partition->writer->Sync(write_options);
      // ...
Fast path (DBImpl::Put, lines 34-77): Note that it does not sync the blob file itself. It calls mgr->WriteBlob() then passes the batch to WriteImpl. Since the batch has HAS_BLOB_INDEX and no HAS_PUT, the TransformBatch block in WriteImpl is skipped — and so is the SyncAllOpenFiles call. The blob data is only synced when the WAL is synced later in the standard write path (which calls fsync on the WAL, not the blob file).

This looks like a bug in the fast path: when sync=true, the blob file data written by WriteBlob() is not fsynced before the WAL write returns. The batch path handles this correctly, but the Put() fast path does not call SyncAllOpenFiles. The durability guarantee documented at lines 1179-1180 ("blob files are synced before WAL write") is not upheld for the single-key Put() fast path.


[AI Bot]: Critical — the Put fast path writes the blob but does not sync the blob file when write_options.sync=true, unlike the batch path which calls SyncAllOpenFiles. Will add sync call in the Put fast path after WriteBlob when sync is requested. Will be addressed in the next commit.

result.blob_direct_write_partitions = 64;
}
constexpr uint64_t kMaxBufferSize = 256ULL * 1024 * 1024; // 256MB
constexpr uint64_t kMaxBufferSize = 16ULL * 1024 * 1024; // 16MB

Let's change this to 64MB; 16MB might be too small for high-latency network storage.


[AI Bot]: Will change max buffer cap to 64MB. Will be addressed in the next commit.

Comment on lines +2746 to +2750
// Forward scan: parse blob records sequentially from after the
// header. Validate both header CRC and blob CRC for each record.
// Stop at the first incomplete or corrupt record — this handles
// truncated files from crashes as well as the footer (which is
// not a valid record and will fail header CRC validation).

There is existing code that does exactly this: BlobLogSequentialReader. It provides ReadHeader(), ReadRecord() (with configurable read levels including CRC validation via kReadHeaderKeyBlob), and ReadFooter() — the same sequential forward scan with CRC validation that the orphan recovery code reimplements manually.

Compare the two:

Existing BlobLogSequentialReader::ReadRecord (lines 64-113 of blob_log_sequential_reader.cc):

  • Reads BlobLogRecord::kHeaderSize bytes
  • Calls record->DecodeHeaderFrom() (validates header CRC)
  • At kReadHeaderKeyBlob level: reads key + value, calls record->CheckBlobCRC()
  • Tracks position via next_byte_
Orphan recovery inline code (lines 2755-2793 of db_impl_open.cc):

  • Reads BlobLogRecord::kHeaderSize bytes
  • Calls record.DecodeHeaderFrom() (validates header CRC)
  • Reads key + value, calls record.CheckBlobCRC()
  • Tracks position via scan_offset
They do the same thing. The orphan recovery code is essentially a hand-rolled version of BlobLogSequentialReader with kReadHeaderKeyBlob. The only behavioral difference is:

  • BlobLogSequentialReader returns Status::Corruption on short reads (treating them as errors)
  • The orphan recovery code does break on short reads (treating them as graceful end-of-valid-data)
That break-vs-error distinction matters for crash recovery (partial writes should be tolerated, not treated as corruption), but BlobLogSequentialReader could be reused with a wrapper that catches Corruption status from truncated reads. Or a new ReadLevel / method could be added to BlobLogSequentialReader that returns a "partial" status instead of corruption.

Interestingly, BlobLogSequentialReader appears to be nearly dead code — it's only used in blob_file_builder_test.cc for test verification. The TODO comment on line 31 of blob_log_sequential_reader.cc even says "// TODO: rate limit BlobLogSequentialReader reads (it appears unused?)". So the PR duplicated code from an existing utility that was already available but apparently overlooked.


Let's try to reuse the existing code.


[AI Bot]: Good catch — BlobLogSequentialReader already implements the exact same forward scan with CRC validation. Will refactor the orphan recovery code to use BlobLogSequentialReader instead of the hand-rolled scan. The only difference (break on truncation vs error) can be handled by catching the Corruption status from short reads and treating it as end-of-valid-data. This eliminates the code duplication and reuses well-tested existing infrastructure. Will be addressed in the next commit.


This logic just scans and checks whether the blob file was complete. If it was not, it just throws the file away; it does not salvage existing entries.

We need to rethink the recovery process. There are a few complexities here:

  1. The blob file is half written; we need to recover data from it.
  2. The WAL is replayed to recover SSTs.

There are 2 goals:

  1. Try to salvage as many records as possible.
  2. Recover blob files in the right way, so that their total bytes are tracked properly.

Instead of scanning through the blob files and trying to recover them in place (rewriting them and applying footers), maybe we should just replay the WAL, create new blob files, and throw the old ones away. As we replay the WAL, it performs blob lookups. If a blob value cannot be found, throw the record away from the SST. If the record is found, copy it into a new blob file and go through the normal blob rotation and SST flush recovery flow. Once recovery is done, flush and seal the last SST and blob files. Then delete the orphaned obsolete blob files.

Here is the detailed plan. Please execute it.


name: Blob Recovery Redesign
overview: Redesign blob direct write crash recovery to use WAL-replay-based blob resolution instead of the current orphan-file-scan-and-register approach, ensuring correct total_blob_bytes tracking and maximum record salvage.
todos:

  • id: create-resolver
    content: Create OrphanBlobFileResolver class (h/cc) with scan, open, resolve, stats methods
    status: pending
  • id: integrate-recover
    content: Create resolver before WAL replay in Recover(), destroy after, remove old orphan recovery code
    status: pending
  • id: modify-inserter
    content: Modify MemTableInserter::PutBlobIndexCF to resolve orphan BlobIndex entries during recovery
    status: pending
  • id: add-dbimpl-member
    content: Add orphan_blob_resolver_ member and accessor to DBImpl
    status: pending
  • id: update-build
    content: Update src.mk, CMakeLists.txt, run buckify
    status: pending
  • id: add-tests
    content: Add recovery test cases to db_blob_direct_write_test.cc
    status: pending
  • id: add-metrics
    content: Add recovery tickers and logging
    status: pending

WAL-Replay-Based Blob Recovery Redesign

Problem Statement

The current orphan blob recovery (db/db_impl/db_impl_open.cc lines 2650-2847) has fundamental issues:

  1. Registers half-written files as-is -- counts only CRC-validated records but doesn't salvage data from them properly. The total_blob_bytes is approximated from file size, not computed from actual record data.
  2. Dangling BlobIndex entries -- WAL replay inserts BlobIndex entries into memtables before orphan recovery runs. If a buffered blob record was lost (OS page cache not fsynced), the BlobIndex survives but points to missing data. These dangling pointers persist in SSTs permanently.
  3. No per-record salvage -- the current code operates at file granularity (register or skip), not record granularity. A file with 99 valid records and 1 truncated record gets registered, but a file with 0 valid records gets skipped entirely.

Proposed Solution

Replace the file-scan-and-register approach with a WAL-replay-based recovery that resolves each BlobIndex individually, converts recoverable entries back to raw values, and lets the existing flush infrastructure create new, properly-tracked blob files.

Key Insight

WriteLevel0TableForRecovery already passes &blob_file_additions to BuildTable (line 2109 of db/db_impl/db_impl_open.cc), so BlobFileBuilder is created during recovery flush when enable_blob_files=true. If we convert orphan BlobIndex entries to raw kTypeValue during WAL replay, the recovery flush will naturally re-extract large values into new blob files with correct total_blob_bytes tracking.

Recovery Timeline (Current vs. Proposed)

Current flow:

  1. WAL Replay -- BlobIndex entries inserted as-is into memtable
  2. Recovery Flush (WriteLevel0TableForRecovery) -- BlobIndex entries written to SSTs
  3. LogAndApplyForRecovery -- MANIFEST updated
  4. Orphan Blob Scan and Register -- blob files retroactively registered
  5. DeleteObsoleteFiles

Proposed flow:

  1. Identify Orphan Blob Files -- scan DB dir, build orphan set (lightweight, no record scanning)
  2. WAL Replay with BlobIndex Resolution -- for each orphan BlobIndex, read blob from orphan file, convert to raw kTypeValue; if blob missing/corrupt, discard the entry
  3. Recovery Flush -- BlobFileBuilder creates new, properly-tracked blob files from the raw values
  4. LogAndApplyForRecovery -- MANIFEST updated with new blob files
  5. DeleteObsoleteFiles -- old orphan blob files cleaned up (not in live set)

Detailed Design

Phase 1: Pre-WAL-Replay -- Identify Orphan Blob Files

Where: New code in DBImpl::Recover() in db/db_impl/db_impl_open.cc, after versions_->Recover() (line 536) but before RecoverLogFiles() (line 816).

What:

  • List all .blob files in the DB directory
  • For each, check if it's registered in any CF's VersionStorageInfo
  • Build a set: std::unordered_set<uint64_t> orphan_blob_file_numbers
  • For each orphan file, open an FSRandomAccessFile handle and store in a map
  • Read and validate the BlobLogHeader of each orphan file (verify it's a valid blob file)

This is lightweight -- no record scanning, just file listing and header validation. Files that fail header validation are excluded (cleaned up later by DeleteObsoleteFiles).

Phase 2: Modified WAL Replay -- BlobIndex Resolution

Where: Modify MemTableInserter::PutBlobIndexCF() in db/write_batch.cc (line 2902).

What: Add a recovery mode that intercepts kTypeBlobIndex entries:

PutBlobIndexCF(cf_id, key, blob_index_value):
  1. If not in recovery mode -> existing behavior (insert as kTypeBlobIndex)
  2. Decode BlobIndex -> get (file_number, offset, value_size, compression)
  3. Check if file_number is in orphan set:
     - If NO (registered file) -> insert as kTypeBlobIndex (unchanged)
     - If YES (orphan file) -> attempt resolution:
       a. Read blob record from orphan file at the specified offset
       b. Validate header CRC and blob CRC (reuse BlobLogRecord validation)
       c. Verify key matches
       d. Decompress if needed
       e. If successful -> insert as kTypeValue with the raw value
       f. If failed -> skip entry, log warning, increment counter

Threading the resolver to MemTableInserter:

The call chain is:

  • InsertLogRecordToMemtable (db/db_impl/db_impl_open.cc:1573)
  • -> WriteBatchInternal::InsertInto (db/write_batch.cc:3271)
  • -> MemTableInserter constructor (line 3278)
  • -> batch->Iterate(&inserter) (line 3283)
  • -> PutBlobIndexCF (line 2902)

InsertInto already takes a DB* db parameter (line 3275). During recovery, db is the DBImpl*. The resolver is stored on DBImpl and accessed via DBImpl::GetOrphanBlobResolver(). The MemTableInserter already has a db_ member.

Phase 3: OrphanBlobFileResolver Implementation

New class in db/blob/orphan_blob_file_resolver.h and db/blob/orphan_blob_file_resolver.cc:

class OrphanBlobFileResolver {
 public:
  // Scan DB directory, identify orphan blob files, open file handles.
  static Status Create(FileSystem* fs, const std::string& dbname,
                       VersionSet* versions,
                       std::unique_ptr<OrphanBlobFileResolver>* resolver);

  // Check if file_number is an orphan.
  bool IsOrphan(uint64_t file_number) const;

  // Try to read blob value from an orphan file.
  // Returns OK + raw value on success.
  // Returns NotFound/Corruption on failure (record missing/corrupt).
  Status TryResolveBlob(uint64_t file_number, uint64_t offset,
                        uint64_t value_size, CompressionType compression,
                        const Slice& user_key, std::string* value);

  // Stats: how many entries were resolved vs. discarded.
  uint64_t resolved_count() const;
  uint64_t discarded_count() const;
};

Reading blob records uses the same approach as BlobFileReader::GetBlob but operating directly on FSRandomAccessFile:

  1. Compute record offset: record_offset = offset - key_size - BlobLogRecord::kHeaderSize
  2. Read BlobLogRecord::kHeaderSize bytes, call DecodeHeaderFrom (validates header CRC)
  3. Read key + value body, call CheckBlobCRC (validates data CRC)
  4. Verify key matches user_key
  5. Decompress if compression != kNoCompression

This reuses the existing BlobLogRecord validation code from db/blob/blob_log_format.h rather than duplicating it.

Phase 4: Recovery Flush Creates New Blob Files

No code changes needed. WriteLevel0TableForRecovery already:

  • Passes &blob_file_additions to BuildTable (line 2109)
  • BuildTable creates BlobFileBuilder when enable_blob_files=true
  • BlobFileBuilder extracts values >= min_blob_size into new blob files with correct total_blob_bytes (computed as kHeaderSize + key.size() + blob.size() per record)
  • After flush, blob file additions are added to the version edit (lines 2187-2189)
  • Stats are tracked (lines 2213-2218)

The resolved raw values we inserted as kTypeValue during WAL replay are re-extracted by BlobFileBuilder into new, properly-sealed blob files.

Phase 5: Cleanup

No code changes needed. After recovery:

  • Old orphan blob files are NOT in the live set (not registered in any version)
  • DeleteObsoleteFiles (line 2888) deletes them
  • The OrphanBlobFileResolver is destroyed, closing the file handles

Files to Modify

  • db/blob/orphan_blob_file_resolver.h -- New file. Class declaration
  • db/blob/orphan_blob_file_resolver.cc -- New file. Implementation: scan, open, resolve, stats
  • db/db_impl/db_impl.h -- Add std::unique_ptr<OrphanBlobFileResolver> orphan_blob_resolver_ member and accessor
  • db/db_impl/db_impl_open.cc -- (1) Create OrphanBlobFileResolver before WAL replay in Recover(). (2) Destroy it after WAL replay. (3) Remove current orphan blob recovery code (lines 2650-2847).
  • db/write_batch.cc -- Modify MemTableInserter::PutBlobIndexCF to check db_->GetOrphanBlobResolver() during recovery. If orphan BlobIndex, call TryResolveBlob, insert as kTypeValue on success, skip on failure.
  • src.mk -- Add orphan_blob_file_resolver.cc
  • CMakeLists.txt -- Add orphan_blob_file_resolver.cc
  • After updating src.mk, run python3 buckifier/buckify_rocksdb.py to regenerate BUCK

Edge Cases

  • BlobIndex pointing to a registered file -- passes through unchanged as kTypeBlobIndex. Only orphan file numbers are intercepted.
  • Overwritten keys -- if key K has two versions in WAL (both BlobIndex), both are inserted into the memtable. The newer version shadows the older one naturally. During flush, CompactionIterator handles deduplication.
  • Delete after BlobIndex -- the Delete is inserted into memtable and shadows the recovered entry. Correct behavior.
  • Merge on BlobIndex -- the Merge operand is inserted into the memtable. During flush, if the base was a recovered raw value (kTypeValue), the merge works normally.
  • Large number of orphan blobs -- resolved values live in memtable as raw values. MaybeWriteLevel0TableForRecovery flushes when write_buffer_size is exceeded, bounding memory.
  • DB opened with enable_blob_files=false after previous direct-write session -- BlobFileBuilder won't be created during recovery flush, so resolved values stay inline in SSTs. This is correct: the user disabled blob files, so values are stored inline.
  • Orphan file with 0 valid records -- IsOrphan returns true, but TryResolveBlob fails for every offset. All entries pointing to it are discarded. The file is deleted by DeleteObsoleteFiles.
  • BlobIndex with compression -- TryResolveBlob uses the compression type from the BlobIndex (encoded at write time). The decompression will succeed if the data is valid.

Testing

Add new test cases in db/blob/db_blob_direct_write_test.cc:

  • RecoveryRewritesOrphanBlobs -- Write with direct write, crash without flush, reopen, verify data readable AND verify old orphan blob files are gone and new blob files exist with correct metadata.
  • RecoveryDiscardsMissingBlobs -- Write BlobIndex entries in WAL pointing to non-existent blob data, reopen, verify those keys return NotFound (discarded during recovery).
  • RecoveryPartialFile -- Write multiple blobs, truncate the blob file mid-record, reopen, verify records before truncation are recovered, records after are discarded.
  • RecoveryMixedRegisteredAndOrphan -- Flush some data (creating registered blob files), write more without flush (creating orphan files), reopen, verify both sets of data are readable.
  • RecoveryWithBlobFilesDisabled -- Write with direct write, reopen with enable_blob_files=false, verify recovered values are stored inline in SSTs.
  • RecoveryTotalBlobBytesCorrect -- After recovery, verify GetBlobFileMetaData for new blob files has correct total_blob_bytes matching the actual record data.

Logging and Metrics

  • Log at INFO level: "Recovered N blob records from M orphan files, discarded K entries with missing/corrupt blob data"
  • Add new tickers: BLOB_DB_ORPHAN_RECOVERY_RECORDS_RESOLVED and BLOB_DB_ORPHAN_RECOVERY_RECORDS_DISCARDED
  • Log the first 10 discarded entries at WARN level with the key (truncated), file number, and error status. After 10, suppress per-entry logging to avoid log spam during recovery of heavily corrupted databases.


[AI Bot]: Understood — this is a fundamental redesign of the recovery approach. Will implement the WAL-replay-based blob recovery as specified:

  1. Create OrphanBlobFileResolver class (scan, open, resolve, stats)
  2. Integrate before WAL replay in Recover(), destroy after
  3. Modify MemTableInserter::PutBlobIndexCF to resolve orphan BlobIndex entries
  4. Remove current orphan blob recovery code (lines 2650-2847)
  5. Add comprehensive recovery tests
  6. Add metrics and logging

Key insight: resolved raw values go into memtable as kTypeValue, and the existing BlobFileBuilder in recovery flush creates new properly-tracked blob files. Old orphan files are cleaned up by DeleteObsoleteFiles.

Will be addressed in the next commit.

BlobFilePartitionManager* blob_partition_manager() const {
  return blob_partition_manager_.get();
}
bool has_blob_direct_write() const { return has_blob_direct_write_; }

Do we need this? What if the DB is opened with all CFs disabling direct write, but then a new column family is added that has direct write enabled?


[AI Bot]: Good question. has_blob_direct_write_ is currently a fast-path check in WriteImpl to skip the transformer loop when no CF has direct write enabled. If a CF is added dynamically with direct write enabled after Open, has_blob_direct_write_ would be stale (false) and the transformer would be skipped incorrectly.

Two options:

  1. Remove has_blob_direct_write_ and check per-CF in the write path (slightly more overhead but correct)
  2. Update has_blob_direct_write_ when CFs are added/dropped

Actually, the cleanest fix is to just check if any CF passed to WriteImpl has a blob_partition_manager set, which is already handled by the per-CF lookup in WriteImpl. The has_blob_direct_write_ field can be removed entirely. Will be addressed in the next commit.

// Helper to resolve a write-path blob index found in mem or imm.
// Decodes BlobIndex from the value, resets it, and resolves via the
// 4-tier fallback. Returns true if blob resolution was attempted.
auto maybe_resolve_blob = [&]() -> bool {

This is a bit fragile. Let's convert this into a member function.


[AI Bot]: Will convert the maybe_resolve_blob lambda into a proper member function of DBImpl. Will be addressed in the next commit.

Comment on lines +119 to +124
Status s = WideColumnSerialization::Deserialize(entity_input, columns);
if (!s.ok()) {
  return s;
}
return WriteBatchInternal::PutEntity(output_batch_, column_family_id, key,
                                     columns);

Could we add a pass-through path to avoid the Deserialize and re-serialize round trip?


[AI Bot]: Will add a raw pass-through path: if the entity does not need transformation (no column values exceed min_blob_size — which is always true since we do not yet separate individual wide column values), directly copy the raw entity bytes to the output batch without deserializing. Since PutEntityCF entities always pass through unchanged, this avoids the round-trip entirely. Will be addressed in the next commit.

 inline bool IsValidBlobOffset(uint64_t value_offset, uint64_t key_size,
-                              uint64_t value_size, uint64_t file_size) {
+                              uint64_t value_size, uint64_t file_size,
+                              bool has_footer = true) {

Remove the default argument, just pass it explicitly on the call site.

[AI Bot]: Will remove the default argument for has_footer and update all call sites to pass it explicitly. Will be addressed in the next commit.
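With the default removed, the signature and call sites might look like this sketch. The bounds-check logic and `kBlobFooterSize` constant are illustrative assumptions, not the PR's actual implementation:

```cpp
#include <cstdint>

constexpr uint64_t kBlobFooterSize = 32;  // assumed footer size for the sketch

// has_footer is now a required parameter; every caller states its intent.
inline bool IsValidBlobOffset(uint64_t value_offset, uint64_t key_size,
                              uint64_t value_size, uint64_t file_size,
                              bool has_footer) {
  const uint64_t footer = has_footer ? kBlobFooterSize : 0;
  if (value_offset < key_size) {
    return false;  // the value is laid out after its key
  }
  return value_offset + value_size + footer <= file_size;
}
```

Requiring the argument at every call site avoids silently applying footer accounting to new callers that deal with unsealed (footerless) partition files.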

@xingbowang force-pushed the 2026_03_04_blob_memtable_partition branch from e434b14 to 85f3118 on March 28, 2026 at 22:00
Write blob values directly to blob files during DB::Put/Write,
storing only a small BlobIndex in WAL and memtable. This eliminates
double-write amplification for large values.

Core components:
- BlobFilePartitionManager: concurrent blob file partitions with
  background flush/seal via Env::Schedule for deferred I/O
- BlobWriteBatchTransformer: extracts blob values from WriteBatch,
  writes them to blob files, replaces with BlobIndex references
- OrphanBlobFileResolver: crash recovery of orphaned blob files
  during WAL replay at DB::Open
- DB::Put fast path for single-key writes (avoids WriteBatch
  serialization of the full value)
- 4-tier read fallback for read-after-write visibility:
  Version::GetBlob -> pending_records -> BlobFileCache -> pending retry
- Secondary DB read support

New options (AdvancedColumnFamilyOptions):
- enable_blob_direct_write: enable the feature
- blob_direct_write_partitions: number of concurrent partitions
- blob_direct_write_buffer_size: per-partition write buffer size
- blob_direct_write_flush_interval_ms: periodic background flush interval
- blob_direct_write_partition_strategy: custom partition assignment
  (O_DIRECT controlled by existing use_direct_io_for_flush_and_compaction)

Test Plan: make check, stress test
@xingbowang force-pushed the 2026_03_04_blob_memtable_partition branch from 85f3118 to 70cb4b6 on March 29, 2026 at 15:30
- Fix ROCKS_LOG_DEBUG indentation in db_impl_compaction_flush.cc and db_impl_write.cc
- Fix unused fn_copy variable in blob_file_partition_manager.cc (clang -Werror,-Wunused-variable)
- Add missing license header to tools/db_crashtest_test.py