From 5404d7344721023c8b8b72c7bd41da7f633c6e65 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Thu, 30 Mar 2023 19:07:43 +0800 Subject: [PATCH 1/5] *: add lru cache to speed up scanning logs Signed-off-by: Jay Lee --- Cargo.toml | 1 + src/cache.rs | 269 +++++++++++++++++++++++++++++++++++++++++++++++ src/engine.rs | 92 ++++++---------- src/filter.rs | 6 +- src/lib.rs | 1 + src/log_batch.rs | 11 +- src/pipe_log.rs | 4 +- 7 files changed, 317 insertions(+), 67 deletions(-) create mode 100644 src/cache.rs diff --git a/Cargo.toml b/Cargo.toml index c496617c..0a08d011 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ required-features = ["failpoints"] [dependencies] byteorder = "1.2" +bytes = "1.0" crc32fast = "1.2" crossbeam = "0.8" fail = "0.5" diff --git a/src/cache.rs b/src/cache.rs new file mode 100644 index 00000000..84db9a94 --- /dev/null +++ b/src/cache.rs @@ -0,0 +1,269 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. + +use std::{collections::HashMap, ptr::NonNull, mem}; + +use bytes::Bytes; + +use crate::internals::FileBlockHandle; + +struct CacheEntry { + key: FileBlockHandle, + data: Bytes, +} + +struct Node { + prev: NonNull, + next: NonNull, + entry: CacheEntry, + #[cfg(test)] + _leak_check: self::tests::LeakCheck, +} + +pub struct LruCache { + cache: HashMap>, + + cap: usize, + free: usize, + head: *mut Node, + tail: *mut Node, + + #[cfg(test)] + leak_check: self::tests::LeakCheck, +} + +#[inline] +fn need_size(payload: &[u8]) -> usize { + mem::size_of::() + payload.len() +} + +#[inline] +unsafe fn promote(mut node: NonNull, head: &mut *mut Node, tail: &mut *mut Node) { + if node.as_ptr() == *tail { + return; + } + let mut prev = unsafe { node.as_ref().prev }; + let mut next = unsafe { node.as_ref().next }; + if node.as_ptr() == *head { + *head = next.as_ptr(); + next.as_mut().prev = NonNull::dangling(); + } else { + prev.as_mut().next = next; + next.as_mut().prev = prev; + } + (**tail).next = node; + node.as_mut().prev = NonNull::new_unchecked(*tail); + node.as_mut().next = NonNull::dangling(); + *tail = node.as_ptr(); +} + +impl LruCache { + #[inline] + pub fn with_capacity(cap: usize) -> Self { + Self { + cache: HashMap::default(), + cap, + free: cap, + head: std::ptr::null_mut(), + tail: std::ptr::null_mut(), + #[cfg(test)] + leak_check: self::tests::LeakCheck::default(), + } + } + + #[inline] + pub fn insert(&mut self, key: FileBlockHandle, data: Bytes) -> Option { + match self.cache.get_mut(&key) { + None => (), + Some(node) => { + unsafe { + // Technically they should be exact the same. Using the new version + // to avoid potential bugs. + assert_eq!(data.len(), node.as_ref().entry.data.len()); + node.as_mut().entry.data = data.clone(); + promote(*node, &mut self.head, &mut self.tail); + return Some(data); + } + }, + } + let need_size = need_size(&data); + if need_size > self.cap { + return Some(data); + } + while self.free < need_size && self.remove_head() {} + + let node = Box::new(Node { + prev: NonNull::dangling(), + next: NonNull::dangling(), + entry: CacheEntry { key, data }, + #[cfg(test)] + _leak_check: self.leak_check.clone(), + }); + + let node = Box::into_raw(node); + + if self.head.is_null() { + self.head = node; + self.tail = node; + } else { + unsafe { + (*self.tail).next = NonNull::new_unchecked(node); + (*node).prev = NonNull::new_unchecked(self.tail); + self.tail = node; + } + } + self.free -= need_size; + self.cache.insert(key, unsafe { NonNull::new_unchecked(node) }); + None + } + + #[inline] + pub fn get(&mut self, key: &FileBlockHandle) -> Option { + let node = self.cache.get(key)?; + unsafe { + promote(*node, &mut self.head, &mut self.tail); + Some(node.as_ref().entry.data.clone()) + } + } + + #[inline] + fn remove_head(&mut self) -> bool { + if self.head.is_null() { + return false; + } + let mut head = unsafe { Box::from_raw(self.head) }; + if self.head != self.tail { + self.head = head.next.as_ptr(); + head.prev = NonNull::dangling(); + } else { + self.head = std::ptr::null_mut(); + self.tail = std::ptr::null_mut(); + } + self.free += need_size(&head.entry.data); + self.cache.remove(&head.entry.key); + true + } + + #[inline] + pub fn resize(&mut self, new_cap: usize) { + while (self.cap - self.free) > new_cap { + self.remove_head(); + } + self.free = new_cap - (self.cap - self.free); + self.cap = new_cap; + } +} + +impl Drop for LruCache { + fn drop(&mut self) { + for (_, node) in self.cache.drain() { + unsafe { + drop(Box::from_raw(node.as_ptr())); + } + } + } +} + +unsafe impl Sync for LruCache {} +unsafe impl Send for LruCache {} + +#[cfg(test)] +mod tests { + use std::sync::{atomic::{AtomicIsize, Ordering}, Arc}; + + use crate::internals::LogQueue; + + use super::*; + + pub struct LeakCheck { + cnt: Arc, + } + + impl LeakCheck { + pub fn clone(&self) -> Self { + self.cnt.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + Self { + cnt: self.cnt.clone(), + } + } + } + + impl Default for LeakCheck { + fn default() -> Self { + Self { + cnt: Arc::new(AtomicIsize::new(1)), + } + } + } + + impl Drop for LeakCheck { + fn drop(&mut self) { + self.cnt.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + } + } + + #[test] + fn test_basic_lru() { + let mut cache = LruCache::with_capacity(1024); + let lc = cache.leak_check.clone(); + let mut key = FileBlockHandle::dummy(LogQueue::Append); + for offset in 0..100 { + key.offset = offset; + cache.insert(key, vec![offset as u8; 10].into()); + } + for offset in 0..100 { + key.offset = offset; + cache.insert(key, vec![offset as u8; 10].into()); + } + let entry_len = need_size(&[0; 10]); + let entries_fit = (1024 / entry_len) as u64; + assert_eq!(cache.cap, 1024); + assert_eq!(cache.free, 1024 % entry_len); + for offset in 0..(100 - entries_fit) { + key.offset = offset; + assert_eq!(cache.get(&key).as_deref(), None, "{offset}"); + } + for offset in (100 - entries_fit)..100 { + key.offset = offset; + assert_eq!(cache.get(&key).as_deref(), Some(&[offset as u8; 10] as &[u8]), "{offset}"); + } + + let offset = 100 - entries_fit; + key.offset = offset; + // Get will promote the entry and it will be the last to be removed. + assert_eq!(cache.get(&key).as_deref(), Some(&[offset as u8; 10] as &[u8]), "{offset}"); + for i in 1..entries_fit { + key.offset = 200 + i; + cache.insert(key, vec![key.offset as u8; 10].into()); + key.offset = offset + i; + assert_eq!(cache.get(&key).as_deref(), None, "{i}"); + } + key.offset = offset; + assert_eq!(cache.get(&key).as_deref(), Some(&[offset as u8; 10] as &[u8]), "{offset}"); + + cache.resize(2048); + assert_eq!(cache.cap, 2048); + assert_eq!(cache.free, 2048 - (entries_fit as usize * entry_len)); + key.offset = 201; + assert_eq!(cache.get(&key).as_deref(), Some(&[key.offset as u8; 10] as &[u8])); + key.offset = offset; + assert_eq!(cache.get(&key).as_deref(), Some(&[offset as u8; 10] as &[u8])); + + cache.resize(entry_len); + assert_eq!(cache.cap, entry_len); + assert_eq!(cache.free, 0); + key.offset = 201; + assert_eq!(cache.get(&key).as_deref(), None); + key.offset = offset; + assert_eq!(cache.get(&key).as_deref(), Some(&[offset as u8; 10] as &[u8])); + + cache.resize(entry_len - 1); + assert_eq!(cache.cap, entry_len - 1); + assert_eq!(cache.free, entry_len - 1); + key.offset = offset; + assert_eq!(cache.get(&key).as_deref(), None); + + drop(cache); + // If there is leak or double free, the count will unlikely to be 1. + assert_eq!(lc.cnt.load(Ordering::Relaxed), 1); + } +} diff --git a/src/engine.rs b/src/engine.rs index a9d368df..7ec4f086 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,15 +1,16 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::cell::{Cell, RefCell}; use std::marker::PhantomData; use std::path::Path; use std::sync::{mpsc, Arc, Mutex}; use std::thread::{Builder as ThreadBuilder, JoinHandle}; use std::time::{Duration, Instant}; +use bytes::Bytes; use log::{error, info}; use protobuf::{parse_from_bytes, Message}; +use crate::cache::LruCache; use crate::config::{Config, RecoveryMode}; use crate::consistency::ConsistencyChecker; use crate::env::{DefaultFileSystem, FileSystem}; @@ -19,7 +20,7 @@ use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilde use crate::log_batch::{Command, LogBatch, MessageExt}; use crate::memtable::{EntryIndex, MemTableRecoverContextFactory, MemTables}; use crate::metrics::*; -use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog}; +use crate::pipe_log::{FileBlockHandle, LogQueue, PipeLog}; use crate::purge::{PurgeHook, PurgeManager}; use crate::write_barrier::{WriteBarrier, Writer}; use crate::{perf_context, Error, GlobalStats, Result}; @@ -546,33 +547,31 @@ where LogItemReader::new_file_reader(file_system, path) } } -} - -struct BlockCache { - key: Cell, - block: RefCell>, -} -impl BlockCache { - fn new() -> Self { - BlockCache { - key: Cell::new(FileBlockHandle { - id: FileId::new(LogQueue::Append, 0), - offset: 0, - len: 0, - }), - block: RefCell::new(Vec::new()), - } + pub fn resize_internal_cache(&self, new_capacity: usize) { + CACHE.lock().unwrap().resize(new_capacity); } +} - fn insert(&self, key: FileBlockHandle, block: Vec) { - self.key.set(key); - self.block.replace(block); - } +lazy_static::lazy_static! { + /// Use 256MiB by default. + static ref CACHE: Mutex = Mutex::new(LruCache::with_capacity(256 * 1024 * 1024)); } -thread_local! { - static BLOCK_CACHE: BlockCache = BlockCache::new(); +fn load_cache

(pipe_log: &P, idx: &EntryIndex) -> Result +where P: PipeLog { + let mut cache = CACHE.lock().unwrap(); + if let Some(v) = cache.get(&idx.entries.unwrap()) { + return Ok(v); + } + drop(cache); + let v = LogBatch::decode_entries_block( + &pipe_log.read_bytes(idx.entries.unwrap())?, + idx.entries.unwrap(), + idx.compression_type, + )?; + CACHE.lock().unwrap().insert(idx.entries.unwrap(), v.clone()); + Ok(v) } pub(crate) fn read_entry_from_file(pipe_log: &P, idx: &EntryIndex) -> Result @@ -580,45 +579,21 @@ where M: MessageExt, P: PipeLog, { - BLOCK_CACHE.with(|cache| { - if cache.key.get() != idx.entries.unwrap() { - cache.insert( - idx.entries.unwrap(), - LogBatch::decode_entries_block( - &pipe_log.read_bytes(idx.entries.unwrap())?, - idx.entries.unwrap(), - idx.compression_type, - )?, - ); - } - let e = parse_from_bytes( - &cache.block.borrow() - [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], - )?; - assert_eq!(M::index(&e), idx.index); - Ok(e) - }) + let cache = load_cache(pipe_log, idx)?; + let e = parse_from_bytes( + &cache + [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], + )?; + assert_eq!(M::index(&e), idx.index); + Ok(e) } -pub(crate) fn read_entry_bytes_from_file

(pipe_log: &P, idx: &EntryIndex) -> Result> +pub(crate) fn read_entry_bytes_from_file

(pipe_log: &P, idx: &EntryIndex) -> Result where P: PipeLog, { - BLOCK_CACHE.with(|cache| { - if cache.key.get() != idx.entries.unwrap() { - cache.insert( - idx.entries.unwrap(), - LogBatch::decode_entries_block( - &pipe_log.read_bytes(idx.entries.unwrap())?, - idx.entries.unwrap(), - idx.compression_type, - )?, - ); - } - Ok(cache.block.borrow() - [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize] - .to_owned()) - }) + let cache = load_cache(pipe_log, idx)?; + Ok(cache.slice(idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize)) } #[cfg(test)] @@ -626,6 +601,7 @@ pub(crate) mod tests { use super::*; use crate::env::{ObfuscatedFileSystem, Permission}; use crate::file_pipe_log::{parse_recycled_file_name, FileNameExt}; + use crate::internals::FileId; use crate::log_batch::AtomicGroupBuilder; use crate::pipe_log::Version; use crate::test_util::{generate_entries, PanicGuard}; diff --git a/src/filter.rs b/src/filter.rs index 83d32519..f1b66540 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -3,6 +3,7 @@ use std::path::Path; use std::sync::Arc; +use bytes::Bytes; use hashbrown::HashMap; use rhai::{Engine, Scope, AST}; use scopeguard::{guard, ScopeGuard}; @@ -291,10 +292,11 @@ impl RhaiFilterMachine { ei.entries.unwrap(), ei.compression_type, )?; + // Not using slice to avoid consuming too many memory. entries.push( - block[ei.entry_offset as usize + Bytes::copy_from_slice(&block[ei.entry_offset as usize ..(ei.entry_offset + ei.entry_len) as usize] - .to_owned(), + ), ); } log_batch.add_raw_entries(item.raft_group_id, eis, entries)?; diff --git a/src/lib.rs b/src/lib.rs index 9a786238..49a35f74 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,7 @@ macro_rules! box_err { }); } +mod cache; mod codec; mod config; mod consistency; diff --git a/src/log_batch.rs b/src/log_batch.rs index f8f6cf77..361aeebe 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use std::{mem, u64}; use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; +use bytes::Bytes; use log::error; use num_derive::FromPrimitive; use num_traits::FromPrimitive; @@ -685,7 +686,7 @@ impl LogBatch { &mut self, region_id: u64, mut entry_indexes: Vec, - entries: Vec>, + entries: Vec, ) -> Result<()> { debug_assert!(entry_indexes.len() == entries.len()); debug_assert!(self.buf_state == BufState::Open); @@ -925,19 +926,19 @@ impl LogBatch { buf: &[u8], handle: FileBlockHandle, compression: CompressionType, - ) -> Result> { + ) -> Result { if handle.len > 0 { let _ = verify_checksum_with_signature(&buf[0..handle.len], None)?; match compression { - CompressionType::None => Ok(buf[..handle.len - LOG_BATCH_CHECKSUM_LEN].to_owned()), + CompressionType::None => Ok(Bytes::copy_from_slice(&buf[..handle.len - LOG_BATCH_CHECKSUM_LEN])), CompressionType::Lz4 => { let decompressed = lz4::decompress_block(&buf[..handle.len - LOG_BATCH_CHECKSUM_LEN])?; - Ok(decompressed) + Ok(decompressed.into()) } } } else { - Ok(Vec::new()) + Ok(Bytes::new()) } } } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index b5c87c8a..6362529d 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -25,7 +25,7 @@ pub enum LogQueue { pub type FileSeq = u64; /// A unique identifier for a log file. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct FileId { pub queue: LogQueue, pub seq: FileSeq, @@ -62,7 +62,7 @@ impl std::cmp::PartialOrd for FileId { } /// A logical pointer to a chunk of log file data. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct FileBlockHandle { pub id: FileId, pub offset: u64, From bc4157f44ad6410e5cd0d7590a11abb87e9d0969 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Thu, 30 Mar 2023 19:27:48 +0800 Subject: [PATCH 2/5] format code Signed-off-by: Jay Lee --- src/cache.rs | 45 +++++++++++++++++++++++++++++++++++---------- src/engine.rs | 12 ++++++++---- src/filter.rs | 9 ++++----- src/log_batch.rs | 4 +++- 4 files changed, 50 insertions(+), 20 deletions(-) diff --git a/src/cache.rs b/src/cache.rs index 84db9a94..95c9830d 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,6 +1,6 @@ // Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. -use std::{collections::HashMap, ptr::NonNull, mem}; +use std::{collections::HashMap, mem, ptr::NonNull}; use bytes::Bytes; @@ -83,7 +83,7 @@ impl LruCache { promote(*node, &mut self.head, &mut self.tail); return Some(data); } - }, + } } let need_size = need_size(&data); if need_size > self.cap { @@ -112,7 +112,8 @@ impl LruCache { } } self.free -= need_size; - self.cache.insert(key, unsafe { NonNull::new_unchecked(node) }); + self.cache + .insert(key, unsafe { NonNull::new_unchecked(node) }); None } @@ -168,7 +169,10 @@ unsafe impl Send for LruCache {} #[cfg(test)] mod tests { - use std::sync::{atomic::{AtomicIsize, Ordering}, Arc}; + use std::sync::{ + atomic::{AtomicIsize, Ordering}, + Arc, + }; use crate::internals::LogQueue; @@ -224,13 +228,21 @@ mod tests { } for offset in (100 - entries_fit)..100 { key.offset = offset; - assert_eq!(cache.get(&key).as_deref(), Some(&[offset as u8; 10] as &[u8]), "{offset}"); + assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]), + "{offset}" + ); } let offset = 100 - entries_fit; key.offset = offset; // Get will promote the entry and it will be the last to be removed. - assert_eq!(cache.get(&key).as_deref(), Some(&[offset as u8; 10] as &[u8]), "{offset}"); + assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]), + "{offset}" + ); for i in 1..entries_fit { key.offset = 200 + i; cache.insert(key, vec![key.offset as u8; 10].into()); @@ -238,15 +250,25 @@ mod tests { assert_eq!(cache.get(&key).as_deref(), None, "{i}"); } key.offset = offset; - assert_eq!(cache.get(&key).as_deref(), Some(&[offset as u8; 10] as &[u8]), "{offset}"); + assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]), + "{offset}" + ); cache.resize(2048); assert_eq!(cache.cap, 2048); assert_eq!(cache.free, 2048 - (entries_fit as usize * entry_len)); key.offset = 201; - assert_eq!(cache.get(&key).as_deref(), Some(&[key.offset as u8; 10] as &[u8])); + assert_eq!( + cache.get(&key).as_deref(), + Some(&[key.offset as u8; 10] as &[u8]) + ); key.offset = offset; - assert_eq!(cache.get(&key).as_deref(), Some(&[offset as u8; 10] as &[u8])); + assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]) + ); cache.resize(entry_len); assert_eq!(cache.cap, entry_len); @@ -254,7 +276,10 @@ mod tests { key.offset = 201; assert_eq!(cache.get(&key).as_deref(), None); key.offset = offset; - assert_eq!(cache.get(&key).as_deref(), Some(&[offset as u8; 10] as &[u8])); + assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]) + ); cache.resize(entry_len - 1); assert_eq!(cache.cap, entry_len - 1); diff --git a/src/engine.rs b/src/engine.rs index 7ec4f086..1ab15002 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -559,7 +559,9 @@ lazy_static::lazy_static! { } fn load_cache

(pipe_log: &P, idx: &EntryIndex) -> Result -where P: PipeLog { +where + P: PipeLog, +{ let mut cache = CACHE.lock().unwrap(); if let Some(v) = cache.get(&idx.entries.unwrap()) { return Ok(v); @@ -570,7 +572,10 @@ where P: PipeLog { idx.entries.unwrap(), idx.compression_type, )?; - CACHE.lock().unwrap().insert(idx.entries.unwrap(), v.clone()); + CACHE + .lock() + .unwrap() + .insert(idx.entries.unwrap(), v.clone()); Ok(v) } @@ -581,8 +586,7 @@ where { let cache = load_cache(pipe_log, idx)?; let e = parse_from_bytes( - &cache - [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], + &cache[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], )?; assert_eq!(M::index(&e), idx.index); Ok(e) diff --git a/src/filter.rs b/src/filter.rs index f1b66540..d216034c 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -293,11 +293,10 @@ impl RhaiFilterMachine { ei.compression_type, )?; // Not using slice to avoid consuming too many memory. - entries.push( - Bytes::copy_from_slice(&block[ei.entry_offset as usize - ..(ei.entry_offset + ei.entry_len) as usize] - ), - ); + entries.push(Bytes::copy_from_slice( + &block[ei.entry_offset as usize + ..(ei.entry_offset + ei.entry_len) as usize], + )); } log_batch.add_raw_entries(item.raft_group_id, eis, entries)?; } diff --git a/src/log_batch.rs b/src/log_batch.rs index 361aeebe..87c8886c 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -930,7 +930,9 @@ impl LogBatch { if handle.len > 0 { let _ = verify_checksum_with_signature(&buf[0..handle.len], None)?; match compression { - CompressionType::None => Ok(Bytes::copy_from_slice(&buf[..handle.len - LOG_BATCH_CHECKSUM_LEN])), + CompressionType::None => Ok(Bytes::copy_from_slice( + &buf[..handle.len - LOG_BATCH_CHECKSUM_LEN], + )), CompressionType::Lz4 => { let decompressed = lz4::decompress_block(&buf[..handle.len - LOG_BATCH_CHECKSUM_LEN])?; From 5ef0467828877efde4ccee076f3da5e11f3d7fda Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Thu, 30 Mar 2023 22:14:20 +0800 Subject: [PATCH 3/5] fix test Signed-off-by: Jay Lee --- src/cache.rs | 33 ++++++++++++++++++++++++++++++--- src/engine.rs | 1 + 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/src/cache.rs b/src/cache.rs index 95c9830d..72ba741a 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -152,15 +152,23 @@ impl LruCache { self.free = new_cap - (self.cap - self.free); self.cap = new_cap; } -} -impl Drop for LruCache { - fn drop(&mut self) { + #[inline] + pub fn clear(&mut self) { for (_, node) in self.cache.drain() { unsafe { drop(Box::from_raw(node.as_ptr())); } } + self.head = std::ptr::null_mut(); + self.tail = std::ptr::null_mut(); + self.free = self.cap; + } +} + +impl Drop for LruCache { + fn drop(&mut self) { + self.clear(); } } @@ -287,6 +295,25 @@ mod tests { key.offset = offset; assert_eq!(cache.get(&key).as_deref(), None); + cache.resize(1024); + cache.clear(); + for offset in 0..100 { + key.offset = offset; + cache.insert(key, vec![offset as u8; 10].into()); + } + for offset in 0..(100 - entries_fit) { + key.offset = offset; + assert_eq!(cache.get(&key).as_deref(), None, "{offset}"); + } + for offset in (100 - entries_fit)..100 { + key.offset = offset; + assert_eq!( + cache.get(&key).as_deref(), + Some(&[offset as u8; 10] as &[u8]), + "{offset}" + ); + } + drop(cache); // If there is leak or double free, the count will unlikely to be 1. assert_eq!(lc.cnt.load(Ordering::Relaxed), 1); diff --git a/src/engine.rs b/src/engine.rs index 1ab15002..1c0e6601 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -650,6 +650,7 @@ pub(crate) mod tests { } fn reopen(self) -> Self { + super::CACHE.lock().unwrap().clear(); let cfg: Config = self.cfg.as_ref().clone(); let file_system = self.pipe_log.file_system(); let mut listeners = self.listeners.clone(); From ce9b1a2619e4c71efc569e7e4d08228200d620d6 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Thu, 30 Mar 2023 23:47:49 +0800 Subject: [PATCH 4/5] make cache local instead of global Global cache is wrong if there are multiple raft engine or raft engine is reopen multiple times. Signed-off-by: Jay Lee --- src/config.rs | 4 +++ src/engine.rs | 94 +++++++++++++++++++++++++-------------------------- src/purge.rs | 6 +++- 3 files changed, 55 insertions(+), 49 deletions(-) diff --git a/src/config.rs b/src/config.rs index 156ae96e..ca95ae33 100644 --- a/src/config.rs +++ b/src/config.rs @@ -108,6 +108,9 @@ pub struct Config { /// /// Default: false pub prefill_for_recycle: bool, + + /// Initial cache capacity for entries. + pub cache_capacity: ReadableSize, } impl Default for Config { @@ -129,6 +132,7 @@ impl Default for Config { memory_limit: None, enable_log_recycle: false, prefill_for_recycle: false, + cache_capacity: ReadableSize::mb(256), }; // Test-specific configurations. #[cfg(test)] diff --git a/src/engine.rs b/src/engine.rs index 1c0e6601..679c026e 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -2,12 +2,13 @@ use std::marker::PhantomData; use std::path::Path; -use std::sync::{mpsc, Arc, Mutex}; +use std::sync::{mpsc, Arc}; use std::thread::{Builder as ThreadBuilder, JoinHandle}; use std::time::{Duration, Instant}; use bytes::Bytes; use log::{error, info}; +use parking_lot::Mutex; use protobuf::{parse_from_bytes, Message}; use crate::cache::LruCache; @@ -98,6 +99,7 @@ where cfg.clone(), memtables.clone(), pipe_log.clone(), + Mutex::new(LruCache::with_capacity(cfg.cache_capacity.0 as usize)), stats.clone(), listeners.clone(), ); @@ -307,6 +309,7 @@ where ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(1.0); return Ok(Some(read_entry_from_file::( self.pipe_log.as_ref(), + &self.purge_manager.cache, &idx, )?)); } @@ -336,7 +339,11 @@ where .read() .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; for i in ents_idx.iter() { - vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); + vec.push(read_entry_from_file::( + self.pipe_log.as_ref(), + &self.purge_manager.cache, + i, + )?); } ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); return Ok(ents_idx.len()); @@ -415,7 +422,7 @@ where P: PipeLog, { fn drop(&mut self) { - self.tx.lock().unwrap().send(()).unwrap(); + self.tx.lock().send(()).unwrap(); if let Some(t) = self.metrics_flusher.take() { t.join().unwrap(); } @@ -549,42 +556,39 @@ where } pub fn resize_internal_cache(&self, new_capacity: usize) { - CACHE.lock().unwrap().resize(new_capacity); + self.purge_manager.cache.lock().resize(new_capacity); } } -lazy_static::lazy_static! { - /// Use 256MiB by default. - static ref CACHE: Mutex = Mutex::new(LruCache::with_capacity(256 * 1024 * 1024)); -} - -fn load_cache

(pipe_log: &P, idx: &EntryIndex) -> Result +#[inline] +fn load_cache

(pipe_log: &P, cache: &Mutex, idx: &EntryIndex) -> Result where P: PipeLog, { - let mut cache = CACHE.lock().unwrap(); - if let Some(v) = cache.get(&idx.entries.unwrap()) { + let mut guard = cache.lock(); + if let Some(v) = guard.get(&idx.entries.unwrap()) { return Ok(v); } - drop(cache); + drop(guard); let v = LogBatch::decode_entries_block( &pipe_log.read_bytes(idx.entries.unwrap())?, idx.entries.unwrap(), idx.compression_type, )?; - CACHE - .lock() - .unwrap() - .insert(idx.entries.unwrap(), v.clone()); + cache.lock().insert(idx.entries.unwrap(), v.clone()); Ok(v) } -pub(crate) fn read_entry_from_file(pipe_log: &P, idx: &EntryIndex) -> Result +pub(crate) fn read_entry_from_file( + pipe_log: &P, + cache: &Mutex, + idx: &EntryIndex, +) -> Result where M: MessageExt, P: PipeLog, { - let cache = load_cache(pipe_log, idx)?; + let cache = load_cache(pipe_log, cache, idx)?; let e = parse_from_bytes( &cache[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], )?; @@ -592,11 +596,15 @@ where Ok(e) } -pub(crate) fn read_entry_bytes_from_file

(pipe_log: &P, idx: &EntryIndex) -> Result +pub(crate) fn read_entry_bytes_from_file

( + pipe_log: &P, + cache: &Mutex, + idx: &EntryIndex, +) -> Result where P: PipeLog, { - let cache = load_cache(pipe_log, idx)?; + let cache = load_cache(pipe_log, cache, idx)?; Ok(cache.slice(idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize)) } @@ -650,7 +658,6 @@ pub(crate) mod tests { } fn reopen(self) -> Self { - super::CACHE.lock().unwrap().clear(); let cfg: Config = self.cfg.as_ref().clone(); let file_system = self.pipe_log.file_system(); let mut listeners = self.listeners.clone(); @@ -1984,16 +1991,16 @@ pub(crate) mod tests { match (parse_append, parse_recycled) { (Some(id), None) if id.queue == LogQueue::Append => { if delete { - self.append_metadata.lock().unwrap().remove(&id.seq) + self.append_metadata.lock().remove(&id.seq) } else { - self.append_metadata.lock().unwrap().insert(id.seq) + self.append_metadata.lock().insert(id.seq) } } (None, Some(seq)) => { if delete { - self.recycled_metadata.lock().unwrap().remove(&seq) + self.recycled_metadata.lock().remove(&seq) } else { - self.recycled_metadata.lock().unwrap().insert(seq) + self.recycled_metadata.lock().insert(seq) } } _ => false, @@ -2053,9 +2060,9 @@ pub(crate) mod tests { let parse_recycled = parse_recycled_file_name(path); match (parse_append, parse_recycled) { (Some(id), None) if id.queue == LogQueue::Append => { - self.append_metadata.lock().unwrap().contains(&id.seq) + self.append_metadata.lock().contains(&id.seq) } - (None, Some(seq)) => self.recycled_metadata.lock().unwrap().contains(&seq), + (None, Some(seq)) => self.recycled_metadata.lock().contains(&seq), _ => false, } } @@ -2099,29 +2106,20 @@ pub(crate) mod tests { assert_eq!(engine.file_count(None), fs.inner.file_count()); let start = engine.file_span(LogQueue::Append).0; // metadata have been deleted. - assert_eq!( - fs.append_metadata.lock().unwrap().iter().next().unwrap(), - &start - ); + assert_eq!(fs.append_metadata.lock().iter().next().unwrap(), &start); let engine = engine.reopen(); assert_eq!(engine.file_count(None), fs.inner.file_count()); let (start, _) = engine.file_span(LogQueue::Append); - assert_eq!( - fs.append_metadata.lock().unwrap().iter().next().unwrap(), - &start - ); + assert_eq!(fs.append_metadata.lock().iter().next().unwrap(), &start); // Simulate recycled metadata. for i in start / 2..start { - fs.append_metadata.lock().unwrap().insert(i); + fs.append_metadata.lock().insert(i); } let engine = engine.reopen(); let (start, _) = engine.file_span(LogQueue::Append); - assert_eq!( - fs.append_metadata.lock().unwrap().iter().next().unwrap(), - &start - ); + assert_eq!(fs.append_metadata.lock().iter().next().unwrap(), &start); } #[test] @@ -2142,7 +2140,7 @@ pub(crate) mod tests { }; let fs = Arc::new(DeleteMonitoredFileSystem::new()); let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap(); - let recycled_start = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap(); + let recycled_start = *fs.recycled_metadata.lock().iter().next().unwrap(); for rid in 1..=10 { engine.append(rid, 1, 11, Some(&entry_data)); } @@ -2159,18 +2157,18 @@ pub(crate) mod tests { assert_eq!(engine.file_count(Some(LogQueue::Append)), 1); // Recycled files have been reused. assert_eq!( - fs.append_metadata.lock().unwrap().iter().next().unwrap(), + fs.append_metadata.lock().iter().next().unwrap(), &(start + 20) ); - let recycled_start_1 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap(); + let recycled_start_1 = *fs.recycled_metadata.lock().iter().next().unwrap(); assert!(recycled_start < recycled_start_1); // Reuse these files. for rid in 1..=5 { engine.append(rid, 1, 11, Some(&entry_data)); } - let start_1 = *fs.append_metadata.lock().unwrap().iter().next().unwrap(); + let start_1 = *fs.append_metadata.lock().iter().next().unwrap(); assert!(start <= start_1); - let recycled_start_2 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap(); + let recycled_start_2 = *fs.recycled_metadata.lock().iter().next().unwrap(); assert!(recycled_start_1 < recycled_start_2); // Reopen the engine and validate the recycled files are reserved @@ -2178,9 +2176,9 @@ pub(crate) mod tests { let engine = engine.reopen(); assert_eq!(file_count, fs.inner.file_count()); assert!(file_count > engine.file_count(None)); - let start_2 = *fs.append_metadata.lock().unwrap().iter().next().unwrap(); + let start_2 = *fs.append_metadata.lock().iter().next().unwrap(); assert_eq!(start_1, start_2); - let recycled_start_3 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap(); + let recycled_start_3 = *fs.recycled_metadata.lock().iter().next().unwrap(); assert_eq!(recycled_start_2, recycled_start_3); } diff --git a/src/purge.rs b/src/purge.rs index 94c57064..0d3e293c 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -10,6 +10,7 @@ use fail::fail_point; use log::{info, warn}; use parking_lot::{Mutex, RwLock}; +use crate::cache::LruCache; use crate::config::Config; use crate::engine::read_entry_bytes_from_file; use crate::event_listener::EventListener; @@ -42,6 +43,7 @@ where cfg: Arc, memtables: MemTables, pipe_log: Arc

, + pub(crate) cache: Mutex, global_stats: Arc, listeners: Vec>, @@ -60,6 +62,7 @@ where cfg: Arc, memtables: MemTables, pipe_log: Arc

, + cache: Mutex, global_stats: Arc, listeners: Vec>, ) -> PurgeManager

{ @@ -67,6 +70,7 @@ where cfg, memtables, pipe_log, + cache, global_stats, listeners, force_rewrite_candidates: Arc::new(Mutex::new(HashMap::default())), @@ -342,7 +346,7 @@ where // compression overhead is not too high. let mut entry_indexes = entry_indexes.into_iter().peekable(); while let Some(ei) = entry_indexes.next() { - let entry = read_entry_bytes_from_file(self.pipe_log.as_ref(), &ei)?; + let entry = read_entry_bytes_from_file(self.pipe_log.as_ref(), &self.cache, &ei)?; current_size += entry.len(); current_entries.push(entry); current_entry_indexes.push(ei); From 02f4c853a2cd39cfc48ec23b82ed68b26e15362a Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Fri, 31 Mar 2023 15:26:01 +0800 Subject: [PATCH 5/5] fix build Signed-off-by: Jay Lee --- src/cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cache.rs b/src/cache.rs index 72ba741a..5f623c3d 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, mem, ptr::NonNull}; use bytes::Bytes; -use crate::internals::FileBlockHandle; +use crate::pipe_log::FileBlockHandle; struct CacheEntry { key: FileBlockHandle,