Skip to content

Commit

Permalink
In-process rbuilder orderpool <> reth transaction-pool connection (fl…
Browse files Browse the repository at this point in the history
…ashbots#339)

Adds a new method to `LiveBuilder` `connect_to_transaction_pool`
allowing in-process connection between the reth transaction pool and
rbuilder orderpool.

This fixes an issue where the orderpool does not receive any
transactions that were cached on disk by reth on start up, and finally
removes the requirement for an ipc connection when running in-process.
  • Loading branch information
liamaharon authored Jan 14, 2025
1 parent 945f039 commit e0ba509
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 28 deletions.
12 changes: 6 additions & 6 deletions crates/op-rbuilder/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use reth_primitives::TransactionSigned;
use reth_provider::{BlockReader, CanonStateSubscriptions, DatabaseProviderFactory};
use reth_tracing::tracing::{debug, info};
use reth_transaction_pool::{
blobstore::DiskFileBlobStore, CoinbaseTipOrdering, EthPooledTransaction,
blobstore::DiskFileBlobStore, CoinbaseTipOrdering, EthPooledTransaction, Pool,
TransactionValidationTaskExecutor,
};
use reth_trie_db::MerklePatriciaTrie;
Expand Down Expand Up @@ -194,16 +194,16 @@ where
.require_l1_data_gas_fee(!ctx.config().dev.dev)
});

let bundle_ops = BundlePoolOps::new(ctx.provider().clone(), self.config)
.await
.expect("Failed to instantiate RbuilderBundlePoolOps");
let transaction_pool = OpRbuilderTransactionPool::new(
let tx_pool = Pool::new(
validator,
CoinbaseTipOrdering::default(),
blob_store,
bundle_ops,
ctx.pool_config(),
);
let bundle_ops = BundlePoolOps::new(ctx.provider().clone(), tx_pool.clone(), self.config)
.await
.expect("Failed to instantiate RbuilderBundlePoolOps");
let transaction_pool = OpRbuilderTransactionPool::new(tx_pool, bundle_ops);

info!(target: "reth::cli", "Transaction pool initialized");
let transactions_path = data_dir.txpool_transactions();
Expand Down
1 change: 1 addition & 0 deletions crates/op-rbuilder/payload_builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ optimism = [
"reth-provider/optimism",
"revm/optimism",
"reth-optimism-consensus/optimism",
"reth-optimism-payload-builder/optimism"
]
73 changes: 72 additions & 1 deletion crates/rbuilder/src/live_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
simulation::OrderSimulationPool,
watchdog::spawn_watchdog_thread,
},
primitives::{MempoolTx, Order, TransactionSignedEcRecoveredWithBlobs},
provider::StateProviderFactory,
telemetry::inc_active_slots,
utils::{
Expand All @@ -32,12 +33,17 @@ use eyre::Context;
use jsonrpsee::RpcModule;
use order_input::ReplaceableOrderPoolCommand;
use payload_events::MevBoostSlotData;
use reth::transaction_pool::{
BlobStore, EthPooledTransaction, Pool, TransactionListenerKind, TransactionOrdering,
TransactionPool, TransactionValidator,
};
use reth_chainspec::ChainSpec;
use reth_primitives::TransactionSignedEcRecovered;
use std::{cmp::min, fmt::Debug, path::PathBuf, sync::Arc, time::Duration};
use time::OffsetDateTime;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};

#[derive(Debug, Clone)]
pub struct TimingsConfig {
Expand Down Expand Up @@ -291,6 +297,43 @@ where
Ok(())
}

/// Connect the builder to a reth [`TransactionPool`].
///
/// This will
/// 1. Add pending and queued transactions to the [`OrderPool`]
/// 2. Subscribe to the pool directly, so the builder is not reliant on
/// IPC to be notified of new transactions.
pub async fn connect_to_transaction_pool<V, T, S>(
&self,
pool: Pool<V, T, S>,
) -> Result<(), eyre::Error>
where
V: TransactionValidator<Transaction = EthPooledTransaction> + 'static,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
// Initialize the orderpool with every item in the reth pool.
for tx in pool
.all_transactions()
.pending_recovered()
.chain(pool.all_transactions().queued_recovered())
{
try_send_to_orderpool(tx, self.orderpool_sender.clone(), pool.clone()).await;
}

