Skip to content

Commit

Permalink
fix: update logic & increase timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Sep 29, 2024
1 parent 8198ce1 commit 283e225
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions node/pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/rs/zerolog/log"
)

const maxLeaderMsgReceiveTimeout = 100 * time.Millisecond
const maxLeaderMsgReceiveTimeout = 150 * time.Millisecond

func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Config, signHelper *helper.Signer, latestLocalAggregates *LatestLocalAggregates) (*Aggregator, error) {
if h == nil || ps == nil || topicString == "" {
Expand Down Expand Up @@ -169,12 +169,14 @@ func (n *Aggregator) HandlePriceDataMessage(ctx context.Context, msg raft.Messag

n.storeRoundPriceData(priceDataMessage.RoundID, priceDataMessage.PriceData, msg.SentFrom)

if len(n.roundPrices.prices[priceDataMessage.RoundID]) == 1 {
// if it's first message for the round
go n.startPriceCollectionTimeout(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp)
}

if len(n.roundPrices.prices[priceDataMessage.RoundID]) == n.Raft.SubscribersCount()+1 {
// if all messsages received for the round
return n.processCollectedPrices(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp)
} else if len(n.roundPrices.prices[priceDataMessage.RoundID]) == 1 {
// if it's first message for the round
go n.startPriceCollectionTimeout(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp)
}

return nil
Expand Down Expand Up @@ -296,10 +298,12 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e

n.storeRoundProofData(proofMessage.RoundID, proofMessage.Proof, msg.SentFrom)

if len(n.roundProofs.proofs[proofMessage.RoundID]) == 1 {
go n.startProofCollectionTimeout(ctx, proofMessage)
}

if len(n.roundProofs.proofs[proofMessage.RoundID]) == n.Raft.SubscribersCount()+1 {
return n.processCollectedProofs(ctx, proofMessage)
} else if len(n.roundProofs.proofs[proofMessage.RoundID]) == 1 {
go n.startProofCollectionTimeout(ctx, proofMessage)
}

return nil
Expand Down

0 comments on commit 283e225

Please sign in to comment.