Skip to content

Commit

Permalink
feat: reduce mutex lock, remove log
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Aug 2, 2024
1 parent 238a9f9 commit 9be6d07
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
1 change: 0 additions & 1 deletion node/pkg/dal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func (c *ThreadSafeClient) WriteJSON(data any) error {
// since the only place it is called is from `HandleWebsocket` inner for loop
func (c *ThreadSafeClient) ReadJSON(data any) error {
if err := c.Conn.ReadJSON(&data); err != nil {
log.Error().Err(err).Msg("failed to read json msg")
return err
}
return nil
Expand Down
23 changes: 14 additions & 9 deletions node/pkg/dal/api/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,25 @@ func (c *Hub) broadcastDataForSymbol(symbol string) {
// pass by pointer to reduce memory copy time
func (c *Hub) castSubmissionData(data *dalcommon.OutgoingSubmissionData, symbol *string) {
var wg sync.WaitGroup
clientsToNotify := make([]*ThreadSafeClient, 0)

c.mu.Lock()
defer c.mu.Unlock()
for client, subscriptions := range c.clients {
if subscriptions[*symbol] {
wg.Add(1)
go func(entry *ThreadSafeClient) {
defer wg.Done()
if err := entry.WriteJSON(*data); err != nil {
log.Error().Err(err).Msg("failed to write message")
c.unregister <- entry
}
}(client)
clientsToNotify = append(clientsToNotify, client)
}
}
c.mu.Unlock()

for _, client := range clientsToNotify {
wg.Add(1)
go func(entry *ThreadSafeClient) {
defer wg.Done()
if err := entry.WriteJSON(*data); err != nil {
log.Error().Err(err).Msg("failed to write message")
c.unregister <- entry
}
}(client)
}
wg.Wait()
}

0 comments on commit 9be6d07

Please sign in to comment.