Skip to content

Commit

Permalink
Better armeria
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Jan 24, 2025
1 parent e363385 commit 8ee1e4a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,23 @@ abstract class AbstractArmeriaBackend[F[_], S <: Streams[S]](
// #1987: see the comments in HttpClientAsyncBackend
protected def ensureOnAbnormal[T](effect: F[T])(finalizer: => F[Unit]): F[T]

override def send[T](request: GenericRequest[T, R]): F[Response[T]] =
monad.suspend(adjustExceptions(request)(execute(request)))
override def send[T](request: GenericRequest[T, R]): F[Response[T]] = {
// #1987: see the comments in HttpClientAsyncBackend
val armeriaCtx = new AtomicReference[ClientRequestContext]()
ensureOnAbnormal {
monad.suspend(adjustExceptions(request)(execute(request, armeriaCtx)))
} {
monad.eval {
val ctx = armeriaCtx.get()
if (ctx != null) ctx.cancel()
}
}
}

private def execute[T](request: GenericRequest[T, R]): F[Response[T]] = {
private def execute[T](
request: GenericRequest[T, R],
armeriaCtx: AtomicReference[ClientRequestContext]
): F[Response[T]] = {
val captor = Clients.newContextCaptor()
try {
val armeriaRes = requestToArmeria(request).execute()
Expand All @@ -87,8 +100,8 @@ abstract class AbstractArmeriaBackend[F[_], S <: Streams[S]](
noopCanceler
}
case Success(ctx) =>
// #1987: see the comments in HttpClientAsyncBackend
ensureOnAbnormal(fromArmeriaResponse(request, armeriaRes, ctx))(monad.eval(ctx.cancel()))
armeriaCtx.set(ctx)
fromArmeriaResponse(request, armeriaRes, ctx)
}
} catch {
case NonFatal(ex) => monad.error(ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ abstract class HttpClientAsyncBackend[F[_], S <: Streams[S], BH, B](
* is consumed. This is done only in case of failure, as in case of success the body is either fully consumed as
* specified in the response description, or when an `...Unsafe` response description is used, it's up to the user to
* consume it.
*
* Any exceptions that occur while running `finalizer` should be added as suppressed or logged, not impacting the
* outcome of `effect`. If possible, `finalizer` should not be run in a cancellable way.
*/
protected def ensureOnAbnormal[T](effect: F[T])(finalizer: => F[Unit]): F[T]

Expand Down

0 comments on commit 8ee1e4a

Please sign in to comment.