Skip to content

Commit

Permalink
feat: add mempool filters and builder affinity
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Feb 1, 2025
1 parent b5e0311 commit ce42feb
Show file tree
Hide file tree
Showing 20 changed files with 244 additions and 112 deletions.
8 changes: 5 additions & 3 deletions bin/rundler/src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ pub(crate) struct BuilderConfig {
count: u64,
// Submitter proxy to use for builders
proxy: Option<Address>,
// Optional filter to apply to the builders
filter_id: Option<String>,
}

impl EntryPointBuilderConfigs {
Expand All @@ -471,14 +473,13 @@ impl EntryPointBuilderConfigs {

impl EntryPointBuilderConfig {
pub fn builders(&self) -> Vec<BuilderSettings> {
let mut index = self.index_offset;
let mut builders = vec![];
for builder in &self.builders {
builders.extend((0..builder.count).map(|i| BuilderSettings {
index: index + i,
index: self.index_offset + i,
submitter_proxy: builder.proxy,
filter_id: builder.filter_id.clone(),
}));
index += builder.count;
}
builders
}
Expand All @@ -489,6 +490,7 @@ fn builder_settings_from_cli(index_offset: u64, count: u64) -> Vec<BuilderSettin
.map(|i| BuilderSettings {
index: index_offset + i,
submitter_proxy: None,
filter_id: None,
})
.collect()
}
Expand Down
10 changes: 10 additions & 0 deletions bin/rundler/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,16 @@ async fn load_configs(

tracing::info!("Mempool configs: {:?}", mempool_configs);

// For now only allow one mempool defined per entry point
let mut entry_points = vec![];
for mempool_config in mempool_configs.0.values() {
let ep = mempool_config.entry_point();
if entry_points.contains(&ep) {
bail!("multiple mempool configs defined for entry point {:?}", ep);
}
entry_points.push(ep);
}

Some(mempool_configs)
} else {
None
Expand Down
44 changes: 27 additions & 17 deletions crates/builder/src/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,14 @@ pub(crate) enum BundleProposerError {

pub(crate) struct BundleProposerImpl<EP, BP> {
builder_index: u64,
builder_tag: String,
settings: Settings,
ep_providers: EP,
bundle_providers: BP,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
condition_not_met_notified: bool,
metric: BuilderProposerMetric,
filter_id: Option<String>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -343,19 +345,23 @@ where
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
builder_index: u64,
builder_tag: String,
ep_providers: EP,
bundle_providers: BP,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
filter_id: Option<String>,
) -> Self {
Self {
builder_index,
builder_tag,
ep_providers,
bundle_providers,
settings,
event_sender,
condition_not_met_notified: false,
metric: BuilderProposerMetric::default(),
filter_id,
}
}

Expand All @@ -380,7 +386,7 @@ where
|| op.uo.max_priority_fee_per_gas() < required_op_fees.max_priority_fee_per_gas
{
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op.uo),
SkipReason::InsufficientFees {
required_fees: required_op_fees,
Expand Down Expand Up @@ -433,7 +439,7 @@ where
"Failed to calculate required pre-verification gas for op: {e:?}, skipping"
);
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
op_hash,
SkipReason::Other {
reason: Arc::new(format!(
Expand All @@ -454,7 +460,7 @@ where

if op.uo.pre_verification_gas() < required_pvg {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
op_hash,
SkipReason::InsufficientPreVerificationGas {
base_fee,
Expand Down Expand Up @@ -506,7 +512,7 @@ where
entity_infos: _,
} => {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
op_hash,
SkipReason::Other {
reason: Arc::new(format!("Failed to simulate op: {error:?}, skipping")),
Expand Down Expand Up @@ -541,7 +547,7 @@ where
Ok(simulation) => simulation,
Err(error) => {
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
OpRejectionReason::FailedRevalidation {
error: error.clone(),
Expand All @@ -566,7 +572,7 @@ where
.contains(Timestamp::now(), TIME_RANGE_BUFFER)
{
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::InvalidTimeRange {
valid_range: simulation.valid_time_range,
Expand All @@ -583,7 +589,7 @@ where
>= self.settings.chain_spec.max_transaction_size_bytes
{
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::TransactionSizeLimit,
));
Expand All @@ -595,7 +601,7 @@ where
gas_spent + op.computation_gas_limit(&self.settings.chain_spec, None);
if required_gas > self.settings.max_bundle_gas {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::GasLimit,
));
Expand All @@ -606,14 +612,14 @@ where
let mut new_expected_storage = context.expected_storage.clone();
if let Err(e) = new_expected_storage.merge(&simulation.expected_storage) {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::ExpectedStorageConflict(e.to_string()),
));
continue;
} else if new_expected_storage.num_slots() > self.settings.max_expected_storage_slots {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::ExpectedStorageLimit,
));
Expand All @@ -630,7 +636,7 @@ where
// batch, but don't reject them (remove them from pool).
info!("Excluding op from {:?} because it accessed the address of another sender in the bundle.", op.sender());
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::AccessedOtherSender { other_sender },
));
Expand Down Expand Up @@ -697,7 +703,7 @@ where

for (index, reason) in to_reject {
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&context.get_op_at(index)?.op),
OpRejectionReason::ConditionNotMet(reason),
));
Expand Down Expand Up @@ -839,7 +845,7 @@ where
HandleOpsOut::Success => Ok(Some(gas_limit)),
HandleOpsOut::FailedOp(index, message) => {
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&context.get_op_at(index)?.op),
OpRejectionReason::FailedInBundle {
message: Arc::new(message.clone()),
Expand Down Expand Up @@ -876,6 +882,7 @@ where
*self.ep_providers.entry_point().address(),
self.settings.max_bundle_size,
self.builder_index,
self.filter_id.clone(),
)
.await
.context("should get ops from pool")?
Expand Down Expand Up @@ -1041,7 +1048,7 @@ where
// iterate in reverse so that we can remove ops without affecting the index of the next op to remove
for index in to_remove.into_iter().rev() {
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&context.get_op_at(index)?.op),
OpRejectionReason::FailedInBundle {
message: Arc::new("post op reverted leading to entry point revert".to_owned()),
Expand Down Expand Up @@ -1143,7 +1150,7 @@ where
.is_none()
{
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op.uo),
SkipReason::UnsupportedAggregator(agg),
));
Expand All @@ -1156,7 +1163,7 @@ where
let gas = op.uo.computation_gas_limit(&self.settings.chain_spec, None);
if gas_left < gas {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op.uo),
SkipReason::GasLimit,
));
Expand Down Expand Up @@ -2882,13 +2889,14 @@ mod tests {
entity_infos: EntityInfos::default(),
aggregator: None,
da_gas_data: Default::default(),
filter_id: None,
})
.collect();

let mut pool_client = MockPool::new();
pool_client
.expect_get_ops()
.returning(move |_, _, _| Ok(ops.clone()));
.returning(move |_, _, _, _| Ok(ops.clone()));

let simulations_by_op: HashMap<_, _> = mock_ops
.into_iter()
Expand Down Expand Up @@ -2994,6 +3002,7 @@ mod tests {

let mut proposer = BundleProposerImpl::new(
0,
"test".to_string(),
ProvidersWithEntryPoint::new(
Arc::new(provider),
Arc::new(entry_point),
Expand All @@ -3012,6 +3021,7 @@ mod tests {
max_expected_storage_slots: MAX_EXPECTED_STORAGE_SLOTS,
},
event_sender,
None,
);

if notify_condition_not_met {
Expand Down
22 changes: 11 additions & 11 deletions crates/builder/src/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub(crate) struct Settings {

#[derive(Debug)]
pub(crate) struct BundleSenderImpl<UO, P, E, T, C> {
builder_index: u64,
builder_tag: String,
bundle_action_receiver: Option<mpsc::Receiver<BundleSenderAction>>,
chain_spec: ChainSpec,
sender_eoa: Address,
Expand Down Expand Up @@ -143,7 +143,7 @@ where
/// Loops forever, attempting to form and send a bundle on each new block,
/// then waiting for one bundle to be mined or dropped before forming the
/// next one.
#[instrument(skip_all, fields(entry_point = self.entry_point.address().to_string(), builder_index = self.builder_index))]
#[instrument(skip_all, fields(entry_point = self.entry_point.address().to_string(), tag = self.builder_tag))]
async fn send_bundles_in_loop<TS: TaskSpawner>(mut self, task_spawner: TS) {
// trigger for sending bundles
let sender_trigger = BundleSenderTrigger::new(
Expand Down Expand Up @@ -179,7 +179,7 @@ where
{
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
builder_index: u64,
builder_tag: String,
bundle_action_receiver: mpsc::Receiver<BundleSenderAction>,
chain_spec: ChainSpec,
sender_eoa: Address,
Expand All @@ -192,7 +192,7 @@ where
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
Self {
builder_index,
builder_tag: builder_tag.clone(),
bundle_action_receiver: Some(bundle_action_receiver),
chain_spec,
sender_eoa,
Expand All @@ -204,7 +204,7 @@ where
event_sender,
metrics: BuilderMetric::new_with_labels(&[
("entry_point", entry_point.address().to_string()),
("builder_index", builder_index.to_string()),
("builder_tag", builder_tag),
]),
entry_point,
_uo_type: PhantomData,
Expand Down Expand Up @@ -371,7 +371,7 @@ where
}

self.emit(BuilderEvent::transaction_mined(
self.builder_index,
self.builder_tag.clone(),
tx_hash,
nonce,
block_number,
Expand All @@ -381,7 +381,7 @@ where
TrackerUpdate::LatestTxDropped { nonce } => {
info!("Latest transaction dropped, starting new bundle attempt");
self.emit(BuilderEvent::latest_transaction_dropped(
self.builder_index,
self.builder_tag.clone(),
nonce,
));
self.metrics.bundle_txns_dropped.increment(1);
Expand All @@ -391,7 +391,7 @@ where
TrackerUpdate::NonceUsedForOtherTx { nonce } => {
info!("Nonce used externally, starting new bundle attempt");
self.emit(BuilderEvent::nonce_used_for_other_transaction(
self.builder_index,
self.builder_tag.clone(),
nonce,
));
self.metrics.bundle_txns_nonce_used.increment(1);
Expand Down Expand Up @@ -578,7 +578,7 @@ where

let Some(bundle_tx) = self.get_bundle_tx(nonce, bundle).await? else {
self.emit(BuilderEvent::formed_bundle(
self.builder_index,
self.builder_tag.clone(),
None,
nonce,
fee_increase_count,
Expand All @@ -602,7 +602,7 @@ where
match send_result {
Ok(tx_hash) => {
self.emit(BuilderEvent::formed_bundle(
self.builder_index,
self.builder_tag.clone(),
Some(BundleTxDetails {
tx_hash,
tx,
Expand Down Expand Up @@ -1791,7 +1791,7 @@ mod tests {
MockPool,
> {
BundleSenderImpl::new(
0,
"any:0".to_string(),
mpsc::channel(1000).1,
ChainSpec::default(),
Address::default(),
Expand Down
Loading

0 comments on commit ce42feb

Please sign in to comment.