From 12378aea7e2c2eefe1165131385e0669298300e2 Mon Sep 17 00:00:00 2001 From: Nick <148735107+nick-bisonai@users.noreply.github.com> Date: Wed, 28 Aug 2024 17:19:40 +0900 Subject: [PATCH] (DAL) Pooling broadcast (#2246) * feat: pooling broadcast * refactor: rename function name * fix: update based on feedback * feat: increase worker count --- node/pkg/dal/hub/hub.go | 17 +++++++++-------- node/pkg/utils/pool/pool.go | 4 ++-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/node/pkg/dal/hub/hub.go b/node/pkg/dal/hub/hub.go index 1b5f69f52..696ccd944 100644 --- a/node/pkg/dal/hub/hub.go +++ b/node/pkg/dal/hub/hub.go @@ -10,6 +10,7 @@ import ( "bisonai.com/miko/node/pkg/dal/collector" dalcommon "bisonai.com/miko/node/pkg/dal/common" "bisonai.com/miko/node/pkg/dal/utils/stats" + "bisonai.com/miko/node/pkg/utils/pool" "github.com/rs/zerolog/log" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" @@ -29,10 +30,12 @@ type Hub struct { Unregister chan *websocket.Conn broadcast map[string]chan *dalcommon.OutgoingSubmissionData mu sync.RWMutex + pool *pool.Pool } const ( MAX_CONNECTIONS = 10 + WORKER_COUNT = 100 CleanupInterval = time.Hour ) @@ -53,10 +56,13 @@ func NewHub(configs map[string]Config) *Hub { Register: make(chan *websocket.Conn), Unregister: make(chan *websocket.Conn), broadcast: make(map[string]chan *dalcommon.OutgoingSubmissionData), + pool: pool.NewPool(WORKER_COUNT), } } func (h *Hub) Start(ctx context.Context, collector *collector.Collector) { + h.pool.Run(ctx) + go h.handleClientRegistration() h.initializeBroadcastChannels(collector) @@ -151,24 +157,19 @@ func (h *Hub) broadcastDataForSymbol(ctx context.Context, symbol string) { } func (h *Hub) castSubmissionData(ctx context.Context, data *dalcommon.OutgoingSubmissionData, symbol *string) { - var wg sync.WaitGroup - h.mu.RLock() defer h.mu.RUnlock() for client, subscriptions := range h.Clients { if _, ok := subscriptions[*symbol]; ok { - wg.Add(1) - go func(entry *websocket.Conn) { - defer wg.Done() - err := wsjson.Write(ctx, entry, data) + h.pool.AddJob(func() { + err := wsjson.Write(ctx, client, data) if err != nil { log.Warn().Err(err).Msg("failed to write message to client") } - }(client) + }) } } - wg.Wait() } func (h *Hub) cleanupJob(ctx context.Context) { diff --git a/node/pkg/utils/pool/pool.go b/node/pkg/utils/pool/pool.go index a4f92a8b4..5eccbcdf6 100644 --- a/node/pkg/utils/pool/pool.go +++ b/node/pkg/utils/pool/pool.go @@ -24,11 +24,11 @@ func (p *Pool) Run(ctx context.Context) { p.IsRunning = true for i := 0; i < p.workerCount; i++ { - go p.worker(poolCtx) + go p.runWorker(poolCtx) } } -func (p *Pool) worker(ctx context.Context) { +func (p *Pool) runWorker(ctx context.Context) { for { select { case job := <-p.jobChannel: