Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jemalloc = ["rblib/jemalloc"]
debug = ["tokio/full", "tokio/tracing", "dep:console-subscriber"]

[dependencies]
rblib = { git = "https://github.com/flashbots/rblib", rev = "66c5c15a8834a6f677bec0d6b34f54ffc7cdd4ff" }
rblib = { git = "https://github.com/flashbots/rblib", rev = "9eccd22047c3f8978ae022d575dca749a81684a8" }

futures = "0.3"
tokio = "1.46"
Expand All @@ -60,24 +60,12 @@ atomic-time = "0.1"
secp256k1 = "0.30"
jsonrpsee = "0.26.0"

alloy-json-rpc = "1.0.37"
alloy-serde = "1.0.37"


# reth dependencies
reth-network-peers = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }
reth-db-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }
reth-node-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }
reth-optimism-node = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }
reth-optimism-rpc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }
reth-provider = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }

# debug flag
console-subscriber = { version = "0.4", optional = true }
tracing-subscriber = "0.3.20"

[dev-dependencies]
rblib = { git = "https://github.com/flashbots/rblib", rev = "66c5c15a8834a6f677bec0d6b34f54ffc7cdd4ff", features = [
rblib = { git = "https://github.com/flashbots/rblib", rev = "9eccd22047c3f8978ae022d575dca749a81684a8", features = [
"test-utils",
] }

Expand Down
12 changes: 8 additions & 4 deletions src/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! - The `eth_sendBundle` input parameters and their validation.

use {
crate::platform::Flashblocks,
crate::{platform::Flashblocks, state::FlashblockNumber},
core::convert::Infallible,
rblib::{
alloy::{
Expand Down Expand Up @@ -51,7 +51,7 @@ pub struct FlashblocksBundle {

#[serde(
default,
with = "alloy_serde::quantity::opt",
with = "rblib::alloy::serde::quantity::opt",
skip_serializing_if = "Option::is_none"
)]
pub min_block_number: Option<u64>,
Expand All @@ -60,7 +60,7 @@ pub struct FlashblocksBundle {
/// blocks with a block number higher than this value.
#[serde(
default,
with = "alloy_serde::quantity::opt",
with = "rblib::alloy::serde::quantity::opt",
skip_serializing_if = "Option::is_none"
)]
pub max_block_number: Option<u64>,
Expand Down Expand Up @@ -123,7 +123,11 @@ impl Bundle<Flashblocks> for FlashblocksBundle {

/// Tests the eligibility of the bundle for inclusion in a block before
/// executing any of its transactions.
fn is_eligible(&self, block: &BlockContext<Flashblocks>) -> Eligibility {
fn is_eligible(
&self,
block: &BlockContext<Flashblocks>,
_ctx: &FlashblockNumber,
) -> Eligibility {
if self.txs.is_empty() {
// empty bundles are never eligible
return Eligibility::PermanentlyIneligible;
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod bundle;
mod platform;
mod primitives;
mod state;

pub use {bundle::*, platform::*, primitives::*};
pub use {bundle::*, platform::*, primitives::*, state::*};
60 changes: 37 additions & 23 deletions src/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
//! flashblock partitioning logic.

use {
crate::{Flashblocks, state::FlashblockNumber},
crate::{
Flashblocks,
state::{FlashblockNumber, TargetFlashblocks},
},
core::time::Duration,
rblib::{alloy::consensus::BlockHeader, prelude::*},
std::sync::{Arc, Mutex},
tracing::debug,
};

/// Specifies the limits for individual flashblocks.
Expand All @@ -33,7 +37,7 @@ pub struct FlashblockState {
current_block: Option<u64>,
/// Current flashblock number. Used to check if we're on the first
/// flashblock or to adjust the target number of flashblocks for a block.
flashblock_number: Arc<FlashblockNumber>,
target_flashblocks: Arc<TargetFlashblocks>,
/// Duration for the first flashblock, which may be shortened to absorb
/// timing variance.
first_flashblock_interval: Duration,
Expand All @@ -43,20 +47,20 @@ pub struct FlashblockState {
}

impl FlashblockState {
fn current_gas_limit(&self) -> u64 {
fn current_gas_limit(&self, flashblock_number: &FlashblockNumber) -> u64 {
self
.gas_per_flashblock
.saturating_mul(self.flashblock_number.current())
.saturating_mul(flashblock_number.current())
}
}

impl FlashblockLimits {
pub fn new(
interval: Duration,
flashblock_number: Arc<FlashblockNumber>,
target_flashblocks: Arc<TargetFlashblocks>,
) -> Self {
let state = FlashblockState {
flashblock_number,
target_flashblocks,
..Default::default()
};
FlashblockLimits {
Expand All @@ -77,7 +81,7 @@ impl FlashblockLimits {
pub fn update_state(
&self,
payload: &Checkpoint<Flashblocks>,
enclosing: &Limits,
enclosing: &Limits<Flashblocks>,
) {
let mut state = self.state.lock().expect("mutex is not poisoned");

Expand All @@ -88,38 +92,45 @@ impl FlashblockLimits {
let elapsed = payload.building_since().elapsed();
let remaining_time = payload_deadline.saturating_sub(elapsed);

let (target_flashblock, first_flashblock_interval) =
let (target_flashblocks, first_flashblock_interval) =
self.calculate_flashblocks(payload, remaining_time);

state.gas_per_flashblock = enclosing
.gas_limit
.checked_div(target_flashblock)
.checked_div(target_flashblocks)
.unwrap_or(enclosing.gas_limit);
state.current_block = Some(payload.block().number());
state.first_flashblock_interval = first_flashblock_interval;
state.flashblock_number.reset_current_flashblock();
state
.flashblock_number
.set_target_flashblocks(target_flashblock);
state.target_flashblocks.set(target_flashblocks);

debug!(
target_flashblocks = target_flashblocks,
first_flashblock_interval = ?first_flashblock_interval,
"Set flashblock timing for this block"
);
}
}

/// Returns limits for the current flashblock.
///
/// If all flashblocks have been produced, returns a deadline of 1ms to stop
/// production.
pub fn get_limits(&self, enclosing: &Limits) -> Limits {
pub fn get_limits(
&self,
enclosing: &Limits<Flashblocks>,
flashblock_number: &FlashblockNumber,
) -> Limits<Flashblocks> {
let state = self.state.lock().expect("mutex is not poisoned");
// If flashblock number == 1, we're building the first flashblock
let deadline = if state.flashblock_number.current() == 1 {
let deadline = if flashblock_number.current() == 1 {
state.first_flashblock_interval
} else {
self.interval
};

enclosing
.with_deadline(deadline)
.with_gas_limit(state.current_gas_limit())
.with_gas_limit(state.current_gas_limit(flashblock_number))
}

/// Calculates the number of flashblocks and first flashblock interval for
Expand Down Expand Up @@ -148,29 +159,32 @@ impl ScopedLimits<Flashblocks> for FlashblockLimits {
fn create(
&self,
payload: &Checkpoint<Flashblocks>,
enclosing: &Limits,
) -> Limits {
enclosing: &Limits<Flashblocks>,
) -> Limits<Flashblocks> {
let flashblock_number = payload.context();
// Check the state and reset if we started building next block
self.update_state(payload, enclosing);

let limits = self.get_limits(enclosing);
let limits = self.get_limits(enclosing, flashblock_number);

let state = self.state.lock().expect("mutex is not poisoned");
if state.flashblock_number.in_bounds() {
let flashblock_number = payload.context();
if flashblock_number.current() <= state.target_flashblocks.get() {
let gas_used = payload.cumulative_gas_used();
let remaining_gas = enclosing.gas_limit.saturating_sub(gas_used);
tracing::info!(
"Creating flashblocks limits: {}, payload txs: {}, gas used: {} \
({}%), gas_remaining: {} ({}%), next_block_gas_limit: {} ({}%), gas \
per block: {} ({}%), remaining_time: {}ms, gas_limit: {}",
state.flashblock_number,
flashblock_number,
payload.history().transactions().count(),
gas_used,
(gas_used * 100 / enclosing.gas_limit),
remaining_gas,
(remaining_gas * 100 / enclosing.gas_limit),
state.current_gas_limit(),
(state.current_gas_limit() * 100 / enclosing.gas_limit),
state.current_gas_limit(flashblock_number),
(state.current_gas_limit(flashblock_number) * 100
/ enclosing.gas_limit),
state.gas_per_flashblock,
(state.gas_per_flashblock * 100 / enclosing.gas_limit),
limits.deadline.expect("deadline is set").as_millis(),
Expand Down
54 changes: 31 additions & 23 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@ use {
publish::{PublishFlashblock, WebSocketSink},
rpc::TransactionStatusRpc,
signer::BuilderSigner,
state::FlashblockNumber,
state::TargetFlashblocks,
stop::BreakAfterMaxFlashblocks,
},
platform::Flashblocks,
rblib::{pool::*, prelude::*, steps::*},
reth_optimism_node::{
OpAddOns,
OpEngineApiBuilder,
OpEngineValidatorBuilder,
OpNode,
rblib::{
pool::*,
prelude::*,
reth::optimism::{
node::{OpAddOns, OpEngineApiBuilder, OpEngineValidatorBuilder, OpNode},
rpc::OpEthApiBuilder,
},
steps::*,
},
reth_optimism_rpc::OpEthApiBuilder,
std::sync::Arc,
tracing::info,
};

mod args;
Expand Down Expand Up @@ -103,9 +105,12 @@ fn build_pipeline(
.clone()
.unwrap_or(BuilderSigner::random());

// Multiple steps need to access flashblock number state, so we need to
// initialize it outside
let flashblock_number = Arc::new(FlashblockNumber::new());
let target_flashblocks = Arc::new(TargetFlashblocks::new());

info!(
"cli_args.builder_signer.is_some() = {}",
cli_args.builder_signer.is_some()
);

let pipeline = Pipeline::<Flashblocks>::named("top")
.with_step(OptimismPrologue)
Expand All @@ -124,35 +129,38 @@ fn build_pipeline(
Once,
Pipeline::named("single_flashblock")
.with_pipeline(
Loop,
Once,
Pipeline::named("flashblock_steps")
.with_step(AppendOrders::from_pool(pool).with_ok_on_limit())
.with_step(OrderByPriorityFee::default())
.with_step_if(
cli_args.revert_protection,
RemoveRevertedTransactions::default(),
.with_pipeline(
Loop,
Pipeline::named("inner_flashblock_steps")
.with_step(AppendOrders::from_pool(pool).with_ok_on_limit())
.with_step(OrderByPriorityFee::default())
.with_step_if(
cli_args.revert_protection,
RemoveRevertedTransactions::default(),
)
.with_step(BreakAfterDeadline),
)
.with_step(BreakAfterDeadline)
.with_epilogue_if(
.with_step_if(
cli_args.builder_signer.is_some(),
BuilderEpilogue::with_signer(builder_signer.clone().into())
.with_message(|block| {
format!("Block Number: {}", block.number())
}),
)
.with_epilogue(PublishFlashblock::new(
.with_step(PublishFlashblock::new(
ws.clone(),
flashblock_number.clone(),
cli_args.flashblocks_args.calculate_state_root,
))
.with_limits(FlashblockLimits::new(
interval,
flashblock_number.clone(),
target_flashblocks.clone(),
)),
)
.with_step(BreakAfterDeadline),
)
.with_step(BreakAfterMaxFlashblocks::new(flashblock_number)),
.with_step(BreakAfterMaxFlashblocks::new(target_flashblocks)),
)
.with_limits(Scaled::default().deadline(total_building_time));

Expand Down
6 changes: 4 additions & 2 deletions src/platform.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::bundle::FlashblocksBundle,
crate::{bundle::FlashblocksBundle, state::FlashblockNumber},
rblib::{prelude::*, reth::providers::StateProvider},
serde::{Deserialize, Serialize},
std::sync::Arc,
Expand All @@ -19,8 +19,10 @@ pub struct Flashblocks;

impl Platform for Flashblocks {
type Bundle = FlashblocksBundle;
type CheckpointContext = FlashblockNumber;
type DefaultLimits = types::DefaultLimits<Optimism>;
type EvmConfig = types::EvmConfig<Optimism>;
type ExtraLimits = types::ExtraLimits<Optimism>;
type NodeTypes = types::NodeTypes<Optimism>;
type PooledTransaction = types::PooledTransaction<Optimism>;

Expand All @@ -35,7 +37,7 @@ impl Platform for Flashblocks {
chainspec: &types::ChainSpec<P>,
parent: &types::Header<P>,
attributes: &types::PayloadBuilderAttributes<P>,
) -> types::NextBlockEnvContext<P>
) -> Result<types::NextBlockEnvContext<P>, types::EvmEnvError<P>>
where
P: traits::PlatformExecBounds<Self>,
{
Expand Down
2 changes: 1 addition & 1 deletion src/playground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ use {
alloy::hex,
reth::{
cli::chainspec::ChainSpecParser,
network_peers::TrustedPeer,
optimism::{
chainspec::OpChainSpec,
cli::{chainspec::OpChainSpecParser, commands::Commands},
},
},
},
reth_network_peers::TrustedPeer,
secp256k1::SecretKey,
std::{
fs::read_to_string,
Expand Down
Loading