diff --git a/evm_arithmetization/src/prover.rs b/evm_arithmetization/src/prover.rs index 2297c9b91..360ff10a8 100644 --- a/evm_arithmetization/src/prover.rs +++ b/evm_arithmetization/src/prover.rs @@ -1,5 +1,5 @@ use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use anyhow::{anyhow, Result}; use itertools::Itertools; @@ -520,6 +520,15 @@ fn build_segment_data( pub struct SegmentDataIterator { interpreter: Interpreter, partial_next_data: Option, + counter: Option>>, +} + +impl SegmentDataIterator { + + pub fn set_counter(&mut self, counter: Arc>) { + self.counter = Some(counter); + } + } impl SegmentDataIterator { @@ -536,6 +545,7 @@ impl SegmentDataIterator { Self { interpreter, partial_next_data: None, + counter: None } } @@ -606,6 +616,13 @@ impl Iterator for SegmentDataIterator { if let Some((data, next_data)) = self.generate_next_segment(self.partial_next_data.clone()) { self.partial_next_data = next_data; + match &self.counter { + Some(counter) => { + let mut num = counter.lock().unwrap(); + *num += 1; + }, + None => (), + } Some((self.interpreter.generation_state.inputs.clone(), data)) } else { None diff --git a/zero_bin/coordinator/src/benchmarking.rs b/zero_bin/coordinator/src/benchmarking.rs index 65ea7d773..32c20bcfd 100644 --- a/zero_bin/coordinator/src/benchmarking.rs +++ b/zero_bin/coordinator/src/benchmarking.rs @@ -22,6 +22,10 @@ pub struct BenchmarkingStats { pub block_number: u64, /// The number of transactions in the block proved pub n_txs: u64, + /// The number of segments + pub n_segs: usize, + /// The number of generation inputs created + pub n_gen_in: usize, /// The cumulative transaction count. This is the number of transactions /// from this block and all blocks beforehand. None implies data not /// available, not 0. @@ -32,7 +36,6 @@ pub struct BenchmarkingStats { /// The amount of time elapsed during the process of proving this block, /// stored as a [Duration] pub total_proof_duration: Duration, - pub prep_duration: Option, pub txproof_duration: Option, pub agg_wait_duration: Option, @@ -64,7 +67,7 @@ impl BenchmarkingStats { /// Returns a header row pub fn header_row() -> String { String::from( - "block_number, number_txs, cumulative_number_txs, fetch_duration, unique_proof_duration, prep_duration, txproof_duration, agg_wait_duration, agg_duration, start_time, end_time, cumulative_elapsed_time, proof_out_duration, gas_used, cumulative_gas_used, difficulty, gas_used_per_tx", + "block_number, number_txs, n_generated_inputs, n_segments, cumulative_number_txs, fetch_duration, unique_proof_duration, prep_duration, txproof_duration, agg_wait_duration, agg_duration, start_time, end_time, cumulative_elapsed_time, proof_out_duration, gas_used, cumulative_gas_used, difficulty, gas_used_per_tx", ) } @@ -101,9 +104,11 @@ impl BenchmarkingStats { #[allow(clippy::format_in_format_args)] pub fn as_csv_row(&self) -> String { format!( - "{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}", + "{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}", self.block_number, self.n_txs, + self.n_gen_in, + self.n_segs, Self::unwrap_to_string(self.cumulative_n_txs), Self::unwrap_duration_to_string(self.fetch_duration), self.total_proof_duration.as_secs_f64(), diff --git a/zero_bin/coordinator/src/fetch.rs b/zero_bin/coordinator/src/fetch.rs index bd3cbf90b..51cd6efaa 100644 --- a/zero_bin/coordinator/src/fetch.rs +++ b/zero_bin/coordinator/src/fetch.rs @@ -22,7 +22,7 @@ use super::input::BlockSource; pub enum FetchError { RpcFetchError(Error), LocalFileErr(Error), - GcsErr(Error) + GcsErr(Error), } impl std::fmt::Display for FetchError { @@ -61,6 +61,7 @@ impl Checkpoint { Self::BlockNumberNegativeOffset(offset) => { BlockId::Number(BlockNumberOrTag::Number(block_number - *offset)) } + #[allow(unreachable_patterns)] _ => BlockId::Number(BlockNumberOrTag::Number(block_number - 1)), } } @@ -155,8 +156,8 @@ pub async fn fetch(source: &BlockSource) -> Result proverinput, Err(err) => { tracing::error!("Failed to convert file into ProverInput: {}", err); - return Err(FetchError::LocalFileErr(err.into())) - }, + return Err(FetchError::LocalFileErr(err)); + } }; Ok(BenchmarkedProverInput { @@ -186,32 +187,35 @@ pub async fn fetch(source: &BlockSource) -> Result match String::from_utf8(byte_data) { Ok(string) => string, Err(err) => { - tracing::error!("Failed to convert returned data into utf8 string: {}", err); + tracing::error!( + "Failed to convert returned data into utf8 string: {}", + err + ); return Err(FetchError::GcsErr(err.into())); - }, + } }, Err(err) => { tracing::error!("Failed to pull witness from GCS: {}", err); return Err(FetchError::GcsErr(err.into())); - }, + } }; match from_string(&string) { Ok(proverinput) => Ok(BenchmarkedProverInput { proverinput, - fetch_times: Vec::new() + fetch_times: Vec::new(), }), Err(err) => { tracing::error!("Failed to deserialize string into ProverInput: {}", err); - Err(FetchError::GcsErr(err.into())) - }, + Err(FetchError::GcsErr(err)) + } } } } } fn from_string(string: &str) -> Result { - let des = &mut serde_json::Deserializer::from_str(&string); + let des = &mut serde_json::Deserializer::from_str(string); match Vec::::deserialize(des) { Ok(blocks) => Ok(ProverInput { blocks }), diff --git a/zero_bin/coordinator/src/input.rs b/zero_bin/coordinator/src/input.rs index abe17f230..c6253fb97 100644 --- a/zero_bin/coordinator/src/input.rs +++ b/zero_bin/coordinator/src/input.rs @@ -40,8 +40,7 @@ pub enum BlockSource { filepath: String, /// The name of the bucket to be used bucket: String, - } - + }, } unsafe impl Send for BlockSource {} diff --git a/zero_bin/coordinator/src/main.rs b/zero_bin/coordinator/src/main.rs index 555e0b08c..7bdb3a12a 100644 --- a/zero_bin/coordinator/src/main.rs +++ b/zero_bin/coordinator/src/main.rs @@ -2,7 +2,6 @@ //! proofs use std::{ env, - path::PathBuf, sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; @@ -15,7 +14,6 @@ pub use coordinator::{ input::{self, ProveBlocksInput}, manyprover, proofout, psm, }; -use dotenvy::dotenv; use ops::register; use paladin::{ config::{Config, Serializer}, @@ -23,7 +21,6 @@ use paladin::{ }; // use leader::init; use tracing::{debug, error, info, warn}; -use zero_bin_common::prover_state; pub const SERVER_ADDR_ENVKEY: &str = "SERVER_ADDR"; pub const DFLT_SERVER_ADDR: &str = "0.0.0.0:8080"; @@ -57,7 +54,7 @@ async fn main() -> Result<()> { //------------------------------------------------------------------------ info!("Initializing the request queue"); - let (mut tx, mut rx) = tokio::sync::mpsc::channel::(50); + let (tx, mut rx) = tokio::sync::mpsc::channel::(50); // Store it in a Data for server let post_queue = web::Data::new(tx); diff --git a/zero_bin/coordinator/src/manyprover.rs b/zero_bin/coordinator/src/manyprover.rs index e9346252c..5c6cb65ce 100644 --- a/zero_bin/coordinator/src/manyprover.rs +++ b/zero_bin/coordinator/src/manyprover.rs @@ -224,6 +224,8 @@ impl ManyProver { Some(benchmark_out) => benchmark_out.push(BenchmarkingStats { block_number: benchmark_block_proof.proof.b_height, n_txs: benchmark_block_proof.n_txs, + n_gen_in: benchmark_block_proof.n_gen_in, + n_segs: benchmark_block_proof.n_segs, cumulative_n_txs: Some(cumulative_n_txs), fetch_duration: fetch_time.copied(), total_proof_duration: benchmark_block_proof diff --git a/zero_bin/coordinator/src/psm.rs b/zero_bin/coordinator/src/psm.rs index 2866fd203..5c1a38c9c 100644 --- a/zero_bin/coordinator/src/psm.rs +++ b/zero_bin/coordinator/src/psm.rs @@ -1,9 +1,6 @@ //! This module helps with creating the [ProverStateManager] -use std::{ - default, - env::{self, VarError}, -}; +use std::env::{self, VarError}; use tracing::{error, info, warn}; use zero_bin_common::prover_state::{ @@ -60,8 +57,21 @@ pub fn load_psm_from_env() -> ProverStateManager { panic!("Unable to determine circiut persistence: `{}`", persistence); } Err(env::VarError::NotPresent) => { - warn!("No circuit persistence specified, using default"); - CircuitPersistence::default() + warn!("No circuit persistence specified, using default. Will attempt to use table load strategy if provided."); + + match (tbl_load_strat, CircuitPersistence::default()) { + // If given a tbl_load_strat and Circuit Persistence as a disk is the default, we go + // ahead and modify the tablt load strategy + (Some(tbl_load), CircuitPersistence::Disk(_)) => CircuitPersistence::Disk(tbl_load), + // Given the table load strategy and the default, warn we don't know how to + // apply the table load strategy and return the default + (Some(tbl_load), dflt) => { + warn!("Default circuit persistence is {:?}, unsure how to apply table load strategy ({:?})", dflt, tbl_load); + dflt + } + // Return None and the circuit persistence + (None, dflt) => dflt, + } } Err(env::VarError::NotUnicode(os_str)) => { error!("Non-Unicode circiut persistence: {:?}", os_str); diff --git a/zero_bin/leader/src/client.rs b/zero_bin/leader/src/client.rs index 74910f621..9239fee77 100644 --- a/zero_bin/leader/src/client.rs +++ b/zero_bin/leader/src/client.rs @@ -29,6 +29,7 @@ pub struct ProofParams { } /// The main function for the client. +#[allow(dead_code)] pub(crate) async fn client_main( runtime: Runtime, rpc_params: RpcParams, diff --git a/zero_bin/leader/src/init.rs b/zero_bin/leader/src/init.rs index ac15ba49b..b7c9d65eb 100644 --- a/zero_bin/leader/src/init.rs +++ b/zero_bin/leader/src/init.rs @@ -10,9 +10,10 @@ pub fn tracing() { .init(); } +use std::io; + use dotenvy::dotenv; use tracing::warn; -use std::io; pub const EVM_ARITH_VER_KEY: &str = "EVM_ARITHMETIZATION_PKG_VER"; @@ -26,4 +27,4 @@ pub fn load_dotenvy_vars_if_present() { Ok(_) | Err(dotenvy::Error::Io(io::Error { .. })) => (), Err(e) => warn!("Found local `.env` file but was unable to parse it! (err: {e})",), } -} \ No newline at end of file +} diff --git a/zero_bin/leader/src/main.rs b/zero_bin/leader/src/main.rs index 71bb499ef..999f4cae3 100644 --- a/zero_bin/leader/src/main.rs +++ b/zero_bin/leader/src/main.rs @@ -20,7 +20,6 @@ mod init; mod stdio; pub use init::load_dotenvy_vars_if_present; - pub use init::EVM_ARITH_VER_KEY; fn get_previous_proof(path: Option) -> Result> { @@ -129,5 +128,3 @@ async fn main() -> Result<()> { Ok(()) } - - diff --git a/zero_bin/prover/src/cli.rs b/zero_bin/prover/src/cli.rs index 8f4ccc73e..054f283af 100644 --- a/zero_bin/prover/src/cli.rs +++ b/zero_bin/prover/src/cli.rs @@ -18,13 +18,13 @@ pub struct CliProverConfig { impl Default for CliProverConfig { /// Built to copy the `default_value_t` clap arguments. - /// + /// /// NOTE: May need to be periodically updated. fn default() -> Self { Self { max_cpu_len_log: 20, batch_size: 1, - save_inputs_on_error: false + save_inputs_on_error: false, } } } diff --git a/zero_bin/prover/src/lib.rs b/zero_bin/prover/src/lib.rs index 7352c3ecf..3f47d13b5 100644 --- a/zero_bin/prover/src/lib.rs +++ b/zero_bin/prover/src/lib.rs @@ -7,7 +7,6 @@ use std::{future::Future, time::Duration}; use alloy::primitives::{BlockNumber, U256}; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use cli::CliProverConfig; use futures::stream::FuturesOrdered; use futures::{future::BoxFuture, FutureExt, TryFutureExt, TryStreamExt}; use num_traits::ToPrimitive as _; @@ -49,6 +48,8 @@ pub struct BenchmarkedGeneratedBlockProof { pub agg_dur: Option, pub total_dur: Option, pub n_txs: u64, + pub n_gen_in: usize, + pub n_segs: usize, pub gas_used: u64, pub difficulty: u64, pub start_time: DateTime, @@ -79,6 +80,8 @@ impl BlockProverInput { let prep_start = Instant::now(); let start_time: DateTime = Utc::now(); + use std::sync::{Arc, Mutex}; + // Basic preparation use anyhow::Context as _; use evm_arithmetization::prover::SegmentDataIterator; @@ -100,6 +103,7 @@ impl BlockProverInput { batch_size, )?; + let n_gen_in: usize = block_generation_inputs.len(); let n_txs: u64 = block_generation_inputs .iter() .map(|tx| tx.signed_txns.len() as u64) @@ -124,13 +128,15 @@ impl BlockProverInput { prep_dur.as_secs_f64() ); + let counter: Arc> = Arc::new(Mutex::new(0)); + let proof_start = Instant::now(); let tx_proof_futs: FuturesUnordered<_> = block_generation_inputs .iter() .enumerate() .map(|(idx, txn)| { - let data_iterator = SegmentDataIterator::::new(txn, Some(max_cpu_len_log)); - + let mut data_iterator = SegmentDataIterator::::new(txn, Some(max_cpu_len_log)); + data_iterator.set_counter(Arc::clone(&counter)); Directive::map(IndexedStream::from(data_iterator), &seg_ops) .fold(&agg_ops) .run(runtime) @@ -198,6 +204,8 @@ impl BlockProverInput { agg_wait_dur: Some(agg_wait_dur), agg_dur: Some(agg_dur), n_txs: n_txs as u64, + n_gen_in, + n_segs: *counter.lock().unwrap(), gas_used, difficulty: u64::try_from(difficulty).expect("Difficulty overflow"), start_time, diff --git a/zero_bin/worker/src/main.rs b/zero_bin/worker/src/main.rs index f37d9ef63..fd93c8924 100644 --- a/zero_bin/worker/src/main.rs +++ b/zero_bin/worker/src/main.rs @@ -41,10 +41,8 @@ async fn main() -> Result<()> { let psm = args.prover_state_config.into_prover_state_manager(); info!("Worker ProverStateManager: {:?}", psm); - - psm.initialize()?; - + psm.initialize()?; let runtime = WorkerRuntime::from_config(&args.paladin, register()).await?;