diff --git a/node/pkg/dal/hub/hub.go b/node/pkg/dal/hub/hub.go index 696ccd944..1b5f69f52 100644 --- a/node/pkg/dal/hub/hub.go +++ b/node/pkg/dal/hub/hub.go @@ -10,7 +10,6 @@ import ( "bisonai.com/miko/node/pkg/dal/collector" dalcommon "bisonai.com/miko/node/pkg/dal/common" "bisonai.com/miko/node/pkg/dal/utils/stats" - "bisonai.com/miko/node/pkg/utils/pool" "github.com/rs/zerolog/log" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" @@ -30,12 +29,10 @@ type Hub struct { Unregister chan *websocket.Conn broadcast map[string]chan *dalcommon.OutgoingSubmissionData mu sync.RWMutex - pool *pool.Pool } const ( MAX_CONNECTIONS = 10 - WORKER_COUNT = 100 CleanupInterval = time.Hour ) @@ -56,13 +53,10 @@ func NewHub(configs map[string]Config) *Hub { Register: make(chan *websocket.Conn), Unregister: make(chan *websocket.Conn), broadcast: make(map[string]chan *dalcommon.OutgoingSubmissionData), - pool: pool.NewPool(WORKER_COUNT), } } func (h *Hub) Start(ctx context.Context, collector *collector.Collector) { - h.pool.Run(ctx) - go h.handleClientRegistration() h.initializeBroadcastChannels(collector) @@ -157,19 +151,24 @@ func (h *Hub) broadcastDataForSymbol(ctx context.Context, symbol string) { } func (h *Hub) castSubmissionData(ctx context.Context, data *dalcommon.OutgoingSubmissionData, symbol *string) { + var wg sync.WaitGroup + h.mu.RLock() defer h.mu.RUnlock() for client, subscriptions := range h.Clients { if _, ok := subscriptions[*symbol]; ok { - h.pool.AddJob(func() { - err := wsjson.Write(ctx, client, data) + wg.Add(1) + go func(entry *websocket.Conn) { + defer wg.Done() + err := wsjson.Write(ctx, entry, data) if err != nil { log.Warn().Err(err).Msg("failed to write message to client") } - }) + }(client) } } + wg.Wait() } func (h *Hub) cleanupJob(ctx context.Context) { diff --git a/node/pkg/utils/pool/pool.go b/node/pkg/utils/pool/pool.go index 5eccbcdf6..a4f92a8b4 100644 --- a/node/pkg/utils/pool/pool.go +++ b/node/pkg/utils/pool/pool.go @@ -24,11 +24,11 @@ func (p *Pool) Run(ctx context.Context) { p.IsRunning = true for i := 0; i < p.workerCount; i++ { - go p.runWorker(poolCtx) + go p.worker(poolCtx) } } -func (p *Pool) runWorker(ctx context.Context) { +func (p *Pool) worker(ctx context.Context) { for { select { case job := <-p.jobChannel: