@@ -4,7 +4,7 @@ use parking_lot::Mutex;
44use rbuilder:: {
55 live_builder:: block_output:: bidding_service_interface:: {
66 BiddingService , BlockSealInterfaceForSlotBidder , LandedBlockInfo as RealLandedBlockInfo ,
7- ScrapedRelayBlockBidWithStats , SlotBidder , SlotBlockId ,
7+ RelaySet , ScrapedRelayBlockBidWithStats , SlotBidder , SlotBlockId ,
88 } ,
99 utils:: build_info:: Version ,
1010} ;
@@ -21,17 +21,19 @@ use tokio::sync::mpsc;
2121use tokio_util:: sync:: CancellationToken ;
2222use tonic:: transport:: { Channel , Endpoint , Uri } ;
2323use tower:: service_fn;
24- use tracing:: error;
24+ use tracing:: { error, info } ;
2525
2626use crate :: {
2727 bidding_service_wrapper:: {
2828 bidding_service_client:: BiddingServiceClient ,
29- conversion:: { real2rpc_block_hash, real2rpc_landed_block_info} ,
29+ conversion:: {
30+ real2rpc_block_hash, real2rpc_landed_block_info, real2rpc_relay_set, rpc2real_relay_set,
31+ } ,
3032 fast_streams:: helpers:: {
3133 self , create_blocks_publisher, spawn_slot_bidder_seal_bid_command_subscriber,
3234 BlocksPublisher , ScrapedBidsPublisher ,
3335 } ,
34- CreateSlotBidderParams , DestroySlotBidderParams , Empty , LandedBlocksParams ,
36+ CreateSlotBidderParams , DestroySlotBidderParams , Empty , InitParams , LandedBlocksParams ,
3537 MustWinBlockParams ,
3638 } ,
3739 metrics:: set_bidding_service_version,
@@ -66,6 +68,7 @@ pub struct BiddingServiceClientAdapter {
6668 last_session_id : AtomicU64 ,
6769 scraped_bids_publisher : ScrapedBidsPublisher ,
6870 blocks_publisher : Arc < BlocksPublisher > ,
71+ relay_sets : Vec < RelaySet > ,
6972}
7073
7174impl std:: fmt:: Debug for BiddingServiceClientAdapter {
@@ -93,6 +96,8 @@ pub enum Error {
9396 InitFailed ( tonic:: Status ) ,
9497 #[ error( "ScrapedBidsPublisher error : {0}" ) ]
9598 ScrapedBidsPublisher ( #[ from] helpers:: Error ) ,
99+ #[ error( "Bidder version not found" ) ]
100+ BidderVersionNotFound ,
96101}
97102
98103pub type Result < T > = core:: result:: Result < T , Error > ;
@@ -102,17 +107,20 @@ impl BiddingServiceClientAdapter {
102107 pub async fn new (
103108 uds_path : & str ,
104109 landed_blocks_history : & [ RealLandedBlockInfo ] ,
110+ all_relay_ids : RelaySet ,
105111 cancellation_token : CancellationToken ,
106112 ) -> Result < Self > {
107113 let session_id_to_slot_bidder = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
108- let commands_sender = Self :: init_sender_task (
114+ let ( commands_sender, relay_sets ) = Self :: init_sender_task (
109115 uds_path,
110116 landed_blocks_history,
117+ all_relay_ids,
111118 session_id_to_slot_bidder. clone ( ) ,
112119 )
113120 . await ?;
114121 spawn_slot_bidder_seal_bid_command_subscriber (
115122 session_id_to_slot_bidder,
123+ relay_sets. clone ( ) ,
116124 cancellation_token. clone ( ) ,
117125 ) ?;
118126 let scraped_bids_publisher = ScrapedBidsPublisher :: new ( ) ?;
@@ -122,20 +130,26 @@ impl BiddingServiceClientAdapter {
122130 last_session_id : AtomicU64 :: new ( 0 ) ,
123131 scraped_bids_publisher,
124132 blocks_publisher,
133+ relay_sets,
125134 } )
126135 }
127136
128137 fn new_session_id ( & self ) -> u64 {
129138 self . last_session_id . fetch_add ( 1 , Ordering :: Relaxed )
130139 }
131140
141+ // returns the commands_sender to send commands to the bidding service and the relay_sets that it got on the initialize call.
132142 async fn init_sender_task (
133143 uds_path : & str ,
134144 landed_blocks_history : & [ RealLandedBlockInfo ] ,
145+ all_relay_ids : RelaySet ,
135146 session_id_to_slot_bidder : Arc <
136147 Mutex < HashMap < u64 , Arc < dyn BlockSealInterfaceForSlotBidder + Send + Sync > > > ,
137148 > ,
138- ) -> Result < mpsc:: UnboundedSender < BiddingServiceClientCommand > > {
149+ ) -> Result < (
150+ mpsc:: UnboundedSender < BiddingServiceClientCommand > ,
151+ Vec < RelaySet > ,
152+ ) > {
139153 let uds_path = uds_path. to_string ( ) ;
140154 // Url us dummy but needed to create the Endpoint.
141155 let channel = Endpoint :: try_from ( "http://[::]:50051" )
@@ -148,23 +162,42 @@ impl BiddingServiceClientAdapter {
148162 . await ?;
149163 // Create a client
150164 let mut client = BiddingServiceClient :: new ( channel) ;
151- let init_params = LandedBlocksParams {
165+ let init_params = InitParams {
152166 landed_block_info : landed_blocks_history
153167 . iter ( )
154168 . map ( real2rpc_landed_block_info)
155169 . collect ( ) ,
170+ all_relay_ids : Some ( real2rpc_relay_set ( & all_relay_ids) ) ,
156171 } ;
157- let bidding_service_version = client
172+ let init_res = client
158173 . initialize ( init_params)
159174 . await
160175 . map_err ( Error :: InitFailed ) ?;
161- let bidding_service_version = bidding_service_version. into_inner ( ) ;
176+ let init_res = init_res. into_inner ( ) ;
177+ let bidding_service_version = init_res
178+ . bidder_version
179+ . ok_or ( Error :: BidderVersionNotFound ) ?;
180+ let relay_sets = init_res. relay_sets . iter ( ) . map ( rpc2real_relay_set) . collect ( ) ;
181+ info ! ( ?relay_sets, "relay sets received from bidding service" ) ;
162182 set_bidding_service_version ( Version {
163183 git_commit : bidding_service_version. git_commit ,
164184 git_ref : bidding_service_version. git_ref ,
165185 build_time_utc : bidding_service_version. build_time_utc ,
166186 } ) ;
167- let ( commands_sender, mut rx) = mpsc:: unbounded_channel :: < BiddingServiceClientCommand > ( ) ;
187+ let ( commands_sender, rx) = mpsc:: unbounded_channel :: < BiddingServiceClientCommand > ( ) ;
188+ Self :: spawn_sender_loop_task ( rx, client, session_id_to_slot_bidder) ;
189+ Ok ( ( commands_sender, relay_sets) )
190+ }
191+
192+ /// Spawns a task to execute on client commands received via the channel.
193+ /// Sessions are kept in session_id_to_slot_bidder.
194+ fn spawn_sender_loop_task (
195+ mut rx : mpsc:: UnboundedReceiver < BiddingServiceClientCommand > ,
196+ mut client : BiddingServiceClient < Channel > ,
197+ session_id_to_slot_bidder : Arc <
198+ Mutex < HashMap < u64 , Arc < dyn BlockSealInterfaceForSlotBidder + Send + Sync > > > ,
199+ > ,
200+ ) {
168201 // Spawn a task to execute received futures
169202 tokio:: spawn ( async move {
170203 while let Some ( command) = rx. recv ( ) . await {
@@ -201,7 +234,6 @@ impl BiddingServiceClientAdapter {
201234 }
202235 }
203236 } ) ;
204- Ok ( commands_sender)
205237 }
206238
207239 /// Calls create_slot_bidder via RPC to init the bidder.
@@ -276,6 +308,10 @@ impl BiddingService for BiddingServiceClientAdapter {
276308 ) )
277309 }
278310
311+ fn relay_sets ( & self ) -> Vec < RelaySet > {
312+ self . relay_sets . clone ( )
313+ }
314+
279315 fn update_new_landed_blocks_detected ( & self , landed_blocks : & [ RealLandedBlockInfo ] ) {
280316 let param = LandedBlocksParams {
281317 landed_block_info : landed_blocks
0 commit comments