Skip to content

Commit

Permalink
feat: add logs, update logic order
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Sep 30, 2024
1 parent 4ee6f2d commit 4cbfe6c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 16 deletions.
36 changes: 23 additions & 13 deletions node/pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,23 +162,28 @@ func (n *Aggregator) HandlePriceDataMessage(ctx context.Context, msg raft.Messag
n.roundPrices.mu.Lock()
defer n.roundPrices.mu.Unlock()

if n.roundPrices.locked[priceDataMessage.RoundID] || n.roundPrices.isReplay(priceDataMessage.RoundID, msg.SentFrom) {
log.Warn().Str("Player", "Aggregator").Int32("RoundID", priceDataMessage.RoundID).Msg("price data message already processed")
if n.roundPrices.locked[priceDataMessage.RoundID] {
log.Warn().Str("Player", "Aggregator").Str("transmissionDelay", time.Since(msg.Timestamp).String()).Int32("RoundID", priceDataMessage.RoundID).Msg("price data message already processed")
return nil
}

n.storeRoundPriceData(priceDataMessage.RoundID, priceDataMessage.PriceData, msg.SentFrom)

if len(n.roundPrices.prices[priceDataMessage.RoundID]) == 1 {
// if it's first message for the round
go n.startPriceCollectionTimeout(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp)
if n.roundPrices.isReplay(priceDataMessage.RoundID, msg.SentFrom) {
log.Warn().Str("Player", "Aggregator").Int32("RoundID", priceDataMessage.RoundID).Msg("price data message replayed")
return nil
}

n.storeRoundPriceData(priceDataMessage.RoundID, priceDataMessage.PriceData, msg.SentFrom)

if len(n.roundPrices.prices[priceDataMessage.RoundID]) == n.Raft.SubscribersCount()+1 {
// if all messsages received for the round
return n.processCollectedPrices(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp)
}

if len(n.roundPrices.prices[priceDataMessage.RoundID]) == 1 {
// if it's first message for the round
go n.startPriceCollectionTimeout(ctx, priceDataMessage.RoundID, priceDataMessage.Timestamp)
}

return nil
}

Expand Down Expand Up @@ -291,21 +296,26 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e
n.roundProofs.mu.Lock()
defer n.roundProofs.mu.Unlock()

if n.roundProofs.locked[proofMessage.RoundID] || n.roundProofs.isReplay(proofMessage.RoundID, msg.SentFrom) {
log.Warn().Str("Player", "Aggregator").Int32("RoundID", proofMessage.RoundID).Msg("proof message already processed")
if n.roundProofs.locked[proofMessage.RoundID] {
log.Warn().Str("Player", "Aggregator").Str("transmissionDelay", time.Since(msg.Timestamp).String()).Int32("RoundID", proofMessage.RoundID).Msg("proof message already processed")
return nil
}

n.storeRoundProofData(proofMessage.RoundID, proofMessage.Proof, msg.SentFrom)

if len(n.roundProofs.proofs[proofMessage.RoundID]) == 1 {
go n.startProofCollectionTimeout(ctx, proofMessage)
if n.roundProofs.isReplay(proofMessage.RoundID, msg.SentFrom) {
log.Warn().Str("Player", "Aggregator").Int32("RoundID", proofMessage.RoundID).Msg("proof message replayed")
return nil
}

n.storeRoundProofData(proofMessage.RoundID, proofMessage.Proof, msg.SentFrom)

if len(n.roundProofs.proofs[proofMessage.RoundID]) == n.Raft.SubscribersCount()+1 {
return n.processCollectedProofs(ctx, proofMessage)
}

if len(n.roundProofs.proofs[proofMessage.RoundID]) == 1 {
go n.startProofCollectionTimeout(ctx, proofMessage)
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions node/pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func (r *Raft) handleReplyVote(ctx context.Context, msg Message) error {
// publishing messages

func (r *Raft) PublishMessage(ctx context.Context, msg Message) error {
msg.Timestamp = time.Now()
data, err := json.Marshal(msg)
if err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions node/pkg/raft/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ const (
)

type Message struct {
Type MessageType `json:"type"`
SentFrom string `json:"sentFrom"`
Data json.RawMessage `json:"data"`
Type MessageType `json:"type"`
SentFrom string `json:"sentFrom"`
Data json.RawMessage `json:"data"`
Timestamp time.Time `json:"timestamp"`
}

type RequestVoteMessage struct {
Expand Down

0 comments on commit 4cbfe6c

Please sign in to comment.