Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 147 additions & 18 deletions src/serve/grpc/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ use tracing::info;

use super::masking::apply_mask;
use crate::prelude::*;

pub fn point_to_u5c<T: LedgerContext>(_ledger: &T, point: &ChainPoint) -> u5c::query::ChainPoint {
u5c::query::ChainPoint {
slot: point.slot(),
hash: point.hash().map(|h| h.to_vec()).unwrap_or_default().into(),
..Default::default()
use pallas::ledger::traverse::wellknown::{MAINNET_MAGIC, TESTNET_MAGIC, PREVIEW_MAGIC, PRE_PRODUCTION_MAGIC};

/// Get the CAIP-2 blockchain identifier for the given network magic
fn get_caip2_identifier(network_magic: u32) -> String {
let network_magic = network_magic as u64;
match network_magic {
MAINNET_MAGIC => format!("cardano-mainnet:{}", network_magic),
TESTNET_MAGIC => format!("cardano-testnet:{}", network_magic),
PREVIEW_MAGIC => format!("cardano-preview:{}", network_magic),
PRE_PRODUCTION_MAGIC => format!("cardano-preprod:{}", network_magic),
_ => format!("cardano:{}", network_magic), // fallback for unknown networks
}
}

Expand All @@ -34,6 +39,59 @@ where

Self { domain, mapper }
}

fn point_to_u5c(&self, point: &ChainPoint) -> Result<u5c::query::ChainPoint, Status> {
match point {
ChainPoint::Origin => Ok(u5c::query::ChainPoint {
slot: 0,
hash: vec![].into(),
height: 0,
timestamp: 0,
}),
ChainPoint::Slot(slot) | ChainPoint::Specific(slot, _) => {
// Calculate height by looking up block from slot
let body = self.domain
.archive()
.get_block_by_slot(slot)
.map_err(into_status)?
.ok_or_else(|| Status::not_found(format!("Block not found at slot {}", slot)))?;

// Parse the block to get the height
use pallas::ledger::traverse::MultiEraBlock;
let parsed_block = MultiEraBlock::decode(&body)
.map_err(|e| Status::internal(format!("Failed to decode block at slot {}: {}", slot, e)))?;

let height = parsed_block.number();

// Calculate timestamp from slot using proper era handling
// Get protocol parameter updates up to this slot
let updates = self.domain.state()
.get_pparams(*slot)
.map_err(into_status)?;

let updates: Vec<_> = updates
.into_iter()
.map(TryInto::try_into)
.collect::<Result<_, _>>()
.map_err(|e: pallas::codec::minicbor::decode::Error| {
Status::internal(format!("Failed to convert protocol parameters: {}", e))
})?;

// Get chain summary with proper era handling
let summary = pparams::fold_with_hacks(self.domain.genesis(), &updates, *slot);

// Calculate timestamp using the canonical function
let timestamp = dolos_cardano::slot_time(*slot, &summary) as u64;

Ok(u5c::query::ChainPoint {
slot: *slot,
hash: point.hash().map(|h| h.to_vec()).unwrap_or_default().into(),
height,
timestamp,
})
}
}
}
}

fn into_status(err: impl std::error::Error) -> Status {
Expand Down Expand Up @@ -251,7 +309,7 @@ where
)
.into(),
}),
ledger_tip: Some(point_to_u5c(&self.domain, &tip)),
ledger_tip: Some(self.point_to_u5c(&tip)?),
};

