Skip to content

Commit

Permalink
feat: use Mutex instead of RWMutex (#2000)
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai authored Aug 2, 2024
1 parent ef542cd commit d1641c9
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
6 changes: 3 additions & 3 deletions node/pkg/dal/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ func HandleWebsocket(conn *websocket.Conn) {
}

if msg.Method == "SUBSCRIBE" {
h.mu.RLock()
h.mu.Lock()

subscriptions, ok := h.clients[threadSafeClient]
if !ok {
subscriptions = map[string]bool{}
}
h.mu.RUnlock()

valid := []string{}

for _, param := range msg.Params {
Expand All @@ -79,7 +80,6 @@ func HandleWebsocket(conn *websocket.Conn) {
subscriptions[symbol] = true
valid = append(valid, param)
}
h.mu.Lock()
h.clients[threadSafeClient] = subscriptions
h.mu.Unlock()
err = stats.InsertWebsocketSubscriptions(*ctx, id, valid)
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/dal/api/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func (c *Hub) broadcastDataForSymbol(symbol string) {
func (c *Hub) castSubmissionData(data *dalcommon.OutgoingSubmissionData, symbol *string) {
var wg sync.WaitGroup

c.mu.RLock()
defer c.mu.RUnlock()
c.mu.Lock()
defer c.mu.Unlock()
for client, subscriptions := range c.clients {
if subscriptions[*symbol] {
wg.Add(1)
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/dal/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Hub struct {
register chan *ThreadSafeClient
unregister chan *ThreadSafeClient
broadcast map[string]chan dalcommon.OutgoingSubmissionData
mu sync.RWMutex
mu sync.Mutex
}

type BulkResponse struct {
Expand Down

0 comments on commit d1641c9

Please sign in to comment.