Skip to content

Commit

Permalink
mempool impl
Browse files Browse the repository at this point in the history
  • Loading branch information
CeciliaZ030 committed Dec 17, 2024
1 parent 2e03125 commit 2c255ac
Show file tree
Hide file tree
Showing 16 changed files with 614 additions and 76 deletions.
60 changes: 60 additions & 0 deletions config-in-process-copy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
log_json = true
log_level = "info,rbuilder=debug"
redacted_telemetry_server_port = 6061
redacted_telemetry_server_ip = "0.0.0.0"
full_telemetry_server_port = 6060
full_telemetry_server_ip = "0.0.0.0"

chain = "../../genesis.json"
reth_datadir = "/data/reth/execution-data"
el_node_ipc_path = "/tmp/reth.ipc"
gwyneth_chain_ids = [160010, 160011]
l2_server_ports = [8646, 8647]

coinbase_secret_key = "607a11b45a7219cc61a3d9c5fd08c7eebd602a6a19a977f8d3771d5711a550f2"
relay_secret_key = "607a11b45a7219cc61a3d9c5fd08c7eebd602a6a19a977f8d3771d5711a550f2"
optimistic_relay_secret_key = "607a11b45a7219cc61a3d9c5fd08c7eebd602a6a19a977f8d3771d5711a550f2"

cl_node_url = ["http://cl-4-lighthouse-gwyneth:4000"]
jsonrpc_server_port = 8645
jsonrpc_server_ip = "0.0.0.0"
extra_data = "🌸🤖"
genesis_fork_version = "0x10000038"

dry_run = false
dry_run_validation_url = "http://localhost:8545"

ignore_cancellable_orders = true

max_concurrent_seals = 4

sbundle_mergeabe_signers = []
# slot_delta_to_start_submits_ms is usually negative since we start bidding BEFORE the slot start
# slot_delta_to_start_submits_ms = -5000
live_builders = ["mp-ordering"]
watchdog_timeout_sec = 99999

[[relays]]
name = "gwyneth"
url = "http://0xa55c1285d84ba83a5ad26420cd5ad3091e49c55a813eee651cd467db38a8c8e63192f47955e9376f6b42f6d190571cb5@mev-relay-api:9062"
priority = 0
use_ssz_for_submit = false
use_gzip_for_submit = false
l1_proposer_pk = "39725efee3fb28614de3bacaffe4cc4bd8c436257e2c8bb887c4b5c4be45e76d"
l1_rollup_contract = "0x9fCF7D13d10dEdF17d0f24C62f0cf4ED462f65b7"

[[builders]]
name = "mgp-ordering"
algo = "ordering-builder"
discard_txs = true
sorting = "mev-gas-price"
failed_order_retries = 1
drop_failed_orders = true

[[builders]]
name = "mp-ordering"
algo = "ordering-builder"
discard_txs = true
sorting = "max-profit"
failed_order_retries = 1
drop_failed_orders = true
9 changes: 4 additions & 5 deletions config-in-process.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ redacted_telemetry_server_ip = "0.0.0.0"
full_telemetry_server_port = 6060
full_telemetry_server_ip = "0.0.0.0"

chain = "/network-configs/genesis.json"

# chain = "/network-configs/genesis.json"
chain = "../../genesis.json" # running from ./target/debug
# Doesn't matter cuz we override it
# ===================
reth_datadir = "/data/reth/execution-data"
Expand Down Expand Up @@ -56,9 +56,8 @@ url = "http://0xac6e77dfe25ecd6110b8e780608cce0dab71fdd5ebea22a16c0205200f2f8e2e
priority = 0
use_ssz_for_submit = false
use_gzip_for_submit = false
l1_rpc_url = "http://172.16.32.10:8545"
l1_proposer_pk = "39725efee3fb28614de3bacaffe4cc4bd8c436257e2c8bb887c4b5c4be45e76d"
l1_smart_contract_address = "0x9fCF7D13d10dEdF17d0f24C62f0cf4ED462f65b7"
l1_rollup_contract = "0x9fCF7D13d10dEdF17d0f24C62f0cf4ED462f65b7"

