diff --git a/config-in-process-copy.toml b/config-in-process-copy.toml new file mode 100644 index 00000000..bdcb5245 --- /dev/null +++ b/config-in-process-copy.toml @@ -0,0 +1,60 @@ +log_json = true +log_level = "info,rbuilder=debug" +redacted_telemetry_server_port = 6061 +redacted_telemetry_server_ip = "0.0.0.0" +full_telemetry_server_port = 6060 +full_telemetry_server_ip = "0.0.0.0" + +chain = "../../genesis.json" +reth_datadir = "/data/reth/execution-data" +el_node_ipc_path = "/tmp/reth.ipc" +gwyneth_chain_ids = [160010, 160011] +l2_server_ports = [8646, 8647] + +coinbase_secret_key = "607a11b45a7219cc61a3d9c5fd08c7eebd602a6a19a977f8d3771d5711a550f2" +relay_secret_key = "607a11b45a7219cc61a3d9c5fd08c7eebd602a6a19a977f8d3771d5711a550f2" +optimistic_relay_secret_key = "607a11b45a7219cc61a3d9c5fd08c7eebd602a6a19a977f8d3771d5711a550f2" + +cl_node_url = ["http://cl-4-lighthouse-gwyneth:4000"] +jsonrpc_server_port = 8645 +jsonrpc_server_ip = "0.0.0.0" +extra_data = "πŸŒΈπŸ€–" +genesis_fork_version = "0x10000038" + +dry_run = false +dry_run_validation_url = "http://localhost:8545" + +ignore_cancellable_orders = true + +max_concurrent_seals = 4 + +sbundle_mergeabe_signers = [] +# slot_delta_to_start_submits_ms is usually negative since we start bidding BEFORE the slot start +# slot_delta_to_start_submits_ms = -5000 +live_builders = ["mp-ordering"] +watchdog_timeout_sec = 99999 + +[[relays]] +name = "gwyneth" +url = "http://0xa55c1285d84ba83a5ad26420cd5ad3091e49c55a813eee651cd467db38a8c8e63192f47955e9376f6b42f6d190571cb5@mev-relay-api:9062" +priority = 0 +use_ssz_for_submit = false +use_gzip_for_submit = false +l1_proposer_pk = "39725efee3fb28614de3bacaffe4cc4bd8c436257e2c8bb887c4b5c4be45e76d" +l1_rollup_contract = "0x9fCF7D13d10dEdF17d0f24C62f0cf4ED462f65b7" + +[[builders]] +name = "mgp-ordering" +algo = "ordering-builder" +discard_txs = true +sorting = "mev-gas-price" +failed_order_retries = 1 +drop_failed_orders = true + +[[builders]] +name = "mp-ordering" +algo = "ordering-builder" +discard_txs = true +sorting = "max-profit" +failed_order_retries = 1 +drop_failed_orders = true \ No newline at end of file diff --git a/config-in-process.toml b/config-in-process.toml index 65a07c7d..f66cad01 100644 --- a/config-in-process.toml +++ b/config-in-process.toml @@ -7,8 +7,8 @@ redacted_telemetry_server_ip = "0.0.0.0" full_telemetry_server_port = 6060 full_telemetry_server_ip = "0.0.0.0" -chain = "/network-configs/genesis.json" - +# chain = "/network-configs/genesis.json" +chain = "../../genesis.json" # running from ./target/debug # Doesn't matter cuz we override it # =================== reth_datadir = "/data/reth/execution-data" @@ -56,9 +56,8 @@ url = "http://0xac6e77dfe25ecd6110b8e780608cce0dab71fdd5ebea22a16c0205200f2f8e2e priority = 0 use_ssz_for_submit = false use_gzip_for_submit = false -l1_rpc_url = "http://172.16.32.10:8545" l1_proposer_pk = "39725efee3fb28614de3bacaffe4cc4bd8c436257e2c8bb887c4b5c4be45e76d" -l1_smart_contract_address = "0x9fCF7D13d10dEdF17d0f24C62f0cf4ED462f65b7" +l1_rollup_contract = "0x9fCF7D13d10dEdF17d0f24C62f0cf4ED462f65b7" [[builders]] name = "mgp-ordering" @@ -74,4 +73,4 @@ algo = "ordering-builder" discard_txs = true sorting = "max-profit" failed_order_retries = 1 -drop_failed_orders = true +drop_failed_orders = true \ No newline at end of file diff --git a/crates/gwyneth-rbuilder/src/main.rs b/crates/gwyneth-rbuilder/src/main.rs index 4e3c2d2a..f6a550fa 100644 --- a/crates/gwyneth-rbuilder/src/main.rs +++ b/crates/gwyneth-rbuilder/src/main.rs @@ -9,19 +9,19 @@ use gwyneth::{ cli::{create_gwyneth_nodes, GwynethArgs}, exex::GwynethFullNode, }; use rbuilder::{ - live_builder::{base_config::load_config_toml_and_env, cli::LiveBuilderConfig, config::Config}, + live_builder::{base_config::load_config_toml_and_env, cli::LiveBuilderConfig, config::Config, gwyneth::{EthApiStream, GwynethMempoolReciever}}, telemetry, }; -use reth::rpc::api::NetApiClient; +use reth::{network::NetworkEventListenerProvider, rpc::{api::NetApiClient, eth::EthApiServer}, transaction_pool::TransactionPool}; use reth_db_api::Database; use reth_node_builder::{EngineNodeLauncher, NodeConfig}; use reth_provider::{ providers::{BlockchainProvider, BlockchainProvider2}, DatabaseProviderFactory, HeaderProvider, StateProviderFactory, }; -use std::{path::PathBuf, process}; +use std::{path::PathBuf, pin::pin, process}; use tokio::task; -use tracing::error; +use tracing::{error, instrument::WithSubscriber}; // Prefer jemalloc for performance reasons. #[cfg(all(feature = "jemalloc", unix))] @@ -39,6 +39,21 @@ fn main() -> eyre::Result<()> { let l1_node_config = builder.config().clone(); let task_executor = builder.task_executor().clone(); let gwyneth_nodes = create_gwyneth_nodes(&arg, task_executor.clone(), &l1_node_config).await; + let gwyneth_mempools = gwyneth_nodes + .iter() + .map(|g| match g { + GwynethFullNode::Provider1(n) => n.pool.new_transactions_listener(), + GwynethFullNode::Provider2(n) => n.pool.new_transactions_listener(), + }) + .collect::>(); + + let l2_ethapis = gwyneth_nodes + .iter() + .map(|g| match g { + GwynethFullNode::Provider1(n) => n.rpc_registry.eth_handlers().pubsub, + GwynethFullNode::Provider2(n) => n.rpc_registry.eth_handlers().pubsub, + }) + .collect::>(); let enable_engine2 = arg.experimental; match enable_engine2 { @@ -58,7 +73,10 @@ fn main() -> eyre::Result<()> { .with_components(EthereumNode::components()) .with_add_ons::() .on_rpc_started(move |ctx, _| { - spawn_rbuilder(&arg, &l1_node_config, ctx.provider().clone(), l2_providers) + println!("Cecilia ==> on_rpc_started"); + let l1_ethapi = ctx.registry.eth_handlers().pubsub; +\ let l1_mempool = ctx.node().components.transaction_pool.new_transactions_listener(); + spawn_rbuilder(&arg, &l1_node_config, ctx.provider().clone(), l2_providers, l1_mempool, gwyneth_mempools) }) .install_exex("Rollup", move |ctx| async { Ok(gwyneth::exex::Rollup::new(ctx, gwyneth_nodes) @@ -96,7 +114,8 @@ fn main() -> eyre::Result<()> { .start()) }) .on_rpc_started(move |ctx, _| { - spawn_rbuilder(&arg, &l1_node_config, ctx.provider().clone(), l2_providers) + let l1_mempool = ctx.node().components.transaction_pool.new_transactions_listener(); + spawn_rbuilder(&arg, &l1_node_config, ctx.provider().clone(), l2_providers, l1_mempool, gwyneth_mempools) }) .launch() .await?; @@ -118,11 +137,14 @@ fn spawn_rbuilder( l1_node_config: &NodeConfig, provider: P, l2_providers: Vec

