Skip to content

Commit

Permalink
Use explicit ExecutorService for Java event listening tests
Browse files Browse the repository at this point in the history
Avoid exhausting the ForkJoin.commonPool() in constrained environments, which can cause deadlocks.

Signed-off-by: Mark S. Lewis <[email protected]>
  • Loading branch information
bestbeforetoday committed Dec 7, 2023
1 parent fc42090 commit 6adf0de
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 45 deletions.
6 changes: 3 additions & 3 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>1.59.1</version>
<version>1.60.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down Expand Up @@ -256,7 +256,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.6.2</version>
<version>3.6.3</version>
<configuration>
<show>public</show>
<doctitle>Hyperledger Fabric Gateway client API for Java</doctitle>
Expand Down Expand Up @@ -379,7 +379,7 @@
<plugin>
<groupId>org.owasp</groupId>
<artifactId>dependency-check-maven</artifactId>
<version>9.0.2</version>
<version>9.0.3</version>
<configuration>
<skipProvidedScope>true</skipProvidedScope>
<skipTestScope>true</skipTestScope>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ private Supplier<T> readNext() {
try {
next = queue.take();
} catch (InterruptedException e) {
throw new NoSuchElementException();
Thread.currentThread().interrupt();
next = () -> null;
}
}

Expand Down
4 changes: 2 additions & 2 deletions java/src/main/java/org/hyperledger/fabric/client/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* // Process then checkpoint event
* checkpointer.checkpointChaincodeEvent(event);
* });
* } catch (io.grpc.StatusRuntimeException e) {
* } catch (GatewayRuntimeException e) {
* // Connection error
* }
* }
Expand All @@ -67,7 +67,7 @@
* // Process then checkpoint block
* checkpointer.checkpointBlock(event.getHeader().getNumber());
* });
* } catch (io.grpc.StatusRuntimeException e) {
* } catch (GatewayRuntimeException e) {
* // Connection error
* }
* }
Expand Down
62 changes: 35 additions & 27 deletions java/src/test/java/org/hyperledger/fabric/client/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,6 @@

package org.hyperledger.fabric.client;

import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

import com.google.protobuf.ByteString;
import io.grpc.BindableService;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -53,18 +37,39 @@
import org.hyperledger.fabric.protos.peer.TransactionAction;
import org.hyperledger.fabric.protos.peer.TxValidationCode;

import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

public final class TestUtils {
private static final TestUtils INSTANCE = new TestUtils();
private static final String TEST_FILE_PREFIX = "fg-test-";

private final AtomicLong currentTransactionId = new AtomicLong();
private final X509Credentials credentials = new X509Credentials();
private final ExecutorService executor = Executors.newCachedThreadPool();

public static TestUtils getInstance() {
return INSTANCE;
}

private TestUtils() { }
private TestUtils() {
Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdownNow));
}

public X509Credentials getCredentials() {
return credentials;
Expand Down Expand Up @@ -250,20 +255,23 @@ public <Request, Response> StreamObserver<Request> invokeStubDuplexCall(

try {
Stream<Response> responses = stubCall.apply(requestQueue.stream()); // Stub invocation may throw exception
responseFuture = CompletableFuture.runAsync(() -> {
try {
requestCountLatch.await();
responses.forEachOrdered(responseObserver::onNext);
responseObserver.onCompleted();
} catch (Throwable t) {
responseObserver.onError(t);
}
});
responseFuture = CompletableFuture.runAsync(
() -> {
try {
requestCountLatch.await();
responses.forEachOrdered(responseObserver::onNext);
responseObserver.onCompleted();
} catch (Throwable t) {
responseObserver.onError(t);
}
},
executor
);
} catch (Exception e) {
responseObserver.onError(e);
}

CompletableFuture<Void> finalResponseFuture = responseFuture;
final CompletableFuture<Void> finalResponseFuture = responseFuture;
responseObserver.setOnCancelHandler(() -> finalResponseFuture.cancel(true)); // Avoids gRPC error if cancel is called more than once
return streamObserverFromQueue(
requestQueue,
Expand Down
40 changes: 28 additions & 12 deletions java/src/test/java/scenario/BasicEventListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,45 @@

package scenario;

import io.grpc.Status;
import org.hyperledger.fabric.client.CloseableIterator;
import org.hyperledger.fabric.client.GatewayRuntimeException;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import org.hyperledger.fabric.client.CloseableIterator;

public final class BasicEventListener<T> implements EventListener<T> {
private final BlockingQueue<T> eventQueue = new SynchronousQueue<>();
private final Runnable close;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final CloseableIterator<T> iterator;

public BasicEventListener(final CloseableIterator<T> iterator) {
close = iterator::close;
this.iterator = iterator;

// Start reading events immediately as Java gRPC implementation may not invoke the gRPC service until the first
// read attempt occurs.
CompletableFuture.runAsync(() -> iterator.forEachRemaining(event -> {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
iterator.close();
executor.execute(this::readEvents);
}

private void readEvents() {
try {
iterator.forEachRemaining(event -> {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
iterator.close();
Thread.currentThread().interrupt();
}
});
} catch (GatewayRuntimeException e) {
if (e.getStatus().getCode() != Status.Code.CANCELLED) {
throw e;
}
}));
}
}

public T next() throws InterruptedException {
Expand All @@ -39,6 +54,7 @@ public T next() throws InterruptedException {
}

public void close() {
close.run();
executor.shutdownNow();
iterator.close();
}
}

0 comments on commit 6adf0de

Please sign in to comment.