Clean up client response payloads on error to free up connections #2710
Conversation
// This filter cleans up tracked and discarded message payloads.
currClientFilterFactory = appendFilter(currClientFilterFactory,
        HttpMessageDiscardWatchdogClientFilter.INSTANCE);
The watchdog filter should be added as the last connection-level filter at connectionFilterFactory. That way it can intercept the original publisher before it's returned to any user-defined filter.
See my updated code. I tried that, but the tests are failing... maybe I put it in the wrong spot? 🤔
No, the initial spot is perfect, but it affected some other state. There are 2 stages that rely on the full response being consumed:
- connection state
- LoadBalancedStreamingHttpClient state

After we moved the "catch" logic to the connection filter, it intercepts the original payload body publisher, but it doesn't see the result of the liftSync operator applied by LoadBalancedStreamingHttpClient. Something similar to what we observed on the server side when users have BeforeFinallyHttpOperator in the middle of the chain.
It seems like we need 2 "catch" points:
- as the last connection filter, for the initial catch
- as the last client filter, which overrides the message body publisher inside the atomic reference if the response reached the client filter successfully.
That way we can at least guarantee that we release at both levels.
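The two-catch-point idea can be sketched in plain Java. This is a minimal model with hypothetical names (Watchdog, onConnectionResponse, onClientResponse), not the actual HttpMessageDiscardWatchdogClientFilter code, which operates on ServiceTalk's reactive types rather than Runnable:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class WatchdogSketch {

    static final class Watchdog {
        // Holds the drain action for the payload currently in flight.
        private final AtomicReference<Runnable> tracked = new AtomicReference<>();

        // First catch point: the last connection-level filter tracks the
        // original payload body publisher.
        void onConnectionResponse(Runnable drainOriginal) {
            tracked.set(drainOriginal);
        }

        // Second catch point: the last client-level filter overrides the
        // reference once the response survived liftSync-style operators,
        // so the transformed payload is the one that gets drained.
        void onClientResponse(Runnable drainTransformed) {
            tracked.set(drainTransformed);
        }

        // On error, proactively drain whatever payload is currently tracked,
        // so the pooled connection can be reused.
        void onError() {
            Runnable drain = tracked.getAndSet(null);
            if (drain != null) {
                drain.run();
            }
        }
    }

    public static void main(String[] args) {
        List<String> drained = new ArrayList<>();
        Watchdog watchdog = new Watchdog();
        watchdog.onConnectionResponse(() -> drained.add("original"));
        watchdog.onClientResponse(() -> drained.add("transformed"));
        watchdog.onError();
        System.out.println(drained); // only the last-tracked payload is drained
    }
}
```

Because both catch points write into the same atomic reference, whichever level last saw the payload wins, which is what guarantees release at both levels.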
if (message != null) {
    // No-one subscribed to the message (or there is none), so if there is a message
    // proactively clean it up.
    return message.ignoreElements().concat(Single.failed(originalThrowable));
The problem with concat is that if ignoreElements fails, we lose the originalThrowable. Consider using a trick similar to the one I used in the proxy LB factory to preserve the originalThrowable.
Can you help me understand what you mean here exactly? As in: dropping the error from ignoreElements, or adding one as the cause of the other?
Recovering from any error, propagating the originalThrowable, and suppressing any new errors. Something similar to what I had here: https://github.com/apple/servicetalk/pull/2697/files#diff-740b3f38328018b08090bca154c032069df66494a187fc201ad50b5c9693a464R178
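In non-reactive terms, this is the standard suppressed-exception pattern: if cleanup itself fails, the original error must still be the one that propagates. A blocking sketch of the idea (illustrative only, not the ServiceTalk implementation; the method name is hypothetical):

```java
public class PreserveOriginal {
    // Drain the payload body, then surface the original error. If draining
    // itself fails, attach that failure as a suppressed exception rather than
    // replacing the original, so callers always see originalThrowable.
    static RuntimeException failAfterDrain(RuntimeException originalThrowable, Runnable drain) {
        try {
            drain.run();
        } catch (RuntimeException drainFailure) {
            originalThrowable.addSuppressed(drainFailure);
        }
        return originalThrowable;
    }
}
```

The reactive recover-and-suppress operator chain achieves the same outcome: a failure during draining never shadows the error that triggered the cleanup.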
if (message != null) {
    // No-one subscribed to the message (or there is none), so if there is a message
    // proactively clean it up.
    return message
Consider logging a warning for users to clarify that they lost a reference to a response payload body that had to be drained. Ideally, we want users to implement correct filters that clean up the state before propagating an error.
Since this is happening on error, do they even have a chance to clean it up if an exception bubbles up and they never get a reference to the message?
In such cases, the exception is happening in a user filter. Their responsibility is to clean up the state before propagating the exception. For example, instead of
response.flatMap(r -> Single.failed(...));
they are supposed to return:
response.flatMap(r -> r.messageBody().ignoreElements().concat(Single.failed(...)));
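A blocking analogy of the correct pattern above: consume and discard every remaining chunk of the body before surfacing the error, leaving the underlying connection in a reusable state. Here an Iterator stands in for the payload body Publisher, and the method name is illustrative:

```java
import java.util.Iterator;

public class DrainBeforeFail {
    // Analogy of r.messageBody().ignoreElements().concat(Single.failed(...)):
    // drain the body fully, then hand back the error to be thrown.
    static <T> RuntimeException drainThenFail(Iterator<T> messageBody, RuntimeException error) {
        while (messageBody.hasNext()) {
            messageBody.next(); // discard; in ServiceTalk this releases pooled buffers
        }
        return error;
    }
}
```

The key property is ordering: the body is exhausted first, and only then does the error propagate, so nothing downstream is left holding an unconsumed payload.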
Thank you!
Motivation
When a client makes a request and an exception is thrown during response processing, the exception bubbles up to the client, but the message payload body is no longer available to be consumed.
In this case, the underlying pooled connection is not freed up and a new connection is created.
Modifications
By tracking message payloads as responses bubble up through the filter chain, we can detect when an error happens and, in that case, proactively drain the message payload body.
This frees up the connection again so it can be reused by a subsequent request.
Result
More correct handling of message payload bodies when an exception happens during response filter processing.