diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index 5284ad7e8..0cb4503c0 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -33,7 +33,7 @@ func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Co Config: config, Raft: raft.NewRaftNode(h, ps, topic, 1000, aggregateInterval), - RoundTriggers: &RoundTriggers{ + roundTriggers: &RoundTriggers{ locked: map[int32]bool{}, }, roundPrices: &RoundPrices{ @@ -101,7 +101,7 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message) log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to unmarshal trigger message") return err } - n.cleanUp(triggerMessage.RoundID - 10) + defer n.leaveOnlyLast10Entries(triggerMessage.RoundID) if triggerMessage.RoundID == 0 { log.Error().Str("Player", "Aggregator").Msg("invalid trigger message") @@ -119,14 +119,14 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message) n.mu.Unlock() } - n.RoundTriggers.mu.Lock() - defer n.RoundTriggers.mu.Unlock() + n.roundTriggers.mu.Lock() + defer n.roundTriggers.mu.Unlock() - if n.RoundTriggers.locked[triggerMessage.RoundID] { + if n.roundTriggers.locked[triggerMessage.RoundID] { log.Warn().Str("Player", "Aggregator").Int32("RoundID", triggerMessage.RoundID).Msg("trigger message already processed") return nil } - n.RoundTriggers.locked[triggerMessage.RoundID] = true + n.roundTriggers.locked[triggerMessage.RoundID] = true var value int64 localAggregate, ok := n.LatestLocalAggregates.Load(n.ID) @@ -173,7 +173,6 @@ func (n *Aggregator) HandlePriceDataMessage(ctx context.Context, msg raft.Messag } if len(n.roundPrices.prices[priceDataMessage.RoundID]) == n.Raft.SubscribersCount()+1 { - defer delete(n.roundPrices.prices, priceDataMessage.RoundID) n.roundPrices.locked[priceDataMessage.RoundID] = true if n.Raft.GetRole() == raft.Leader { @@ -213,7 +212,6 @@ func (n *Aggregator) HandlePriceFixMessage(ctx context.Context, msg raft.Message n.roundPriceFixes.mu.Lock() defer n.roundPriceFixes.mu.Unlock() - if n.roundPriceFixes.locked[priceFixMessage.RoundID] { log.Warn().Str("Player", "Aggregator").Int32("RoundID", priceFixMessage.RoundID).Msg("price fix message already processed") return nil @@ -284,7 +282,6 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e if err != nil { log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to publish global aggregate and proof") } - n.cleanUp(proofMessage.RoundID - 10) }(ctx, globalAggregate, proof) } @@ -380,9 +377,9 @@ func (n *Aggregator) PublishProofMessage(ctx context.Context, roundId int32, val return n.Raft.PublishMessage(ctx, message) } -func (n *Aggregator) cleanUp(roundID int32) { - n.RoundTriggers.cleanup(roundID) - n.roundPrices.cleanup(roundID) - n.roundPriceFixes.cleanup(roundID) - n.roundProofs.cleanup(roundID) +func (n *Aggregator) leaveOnlyLast10Entries(roundID int32) { + n.roundTriggers.leaveOnlyLast10Entries(roundID) + n.roundPrices.leaveOnlyLast10Entries(roundID) + n.roundPriceFixes.leaveOnlyLast10Entries(roundID) + n.roundProofs.leaveOnlyLast10Entries(roundID) } diff --git a/node/pkg/aggregator/types.go b/node/pkg/aggregator/types.go index 42622a689..b6de70aae 100644 --- a/node/pkg/aggregator/types.go +++ b/node/pkg/aggregator/types.go @@ -58,10 +58,19 @@ type RoundTriggers struct { mu sync.Mutex } -func (r *RoundTriggers) cleanup(roundID int32) { +func (r *RoundTriggers) leaveOnlyLast10Entries(roundID int32) { r.mu.Lock() defer r.mu.Unlock() - delete(r.locked, roundID) + + newLocked := make(map[int32]bool) + + for i := roundID; i > roundID-10; i-- { + if val, exists := r.locked[i]; exists { + newLocked[i] = val + } + } + + r.locked = newLocked } type RoundPrices struct { @@ -80,12 +89,33 @@ func (r *RoundPrices) isReplay(roundID int32, sender string) bool { return false } -func (r *RoundPrices) cleanup(roundID int32) { +func (r *RoundPrices) leaveOnlyLast10Entries(roundID int32) { r.mu.Lock() defer r.mu.Unlock() - delete(r.senders, roundID) - delete(r.prices, roundID) - delete(r.locked, roundID) + + newLocked := make(map[int32]bool) + for i := roundID; i > roundID-10; i-- { + if val, exists := r.locked[i]; exists { + newLocked[i] = val + } + } + r.locked = newLocked + + newPrices := make(map[int32][]int64) + for i := roundID; i > roundID-10; i-- { + if val, exists := r.prices[i]; exists { + newPrices[i] = val + } + } + r.prices = newPrices + + newSenders := make(map[int32][]string) + for i := roundID; i > roundID-10; i-- { + if val, exists := r.senders[i]; exists { + newSenders[i] = val + } + } + r.senders = newSenders } type RoundPriceFixes struct { @@ -93,10 +123,17 @@ type RoundPriceFixes struct { mu sync.Mutex } -func (r *RoundPriceFixes) cleanup(roundID int32) { +func (r *RoundPriceFixes) leaveOnlyLast10Entries(roundID int32) { r.mu.Lock() defer r.mu.Unlock() - delete(r.locked, roundID) + + newLocked := make(map[int32]bool) + for i := roundID; i > roundID-10; i-- { + if val, exists := r.locked[i]; exists { + newLocked[i] = val + } + } + r.locked = newLocked } type RoundProofs struct { @@ -115,12 +152,33 @@ func (r *RoundProofs) isReplay(roundID int32, sender string) bool { return false } -func (r *RoundProofs) cleanup(roundID int32) { +func (r *RoundProofs) leaveOnlyLast10Entries(roundID int32) { r.mu.Lock() defer r.mu.Unlock() - delete(r.senders, roundID) - delete(r.proofs, roundID) - delete(r.locked, roundID) + + newLocked := make(map[int32]bool) + for i := roundID; i > roundID-10; i-- { + if val, exists := r.locked[i]; exists { + newLocked[i] = val + } + } + r.locked = newLocked + + newProofs := make(map[int32][][]byte) + for i := roundID; i > roundID-10; i-- { + if val, exists := r.proofs[i]; exists { + newProofs[i] = val + } + } + r.proofs = newProofs + + newSenders := make(map[int32][]string) + for i := roundID; i > roundID-10; i-- { + if val, exists := r.senders[i]; exists { + newSenders[i] = val + } + } + r.senders = newSenders } type Aggregator struct { @@ -128,7 +186,7 @@ type Aggregator struct { Raft *raft.Raft LatestLocalAggregates *LatestLocalAggregates - RoundTriggers *RoundTriggers + roundTriggers *RoundTriggers roundPrices *RoundPrices roundPriceFixes *RoundPriceFixes roundProofs *RoundProofs diff --git a/node/pkg/dal/api/hub.go b/node/pkg/dal/api/hub.go index e616af62b..975c67960 100644 --- a/node/pkg/dal/api/hub.go +++ b/node/pkg/dal/api/hub.go @@ -80,11 +80,14 @@ func (h *Hub) addClient(client *ThreadSafeClient) { delete(subscriptions, symbol) } h.connPerIP[ip] = h.connPerIP[ip][1:] - oldConn.WriteControl( + err := oldConn.WriteControl( websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "too many connections"), time.Now().Add(time.Second), ) + if err != nil { + log.Warn().Err(err).Msg("failed to write close message") + } oldConn.Close() } } diff --git a/node/pkg/raft/raft.go b/node/pkg/raft/raft.go index 7cc32d770..783cb1b1a 100644 --- a/node/pkg/raft/raft.go +++ b/node/pkg/raft/raft.go @@ -50,7 +50,12 @@ func (r *Raft) Run(ctx context.Context) { for { select { case msg := <-r.MessageBuffer: - go r.handleMessage(ctx, msg) + go func(Message) { + err := r.handleMessage(ctx, msg) + if err != nil { + log.Error().Err(err).Str("Player", "Raft").Msg("failed to handle message") + } + }(msg) case <-r.ElectionTimer.C: r.startElection(ctx) case <-ctx.Done():