Skip to content

Commit

Permalink
feat: add more logs (#2291)
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai authored Sep 28, 2024
1 parent dccdc9d commit bd6313d
Showing 1 changed file with 13 additions and 16 deletions.
29 changes: 13 additions & 16 deletions node/pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,32 +103,28 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message)
}
defer n.leaveOnlyLast10Entries(triggerMessage.RoundID)

if msg.SentFrom != n.Raft.GetHostId() {
// follower can be changed into leader unexpectedly before recieving the message
// increase round id before checking the message sent from leader
// so that the next round will be triggered with larger round id
// prevents already handled message error

n.mu.Lock()
n.RoundID = max(triggerMessage.RoundID, n.RoundID)
n.mu.Unlock()
}

if triggerMessage.RoundID == 0 {
log.Error().Str("Player", "Aggregator").Msg("invalid trigger message")
return errorSentinel.ErrAggregatorInvalidRaftMessage
}

if msg.SentFrom != n.Raft.GetLeader() {
log.Warn().Str("Player", "Aggregator").Msg("trigger message sent from non-leader")
currentLeader := n.Raft.GetLeader()
if msg.SentFrom != currentLeader {
log.Warn().Str("Player", "Aggregator").Str("Sender", msg.SentFrom).Str("CurrentLeader", currentLeader).Str("Me", n.Raft.GetHostId()).Msg("trigger message sent from non-leader")
return errorSentinel.ErrAggregatorNonLeaderRaftMessage
}

if msg.SentFrom != n.Raft.GetHostId() {
n.mu.Lock()
n.RoundID = max(triggerMessage.RoundID, n.RoundID)
n.mu.Unlock()
}

n.roundTriggers.mu.Lock()
defer n.roundTriggers.mu.Unlock()

if n.roundTriggers.locked[triggerMessage.RoundID] {
log.Warn().Str("Player", "Aggregator").Int32("RoundID", triggerMessage.RoundID).Msg("trigger message already processed")
log.Warn().Str("Player", "Aggregator").Str("Sender", msg.SentFrom).Str("CurrentLeader", currentLeader).Str("Me", n.Raft.GetHostId()).Int32("RoundID", triggerMessage.RoundID).Msg("trigger message already processed")
return nil
}
n.roundTriggers.locked[triggerMessage.RoundID] = true
Expand Down Expand Up @@ -210,8 +206,9 @@ func (n *Aggregator) HandlePriceFixMessage(ctx context.Context, msg raft.Message
return err
}

if msg.SentFrom != n.Raft.GetLeader() {
log.Warn().Str("Player", "Aggregator").Str("Leader", n.Raft.GetLeader()).Msg("price fix message sent from non-leader")
currentLeader := n.Raft.GetLeader()
if msg.SentFrom != currentLeader {
log.Warn().Str("Player", "Aggregator").Str("Sender", msg.SentFrom).Str("CurrentLeader", currentLeader).Str("Me", n.Raft.GetHostId()).Msg("price fix message sent from non-leader")
return errorSentinel.ErrAggregatorNonLeaderRaftMessage
}

Expand Down

0 comments on commit bd6313d

Please sign in to comment.