Skip to content

Commit

Permalink
feat(starknet_batcher): emit proposal metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
yair-starkware committed Jan 14, 2025
1 parent 308287b commit 9d9a97d
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 10 deletions.
82 changes: 77 additions & 5 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ impl Batcher {
&mut self,
propose_block_input: ProposeBlockInput,
) -> BatcherResult<()> {
let proposal_metrics_handle = ProposalMetricsHandle::new();
let active_height = self.active_height.ok_or(BatcherError::NoActiveHeight)?;
verify_block_input(
active_height,
Expand Down Expand Up @@ -192,8 +193,13 @@ impl Batcher {
)
.map_err(|_| BatcherError::InternalError)?;

self.spawn_proposal(propose_block_input.proposal_id, block_builder, abort_signal_sender)
.await?;
self.spawn_proposal(
propose_block_input.proposal_id,
block_builder,
abort_signal_sender,
proposal_metrics_handle,
)
.await?;

self.propose_tx_streams.insert(propose_block_input.proposal_id, output_tx_receiver);
Ok(())
Expand All @@ -204,6 +210,7 @@ impl Batcher {
&mut self,
validate_block_input: ValidateBlockInput,
) -> BatcherResult<()> {
let proposal_metrics_handle = ProposalMetricsHandle::new();
let active_height = self.active_height.ok_or(BatcherError::NoActiveHeight)?;
verify_block_input(
active_height,
Expand Down Expand Up @@ -239,8 +246,13 @@ impl Batcher {
)
.map_err(|_| BatcherError::InternalError)?;

self.spawn_proposal(validate_block_input.proposal_id, block_builder, abort_signal_sender)
.await?;
self.spawn_proposal(
validate_block_input.proposal_id,
block_builder,
abort_signal_sender,
proposal_metrics_handle,
)
.await?;

self.validate_tx_streams.insert(validate_block_input.proposal_id, input_tx_sender);
Ok(())
Expand Down Expand Up @@ -497,15 +509,28 @@ impl Batcher {
proposal_id: ProposalId,
mut block_builder: Box<dyn BlockBuilderTrait>,
abort_signal_sender: tokio::sync::oneshot::Sender<()>,
proposal_metrics_handle: ProposalMetricsHandle,
) -> BatcherResult<()> {
info!("Starting generation of a new proposal with id {}.", proposal_id);

let active_proposal = self.active_proposal.clone();
let executed_proposals = self.executed_proposals.clone();
let mut moved_proposal_metrics_handle = proposal_metrics_handle;

let join_handle = tokio::spawn(
async move {
let result = block_builder.build_block().await.map_err(Arc::new);
let result = match block_builder.build_block().await {
Ok(artifacts) => {
moved_proposal_metrics_handle.set_succeeded();
Ok(artifacts)
}
Err(BlockBuilderError::Aborted) => {
moved_proposal_metrics_handle.set_aborted();
Err(BlockBuilderError::Aborted)
}
Err(e) => Err(e),
}
.map_err(Arc::new);

// The proposal is done, clear the active proposal.
// Keep the proposal result only if it is the same as the active proposal.
Expand Down Expand Up @@ -614,3 +639,50 @@ impl BatcherStorageWriterTrait for papyrus_storage::StorageWriter {
}

impl ComponentStarter for Batcher {}

/// A handle to update the proposal metrics when the proposal is created and dropped.
#[derive(Debug)]
struct ProposalMetricsHandle {
finish_status: ProposalFinishStatus,
}

impl ProposalMetricsHandle {
pub fn new() -> Self {
counter!(crate::metrics::PROPOSAL_STARTED.name).increment(1);
Self { finish_status: ProposalFinishStatus::Failed }
}

pub fn set_succeeded(&mut self) {
debug!("Proposal succeeded.");
self.finish_status = ProposalFinishStatus::Succeeded;
}

pub fn set_aborted(&mut self) {
debug!("Proposal aborted.");
self.finish_status = ProposalFinishStatus::Aborted;
}
}

#[derive(Debug)]
enum ProposalFinishStatus {
Succeeded,
Aborted,
Failed,
}

impl Drop for ProposalMetricsHandle {
fn drop(&mut self) {
debug!("Proposal finished: {:?}", self.finish_status);
match self.finish_status {
ProposalFinishStatus::Succeeded => {
counter!(crate::metrics::PROPOSAL_SUCCEEDED.name).increment(1)
}
ProposalFinishStatus::Aborted => {
counter!(crate::metrics::PROPOSAL_ABORTED.name).increment(1)
}
ProposalFinishStatus::Failed => {
counter!(crate::metrics::PROPOSAL_FAILED.name).increment(1)
}
}
}
}
101 changes: 99 additions & 2 deletions crates/starknet_batcher/src/batcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,16 @@ async fn consecutive_heights_success() {
#[rstest]
#[tokio::test]
async fn validate_block_full_flow() {
let recorder = PrometheusBuilder::new().build_recorder();
let _g = metrics::set_default_local_recorder(&recorder);
let metrics_handle = recorder.handle();
let mut batcher =
batcher_with_active_validate_block(Ok(BlockExecutionArtifacts::create_for_testing())).await;
let metrics = metrics_handle.render();
assert_eq!(
parse_numeric_metric::<u64>(&metrics, crate::metrics::PROPOSAL_STARTED.name),
Some(1)
);

let send_proposal_input_txs = SendProposalContentInput {
proposal_id: PROPOSAL_ID,
Expand All @@ -289,6 +297,11 @@ async fn validate_block_full_flow() {
batcher.send_proposal_content(finish_proposal).await.unwrap(),
SendProposalContentResponse { response: ProposalStatus::Finished(proposal_commitment()) }
);
let metrics = metrics_handle.render();
assert_eq!(
parse_numeric_metric::<u64>(&metrics, crate::metrics::PROPOSAL_SUCCEEDED.name),
Some(1)
);
}

#[rstest]
Expand Down Expand Up @@ -353,20 +366,34 @@ async fn send_proposal_content_after_finish_or_abort(
#[rstest]
#[tokio::test]
async fn send_proposal_content_abort() {
let mut batcher =
batcher_with_active_validate_block(Ok(BlockExecutionArtifacts::create_for_testing())).await;
let recorder = PrometheusBuilder::new().build_recorder();
let _g = metrics::set_default_local_recorder(&recorder);
let metrics_handle = recorder.handle();
let mut batcher = batcher_with_active_validate_block(Err(BlockBuilderError::Aborted)).await;
let metrics = metrics_handle.render();
assert_proposal_metrics(&metrics, 1, 0, 0, 0);

let send_abort_proposal =
SendProposalContentInput { proposal_id: PROPOSAL_ID, content: SendProposalContent::Abort };
assert_eq!(
batcher.send_proposal_content(send_abort_proposal).await.unwrap(),
SendProposalContentResponse { response: ProposalStatus::Aborted }
);

// The block builder is running in a separate task, and the proposal metrics are emitted from
// that task, so we need to wait for them.
// TODO: Find a way to wait for the metrics to be emitted.
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let metrics = metrics_handle.render();
assert_proposal_metrics(&metrics, 1, 0, 0, 1);
}

#[rstest]
#[tokio::test]
async fn propose_block_full_flow() {
let recorder = PrometheusBuilder::new().build_recorder();
let _g = metrics::set_default_local_recorder(&recorder);
let metrics_handle = recorder.handle();
// Expecting 3 chunks of streamed txs.
let expected_streamed_txs = test_txs(0..STREAMING_CHUNK_SIZE * 2 + 1);

Expand Down Expand Up @@ -409,6 +436,12 @@ async fn propose_block_full_flow() {
let exhausted =
batcher.get_proposal_content(GetProposalContentInput { proposal_id: PROPOSAL_ID }).await;
assert_matches!(exhausted, Err(BatcherError::ProposalNotFound { .. }));

let metrics = metrics_handle.render();
assert_eq!(
parse_numeric_metric::<u64>(&metrics, crate::metrics::PROPOSAL_SUCCEEDED.name),
Some(1)
);
}

#[rstest]
Expand Down Expand Up @@ -455,6 +488,9 @@ async fn get_content_from_unknown_proposal() {
#[rstest]
#[tokio::test]
async fn consecutive_proposal_generation_success() {
let recorder = PrometheusBuilder::new().build_recorder();
let _g = metrics::set_default_local_recorder(&recorder);
let metrics_handle = recorder.handle();
let mut block_builder_factory = MockBlockBuilderFactoryTrait::new();
for _ in 0..2 {
mock_create_builder_for_propose_block(
Expand Down Expand Up @@ -485,18 +521,37 @@ async fn consecutive_proposal_generation_success() {
batcher.send_proposal_content(finish_proposal).await.unwrap();
batcher.await_active_proposal().await;
}

let metrics = metrics_handle.render();
assert_proposal_metrics(&metrics, 4, 4, 0, 0);
}

#[rstest]
#[tokio::test]
async fn concurrent_proposals_generation_fail() {
let recorder = PrometheusBuilder::new().build_recorder();
let _g = metrics::set_default_local_recorder(&recorder);
let metrics_handle = recorder.handle();
let mut batcher =
batcher_with_active_validate_block(Ok(BlockExecutionArtifacts::create_for_testing())).await;

// Make sure another proposal can't be generated while the first one is still active.
let result = batcher.propose_block(propose_block_input(ProposalId(1))).await;

assert_matches!(result, Err(BatcherError::AnotherProposalInProgress { .. }));

// Finish the first proposal.
batcher
.send_proposal_content(SendProposalContentInput {
proposal_id: ProposalId(0),
content: SendProposalContent::Finish,
})
.await
.unwrap();
batcher.await_active_proposal().await;

let metrics = metrics_handle.render();
assert_proposal_metrics(&metrics, 2, 1, 1, 0);
}

#[rstest]
Expand Down Expand Up @@ -643,3 +698,45 @@ pub fn test_state_diff() -> ThinStateDiff {
..Default::default()
}
}

fn assert_proposal_metrics(
metrics: &str,
expected_started: u64,
expected_succeeded: u64,
expected_failed: u64,
expected_aborted: u64,
) {
let started = parse_numeric_metric::<u64>(metrics, crate::metrics::PROPOSAL_STARTED.name);
let succeeded = parse_numeric_metric::<u64>(metrics, crate::metrics::PROPOSAL_SUCCEEDED.name);
let failed = parse_numeric_metric::<u64>(metrics, crate::metrics::PROPOSAL_FAILED.name);
let aborted = parse_numeric_metric::<u64>(metrics, crate::metrics::PROPOSAL_ABORTED.name);

assert_eq!(
started,
Some(expected_started),
"unexpected value proposal_started, expected {} got {:?}",
expected_started,
started,
);
assert_eq!(
succeeded,
Some(expected_succeeded),
"unexpected value proposal_succeeded, expected {} got {:?}",
expected_succeeded,
succeeded,
);
assert_eq!(
failed,
Some(expected_failed),
"unexpected value proposal_failed, expected {} got {:?}",
expected_failed,
failed,
);
assert_eq!(
aborted,
Some(expected_aborted),
"unexpected value proposal_aborted, expected {} got {:?}",
expected_aborted,
aborted,
);
}
22 changes: 19 additions & 3 deletions crates/starknet_batcher/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ use starknet_api::block::BlockNumber;

pub const STORAGE_HEIGHT: Metric =
Metric { name: "batcher_storage_height", description: "The height of the batcher storage" };
pub const PROPOSAL_STARTED: Metric =
Metric { name: "batcher_proposal_started", description: "Counter of proposals started" };
pub const PROPOSAL_SUCCEEDED: Metric =
Metric { name: "batcher_proposal_succeeded", description: "Counter of successful proposals" };
pub const PROPOSAL_FAILED: Metric =
Metric { name: "batcher_proposal_failed", description: "Counter of failed proposals" };
pub const PROPOSAL_ABORTED: Metric =
Metric { name: "batcher_proposal_aborted", description: "Counter of aborted proposals" };

pub struct Metric {
pub name: &'static str,
Expand All @@ -11,9 +19,17 @@ pub struct Metric {

pub fn register_metrics(storage_height: BlockNumber) {
// Ideally, we would have a `Gauge` here because of reverts, but we can't because
// the value will need to implement `Into<f64>` and `BlockNumber` doesn't.
// the value will need to implement `Into<f64>` and `BlockNumber` doesn't (it is u64).
// In case of reverts, consider calling `absolute`.
let storage_height_metric = counter!(STORAGE_HEIGHT.name);
counter!(STORAGE_HEIGHT.name).absolute(storage_height.0);
describe_counter!(STORAGE_HEIGHT.name, STORAGE_HEIGHT.description);
storage_height_metric.absolute(storage_height.0);

counter!(PROPOSAL_STARTED.name).absolute(0);
describe_counter!(PROPOSAL_STARTED.name, PROPOSAL_STARTED.description);
counter!(PROPOSAL_SUCCEEDED.name).absolute(0);
describe_counter!(PROPOSAL_SUCCEEDED.name, PROPOSAL_SUCCEEDED.description);
counter!(PROPOSAL_FAILED.name).absolute(0);
describe_counter!(PROPOSAL_FAILED.name, PROPOSAL_FAILED.description);
counter!(PROPOSAL_ABORTED.name).absolute(0);
describe_counter!(PROPOSAL_ABORTED.name, PROPOSAL_ABORTED.description);
}

0 comments on commit 9d9a97d

Please sign in to comment.