Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Jan 14, 2025
1 parent 47d1fcf commit ed72a8c
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 130 deletions.
8 changes: 5 additions & 3 deletions servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@
<Match>
<Or>
<Source name="~CompatGrpc\.java"/>
<Source name="~EsTesterGrpc\.java"/>
<!-- <Source name="~EsTesterGrpc\.java"/>-->
<Source name="~TesterGrpc\.java"/>
<Source name="~CompatProto\.java"/>
<Source name="~EsProto\.java"/>
<!-- <Source name="~EsProto\.java"/>-->
<Source name="~TesterProto\.java"/>
<Source name="~HelloReply\.java"/>
<Source name="~HelloRequest\.java"/>
<Source name="~Greeter\.java"/>
<Source name="~GreeterGrpc\.java"/>
<Source name="~HelloWorldProto\.java"/>
<Source name="~ServiceTalkLeak\.java"/>
<Source name="~Leaker\.java"/>
<Source name="~LeakMessage\.java"/>
</Or>
</Match>
<Match>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.grpc.netty;

import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.grpc.api.GrpcServiceContext;
import io.servicetalk.grpc.api.GrpcStatusCode;
import io.servicetalk.grpc.api.GrpcStatusException;
import io.servicetalk.http.netty.SpliceFlatStreamToMetaSingle;
import io.servicetalk.leak.LeakMessage;
import io.servicetalk.leak.Leaker;
import io.servicetalk.transport.api.HostAndPort;

import io.netty.buffer.ByteBufUtil;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation;
import static org.junit.jupiter.api.Assertions.assertFalse;

final class GcWatchdogLeakDetectorTest {

private static final Logger LOGGER = LoggerFactory.getLogger(GcWatchdogLeakDetectorTest.class);

private static boolean leakDetected;

static {
System.setProperty("io.servicetalk.http.netty.leakdetection", "strict");
System.setProperty("io.netty.leakDetection.level", "paranoid");
ByteBufUtil.setLeakListener((type, records) -> {
leakDetected = true;
LOGGER.error("ByteBuf leak detected!");
});
}

@Test
void testLeak() throws Exception {
GrpcServers.forPort(8888)
.listenAndAwait(new Leaker.LeakerService() {
@Override
public Publisher<LeakMessage> rpc(GrpcServiceContext ctx, Publisher<LeakMessage> request) {
Publisher<LeakMessage> response = splice(request)
.flatMapPublisher(pair -> Publisher.failed(
new GrpcStatusException(GrpcStatusCode.INVALID_ARGUMENT.status())));
return response;
}
});

Leaker.LeakerClient client = GrpcClients.forAddress(HostAndPort.of("localhost", 8888))
.build(new Leaker.ClientFactory());

for (int i = 0; i < 10; i++) {
blockingInvocation(
client.rpc(
Publisher.from(
LeakMessage.newBuilder().setValue("first LeakMessage").build(),
LeakMessage.newBuilder().setValue("second LeakMessage (which leaks)").build()))
.ignoreElements()
.onErrorComplete());

System.gc();
System.runFinalization();
}

assertFalse(leakDetected);
}

private static Single<Pair> splice(Publisher<LeakMessage> request) {
return request.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Pair::new));
}

private static final class Pair {
final LeakMessage head;
final Publisher<LeakMessage> stream;

Pair(LeakMessage head, Publisher<LeakMessage> stream) {
this.head = head;
this.stream = stream;
}
}
}
13 changes: 13 additions & 0 deletions servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

option java_multiple_files = true;
option java_outer_classname = "ServiceTalkLeak";
option java_package = "io.servicetalk.leak";

message LeakMessage {
string value = 1;
}

service Leaker {
rpc Rpc(stream LeakMessage) returns (stream LeakMessage);
}
12 changes: 0 additions & 12 deletions servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)

private Single<StreamingHttpResponse> requestStrict(final StreamingHttpRequest request) {
return delegate().request(request.transformMessageBody(publisher ->
WatchdogLeakDetector.instrument(publisher, REQUEST_LEAK_MESSAGE)))
WatchdogLeakDetector.gcLeakDetection(publisher, REQUEST_LEAK_MESSAGE)))
.map(response -> response.transformMessageBody(publisher ->
WatchdogLeakDetector.instrument(publisher, RESPONSE_LEAK_MESSAGE)));
WatchdogLeakDetector.gcLeakDetection(publisher, RESPONSE_LEAK_MESSAGE)));
}

private Single<StreamingHttpResponse> requestSimple(final StreamingHttpRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ private Single<StreamingHttpResponse> handleStrict(final HttpServiceContext ctx,
final StreamingHttpResponseFactory responseFactory) {
return delegate()
.handle(ctx, request.transformMessageBody(publisher ->
WatchdogLeakDetector.instrument(publisher, REQUEST_LEAK_MESSAGE)), responseFactory)
WatchdogLeakDetector.gcLeakDetection(publisher, REQUEST_LEAK_MESSAGE)), responseFactory)
.map(response -> response.transformMessageBody(publisher ->
WatchdogLeakDetector.instrument(publisher, RESPONSE_LEAK_MESSAGE)));
WatchdogLeakDetector.gcLeakDetection(publisher, RESPONSE_LEAK_MESSAGE)));
}

private Single<StreamingHttpResponse> handleSimple(final HttpServiceContext ctx,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.Cancellable;
Expand All @@ -7,16 +22,17 @@
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;

final class WatchdogLeakDetector {

Expand Down Expand Up @@ -54,15 +70,15 @@ private WatchdogLeakDetector() {
// Singleton.
}

static <T> Publisher<T> instrument(Publisher<T> publisher, String message) {
return INSTANCE.instrument0(publisher, message);
static <T> Publisher<T> gcLeakDetection(Publisher<T> publisher, String message) {
return INSTANCE.gcLeakDetection0(publisher, message);
}

static boolean strictDetection() {
return STRICT_DETECTION;
}

private <T> Publisher<T> instrument0(Publisher<T> publisher, String message) {
private <T> Publisher<T> gcLeakDetection0(Publisher<T> publisher, String message) {
maybeCleanRefs();
CleanupState cleanupState = new CleanupState(publisher, message);
Publisher<T> result = publisher.liftSync(subscriber -> new InstrumentedSubscriber<>(subscriber, cleanupState));
Expand All @@ -87,16 +103,14 @@ private void maybeCleanRefs() {
} while ((ref = refQueue.poll()) != null);
});
}


}

private static final class InstrumentedSubscriber<T> implements Subscriber<T> {

private final Subscriber<T> delegate;
private final CleanupState cleanupToken;

public InstrumentedSubscriber(Subscriber<T> delegate, CleanupState cleanupToken) {
InstrumentedSubscriber(Subscriber<T> delegate, CleanupState cleanupToken) {
this.delegate = delegate;
this.cleanupToken = cleanupToken;
}
Expand Down Expand Up @@ -134,20 +148,18 @@ public void onComplete() {
cleanupToken.doComplete();
delegate.onComplete();
}


}

private static final class CleanupState {

private static final AtomicReferenceFieldUpdater<CleanupState, Object> UPDATER =
AtomicReferenceFieldUpdater.newUpdater(CleanupState.class, Object.class,"state");
AtomicReferenceFieldUpdater.newUpdater(CleanupState.class, Object.class, "state");
private static final String COMPLETE = "complete";

private final String message;
volatile Object state;

public CleanupState(Publisher<?> parent, String message) {
CleanupState(Publisher<?> parent, String message) {
this.message = message;
this.state = parent;
}
Expand Down

0 comments on commit ed72a8c

Please sign in to comment.