Skip to content

Commit

Permalink
(DAL) Fix ws deadlock (#1994)
Browse files Browse the repository at this point in the history
* Revert "fix: prevent possible race condition (#1992)"

This reverts commit 303cd85.

* fix: unregister in separate thread

* fix: fix deadlock

* fix: remove unnecssary threading
  • Loading branch information
nick-bisonai authored Aug 2, 2024
1 parent 23e2232 commit e60151d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 14 deletions.
7 changes: 4 additions & 3 deletions node/pkg/dal/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,12 @@ func HandleWebsocket(conn *websocket.Conn) {
}

if msg.Method == "SUBSCRIBE" {
h.mu.Lock()
defer h.mu.Unlock()
h.mu.RLock()
subscriptions, ok := h.clients[threadSafeClient]
if !ok {
subscriptions = map[string]bool{}
}

h.mu.RUnlock()
valid := []string{}

for _, param := range msg.Params {
Expand All @@ -80,7 +79,9 @@ func HandleWebsocket(conn *websocket.Conn) {
subscriptions[symbol] = true
valid = append(valid, param)
}
h.mu.Lock()
h.clients[threadSafeClient] = subscriptions
h.mu.Unlock()
err = stats.InsertWebsocketSubscriptions(*ctx, id, valid)
if err != nil {
log.Error().Err(err).Msg("failed to insert websocket subscription log")
Expand Down
16 changes: 5 additions & 11 deletions node/pkg/dal/api/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,21 @@ func (c *Hub) handleClientRegistration() {
}

func (c *Hub) addClient(client *ThreadSafeClient) {
c.mu.RLock()
_, ok := c.clients[client]
c.mu.RUnlock()
if ok {
c.mu.Lock() // Use write lock for both checking and insertion
defer c.mu.Unlock()
if _, ok := c.clients[client]; ok {
return
}

c.mu.Lock()
defer c.mu.Unlock()
c.clients[client] = make(map[string]bool)
}

func (c *Hub) removeClient(client *ThreadSafeClient) {
c.mu.RLock()
c.mu.Lock() // Use write lock for both checking and removal
defer c.mu.Unlock()
subscriptions, ok := c.clients[client]
c.mu.RUnlock()
if !ok {
return
}

c.mu.Lock()
delete(c.clients, client)
for symbol := range subscriptions {
delete(subscriptions, symbol)
Expand Down

0 comments on commit e60151d

Please sign in to comment.