[[builders]]
name = "mgp-ordering"
Expand All @@ -74,4 +73,4 @@ algo = "ordering-builder"
discard_txs = true
sorting = "max-profit"
failed_order_retries = 1
drop_failed_orders = true
drop_failed_orders = true
108 changes: 98 additions & 10 deletions crates/gwyneth-rbuilder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ use gwyneth::{
cli::{create_gwyneth_nodes, GwynethArgs}, exex::GwynethFullNode,
};
use rbuilder::{
live_builder::{base_config::load_config_toml_and_env, cli::LiveBuilderConfig, config::Config},
live_builder::{base_config::load_config_toml_and_env, cli::LiveBuilderConfig, config::Config, gwyneth::{EthApiStream, GwynethMempoolReciever}},
telemetry,
};
use reth::rpc::api::NetApiClient;
use reth::{network::NetworkEventListenerProvider, rpc::{api::NetApiClient, eth::EthApiServer}, transaction_pool::TransactionPool};
use reth_db_api::Database;
use reth_node_builder::{EngineNodeLauncher, NodeConfig};
use reth_provider::{
providers::{BlockchainProvider, BlockchainProvider2},
DatabaseProviderFactory, HeaderProvider, StateProviderFactory,
};
use std::{path::PathBuf, process};
use std::{path::PathBuf, pin::pin, process};
use tokio::task;
use tracing::error;
use tracing::{error, instrument::WithSubscriber};