// Subscribe to new transactions in-process.
let mut recv = pool.new_transactions_listener_for(TransactionListenerKind::All);
let orderpool_sender = self.orderpool_sender.clone();
tokio::spawn(async move {
while let Some(e) = recv.recv().await {
let tx = e.transaction.transaction.transaction().clone();
try_send_to_orderpool(tx, orderpool_sender.clone(), pool.clone()).await;
}
});

Ok(())
}

// Currently we only need two timings config, depending on whether rbuilder is being
// used in the optimism context. If further customisation is required in the future
// this should be improved on.
Expand Down Expand Up @@ -330,3 +373,31 @@ where
}
Err(eyre::eyre!("Block header not found"))
}

/// Attempts to forward a [`TransactionSignedEcRecovered`] to an orderpool.
///
/// Helper for [`LiveBuilder::connect_to_transaction_pool`].
///
/// Errors are handled internally with a log.
async fn try_send_to_orderpool<V, T, S>(
tx: TransactionSignedEcRecovered,
orderpool_sender: mpsc::Sender<ReplaceableOrderPoolCommand>,
pool: Pool<V, T, S>,
) where
V: TransactionValidator<Transaction = EthPooledTransaction> + 'static,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
match TransactionSignedEcRecoveredWithBlobs::try_from_tx_without_blobs_and_pool(tx, pool) {
Ok(tx) => {
let order = Order::Tx(MempoolTx::new(tx));
let command = ReplaceableOrderPoolCommand::Order(order);
if let Err(e) = orderpool_sender.send(command).await {
error!("Error sending order to orderpool: {:#}", e);
}
}
Err(e) => {
error!("Error creating order from transaction: {:#}", e);
}
}
}
2 changes: 2 additions & 0 deletions crates/reth-rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ reth.workspace = true
reth-node-builder.workspace = true
reth-node-ethereum.workspace = true
reth-provider.workspace = true
reth-transaction-pool.workspace = true
reth-cli-util.workspace = true
reth-db-api.workspace = true
alloy-rlp.workspace = true

