From cbc392f925ed22af7d6829f36de77342ae39d1e1 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 14 Aug 2024 17:09:35 +0900 Subject: [PATCH] fix: potential fix for dal memory leak --- node/pkg/dal/api/controller.go | 4 +-- node/pkg/dal/api/hub.go | 46 +++++++++++++++++++++++++++++++--- node/pkg/dal/api/types.go | 8 ++++-- 3 files changed, 51 insertions(+), 7 deletions(-) diff --git a/node/pkg/dal/api/controller.go b/node/pkg/dal/api/controller.go index f0b5791fc..1090f9d50 100644 --- a/node/pkg/dal/api/controller.go +++ b/node/pkg/dal/api/controller.go @@ -74,7 +74,7 @@ func handleSubscribe(h *Hub, client *ThreadSafeClient, msg Subscription, ctx con subscriptions, ok := h.clients[client] if !ok { - subscriptions = map[string]bool{} + subscriptions = map[string]any{} } valid := []string{} @@ -83,7 +83,7 @@ func handleSubscribe(h *Hub, client *ThreadSafeClient, msg Subscription, ctx con if _, ok := h.configs[symbol]; !ok { continue } - subscriptions[symbol] = true + subscriptions[symbol] = struct{}{} valid = append(valid, param) } h.clients[client] = subscriptions diff --git a/node/pkg/dal/api/hub.go b/node/pkg/dal/api/hub.go index a4e19784b..e526d0d26 100644 --- a/node/pkg/dal/api/hub.go +++ b/node/pkg/dal/api/hub.go @@ -24,7 +24,7 @@ func HubSetup(ctx context.Context, configs []Config) *Hub { func NewHub(configs map[string]Config) *Hub { return &Hub{ configs: configs, - clients: make(map[*ThreadSafeClient]map[string]bool), + clients: make(map[*ThreadSafeClient]map[string]any), register: make(chan *ThreadSafeClient), unregister: make(chan *ThreadSafeClient), broadcast: make(map[string]chan *dalcommon.OutgoingSubmissionData), @@ -40,6 +40,8 @@ func (h *Hub) Start(ctx context.Context, collector *collector.Collector) { for symbol := range h.configs { go h.broadcastDataForSymbol(symbol) } + + go h.cleanupJob(ctx) } func (h *Hub) handleClientRegistration() { @@ -59,7 +61,7 @@ func (h *Hub) addClient(client *ThreadSafeClient) { if _, ok := h.clients[client]; ok { return } - h.clients[client] = make(map[string]bool) + h.clients[client] = make(map[string]any) ip := client.Conn.IP() if _, ok := h.connPerIP[ip]; !ok { @@ -133,7 +135,7 @@ func (h *Hub) getClientsSnapshotToNotify(symbol string) []*ThreadSafeClient { defer h.mu.RUnlock() result := []*ThreadSafeClient{} for client, subscriptions := range h.clients { - if subscriptions[symbol] { + if _, ok := subscriptions[symbol]; ok { result = append(result, client) } } @@ -183,3 +185,41 @@ func (h *Hub) castSubmissionData(data *dalcommon.OutgoingSubmissionData, symbol } wg.Wait() } + +func (h *Hub) cleanupJob(ctx context.Context) { + ticker := time.NewTicker(CleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + h.cleanup() + } + } +} + +func (h *Hub) cleanup() { + h.mu.Lock() + defer h.mu.Unlock() + + newClients := make(map[*ThreadSafeClient]map[string]any, len(h.clients)) + for client, subscriptions := range h.clients { + if len(subscriptions) > 0 { + newClients[client] = subscriptions + } else { + h.unregister <- client + } + } + h.clients = newClients + + newConnPerIP := make(map[string][]*ThreadSafeClient, len(h.connPerIP)) + for ip, clients := range h.connPerIP { + if len(clients) == 0 { + continue + } + newConnPerIP[ip] = clients + } + h.connPerIP = newConnPerIP +} diff --git a/node/pkg/dal/api/types.go b/node/pkg/dal/api/types.go index 4ad1c4e08..61a4327eb 100644 --- a/node/pkg/dal/api/types.go +++ b/node/pkg/dal/api/types.go @@ -2,12 +2,16 @@ package api import ( "sync" + "time" "bisonai.com/orakl/node/pkg/common/types" dalcommon "bisonai.com/orakl/node/pkg/dal/common" ) -const MAX_CONNECTIONS = 10 +const ( + MAX_CONNECTIONS = 10 + CleanupInterval = time.Hour +) type Config = types.Config @@ -18,7 +22,7 @@ type Subscription struct { type Hub struct { configs map[string]Config - clients map[*ThreadSafeClient]map[string]bool + clients map[*ThreadSafeClient]map[string]any register chan *ThreadSafeClient unregister chan *ThreadSafeClient broadcast map[string]chan *dalcommon.OutgoingSubmissionData