Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust/cardano-chain-follower): add thread and mmap file stats #150

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion rust/cardano-chain-follower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ tokio = { version = "1.42.0", features = [
] }
tracing = "0.1.41"
tracing-log = "0.2.0"
dashmap = "6.1.0"
dashmap = { version = "6.1.0", features = ["serde"] }
url = "2.5.4"
anyhow = "1.0.95"
chrono = "0.4.39"
Expand All @@ -56,6 +56,7 @@ ureq = { version = "2.12.1", features = ["native-certs"] }
http = "1.2.0"
hickory-resolver = { version = "0.24.2", features = ["dns-over-rustls"] }
moka = { version = "0.12.9", features = ["sync"] }
cpu-time = "1.0.0"

[dev-dependencies]
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
Expand Down
20 changes: 18 additions & 2 deletions rust/cardano-chain-follower/src/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,14 +411,30 @@ async fn live_sync_backfill(
Ok(())
}

/// Call the live sync backfill.
/// This is a helper function to pause and resume the stats thread.
async fn call_live_sync_backfill(
cfg: &ChainSyncConfig, name: &str, update: &MithrilUpdateMessage,
) -> anyhow::Result<()> {
stats::pause_thread(cfg.chain, name);
let result = live_sync_backfill(cfg, update).await;
stats::resume_thread(cfg.chain, name);
result
}

