diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index 82eba9cba..e33430310 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -103,32 +103,28 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message) } defer n.leaveOnlyLast10Entries(triggerMessage.RoundID) - if msg.SentFrom != n.Raft.GetHostId() { - // follower can be changed into leader unexpectedly before recieving the message - // increase round id before checking the message sent from leader - // so that the next round will be triggered with larger round id - // prevents already handled message error - - n.mu.Lock() - n.RoundID = max(triggerMessage.RoundID, n.RoundID) - n.mu.Unlock() - } - if triggerMessage.RoundID == 0 { log.Error().Str("Player", "Aggregator").Msg("invalid trigger message") return errorSentinel.ErrAggregatorInvalidRaftMessage } - if msg.SentFrom != n.Raft.GetLeader() { - log.Warn().Str("Player", "Aggregator").Msg("trigger message sent from non-leader") + currentLeader := n.Raft.GetLeader() + if msg.SentFrom != currentLeader { + log.Warn().Str("Player", "Aggregator").Str("Sender", msg.SentFrom).Str("CurrentLeader", currentLeader).Str("Me", n.Raft.GetHostId()).Msg("trigger message sent from non-leader") return errorSentinel.ErrAggregatorNonLeaderRaftMessage } + if msg.SentFrom != n.Raft.GetHostId() { + n.mu.Lock() + n.RoundID = max(triggerMessage.RoundID, n.RoundID) + n.mu.Unlock() + } + n.roundTriggers.mu.Lock() defer n.roundTriggers.mu.Unlock() if n.roundTriggers.locked[triggerMessage.RoundID] { - log.Warn().Str("Player", "Aggregator").Int32("RoundID", triggerMessage.RoundID).Msg("trigger message already processed") + log.Warn().Str("Player", "Aggregator").Str("Sender", msg.SentFrom).Str("CurrentLeader", currentLeader).Str("Me", n.Raft.GetHostId()).Int32("RoundID", triggerMessage.RoundID).Msg("trigger message already processed") return nil } n.roundTriggers.locked[triggerMessage.RoundID] = true @@ -210,8 +206,9 @@ func (n *Aggregator) HandlePriceFixMessage(ctx context.Context, msg raft.Message return err } - if msg.SentFrom != n.Raft.GetLeader() { - log.Warn().Str("Player", "Aggregator").Str("Leader", n.Raft.GetLeader()).Msg("price fix message sent from non-leader") + currentLeader := n.Raft.GetLeader() + if msg.SentFrom != currentLeader { + log.Warn().Str("Player", "Aggregator").Str("Sender", msg.SentFrom).Str("CurrentLeader", currentLeader).Str("Me", n.Raft.GetHostId()).Msg("price fix message sent from non-leader") return errorSentinel.ErrAggregatorNonLeaderRaftMessage }