Skip to content

Commit

Permalink
fix(consensus): papyrus context repropose using streamed proposal cha…
Browse files Browse the repository at this point in the history
…nnel
  • Loading branch information
asmaastarkware committed Dec 22, 2024
1 parent 2929403 commit 8a90915
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use papyrus_consensus::types::{
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_protobuf::consensus::{
ConsensusMessage,
Proposal,
ProposalFin,
ProposalInit,
ProposalPart,
Expand Down Expand Up @@ -90,7 +89,7 @@ impl ConsensusContext for PapyrusConsensusContext {
_timeout: Duration,
) -> oneshot::Receiver<ProposalContentId> {
let height = proposal_init.height;
let mut proposal_sender_sender = self.network_proposal_sender.clone();
let mut network_proposal_sender = self.network_proposal_sender.clone();
let (fin_sender, fin_receiver) = oneshot::channel();

let storage_reader = self.storage_reader.clone();
Expand Down Expand Up @@ -120,7 +119,7 @@ impl ConsensusContext for PapyrusConsensusContext {

let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
let stream_id = height.0;
proposal_sender_sender
network_proposal_sender
.send((stream_id, proposal_receiver))
.await
.expect("Failed to send proposal receiver");
Expand Down Expand Up @@ -251,18 +250,24 @@ impl ConsensusContext for PapyrusConsensusContext {
.unwrap_or_else(|| panic!("No proposal found for height {height} and id {id}"))
.clone();

let proposal = Proposal {
height: height.0,
round: init.round,
proposer: init.proposer,
transactions,
block_hash: id,
valid_round: init.valid_round,
};
self.network_broadcast_client
.broadcast_message(ConsensusMessage::Proposal(proposal))
let stream_id = height.0;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
self.network_proposal_sender
.send((stream_id, proposal_receiver))
.await
.expect("Failed to send proposal");
.expect("Failed to send proposal receiver");
proposal_sender
.send(Self::ProposalPart::Init(init))
.await
.expect("Failed to send proposal init");
proposal_sender
.send(ProposalPart::Transactions(TransactionBatch { transactions, tx_hashes: vec![] }))
.await
.expect("Failed to send transactions");
proposal_sender
.send(ProposalPart::Fin(ProposalFin { proposal_content_id: id }))
.await
.expect("Failed to send fin");
}

async fn validators(&self, _height: BlockNumber) -> Vec<ValidatorId> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use papyrus_storage::header::HeaderStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
use papyrus_test_utils::get_test_block;
use starknet_api::block::{Block, BlockHash};
use test_case::test_case;

use crate::papyrus_consensus_context::PapyrusConsensusContext;

Expand All @@ -44,8 +45,10 @@ async fn build_proposal() {
assert_eq!(fin, block.header.block_hash);
}

#[test_case(true ; "repropose")]
#[test_case(false ; "dont_repropose")]
#[tokio::test]
async fn validate_proposal_success() {
async fn validate_proposal_success(repropose: bool) {
let (block, mut papyrus_context, _mock_network, _) = test_setup();
let block_number = block.header.block_header_without_hash.block_number;

Expand All @@ -72,6 +75,12 @@ async fn validate_proposal_success() {
.unwrap();

assert_eq!(fin.0, block.header.block_hash);
if repropose {
let proposal_init = ProposalInit { height: block_number, ..Default::default() };
// Context checks if the proposal exists in `self.valid_proposals` for retrieval and
// streaming. The user doesn't interact or see what's done with it.
papyrus_context.repropose(block.header.block_hash, proposal_init).await;
}
}

#[tokio::test]
Expand Down

0 comments on commit 8a90915

Please sign in to comment.