diff --git a/grapesy/src/Network/GRPC/Client/Call.hs b/grapesy/src/Network/GRPC/Client/Call.hs index 1a5b90b3..c8c8c3f4 100644 --- a/grapesy/src/Network/GRPC/Client/Call.hs +++ b/grapesy/src/Network/GRPC/Client/Call.hs @@ -29,7 +29,9 @@ module Network.GRPC.Client.Call ( , recvInitialResponse ) where +import Control.Concurrent import Control.Concurrent.STM +import Control.Concurrent.Thread.Delay qualified as UnboundedDelays import Control.Monad import Control.Monad.Catch import Control.Monad.IO.Class @@ -104,6 +106,13 @@ data Call rpc = SupportsClientRpc rpc => Call { -- If there are still /inbound/ messages upon leaving the scope of 'withRPC' no -- exception is raised (but the call is nonetheless still closed, and the server -- handler will be informed that the client has disappeared). +-- +-- Note on timeouts: if a timeout is specified for the call (either through +-- 'callTimeout' or through 'connDefaultTimeout'), when the timeout is reached +-- the RPC is cancelled; any further attempts to receive or send messages will +-- result in a 'GrpcException' with 'GrpcDeadlineExceeded'. As per the gRPC +-- specification, this does /not/ rely on the server; this does mean that the +-- same deadline also applies if the /client/ is slow (rather than the server). withRPC :: forall rpc m a. (MonadMask m, MonadIO m, SupportsClientRpc rpc, HasCallStack) => Connection -> CallParams rpc -> Proxy rpc -> (Call rpc -> m a) -> m a @@ -153,6 +162,67 @@ startRPC conn _ callParams = do serverClosedConnection flowStart + -- The spec mandates that + -- + -- > If a server has gone past the deadline when processing a request, the + -- > client will give up and fail the RPC with the DEADLINE_EXCEEDED status. + -- + -- and also that the deadline applies when when wait-for-ready semantics is + -- used. + -- + -- We have to be careful implementing this. In particular, we definitely + -- don't want to impose the timeout on the /client/ (that is, we should not + -- force the client to exit the scope of 'withRPC' within the timeout). + -- Instead, we work a thread that cancels the RPC after the timeout expires; + -- this means that /if/ the client that attempts to communicate with the + -- server after the timeout, only then will it receive an exception. + -- + -- The thread we spawn here is cleaned up by the monitor thread (below). + -- + -- See + -- + -- o + -- o + mClientSideTimeout <- + case callTimeout callParams of + Nothing -> return Nothing + Just t -> fmap Just $ forkLabelled "grapesy:clientSideTimeout" $ do + UnboundedDelays.delay (timeoutToMicro t) + let timeout :: SomeException + timeout = toException $ GrpcException { + grpcError = GrpcDeadlineExceeded + , grpcErrorMessage = Nothing + , grpcErrorMetadata = [] + } + + -- We recognized client-side that the timeout we imposed on the server + -- has passed. Acting on this is however tricky: + -- + -- * A call to 'closeRPC' will only terminate the /outbound/ thread; + -- the idea is the inbound thread might still be reading in-flight + -- messages, and it will terminate once the last message is read or + -- the thread notices a broken connection. + -- * Unfortunately, this does not work in the timeout case: /if/ the + -- outbound thread has not yet terminated (that is, the client has + -- not yet sent their final message), then calling 'closeRPC' will + -- result in a RST_STREAM being sent to the server, which /should/ + -- result in the inbound connection being closed also, but may not, + -- in the case of a non-compliant server. + -- * Worse, if the client /did/ already send their final message, the + -- outbound thread has already terminated, no RST_STREAM will be + -- sent, and the we will continue to wait for messages from the + -- server. + -- + -- Ideally we'd inform the receiving thread that a timeout has been + -- reached and to "continue until it would block", but that is hard + -- to do. So instead we just kill the receiving thread, which means + -- that once the timeout is reached, the client will not be able to + -- receive any further messages (even if that is because the /client/ + -- was slow, rather than the server). + + void $ Thread.cancelThread (Session.channelInbound channel) timeout + closeRPC channel cancelRequest $ ExitCaseException timeout + -- Spawn a thread to monitor the connection, and close the new channel when -- the connection is closed. To prevent a memory leak by hanging on to the -- channel for the lifetime of the connection, the thread also terminates in @@ -163,6 +233,7 @@ startRPC conn _ callParams = do (Session.channelInbound channel)) `orElse` (Right <$> readTMVar connClosed) + forM_ mClientSideTimeout killThread case status of Left _ -> return () -- Channel closed before the connection Right mErr -> do diff --git a/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs b/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs index 58b63157..f7f8cdcb 100644 --- a/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs +++ b/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs @@ -5,6 +5,7 @@ module Test.Sanity.BrokenDeployments (tests) where +import Control.Concurrent import Control.Exception import Data.ByteString.Char8 qualified as BS.Strict.Char8 import Data.ByteString.UTF8 qualified as BS.Strict.UTF8 @@ -54,6 +55,9 @@ tests = testGroup "Test.Sanity.BrokenDeployments" [ , testGroup "Undefined" [ testCase "output" test_undefinedOutput ] + , testGroup "Timeout" [ + testCase "serverIgnoresTimeout" test_serverIgnoresTimeout + ] ] connParams :: Client.ConnParams @@ -380,3 +384,40 @@ test_undefinedOutput = do if isFirst then return $ throw $ DeliberateException (userError "uhoh") else return $ defMessage & #id .~ req ^. #id +{------------------------------------------------------------------------------- + Timeouts +-------------------------------------------------------------------------------} + +-- | Check that timeouts don't depend on the server +-- +-- When a timeout is set for an RPC, the server should respect it, but the +-- client should not /depend/ on the server respecting it. +-- +-- See also . +test_serverIgnoresTimeout :: Assertion +test_serverIgnoresTimeout = respondWithIO response $ \addr -> do + mResp :: Either GrpcException + (StreamElem NoMetadata (Proto PongMessage)) <- try $ + Client.withConnection connParams (Client.ServerInsecure addr) $ \conn -> + Client.withRPC conn callParams (Proxy @Ping) $ \call -> do + Client.sendFinalInput call defMessage + Client.recvOutput call + case mResp of + Left e | grpcError e == GrpcDeadlineExceeded -> + return () + Left e -> + assertFailure $ "unexpected error: " ++ show e + Right _ -> + assertFailure "Timeout did not trigger" + where + response :: IO Response + response = do + threadDelay 10_000_000 + return def + + callParams :: Client.CallParams Ping + callParams = def { + Client.callTimeout = Just $ + Client.Timeout Client.Millisecond (Client.TimeoutValue 100) + } + diff --git a/grapesy/test-grapesy/Test/Util/RawTestServer.hs b/grapesy/test-grapesy/Test/Util/RawTestServer.hs index 03362f23..a3b465fe 100644 --- a/grapesy/test-grapesy/Test/Util/RawTestServer.hs +++ b/grapesy/test-grapesy/Test/Util/RawTestServer.hs @@ -1,6 +1,7 @@ module Test.Util.RawTestServer ( -- * Raw test server respondWith + , respondWithIO -- * Abstract response type , Response(..) @@ -35,7 +36,7 @@ withTestServer server k = do ServerConfig { serverInsecure = Just $ InsecureConfig { insecureHost = Just "127.0.0.1" - , insecurePort = 0 + , insecurePort = 50051 } , serverSecure = Nothing } @@ -51,7 +52,12 @@ withTestServer server k = do -- | Server that responds with the given 'Response', independent of the request respondWith :: Response -> (Client.Address -> IO a) -> IO a -respondWith response = withTestServer $ \_req _aux respond -> +respondWith resp = respondWithIO (return resp) + +-- | Version of 'respondWith' that constructs the response +respondWithIO :: IO Response -> (Client.Address -> IO a) -> IO a +respondWithIO mkResponse = withTestServer $ \_req _aux respond -> do + response <- mkResponse respond (toHTTP2Response response) [] data Response = Response {