diff --git a/Cargo.toml b/Cargo.toml index c496617c..0a08d011 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ required-features = ["failpoints"] [dependencies] byteorder = "1.2" +bytes = "1.0" crc32fast = "1.2" crossbeam = "0.8" fail = "0.5" diff --git a/src/cache.rs b/src/cache.rs new file mode 100644 index 00000000..5f623c3d --- /dev/null +++ b/src/cache.rs @@ -0,0 +1,321 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. + +use std::{collections::HashMap, mem, ptr::NonNull}; + +use bytes::Bytes; + +use crate::pipe_log::FileBlockHandle; + +struct CacheEntry { + key: FileBlockHandle, + data: Bytes, +} + +struct Node { + prev: NonNull, + next: NonNull, + entry: CacheEntry, + #[cfg(test)] + _leak_check: self::tests::LeakCheck, +} + +pub struct LruCache { + cache: HashMap>, + + cap: usize, + free: usize, + head: *mut Node, + tail: *mut Node, + + #[cfg(test)] + leak_check: self::tests::LeakCheck, +} + +#[inline] +fn need_size(payload: &[u8]) -> usize { + mem::size_of::() + payload.len() +} + +#[inline] +unsafe fn promote(mut node: NonNull, head: &mut *mut Node, tail: &mut *mut Node) { + if node.as_ptr() == *tail { + return; + } + let mut prev = unsafe { node.as_ref().prev }; + let mut next = unsafe { node.as_ref().next }; + if node.as_ptr() == *head { + *head = next.as_ptr(); + next.as_mut().prev = NonNull::dangling(); + } else { + prev.as_mut().next = next; + next.as_mut().prev = prev; + } + (**tail).next = node; + node.as_mut().prev = NonNull::new_unchecked(*tail); + node.as_mut().next = NonNull::dangling(); + *tail = node.as_ptr(); +} + +impl LruCache { + #[inline] + pub fn with_capacity(cap: usize) -> Self { + Self { + cache: HashMap::default(), + cap, + free: cap, + head: std::ptr::null_mut(), + tail: std::ptr::null_mut(), + #[cfg(test)] + leak_check: self::tests::LeakCheck::default(), + } + } + + #[inline] + pub fn insert(&mut self, key: FileBlockHandle, data: Bytes) -> Option { + match self.cache.get_mut(&key) { + None => (), + 
Some(node) => { + unsafe { + // Technically they should be exact the same. Using the new version + // to avoid potential bugs. + assert_eq!(data.len(), node.as_ref().entry.data.len()); + node.as_mut().entry.data = data.clone(); + promote(*node, &mut self.head, &mut self.tail); + return Some(data); + } + } + } + let need_size = need_size(&data); + if need_size > self.cap { + return Some(data); + } + while self.free < need_size && self.remove_head() {} + + let node = Box::new(Node { + prev: NonNull::dangling(), + next: NonNull::dangling(), + entry: CacheEntry { key, data }, + #[cfg(test)] + _leak_check: self.leak_check.clone(), + }); + + let node = Box::into_raw(node); + + if self.head.is_null() { + self.head = node; + self.tail = node; + } else { + unsafe { + (*self.tail).next = NonNull::new_unchecked(node); + (*node).prev = NonNull::new_unchecked(self.tail); + self.tail = node; + } + } + self.free -= need_size; + self.cache + .insert(key, unsafe { NonNull::new_unchecked(node) }); + None + } + + #[inline] + pub fn get(&mut self, key: &FileBlockHandle) -> Option { + let node = self.cache.get(key)?; + unsafe { + promote(*node, &mut self.head, &mut self.tail); + Some(node.as_ref().entry.data.clone()) + } + } + + #[inline] + fn remove_head(&mut self) -> bool { + if self.head.is_null() { + return false; + } + let mut head = unsafe { Box::from_raw(self.head) }; + if self.head != self.tail { + self.head = head.next.as_ptr(); + head.prev = NonNull::dangling(); + } else { + self.head = std::ptr::null_mut(); + self.tail = std::ptr::null_mut(); + } + self.free += need_size(&head.entry.data); + self.cache.remove(&head.entry.key); + true + } + + #[inline] + pub fn resize(&mut self, new_cap: usize) { + while (self.cap - self.free) > new_cap { + self.remove_head(); + } + self.free = new_cap - (self.cap - self.free); + self.cap = new_cap; + } + + #[inline] + pub fn clear(&mut self) { + for (_, node) in self.cache.drain() { + unsafe { + drop(Box::from_raw(node.as_ptr())); + } + } + 
self.head = std::ptr::null_mut(); + self.tail = std::ptr::null_mut(); + self.free = self.cap; + } +} + +impl Drop for LruCache { + fn drop(&mut self) { + self.clear(); + } +} + +unsafe impl Sync for LruCache {} +unsafe impl Send for LruCache {} + +#[cfg(test)] +mod tests { + use std::sync::{ + atomic::{AtomicIsize, Ordering}, + Arc, + }; + + use crate::internals::LogQueue; + + use super::*; + + pub struct LeakCheck { + cnt: Arc, + } + + impl LeakCheck { + pub fn clone(&self) -> Self { + self.cnt.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + Self { + cnt: self.cnt.clone(), + } + } + } + + impl Default for LeakCheck { + fn default() -> Self { + Self { + cnt: Arc::new(AtomicIsize::new(1)), + } + } + } + + impl Drop for LeakCheck { + fn drop(&mut self) { + self.cnt.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + } + } + + #[test] + fn test_basic_lru() { + let mut cache = LruCache::with_capacity(1024); + let lc = cache.leak_check.clone(); + let mut key = FileBlockHandle::dummy(LogQueue::Append); + for offset in 0..100 { + key.offset = offset; + cache.insert(key, vec![offset as u8; 10].into()); + } + for offset in 0..100 { + key.offset = offset; + cache.insert(key, vec![offset as u8; 10].into()); + } + let entry_len = need_size(&[0; 10]); + let entries_fit = (1024 / entry_len) as u64; + assert_eq!(cache.cap, 1024); + assert_eq!(cache.free, 1024 % entry_len); + for offset in 0..(100 - entries_fit) { + key.offset = offset; + assert_eq!(cache.get(&key).as_deref(), None, "{offset}"); + } + for offset in (100 - entries_fit)..100 { + key.offset = offset; + assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]), + "{offset}" + ); + } + + let offset = 100 - entries_fit; + key.offset = offset; + // Get will promote the entry and it will be the last to be removed. 
+ assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]), + "{offset}" + ); + for i in 1..entries_fit { + key.offset = 200 + i; + cache.insert(key, vec![key.offset as u8; 10].into()); + key.offset = offset + i; + assert_eq!(cache.get(&key).as_deref(), None, "{i}"); + } + key.offset = offset; + assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]), + "{offset}" + ); + + cache.resize(2048); + assert_eq!(cache.cap, 2048); + assert_eq!(cache.free, 2048 - (entries_fit as usize * entry_len)); + key.offset = 201; + assert_eq!( + cache.get(&key).as_deref(), + Some(&[key.offset as u8; 10] as &[u8]) + ); + key.offset = offset; + assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]) + ); + + cache.resize(entry_len); + assert_eq!(cache.cap, entry_len); + assert_eq!(cache.free, 0); + key.offset = 201; + assert_eq!(cache.get(&key).as_deref(), None); + key.offset = offset; + assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]) + ); + + cache.resize(entry_len - 1); + assert_eq!(cache.cap, entry_len - 1); + assert_eq!(cache.free, entry_len - 1); + key.offset = offset; + assert_eq!(cache.get(&key).as_deref(), None); + + cache.resize(1024); + cache.clear(); + for offset in 0..100 { + key.offset = offset; + cache.insert(key, vec![offset as u8; 10].into()); + } + for offset in 0..(100 - entries_fit) { + key.offset = offset; + assert_eq!(cache.get(&key).as_deref(), None, "{offset}"); + } + for offset in (100 - entries_fit)..100 { + key.offset = offset; + assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]), + "{offset}" + ); + } + + drop(cache); + // If there is leak or double free, the count will unlikely to be 1. 
+ assert_eq!(lc.cnt.load(Ordering::Relaxed), 1); + } +} diff --git a/src/config.rs b/src/config.rs index 184599d3..00d2d08b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -115,6 +115,9 @@ pub struct Config { /// /// Default: None pub prefill_limit: Option, + + /// Initial cache capacity for entries. + pub cache_capacity: ReadableSize, } impl Default for Config { @@ -136,6 +139,7 @@ impl Default for Config { memory_limit: None, enable_log_recycle: false, prefill_for_recycle: false, + cache_capacity: ReadableSize::mb(256), prefill_limit: None, }; // Test-specific configurations. diff --git a/src/engine.rs b/src/engine.rs index a9d368df..679c026e 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,15 +1,17 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::cell::{Cell, RefCell}; use std::marker::PhantomData; use std::path::Path; -use std::sync::{mpsc, Arc, Mutex}; +use std::sync::{mpsc, Arc}; use std::thread::{Builder as ThreadBuilder, JoinHandle}; use std::time::{Duration, Instant}; +use bytes::Bytes; use log::{error, info}; +use parking_lot::Mutex; use protobuf::{parse_from_bytes, Message}; +use crate::cache::LruCache; use crate::config::{Config, RecoveryMode}; use crate::consistency::ConsistencyChecker; use crate::env::{DefaultFileSystem, FileSystem}; @@ -19,7 +21,7 @@ use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilde use crate::log_batch::{Command, LogBatch, MessageExt}; use crate::memtable::{EntryIndex, MemTableRecoverContextFactory, MemTables}; use crate::metrics::*; -use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog}; +use crate::pipe_log::{FileBlockHandle, LogQueue, PipeLog}; use crate::purge::{PurgeHook, PurgeManager}; use crate::write_barrier::{WriteBarrier, Writer}; use crate::{perf_context, Error, GlobalStats, Result}; @@ -97,6 +99,7 @@ where cfg.clone(), memtables.clone(), pipe_log.clone(), + Mutex::new(LruCache::with_capacity(cfg.cache_capacity.0 as usize)), 
stats.clone(), listeners.clone(), ); @@ -306,6 +309,7 @@ where ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(1.0); return Ok(Some(read_entry_from_file::( self.pipe_log.as_ref(), + &self.purge_manager.cache, &idx, )?)); } @@ -335,7 +339,11 @@ where .read() .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; for i in ents_idx.iter() { - vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); + vec.push(read_entry_from_file::( + self.pipe_log.as_ref(), + &self.purge_manager.cache, + i, + )?); } ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); return Ok(ents_idx.len()); @@ -414,7 +422,7 @@ where P: PipeLog, { fn drop(&mut self) { - self.tx.lock().unwrap().send(()).unwrap(); + self.tx.lock().send(()).unwrap(); if let Some(t) = self.metrics_flusher.take() { t.join().unwrap(); } @@ -546,79 +554,58 @@ where LogItemReader::new_file_reader(file_system, path) } } -} - -struct BlockCache { - key: Cell, - block: RefCell>, -} - -impl BlockCache { - fn new() -> Self { - BlockCache { - key: Cell::new(FileBlockHandle { - id: FileId::new(LogQueue::Append, 0), - offset: 0, - len: 0, - }), - block: RefCell::new(Vec::new()), - } - } - fn insert(&self, key: FileBlockHandle, block: Vec) { - self.key.set(key); - self.block.replace(block); + pub fn resize_internal_cache(&self, new_capacity: usize) { + self.purge_manager.cache.lock().resize(new_capacity); } } -thread_local! { - static BLOCK_CACHE: BlockCache = BlockCache::new(); +#[inline] +fn load_cache
<P>
(pipe_log: &P, cache: &Mutex, idx: &EntryIndex) -> Result +where + P: PipeLog, +{ + let mut guard = cache.lock(); + if let Some(v) = guard.get(&idx.entries.unwrap()) { + return Ok(v); + } + drop(guard); + let v = LogBatch::decode_entries_block( + &pipe_log.read_bytes(idx.entries.unwrap())?, + idx.entries.unwrap(), + idx.compression_type, + )?; + cache.lock().insert(idx.entries.unwrap(), v.clone()); + Ok(v) } -pub(crate) fn read_entry_from_file(pipe_log: &P, idx: &EntryIndex) -> Result +pub(crate) fn read_entry_from_file( + pipe_log: &P, + cache: &Mutex, + idx: &EntryIndex, +) -> Result where M: MessageExt, P: PipeLog, { - BLOCK_CACHE.with(|cache| { - if cache.key.get() != idx.entries.unwrap() { - cache.insert( - idx.entries.unwrap(), - LogBatch::decode_entries_block( - &pipe_log.read_bytes(idx.entries.unwrap())?, - idx.entries.unwrap(), - idx.compression_type, - )?, - ); - } - let e = parse_from_bytes( - &cache.block.borrow() - [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], - )?; - assert_eq!(M::index(&e), idx.index); - Ok(e) - }) + let cache = load_cache(pipe_log, cache, idx)?; + let e = parse_from_bytes( + &cache[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], + )?; + assert_eq!(M::index(&e), idx.index); + Ok(e) } -pub(crate) fn read_entry_bytes_from_file
<P>
(pipe_log: &P, idx: &EntryIndex) -> Result> +pub(crate) fn read_entry_bytes_from_file
<P>
( + pipe_log: &P, + cache: &Mutex, + idx: &EntryIndex, +) -> Result where P: PipeLog, { - BLOCK_CACHE.with(|cache| { - if cache.key.get() != idx.entries.unwrap() { - cache.insert( - idx.entries.unwrap(), - LogBatch::decode_entries_block( - &pipe_log.read_bytes(idx.entries.unwrap())?, - idx.entries.unwrap(), - idx.compression_type, - )?, - ); - } - Ok(cache.block.borrow() - [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize] - .to_owned()) - }) + let cache = load_cache(pipe_log, cache, idx)?; + Ok(cache.slice(idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize)) } #[cfg(test)] @@ -626,6 +613,7 @@ pub(crate) mod tests { use super::*; use crate::env::{ObfuscatedFileSystem, Permission}; use crate::file_pipe_log::{parse_recycled_file_name, FileNameExt}; + use crate::internals::FileId; use crate::log_batch::AtomicGroupBuilder; use crate::pipe_log::Version; use crate::test_util::{generate_entries, PanicGuard}; @@ -2003,16 +1991,16 @@ pub(crate) mod tests { match (parse_append, parse_recycled) { (Some(id), None) if id.queue == LogQueue::Append => { if delete { - self.append_metadata.lock().unwrap().remove(&id.seq) + self.append_metadata.lock().remove(&id.seq) } else { - self.append_metadata.lock().unwrap().insert(id.seq) + self.append_metadata.lock().insert(id.seq) } } (None, Some(seq)) => { if delete { - self.recycled_metadata.lock().unwrap().remove(&seq) + self.recycled_metadata.lock().remove(&seq) } else { - self.recycled_metadata.lock().unwrap().insert(seq) + self.recycled_metadata.lock().insert(seq) } } _ => false, @@ -2072,9 +2060,9 @@ pub(crate) mod tests { let parse_recycled = parse_recycled_file_name(path); match (parse_append, parse_recycled) { (Some(id), None) if id.queue == LogQueue::Append => { - self.append_metadata.lock().unwrap().contains(&id.seq) + self.append_metadata.lock().contains(&id.seq) } - (None, Some(seq)) => self.recycled_metadata.lock().unwrap().contains(&seq), + (None, Some(seq)) => 
self.recycled_metadata.lock().contains(&seq), _ => false, } } @@ -2118,29 +2106,20 @@ pub(crate) mod tests { assert_eq!(engine.file_count(None), fs.inner.file_count()); let start = engine.file_span(LogQueue::Append).0; // metadata have been deleted. - assert_eq!( - fs.append_metadata.lock().unwrap().iter().next().unwrap(), - &start - ); + assert_eq!(fs.append_metadata.lock().iter().next().unwrap(), &start); let engine = engine.reopen(); assert_eq!(engine.file_count(None), fs.inner.file_count()); let (start, _) = engine.file_span(LogQueue::Append); - assert_eq!( - fs.append_metadata.lock().unwrap().iter().next().unwrap(), - &start - ); + assert_eq!(fs.append_metadata.lock().iter().next().unwrap(), &start); // Simulate recycled metadata. for i in start / 2..start { - fs.append_metadata.lock().unwrap().insert(i); + fs.append_metadata.lock().insert(i); } let engine = engine.reopen(); let (start, _) = engine.file_span(LogQueue::Append); - assert_eq!( - fs.append_metadata.lock().unwrap().iter().next().unwrap(), - &start - ); + assert_eq!(fs.append_metadata.lock().iter().next().unwrap(), &start); } #[test] @@ -2161,7 +2140,7 @@ pub(crate) mod tests { }; let fs = Arc::new(DeleteMonitoredFileSystem::new()); let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap(); - let recycled_start = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap(); + let recycled_start = *fs.recycled_metadata.lock().iter().next().unwrap(); for rid in 1..=10 { engine.append(rid, 1, 11, Some(&entry_data)); } @@ -2178,18 +2157,18 @@ pub(crate) mod tests { assert_eq!(engine.file_count(Some(LogQueue::Append)), 1); // Recycled files have been reused. 
assert_eq!( - fs.append_metadata.lock().unwrap().iter().next().unwrap(), + fs.append_metadata.lock().iter().next().unwrap(), &(start + 20) ); - let recycled_start_1 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap(); + let recycled_start_1 = *fs.recycled_metadata.lock().iter().next().unwrap(); assert!(recycled_start < recycled_start_1); // Reuse these files. for rid in 1..=5 { engine.append(rid, 1, 11, Some(&entry_data)); } - let start_1 = *fs.append_metadata.lock().unwrap().iter().next().unwrap(); + let start_1 = *fs.append_metadata.lock().iter().next().unwrap(); assert!(start <= start_1); - let recycled_start_2 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap(); + let recycled_start_2 = *fs.recycled_metadata.lock().iter().next().unwrap(); assert!(recycled_start_1 < recycled_start_2); // Reopen the engine and validate the recycled files are reserved @@ -2197,9 +2176,9 @@ pub(crate) mod tests { let engine = engine.reopen(); assert_eq!(file_count, fs.inner.file_count()); assert!(file_count > engine.file_count(None)); - let start_2 = *fs.append_metadata.lock().unwrap().iter().next().unwrap(); + let start_2 = *fs.append_metadata.lock().iter().next().unwrap(); assert_eq!(start_1, start_2); - let recycled_start_3 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap(); + let recycled_start_3 = *fs.recycled_metadata.lock().iter().next().unwrap(); assert_eq!(recycled_start_2, recycled_start_3); } diff --git a/src/filter.rs b/src/filter.rs index 83d32519..d216034c 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -3,6 +3,7 @@ use std::path::Path; use std::sync::Arc; +use bytes::Bytes; use hashbrown::HashMap; use rhai::{Engine, Scope, AST}; use scopeguard::{guard, ScopeGuard}; @@ -291,11 +292,11 @@ impl RhaiFilterMachine { ei.entries.unwrap(), ei.compression_type, )?; - entries.push( - block[ei.entry_offset as usize - ..(ei.entry_offset + ei.entry_len) as usize] - .to_owned(), - ); + // Not using slice to avoid consuming too many memory. 
+ entries.push(Bytes::copy_from_slice( + &block[ei.entry_offset as usize + ..(ei.entry_offset + ei.entry_len) as usize], + )); } log_batch.add_raw_entries(item.raft_group_id, eis, entries)?; } diff --git a/src/lib.rs b/src/lib.rs index 9a786238..49a35f74 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,7 @@ macro_rules! box_err { }); } +mod cache; mod codec; mod config; mod consistency; diff --git a/src/log_batch.rs b/src/log_batch.rs index f8f6cf77..87c8886c 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use std::{mem, u64}; use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; +use bytes::Bytes; use log::error; use num_derive::FromPrimitive; use num_traits::FromPrimitive; @@ -685,7 +686,7 @@ impl LogBatch { &mut self, region_id: u64, mut entry_indexes: Vec, - entries: Vec>, + entries: Vec, ) -> Result<()> { debug_assert!(entry_indexes.len() == entries.len()); debug_assert!(self.buf_state == BufState::Open); @@ -925,19 +926,21 @@ impl LogBatch { buf: &[u8], handle: FileBlockHandle, compression: CompressionType, - ) -> Result> { + ) -> Result { if handle.len > 0 { let _ = verify_checksum_with_signature(&buf[0..handle.len], None)?; match compression { - CompressionType::None => Ok(buf[..handle.len - LOG_BATCH_CHECKSUM_LEN].to_owned()), + CompressionType::None => Ok(Bytes::copy_from_slice( + &buf[..handle.len - LOG_BATCH_CHECKSUM_LEN], + )), CompressionType::Lz4 => { let decompressed = lz4::decompress_block(&buf[..handle.len - LOG_BATCH_CHECKSUM_LEN])?; - Ok(decompressed) + Ok(decompressed.into()) } } } else { - Ok(Vec::new()) + Ok(Bytes::new()) } } } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index b5c87c8a..6362529d 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -25,7 +25,7 @@ pub enum LogQueue { pub type FileSeq = u64; /// A unique identifier for a log file. 
-#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct FileId { pub queue: LogQueue, pub seq: FileSeq, @@ -62,7 +62,7 @@ impl std::cmp::PartialOrd for FileId { } /// A logical pointer to a chunk of log file data. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct FileBlockHandle { pub id: FileId, pub offset: u64, diff --git a/src/purge.rs b/src/purge.rs index 94c57064..0d3e293c 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -10,6 +10,7 @@ use fail::fail_point; use log::{info, warn}; use parking_lot::{Mutex, RwLock}; +use crate::cache::LruCache; use crate::config::Config; use crate::engine::read_entry_bytes_from_file; use crate::event_listener::EventListener; @@ -42,6 +43,7 @@ where cfg: Arc, memtables: MemTables, pipe_log: Arc
<P>
, + pub(crate) cache: Mutex, global_stats: Arc, listeners: Vec>, @@ -60,6 +62,7 @@ where cfg: Arc, memtables: MemTables, pipe_log: Arc
<P>
, + cache: Mutex, global_stats: Arc, listeners: Vec>, ) -> PurgeManager
<P>
{ @@ -67,6 +70,7 @@ where cfg, memtables, pipe_log, + cache, global_stats, listeners, force_rewrite_candidates: Arc::new(Mutex::new(HashMap::default())), @@ -342,7 +346,7 @@ where // compression overhead is not too high. let mut entry_indexes = entry_indexes.into_iter().peekable(); while let Some(ei) = entry_indexes.next() { - let entry = read_entry_bytes_from_file(self.pipe_log.as_ref(), &ei)?; + let entry = read_entry_bytes_from_file(self.pipe_log.as_ref(), &self.cache, &ei)?; current_size += entry.len(); current_entries.push(entry); current_entry_indexes.push(ei);