diff --git a/node/pkg/dal/api/client.go b/node/pkg/dal/api/client.go index d4c794460..3a1409b00 100644 --- a/node/pkg/dal/api/client.go +++ b/node/pkg/dal/api/client.go @@ -29,11 +29,10 @@ func (c *ThreadSafeClient) WriteJSON(data any) error { return nil } -// even though readjson is not thread safe, it is expected not be called concurrently +// even though readjson is not thread safe, it is expected not to be called concurrently // since the only place it is called is from `HandleWebsocket` inner for loop func (c *ThreadSafeClient) ReadJSON(data any) error { if err := c.Conn.ReadJSON(&data); err != nil { - log.Error().Err(err).Msg("failed to read json msg") return err } return nil diff --git a/node/pkg/dal/api/hub.go b/node/pkg/dal/api/hub.go index a3304d356..fb0c3fd6d 100644 --- a/node/pkg/dal/api/hub.go +++ b/node/pkg/dal/api/hub.go @@ -126,6 +126,18 @@ func (h *Hub) removeClient(client *ThreadSafeClient) { } } +func (h *Hub) getClientsSnapshotToNotify(symbol string) []*ThreadSafeClient { + h.mu.RLock() + defer h.mu.RUnlock() + result := []*ThreadSafeClient{} + for client, subscriptions := range h.clients { + if subscriptions[symbol] { + result = append(result, client) + } + } + return result +} + func (h *Hub) initializeBroadcastChannels(collector *collector.Collector) { for configId, stream := range collector.OutgoingStream { symbol := h.configIdToSymbol(configId) @@ -153,22 +165,19 @@ func (h *Hub) broadcastDataForSymbol(symbol string) { } // pass by pointer to reduce memory copy time -func (c *Hub) castSubmissionData(data *dalcommon.OutgoingSubmissionData, symbol *string) { +func (h *Hub) castSubmissionData(data *dalcommon.OutgoingSubmissionData, symbol *string) { var wg sync.WaitGroup - - c.mu.Lock() - defer c.mu.Unlock() - for client, subscriptions := range c.clients { - if subscriptions[*symbol] { - wg.Add(1) - go func(entry *ThreadSafeClient) { - defer wg.Done() - if err := entry.WriteJSON(*data); err != nil { - log.Error().Err(err).Msg("failed to write message") - c.unregister <- entry - } - }(client) - } + clientsToNotify := h.getClientsSnapshotToNotify(*symbol) + + for _, client := range clientsToNotify { + wg.Add(1) + go func(entry *ThreadSafeClient) { + defer wg.Done() + if err := entry.WriteJSON(*data); err != nil { + log.Error().Err(err).Msg("failed to write message") + h.unregister <- entry + } + }(client) } wg.Wait() } diff --git a/node/pkg/dal/api/types.go b/node/pkg/dal/api/types.go index fcfa451c2..7cf479cb8 100644 --- a/node/pkg/dal/api/types.go +++ b/node/pkg/dal/api/types.go @@ -21,7 +21,7 @@ type Hub struct { unregister chan *ThreadSafeClient broadcast map[string]chan dalcommon.OutgoingSubmissionData connPerIP map[string][]*ThreadSafeClient - mu sync.Mutex + mu sync.RWMutex }