diff --git a/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml b/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml index a396334e69..c18d61b816 100644 --- a/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml +++ b/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml @@ -19,16 +19,18 @@ - + - + - + + + diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java deleted file mode 100644 index 15bb971707..0000000000 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java +++ /dev/null @@ -1,100 +0,0 @@ -package io.servicetalk.grpc; - -import com.apple.servicetalkleak.Message; -import com.apple.servicetalkleak.ServiceTalkLeak; -import io.netty.buffer.ByteBufUtil; -import io.servicetalk.concurrent.api.Publisher; -import io.servicetalk.concurrent.api.Single; -import io.servicetalk.grpc.api.GrpcServiceContext; -import io.servicetalk.grpc.api.GrpcStatusCode; -import io.servicetalk.grpc.api.GrpcStatusException; -import io.servicetalk.grpc.netty.GrpcClients; -import io.servicetalk.grpc.netty.GrpcServers; -import io.servicetalk.http.netty.HttpProtocolConfigs; -import io.servicetalk.http.netty.SpliceFlatStreamToMetaSingle; -import io.servicetalk.logging.api.LogLevel; -import io.servicetalk.transport.api.HostAndPort; -import io.servicetalk.transport.api.IoExecutor; -import io.servicetalk.transport.netty.internal.NettyIoExecutors; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation; -import static org.junit.jupiter.api.Assertions.assertFalse; - -public class LeakRepro { - - private static final Logger LOGGER = LoggerFactory.getLogger(LeakRepro.class); - - static boolean leakDetected = false; - - static { - System.setProperty("io.servicetalk.http.netty.leakdetection", "strict"); - System.setProperty("io.netty.leakDetection.level", "paranoid"); - ByteBufUtil.setLeakListener((type, records) -> { - leakDetected = true; - LOGGER.error("ByteBuf leak detected!"); - }); - } - - IoExecutor serverExecutor = NettyIoExecutors.createIoExecutor(1, "server"); - IoExecutor clientExecutor = NettyIoExecutors.createIoExecutor(1, "client"); - - @SuppressWarnings("resource") - @Test - public void testLeak() throws Exception { - GrpcServers.forPort(8888) - .initializeHttp(b -> b - .ioExecutor(serverExecutor) - .executor(serverExecutor)) - .listenAndAwait(new ServiceTalkLeak.ServiceTalkLeakService() { - @Override - public Publisher rpc(GrpcServiceContext ctx, Publisher request) { - Publisher response = splice(request) - .flatMapPublisher(pair -> { - LOGGER.info("Initial message: " + pair.head); - return Publisher.failed(new GrpcStatusException(GrpcStatusCode.INVALID_ARGUMENT.status())); - }); - return response; - } - }); - - ServiceTalkLeak.ServiceTalkLeakClient client = GrpcClients.forAddress(HostAndPort.of("127.0.0.1", 8888)) - .initializeHttp(b -> b - .protocols(HttpProtocolConfigs.h2().enableFrameLogging("CLIENT", LogLevel.INFO, () -> true).build()) - .ioExecutor(clientExecutor) - .executor(clientExecutor)) - .build(new ServiceTalkLeak.ClientFactory()); - - for (int i = 0; i < 10; i++) { - LOGGER.info("Iteration {}", i); - blockingInvocation( - client.rpc( - Publisher.from( - Message.newBuilder().setValue("first message").build(), - Message.newBuilder().setValue("second message (which leaks)").build())) - .ignoreElements() - .onErrorComplete()); - - System.gc(); - System.runFinalization(); - } - - assertFalse(leakDetected); - } - - private static Single splice(Publisher request) { - return request.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Pair::new)); - } - - private static final class Pair { - final Message head; - final Publisher stream; - - public Pair(Message head, Publisher stream) { - this.head = head; - this.stream = stream; - } - } -} \ No newline at end of file diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GcWatchdogLeakDetectorTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GcWatchdogLeakDetectorTest.java new file mode 100644 index 0000000000..e320a43049 --- /dev/null +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GcWatchdogLeakDetectorTest.java @@ -0,0 +1,96 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.grpc.netty; + +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.grpc.api.GrpcServiceContext; +import io.servicetalk.grpc.api.GrpcStatusCode; +import io.servicetalk.grpc.api.GrpcStatusException; +import io.servicetalk.http.netty.SpliceFlatStreamToMetaSingle; +import io.servicetalk.leak.LeakMessage; +import io.servicetalk.leak.Leaker; +import io.servicetalk.transport.api.HostAndPort; + +import io.netty.buffer.ByteBufUtil; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation; +import static org.junit.jupiter.api.Assertions.assertFalse; + +final class GcWatchdogLeakDetectorTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(GcWatchdogLeakDetectorTest.class); + + private static boolean leakDetected; + + static { + System.setProperty("io.servicetalk.http.netty.leakdetection", "strict"); + System.setProperty("io.netty.leakDetection.level", "paranoid"); + ByteBufUtil.setLeakListener((type, records) -> { + leakDetected = true; + LOGGER.error("ByteBuf leak detected!"); + }); + } + + @Test + void testLeak() throws Exception { + GrpcServers.forPort(8888) + .listenAndAwait(new Leaker.LeakerService() { + @Override + public Publisher rpc(GrpcServiceContext ctx, Publisher request) { + Publisher response = splice(request) + .flatMapPublisher(pair -> Publisher.failed( + new GrpcStatusException(GrpcStatusCode.INVALID_ARGUMENT.status()))); + return response; + } + }); + + Leaker.LeakerClient client = GrpcClients.forAddress(HostAndPort.of("localhost", 8888)) + .build(new Leaker.ClientFactory()); + + for (int i = 0; i < 10; i++) { + blockingInvocation( + client.rpc( + Publisher.from( + LeakMessage.newBuilder().setValue("first LeakMessage").build(), + LeakMessage.newBuilder().setValue("second LeakMessage (which leaks)").build())) + .ignoreElements() + .onErrorComplete()); + + System.gc(); + System.runFinalization(); + } + + assertFalse(leakDetected); + } + + private static Single splice(Publisher request) { + return request.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Pair::new)); + } + + private static final class Pair { + final LeakMessage head; + final Publisher stream; + + Pair(LeakMessage head, Publisher stream) { + this.head = head; + this.stream = stream; + } + } +} diff --git a/servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto b/servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto new file mode 100644 index 0000000000..c6c3e39b4a --- /dev/null +++ b/servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_outer_classname = "ServiceTalkLeak"; +option java_package = "io.servicetalk.leak"; + +message LeakMessage { + string value = 1; +} + +service Leaker { + rpc Rpc(stream LeakMessage) returns (stream LeakMessage); +} diff --git a/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto b/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto deleted file mode 100644 index e17934d3d7..0000000000 --- a/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto +++ /dev/null @@ -1,12 +0,0 @@ -syntax = "proto3"; - -option java_multiple_files = true; -option java_package = "com.apple.servicetalkleak"; - -message Message { - string value = 1; -} - -service ServiceTalkLeak { - rpc Rpc(stream Message) returns (stream Message); -} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java index 2e38eace0f..1f4903f547 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java @@ -74,9 +74,9 @@ public Single request(final StreamingHttpRequest request) private Single requestStrict(final StreamingHttpRequest request) { return delegate().request(request.transformMessageBody(publisher -> - WatchdogLeakDetector.instrument(publisher, REQUEST_LEAK_MESSAGE))) + WatchdogLeakDetector.gcLeakDetection(publisher, REQUEST_LEAK_MESSAGE))) .map(response -> response.transformMessageBody(publisher -> - WatchdogLeakDetector.instrument(publisher, RESPONSE_LEAK_MESSAGE))); + WatchdogLeakDetector.gcLeakDetection(publisher, RESPONSE_LEAK_MESSAGE))); } private Single requestSimple(final StreamingHttpRequest request) { diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java index c54389215c..5a737c33c3 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java @@ -87,9 +87,9 @@ private Single handleStrict(final HttpServiceContext ctx, final StreamingHttpResponseFactory responseFactory) { return delegate() .handle(ctx, request.transformMessageBody(publisher -> - WatchdogLeakDetector.instrument(publisher, REQUEST_LEAK_MESSAGE)), responseFactory) + WatchdogLeakDetector.gcLeakDetection(publisher, REQUEST_LEAK_MESSAGE)), responseFactory) .map(response -> response.transformMessageBody(publisher -> - WatchdogLeakDetector.instrument(publisher, RESPONSE_LEAK_MESSAGE))); + WatchdogLeakDetector.gcLeakDetection(publisher, RESPONSE_LEAK_MESSAGE))); } private Single handleSimple(final HttpServiceContext ctx, diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java index 4ab0e70feb..2d82db55cc 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java @@ -1,3 +1,18 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.servicetalk.http.netty; import io.servicetalk.concurrent.Cancellable; @@ -7,16 +22,17 @@ import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.SourceAdapters; import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import javax.annotation.Nullable; final class WatchdogLeakDetector { @@ -54,15 +70,15 @@ private WatchdogLeakDetector() { // Singleton. } - static Publisher instrument(Publisher publisher, String message) { - return INSTANCE.instrument0(publisher, message); + static Publisher gcLeakDetection(Publisher publisher, String message) { + return INSTANCE.gcLeakDetection0(publisher, message); } static boolean strictDetection() { return STRICT_DETECTION; } - private Publisher instrument0(Publisher publisher, String message) { + private Publisher gcLeakDetection0(Publisher publisher, String message) { maybeCleanRefs(); CleanupState cleanupState = new CleanupState(publisher, message); Publisher result = publisher.liftSync(subscriber -> new InstrumentedSubscriber<>(subscriber, cleanupState)); @@ -87,8 +103,6 @@ private void maybeCleanRefs() { } while ((ref = refQueue.poll()) != null); }); } - - } private static final class InstrumentedSubscriber implements Subscriber { @@ -96,7 +110,7 @@ private static final class InstrumentedSubscriber implements Subscriber { private final Subscriber delegate; private final CleanupState cleanupToken; - public InstrumentedSubscriber(Subscriber delegate, CleanupState cleanupToken) { + InstrumentedSubscriber(Subscriber delegate, CleanupState cleanupToken) { this.delegate = delegate; this.cleanupToken = cleanupToken; } @@ -134,20 +148,18 @@ public void onComplete() { cleanupToken.doComplete(); delegate.onComplete(); } - - } private static final class CleanupState { private static final AtomicReferenceFieldUpdater UPDATER = - AtomicReferenceFieldUpdater.newUpdater(CleanupState.class, Object.class,"state"); + AtomicReferenceFieldUpdater.newUpdater(CleanupState.class, Object.class, "state"); private static final String COMPLETE = "complete"; private final String message; volatile Object state; - public CleanupState(Publisher parent, String message) { + CleanupState(Publisher parent, String message) { this.message = message; this.state = parent; }