Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

Commit

Permalink
FM-361: Serve snapshot chunks (#431)
Browse files Browse the repository at this point in the history
* FM-361: Serve snapshot chunks

* FM-361: Add SnapshotItem::last_access

* FM-361: Do not purge within last_access + last_access_hold

* FM-361: Mark seeked snapshots as accessed
  • Loading branch information
aakoshh authored Nov 27, 2023
1 parent 0de9778 commit cac5517
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 40 deletions.
24 changes: 24 additions & 0 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,4 +864,28 @@ where
Ok(Default::default())
}
}

/// Used during state sync to retrieve chunks of snapshots from peers.
async fn load_snapshot_chunk(
    &self,
    request: request::LoadSnapshotChunk,
) -> AbciResult<response::LoadSnapshotChunk> {
    // Look up the snapshot (marking it as accessed) only when snapshotting is enabled.
    let accessed = match self.snapshots {
        Some(ref client) => {
            atomically(|| client.access_snapshot(request.height.value(), request.format)).await
        }
        None => None,
    };

    if let Some(snapshot) = accessed {
        match snapshot.load_chunk(request.chunk as usize) {
            Ok(chunk) => {
                return Ok(response::LoadSnapshotChunk {
                    chunk: chunk.into(),
                });
            }
            Err(e) => {
                // Warn and fall through to the default (empty) response.
                tracing::warn!("failed to load chunk: {e:#}");
            }
        }
    }

    // No snapshotting, unknown snapshot, or unreadable chunk: return the default response.
    Ok(Default::default())
}
}
108 changes: 74 additions & 34 deletions fendermint/vm/snapshot/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use std::path::{Path, PathBuf};
use std::time::Duration;
use std::time::{Duration, SystemTime};

