Skip to content

Commit

Permalink
Merge branch 'feat/auto-sync-mithril' into feat/get-block-cli
Browse files Browse the repository at this point in the history
  • Loading branch information
bkioshn committed Sep 3, 2024
2 parents 08376dd + 45f9fad commit 9813363
Show file tree
Hide file tree
Showing 20 changed files with 1,576 additions and 574 deletions.
3 changes: 3 additions & 0 deletions .config/dictionaries/project.dic
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ dreps
Earthfile
encryptor
Errno
etype
excalidraw
fadvise
fcntl
Expand Down Expand Up @@ -115,6 +116,7 @@ logcall
lookaside
maindbname
mapref
Mbits
mdlint
mdns
memeq
Expand Down Expand Up @@ -222,6 +224,7 @@ unixfs
unlinkat
untar
upnp
ureq
utimensat
vitss
vkey
Expand Down
53 changes: 28 additions & 25 deletions hermes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,21 @@ missing_docs_in_private_items = "deny"

[workspace.dependencies]
# pinned to a published release version (not a git commit from the `catalyst` branch)
pallas = "0.29.0"
pallas = "0.30.1"
# pinned to published release versions (not git commits from the `catalyst` branch)
pallas-hardano = "0.29.0"
pallas-crypto = "0.29.0"
pallas-hardano = "0.30.1"
pallas-crypto = "0.30.1"
cardano-chain-follower = { path = "crates/cardano-chain-follower", version = "0.2.0" }

hermes-ipfs = { path = "crates/hermes-ipfs", version = "0.0.1" }

wasmtime = "23.0.1"
wasmtime = "24.0.0"
rusty_ulid = "2.0.0"
anyhow = "1.0.86"
blake2b_simd = "1.0.2"
blake3 = { version = "1.5.1", features = ["rayon", "mmap"] }
blake3 = { version = "1.5.4", features = ["rayon", "mmap"] }
hex-literal = "0.4.1"
thiserror = "1.0.56"
thiserror = "1.0.63"
hex = "0.4.3"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
Expand All @@ -73,18 +73,18 @@ libtest-mimic = "0.7.0"
bip39 = "2.0.0"
iana-time-zone = "0.1.60"
rand = "0.8.5"
bip32 = "0.5.1"
bip32 = "0.5.2"
ed25519-bip32 = "0.4.1"
dashmap = "6.0.1"
once_cell = "1.19.0"
clap = "4.5.3"
clap = "4.5.16"
build-info = "0.0.38"
build-info-build = "0.0.38"
derive_more = "1.0.0"
chrono = "0.4.38"
chrono-tz = "0.9.0"
saffron = "0.1.0"
tokio = { version = "1.36.0", features = [
tokio = { version = "1.39.3", features = [
"macros",
"rt",
"net",
Expand All @@ -94,26 +94,26 @@ tokio = { version = "1.36.0", features = [
tokio-util = { version = "0.7.11", features = ["codec"] }
tokio-stream = "0.1.15"
libsqlite3-sys = "0.30.0"
stringzilla = "3.8.4"
stringzilla = "3.9.3"
serial_test = { version = "3.1.1", features = ["file_locks"] }
hdf5 = { git = "https://github.com/aldanor/hdf5-rust.git", rev = "694e900972fbf5ffbdd1a2294f57a2cc3a91c994", version = "0.8.1", features = [
"static",
"blosc",
] }
temp-dir = "0.1.13"
# needs to enable blosc compression functionality for hdf5 crate
blosc-src = { version = "0.3.0", features = ["lz4", "zlib", "zstd"] }
blosc-src = { version = "0.3.4", features = ["lz4", "zlib", "zstd"] }
num_cpus = "1.16.0"
console = "0.15.8"
serde = "1.0"
serde_json = "1.0"
jsonschema = "0.18.0"
serde = "1.0.209"
serde_json = "1.0.127"
jsonschema = "0.18.1"
url = "2.5.0"
regex = "1.10.4"
humansize = { version = "^2.1.2", default-features = false, features = [
"no_alloc",
] }
reqwest = { version = "0.12.5, >=0.0.0", default-features = false, features = [
reqwest = { version = "0.12.7, >=0.0.0", default-features = false, features = [
"rustls-tls-native-roots",
"http2",
"blocking",
Expand All @@ -130,10 +130,10 @@ mithril-client = { git = "https://github.com/input-output-hk/catalyst-mithril.gi
"num-integer-backend"
] }
mimalloc = "0.1.43"
bytes = "1.6.0"
bytes = "1.7.1"
tar = "0.4.41"
zstd = "0.13.1"
async-trait = "0.1.80"
zstd = "0.13.2"
async-trait = "0.1.81"
dirs = "5.0.1"
futures = "0.3.30"
bytesize = "1.3.0"
Expand All @@ -143,6 +143,7 @@ humantime = "2.1.0"
crossbeam-queue = "0.3.11"
crossbeam-skiplist = "0.1.3"
crossbeam-channel = "0.5.13"
crossbeam-epoch = "0.9.18"
strum = "0.26.3"
strum_macros = "0.26.4"
rayon = "1.10.0"
Expand All @@ -152,18 +153,20 @@ pbkdf2 = "0.12.2"
sha2 = "0.10"
ed25519-dalek = "2.1.1"
x509-cert = "0.2.5"
coset = "0.3.7"
coset = "0.3.8"
libipld = "0.16.0"
libp2p = "0.54.0"
rust-ipfs = "0.11.19"
rustyline-async = "0.4.2"
libp2p = "0.54.1"
rust-ipfs = "0.11.21"
rustyline-async = "0.4.3"
ouroboros = "0.18.4"
memx = "0.1.32"
fmmap = {version = "0.3.3", features = ["sync", "tokio-async"]}
minicbor = {version = "0.24.2", features = ["alloc", "derive", "half"]}
redb = {version = "2.1.1", features = ["cache_metrics"]}
heed = {version = "0.20.3", features = ["mdb_idl_logn_16", "posix-sem"]}
brotli = "6.0.0"
c509-certificate = { git = "https://github.com/input-output-hk/catalyst-voices.git", package = "c509-certificate", branch = "fix/c509-cleanup"}
num-traits = "0.2.19"
logcall = "0.1.9"
logcall = "0.1.9"
ureq = {version = "2.10.1", features=["native-certs"]}
http = "1.1.0"
hickory-resolver = { version = "0.24.1", features = ["dns-over-rustls"] }
moka = { version = "0.12.8", features = ["sync"]}
12 changes: 11 additions & 1 deletion hermes/Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,21 @@ code_format:
cargo +nightly fmtfix

# Run long running developer test for mithril downloading.
run-mithril-download-example: code_format
run-mithril-download-example-preprod: code_format
cargo build -r --package cardano-chain-follower --example follow_chains --features mimalloc
RUST_LOG="error,follow_chains=debug,cardano_chain_follower=debug,mithril-client=debug" \
./target/release/examples/follow_chains --preprod

run-mithril-download-example-preprod-high-dl-bandwidth: code_format
cargo build -r --package cardano-chain-follower --example follow_chains --features mimalloc
RUST_LOG="error,follow_chains=debug,cardano_chain_follower=debug,mithril-client=debug" \
./target/release/examples/follow_chains --preprod --mithril-sync-workers 64 --mithril-sync-chunk-size 16 --mithril-sync-queue-ahead=6

run-mithril-download-example-preprod-conservastive-dl-bandwidth: code_format
cargo build -r --package cardano-chain-follower --example follow_chains --features mimalloc
RUST_LOG="error,follow_chains=debug,cardano_chain_follower=debug,mithril-client=debug" \
./target/release/examples/follow_chains --preprod --mithril-sync-workers 8 --mithril-sync-chunk-size 1 --mithril-sync-queue-ahead=2

run-mithril-download-example-preview: code_format
cargo build -r --package cardano-chain-follower --example follow_chains --features mimalloc
RUST_LOG="error,follow_chains=debug,cardano_chain_follower=debug,mithril-client=debug" \
Expand Down
8 changes: 7 additions & 1 deletion hermes/crates/cardano-chain-follower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ mimalloc = {workspace = true, optional = true}
memx.workspace = true
fmmap.workspace = true
minicbor.workspace = true
heed = {workspace = true, optional = true}
brotli.workspace = true
zstd.workspace = true
c509-certificate.workspace = true
Expand All @@ -52,6 +51,13 @@ blake2b_simd.workspace = true
num-traits.workspace = true
logcall.workspace = true
tracing-log.workspace = true
tar.workspace = true
ureq.workspace = true
http.workspace = true
crossbeam-channel.workspace = true
hickory-resolver.workspace = true
moka.workspace = true
crossbeam-epoch.workspace = true

[dev-dependencies]
hex.workspace = true
Expand Down
89 changes: 75 additions & 14 deletions hermes/crates/cardano-chain-follower/examples/follow_chains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use mimalloc::MiMalloc;
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

use std::error::Error;
use std::{error::Error, time::Duration};

use cardano_chain_follower::{
ChainFollower, ChainSyncConfig, ChainUpdate, Kind, Metadata, Network, Point, Statistics,
Expand Down Expand Up @@ -51,6 +51,21 @@ fn process_argument() -> (Vec<Network>, ArgMatches) {
.action(ArgAction::SetTrue),
arg!(--"largest-metadata" "Dump The largest transaction metadata we find (as we find it).")
.action(ArgAction::SetTrue),
arg!(--"mithril-sync-workers" <WORKERS> "The number of workers to use when downloading the blockchain snapshot.")
.value_parser(clap::value_parser!(u16).range(1..))
.action(ArgAction::Set),
arg!(--"mithril-sync-chunk-size" <MB> "The size in MB of each chunk downloaded by a worker.")
.value_parser(clap::value_parser!(u16).range(1..))
.action(ArgAction::Set),
arg!(--"mithril-sync-queue-ahead" <NUM> "The number of chunks pre-queued per worker.")
.value_parser(clap::value_parser!(u16).range(1..))
.action(ArgAction::Set),
arg!(--"mithril-sync-connect-timeout" <SECS> "The HTTP Connection Timeout for mithril downloads, in seconds.")
.value_parser(clap::value_parser!(u64).range(1..))
.action(ArgAction::Set),
arg!(--"mithril-sync-data-read-timeout" <SECS> "The HTTP Data Read Timeout for mithril downloads, in seconds.")
.value_parser(clap::value_parser!(u64).range(1..))
.action(ArgAction::Set),
])
.get_matches();

Expand All @@ -69,10 +84,54 @@ fn process_argument() -> (Vec<Network>, ArgMatches) {
}

/// Start syncing a particular network
async fn start_sync_for(network: &Network) -> Result<(), Box<dyn Error>> {
let cfg = ChainSyncConfig::default_for(*network);
async fn start_sync_for(network: &Network, matches: &ArgMatches) -> Result<(), Box<dyn Error>> {
let mut cfg = ChainSyncConfig::default_for(*network);

info!(chain = cfg.chain.to_string(), "Starting Sync");
let mut mithril_dl_connect_timeout = "Not Set".to_string();
let mut mithril_dl_data_timeout = "Not Set".to_string();

let mut dl_config = cfg.mithril_cfg.dl_config.clone().unwrap_or_default();

if let Some(workers) = matches.get_one::<u16>("mithril-sync-workers") {
dl_config = dl_config.with_workers(*workers as usize);
}
let mithril_dl_workers = format!("{}", dl_config.workers);

if let Some(chunk_size) = matches.get_one::<u16>("mithril-sync-chunk-size") {
dl_config = dl_config.with_chunk_size(*chunk_size as usize * 1024 * 1024);
}
let mithril_dl_chunk_size = format!("{} MBytes", dl_config.chunk_size / (1024 * 1024));

if let Some(queue_ahead) = matches.get_one::<u16>("mithril-sync-queue-ahead") {
dl_config = dl_config.with_queue_ahead(*queue_ahead as usize);
}
let mithril_dl_queue_ahead = format!("{}", dl_config.queue_ahead);

if let Some(connect_timeout) = matches.get_one::<u64>("mithril-sync-connect-timeout") {
dl_config = dl_config.with_connection_timeout(Duration::from_secs(*connect_timeout));
}
if let Some(connect_timeout) = dl_config.connection_timeout {
mithril_dl_connect_timeout = format!("{}", humantime::format_duration(connect_timeout));
}

// The flag is registered as `mithril-sync-data-read-timeout`; clap's `get_one` panics when
// asked for an id that was never defined, so the lookup key must match the definition.
if let Some(data_timeout) = matches.get_one::<u64>("mithril-sync-data-read-timeout") {
    // Set the data read timeout itself (not the connection timeout), so the user's
    // connection timeout is left untouched and the value logged from
    // `dl_config.data_read_timeout` reflects this option.
    dl_config.data_read_timeout = Some(Duration::from_secs(*data_timeout));
}
if let Some(data_timeout) = dl_config.data_read_timeout {
mithril_dl_data_timeout = format!("{}", humantime::format_duration(data_timeout));
}

cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);

info!(
chain = cfg.chain.to_string(),
mithril_sync_dl_workers = mithril_dl_workers,
mithril_sync_dl_chunk_size = mithril_dl_chunk_size,
mithril_sync_dl_queue_ahead = mithril_dl_queue_ahead,
mithril_sync_dl_connect_timeout = mithril_dl_connect_timeout,
mithril_sync_dl_data_read_timeout = mithril_dl_data_timeout,
"Starting Sync"
);

if let Err(error) = cfg.run().await {
error!("Failed to start sync task for {} : {}", network, error);
Expand Down Expand Up @@ -297,14 +356,16 @@ async fn get_block(network: Network, p: u64) {
/// Handle get-block subcommand
async fn handle_get_block(matches: &clap::ArgMatches) -> Result<(), Box<dyn Error>> {
let network = match matches.get_one::<String>("NETWORK") {
Some(s) => match s.as_str() {
"preprod" => Network::Preprod,
"preview" => Network::Preview,
"mainnet" => Network::Mainnet,
_ => {
error!("Invalid network specified: {}", s);
return Ok(());
},
Some(s) => {
match s.as_str() {
"preprod" => Network::Preprod,
"preview" => Network::Preview,
"mainnet" => Network::Mainnet,
_ => {
error!("Invalid network specified: {}", s);
return Ok(());
},
}
},
None => {
error!("Network argument is missing.");
Expand All @@ -315,7 +376,7 @@ async fn handle_get_block(matches: &clap::ArgMatches) -> Result<(), Box<dyn Erro
if let Some(&point) = matches.get_one::<u64>("POINT") {
info!("Spawning get_block task");
// Start sync for network
start_sync_for(&network).await?;
start_sync_for(&network, matches).await?;
// Spawn the get_block task
tokio::spawn(get_block(network, point)).await?;
} else {
Expand All @@ -331,7 +392,7 @@ async fn handle_default_case(
) -> Result<(), Box<dyn Error>> {
// Start sync tasks for each network
for network in networks {
start_sync_for(network).await?;
start_sync_for(network, &matches).await?;
}

let mut tasks = Vec::new();
Expand Down
6 changes: 3 additions & 3 deletions hermes/crates/cardano-chain-follower/src/chain_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use std::sync::LazyLock;

use crossbeam_skiplist::SkipMap;
use dashmap::DashMap;
use strum::IntoEnumIterator;
use tokio::{sync::Mutex, task::JoinHandle};
use tracing::{debug, error};
Expand All @@ -27,10 +27,10 @@ const DEFAULT_CHAIN_UPDATE_BUFFER_SIZE: usize = 32;
const DEFAULT_IMMUTABLE_SLOT_WINDOW: u64 = 12 * 60 * 60;

/// Type we use to manage the Sync Task handle map.
type SyncMap = SkipMap<Network, Mutex<Option<JoinHandle<()>>>>;
type SyncMap = DashMap<Network, Mutex<Option<JoinHandle<()>>>>;
/// Handle to the mithril sync thread. One for each Network ONLY.
static SYNC_JOIN_HANDLE_MAP: LazyLock<SyncMap> = LazyLock::new(|| {
let map = SkipMap::new();
let map = DashMap::new();
for network in Network::iter() {
map.insert(network, Mutex::new(None));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ impl ProtectedLiveChainBlockList {
while let Some(previous_block) = purge_start_block_entry.prev() {
let _unused = previous_block.remove();
}

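// Best-effort attempt to force the skip map to reclaim the memory of the entries removed
// above by flushing the epoch guard.
// NOTE(review): the flush is issued twice, presumably to nudge the collector further —
// confirm this is intentional.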
crossbeam_epoch::pin().flush();
crossbeam_epoch::pin().flush();
}

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions hermes/crates/cardano-chain-follower/src/chain_sync_ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{sync::LazyLock, time::Duration};

use crossbeam_skiplist::SkipMap;
use dashmap::DashMap;
use strum::IntoEnumIterator;
use tokio::{
sync::{broadcast, oneshot, RwLock},
Expand Down Expand Up @@ -73,8 +73,8 @@ impl SyncReadyWaiter {
/// Lock to prevent using any blockchain data for a network UNTIL it is synced to TIP.
/// Pre-initialized for all possible blockchains, so it's safe to use `expect` to access a
/// value.
static SYNC_READY: LazyLock<SkipMap<Network, RwLock<SyncReady>>> = LazyLock::new(|| {
let map = SkipMap::new();
static SYNC_READY: LazyLock<DashMap<Network, RwLock<SyncReady>>> = LazyLock::new(|| {
let map = DashMap::new();
for network in Network::iter() {
map.insert(network, RwLock::new(SyncReady::new()));
}
Expand Down
1 change: 1 addition & 0 deletions hermes/crates/cardano-chain-follower/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod network;
mod point;
mod snapshot_id;
mod stats;
pub mod turbo_downloader;
mod utils;
mod witness;

Expand Down
Loading

0 comments on commit 9813363

Please sign in to comment.