diff --git a/crates/eth-sparse-mpt/src/v2/mod.rs b/crates/eth-sparse-mpt/src/v2/mod.rs index 5fb7fce39..4986e84f7 100644 --- a/crates/eth-sparse-mpt/src/v2/mod.rs +++ b/crates/eth-sparse-mpt/src/v2/mod.rs @@ -47,7 +47,7 @@ impl SharedCacheV2 { } } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] enum StorageTrieStatus { InsertsNotProcessed, InsertsProcessed, @@ -64,7 +64,6 @@ impl StorageTrieStatus { } } -/// WARN: Clone will not clone changed tries #[derive(Debug, Default)] pub struct RootHashCalculator { storage: DashMap>, FxBuildHasher>, @@ -81,8 +80,20 @@ pub struct RootHashCalculator { impl Clone for RootHashCalculator { fn clone(&self) -> Self { Self { + storage: self + .storage + .iter() + .map(|entry| { + ( + *entry.key(), + Arc::new(Mutex::new(entry.value().lock().clone())), + ) + }) + .collect(), + changed_account: Arc::new(RwLock::new(self.changed_account.read().clone())), + account_trie: self.account_trie.clone(), shared_cache: self.shared_cache.clone(), - ..Default::default() + incremental_account_change: self.incremental_account_change.clone(), } } } diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/bidding_service.rs b/crates/rbuilder-operator/src/bidding_service_wrapper/bidding_service.rs index 32e52cda6..d36e65a2b 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/bidding_service.rs +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/bidding_service.rs @@ -1,14 +1,4 @@ // This file is @generated by prost-build. -/// Mapping of build_info::Version -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct BidderVersionInfo { - #[prost(string, tag = "1")] - pub git_commit: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub git_ref: ::prost::alloc::string::String, - #[prost(string, tag = "3")] - pub build_time_utc: ::prost::alloc::string::String, -} #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Empty {} #[derive(Clone, Copy, PartialEq, ::prost::Message)] @@ -54,10 +44,41 @@ pub struct LandedBlockInfo { } #[derive(Clone, PartialEq, ::prost::Message)] pub struct LandedBlocksParams { - /// Added field name #[prost(message, repeated, tag = "1")] pub landed_block_info: ::prost::alloc::vec::Vec, } +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RelaySet { + /// Ids of the relays (MevBoostRelayIDs) + #[prost(string, repeated, tag = "2")] + pub relay_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InitParams { + #[prost(message, repeated, tag = "1")] + pub landed_block_info: ::prost::alloc::vec::Vec, + /// All the relays we bid to. + #[prost(message, optional, tag = "2")] + pub all_relay_ids: ::core::option::Option, +} +/// Mapping of build_info::Version +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BidderVersionInfo { + #[prost(string, tag = "1")] + pub git_commit: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub git_ref: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub build_time_utc: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InitReturn { + #[prost(message, optional, tag = "1")] + pub bidder_version: ::core::option::Option, + /// RelaySets the bidder will use. + #[prost(message, repeated, tag = "2")] + pub relay_sets: ::prost::alloc::vec::Vec, +} /// Generated client implementations. pub mod bidding_service_client { #![allow( @@ -163,11 +184,8 @@ pub mod bidding_service_client { /// Returns the version info for the server side. pub async fn initialize( &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -329,11 +347,8 @@ pub mod bidding_service_server { /// Returns the version info for the server side. async fn initialize( &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; /// BiddingService async fn create_slot_bidder( &self, @@ -447,16 +462,16 @@ pub mod bidding_service_server { struct InitializeSvc(pub Arc); impl< T: BiddingService, - > tonic::server::UnaryService + > tonic::server::UnaryService for InitializeSvc { - type Response = super::BidderVersionInfo; + type Response = super::InitReturn; type Future = BoxFuture< tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/client/bidding_service_client_adapter.rs b/crates/rbuilder-operator/src/bidding_service_wrapper/client/bidding_service_client_adapter.rs index 5d94706b8..59f94283c 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/client/bidding_service_client_adapter.rs +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/client/bidding_service_client_adapter.rs @@ -4,7 +4,7 @@ use parking_lot::Mutex; use rbuilder::{ live_builder::block_output::bidding_service_interface::{ BiddingService, BlockSealInterfaceForSlotBidder, LandedBlockInfo as RealLandedBlockInfo, - ScrapedRelayBlockBidWithStats, SlotBidder, SlotBlockId, + RelaySet, ScrapedRelayBlockBidWithStats, SlotBidder, SlotBlockId, }, utils::build_info::Version, }; @@ -21,17 +21,19 @@ use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tonic::transport::{Channel, Endpoint, Uri}; use tower::service_fn; -use tracing::error; +use tracing::{error, info}; use crate::{ bidding_service_wrapper::{ bidding_service_client::BiddingServiceClient, - conversion::{real2rpc_block_hash, real2rpc_landed_block_info}, + conversion::{ + real2rpc_block_hash, real2rpc_landed_block_info, real2rpc_relay_set, rpc2real_relay_set, + }, fast_streams::helpers::{ self, create_blocks_publisher, spawn_slot_bidder_seal_bid_command_subscriber, BlocksPublisher, ScrapedBidsPublisher, }, - CreateSlotBidderParams, DestroySlotBidderParams, Empty, LandedBlocksParams, + CreateSlotBidderParams, DestroySlotBidderParams, Empty, InitParams, LandedBlocksParams, MustWinBlockParams, }, metrics::set_bidding_service_version, @@ -66,6 +68,7 @@ pub struct BiddingServiceClientAdapter { last_session_id: AtomicU64, scraped_bids_publisher: ScrapedBidsPublisher, blocks_publisher: Arc, + relay_sets: Vec, } impl std::fmt::Debug for BiddingServiceClientAdapter { @@ -93,6 +96,8 @@ pub enum Error { InitFailed(tonic::Status), #[error("ScrapedBidsPublisher error : {0}")] ScrapedBidsPublisher(#[from] helpers::Error), + #[error("Bidder version not found")] + BidderVersionNotFound, } pub type Result = core::result::Result; @@ -102,17 +107,20 @@ impl BiddingServiceClientAdapter { pub async fn new( uds_path: &str, landed_blocks_history: &[RealLandedBlockInfo], + all_relay_ids: RelaySet, cancellation_token: CancellationToken, ) -> Result { let session_id_to_slot_bidder = Arc::new(Mutex::new(HashMap::new())); - let commands_sender = Self::init_sender_task( + let (commands_sender, relay_sets) = Self::init_sender_task( uds_path, landed_blocks_history, + all_relay_ids, session_id_to_slot_bidder.clone(), ) .await?; spawn_slot_bidder_seal_bid_command_subscriber( session_id_to_slot_bidder, + relay_sets.clone(), cancellation_token.clone(), )?; let scraped_bids_publisher = ScrapedBidsPublisher::new()?; @@ -122,6 +130,7 @@ impl BiddingServiceClientAdapter { last_session_id: AtomicU64::new(0), scraped_bids_publisher, blocks_publisher, + relay_sets, }) } @@ -129,13 +138,18 @@ impl BiddingServiceClientAdapter { self.last_session_id.fetch_add(1, Ordering::Relaxed) } + // returns the commands_sender to send commands to the bidding service and the relay_sets that it got on the initialize call. async fn init_sender_task( uds_path: &str, landed_blocks_history: &[RealLandedBlockInfo], + all_relay_ids: RelaySet, session_id_to_slot_bidder: Arc< Mutex>>, >, - ) -> Result> { + ) -> Result<( + mpsc::UnboundedSender, + Vec, + )> { let uds_path = uds_path.to_string(); // Url us dummy but needed to create the Endpoint. let channel = Endpoint::try_from("http://[::]:50051") @@ -148,23 +162,42 @@ impl BiddingServiceClientAdapter { .await?; // Create a client let mut client = BiddingServiceClient::new(channel); - let init_params = LandedBlocksParams { + let init_params = InitParams { landed_block_info: landed_blocks_history .iter() .map(real2rpc_landed_block_info) .collect(), + all_relay_ids: Some(real2rpc_relay_set(&all_relay_ids)), }; - let bidding_service_version = client + let init_res = client .initialize(init_params) .await .map_err(Error::InitFailed)?; - let bidding_service_version = bidding_service_version.into_inner(); + let init_res = init_res.into_inner(); + let bidding_service_version = init_res + .bidder_version + .ok_or(Error::BidderVersionNotFound)?; + let relay_sets = init_res.relay_sets.iter().map(rpc2real_relay_set).collect(); + info!(?relay_sets, "relay sets received from bidding service"); set_bidding_service_version(Version { git_commit: bidding_service_version.git_commit, git_ref: bidding_service_version.git_ref, build_time_utc: bidding_service_version.build_time_utc, }); - let (commands_sender, mut rx) = mpsc::unbounded_channel::(); + let (commands_sender, rx) = mpsc::unbounded_channel::(); + Self::spawn_sender_loop_task(rx, client, session_id_to_slot_bidder); + Ok((commands_sender, relay_sets)) + } + + /// Spawns a task to execute on client commands received via the channel. + /// Sessions are kept in session_id_to_slot_bidder. + fn spawn_sender_loop_task( + mut rx: mpsc::UnboundedReceiver, + mut client: BiddingServiceClient, + session_id_to_slot_bidder: Arc< + Mutex>>, + >, + ) { // Spawn a task to execute received futures tokio::spawn(async move { while let Some(command) = rx.recv().await { @@ -201,7 +234,6 @@ impl BiddingServiceClientAdapter { } } }); - Ok(commands_sender) } /// Calls create_slot_bidder via RPC to init the bidder. @@ -276,6 +308,10 @@ impl BiddingService for BiddingServiceClientAdapter { )) } + fn relay_sets(&self) -> Vec { + self.relay_sets.clone() + } + fn update_new_landed_blocks_detected(&self, landed_blocks: &[RealLandedBlockInfo]) { let param = LandedBlocksParams { landed_block_info: landed_blocks diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/conversion.rs b/crates/rbuilder-operator/src/bidding_service_wrapper/conversion.rs index 5e174a64f..4d12c22ce 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/conversion.rs +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/conversion.rs @@ -1,8 +1,12 @@ //! Conversion real data <-> gRPC data -use crate::bidding_service_wrapper::LandedBlockInfo as RPCLandedBlockInfo; +use crate::bidding_service_wrapper::{ + LandedBlockInfo as RPCLandedBlockInfo, RelaySet as RPCRelaySet, +}; use alloy_primitives::{BlockHash, U256}; -use rbuilder::live_builder::block_output::bidding_service_interface::LandedBlockInfo as RealLandedBlockInfo; +use rbuilder::live_builder::block_output::bidding_service_interface::{ + LandedBlockInfo as RealLandedBlockInfo, RelaySet as RealRelaySet, +}; use time::OffsetDateTime; use tonic::Status; @@ -43,3 +47,13 @@ pub fn real2rpc_block_hash(v: BlockHash) -> Vec { pub fn rpc2real_block_hash(v: &Vec) -> Result { BlockHash::try_from(v.as_slice()).map_err(|_| Status::invalid_argument("rpc BlockHash error")) } + +pub fn real2rpc_relay_set(l: &RealRelaySet) -> RPCRelaySet { + RPCRelaySet { + relay_ids: l.relays().to_vec(), + } +} + +pub fn rpc2real_relay_set(l: &RPCRelaySet) -> RealRelaySet { + RealRelaySet::new(l.relay_ids.clone()) +} diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs index bdaa0e5da..592f73e42 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs @@ -27,7 +27,7 @@ use iceoryx2::{ use parking_lot::Mutex; use rbuilder::{ live_builder::block_output::bidding_service_interface::{ - BlockSealInterfaceForSlotBidder, ScrapedRelayBlockBidWithStats, + BlockSealInterfaceForSlotBidder, RelaySet, ScrapedRelayBlockBidWithStats, }, utils::sync::{Watch, THREAD_BLOCKING_DURATION}, }; @@ -293,11 +293,12 @@ pub struct LastItemPublisher { } impl LastItemPublisher { - pub fn new + 'static>( + pub fn new( item_service_name: &'static str, got_item_event_name: &'static str, max_publishers: usize, max_subscribers: usize, + item_to_rpc: impl Fn(ItemType) -> Option + Send + Sync + 'static, cancellation_token: CancellationToken, ) -> Result { let last_item: Arc> = Arc::new(Watch::new()); @@ -325,8 +326,15 @@ impl LastItemPublisher { }; while !cancellation_token.is_cancelled() { if let Some(item) = last_item.wait_for_data() { - if let Err(err) = notifying_publisher.send(ItemTypeRPC::from(item)) { - error!(item_service_name,err=?err, "LastItemPublisher notifying_publisher.send failed. Bid lost."); + if let Some(item_rpc) = item_to_rpc(item) { + if let Err(err) = notifying_publisher.send(item_rpc) { + error!(item_service_name,err=?err, "LastItemPublisher notifying_publisher.send failed. Bid lost."); + } + } else { + error!( + item_service_name, + "LastItemPublisher item_to_rpc returned None. Item lost." + ); } } } @@ -340,6 +348,24 @@ impl LastItemPublisher { } } + /// Same as new when ItemTypeRPC is From. + pub fn new_with_from + 'static>( + item_service_name: &'static str, + got_item_event_name: &'static str, + max_publishers: usize, + max_subscribers: usize, + cancellation_token: CancellationToken, + ) -> Result { + Self::new( + item_service_name, + got_item_event_name, + max_publishers, + max_subscribers, + |item| Some(ItemTypeRPC::from(item)), + cancellation_token, + ) + } + pub fn send(&self, item: ItemType) { self.last_item.set(item); } @@ -349,7 +375,7 @@ pub type BlocksPublisher = LastItemPublisher Result { - BlocksPublisher::new::( + BlocksPublisher::new_with_from::( BLOCKS_SERVICE_NAME, GOT_SCRAPED_BIDS_OR_BLOCKS_EVENT_NAME, BLOCKS_SERVICE_MAX_PUBLISHERS, @@ -361,13 +387,16 @@ pub fn create_blocks_publisher( pub type SlotBidderSealBidCommandPublisher = LastItemPublisher; pub fn create_slot_bidder_seal_bid_command_publisher( + relay_sets: &[RelaySet], cancellation_token: CancellationToken, ) -> Result { + let relay_sets = relay_sets.to_vec(); SlotBidderSealBidCommandPublisher::new::( SLOT_BIDDER_SEAL_BID_COMMAND_SERVICE_NAME, GOT_SLOT_BIDDER_SEAL_BID_COMMAND_EVENT_NAME, SLOT_BIDDER_SEAL_BID_COMMAND_SERVICE_MAX_PUBLISHERS, SLOT_BIDDER_SEAL_BID_COMMAND_SERVICE_MAX_SUBSCRIBERS, + move |item| SlotBidderSealBidCommandRPC::try_from(item, &relay_sets), cancellation_token, ) } @@ -400,6 +429,7 @@ pub fn spawn_slot_bidder_seal_bid_command_subscriber( session_id_to_slot_bidder: Arc< Mutex>>, >, + relay_sets: Vec, cancellation_token: CancellationToken, ) -> Result<(), Error> { let init_done = Arc::new(Watch::>::new()); @@ -425,7 +455,11 @@ pub fn spawn_slot_bidder_seal_bid_command_subscriber( .get_mut(&sample.session_id) .cloned(); if let Some(bidder) = bidder { - bidder.seal_bid(sample.into()); + if let Some(sample) = SlotBidderSealBidCommandRPC::into_slot_bidder_seal_bid_command(&sample, &relay_sets) { + bidder.seal_bid(sample); + } else { + error!("got seal bid command but could not convert to SlotBidderSealBidCommand"); + } } else { warn!("got seal bid command but no bidder found",); } diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/types.rs b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/types.rs index bc692e6e6..93cde3494 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/types.rs +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/types.rs @@ -7,10 +7,12 @@ use iceoryx2_bb_container::byte_string::FixedSizeByteString; use rbuilder::{ building::builders::BuiltBlockId, live_builder::block_output::bidding_service_interface::{ - BuiltBlockDescriptorForSlotBidder, ScrapedRelayBlockBidWithStats, SlotBidderSealBidCommand, + BuiltBlockDescriptorForSlotBidder, PayoutInfo, RelaySet, ScrapedRelayBlockBidWithStats, + SlotBidderSealBidCommand, }, utils::{offset_datetime_to_timestamp_us, timestamp_us_to_offset_datetime}, }; +use tracing::error; /// Used sometimes to generalize some latency code checks. pub trait WithCreationTime { @@ -54,6 +56,8 @@ impl From for bid_scraper::types::PublisherType { const MAX_RELAY_NAME_LENGTH: usize = 100; const MAX_PUBLISHER_NAME_LENGTH: usize = 100; const MAX_EXTRA_DATA_LENGTH: usize = 32; +/// In practice we will never have more than a few (2). +const MAX_RELAY_SETS_COUNT: usize = 10; const ADDRESS_DATA_LENGTH: usize = 20; const HASH_DATA_LENGTH: usize = 32; const U256_DATA_LENGTH: usize = 32; @@ -189,47 +193,102 @@ impl From for BuiltBlockDescriptorForSlotB pub type SlotBidderSealBidCommandWithSessionId = (SlotBidderSealBidCommand, u64); -#[derive(Debug, Clone, Copy, ZeroCopySend)] +#[derive(Debug, Copy, Clone, ZeroCopySend, Default)] +#[type_name("PayoutInfoRPC")] +#[repr(C)] +pub struct PayoutInfoRPC { + /// Index of the relay set returned by the bidding service on initialize. + pub relay_set_index: usize, + pub payout_tx_value: [u8; U256_DATA_LENGTH], + pub subsidy: [u8; U256_DATA_LENGTH], +} + +impl PayoutInfoRPC { + /// If it fails to find the relay set, returns None. + /// relay_sets should be the same as the ones returned by the bidding service on initialize. + fn try_from(value: PayoutInfo, relay_sets: &[RelaySet]) -> Option { + let relay_set_index = relay_sets.iter().position(|r| r == &value.relays)?; + Some(Self { + relay_set_index, + payout_tx_value: value.payout_tx_value.to_le_bytes(), + subsidy: value.subsidy.to_le_bytes(), + }) + } + + /// If it fails to find the relay set, returns None. + /// relay_sets should be the same as the ones returned by the bidding service on initialize. + #[allow(clippy::wrong_self_convention)] + fn into_play_info(&self, relay_sets: &[RelaySet]) -> Option { + Some(PayoutInfo { + relays: relay_sets.get(self.relay_set_index)?.clone(), + payout_tx_value: U256::from_le_bytes(self.payout_tx_value), + subsidy: I256::from_le_bytes(self.subsidy), + }) + } +} + +#[derive(Debug, Copy, Clone, ZeroCopySend)] #[type_name("SlotBidderSealBidCommandRPC")] #[repr(C)] pub struct SlotBidderSealBidCommandRPC { pub session_id: u64, pub block_id: u64, - pub payout_tx_value: [u8; U256_DATA_LENGTH], - pub subsidy: [u8; U256_DATA_LENGTH], pub seen_competition_bid: Option<[u8; U256_DATA_LENGTH]>, /// When this bid is a reaction so some event (eg: new block, new competition bid) we put here /// the creation time of that event so we can measure our reaction time. pub trigger_creation_time_us: Option, + /// Count of valid payout infos. + pub payout_infos_count: usize, + /// Payout infos beyond payout_infos_count will be ignored. + pub payout_infos: [PayoutInfoRPC; MAX_RELAY_SETS_COUNT], } -impl From for SlotBidderSealBidCommandRPC { - fn from(value: SlotBidderSealBidCommandWithSessionId) -> Self { - Self { +impl SlotBidderSealBidCommandRPC { + pub fn try_from( + value: SlotBidderSealBidCommandWithSessionId, + relay_sets: &[RelaySet], + ) -> Option { + let mut payout_infos = [PayoutInfoRPC::default(); MAX_RELAY_SETS_COUNT]; + let payout_infos_count = value.0.payout_info.len(); + if payout_infos_count > MAX_RELAY_SETS_COUNT { + error!( + payout_infos_count, + MAX_RELAY_SETS_COUNT, "Too many payout infos" + ); + return None; + } + for (index, payout_info_item) in value.0.payout_info.into_iter().enumerate() { + payout_infos[index] = PayoutInfoRPC::try_from(payout_info_item, relay_sets)?; + } + Some(Self { session_id: value.1, block_id: value.0.block_id.0, - payout_tx_value: value.0.payout_tx_value.to_le_bytes(), seen_competition_bid: value.0.seen_competition_bid.map(|k| k.to_le_bytes()), trigger_creation_time_us: value .0 .trigger_creation_time .map(offset_datetime_to_timestamp_us), - subsidy: value.0.subsidy.to_le_bytes(), - } + payout_infos_count, + payout_infos, + }) } -} -impl From for SlotBidderSealBidCommand { - fn from(val: SlotBidderSealBidCommandRPC) -> Self { - SlotBidderSealBidCommand { + pub fn into_slot_bidder_seal_bid_command( + val: &SlotBidderSealBidCommandRPC, + relay_sets: &[RelaySet], + ) -> Option { + let mut payout_info = Vec::new(); + for index in 0..std::cmp::min(val.payout_infos_count, MAX_RELAY_SETS_COUNT) { + payout_info.push(val.payout_infos[index].into_play_info(relay_sets)?); + } + Some(SlotBidderSealBidCommand { block_id: BuiltBlockId(val.block_id), - payout_tx_value: U256::from_le_bytes(val.payout_tx_value), - subsidy: I256::from_le_bytes(val.subsidy), seen_competition_bid: val.seen_competition_bid.map(|k| U256::from_le_bytes(k)), trigger_creation_time: val .trigger_creation_time_us .map(timestamp_us_to_offset_datetime), - } + payout_info, + }) } } diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/proto/bidding_service.proto b/crates/rbuilder-operator/src/bidding_service_wrapper/proto/bidding_service.proto index 9aefad688..eb7a6ee7e 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/proto/bidding_service.proto +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/proto/bidding_service.proto @@ -15,7 +15,7 @@ service BiddingService { // Call after connection before calling anything. This will really create the BiddingService on the server side. // Returns the version info for the server side. - rpc Initialize(LandedBlocksParams) returns (BidderVersionInfo); + rpc Initialize(InitParams) returns (InitReturn); // BiddingService rpc CreateSlotBidder(CreateSlotBidderParams) returns (Empty); @@ -25,14 +25,6 @@ service BiddingService { rpc UpdateFailedReadingNewLandedBlocks(Empty) returns (Empty); } - -// Mapping of build_info::Version -message BidderVersionInfo { - string git_commit = 1; - string git_ref = 2; - string build_time_utc = 3; -} - message Empty { } @@ -66,5 +58,29 @@ message LandedBlockInfo { } message LandedBlocksParams { - repeated LandedBlockInfo landed_block_info = 1; // Added field name + repeated LandedBlockInfo landed_block_info = 1; +} + +message RelaySet { + // Ids of the relays (MevBoostRelayIDs) + repeated string relay_ids = 2; +} + +message InitParams { + repeated LandedBlockInfo landed_block_info = 1; + // All the relays we bid to. + RelaySet all_relay_ids = 2; +} + +// Mapping of build_info::Version +message BidderVersionInfo { + string git_commit = 1; + string git_ref = 2; + string build_time_utc = 3; +} + +message InitReturn { + BidderVersionInfo bidder_version = 1; + // RelaySets the bidder will use. + repeated RelaySet relay_sets = 2; } \ No newline at end of file diff --git a/crates/rbuilder-operator/src/blocks_processor.rs b/crates/rbuilder-operator/src/blocks_processor.rs index 785f25a29..59180d211 100644 --- a/crates/rbuilder-operator/src/blocks_processor.rs +++ b/crates/rbuilder-operator/src/blocks_processor.rs @@ -5,7 +5,8 @@ use jsonrpsee::core::{client::ClientT, traits::ToRpcParams}; use rbuilder::{ building::BuiltBlockTrace, live_builder::{ - block_output::bidding_service_interface::BidObserver, payload_events::MevBoostSlotData, + block_output::bidding_service_interface::{BidObserver, RelaySet}, + payload_events::MevBoostSlotData, }, utils::error_storage::store_error_event, }; @@ -74,16 +75,26 @@ pub struct BlockProcessorDelayedPayments { type ConsumeBuiltBlockRequest = ( BlocksProcessorHeader, + // ordersClosedAt String, + // sealedAt String, + // commitedBundles Vec, + // allBundles Vec, + // usedSbundles Vec, alloy_rpc_types_beacon::relay::BidTrace, + // builderName String, + // trueBidValue U256, + // bestBidValue U256, Vec, + // Relays + Vec, ); /// Struct to avoid copying ConsumeBuiltBlockRequest since HttpClient::request eats the parameter. @@ -139,6 +150,7 @@ impl BlocksProcessorClient { built_block_trace: &BuiltBlockTrace, builder_name: String, best_bid_value: U256, + relays: RelaySet, ) -> eyre::Result<()> { let execution_payload_v1 = match submit_block_request { AlloySubmitBlockRequest::Capella(request) => &request.execution_payload.payload_inner, @@ -236,6 +248,11 @@ impl BlocksProcessorClient { built_block_trace.true_bid_value, best_bid_value, delayed_payments, + relays + .relays() + .iter() + .map(|relay| relay.to_string()) + .collect(), ); let request = ConsumeBuiltBlockRequestArc::new(params); let backoff = backoff(); @@ -360,9 +377,11 @@ impl built_block_trace: Arc, builder_name: String, best_bid_value: U256, + relays: &RelaySet, ) { let client = self.client.clone(); let parent_span = Span::current(); + let relays = relays.clone(); tokio::spawn(async move { let block_processor_result = client .submit_built_block( @@ -370,6 +389,7 @@ impl &built_block_trace, builder_name, best_bid_value, + relays, ) .await; if let Err(err) = block_processor_result { diff --git a/crates/rbuilder-operator/src/flashbots_config.rs b/crates/rbuilder-operator/src/flashbots_config.rs index d4239e4bd..4e245b6f1 100644 --- a/crates/rbuilder-operator/src/flashbots_config.rs +++ b/crates/rbuilder-operator/src/flashbots_config.rs @@ -16,7 +16,9 @@ use rbuilder::{ }, live_builder::{ base_config::BaseConfig, - block_output::bidding_service_interface::{BidObserver, LandedBlockInfo}, + block_output::bidding_service_interface::{ + BidObserver, BiddingService, LandedBlockInfo, RelaySet, + }, cli::LiveBuilderConfig, config::{ build_backtest_block_ordering_builder, create_builder_from_sink, create_builders, @@ -134,7 +136,11 @@ impl LiveBuilderConfig for FlashbotsConfig { create_wallet_balance_watcher(provider.clone(), &self.base_config).await?; let bidding_service = self - .create_bidding_service(&landed_blocks, cancellation_token.clone()) + .create_bidding_service( + &landed_blocks, + self.l1_config.relays_ids(), + cancellation_token.clone(), + ) .await?; let bid_observer = self.create_bid_observer(&cancellation_token).await?; @@ -143,6 +149,7 @@ impl LiveBuilderConfig for FlashbotsConfig { create_sink_factory_and_relays( &self.base_config, &self.l1_config, + bidding_service.relay_sets().to_vec(), wallet_balance_watcher, bid_observer, bidding_service.clone(), @@ -240,11 +247,13 @@ impl FlashbotsConfig { pub async fn create_bidding_service( &self, landed_blocks_history: &[LandedBlockInfo], + all_relay_ids: RelaySet, cancellation_token: CancellationToken, ) -> eyre::Result> { let bidding_service_client = BiddingServiceClientAdapter::new( &self.bidding_service_ipc_path, landed_blocks_history, + all_relay_ids, cancellation_token, ) .await @@ -435,6 +444,7 @@ impl BidObserver for RbuilderOperatorBidObserver { built_block_trace: Arc, builder_name: String, best_bid_value: U256, + relays: &RelaySet, ) { if let Some(p) = self.block_processor.as_ref() { p.block_submitted( @@ -443,6 +453,7 @@ impl BidObserver for RbuilderOperatorBidObserver { built_block_trace.clone(), builder_name.clone(), best_bid_value, + relays, ) } if let Some(p) = self.tbv_pusher.as_ref() { @@ -452,6 +463,7 @@ impl BidObserver for RbuilderOperatorBidObserver { built_block_trace, builder_name, best_bid_value, + relays, ) } } diff --git a/crates/rbuilder-operator/src/true_block_value_push/best_true_value_observer.rs b/crates/rbuilder-operator/src/true_block_value_push/best_true_value_observer.rs index 530e2a9d5..8e867f2fd 100644 --- a/crates/rbuilder-operator/src/true_block_value_push/best_true_value_observer.rs +++ b/crates/rbuilder-operator/src/true_block_value_push/best_true_value_observer.rs @@ -3,7 +3,8 @@ use alloy_signer_local::PrivateKeySigner; use rbuilder::{ building::BuiltBlockTrace, live_builder::{ - block_output::bidding_service_interface::BidObserver, payload_events::MevBoostSlotData, + block_output::bidding_service_interface::{BidObserver, RelaySet}, + payload_events::MevBoostSlotData, }, }; use redis::RedisError; @@ -78,6 +79,7 @@ impl BidObserver for BestTrueValueObserver { built_block_trace: Arc, builder_name: String, _best_bid_value: alloy_primitives::U256, + _relays: &RelaySet, ) { let block_info = BuiltBlockInfo::new( slot_data.block(), diff --git a/crates/rbuilder/src/building/built_block_trace.rs b/crates/rbuilder/src/building/built_block_trace.rs index 30ff6d994..0cb20628e 100644 --- a/crates/rbuilder/src/building/built_block_trace.rs +++ b/crates/rbuilder/src/building/built_block_trace.rs @@ -44,6 +44,8 @@ pub struct BuiltBlockTrace { pub fill_time: Duration, pub finalize_time: Duration, pub finalize_adjust_time: Duration, + /// Overhead added by creating the MultiPrefinalizedBlock which makes some extra copies. + pub multi_bid_copy_duration: Duration, pub root_hash_time: Duration, /// Value we saw in the competition when we decided to make this bid. pub seen_competition_bid: Option, @@ -99,6 +101,7 @@ impl BuiltBlockTrace { picked_by_sealer_at: OffsetDateTime::now_utc(), build_block_id, subsidy: I256::ZERO, + multi_bid_copy_duration: Duration::ZERO, } } diff --git a/crates/rbuilder/src/live_builder/block_output/bidding_service_interface.rs b/crates/rbuilder/src/live_builder/block_output/bidding_service_interface.rs index f26f3e1cb..aa89c97cb 100644 --- a/crates/rbuilder/src/live_builder/block_output/bidding_service_interface.rs +++ b/crates/rbuilder/src/live_builder/block_output/bidding_service_interface.rs @@ -8,6 +8,7 @@ use bid_scraper::{ }; use derivative::Derivative; use mockall::automock; +use rbuilder_primitives::mev_boost::MevBoostRelayID; use time::OffsetDateTime; use tokio_util::sync::CancellationToken; @@ -31,6 +32,7 @@ pub trait BidObserver: std::fmt::Debug { built_block_trace: Arc, builder_name: String, best_bid_value: U256, + relays: &RelaySet, ); } @@ -45,6 +47,7 @@ impl BidObserver for NullBidObserver { _built_block_trace: Arc, _builder_name: String, _best_bid_value: U256, + _relays: &RelaySet, ) { } } @@ -100,16 +103,48 @@ impl BuiltBlockDescriptorForSlotBidder { } } +/// A set of relays that get the same bid. +#[derive(Clone, Eq, PartialEq, Debug, Hash)] +pub struct RelaySet { + /// sorted alphabetically to make eq work + relays: Vec, +} + +impl RelaySet { + pub fn new(mut relays: Vec) -> Self { + relays.sort(); + Self { relays } + } + + pub fn relays(&self) -> &[MevBoostRelayID] { + &self.relays + } +} + +impl From> for RelaySet { + fn from(relays: Vec) -> Self { + Self::new(relays) + } +} + #[derive(Clone, Eq, PartialEq, Debug)] -pub struct SlotBidderSealBidCommand { - pub block_id: BuiltBlockId, +pub struct PayoutInfo { + /// Relays that should get this bid. + pub relays: RelaySet, pub payout_tx_value: U256, /// Subsidy used in the bid. pub subsidy: I256, +} + +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct SlotBidderSealBidCommand { + pub block_id: BuiltBlockId, pub seen_competition_bid: Option, /// When this bid is a reaction so some event (eg: new block, new competition bid) we put here /// the creation time of that event so we can measure our reaction time. pub trigger_creation_time: Option, + /// All the different bids to be made. + pub payout_info: Vec, } #[automock] @@ -156,6 +191,9 @@ pub trait BiddingService: Send + Sync { block_seal_handle: Box, cancel: CancellationToken, ) -> Arc; + /// set of all RelaySet that will be used to bid. + /// Not &[RelaySet] because it caused problems with some Mutex. + fn relay_sets(&self) -> Vec; fn observe_relay_bids(&self, bid: ScrapedRelayBlockBidWithStats); @@ -167,6 +205,7 @@ pub trait BiddingService: Send + Sync { pub struct BiddingService2BidSender { inner: Arc, } + impl BiddingService2BidSender { pub fn new(inner: Arc) -> Self { Self { inner } diff --git a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs index 0d7ee2dfa..514616200 100644 --- a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs +++ b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs @@ -1,6 +1,8 @@ use crate::{ building::builders::Block, - live_builder::payload_events::MevBoostSlotData, + live_builder::{ + block_output::bidding_service_interface::RelaySet, payload_events::MevBoostSlotData, + }, mev_boost::{ sign_block_for_relay, BLSBlockSigner, MevBoostRelayBidSubmitter, RelayError, RelaySlotData, SubmitBlockErr, @@ -86,7 +88,15 @@ impl BlockBuildingSink for PendingBlockCellToBlockBuildingSink { } } +/// Sink that can sends a block to a specific relay set. +/// A little ugly add the Relay concept at this level.... +#[automock] +pub trait MultiRelayBlockBuildingSink: std::fmt::Debug + Send + Sync { + fn new_block(&self, relay_set: RelaySet, block: Block); +} + /// Final destination of blocks (eg: submit to the relays). +/// The destination of the blocks is abstracted, can be a single relay or a set of relays. #[automock] pub trait BlockBuildingSink: std::fmt::Debug + Send + Sync { fn new_block(&self, block: Block); @@ -141,10 +151,27 @@ async fn run_submit_to_relays_job( ) -> Option { let mut res = None; + let relay_set = RelaySet::new( + relays + .iter() + .map(|relay| relay.id()) + .cloned() + .collect::>(), + ); + let (regular_relays, optimistic_relays) = relays .into_iter() .partition::, _>(|relay| !relay.optimistic()); + let regular_relays_ids = regular_relays + .iter() + .map(|relay| relay.id()) + .collect::>(); + let optimistic_relays_ids = optimistic_relays + .iter() + .map(|relay| relay.id()) + .collect::>(); + let mut last_bid_hash = None; 'submit: loop { tokio::select! { @@ -221,12 +248,24 @@ async fn run_submit_to_relays_job( payload_id = slot_data.payload_id, hash = ?block.sealed_block.hash(), gas = block.sealed_block.gas_used, - txs = block.sealed_block.body().transactions.len(), - bundles, + block_id = block.trace.build_block_id.0, builder_name = block.builder_name, + regular_relays_ids = ?regular_relays_ids, + optimistic_relays_ids = ?optimistic_relays_ids, + ); + info!( + parent: &submission_span, + available_orders_statistics = ?block.trace.available_orders_statistics, + considered_orders_statistics = ?block.trace.considered_orders_statistics, + failed_orders_statistics = ?block.trace.failed_orders_statistics, + filtered_build_considered_orders_statistics = ?block.trace.filtered_build_considered_orders_statistics, + filtered_build_failed_orders_statistics = ?block.trace.filtered_build_failed_orders_statistics, + bundles, + txs = block.sealed_block.body().transactions.len(), fill_time_ms = duration_ms(block.trace.fill_time), finalize_time_ms = duration_ms(block.trace.finalize_time), finalize_adjust_time_ms = duration_ms(block.trace.finalize_adjust_time), + multi_bid_copy_duration_ms = duration_ms(block.trace.multi_bid_copy_duration), l1_orders_closed_at = ?block.trace.orders_closed_at, l2_chosen_as_best_at = ?block.trace.chosen_as_best_at, l3_sent_to_bidder = ?block.trace.sent_to_bidder, @@ -235,15 +274,6 @@ async fn run_submit_to_relays_job( l6_picked_by_sealer_at = ?block.trace.picked_by_sealer_at, l7_orders_sealed_at = ?block.trace.orders_sealed_at, latency_ms = latency.whole_milliseconds(), - block_id = block.trace.build_block_id.0, - ); - info!( - parent: &submission_span, - available_orders_statistics = ?block.trace.available_orders_statistics, - considered_orders_statistics = ?block.trace.considered_orders_statistics, - failed_orders_statistics = ?block.trace.failed_orders_statistics, - filtered_build_considered_orders_statistics = ?block.trace.filtered_build_considered_orders_statistics, - filtered_build_failed_orders_statistics = ?block.trace.filtered_build_failed_orders_statistics, "Submitting bid", ); inc_initiated_submissions(optimistic_config.is_some()); @@ -335,6 +365,7 @@ async fn run_submit_to_relays_job( Arc::new(block.trace), builder_name, bid_metadata.value.top_competitor_bid.unwrap_or_default(), + &relay_set, ); }) } @@ -622,31 +653,35 @@ async fn submit_bid_to_the_relay( } } -/// Real life BuilderSinkFactory that send the blocks to the Relay +/// Real life MultiRelayBlockBuildingSink that send the blocks to the Relays #[derive(Debug)] pub struct RelaySubmitSinkFactory { submission_config: Arc, relays: Vec, + /// We expect to get bids only for this specific relay sets. + relay_sets: Vec, } impl RelaySubmitSinkFactory { pub fn new( submission_config: SubmissionConfig, relays: Vec, + relay_sets: Vec, ) -> Self { Self { submission_config: Arc::new(submission_config), relays, + relay_sets, } } + /// Creates a run_submit_to_relays_job task per RelaySet and returns a RelaySetDispatcher that will submit the blocks to associated task. + /// Filters out RelaySet with no registered relays. pub fn create_builder_sink( &self, slot_data: MevBoostSlotData, cancel: CancellationToken, - ) -> Box { - let pending_block_cell = Arc::new(PendingBlockCell::default()); - + ) -> Box { // Collect all relays to submit to. let mut relays = Vec::new(); for relay in &self.relays { @@ -658,23 +693,67 @@ impl RelaySubmitSinkFactory { relays.push(relay.clone()); } } - - // Spawn the task to submit to selected relays and keep track of subsidized blocks. - tokio::spawn({ - let pending = pending_block_cell.clone(); - let config = self.submission_config.clone(); - async move { - let last_info = - run_submit_to_relays_job(pending, slot_data, relays, config, cancel).await; - if let Some(info) = last_info { - if info.bid_value > info.true_bid_value { - inc_subsidized_blocks(false); - add_subsidy_value(info.bid_value - info.true_bid_value, false); + let mut sinks = HashMap::<_, Box>::default(); + for relay_set in &self.relay_sets { + let relays_in_set = relays + .iter() + .filter(|relay| relay_set.relays().contains(relay.id())) + .cloned() + .collect::>(); + if !relays_in_set.is_empty() { + let pending_block_cell = Arc::new(PendingBlockCell::default()); + // Spawn the task to submit to selected relays and keep track of subsidized blocks. + tokio::spawn({ + let pending = pending_block_cell.clone(); + let config = self.submission_config.clone(); + let slot_data = slot_data.clone(); + let cancel = cancel.clone(); + async move { + let last_info = run_submit_to_relays_job( + pending, + slot_data, + relays_in_set, + config, + cancel, + ) + .await; + if let Some(info) = last_info { + if info.bid_value > info.true_bid_value { + inc_subsidized_blocks(false); + add_subsidy_value(info.bid_value - info.true_bid_value, false); + } + } } - } + }); + sinks.insert( + relay_set.clone(), + Box::new(PendingBlockCellToBlockBuildingSink { pending_block_cell }), + ); } - }); + } + Box::new(RelaySetDispatcher::new(sinks)) + } +} + +/// Implements MultiRelayBlockBuildingSink by dispatching the blocks to the corresponding relay sets. + +#[derive(Debug)] +struct RelaySetDispatcher { + sinks: HashMap>, +} - Box::new(PendingBlockCellToBlockBuildingSink { pending_block_cell }) +impl RelaySetDispatcher { + pub fn new(sinks: HashMap>) -> Self { + Self { sinks } + } +} + +impl MultiRelayBlockBuildingSink for RelaySetDispatcher { + fn new_block(&self, relay_set: RelaySet, block: Block) { + if let Some(sink) = self.sinks.get(&relay_set) { + sink.new_block(block); + } else { + error!(relay_set = ?relay_set, "Relay set not found"); + } } } diff --git a/crates/rbuilder/src/live_builder/block_output/true_value_bidding_service.rs b/crates/rbuilder/src/live_builder/block_output/true_value_bidding_service.rs index ee70a8f56..8b2a85c93 100644 --- a/crates/rbuilder/src/live_builder/block_output/true_value_bidding_service.rs +++ b/crates/rbuilder/src/live_builder/block_output/true_value_bidding_service.rs @@ -1,19 +1,51 @@ use std::sync::Arc; +use ahash::{HashMap, HashSet}; use alloy_primitives::U256; +use rbuilder_primitives::mev_boost::MevBoostRelayID; use tokio_util::sync::CancellationToken; use super::bidding_service_interface::*; +/// Bidding service that bids the true block value + subsidy to all relays. pub struct NewTrueBlockValueBiddingService { - pub subsidy: U256, - pub slot_delta_to_start_bidding: time::Duration, + slot_delta_to_start_bidding: time::Duration, + relay_sets_subsidies: HashMap, +} + +impl NewTrueBlockValueBiddingService { + pub fn new( + subsidy: U256, + subsidy_overrides: HashMap, + slot_delta_to_start_bidding: time::Duration, + all_relays: RelaySet, + ) -> Self { + let mut default_relay_set: HashSet = + all_relays.relays().iter().cloned().collect(); + let mut relay_sets_subsidies = HashMap::default(); + for (relay, subsidy) in subsidy_overrides { + default_relay_set.remove(&relay); + relay_sets_subsidies.insert(RelaySet::new(vec![relay]), subsidy); + } + if !default_relay_set.is_empty() { + relay_sets_subsidies.insert( + RelaySet::new(default_relay_set.into_iter().collect()), + subsidy, + ); + } + + Self { + slot_delta_to_start_bidding, + relay_sets_subsidies, + } + } } pub struct NewTrueBlockValueSlotBidder { - subsidy: U256, bid_start_time: time::OffsetDateTime, block_seal_handle: Box, + /// Will generate one bid per RelaySet + relay_sets_subsidies: HashMap, } impl SlotBidder for NewTrueBlockValueSlotBidder { @@ -23,10 +55,17 @@ impl SlotBidder for NewTrueBlockValueSlotBidder { } self.block_seal_handle.seal_bid(SlotBidderSealBidCommand { block_id: block_descriptor.id, - payout_tx_value: block_descriptor.true_block_value + self.subsidy, seen_competition_bid: None, trigger_creation_time: Some(time::OffsetDateTime::now_utc()), - subsidy: self.subsidy.try_into().unwrap(), + payout_info: self + .relay_sets_subsidies + .iter() + .map(|(relay_set, subsidy)| PayoutInfo { + relays: relay_set.clone(), + payout_tx_value: block_descriptor.true_block_value + subsidy, + subsidy: (*subsidy).try_into().unwrap(), + }) + .collect(), }) } } @@ -41,12 +80,16 @@ impl BiddingService for NewTrueBlockValueBiddingService { ) -> Arc { let bid_start_time = slot_timestamp + self.slot_delta_to_start_bidding; Arc::new(NewTrueBlockValueSlotBidder { - subsidy: self.subsidy, bid_start_time, block_seal_handle, + relay_sets_subsidies: self.relay_sets_subsidies.clone(), }) } + fn relay_sets(&self) -> Vec { + self.relay_sets_subsidies.keys().cloned().collect() + } + fn observe_relay_bids(&self, _bid: ScrapedRelayBlockBidWithStats) {} fn update_new_landed_blocks_detected(&self, _landed_blocks: &[LandedBlockInfo]) {} diff --git a/crates/rbuilder/src/live_builder/block_output/unfinished_block_processing.rs b/crates/rbuilder/src/live_builder/block_output/unfinished_block_processing.rs index dadf90f74..175eca779 100644 --- a/crates/rbuilder/src/live_builder/block_output/unfinished_block_processing.rs +++ b/crates/rbuilder/src/live_builder/block_output/unfinished_block_processing.rs @@ -1,3 +1,4 @@ +use ahash::HashMap; /// Unfinished block processing handles blocks that are produced by block building algorithms. /// /// 1. Block building algorithm produces unfinished blocks `BiddableUnfinishedBlock` and submits it to the `UnfinishedBuiltBlocksInput` @@ -14,7 +15,11 @@ use alloy_primitives::{utils::format_ether, I256, U256}; use derivative::Derivative; use parking_lot::Mutex; -use std::sync::Arc; +use rbuilder_primitives::mev_boost::MevBoostRelayID; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use time::OffsetDateTime; use tracing::{error, info, trace, warn}; @@ -33,10 +38,14 @@ use crate::{ InsertPayoutTxErr, ThreadBlockBuildingContext, }, live_builder::{ - payload_events::MevBoostSlotData, wallet_balance_watcher::WalletBalanceWatcher, + block_output::{ + bidding_service_interface::RelaySet, relay_submit::MultiRelayBlockBuildingSink, + }, + payload_events::MevBoostSlotData, + wallet_balance_watcher::WalletBalanceWatcher, }, provider::StateProviderFactory, - telemetry::add_trigger_to_bid_round_trip_time, + telemetry::{add_block_multi_bid_copy_duration, add_trigger_to_bid_round_trip_time}, utils::sync::Watch, }; @@ -49,7 +58,6 @@ use super::{ relay_submit::RelaySubmitSinkFactory, }; -use super::relay_submit::BlockBuildingSink; use crate::live_builder::building::built_block_cache::BuiltBlockCache; /// UnfinishedBlockBuildingSinkFactory creates UnfinishedBuiltBlocksInput @@ -70,6 +78,8 @@ pub struct UnfinishedBuiltBlocksInputFactory

