diff --git a/Cargo.lock b/Cargo.lock index f46b0895..54dfe68e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -394,7 +394,7 @@ dependencies = [ "cfg-if", "const-hex", "derive_more", - "foldhash", + "foldhash 0.1.5", "getrandom 0.3.3", "hashbrown 0.15.4", "indexmap 2.10.0", @@ -3412,6 +3412,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -3771,10 +3777,21 @@ checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", "serde", ] +[[package]] +name = "hashbrown" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", +] + [[package]] name = "hashlink" version = "0.9.1" @@ -5149,6 +5166,15 @@ dependencies = [ "hashbrown 0.15.4", ] +[[package]] +name = "lru" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96051b46fc183dc9cd4a223960ef37b9af631b55191852a8274bfef064cda20f" +dependencies = [ + "hashbrown 0.16.0", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -10192,6 +10218,7 @@ dependencies = [ "alloy-serde", "anyhow", "assert_cmd", + "backoff", "bytes", "clap", "ctor", @@ -10205,6 +10232,7 @@ dependencies = [ "hyper-rustls", "hyper-util", "jsonrpsee 0.25.1", + "lru 0.16.2", "metrics", "metrics-derive", "metrics-exporter-prometheus 0.16.2", @@ -10238,6 +10266,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber 0.3.20", "url", + "uuid", "vergen", "vergen-git2", ] diff --git a/Cargo.toml b/Cargo.toml index 55226aec..0695601f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ tokio = { version = "1", features = ["full"] } eyre = "0.6.12" url = "2.2.0" sha2 = { version = "0.10", default-features = false } +backoff = "0.4.0" # Reth deps reth-optimism-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.3" } diff --git a/crates/rollup-boost/Cargo.toml b/crates/rollup-boost/Cargo.toml index 1db81c8e..d7a00817 100644 --- a/crates/rollup-boost/Cargo.toml +++ b/crates/rollup-boost/Cargo.toml @@ -64,6 +64,10 @@ paste = "1.0.15" parking_lot = "0.12.3" tokio-util = { version = "0.7.13" } dashmap = "6.1.0" +backoff.workspace = true +uuid = { version = "1.17.0", features = ["v4", "v7"] } +bytes = "1.10.1" +lru = "0.16" [dev-dependencies] rand = "0.9.0" diff --git a/crates/rollup-boost/src/flashblocks/args.rs b/crates/rollup-boost/src/flashblocks/args.rs index 1b5a7780..47b7ee1e 100644 --- a/crates/rollup-boost/src/flashblocks/args.rs +++ b/crates/rollup-boost/src/flashblocks/args.rs @@ -1,4 +1,6 @@ +use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; use clap::Parser; +use std::time::Duration; use url::Url; #[derive(Parser, Clone, Debug)] @@ -19,7 +21,59 @@ pub struct FlashblocksArgs { #[arg(long, env, default_value = "1112")] pub flashblocks_port: u16, - /// Time used for timeout if builder disconnected + /// Websocket connection configuration + #[command(flatten)] + pub flashblocks_ws_config: FlashblocksWebsocketConfig, +} + +#[derive(Parser, Debug, Clone, Copy)] +pub struct FlashblocksWebsocketConfig { + /// Minimum time for exponential backoff for timeout if builder disconnected + #[arg(long, env, default_value = "10")] + pub flashblock_builder_ws_initial_reconnect_ms: u64, + + /// Maximum time for exponential backoff for timeout if builder disconnected #[arg(long, env, default_value = "5000")] - pub flashblock_builder_ws_reconnect_ms: u64, + pub flashblock_builder_ws_max_reconnect_ms: u64, + + /// Interval in milliseconds between ping messages sent to upstream servers to detect unresponsive connections + #[arg(long, env, default_value = "500")] + pub flashblock_builder_ws_ping_interval_ms: u64, + + /// Timeout in milliseconds to wait for pong responses from upstream servers before considering the connection dead + #[arg(long, env, default_value = "1500")] + pub flashblock_builder_ws_pong_timeout_ms: u64, +} + +impl FlashblocksWebsocketConfig { + /// Creates `ExponentialBackoff` use to control builder websocket reconnection time + pub fn backoff(&self) -> ExponentialBackoff { + ExponentialBackoffBuilder::default() + .with_initial_interval(self.initial_interval()) + .with_max_interval(self.max_interval()) + .with_randomization_factor(0 as f64) + .with_max_elapsed_time(None) + .with_multiplier(2.0) + .build() + } + + /// Returns initial time for exponential backoff + pub fn initial_interval(&self) -> Duration { + Duration::from_millis(self.flashblock_builder_ws_initial_reconnect_ms) + } + + /// Returns maximal time for exponential backoff + pub fn max_interval(&self) -> Duration { + Duration::from_millis(self.flashblock_builder_ws_max_reconnect_ms) + } + + /// Returns ping interval + pub fn ping_interval(&self) -> Duration { + Duration::from_millis(self.flashblock_builder_ws_ping_interval_ms) + } + + /// Returns pong interval + pub fn pong_interval(&self) -> Duration { + Duration::from_millis(self.flashblock_builder_ws_pong_timeout_ms) + } } diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index a5f6e053..4737c66c 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -1,13 +1,21 @@ -use std::time::Duration; - use super::{metrics::FlashblocksWsInboundMetrics, primitives::FlashblocksPayloadV1}; +use crate::FlashblocksWebsocketConfig; +use backoff::ExponentialBackoff; +use backoff::backoff::Backoff; +use bytes::Bytes; use futures::{SinkExt, StreamExt}; +use lru::LruCache; +use std::num::NonZeroUsize; +use std::sync::Arc; +use std::sync::Mutex; use tokio::{sync::mpsc, time::interval}; use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_util::sync::CancellationToken; use tracing::{error, info}; use url::Url; +const MAXIMUM_PINGS: NonZeroUsize = NonZeroUsize::new(60).expect("positive number always non zero"); + #[derive(Debug, thiserror::Error)] enum FlashblocksReceiverError { #[error("WebSocket connection failed: {0}")] @@ -16,8 +24,11 @@ enum FlashblocksReceiverError { #[error("Ping failed")] PingFailed, - #[error("Read timeout")] - ReadTimeout, + #[error("Pong timeout")] + PongTimeout, + + #[error("Websocket haven't return the message")] + MessageMissing, #[error("Connection error: {0}")] ConnectionError(String), @@ -30,39 +41,57 @@ enum FlashblocksReceiverError { #[error("Failed to send message to sender: {0}")] SendError(#[from] Box>), + + #[error("Ping mutex poisoned")] + MutexPoisoned, } pub struct FlashblocksReceiverService { url: Url, sender: mpsc::Sender, - reconnect_ms: u64, + websocket_config: FlashblocksWebsocketConfig, metrics: FlashblocksWsInboundMetrics, } impl FlashblocksReceiverService { - pub fn new(url: Url, sender: mpsc::Sender, reconnect_ms: u64) -> Self { + pub fn new( + url: Url, + sender: mpsc::Sender, + websocket_config: FlashblocksWebsocketConfig, + ) -> Self { Self { url, sender, - reconnect_ms, + websocket_config, metrics: Default::default(), } } pub async fn run(self) { + let mut backoff = self.websocket_config.backoff(); loop { - if let Err(e) = self.connect_and_handle().await { - error!("Flashblocks receiver connection error, retrying in 5 seconds: {e}"); + if let Err(e) = self.connect_and_handle(&mut backoff).await { + let interval = backoff + .next_backoff() + .expect("max_elapsed_time not set, never None"); + error!( + "Flashblocks receiver connection error, retrying in {}ms: {}", + interval.as_millis(), + e + ); self.metrics.reconnect_attempts.increment(1); self.metrics.connection_status.set(0); - tokio::time::sleep(std::time::Duration::from_millis(self.reconnect_ms)).await; + tokio::time::sleep(interval).await; } else { break; } } } - async fn connect_and_handle(&self) -> Result<(), FlashblocksReceiverError> { + async fn connect_and_handle( + &self, + backoff: &mut ExponentialBackoff, + ) -> Result<(), FlashblocksReceiverError> { let (ws_stream, _) = connect_async(self.url.as_str()).await?; let (mut write, mut read) = ws_stream.split(); @@ -72,18 +101,32 @@ impl FlashblocksReceiverService { let cancel_token = CancellationToken::new(); let cancel_for_ping = cancel_token.clone(); + // LRU cache with capacity of 60 pings - automatically evicts oldest entries + let ping_cache = Arc::new(Mutex::new(LruCache::new(MAXIMUM_PINGS))); + let pong_cache = ping_cache.clone(); + let mut ping_interval = interval(self.websocket_config.ping_interval()); let ping_task = tokio::spawn(async move { - let mut ping_interval = interval(Duration::from_millis(500)); - loop { tokio::select! { _ = ping_interval.tick() => { - if write.send(Message::Ping(Default::default())).await.is_err() { + let uuid = uuid::Uuid::now_v7(); + if write.send(Message::Ping(Bytes::copy_from_slice(uuid.as_bytes().as_slice()))).await.is_err() { return Err(FlashblocksReceiverError::PingFailed); } + match ping_cache.lock() { + Ok(mut cache) => { + cache.put(uuid, ()); + } + Err(_) => { + return Err(FlashblocksReceiverError::MutexPoisoned); + } + } } _ = cancel_for_ping.cancelled() => { tracing::debug!("Ping task cancelled"); + if let Err(e) = write.close().await { + tracing::warn!("Failed to close builder ws connection: {}", e); + } return Ok(()); } } @@ -93,40 +136,71 @@ impl FlashblocksReceiverService { let sender = self.sender.clone(); let metrics = self.metrics.clone(); - let read_timeout = Duration::from_millis(1500); + let pong_timeout = self.websocket_config.pong_interval(); let message_handle = tokio::spawn(async move { + let mut pong_interval = interval(pong_timeout); + // We await here because first tick executes immediately + pong_interval.tick().await; loop { - let result = tokio::time::timeout(read_timeout, read.next()) - .await - .map_err(|_| FlashblocksReceiverError::ReadTimeout)?; + tokio::select! { + result = read.next() => { + match result { + Some(Ok(msg)) => match msg { + Message::Text(text) => { + metrics.messages_received.increment(1); + match serde_json::from_str::(&text) { + Ok(flashblocks_msg) => sender.send(flashblocks_msg).await.map_err(|e| { + FlashblocksReceiverError::SendError(Box::new(e)) + })?, + Err(e) => error!("Failed to process flashblock, error: {e}") + } + } + Message::Close(_) => { + return Err(FlashblocksReceiverError::ConnectionClosed); + } + Message::Pong(data) => { + match uuid::Uuid::from_slice(data.as_ref()) { + Ok(uuid) => { + match pong_cache.lock() { + Ok(mut cache) => { + if cache.pop(&uuid).is_some() { + pong_interval.reset(); + } else { + tracing::warn!("Received pong with unknown data:{}", uuid); + } + } + Err(_) => { + return Err(FlashblocksReceiverError::MutexPoisoned); + } + } + } + Err(e) => { + tracing::warn!("Failed to parse pong: {e}"); + } + } - match result { - Some(Ok(msg)) => match msg { - Message::Text(text) => { - metrics.messages_received.increment(1); - if let Ok(flashblocks_msg) = - serde_json::from_str::(&text) - { - sender.send(flashblocks_msg).await.map_err(|e| { - FlashblocksReceiverError::SendError(Box::new(e)) - })?; + } + msg => { + tracing::warn!("Received unexpected message: {:?}", msg); + } + }, + Some(Err(e)) => { + return Err(FlashblocksReceiverError::ConnectionError(e.to_string())); + } + None => { + return Err(FlashblocksReceiverError::MessageMissing); } } - Message::Close(_) => { - return Err(FlashblocksReceiverError::ConnectionClosed); - } - _ => {} }, - Some(Err(e)) => { - return Err(FlashblocksReceiverError::ConnectionError(e.to_string())); - } - None => { - return Err(FlashblocksReceiverError::ReadTimeout); + _ = pong_interval.tick() => { + return Err(FlashblocksReceiverError::PongTimeout); } }; } }); + let connection_start = std::time::Instant::now(); + let result = tokio::select! { result = message_handle => { result.map_err(|e| FlashblocksReceiverError::TaskPanic(e.to_string()))? @@ -137,6 +211,12 @@ impl FlashblocksReceiverService { }; cancel_token.cancel(); + + // Only reset backoff if connection was stable for the max_interval set + // This prevents rapid reconnection loops when a proxy accepts and immediately drops connections + if connection_start.elapsed() >= backoff.max_interval { + backoff.reset(); + } result } } @@ -149,6 +229,7 @@ mod tests { use super::*; use std::net::{SocketAddr, TcpListener}; + use std::sync::atomic::{AtomicBool, Ordering}; async fn start( addr: SocketAddr, @@ -191,16 +272,16 @@ mod tests { loop { tokio::select! { Some(msg) = send_rx.recv() => { - let serialized = serde_json::to_string(&msg).unwrap(); + let serialized = serde_json::to_string(&msg).expect("message serialized"); let utf8_bytes = Utf8Bytes::from(serialized); - write.send(Message::Text(utf8_bytes)).await.unwrap(); + write.send(Message::Text(utf8_bytes)).await.expect("message sent"); }, msg = read.next() => { match msg { // we need to read for the library to handle pong messages Some(Ok(Message::Ping(_))) => { - send_ping_tx.send(()).await.unwrap(); + send_ping_tx.send(()).await.expect("ping notification sent"); }, _ => {} } @@ -233,14 +314,89 @@ mod tests { Ok((term_tx, send_tx, send_ping_rx, url)) } + async fn start_ping_server( + addr: SocketAddr, + send_pongs: Arc, + ) -> eyre::Result<(watch::Receiver, mpsc::Receiver, url::Url)> { + let (term_tx, term_rx) = watch::channel(false); + let (send_ping_tx, send_ping_rx) = mpsc::channel(100); + + let listener = TcpListener::bind(addr)?; + let url = Url::parse(&format!("ws://{addr}"))?; + + listener + .set_nonblocking(true) + .expect("can set TcpListener socket to non-blocking"); + + let listener = tokio::net::TcpListener::from_std(listener) + .expect("can convert TcpListener to tokio TcpListener"); + + tokio::spawn(async move { + loop { + let result = listener.accept().await; + match result { + Ok((connection, _addr)) => { + match accept_async(connection).await { + Ok(ws_stream) => { + let (_, mut read) = ws_stream.split(); + loop { + if send_pongs.load(Ordering::Relaxed) { + let msg = read.next().await; + match msg { + // we need to read for the library to handle pong messages + Some(Ok(Message::Ping(data))) => { + send_ping_tx + .send(data) + .await + .expect("ping data sent"); + } + Some(Err(_)) => { + break; + } + _ => {} + } + } else { + tokio::time::sleep(tokio::time::Duration::from_millis(1)) + .await; + } + } + } + Err(e) => { + eprintln!("Failed to accept WebSocket connection: {}", e); + } + } + } + Err(e) => { + // Optionally break or continue based on error type + if e.kind() == std::io::ErrorKind::Interrupted { + break; + } + } + } + // If we have broken from the loop it means reconnection occurred + term_tx.send(true).expect("channel is up"); + } + }); + + Ok((term_rx, send_ping_rx, url)) + } + #[tokio::test] async fn test_flashblocks_receiver_service() -> eyre::Result<()> { - let addr = "127.0.0.1:8080".parse::().unwrap(); + let addr = "127.0.0.1:8080" + .parse::() + .expect("valid socket address"); let (term, send_msg, _, url) = start(addr).await?; let (tx, mut rx) = mpsc::channel(100); - let service = FlashblocksReceiverService::new(url, tx, 100); + let config = FlashblocksWebsocketConfig { + flashblock_builder_ws_initial_reconnect_ms: 100, + flashblock_builder_ws_max_reconnect_ms: 100, + flashblock_builder_ws_ping_interval_ms: 500, + flashblock_builder_ws_pong_timeout_ms: 2000, + }; + let service = FlashblocksReceiverService::new(url, tx, config); let _ = tokio::spawn(async move { service.run().await; }); @@ -249,14 +405,14 @@ mod tests { send_msg .send(FlashblocksPayloadV1::default()) .await - .expect("Failed to send message"); + .expect("message sent to websocket server"); - let msg = rx.recv().await.expect("Failed to receive message"); + let msg = rx.recv().await.expect("message received from websocket"); assert_eq!(msg, FlashblocksPayloadV1::default()); // Drop the websocket server and start another one with the same address // The FlashblocksReceiverService should reconnect to the new server - term.send(true).unwrap(); + term.send(true).expect("termination signal sent"); // sleep for 1 second to ensure the server is dropped tokio::time::sleep(std::time::Duration::from_secs(1)).await; @@ -266,11 +422,11 @@ mod tests { send_msg .send(FlashblocksPayloadV1::default()) .await - .expect("Failed to send message"); + .expect("message sent to websocket server"); - let msg = rx.recv().await.expect("Failed to receive message"); + let msg = rx.recv().await.expect("message received from websocket"); assert_eq!(msg, FlashblocksPayloadV1::default()); - term.send(true).unwrap(); + term.send(true).expect("termination signal sent"); Ok(()) } @@ -280,20 +436,50 @@ mod tests { // test that if the builder is not sending any messages back, the service will send // ping messages to test the connection periodically - let addr = "127.0.0.1:8081".parse::().unwrap(); - let (_term, _send_msg, mut ping_rx, url) = start(addr).await?; + let addr = "127.0.0.1:8081" + .parse::() + .expect("valid socket address"); + let send_pongs = Arc::new(AtomicBool::new(true)); + let (term, mut ping_rx, url) = start_ping_server(addr, send_pongs.clone()).await?; + let config = FlashblocksWebsocketConfig { + flashblock_builder_ws_initial_reconnect_ms: 100, + flashblock_builder_ws_max_reconnect_ms: 1000, + flashblock_builder_ws_ping_interval_ms: 500, + flashblock_builder_ws_pong_timeout_ms: 2000, + }; let (tx, _rx) = mpsc::channel(100); - let service = FlashblocksReceiverService::new(url, tx, 100); + let service = FlashblocksReceiverService::new(url, tx, config); let _ = tokio::spawn(async move { service.run().await; }); // even if we do not send any messages, we should receive pings to keep the connection alive - for _ in 0..10 { - ping_rx.recv().await.expect("Failed to receive ping"); + for _ in 0..5 { + ping_rx.recv().await.expect("ping received"); } + // Check that server hasn't reconnected because we have answered to pongs + let reconnected = term.has_changed().expect("channel not closed"); + assert!(!reconnected, "not reconnected when we answered to pings"); + send_pongs.store(false, Ordering::Relaxed); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + send_pongs.store(true, Ordering::Relaxed); + // This sleep is to ensure that we will try to read socket and realise it closed + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // One second is not enough to break the connection + let reconnected = term.has_changed().expect("channel not closed"); + assert!(!reconnected, "have reconnected before deadline is reached"); + + send_pongs.store(false, Ordering::Relaxed); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + send_pongs.store(true, Ordering::Relaxed); + // This sleep is to ensure that we will try to read socket and realise it closed + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // 3 seconds will cause reconnect + let reconnected = term.has_changed().expect("channel not closed"); + assert!(reconnected, "haven't reconnected after deadline is reached"); Ok(()) } } diff --git a/crates/rollup-boost/src/flashblocks/launcher.rs b/crates/rollup-boost/src/flashblocks/launcher.rs index fd8f0fb9..8b1e1ed5 100644 --- a/crates/rollup-boost/src/flashblocks/launcher.rs +++ b/crates/rollup-boost/src/flashblocks/launcher.rs @@ -1,5 +1,5 @@ use crate::flashblocks::inbound::FlashblocksReceiverService; -use crate::{FlashblocksService, RpcClient}; +use crate::{FlashblocksService, FlashblocksWebsocketConfig, RpcClient}; use core::net::SocketAddr; use tokio::sync::mpsc; use url::Url; @@ -11,11 +11,11 @@ impl Flashblocks { builder_url: RpcClient, flashblocks_url: Url, outbound_addr: SocketAddr, - reconnect_ms: u64, + websocket_config: FlashblocksWebsocketConfig, ) -> eyre::Result { let (tx, rx) = mpsc::channel(100); - let receiver = FlashblocksReceiverService::new(flashblocks_url, tx, reconnect_ms); + let receiver = FlashblocksReceiverService::new(flashblocks_url, tx, websocket_config); tokio::spawn(async move { let _ = receiver.run().await; }); diff --git a/crates/rollup-boost/src/server.rs b/crates/rollup-boost/src/server.rs index 9749f66f..71a5a6ec 100644 --- a/crates/rollup-boost/src/server.rs +++ b/crates/rollup-boost/src/server.rs @@ -96,7 +96,7 @@ impl RollupBoostServer { builder_client.clone(), inbound_url, outbound_addr, - flashblocks_args.flashblock_builder_ws_reconnect_ms, + flashblocks_args.flashblocks_ws_config, )?); Ok(RollupBoostServer::new( diff --git a/crates/websocket-proxy/Cargo.toml b/crates/websocket-proxy/Cargo.toml index 14785f79..1cf24afe 100644 --- a/crates/websocket-proxy/Cargo.toml +++ b/crates/websocket-proxy/Cargo.toml @@ -24,7 +24,7 @@ metrics-exporter-prometheus = { version = "0.17.0", features = [ http = "1.2.0" axum = { version = "0.8.1", features = ["ws"] } dotenvy = "0.15.7" -backoff = "0.4.0" +backoff.workspace = true reqwest = { version = "0.12.15", default-features = false, features = [ "native-tls", ] }