tokio.workspace = true
clap.workspace = true
Expand Down
17 changes: 11 additions & 6 deletions crates/reth-rbuilder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use reth_provider::{
providers::{BlockchainProvider, BlockchainProvider2},
BlockReader, DatabaseProviderFactory, HeaderProvider,
};
use reth_transaction_pool::{blobstore::DiskFileBlobStore, EthTransactionPool};
use std::{path::PathBuf, process};
use tokio::task;
use tracing::{error, info, warn};
Expand Down Expand Up @@ -82,8 +83,8 @@ fn main() {
.with_types_and_provider::<EthereumNode, BlockchainProvider2<_>>()
.with_components(EthereumNode::components())
.with_add_ons(EthereumAddOns::default())
.on_rpc_started(move |ctx, _| {
spawn_rbuilder(ctx.provider().clone(), extra_args.rbuilder_config);
.on_node_started(move |node| {
spawn_rbuilder(node.provider().clone(), node.pool().clone(), extra_args.rbuilder_config);
Ok(())
})
.launch_with_fn(|builder| {
Expand All @@ -103,8 +104,8 @@ fn main() {
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(EthereumNode::components())
.with_add_ons::<EthereumAddOns<_>>(Default::default())
.on_rpc_started(move |ctx, _| {
spawn_rbuilder(ctx.provider().clone(), extra_args.rbuilder_config);
.on_node_started(move |node| {
spawn_rbuilder(node.provider().clone(), node.pool().clone(), extra_args.rbuilder_config);
Ok(())
})
.launch().await?;
Expand All @@ -121,8 +122,11 @@ fn main() {
/// Spawns a tokio rbuilder task.
///
/// Takes down the entire process if the rbuilder errors or stops.
fn spawn_rbuilder<P>(provider: P, config_path: PathBuf)
where
fn spawn_rbuilder<P>(
provider: P,
pool: EthTransactionPool<P, DiskFileBlobStore>,
config_path: PathBuf,
) where
P: DatabaseProviderFactory<Provider: BlockReader>
+ reth_provider::StateProviderFactory
+ HeaderProvider
Expand Down Expand Up @@ -156,6 +160,7 @@ where
Default::default(),
)
.await?;
builder.connect_to_transaction_pool(pool).await?;
builder.run().await?;

Ok::<(), eyre::Error>(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ rbuilder = { path = "../../../rbuilder" }
alloy-primitives.workspace = true
alloy-rpc-types-beacon.workspace = true
reth-primitives = { workspace = true }
reth-provider = {workspace = true}
reth-transaction-pool = { workspace = true }
reth-provider = { workspace = true }

derive_more = { workspace = true }
eyre = { workspace = true }
tokio = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ use rbuilder::{
};
use reth_primitives::TransactionSigned;
use reth_provider::{BlockReader, DatabaseProviderFactory, HeaderProvider};
use reth_transaction_pool::{
BlobStore, EthPooledTransaction, Pool, TransactionOrdering, TransactionValidator,
};
use tokio::{
sync::{
mpsc::{self, error::SendError},
Expand Down Expand Up @@ -88,13 +91,20 @@ impl SlotSource for OurSlotSource {
}

impl BundlePoolOps {
pub async fn new<P>(provider: P, config: Config) -> Result<Self, Error>
pub async fn new<P, V, T, S>(
provider: P,
pool: Pool<V, T, S>,
config: Config,
) -> Result<Self, Error>
where
P: DatabaseProviderFactory<Provider: BlockReader>
+ reth_provider::StateProviderFactory
+ HeaderProvider
+ Clone
+ 'static,
V: TransactionValidator<Transaction = EthPooledTransaction> + 'static,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
// Create the payload source to trigger new block building
let cancellation_token = CancellationToken::new();
Expand All @@ -108,7 +118,6 @@ impl BundlePoolOps {
block_building_helper_tx,
};

// Spawn the builder!
let builder_strategy = BuilderConfig {
name: "mp-ordering".to_string(),
builder: SpecificBuilderConfig::OrderingBuilder(OrderingBuilderConfig {
Expand Down Expand Up @@ -162,6 +171,10 @@ impl BundlePoolOps {
.await
.expect("Failed to start full telemetry server");

builder
.connect_to_transaction_pool(pool)
.await
.expect("Failed to connect to reth pool");
builder.run().await.unwrap();

Ok::<(), ()>
Expand Down
18 changes: 6 additions & 12 deletions crates/transaction-pool-bundle-ext/src/bundle_supported_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use reth_primitives::PooledTransactionsElement;
use reth_transaction_pool::{
AllPoolTransactions, AllTransactionsEvents, BestTransactions, BestTransactionsAttributes,
BlobStore, BlobStoreError, BlockInfo, CanonicalStateUpdate, EthPoolTransaction,
GetPooledTransactionLimit, NewBlobSidecar, NewTransactionEvent, Pool, PoolConfig, PoolResult,
PoolSize, PropagatedTransactions, TransactionEvents, TransactionListenerKind,
TransactionOrdering, TransactionOrigin, TransactionPool,
TransactionPoolExt as TransactionPoolBlockInfoExt, TransactionValidator, ValidPoolTransaction,
GetPooledTransactionLimit, NewBlobSidecar, NewTransactionEvent, Pool, PoolResult, PoolSize,
PropagatedTransactions, TransactionEvents, TransactionListenerKind, TransactionOrdering,
TransactionOrigin, TransactionPool, TransactionPoolExt as TransactionPoolBlockInfoExt,
TransactionValidator, ValidPoolTransaction,
};
use std::{collections::HashSet, future::Future, sync::Arc};
use tokio::sync::mpsc::Receiver;
Expand Down Expand Up @@ -67,15 +67,9 @@ where
S: BlobStore,
B: BundlePoolOperations,
{
pub fn new(
validator: V,
ordering: T,
blob_store: S,
bundle_ops: B,
tx_pool_config: PoolConfig,
) -> Self {
pub fn new(pool: Pool<V, T, S>, bundle_ops: B) -> Self {
Self {
tx_pool: Pool::<V, T, S>::new(validator, ordering, blob_store, tx_pool_config),
tx_pool: pool,
bundle_pool: Arc::new(BundlePool::<B>::new(bundle_ops)),
}
}
Expand Down

0 comments on commit e0ba509

Please sign in to comment.