diff --git a/fendermint/app/src/app.rs b/fendermint/app/src/app.rs index af1b2f2f..82bc463e 100644 --- a/fendermint/app/src/app.rs +++ b/fendermint/app/src/app.rs @@ -887,7 +887,14 @@ where tracing::info!(?manifest, "received snapshot offer"); // We can look at the version but currently there's only one. match atomically_or_err(|| client.offer_snapshot(manifest.clone())).await { - Ok(()) => { + Ok(path) => { + tracing::info!( + download_dir = path.to_string_lossy().to_string(), + height = manifest.block_height, + size = manifest.size, + chunks = manifest.chunks, + "downloading snapshot" + ); return Ok(response::OfferSnapshot::Accept); } Err(SnapshotError::IncompatibleVersion(version)) => { @@ -919,13 +926,47 @@ where if let Some(ref client) = self.snapshots { match atomically_or_err(|| { - client.apply_chunk(request.index, request.chunk.clone().into()) + client.save_chunk(request.index, request.chunk.clone().into()) }) .await { - Ok(completed) => { - if completed { - tracing::info!("received all snapshot chunks"); + Ok(snapshot) => { + if let Some(snapshot) = snapshot { + tracing::info!( + download_dir = snapshot.snapshot_dir.to_string_lossy().to_string(), + height = snapshot.manifest.block_height, + "received all snapshot chunks", + ); + + // Ideally we would import into some isolated store then validate, + // but for now let's trust that all is well. + if let Err(e) = snapshot.import(self.state_store_clone(), true).await { + tracing::error!(error =? e, "failed to import snapshot"); + return Ok(response::ApplySnapshotChunk { + result: response::ApplySnapshotChunkResult::RejectSnapshot, + ..default + }); + } + + tracing::info!( + height = snapshot.manifest.block_height, + "imported snapshot" + ); + + // Now insert the new state into the history. 
+ let mut state = self.committed_state()?; + state.block_height = snapshot.manifest.block_height; + state.state_params = snapshot.manifest.state_params; + self.set_committed_state(state)?; + + // TODO: We can remove the `current_download` from the STM + // state here which would cause it to get dropped from /tmp, + // but for now let's keep it just in case we need to investigate + // some problem. + + // We could also move the files into our own snapshot directory + // so that we can offer it to others, but again let's hold on + // until we have done more robust validation. } return Ok(response::ApplySnapshotChunk { result: response::ApplySnapshotChunkResult::Accept, diff --git a/fendermint/vm/interpreter/src/fvm/state/snapshot.rs b/fendermint/vm/interpreter/src/fvm/state/snapshot.rs index f7bcce0c..6507c35b 100644 --- a/fendermint/vm/interpreter/src/fvm/state/snapshot.rs +++ b/fendermint/vm/interpreter/src/fvm/state/snapshot.rs @@ -9,7 +9,7 @@ use cid::Cid; use futures_core::Stream; use fvm::state_tree::StateTree; use fvm_ipld_blockstore::Blockstore; -use fvm_ipld_car::{load_car_unchecked, CarHeader}; +use fvm_ipld_car::{load_car, load_car_unchecked, CarHeader}; use fvm_ipld_encoding::{from_slice, CborStore, DAG_CBOR}; use libipld::Ipld; use serde::{Deserialize, Serialize}; @@ -61,10 +61,19 @@ where } /// Read the snapshot from file and load all the data into the store - pub async fn read_car(path: impl AsRef, store: BS) -> anyhow::Result { + pub async fn read_car( + path: impl AsRef, + store: BS, + validate: bool, + ) -> anyhow::Result { let file = tokio::fs::File::open(path).await?; - let roots = load_car_unchecked(&store, file.compat()).await?; + let roots = if validate { + load_car(&store, file.compat()).await? + } else { + load_car_unchecked(&store, file.compat()).await? 
+ }; + if roots.len() != 1 { return Err(anyhow!("invalid snapshot, should have 1 root cid")); } @@ -193,6 +202,14 @@ where Ok((root_cid, streamer)) } + + pub fn block_height(&self) -> BlockHeight { + self.block_height + } + + pub fn state_params(&self) -> &FvmStateParams { + &self.state_params + } } #[pin_project::pin_project] @@ -369,7 +386,7 @@ mod tests { assert!(r.is_ok()); let new_store = MemoryBlockstore::new(); - let Snapshot::V1(loaded_snapshot) = Snapshot::read_car(tmp_file.path(), new_store) + let Snapshot::V1(loaded_snapshot) = Snapshot::read_car(tmp_file.path(), new_store, true) .await .unwrap(); diff --git a/fendermint/vm/snapshot/src/client.rs b/fendermint/vm/snapshot/src/client.rs index 5f819ac0..2d75093f 100644 --- a/fendermint/vm/snapshot/src/client.rs +++ b/fendermint/vm/snapshot/src/client.rs @@ -1,7 +1,7 @@ // Copyright 2022-2023 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT -use std::{sync::Arc, time::SystemTime}; +use std::{path::PathBuf, sync::Arc, time::SystemTime}; use async_stm::{abort, Stm, StmResult, TVar}; use fendermint_vm_interpreter::fvm::state::{ @@ -13,7 +13,7 @@ use tempfile::tempdir; use crate::{ manifest, state::{SnapshotDownload, SnapshotState}, - SnapshotError, SnapshotItem, SnapshotManifest, + SnapshotError, SnapshotItem, SnapshotManifest, MANIFEST_FILE_NAME, }; /// Interface to snapshot state for the application. @@ -74,19 +74,39 @@ impl SnapshotClient { /// If the offered snapshot is accepted, we create a temporary directory to hold the chunks /// and remember it as our current snapshot being downloaded. - pub fn offer_snapshot(&self, manifest: SnapshotManifest) -> StmResult<(), SnapshotError> { + pub fn offer_snapshot(&self, manifest: SnapshotManifest) -> StmResult { if manifest.version != 1 { abort(SnapshotError::IncompatibleVersion(manifest.version)) } else { match tempdir() { Ok(dir) => { + // Create a `parts` sub-directory for the chunks. 
+ if let Err(e) = std::fs::create_dir(dir.path().join("parts")) { + return abort(SnapshotError::from(e)); + }; + + // Save the manifest into the temp directory; + // that way we can always see on the file system what's happening. + let json = match serde_json::to_string_pretty(&manifest) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) + { + Ok(json) => json, + Err(e) => return abort(SnapshotError::from(e)), + }; + if let Err(e) = std::fs::write(dir.path().join(MANIFEST_FILE_NAME), json) { + return abort(SnapshotError::from(e)); + } + + let download_path = dir.path().into(); let download = SnapshotDownload { manifest, download_dir: Arc::new(dir), next_index: TVar::new(0), }; + self.state.current_download.write(Some(download))?; - Ok(()) + + Ok(download_path) } Err(e) => abort(SnapshotError::from(e))?, } @@ -95,8 +115,15 @@ impl SnapshotClient { /// Take a chunk sent to us by a remote peer. This is our chance to validate chunks on the fly. /// - /// Return a flag indicating whether all the chunks have been received and loaded to the blockstore. - pub fn apply_chunk(&self, index: u32, contents: Vec) -> StmResult { + /// Returns `None` while there are more chunks to download and `Some` when all + /// the chunks have been received and basic file integrity validated. + /// + /// Then we can import the snapshot into the blockstore separately. + pub fn save_chunk( + &self, + index: u32, + contents: Vec, + ) -> StmResult, SnapshotError> { if let Some(cd) = self.state.current_download.read()?.as_ref() { let next_index = cd.next_index.read_clone()?; if index != next_index { @@ -106,6 +133,7 @@ impl SnapshotClient { .download_dir .as_ref() .path() + .join("parts") .join(format!("{}.part", index)); // We are doing IO inside the STM transaction, but that's okay because there is no contention on the download. 
@@ -119,8 +147,11 @@ impl SnapshotClient { match manifest::parts_checksum(cd.download_dir.as_ref()) { Ok(checksum) => { if checksum == cd.manifest.checksum { - // TODO: Import Snapshot. - Ok(true) + let item = SnapshotItem::new( + cd.download_dir.path().into(), + cd.manifest.clone(), + ); + Ok(Some(item)) } else { abort(SnapshotError::WrongChecksum( cd.manifest.checksum, @@ -134,7 +165,7 @@ impl SnapshotClient { ))), } } else { - Ok(false) + Ok(None) } } Err(e) => { diff --git a/fendermint/vm/snapshot/src/lib.rs b/fendermint/vm/snapshot/src/lib.rs index 8fe50985..cd9e6b5c 100644 --- a/fendermint/vm/snapshot/src/lib.rs +++ b/fendermint/vm/snapshot/src/lib.rs @@ -7,6 +7,15 @@ mod manager; mod manifest; mod state; +/// The file name to export the CAR to. +const SNAPSHOT_FILE_NAME: &str = "snapshot.car"; + +/// The file name in snapshot directories that contains the manifest. +const MANIFEST_FILE_NAME: &str = "manifest.json"; + +/// Name of the subdirectory where `{idx}.part` files are stored within a snapshot. +const PARTS_DIR_NAME: &str = "parts"; + pub use client::SnapshotClient; pub use error::SnapshotError; pub use manager::SnapshotManager; diff --git a/fendermint/vm/snapshot/src/manager.rs b/fendermint/vm/snapshot/src/manager.rs index 1135bb7b..bb102922 100644 --- a/fendermint/vm/snapshot/src/manager.rs +++ b/fendermint/vm/snapshot/src/manager.rs @@ -6,7 +6,7 @@ use std::time::Duration; use crate::manifest::{file_checksum, list_manifests, write_manifest, SnapshotManifest}; use crate::state::SnapshotState; -use crate::{car, SnapshotClient, SnapshotItem}; +use crate::{car, SnapshotClient, SnapshotItem, PARTS_DIR_NAME, SNAPSHOT_FILE_NAME}; use anyhow::Context; use async_stm::{atomically, retry, TVar}; use fendermint_vm_interpreter::fvm::state::snapshot::{BlockHeight, Snapshot}; @@ -14,9 +14,6 @@ use fendermint_vm_interpreter::fvm::state::FvmStateParams; use fvm_ipld_blockstore::Blockstore; use tendermint_rpc::Client; -/// The file name to export the CAR to. 
-const SNAPSHOT_FILE_NAME: &str = "snapshot.car"; - /// Create snapshots at regular block intervals. pub struct SnapshotManager { /// Blockstore @@ -199,8 +196,8 @@ where .context("failed to create temp dir for snapshot")?; let snapshot_path = temp_dir.path().join(SNAPSHOT_FILE_NAME); - let checksum_path = temp_dir.path().join("parts.sha256"); - let parts_path = temp_dir.path().join("parts"); + let checksum_path = temp_dir.path().join(format!("{PARTS_DIR_NAME}.sha256")); + let parts_path = temp_dir.path().join(PARTS_DIR_NAME); // TODO: See if we can reuse the contents of an existing CAR file. @@ -304,7 +301,7 @@ mod tests { use fvm::engine::MultiEngine; use quickcheck::Arbitrary; - use crate::manifest; + use crate::{manifest, PARTS_DIR_NAME}; use super::SnapshotManager; @@ -388,8 +385,9 @@ mod tests { assert_eq!(snapshots.len(), 1, "can list manifests"); assert_eq!(snapshots[0], snapshot); - let checksum = manifest::parts_checksum(snapshot.snapshot_dir.as_path().join("parts")) - .expect("parts checksum can be calculated"); + let checksum = + manifest::parts_checksum(snapshot.snapshot_dir.as_path().join(PARTS_DIR_NAME)) + .expect("parts checksum can be calculated"); assert_eq!( checksum, snapshot.manifest.checksum, diff --git a/fendermint/vm/snapshot/src/manifest.rs b/fendermint/vm/snapshot/src/manifest.rs index 6d8bf722..6427f98c 100644 --- a/fendermint/vm/snapshot/src/manifest.rs +++ b/fendermint/vm/snapshot/src/manifest.rs @@ -11,10 +11,7 @@ use fendermint_vm_interpreter::fvm::state::{ use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; -use crate::SnapshotItem; - -/// The file name in snapshot directories that contains the manifest. 
-const MANIFEST_FILE_NAME: &str = "manifest.json"; +use crate::{SnapshotItem, MANIFEST_FILE_NAME}; #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] pub struct SnapshotManifest { @@ -115,6 +112,19 @@ pub fn file_checksum(path: impl AsRef) -> anyhow::Result pub fn parts_checksum(path: impl AsRef) -> anyhow::Result { let mut hasher = Sha256::new(); + let chunks = list_parts(path)?; + + for path in chunks { + let mut file = std::fs::File::open(path).context("failed to open part")?; + let _ = std::io::copy(&mut file, &mut hasher)?; + } + + let hash = hasher.finalize().into(); + Ok(tendermint::Hash::Sha256(hash)) +} + +/// List all the `{idx}.part` files in a directory. +pub fn list_parts(path: impl AsRef) -> anyhow::Result> { let mut chunks = std::fs::read_dir(path.as_ref()) .unwrap() .collect::, _>>() @@ -142,13 +152,7 @@ pub fn parts_checksum(path: impl AsRef) -> anyhow::Result(&self, store: BS, validate: bool) -> anyhow::Result> + where + BS: Blockstore + Send + 'static, + { + let parts = manifest::list_parts(self.snapshot_dir.join(PARTS_DIR_NAME)) + .context("failed to list snapshot parts")?; + + // 1. Restore the snapshots into a complete `snapshot.car` file. + let car_path = self.snapshot_dir.join(SNAPSHOT_FILE_NAME); + let mut car_file = File::create(&car_path).context("failed to create CAR file")?; + + for part in parts { + let mut part_file = File::open(&part).with_context(|| { + format!("failed to open snapshot part {}", part.to_string_lossy()) + })?; + + io::copy(&mut part_file, &mut car_file)?; + } + + // 2. Import the contents. + let result = Snapshot::read_car(&car_path, store, validate).await; + + // 3. Remove the restored file. + std::fs::remove_file(&car_path).context("failed to remove CAR file")?; + + // If the import failed, or it fails to validate, it will leave unwanted data in the blockstore. 
+ // + // We could do the import into a namespace which is separate from the state store, and move the data + // if everything we see was successful, but it would need more database API exposed that we don't + // currently have access to. At the moment our best bet to remove the data is to implement garbage + // collection - if the CIDs are unreachable through state roots, they will be removed. + // + // Another thing worth noting is that the `Snapshot` imports synthetic records into the blockstore + // that did not exist in the original: the metadata, and some technical constructs that point at + // the real data and store application state (which is verified below). It's not easy to get rid + // of these: the `Blockstore` doesn't allow us to delete CIDs, and the `Snapshot` doesn't readily + // expose what the CIDs of the extra records were. Our other option would be to load the data + // into a staging area (see above) and then walk the DAG and only load what is reachable from + // the state root. + // + // Inserting CIDs into the state store which did not exist in the original seems like a vector + // of attack that could be used to cause consensus failure: if the attacker deployed a contract + // that looked up a CID that validators who imported a snapshot have, but others don't, that + // would cause a fork. However, this is why the FVM doesn't currently allow the deployment of + // user defined Wasm actors: the FEVM actors do not allow the lookup of arbitrary CIDs, so they + // are safe, while Wasm actors with direct access to the IPLD SDK methods would be vulnerable. + // Once the FVM implements the "reachability analysis" feature, it won't matter if we have an + // extra record or not. + // + // Actually a very similar situation arises with garbage collection: since the length of history + // is configurable, whether some CIDs are (still) present or not depends on how the validator + // configured their nodes, and cannot be allowed to cause a failure. 
+ let snapshot = result.context("failed to import the snapshot into the blockstore")?; + + // 4. See if we actually imported what we thought we would. + if validate { + match snapshot { + Snapshot::V1(ref snapshot) => { + if snapshot.block_height() != self.manifest.block_height { + bail!( + "invalid snapshot block height; expected {}, imported {}", + self.manifest.block_height, + snapshot.block_height() + ); + } + if *snapshot.state_params() != self.manifest.state_params { + bail!( + "invalid state params; expected {:?}, imported {:?}", + self.manifest.state_params, + snapshot.state_params() + ) + } + } + } + } + + Ok(snapshot) + } } /// An ongoing, incomplete download of a snapshot.