diff --git a/node/pkg/dal/hub/hub.go b/node/pkg/dal/hub/hub.go index 1b5f69f52..3540aec70 100644 --- a/node/pkg/dal/hub/hub.go +++ b/node/pkg/dal/hub/hub.go @@ -10,6 +10,7 @@ import ( "bisonai.com/miko/node/pkg/dal/collector" dalcommon "bisonai.com/miko/node/pkg/dal/common" "bisonai.com/miko/node/pkg/dal/utils/stats" + "bisonai.com/miko/node/pkg/utils/pool" "github.com/rs/zerolog/log" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" @@ -29,10 +30,12 @@ type Hub struct { Unregister chan *websocket.Conn broadcast map[string]chan *dalcommon.OutgoingSubmissionData mu sync.RWMutex + pool *pool.Pool } const ( MAX_CONNECTIONS = 10 + WORKER_COUNT = 20 CleanupInterval = time.Hour ) @@ -53,10 +56,13 @@ func NewHub(configs map[string]Config) *Hub { Register: make(chan *websocket.Conn), Unregister: make(chan *websocket.Conn), broadcast: make(map[string]chan *dalcommon.OutgoingSubmissionData), + pool: pool.NewPool(WORKER_COUNT), } } func (h *Hub) Start(ctx context.Context, collector *collector.Collector) { + h.pool.Run(ctx) + go h.handleClientRegistration() h.initializeBroadcastChannels(collector) @@ -151,24 +157,19 @@ func (h *Hub) broadcastDataForSymbol(ctx context.Context, symbol string) { } func (h *Hub) castSubmissionData(ctx context.Context, data *dalcommon.OutgoingSubmissionData, symbol *string) { - var wg sync.WaitGroup - h.mu.RLock() defer h.mu.RUnlock() for client, subscriptions := range h.Clients { if _, ok := subscriptions[*symbol]; ok { - wg.Add(1) - go func(entry *websocket.Conn) { - defer wg.Done() - err := wsjson.Write(ctx, entry, data) + h.pool.AddJob(func() { + err := wsjson.Write(ctx, client, data) if err != nil { log.Warn().Err(err).Msg("failed to write message to client") } - }(client) + }) } } - wg.Wait() } func (h *Hub) cleanupJob(ctx context.Context) {