{ /// If set to true blocks will be finalized before notifying BiddingService /// This reduces latency for creating block with concrete proposer payout value. adjust_finalized_blocks: bool, + /// relay sets well get on bids. + relay_sets: Vec, } impl UnfinishedBuiltBlocksInputFactory

{ @@ -78,12 +88,14 @@ impl UnfinishedBuiltBlocksInputFactory

{ block_sink_factory: RelaySubmitSinkFactory, wallet_balance_watcher: WalletBalanceWatcher

, adjust_finalized_blocks: bool, + relay_sets: Vec, ) -> Self { Self { bidding_service, block_sink_factory, wallet_balance_watcher, adjust_finalized_blocks, + relay_sets, } } @@ -107,14 +119,11 @@ impl UnfinishedBuiltBlocksInputFactory

{ } } - let finished_block_sink = self - .block_sink_factory - .create_builder_sink(slot_data.clone(), cancel.clone()); - let input = UnfinishedBuiltBlocksInput::new( built_block_cache, - finished_block_sink, + slot_data.relay_registrations.keys().cloned().collect(), self.adjust_finalized_blocks, + self.relay_sets.clone(), cancel.clone(), ); @@ -131,12 +140,32 @@ impl UnfinishedBuiltBlocksInputFactory

{ .spawn(move || input_clone.run_prefinalize_thread(slot_bidder)) .unwrap(); - let input_clone = input.clone(); - std::thread::Builder::new() - .name("finalize_worker".into()) - .spawn(move || input_clone.run_finalize_thread()) - .unwrap(); - + let block_sink: Arc = self + .block_sink_factory + .create_builder_sink(slot_data.clone(), cancel.clone()) + .into(); + + for (relay_set, last_finalize_command) in input.last_finalize_commands.iter() { + let finalized_blocks = input.pre_finalized_multi_blocks.clone(); + let cancellation_token = cancel.clone(); + let adjust_finalized_blocks = self.adjust_finalized_blocks; + let relay_set = relay_set.clone(); + let last_finalize_command = last_finalize_command.clone(); + let block_sink = block_sink.clone(); + std::thread::Builder::new() + .name("finalize_worker".into()) + .spawn(move || { + UnfinishedBuiltBlocksInput::run_finalize_thread( + relay_set, + block_sink, + finalized_blocks, + last_finalize_command, + adjust_finalized_blocks, + cancellation_token, + ) + }) + .unwrap(); + } input } } @@ -177,6 +206,7 @@ impl PrefinalizedBlockInner { } } +/// Prefinalized block ready to be finalized for a specific relay set whose finalizing thread is listening to finalize_input. #[derive(Debug, Clone)] struct PrefinalizedBlock { block_id: BuiltBlockId, @@ -189,6 +219,7 @@ impl PrefinalizedBlock { fn new( block_id: BuiltBlockId, chosen_as_best_at: OffsetDateTime, + sent_to_bidder: OffsetDateTime, block_building_helper: Box, local_ctx: ThreadBlockBuildingContext, ) -> Self { @@ -198,7 +229,7 @@ impl PrefinalizedBlock { block_building_helper, local_ctx: Some(local_ctx), })), - sent_to_bidder: OffsetDateTime::now_utc(), + sent_to_bidder, chosen_as_best_at, } } @@ -214,48 +245,154 @@ struct FinalizeCommand { bid_received_at: OffsetDateTime, /// Bid sent to the sealer thread sent_to_sealer: OffsetDateTime, + /// Overhead added by creating the MultiPrefinalizedBlock which makes some extra copies. + multi_bid_copy_duration: Duration, +} + +#[derive(Debug, Clone)] +struct PrefinalizedBlockWithFinalizeInput { + pub prefinalized_block: PrefinalizedBlock, + pub finalize_input: Arc>, +} + +/// PrefinalizedBlock that we should use for each relay set since the each have their one finalize thread and data. +#[derive(Debug, Clone)] +struct MultiPrefinalizedBlock { + pub block_id: BuiltBlockId, + pub prefinalized_blocks_by_relay_set: HashMap, + pub creation_duration: Duration, +} + +impl MultiPrefinalizedBlock { + /// Creates one PrefinalizedBlock per RelaySet cloning block_building_helper/local_ctx for all but the last one. + fn new( + block_id: BuiltBlockId, + last_finalize_commands: &HashMap>>, + chosen_as_best_at: OffsetDateTime, + sent_to_bidder: OffsetDateTime, + block_building_helper: Box, + local_ctx: ThreadBlockBuildingContext, + ) -> Self { + let start = Instant::now(); + let last_index = last_finalize_commands.len() - 1; + let mut prefinalized_blocks_by_relay_set = HashMap::default(); + + let mut insert_prefinalized_block = + |block_building_helper, local_ctx, relay_set, finalize_input| { + let prefinalized_block = PrefinalizedBlock::new( + block_id, + chosen_as_best_at, + sent_to_bidder, + block_building_helper, + local_ctx, + ); + prefinalized_blocks_by_relay_set.insert( + relay_set, + PrefinalizedBlockWithFinalizeInput { + prefinalized_block, + finalize_input, + }, + ); + }; + for (index, (relay_set, last_finalize_command)) in last_finalize_commands.iter().enumerate() + { + if index != last_index { + insert_prefinalized_block( + block_building_helper.box_clone(), + local_ctx.clone(), + relay_set.clone(), + last_finalize_command.clone(), + ); + } else { + insert_prefinalized_block( + block_building_helper, + local_ctx, + relay_set.clone(), + last_finalize_command.clone(), + ); + break; + } + } + + let creation_duration = start.elapsed(); + add_block_multi_bid_copy_duration(creation_duration); + Self { + block_id, + prefinalized_blocks_by_relay_set, + creation_duration, + } + } } +/// UnfinishedBuiltBlocksInput is the main struct that handles the unfinished blocks. +/// A run_finalize_thread is spawned for each relay set. +/// Has 2 modes: +/// - adjust_finalized_blocks: we should end with only using this mode. +/// New blocks are stored on last_unfinalized_block +/// run_prefinalize_thread polls last_unfinalized_block prefinalizes them (calling block_building_helper.finalize_block) and adds them to finalized_blocks +/// When we get a bid (seal_command) we search for the corresponding prefinalized block in finalized_blocks and set a new last_finalize_command for the associated relay set. +/// run_finalize_thread polls last_finalize_command and finalizes them by adjusting the payout value (command.finalize_block calls block_building_helper.adjust_finalized_block) +/// block is send! self.block_building_sink.new_block +/// +/// - !adjust_finalized_blocks: to be deprecated. +/// New blocks are stored on last_unfinalized_block +/// run_prefinalize_thread polls last_unfinalized_block adds them to finalized_blocks (no work is done here) +/// When we get a bid (seal_command) we search for the corresponding prefinalized block in finalized_blocks and set a new last_finalize_command for the associated relay set. +/// run_finalize_thread polls last_finalize_command and seals if from scratch (command.finalize_block calls block_building_helper.finalize_block) +/// block is send! self.block_building_sink.new_block #[derive(Derivative, Clone)] #[derivative(Debug)] pub struct UnfinishedBuiltBlocksInput { + /// We call update_from_new_unfinished_block for each new_block. built_block_cache: Arc, - best_block_from_algorithms: Arc>, + /// Last unfinalized block we got on new_block. + /// It's waiting to be prefinalized by run_prefinalize_thread. #[derivative(Debug = "ignore")] last_unfinalized_block: Arc>, - unused_prefinalized_blocks: Arc>>, + /// We keep old PrefinalizedBlockInner to recycle the ThreadBlockBuildingContext for new blocks. + unused_prefinalized_block_inners: Arc>>>>, last_block_id: Arc>, - finalized_blocks: Arc>>, + /// run_prefinalize_thread leaves blocks here (can be prefinalized or not depending on adjust_finalized_blocks) + pre_finalized_multi_blocks: Arc>>, - last_finalize_command: Arc>, + /// Set by seal_command. + /// There is one spawned run_finalize_thread polling the asociated Watch. + /// Each run_finalize_thread finalizes the FinalizeCommands (bid adjust or full seal depending on adjust_finalized_blocks) and sends them to block_building_sink. + last_finalize_commands: HashMap>>, cancellation_token: CancellationToken, - #[derivative(Debug = "ignore")] - block_building_sink: Arc, + /// See [UnfinishedBuiltBlocksInput] comments. adjust_finalized_blocks: bool, + /// Registered relays for this slot, useful to avoid sealing bids for relays that are not registered for this slot. + registered_relays: Vec, } impl UnfinishedBuiltBlocksInput { fn new( built_block_cache: Arc, - block_building_sink: Box, + registered_relays: Vec, adjust_finalized_blocks: bool, + relay_sets: Vec, cancellation_token: CancellationToken, ) -> Self { + let last_finalize_commands = relay_sets + .iter() + .map(|relay_set| (relay_set.clone(), Arc::new(Watch::new()))) + .collect(); Self { built_block_cache, best_block_from_algorithms: Arc::new(Mutex::new(BestBlockFromAlgorithms::default())), last_unfinalized_block: Arc::new(Watch::new()), - unused_prefinalized_blocks: Arc::new(Mutex::new(Vec::new())), + unused_prefinalized_block_inners: Arc::new(Mutex::new(Vec::new())), last_block_id: Arc::new(Mutex::new(0)), - finalized_blocks: Arc::new(Mutex::new(Vec::new())), - last_finalize_command: Arc::new(Watch::new()), + pre_finalized_multi_blocks: Arc::new(Mutex::new(Vec::new())), + last_finalize_commands, cancellation_token, - block_building_sink: block_building_sink.into(), adjust_finalized_blocks, + registered_relays, } } @@ -300,37 +437,68 @@ impl UnfinishedBuiltBlocksInput { trace!(?bid, "Received seal command"); - let mut unused_blocks = Vec::new(); - let mut found_block: Option = None; + let mut unused_multi_blocks = Vec::new(); + let mut found_multi_block: Option = None; { - let mut finalized_blocks = self.finalized_blocks.lock(); + let mut pre_finalized_blocks = self.pre_finalized_multi_blocks.lock(); let mut i = 0; - while i < finalized_blocks.len() { - if finalized_blocks[i].block_id.0 < bid.block_id.0 { - unused_blocks.push(finalized_blocks.remove(i)); + while i < pre_finalized_blocks.len() { + if pre_finalized_blocks[i].block_id.0 < bid.block_id.0 { + unused_multi_blocks.push(pre_finalized_blocks.remove(i)); continue; } - if finalized_blocks[i].block_id == bid.block_id { - found_block = Some(finalized_blocks[i].clone()); + if pre_finalized_blocks[i].block_id == bid.block_id { + found_multi_block = Some(pre_finalized_blocks[i].clone()); break; } i += 1; } } - self.unused_prefinalized_blocks - .lock() - .append(&mut unused_blocks); - if let Some(prefinalized_block) = found_block { - let sent_to_sealer = OffsetDateTime::now_utc(); - let finalize_command = FinalizeCommand { - prefinalized_block, - value: bid.payout_tx_value, - seen_competition_bid: bid.seen_competition_bid, - bid_received_at, - sent_to_sealer, - subsidy: bid.subsidy, - }; - self.last_finalize_command.set(finalize_command); + + { + let mut unused_prefinalized_block_inners = self.unused_prefinalized_block_inners.lock(); + for unused_block in unused_multi_blocks { + if let Some(prefinalized_block_with_finalize_input) = unused_block + .prefinalized_blocks_by_relay_set + .values() + .next() + { + unused_prefinalized_block_inners.push( + prefinalized_block_with_finalize_input + .prefinalized_block + .inner + .clone(), + ); + } + } + } + + if let Some(mut multi_prefinalized_block) = found_multi_block { + for payout_info in bid.payout_info.into_iter() { + if let Some(prefinalized_block_with_finalize_input) = multi_prefinalized_block + .prefinalized_blocks_by_relay_set + .remove(&payout_info.relays) + { + let sent_to_sealer = OffsetDateTime::now_utc(); + let finalize_command = FinalizeCommand { + prefinalized_block: prefinalized_block_with_finalize_input + .prefinalized_block, + value: payout_info.payout_tx_value, + seen_competition_bid: bid.seen_competition_bid, + bid_received_at, + sent_to_sealer, + subsidy: payout_info.subsidy, + multi_bid_copy_duration: multi_prefinalized_block.creation_duration, + }; + prefinalized_block_with_finalize_input + .finalize_input + .set(finalize_command); + } else { + error!( + "Seal command discarded, last_finalize_command was not found for relay set" + ); + } + } } else { warn!("Seal command discarded, prefinalized block was not found"); } @@ -341,8 +509,8 @@ impl UnfinishedBuiltBlocksInput { impl UnfinishedBuiltBlocksInput { fn local_ctx(&self) -> ThreadBlockBuildingContext { // we try to reuse ThreadBlockBuildingContext from previously built blocks (as they contain useful caches) - if let Some(last_prefin_block) = self.unused_prefinalized_blocks.lock().pop() { - let mut inner = last_prefin_block.inner.lock(); + if let Some(last_prefin_block) = self.unused_prefinalized_block_inners.lock().pop() { + let mut inner = last_prefin_block.lock(); inner.local_ctx.take().unwrap_or_default() } else { ThreadBlockBuildingContext::default() @@ -398,13 +566,20 @@ impl UnfinishedBuiltBlocksInput { } }; } - let prefinalized_result = PrefinalizedBlock::new( + + //let multi_prefinalized_block = MultiPrefinalizedBlock::new_single_prefinalized_block( + let multi_prefinalized_block = MultiPrefinalizedBlock::new( block_id, + &self.last_finalize_commands, chosen_as_best_at, + OffsetDateTime::now_utc(), block_building_helper, local_ctx, ); - self.finalized_blocks.lock().push(prefinalized_result); + self.pre_finalized_multi_blocks + .lock() + .push(multi_prefinalized_block); + // Must update creation time here because since constructor we did some stuff and we want to measure only bidding core timings. block_descriptor.creation_time = OffsetDateTime::now_utc(); slot_bidder.notify_new_built_block(block_descriptor); @@ -416,13 +591,19 @@ impl UnfinishedBuiltBlocksInput { // finalize_worker impl UnfinishedBuiltBlocksInput { - fn run_finalize_thread(self) { + fn run_finalize_thread( + relay_set: RelaySet, + block_building_sink: Arc, + pre_finalized_blocks: Arc>>, + last_finalize_command: Arc>, + adjust_finalized_blocks: bool, + cancellation_token: CancellationToken, + ) { loop { - if self.cancellation_token.is_cancelled() { + if cancellation_token.is_cancelled() { break; } - let finalize_command = if let Some(command) = self.last_finalize_command.wait_for_data() - { + let finalize_command = if let Some(command) = last_finalize_command.wait_for_data() { command } else { continue; @@ -443,7 +624,7 @@ impl UnfinishedBuiltBlocksInput { finalize_command.value, finalize_command.subsidy, finalize_command.seen_competition_bid, - self.adjust_finalized_blocks, + adjust_finalized_blocks, ) { Ok(Some(result)) => { trace!("Finalized block"); @@ -455,11 +636,11 @@ impl UnfinishedBuiltBlocksInput { } Err(err) => { // remove this block from a list of prefinalized blocks as it can be inconsistent - self.finalized_blocks.lock().retain(|block| { + pre_finalized_blocks.lock().retain(|block| { block.block_id != finalize_command.prefinalized_block.block_id }); - let log_error = if self.adjust_finalized_blocks { + let log_error = if adjust_finalized_blocks { // always log this error as its not expected when adjusting blocks true } else { @@ -475,12 +656,13 @@ impl UnfinishedBuiltBlocksInput { } }; result.block.trace.bid_received_at = finalize_command.bid_received_at; + result.block.trace.multi_bid_copy_duration = finalize_command.multi_bid_copy_duration; result.block.trace.sent_to_sealer = finalize_command.sent_to_sealer; result.block.trace.picked_by_sealer_at = picked_by_sealer_at; result.block.trace.chosen_as_best_at = finalize_command.prefinalized_block.chosen_as_best_at; result.block.trace.sent_to_bidder = finalize_command.prefinalized_block.sent_to_bidder; - self.block_building_sink.new_block(result.block); + block_building_sink.new_block(relay_set.clone(), result.block); } } } diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index 5992e29c0..e5dce83e6 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -33,7 +33,8 @@ use crate::{ live_builder::{ base_config::default_ip, block_output::{ - bidding_service_interface::BiddingService2BidSender, relay_submit::OptimisticV3Config, + bidding_service_interface::{BiddingService2BidSender, RelaySet}, + relay_submit::OptimisticV3Config, }, cli::LiveBuilderConfig, payload_events::MevBoostSlotDataGenerator, @@ -111,6 +112,13 @@ pub struct BuilderConfig { pub builder: SpecificBuilderConfig, } +#[derive(Debug, Clone, Deserialize, PartialEq, Default)] +#[serde(default, deny_unknown_fields)] +pub struct SubsidyConfig { + pub relay: MevBoostRelayID, + pub value: String, +} + #[serde_as] #[derive(Debug, Clone, Deserialize, PartialEq)] #[serde(default, deny_unknown_fields)] @@ -129,6 +137,9 @@ pub struct Config { pub slot_delta_to_start_bidding_ms: Option, /// Value added to the bids (see TrueBlockValueBiddingService). pub subsidy: Option, + /// Overrides subsidy. + #[serde(default)] + pub subsidy_overrides: Vec, } const DEFAULT_SLOT_DELTA_TO_START_BIDDING_MS: i64 = -8000; @@ -250,6 +261,16 @@ impl L1Config { Ok(()) } + pub fn relays_ids(&self) -> RelaySet { + let mut effective_enabled_relays: std::collections::HashSet = + self.enabled_relays.iter().cloned().collect(); + effective_enabled_relays.extend(self.relays.iter().map(|r| r.name.clone())); + effective_enabled_relays + .into_iter() + .collect::>() + .into() + } + pub fn create_relays( &self, ) -> eyre::Result<( @@ -262,13 +283,11 @@ impl L1Config { relay_configs.insert(relay.name.clone(), relay); } // For backwards compatibility: add all user-configured relays to enabled_relays - let mut effective_enabled_relays: std::collections::HashSet = - self.enabled_relays.iter().cloned().collect(); - effective_enabled_relays.extend(self.relays.iter().map(|r| r.name.clone())); + let effective_enabled_relays = self.relays_ids(); // Create enabled relays let mut submitters = Vec::new(); let mut slot_info_providers = Vec::new(); - for relay_name in effective_enabled_relays.iter() { + for relay_name in effective_enabled_relays.relays().iter() { match relay_configs.get(relay_name) { Some(relay_config) => { let url = match relay_config.url.parse() { @@ -370,6 +389,7 @@ impl L1Config { pub fn create_relays_sealed_sink_factory( &self, chain_spec: Arc, + relay_sets: Vec, bid_observer: Box, ) -> eyre::Result<( RelaySubmitSinkFactory, @@ -438,7 +458,8 @@ impl L1Config { eyre::bail!("No slot info providers provided"); } - let sink_factory = RelaySubmitSinkFactory::new(submission_config, submitters.clone()); + let sink_factory = + RelaySubmitSinkFactory::new(submission_config, submitters.clone(), relay_sets); let adjustment_fee_payers = self .relays @@ -479,13 +500,23 @@ impl LiveBuilderConfig for Config { .unwrap_or(DEFAULT_SLOT_DELTA_TO_START_BIDDING_MS), ); - let bidding_service = Arc::new(NewTrueBlockValueBiddingService { - subsidy: subsidy + let all_relays_set = self.l1_config.relays_ids(); + let mut subsidy_overrides = HashMap::default(); + for subsidy_override in self.subsidy_overrides.iter() { + subsidy_overrides.insert( + subsidy_override.relay.clone(), + parse_ether(&subsidy_override.value)?, + ); + } + let bidding_service = Arc::new(NewTrueBlockValueBiddingService::new( + subsidy .as_ref() .map(|s| parse_ether(s)) .unwrap_or(Ok(U256::ZERO))?, - slot_delta_to_start_bidding: slot_delta_to_start_bidding_ms, - }); + subsidy_overrides, + slot_delta_to_start_bidding_ms, + all_relays_set.clone(), + )); let (wallet_balance_watcher, _) = create_wallet_balance_watcher(provider.clone(), &self.base_config).await?; @@ -494,6 +525,7 @@ impl LiveBuilderConfig for Config { create_sink_factory_and_relays( &self.base_config, &self.l1_config, + bidding_service.relay_sets(), wallet_balance_watcher, Box::new(NullBidObserver {}), bidding_service, @@ -704,6 +736,7 @@ impl Default for Config { ], slot_delta_to_start_bidding_ms: None, subsidy: None, + subsidy_overrides: Vec::new(), } } } @@ -1051,6 +1084,7 @@ where pub async fn create_sink_factory_and_relays

