Skip to content

Commit

Permalink
add websocket ping-pong to keep connections alive
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Jan 29, 2025
1 parent ccbe311 commit 6bfea29
Showing 1 changed file with 32 additions and 29 deletions.
61 changes: 32 additions & 29 deletions src/Nostr/RelayConnection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import Effectful.State.Static.Shared (State, get, gets, modify)
import Effectful.TH
import Network.URI (URI(..), parseURI, uriAuthority, uriPort, uriRegName, uriScheme)
import Network.WebSockets qualified as WS
import Network.WebSockets.Connection.PingPong (defaultPingPongOptions, withPingPong)
import Wuss qualified as Wuss

import QtQuick
Expand Down Expand Up @@ -172,47 +173,49 @@ nostrClient :: RelayConnectionEff es => TMVar Bool -> RelayURI -> TChan Request
nostrClient connectionMVar r requestChan runE conn = runE $ do
logDebug $ "Connected to " <> r

modify @RelayPoolState $ \st ->
st { activeConnections = Map.adjust
(\d -> d { connectionState = Connected
, requestChannel = requestChan
})
r
(activeConnections st)
}
notifyRelayStatus

void $ atomically $ putTMVar connectionMVar True

updateQueue <- newTQueueIO
receiveThread <- async $ receiveLoop updateQueue
sendThread <- async $ sendLoop
void $ waitAnyCancel [receiveThread, sendThread]
modify @RelayPoolState $ \st ->
st { activeConnections = Map.adjust (\d -> d { connectionState = Disconnected }) r (activeConnections st) }
notifyRelayStatus
liftIO $ withPingPong defaultPingPongOptions conn $ \conn' -> runE $ do
modify @RelayPoolState $ \st ->
st { activeConnections = Map.adjust
(\d -> d { connectionState = Connected
, requestChannel = requestChan
})
r
(activeConnections st)
}
notifyRelayStatus

void $ atomically $ putTMVar connectionMVar True

updateQueue <- newTQueueIO
receiveThread <- async $ receiveLoop conn' updateQueue
sendThread <- async $ sendLoop conn'
void $ waitAnyCancel [receiveThread, sendThread]
modify @RelayPoolState $ \st ->
st { activeConnections = Map.adjust (\d -> d { connectionState = Disconnected }) r (activeConnections st) }
notifyRelayStatus

where
receiveLoop q = do
msg <- liftIO (try (WS.receiveData conn) :: IO (Either SomeException BSL.ByteString))
receiveLoop conn' q = do
msg <- liftIO (try (WS.receiveData conn') :: IO (Either SomeException BSL.ByteString))
case msg of
Left _ -> return () -- Exit the loop on error
Right msg' -> case eitherDecode msg' of
Right response -> do
updates <- handleResponse r response
atomically $ writeTQueue q updates
receiveLoop q
receiveLoop conn' q
Left err -> do
logError $ "Could not decode server response from " <> r <> ": " <> T.pack err
receiveLoop q
receiveLoop conn' q

sendLoop = do
sendLoop conn' = do
msg <- atomically $ readTChan requestChan
case msg of
NT.Disconnect -> do
liftIO $ WS.sendClose conn (T.pack "Bye!")
liftIO $ WS.sendClose conn' (T.pack "Bye!")
return ()
NT.SendEvent event -> do
result <- liftIO $ try @SomeException $ WS.sendTextData conn $ encode msg
result <- liftIO $ try @SomeException $ WS.sendTextData conn' $ encode msg
case result of
Left ex -> do
logError $ "Error sending data to " <> r <> ": " <> T.pack (show ex)
Expand All @@ -225,14 +228,14 @@ nostrClient connectionMVar r requestChan runE conn = runE $ do
r
(activeConnections st)
}
sendLoop
sendLoop conn'
_ -> do
result <- liftIO $ try @SomeException $ WS.sendTextData conn $ encode msg
result <- liftIO $ try @SomeException $ WS.sendTextData conn' $ encode msg
case result of
Left ex -> do
logError $ "Error sending data to " <> r <> ": " <> T.pack (show ex)
return ()
Right _ -> sendLoop
Right _ -> sendLoop conn'


-- | Handle responses.
Expand Down

0 comments on commit 6bfea29

Please sign in to comment.