From dca666cbb6d87056f851274ac68bee786f733817 Mon Sep 17 00:00:00 2001 From: cryptoAtwill <108330426+cryptoAtwill@users.noreply.github.com> Date: Thu, 30 Nov 2023 04:26:36 +0800 Subject: [PATCH] Remove look ahead (#442) * remove look ahead in query topdown * use three pointers * up crate * fix error * Update fendermint/vm/topdown/src/sync/syncer.rs Co-authored-by: Akosh Farkash * fix from integration testing * remove to_confirm pointer * restrict memory size * dont propose until interval reached * update latest height * split tendermint query to separete file * Update fendermint/vm/topdown/src/sync/syncer.rs Co-authored-by: adlrocha <6717133+adlrocha@users.noreply.github.com> * fix test linting * fix tests * more tests * check hash when executing * sync * sort by nonce * fmt * more unit tests * New proposal logic (#447) * new proposal logic * fmt * fix typo * minor changes * adding check * ignore proposal height * Fix type * expose config params --------- Co-authored-by: Alfonso de la Rocha --------- Signed-off-by: Alfonso de la Rocha Co-authored-by: Akosh Farkash Co-authored-by: adlrocha <6717133+adlrocha@users.noreply.github.com> Co-authored-by: Alfonso de la Rocha --- fendermint/app/settings/src/lib.rs | 6 + fendermint/app/src/cmd/run.rs | 14 +- fendermint/vm/interpreter/src/chain.rs | 19 +- fendermint/vm/topdown/src/cache.rs | 4 + fendermint/vm/topdown/src/error.rs | 4 - fendermint/vm/topdown/src/finality/fetch.rs | 296 ++++++++-- fendermint/vm/topdown/src/finality/mod.rs | 177 +----- fendermint/vm/topdown/src/finality/null.rs | 326 +++++++++-- fendermint/vm/topdown/src/lib.rs | 63 ++- fendermint/vm/topdown/src/proxy.rs | 30 +- fendermint/vm/topdown/src/sync.rs | 550 ------------------- fendermint/vm/topdown/src/sync/mod.rs | 172 ++++++ fendermint/vm/topdown/src/sync/pointers.rs | 50 ++ fendermint/vm/topdown/src/sync/syncer.rs | 524 ++++++++++++++++++ fendermint/vm/topdown/src/sync/tendermint.rs | 45 ++ fendermint/vm/topdown/src/toggle.rs | 36 +- infra/Makefile.toml | 2 + infra/scripts/fendermint.toml | 2 + 18 files changed, 1445 insertions(+), 875 deletions(-) delete mode 100644 fendermint/vm/topdown/src/sync.rs create mode 100644 fendermint/vm/topdown/src/sync/mod.rs create mode 100644 fendermint/vm/topdown/src/sync/pointers.rs create mode 100644 fendermint/vm/topdown/src/sync/syncer.rs create mode 100644 fendermint/vm/topdown/src/sync/tendermint.rs diff --git a/fendermint/app/settings/src/lib.rs b/fendermint/app/settings/src/lib.rs index 1f45d2df..c95d17e7 100644 --- a/fendermint/app/settings/src/lib.rs +++ b/fendermint/app/settings/src/lib.rs @@ -112,6 +112,12 @@ pub struct TopDownConfig { /// conservative and avoid other from rejecting the proposal because they don't see the /// height as final yet. pub chain_head_delay: BlockHeight, + /// The number of blocks on top of `chain_head_delay` to wait before proposing a height + /// as final on the parent chain, to avoid slight disagreements between validators whether + /// a block is final, or not just yet. 
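// Illustrative sketch, not part of this patch: how these settings flow into the topdown
// `Config` builder introduced in fendermint/app/src/cmd/run.rs below. The numeric values
// are hypothetical; only the builder methods come from this change.
fn example_topdown_config() -> fendermint_vm_topdown::Config {
    use std::time::Duration;
    fendermint_vm_topdown::Config::new(
        10,                      // chain_head_delay: trail the parent chain head by 10 blocks
        Duration::from_secs(10), // polling_interval
        Duration::from_secs(5),  // exponential_back_off
        5,                       // exponential_retry_limit
    )
    .with_proposal_delay(2)       // wait 2 extra blocks before a height may be proposed as final
    .with_max_proposal_range(100) // one topdown proposal never spans more than 100 blocks
}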
+ pub proposal_delay: BlockHeight, + /// The max number of blocks one should make the topdown proposal + pub max_proposal_range: BlockHeight, /// Parent syncing cron period, in seconds #[serde_as(as = "DurationSeconds")] pub polling_interval: Duration, diff --git a/fendermint/app/src/cmd/run.rs b/fendermint/app/src/cmd/run.rs index b5256536..54574ee8 100644 --- a/fendermint/app/src/cmd/run.rs +++ b/fendermint/app/src/cmd/run.rs @@ -162,12 +162,14 @@ async fn run(settings: Settings) -> anyhow::Result<()> { let (parent_finality_provider, ipc_tuple) = if settings.ipc.is_topdown_enabled() { info!("topdown finality enabled"); let topdown_config = settings.ipc.topdown_config()?; - let config = fendermint_vm_topdown::Config { - chain_head_delay: topdown_config.chain_head_delay, - polling_interval: topdown_config.polling_interval, - exponential_back_off: topdown_config.exponential_back_off, - exponential_retry_limit: topdown_config.exponential_retry_limit, - }; + let config = fendermint_vm_topdown::Config::new( + topdown_config.chain_head_delay, + topdown_config.polling_interval, + topdown_config.exponential_back_off, + topdown_config.exponential_retry_limit, + ) + .with_proposal_delay(topdown_config.proposal_delay) + .with_max_proposal_range(topdown_config.max_proposal_range); let ipc_provider = Arc::new(create_ipc_provider_proxy(&settings)?); let finality_provider = CachedFinalityProvider::uninitialized(config.clone(), ipc_provider.clone()).await?; diff --git a/fendermint/vm/interpreter/src/chain.rs b/fendermint/vm/interpreter/src/chain.rs index 3bbc448c..5d7dfd57 100644 --- a/fendermint/vm/interpreter/src/chain.rs +++ b/fendermint/vm/interpreter/src/chain.rs @@ -264,14 +264,21 @@ where "chain interpreter committed topdown finality", ); + // The commitment of the finality for block `N` triggers + // the execution of all side-effects up till `N-1`, as for + // deferred execution chains, this is the latest state that + // we know for sure that we have available. 
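// Worked example (illustrative, not part of this patch; the heights are hypothetical).
// With deferred execution, committing finality for parent block `N` executes side-effects
// only up to `N - 1`; the previously committed height itself is included because its
// effects were deferred by the prior commit. E.g. previous finality at 100, new finality
// at 110 => execute heights 100..=109 now.
fn example_execution_range(prev_height: u64, new_finality_height: u64) -> (u64, u64) {
    (prev_height, new_finality_height - 1) // inclusive bounds, mirroring execution_fr / execution_to below
}
// example_execution_range(100, 110) == (100, 109)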
+ let execution_fr = prev_height; + let execution_to = finality.height - 1; + // error happens if we cannot get the validator set from ipc agent after retries let validator_changes = provider - .validator_changes_from(prev_height + 1, finality.height) + .validator_changes_from(execution_fr, execution_to) .await .context("failed to fetch validator changes")?; tracing::debug!( - from = prev_height + 1, - to = finality.height, + from = execution_fr, + to = execution_to, msgs = validator_changes.len(), "chain interpreter received total validator changes" ); @@ -282,13 +289,13 @@ where // error happens if we cannot get the cross messages from ipc agent after retries let msgs = provider - .top_down_msgs_from(prev_height + 1, p.height as u64, &finality.block_hash) + .top_down_msgs_from(execution_fr, execution_to) .await .context("failed to fetch top down messages")?; tracing::debug!( number_of_messages = msgs.len(), - start = prev_height + 1, - end = p.height, + start = execution_fr, + end = execution_to, "chain interpreter received topdown msgs", ); diff --git a/fendermint/vm/topdown/src/cache.rs b/fendermint/vm/topdown/src/cache.rs index 235a2a5b..a6a48968 100644 --- a/fendermint/vm/topdown/src/cache.rs +++ b/fendermint/vm/topdown/src/cache.rs @@ -50,6 +50,10 @@ impl SequentialKeyCache { self.increment } + pub fn size(&self) -> usize { + self.data.len() + } + pub fn is_empty(&self) -> bool { self.data.is_empty() } diff --git a/fendermint/vm/topdown/src/error.rs b/fendermint/vm/topdown/src/error.rs index b826e351..b0780599 100644 --- a/fendermint/vm/topdown/src/error.rs +++ b/fendermint/vm/topdown/src/error.rs @@ -15,8 +15,4 @@ pub enum Error { ParentChainReorgDetected, #[error("Cannot query parent at height {1}: {0}")] CannotQueryParent(String, BlockHeight), - /// This error happens when querying top down messages, the block ahead are all null rounds. 
- /// See `parent_views_at_height` for detailed explanation - #[error("Look ahead limit reached from {0}: {1}")] - LookAheadLimitReached(BlockHeight, BlockHeight), } diff --git a/fendermint/vm/topdown/src/finality/fetch.rs b/fendermint/vm/topdown/src/finality/fetch.rs index a0a7a349..018317d3 100644 --- a/fendermint/vm/topdown/src/finality/fetch.rs +++ b/fendermint/vm/topdown/src/finality/fetch.rs @@ -78,7 +78,7 @@ impl ParentViewProvider for CachedF tracing::debug!( number_of_messages = r.len(), height = h, - "obtained validator change set", + "fetched validator change set", ); v.append(&mut r); } @@ -86,61 +86,17 @@ impl ParentViewProvider for CachedF Ok(v) } - async fn validator_changes( - &self, - height: BlockHeight, - ) -> anyhow::Result> { - let r = self.inner.validator_changes(height).await?; - - if let Some(v) = r { - return Ok(v); - } - - let r = retry!( - self.config.exponential_back_off, - self.config.exponential_retry_limit, - self.parent_client - .get_validator_changes(height) - .await - .map(|r| r.value) - ); - - handle_null_round(r, Vec::new) - } - - /// Should always return the top down messages, only when ipc parent_client is down after exponential - /// retries - async fn top_down_msgs( - &self, - height: BlockHeight, - block_hash: &BlockHash, - ) -> anyhow::Result> { - let r = self.inner.top_down_msgs(height).await?; - - if let Some(v) = r { - return Ok(v); - } - - let r = retry!( - self.config.exponential_back_off, - self.config.exponential_retry_limit, - self.parent_client - .get_top_down_msgs_with_hash(height, block_hash) - .await - ); - - handle_null_round(r, Vec::new) - } - + /// Get top down message in the range `from` to `to`, both inclusive. For the check to be valid, one + /// should not pass a height `to` that is a null block, otherwise the check is useless. In debug + /// mode, it will throw an error. 
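// Illustrative usage, not part of this patch, mirroring `test_query_topdown_msgs` below:
// with a parent view covering heights 100..=106 where 104 and 105 are null rounds, this
// collects every topdown message in the range; null rounds contribute nothing. The generic
// bound is an assumption for the sketch; any `ParentViewProvider` (e.g. a wired-up
// `CachedFinalityProvider`) would do.
async fn example_fetch_range<P: ParentViewProvider>(
    provider: &P,
) -> anyhow::Result<Vec<ipc_sdk::cross::CrossMsg>> {
    provider.top_down_msgs_from(100, 106).await
}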
async fn top_down_msgs_from( &self, from: BlockHeight, to: BlockHeight, - block_hash: &BlockHash, ) -> anyhow::Result> { let mut v = vec![]; for h in from..=to { - let mut r = self.top_down_msgs(h, block_hash).await?; + let mut r = self.top_down_msgs(h).await?; tracing::debug!( number_of_top_down_messages = r.len(), height = h, @@ -181,6 +137,51 @@ impl CachedFinalityProvider { let genesis = parent_client.get_genesis_epoch().await?; Ok(Self::new(config, genesis, None, parent_client)) } + + /// Should always return the top down messages, only when ipc parent_client is down after exponential + /// retries + async fn validator_changes( + &self, + height: BlockHeight, + ) -> anyhow::Result> { + let r = self.inner.validator_changes(height).await?; + + if let Some(v) = r { + return Ok(v); + } + + let r = retry!( + self.config.exponential_back_off, + self.config.exponential_retry_limit, + self.parent_client + .get_validator_changes(height) + .await + .map(|r| r.value) + ); + + handle_null_round(r, Vec::new) + } + + /// Should always return the top down messages, only when ipc parent_client is down after exponential + /// retries + async fn top_down_msgs(&self, height: BlockHeight) -> anyhow::Result> { + let r = self.inner.top_down_msgs(height).await?; + + if let Some(v) = r { + return Ok(v); + } + + let r = retry!( + self.config.exponential_back_off, + self.config.exponential_retry_limit, + self.parent_client + .get_top_down_msgs(height) + .await + .map(|r| r.value) + ); + + handle_null_round(r, Vec::new) + } } impl CachedFinalityProvider { @@ -190,7 +191,7 @@ impl CachedFinalityProvider { committed_finality: Option, parent_client: Arc, ) -> Self { - let inner = FinalityWithNull::new(genesis_epoch, committed_finality); + let inner = FinalityWithNull::new(config.clone(), genesis_epoch, committed_finality); Self { inner, config, @@ -202,10 +203,11 @@ impl CachedFinalityProvider { self.inner.block_hash_at_height(height) } - pub fn first_non_null_parent_hash(&self, height: BlockHeight) -> Stm> { - self.inner.first_non_null_parent_hash(height) + pub fn latest_height_in_cache(&self) -> Stm> { + self.inner.latest_height_in_cache() } + /// Get the latest height tracked in the provider, includes both cache and last committed finality pub fn latest_height(&self) -> Stm> { self.inner.latest_height() } @@ -226,13 +228,160 @@ impl CachedFinalityProvider { ) -> StmResult<(), Error> { self.inner.new_parent_view(height, maybe_payload) } + + /// Returns the number of blocks cached. + pub fn cached_blocks(&self) -> Stm { + self.inner.cached_blocks() + } } #[cfg(test)] mod tests { + use crate::finality::ParentViewPayload; + use crate::proxy::ParentQueryProxy; + use crate::{ + BlockHeight, CachedFinalityProvider, Config, IPCParentFinality, ParentViewProvider, + SequentialKeyCache, NULL_ROUND_ERR_MSG, + }; + use anyhow::anyhow; + use async_trait::async_trait; + use fvm_shared::address::Address; + use fvm_shared::econ::TokenAmount; + use ipc_provider::manager::{GetBlockHashResult, TopDownQueryPayload}; + use ipc_sdk::cross::{CrossMsg, StorableMsg}; + use ipc_sdk::staking::{StakingChange, StakingChangeRequest, StakingOperation}; + use ipc_sdk::subnet_id::SubnetID; use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; use std::time::Duration; + /// Creates a mock of a new parent blockchain view. The key is the height and the value is the + /// block hash. If block hash is None, it means the current height is a null block. + macro_rules! 
new_parent_blocks { + ($($key:expr => $val:expr),* ,) => ( + hash_map!($($key => $val),*) + ); + ($($key:expr => $val:expr),*) => ({ + let mut map = SequentialKeyCache::sequential(); + $( map.append($key, $val).unwrap(); )* + map + }); + } + + struct TestParentProxy { + blocks: SequentialKeyCache>, + } + + #[async_trait] + impl ParentQueryProxy for TestParentProxy { + async fn get_chain_head_height(&self) -> anyhow::Result { + Ok(self.blocks.upper_bound().unwrap()) + } + + async fn get_genesis_epoch(&self) -> anyhow::Result { + Ok(self.blocks.lower_bound().unwrap() - 1) + } + + async fn get_block_hash(&self, height: BlockHeight) -> anyhow::Result { + let r = self.blocks.get_value(height).unwrap(); + if r.is_none() { + return Err(anyhow!(NULL_ROUND_ERR_MSG)); + } + + for h in (self.blocks.lower_bound().unwrap()..height).rev() { + let v = self.blocks.get_value(h).unwrap(); + if v.is_none() { + continue; + } + return Ok(GetBlockHashResult { + parent_block_hash: v.clone().unwrap().0, + block_hash: r.clone().unwrap().0, + }); + } + panic!("invalid testing data") + } + + async fn get_top_down_msgs( + &self, + height: BlockHeight, + ) -> anyhow::Result>> { + let r = self.blocks.get_value(height).cloned().unwrap(); + if r.is_none() { + return Err(anyhow!(NULL_ROUND_ERR_MSG)); + } + let r = r.unwrap(); + Ok(TopDownQueryPayload { + value: r.2, + block_hash: r.0, + }) + } + + async fn get_validator_changes( + &self, + height: BlockHeight, + ) -> anyhow::Result>> { + let r = self.blocks.get_value(height).cloned().unwrap(); + if r.is_none() { + return Err(anyhow!(NULL_ROUND_ERR_MSG)); + } + let r = r.unwrap(); + Ok(TopDownQueryPayload { + value: r.1, + block_hash: r.0, + }) + } + } + + fn new_provider( + blocks: SequentialKeyCache>, + ) -> CachedFinalityProvider { + let config = Config { + chain_head_delay: 2, + polling_interval: Default::default(), + exponential_back_off: Default::default(), + exponential_retry_limit: 0, + max_proposal_range: Some(1), + max_cache_blocks: None, + proposal_delay: None, + }; + let genesis_epoch = blocks.lower_bound().unwrap(); + let proxy = Arc::new(TestParentProxy { blocks }); + let committed_finality = IPCParentFinality { + height: genesis_epoch, + block_hash: vec![0; 32], + }; + + CachedFinalityProvider::new(config, genesis_epoch, Some(committed_finality), proxy) + } + + fn new_cross_msg(nonce: u64) -> CrossMsg { + let subnet_id = SubnetID::new(10, vec![Address::new_id(1000)]); + let mut msg = StorableMsg::new_fund_msg( + &subnet_id, + &Address::new_id(1), + &Address::new_id(2), + TokenAmount::from_atto(100), + ) + .unwrap(); + msg.nonce = nonce; + + CrossMsg { + msg, + wrapped: false, + } + } + + fn new_validator_changes(configuration_number: u64) -> StakingChangeRequest { + StakingChangeRequest { + configuration_number, + change: StakingChange { + op: StakingOperation::Deposit, + payload: vec![], + validator: Address::new_id(1), + }, + } + } + #[tokio::test] async fn test_retry() { struct Test { @@ -255,4 +404,47 @@ mod tests { // execute the first time, retries twice assert_eq!(t.nums_run.load(Ordering::SeqCst), 3); } + + #[tokio::test] + async fn test_query_topdown_msgs() { + let parent_blocks = new_parent_blocks!( + 100 => Some((vec![0; 32], vec![], vec![new_cross_msg(0)])), // genesis block + 101 => Some((vec![1; 32], vec![], vec![new_cross_msg(1)])), + 102 => Some((vec![2; 32], vec![], vec![new_cross_msg(2)])), + 103 => Some((vec![3; 32], vec![], vec![new_cross_msg(3)])), + 104 => None, + 105 => None, + 106 => Some((vec![6; 32], vec![], vec![new_cross_msg(6)])) 
+ ); + let provider = new_provider(parent_blocks); + let messages = provider.top_down_msgs_from(100, 106).await.unwrap(); + + assert_eq!( + messages, + vec![ + new_cross_msg(0), + new_cross_msg(1), + new_cross_msg(2), + new_cross_msg(3), + new_cross_msg(6), + ] + ) + } + + #[tokio::test] + async fn test_query_validator_changes() { + let parent_blocks = new_parent_blocks!( + 100 => Some((vec![0; 32], vec![new_validator_changes(0)], vec![])), // genesis block + 101 => Some((vec![1; 32], vec![new_validator_changes(1)], vec![])), + 102 => Some((vec![2; 32], vec![], vec![])), + 103 => Some((vec![3; 32], vec![new_validator_changes(3)], vec![])), + 104 => None, + 105 => None, + 106 => Some((vec![6; 32], vec![new_validator_changes(6)], vec![])) + ); + let provider = new_provider(parent_blocks); + let messages = provider.validator_changes_from(100, 106).await.unwrap(); + + assert_eq!(messages.len(), 4) + } } diff --git a/fendermint/vm/topdown/src/finality/mod.rs b/fendermint/vm/topdown/src/finality/mod.rs index 1c764723..2e8c40f9 100644 --- a/fendermint/vm/topdown/src/finality/mod.rs +++ b/fendermint/vm/topdown/src/finality/mod.rs @@ -44,17 +44,13 @@ pub(crate) fn topdown_cross_msgs(p: &ParentViewPayload) -> Vec { mod tests { use crate::proxy::ParentQueryProxy; use crate::{ - BlockHash, BlockHeight, CachedFinalityProvider, Config, IPCParentFinality, - ParentFinalityProvider, + BlockHeight, CachedFinalityProvider, Config, IPCParentFinality, ParentFinalityProvider, }; use async_stm::atomically_or_err; use async_trait::async_trait; - use fvm_shared::address::Address; - use fvm_shared::econ::TokenAmount; use ipc_provider::manager::{GetBlockHashResult, TopDownQueryPayload}; - use ipc_sdk::cross::{CrossMsg, StorableMsg}; + use ipc_sdk::cross::CrossMsg; use ipc_sdk::staking::StakingChangeRequest; - use ipc_sdk::subnet_id::SubnetID; use std::sync::Arc; use tokio::time::Duration; @@ -74,12 +70,14 @@ mod tests { Ok(GetBlockHashResult::default()) } - async fn get_top_down_msgs_with_hash( + async fn get_top_down_msgs( &self, _height: BlockHeight, - _block_hash: &BlockHash, - ) -> anyhow::Result> { - Ok(vec![]) + ) -> anyhow::Result>> { + Ok(TopDownQueryPayload { + value: vec![], + block_hash: vec![], + }) } async fn get_validator_changes( @@ -110,109 +108,14 @@ mod tests { polling_interval: Duration::from_secs(10), exponential_back_off: Duration::from_secs(10), exponential_retry_limit: 10, + max_proposal_range: None, + max_cache_blocks: None, + proposal_delay: None, }; CachedFinalityProvider::new(config, 10, Some(genesis_finality()), mocked_agent_proxy()) } - fn new_cross_msg(nonce: u64) -> CrossMsg { - let subnet_id = SubnetID::new(10, vec![Address::new_id(1000)]); - let mut msg = StorableMsg::new_fund_msg( - &subnet_id, - &Address::new_id(1), - &Address::new_id(2), - TokenAmount::from_atto(100), - ) - .unwrap(); - msg.nonce = nonce; - - CrossMsg { - msg, - wrapped: false, - } - } - - #[tokio::test] - async fn test_next_proposal_works() { - let provider = new_provider(); - - atomically_or_err(|| { - let r = provider.next_proposal()?; - assert!(r.is_none()); - - provider.new_parent_view(10, Some((vec![1u8; 32], vec![], vec![])))?; - - let r = provider.next_proposal()?; - assert!(r.is_some()); - - // inject data - for i in 11..=100 { - provider.new_parent_view(i, Some((vec![1u8; 32], vec![], vec![])))?; - } - - let proposal = provider.next_proposal()?.unwrap(); - let target_block = 100; - assert_eq!( - proposal, - IPCParentFinality { - height: target_block, - block_hash: vec![1u8; 32], - } - ); - - 
assert_eq!(provider.latest_height()?.unwrap(), 100); - - Ok(()) - }) - .await - .unwrap(); - } - - #[tokio::test] - async fn test_next_proposal_null_round_works() { - let provider = new_provider(); - - atomically_or_err(|| { - let r = provider.next_proposal()?; - assert!(r.is_none()); - - provider.new_parent_view(10, Some((vec![1u8; 32], vec![], vec![])))?; - - // inject data - for i in 11..=100 { - provider.new_parent_view(i, None)?; - } - // no proposal - assert_eq!(provider.next_proposal()?, None); - - let first_non_null_parent_hash = provider.first_non_null_parent_hash(100)?; - assert_eq!(first_non_null_parent_hash, Some(vec![1u8; 32])); - assert_eq!(provider.latest_height()?.unwrap(), 100); - - provider.new_parent_view(101, Some((vec![2u8; 32], vec![], vec![])))?; - let f = provider.next_proposal()?.unwrap(); - assert_eq!(f.block_hash, vec![2u8; 32]); - assert_eq!(f.height, 101); - - provider.set_new_finality( - IPCParentFinality { - height: 101, - block_hash: vec![2u8; 32], - }, - Some(genesis_finality()), - )?; - - for i in 102..=110 { - provider.new_parent_view(i, None)?; - } - let first_non_null_parent_hash = provider.first_non_null_parent_hash(100)?; - assert_eq!(first_non_null_parent_hash, Some(vec![2u8; 32])); - Ok(()) - }) - .await - .unwrap(); - } - #[tokio::test] async fn test_finality_works() { let provider = new_provider(); @@ -272,62 +175,4 @@ mod tests { .await .unwrap(); } - - #[tokio::test] - async fn test_top_down_msgs_works() { - let config = Config { - chain_head_delay: 2, - polling_interval: Duration::from_secs(10), - exponential_back_off: Duration::from_secs(10), - exponential_retry_limit: 10, - }; - - let genesis_finality = IPCParentFinality { - height: 0, - block_hash: vec![0; 32], - }; - - let provider = - CachedFinalityProvider::new(config, 10, Some(genesis_finality), mocked_agent_proxy()); - - let cross_msgs_batch1 = vec![new_cross_msg(0), new_cross_msg(1), new_cross_msg(2)]; - let cross_msgs_batch2 = vec![new_cross_msg(3), new_cross_msg(4), new_cross_msg(5)]; - let cross_msgs_batch3 = vec![new_cross_msg(6), new_cross_msg(7), new_cross_msg(8)]; - let cross_msgs_batch4 = vec![new_cross_msg(9), new_cross_msg(10), new_cross_msg(11)]; - - atomically_or_err(|| { - provider.new_parent_view( - 100, - Some((vec![1u8; 32], vec![], cross_msgs_batch1.clone())), - )?; - - provider.new_parent_view( - 101, - Some((vec![1u8; 32], vec![], cross_msgs_batch2.clone())), - )?; - - provider.new_parent_view( - 102, - Some((vec![1u8; 32], vec![], cross_msgs_batch3.clone())), - )?; - provider.new_parent_view( - 103, - Some((vec![1u8; 32], vec![], cross_msgs_batch4.clone())), - )?; - - let mut v1 = cross_msgs_batch1.clone(); - let v2 = cross_msgs_batch2.clone(); - v1.extend(v2); - let finality = IPCParentFinality { - height: 103, - block_hash: vec![1u8; 32], - }; - let next_proposal = provider.next_proposal()?.unwrap(); - assert_eq!(next_proposal, finality); - - Ok(()) - }) - .await - .unwrap(); - } } diff --git a/fendermint/vm/topdown/src/finality/null.rs b/fendermint/vm/topdown/src/finality/null.rs index d549900b..5bc778c0 100644 --- a/fendermint/vm/topdown/src/finality/null.rs +++ b/fendermint/vm/topdown/src/finality/null.rs @@ -4,14 +4,16 @@ use crate::finality::{ ensure_sequential, topdown_cross_msgs, validator_changes, ParentViewPayload, }; -use crate::{BlockHash, BlockHeight, Error, IPCParentFinality, SequentialKeyCache}; +use crate::{BlockHash, BlockHeight, Config, Error, IPCParentFinality, SequentialKeyCache}; use async_stm::{abort, atomically, Stm, StmResult, TVar}; use 
ipc_sdk::cross::CrossMsg; use ipc_sdk::staking::StakingChangeRequest; +use std::cmp::min; /// Finality provider that can handle null blocks #[derive(Clone)] pub struct FinalityWithNull { + config: Config, genesis_epoch: BlockHeight, /// Cached data that always syncs with the latest parent chain proactively cached_data: TVar>>, @@ -21,8 +23,13 @@ pub struct FinalityWithNull { } impl FinalityWithNull { - pub fn new(genesis_epoch: BlockHeight, committed_finality: Option) -> Self { + pub fn new( + config: Config, + genesis_epoch: BlockHeight, + committed_finality: Option, + ) -> Self { Self { + config, genesis_epoch, cached_data: TVar::new(SequentialKeyCache::sequential()), last_committed_finality: TVar::new(committed_finality), @@ -49,25 +56,6 @@ impl FinalityWithNull { Ok(r) } - pub fn first_non_null_parent_hash(&self, height: BlockHeight) -> Stm> { - let cache = self.cached_data.read()?; - if let Some(lower_bound) = cache.lower_bound() { - for h in (lower_bound..height).rev() { - if let Some(Some(p)) = cache.get_value(h) { - return Ok(Some(p.0.clone())); - } - } - } - - // nothing is found in cache, check the last committed finality - let last_committed_finality = self.last_committed_finality.read_clone()?; - if let Some(f) = last_committed_finality { - Ok(Some(f.block_hash)) - } else { - Ok(None) - } - } - pub fn last_committed_finality(&self) -> Stm> { self.last_committed_finality.read_clone() } @@ -91,19 +79,14 @@ impl FinalityWithNull { } pub fn next_proposal(&self) -> Stm> { - let height = if let Some(h) = self.latest_height()? { + let height = if let Some(h) = self.propose_next_height()? { h } else { - tracing::debug!("no proposal yet as height not available"); return Ok(None); }; - let block_hash = if let Some(h) = self.block_hash_at_height(height)? { - h - } else { - // Oops, we have a null round in parent, skip this proposal and wait for future blocks. - return Ok(None); - }; + // safe to unwrap as we make sure null height will not be proposed + let block_hash = self.block_hash_at_height(height)?.unwrap(); let proposal = IPCParentFinality { height, block_hash }; tracing::debug!(proposal = proposal.to_string(), "new proposal"); @@ -128,7 +111,8 @@ impl FinalityWithNull { let height = finality.height; self.cached_data.update(|mut cache| { - cache.remove_key_below(height + 1); + // only remove cache below height, but not at height, as we have delayed execution + cache.remove_key_below(height); cache })?; @@ -137,18 +121,109 @@ impl FinalityWithNull { } impl FinalityWithNull { + /// Returns the number of blocks cached. + pub(crate) fn cached_blocks(&self) -> Stm { + let cache = self.cached_data.read()?; + Ok(cache.size() as BlockHeight) + } + pub(crate) fn block_hash_at_height(&self, height: BlockHeight) -> Stm> { + if let Some(f) = self.last_committed_finality.read()?.as_ref() { + if f.height == height { + return Ok(Some(f.block_hash.clone())); + } + } + self.get_at_height(height, |i| i.0.clone()) } - pub(crate) fn latest_height(&self) -> Stm> { + pub(crate) fn latest_height_in_cache(&self) -> Stm> { let cache = self.cached_data.read()?; Ok(cache.upper_bound()) } + + /// Get the latest height tracked in the provider, includes both cache and last committed finality + pub(crate) fn latest_height(&self) -> Stm> { + let h = if let Some(h) = self.latest_height_in_cache()? { + h + } else if let Some(p) = self.last_committed_finality()? 
{ + p.height + } else { + return Ok(None); + }; + Ok(Some(h)) + } } /// All the private functions impl FinalityWithNull { + /// Get the first non-null block in the range [start, end]. + fn first_non_null_block_before(&self, height: BlockHeight) -> Stm> { + let cache = self.cached_data.read()?; + Ok(cache.lower_bound().and_then(|lower_bound| { + for h in (lower_bound..height).rev() { + if let Some(Some(_)) = cache.get_value(h) { + return Some(h); + } + } + None + })) + } + + fn propose_next_height(&self) -> Stm> { + let latest_height = if let Some(h) = self.latest_height_in_cache()? { + h + } else { + tracing::debug!("no proposal yet as height not available"); + return Ok(None); + }; + + let last_committed_height = if let Some(h) = self.last_committed_finality.read_clone()? { + h.height + } else { + unreachable!("last committed finality will be available at this point"); + }; + + let max_proposal_height = last_committed_height + self.config.max_proposal_range(); + let candidate_height = min(max_proposal_height, latest_height); + tracing::debug!(max_proposal_height, candidate_height, "propose heights"); + + let first_non_null_height = + if let Some(h) = self.first_non_null_block_before(candidate_height)? { + h + } else { + tracing::debug!(height = candidate_height, "no non-null block found before"); + return Ok(None); + }; + + tracing::debug!(first_non_null_height, candidate_height); + // an extra layer of delay + let maybe_proposal_height = + self.first_non_null_block_before(first_non_null_height - self.config.proposal_delay())?; + tracing::debug!( + delayed_height = maybe_proposal_height, + delay = self.config.proposal_delay() + ); + if let Some(proposal_height) = maybe_proposal_height { + // this is possible due to delayed execution as the proposed height's data cannot be + // executed because they have yet to be executed. + return if last_committed_height == proposal_height { + tracing::debug!( + last_committed_height, + proposal_height, + "no new blocks from cache, not proposing" + ); + Ok(None) + } else { + tracing::debug!(proposal_height, "new proposal height"); + Ok(Some(proposal_height)) + }; + } + + tracing::debug!(last_committed_height, "no non-null block after delay"); + Ok(None) + } + fn handle_null_block T, D: Fn() -> T>( &self, height: BlockHeight, @@ -237,15 +312,23 @@ impl FinalityWithNull { // the incoming proposal has height already committed, reject if last_committed_finality.height >= proposal.height { + tracing::debug!( + last_committed = last_committed_finality.height, + proposed = proposal.height, + "proposed height already committed", + ); return Ok(false); } - if let Some(latest_height) = self.latest_height()? { + if let Some(latest_height) = self.latest_height_in_cache()? { + let r = latest_height >= proposal.height; + tracing::debug!(is_true = r, "incoming proposal height seen?"); // requires the incoming height cannot be more advanced than our trusted parent node - Ok(latest_height >= proposal.height) + Ok(r) } else { // latest height is not found, meaning we dont have any prefetched cache, we just be - // strict and vote no simply because we don't know.. + // strict and vote no simply because we don't know. + tracing::debug!("reject proposal, no data in cache"); Ok(false) } } @@ -253,10 +336,183 @@ impl FinalityWithNull { fn check_block_hash(&self, proposal: &IPCParentFinality) -> Stm { Ok( if let Some(block_hash) = self.block_hash_at_height(proposal.height)? 
{ - block_hash == proposal.block_hash + let r = block_hash == proposal.block_hash; + tracing::debug!(proposal = proposal.to_string(), is_same = r, "same hash?"); + r } else { + tracing::debug!(proposal = proposal.to_string(), "reject, hash not found"); false }, ) } } + +#[cfg(test)] +mod tests { + use crate::finality::{FinalityWithNull, ParentViewPayload}; + use crate::{BlockHeight, Config, IPCParentFinality}; + use async_stm::{atomically, atomically_or_err}; + + async fn new_provider( + mut blocks: Vec<(BlockHeight, Option)>, + ) -> FinalityWithNull { + let config = Config { + chain_head_delay: 2, + polling_interval: Default::default(), + exponential_back_off: Default::default(), + exponential_retry_limit: 0, + max_proposal_range: Some(6), + max_cache_blocks: None, + proposal_delay: Some(2), + }; + let committed_finality = IPCParentFinality { + height: blocks[0].0, + block_hash: vec![0; 32], + }; + + blocks.remove(0); + + let f = FinalityWithNull::new(config, 1, Some(committed_finality)); + for (h, p) in blocks { + atomically_or_err(|| f.new_parent_view(h, p.clone())) + .await + .unwrap(); + } + f + } + + #[tokio::test] + async fn test_happy_path() { + // max_proposal_range is 6. proposal_delay is 2 + let parent_blocks = vec![ + (100, Some((vec![0; 32], vec![], vec![]))), // last committed block + (101, Some((vec![1; 32], vec![], vec![]))), // cache start + (102, Some((vec![2; 32], vec![], vec![]))), // final proposal height + (103, Some((vec![3; 32], vec![], vec![]))), // final delayed height + (104, Some((vec![4; 32], vec![], vec![]))), + (105, Some((vec![5; 32], vec![], vec![]))), // first non null block + (106, Some((vec![6; 32], vec![], vec![]))), // max proposal height (last committed + 6) + (107, Some((vec![7; 32], vec![], vec![]))), + (108, Some((vec![8; 32], vec![], vec![]))), // cache latest height + ]; + let provider = new_provider(parent_blocks).await; + + let f = IPCParentFinality { + height: 102, + block_hash: vec![2; 32], + }; + assert_eq!( + atomically(|| provider.next_proposal()).await, + Some(f.clone()) + ); + + // Test set new finality + atomically(|| { + let last = provider.last_committed_finality.read_clone()?; + provider.set_new_finality(f.clone(), last) + }) + .await; + + assert_eq!( + atomically(|| provider.last_committed_finality()).await, + Some(f.clone()) + ); + + // this ensures sequential insertion is still valid + atomically_or_err(|| provider.new_parent_view(109, None)) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_not_enough_view() { + // max_proposal_range is 6. proposal_delay is 2 + let parent_blocks = vec![ + (100, Some((vec![0; 32], vec![], vec![]))), // last committed block + (101, Some((vec![1; 32], vec![], vec![]))), // cache start and final height + (102, Some((vec![2; 32], vec![], vec![]))), // delayed height + (103, Some((vec![3; 32], vec![], vec![]))), + (104, Some((vec![4; 32], vec![], vec![]))), // first non null block + (105, Some((vec![4; 32], vec![], vec![]))), // cache latest height + // max proposal height is 106 + ]; + let provider = new_provider(parent_blocks).await; + + assert_eq!( + atomically(|| provider.next_proposal()).await, + Some(IPCParentFinality { + height: 101, + block_hash: vec![1; 32] + }) + ); + } + + #[tokio::test] + async fn test_with_all_null_blocks() { + // max_proposal_range is 10. 
proposal_delay is 2 + let parent_blocks = vec![ + (102, Some((vec![2; 32], vec![], vec![]))), // last committed block + (103, None), + (104, None), + (105, None), + (106, None), + (107, None), + (108, None), + (109, None), + (110, Some((vec![4; 32], vec![], vec![]))), // cache latest height + // max proposal height is 112 + ]; + let mut provider = new_provider(parent_blocks).await; + provider.config.max_proposal_range = Some(8); + + assert_eq!(atomically(|| provider.next_proposal()).await, None); + } + + #[tokio::test] + async fn test_with_partially_null_blocks_i() { + // max_proposal_range is 10. proposal_delay is 2 + let parent_blocks = vec![ + (102, Some((vec![2; 32], vec![], vec![]))), // last committed block + (103, None), + (104, None), // we wont have a proposal because after delay, there is no more non-null proposal + (105, None), + (106, None), + (107, Some((vec![7; 32], vec![], vec![]))), // first non null block + (108, None), + (109, None), + (110, Some((vec![10; 32], vec![], vec![]))), // cache latest height + // max proposal height is 112 + ]; + let mut provider = new_provider(parent_blocks).await; + provider.config.max_proposal_range = Some(8); + + assert_eq!(atomically(|| provider.next_proposal()).await, None); + } + + #[tokio::test] + async fn test_with_partially_null_blocks_ii() { + // max_proposal_range is 10. proposal_delay is 2 + let parent_blocks = vec![ + (102, Some((vec![2; 32], vec![], vec![]))), // last committed block + (103, Some((vec![3; 32], vec![], vec![]))), // first non null delayed block, final + (104, None), + (105, None), // delayed block + (106, None), + (107, Some((vec![7; 32], vec![], vec![]))), // first non null block + (108, None), + (109, None), + (110, Some((vec![10; 32], vec![], vec![]))), // cache latest height + // max proposal height is 112 + ]; + let mut provider = new_provider(parent_blocks).await; + provider.config.max_proposal_range = Some(8); + + assert_eq!( + atomically(|| provider.next_proposal()).await, + Some(IPCParentFinality { + height: 103, + block_hash: vec![3; 32] + }) + ); + } +} diff --git a/fendermint/vm/topdown/src/lib.rs b/fendermint/vm/topdown/src/lib.rs index 6a143160..3edc0751 100644 --- a/fendermint/vm/topdown/src/lib.rs +++ b/fendermint/vm/topdown/src/lib.rs @@ -31,6 +31,10 @@ pub type BlockHash = Bytes; /// The null round error message pub(crate) const NULL_ROUND_ERR_MSG: &str = "requested epoch was a null round"; +/// Default topdown proposal height range +pub(crate) const DEFAULT_MAX_PROPOSAL_RANGE: BlockHeight = 100; +pub(crate) const DEFAULT_MAX_CACHE_BLOCK: BlockHeight = 500; +pub(crate) const DEFAULT_PROPOSAL_DELAY: BlockHeight = 2; #[derive(Debug, Clone, Deserialize)] pub struct Config { @@ -45,6 +49,53 @@ pub struct Config { pub exponential_back_off: Duration, /// The max number of retries for exponential backoff before giving up pub exponential_retry_limit: usize, + /// The max number of blocks one should make the topdown proposal + pub max_proposal_range: Option, + /// Max number of blocks that should be stored in cache + pub max_cache_blocks: Option, + pub proposal_delay: Option, +} + +impl Config { + pub fn new( + chain_head_delay: BlockHeight, + polling_interval: Duration, + exponential_back_off: Duration, + exponential_retry_limit: usize, + ) -> Self { + Self { + chain_head_delay, + polling_interval, + exponential_back_off, + exponential_retry_limit, + max_proposal_range: None, + max_cache_blocks: None, + proposal_delay: None, + } + } + + pub fn with_max_proposal_range(mut self, max_proposal_range: 
BlockHeight) -> Self { + self.max_proposal_range = Some(max_proposal_range); + self + } + + pub fn with_proposal_delay(mut self, proposal_delay: BlockHeight) -> Self { + self.proposal_delay = Some(proposal_delay); + self + } + + pub fn max_proposal_range(&self) -> BlockHeight { + self.max_proposal_range + .unwrap_or(DEFAULT_MAX_PROPOSAL_RANGE) + } + + pub fn proposal_delay(&self) -> BlockHeight { + self.proposal_delay.unwrap_or(DEFAULT_PROPOSAL_DELAY) + } + + pub fn max_cache_blocks(&self) -> BlockHeight { + self.max_cache_blocks.unwrap_or(DEFAULT_MAX_CACHE_BLOCK) + } } /// The finality view for IPC parent at certain height. @@ -87,23 +138,11 @@ pub trait ParentViewProvider { from: BlockHeight, to: BlockHeight, ) -> anyhow::Result>; - /// Get the validator changes at height. - async fn validator_changes( - &self, - height: BlockHeight, - ) -> anyhow::Result>; - /// Get the top down messages at height. - async fn top_down_msgs( - &self, - height: BlockHeight, - block_hash: &BlockHash, - ) -> anyhow::Result>; /// Get the top down messages from and to height. async fn top_down_msgs_from( &self, from: BlockHeight, to: BlockHeight, - block_hash: &BlockHash, ) -> anyhow::Result>; } diff --git a/fendermint/vm/topdown/src/proxy.rs b/fendermint/vm/topdown/src/proxy.rs index 75d211f2..c78e9f83 100644 --- a/fendermint/vm/topdown/src/proxy.rs +++ b/fendermint/vm/topdown/src/proxy.rs @@ -1,8 +1,8 @@ // Copyright 2022-2023 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT -use crate::{BlockHash, BlockHeight}; -use anyhow::{anyhow, bail}; +use crate::BlockHeight; +use anyhow::anyhow; use async_trait::async_trait; use fvm_shared::clock::ChainEpoch; use ipc_provider::manager::{GetBlockHashResult, TopDownQueryPayload}; @@ -25,11 +25,10 @@ pub trait ParentQueryProxy { async fn get_block_hash(&self, height: BlockHeight) -> anyhow::Result; /// Get the top down messages at epoch with the block hash at that height - async fn get_top_down_msgs_with_hash( + async fn get_top_down_msgs( &self, height: BlockHeight, - block_hash: &BlockHash, - ) -> anyhow::Result>; + ) -> anyhow::Result>>; /// Get the validator set at the specified height async fn get_validator_changes( @@ -83,21 +82,18 @@ impl ParentQueryProxy for IPCProviderProxy { } /// Get the top down messages from the starting to the ending height. - async fn get_top_down_msgs_with_hash( + async fn get_top_down_msgs( &self, height: BlockHeight, - block_hash: &BlockHash, - ) -> anyhow::Result> { - let res = self - .ipc_provider + ) -> anyhow::Result>> { + self.ipc_provider .get_top_down_msgs(&self.child_subnet, height as ChainEpoch) - .await?; - - if res.block_hash != *block_hash { - bail!("unexpected blockhash at height {height}"); - } - - Ok(res.value) + .await + .map(|mut v| { + // sort ascending, we dont assume the changes are ordered + v.value.sort_by(|a, b| a.msg.nonce.cmp(&b.msg.nonce)); + v + }) } /// Get the validator set at the specified height. diff --git a/fendermint/vm/topdown/src/sync.rs b/fendermint/vm/topdown/src/sync.rs deleted file mode 100644 index 2bab91ec..00000000 --- a/fendermint/vm/topdown/src/sync.rs +++ /dev/null @@ -1,550 +0,0 @@ -// Copyright 2022-2023 Protocol Labs -// SPDX-License-Identifier: Apache-2.0, MIT -//! 
A constant running process that fetch or listener to parent state - -use crate::error::Error; -use crate::finality::ParentViewPayload; -use crate::proxy::{IPCProviderProxy, ParentQueryProxy}; -use crate::{ - is_null_round_str, BlockHash, BlockHeight, CachedFinalityProvider, Config, IPCParentFinality, - ParentFinalityProvider, Toggle, -}; - -use async_stm::{atomically, atomically_or_err, Stm}; -use ipc_provider::manager::GetBlockHashResult; - -use anyhow::{anyhow, Context}; - -use ethers::utils::hex; -use std::cmp::{max, min}; -use std::sync::Arc; -use std::time::Duration; - -/// The max number of blocks polling should query each parent view update. If the number of blocks -/// polled equals this value, it would stop polling for this iteration and commit the result to cache. -const MAX_PARENT_VIEW_BLOCK_GAP: BlockHeight = 100; -/// When polling parent view, if the number of top down messages exceeds this limit, -/// the polling will stop for this iteration and commit the result to cache. -const TOPDOWN_MSG_LEN_THRESHOLD: usize = 500; - -type GetParentViewPayload = Vec<(BlockHeight, Option)>; - -/// Query the parent finality from the block chain state -pub trait ParentFinalityStateQuery { - /// Get the latest committed finality from the state - fn get_latest_committed_finality(&self) -> anyhow::Result>; -} - -/// Constantly syncing with parent through polling -struct PollingParentSyncer { - config: Config, - parent_view_provider: Arc>>, - parent_client: Arc, - committed_state_query: Arc, - tendermint_client: C, -} - -/// Queries the starting finality for polling. First checks the committed finality, if none, that -/// means the chain has just started, then query from the parent to get the genesis epoch. -async fn query_starting_finality( - query: &Arc, - parent_client: &Arc, -) -> anyhow::Result { - loop { - let mut finality = match query.get_latest_committed_finality() { - Ok(Some(finality)) => finality, - Ok(None) => { - tracing::debug!("app not ready for query yet"); - tokio::time::sleep(Duration::from_secs(5)).await; - continue; - } - Err(e) => { - tracing::warn!(error = e.to_string(), "cannot get committed finality"); - tokio::time::sleep(Duration::from_secs(5)).await; - continue; - } - }; - tracing::info!(finality = finality.to_string(), "latest finality committed"); - - // this means there are no previous committed finality yet, we fetch from parent to get - // the genesis epoch of the current subnet and its corresponding block hash. 
- if finality.height == 0 { - let genesis_epoch = parent_client.get_genesis_epoch().await?; - tracing::debug!(genesis_epoch = genesis_epoch, "obtained genesis epoch"); - let r = parent_client.get_block_hash(genesis_epoch).await?; - tracing::debug!( - block_hash = hex::encode(&r.block_hash), - "obtained genesis block hash", - ); - - finality = IPCParentFinality { - height: genesis_epoch, - block_hash: r.block_hash, - }; - tracing::info!( - genesis_finality = finality.to_string(), - "no previous finality committed, fetched from genesis epoch" - ); - } - - return Ok(finality); - } -} - -/// Start the polling parent syncer in the background -pub async fn launch_polling_syncer( - query: T, - config: Config, - view_provider: Arc>>, - parent_client: Arc, - tendermint_client: C, -) -> anyhow::Result<()> -where - T: ParentFinalityStateQuery + Send + Sync + 'static, - C: tendermint_rpc::Client + Send + Sync + 'static, -{ - if !view_provider.is_enabled() { - return Err(anyhow!("provider not enabled, enable to run syncer")); - } - - let query = Arc::new(query); - let finality = query_starting_finality(&query, &parent_client).await?; - atomically(|| view_provider.set_new_finality(finality.clone(), None)).await; - - tracing::info!( - finality = finality.to_string(), - "launching parent syncer with last committed finality" - ); - - let poll = PollingParentSyncer::new( - config, - view_provider, - parent_client, - query, - tendermint_client, - ); - poll.start(); - - Ok(()) -} - -impl PollingParentSyncer { - pub fn new( - config: Config, - parent_view_provider: Arc>>, - parent_client: Arc, - query: Arc, - tendermint_client: C, - ) -> Self { - Self { - config, - parent_view_provider, - parent_client, - committed_state_query: query, - tendermint_client, - } - } -} - -impl PollingParentSyncer -where - T: ParentFinalityStateQuery + Send + Sync + 'static, - C: tendermint_rpc::Client + Send + Sync + 'static, -{ - /// Start the parent finality listener in the background - pub fn start(self) { - let config = self.config; - let provider = self.parent_view_provider; - let parent_client = self.parent_client; - let query = self.committed_state_query; - let tendermint_client = self.tendermint_client; - - let mut interval = tokio::time::interval(config.polling_interval); - - tokio::spawn(async move { - loop { - interval.tick().await; - - if let Err(e) = sync_with_parent( - &config, - &parent_client, - &provider, - &query, - &tendermint_client, - ) - .await - { - tracing::error!(error = e.to_string(), "sync with parent encountered error"); - } - } - }); - } -} - -/// Syncing with parent with the below steps: -/// 1. Get the latest height in cache or latest height committed increment by 1 as the -/// starting height -/// 2. Get the latest chain head height deduct away N blocks as the ending height -/// 3. Fetches the data between starting and ending height -/// 4. 
Update the data into cache -async fn sync_with_parent( - config: &Config, - parent_proxy: &Arc, - provider: &Arc>>, - query: &Arc, - tendermint_client: &C, -) -> anyhow::Result<()> -where - T: ParentFinalityStateQuery + Send + Sync + 'static, - C: tendermint_rpc::Client + Send + Sync + 'static, -{ - let status: tendermint_rpc::endpoint::status::Response = tendermint_client - .status() - .await - .context("failed to get Tendermint status")?; - - if status.sync_info.catching_up { - tracing::debug!("syncing with peer, skip parent finality syncing this round"); - return Ok(()); - } - - let (last_recorded_height, last_height_hash) = - if let Some(h) = last_recorded_data(provider).await? { - h - } else { - // cannot get starting recorded height, we just wait for the next loop execution - return Ok(()); - }; - - let parent_chain_head_height = parent_proxy.get_chain_head_height().await?; - // sanity check - if parent_chain_head_height < config.chain_head_delay { - tracing::debug!("latest height not more than the chain head delay"); - return Ok(()); - } - - // we consider the chain head finalized only after the `chain_head_delay` - let max_ending_height = parent_chain_head_height - config.chain_head_delay; - - tracing::debug!( - last_recorded_height = last_recorded_height, - parent_chain_head_height = parent_chain_head_height, - max_ending_height = max_ending_height, - "syncing heights", - ); - - if last_recorded_height == max_ending_height { - tracing::debug!( - last_recorded_height = last_recorded_height, - "the parent has yet to produce a new block" - ); - return Ok(()); - } - - // we are going backwards in terms of block height, the latest block height is lower - // than our previously fetched head. It could be a chain reorg. We clear all the cache - // in `provider` and start from scratch - if last_recorded_height > max_ending_height { - tracing::warn!( - last_recorded_height = last_recorded_height, - max_ending_height = max_ending_height, - "last recorded height more than max ending height" - ); - return reset_cache(parent_proxy, provider, query).await; - } - - // we are adding 1 to the height because we are fetching block by block, we also configured - // the sequential cache to use increment == 1. 
- let starting_height = last_recorded_height + 1; - let ending_height = min( - max_ending_height, - MAX_PARENT_VIEW_BLOCK_GAP + starting_height, - ); - tracing::debug!( - start = starting_height, - end = ending_height, - "parent view range" - ); - - let new_parent_views = parent_views_in_block_range( - parent_proxy, - last_height_hash, - starting_height, - ending_height, - max_ending_height, - ) - .await?; - - atomically_or_err::<_, Error, _>(move || { - for (height, maybe_payload) in new_parent_views.clone() { - provider.new_parent_view(height, maybe_payload)?; - } - Ok(()) - }) - .await?; - - tracing::debug!(height = ending_height, "updated new parent views to"); - - Ok(()) -} - -/// Reset the cache in the face of a reorg -async fn reset_cache( - parent_proxy: &Arc, - provider: &Arc>>, - query: &Arc, -) -> anyhow::Result<()> { - let finality = query_starting_finality(query, parent_proxy).await?; - atomically(|| provider.reset(finality.clone())).await; - Ok(()) -} - -/// A util struct that tracks the last recorded height -enum LastRecordedBlock { - FilledBlock { - height: BlockHeight, - hash: BlockHash, - }, - NullBlock(BlockHeight), - Empty, -} - -impl LastRecordedBlock { - fn filled(height: BlockHeight, hash: BlockHash) -> Self { - Self::FilledBlock { height, hash } - } - - fn null(height: BlockHeight) -> Self { - Self::NullBlock(height) - } - - fn empty() -> Self { - Self::Empty - } -} - -/// Getting the last recorded block height/hash -async fn last_recorded_data( - provider: &Arc>>, -) -> anyhow::Result> { - match atomically(|| last_recorded_block(provider)).await { - LastRecordedBlock::Empty => Ok(None), - LastRecordedBlock::FilledBlock { height, hash } => Ok(Some((height, hash))), - LastRecordedBlock::NullBlock(height) => { - tracing::info!(height, "last recorded height is a null block"); - - // Imaging the list of blocks as follows: - // - // F0 B0 B1 N0 N1 B2 B3 B4 - // - // where F0 represents the last committed finality, B* represents non-null blocks and - // N* represents null blocks. - // - // Currently the last recorded block is N1, so the next block to sync in parent is B2. - // The response from getting block hash at height B2 from fvm eth apis would return: - // - // Block height: B2, Block hash: hash(B2), Parent block hash: hash(B1) - // - // F0 B0 B1 N0 N1 B2 - // B0' N0' N1' N2' B2 B3 B4 <====== reorged chain case 1 - // B0 B1 B2' B3' B2 <====== reorged chain case 2 - // B0 B1 N0' B1' B2 <====== reorged chain case 3 - // - // If last recorded block is null (say N1), to ensure the chain has not reorg before B2: - // we just need to get the first non null parent in cache or committed finality and use - // that block hash as previous block hash in following steps. - match atomically(|| provider.first_non_null_parent_hash(height)).await { - None => unreachable!("should have last committed finality at this point"), - Some(hash) => { - tracing::info!( - block_height = height, - parent_hash = hex::encode(&hash), - "First non null parent", - ); - Ok(Some((height, hash))) - } - } - } - } -} - -/// Obtains the last recorded block from provider cache or from last committed finality height. -fn last_recorded_block( - provider: &Arc>>, -) -> Stm { - let latest_height = if let Some(h) = provider.latest_height()? { - h - } else if let Some(f) = provider.last_committed_finality()? 
{ - // this means provider has cleared cache, but only previous committed finality - return Ok(LastRecordedBlock::filled(f.height, f.block_hash)); - } else { - return Ok(LastRecordedBlock::empty()); - }; - - if let Some(hash) = provider.block_hash(latest_height)? { - Ok(LastRecordedBlock::filled(latest_height, hash)) - } else { - Ok(LastRecordedBlock::null(latest_height)) - } -} - -/// Obtain the new parent views for the input block height range -async fn parent_views_in_block_range( - parent_proxy: &Arc, - mut previous_hash: BlockHash, - start_height: BlockHeight, - end_height: BlockHeight, - max_ending_height: BlockHeight, -) -> Result { - let mut updates = vec![]; - let mut total_top_down_msgs = 0; - - for h in start_height..=end_height { - match parent_views_at_height(parent_proxy, &previous_hash, h, max_ending_height).await { - Ok((hash, changeset, cross_msgs)) => { - total_top_down_msgs += cross_msgs.len(); - - tracing::debug!( - height = h, - previous_previous_hahs = hex::encode(&previous_hash), - previous_hash = hex::encode(&hash), - "matching hashes", - ); - previous_hash = hash.clone(); - - updates.push((h, Some((hash, changeset, cross_msgs)))); - if total_top_down_msgs >= TOPDOWN_MSG_LEN_THRESHOLD { - break; - } - } - // Handles lotus null round error. - // - // This is the error that we see when there is a null round: - // https://github.com/filecoin-project/lotus/blob/7bb1f98ac6f5a6da2cc79afc26d8cd9fe323eb30/node/impl/full/eth.go#L164 - // This happens when we request the block for a round without blocks in the tipset. - // A null round will never have a block, which means that we can advance to the next height. - Err(e) => { - let err_msg = e.to_string(); - if is_null_round_str(&err_msg) { - tracing::warn!(height = h, "null round detected, skip"); - updates.push((h, None)); - } else if let Error::LookAheadLimitReached(start, limit) = e { - tracing::warn!( - start_height = start, - limit_height = limit, - "look ahead limit reached, store updates so far in cache", - ); - break; - } else { - return Err(e); - } - } - } - } - - tracing::debug!(?updates, "obtained parent view updates"); - - Ok(updates) -} - -/// Obtain the new parent views for the target height. -/// -/// For `max_ending_height`, the explanation is as follows: -/// Say the current height is h and we need to fetch the top down messages. For `lotus`, the state -/// at height h is only finalized at h + 1. The block hash at height h will return empty top down -/// messages. In this case, we need to get the block hash at height h + 1 to query the top down messages. -/// Sadly, the height h + 1 could be null block, we need to continuously look ahead until we found -/// a height that is not null. But we cannot go all the way to the block head as it's not considered -/// final yet. So we need to use a `max_ending_height` that restricts how far as head we should go. 
-/// If we still cannot find a height that is non-null, maybe we should try later -async fn parent_views_at_height( - parent_proxy: &Arc, - previous_hash: &BlockHash, - height: BlockHeight, - max_ending_height: BlockHeight, -) -> Result { - let block_hash_res = parent_proxy - .get_block_hash(height) - .await - .map_err(|e| Error::CannotQueryParent(e.to_string(), height))?; - if block_hash_res.parent_block_hash != *previous_hash { - tracing::warn!( - height, - parent_hash = hex::encode(&block_hash_res.parent_block_hash), - previous_hash = hex::encode(previous_hash), - "parent block hash diff than previous hash", - ); - return Err(Error::ParentChainReorgDetected); - } - - let changes_res = parent_proxy - .get_validator_changes(height) - .await - .map_err(|e| Error::CannotQueryParent(e.to_string(), height))?; - if changes_res.block_hash != block_hash_res.block_hash { - tracing::warn!( - height, - change_set_hash = hex::encode(&changes_res.block_hash), - block_hash = hex::encode(&block_hash_res.block_hash), - "change set block hash does not equal block hash", - ); - return Err(Error::ParentChainReorgDetected); - } - - // for `lotus`, the state at height h is only finalized at h + 1. The block hash - // at height h will return empty top down messages. In this case, we need to get - // the block hash at height h + 1 to query the top down messages. - // Sadly, the height h + 1 could be null block, we need to continuously look ahead - // until we found a height that is not null - let next_hash = first_non_null_block_hash(parent_proxy, height + 1, max_ending_height).await?; - if next_hash.parent_block_hash != block_hash_res.block_hash { - tracing::warn!( - next_block_height = height + 1, - next_block_parent = hex::encode(&next_hash.parent_block_hash), - block_hash = hex::encode(&block_hash_res.block_hash), - "next block's parent hash does not equal block hash", - ); - return Err(Error::ParentChainReorgDetected); - } - let top_down_msgs_res = parent_proxy - .get_top_down_msgs_with_hash(height, &next_hash.block_hash) - .await - .map_err(|e| Error::CannotQueryParent(e.to_string(), height))?; - - Ok(( - block_hash_res.block_hash, - changes_res.value, - top_down_msgs_res, - )) -} - -/// Get the first non-null block hash in between heights. If height is a null round, then we need -/// to look further util we find one that is not null round. -async fn first_non_null_block_hash( - parent_proxy: &Arc, - start: BlockHeight, - mut end: BlockHeight, -) -> Result { - // at least we run for height - end = max(start, end); - - for h in start..=end { - match parent_proxy.get_block_hash(h).await { - Ok(h) => return Ok(h), - Err(e) => { - let msg = e.to_string(); - if is_null_round_str(&msg) { - tracing::warn!( - height = h, - error = e.to_string(), - "look ahead height is a null round" - ); - continue; - } else { - return Err(Error::CannotQueryParent(msg, h)); - } - } - } - } - Err(Error::LookAheadLimitReached(start, end)) -} diff --git a/fendermint/vm/topdown/src/sync/mod.rs b/fendermint/vm/topdown/src/sync/mod.rs new file mode 100644 index 00000000..9d30aea3 --- /dev/null +++ b/fendermint/vm/topdown/src/sync/mod.rs @@ -0,0 +1,172 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT +//! 
A constantly running process that fetches or listens to the parent state
+
+mod pointers;
+mod syncer;
+mod tendermint;
+
+use crate::proxy::ParentQueryProxy;
+use crate::sync::syncer::LotusParentSyncer;
+use crate::sync::tendermint::TendermintAwareSyncer;
+use crate::{CachedFinalityProvider, Config, IPCParentFinality, ParentFinalityProvider, Toggle};
+use anyhow::anyhow;
+use async_stm::atomically;
+use ethers::utils::hex;
+use std::sync::Arc;
+use std::time::Duration;
+
+/// Query the parent finality from the block chain state
+pub trait ParentFinalityStateQuery {
+    /// Get the latest committed finality from the state
+    fn get_latest_committed_finality(&self) -> anyhow::Result<Option<IPCParentFinality>>;
+}
+
+/// Constantly syncing with parent through polling
+struct PollingParentSyncer {
+    config: Config,
+    parent_view_provider: Arc<Toggle<CachedFinalityProvider<P>>>,
+    parent_client: Arc
<P>
, + committed_state_query: Arc, + tendermint_client: C, +} + +/// Queries the starting finality for polling. First checks the committed finality, if none, that +/// means the chain has just started, then query from the parent to get the genesis epoch. +async fn query_starting_finality( + query: &Arc, + parent_client: &Arc
<P>
, +) -> anyhow::Result +where + T: ParentFinalityStateQuery + Send + Sync + 'static, + P: ParentQueryProxy + Send + Sync + 'static, +{ + loop { + let mut finality = match query.get_latest_committed_finality() { + Ok(Some(finality)) => finality, + Ok(None) => { + tracing::debug!("app not ready for query yet"); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + Err(e) => { + tracing::warn!(error = e.to_string(), "cannot get committed finality"); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + tracing::info!(finality = finality.to_string(), "latest finality committed"); + + // this means there are no previous committed finality yet, we fetch from parent to get + // the genesis epoch of the current subnet and its corresponding block hash. + if finality.height == 0 { + let genesis_epoch = parent_client.get_genesis_epoch().await?; + tracing::debug!(genesis_epoch = genesis_epoch, "obtained genesis epoch"); + let r = parent_client.get_block_hash(genesis_epoch).await?; + tracing::debug!( + block_hash = hex::encode(&r.block_hash), + "obtained genesis block hash", + ); + + finality = IPCParentFinality { + height: genesis_epoch, + block_hash: r.block_hash, + }; + tracing::info!( + genesis_finality = finality.to_string(), + "no previous finality committed, fetched from genesis epoch" + ); + } + + return Ok(finality); + } +} + +/// Start the polling parent syncer in the background +pub async fn launch_polling_syncer( + query: T, + config: Config, + view_provider: Arc>>, + parent_client: Arc
<P>
, + tendermint_client: C, +) -> anyhow::Result<()> +where + T: ParentFinalityStateQuery + Send + Sync + 'static, + C: tendermint_rpc::Client + Send + Sync + 'static, + P: ParentQueryProxy + Send + Sync + 'static, +{ + if !view_provider.is_enabled() { + return Err(anyhow!("provider not enabled, enable to run syncer")); + } + + let query = Arc::new(query); + let finality = query_starting_finality(&query, &parent_client).await?; + atomically(|| view_provider.set_new_finality(finality.clone(), None)).await; + + tracing::info!( + finality = finality.to_string(), + "launching parent syncer with last committed finality" + ); + + let poll = PollingParentSyncer::new( + config, + view_provider, + parent_client, + query, + tendermint_client, + ); + poll.start(); + + Ok(()) +} + +impl PollingParentSyncer { + pub fn new( + config: Config, + parent_view_provider: Arc>>, + parent_client: Arc
<P>
, + query: Arc, + tendermint_client: C, + ) -> Self { + Self { + config, + parent_view_provider, + parent_client, + committed_state_query: query, + tendermint_client, + } + } +} + +impl PollingParentSyncer +where + T: ParentFinalityStateQuery + Send + Sync + 'static, + C: tendermint_rpc::Client + Send + Sync + 'static, + P: ParentQueryProxy + Send + Sync + 'static, +{ + /// Start the parent finality listener in the background + pub fn start(self) { + let config = self.config; + let provider = self.parent_view_provider; + let parent_client = self.parent_client; + let query = self.committed_state_query; + let tendermint_client = self.tendermint_client; + + let mut interval = tokio::time::interval(config.polling_interval); + + tokio::spawn(async move { + let lotus_syncer = LotusParentSyncer::new(config, parent_client, provider, query) + .await + .expect(""); + let mut tendermint_syncer = TendermintAwareSyncer::new(lotus_syncer, tendermint_client); + + loop { + interval.tick().await; + + if let Err(e) = tendermint_syncer.sync().await { + tracing::error!(error = e.to_string(), "sync with parent encountered error"); + } + } + }); + } +} diff --git a/fendermint/vm/topdown/src/sync/pointers.rs b/fendermint/vm/topdown/src/sync/pointers.rs new file mode 100644 index 00000000..6dd6e0c1 --- /dev/null +++ b/fendermint/vm/topdown/src/sync/pointers.rs @@ -0,0 +1,50 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::{BlockHash, BlockHeight}; +use ethers::utils::hex; +use std::fmt::{Display, Formatter}; + +#[derive(Clone, Debug)] +pub(crate) struct SyncPointers { + tail: Option<(BlockHeight, BlockHash)>, + head: BlockHeight, +} + +impl SyncPointers { + pub fn new(head: BlockHeight) -> Self { + Self { tail: None, head } + } + + pub fn head(&self) -> BlockHeight { + self.head + } + + pub fn tail(&self) -> Option<(BlockHeight, BlockHash)> { + self.tail.clone() + } + + pub fn advance_head(&mut self) { + self.head += 1; + } + + pub fn set_tail(&mut self, height: BlockHeight, hash: BlockHash) { + self.tail = Some((height, hash)); + } +} + +impl Display for SyncPointers { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some((height, hash)) = &self.tail { + write!( + f, + "{{tail: {{height: {}, hash: {}}}, head: {}}}", + height, + hex::encode(hash), + self.head + ) + } else { + write!(f, "{{tail: None, head: {}}}", self.head) + } + } +} diff --git a/fendermint/vm/topdown/src/sync/syncer.rs b/fendermint/vm/topdown/src/sync/syncer.rs new file mode 100644 index 00000000..0e3b8784 --- /dev/null +++ b/fendermint/vm/topdown/src/sync/syncer.rs @@ -0,0 +1,524 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT +//! The inner type of parent syncer + +use crate::finality::ParentViewPayload; +use crate::proxy::ParentQueryProxy; +use crate::sync::pointers::SyncPointers; +use crate::sync::{query_starting_finality, ParentFinalityStateQuery}; +use crate::{ + is_null_round_str, BlockHash, BlockHeight, CachedFinalityProvider, Config, Error, Toggle, +}; +use anyhow::anyhow; +use async_stm::{atomically, atomically_or_err}; +use ethers::utils::hex; +use std::sync::Arc; + +/// Parent syncer that constantly poll parent. This struct handles lotus null blocks and deferred +/// execution. For ETH based parent, it should work out of the box as well. +pub(crate) struct LotusParentSyncer { + config: Config, + parent_proxy: Arc
<P>
, + provider: Arc>>, + query: Arc, + + /// The pointers that indicate which height to poll parent next + sync_pointers: SyncPointers, +} + +impl LotusParentSyncer +where + T: ParentFinalityStateQuery + Send + Sync + 'static, + P: ParentQueryProxy + Send + Sync + 'static, +{ + pub async fn new( + config: Config, + parent_proxy: Arc
<P>
, + provider: Arc>>, + query: Arc, + ) -> anyhow::Result { + let last_committed_finality = atomically(|| provider.last_committed_finality()) + .await + .ok_or_else(|| anyhow!("parent finality not ready"))?; + + Ok(Self { + config, + parent_proxy, + provider, + query, + sync_pointers: SyncPointers::new(last_committed_finality.height), + }) + } + + /// There are 2 pointers, each refers to a block height, when syncing with parent. As Lotus has + /// delayed execution and null round, we need to ensure the topdown messages and validator + /// changes polled are indeed finalized and executed. The following three pointers are introduced: + /// - tail: The next block height in cache to be confirmed executed, could be None + /// - head: The latest block height fetched in cache, finalized but may not be executed. + /// + /// Say we have block chain as follows: + /// NonNullBlock(1) -> NonNullBlock(2) -> NullBlock(3) -> NonNullBlock(4) -> NullBlock(5) -> NonNullBlock(6) + /// and block height 1 is the previously finalized and executed block height. + /// + /// At the beginning, head == 1 and tail == None. With a new block height fetched, + /// `head = 2`. Since height at 2 is not a null block, `tail = Some(2)`, because we cannot be sure + /// block 2 has executed yet. When a new block is fetched, `head = 3`. Since head is a null block, we + /// cannot confirm block height 2. When `head = 4`, it's not a null block, we can confirm block 2 is + /// executed (also with some checks to ensure no reorg has occurred). We fetch block 2's data and set + /// `tail = Some(4)`. + /// The data fetch at block height 2 is pushed to cache and height 2 is ready to be proposed. + /// + /// At height 6, it's block height 4 will be confirmed and its data pushed to cache. At the same + /// time, since block 3 is a null block, empty data will also be pushed to cache. Block 4 is ready + /// to be proposed. + pub async fn sync(&mut self) -> anyhow::Result<()> { + let chain_head = if let Some(h) = self.finalized_chain_head().await? { + h + } else { + return Ok(()); + }; + tracing::debug!( + chain_head, + pointers = self.sync_pointers.to_string(), + "syncing heights" + ); + + if self.detected_reorg_by_height(chain_head) { + tracing::warn!( + pointers = self.sync_pointers.to_string(), + chain_head, + "reorg detected from height" + ); + return self.reset_cache().await; + } + + if !self.has_new_blocks(chain_head) { + tracing::debug!("the parent has yet to produce a new block"); + return Ok(()); + } + + if self.exceed_cache_size_limit().await { + tracing::debug!("exceeded cache size limit"); + return Ok(()); + } + + self.poll_next().await?; + + Ok(()) + } +} + +impl LotusParentSyncer +where + T: ParentFinalityStateQuery + Send + Sync + 'static, + P: ParentQueryProxy + Send + Sync + 'static, +{ + async fn exceed_cache_size_limit(&self) -> bool { + let max_cache_blocks = self.config.max_cache_blocks(); + atomically(|| self.provider.cached_blocks()).await > max_cache_blocks + } + + /// Poll the next block height. Returns finalized and executed block data. 
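+    /// The confirmed block's data is pushed into the provider cache via `new_parent_view`;
+    /// the method itself advances the sync pointers, skips null rounds, and surfaces
+    /// chain reorgs as errors.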
+ async fn poll_next(&mut self) -> Result<(), Error> { + let height = self.sync_pointers.head() + 1; + let parent_block_hash = self.non_null_parent_hash().await; + + tracing::debug!( + height, + parent_block_hash = hex::encode(&parent_block_hash), + "polling height with parent hash" + ); + + let block_hash_res = match self.parent_proxy.get_block_hash(height).await { + Ok(res) => res, + Err(e) => { + let err = e.to_string(); + if is_null_round_str(&err) { + tracing::debug!(height, "detected null round at height"); + + self.sync_pointers.advance_head(); + + return Ok(()); + } + return Err(Error::CannotQueryParent(err, height)); + } + }; + + if block_hash_res.parent_block_hash != parent_block_hash { + tracing::warn!( + height, + parent_hash = hex::encode(&block_hash_res.parent_block_hash), + previous_hash = hex::encode(&parent_block_hash), + "parent block hash diff than previous hash", + ); + return Err(Error::ParentChainReorgDetected); + } + + if let Some((to_confirm_height, to_confirm_hash)) = self.sync_pointers.tail() { + tracing::debug!( + height, + confirm = to_confirm_height, + "non-null round at height, confirmed previous height" + ); + + let data = self.fetch_data(to_confirm_height, to_confirm_hash).await?; + atomically_or_err::<_, Error, _>(|| { + // we only push the null block in cache when we confirmed a block so that in cache + // the latest height is always a confirmed non null block. + let latest_height = self + .provider + .latest_height()? + .expect("provider contains data at this point"); + for h in (latest_height + 1)..to_confirm_height { + self.provider.new_parent_view(h, None)?; + tracing::debug!(height = h, "found null block pushed to cache"); + } + self.provider + .new_parent_view(to_confirm_height, Some(data.clone()))?; + tracing::debug!(height = to_confirm_height, "non-null block pushed to cache"); + Ok(()) + }) + .await?; + } else { + tracing::debug!(height, "non-null round at height, waiting for confirmation"); + }; + + self.sync_pointers + .set_tail(height, block_hash_res.block_hash); + self.sync_pointers.advance_head(); + + Ok(()) + } + + async fn fetch_data( + &self, + height: BlockHeight, + block_hash: BlockHash, + ) -> Result { + let changes_res = self + .parent_proxy + .get_validator_changes(height) + .await + .map_err(|e| Error::CannotQueryParent(e.to_string(), height))?; + if changes_res.block_hash != block_hash { + tracing::warn!( + height, + change_set_hash = hex::encode(&changes_res.block_hash), + block_hash = hex::encode(&block_hash), + "change set block hash does not equal block hash", + ); + return Err(Error::ParentChainReorgDetected); + } + + let topdown_msgs_res = self + .parent_proxy + .get_top_down_msgs(height) + .await + .map_err(|e| Error::CannotQueryParent(e.to_string(), height))?; + if topdown_msgs_res.block_hash != block_hash { + tracing::warn!( + height, + topdown_msgs_hash = hex::encode(&topdown_msgs_res.block_hash), + block_hash = hex::encode(&block_hash), + "topdown messages block hash does not equal block hash", + ); + return Err(Error::ParentChainReorgDetected); + } + + Ok((block_hash, changes_res.value, topdown_msgs_res.value)) + } + + /// We only want the non-null parent block's hash + async fn non_null_parent_hash(&self) -> BlockHash { + if let Some((height, hash)) = self.sync_pointers.tail() { + tracing::debug!( + pending_height = height, + "previous non null parent is the pending confirmation block" + ); + return hash; + }; + + atomically(|| { + Ok(if let Some(h) = self.provider.latest_height_in_cache()? 
{ + tracing::debug!( + previous_confirmed_height = h, + "found previous non null block in cache" + ); + // safe to unwrap as we have height recorded + self.provider.block_hash(h)?.unwrap() + } else if let Some(p) = self.provider.last_committed_finality()? { + tracing::debug!( + previous_confirmed_height = p.height, + "no cache, found previous non null block as last committed finality" + ); + p.block_hash + } else { + unreachable!("guaranteed to non null block hash, report bug please") + }) + }) + .await + } + + fn has_new_blocks(&self, height: BlockHeight) -> bool { + self.sync_pointers.head() < height + } + + fn detected_reorg_by_height(&self, height: BlockHeight) -> bool { + // If the below is true, we are going backwards in terms of block height, the latest block + // height is lower than our previously fetched head. It could be a chain reorg. + self.sync_pointers.head() > height + } + + async fn finalized_chain_head(&self) -> anyhow::Result> { + let parent_chain_head_height = self.parent_proxy.get_chain_head_height().await?; + // sanity check + if parent_chain_head_height < self.config.chain_head_delay { + tracing::debug!("latest height not more than the chain head delay"); + return Ok(None); + } + + // we consider the chain head finalized only after the `chain_head_delay` + Ok(Some( + parent_chain_head_height - self.config.chain_head_delay, + )) + } + + /// Reset the cache in the face of a reorg + async fn reset_cache(&self) -> anyhow::Result<()> { + let finality = query_starting_finality(&self.query, &self.parent_proxy).await?; + atomically(|| self.provider.reset(finality.clone())).await; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::proxy::ParentQueryProxy; + use crate::sync::syncer::LotusParentSyncer; + use crate::sync::ParentFinalityStateQuery; + use crate::{ + BlockHash, BlockHeight, CachedFinalityProvider, Config, IPCParentFinality, + SequentialKeyCache, Toggle, NULL_ROUND_ERR_MSG, + }; + use anyhow::anyhow; + use async_stm::atomically; + use async_trait::async_trait; + use ipc_provider::manager::{GetBlockHashResult, TopDownQueryPayload}; + use ipc_sdk::cross::CrossMsg; + use ipc_sdk::staking::StakingChangeRequest; + use std::sync::Arc; + + struct TestParentFinalityStateQuery { + latest_finality: IPCParentFinality, + } + + impl ParentFinalityStateQuery for TestParentFinalityStateQuery { + fn get_latest_committed_finality(&self) -> anyhow::Result> { + Ok(Some(self.latest_finality.clone())) + } + } + + struct TestParentProxy { + blocks: SequentialKeyCache>, + } + + #[async_trait] + impl ParentQueryProxy for TestParentProxy { + async fn get_chain_head_height(&self) -> anyhow::Result { + Ok(self.blocks.upper_bound().unwrap()) + } + + async fn get_genesis_epoch(&self) -> anyhow::Result { + Ok(self.blocks.lower_bound().unwrap() - 1) + } + + async fn get_block_hash(&self, height: BlockHeight) -> anyhow::Result { + let r = self.blocks.get_value(height).unwrap(); + if r.is_none() { + return Err(anyhow!(NULL_ROUND_ERR_MSG)); + } + + for h in (self.blocks.lower_bound().unwrap()..height).rev() { + let v = self.blocks.get_value(h).unwrap(); + if v.is_none() { + continue; + } + return Ok(GetBlockHashResult { + parent_block_hash: v.clone().unwrap(), + block_hash: r.clone().unwrap(), + }); + } + panic!("invalid testing data") + } + + async fn get_top_down_msgs( + &self, + height: BlockHeight, + ) -> anyhow::Result>> { + Ok(TopDownQueryPayload { + value: vec![], + block_hash: self.blocks.get_value(height).cloned().unwrap().unwrap(), + }) + } + + async fn get_validator_changes( + 
&self, + height: BlockHeight, + ) -> anyhow::Result>> { + Ok(TopDownQueryPayload { + value: vec![], + block_hash: self.blocks.get_value(height).cloned().unwrap().unwrap(), + }) + } + } + + async fn new_syncer( + blocks: SequentialKeyCache>, + ) -> LotusParentSyncer { + let config = Config { + chain_head_delay: 2, + polling_interval: Default::default(), + exponential_back_off: Default::default(), + exponential_retry_limit: 0, + max_proposal_range: Some(1), + max_cache_blocks: None, + proposal_delay: None, + }; + let genesis_epoch = blocks.lower_bound().unwrap(); + let proxy = Arc::new(TestParentProxy { blocks }); + let committed_finality = IPCParentFinality { + height: genesis_epoch, + block_hash: vec![0; 32], + }; + + let provider = CachedFinalityProvider::new( + config.clone(), + genesis_epoch, + Some(committed_finality.clone()), + proxy.clone(), + ); + LotusParentSyncer::new( + config, + proxy, + Arc::new(Toggle::enabled(provider)), + Arc::new(TestParentFinalityStateQuery { + latest_finality: committed_finality, + }), + ) + .await + .unwrap() + } + + /// Creates a mock of a new parent blockchain view. The key is the height and the value is the + /// block hash. If block hash is None, it means the current height is a null block. + macro_rules! new_parent_blocks { + ($($key:expr => $val:expr),* ,) => ( + hash_map!($($key => $val),*) + ); + ($($key:expr => $val:expr),*) => ({ + let mut map = SequentialKeyCache::sequential(); + $( map.append($key, $val).unwrap(); )* + map + }); + } + + #[tokio::test] + async fn happy_path() { + let parent_blocks = new_parent_blocks!( + 100 => Some(vec![0; 32]), // genesis block + 101 => Some(vec![1; 32]), + 102 => Some(vec![2; 32]), + 103 => Some(vec![3; 32]), + 104 => Some(vec![4; 32]), + 105 => Some(vec![5; 32]) // chain head + ); + + let mut syncer = new_syncer(parent_blocks).await; + + assert_eq!(syncer.sync_pointers.head(), 100); + assert_eq!(syncer.sync_pointers.tail(), None); + + // sync block 101, which is a non-null block + let r = syncer.sync().await; + assert!(r.is_ok()); + assert_eq!(syncer.sync_pointers.head(), 101); + assert_eq!(syncer.sync_pointers.tail(), Some((101, vec![1; 32]))); + // latest height is None as we are yet to confirm block 101, so latest height should equal + // to the last committed finality initialized, which is the genesis block 100 + assert_eq!( + atomically(|| syncer.provider.latest_height()).await, + Some(100) + ); + + // sync block 101, which is a non-null block + let r = syncer.sync().await; + assert!(r.is_ok()); + assert_eq!(syncer.sync_pointers.head(), 102); + assert_eq!(syncer.sync_pointers.tail(), Some((102, vec![2; 32]))); + assert_eq!( + atomically(|| syncer.provider.latest_height()).await, + Some(101) + ); + } + + #[tokio::test] + async fn with_non_null_block() { + let parent_blocks = new_parent_blocks!( + 100 => Some(vec![0; 32]), // genesis block + 101 => None, + 102 => None, + 103 => None, + 104 => Some(vec![4; 32]), + 105 => None, + 106 => None, + 107 => None, + 108 => Some(vec![5; 32]), + 109 => None, + 110 => None, + 111 => None + ); + + let mut syncer = new_syncer(parent_blocks).await; + + assert_eq!(syncer.sync_pointers.head(), 100); + assert_eq!(syncer.sync_pointers.tail(), None); + + // sync block 101 to 103, which are null blocks + for h in 101..=103 { + let r = syncer.sync().await; + assert!(r.is_ok()); + assert_eq!(syncer.sync_pointers.head(), h); + assert_eq!(syncer.sync_pointers.tail(), None); + } + + // sync block 104, which is a non-null block + syncer.sync().await.unwrap(); + 
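+        // Heights 101 to 103 were null rounds: they get no cache entries of their own until a
+        // later non-null round (108) confirms 104, at which point they are back-filled as None.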
assert_eq!(syncer.sync_pointers.head(), 104); + assert_eq!(syncer.sync_pointers.tail(), Some((104, vec![4; 32]))); + // latest height is None as we are yet to confirm block 104, so latest height should equal + // to the last committed finality initialized, which is the genesis block 100 + assert_eq!( + atomically(|| syncer.provider.latest_height()).await, + Some(100) + ); + + // sync block 105 to 107, which are null blocks + for h in 105..=107 { + let r = syncer.sync().await; + assert!(r.is_ok()); + assert_eq!(syncer.sync_pointers.head(), h); + assert_eq!(syncer.sync_pointers.tail(), Some((104, vec![4; 32]))); + } + + // sync block 108, which is a non-null block + syncer.sync().await.unwrap(); + assert_eq!(syncer.sync_pointers.head(), 108); + assert_eq!(syncer.sync_pointers.tail(), Some((108, vec![5; 32]))); + // latest height is None as we are yet to confirm block 108, so latest height should equal + // to the previous confirmed block, which is 104 + assert_eq!( + atomically(|| syncer.provider.latest_height()).await, + Some(104) + ); + } +} diff --git a/fendermint/vm/topdown/src/sync/tendermint.rs b/fendermint/vm/topdown/src/sync/tendermint.rs new file mode 100644 index 00000000..9101a42a --- /dev/null +++ b/fendermint/vm/topdown/src/sync/tendermint.rs @@ -0,0 +1,45 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT +//! The tendermint aware syncer + +use crate::proxy::ParentQueryProxy; +use crate::sync::syncer::LotusParentSyncer; +use crate::sync::ParentFinalityStateQuery; +use anyhow::Context; + +/// Tendermint aware syncer +pub(crate) struct TendermintAwareSyncer { + inner: LotusParentSyncer, + tendermint_client: C, +} + +impl TendermintAwareSyncer +where + T: ParentFinalityStateQuery + Send + Sync + 'static, + C: tendermint_rpc::Client + Send + Sync + 'static, + P: ParentQueryProxy + Send + Sync + 'static, +{ + pub fn new(inner: LotusParentSyncer, tendermint_client: C) -> Self { + Self { + inner, + tendermint_client, + } + } + + pub async fn sync(&mut self) -> anyhow::Result<()> { + if self.is_syncing_peer().await? 
{ + tracing::debug!("syncing with peer, skip parent finality syncing this round"); + return Ok(()); + } + self.inner.sync().await + } + + async fn is_syncing_peer(&self) -> anyhow::Result { + let status: tendermint_rpc::endpoint::status::Response = self + .tendermint_client + .status() + .await + .context("failed to get Tendermint status")?; + Ok(status.sync_info.catching_up) + } +} diff --git a/fendermint/vm/topdown/src/toggle.rs b/fendermint/vm/topdown/src/toggle.rs index 95c94429..b6650266 100644 --- a/fendermint/vm/topdown/src/toggle.rs +++ b/fendermint/vm/topdown/src/toggle.rs @@ -61,35 +61,13 @@ impl ParentViewProvider for Toggl } } - async fn validator_changes( - &self, - height: BlockHeight, - ) -> anyhow::Result> { - match self.inner.as_ref() { - Some(p) => p.validator_changes(height).await, - None => Err(anyhow!("provider is toggled off")), - } - } - - async fn top_down_msgs( - &self, - height: BlockHeight, - block_hash: &BlockHash, - ) -> anyhow::Result> { - match self.inner.as_ref() { - Some(p) => p.top_down_msgs(height, block_hash).await, - None => Err(anyhow!("provider is toggled off")), - } - } - async fn top_down_msgs_from( &self, from: BlockHeight, to: BlockHeight, - block_hash: &BlockHash, ) -> anyhow::Result> { match self.inner.as_ref() { - Some(p) => p.top_down_msgs_from(from, to, block_hash).await, + Some(p) => p.top_down_msgs_from(from, to).await, None => Err(anyhow!("provider is toggled off")), } } @@ -118,12 +96,12 @@ impl

Toggle<CachedFinalityProvider<P>> {
         self.perform_or_else(|p| p.block_hash(height), None)
     }
 
-    pub fn latest_height(&self) -> Stm<Option<BlockHeight>> {
-        self.perform_or_else(|p| p.latest_height(), None)
+    pub fn latest_height_in_cache(&self) -> Stm<Option<BlockHeight>> {
+        self.perform_or_else(|p| p.latest_height_in_cache(), None)
     }
 
-    pub fn first_non_null_parent_hash(&self, height: BlockHeight) -> Stm<Option<BlockHash>> {
-        self.perform_or_else(|p| p.first_non_null_parent_hash(height), None)
+    pub fn latest_height(&self) -> Stm<Option<BlockHeight>> {
+        self.perform_or_else(|p| p.latest_height(), None)
     }
 
     pub fn last_committed_finality(&self) -> Stm<Option<IPCParentFinality>> {
@@ -141,4 +119,8 @@ impl

Toggle> { pub fn reset(&self, finality: IPCParentFinality) -> Stm<()> { self.perform_or_else(|p| p.reset(finality), ()) } + + pub fn cached_blocks(&self) -> Stm { + self.perform_or_else(|p| p.cached_blocks(), BlockHeight::MAX) + } } diff --git a/infra/Makefile.toml b/infra/Makefile.toml index 5e2c6f5c..4f8ab609 100644 --- a/infra/Makefile.toml +++ b/infra/Makefile.toml @@ -37,6 +37,8 @@ PARENT_GATEWAY = { value = "0x56948d2CFaa2EF355B8C08Ac925202db212146D1", conditi PARENT_REGISTRY = { value = "0x6A4884D2B6A597792dC68014D4B7C117cca5668e", condition = { env_not_set = ["PARENT_REGISTRY"] } } FM_NETWORK = { value = "test", condition = { env_not_set = ["FM_NETWORK"] } } TOPDOWN_CHAIN_HEAD_DELAY = { value = "10", condition = { env_not_set = ["TOPDOWN_CHAIN_HEAD_DELAY"] } } +TOPDOWN_PROPOSAL_DELAY = { value = "2", condition = { env_not_set = ["TOPDOWN_PROPOSAL_DELAY"] } } +TOPDOWN_MAX_PROPOSAL_RANGE = { value = "100", condition = { env_not_set = ["TOPDOWN_MAX_PROPOSAL_RANGE"] } } # Comma-separated list of bootstrap nodes to be used by the CometBFT node. BOOTSTRAPS = { value = "", condition = { env_not_set = ["BOOTSTRAPS"] } } PRIVATE_KEY_PATH = { value = "", condition = { env_not_set = ["PRIVATE_KEY_PATH"] } } diff --git a/infra/scripts/fendermint.toml b/infra/scripts/fendermint.toml index 6d1a89c5..0910c5f0 100644 --- a/infra/scripts/fendermint.toml +++ b/infra/scripts/fendermint.toml @@ -57,6 +57,8 @@ docker run \ --env FM_IPC__TOPDOWN__EXPONENTIAL_BACK_OFF=5 \ --env FM_IPC__TOPDOWN__EXPONENTIAL_RETRY_LIMIT=5 \ --env FM_IPC__TOPDOWN__POLLING_INTERVAL=10 \ + --env FM_IPC__TOPDOWN__PROPOSAL_DELAY=${TOPDOWN_PROPOSAL_DELAY} \ + --env FM_IPC__TOPDOWN__MAX_PROPOSAL_RANGE=${TOPDOWN_MAX_PROPOSAL_RANGE} \ --env FM_ABCI__LISTEN__HOST=0.0.0.0 \ --env FM_ETH__LISTEN__HOST=0.0.0.0 \ --env FM_TENDERMINT_RPC_URL=http://${CMT_CONTAINER_NAME}:26657 \
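
As a rough mental model of the head/tail walk described in the `LotusParentSyncer` docs and exercised by the tests above, the sketch below replays the same idea in isolation (illustrative only; the `walk` function, its `Option<u8>` chain encoding, and the `main` driver are invented for this note and are not part of the patch): a non-null height is fetched when first seen, but only pushed to the cache once a later non-null round confirms it, with the intervening null rounds back-filled as `None`.

    // `chain[h]` is Some(hash) for a non-null round and None for a null round.
    // Height 0 stands in for the last committed finality; the walk starts above it.
    fn walk(chain: &[Option<u8>]) -> Vec<(usize, Option<u8>)> {
        let mut cache = Vec::new();               // what the provider cache would hold
        let mut tail: Option<(usize, u8)> = None; // fetched but not yet confirmed
        let mut last_cached = 0usize;             // last height pushed to the cache
        for (head, block) in chain.iter().enumerate().skip(1) {
            let Some(hash) = block else { continue }; // null round: cannot confirm anything
            if let Some((t_height, t_hash)) = tail.take() {
                // a later non-null round confirms the pending tail
                for h in (last_cached + 1)..t_height {
                    cache.push((h, None)); // back-fill the null rounds below it
                }
                cache.push((t_height, Some(t_hash)));
                last_cached = t_height;
            }
            tail = Some((head, *hash)); // the new pending tail, awaiting confirmation
        }
        cache
    }

    fn main() {
        // Mirrors the shape of the `with_non_null_block` test: nulls at 1-3 and 5-7,
        // non-null rounds at 4 and 8.
        let chain = vec![Some(0), None, None, None, Some(4), None, None, None, Some(8)];
        // Only height 4 is confirmed (by height 8); 1-3 are back-filled, 8 stays pending.
        assert_eq!(
            walk(&chain),
            vec![(1, None), (2, None), (3, None), (4, Some(4))]
        );
    }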