Skip to content

Commit

Permalink
chore(consensus): update logs for building a proposal (starkware-libs…
Browse files Browse the repository at this point in the history
  • Loading branch information
matan-starkware authored Jan 9, 2025
1 parent 9523120 commit 6f72e5c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 45 deletions.
18 changes: 10 additions & 8 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ impl SingleHeightConsensus {
ShcEvent::BuildProposal(StateMachineEvent::GetProposal(proposal_id, round)) => {
let old = self.proposals.insert(round, proposal_id);
assert!(old.is_none(), "There should be no entry for round {round} when proposing");
info!("Built proposal: {:?} in round {round}", proposal_id);
assert_eq!(
round,
self.state_machine.round(),
"State machine should not progress while awaiting proposal"
);
info!(%round, proposal_content_id = ?proposal_id, "Built proposal.");
let leader_fn =
|round: Round| -> ValidatorId { context.proposer(self.height, round) };
let sm_events = self
Expand Down Expand Up @@ -433,8 +438,7 @@ impl SingleHeightConsensus {
.await?,
);
}
StateMachineEvent::TimeoutPropose(round) => {
info!("Starting round {round} as Validator");
StateMachineEvent::TimeoutPropose(_) => {
ret_val.push(ShcTask::TimeoutPropose(self.timeouts.proposal_timeout, event));
}
StateMachineEvent::TimeoutPrevote(_) => {
Expand All @@ -458,10 +462,8 @@ impl SingleHeightConsensus {
) -> Vec<ShcTask> {
assert!(
proposal_id.is_none(),
"ProposalContentId must be None since the state machine is requesting a \
ProposalContentId"
"StateMachine is requesting a new proposal, but provided a content id."
);
info!("Starting round {round} as Proposer");

// TODO: Figure out how to handle failed proposal building. I believe this should be handled
// by applying timeoutPropose when we are the leader.
Expand Down Expand Up @@ -492,15 +494,15 @@ impl SingleHeightConsensus {
panic!("A valid proposal should exist for valid_round: {valid_round}")
});
assert_eq!(id, proposal_id, "reproposal should match the stored proposal");
let old = self.proposals.insert(round, Some(proposal_id));
assert!(old.is_none(), "There should be no proposal for round {round}.");
let init = ProposalInit {
height: self.height,
round,
proposer: self.id,
valid_round: Some(valid_round),
};
context.repropose(id, init).await;
let old = self.proposals.insert(round, Some(proposal_id));
assert!(old.is_none(), "There should be no proposal for round {round}.");
}

async fn handle_state_machine_vote<ContextT: ConsensusContext>(
Expand Down
4 changes: 3 additions & 1 deletion crates/sequencing/papyrus_consensus/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod state_machine_test;

use std::collections::{HashMap, HashSet, VecDeque};

use tracing::trace;
use tracing::{info, trace};

use crate::types::{ProposalContentId, Round, ValidatorId};

Expand Down Expand Up @@ -337,6 +337,7 @@ impl StateMachine {
self.round = round;
self.step = Step::Propose;
let mut output = if !self.is_observer && self.id == leader_fn(self.round) {
info!("Starting round {round} as Proposer");
// Leader.
match self.valid_value_round {
Some((proposal_id, valid_round)) => VecDeque::from([StateMachineEvent::Proposal(
Expand All @@ -351,6 +352,7 @@ impl StateMachine {
}
}
} else {
info!("Starting round {round} as Validator");
VecDeque::from([StateMachineEvent::TimeoutPropose(self.round)])
};
output.append(&mut self.current_round_upons());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,12 @@ impl SequencerConsensusContext {
impl ConsensusContext for SequencerConsensusContext {
type ProposalPart = ProposalPart;

#[instrument(level = "info", skip_all, fields(proposal_init))]
#[instrument(skip_all)]
async fn build_proposal(
&mut self,
proposal_init: ProposalInit,
timeout: Duration,
) -> oneshot::Receiver<ProposalContentId> {
info!("Building proposal: timeout={timeout:?}");
let cende_write_success = AbortOnDropHandle::new(
self.cende_ambassador.write_prev_height_blob(proposal_init.height),
);
Expand All @@ -215,6 +214,7 @@ impl ConsensusContext for SequencerConsensusContext {
.expect("Failed to send proposal receiver");
let gas_prices = self.gas_prices();

info!(?proposal_init, ?timeout, %proposal_id, "Building proposal");
let handle = tokio::spawn(
async move {
build_proposal(
Expand All @@ -230,7 +230,7 @@ impl ConsensusContext for SequencerConsensusContext {
)
.await;
}
.instrument(debug_span!("consensus_build_proposal")),
.instrument(debug_span!("consensus_build_proposal", %proposal_id)),
);
assert!(self.active_proposal.is_none());
// The cancellation token is unused by the spawned build.
Expand Down Expand Up @@ -281,8 +281,8 @@ impl ConsensusContext for SequencerConsensusContext {
}

async fn repropose(&mut self, id: ProposalContentId, init: ProposalInit) {
info!(?id, ?init, "Reproposing.");
let height = init.height;
debug!("Getting proposal for height: {height} and id: {id}");
let (_transactions, _) = self
.valid_proposals
.lock()
Expand Down Expand Up @@ -489,7 +489,7 @@ impl SequencerConsensusContext {
async fn interrupt_active_proposal(&mut self) {
if let Some((token, handle)) = self.active_proposal.take() {
token.cancel();
handle.await.expect("Proposal task panicked");
handle.await.expect("Proposal task failed");
}
}
}
Expand All @@ -508,20 +508,14 @@ async fn build_proposal(
gas_prices: GasPrices,
) {
initialize_build(proposal_id, &proposal_init, timeout, batcher.as_ref(), gas_prices).await;
debug!("Broadcasting proposal init: {proposal_init:?}");
proposal_sender
.send(ProposalPart::Init(proposal_init))
.await
.expect("Failed to send proposal init");

let Some((proposal_content_id, content)) = get_proposal_content(
proposal_init.height,
proposal_id,
batcher.as_ref(),
proposal_sender,
cende_write_success,
)
.await
let Some((proposal_content_id, content)) =
get_proposal_content(proposal_id, batcher.as_ref(), proposal_sender, cende_write_success)
.await
else {
return;
};
Expand Down Expand Up @@ -580,7 +574,6 @@ async fn initialize_build(
// 2. Forward these to the stream handler to be streamed out to the network.
// 3. Once finished, receive the commitment from the batcher.
async fn get_proposal_content(
height: BlockNumber,
proposal_id: ProposalId,
batcher: &dyn BatcherClient,
mut proposal_sender: mpsc::Sender<ProposalPart>,
Expand All @@ -599,33 +592,23 @@ async fn get_proposal_content(
match response.content {
GetProposalContent::Txs(txs) => {
content.extend_from_slice(&txs[..]);
// TODO: Broadcast the transactions to the network.
// TODO(matan): Convert to protobuf and make sure this isn't too large for a single
// proto message (could this be a With adapter added to the channel in `new`?).
let transaction_hashes =
txs.iter().map(|tx| tx.tx_hash()).collect::<Vec<TransactionHash>>();
debug!("Broadcasting proposal content: {transaction_hashes:?}");

// TODO(matan): Make sure this isn't too large for a single proto message.
debug!(
hashes = ?txs.iter().map(|tx| tx.tx_hash()).collect::<Vec<TransactionHash>>(),
"Sending transaction batch with {} txs.",
txs.len()
);
let transactions =
txs.into_iter().map(|tx| tx.into()).collect::<Vec<Transaction>>();
trace!("Broadcasting proposal content: {transactions:?}");

trace!(?transactions, "Sending transaction batch with {} txs.", transactions.len());
proposal_sender
.send(ProposalPart::Transactions(TransactionBatch { transactions }))
.await
.expect("Failed to broadcast proposal content");
}
GetProposalContent::Finished(id) => {
let proposal_content_id = BlockHash(id.state_diff_commitment.0.0);
info!(
"Finished building proposal {:?}: content_id = {:?}, num_txs = {:?}, height = \
{:?}",
proposal_id,
proposal_content_id,
content.len(),
height
);
debug!("Broadcasting proposal fin: {proposal_content_id:?}");
info!(?proposal_content_id, num_txs = content.len(), "Finished building proposal",);

// If the blob writing operation to Aerospike doesn't return a success status, we
// can't finish the proposal.
Expand Down Expand Up @@ -775,7 +758,7 @@ async fn handle_proposal_part(
match proposal_part {
None => HandledProposalPart::Failed("Failed to receive proposal content".to_string()),
Some(ProposalPart::Transactions(TransactionBatch { transactions: txs })) => {
debug!("Received TransactionBatch with {} txs", txs.len());
debug!("Received transaction batch with {} txs", txs.len());
let exe_txs: Vec<ExecutableTransaction> = txs
.into_iter()
.map(|tx| {
Expand Down Expand Up @@ -815,8 +798,8 @@ async fn handle_proposal_part(
};
let batcher_block_id = BlockHash(response_id.state_diff_commitment.0.0);
info!(
network_block_id = %id,
%batcher_block_id,
network_block_id = ?id,
?batcher_block_id,
num_txs = %content.len(),
"Finished validating proposal."
);
Expand Down

0 comments on commit 6f72e5c

Please sign in to comment.