Skip to content

Commit

Permalink
fix: potential fix for dal memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Aug 14, 2024
1 parent e13259c commit cbc392f
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 7 deletions.
4 changes: 2 additions & 2 deletions node/pkg/dal/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func handleSubscribe(h *Hub, client *ThreadSafeClient, msg Subscription, ctx con

subscriptions, ok := h.clients[client]
if !ok {
subscriptions = map[string]bool{}
subscriptions = map[string]any{}
}

valid := []string{}
Expand All @@ -83,7 +83,7 @@ func handleSubscribe(h *Hub, client *ThreadSafeClient, msg Subscription, ctx con
if _, ok := h.configs[symbol]; !ok {
continue
}
subscriptions[symbol] = true
subscriptions[symbol] = struct{}{}
valid = append(valid, param)
}
h.clients[client] = subscriptions
Expand Down
46 changes: 43 additions & 3 deletions node/pkg/dal/api/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func HubSetup(ctx context.Context, configs []Config) *Hub {
func NewHub(configs map[string]Config) *Hub {
return &Hub{
configs: configs,
clients: make(map[*ThreadSafeClient]map[string]bool),
clients: make(map[*ThreadSafeClient]map[string]any),
register: make(chan *ThreadSafeClient),
unregister: make(chan *ThreadSafeClient),
broadcast: make(map[string]chan *dalcommon.OutgoingSubmissionData),
Expand All @@ -40,6 +40,8 @@ func (h *Hub) Start(ctx context.Context, collector *collector.Collector) {
for symbol := range h.configs {
go h.broadcastDataForSymbol(symbol)
}

go h.cleanupJob(ctx)
}

func (h *Hub) handleClientRegistration() {
Expand All @@ -59,7 +61,7 @@ func (h *Hub) addClient(client *ThreadSafeClient) {
if _, ok := h.clients[client]; ok {
return
}
h.clients[client] = make(map[string]bool)
h.clients[client] = make(map[string]any)

ip := client.Conn.IP()
if _, ok := h.connPerIP[ip]; !ok {
Expand Down Expand Up @@ -133,7 +135,7 @@ func (h *Hub) getClientsSnapshotToNotify(symbol string) []*ThreadSafeClient {
defer h.mu.RUnlock()
result := []*ThreadSafeClient{}
for client, subscriptions := range h.clients {
if subscriptions[symbol] {
if _, ok := subscriptions[symbol]; ok {
result = append(result, client)
}
}
Expand Down Expand Up @@ -183,3 +185,41 @@ func (h *Hub) castSubmissionData(data *dalcommon.OutgoingSubmissionData, symbol
}
wg.Wait()
}

func (h *Hub) cleanupJob(ctx context.Context) {
ticker := time.NewTicker(CleanupInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
h.cleanup()
}
}
}

func (h *Hub) cleanup() {
h.mu.Lock()
defer h.mu.Unlock()

newClients := make(map[*ThreadSafeClient]map[string]any, len(h.clients))
for client, subscriptions := range h.clients {
if len(subscriptions) > 0 {
newClients[client] = subscriptions
} else {
h.unregister <- client
}
}
h.clients = newClients

newConnPerIP := make(map[string][]*ThreadSafeClient, len(h.connPerIP))
for ip, clients := range h.connPerIP {
if len(clients) == 0 {
continue
}
newConnPerIP[ip] = clients
}
h.connPerIP = newConnPerIP
}
8 changes: 6 additions & 2 deletions node/pkg/dal/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package api

import (
"sync"
"time"

"bisonai.com/orakl/node/pkg/common/types"
dalcommon "bisonai.com/orakl/node/pkg/dal/common"
)

const MAX_CONNECTIONS = 10
const (
MAX_CONNECTIONS = 10
CleanupInterval = time.Hour
)

type Config = types.Config

Expand All @@ -18,7 +22,7 @@ type Subscription struct {

type Hub struct {
configs map[string]Config
clients map[*ThreadSafeClient]map[string]bool
clients map[*ThreadSafeClient]map[string]any
register chan *ThreadSafeClient
unregister chan *ThreadSafeClient
broadcast map[string]chan *dalcommon.OutgoingSubmissionData
Expand Down

0 comments on commit cbc392f

Please sign in to comment.