From 3a0767494baf1bd9c2c392bf132af62a6959e149 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Mon, 18 Nov 2024 16:41:05 +0800 Subject: [PATCH] Pure state sync refactoring (part-1) (#6249) This pure refactoring of state sync is preparing for https://github.com/paritytech/polkadot-sdk/issues/4. As the rough plan in https://github.com/paritytech/polkadot-sdk/issues/4#issuecomment-2439588876, there will be two PRs for the state sync refactoring. This first PR focuses on isolating the function `process_state_key_values()` as the central point for storing received state data in memory. This function will later be adapted to forward the state data directly to the DB layer for persistent sync. A follow-up PR will handle the encapsulation of `StateSyncMetadata` to support this persistent storage. Although there are many commits in this PR, each commit is small and intentionally incremental to facilitate a smoother review, please review them commit by commit. Each commit should represent an equivalent rewrite of the existing logic, with one exception https://github.com/paritytech/polkadot-sdk/commit/bb447b2daa01fecf3bf0ca20c451ac6147f3f8c7, which has a slight deviation from the original but is correct IMHO. Please give this commit special attention during the review. --- prdoc/pr_6249.prdoc | 10 ++ .../network/sync/src/strategy/state_sync.rs | 165 +++++++++--------- 2 files changed, 88 insertions(+), 87 deletions(-) create mode 100644 prdoc/pr_6249.prdoc diff --git a/prdoc/pr_6249.prdoc b/prdoc/pr_6249.prdoc new file mode 100644 index 0000000000000..52fa10b226270 --- /dev/null +++ b/prdoc/pr_6249.prdoc @@ -0,0 +1,10 @@ +title: Pure state sync refactoring (part-1) + +doc: +- audience: Node Dev + description: | + The pure refactoring of state sync is preparing for https://github.com/paritytech/polkadot-sdk/issues/4. This is the first part, focusing on isolating the function `process_state_key_values()` as the central point for storing received state data in memory. This function will later be adapted to forward the state data directly to the DB layer to resolve the OOM issue and support persistent state sync. + +crates: +- name: sc-network-sync + bump: none diff --git a/substrate/client/network/sync/src/strategy/state_sync.rs b/substrate/client/network/sync/src/strategy/state_sync.rs index 1ed1de7c8efaa..7a0cc11916094 100644 --- a/substrate/client/network/sync/src/strategy/state_sync.rs +++ b/substrate/client/network/sync/src/strategy/state_sync.rs @@ -19,12 +19,12 @@ //! State sync support. use crate::{ - schema::v1::{StateEntry, StateRequest, StateResponse}, + schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse}, LOG_TARGET, }; use codec::{Decode, Encode}; use log::debug; -use sc_client_api::{CompactProof, ProofProvider}; +use sc_client_api::{CompactProof, KeyValueStates, ProofProvider}; use sc_consensus::ImportedState; use smallvec::SmallVec; use sp_core::storage::well_known_keys; @@ -132,6 +132,80 @@ where skip_proof, } } + + fn process_state_key_values( + &mut self, + state_root: Vec, + key_values: impl IntoIterator, Vec)>, + ) { + let is_top = state_root.is_empty(); + + let entry = self.state.entry(state_root).or_default(); + + if entry.0.len() > 0 && entry.1.len() > 1 { + // Already imported child_trie with same root. + // Warning this will not work with parallel download. + return; + } + + let mut child_storage_roots = Vec::new(); + + for (key, value) in key_values { + // Skip all child key root (will be recalculated on import) + if is_top && well_known_keys::is_child_storage_key(key.as_slice()) { + child_storage_roots.push((value, key)); + } else { + self.imported_bytes += key.len() as u64; + entry.0.push((key, value)); + } + } + + for (root, storage_key) in child_storage_roots { + self.state.entry(root).or_default().1.push(storage_key); + } + } + + fn process_state_verified(&mut self, values: KeyValueStates) { + for values in values.0 { + self.process_state_key_values(values.state_root, values.key_values); + } + } + + fn process_state_unverified(&mut self, response: StateResponse) -> bool { + let mut complete = true; + // if the trie is a child trie and one of its parent trie is empty, + // the parent cursor stays valid. + // Empty parent trie content only happens when all the response content + // is part of a single child trie. + if self.last_key.len() == 2 && response.entries[0].entries.is_empty() { + // Do not remove the parent trie position. + self.last_key.pop(); + } else { + self.last_key.clear(); + } + for state in response.entries { + debug!( + target: LOG_TARGET, + "Importing state from {:?} to {:?}", + state.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)), + state.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)), + ); + + if !state.complete { + if let Some(e) = state.entries.last() { + self.last_key.push(e.key.clone()); + } + complete = false; + } + + let KeyValueStateEntry { state_root, entries, complete: _ } = state; + self.process_state_key_values( + state_root, + entries.into_iter().map(|StateEntry { key, value }| (key, value)), + ); + } + complete + } } impl StateSyncProvider for StateSync @@ -181,94 +255,11 @@ where debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed); }; - for values in values.0 { - let key_values = if values.state_root.is_empty() { - // Read child trie roots. - values - .key_values - .into_iter() - .filter(|key_value| { - if well_known_keys::is_child_storage_key(key_value.0.as_slice()) { - self.state - .entry(key_value.1.clone()) - .or_default() - .1 - .push(key_value.0.clone()); - false - } else { - true - } - }) - .collect() - } else { - values.key_values - }; - let entry = self.state.entry(values.state_root).or_default(); - if entry.0.len() > 0 && entry.1.len() > 1 { - // Already imported child_trie with same root. - // Warning this will not work with parallel download. - } else if entry.0.is_empty() { - for (key, _value) in key_values.iter() { - self.imported_bytes += key.len() as u64; - } - - entry.0 = key_values; - } else { - for (key, value) in key_values { - self.imported_bytes += key.len() as u64; - entry.0.push((key, value)) - } - } - } + self.process_state_verified(values); self.imported_bytes += proof_size; complete } else { - let mut complete = true; - // if the trie is a child trie and one of its parent trie is empty, - // the parent cursor stays valid. - // Empty parent trie content only happens when all the response content - // is part of a single child trie. - if self.last_key.len() == 2 && response.entries[0].entries.is_empty() { - // Do not remove the parent trie position. - self.last_key.pop(); - } else { - self.last_key.clear(); - } - for state in response.entries { - debug!( - target: LOG_TARGET, - "Importing state from {:?} to {:?}", - state.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)), - state.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)), - ); - - if !state.complete { - if let Some(e) = state.entries.last() { - self.last_key.push(e.key.clone()); - } - complete = false; - } - let is_top = state.state_root.is_empty(); - let entry = self.state.entry(state.state_root).or_default(); - if entry.0.len() > 0 && entry.1.len() > 1 { - // Already imported child trie with same root. - } else { - let mut child_roots = Vec::new(); - for StateEntry { key, value } in state.entries { - // Skip all child key root (will be recalculated on import). - if is_top && well_known_keys::is_child_storage_key(key.as_slice()) { - child_roots.push((value, key)); - } else { - self.imported_bytes += key.len() as u64; - entry.0.push((key, value)) - } - } - for (root, storage_key) in child_roots { - self.state.entry(root).or_default().1.push(storage_key); - } - } - } - complete + self.process_state_unverified(response) }; if complete { self.complete = true;