diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index 42c81721e..2a42c5bf1 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -16,7 +16,7 @@ import ( "github.com/rs/zerolog/log" ) -const maxLeaderMsgReceiveTimeout = 100 * time.Millisecond +const maxLeaderMsgReceiveTimeout = 150 * time.Millisecond func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Config, signHelper *helper.Signer, latestLocalAggregates *LatestLocalAggregates) (*Aggregator, error) { if h == nil || ps == nil || topicString == "" { @@ -169,12 +169,14 @@ func (n *Aggregator) HandlePriceDataMessage(ctx context.Context, msg raft.Messag n.storeRoundPriceData(priceDataMessage.RoundID, priceDataMessage.PriceData, msg.SentFrom) + if len(n.roundPrices.prices[priceDataMessage.RoundID]) == 1 { + // if it's first message for the round + go n.startPriceCollectionTimeout(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp) + } + if len(n.roundPrices.prices[priceDataMessage.RoundID]) == n.Raft.SubscribersCount()+1 { // if all messsages received for the round return n.processCollectedPrices(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp) - } else if len(n.roundPrices.prices[priceDataMessage.RoundID]) == 1 { - // if it's first message for the round - go n.startPriceCollectionTimeout(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp) } return nil @@ -296,10 +298,12 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e n.storeRoundProofData(proofMessage.RoundID, proofMessage.Proof, msg.SentFrom) + if len(n.roundProofs.proofs[proofMessage.RoundID]) == 1 { + go n.startProofCollectionTimeout(ctx, proofMessage) + } + if len(n.roundProofs.proofs[proofMessage.RoundID]) == n.Raft.SubscribersCount()+1 { return n.processCollectedProofs(ctx, proofMessage) - } else if len(n.roundProofs.proofs[proofMessage.RoundID]) == 1 { - go n.startProofCollectionTimeout(ctx, proofMessage) } return nil