if let Some(mask) = message.field_mask {
Expand Down Expand Up @@ -302,7 +360,8 @@ where
.read_cursor()
.map_err(|e| Status::internal(e.to_string()))?
.as_ref()
.map(|p| point_to_u5c(&self.domain, p));
.map(|p| self.point_to_u5c(p))
.transpose()?;

Ok(Response::new(u5c::query::ReadUtxosResponse {
items,
Expand Down Expand Up @@ -349,7 +408,8 @@ where
.read_cursor()
.map_err(|e| Status::internal(e.to_string()))?
.as_ref()
.map(|p| point_to_u5c(&self.domain, p));
.map(|p| self.point_to_u5c(p))
.transpose()?;

Ok(Response::new(u5c::query::SearchUtxosResponse {
items,
Expand Down Expand Up @@ -390,7 +450,8 @@ where
.read_cursor()
.map_err(|e| Status::internal(e.to_string()))?
.as_ref()
.map(|p| point_to_u5c(&self.domain, p));
.map(|p| self.point_to_u5c(p))
.transpose()?;

let mut response = u5c::query::ReadTxResponse {
tx: Some(u5c::query::AnyChainTx {
Expand Down Expand Up @@ -418,23 +479,91 @@ where

async fn read_genesis(
&self,
request: Request<u5c::query::ReadGenesisRequest>,
_request: Request<u5c::query::ReadGenesisRequest>,
) -> Result<Response<u5c::query::ReadGenesisResponse>, Status> {
let _message = request.into_inner();
info!("received read_genesis grpc query");

let genesis = self.domain.genesis();
let tip = self.domain.state().cursor().map_err(into_status)?;

// Get current protocol parameters if available
let current_params = if let Some(tip_point) = tip.as_ref() {
let updates = self
.domain
.state()
.get_pparams(tip_point.slot())
.map_err(into_status)?;

let updates: Vec<_> = updates
.into_iter()
.map(TryInto::try_into)
.try_collect::<_, _, pallas::codec::minicbor::decode::Error>()
.map_err(|e| Status::internal(e.to_string()))?;

let summary = pparams::fold_with_hacks(genesis, &updates, tip_point.slot());
let era = summary.era_for_slot(tip_point.slot());
Some(era.pparams.clone())
} else {
None
};

info!("received new grpc query");
// Map genesis
let unified_genesis = self.mapper.map_genesis(
&genesis.byron,
&genesis.shelley,
&genesis.alonzo,
&genesis.conway,
current_params,
);

let genesis_hash = genesis.shelley_hash.to_vec();

let response = u5c::query::ReadGenesisResponse {
genesis: genesis_hash.into(),
caip2: get_caip2_identifier(unified_genesis.network_magic),
config: Some(u5c::query::read_genesis_response::Config::Cardano(unified_genesis)),
};

todo!()
Ok(Response::new(response))
}

async fn read_era_summary(
&self,
request: Request<u5c::query::ReadEraSummaryRequest>,
_request: Request<u5c::query::ReadEraSummaryRequest>,
) -> Result<Response<u5c::query::ReadEraSummaryResponse>, Status> {
let _message = request.into_inner();
info!("received read_era_summary grpc query");

let genesis = self.domain.genesis();
let tip = self.domain.state().cursor().map_err(into_status)?;

// Get current protocol parameters if available
let current_params = if let Some(tip_point) = tip.as_ref() {
let updates = self
.domain
.state()
.get_pparams(tip_point.slot())
.map_err(into_status)?;

let updates: Vec<_> = updates
.into_iter()
.map(TryInto::try_into)
.try_collect::<_, _, pallas::codec::minicbor::decode::Error>()
.map_err(|e| Status::internal(e.to_string()))?;

let summary = pparams::fold_with_hacks(genesis, &updates, tip_point.slot());
let era = summary.era_for_slot(tip_point.slot());
Some(era.pparams.clone())
} else {
None
};

info!("received new grpc query");
// Map era summaries using pallas
let era_summaries = self.mapper.map_era_summaries(current_params);

todo!()
let response = u5c::query::ReadEraSummaryResponse {
summary: Some(u5c::query::read_era_summary_response::Summary::Cardano(era_summaries)),
};

Ok(Response::new(response))
}
}
74 changes: 70 additions & 4 deletions src/serve/grpc/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,20 @@ fn raw_to_blockref<C: LedgerContext>(
mapper: &Mapper<C>,
body: &BlockBody,
) -> Option<u5c::sync::BlockRef> {
use pallas::ledger::traverse::MultiEraBlock;

let u5c::cardano::Block { header, .. } = mapper.map_block_cbor(body);

// Decode the block to get the actual height
let height = MultiEraBlock::decode(body)
.ok()
.map(|block| block.number())
.unwrap_or(0);

header.map(|h| u5c::sync::BlockRef {
slot: h.slot,
hash: h.hash,
height: h.height,
height,
})
}

Expand Down Expand Up @@ -100,6 +108,39 @@ where
cancel,
}
}

fn point_to_blockref(&self, point: &ChainPoint) -> u5c::sync::BlockRef {
use pallas::ledger::traverse::MultiEraBlock;

match point {
ChainPoint::Origin => u5c::sync::BlockRef {
slot: 0,
hash: vec![].into(),
height: 0,
},
ChainPoint::Slot(slot) | ChainPoint::Specific(slot, _) => {
// Try to look up the block to get the actual height
let height = self.domain
.archive()
.get_block_by_slot(slot)
.ok()
.and_then(|block| {
block.and_then(|body| {
MultiEraBlock::decode(&body)
.ok()
.map(|block| block.number())
})
})
.unwrap_or(*slot); // Fallback to slot if lookup fails

u5c::sync::BlockRef {
slot: *slot,
hash: point.hash().map(|h| h.to_vec()).unwrap_or_default().into(),
height,
}
}
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -217,10 +258,35 @@ where
.map_err(|e| Status::internal(format!("Unable to read WAL: {e:?}")))?
.ok_or(Status::internal("chain has no data."))?;

// Calculate timestamp from slot using proper era handling
let timestamp = match &point {
ChainPoint::Origin => 0,
ChainPoint::Slot(slot) | ChainPoint::Specific(slot, _) => {
use dolos_cardano::pparams;

// Get protocol parameter updates up to this slot
let updates = self.domain.state()
.get_pparams(*slot)
.ok()
.and_then(|updates| {
updates.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()
.ok()
})
.unwrap_or_default();

// Get chain summary with proper era handling
let summary = pparams::fold_with_hacks(self.domain.genesis(), &updates, *slot);

// Calculate timestamp using the canonical function
dolos_cardano::slot_time(*slot, &summary) as u64
}
};

let response = u5c::sync::ReadTipResponse {
tip: Some(point_to_blockref(&point)),
// TODO: impl
timestamp: 0,
tip: Some(self.point_to_blockref(&point)),
timestamp,
};

Ok(Response::new(response))
Expand Down
Loading