Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions crates/eth-sparse-mpt/src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl SharedCacheV2 {
}
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
enum StorageTrieStatus {
InsertsNotProcessed,
InsertsProcessed,
Expand All @@ -64,7 +64,6 @@ impl StorageTrieStatus {
}
}

/// WARN: Clone will not clone changed tries
#[derive(Debug, Default)]
pub struct RootHashCalculator {
storage: DashMap<Address, Arc<Mutex<StorageCalculator>>, FxBuildHasher>,
Expand All @@ -81,8 +80,20 @@ pub struct RootHashCalculator {
impl Clone for RootHashCalculator {
fn clone(&self) -> Self {
Self {
storage: self
.storage
.iter()
.map(|entry| {
(
*entry.key(),
Arc::new(Mutex::new(entry.value().lock().clone())),
)
})
.collect(),
changed_account: Arc::new(RwLock::new(self.changed_account.read().clone())),
account_trie: self.account_trie.clone(),
shared_cache: self.shared_cache.clone(),
..Default::default()
incremental_account_change: self.incremental_account_change.clone(),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
// This file is @generated by prost-build.
/// Mapping of build_info::Version
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BidderVersionInfo {
#[prost(string, tag = "1")]
pub git_commit: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub git_ref: ::prost::alloc::string::String,
#[prost(string, tag = "3")]
pub build_time_utc: ::prost::alloc::string::String,
}
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct Empty {}
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -54,10 +44,41 @@ pub struct LandedBlockInfo {
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LandedBlocksParams {
/// Added field name
#[prost(message, repeated, tag = "1")]
pub landed_block_info: ::prost::alloc::vec::Vec<LandedBlockInfo>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RelaySet {
/// Ids of the relays (MevBoostRelayIDs)
#[prost(string, repeated, tag = "2")]
pub relay_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InitParams {
#[prost(message, repeated, tag = "1")]
pub landed_block_info: ::prost::alloc::vec::Vec<LandedBlockInfo>,
/// All the relays we bid to.
#[prost(message, optional, tag = "2")]
pub all_relay_ids: ::core::option::Option<RelaySet>,
}
/// Mapping of build_info::Version
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BidderVersionInfo {
#[prost(string, tag = "1")]
pub git_commit: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub git_ref: ::prost::alloc::string::String,
#[prost(string, tag = "3")]
pub build_time_utc: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InitReturn {
#[prost(message, optional, tag = "1")]
pub bidder_version: ::core::option::Option<BidderVersionInfo>,
/// RelaySets the bidder will use.
#[prost(message, repeated, tag = "2")]
pub relay_sets: ::prost::alloc::vec::Vec<RelaySet>,
}
/// Generated client implementations.
pub mod bidding_service_client {
#![allow(
Expand Down Expand Up @@ -163,11 +184,8 @@ pub mod bidding_service_client {
/// Returns the version info for the server side.
pub async fn initialize(
&mut self,
request: impl tonic::IntoRequest<super::LandedBlocksParams>,
) -> std::result::Result<
tonic::Response<super::BidderVersionInfo>,
tonic::Status,
> {
request: impl tonic::IntoRequest<super::InitParams>,
) -> std::result::Result<tonic::Response<super::InitReturn>, tonic::Status> {
self.inner
.ready()
.await
Expand Down Expand Up @@ -329,11 +347,8 @@ pub mod bidding_service_server {
/// Returns the version info for the server side.
async fn initialize(
&self,
request: tonic::Request<super::LandedBlocksParams>,
) -> std::result::Result<
tonic::Response<super::BidderVersionInfo>,
tonic::Status,
>;
request: tonic::Request<super::InitParams>,
) -> std::result::Result<tonic::Response<super::InitReturn>, tonic::Status>;
/// BiddingService
async fn create_slot_bidder(
&self,
Expand Down Expand Up @@ -447,16 +462,16 @@ pub mod bidding_service_server {
struct InitializeSvc<T: BiddingService>(pub Arc<T>);
impl<
T: BiddingService,
> tonic::server::UnaryService<super::LandedBlocksParams>
> tonic::server::UnaryService<super::InitParams>
for InitializeSvc<T> {
type Response = super::BidderVersionInfo;
type Response = super::InitReturn;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::LandedBlocksParams>,
request: tonic::Request<super::InitParams>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use parking_lot::Mutex;
use rbuilder::{
live_builder::block_output::bidding_service_interface::{
BiddingService, BlockSealInterfaceForSlotBidder, LandedBlockInfo as RealLandedBlockInfo,
ScrapedRelayBlockBidWithStats, SlotBidder, SlotBlockId,
RelaySet, ScrapedRelayBlockBidWithStats, SlotBidder, SlotBlockId,
},
utils::build_info::Version,
};
Expand All @@ -21,17 +21,19 @@ use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tonic::transport::{Channel, Endpoint, Uri};
use tower::service_fn;
use tracing::error;
use tracing::{error, info};

use crate::{
bidding_service_wrapper::{
bidding_service_client::BiddingServiceClient,
conversion::{real2rpc_block_hash, real2rpc_landed_block_info},
conversion::{
real2rpc_block_hash, real2rpc_landed_block_info, real2rpc_relay_set, rpc2real_relay_set,
},
fast_streams::helpers::{
self, create_blocks_publisher, spawn_slot_bidder_seal_bid_command_subscriber,
BlocksPublisher, ScrapedBidsPublisher,
},
CreateSlotBidderParams, DestroySlotBidderParams, Empty, LandedBlocksParams,
CreateSlotBidderParams, DestroySlotBidderParams, Empty, InitParams, LandedBlocksParams,
MustWinBlockParams,
},
metrics::set_bidding_service_version,
Expand Down Expand Up @@ -66,6 +68,7 @@ pub struct BiddingServiceClientAdapter {
last_session_id: AtomicU64,
scraped_bids_publisher: ScrapedBidsPublisher,
blocks_publisher: Arc<BlocksPublisher>,
relay_sets: Vec<RelaySet>,
}

impl std::fmt::Debug for BiddingServiceClientAdapter {
Expand Down Expand Up @@ -93,6 +96,8 @@ pub enum Error {
InitFailed(tonic::Status),
#[error("ScrapedBidsPublisher error : {0}")]
ScrapedBidsPublisher(#[from] helpers::Error),
#[error("Bidder version not found")]
BidderVersionNotFound,
}

pub type Result<T> = core::result::Result<T, Error>;
Expand All @@ -102,17 +107,20 @@ impl BiddingServiceClientAdapter {
pub async fn new(
uds_path: &str,
landed_blocks_history: &[RealLandedBlockInfo],
all_relay_ids: RelaySet,
cancellation_token: CancellationToken,
) -> Result<Self> {
let session_id_to_slot_bidder = Arc::new(Mutex::new(HashMap::new()));
let commands_sender = Self::init_sender_task(
let (commands_sender, relay_sets) = Self::init_sender_task(
uds_path,
landed_blocks_history,
all_relay_ids,
session_id_to_slot_bidder.clone(),
)
.await?;
spawn_slot_bidder_seal_bid_command_subscriber(
session_id_to_slot_bidder,
relay_sets.clone(),
cancellation_token.clone(),
)?;
let scraped_bids_publisher = ScrapedBidsPublisher::new()?;
Expand All @@ -122,20 +130,26 @@ impl BiddingServiceClientAdapter {
last_session_id: AtomicU64::new(0),
scraped_bids_publisher,
blocks_publisher,
relay_sets,
})
}

fn new_session_id(&self) -> u64 {
self.last_session_id.fetch_add(1, Ordering::Relaxed)
}

// returns the commands_sender to send commands to the bidding service and the relay_sets that it got on the initialize call.
async fn init_sender_task(
uds_path: &str,
landed_blocks_history: &[RealLandedBlockInfo],
all_relay_ids: RelaySet,
session_id_to_slot_bidder: Arc<
Mutex<HashMap<u64, Arc<dyn BlockSealInterfaceForSlotBidder + Send + Sync>>>,
>,
) -> Result<mpsc::UnboundedSender<BiddingServiceClientCommand>> {
) -> Result<(
mpsc::UnboundedSender<BiddingServiceClientCommand>,
Vec<RelaySet>,
)> {
let uds_path = uds_path.to_string();
// Url us dummy but needed to create the Endpoint.
let channel = Endpoint::try_from("http://[::]:50051")
Expand All @@ -148,23 +162,42 @@ impl BiddingServiceClientAdapter {
.await?;
// Create a client
let mut client = BiddingServiceClient::new(channel);
let init_params = LandedBlocksParams {
let init_params = InitParams {
landed_block_info: landed_blocks_history
.iter()
.map(real2rpc_landed_block_info)
.collect(),
all_relay_ids: Some(real2rpc_relay_set(&all_relay_ids)),
};
let bidding_service_version = client
let init_res = client
.initialize(init_params)
.await
.map_err(Error::InitFailed)?;
let bidding_service_version = bidding_service_version.into_inner();
let init_res = init_res.into_inner();
let bidding_service_version = init_res
.bidder_version
.ok_or(Error::BidderVersionNotFound)?;
let relay_sets = init_res.relay_sets.iter().map(rpc2real_relay_set).collect();
info!(?relay_sets, "relay sets received from bidding service");
set_bidding_service_version(Version {
git_commit: bidding_service_version.git_commit,
git_ref: bidding_service_version.git_ref,
build_time_utc: bidding_service_version.build_time_utc,
});
let (commands_sender, mut rx) = mpsc::unbounded_channel::<BiddingServiceClientCommand>();
let (commands_sender, rx) = mpsc::unbounded_channel::<BiddingServiceClientCommand>();
Self::spawn_sender_loop_task(rx, client, session_id_to_slot_bidder);
Ok((commands_sender, relay_sets))
}

/// Spawns a task to execute on client commands received via the channel.
/// Sessions are kept in session_id_to_slot_bidder.
fn spawn_sender_loop_task(
mut rx: mpsc::UnboundedReceiver<BiddingServiceClientCommand>,
mut client: BiddingServiceClient<Channel>,
session_id_to_slot_bidder: Arc<
Mutex<HashMap<u64, Arc<dyn BlockSealInterfaceForSlotBidder + Send + Sync>>>,
>,
) {
// Spawn a task to execute received futures
tokio::spawn(async move {
while let Some(command) = rx.recv().await {
Expand Down Expand Up @@ -201,7 +234,6 @@ impl BiddingServiceClientAdapter {
}
}
});
Ok(commands_sender)
}

/// Calls create_slot_bidder via RPC to init the bidder.
Expand Down Expand Up @@ -276,6 +308,10 @@ impl BiddingService for BiddingServiceClientAdapter {
))
}

fn relay_sets(&self) -> Vec<RelaySet> {
self.relay_sets.clone()
}

fn update_new_landed_blocks_detected(&self, landed_blocks: &[RealLandedBlockInfo]) {
let param = LandedBlocksParams {
landed_block_info: landed_blocks
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
//! Conversion real data <-> gRPC data
use crate::bidding_service_wrapper::LandedBlockInfo as RPCLandedBlockInfo;
use crate::bidding_service_wrapper::{
LandedBlockInfo as RPCLandedBlockInfo, RelaySet as RPCRelaySet,
};

use alloy_primitives::{BlockHash, U256};
use rbuilder::live_builder::block_output::bidding_service_interface::LandedBlockInfo as RealLandedBlockInfo;
use rbuilder::live_builder::block_output::bidding_service_interface::{
LandedBlockInfo as RealLandedBlockInfo, RelaySet as RealRelaySet,
};
use time::OffsetDateTime;
use tonic::Status;

Expand Down Expand Up @@ -43,3 +47,13 @@ pub fn real2rpc_block_hash(v: BlockHash) -> Vec<u8> {
pub fn rpc2real_block_hash(v: &Vec<u8>) -> Result<BlockHash, Status> {
BlockHash::try_from(v.as_slice()).map_err(|_| Status::invalid_argument("rpc BlockHash error"))
}

pub fn real2rpc_relay_set(l: &RealRelaySet) -> RPCRelaySet {
RPCRelaySet {
relay_ids: l.relays().to_vec(),
}
}

pub fn rpc2real_relay_set(l: &RPCRelaySet) -> RealRelaySet {
RealRelaySet::new(l.relay_ids.clone())
}
Loading
Loading