diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index 2a42c5bf1..58fb0d649 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -162,23 +162,28 @@ func (n *Aggregator) HandlePriceDataMessage(ctx context.Context, msg raft.Messag n.roundPrices.mu.Lock() defer n.roundPrices.mu.Unlock() - if n.roundPrices.locked[priceDataMessage.RoundID] || n.roundPrices.isReplay(priceDataMessage.RoundID, msg.SentFrom) { - log.Warn().Str("Player", "Aggregator").Int32("RoundID", priceDataMessage.RoundID).Msg("price data message already processed") + if n.roundPrices.locked[priceDataMessage.RoundID] { + log.Warn().Str("Player", "Aggregator").Str("Sender", msg.SentFrom).Str("Me", n.Raft.GetHostId()).Str("transmissionDelay", time.Since(msg.Timestamp).String()).Int32("RoundID", priceDataMessage.RoundID).Msg("price data message already processed") return nil } - n.storeRoundPriceData(priceDataMessage.RoundID, priceDataMessage.PriceData, msg.SentFrom) - - if len(n.roundPrices.prices[priceDataMessage.RoundID]) == 1 { - // if it's first message for the round - go n.startPriceCollectionTimeout(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp) + if n.roundPrices.isReplay(priceDataMessage.RoundID, msg.SentFrom) { + log.Warn().Str("Player", "Aggregator").Str("Sender", msg.SentFrom).Str("Me", n.Raft.GetHostId()).Int32("RoundID", priceDataMessage.RoundID).Msg("price data message replayed") + return nil } + n.storeRoundPriceData(priceDataMessage.RoundID, priceDataMessage.PriceData, msg.SentFrom) + if len(n.roundPrices.prices[priceDataMessage.RoundID]) == n.Raft.SubscribersCount()+1 { // if all messsages received for the round return n.processCollectedPrices(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp) } + if len(n.roundPrices.prices[priceDataMessage.RoundID]) == 1 { + // if it's first message for the round + go n.startPriceCollectionTimeout(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp) + } + return nil } @@ -291,21 +296,26 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e n.roundProofs.mu.Lock() defer n.roundProofs.mu.Unlock() - if n.roundProofs.locked[proofMessage.RoundID] || n.roundProofs.isReplay(proofMessage.RoundID, msg.SentFrom) { - log.Warn().Str("Player", "Aggregator").Int32("RoundID", proofMessage.RoundID).Msg("proof message already processed") + if n.roundProofs.locked[proofMessage.RoundID] { + log.Warn().Str("Player", "Aggregator").Str("Sender", msg.SentFrom).Str("Me", n.Raft.GetHostId()).Str("transmissionDelay", time.Since(msg.Timestamp).String()).Int32("RoundID", proofMessage.RoundID).Msg("proof message already processed") return nil } - n.storeRoundProofData(proofMessage.RoundID, proofMessage.Proof, msg.SentFrom) - - if len(n.roundProofs.proofs[proofMessage.RoundID]) == 1 { - go n.startProofCollectionTimeout(ctx, proofMessage) + if n.roundProofs.isReplay(proofMessage.RoundID, msg.SentFrom) { + log.Warn().Str("Player", "Aggregator").Str("Sender", msg.SentFrom).Str("Me", n.Raft.GetHostId()).Int32("RoundID", proofMessage.RoundID).Msg("proof message replayed") + return nil } + n.storeRoundProofData(proofMessage.RoundID, proofMessage.Proof, msg.SentFrom) + if len(n.roundProofs.proofs[proofMessage.RoundID]) == n.Raft.SubscribersCount()+1 { return n.processCollectedProofs(ctx, proofMessage) } + if len(n.roundProofs.proofs[proofMessage.RoundID]) == 1 { + go n.startProofCollectionTimeout(ctx, proofMessage) + } + return nil } diff --git a/node/pkg/raft/raft.go b/node/pkg/raft/raft.go index 87dd0a01c..07ef95205 100644 --- a/node/pkg/raft/raft.go +++ b/node/pkg/raft/raft.go @@ -235,6 +235,7 @@ func (r *Raft) handleReplyVote(ctx context.Context, msg Message) error { // publishing messages func (r *Raft) PublishMessage(ctx context.Context, msg Message) error { + msg.Timestamp = time.Now() data, err := json.Marshal(msg) if err != nil { return err diff --git a/node/pkg/raft/types.go b/node/pkg/raft/types.go index bfcf475f8..b43025c05 100644 --- a/node/pkg/raft/types.go +++ b/node/pkg/raft/types.go @@ -29,9 +29,10 @@ const ( ) type Message struct { - Type MessageType `json:"type"` - SentFrom string `json:"sentFrom"` - Data json.RawMessage `json:"data"` + Type MessageType `json:"type"` + SentFrom string `json:"sentFrom"` + Data json.RawMessage `json:"data"` + Timestamp time.Time `json:"timestamp"` } type RequestVoteMessage struct {