( base_config: &BaseConfig, l1_config: &L1Config, + relay_sets: Vec, wallet_balance_watcher: WalletBalanceWatcher

, bid_observer: Box, bidding_service: Arc, @@ -1063,8 +1097,12 @@ pub async fn create_sink_factory_and_relays

( where P: StateProviderFactory + Clone + 'static, { - let (sink_sealed_factory, slot_info_provider, adjustment_fee_payers) = - l1_config.create_relays_sealed_sink_factory(base_config.chain_spec()?, bid_observer)?; + let (sink_sealed_factory, slot_info_provider, adjustment_fee_payers) = l1_config + .create_relays_sealed_sink_factory( + base_config.chain_spec()?, + relay_sets.clone(), + bid_observer, + )?; if !l1_config.relay_bid_scrapers.is_empty() { let sender = Arc::new(BiddingService2BidSender::new(bidding_service.clone())); @@ -1080,6 +1118,7 @@ where sink_sealed_factory, wallet_balance_watcher, base_config.adjust_finalized_blocks, + relay_sets, ); Ok((sink_factory, slot_info_provider, adjustment_fee_payers)) diff --git a/crates/rbuilder/src/mev_boost/mod.rs b/crates/rbuilder/src/mev_boost/mod.rs index 27e083770..1ca77c523 100644 --- a/crates/rbuilder/src/mev_boost/mod.rs +++ b/crates/rbuilder/src/mev_boost/mod.rs @@ -71,7 +71,7 @@ fn is_ignorable_relay_error(code: StatusCode, text: &str) -> bool { #[derive(Debug, Clone, Deserialize, PartialEq, Eq, Default)] #[serde(deny_unknown_fields)] pub struct RelayConfig { - pub name: String, + pub name: MevBoostRelayID, pub url: String, #[serde(default)] pub grpc_url: Option, @@ -253,7 +253,7 @@ pub struct MevBoostRelayBidSubmitter { impl MevBoostRelayBidSubmitter { pub fn new( client: RelayClient, - id: String, + id: MevBoostRelayID, config: &RelaySubmitConfig, test_relay: bool, ) -> eyre::Result { diff --git a/crates/rbuilder/src/telemetry/metrics/mod.rs b/crates/rbuilder/src/telemetry/metrics/mod.rs index 82957cf35..3ca2cae77 100644 --- a/crates/rbuilder/src/telemetry/metrics/mod.rs +++ b/crates/rbuilder/src/telemetry/metrics/mod.rs @@ -362,6 +362,13 @@ register_metrics! { &[] ) .unwrap(); + pub static BLOCK_MULTI_BID_COPY_DURATION: HistogramVec = HistogramVec::new( + HistogramOpts::new("block_multi_bid_copy_duration", "Block Multi Bid Copy Duration overhead (ms)") + .buckets(exponential_buckets_range(0.01, 50.0, 100)), + &[] + ) + .unwrap(); + pub static ORDER_SIMULATION_TIME: HistogramVec = HistogramVec::new( HistogramOpts::new("order_simulation_time", "Order Simulation Time (ms)") .buckets(exponential_buckets_range(0.01, 200.0, 200)), @@ -369,6 +376,7 @@ register_metrics! { ) .unwrap(); + // E2E tracing metrics // The goal of these two metrics is: // 1. Cover as many lines of code as possible without any gaps. @@ -485,6 +493,12 @@ pub fn inc_order_input_rpc_errors(method: &str) { ORDER_INPUT_RPC_ERROR.with_label_values(&[method]).inc(); } +pub fn add_block_multi_bid_copy_duration(duration: Duration) { + BLOCK_MULTI_BID_COPY_DURATION + .with_label_values(&[]) + .observe(duration_ms(duration)); +} + #[allow(clippy::too_many_arguments)] pub fn add_finalized_block_metrics( built_block_trace: &BuiltBlockTrace, diff --git a/examples/config/rbuilder/config-live-example.toml b/examples/config/rbuilder/config-live-example.toml index 02e2994e2..3fbc2cef5 100644 --- a/examples/config/rbuilder/config-live-example.toml +++ b/examples/config/rbuilder/config-live-example.toml @@ -35,6 +35,13 @@ live_builders = ["mp-ordering", "mgp-ordering", "parallel"] enabled_relays = ["flashbots"] + +subsidy = "0.01" + +[[subsidy_overrides]] +relay = "flashbots_test2" +value = "0.05" + # This can be used with test-relay [[relays]] name = "flashbots_test" @@ -42,6 +49,16 @@ url = "http://localhost:80" mode = "full" max_bid_eth = "0.05" + +# This can be used with test-relay +[[relays]] +name = "flashbots_test2" +url = "http://localhost:80" +mode = "full" +max_bid_eth = "0.05" + + + [[builders]] name = "mgp-ordering" algo = "ordering-builder"