Skip to content

Commit

Permalink
Added Commit message and send this instead of Vote when aggregated co…
Browse files Browse the repository at this point in the history
…mmit
  • Loading branch information
jmalicevic committed Jan 11, 2025
1 parent 46423c8 commit 1a36a1b
Show file tree
Hide file tree
Showing 10 changed files with 456 additions and 89 deletions.
8 changes: 8 additions & 0 deletions api/cometbft/consensus/v1/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ func (m *Vote) Wrap() proto.Message {
return cm
}

func (m *Commit) Wrap() proto.Message {
cm := &Message{}
cm.Sum = &Message_Commit{Commit: m}
return cm
}

func (m *BlockPart) Wrap() proto.Message {
cm := &Message{}
cm.Sum = &Message_BlockPart{BlockPart: m}
Expand Down Expand Up @@ -87,6 +93,8 @@ func (m *Message) Unwrap() (proto.Message, error) {

case *Message_Vote:
return m.GetVote(), nil
case *Message_Commit:
return m.GetCommit(), nil

case *Message_HasVote:
return m.GetHasVote(), nil
Expand Down
388 changes: 326 additions & 62 deletions api/cometbft/consensus/v1/types.pb.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions internal/consensus/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ func MsgToWrappedProto(msg Message) (cmtcons.Message, error) {
Vote: vote,
}}

case *CommitMessage:
commit := msg.Commit.ToProto()
pb.Sum = &cmtcons.Message_Commit{Commit: &cmtcons.Commit{
Commit: commit,
}}
case *HasVoteMessage:
pb.Sum = &cmtcons.Message_HasVote{HasVote: &cmtcons.HasVote{
Height: msg.Height,
Expand Down Expand Up @@ -200,6 +205,17 @@ func MsgFromProto(p proto.Message) (Message, error) {
pb = &VoteMessage{
Vote: vote,
}
case *cmtcons.Commit:
// Commit validation will be handled in the vote message ValidateBasic
// call below.
commit, err := types.CommitFromProto(msg.Commit)
if err != nil {
return nil, cmterrors.ErrMsgToProto{MessageName: "Commit", Err: err}
}

pb = &CommitMessage{
Commit: commit,
}
case *cmtcons.HasVote:
pb = &HasVoteMessage{
Height: msg.Height,
Expand Down
104 changes: 83 additions & 21 deletions internal/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,12 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
ps.SetHasVoteFromPeer(msg.Vote, height, valSize, lastCommitSize)

cs.peerMsgQueue <- msgInfo{msg, e.Src.ID(), time.Time{}}
// case *CommitMessage:
// TODO Send it to the queue like VoteMessage
case *CommitMessage:
cs := conR.conS
cs.Logger.Info("XX Received ")
// TODO setHasAggregatedVoteFromPeer
cs.peerMsgQueue <- msgInfo{msg, e.Src.ID(), time.Time{}}

default:
// don't punish (leave room for soft upgrades)
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
Expand Down Expand Up @@ -687,6 +691,20 @@ OUTER_LOOP:
"height", prs.Height,
"vote", vote,
)
} else {
if c := sendEntireCommit(logger, conR.conS, rs, ps, prs, rng); c != nil {
commit, ok := (*c).(*types.Commit)
if !ok {
panic("Wrong type,got")
}
if ps.sendCommit(commit) {
logger.Info("XXX SENT COMMIT")
continue OUTER_LOOP
}
logger.Info("XX Failed to send commit to peer",
"height", prs.Height,
"commit", commit)
}
}

if sleeping == 0 {
Expand Down Expand Up @@ -885,29 +903,13 @@ func pickPartForCatchup(
return part
}

func pickVoteToSend(
logger log.Logger,
func sendEntireCommit(logger log.Logger,
conS *State,
rs *cstypes.RoundState,
ps *PeerState,
prs *cstypes.PeerRoundState,
rng *rand.Rand,
) *types.Vote {
// If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height {
heightLogger := logger.With("height", prs.Height)
return pickVoteCurrentHeight(heightLogger, rs, prs, ps, rng)
}

// Special catchup logic.
// If peer is lagging by height 1, send LastCommit.
if prs.Height != 0 && rs.Height == prs.Height+1 {
if vote := ps.PickVoteToSend(rs.LastCommit, rng); vote != nil {
logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
return vote
}
}

) *types.VoteSetReader {
// Catchup logic
// If peer is lagging by more than 1, send Commit.
blockStoreBase := conS.blockStore.Base()
Expand All @@ -929,14 +931,41 @@ func pickVoteToSend(
if commit == nil {
return nil
}

return &commit
// TODO we cannot pick a vote. Here is the place where we need to send the whole commit.
// if vote := ps.PickVoteToSend(commit, rng); vote != nil {
// logger.Debug("Picked Catchup commit to send", "height", prs.Height)
// return vote
// }
}
return nil

}

func pickVoteToSend(
logger log.Logger,
conS *State,
rs *cstypes.RoundState,
ps *PeerState,
prs *cstypes.PeerRoundState,
rng *rand.Rand,
) *types.Vote {
// If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height {
heightLogger := logger.With("height", prs.Height)
return pickVoteCurrentHeight(heightLogger, rs, prs, ps, rng)
}

// Special catchup logic.
// If peer is lagging by height 1, send LastCommit.
if prs.Height != 0 && rs.Height == prs.Height+1 {
if vote := ps.PickVoteToSend(rs.LastCommit, rng); vote != nil {
logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
return vote
}
}

return nil
}

func pickVoteCurrentHeight(
Expand Down Expand Up @@ -1265,6 +1294,21 @@ func (ps *PeerState) SendProposalSetHasProposal(
}
}

// sendCommit sends the vote to the peer.
// Returns true and marks the peer as having the vote if the vote was sent.
func (ps *PeerState) sendCommit(commit *types.Commit) bool {
ps.logger.Info("Sending commit message", "ps", ps, "commit", commit)
return ps.peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &cmtcons.Commit{
Commit: commit.ToProto(),
},
})

// TODO Set COMMIT TO PEER
// ps.SetHasVote(vote)
}

// sendVoteSetHasVote sends the vote to the peer.
// Returns true and marks the peer as having the vote if the vote was sent.
func (ps *PeerState) sendVoteSetHasVote(vote *types.Vote) bool {
Expand Down Expand Up @@ -1859,6 +1903,23 @@ func (m *BlockPartMessage) String() string {

// -------------------------------------

// VoteMessage is sent when voting for a proposal (or lack thereof).
type CommitMessage struct {
Commit *types.Commit
}

// ValidateBasic checks whether the vote within the message is well-formed.
func (m *CommitMessage) ValidateBasic() error {
return m.Commit.ValidateBasic()
}

// String returns a string representation.
func (m *CommitMessage) String() string {
return fmt.Sprintf("[Commit with aggregated signatures %v]", m.Commit)
}

// -------------------------------------

// VoteMessage is sent when voting for a proposal (or lack thereof).
type VoteMessage struct {
Vote *types.Vote
Expand Down Expand Up @@ -2010,4 +2071,5 @@ var (
_ types.Wrapper = &cmtcons.ProposalPOL{}
_ types.Wrapper = &cmtcons.VoteSetBits{}
_ types.Wrapper = &cmtcons.VoteSetMaj23{}
_ types.Wrapper = &cmtcons.Commit{}
)
4 changes: 3 additions & 1 deletion internal/consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscr
v := msg.Vote
cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"blockID", v.BlockID, "peer", peerID, "extensionLen", len(v.Extension), "extSigLen", len(v.ExtensionSignature))
case *CommitMessage:
c := msg.Commit
cs.Logger.Info("Replay: Commit", c.Height)
}

cs.handleMsg(m)
case timeoutInfo:
cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
Expand Down
9 changes: 7 additions & 2 deletions internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,9 @@ func (cs *State) OnStart() error {

LOOP:
for {
cs.Logger.Info("XXX Catchup start")
err := cs.catchupReplay(cs.Height)
cs.Logger.Info("XXX Catchup stop")
switch {
case err == nil:
break LOOP
Expand Down Expand Up @@ -880,6 +882,7 @@ func (cs *State) receiveRoutine(maxSteps int) {

// state transitions on complete-proposal, 2/3-any, 2/3-one.
func (cs *State) handleMsg(mi msgInfo) {
cs.Logger.Info("Handling msg")
cs.mtx.Lock()
defer cs.mtx.Unlock()
var (
Expand Down Expand Up @@ -953,9 +956,11 @@ func (cs *State) handleMsg(mi msgInfo) {
// the peer is sending us CatchupCommit precommits.
// We could make note of this and help filter in broadcastHasVoteMessage().

// case *CommitMessage:
case *CommitMessage:
cs.Logger.Info("XX received cmt message")
panic("RECEIVED COMMIT MESSAGE")
// TODO Commit has been validated Validate Basic when unmarshalling, but need to validate the the commit itself
// cs.AddCommit(aggCommit)
//cs.AddCommit(aggCommit)
default:
cs.Logger.Error("Unknown msg type", "type", fmt.Sprintf("%T", msg))
return
Expand Down
2 changes: 1 addition & 1 deletion p2p/conn/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
default:
}
} else {
c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", log.NewLazySprintf("%X", msgBytes))
c.Logger.Info("Send failed", "channel", chID, "conn", c, "msgBytes", log.NewLazySprintf("%X", msgBytes))
}
return success
}
Expand Down
6 changes: 6 additions & 0 deletions proto/cometbft/consensus/v1/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ message Vote {
cometbft.types.v1.Vote vote = 1;
}

// An entire Commit is sent when signatures are aggregated and we cannot
// extract a single signature.
message Commit {
cometbft.types.v1.Commit commit = 1;
}
// HasVote is sent to indicate that a particular vote has been received.
message HasVote {
int64 height = 1;
Expand Down Expand Up @@ -98,5 +103,6 @@ message Message {
VoteSetMaj23 vote_set_maj23 = 8;
VoteSetBits vote_set_bits = 9;
HasProposalBlockPart has_proposal_block_part = 10;
Commit commit = 11;
}
}
4 changes: 4 additions & 0 deletions test/e2e/networks/simple.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
prometheus = true
vote_extensions_update_height = -1
vote_extensions_enable_height = 0
pbts_update_height = -1
pbts_enable_height = 1
[node.validator00]
[node.validator01]
[node.validator02]
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/run-multiple.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ for MANIFEST in "$@"; do
echo "==> Dumping container logs for $MANIFEST..."
./build/runner -f "$MANIFEST" logs

echo "==> Cleaning up failed testnet $MANIFEST..."
./build/runner -f "$MANIFEST" cleanup
# echo "==> Cleaning up failed testnet $MANIFEST..."
# ./build/runner -f "$MANIFEST" cleanup

FAILED+=("$MANIFEST")
fi
Expand Down

0 comments on commit 1a36a1b

Please sign in to comment.