use crate::car;
use crate::manifest::{list_manifests, write_manifest, SnapshotItem, SnapshotManifest};
use anyhow::Context;
use async_stm::{atomically, retry, Stm, TVar};
use fendermint_vm_interpreter::fvm::state::snapshot::{BlockHeight, BlockStateParams, Snapshot};
use fendermint_vm_interpreter::fvm::state::snapshot::{
BlockHeight, BlockStateParams, Snapshot, SnapshotVersion,
};
use fendermint_vm_interpreter::fvm::state::FvmStateParams;
use fvm_ipld_blockstore::Blockstore;
use sha2::{Digest, Sha256};
Expand All @@ -31,14 +33,14 @@ struct SnapshotState {
pub struct SnapshotClient {
/// The client will only notify the manager of snapshottable heights.
snapshot_interval: BlockHeight,
snapshot_state: SnapshotState,
state: SnapshotState,
}

impl SnapshotClient {
/// Set the latest block state parameters and notify the manager.
pub fn on_commit(&self, block_height: BlockHeight, params: FvmStateParams) -> Stm<()> {
if block_height % self.snapshot_interval == 0 {
self.snapshot_state
self.state
.latest_params
.write(Some((params, block_height)))?;
}
Expand All @@ -47,7 +49,30 @@ impl SnapshotClient {

/// List completed snapshots.
pub fn list_snapshots(&self) -> Stm<im::Vector<SnapshotItem>> {
self.snapshot_state.snapshots.read_clone()
self.state.snapshots.read_clone()
}

/// Try to find a snapshot, if it still exists.
///
/// If found, mark it as accessed, so that it doesn't get purged while likely to be requested or read from disk.
pub fn access_snapshot(
    &self,
    block_height: BlockHeight,
    version: SnapshotVersion,
) -> Stm<Option<SnapshotItem>> {
    let mut snapshots = self.state.snapshots.read_clone()?;

    // Find the first entry matching both height and format version,
    // stamping it with the current time before taking a copy.
    let found = snapshots
        .iter_mut()
        .find(|s| s.manifest.block_height == block_height && s.manifest.version == version)
        .map(|s| {
            s.last_access = SystemTime::now();
            s.clone()
        });

    // Only write the collection back if an entry was actually updated.
    if found.is_some() {
        self.state.snapshots.write(snapshots)?;
    }

    Ok(found)
}
}

Expand All @@ -58,15 +83,18 @@ pub struct SnapshotManager<BS> {
/// Location to store completed snapshots.
snapshot_dir: PathBuf,
/// Target size in bytes for snapshot chunks.
snapshot_chunk_size: usize,
chunk_size: usize,
/// Number of snapshots to keep.
///
/// 0 means unlimited.
snapshot_history_size: usize,
/// Shared state of snapshots.
snapshot_state: SnapshotState,
history_size: usize,
/// Time to hold on from purging a snapshot after a remote client
/// asked for a chunk from it.
last_access_hold: Duration,
/// How often to check CometBFT whether it has finished syncing.
sync_poll_interval: Duration,
/// Shared state of snapshots.
state: SnapshotState,
/// Indicate whether CometBFT has finished syncing with the chain,
/// so that we can skip snapshotting old states while catching up.
is_syncing: TVar<bool>,
Expand All @@ -81,13 +109,14 @@ where
store: BS,
snapshot_interval: BlockHeight,
snapshot_dir: PathBuf,
snapshot_chunk_size: usize,
snapshot_history_size: usize,
chunk_size: usize,
history_size: usize,
last_access_hold: Duration,
sync_poll_interval: Duration,
) -> anyhow::Result<(Self, SnapshotClient)> {
let snapshot_items = list_manifests(&snapshot_dir).context("failed to list manifests")?;

let snapshot_state = SnapshotState {
let state = SnapshotState {
// Start with nothing to snapshot until we are notified about a new height.
// We could also look back to find the latest height we should have snapshotted.
latest_params: TVar::new(None),
Expand All @@ -97,17 +126,18 @@ where
let manager = Self {
store,
snapshot_dir,
snapshot_chunk_size,
snapshot_history_size,
snapshot_state: snapshot_state.clone(),
chunk_size,
history_size,
last_access_hold,
sync_poll_interval,
state: state.clone(),
// Assume we are syncing until we can determine otherwise.
is_syncing: TVar::new(true),
};

let client = SnapshotClient {
snapshot_interval,
snapshot_state,
state,
};

Ok((manager, client))
Expand Down Expand Up @@ -143,7 +173,7 @@ where
retry()?;
}

match self.snapshot_state.latest_params.read()?.as_ref() {
match self.state.latest_params.read()?.as_ref() {
None => retry()?,
unchanged if *unchanged == last_params => retry()?,
Some(new_params) => Ok(new_params.clone()),
Expand All @@ -165,7 +195,7 @@ where
);
// Add the snapshot to the in-memory records.
atomically(|| {
self.snapshot_state
self.state
.snapshots
.modify_mut(|items| items.push_back(item.clone()))
})
Expand All @@ -185,14 +215,22 @@ where

/// Remove snapshot directories if we have more than the desired history size.
async fn prune_history(&self) {
if self.snapshot_history_size == 0 {
if self.history_size == 0 {
return;
}

let removables = atomically(|| {
self.snapshot_state.snapshots.modify_mut(|snapshots| {
self.state.snapshots.modify_mut(|snapshots| {
let mut removables = Vec::new();
while snapshots.len() > self.snapshot_history_size {
while snapshots.len() > self.history_size {
// Stop at the first snapshot that was accessed recently.
if let Some(last_access) =
snapshots.head().and_then(|s| s.last_access.elapsed().ok())
{
if last_access <= self.last_access_hold {
break;
}
}
if let Some(snapshot) = snapshots.pop_front() {
removables.push(snapshot);
} else {
Expand Down Expand Up @@ -260,12 +298,9 @@ where
// They can be listed in the right order with e.g. `ls | sort -n`
// Alternatively we could pad them with zeroes based on the original file size and the chunk size,
// but this way it will be easier to return them based on a numeric index.
let chunks_count = car::split(
&snapshot_path,
&parts_path,
self.snapshot_chunk_size,
|idx| format!("{idx}.part"),
)
let chunks_count = car::split(&snapshot_path, &parts_path, self.chunk_size, |idx| {
format!("{idx}.part")
})
.await
.context("failed to split CAR into chunks")?;

Expand All @@ -288,10 +323,7 @@ where
std::fs::remove_file(snapshot_dir.join(SNAPSHOT_FILE_NAME))
.context("failed to remove CAR file")?;

Ok(SnapshotItem {
snapshot_dir,
manifest,
})
Ok(SnapshotItem::new(snapshot_dir, manifest))
}
}

Expand Down Expand Up @@ -400,6 +432,7 @@ mod tests {
temp_dir.path().into(),
10000,
1,
Duration::ZERO,
never_poll_sync,
)
.expect("failed to create snapshot manager");
Expand Down Expand Up @@ -449,9 +482,16 @@ mod tests {
assert_eq!(snapshots[0], snapshot);

// Create a new manager instance
let (_, new_client) =
SnapshotManager::new(store, 1, temp_dir.path().into(), 10000, 1, never_poll_sync)
.expect("failed to create snapshot manager");
let (_, new_client) = SnapshotManager::new(
store,
1,
temp_dir.path().into(),
10000,
1,
Duration::ZERO,
never_poll_sync,
)
.expect("failed to create snapshot manager");

let snapshots = atomically(|| new_client.list_snapshots()).await;
assert!(!snapshots.is_empty(), "loads manifests on start");
Expand Down
43 changes: 37 additions & 6 deletions fendermint/vm/snapshot/src/manifest.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
// Copyright 2022-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use std::path::{Path, PathBuf};
use std::{
path::{Path, PathBuf},
time::SystemTime,
};

use anyhow::Context;
use anyhow::{bail, Context};
use fendermint_vm_interpreter::fvm::state::{
snapshot::{BlockHeight, SnapshotVersion},
FvmStateParams,
Expand Down Expand Up @@ -38,7 +41,37 @@ pub struct SnapshotManifest {
pub struct SnapshotItem {
    /// Directory containing this snapshot, i.e. the manifest and the parts.
    pub snapshot_dir: PathBuf,
    /// Parsed `manifest.json` contents.
    pub manifest: SnapshotManifest,
    /// Last time a peer asked for a chunk from this snapshot.
    ///
    /// Starts at the UNIX epoch for newly created items, so an item that has
    /// never been accessed is immediately eligible for purging.
    pub last_access: SystemTime,
}

impl SnapshotItem {
    /// Create a snapshot item over a directory, with no access recorded yet.
    ///
    /// `last_access` starts at the UNIX epoch so a never-accessed snapshot is
    /// immediately eligible for purging.
    pub fn new(snapshot_dir: PathBuf, manifest: SnapshotManifest) -> Self {
        Self {
            snapshot_dir,
            manifest,
            last_access: SystemTime::UNIX_EPOCH,
        }
    }

    /// Load the data from disk.
    ///
    /// Returns an error if the chunk isn't within range or if the file doesn't exist any more.
    pub fn load_chunk(&self, chunk: usize) -> anyhow::Result<Vec<u8>> {
        if chunk >= self.manifest.chunks {
            bail!(
                "cannot load chunk {chunk}; only have {} in the snapshot",
                self.manifest.chunks
            );
        }
        // Chunks are written by `car::split` as `<index>.part`; the index must be
        // formatted into the file name. (Joining the literal string "{chunk}.part"
        // would look for a file that never exists.)
        let chunk_file = self.snapshot_dir.join(format!("{chunk}.part"));

        let content = std::fs::read(&chunk_file)
            .with_context(|| format!("failed to read chunk {}", chunk_file.to_string_lossy()))?;

        Ok(content)
    }
}

/// Save a manifest along with the other snapshot files into a snapshot specific directory.
Expand Down Expand Up @@ -86,13 +119,11 @@ pub fn list_manifests(snapshot_dir: impl AsRef<Path>) -> anyhow::Result<Vec<Snap

// Parse manifests
let mut items = Vec::new();

for (snapshot_dir, manifest) in manifests {
let json = std::fs::read_to_string(&manifest).context("failed to open manifest")?;
match serde_json::from_str(&json) {
Ok(manifest) => items.push(SnapshotItem {
snapshot_dir,
manifest,
}),
Ok(manifest) => items.push(SnapshotItem::new(snapshot_dir, manifest)),
Err(e) => {
tracing::error!(
manifest = manifest.to_string_lossy().to_string(),
Expand Down

0 comments on commit cac5517

Please sign in to comment.