diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java new file mode 100644 index 0000000000..15bb971707 --- /dev/null +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java @@ -0,0 +1,100 @@ +package io.servicetalk.grpc; + +import com.apple.servicetalkleak.Message; +import com.apple.servicetalkleak.ServiceTalkLeak; +import io.netty.buffer.ByteBufUtil; +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.grpc.api.GrpcServiceContext; +import io.servicetalk.grpc.api.GrpcStatusCode; +import io.servicetalk.grpc.api.GrpcStatusException; +import io.servicetalk.grpc.netty.GrpcClients; +import io.servicetalk.grpc.netty.GrpcServers; +import io.servicetalk.http.netty.HttpProtocolConfigs; +import io.servicetalk.http.netty.SpliceFlatStreamToMetaSingle; +import io.servicetalk.logging.api.LogLevel; +import io.servicetalk.transport.api.HostAndPort; +import io.servicetalk.transport.api.IoExecutor; +import io.servicetalk.transport.netty.internal.NettyIoExecutors; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class LeakRepro { + + private static final Logger LOGGER = LoggerFactory.getLogger(LeakRepro.class); + + static boolean leakDetected = false; + + static { + System.setProperty("io.servicetalk.http.netty.leakdetection", "strict"); + System.setProperty("io.netty.leakDetection.level", "paranoid"); + ByteBufUtil.setLeakListener((type, records) -> { + leakDetected = true; + LOGGER.error("ByteBuf leak detected!"); + }); + } + + IoExecutor serverExecutor = NettyIoExecutors.createIoExecutor(1, "server"); + IoExecutor clientExecutor = NettyIoExecutors.createIoExecutor(1, "client"); + + @SuppressWarnings("resource") + @Test + public void testLeak() throws Exception { + GrpcServers.forPort(8888) + .initializeHttp(b -> b + .ioExecutor(serverExecutor) + .executor(serverExecutor)) + .listenAndAwait(new ServiceTalkLeak.ServiceTalkLeakService() { + @Override + public Publisher rpc(GrpcServiceContext ctx, Publisher request) { + Publisher response = splice(request) + .flatMapPublisher(pair -> { + LOGGER.info("Initial message: " + pair.head); + return Publisher.failed(new GrpcStatusException(GrpcStatusCode.INVALID_ARGUMENT.status())); + }); + return response; + } + }); + + ServiceTalkLeak.ServiceTalkLeakClient client = GrpcClients.forAddress(HostAndPort.of("127.0.0.1", 8888)) + .initializeHttp(b -> b + .protocols(HttpProtocolConfigs.h2().enableFrameLogging("CLIENT", LogLevel.INFO, () -> true).build()) + .ioExecutor(clientExecutor) + .executor(clientExecutor)) + .build(new ServiceTalkLeak.ClientFactory()); + + for (int i = 0; i < 10; i++) { + LOGGER.info("Iteration {}", i); + blockingInvocation( + client.rpc( + Publisher.from( + Message.newBuilder().setValue("first message").build(), + Message.newBuilder().setValue("second message (which leaks)").build())) + .ignoreElements() + .onErrorComplete()); + + System.gc(); + System.runFinalization(); + } + + assertFalse(leakDetected); + } + + private static Single splice(Publisher request) { + return request.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Pair::new)); + } + + private static final class Pair { + final Message head; + final Publisher stream; + + public Pair(Message head, Publisher stream) { + this.head = head; + this.stream = stream; + } + } +} \ No newline at end of file diff --git a/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto b/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto new file mode 100644 index 0000000000..e17934d3d7 --- /dev/null +++ b/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "com.apple.servicetalkleak"; + +message Message { + string value = 1; +} + +service ServiceTalkLeak { + rpc Rpc(stream Message) returns (stream Message); +} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java index c0bc77797f..2e38eace0f 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java @@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicReference; import static io.servicetalk.http.netty.HttpMessageDiscardWatchdogServiceFilter.generifyAtomicReference; +import static io.servicetalk.http.netty.WatchdogLeakDetector.REQUEST_LEAK_MESSAGE; +import static io.servicetalk.http.netty.WatchdogLeakDetector.RESPONSE_LEAK_MESSAGE; /** * Filter which tracks message bodies and warns if they are not discarded properly. @@ -67,6 +69,17 @@ public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnect return new StreamingHttpConnectionFilter(connection) { @Override public Single request(final StreamingHttpRequest request) { + return WatchdogLeakDetector.strictDetection() ? requestStrict(request) : requestSimple(request); + } + + private Single requestStrict(final StreamingHttpRequest request) { + return delegate().request(request.transformMessageBody(publisher -> + WatchdogLeakDetector.instrument(publisher, REQUEST_LEAK_MESSAGE))) + .map(response -> response.transformMessageBody(publisher -> + WatchdogLeakDetector.instrument(publisher, RESPONSE_LEAK_MESSAGE))); + } + + private Single requestSimple(final StreamingHttpRequest request) { return delegate().request(request).map(response -> { // always write the buffer publisher into the request context. When a downstream subscriber // arrives, mark the message as subscribed explicitly (having a message present and no @@ -78,10 +91,7 @@ public Single request(final StreamingHttpRequest request) // If a previous message exists, the Single got resubscribed to // (i.e. during a retry) and so previous message body needs to be cleaned up by the // user. - LOGGER.warn("Discovered un-drained HTTP response message body which has " + - "been dropped by user code - this is a strong indication of a bug " + - "in a user-defined filter. Response payload (message) body must " + - "be fully consumed before retrying."); + LOGGER.warn(RESPONSE_LEAK_MESSAGE); } return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { @@ -112,10 +122,7 @@ protected Single request(final StreamingHttpRequester del if (maybePublisher != null && maybePublisher.getAndSet(null) != null) { // No-one subscribed to the message (or there is none), so if there is a message // tell the user to clean it up. - LOGGER.warn("Discovered un-drained HTTP response message body which has " + - "been dropped by user code - this is a strong indication of a bug " + - "in a user-defined filter. Response payload (message) body must " + - "be fully consumed before discarding.", cause); + LOGGER.warn(RESPONSE_LEAK_MESSAGE, cause); } }); } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java index 875acf0291..c54389215c 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java @@ -39,6 +39,9 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import static io.servicetalk.http.netty.WatchdogLeakDetector.REQUEST_LEAK_MESSAGE; +import static io.servicetalk.http.netty.WatchdogLeakDetector.RESPONSE_LEAK_MESSAGE; + /** * Filter which tracks message bodies and warns if they are not discarded properly. */ @@ -46,6 +49,7 @@ final class HttpMessageDiscardWatchdogServiceFilter implements StreamingHttpServ private static final Logger LOGGER = LoggerFactory.getLogger(HttpMessageDiscardWatchdogServiceFilter.class); + /** * Instance of {@link HttpMessageDiscardWatchdogServiceFilter}. */ @@ -69,8 +73,26 @@ private HttpMessageDiscardWatchdogServiceFilter() { public StreamingHttpServiceFilter create(final StreamingHttpService service) { return new StreamingHttpServiceFilter(service) { + @Override public Single handle(final HttpServiceContext ctx, + final StreamingHttpRequest request, + final StreamingHttpResponseFactory responseFactory) { + return WatchdogLeakDetector.strictDetection() ? + handleStrict(ctx, request, responseFactory) : handleSimple(ctx, request, responseFactory); + } + + private Single handleStrict(final HttpServiceContext ctx, + final StreamingHttpRequest request, + final StreamingHttpResponseFactory responseFactory) { + return delegate() + .handle(ctx, request.transformMessageBody(publisher -> + WatchdogLeakDetector.instrument(publisher, REQUEST_LEAK_MESSAGE)), responseFactory) + .map(response -> response.transformMessageBody(publisher -> + WatchdogLeakDetector.instrument(publisher, RESPONSE_LEAK_MESSAGE))); + } + + private Single handleSimple(final HttpServiceContext ctx, final StreamingHttpRequest request, final StreamingHttpResponseFactory responseFactory) { return delegate() @@ -86,10 +108,7 @@ public Single handle(final HttpServiceContext ctx, // If a previous message exists, the Single got resubscribed to // (i.e. during a retry) and so previous message body needs to be cleaned up by the // user. - LOGGER.warn("Discovered un-drained HTTP response message body which has " + - "been dropped by user code - this is a strong indication of a bug " + - "in a user-defined filter. Responses (or their message body) must " + - "be fully consumed before retrying."); + LOGGER.warn(RESPONSE_LEAK_MESSAGE); } return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { @@ -173,10 +192,7 @@ public void onExchangeFinally() { if (maybePublisher != null && maybePublisher.get() != null) { // No-one subscribed to the message (or there is none), so if there is a message // tell the user to clean it up. - LOGGER.warn("Discovered un-drained HTTP response message body which has " + - "been dropped by user code - this is a strong indication of a bug " + - "in a user-defined filter. Responses (or their message body) must " + - "be fully consumed before discarding."); + LOGGER.warn(RESPONSE_LEAK_MESSAGE); } } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java index 3c0025a2e3..0e29512777 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java @@ -52,7 +52,8 @@ * @param type of meta-data in front of the stream of {@link Payload}, eg. {@link HttpResponseMetaData} * @param type of payload inside the {@link Data}, eg. {@link Buffer} */ -final class SpliceFlatStreamToMetaSingle implements PublisherToSingleOperator { +// TODO: revert: this shouldn't be public. +public final class SpliceFlatStreamToMetaSingle implements PublisherToSingleOperator { private static final Logger LOGGER = LoggerFactory.getLogger(SpliceFlatStreamToMetaSingle.class); private final BiFunction, Data> packer; @@ -64,7 +65,7 @@ final class SpliceFlatStreamToMetaSingle implements Pub * @param packer function to pack the {@link Publisher}<{@link Payload}> and {@link MetaData} into a * {@link Data} */ - SpliceFlatStreamToMetaSingle(BiFunction, Data> packer) { + public SpliceFlatStreamToMetaSingle(BiFunction, Data> packer) { this.packer = requireNonNull(packer); } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java new file mode 100644 index 0000000000..4ab0e70feb --- /dev/null +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java @@ -0,0 +1,196 @@ +package io.servicetalk.http.netty; + +import io.servicetalk.concurrent.Cancellable; +import io.servicetalk.concurrent.PublisherSource.Subscriber; +import io.servicetalk.concurrent.PublisherSource.Subscription; +import io.servicetalk.concurrent.api.Executors; +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.SourceAdapters; +import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +final class WatchdogLeakDetector { + + static final String REQUEST_LEAK_MESSAGE = + "Discovered un-drained HTTP request message body which has " + + "been dropped by user code - this is a strong indication of a bug " + + "in a user-defined filter. Requests (or their message body) must " + + "be fully consumed before retrying."; + + static final String RESPONSE_LEAK_MESSAGE = + "Discovered un-drained HTTP response message body which has " + + "been dropped by user code - this is a strong indication of a bug " + + "in a user-defined filter. Responses (or their message body) must " + + "be fully consumed before retrying."; + + private static final Logger LOGGER = LoggerFactory.getLogger(WatchdogLeakDetector.class); + + private static final WatchdogLeakDetector INSTANCE = new WatchdogLeakDetector(); + + private static final String PROPERTY_NAME = "io.servicetalk.http.netty.leakdetection"; + + private static final String STRICT_MODE = "strict"; + + private static final boolean STRICT_DETECTION; + + static { + String prop = System.getProperty(PROPERTY_NAME); + STRICT_DETECTION = prop != null && prop.equalsIgnoreCase(STRICT_MODE); + } + + private final ReferenceQueue refQueue = new ReferenceQueue<>(); + private final Map, CleanupState> allRefs = new ConcurrentHashMap<>(); + + private WatchdogLeakDetector() { + // Singleton. + } + + static Publisher instrument(Publisher publisher, String message) { + return INSTANCE.instrument0(publisher, message); + } + + static boolean strictDetection() { + return STRICT_DETECTION; + } + + private Publisher instrument0(Publisher publisher, String message) { + maybeCleanRefs(); + CleanupState cleanupState = new CleanupState(publisher, message); + Publisher result = publisher.liftSync(subscriber -> new InstrumentedSubscriber<>(subscriber, cleanupState)); + Reference ref = new WeakReference<>(result, refQueue); + allRefs.put(ref, cleanupState); + return result; + } + + private void maybeCleanRefs() { + final Reference testRef = refQueue.poll(); + if (testRef != null) { + // There are references to be cleaned but don't do it on this thread. + // TODO: what executor should we really use? + Executors.global().submit(() -> { + Reference ref = testRef; + do { + ref.clear(); + CleanupState cleanupState = allRefs.remove(ref); + if (cleanupState != null) { + cleanupState.check(); + } + } while ((ref = refQueue.poll()) != null); + }); + } + + + } + + private static final class InstrumentedSubscriber implements Subscriber { + + private final Subscriber delegate; + private final CleanupState cleanupToken; + + public InstrumentedSubscriber(Subscriber delegate, CleanupState cleanupToken) { + this.delegate = delegate; + this.cleanupToken = cleanupToken; + } + + @Override + public void onSubscribe(Subscription subscription) { + cleanupToken.subscribed(subscription); + delegate.onSubscribe(new Subscription() { + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + cleanupToken.doComplete(); + subscription.cancel(); + } + }); + } + + @Override + public void onNext(@Nullable T t) { + delegate.onNext(t); + } + + @Override + public void onError(Throwable t) { + cleanupToken.doComplete(); + delegate.onError(t); + } + + @Override + public void onComplete() { + cleanupToken.doComplete(); + delegate.onComplete(); + } + + + } + + private static final class CleanupState { + + private static final AtomicReferenceFieldUpdater UPDATER = + AtomicReferenceFieldUpdater.newUpdater(CleanupState.class, Object.class,"state"); + private static final String COMPLETE = "complete"; + + private final String message; + volatile Object state; + + public CleanupState(Publisher parent, String message) { + this.message = message; + this.state = parent; + } + + void doComplete() { + UPDATER.set(this, COMPLETE); + } + + private boolean checkComplete() { + Object previous = UPDATER.getAndSet(this, COMPLETE); + if (previous != COMPLETE) { + // This means something leaked. + if (previous instanceof Publisher) { + // never subscribed to. + SourceAdapters.toSource((Publisher) previous).subscribe(CancelImmediatelySubscriber.INSTANCE); + } else { + assert previous instanceof Cancellable; + Cancellable cancellable = (Cancellable) previous; + cancellable.cancel(); + } + return true; + } else { + return false; + } + } + + void subscribed(Subscription subscription) { + while (true) { + Object old = UPDATER.get(this); + if (old == COMPLETE || old instanceof Subscription) { + // TODO: What to do here? + LOGGER.debug("Publisher subscribed to multiple times."); + return; + } else if (UPDATER.compareAndSet(this, old, subscription)) { + return; + } + } + } + + void check() { + if (checkComplete()) { + LOGGER.warn(message); + } + } + } +}