Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

Commit

Permalink
FM-361: Mark seeked snapshots as accessed
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Nov 15, 2023
1 parent 4cdafba commit 14ef420
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 23 deletions.
2 changes: 1 addition & 1 deletion fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ where
) -> AbciResult<response::LoadSnapshotChunk> {
if let Some(ref client) = self.snapshots {
if let Some(snapshot) =
atomically(|| client.find_snapshot(request.height.value(), request.format)).await
atomically(|| client.access_snapshot(request.height.value(), request.format)).await
{
match snapshot.load_chunk(request.chunk as usize) {
Ok(chunk) => {
Expand Down
51 changes: 29 additions & 22 deletions fendermint/vm/snapshot/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use std::path::{Path, PathBuf};
use std::time::Duration;
use std::time::{Duration, SystemTime};

use crate::car;
use crate::manifest::{list_manifests, write_manifest, SnapshotItem, SnapshotManifest};
Expand Down Expand Up @@ -33,14 +33,14 @@ struct SnapshotState {
pub struct SnapshotClient {
/// The client will only notify the manager of snapshottable heights.
snapshot_interval: BlockHeight,
snapshot_state: SnapshotState,
state: SnapshotState,
}

impl SnapshotClient {
/// Set the latest block state parameters and notify the manager.
pub fn on_commit(&self, block_height: BlockHeight, params: FvmStateParams) -> Stm<()> {
if block_height % self.snapshot_interval == 0 {
self.snapshot_state
self.state
.latest_params
.write(Some((params, block_height)))?;
}
Expand All @@ -49,21 +49,30 @@ impl SnapshotClient {

/// List completed snapshots.
pub fn list_snapshots(&self) -> Stm<im::Vector<SnapshotItem>> {
self.snapshot_state.snapshots.read_clone()
self.state.snapshots.read_clone()
}

/// Try to find a snapshot, if it still exists.
pub fn find_snapshot(
///
/// If found, mark it as accessed, so that it doesn't get purged while likely to be requested or read from disk.
pub fn access_snapshot(
&self,
block_height: BlockHeight,
version: SnapshotVersion,
) -> Stm<Option<SnapshotItem>> {
let snapshots = self.snapshot_state.snapshots.read()?;
let s = snapshots
.iter()
.find(|s| s.manifest.block_height == block_height && s.manifest.version == version)
.cloned();
Ok(s)
let mut snapshots = self.state.snapshots.read_clone()?;
let mut snapshot = None;
for s in snapshots.iter_mut() {
if s.manifest.block_height == block_height && s.manifest.version == version {
s.last_access = SystemTime::now();
snapshot = Some(s.clone());
break;
}
}
if snapshot.is_some() {
self.state.snapshots.write(snapshots)?;
}
Ok(snapshot)
}
}

Expand All @@ -85,7 +94,7 @@ pub struct SnapshotManager<BS> {
/// How often to check CometBFT whether it has finished syncing.
sync_poll_interval: Duration,
/// Shared state of snapshots.
snapshot_state: SnapshotState,
state: SnapshotState,
/// Indicate whether CometBFT has finished syncing with the chain,
/// so that we can skip snapshotting old states while catching up.
is_syncing: TVar<bool>,
Expand All @@ -107,7 +116,7 @@ where
) -> anyhow::Result<(Self, SnapshotClient)> {
let snapshot_items = list_manifests(&snapshot_dir).context("failed to list manifests")?;

let snapshot_state = SnapshotState {
let state = SnapshotState {
// Start with nothing to snapshot until we are notified about a new height.
// We could also look back to find the latest height we should have snapshotted.
latest_params: TVar::new(None),
Expand All @@ -121,14 +130,14 @@ where
history_size,
last_access_hold,
sync_poll_interval,
snapshot_state: snapshot_state.clone(),
state: state.clone(),
// Assume we are syncing until we can determine otherwise.
is_syncing: TVar::new(true),
};

let client = SnapshotClient {
snapshot_interval,
snapshot_state,
state,
};

Ok((manager, client))
Expand Down Expand Up @@ -164,7 +173,7 @@ where
retry()?;
}

match self.snapshot_state.latest_params.read()?.as_ref() {
match self.state.latest_params.read()?.as_ref() {
None => retry()?,
unchanged if *unchanged == last_params => retry()?,
Some(new_params) => Ok(new_params.clone()),
Expand All @@ -186,7 +195,7 @@ where
);
// Add the snapshot to the in-memory records.
atomically(|| {
self.snapshot_state
self.state
.snapshots
.modify_mut(|items| items.push_back(item.clone()))
})
Expand All @@ -211,14 +220,12 @@ where
}

let removables = atomically(|| {
self.snapshot_state.snapshots.modify_mut(|snapshots| {
self.state.snapshots.modify_mut(|snapshots| {
let mut removables = Vec::new();
while snapshots.len() > self.history_size {
// Stop at the first snapshot that was accessed recently.
if let Some(last_access) = snapshots
.head()
.map(|s| s.last_access.elapsed().ok())
.flatten()
if let Some(last_access) =
snapshots.head().and_then(|s| s.last_access.elapsed().ok())
{
if last_access <= self.last_access_hold {
break;
Expand Down

0 comments on commit 14ef420

Please sign in to comment.