diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index ef21f144a38a..1a366aa1a9a1 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -68,7 +68,7 @@ impl> Command { provider_factory: ProviderFactory, task_executor: &TaskExecutor, static_file_producer: StaticFileProducer>, - ) -> eyre::Result> + ) -> eyre::Result> where N: ProviderNodeTypes + CliNodeTypes, Client: EthBlockClient + 'static, diff --git a/crates/cli/commands/src/import.rs b/crates/cli/commands/src/import.rs index 1f297ad33bdd..8afd1feb8ce0 100644 --- a/crates/cli/commands/src/import.rs +++ b/crates/cli/commands/src/import.rs @@ -166,7 +166,7 @@ pub fn build_import_pipeline( static_file_producer: StaticFileProducer>, disable_exec: bool, executor: E, -) -> eyre::Result<(Pipeline, impl Stream>)> +) -> eyre::Result<(Pipeline, impl Stream>)> where N: ProviderNodeTypes + CliNodeTypes, C: Consensus, Error = ConsensusError> + 'static, diff --git a/crates/cli/commands/src/stage/unwind.rs b/crates/cli/commands/src/stage/unwind.rs index 7171a45bb23c..5aa7670dd32e 100644 --- a/crates/cli/commands/src/stage/unwind.rs +++ b/crates/cli/commands/src/stage/unwind.rs @@ -108,7 +108,7 @@ impl> Command self, config: Config, provider_factory: ProviderFactory, - ) -> Result, eyre::Error> { + ) -> Result, eyre::Error> { let stage_conf = &config.stages; let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default(); diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index 81c5ca5ad79a..38ac2adea591 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -79,7 +79,7 @@ where chain_spec: Arc, client: Client, incoming_requests: EngineMessageStream, - pipeline: Pipeline, + pipeline: Pipeline, pipeline_task_spawner: Box, provider: ProviderFactory, blockchain_db: BlockchainProvider, diff --git a/crates/engine/tree/src/backfill.rs b/crates/engine/tree/src/backfill.rs index 370ca7e52ec5..43fb525b8a2f 100644 --- a/crates/engine/tree/src/backfill.rs +++ b/crates/engine/tree/src/backfill.rs @@ -8,6 +8,7 @@ //! These modes are mutually exclusive and the node can only be in one mode at a time. use futures::FutureExt; +use reth_errors::{BlockExecError, BlockExecutionError}; use reth_provider::providers::ProviderNodeTypes; use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult}; use reth_tasks::TaskSpawner; @@ -64,13 +65,16 @@ pub enum BackfillAction { /// The events that can be emitted on backfill sync. #[derive(Debug)] -pub enum BackfillEvent { +pub enum BackfillEvent +where + E: BlockExecError, +{ /// Backfill sync started. Started(PipelineTarget), /// Backfill sync finished. /// /// If this is returned, backfill sync is idle. - Finished(Result), + Finished(Result>), /// Sync task was dropped after it was started, unable to receive it because /// channel closed. This would indicate a panicked task. TaskDropped(String), @@ -90,7 +94,10 @@ pub struct PipelineSync { impl PipelineSync { /// Create a new instance. - pub fn new(pipeline: Pipeline, pipeline_task_spawner: Box) -> Self { + pub fn new( + pipeline: Pipeline, + pipeline_task_spawner: Box, + ) -> Self { Self { pipeline_task_spawner, pipeline_state: PipelineState::Idle(Some(pipeline)), @@ -212,11 +219,11 @@ impl BackfillSync for PipelineSync { /// blockchain tree any messages that would result in database writes, since it would result in a /// deadlock. #[derive(Debug)] -enum PipelineState { +enum PipelineState { /// Pipeline is idle. - Idle(Option>), + Idle(Option>), /// Pipeline is running and waiting for a response - Running(oneshot::Receiver>), + Running(oneshot::Receiver>), } impl PipelineState { diff --git a/crates/engine/tree/src/test_utils.rs b/crates/engine/tree/src/test_utils.rs index de6485ce9c08..d7be8a9249b1 100644 --- a/crates/engine/tree/src/test_utils.rs +++ b/crates/engine/tree/src/test_utils.rs @@ -1,5 +1,6 @@ use alloy_primitives::B256; use reth_chainspec::ChainSpec; +use reth_errors::BlockExecutionError; use reth_network_p2p::test_utils::TestFullBlockClient; use reth_primitives::{BlockBody, SealedHeader}; use reth_provider::{ @@ -43,12 +44,15 @@ impl TestPipelineBuilder { } /// Builds the pipeline. - pub fn build(self, chain_spec: Arc) -> Pipeline { + pub fn build( + self, + chain_spec: Arc, + ) -> Pipeline { reth_tracing::init_test_tracing(); // Setup pipeline let (tip_tx, _tip_rx) = watch::channel(B256::default()); - let pipeline = Pipeline::::builder() + let pipeline = Pipeline::::builder() .add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default())) .with_tip_sender(tip_tx); diff --git a/crates/errors/src/lib.rs b/crates/errors/src/lib.rs index fc464eb98cbd..cce3756b8bd9 100644 --- a/crates/errors/src/lib.rs +++ b/crates/errors/src/lib.rs @@ -16,7 +16,7 @@ mod error; pub use error::{RethError, RethResult}; pub use reth_consensus::ConsensusError; -pub use reth_execution_errors::{BlockExecutionError, BlockValidationError}; +pub use reth_execution_errors::{BlockExecError, BlockExecutionError, BlockValidationError}; pub use reth_storage_errors::{ db::DatabaseError, provider::{ProviderError, ProviderResult}, diff --git a/crates/ethereum/evm/src/execute.rs b/crates/ethereum/evm/src/execute.rs index 8bc3272272b2..5254a6e94353 100644 --- a/crates/ethereum/evm/src/execute.rs +++ b/crates/ethereum/evm/src/execute.rs @@ -69,6 +69,8 @@ where Transaction = reth_primitives::TransactionSigned, >, { + type Error = BlockExecutionError; + type Primitives = EthPrimitives; type Strategy + Display>> = diff --git a/crates/evm/execution-errors/src/lib.rs b/crates/evm/execution-errors/src/lib.rs index cc723fa110ff..7ac2b5378a07 100644 --- a/crates/evm/execution-errors/src/lib.rs +++ b/crates/evm/execution-errors/src/lib.rs @@ -17,6 +17,7 @@ use alloc::{ }; use alloy_eips::BlockNumHash; use alloy_primitives::B256; +use core::fmt::Display; use reth_consensus::ConsensusError; use reth_prune_types::PruneSegmentError; use reth_storage_errors::provider::ProviderError; @@ -134,6 +135,21 @@ pub enum BlockExecutionError { Internal(#[from] InternalBlockExecutionError), } +/// Generic block execution error. +pub trait BlockExecError: + Display + Sync + Send + std::fmt::Debug + 'static + From + From +{ + /// Returns `true` if the error is a state root error. + fn is_state_root_error(&self) -> bool; +} + +impl BlockExecError for BlockExecutionError { + /// Returns `true` if the error is a state root error. + fn is_state_root_error(&self) -> bool { + matches!(self, Self::Validation(BlockValidationError::StateRoot(_))) + } +} + impl BlockExecutionError { /// Create a new [`BlockExecutionError::Internal`] variant, containing a /// [`InternalBlockExecutionError::Other`] error. @@ -157,11 +173,6 @@ impl BlockExecutionError { _ => None, } } - - /// Returns `true` if the error is a state root error. - pub const fn is_state_root_error(&self) -> bool { - matches!(self, Self::Validation(BlockValidationError::StateRoot(_))) - } } impl From for BlockExecutionError { diff --git a/crates/evm/src/either.rs b/crates/evm/src/either.rs index 4faeb1a72030..7a59b7f8a620 100644 --- a/crates/evm/src/either.rs +++ b/crates/evm/src/either.rs @@ -19,10 +19,12 @@ use revm::State; impl BlockExecutorProvider for Either where A: BlockExecutorProvider, - B: BlockExecutorProvider, + B: BlockExecutorProvider, { type Primitives = A::Primitives; + type Error = A::Error; + type Executor + Display>> = Either, B::Executor>; diff --git a/crates/evm/src/execute.rs b/crates/evm/src/execute.rs index 2b221f14564a..5f5b02dcc02f 100644 --- a/crates/evm/src/execute.rs +++ b/crates/evm/src/execute.rs @@ -1,6 +1,7 @@ //! Traits for execution. use alloy_consensus::BlockHeader; +use reth_execution_errors::BlockExecError; // Re-export execution types pub use reth_execution_errors::{ BlockExecutionError, BlockValidationError, InternalBlockExecutionError, @@ -80,7 +81,7 @@ pub trait BatchExecutor { /// The output type for the executor. type Output; /// The error type returned by the executor. - type Error; + type Error: BlockExecError; /// Executes the next block in the batch, verifies the output and updates the state internally. fn execute_and_verify_one(&mut self, input: Self::Input<'_>) -> Result<(), Self::Error>; @@ -137,6 +138,9 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { /// Receipt type. type Primitives: NodePrimitives; + /// The error type returned by the executor. + type Error: BlockExecError; + /// An executor that can execute a single block given a database. /// /// # Verification @@ -152,7 +156,7 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { DB, Input<'a> = &'a RecoveredBlock<::Block>, Output = BlockExecutionOutput<::Receipt>, - Error = BlockExecutionError, + Error = Self::Error, >; /// An executor that can execute a batch of blocks given a database. @@ -160,7 +164,7 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { DB, Input<'a> = &'a RecoveredBlock<::Block>, Output = ExecutionOutcome<::Receipt>, - Error = BlockExecutionError, + Error = Self::Error, >; /// Creates a new executor for single block execution. @@ -197,7 +201,7 @@ pub trait BlockExecutionStrategy { type Primitives: NodePrimitives; /// The error type returned by this strategy's methods. - type Error: From + core::error::Error; + type Error: BlockExecError; /// Initialize the strategy with the given transaction environment overrides. fn init(&mut self, _tx_env_overrides: Box) {} @@ -249,6 +253,9 @@ pub trait BlockExecutionStrategy { /// A strategy factory that can create block execution strategies. pub trait BlockExecutionStrategyFactory: Send + Sync + Clone + Unpin + 'static { + /// The error type returned by this strategy's methods. + type Error: BlockExecError; + /// Primitive types used by the strategy. type Primitives: NodePrimitives; @@ -256,7 +263,7 @@ pub trait BlockExecutionStrategyFactory: Send + Sync + Clone + Unpin + 'static { type Strategy + Display>>: BlockExecutionStrategy< DB = DB, Primitives = Self::Primitives, - Error = BlockExecutionError, + Error = Self::Error, >; /// Creates a strategy using the give database. @@ -291,6 +298,8 @@ impl BlockExecutorProvider for BasicBlockExecutorProvider where F: BlockExecutionStrategyFactory, { + type Error = F::Error; + type Primitives = F::Primitives; type Executor + Display>> = @@ -421,12 +430,12 @@ where impl BatchExecutor for BasicBatchExecutor where - S: BlockExecutionStrategy, + S: BlockExecutionStrategy, DB: Database + Display>, { type Input<'a> = &'a RecoveredBlock<::Block>; type Output = ExecutionOutcome<::Receipt>; - type Error = BlockExecutionError; + type Error = S::Error; fn execute_and_verify_one(&mut self, block: Self::Input<'_>) -> Result<(), Self::Error> { if self.batch_record.first_block().is_none() { @@ -525,6 +534,7 @@ mod tests { struct TestExecutorProvider; impl BlockExecutorProvider for TestExecutorProvider { + type Error = BlockExecutionError; type Primitives = EthPrimitives; type Executor + Display>> = TestExecutor; type BatchExecutor + Display>> = TestExecutor; @@ -623,6 +633,7 @@ mod tests { } impl BlockExecutionStrategyFactory for TestExecutorStrategyFactory { + type Error = BlockExecutionError; type Primitives = EthPrimitives; type Strategy + Display>> = TestExecutorStrategy; diff --git a/crates/evm/src/metrics.rs b/crates/evm/src/metrics.rs index 4787bf9ce5f7..94ef44d9b688 100644 --- a/crates/evm/src/metrics.rs +++ b/crates/evm/src/metrics.rs @@ -145,6 +145,7 @@ mod tests { use super::*; use alloy_eips::eip7685::Requests; use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter}; + use reth_execution_errors::BlockExecutionError; use revm::db::BundleState; use revm_primitives::{ Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot, B256, U256, @@ -162,7 +163,7 @@ mod tests { where Self: 'a; type Output = BlockExecutionOutput<()>; - type Error = std::convert::Infallible; + type Error = BlockExecutionError; fn execute(self, _input: Self::Input<'_>) -> Result { Ok(BlockExecutionOutput { diff --git a/crates/evm/src/noop.rs b/crates/evm/src/noop.rs index 27e2e9623929..aaca6f20aaf6 100644 --- a/crates/evm/src/noop.rs +++ b/crates/evm/src/noop.rs @@ -25,6 +25,8 @@ pub struct NoopBlockExecutorProvider

(core::marker::PhantomData

); impl BlockExecutorProvider for NoopBlockExecutorProvider

{ type Primitives = P; + type Error = BlockExecutionError; + type Executor + Display>> = Self; type BatchExecutor + Display>> = Self; diff --git a/crates/evm/src/test_utils.rs b/crates/evm/src/test_utils.rs index 2eaf7fdc5aa1..001ee32eb314 100644 --- a/crates/evm/src/test_utils.rs +++ b/crates/evm/src/test_utils.rs @@ -35,6 +35,8 @@ impl MockExecutorProvider { impl BlockExecutorProvider for MockExecutorProvider { type Primitives = EthPrimitives; + type Error = BlockExecutionError; + type Executor + Display>> = Self; type BatchExecutor + Display>> = Self; diff --git a/crates/exex/exex/src/backfill/job.rs b/crates/exex/exex/src/backfill/job.rs index 4f1ac8e97aa4..6ef0e270ae05 100644 --- a/crates/exex/exex/src/backfill/job.rs +++ b/crates/exex/exex/src/backfill/job.rs @@ -39,7 +39,10 @@ pub struct BackfillJob { impl Iterator for BackfillJob where - E: BlockExecutorProvider>, + E: BlockExecutorProvider< + Primitives: NodePrimitives, + Error = BlockExecutionError, + >, P: HeaderProvider + BlockReader + StateProviderFactory, { type Item = BackfillJobResult>; @@ -55,7 +58,10 @@ where impl BackfillJob where - E: BlockExecutorProvider>, + E: BlockExecutorProvider< + Primitives: NodePrimitives, + Error = BlockExecutionError, + >, P: BlockReader + HeaderProvider + StateProviderFactory, { /// Converts the backfill job into a single block backfill job. @@ -161,7 +167,10 @@ pub struct SingleBlockBackfillJob { impl Iterator for SingleBlockBackfillJob where - E: BlockExecutorProvider>, + E: BlockExecutorProvider< + Primitives: NodePrimitives, + Error = BlockExecutionError, + >, P: HeaderProvider + BlockReader + StateProviderFactory, { type Item = BackfillJobResult<( @@ -176,7 +185,10 @@ where impl SingleBlockBackfillJob where - E: BlockExecutorProvider>, + E: BlockExecutorProvider< + Primitives: NodePrimitives, + Error = BlockExecutionError, + >, P: HeaderProvider + BlockReader + StateProviderFactory, { /// Converts the single block backfill job into a stream. diff --git a/crates/exex/exex/src/backfill/stream.rs b/crates/exex/exex/src/backfill/stream.rs index 30b28b5c66dc..d352dffc0e90 100644 --- a/crates/exex/exex/src/backfill/stream.rs +++ b/crates/exex/exex/src/backfill/stream.rs @@ -114,7 +114,11 @@ where impl Stream for StreamBackfillJob> where - E: BlockExecutorProvider> + Clone + 'static, + E: BlockExecutorProvider< + Primitives: NodePrimitives, + Error = BlockExecutionError, + > + Clone + + 'static, P: BlockReader + StateProviderFactory + Clone + Unpin + 'static, { type Item = BackfillJobResult>; @@ -147,7 +151,11 @@ where impl Stream for StreamBackfillJob> where - E: BlockExecutorProvider> + Clone + 'static, + E: BlockExecutorProvider< + Primitives: NodePrimitives, + Error = BlockExecutionError, + > + Clone + + 'static, P: BlockReader + StateProviderFactory + Clone + Unpin + 'static, { type Item = BackfillJobResult>; diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index f9f5dfc914e8..d084f5e68507 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -2,7 +2,7 @@ use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle}; use alloy_consensus::BlockHeader; use futures::{Stream, StreamExt}; use reth_chainspec::Head; -use reth_evm::execute::BlockExecutorProvider; +use reth_evm::execute::{BlockExecutionError, BlockExecutorProvider}; use reth_exex_types::ExExHead; use reth_node_api::NodePrimitives; use reth_primitives::EthPrimitives; @@ -105,8 +105,10 @@ where impl ExExNotificationsStream for ExExNotifications where P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider> - + Clone + E: BlockExecutorProvider< + Primitives: NodePrimitives, + Error = BlockExecutionError, + > + Clone + Unpin + 'static, { @@ -157,8 +159,10 @@ where impl Stream for ExExNotifications where P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider> - + Clone + E: BlockExecutorProvider< + Primitives: NodePrimitives, + Error = BlockExecutionError, + > + Clone + Unpin + 'static, { @@ -301,8 +305,10 @@ where impl ExExNotificationsWithHead where P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider> - + Clone + E: BlockExecutorProvider< + Primitives: NodePrimitives, + Error = BlockExecutionError, + > + Clone + Unpin + 'static, { @@ -381,8 +387,10 @@ where impl Stream for ExExNotificationsWithHead where P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider> - + Clone + E: BlockExecutorProvider< + Primitives: NodePrimitives, + Error = BlockExecutionError, + > + Clone + Unpin + 'static, { diff --git a/crates/node/api/src/node.rs b/crates/node/api/src/node.rs index 498297c2db8b..74e5f861b319 100644 --- a/crates/node/api/src/node.rs +++ b/crates/node/api/src/node.rs @@ -8,7 +8,7 @@ use reth_db_api::{ Database, }; use reth_engine_primitives::BeaconConsensusEngineHandle; -use reth_evm::execute::BlockExecutorProvider; +use reth_evm::execute::{BlockExecutionError, BlockExecutorProvider}; use reth_network_api::FullNetwork; use reth_node_core::node_config::NodeConfig; use reth_node_types::{HeaderTy, NodeTypes, NodeTypesWithDBAdapter, NodeTypesWithEngine, TxTy}; @@ -55,7 +55,10 @@ pub trait FullNodeComponents: FullNodeTypes + Clone + 'static { type Evm: ConfigureEvm

, Transaction = TxTy>; /// The type that knows how to execute blocks. - type Executor: BlockExecutorProvider::Primitives>; + type Executor: BlockExecutorProvider< + Primitives = ::Primitives, + Error = BlockExecutionError, + >; /// The consensus type of the node. type Consensus: FullConsensus<::Primitives, Error = ConsensusError> diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index 00cc9d58f96a..6a9707c4c5b1 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -37,7 +37,7 @@ pub fn build_networked_pipeline( static_file_producer: StaticFileProducer>, executor: Executor, exex_manager_handle: ExExManagerHandle, -) -> eyre::Result> +) -> eyre::Result> where N: ProviderNodeTypes, Client: BlockClient> + 'static, @@ -83,7 +83,7 @@ pub fn build_pipeline( static_file_producer: StaticFileProducer>, executor: Executor, exex_manager_handle: ExExManagerHandle, -) -> eyre::Result> +) -> eyre::Result> where N: ProviderNodeTypes, H: HeaderDownloader
> + 'static, diff --git a/crates/optimism/evm/src/error.rs b/crates/optimism/evm/src/error.rs index 461f8c11e4fb..4082d0159dd2 100644 --- a/crates/optimism/evm/src/error.rs +++ b/crates/optimism/evm/src/error.rs @@ -1,10 +1,10 @@ //! Error types for the Optimism EVM module. use alloc::string::String; -use reth_evm::execute::BlockExecutionError; +use reth_evm::execute::{BlockExecutionError, ProviderError}; /// Optimism Block Executor Errors -#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +#[derive(Debug, thiserror::Error)] pub enum OpBlockExecutionError { /// Error when trying to parse L1 block info #[error("could not get L1 block info from L2 block: {message}")] @@ -21,10 +21,7 @@ pub enum OpBlockExecutionError { /// Thrown when a database account could not be loaded. #[error("failed to load account {_0}")] AccountLoadFailed(alloy_primitives::Address), -} - -impl From for BlockExecutionError { - fn from(err: OpBlockExecutionError) -> Self { - Self::other(err) - } + /// Thrown when a L1 block execution failed. + #[error(transparent)] + Eth(#[from] BlockExecutionError), } diff --git a/crates/optimism/evm/src/execute.rs b/crates/optimism/evm/src/execute.rs index bfac67fbb573..6dbdf5eea8b5 100644 --- a/crates/optimism/evm/src/execute.rs +++ b/crates/optimism/evm/src/execute.rs @@ -79,6 +79,7 @@ where + ConfigureEvm
, { type Primitives = N; + type Error = OpBlockExecutionError; type Strategy + Display>> = OpExecutionStrategy; @@ -153,7 +154,7 @@ where { type DB = DB; type Primitives = N; - type Error = BlockExecutionError; + type Error = OpBlockExecutionError; fn init(&mut self, tx_env_overrides: Box) { self.tx_env_overrides = Some(tx_env_overrides); @@ -170,12 +171,14 @@ where let mut evm = self.evm_config.evm_for_block(&mut self.state, block.header()); - self.system_caller.apply_beacon_root_contract_call( - block.header().timestamp, - block.header().number, - block.header().parent_beacon_block_root, - &mut evm, - )?; + self.system_caller + .apply_beacon_root_contract_call( + block.header().timestamp, + block.header().number, + block.header().parent_beacon_block_root, + &mut evm, + ) + .map_err(OpBlockExecutionError::Eth)?; // Ensure that the create2deployer is force-deployed at the canyon transition. Optimism // blocks will always have at least a single transaction in them (the L1 info transaction), @@ -205,11 +208,12 @@ where if transaction.gas_limit() > block_available_gas && (is_regolith || !transaction.is_deposit()) { - return Err(BlockValidationError::TransactionGasLimitMoreThanAvailableBlockGas { - transaction_gas_limit: transaction.gas_limit(), - block_available_gas, - } - .into()) + return Err(OpBlockExecutionError::Eth(BlockExecutionError::Validation( + BlockValidationError::TransactionGasLimitMoreThanAvailableBlockGas { + transaction_gas_limit: transaction.gas_limit(), + block_available_gas, + }, + ))); } // Cache the depositor account prior to the state transition for the deposit nonce. @@ -236,10 +240,12 @@ where let result_and_state = evm.transact().map_err(move |err| { let new_err = err.map_db_err(|e| e.into()); // Ensure hash is calculated for error log, if not already done - BlockValidationError::EVM { - hash: transaction.recalculate_hash(), - error: Box::new(new_err), - } + OpBlockExecutionError::Eth(BlockExecutionError::Validation( + BlockValidationError::EVM { + hash: transaction.recalculate_hash(), + error: Box::new(new_err), + }, + )) })?; trace!( @@ -301,11 +307,14 @@ where ) -> Result { let balance_increments = post_block_balance_increments(&self.chain_spec.clone(), block); // increment balances - self.state - .increment_balances(balance_increments.clone()) - .map_err(|_| BlockValidationError::IncrementBalanceFailed)?; + self.state.increment_balances(balance_increments.clone()).map_err(|_| { + OpBlockExecutionError::Eth(BlockExecutionError::Validation( + BlockValidationError::IncrementBalanceFailed, + )) + })?; // call state hook with changes due to balance increments. - let balance_state = balance_increment_state(&balance_increments, &mut self.state)?; + let balance_state = balance_increment_state(&balance_increments, &mut self.state) + .map_err(OpBlockExecutionError::Eth)?; self.system_caller.on_state(&balance_state); Ok(Requests::default()) diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index 59637ce81b0e..e8fde5fe382c 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -15,6 +15,7 @@ use alloy_rpc_types_trace::geth::{ use async_trait::async_trait; use jsonrpsee::core::RpcResult; use reth_chainspec::EthereumHardforks; +use reth_errors::BlockExecutionError; use reth_evm::{ env::EvmEnv, execute::{BlockExecutorProvider, Executor}, @@ -84,8 +85,10 @@ impl DebugApi { impl DebugApi where Eth: EthApiTypes + TraceExt + 'static, - BlockExecutor: - BlockExecutorProvider>>, + BlockExecutor: BlockExecutorProvider< + Primitives: NodePrimitives>, + Error = BlockExecutionError, + >, { /// Acquires a permit to execute a tracing call. async fn acquire_trace_permit(&self) -> Result { @@ -816,8 +819,10 @@ where impl DebugApiServer for DebugApi where Eth: EthApiTypes + EthTransactions + TraceExt + 'static, - BlockExecutor: - BlockExecutorProvider>>, + BlockExecutor: BlockExecutorProvider< + Primitives: NodePrimitives>, + Error = BlockExecutionError, + >, { /// Handler for `debug_getRawHeader` async fn raw_header(&self, block_id: BlockId) -> RpcResult { diff --git a/crates/rpc/rpc/src/validation.rs b/crates/rpc/rpc/src/validation.rs index c621c8b9790c..10eac69da7af 100644 --- a/crates/rpc/rpc/src/validation.rs +++ b/crates/rpc/rpc/src/validation.rs @@ -93,7 +93,7 @@ where + ChainSpecProvider + StateProviderFactory + 'static, - E: BlockExecutorProvider, + E: BlockExecutorProvider, { /// Validates the given block and a [`BidTrace`] against it. pub async fn validate_message_against_block( diff --git a/crates/stages/api/src/error.rs b/crates/stages/api/src/error.rs index b63dd20f77c1..9b8009d7e6bb 100644 --- a/crates/stages/api/src/error.rs +++ b/crates/stages/api/src/error.rs @@ -1,7 +1,7 @@ use crate::PipelineEvent; use alloy_eips::eip1898::BlockWithParent; use reth_consensus::ConsensusError; -use reth_errors::{BlockExecutionError, DatabaseError, RethError}; +use reth_errors::{BlockExecError, BlockExecutionError, DatabaseError, RethError}; use reth_network_p2p::error::DownloadError; use reth_provider::ProviderError; use reth_prune::{PruneSegment, PruneSegmentError, PrunerError}; @@ -11,18 +11,21 @@ use tokio::sync::broadcast::error::SendError; /// Represents the specific error type within a block error. #[derive(Error, Debug)] -pub enum BlockErrorKind { +pub enum BlockErrorKind { /// The block encountered a validation error. #[error("validation error: {0}")] Validation(#[from] ConsensusError), /// The block encountered an execution error. #[error("execution error: {0}")] - Execution(#[from] BlockExecutionError), + Execution(E), } -impl BlockErrorKind { +impl BlockErrorKind +where + E: BlockExecError, +{ /// Returns `true` if the error is a state root error. - pub const fn is_state_root_error(&self) -> bool { + pub fn is_state_root_error(&self) -> bool { match self { Self::Validation(err) => err.is_state_root_error(), Self::Execution(err) => err.is_state_root_error(), @@ -32,7 +35,7 @@ impl BlockErrorKind { /// A stage execution error. #[derive(Error, Debug)] -pub enum StageError { +pub enum StageError { /// The stage encountered an error related to a block. #[error("stage encountered an error in block #{number}: {error}", number = block.block.number)] Block { @@ -40,7 +43,7 @@ pub enum StageError { block: Box, /// The specific error type, either consensus or execution error. #[source] - error: BlockErrorKind, + error: BlockErrorKind, }, /// The stage encountered a downloader error where the responses cannot be attached to the /// current head. @@ -121,7 +124,10 @@ pub enum StageError { Fatal(Box), } -impl StageError { +impl StageError +where + E: BlockExecError, +{ /// If the error is fatal the pipeline will stop. pub const fn is_fatal(&self) -> bool { matches!( @@ -139,7 +145,10 @@ impl StageError { } } -impl From for StageError { +impl From for StageError +where + E: BlockExecError, +{ fn from(source: std::io::Error) -> Self { Self::Fatal(Box::new(source)) } @@ -147,10 +156,10 @@ impl From for StageError { /// A pipeline execution error. #[derive(Error, Debug)] -pub enum PipelineError { +pub enum PipelineError { /// The pipeline encountered an irrecoverable error in one of the stages. #[error(transparent)] - Stage(#[from] StageError), + Stage(#[from] StageError), /// The pipeline encountered a database error. #[error(transparent)] Database(#[from] DatabaseError), diff --git a/crates/stages/api/src/pipeline/builder.rs b/crates/stages/api/src/pipeline/builder.rs index 45bdc2d89427..0f45ddc15154 100644 --- a/crates/stages/api/src/pipeline/builder.rs +++ b/crates/stages/api/src/pipeline/builder.rs @@ -1,14 +1,18 @@ use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageId, StageSet}; use alloy_primitives::{BlockNumber, B256}; +use reth_errors::BlockExecError; use reth_provider::{providers::ProviderNodeTypes, DatabaseProviderFactory, ProviderFactory}; use reth_static_file::StaticFileProducer; use tokio::sync::watch; /// Builds a [`Pipeline`]. #[must_use = "call `build` to construct the pipeline"] -pub struct PipelineBuilder { +pub struct PipelineBuilder +where + E: BlockExecError, +{ /// All configured stages in the order they will be executed. - stages: Vec>, + stages: Vec>, /// The maximum block number to sync to. max_block: Option, /// A receiver for the current chain tip to sync to. @@ -17,11 +21,14 @@ pub struct PipelineBuilder { fail_on_unwind: bool, } -impl PipelineBuilder { +impl PipelineBuilder +where + E: BlockExecError, +{ /// Add a stage to the pipeline. pub fn add_stage(mut self, stage: S) -> Self where - S: Stage + 'static, + S: Stage + 'static, { self.stages.push(Box::new(stage)); self @@ -34,7 +41,7 @@ impl PipelineBuilder { /// To customize the stages in the set (reorder, disable, insert a stage) call /// [`builder`][StageSet::builder] on the set which will convert it to a /// [`StageSetBuilder`][crate::StageSetBuilder]. - pub fn add_stages>(mut self, set: Set) -> Self { + pub fn add_stages>(mut self, set: Set) -> Self { let states = set.builder().build(); self.stages.reserve_exact(states.len()); for stage in states { @@ -74,7 +81,7 @@ impl PipelineBuilder { self, provider_factory: ProviderFactory, static_file_producer: StaticFileProducer>, - ) -> Pipeline + ) -> Pipeline where N: ProviderNodeTypes, ProviderFactory: DatabaseProviderFactory, @@ -94,7 +101,10 @@ impl PipelineBuilder { } } -impl Default for PipelineBuilder { +impl Default for PipelineBuilder +where + E: BlockExecError, +{ fn default() -> Self { Self { stages: Vec::new(), @@ -106,7 +116,10 @@ impl Default for PipelineBuilder { } } -impl std::fmt::Debug for PipelineBuilder { +impl std::fmt::Debug for PipelineBuilder +where + E: BlockExecError, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PipelineBuilder") .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::>()) diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 2cb98d44f93d..3eb8b698e5c8 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -28,18 +28,18 @@ use crate::{ }; pub use builder::*; use progress::*; -use reth_errors::RethResult; +use reth_errors::{BlockExecError, RethResult}; pub use set::*; /// A container for a queued stage. -pub(crate) type BoxedStage = Box>; +pub(crate) type BoxedStage = Box>; /// The future that returns the owned pipeline and the result of the pipeline run. See /// [`Pipeline::run_as_fut`]. -pub type PipelineFut = Pin> + Send>>; +pub type PipelineFut = Pin> + Send>>; /// The pipeline type itself with the result of [`Pipeline::run_as_fut`] -pub type PipelineWithResult = (Pipeline, Result); +pub type PipelineWithResult = (Pipeline, Result>); #[cfg_attr(doc, aquamarine::aquamarine)] /// A staged sync pipeline. @@ -63,11 +63,11 @@ pub type PipelineWithResult = (Pipeline, Result { +pub struct Pipeline { /// Provider factory. provider_factory: ProviderFactory, /// All configured stages in the order they will be executed. - stages: Vec as DatabaseProviderFactory>::ProviderRW>>, + stages: Vec as DatabaseProviderFactory>::ProviderRW, E>>, /// The maximum block number to sync to. max_block: Option, static_file_producer: StaticFileProducer>, @@ -83,10 +83,13 @@ pub struct Pipeline { fail_on_unwind: bool, } -impl Pipeline { +impl Pipeline +where + E: BlockExecError, +{ /// Construct a pipeline using a [`PipelineBuilder`]. - pub fn builder() -> PipelineBuilder< as DatabaseProviderFactory>::ProviderRW> - { + pub fn builder( + ) -> PipelineBuilder< as DatabaseProviderFactory>::ProviderRW, E> { PipelineBuilder::default() } @@ -110,9 +113,12 @@ impl Pipeline { } } -impl Pipeline { +impl Pipeline +where + E: BlockExecError, +{ /// Registers progress metrics for each registered stage - pub fn register_metrics(&mut self) -> Result<(), PipelineError> { + pub fn register_metrics(&mut self) -> Result<(), PipelineError> { let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) }; let provider = self.provider_factory.provider()?; @@ -130,7 +136,7 @@ impl Pipeline { /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the /// pipeline and its result as a future. #[track_caller] - pub fn run_as_fut(mut self, target: Option) -> PipelineFut { + pub fn run_as_fut(mut self, target: Option) -> PipelineFut { // TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for // updating metrics. let _ = self.register_metrics(); // ignore error @@ -161,7 +167,7 @@ impl Pipeline { /// Run the pipeline in an infinite loop. Will terminate early if the user has specified /// a `max_block` in the pipeline. - pub async fn run(&mut self) -> Result<(), PipelineError> { + pub async fn run(&mut self) -> Result<(), PipelineError> { let _ = self.register_metrics(); // ignore error loop { @@ -202,7 +208,7 @@ impl Pipeline { /// This will be [`ControlFlow::Continue`] or [`ControlFlow::NoProgress`] of the _last_ stage in /// the pipeline (for example the `Finish` stage). Or [`ControlFlow::Unwind`] of the stage /// that caused the unwind. - pub async fn run_loop(&mut self) -> Result { + pub async fn run_loop(&mut self) -> Result> { self.move_to_static_files()?; let mut previous_stage = None; @@ -279,7 +285,7 @@ impl Pipeline { &mut self, to: BlockNumber, bad_block: Option, - ) -> Result<(), PipelineError> { + ) -> Result<(), PipelineError> { // Unwind stages in reverse order of execution let unwind_pipeline = self.stages.iter_mut().rev(); @@ -380,7 +386,7 @@ impl Pipeline { &mut self, previous_stage: Option, stage_index: usize, - ) -> Result { + ) -> Result> { let total_stages = self.stages.len(); let stage = &mut self.stages[stage_index]; @@ -494,12 +500,12 @@ impl Pipeline { } } -fn on_stage_error( +fn on_stage_error( factory: &ProviderFactory, stage_id: StageId, prev_checkpoint: Option, - err: StageError, -) -> Result, PipelineError> { + err: StageError, +) -> Result, PipelineError> { if let StageError::DetachedHead { local_head, header, error } = err { warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head"); @@ -581,7 +587,10 @@ fn on_stage_error( } } -impl std::fmt::Debug for Pipeline { +impl std::fmt::Debug for Pipeline +where + E: BlockExecError, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Pipeline") .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::>()) @@ -600,7 +609,7 @@ mod tests { use crate::{test_utils::TestStage, UnwindOutput}; use assert_matches::assert_matches; use reth_consensus::ConsensusError; - use reth_errors::ProviderError; + use reth_errors::{BlockExecutionError, ProviderError}; use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB}; use reth_prune::PruneModes; use reth_testing_utils::generators::{self, random_block_with_parent}; @@ -648,7 +657,7 @@ mod tests { let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter(); let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter(); - let mut pipeline = Pipeline::::builder() + let mut pipeline = Pipeline::::builder() .add_stage(stage_a) .add_stage(stage_b) .with_max_block(10) @@ -734,7 +743,7 @@ mod tests { let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter(); let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter(); - let mut pipeline = Pipeline::::builder() + let mut pipeline = Pipeline::::builder() .add_stage(stage_a) .add_stage(stage_b) .add_stage(stage_c) @@ -865,7 +874,7 @@ mod tests { async fn unwind_pipeline_with_intermediate_progress() { let provider_factory = create_test_provider_factory(); - let mut pipeline = Pipeline::::builder() + let mut pipeline = Pipeline::::builder() .add_stage( TestStage::new(StageId::Other("A")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) @@ -965,7 +974,7 @@ mod tests { async fn run_pipeline_with_unwind() { let provider_factory = create_test_provider_factory(); - let mut pipeline = Pipeline::::builder() + let mut pipeline = Pipeline::::builder() .add_stage( TestStage::new(StageId::Other("A")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) @@ -1086,7 +1095,7 @@ mod tests { async fn pipeline_error_handling() { // Non-fatal let provider_factory = create_test_provider_factory(); - let mut pipeline = Pipeline::::builder() + let mut pipeline = Pipeline::::builder() .add_stage( TestStage::new(StageId::Other("NonFatal")) .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) @@ -1102,7 +1111,7 @@ mod tests { // Fatal let provider_factory = create_test_provider_factory(); - let mut pipeline = Pipeline::::builder() + let mut pipeline = Pipeline::::builder() .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err( StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)), ))) diff --git a/crates/stages/api/src/pipeline/set.rs b/crates/stages/api/src/pipeline/set.rs index c8fbf4c71d8e..b18834c370ca 100644 --- a/crates/stages/api/src/pipeline/set.rs +++ b/crates/stages/api/src/pipeline/set.rs @@ -1,3 +1,5 @@ +use reth_errors::BlockExecError; + use crate::{Stage, StageId}; use std::{ collections::HashMap, @@ -10,26 +12,35 @@ use std::{ /// individual stage sets to determine what kind of configuration they expose. /// /// Individual stages in the set can be added, removed and overridden using [`StageSetBuilder`]. -pub trait StageSet: Sized { +pub trait StageSet: Sized +where + E: BlockExecError, +{ /// Configures the stages in the set. - fn builder(self) -> StageSetBuilder; + fn builder(self) -> StageSetBuilder; /// Overrides the given [`Stage`], if it is in this set. /// /// # Panics /// /// Panics if the [`Stage`] is not in this set. - fn set + 'static>(self, stage: S) -> StageSetBuilder { + fn set + 'static>(self, stage: S) -> StageSetBuilder { self.builder().set(stage) } } -struct StageEntry { - stage: Box>, +struct StageEntry +where + E: BlockExecError, +{ + stage: Box>, enabled: bool, } -impl Debug for StageEntry { +impl Debug for StageEntry +where + E: BlockExecError, +{ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("StageEntry") .field("stage", &self.stage.id()) @@ -44,18 +55,27 @@ impl Debug for StageEntry { /// to the final sync pipeline before/after their dependencies. /// /// Stages inside the set can be disabled, enabled, overridden and reordered. -pub struct StageSetBuilder { - stages: HashMap>, +pub struct StageSetBuilder +where + E: BlockExecError, +{ + stages: HashMap>, order: Vec, } -impl Default for StageSetBuilder { +impl Default for StageSetBuilder +where + E: BlockExecError, +{ fn default() -> Self { Self { stages: HashMap::default(), order: Vec::new() } } } -impl Debug for StageSetBuilder { +impl Debug for StageSetBuilder +where + E: BlockExecError, +{ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("StageSetBuilder") .field("stages", &self.stages) @@ -64,14 +84,17 @@ impl Debug for StageSetBuilder { } } -impl StageSetBuilder { +impl StageSetBuilder +where + E: BlockExecError, +{ fn index_of(&self, stage_id: StageId) -> usize { let index = self.order.iter().position(|&id| id == stage_id); index.unwrap_or_else(|| panic!("Stage does not exist in set: {stage_id}")) } - fn upsert_stage_state(&mut self, stage: Box>, added_at_index: usize) { + fn upsert_stage_state(&mut self, stage: Box>, added_at_index: usize) { let stage_id = stage.id(); if self.stages.insert(stage.id(), StageEntry { stage, enabled: true }).is_some() { if let Some(to_remove) = self @@ -91,7 +114,7 @@ impl StageSetBuilder { /// # Panics /// /// Panics if the [`Stage`] is not in this set. - pub fn set + 'static>(mut self, stage: S) -> Self { + pub fn set + 'static>(mut self, stage: S) -> Self { let entry = self .stages .get_mut(&stage.id()) @@ -103,7 +126,7 @@ impl StageSetBuilder { /// Adds the given [`Stage`] at the end of this set. /// /// If the stage was already in the group, it is removed from its previous place. - pub fn add_stage + 'static>(mut self, stage: S) -> Self { + pub fn add_stage + 'static>(mut self, stage: S) -> Self { let target_index = self.order.len(); self.order.push(stage.id()); self.upsert_stage_state(Box::new(stage), target_index); @@ -113,7 +136,7 @@ impl StageSetBuilder { /// Adds the given [`Stage`] at the end of this set if it's [`Some`]. /// /// If the stage was already in the group, it is removed from its previous place. - pub fn add_stage_opt + 'static>(self, stage: Option) -> Self { + pub fn add_stage_opt + 'static>(self, stage: Option) -> Self { if let Some(stage) = stage { self.add_stage(stage) } else { @@ -125,7 +148,7 @@ impl StageSetBuilder { /// /// If a stage is in both sets, it is removed from its previous place in this set. Because of /// this, it is advisable to merge sets first and re-order stages after if needed. - pub fn add_set>(mut self, set: Set) -> Self { + pub fn add_set>(mut self, set: Set) -> Self { for stage in set.builder().build() { let target_index = self.order.len(); self.order.push(stage.id()); @@ -141,7 +164,11 @@ impl StageSetBuilder { /// # Panics /// /// Panics if the dependency stage is not in this set. - pub fn add_before + 'static>(mut self, stage: S, before: StageId) -> Self { + pub fn add_before + 'static>( + mut self, + stage: S, + before: StageId, + ) -> Self { let target_index = self.index_of(before); self.order.insert(target_index, stage.id()); self.upsert_stage_state(Box::new(stage), target_index); @@ -155,7 +182,7 @@ impl StageSetBuilder { /// # Panics /// /// Panics if the dependency stage is not in this set. - pub fn add_after + 'static>(mut self, stage: S, after: StageId) -> Self { + pub fn add_after + 'static>(mut self, stage: S, after: StageId) -> Self { let target_index = self.index_of(after) + 1; self.order.insert(target_index, stage.id()); self.upsert_stage_state(Box::new(stage), target_index); @@ -236,7 +263,7 @@ impl StageSetBuilder { } /// Consumes the builder and returns the contained [`Stage`]s in the order specified. - pub fn build(mut self) -> Vec>> { + pub fn build(mut self) -> Vec>> { let mut stages = Vec::new(); for id in &self.order { if let Some(entry) = self.stages.remove(id) { @@ -249,7 +276,10 @@ impl StageSetBuilder { } } -impl StageSet for StageSetBuilder { +impl StageSet for StageSetBuilder +where + E: BlockExecError, +{ fn builder(self) -> Self { self } diff --git a/crates/stages/api/src/stage.rs b/crates/stages/api/src/stage.rs index 368269782a29..67e2616be224 100644 --- a/crates/stages/api/src/stage.rs +++ b/crates/stages/api/src/stage.rs @@ -1,5 +1,6 @@ use crate::{error::StageError, StageCheckpoint, StageId}; use alloy_primitives::{BlockNumber, TxNumber}; +use reth_errors::{BlockExecError, BlockExecutionError}; use reth_provider::{BlockReader, ProviderError}; use std::{ cmp::{max, min}, @@ -70,13 +71,14 @@ impl ExecInput { /// Return the next block range determined the number of transactions within it. /// This function walks the block indices until either the end of the range is reached or /// the number of transactions exceeds the threshold. - pub fn next_block_range_with_transaction_threshold( + pub fn next_block_range_with_transaction_threshold( &self, provider: &Provider, tx_threshold: u64, - ) -> Result<(Range, RangeInclusive, bool), StageError> + ) -> Result<(Range, RangeInclusive, bool), StageError> where Provider: BlockReader, + E: BlockExecError, { let start_block = self.next_block(); let target_block = self.target(); @@ -190,7 +192,10 @@ pub struct UnwindOutput { /// /// Stages receive [`DBProvider`](reth_provider::DBProvider). #[auto_impl::auto_impl(Box)] -pub trait Stage: Send + Sync { +pub trait Stage: Send + Sync +where + E: BlockExecError, +{ /// Get the ID of the stage. /// /// Stage IDs must be unique. @@ -224,21 +229,25 @@ pub trait Stage: Send + Sync { &mut self, _cx: &mut Context<'_>, _input: ExecInput, - ) -> Poll> { + ) -> Poll>> { Poll::Ready(Ok(())) } /// Execute the stage. /// It is expected that the stage will write all necessary data to the database /// upon invoking this method. - fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result; + fn execute( + &mut self, + provider: &Provider, + input: ExecInput, + ) -> Result>; /// Post execution commit hook. /// /// This is called after the stage has been executed and the data has been committed by the /// provider. The stage may want to pass some data from [`Self::execute`] via the internal /// field. - fn post_execute_commit(&mut self) -> Result<(), StageError> { + fn post_execute_commit(&mut self) -> Result<(), StageError> { Ok(()) } @@ -247,28 +256,31 @@ pub trait Stage: Send + Sync { &mut self, provider: &Provider, input: UnwindInput, - ) -> Result; + ) -> Result>; /// Post unwind commit hook. /// /// This is called after the stage has been unwound and the data has been committed by the /// provider. The stage may want to pass some data from [`Self::unwind`] via the internal /// field. - fn post_unwind_commit(&mut self) -> Result<(), StageError> { + fn post_unwind_commit(&mut self) -> Result<(), StageError> { Ok(()) } } /// [Stage] trait extension. -pub trait StageExt: Stage { +pub trait StageExt: Stage +where + E: BlockExecError, +{ /// Utility extension for the `Stage` trait that invokes `Stage::poll_execute_ready` /// with [`poll_fn`] context. For more information see [`Stage::poll_execute_ready`]. fn execute_ready( &mut self, input: ExecInput, - ) -> impl Future> + Send { + ) -> impl Future>> + Send { poll_fn(move |cx| self.poll_execute_ready(cx, input)) } } -impl> StageExt for S {} +impl> StageExt for S where E: BlockExecError {} diff --git a/crates/stages/api/src/test_utils.rs b/crates/stages/api/src/test_utils.rs index 1f15e55140ed..18fe4fd08aa7 100644 --- a/crates/stages/api/src/test_utils.rs +++ b/crates/stages/api/src/test_utils.rs @@ -1,6 +1,7 @@ #![allow(missing_docs)] use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use reth_errors::BlockExecError; use std::{ collections::VecDeque, sync::{ @@ -13,15 +14,18 @@ use std::{ /// /// This can be used to mock expected outputs of [`Stage::execute`] and [`Stage::unwind`] #[derive(Debug)] -pub struct TestStage { +pub struct TestStage { id: StageId, - exec_outputs: VecDeque>, - unwind_outputs: VecDeque>, + exec_outputs: VecDeque>>, + unwind_outputs: VecDeque>>, post_execute_commit_counter: Arc, post_unwind_commit_counter: Arc, } -impl TestStage { +impl TestStage +where + E: BlockExecError, +{ pub fn new(id: StageId) -> Self { Self { id, @@ -32,25 +36,25 @@ impl TestStage { } } - pub fn with_exec(mut self, exec_outputs: VecDeque>) -> Self { + pub fn with_exec(mut self, exec_outputs: VecDeque>>) -> Self { self.exec_outputs = exec_outputs; self } pub fn with_unwind( mut self, - unwind_outputs: VecDeque>, + unwind_outputs: VecDeque>>, ) -> Self { self.unwind_outputs = unwind_outputs; self } - pub fn add_exec(mut self, output: Result) -> Self { + pub fn add_exec(mut self, output: Result>) -> Self { self.exec_outputs.push_back(output); self } - pub fn add_unwind(mut self, output: Result) -> Self { + pub fn add_unwind(mut self, output: Result>) -> Self { self.unwind_outputs.push_back(output); self } @@ -68,30 +72,33 @@ impl TestStage { } } -impl Stage for TestStage { +impl Stage for TestStage +where + E: BlockExecError, +{ fn id(&self) -> StageId { self.id } - fn execute(&mut self, _: &Provider, _input: ExecInput) -> Result { + fn execute(&mut self, _: &Provider, _input: ExecInput) -> Result> { self.exec_outputs .pop_front() .unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id)) } - fn post_execute_commit(&mut self) -> Result<(), StageError> { + fn post_execute_commit(&mut self) -> Result<(), StageError> { self.post_execute_commit_counter.fetch_add(1, Ordering::Relaxed); Ok(()) } - fn unwind(&mut self, _: &Provider, _input: UnwindInput) -> Result { + fn unwind(&mut self, _: &Provider, _input: UnwindInput) -> Result> { self.unwind_outputs .pop_front() .unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id)) } - fn post_unwind_commit(&mut self) -> Result<(), StageError> { + fn post_unwind_commit(&mut self) -> Result<(), StageError> { self.post_unwind_commit_counter.fetch_add(1, Ordering::Relaxed); Ok(()) diff --git a/crates/stages/stages/Cargo.toml b/crates/stages/stages/Cargo.toml index e7114eeb16ac..d72d27cea014 100644 --- a/crates/stages/stages/Cargo.toml +++ b/crates/stages/stages/Cargo.toml @@ -21,6 +21,7 @@ reth-db.workspace = true reth-db-api.workspace = true reth-etl.workspace = true reth-evm.workspace = true +reth-execution-errors.workspace = true reth-exex.workspace = true reth-network-p2p.workspace = true reth-primitives = { workspace = true, features = ["secp256k1"] } @@ -64,7 +65,6 @@ reth-chainspec.workspace = true reth-primitives = { workspace = true, features = ["test-utils", "arbitrary"] } reth-db = { workspace = true, features = ["test-utils", "mdbx"] } reth-evm-ethereum.workspace = true -reth-execution-errors.workspace = true reth-consensus = { workspace = true, features = ["test-utils"] } reth-network-p2p = { workspace = true, features = ["test-utils"] } reth-downloaders.workspace = true diff --git a/crates/stages/stages/benches/criterion.rs b/crates/stages/stages/benches/criterion.rs index 2bbb5ce0a542..42909ce1a1be 100644 --- a/crates/stages/stages/benches/criterion.rs +++ b/crates/stages/stages/benches/criterion.rs @@ -5,6 +5,7 @@ use alloy_primitives::BlockNumber; use criterion::{criterion_main, measurement::WallTime, BenchmarkGroup, Criterion}; use reth_config::config::{EtlConfig, TransactionLookupConfig}; use reth_db::{test_utils::TempDatabase, Database, DatabaseEnv}; +use reth_execution_errors::BlockExecutionError; use reth_provider::{test_utils::MockNodeTypesWithDB, DatabaseProvider, DatabaseProviderFactory}; use reth_stages::{ stages::{MerkleStage, SenderRecoveryStage, TransactionLookupStage}, @@ -152,7 +153,10 @@ fn measure_stage( label: String, ) where S: Clone - + Stage as Database>::TXMut, MockNodeTypesWithDB>>, + + Stage< + DatabaseProvider< as Database>::TXMut, MockNodeTypesWithDB>, + BlockExecutionError, + >, F: Fn(S, &TestStageDB, StageRange), { let stage_range = ( diff --git a/crates/stages/stages/src/sets.rs b/crates/stages/stages/src/sets.rs index 37c84be83a5b..f882570683bf 100644 --- a/crates/stages/stages/src/sets.rs +++ b/crates/stages/stages/src/sets.rs @@ -46,6 +46,7 @@ use alloy_primitives::B256; use reth_config::config::StageConfig; use reth_consensus::{Consensus, ConsensusError}; use reth_evm::execute::BlockExecutorProvider; +use reth_execution_errors::{BlockExecError, BlockExecutionError}; use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}; use reth_primitives_traits::Block; use reth_provider::HeaderSyncGapProvider; @@ -137,13 +138,13 @@ where { /// Appends the default offline stages and default finish stage to the given builder. pub fn add_offline_stages( - default_offline: StageSetBuilder, + default_offline: StageSetBuilder, executor_factory: E, stages_config: StageConfig, prune_modes: PruneModes, - ) -> StageSetBuilder + ) -> StageSetBuilder where - OfflineStages: StageSet, + OfflineStages: StageSet, { StageSetBuilder::default() .add_set(default_offline) @@ -152,16 +153,16 @@ where } } -impl StageSet for DefaultStages +impl StageSet for DefaultStages where P: HeaderSyncGapProvider + 'static, H: HeaderDownloader + 'static, B: BodyDownloader + 'static, E: BlockExecutorProvider, - OnlineStages: StageSet, - OfflineStages: StageSet, + OnlineStages: StageSet, + OfflineStages: StageSet, { - fn builder(self) -> StageSetBuilder { + fn builder(self) -> StageSetBuilder { Self::add_offline_stages( self.online.builder(), self.executor_factory, @@ -223,10 +224,10 @@ where pub fn builder_with_headers( headers: HeaderStage, body_downloader: B, - ) -> StageSetBuilder + ) -> StageSetBuilder where - HeaderStage: Stage, - BodyStage: Stage, + HeaderStage: Stage, + BodyStage: Stage, { StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader)) } @@ -239,10 +240,10 @@ where header_downloader: H, consensus: Arc>, stages_config: StageConfig, - ) -> StageSetBuilder + ) -> StageSetBuilder where - BodyStage: Stage, - HeaderStage: Stage, + BodyStage: Stage, + HeaderStage: Stage, { StageSetBuilder::default() .add_stage(HeaderStage::new( @@ -256,15 +257,16 @@ where } } -impl StageSet for OnlineStages +impl StageSet for OnlineStages where P: HeaderSyncGapProvider + 'static, H: HeaderDownloader
::Header> + 'static, B: BodyDownloader + 'static, - HeaderStage: Stage, - BodyStage: Stage, + HeaderStage: Stage, + BodyStage: Stage, + E: BlockExecError, { - fn builder(self) -> StageSetBuilder { + fn builder(self) -> StageSetBuilder { StageSetBuilder::default() .add_stage(HeaderStage::new( self.provider, @@ -308,16 +310,16 @@ impl OfflineStages { } } -impl StageSet for OfflineStages +impl StageSet for OfflineStages where E: BlockExecutorProvider, - ExecutionStages: StageSet, - PruneSenderRecoveryStage: Stage, - HashingStages: StageSet, - HistoryIndexingStages: StageSet, - PruneStage: Stage, + ExecutionStages: StageSet, + PruneSenderRecoveryStage: Stage, + HashingStages: StageSet, + HistoryIndexingStages: StageSet, + PruneStage: Stage, { - fn builder(self) -> StageSetBuilder { + fn builder(self) -> StageSetBuilder { ExecutionStages::new( self.executor_factory, self.stages_config.clone(), @@ -365,13 +367,13 @@ impl ExecutionStages { } } -impl StageSet for ExecutionStages +impl StageSet for ExecutionStages where E: BlockExecutorProvider, - SenderRecoveryStage: Stage, - ExecutionStage: Stage, + SenderRecoveryStage: Stage, + ExecutionStage: Stage, { - fn builder(self) -> StageSetBuilder { + fn builder(self) -> StageSetBuilder { StageSetBuilder::default() .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery)) .add_stage(ExecutionStage::from_config( @@ -391,13 +393,14 @@ pub struct HashingStages { stages_config: StageConfig, } -impl StageSet for HashingStages +impl StageSet for HashingStages where - MerkleStage: Stage, - AccountHashingStage: Stage, - StorageHashingStage: Stage, + MerkleStage: Stage, + AccountHashingStage: Stage, + StorageHashingStage: Stage, + E: BlockExecError, { - fn builder(self) -> StageSetBuilder { + fn builder(self) -> StageSetBuilder { StageSetBuilder::default() .add_stage(MerkleStage::default_unwind()) .add_stage(AccountHashingStage::new( @@ -422,13 +425,14 @@ pub struct HistoryIndexingStages { prune_modes: PruneModes, } -impl StageSet for HistoryIndexingStages +impl StageSet for HistoryIndexingStages where - TransactionLookupStage: Stage, - IndexStorageHistoryStage: Stage, - IndexAccountHistoryStage: Stage, + TransactionLookupStage: Stage, + IndexStorageHistoryStage: Stage, + IndexAccountHistoryStage: Stage, + E: BlockExecError, { - fn builder(self) -> StageSetBuilder { + fn builder(self) -> StageSetBuilder { StageSetBuilder::default() .add_stage(TransactionLookupStage::new( self.stages_config.transaction_lookup, diff --git a/crates/stages/stages/src/stages/bodies.rs b/crates/stages/stages/src/stages/bodies.rs index 7c796ec6ad10..307dad401db7 100644 --- a/crates/stages/stages/src/stages/bodies.rs +++ b/crates/stages/stages/src/stages/bodies.rs @@ -2,6 +2,7 @@ use super::missing_static_data_error; use futures_util::TryStreamExt; use reth_db::{tables, transaction::DbTx}; use reth_db_api::{cursor::DbCursorRO, transaction::DbTxMut}; +use reth_execution_errors::BlockExecutionError; use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}; use reth_primitives::StaticFileSegment; use reth_provider::{ @@ -68,7 +69,7 @@ impl BodyStage { &self, provider: &Provider, unwind_block: Option, - ) -> Result<(), StageError> + ) -> Result<(), StageError> where Provider: DBProvider + BlockReader + StaticFileProviderFactory, { @@ -142,7 +143,7 @@ impl BodyStage { } } -impl Stage for BodyStage +impl Stage for BodyStage where Provider: DBProvider + StaticFileProviderFactory @@ -160,7 +161,7 @@ where &mut self, cx: &mut Context<'_>, input: ExecInput, - ) -> Poll> { + ) -> Poll>> { if input.target_reached() || self.buffer.is_some() { return Poll::Ready(Ok(())) } @@ -186,7 +187,11 @@ where /// Download block bodies from the last checkpoint for this stage up until the latest synced /// header, limited by the stage's batch size. - fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + fn execute( + &mut self, + provider: &Provider, + input: ExecInput, + ) -> Result> { if input.target_reached() { return Ok(ExecOutput::done(input.checkpoint())) } @@ -226,7 +231,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { self.buffer.take(); self.ensure_consistency(provider, Some(input.unwind_to))?; @@ -489,6 +494,7 @@ mod tests { models::{StoredBlockBodyIndices, StoredBlockOmmers}, transaction::{DbTx, DbTxMut}, }; + use reth_execution_errors::BlockExecutionError; use reth_network_p2p::{ bodies::{ downloader::{BodyDownloader, BodyDownloaderResult}, @@ -544,6 +550,7 @@ mod tests { } impl StageTestRunner for BodyTestRunner { + type E = BlockExecutionError; type S = BodyStage; fn db(&self) -> &TestStageDB { diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index 5557beda519a..10bb53eddc41 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -165,7 +165,7 @@ where provider: impl StatsReader, start_block: u64, max_block: u64, - ) -> Result { + ) -> Result> { let mut prune_modes = self.prune_modes.clone(); // If we're not executing MerkleStage from scratch (by threshold or first-sync), then erase @@ -192,7 +192,7 @@ where provider: &Provider, checkpoint: u64, unwind_to: Option, - ) -> Result<(), StageError> + ) -> Result<(), StageError> where Provider: StaticFileProviderFactory + DBProvider + BlockReader + HeaderProvider, { @@ -262,7 +262,7 @@ where } } -impl Stage for ExecutionStage +impl Stage for ExecutionStage where E: BlockExecutorProvider, Provider: DBProvider @@ -284,14 +284,18 @@ where &mut self, cx: &mut Context<'_>, _: ExecInput, - ) -> Poll> { + ) -> Poll>> { ready!(self.exex_manager_handle.poll_ready(cx)); Poll::Ready(Ok(())) } /// Execute the stage - fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + fn execute( + &mut self, + provider: &Provider, + input: ExecInput, + ) -> Result> { if input.target_reached() { return Ok(ExecOutput::done(input.checkpoint())) } @@ -360,7 +364,7 @@ where header.parent_hash(), NumHash::new(header.number(), header.hash_slow()), )), - error: BlockErrorKind::Execution(error), + error: BlockErrorKind::::Execution(error), } }) })?; @@ -457,7 +461,7 @@ where }) } - fn post_execute_commit(&mut self) -> Result<(), StageError> { + fn post_execute_commit(&mut self) -> Result<(), StageError> { let Some(chain) = self.post_execute_commit_input.take() else { return Ok(()) }; // NOTE: We can ignore the error here, since an error means that the channel is closed, @@ -475,7 +479,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX)); if range.is_empty() { @@ -531,7 +535,7 @@ where Ok(UnwindOutput { checkpoint }) } - fn post_unwind_commit(&mut self) -> Result<(), StageError> { + fn post_unwind_commit(&mut self) -> Result<(), StageError> { let Some(chain) = self.post_unwind_commit_input.take() else { return Ok(()) }; // NOTE: We can ignore the error here, since an error means that the channel is closed, diff --git a/crates/stages/stages/src/stages/finish.rs b/crates/stages/stages/src/stages/finish.rs index 0b8f8f9ee1ce..e4316df19eed 100644 --- a/crates/stages/stages/src/stages/finish.rs +++ b/crates/stages/stages/src/stages/finish.rs @@ -1,3 +1,4 @@ +use reth_execution_errors::BlockExecutionError; use reth_stages_api::{ ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput, }; @@ -10,7 +11,7 @@ use reth_stages_api::{ #[non_exhaustive] pub struct FinishStage; -impl Stage for FinishStage { +impl Stage for FinishStage { fn id(&self) -> StageId { StageId::Finish } @@ -19,7 +20,7 @@ impl Stage for FinishStage { &mut self, _provider: &Provider, input: ExecInput, - ) -> Result { + ) -> Result> { Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) } @@ -27,7 +28,7 @@ impl Stage for FinishStage { &mut self, _provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) }) } } @@ -39,6 +40,7 @@ mod tests { stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB, UnwindStageTestRunner, }; + use reth_execution_errors::BlockExecutionError; use reth_primitives::SealedHeader; use reth_provider::providers::StaticFileWriter; use reth_testing_utils::{ @@ -54,6 +56,7 @@ mod tests { } impl StageTestRunner for FinishTestRunner { + type E = BlockExecutionError; type S = FinishStage; fn db(&self) -> &TestStageDB { diff --git a/crates/stages/stages/src/stages/hashing_account.rs b/crates/stages/stages/src/stages/hashing_account.rs index d256883e1618..34ffafb912bd 100644 --- a/crates/stages/stages/src/stages/hashing_account.rs +++ b/crates/stages/stages/src/stages/hashing_account.rs @@ -7,6 +7,7 @@ use reth_db_api::{ transaction::{DbTx, DbTxMut}, }; use reth_etl::Collector; +use reth_execution_errors::{BlockExecError, BlockExecutionError}; use reth_primitives::Account; use reth_provider::{AccountExtReader, DBProvider, HashingWriter, StatsReader}; use reth_stages_api::{ @@ -61,7 +62,10 @@ impl AccountHashingStage { pub fn seed( provider: &reth_provider::DatabaseProvider, opts: SeedOpts, - ) -> Result, StageError> + ) -> Result< + Vec<(alloy_primitives::Address, reth_primitives::Account)>, + StageError, + > where N::Primitives: reth_primitives_traits::FullNodePrimitives< Block = reth_primitives::Block, @@ -131,7 +135,7 @@ impl Default for AccountHashingStage { } } -impl Stage for AccountHashingStage +impl Stage for AccountHashingStage where Provider: DBProvider + HashingWriter + AccountExtReader + StatsReader, { @@ -141,7 +145,11 @@ where } /// Execute the stage. - fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + fn execute( + &mut self, + provider: &Provider, + input: ExecInput, + ) -> Result> { if input.target_reached() { return Ok(ExecOutput::done(input.checkpoint())) } @@ -232,7 +240,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); @@ -252,10 +260,13 @@ where } /// Flushes channels hashes to ETL collector. -fn collect( +fn collect( channels: &mut Vec, RawValue)>>, collector: &mut Collector, RawValue>, -) -> Result<(), StageError> { +) -> Result<(), StageError> +where + E: BlockExecError, +{ for channel in channels.iter_mut() { while let Ok((key, v)) = channel.recv() { collector.insert(key, v)?; @@ -354,6 +365,7 @@ mod tests { use super::*; use crate::test_utils::TestStageDB; use alloy_primitives::Address; + use reth_execution_errors::BlockExecutionError; use reth_provider::DatabaseProviderFactory; pub(crate) struct AccountHashingTestRunner { @@ -430,6 +442,7 @@ mod tests { } impl StageTestRunner for AccountHashingTestRunner { + type E = BlockExecutionError; type S = AccountHashingStage; fn db(&self) -> &TestStageDB { diff --git a/crates/stages/stages/src/stages/hashing_storage.rs b/crates/stages/stages/src/stages/hashing_storage.rs index 7967aa8542c6..0681e82a3d4f 100644 --- a/crates/stages/stages/src/stages/hashing_storage.rs +++ b/crates/stages/stages/src/stages/hashing_storage.rs @@ -9,6 +9,7 @@ use reth_db_api::{ transaction::{DbTx, DbTxMut}, }; use reth_etl::Collector; +use reth_execution_errors::BlockExecutionError; use reth_primitives::StorageEntry; use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader}; use reth_stages_api::{ @@ -62,7 +63,7 @@ impl Default for StorageHashingStage { } } -impl Stage for StorageHashingStage +impl Stage for StorageHashingStage where Provider: DBProvider + StorageReader + HashingWriter + StatsReader, { @@ -72,7 +73,11 @@ where } /// Execute the stage. - fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + fn execute( + &mut self, + provider: &Provider, + input: ExecInput, + ) -> Result> { let tx = provider.tx_ref(); if input.target_reached() { return Ok(ExecOutput::done(input.checkpoint())) @@ -165,7 +170,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); @@ -187,7 +192,7 @@ where fn collect( channels: &mut Vec, CompactU256)>>, collector: &mut Collector, CompactU256>, -) -> Result<(), StageError> { +) -> Result<(), StageError> { for channel in channels.iter_mut() { while let Ok((key, v)) = channel.recv() { collector.insert(key, v)?; @@ -219,6 +224,7 @@ mod tests { cursor::{DbCursorRW, DbDupCursorRO}, models::StoredBlockBodyIndices, }; + use reth_execution_errors::BlockExecutionError; use reth_primitives::SealedBlock; use reth_primitives_traits::SignedTransaction; use reth_provider::providers::StaticFileWriter; @@ -313,6 +319,7 @@ mod tests { } impl StageTestRunner for StorageHashingTestRunner { + type E = BlockExecutionError; type S = StorageHashingStage; fn db(&self) -> &TestStageDB { diff --git a/crates/stages/stages/src/stages/headers.rs b/crates/stages/stages/src/stages/headers.rs index cbec9c9ae4c3..8677dadc5862 100644 --- a/crates/stages/stages/src/stages/headers.rs +++ b/crates/stages/stages/src/stages/headers.rs @@ -11,6 +11,7 @@ use reth_db_api::{ DbTxUnwindExt, }; use reth_etl::Collector; +use reth_execution_errors::BlockExecutionError; use reth_network_p2p::headers::{downloader::HeaderDownloader, error::HeadersDownloaderError}; use reth_primitives::{NodePrimitives, SealedHeader, StaticFileSegment}; use reth_primitives_traits::{serde_bincode_compat, FullBlockHeader}; @@ -91,7 +92,10 @@ where /// /// Writes to static files ( `Header | HeaderTD | HeaderHash` ) and [`tables::HeaderNumbers`] /// database table. - fn write_headers

(&mut self, provider: &P) -> Result + fn write_headers

( + &mut self, + provider: &P, + ) -> Result> where P: DBProvider + StaticFileProviderFactory, Downloader: HeaderDownloader

::BlockHeader>, @@ -198,7 +202,7 @@ where } } -impl Stage for HeaderStage +impl Stage for HeaderStage where Provider: DBProvider + StaticFileProviderFactory, P: HeaderSyncGapProvider
::BlockHeader>, @@ -214,7 +218,7 @@ where &mut self, cx: &mut Context<'_>, input: ExecInput, - ) -> Poll> { + ) -> Poll>> { let current_checkpoint = input.checkpoint(); // Return if stage has already completed the gap on the ETL files @@ -287,7 +291,11 @@ where /// Download the headers in reverse order (falling block numbers) /// starting from the tip of the chain - fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + fn execute( + &mut self, + provider: &Provider, + input: ExecInput, + ) -> Result> { let current_checkpoint = input.checkpoint(); if self.sync_gap.as_ref().ok_or(StageError::MissingSyncGap)?.is_closed() { @@ -335,7 +343,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { self.sync_gap.take(); // First unwind the db tables, until the unwind_to block number. use the walker to unwind @@ -421,6 +429,7 @@ mod tests { use reth_downloaders::headers::reverse_headers::{ ReverseHeadersDownloader, ReverseHeadersDownloaderBuilder, }; + use reth_execution_errors::BlockExecutionError; use reth_network_p2p::test_utils::{TestHeaderDownloader, TestHeadersClient}; use reth_provider::{test_utils::MockNodeTypesWithDB, BlockNumReader}; use tokio::sync::watch; @@ -456,6 +465,7 @@ mod tests { impl + 'static> StageTestRunner for HeadersTestRunner { + type E = BlockExecutionError; type S = HeaderStage, D>; fn db(&self) -> &TestStageDB { diff --git a/crates/stages/stages/src/stages/index_account_history.rs b/crates/stages/stages/src/stages/index_account_history.rs index 38c238e5d988..c39ae8c33f1d 100644 --- a/crates/stages/stages/src/stages/index_account_history.rs +++ b/crates/stages/stages/src/stages/index_account_history.rs @@ -3,6 +3,7 @@ use alloy_primitives::Address; use reth_config::config::{EtlConfig, IndexHistoryConfig}; use reth_db::tables; use reth_db_api::{models::ShardedKey, table::Decode, transaction::DbTxMut}; +use reth_execution_errors::BlockExecutionError; use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter}; use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment}; use reth_stages_api::{ @@ -42,7 +43,7 @@ impl Default for IndexAccountHistoryStage { } } -impl Stage for IndexAccountHistoryStage +impl Stage for IndexAccountHistoryStage where Provider: DBProvider + HistoryWriter + PruneCheckpointReader + PruneCheckpointWriter, @@ -57,7 +58,7 @@ where &mut self, provider: &Provider, mut input: ExecInput, - ) -> Result { + ) -> Result> { if let Some((target_prunable_block, prune_mode)) = self .prune_mode .map(|mode| { @@ -103,17 +104,22 @@ where } info!(target: "sync::stages::index_account_history::exec", ?first_sync, "Collecting indices"); - let collector = - collect_history_indices::<_, tables::AccountChangeSets, tables::AccountsHistory, _>( - provider, - range.clone(), - ShardedKey::new, - |(index, value)| (index, value.address), - &self.etl_config, - )?; + let collector = collect_history_indices::< + _, + tables::AccountChangeSets, + tables::AccountsHistory, + _, + BlockExecutionError, + >( + provider, + range.clone(), + ShardedKey::new, + |(index, value)| (index, value.address), + &self.etl_config, + )?; info!(target: "sync::stages::index_account_history::exec", "Loading indices into database"); - load_history_indices::<_, tables::AccountsHistory, _>( + load_history_indices::<_, tables::AccountsHistory, _, BlockExecutionError>( provider, collector, first_sync, @@ -130,7 +136,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); @@ -159,6 +165,7 @@ mod tests { }, transaction::DbTx, }; + use reth_execution_errors::BlockExecutionError; use reth_provider::{providers::StaticFileWriter, DatabaseProviderFactory}; use reth_testing_utils::generators::{ self, random_block_range, random_changeset_range, random_contract_account_range, @@ -511,6 +518,7 @@ mod tests { } impl StageTestRunner for IndexAccountHistoryTestRunner { + type E = BlockExecutionError; type S = IndexAccountHistoryStage; fn db(&self) -> &TestStageDB { diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index ba61e6312302..8adb4361ea24 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -7,6 +7,7 @@ use reth_db_api::{ table::Decode, transaction::DbTxMut, }; +use reth_execution_errors::BlockExecutionError; use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter}; use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment}; use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; @@ -44,7 +45,7 @@ impl Default for IndexStorageHistoryStage { } } -impl Stage for IndexStorageHistoryStage +impl Stage for IndexStorageHistoryStage where Provider: DBProvider + PruneCheckpointWriter + HistoryWriter + PruneCheckpointReader, @@ -59,7 +60,7 @@ where &mut self, provider: &Provider, mut input: ExecInput, - ) -> Result { + ) -> Result> { if let Some((target_prunable_block, prune_mode)) = self .prune_mode .map(|mode| { @@ -105,19 +106,24 @@ where } info!(target: "sync::stages::index_storage_history::exec", ?first_sync, "Collecting indices"); - let collector = - collect_history_indices::<_, tables::StorageChangeSets, tables::StoragesHistory, _>( - provider, - BlockNumberAddress::range(range.clone()), - |AddressStorageKey((address, storage_key)), highest_block_number| { - StorageShardedKey::new(address, storage_key, highest_block_number) - }, - |(key, value)| (key.block_number(), AddressStorageKey((key.address(), value.key))), - &self.etl_config, - )?; + let collector = collect_history_indices::< + _, + tables::StorageChangeSets, + tables::StoragesHistory, + _, + BlockExecutionError, + >( + provider, + BlockNumberAddress::range(range.clone()), + |AddressStorageKey((address, storage_key)), highest_block_number| { + StorageShardedKey::new(address, storage_key, highest_block_number) + }, + |(key, value)| (key.block_number(), AddressStorageKey((key.address(), value.key))), + &self.etl_config, + )?; info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database"); - load_history_indices::<_, tables::StoragesHistory, _>( + load_history_indices::<_, tables::StoragesHistory, _, BlockExecutionError>( provider, collector, first_sync, @@ -136,7 +142,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); @@ -164,6 +170,7 @@ mod tests { }, transaction::DbTx, }; + use reth_execution_errors::BlockExecutionError; use reth_primitives::StorageEntry; use reth_provider::{providers::StaticFileWriter, DatabaseProviderFactory}; use reth_testing_utils::generators::{ @@ -533,6 +540,7 @@ mod tests { } impl StageTestRunner for IndexStorageHistoryTestRunner { + type E = BlockExecutionError; type S = IndexStorageHistoryStage; fn db(&self) -> &TestStageDB { diff --git a/crates/stages/stages/src/stages/merkle.rs b/crates/stages/stages/src/stages/merkle.rs index 3d36964a713e..8156c8bee5e2 100644 --- a/crates/stages/stages/src/stages/merkle.rs +++ b/crates/stages/stages/src/stages/merkle.rs @@ -4,6 +4,7 @@ use reth_codecs::Compact; use reth_consensus::ConsensusError; use reth_db::tables; use reth_db_api::transaction::{DbTx, DbTxMut}; +use reth_execution_errors::{BlockExecError, BlockExecutionError}; use reth_primitives::{GotExpected, SealedHeader}; use reth_provider::{ DBProvider, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter, @@ -100,7 +101,7 @@ impl MerkleStage { pub fn get_execution_checkpoint( &self, provider: &impl StageCheckpointReader, - ) -> Result, StageError> { + ) -> Result, StageError> { let buf = provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?.unwrap_or_default(); @@ -117,7 +118,7 @@ impl MerkleStage { &self, provider: &impl StageCheckpointWriter, checkpoint: Option, - ) -> Result<(), StageError> { + ) -> Result<(), StageError> { let mut buf = vec![]; if let Some(checkpoint) = checkpoint { debug!( @@ -131,7 +132,7 @@ impl MerkleStage { } } -impl Stage for MerkleStage +impl Stage for MerkleStage where Provider: DBProvider + TrieWriter @@ -151,7 +152,11 @@ where } /// Execute the stage. - fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + fn execute( + &mut self, + provider: &Provider, + input: ExecInput, + ) -> Result> { let threshold = match self { Self::Unwind => { info!(target: "sync::stages::merkle::unwind", "Stage is always skipped"); @@ -291,7 +296,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { let tx = provider.tx_ref(); let range = input.unwind_block_range(); if matches!(self, Self::Execution { .. }) { @@ -344,11 +349,11 @@ where /// Check that the computed state root matches the root in the expected header. #[inline] -fn validate_state_root( +fn validate_state_root( got: B256, expected: SealedHeader, target_block: BlockNumber, -) -> Result<(), StageError> { +) -> Result<(), StageError> { if got == expected.state_root() { Ok(()) } else { @@ -372,6 +377,7 @@ mod tests { use alloy_primitives::{keccak256, U256}; use assert_matches::assert_matches; use reth_db_api::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}; + use reth_execution_errors::BlockExecutionError; use reth_primitives::{SealedBlock, StaticFileSegment, StorageEntry}; use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory}; use reth_stages_api::StageUnitCheckpoint; @@ -477,6 +483,7 @@ mod tests { } impl StageTestRunner for MerkleTestRunner { + type E = BlockExecutionError; type S = MerkleStage; fn db(&self) -> &TestStageDB { diff --git a/crates/stages/stages/src/stages/prune.rs b/crates/stages/stages/src/stages/prune.rs index 4bd29882712b..4cc8a31b766e 100644 --- a/crates/stages/stages/src/stages/prune.rs +++ b/crates/stages/stages/src/stages/prune.rs @@ -1,4 +1,5 @@ use reth_db::{table::Value, transaction::DbTxMut}; +use reth_execution_errors::BlockExecutionError; use reth_primitives::NodePrimitives; use reth_provider::{ BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter, @@ -36,7 +37,7 @@ impl PruneStage { } } -impl Stage for PruneStage +impl Stage for PruneStage where Provider: DBProvider + PruneCheckpointReader @@ -48,7 +49,11 @@ where StageId::Prune } - fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + fn execute( + &mut self, + provider: &Provider, + input: ExecInput, + ) -> Result> { let mut pruner = PrunerBuilder::default() .segments(self.prune_modes.clone()) .delete_limit(self.commit_threshold) @@ -96,7 +101,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { // We cannot recover the data that was pruned in `execute`, so we just update the // checkpoints. let prune_checkpoints = provider.get_prune_checkpoints()?; @@ -125,7 +130,7 @@ impl PruneSenderRecoveryStage { } } -impl Stage for PruneSenderRecoveryStage +impl Stage for PruneSenderRecoveryStage where Provider: DBProvider + PruneCheckpointReader @@ -137,7 +142,11 @@ where StageId::PruneSenderRecovery } - fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + fn execute( + &mut self, + provider: &Provider, + input: ExecInput, + ) -> Result> { let mut result = self.0.execute(provider, input)?; // Adjust the checkpoint to the highest pruned block number of the Sender Recovery segment @@ -158,7 +167,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { self.0.unwind(provider, input) } } @@ -171,6 +180,7 @@ mod tests { TestRunnerError, TestStageDB, UnwindStageTestRunner, }; use alloy_primitives::B256; + use reth_execution_errors::BlockExecutionError; use reth_primitives::SealedBlock; use reth_primitives_traits::SignedTransaction; use reth_provider::{ @@ -187,6 +197,7 @@ mod tests { } impl StageTestRunner for PruneTestRunner { + type E = BlockExecutionError; type S = PruneStage; fn db(&self) -> &TestStageDB { diff --git a/crates/stages/stages/src/stages/sender_recovery.rs b/crates/stages/stages/src/stages/sender_recovery.rs index 34598714a18b..be8dc9ccd707 100644 --- a/crates/stages/stages/src/stages/sender_recovery.rs +++ b/crates/stages/stages/src/stages/sender_recovery.rs @@ -7,6 +7,7 @@ use reth_db_api::{ transaction::{DbTx, DbTxMut}, DbTxUnwindExt, }; +use reth_execution_errors::{BlockExecError, BlockExecutionError}; use reth_primitives::{GotExpected, NodePrimitives, StaticFileSegment}; use reth_primitives_traits::SignedTransaction; use reth_provider::{ @@ -31,7 +32,8 @@ const BATCH_SIZE: usize = 100_000; const WORKER_CHUNK_SIZE: usize = 100; /// Type alias for a sender that transmits the result of sender recovery. -type RecoveryResultSender = mpsc::Sender>>; +type RecoveryResultSender = + mpsc::Sender>>>; /// The sender recovery stage iterates over existing transactions, /// recovers the transaction signer and stores them @@ -56,7 +58,7 @@ impl Default for SenderRecoveryStage { } } -impl Stage for SenderRecoveryStage +impl Stage for SenderRecoveryStage where Provider: DBProvider + BlockReader @@ -73,7 +75,11 @@ where /// [`BlockBodyIndices`][reth_db::tables::BlockBodyIndices], /// collect transactions within that range, recover signer for each transaction and store /// entries in the [`TransactionSenders`][reth_db::tables::TransactionSenders] table. - fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + fn execute( + &mut self, + provider: &Provider, + input: ExecInput, + ) -> Result> { if input.target_reached() { return Ok(ExecOutput::done(input.checkpoint())) } @@ -122,7 +128,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); // Lookup latest tx id that we should unwind to @@ -139,15 +145,16 @@ where } } -fn recover_range( +fn recover_range( tx_range: Range, provider: &Provider, - tx_batch_sender: mpsc::Sender, RecoveryResultSender)>>, + tx_batch_sender: mpsc::Sender, RecoveryResultSender)>>, senders_cursor: &mut CURSOR, -) -> Result<(), StageError> +) -> Result<(), StageError> where Provider: DBProvider + HeaderProvider + StaticFileProviderFactory, CURSOR: DbCursorRW, + E: BlockExecError, { debug!(target: "sync::stages::sender_recovery", ?tx_range, "Sending batch for processing"); @@ -176,7 +183,7 @@ where Ok(result) => result, Err(error) => { return match *error { - SenderRecoveryStageError::FailedRecovery(err) => { + SenderRecoveryStageError::::FailedRecovery(err) => { // get the block number for the bad transaction let block_number = provider .tx_ref() @@ -197,11 +204,13 @@ where ), }) } - SenderRecoveryStageError::StageError(err) => Err(err), - SenderRecoveryStageError::RecoveredSendersMismatch(expectation) => { + SenderRecoveryStageError::::StageError(err) => Err(err), + SenderRecoveryStageError::::RecoveredSendersMismatch(expectation) => { Err(StageError::Fatal( - SenderRecoveryStageError::RecoveredSendersMismatch(expectation) - .into(), + SenderRecoveryStageError::::RecoveredSendersMismatch( + expectation, + ) + .into(), )) } } @@ -217,7 +226,7 @@ where let expected = tx_range.end - tx_range.start; if processed_transactions != expected { return Err(StageError::Fatal( - SenderRecoveryStageError::RecoveredSendersMismatch(GotExpected { + SenderRecoveryStageError::::RecoveredSendersMismatch(GotExpected { got: processed_transactions, expected, }) @@ -230,15 +239,16 @@ where /// Spawns a thread to handle the recovery of transaction senders for /// specified chunks of a given batch. It processes incoming ranges, fetching and recovering /// transactions in parallel using global rayon pool -fn setup_range_recovery( +fn setup_range_recovery( provider: &Provider, -) -> mpsc::Sender, RecoveryResultSender)>> +) -> mpsc::Sender, RecoveryResultSender)>> where Provider: DBProvider + HeaderProvider + StaticFileProviderFactory>, + E: BlockExecError, { - let (tx_sender, tx_receiver) = mpsc::channel::, RecoveryResultSender)>>(); + let (tx_sender, tx_receiver) = mpsc::channel::, RecoveryResultSender)>>(); let static_file_provider = provider.static_file_provider(); // We do not use `tokio::task::spawn_blocking` because, during a shutdown, @@ -303,10 +313,10 @@ where } #[inline] -fn recover_sender( +fn recover_sender( (tx_id, tx): (TxNumber, T), rlp_buf: &mut Vec, -) -> Result<(u64, Address), Box> { +) -> Result<(u64, Address), Box>> { rlp_buf.clear(); // We call [Signature::encode_and_recover_unchecked] because transactions run in the pipeline // are known to be valid - this means that we do not need to check whether or not the `s` @@ -320,7 +330,9 @@ fn recover_sender( Ok((tx_id, sender)) } -fn stage_checkpoint(provider: &Provider) -> Result +fn stage_checkpoint( + provider: &Provider, +) -> Result> where Provider: StatsReader + StaticFileProviderFactory + PruneCheckpointReader, { @@ -342,7 +354,10 @@ where #[derive(Error, Debug)] #[error(transparent)] -enum SenderRecoveryStageError { +enum SenderRecoveryStageError +where + E: BlockExecError, +{ /// A transaction failed sender recovery #[error(transparent)] FailedRecovery(#[from] FailedSenderRecoveryError), @@ -353,7 +368,7 @@ enum SenderRecoveryStageError { /// A different type of stage error occurred #[error(transparent)] - StageError(#[from] StageError), + StageError(#[from] StageError), } #[derive(Error, Debug)] @@ -373,6 +388,7 @@ mod tests { use alloy_primitives::{BlockNumber, B256}; use assert_matches::assert_matches; use reth_db_api::cursor::DbCursorRO; + use reth_execution_errors::BlockExecutionError; use reth_primitives::{SealedBlock, TransactionSigned}; use reth_primitives_traits::SignedTransaction; use reth_provider::{ @@ -621,6 +637,7 @@ mod tests { } impl StageTestRunner for SenderRecoveryTestRunner { + type E = BlockExecutionError; type S = SenderRecoveryStage; fn db(&self) -> &TestStageDB { diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs index 872af3baf950..85205826c7e3 100644 --- a/crates/stages/stages/src/stages/tx_lookup.rs +++ b/crates/stages/stages/src/stages/tx_lookup.rs @@ -8,6 +8,7 @@ use reth_db_api::{ transaction::{DbTx, DbTxMut}, }; use reth_etl::Collector; +use reth_execution_errors::BlockExecutionError; use reth_primitives::NodePrimitives; use reth_primitives_traits::SignedTransaction; use reth_provider::{ @@ -56,7 +57,7 @@ impl TransactionLookupStage { } } -impl Stage for TransactionLookupStage +impl Stage for TransactionLookupStage where Provider: DBProvider + PruneCheckpointWriter @@ -76,7 +77,7 @@ where &mut self, provider: &Provider, mut input: ExecInput, - ) -> Result { + ) -> Result> { if let Some((target_prunable_block, prune_mode)) = self .prune_mode .map(|mode| { @@ -191,7 +192,7 @@ where &mut self, provider: &Provider, input: UnwindInput, - ) -> Result { + ) -> Result> { let tx = provider.tx_ref(); let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size); @@ -223,7 +224,9 @@ where } } -fn stage_checkpoint(provider: &Provider) -> Result +fn stage_checkpoint( + provider: &Provider, +) -> Result> where Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader, { @@ -255,6 +258,7 @@ mod tests { }; use alloy_primitives::{BlockNumber, B256}; use assert_matches::assert_matches; + use reth_execution_errors::BlockExecutionError; use reth_primitives::SealedBlock; use reth_provider::{ providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory, @@ -479,6 +483,7 @@ mod tests { } impl StageTestRunner for TransactionLookupTestRunner { + type E = BlockExecutionError; type S = TransactionLookupStage; fn db(&self) -> &TestStageDB { diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index add013d40710..e59118de32c5 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -10,6 +10,7 @@ use reth_db_api::{ DatabaseError, }; use reth_etl::Collector; +use reth_execution_errors::BlockExecError; use reth_primitives::StaticFileSegment; use reth_provider::{ providers::StaticFileProvider, BlockReader, DBProvider, ProviderError, @@ -39,18 +40,19 @@ const DEFAULT_CACHE_THRESHOLD: u64 = 100_000; /// /// As a result, the `Collector` will contain entries such as `(Address1.3, [1,2,3])` and /// `(Address1.300, [100,300])`. The entries may be stored across one or more files. -pub(crate) fn collect_history_indices( +pub(crate) fn collect_history_indices( provider: &Provider, range: impl RangeBounds, sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key, partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P), etl_config: &EtlConfig, -) -> Result, StageError> +) -> Result, StageError> where Provider: DBProvider, CS: Table, H: Table, P: Copy + Eq + Hash, + E: BlockExecError, { let mut changeset_cursor = provider.tx_ref().cursor_read::()?; @@ -65,7 +67,7 @@ where BlockNumberList::new_pre_sorted(indices.iter().copied()), )?; } - Ok::<(), StageError>(()) + Ok::<(), StageError>(()) }; // observability @@ -106,18 +108,19 @@ where /// `Address.StorageKey`). It flushes indices to disk when reaching a shard's max length /// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last previous partial /// key shard is stored. -pub(crate) fn load_history_indices( +pub(crate) fn load_history_indices( provider: &Provider, mut collector: Collector, append_only: bool, sharded_key_factory: impl Clone + Fn(P, u64) -> ::Key, decode_key: impl Fn(Vec) -> Result<::Key, DatabaseError>, get_partial: impl Fn(::Key) -> P, -) -> Result<(), StageError> +) -> Result<(), StageError> where Provider: DBProvider, H: Table, P: Copy + Default + Eq, + E: BlockExecError, { let mut write_cursor = provider.tx_ref().cursor_write::()?; let mut current_partial = P::default(); @@ -191,14 +194,14 @@ where } /// Shard and insert the indices list according to [`LoadMode`] and its length. -pub(crate) fn load_indices( +pub(crate) fn load_indices( cursor: &mut C, partial_key: P, list: &mut Vec, sharded_key_factory: &impl Fn(P, BlockNumber) -> ::Key, append_only: bool, mode: LoadMode, -) -> Result<(), StageError> +) -> Result<(), StageError> where C: DbCursorRO + DbCursorRW, H: Table, @@ -251,14 +254,15 @@ impl LoadMode { /// Called when database is ahead of static files. Attempts to find the first block we are missing /// transactions for. -pub(crate) fn missing_static_data_error( +pub(crate) fn missing_static_data_error( last_tx_num: TxNumber, static_file_provider: &StaticFileProvider, provider: &Provider, segment: StaticFileSegment, -) -> Result +) -> Result, ProviderError> where Provider: BlockReader + StaticFileProviderFactory, + E: BlockExecError, { let mut last_block = static_file_provider.get_highest_static_file_block(segment).unwrap_or_default(); diff --git a/crates/stages/stages/src/test_utils/runner.rs b/crates/stages/stages/src/test_utils/runner.rs index c3d25b99536a..39492a1013d9 100644 --- a/crates/stages/stages/src/test_utils/runner.rs +++ b/crates/stages/stages/src/test_utils/runner.rs @@ -1,5 +1,6 @@ use super::TestStageDB; use reth_db::{test_utils::TempDatabase, Database, DatabaseEnv}; +use reth_execution_errors::BlockExecError; use reth_provider::{test_utils::MockNodeTypesWithDB, DatabaseProvider, ProviderError}; use reth_stages_api::{ ExecInput, ExecOutput, Stage, StageError, StageExt, UnwindInput, UnwindOutput, @@ -19,8 +20,11 @@ pub(crate) enum TestRunnerError { /// A generic test runner for stages. pub(crate) trait StageTestRunner { - type S: Stage as Database>::TXMut, MockNodeTypesWithDB>> - + 'static; + type E: BlockExecError; + type S: Stage< + DatabaseProvider< as Database>::TXMut, MockNodeTypesWithDB>, + Self::E, + > + 'static; /// Return a reference to the database. fn db(&self) -> &TestStageDB; @@ -43,7 +47,10 @@ pub(crate) trait ExecuteStageTestRunner: StageTestRunner { ) -> Result<(), TestRunnerError>; /// Run [`Stage::execute`] and return a receiver for the result. - fn execute(&self, input: ExecInput) -> oneshot::Receiver> { + fn execute( + &self, + input: ExecInput, + ) -> oneshot::Receiver>> { let (tx, rx) = oneshot::channel(); let (db, mut stage) = (self.db().factory.clone(), self.stage()); tokio::spawn(async move { @@ -69,7 +76,7 @@ pub(crate) trait UnwindStageTestRunner: StageTestRunner { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError>; /// Run [`Stage::unwind`] and return a receiver for the result. - async fn unwind(&self, input: UnwindInput) -> Result { + async fn unwind(&self, input: UnwindInput) -> Result> { let (tx, rx) = oneshot::channel(); let (db, mut stage) = (self.db().factory.clone(), self.stage()); tokio::spawn(async move { diff --git a/crates/stages/stages/src/test_utils/set.rs b/crates/stages/stages/src/test_utils/set.rs index ef6d278ba448..75dfc2f97740 100644 --- a/crates/stages/stages/src/test_utils/set.rs +++ b/crates/stages/stages/src/test_utils/set.rs @@ -1,25 +1,35 @@ use super::TEST_STAGE_ID; use crate::{StageSet, StageSetBuilder}; +use reth_execution_errors::BlockExecError; use reth_stages_api::{test_utils::TestStage, ExecOutput, StageError, UnwindOutput}; use std::collections::VecDeque; #[derive(Default, Debug)] -pub struct TestStages { - exec_outputs: VecDeque>, - unwind_outputs: VecDeque>, +pub struct TestStages +where + E: BlockExecError, +{ + exec_outputs: VecDeque>>, + unwind_outputs: VecDeque>>, } -impl TestStages { +impl TestStages +where + E: BlockExecError, +{ pub const fn new( - exec_outputs: VecDeque>, - unwind_outputs: VecDeque>, + exec_outputs: VecDeque>>, + unwind_outputs: VecDeque>>, ) -> Self { Self { exec_outputs, unwind_outputs } } } -impl StageSet for TestStages { - fn builder(self) -> StageSetBuilder { +impl StageSet for TestStages +where + E: BlockExecError, +{ + fn builder(self) -> StageSetBuilder { StageSetBuilder::default().add_stage( TestStage::new(TEST_STAGE_ID) .with_exec(self.exec_outputs) diff --git a/examples/custom-beacon-withdrawals/src/main.rs b/examples/custom-beacon-withdrawals/src/main.rs index 7bb8a77d2598..bbd76bdbc660 100644 --- a/examples/custom-beacon-withdrawals/src/main.rs +++ b/examples/custom-beacon-withdrawals/src/main.rs @@ -89,6 +89,7 @@ pub struct CustomExecutorStrategyFactory { } impl BlockExecutionStrategyFactory for CustomExecutorStrategyFactory { + type Error = BlockExecutionError; type Primitives = EthPrimitives; type Strategy + Display>> = CustomExecutorStrategy;