// Prefer jemalloc for performance reasons.
#[cfg(all(feature = "jemalloc", unix))]
Expand All @@ -39,6 +39,21 @@ fn main() -> eyre::Result<()> {
let l1_node_config = builder.config().clone();
let task_executor = builder.task_executor().clone();
let gwyneth_nodes = create_gwyneth_nodes(&arg, task_executor.clone(), &l1_node_config).await;
let gwyneth_mempools = gwyneth_nodes
.iter()
.map(|g| match g {
GwynethFullNode::Provider1(n) => n.pool.new_transactions_listener(),
GwynethFullNode::Provider2(n) => n.pool.new_transactions_listener(),
})
.collect::<Vec<_>>();

let l2_ethapis = gwyneth_nodes
.iter()
.map(|g| match g {
GwynethFullNode::Provider1(n) => n.rpc_registry.eth_handlers().pubsub,
GwynethFullNode::Provider2(n) => n.rpc_registry.eth_handlers().pubsub,
})
.collect::<Vec<_>>();

let enable_engine2 = arg.experimental;
match enable_engine2 {
Expand All @@ -58,7 +73,10 @@ fn main() -> eyre::Result<()> {
.with_components(EthereumNode::components())
.with_add_ons::<EthereumAddOns>()
.on_rpc_started(move |ctx, _| {
spawn_rbuilder(&arg, &l1_node_config, ctx.provider().clone(), l2_providers)
println!("Cecilia ==> on_rpc_started");
let l1_ethapi = ctx.registry.eth_handlers().pubsub;
\ let l1_mempool = ctx.node().components.transaction_pool.new_transactions_listener();
spawn_rbuilder(&arg, &l1_node_config, ctx.provider().clone(), l2_providers, l1_mempool, gwyneth_mempools)
})
.install_exex("Rollup", move |ctx| async {
Ok(gwyneth::exex::Rollup::new(ctx, gwyneth_nodes)
Expand Down Expand Up @@ -96,7 +114,8 @@ fn main() -> eyre::Result<()> {
.start())
})
.on_rpc_started(move |ctx, _| {
spawn_rbuilder(&arg, &l1_node_config, ctx.provider().clone(), l2_providers)
let l1_mempool = ctx.node().components.transaction_pool.new_transactions_listener();
spawn_rbuilder(&arg, &l1_node_config, ctx.provider().clone(), l2_providers, l1_mempool, gwyneth_mempools)
})
.launch()
.await?;
Expand All @@ -118,11 +137,14 @@ fn spawn_rbuilder<P, DB>(
l1_node_config: &NodeConfig,
provider: P,
l2_providers: Vec<P>,
l1_mempool: GwynethMempoolReciever,
mempools: Vec<GwynethMempoolReciever>
) -> eyre::Result<()>
where
DB: Database + Clone + 'static,
P: DatabaseProviderFactory<DB> + StateProviderFactory + HeaderProvider + Clone + 'static,
{
println!("Cecilia ==> spawn_rbuilder");
let arg = arg.clone();
let l1_node_config = l1_node_config.clone();
let _handle = task::spawn(async move {
Expand All @@ -134,9 +156,75 @@ where
config.l1_config.update_in_process_setting(&l1_node_config);
config.base_config.update_in_process_setting(arg, l1_node_config);

// TODO: Check removing this is OK. It seems reth already sets up the global tracing
// subscriber, so this fails
// config.base_config.setup_tracing_subscriber().expect("Failed to set up rbuilder tracing subscriber");
// Spawn redacted server that is safe for tdx builders to expose
telemetry::servers::redacted::spawn(
config.base_config().redacted_telemetry_server_address(),
)
.await?;

// Spawn debug server that exposes detailed operational information
telemetry::servers::full::spawn(
config.base_config.full_telemetry_server_address(),
config.version_for_telemetry(),
config.base_config.log_enable_dynamic,
)
.await?;
let builder = config
.new_builder(provider, l2_providers, Some(l1_mempool), Some(mempools), Default::default())
.await?;

builder.run().await?;

Ok::<(), eyre::Error>(())
}
.await;

if let Err(e) = result {
error!("Fatal rbuilder error: {}", e);
process::exit(1);
}

error!("rbuilder stopped unexpectedly");
process::exit(1);
});
Ok(())
}


use futures::future::IntoStream;
use futures::stream::TakeUntil;
use futures::{FutureExt, Stream, StreamExt};

fn spawn_rbuilder_<P, DB>(
arg: &GwynethArgs,
l1_node_config: &NodeConfig,
l1_provider: P,
l2_providers: Vec<P>,
l1_ethapi: dyn EthApiStream,
l2_ethapis: Vec<dyn EthApiStream>,
) -> eyre::Result<()>
where
DB: Database + Clone + 'static,
P: DatabaseProviderFactory<DB> + StateProviderFactory + HeaderProvider + Clone + 'static,
{


let a = l1_ethapi.new_headers_stream();
let mut stream = pin!(
a.into_stream()
);

println!("Cecilia ==> spawn_rbuilder");
let arg = arg.clone();
let l1_node_config = l1_node_config.clone();
let _handle = task::spawn(async move {
let result = async {
let mut config: Config = load_config_toml_and_env(
arg.rbuilder_config.clone().expect("Gwyneth-rbuilder needs config path")
)?;
// Where we set L1 rpc, proposer pk and rollup contract address
config.l1_config.update_in_process_setting(&l1_node_config);
config.base_config.update_in_process_setting(arg, l1_node_config);

// Spawn redacted server that is safe for tdx builders to expose
telemetry::servers::redacted::spawn(
Expand All @@ -152,7 +240,7 @@ where
)
.await?;
let builder = config
.new_builder(provider, l2_providers, Default::default())
.new_builder(l1_provider, l2_providers, Some(l1_mempool), Some(mempools), Default::default())
.await?;

builder.run().await?;
Expand Down
14 changes: 5 additions & 9 deletions crates/rbuilder/src/bin/dummy-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,10 @@ use rbuilder::{
base_config::{
DEFAULT_EL_NODE_IPC_PATH, DEFAULT_INCOMING_BUNDLES_PORT, DEFAULT_IP,
DEFAULT_RETH_DB_PATH,
},
config::create_provider_factory,
layer2_info::{Layer2Info},
order_input::{
}, config::create_provider_factory, gwyneth::GwynethNodes, layer2_info::Layer2Info, order_input::{
OrderInputConfig, DEFAULT_INPUT_CHANNEL_BUFFER_SIZE, DEFAULT_RESULTS_CHANNEL_TIMEOUT,
DEFAULT_SERVE_MAX_CONNECTIONS,
},
payload_events::{MevBoostSlotData, MevBoostSlotDataGenerator},
simulation::SimulatedOrderCommand,
LiveBuilder,
}, payload_events::{MevBoostSlotData, MevBoostSlotDataGenerator}, simulation::SimulatedOrderCommand, LiveBuilder
},
primitives::{
mev_boost::{MevBoostRelay, RelayConfig},
Expand Down Expand Up @@ -74,7 +68,7 @@ async fn main() -> eyre::Result<()> {
);

let layer2_info = Layer2Info::new(vec![], &vec![], &vec![]).await?;

let gwyneth_nodes = GwynethNodes{ nodes: HashMap::default() };
let builder = LiveBuilder::<
ProviderFactoryReopener<Arc<DatabaseEnv>>,
Arc<DatabaseEnv>,
Expand Down Expand Up @@ -106,10 +100,12 @@ async fn main() -> eyre::Result<()> {
extra_data: Vec::new(),
blocklist: Default::default(),
global_cancellation: cancel.clone(),
l1_mempool: None,
extra_rpc: RpcModule::new(()),
sink_factory: Box::new(TraceBlockSinkFactory {}),
builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))],
layer2_info,
gwyneth_nodes,
};

let ctrlc = tokio::spawn(async move {
Expand Down
Loading

0 comments on commit 2c255ac

Please sign in to comment.