Skip to content

Commit

Permalink
fix: fix memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Aug 7, 2024
1 parent e4c1a66 commit 50d8879
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 29 deletions.
25 changes: 11 additions & 14 deletions node/pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Co
Config: config,
Raft: raft.NewRaftNode(h, ps, topic, 1000, aggregateInterval),

RoundTriggers: &RoundTriggers{
roundTriggers: &RoundTriggers{
locked: map[int32]bool{},
},
roundPrices: &RoundPrices{
Expand Down Expand Up @@ -101,7 +101,7 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message)
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to unmarshal trigger message")
return err
}
n.cleanUp(triggerMessage.RoundID - 10)
defer n.leaveOnlyLast10Entries(triggerMessage.RoundID)

if triggerMessage.RoundID == 0 {
log.Error().Str("Player", "Aggregator").Msg("invalid trigger message")
Expand All @@ -119,14 +119,14 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message)
n.mu.Unlock()
}

n.RoundTriggers.mu.Lock()
defer n.RoundTriggers.mu.Unlock()
n.roundTriggers.mu.Lock()
defer n.roundTriggers.mu.Unlock()

if n.RoundTriggers.locked[triggerMessage.RoundID] {
if n.roundTriggers.locked[triggerMessage.RoundID] {
log.Warn().Str("Player", "Aggregator").Int32("RoundID", triggerMessage.RoundID).Msg("trigger message already processed")
return nil
}
n.RoundTriggers.locked[triggerMessage.RoundID] = true
n.roundTriggers.locked[triggerMessage.RoundID] = true

var value int64
localAggregate, ok := n.LatestLocalAggregates.Load(n.ID)
Expand Down Expand Up @@ -173,7 +173,6 @@ func (n *Aggregator) HandlePriceDataMessage(ctx context.Context, msg raft.Messag
}

if len(n.roundPrices.prices[priceDataMessage.RoundID]) == n.Raft.SubscribersCount()+1 {
defer delete(n.roundPrices.prices, priceDataMessage.RoundID)
n.roundPrices.locked[priceDataMessage.RoundID] = true

if n.Raft.GetRole() == raft.Leader {
Expand Down Expand Up @@ -213,7 +212,6 @@ func (n *Aggregator) HandlePriceFixMessage(ctx context.Context, msg raft.Message

n.roundPriceFixes.mu.Lock()
defer n.roundPriceFixes.mu.Unlock()

if n.roundPriceFixes.locked[priceFixMessage.RoundID] {
log.Warn().Str("Player", "Aggregator").Int32("RoundID", priceFixMessage.RoundID).Msg("price fix message already processed")
return nil
Expand Down Expand Up @@ -284,7 +282,6 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to publish global aggregate and proof")
}
n.cleanUp(proofMessage.RoundID - 10)
}(ctx, globalAggregate, proof)

}
Expand Down Expand Up @@ -380,9 +377,9 @@ func (n *Aggregator) PublishProofMessage(ctx context.Context, roundId int32, val
return n.Raft.PublishMessage(ctx, message)
}

func (n *Aggregator) cleanUp(roundID int32) {
n.RoundTriggers.cleanup(roundID)
n.roundPrices.cleanup(roundID)
n.roundPriceFixes.cleanup(roundID)
n.roundProofs.cleanup(roundID)
func (n *Aggregator) leaveOnlyLast10Entries(roundID int32) {
n.roundTriggers.leaveOnlyLast10Entries(roundID)
n.roundPrices.leaveOnlyLast10Entries(roundID)
n.roundPriceFixes.leaveOnlyLast10Entries(roundID)
n.roundProofs.leaveOnlyLast10Entries(roundID)
}
84 changes: 71 additions & 13 deletions node/pkg/aggregator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,19 @@ type RoundTriggers struct {
mu sync.Mutex
}

func (r *RoundTriggers) cleanup(roundID int32) {
func (r *RoundTriggers) leaveOnlyLast10Entries(roundID int32) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.locked, roundID)

newLocked := make(map[int32]bool)

for i := roundID; i > roundID-10; i-- {
if val, exists := r.locked[i]; exists {
newLocked[i] = val
}
}

r.locked = newLocked
}

type RoundPrices struct {
Expand All @@ -80,23 +89,51 @@ func (r *RoundPrices) isReplay(roundID int32, sender string) bool {
return false
}

func (r *RoundPrices) cleanup(roundID int32) {
func (r *RoundPrices) leaveOnlyLast10Entries(roundID int32) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.senders, roundID)
delete(r.prices, roundID)
delete(r.locked, roundID)

newLocked := make(map[int32]bool)
for i := roundID; i > roundID-10; i-- {
if val, exists := r.locked[i]; exists {
newLocked[i] = val
}
}
r.locked = newLocked

newPrices := make(map[int32][]int64)
for i := roundID; i > roundID-10; i-- {
if val, exists := r.prices[i]; exists {
newPrices[i] = val
}
}
r.prices = newPrices

newSenders := make(map[int32][]string)
for i := roundID; i > roundID-10; i-- {
if val, exists := r.senders[i]; exists {
newSenders[i] = val
}
}
r.senders = newSenders
}

type RoundPriceFixes struct {
locked map[int32]bool
mu sync.Mutex
}

func (r *RoundPriceFixes) cleanup(roundID int32) {
func (r *RoundPriceFixes) leaveOnlyLast10Entries(roundID int32) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.locked, roundID)

newLocked := make(map[int32]bool)
for i := roundID; i > roundID-10; i-- {
if val, exists := r.locked[i]; exists {
newLocked[i] = val
}
}
r.locked = newLocked
}

type RoundProofs struct {
Expand All @@ -115,20 +152,41 @@ func (r *RoundProofs) isReplay(roundID int32, sender string) bool {
return false
}

func (r *RoundProofs) cleanup(roundID int32) {
func (r *RoundProofs) leaveOnlyLast10Entries(roundID int32) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.senders, roundID)
delete(r.proofs, roundID)
delete(r.locked, roundID)

newLocked := make(map[int32]bool)
for i := roundID; i > roundID-10; i-- {
if val, exists := r.locked[i]; exists {
newLocked[i] = val
}
}
r.locked = newLocked

newProofs := make(map[int32][][]byte)
for i := roundID; i > roundID-10; i-- {
if val, exists := r.proofs[i]; exists {
newProofs[i] = val
}
}
r.proofs = newProofs

newSenders := make(map[int32][]string)
for i := roundID; i > roundID-10; i-- {
if val, exists := r.senders[i]; exists {
newSenders[i] = val
}
}
r.senders = newSenders
}

type Aggregator struct {
Config
Raft *raft.Raft

LatestLocalAggregates *LatestLocalAggregates
RoundTriggers *RoundTriggers
roundTriggers *RoundTriggers
roundPrices *RoundPrices
roundPriceFixes *RoundPriceFixes
roundProofs *RoundProofs
Expand Down
5 changes: 4 additions & 1 deletion node/pkg/dal/api/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,14 @@ func (h *Hub) addClient(client *ThreadSafeClient) {
delete(subscriptions, symbol)
}
h.connPerIP[ip] = h.connPerIP[ip][1:]
oldConn.WriteControl(
err := oldConn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "too many connections"),
time.Now().Add(time.Second),
)
if err != nil {
log.Warn().Err(err).Msg("failed to write close message")
}
oldConn.Close()
}
}
Expand Down
7 changes: 6 additions & 1 deletion node/pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ func (r *Raft) Run(ctx context.Context) {
for {
select {
case msg := <-r.MessageBuffer:
go r.handleMessage(ctx, msg)
go func(Message) {
err := r.handleMessage(ctx, msg)
if err != nil {
log.Error().Err(err).Str("Player", "Raft").Msg("failed to handle message")
}
}(msg)
case <-r.ElectionTimer.C:
r.startElection(ctx)
case <-ctx.Done():
Expand Down

0 comments on commit 50d8879

Please sign in to comment.