diff --git a/node/pkg/dal/api/controller.go b/node/pkg/dal/api/controller.go index 72fe92f45..ab8e43b5b 100644 --- a/node/pkg/dal/api/controller.go +++ b/node/pkg/dal/api/controller.go @@ -63,12 +63,13 @@ func HandleWebsocket(conn *websocket.Conn) { } if msg.Method == "SUBSCRIBE" { - h.mu.RLock() + h.mu.Lock() + subscriptions, ok := h.clients[threadSafeClient] if !ok { subscriptions = map[string]bool{} } - h.mu.RUnlock() + valid := []string{} for _, param := range msg.Params { @@ -79,7 +80,6 @@ func HandleWebsocket(conn *websocket.Conn) { subscriptions[symbol] = true valid = append(valid, param) } - h.mu.Lock() h.clients[threadSafeClient] = subscriptions h.mu.Unlock() err = stats.InsertWebsocketSubscriptions(*ctx, id, valid) diff --git a/node/pkg/dal/api/hub.go b/node/pkg/dal/api/hub.go index 1258ce2ac..51d32705e 100644 --- a/node/pkg/dal/api/hub.go +++ b/node/pkg/dal/api/hub.go @@ -118,8 +118,8 @@ func (c *Hub) broadcastDataForSymbol(symbol string) { func (c *Hub) castSubmissionData(data *dalcommon.OutgoingSubmissionData, symbol *string) { var wg sync.WaitGroup - c.mu.RLock() - defer c.mu.RUnlock() + c.mu.Lock() + defer c.mu.Unlock() for client, subscriptions := range c.clients { if subscriptions[*symbol] { wg.Add(1) diff --git a/node/pkg/dal/api/types.go b/node/pkg/dal/api/types.go index 4b7febd2a..39f133ad5 100644 --- a/node/pkg/dal/api/types.go +++ b/node/pkg/dal/api/types.go @@ -18,7 +18,7 @@ type Hub struct { register chan *ThreadSafeClient unregister chan *ThreadSafeClient broadcast map[string]chan dalcommon.OutgoingSubmissionData - mu sync.RWMutex + mu sync.Mutex } type BulkResponse struct {