, + l1_mempool: GwynethMempoolReciever, + mempools: Vec ) -> eyre::Result<()> where DB: Database + Clone + 'static, P: DatabaseProviderFactory + StateProviderFactory + HeaderProvider + Clone + 'static, { + println!("Cecilia ==> spawn_rbuilder"); let arg = arg.clone(); let l1_node_config = l1_node_config.clone(); let _handle = task::spawn(async move { @@ -134,9 +156,75 @@ where config.l1_config.update_in_process_setting(&l1_node_config); config.base_config.update_in_process_setting(arg, l1_node_config); - // TODO: Check removing this is OK. It seems reth already sets up the global tracing - // subscriber, so this fails - // config.base_config.setup_tracing_subscriber().expect("Failed to set up rbuilder tracing subscriber"); + // Spawn redacted server that is safe for tdx builders to expose + telemetry::servers::redacted::spawn( + config.base_config().redacted_telemetry_server_address(), + ) + .await?; + + // Spawn debug server that exposes detailed operational information + telemetry::servers::full::spawn( + config.base_config.full_telemetry_server_address(), + config.version_for_telemetry(), + config.base_config.log_enable_dynamic, + ) + .await?; + let builder = config + .new_builder(provider, l2_providers, Some(l1_mempool), Some(mempools), Default::default()) + .await?; + + builder.run().await?; + + Ok::<(), eyre::Error>(()) + } + .await; + + if let Err(e) = result { + error!("Fatal rbuilder error: {}", e); + process::exit(1); + } + + error!("rbuilder stopped unexpectedly"); + process::exit(1); + }); + Ok(()) +} + + +use futures::future::IntoStream; +use futures::stream::TakeUntil; +use futures::{FutureExt, Stream, StreamExt}; + +fn spawn_rbuilder_( + arg: &GwynethArgs, + l1_node_config: &NodeConfig, + l1_provider: P, + l2_providers: Vec

, + l1_ethapi: dyn EthApiStream, + l2_ethapis: Vec, +) -> eyre::Result<()> +where + DB: Database + Clone + 'static, + P: DatabaseProviderFactory + StateProviderFactory + HeaderProvider + Clone + 'static, +{ + + + let a = l1_ethapi.new_headers_stream(); + let mut stream = pin!( + a.into_stream() + ); + + println!("Cecilia ==> spawn_rbuilder"); + let arg = arg.clone(); + let l1_node_config = l1_node_config.clone(); + let _handle = task::spawn(async move { + let result = async { + let mut config: Config = load_config_toml_and_env( + arg.rbuilder_config.clone().expect("Gwyneth-rbuilder needs config path") + )?; + // Where we set L1 rpc, proposer pk and rollup contract address + config.l1_config.update_in_process_setting(&l1_node_config); + config.base_config.update_in_process_setting(arg, l1_node_config); // Spawn redacted server that is safe for tdx builders to expose telemetry::servers::redacted::spawn( @@ -152,7 +240,7 @@ where ) .await?; let builder = config - .new_builder(provider, l2_providers, Default::default()) + .new_builder(l1_provider, l2_providers, Some(l1_mempool), Some(mempools), Default::default()) .await?; builder.run().await?; diff --git a/crates/rbuilder/src/bin/dummy-builder.rs b/crates/rbuilder/src/bin/dummy-builder.rs index 3b1e2b51..c4e0a941 100644 --- a/crates/rbuilder/src/bin/dummy-builder.rs +++ b/crates/rbuilder/src/bin/dummy-builder.rs @@ -21,16 +21,10 @@ use rbuilder::{ base_config::{ DEFAULT_EL_NODE_IPC_PATH, DEFAULT_INCOMING_BUNDLES_PORT, DEFAULT_IP, DEFAULT_RETH_DB_PATH, - }, - config::create_provider_factory, - layer2_info::{Layer2Info}, - order_input::{ + }, config::create_provider_factory, gwyneth::GwynethNodes, layer2_info::Layer2Info, order_input::{ OrderInputConfig, DEFAULT_INPUT_CHANNEL_BUFFER_SIZE, DEFAULT_RESULTS_CHANNEL_TIMEOUT, DEFAULT_SERVE_MAX_CONNECTIONS, - }, - payload_events::{MevBoostSlotData, MevBoostSlotDataGenerator}, - simulation::SimulatedOrderCommand, - LiveBuilder, + }, payload_events::{MevBoostSlotData, MevBoostSlotDataGenerator}, simulation::SimulatedOrderCommand, LiveBuilder }, primitives::{ mev_boost::{MevBoostRelay, RelayConfig}, @@ -74,7 +68,7 @@ async fn main() -> eyre::Result<()> { ); let layer2_info = Layer2Info::new(vec![], &vec![], &vec![]).await?; - + let gwyneth_nodes = GwynethNodes{ nodes: HashMap::default() }; let builder = LiveBuilder::< ProviderFactoryReopener>, Arc, @@ -106,10 +100,12 @@ async fn main() -> eyre::Result<()> { extra_data: Vec::new(), blocklist: Default::default(), global_cancellation: cancel.clone(), + l1_mempool: None, extra_rpc: RpcModule::new(()), sink_factory: Box::new(TraceBlockSinkFactory {}), builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))], layer2_info, + gwyneth_nodes, }; let ctrlc = tokio::spawn(async move { diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs index 715930e2..41568323 100644 --- a/crates/rbuilder/src/live_builder/base_config.rs +++ b/crates/rbuilder/src/live_builder/base_config.rs @@ -2,7 +2,7 @@ //! use crate::{ building::builders::UnfinishedBlockBuildingSinkFactory, - live_builder::{order_input::OrderInputConfig, LiveBuilder}, + live_builder::{gwyneth::MempoolListener, order_input::OrderInputConfig, LiveBuilder}, roothash::RootHashConfig, telemetry::{setup_reloadable_tracing_subscriber, LoggerConfig}, utils::{ @@ -10,13 +10,13 @@ use crate::{ ProviderFactoryReopener, ProviderFactoryUnchecked, Signer, }, }; -use ahash::HashSet; +use ahash::{HashMap, HashSet}; use alloy_primitives::{Address, B256}; use eyre::{eyre, Context, Result}; use gwyneth::cli::GwynethArgs; use jsonrpsee::RpcModule; use lazy_static::lazy_static; -use reth::{builder::NodeConfig, tasks::pool::BlockingTaskPool}; +use reth::{builder::NodeConfig, tasks::pool::BlockingTaskPool, transaction_pool::{EthPooledTransaction, NewTransactionEvent}}; use reth_chainspec::ChainSpec; use reth_db::{Database, DatabaseEnv}; use reth_node_core::args::utils::chain_value_parser; @@ -27,6 +27,7 @@ use reth_provider::{ use serde::{Deserialize, Deserializer}; use serde_with::{serde_as, DeserializeAs}; use sqlx::PgPool; +use tokio::sync::mpsc::Receiver; use std::{ env::var, fs::read_to_string, @@ -38,7 +39,7 @@ use std::{ }; use tracing::warn; -use super::SlotSource; +use super::{gwyneth::{GwynethMempoolReciever, GwynethNodes}, SlotSource}; use crate::live_builder::Layer2Info; @@ -50,7 +51,7 @@ const ENV_PREFIX: &str = "env:"; /// The final configuration should usually include one of this and use it to create the base LiveBuilder to then upgrade it as needed. #[serde_as] #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] -#[serde(default, deny_unknown_fields)] +#[serde(default)] pub struct BaseConfig { pub full_telemetry_server_port: u16, pub full_telemetry_server_ip: Option, @@ -229,38 +230,58 @@ impl BaseConfig { { let provider_factory = self.create_provider_factory()?; let l2_provider_factory = self.gwyneth_provider_reopeners()?; - self.create_builder_with_provider_factory::>, Arc, SlotSourceType>( + self.create_in_process_builder::>, Arc, SlotSourceType>( cancellation_token, sink_factory, slot_source, provider_factory, - l2_provider_factory + l2_provider_factory, + None, + None, ) .await } /// Allows instantiating a [`LiveBuilder`] with an existing provider factory - pub async fn create_builder_with_provider_factory( + pub async fn create_in_process_builder( &self, cancellation_token: tokio_util::sync::CancellationToken, sink_factory: Box, slot_source: SlotSourceType, provider: P, l2_providers: Vec

, + l1_mempool: Option, + mempools: Option>, ) -> eyre::Result> where DB: Database + Clone + 'static, P: DatabaseProviderFactory + StateProviderFactory + HeaderProvider + Clone + 'static, SlotSourceType: SlotSource, { + println!("Cecilia ==> BaseConfig::create_in_process_builder"); if self.l2_ipc_paths.is_none() { eyre::bail!("IPC should be provided with config or in-process GwynethArgs"); } - let layer2_info = tokio::runtime::Handle::current().block_on(Layer2Info::

::new( - l2_providers, - &self.l2_ipc_paths.clone().unwrap(), - &self.l2_server_ports.clone().unwrap(), - ))?; + let (gwyneth_nodes, layer2_info) = if let Some(mempools) = mempools { + ( + GwynethNodes::new(l2_providers, mempools, self.l2_server_ports.clone().unwrap())?, + Layer2Info { + nodes: HashMap::default(), + ipc_providers: Arc::new(std::sync::RwLock::new(HashMap::default())), + } + ) + } else { + ( + GwynethNodes { + nodes: HashMap::default(), + }, + tokio::runtime::Handle::current().block_on(Layer2Info::

::new( + l2_providers.clone(), + &self.l2_ipc_paths.clone().unwrap(), + &self.l2_server_ports.clone().unwrap(), + ))? + ) + }; Ok(LiveBuilder:: { watchdog_timeout: self.watchdog_timeout(), error_storage_path: self.error_storage_path.clone(), @@ -275,6 +296,7 @@ impl BaseConfig { blocklist: self.blocklist()?, global_cancellation: cancellation_token, + l1_mempool: l1_mempool.map(MempoolListener::new), extra_rpc: RpcModule::new(()), sink_factory, @@ -282,6 +304,7 @@ impl BaseConfig { run_sparse_trie_prefetcher: self.root_hash_use_sparse_trie, layer2_info, + gwyneth_nodes, }) } diff --git a/crates/rbuilder/src/live_builder/cli.rs b/crates/rbuilder/src/live_builder/cli.rs index eaf7861f..0c89aaf1 100644 --- a/crates/rbuilder/src/live_builder/cli.rs +++ b/crates/rbuilder/src/live_builder/cli.rs @@ -18,7 +18,7 @@ use crate::{ utils::build_info::Version, }; -use super::{base_config::BaseConfig, LiveBuilder}; +use super::{base_config::BaseConfig, gwyneth::GwynethMempoolReciever, LiveBuilder}; #[derive(Parser, Debug)] enum Cli { @@ -49,6 +49,8 @@ pub trait LiveBuilderConfig: Debug + DeserializeOwned + Sync { &self, provider: P, l2_providers: Vec

, + l1_mempoool: Option, + mempools: Option>, cancellation_token: CancellationToken, ) -> impl std::future::Future>> + Send @@ -109,7 +111,7 @@ where // For out-of-process builders only let l2_providers = config.base_config().gwyneth_provider_reopeners()?; let builder = config - .new_builder(provider, l2_providers, cancel.clone()) + .new_builder(provider, l2_providers, None, None, cancel.clone()) .await?; let ctrlc = tokio::spawn(async move { diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index 3c541f9d..dc9130f5 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -12,7 +12,7 @@ use super::{ }, block_sealing_bidder_factory::BlockSealingBidderFactory, relay_submit::{RelaySubmitSinkFactory, SubmissionConfig}, - }, + }, gwyneth::GwynethMempoolReciever, }; use crate::{ beacon_api_client::Client, @@ -311,14 +311,20 @@ impl LiveBuilderConfig for Config { } async fn new_builder( &self, + provider: P, l2_providers: Vec

, + l1_mempool: Option, + mempools: Option>, cancellation_token: tokio_util::sync::CancellationToken, ) -> eyre::Result> where DB: Database + Clone + 'static, P: DatabaseProviderFactory + StateProviderFactory + HeaderProvider + Clone + 'static, { + // println!("Cecilia ==> LiveBuilderConfig::new_builder {:?}", mempools.as_ref().unwrap().len()); + println!("Cecilia ==> LiveBuilderConfig::new_builder"); + let (sink_sealed_factory, relays) = self.l1_config.create_relays_sealed_sink_factory( self.base_config.chain_spec()?, Box::new(NullBidObserver {}), @@ -348,12 +354,14 @@ impl LiveBuilderConfig for Config { ); let live_builder = self .base_config - .create_builder_with_provider_factory( + .create_in_process_builder( cancellation_token, sink_factory, payload_event, provider, l2_providers, + l1_mempool, + mempools ) .await?; let root_hash_config = self.base_config.live_root_hash_config()?; diff --git a/crates/rbuilder/src/live_builder/gwyneth.rs b/crates/rbuilder/src/live_builder/gwyneth.rs new file mode 100644 index 00000000..e88d576d --- /dev/null +++ b/crates/rbuilder/src/live_builder/gwyneth.rs @@ -0,0 +1,174 @@ +use ahash::HashMap; +use alloy_eips::{BlockHashOrNumber, BlockId}; +use alloy_primitives::U256; +use alloy_provider::{IpcConnect, Provider, ProviderBuilder, RootProvider}; +use alloy_pubsub::PubSubFrontend; +use alloy_rpc_types::{Block, BlockTransactionsKind}; +use eyre::Result; +use futures::future::IntoStream; +use futures::stream::TakeUntil; +use futures::{FutureExt, Stream, StreamExt}; +use reth::network::NetworkInfo; +use reth::rpc::eth::pubsub::EthPubSubInner; +use reth::transaction_pool::{EthPoolTransaction, EthPooledTransaction, NewTransactionEvent}; +use reth_db::{Database, DatabaseEnv}; +use reth_evm::provider; +use reth_node_core::args::utils::chain_value_parser; +use reth_primitives::{Header, TransactionSignedEcRecovered}; +use reth_provider::{BlockReader, CanonStateSubscriptions, DatabaseProviderFactory, EvmEnvProvider, HeaderProvider, StateProvider, StateProviderFactory}; +use revm_primitives::B256; +use tokio::sync::mpsc::Receiver; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; +use std::future::Future; +use std::net::Ipv4Addr; +use std::path::{Path, PathBuf}; +use std::pin::pin; +use std::sync::{Arc, Mutex, RwLock}; +use std::task::Poll; +use std::time::Duration; +use tracing::{event, warn}; + +use crate::primitives::TransactionSignedEcRecoveredWithBlobs; + +use super::order_input::OrderInputConfig; + +pub type GwynethMempoolReciever = Receiver>; + +#[derive(Debug)] +pub struct GwynethNode

{ + pub mempool_listener: MempoolListener, + pub provider: P, + pub order_input_config: OrderInputConfig, +} + +#[derive(Debug, Default)] +pub struct GwynethNodes

{ + pub nodes: HashMap>, +} + +impl

GwynethNodes

+where + P: StateProviderFactory + HeaderProvider + Clone + 'static, + +{ + pub fn new( + providers: Vec

, + mempools: Vec, + server_ports: Vec, + ) -> Result { + println!("Cecilia ==> GwynethNodes::new {:?}", server_ports); + let mut nodes = HashMap::default(); + for ((provider, mempool), port) in providers + .into_iter() + .zip(mempools.into_iter()) + .zip(server_ports.iter()) + { + nodes.insert( + 0, + GwynethNode::

{ + mempool_listener: MempoolListener { inner: Arc::new(Mutex::new(mempool)) }, + provider: provider, + order_input_config: OrderInputConfig::new( + true, + false, + PathBuf::new(), + *port, + Ipv4Addr::new(0, 0, 0, 0), + 4096, + Duration::from_millis(50), + 10_000, + ), + }, + ); + } + Ok(Self { + nodes, + }) + } + + pub async fn get_latest_header(&self, chain_id: u64) -> Result> { + println!("Cecilia ==> GwynethNodes::get_latest_header {:?}", chain_id); + if let Some(provider) = self.nodes.get(&chain_id).map(|n| n.provider.clone()) { + let number = provider.last_block_number()?; + provider + .header_by_number(number) + .map_err(|e| eyre::eyre!("Error getting latest header for chain_id {}: {:?}", chain_id, e)) + + } else { + eyre::bail!("No provider found for chain_id: {}", chain_id) + } + } +} + +#[derive(Debug, Clone)] +pub struct MempoolListener { + inner: Arc> +} + +impl MempoolListener { + pub fn new(mempool: GwynethMempoolReciever) -> Self { + Self { + inner: Arc::new(Mutex::new(mempool)) + } + } +} + +impl Future for MempoolListener { + type Output = TransactionSignedEcRecoveredWithBlobs; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + println!("Cecilia ==> MempoolListener::poll"); + let mut this = self + .get_mut() + .inner + .try_lock() + .expect("Mempool listener mutex poisoned"); + match this.poll_recv(cx) { + std::task::Poll::Ready(Some(event)) => { + println!("New transaction: {:?}", event); + + let mut pooled_tx = event.transaction.transaction.clone(); + let tx = pooled_tx.transaction().clone(); + + let tx_with_blobs = if let Some(blob) = pooled_tx.take_blob().maybe_sidecar() { + TransactionSignedEcRecoveredWithBlobs { + tx, + blobs_sidecar: Arc::new(blob.clone()), + metadata: Default::default(), + } + } else { + TransactionSignedEcRecoveredWithBlobs::new_no_blobs(tx).unwrap() + }; + std::task::Poll::Ready(tx_with_blobs) + }, + std::task::Poll::Ready(None) => { + panic!("Mempool listener closed") + }, + std::task::Poll::Pending => std::task::Poll::Pending, + } + } +} + +pub trait EthApiStream { + pub fn new_headers_stream(&self) -> impl Stream; + pub fn full_pending_transaction_stream( + &self, + ) -> impl Stream; +} + +impl EthApiStream for EthPubSubInner +where + Provider: BlockReader + EvmEnvProvider + 'static, + Events: CanonStateSubscriptions + 'static, + Network: NetworkInfo + 'static, +{ + pub fn new_headers_stream(&self) -> impl Stream { + self.events.canonical_state_stream() + } + + pub fn full_pending_transaction_stream( + &self, + ) -> impl Stream { + self.events.new_transaction_stream() + } +} \ No newline at end of file diff --git a/crates/rbuilder/src/live_builder/layer2_info.rs b/crates/rbuilder/src/live_builder/layer2_info.rs index 831fde19..14f6dc72 100644 --- a/crates/rbuilder/src/live_builder/layer2_info.rs +++ b/crates/rbuilder/src/live_builder/layer2_info.rs @@ -22,7 +22,7 @@ pub struct GwynethNode

{ pub order_input_config: OrderInputConfig, } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct Layer2Info

{ pub ipc_providers: Arc, PathBuf)>>>, // Changed to RwLock pub nodes: HashMap>, diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index 0cdd2a65..4a75439b 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -8,6 +8,7 @@ pub mod order_input; pub mod payload_events; pub mod simulation; pub mod watchdog; +pub mod gwyneth; use crate::{ building::{ @@ -28,6 +29,7 @@ use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_primitives::{Address, B256, U256}; use building::BlockBuildingPool; use eyre::Context; +use gwyneth::{GwynethNodes, MempoolListener}; use jsonrpsee::RpcModule; use payload_events::MevBoostSlotData; use reth::{primitives::Header, providers::HeaderProvider}; @@ -82,11 +84,13 @@ where pub blocklist: HashSet

, pub global_cancellation: CancellationToken, + pub l1_mempool: Option, pub sink_factory: Box, pub builders: Vec>>, pub extra_rpc: RpcModule<()>, pub layer2_info: Layer2Info

, + pub gwyneth_nodes: GwynethNodes

, } impl LiveBuilder @@ -104,6 +108,7 @@ where } pub async fn run(self) -> eyre::Result<()> { + println!("Cecilia ==> LiveBuilder::run"); info!("Builder block list size: {}", self.blocklist.len(),); info!( "Builder coinbase address: {:?}", @@ -119,11 +124,13 @@ where let mut inner_jobs_handles = Vec::new(); let mut payload_events_channel = self.blocks_source.recv_slot_channel(); + // Cecilia!: call start_orderpool_jobs L1 let mut orderpool_subscribers = HashMap::default(); let orderpool_subscriber = { let (handle, sub) = start_orderpool_jobs( self.order_input_config, self.provider.clone(), + self.l1_mempool.clone(), self.extra_rpc, self.global_cancellation.clone(), ) @@ -136,11 +143,13 @@ where let mut providers = HashMap::default(); providers.insert(self.chain_chain_spec.chain.id(), self.provider.clone()); - for (chain_id, node) in self.layer2_info.nodes.iter() { + // Cecilia!: call start_orderpool_jobs L2 + for (chain_id, node) in self.gwyneth_nodes.nodes.iter() { let orderpool_subscriber = { let (handle, sub) = start_orderpool_jobs( node.order_input_config.clone(), node.provider.clone(), + Some(node.mempool_listener.clone()), RpcModule::new(()), self.global_cancellation.clone(), ) diff --git a/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs b/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs index 1a1e0e18..39854916 100644 --- a/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs +++ b/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs @@ -26,26 +26,29 @@ pub async fn spawn_clean_orderpool_job

( where P: StateProviderFactory + 'static, { - let ipc = IpcConnect::new(config.ipc_path); - let provider = ProviderBuilder::new().on_ipc(ipc).await?; + // let ipc = IpcConnect::new(config.ipc_path); + // let provider = ProviderBuilder::new().on_ipc(ipc).await?; let handle = tokio::spawn(async move { info!("Clean orderpool job: started"); - let new_block_stream = match provider.subscribe_blocks().await { - Ok(subscription) => subscription - .into_stream() - .take_until(global_cancellation.cancelled()), - Err(err) => { - error!("Failed to subscribe to a new block stream: {:?}", err); - global_cancellation.cancel(); - return; - } - }; - let mut new_block_stream = pin!(new_block_stream); - - while let Some(block) = new_block_stream.next().await { - let block_number = block.header.number; + // let new_block_stream = match provider.subscribe_blocks().await { + // Ok(subscription) => subscription + // .into_stream() + // .take_until(global_cancellation.cancelled()), + // Err(err) => { + // error!("Failed to subscribe to a new block stream: {:?}", err); + // global_cancellation.cancel(); + // return; + // } + // }; + // let mut new_block_stream = pin!(new_block_stream); + let mut block_number = provider_factory.last_block_number().unwrap(); + loop { + if provider_factory.last_block_number().unwrap() == block_number { + continue; + } + block_number = provider_factory.last_block_number().unwrap(); set_current_block(block_number); let state = match provider_factory.latest() { Ok(state) => state, @@ -72,7 +75,7 @@ where "Cleaned orderpool", ); } - + global_cancellation.cancel(); info!("Clean orderpool job: finished"); }); diff --git a/crates/rbuilder/src/live_builder/order_input/mempool_fetcher.rs b/crates/rbuilder/src/live_builder/order_input/mempool_fetcher.rs new file mode 100644 index 00000000..b429c239 --- /dev/null +++ b/crates/rbuilder/src/live_builder/order_input/mempool_fetcher.rs @@ -0,0 +1,171 @@ +use super::{OrderInputConfig, ReplaceableOrderPoolCommand}; +use crate::{ + live_builder::gwyneth::MempoolListener, primitives::{MempoolTx, Order, TransactionSignedEcRecoveredWithBlobs}, telemetry::add_txfetcher_time_to_query +}; +use alloy_primitives::{hex, Bytes, FixedBytes}; +use alloy_provider::{IpcConnect, Provider, ProviderBuilder, RootProvider}; +use alloy_pubsub::PubSubFrontend; +use futures::{FutureExt, StreamExt}; +use std::{pin::pin, time::Instant}; +use tokio::{ + sync::{mpsc, mpsc::error::SendTimeoutError}, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, trace}; + +/// Subscribes to EL mempool and pushes new txs as orders in results. +/// This version allows 4844 by subscribing to subscribe_pending_txs to get the hashes and then calling eth_getRawTransactionByHash +/// to get the raw tx that, in case of 4844 tx, may include blobs. +/// In the future we may consider updating reth so we can process blob txs in a different task to avoid slowing down non blob txs. +pub async fn subscribe_to_mempool_with_blobs( + config: OrderInputConfig, + mempool: MempoolListener, + results: mpsc::Sender, + global_cancel: CancellationToken, +) -> eyre::Result> { + println!("Cecilia ==> mempool_fetcher::subscribe_to_mempool_with_blobs"); + let handle = tokio::spawn(async move { + info!("Subscribe to txpool with blobs: started"); + + let mut stream = pin!( + mempool.into_stream().take_until(global_cancel.cancelled()) + ); + + while let Some(tx) = stream.next().await { + println!("Cecilia debug: Some txn arrived on {:?}", tx); + + let start = Instant::now(); + let tx = MempoolTx::new(tx); + + let order = Order::Tx(tx); + let order_id = order.id(); + + let parse_duration = start.elapsed(); + trace!(order = ?order.id(), parse_duration_mus = parse_duration.as_micros(), "Mempool transaction received with blobs"); + + add_txfetcher_time_to_query(parse_duration); + println!( + "Dani debug: About to send order to results channel. Order ID: {:?}", + order_id + ); + match results + .send_timeout( + ReplaceableOrderPoolCommand::Order(order), + config.results_channel_timeout, + ) + .await + { + Ok(()) => {} + Err(SendTimeoutError::Timeout(_)) => { + error!("Failed to send txpool tx to results channel, timeout"); + } + Err(SendTimeoutError::Closed(_)) => { + break; + } + } + println!( + "Dani debug: Successfully sent order to results channel. Order ID: {:?}", + order_id + ); + } + + // stream is closed, cancelling token because builder can't work without this stream + global_cancel.cancel(); + info!("Subscribe to txpool: finished"); + }); + + Ok(handle) +} + + +// #[cfg(test)] +// mod test { +// use super::*; +// use alloy_consensus::{SidecarBuilder, SimpleCoder}; +// use alloy_network::{EthereumWallet, TransactionBuilder, TransactionBuilder4844}; +// use alloy_node_bindings::Anvil; +// use alloy_primitives::U256; +// use alloy_provider::{Provider, ProviderBuilder}; +// use alloy_rpc_types::TransactionRequest; +// use alloy_signer_local::PrivateKeySigner; + +// #[tokio::test] +// /// Test that the fetcher can retrieve transactions (both normal and blob) from the txpool +// async fn test_fetcher_retrieves_transactions() { +// let anvil = Anvil::new() +// .args(["--ipc", "/tmp/anvil.ipc"]) +// .try_spawn() +// .unwrap(); + +// let (sender, mut receiver) = mpsc::channel(10); +// subscribe_to_mempool_with_blobs( +// OrderInputConfig::default_e2e(), +// sender, +// CancellationToken::new(), +// ) +// .await +// .unwrap(); + +// let signer: PrivateKeySigner = anvil.keys()[0].clone().into(); +// let wallet = EthereumWallet::from(signer); + +// let provider = ProviderBuilder::new() +// .with_recommended_fillers() +// .wallet(wallet) +// .on_http(anvil.endpoint().parse().unwrap()); + +// let alice = anvil.addresses()[0]; + +// let sidecar: SidecarBuilder = +// SidecarBuilder::from_slice("Blobs are fun!".as_bytes()); +// let sidecar = sidecar.build().unwrap(); + +// let gas_price = provider.get_gas_price().await.unwrap(); +// let eip1559_est = provider.estimate_eip1559_fees(None).await.unwrap(); + +// let tx = TransactionRequest::default() +// .with_to(alice) +// .with_nonce(0) +// .with_max_fee_per_blob_gas(gas_price) +// .with_max_fee_per_gas(eip1559_est.max_fee_per_gas) +// .with_max_priority_fee_per_gas(eip1559_est.max_priority_fee_per_gas) +// .with_blob_sidecar(sidecar); + +// let pending_tx = provider.send_transaction(tx).await.unwrap(); +// let recv_tx = receiver.recv().await.unwrap(); + +// let tx_with_blobs = match recv_tx { +// ReplaceableOrderPoolCommand::Order(Order::Tx(MempoolTx { tx_with_blobs })) => { +// Some(tx_with_blobs) +// } +// _ => None, +// } +// .unwrap(); + +// assert_eq!(tx_with_blobs.hash(), *pending_tx.tx_hash()); +// assert_eq!(tx_with_blobs.blobs_sidecar.blobs.len(), 1); + +// // send another tx without blobs +// let tx = TransactionRequest::default() +// .with_to(alice) +// .with_nonce(1) +// .with_value(U256::from(1)) +// .with_max_fee_per_gas(eip1559_est.max_fee_per_gas) +// .with_max_priority_fee_per_gas(eip1559_est.max_priority_fee_per_gas); + +// let pending_tx = provider.send_transaction(tx).await.unwrap(); +// let recv_tx = receiver.recv().await.unwrap(); + +// let tx_without_blobs = match recv_tx { +// ReplaceableOrderPoolCommand::Order(Order::Tx(MempoolTx { tx_with_blobs })) => { +// Some(tx_with_blobs) +// } +// _ => None, +// } +// .unwrap(); + +// assert_eq!(tx_without_blobs.hash(), *pending_tx.tx_hash()); +// assert_eq!(tx_without_blobs.blobs_sidecar.blobs.len(), 0); +// } +// } diff --git a/crates/rbuilder/src/live_builder/order_input/mod.rs b/crates/rbuilder/src/live_builder/order_input/mod.rs index d0f33b62..cac95b1d 100644 --- a/crates/rbuilder/src/live_builder/order_input/mod.rs +++ b/crates/rbuilder/src/live_builder/order_input/mod.rs @@ -7,12 +7,13 @@ pub mod orderpool; pub mod replaceable_order_sink; pub mod rpc_server; pub mod txpool_fetcher; +pub mod mempool_fetcher; use self::{ orderpool::{OrderPool, OrderPoolSubscriptionId}, replaceable_order_sink::ReplaceableOrderSink, }; -use crate::primitives::{serialize::CancelShareBundle, BundleReplacementKey, Order}; +use crate::{backtest::fetch::mempool, primitives::{serialize::CancelShareBundle, BundleReplacementKey, Order}}; use jsonrpsee::RpcModule; use reth_provider::StateProviderFactory; use std::{ @@ -25,7 +26,7 @@ use tokio::{sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tracing::{info, trace, warn}; -use super::base_config::BaseConfig; +use super::{base_config::BaseConfig, gwyneth::MempoolListener}; /// Thread safe access to OrderPool to get orderflow #[derive(Debug)] @@ -173,6 +174,7 @@ impl ReplaceableOrderPoolCommand { } } +// Cecilia! /// Starts all the tokio tasks to handle order flow: /// - Mempool /// - RPC @@ -182,13 +184,14 @@ impl ReplaceableOrderPoolCommand { pub async fn start_orderpool_jobs

( config: OrderInputConfig, provider_factory: P, + mempool: Option, extra_rpc: RpcModule<()>, global_cancel: CancellationToken, ) -> eyre::Result<(JoinHandle<()>, OrderPoolSubscriber)> where P: StateProviderFactory + 'static, { - println!("Dani debug: start_orderpool_jobs"); + println!("Cecilia ==> start_orderpool_jobs"); if config.ignore_cancellable_orders { warn!("ignore_cancellable_orders is set to true, some order input is ignored"); } @@ -217,12 +220,19 @@ where global_cancel.clone(), ) .await?; - let txpool_fetcher = txpool_fetcher::subscribe_to_txpool_with_blobs( - config.clone(), - order_sender.clone(), - global_cancel.clone(), - ) - .await?; + let txpool_fetcher = match mempool { + Some(mempool) => mempool_fetcher::subscribe_to_mempool_with_blobs( + config.clone(), + mempool.clone(), + order_sender.clone(), + global_cancel.clone(), + ).await?, + None => txpool_fetcher::subscribe_to_txpool_with_blobs( + config.clone(), + order_sender.clone(), + global_cancel.clone(), + ).await? + }; let handle = tokio::spawn(async move { info!("OrderPoolJobs: started"); diff --git a/crates/rbuilder/src/live_builder/payload_events/mod.rs b/crates/rbuilder/src/live_builder/payload_events/mod.rs index 2cc21acf..9013c834 100644 --- a/crates/rbuilder/src/live_builder/payload_events/mod.rs +++ b/crates/rbuilder/src/live_builder/payload_events/mod.rs @@ -111,7 +111,7 @@ impl MevBoostSlotDataGenerator { /// Note that with MEV-boost the validator may change the fee_recipient when registering to the Relays. pub fn spawn(self) -> (JoinHandle<()>, mpsc::UnboundedReceiver) { //let relays = RelaysForSlotData::new(&self.relays); - + println!("==> MevBoostSlotDataGenerator::spawn cl clients \n{:?}", self.cls); let (send, receive) = mpsc::unbounded_channel(); let handle = tokio::spawn(async move { let mut source = PayloadSourceMuxer::new( diff --git a/crates/rbuilder/src/live_builder/simulation/mod.rs b/crates/rbuilder/src/live_builder/simulation/mod.rs index fc69de4a..fadf066f 100644 --- a/crates/rbuilder/src/live_builder/simulation/mod.rs +++ b/crates/rbuilder/src/live_builder/simulation/mod.rs @@ -221,6 +221,7 @@ mod tests { cancel.clone(), ); + // Cecilia! // Create a simple tx that sends to coinbase 5 wei. let coinbase_profit = 5; // max_priority_fee will be 0 diff --git a/crates/rbuilder/src/proposing/mod.rs b/crates/rbuilder/src/proposing/mod.rs index a7894433..e55e3fac 100644 --- a/crates/rbuilder/src/proposing/mod.rs +++ b/crates/rbuilder/src/proposing/mod.rs @@ -79,12 +79,6 @@ impl BlockProposer { }; let propose_data = propose_data.abi_encode(); - // if num_txs == 1 { - // println!("skip propose"); - // // If there's only the payout tx, don't propose - // return Ok(()); - // } - let decoded_transactions: Vec = decode_transactions(&meta.txList); println!("decoded_transactions: {:?}", decoded_transactions);