/// Backfill and Purge the live chain, based on the Mithril Sync updates.
async fn live_sync_backfill_and_purge(
cfg: ChainSyncConfig, mut rx: mpsc::Receiver<MithrilUpdateMessage>,
mut sync_ready: SyncReadyWaiter,
) {
/// Thread name for stats.
const THREAD_NAME: &str = "LiveSyncBackfillAndPurge";
apskhem marked this conversation as resolved.
Show resolved Hide resolved

stats::start_thread(cfg.chain, THREAD_NAME, true);
// Wait for first Mithril Update advice, which triggers a BACKFILL of the Live Data.
let Some(update) = rx.recv().await else {
error!("Mithril Sync Failed, can not continue chain sync either.");
stats::stop_thread(cfg.chain, THREAD_NAME);
return;
};

Expand All @@ -433,7 +449,7 @@ async fn live_sync_backfill_and_purge(
// We will re-attempt backfill, until its successful.
// Backfill is atomic, it either fully works, or none of the live-chain is changed.
debug!("Mithril Tip has advanced to: {update:?} : BACKFILL");
while let Err(error) = live_sync_backfill(&cfg, &update).await {
while let Err(error) = call_live_sync_backfill(&cfg, THREAD_NAME, &update).await {
error!("Mithril Backfill Sync Failed: {}", error);
sleep(Duration::from_secs(10)).await;
}
Expand Down Expand Up @@ -464,6 +480,7 @@ async fn live_sync_backfill_and_purge(
loop {
let Some(update) = rx.recv().await else {
error!("Mithril Sync Failed, can not continue chain sync either.");
stats::stop_thread(cfg.chain, THREAD_NAME);
return;
};

Expand Down Expand Up @@ -539,7 +556,6 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilU
loop {
// We never have a connection if we end up around the loop, so make a new one.
let mut peer = persistent_reconnect(&cfg.relay_address, cfg.chain).await;

match resync_live_tip(&mut peer, cfg.chain).await {
Ok(tip) => debug!("Tip Resynchronized to {tip}"),
Err(error) => {
Expand Down
23 changes: 20 additions & 3 deletions rust/cardano-chain-follower/src/chain_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! each network. Chain Followers use the data supplied by the Chain-Sync.
//! This module configures the chain sync processes.

use std::sync::LazyLock;
use std::{panic, sync::LazyLock};

use cardano_blockchain_types::Network;
use dashmap::DashMap;
Expand Down Expand Up @@ -125,6 +125,8 @@ impl ChainSyncConfig {
///
/// `Error`: On error.
pub async fn run(self) -> Result<()> {
/// Thread name for stats.
const THREAD_NAME: &str = "ChainSync";
debug!(
chain = self.chain.to_string(),
"Chain Synchronization Starting"
Expand All @@ -150,8 +152,23 @@ impl ChainSyncConfig {
// Start the Mithril Snapshot Follower
let rx = self.mithril_cfg.run().await?;

// Start Chain Sync
*locked_handle = Some(tokio::spawn(chain_sync(self.clone(), rx)));
// Wrap inside a panic catcher to detect if the task panics.
let result = panic::catch_unwind(|| {
stats::start_thread(self.chain, THREAD_NAME, true);
// Start Chain Sync
tokio::spawn(chain_sync(self.clone(), rx))
});

if let Ok(handle) = result {
*locked_handle = Some(handle);
} else {
// Chain sync panic, stop the thread and log.
error!(
chain = self.chain.to_string(),
"Chain Sync for {} : PANICKED", self.chain
);
stats::stop_thread(self.chain, THREAD_NAME);
}

// sync_map.insert(chain, handle);
debug!("Chain Sync for {} : Started", self.chain);
Expand Down
8 changes: 6 additions & 2 deletions rust/cardano-chain-follower/src/chain_sync_ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
};
use tracing::error;

use crate::chain_update;
use crate::{chain_update, stats};

/// Data we hold related to sync being ready or not.
struct SyncReady {
Expand Down Expand Up @@ -85,9 +85,13 @@ static SYNC_READY: LazyLock<DashMap<Network, RwLock<SyncReady>>> = LazyLock::new
/// Write Lock the `SYNC_READY` lock for a network.
/// When we are signaled to be ready, set it to true and release the lock.
pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter {
/// Thread name for stats.
const THREAD_NAME: &str = "WaitForSyncReady";

let (tx, rx) = oneshot::channel::<()>();

tokio::spawn(async move {
stats::start_thread(chain, THREAD_NAME, true);
// We are safe to use `expect` here because the SYNC_READY list is exhaustively
// initialized. Its a Serious BUG if that not True, so panic is OK.
#[allow(clippy::expect_used)]
Expand All @@ -101,7 +105,7 @@ pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter {
if let Ok(()) = rx.await {
status.ready = true;
}

stats::stop_thread(chain, THREAD_NAME);
// If the channel closes early, we can NEVER use the Blockchain data.
});

Expand Down
1 change: 1 addition & 0 deletions rust/cardano-chain-follower/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod mithril_snapshot_data;
mod mithril_snapshot_iterator;
mod mithril_snapshot_sync;
mod mithril_turbo_downloader;
mod mmap_file;
mod snapshot_id;
mod stats;
pub mod turbo_downloader;
Expand Down
20 changes: 15 additions & 5 deletions rust/cardano-chain-follower/src/mithril_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,35 @@

use std::path::Path;

use cardano_blockchain_types::Point;
use cardano_blockchain_types::{Network, Point};
use pallas_hardano::storage::immutable::FallibleBlock;
use tokio::task;

use crate::error::{Error, Result};
use crate::{
error::{Error, Result},
stats,
};

/// Synchronous Immutable block iterator.
pub(crate) type ImmutableBlockIterator = Box<dyn Iterator<Item = FallibleBlock> + Send + Sync>;

/// Get a mithril snapshot iterator.
pub(crate) async fn make_mithril_iterator(
path: &Path, start: &Point,
path: &Path, start: &Point, chain: Network,
) -> Result<ImmutableBlockIterator> {
/// Thread name for stats.
const THREAD_NAME: &str = "MithrilIterator";

let path = path.to_path_buf();
let start = start.clone();
// Initial input
let res = task::spawn_blocking(move || {
pallas_hardano::storage::immutable::read_blocks_from_point(&path, start.clone().into())
.map_err(|error| Error::MithrilSnapshot(Some(error)))
stats::start_thread(chain, THREAD_NAME, false);
let result =
pallas_hardano::storage::immutable::read_blocks_from_point(&path, start.clone().into())
.map_err(|error| Error::MithrilSnapshot(Some(error)));
stats::stop_thread(chain, THREAD_NAME);
result
})
.await;

Expand Down
23 changes: 22 additions & 1 deletion rust/cardano-chain-follower/src/mithril_snapshot_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Configuration for the Mithril Snapshot used by the follower.

use std::{
panic,
path::{Path, PathBuf},
str::FromStr,
sync::LazyLock,
Expand All @@ -24,6 +25,7 @@ use crate::{
mithril_snapshot_data::{latest_mithril_snapshot_id, SnapshotData},
mithril_snapshot_sync::background_mithril_update,
snapshot_id::SnapshotId,
stats,
turbo_downloader::DlConfig,
};

Expand Down Expand Up @@ -384,6 +386,9 @@ impl MithrilSnapshotConfig {

/// Run a Mithril Follower for the given network and configuration.
pub(crate) async fn run(&self) -> Result<mpsc::Receiver<MithrilUpdateMessage>> {
/// Thread name for stats.
const THREAD_NAME: &str = "MithrilSnapshotUpdater";

debug!(
chain = self.chain.to_string(),
"Mithril Auto-update : Starting"
Expand Down Expand Up @@ -413,7 +418,23 @@ impl MithrilSnapshotConfig {
let (tx, rx) = mpsc::channel::<MithrilUpdateMessage>(2);

// let handle = tokio::spawn(background_mithril_update(chain, self.clone(), tx));
*locked_handle = Some(tokio::spawn(background_mithril_update(self.clone(), tx)));

// Wrap inside a panic catcher to detect if the task panics.
let result = panic::catch_unwind(|| {
stats::start_thread(self.chain, THREAD_NAME, true);
tokio::spawn(background_mithril_update(self.clone(), tx))
});

if let Ok(handle) = result {
*locked_handle = Some(handle);
} else {
// Mithril update panic, stop the thread and log.
error!(
chain = self.chain.to_string(),
"Background Mithril Update for {} : PANICKED", self.chain
);
stats::stop_thread(self.chain, THREAD_NAME);
}

// sync_map.insert(chain, handle);
debug!(
Expand Down
15 changes: 11 additions & 4 deletions rust/cardano-chain-follower/src/mithril_snapshot_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tracing_log::log;
use crate::{
error::{Error, Result},
mithril_query::{make_mithril_iterator, ImmutableBlockIterator},
stats,
};

/// Search backwards by 60 slots (seconds) looking for a previous block.
Expand Down Expand Up @@ -73,7 +74,7 @@ impl MithrilSnapshotIterator {
chain: Network, path: &Path, from: &Point, search_interval: u64,
) -> Option<MithrilSnapshotIterator> {
let point = probe_point(from, search_interval);
let Ok(mut iterator) = make_mithril_iterator(path, &point).await else {
let Ok(mut iterator) = make_mithril_iterator(path, &point, chain).await else {
return None;
};

Expand Down Expand Up @@ -116,7 +117,7 @@ impl MithrilSnapshotIterator {
let this = this?;

// Remake the iterator, based on the new known point.
let Ok(iterator) = make_mithril_iterator(path, &this).await else {
let Ok(iterator) = make_mithril_iterator(path, &this, chain).await else {
return None;
};

Expand Down Expand Up @@ -176,7 +177,7 @@ impl MithrilSnapshotIterator {

debug!("Actual Mithril Iterator Start: {}", from);

let iterator = make_mithril_iterator(path, from).await?;
let iterator = make_mithril_iterator(path, from, chain).await?;

Ok(MithrilSnapshotIterator {
inner: Arc::new(Mutex::new(MithrilSnapshotIteratorInner {
Expand All @@ -191,12 +192,18 @@ impl MithrilSnapshotIterator {
/// Get the next block, in a way that is Async friendly.
/// Returns the next block, or None if there are no more blocks.
pub(crate) async fn next(&self) -> Option<MultiEraBlock> {
/// Thread name for stats.
const THREAD_NAME: &str = "MithrilSnapshotIterator::Next";

let inner = self.inner.clone();

let res = task::spawn_blocking(move || {
#[allow(clippy::unwrap_used)] // Unwrap is safe here because the lock can't be poisoned.
let mut inner_iterator = inner.lock().unwrap();
inner_iterator.next()
stats::start_thread(inner_iterator.chain, THREAD_NAME, false);
let next = inner_iterator.next();
stats::stop_thread(inner_iterator.chain, THREAD_NAME);
next
})
.await;

Expand Down
15 changes: 13 additions & 2 deletions rust/cardano-chain-follower/src/mithril_snapshot_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,20 @@ async fn get_mithril_snapshot_and_certificate(
async fn validate_mithril_snapshot(
chain: Network, certificate: &MithrilCertificate, path: &Path,
) -> bool {
/// Thread name for stats.
const THREAD_NAME: &str = "ValidateMithrilSnapshot";

let cert = certificate.clone();
let mithril_path = path.to_path_buf();
match tokio::spawn(async move {
// This can be long running and CPU Intensive.
// So we spawn it off to a background task.
MessageBuilder::new()
stats::start_thread(chain, THREAD_NAME, true);
let result = MessageBuilder::new()
.compute_snapshot_message(&cert, &mithril_path)
.await
.await;
stats::stop_thread(chain, THREAD_NAME);
result
})
.await
{
Expand Down Expand Up @@ -516,7 +522,11 @@ async fn check_snapshot_to_download(
fn background_validate_mithril_snapshot(
chain: Network, certificate: MithrilCertificate, tmp_path: PathBuf,
) -> tokio::task::JoinHandle<bool> {
/// Thread name for stats.
const THREAD_NAME: &str = "BGValidateMithrilSnapshot";

tokio::spawn(async move {
stats::start_thread(chain, THREAD_NAME, true);
debug!(
"Mithril Snapshot background updater for: {} : Check Certificate.",
chain
Expand All @@ -540,6 +550,7 @@ fn background_validate_mithril_snapshot(
chain
);

stats::stop_thread(chain, THREAD_NAME);
true
})
}
Expand Down
Loading
Loading