From 7ef45c8f8bc8950133e6aacbe17edd9b8b2da707 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 15:05:23 +0900 Subject: [PATCH 1/4] feat: pooling broadcast --- node/pkg/dal/hub/hub.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/node/pkg/dal/hub/hub.go b/node/pkg/dal/hub/hub.go index 1b5f69f52..3540aec70 100644 --- a/node/pkg/dal/hub/hub.go +++ b/node/pkg/dal/hub/hub.go @@ -10,6 +10,7 @@ import ( "bisonai.com/miko/node/pkg/dal/collector" dalcommon "bisonai.com/miko/node/pkg/dal/common" "bisonai.com/miko/node/pkg/dal/utils/stats" + "bisonai.com/miko/node/pkg/utils/pool" "github.com/rs/zerolog/log" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" @@ -29,10 +30,12 @@ type Hub struct { Unregister chan *websocket.Conn broadcast map[string]chan *dalcommon.OutgoingSubmissionData mu sync.RWMutex + pool *pool.Pool } const ( MAX_CONNECTIONS = 10 + WORKER_COUNT = 20 CleanupInterval = time.Hour ) @@ -53,10 +56,13 @@ func NewHub(configs map[string]Config) *Hub { Register: make(chan *websocket.Conn), Unregister: make(chan *websocket.Conn), broadcast: make(map[string]chan *dalcommon.OutgoingSubmissionData), + pool: pool.NewPool(WORKER_COUNT), } } func (h *Hub) Start(ctx context.Context, collector *collector.Collector) { + h.pool.Run(ctx) + go h.handleClientRegistration() h.initializeBroadcastChannels(collector) @@ -151,24 +157,19 @@ func (h *Hub) broadcastDataForSymbol(ctx context.Context, symbol string) { } func (h *Hub) castSubmissionData(ctx context.Context, data *dalcommon.OutgoingSubmissionData, symbol *string) { - var wg sync.WaitGroup - h.mu.RLock() defer h.mu.RUnlock() for client, subscriptions := range h.Clients { if _, ok := subscriptions[*symbol]; ok { - wg.Add(1) - go func(entry *websocket.Conn) { - defer wg.Done() - err := wsjson.Write(ctx, entry, data) + h.pool.AddJob(func() { + err := wsjson.Write(ctx, client, data) if err != nil { log.Warn().Err(err).Msg("failed to write message to client") } - }(client) + }) } } - wg.Wait() } func (h *Hub) cleanupJob(ctx context.Context) { From b2f5f98538983888b85333067b3311d6479e3b1f Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 15:17:21 +0900 Subject: [PATCH 2/4] refactor: rename function name --- node/pkg/utils/pool/pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/pkg/utils/pool/pool.go b/node/pkg/utils/pool/pool.go index a4f92a8b4..33f5ab495 100644 --- a/node/pkg/utils/pool/pool.go +++ b/node/pkg/utils/pool/pool.go @@ -24,11 +24,11 @@ func (p *Pool) Run(ctx context.Context) { p.IsRunning = true for i := 0; i < p.workerCount; i++ { - go p.worker(poolCtx) + go p.work(poolCtx) } } -func (p *Pool) worker(ctx context.Context) { +func (p *Pool) work(ctx context.Context) { for { select { case job := <-p.jobChannel: From 6f8e22d99c53148a087992963a7e10d4747fda75 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 16:01:45 +0900 Subject: [PATCH 3/4] fix: update based on feedback --- node/pkg/utils/pool/pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/pkg/utils/pool/pool.go b/node/pkg/utils/pool/pool.go index 33f5ab495..5eccbcdf6 100644 --- a/node/pkg/utils/pool/pool.go +++ b/node/pkg/utils/pool/pool.go @@ -24,11 +24,11 @@ func (p *Pool) Run(ctx context.Context) { p.IsRunning = true for i := 0; i < p.workerCount; i++ { - go p.work(poolCtx) + go p.runWorker(poolCtx) } } -func (p *Pool) work(ctx context.Context) { +func (p *Pool) runWorker(ctx context.Context) { for { select { case job := <-p.jobChannel: From 193f65883d254c461b06c57580f6da71af5d81fa Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 17:17:47 +0900 Subject: [PATCH 4/4] feat: increase worker count --- node/pkg/dal/hub/hub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/pkg/dal/hub/hub.go b/node/pkg/dal/hub/hub.go index 3540aec70..696ccd944 100644 --- a/node/pkg/dal/hub/hub.go +++ b/node/pkg/dal/hub/hub.go @@ -35,7 +35,7 @@ type Hub struct { const ( MAX_CONNECTIONS = 10 - WORKER_COUNT = 20 + WORKER_COUNT = 100 CleanupInterval = time.Hour )