Skip to content

Commit

Permalink
chore: move state root task result handling to fn (#13892)
Browse files Browse the repository at this point in the history
Co-authored-by: Roman Krasiuk <[email protected]>
Co-authored-by: Federico Gimenez <[email protected]>
  • Loading branch information
3 people authored Jan 21, 2025
1 parent 50dae68 commit c4b147c
Showing 1 changed file with 76 additions and 52 deletions.
128 changes: 76 additions & 52 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ use reth_trie::{
hashed_cursor::HashedPostStateCursorFactory,
prefix_set::TriePrefixSetsMut,
proof::ProofBlindedProviderFactory,
trie_cursor::InMemoryTrieCursorFactory,
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
updates::{TrieUpdates, TrieUpdatesSorted},
HashedPostState, HashedPostStateSorted, TrieInput,
};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::EvmState;
use root::{StateRootComputeOutcome, StateRootConfig, StateRootTask};
use root::{StateRootComputeOutcome, StateRootConfig, StateRootHandle, StateRootTask};
use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, VecDeque},
Expand All @@ -67,7 +67,7 @@ use std::{
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
Arc,
},
time::Instant,
time::{Duration, Instant},
};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Expand Down Expand Up @@ -2364,55 +2364,20 @@ where
// different view of the database.
let (state_root, trie_updates, root_elapsed) = if persistence_not_in_progress {
if self.config.use_state_root_task() {
match state_root_handle
.expect("state root handle must exist if use_state_root_task is true")
.wait_for_result()
{
Ok(StateRootComputeOutcome {
state_root: (task_state_root, task_trie_updates),
time_from_last_update,
..
}) => {
info!(
target: "engine::tree",
block = ?sealed_block.num_hash(),
?task_state_root,
task_elapsed = ?time_from_last_update,
"Task state root finished"
);

if task_state_root != block.header().state_root() ||
self.config.always_compare_trie_updates()
{
if task_state_root != block.header().state_root() {
debug!(target: "engine::tree", "Task state root does not match block state root");
}

let (regular_root, regular_updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;

if regular_root == block.header().state_root() {
compare_trie_updates(
in_memory_trie_cursor.expect("in memory trie cursor must exist if use_state_root_task is true"),
task_trie_updates.clone(),
regular_updates,
)
.map_err(ProviderError::from)?;
} else {
debug!(target: "engine::tree", "Regular state root does not match block state root");
}
}

(task_state_root, task_trie_updates, time_from_last_update)
}
Err(error) => {
info!(target: "engine::tree", ?error, "Failed to wait for state root task result");
// Fall back to sequential calculation
let (root, updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;
(root, updates, root_time.elapsed())
}
}
let state_root_handle = state_root_handle
.expect("state root handle must exist if use_state_root_task is true");
let in_memory_trie_cursor = in_memory_trie_cursor
.expect("in memory trie cursor must exist if use_state_root_task is true");

// Handle state root result from task using handle
self.handle_state_root_result(
state_root_handle,
sealed_block.as_ref(),
&hashed_state,
&state_provider,
in_memory_trie_cursor,
root_time,
)?
} else {
match self
.compute_state_root_parallel(block.header().parent_hash(), &hashed_state)
Expand Down Expand Up @@ -2589,6 +2554,65 @@ where
))
}

/// Waits for the result on the input [`StateRootHandle`], and handles it, falling back to
/// the hash builder-based state root calculation if it fails.
fn handle_state_root_result(
&self,
state_root_handle: StateRootHandle,
sealed_block: &SealedBlock<N::Block>,
hashed_state: &HashedPostState,
state_provider: impl StateRootProvider,
in_memory_trie_cursor: impl TrieCursorFactory,
root_time: Instant,
) -> Result<(B256, TrieUpdates, Duration), InsertBlockErrorKind> {
match state_root_handle.wait_for_result() {
Ok(StateRootComputeOutcome {
state_root: (task_state_root, task_trie_updates),
time_from_last_update,
..
}) => {
info!(
target: "engine::tree",
block = ?sealed_block.num_hash(),
?task_state_root,
task_elapsed = ?time_from_last_update,
"Task state root finished"
);

if task_state_root != sealed_block.header().state_root() ||
self.config.always_compare_trie_updates()
{
if task_state_root != sealed_block.header().state_root() {
debug!(target: "engine::tree", "Task state root does not match block state root");
}

let (regular_root, regular_updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;

if regular_root == sealed_block.header().state_root() {
compare_trie_updates(
in_memory_trie_cursor,
task_trie_updates.clone(),
regular_updates,
)
.map_err(ProviderError::from)?;
} else {
debug!(target: "engine::tree", "Regular state root does not match block state root");
}
}

Ok((task_state_root, task_trie_updates, time_from_last_update))
}
Err(error) => {
info!(target: "engine::tree", ?error, "Failed to wait for state root task result");
// Fall back to sequential calculation
let (root, updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;
Ok((root, updates, root_time.elapsed()))
}
}
}

/// Attempts to find the header for the given block hash if it is canonical.
pub fn find_canonical_header(
&self,
Expand Down

0 comments on commit c4b147c

Please sign in to comment.