Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 118 additions & 8 deletions zilliqa/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ impl FromSql for EvmGas {
}
}

#[derive(Copy, Clone)]
pub enum BlockFilter {
Hash(Hash),
View(u64),
Expand Down Expand Up @@ -218,19 +219,37 @@ impl From<alloy::eips::BlockNumberOrTag> for BlockFilter {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct BlockAndReceipts {
pub block: Block,
pub receipts: Vec<TransactionReceipt>,
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct BlockAndReceiptsAndTransactions {
pub block: Block,
pub receipts: Vec<TransactionReceipt>,
pub transactions: Vec<VerifiedTransaction>,
}

impl From<BlockAndReceiptsAndTransactions> for BlockAndReceipts {
fn from(value: BlockAndReceiptsAndTransactions) -> Self {
BlockAndReceipts {
block: value.block,
receipts: value.receipts,
}
}
}

impl From<&BlockAndReceiptsAndTransactions> for BlockAndReceipts {
fn from(value: &BlockAndReceiptsAndTransactions) -> Self {
BlockAndReceipts {
block: value.block.clone(),
receipts: value.receipts.clone(),
}
}
}

const ROCKSDB_MIGRATE_AT: &str = "migrate_at";
const ROCKSDB_RESTORE_AT: &str = "restore_at";

Expand All @@ -239,6 +258,14 @@ const ROCKSDB_RESTORE_AT: &str = "restore_at";
/// operators to re-sync.
const CURRENT_DB_VERSION: &str = "1";

#[derive(Default, Debug)]
pub struct DbCache {
finalized_block_and_receipts_and_transactions: RwLock<Option<BlockAndReceiptsAndTransactions>>,
finalized_block_and_receipts: RwLock<Option<BlockAndReceipts>>,
finalized_block: RwLock<Option<Block>>,
finalized_transactionless_block: RwLock<Option<Block>>,
}

#[derive(Debug)]
pub struct Db {
pool: Arc<Pool<SqliteConnectionManager>>,
Expand All @@ -250,6 +277,8 @@ pub struct Db {
executable_blocks_height: Option<u64>,
/// Active state migration
active_migrate: bool,
/// Cache
db_cache: DbCache,
}

impl Db {
Expand Down Expand Up @@ -344,6 +373,7 @@ impl Db {
executable_blocks_height,
kvdb: Arc::new(rdb),
active_migrate,
db_cache: Default::default(),
})
}

Expand Down Expand Up @@ -707,7 +737,16 @@ impl Db {

pub fn set_finalized_view(&self, view: u64) -> Result<()> {
let db = self.pool.get()?;
self.set_finalized_view_with_db_tx(&db, view)
self.set_finalized_view_with_db_tx(&db, view)?;
// Invalidate the finalized block caches
*self
.db_cache
.finalized_block_and_receipts_and_transactions
.write() = None;
*self.db_cache.finalized_block_and_receipts.write() = None;
*self.db_cache.finalized_block.write() = None;
*self.db_cache.finalized_transactionless_block.write() = None;
Ok(())
}

pub fn get_finalized_view(&self) -> Result<Option<u64>> {
Expand Down Expand Up @@ -1020,6 +1059,16 @@ impl Db {
}

pub fn get_transactionless_block(&self, filter: BlockFilter) -> Result<Option<Block>> {
if let BlockFilter::Finalized = filter {
if let Some(cached) = self
.db_cache
.finalized_transactionless_block
.read()
.as_ref()
{
return Ok(Some(cached.clone()));
}
}
fn make_block(row: &Row) -> rusqlite::Result<Block> {
Ok(Block {
header: BlockHeader {
Expand All @@ -1040,7 +1089,7 @@ impl Db {
})
}
// Remember to add to `query_planner_stability_guarantee()` test below
Ok(match filter {
let block = match filter {
BlockFilter::Hash(hash) => {
self.pool.get()?.prepare_cached(concat!(
"SELECT block_hash, view, height, qc, signature, state_root_hash, transactions_root_hash, receipts_root_hash, timestamp, gas_used, gas_limit, agg FROM blocks ",
Expand Down Expand Up @@ -1092,10 +1141,31 @@ impl Db {
self.get_transactionless_block(BlockFilter::Height(0))?
}
},
})
};

if let BlockFilter::Finalized = filter {
*self.db_cache.finalized_transactionless_block.write() = block.clone();
}

Ok(block)
}

pub fn get_block(&self, filter: BlockFilter) -> Result<Option<Block>> {
if let BlockFilter::Finalized = filter {
if let Some(cached) = self.db_cache.finalized_block.read().as_ref() {
return Ok(Some(cached.clone()));
} else if let Some(cached) = self.db_cache.finalized_block_and_receipts.read().as_ref()
{
return Ok(Some(cached.clone().block));
} else if let Some(cached) = self
.db_cache
.finalized_block_and_receipts_and_transactions
.read()
.as_ref()
{
return Ok(Some(cached.clone().block));
}
}
let Some(mut block) = self.get_transactionless_block(filter)? else {
return Ok(None);
};
Expand All @@ -1114,10 +1184,27 @@ impl Db {
.query_map([block.header.hash], |row| row.get(0))?
.collect::<Result<Vec<Hash>, _>>()?;
block.transactions = transaction_hashes;

if let BlockFilter::Finalized = filter {
*self.db_cache.finalized_block.write() = Some(block.clone());
}

Ok(Some(block))
}

pub fn get_block_and_receipts(&self, filter: BlockFilter) -> Result<Option<BlockAndReceipts>> {
if let BlockFilter::Finalized = filter {
if let Some(cached) = self.db_cache.finalized_block_and_receipts.read().as_ref() {
return Ok(Some(cached.clone()));
} else if let Some(cached) = self
.db_cache
.finalized_block_and_receipts_and_transactions
.read()
.as_ref()
{
return Ok(Some(cached.clone().into()));
}
}
let Some(mut block) = self.get_transactionless_block(filter)? else {
return Ok(None);
};
Expand All @@ -1133,13 +1220,29 @@ impl Db {
let transaction_hashes = receipts.iter().map(|x| x.tx_hash).collect();
block.transactions = transaction_hashes;

Ok(Some(BlockAndReceipts { block, receipts }))
let result = BlockAndReceipts { block, receipts };

if let BlockFilter::Finalized = filter {
*self.db_cache.finalized_block_and_receipts.write() = Some(result.clone());
}

Ok(Some(result))
}

pub fn get_block_and_receipts_and_transactions(
&self,
filter: BlockFilter,
) -> Result<Option<BlockAndReceiptsAndTransactions>> {
if let BlockFilter::Finalized = filter {
if let Some(cached) = self
.db_cache
.finalized_block_and_receipts_and_transactions
.read()
.as_ref()
{
return Ok(Some(cached.clone()));
}
}
let Some(mut block) = self.get_transactionless_block(filter)? else {
return Ok(None);
};
Expand Down Expand Up @@ -1172,11 +1275,18 @@ impl Db {

assert_eq!(receipts.len(), transactions.len());

Ok(Some(BlockAndReceiptsAndTransactions {
let result = BlockAndReceiptsAndTransactions {
block,
receipts,
transactions,
}))
};
if let BlockFilter::Finalized = filter {
*self
.db_cache
.finalized_block_and_receipts_and_transactions
.write() = Some(result.clone());
}
Ok(Some(result))
}

pub fn contains_block(&self, block_hash: &Hash) -> Result<bool> {
Expand Down