From 0ab83df3d600aab59ac0766365a4c858c72d760e Mon Sep 17 00:00:00 2001 From: Hermi Date: Fri, 15 Aug 2025 23:57:31 +0800 Subject: [PATCH 1/2] feat(grpc): enhance block and chain point handling with height and timestamp calculations --- src/serve/grpc/query.rs | 172 +++++++++++++++++++++++++++++++++++----- src/serve/grpc/sync.rs | 74 ++++++++++++++++- src/serve/grpc/watch.rs | 66 ++++++++++++--- 3 files changed, 279 insertions(+), 33 deletions(-) diff --git a/src/serve/grpc/query.rs b/src/serve/grpc/query.rs index 90bec0a65..8f9ab18f0 100644 --- a/src/serve/grpc/query.rs +++ b/src/serve/grpc/query.rs @@ -8,12 +8,17 @@ use tracing::info; use super::masking::apply_mask; use crate::prelude::*; - -pub fn point_to_u5c(_ledger: &T, point: &ChainPoint) -> u5c::query::ChainPoint { - u5c::query::ChainPoint { - slot: point.slot(), - hash: point.hash().map(|h| h.to_vec()).unwrap_or_default().into(), - ..Default::default() +use pallas::ledger::traverse::wellknown::{MAINNET_MAGIC, TESTNET_MAGIC, PREVIEW_MAGIC, PRE_PRODUCTION_MAGIC}; + +/// Get the CAIP-2 blockchain identifier for the given network magic +fn get_caip2_identifier(network_magic: u32) -> String { + let network_magic = network_magic as u64; + match network_magic { + MAINNET_MAGIC => format!("cardano-mainnet:{}", network_magic), + TESTNET_MAGIC => format!("cardano-testnet:{}", network_magic), + PREVIEW_MAGIC => format!("cardano-preview:{}", network_magic), + PRE_PRODUCTION_MAGIC => format!("cardano-preprod:{}", network_magic), + _ => format!("cardano:{}", network_magic), // fallback for unknown networks } } @@ -34,6 +39,61 @@ where Self { domain, mapper } } + + fn point_to_u5c(&self, point: &ChainPoint) -> u5c::query::ChainPoint { + match point { + ChainPoint::Origin => u5c::query::ChainPoint { + slot: 0, + hash: vec![].into(), + height: 0, + timestamp: 0, + }, + ChainPoint::Slot(slot) | ChainPoint::Specific(slot, _) => { + // Calculate height by looking up block from slot + let height = self.domain + .archive() + .get_block_by_slot(slot) + .map(|block| { + block.map(|body| { + // Parse the block to get the height + use pallas::ledger::traverse::MultiEraBlock; + if let Ok(parsed_block) = MultiEraBlock::decode(&body) { + parsed_block.number() + } else { + *slot // Fallback to slot + } + }).unwrap_or(*slot) + }) + .unwrap_or(*slot); + + // Calculate timestamp from slot using proper era handling + // Get protocol parameter updates up to this slot + let updates = self.domain.state() + .get_pparams(*slot) + .ok() + .and_then(|updates| { + updates.into_iter() + .map(TryInto::try_into) + .collect::, _>>() + .ok() + }) + .unwrap_or_default(); + + // Get chain summary with proper era handling + let summary = pparams::fold_with_hacks(self.domain.genesis(), &updates, *slot); + + // Calculate timestamp using the canonical function + let timestamp = dolos_cardano::slot_time(*slot, &summary) as u64; + + u5c::query::ChainPoint { + slot: *slot, + hash: point.hash().map(|h| h.to_vec()).unwrap_or_default().into(), + height, + timestamp, + } + } + } + } } fn into_status(err: impl std::error::Error) -> Status { @@ -251,7 +311,7 @@ where ) .into(), }), - ledger_tip: Some(point_to_u5c(&self.domain, &tip)), + ledger_tip: tip.as_ref().map(|p| self.point_to_u5c(p)), }; if let Some(mask) = message.field_mask { @@ -302,7 +362,7 @@ where .read_cursor() .map_err(|e| Status::internal(e.to_string()))? .as_ref() - .map(|p| point_to_u5c(&self.domain, p)); + .map(|p| self.point_to_u5c(p)); Ok(Response::new(u5c::query::ReadUtxosResponse { items, @@ -349,7 +409,7 @@ where .read_cursor() .map_err(|e| Status::internal(e.to_string()))? .as_ref() - .map(|p| point_to_u5c(&self.domain, p)); + .map(|p| self.point_to_u5c(p)); Ok(Response::new(u5c::query::SearchUtxosResponse { items, @@ -390,7 +450,7 @@ where .read_cursor() .map_err(|e| Status::internal(e.to_string()))? .as_ref() - .map(|p| point_to_u5c(&self.domain, p)); + .map(|p| self.point_to_u5c(p)); let mut response = u5c::query::ReadTxResponse { tx: Some(u5c::query::AnyChainTx { @@ -418,23 +478,99 @@ where async fn read_genesis( &self, - request: Request, + _request: Request, ) -> Result, Status> { - let _message = request.into_inner(); + info!("received read_genesis grpc query"); + + let genesis = self.domain.genesis(); + let tip = self.domain.state().cursor().map_err(into_status)?; + + // Get current protocol parameters if available + let current_params = if let Some(tip_point) = tip.as_ref() { + let updates = self + .domain + .state() + .get_pparams(tip_point.slot()) + .map_err(into_status)?; + + let updates: Vec<_> = updates + .into_iter() + .map(TryInto::try_into) + .try_collect::<_, _, pallas::codec::minicbor::decode::Error>() + .map_err(|e| Status::internal(e.to_string()))?; + + let summary = pparams::fold_with_hacks(genesis, &updates, tip_point.slot()); + let era = summary.era_for_slot(tip_point.slot()); + Some(era.pparams.clone()) + } else { + None + }; - info!("received new grpc query"); + // Map genesis + let unified_genesis = self.mapper.map_genesis( + &genesis.byron, + &genesis.shelley, + &genesis.alonzo, + &genesis.conway, + current_params, + ); + + // Get genesis hash + let genesis_hash = if let Ok(Some(point)) = self.domain.state().cursor() { + match point { + ChainPoint::Origin | ChainPoint::Slot(_) => vec![], + ChainPoint::Specific(_, hash) => hash.to_vec(), + } + } else { + vec![] + }; - todo!() + let response = u5c::query::ReadGenesisResponse { + genesis: genesis_hash.into(), + caip2: get_caip2_identifier(unified_genesis.network_magic), + config: Some(u5c::query::read_genesis_response::Config::Cardano(unified_genesis)), + }; + + Ok(Response::new(response)) } async fn read_era_summary( &self, - request: Request, + _request: Request, ) -> Result, Status> { - let _message = request.into_inner(); + info!("received read_era_summary grpc query"); + + let genesis = self.domain.genesis(); + let tip = self.domain.state().cursor().map_err(into_status)?; + + // Get current protocol parameters if available + let current_params = if let Some(tip_point) = tip.as_ref() { + let updates = self + .domain + .state() + .get_pparams(tip_point.slot()) + .map_err(into_status)?; + + let updates: Vec<_> = updates + .into_iter() + .map(TryInto::try_into) + .try_collect::<_, _, pallas::codec::minicbor::decode::Error>() + .map_err(|e| Status::internal(e.to_string()))?; + + let summary = pparams::fold_with_hacks(genesis, &updates, tip_point.slot()); + let era = summary.era_for_slot(tip_point.slot()); + Some(era.pparams.clone()) + } else { + None + }; - info!("received new grpc query"); + // Map era summaries using pallas + let era_summaries = self.mapper.map_era_summaries(current_params); - todo!() + let response = u5c::query::ReadEraSummaryResponse { + summary: Some(u5c::query::read_era_summary_response::Summary::Cardano(era_summaries)), + }; + + Ok(Response::new(response)) } } diff --git a/src/serve/grpc/sync.rs b/src/serve/grpc/sync.rs index cd29c50e8..242e61de9 100644 --- a/src/serve/grpc/sync.rs +++ b/src/serve/grpc/sync.rs @@ -40,12 +40,20 @@ fn raw_to_blockref( mapper: &Mapper, body: &BlockBody, ) -> Option { + use pallas::ledger::traverse::MultiEraBlock; + let u5c::cardano::Block { header, .. } = mapper.map_block_cbor(body); + + // Decode the block to get the actual height + let height = MultiEraBlock::decode(body) + .ok() + .map(|block| block.number()) + .unwrap_or(0); header.map(|h| u5c::sync::BlockRef { slot: h.slot, hash: h.hash, - height: h.height, + height, }) } @@ -100,6 +108,39 @@ where cancel, } } + + fn point_to_blockref(&self, point: &ChainPoint) -> u5c::sync::BlockRef { + use pallas::ledger::traverse::MultiEraBlock; + + match point { + ChainPoint::Origin => u5c::sync::BlockRef { + slot: 0, + hash: vec![].into(), + height: 0, + }, + ChainPoint::Slot(slot) | ChainPoint::Specific(slot, _) => { + // Try to look up the block to get the actual height + let height = self.domain + .archive() + .get_block_by_slot(slot) + .ok() + .and_then(|block| { + block.and_then(|body| { + MultiEraBlock::decode(&body) + .ok() + .map(|block| block.number()) + }) + }) + .unwrap_or(*slot); // Fallback to slot if lookup fails + + u5c::sync::BlockRef { + slot: *slot, + hash: point.hash().map(|h| h.to_vec()).unwrap_or_default().into(), + height, + } + } + } + } } #[async_trait::async_trait] @@ -217,10 +258,35 @@ where .map_err(|e| Status::internal(format!("Unable to read WAL: {e:?}")))? .ok_or(Status::internal("chain has no data."))?; + // Calculate timestamp from slot using proper era handling + let timestamp = match &point { + ChainPoint::Origin => 0, + ChainPoint::Slot(slot) | ChainPoint::Specific(slot, _) => { + use dolos_cardano::pparams; + + // Get protocol parameter updates up to this slot + let updates = self.domain.state() + .get_pparams(*slot) + .ok() + .and_then(|updates| { + updates.into_iter() + .map(TryInto::try_into) + .collect::, _>>() + .ok() + }) + .unwrap_or_default(); + + // Get chain summary with proper era handling + let summary = pparams::fold_with_hacks(self.domain.genesis(), &updates, *slot); + + // Calculate timestamp using the canonical function + dolos_cardano::slot_time(*slot, &summary) as u64 + } + }; + let response = u5c::sync::ReadTipResponse { - tip: Some(point_to_blockref(&point)), - // TODO: impl - timestamp: 0, + tip: Some(self.point_to_blockref(&point)), + timestamp, }; Ok(Response::new(response)) diff --git a/src/serve/grpc/watch.rs b/src/serve/grpc/watch.rs index 2a5e8b3f1..4d429f6e1 100644 --- a/src/serve/grpc/watch.rs +++ b/src/serve/grpc/watch.rs @@ -146,13 +146,53 @@ fn apply_predicate(predicate: &u5c::watch::TxPredicate, tx: &u5c::cardano::Tx) - tx_matches && !not_clause && and_clause && or_clause } -fn block_to_txs( +fn block_to_txs( block: &RawBlock, + domain: &D, mapper: &interop::Mapper, request: &u5c::watch::WatchTxRequest, -) -> Vec { - let block = MultiEraBlock::decode(block).unwrap(); - let txs = block.txs(); +) -> Vec +where + D::State: LedgerContext, +{ + let parsed_block = MultiEraBlock::decode(block).unwrap(); + let txs = parsed_block.txs(); + + // Map the block to get header information + let mapped_block = mapper.map_block_cbor(block); + + // Calculate timestamp for the block + let block_slot = parsed_block.slot(); + let timestamp = { + use dolos_cardano::pparams; + + // Get protocol parameter updates up to this slot + let updates = domain.state() + .get_pparams(block_slot) + .ok() + .and_then(|updates| { + updates.into_iter() + .map(TryInto::try_into) + .collect::, _>>() + .ok() + }) + .unwrap_or_default(); + + // Get chain summary with proper era handling + let summary = pparams::fold_with_hacks(domain.genesis(), &updates, block_slot); + + // Calculate timestamp using the canonical function + dolos_cardano::slot_time(block_slot, &summary) as u64 + }; + + let block_header = mapped_block.header.map(|h| u5c::watch::AnyChainBlock { + native_bytes: vec![].into(), // Just header info, not full block bytes + chain: u5c::watch::any_chain_block::Chain::Cardano(u5c::cardano::Block { + header: Some(h), + body: None, // Only include header, not full body + timestamp, + }).into(), + }); txs.iter() .map(|x: &pallas::ledger::traverse::MultiEraTx<'_>| mapper.map_tx(x)) @@ -164,24 +204,27 @@ fn block_to_txs( }) .map(|x| u5c::watch::AnyChainTx { chain: Some(u5c::watch::any_chain_tx::Chain::Cardano(x)), - // TODO(p): should it be none? - block: None, + block: block_header.clone(), }) .collect() } -fn roll_to_watch_response( +fn roll_to_watch_response( + domain: &D, mapper: &interop::Mapper, log: &TipEvent, request: &u5c::watch::WatchTxRequest, -) -> impl Stream { +) -> impl Stream +where + D::State: LedgerContext, +{ let txs: Vec<_> = match log { - TipEvent::Apply(_, block) => block_to_txs(block, mapper, request) + TipEvent::Apply(_, block) => block_to_txs(block, domain, mapper, request) .into_iter() .map(u5c::watch::watch_tx_response::Action::Apply) .map(|x| u5c::watch::WatchTxResponse { action: Some(x) }) .collect(), - TipEvent::Undo(_, block) => block_to_txs(block, mapper, request) + TipEvent::Undo(_, block) => block_to_txs(block, domain, mapper, request) .into_iter() .map(u5c::watch::watch_tx_response::Action::Undo) .map(|x| u5c::watch::WatchTxResponse { action: Some(x) }) @@ -245,9 +288,10 @@ where ChainStream::start::(self.domain.clone(), intersect, self.cancel.clone()); let mapper = self.mapper.clone(); + let domain = self.domain.clone(); let stream = stream - .flat_map(move |log| roll_to_watch_response(&mapper, &log, &inner_req)) + .flat_map(move |log| roll_to_watch_response(&domain, &mapper, &log, &inner_req)) .map(Ok); Ok(Response::new(Box::pin(stream))) From 961cd8a10daf756d14b3d4115afae080e6f14178 Mon Sep 17 00:00:00 2001 From: Hermi Date: Wed, 29 Oct 2025 18:02:30 +0800 Subject: [PATCH 2/2] fix(grpc): update point_to_u5c to return Result and handle errors properly --- src/serve/grpc/query.rs | 73 +++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 40 deletions(-) diff --git a/src/serve/grpc/query.rs b/src/serve/grpc/query.rs index 8f9ab18f0..44698b71e 100644 --- a/src/serve/grpc/query.rs +++ b/src/serve/grpc/query.rs @@ -40,57 +40,55 @@ where Self { domain, mapper } } - fn point_to_u5c(&self, point: &ChainPoint) -> u5c::query::ChainPoint { + fn point_to_u5c(&self, point: &ChainPoint) -> Result { match point { - ChainPoint::Origin => u5c::query::ChainPoint { + ChainPoint::Origin => Ok(u5c::query::ChainPoint { slot: 0, hash: vec![].into(), height: 0, timestamp: 0, - }, + }), ChainPoint::Slot(slot) | ChainPoint::Specific(slot, _) => { // Calculate height by looking up block from slot - let height = self.domain + let body = self.domain .archive() .get_block_by_slot(slot) - .map(|block| { - block.map(|body| { - // Parse the block to get the height - use pallas::ledger::traverse::MultiEraBlock; - if let Ok(parsed_block) = MultiEraBlock::decode(&body) { - parsed_block.number() - } else { - *slot // Fallback to slot - } - }).unwrap_or(*slot) - }) - .unwrap_or(*slot); + .map_err(into_status)? + .ok_or_else(|| Status::not_found(format!("Block not found at slot {}", slot)))?; + + // Parse the block to get the height + use pallas::ledger::traverse::MultiEraBlock; + let parsed_block = MultiEraBlock::decode(&body) + .map_err(|e| Status::internal(format!("Failed to decode block at slot {}: {}", slot, e)))?; + + let height = parsed_block.number(); // Calculate timestamp from slot using proper era handling // Get protocol parameter updates up to this slot let updates = self.domain.state() .get_pparams(*slot) - .ok() - .and_then(|updates| { - updates.into_iter() - .map(TryInto::try_into) - .collect::, _>>() - .ok() - }) - .unwrap_or_default(); + .map_err(into_status)?; + + let updates: Vec<_> = updates + .into_iter() + .map(TryInto::try_into) + .collect::>() + .map_err(|e: pallas::codec::minicbor::decode::Error| { + Status::internal(format!("Failed to convert protocol parameters: {}", e)) + })?; // Get chain summary with proper era handling let summary = pparams::fold_with_hacks(self.domain.genesis(), &updates, *slot); - + // Calculate timestamp using the canonical function let timestamp = dolos_cardano::slot_time(*slot, &summary) as u64; - u5c::query::ChainPoint { + Ok(u5c::query::ChainPoint { slot: *slot, hash: point.hash().map(|h| h.to_vec()).unwrap_or_default().into(), height, timestamp, - } + }) } } } @@ -311,7 +309,7 @@ where ) .into(), }), - ledger_tip: tip.as_ref().map(|p| self.point_to_u5c(p)), + ledger_tip: Some(self.point_to_u5c(&tip)?), }; if let Some(mask) = message.field_mask { @@ -362,7 +360,8 @@ where .read_cursor() .map_err(|e| Status::internal(e.to_string()))? .as_ref() - .map(|p| self.point_to_u5c(p)); + .map(|p| self.point_to_u5c(p)) + .transpose()?; Ok(Response::new(u5c::query::ReadUtxosResponse { items, @@ -409,7 +408,8 @@ where .read_cursor() .map_err(|e| Status::internal(e.to_string()))? .as_ref() - .map(|p| self.point_to_u5c(p)); + .map(|p| self.point_to_u5c(p)) + .transpose()?; Ok(Response::new(u5c::query::SearchUtxosResponse { items, @@ -450,7 +450,8 @@ where .read_cursor() .map_err(|e| Status::internal(e.to_string()))? .as_ref() - .map(|p| self.point_to_u5c(p)); + .map(|p| self.point_to_u5c(p)) + .transpose()?; let mut response = u5c::query::ReadTxResponse { tx: Some(u5c::query::AnyChainTx { @@ -515,15 +516,7 @@ where current_params, ); - // Get genesis hash - let genesis_hash = if let Ok(Some(point)) = self.domain.state().cursor() { - match point { - ChainPoint::Origin | ChainPoint::Slot(_) => vec![], - ChainPoint::Specific(_, hash) => hash.to_vec(), - } - } else { - vec![] - }; + let genesis_hash = genesis.shelley_hash.to_vec(); let response = u5c::query::ReadGenesisResponse { genesis: genesis_hash.into(),