From 6adf0deb7dfbbff1a70ef58b35857ab6c0656521 Mon Sep 17 00:00:00 2001 From: "Mark S. Lewis" Date: Sat, 2 Dec 2023 16:13:50 +0000 Subject: [PATCH] Use explicit ExecutorService for Java event listening tests Avoid exhausting the ForkJoin.commonPool() in constrained environments, which can cause deadlocks. Signed-off-by: Mark S. Lewis --- java/pom.xml | 6 +- .../fabric/client/GatewayClient.java | 3 +- .../hyperledger/fabric/client/Network.java | 4 +- .../hyperledger/fabric/client/TestUtils.java | 62 +++++++++++-------- .../java/scenario/BasicEventListener.java | 40 ++++++++---- 5 files changed, 70 insertions(+), 45 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index 9cf1a7a5c..016882704 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -61,7 +61,7 @@ io.grpc grpc-bom - 1.59.1 + 1.60.0 pom import @@ -256,7 +256,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.6.2 + 3.6.3 public Hyperledger Fabric Gateway client API for Java @@ -379,7 +379,7 @@ org.owasp dependency-check-maven - 9.0.2 + 9.0.3 true true diff --git a/java/src/main/java/org/hyperledger/fabric/client/GatewayClient.java b/java/src/main/java/org/hyperledger/fabric/client/GatewayClient.java index d9e6c4e1e..0cf95e6a9 100644 --- a/java/src/main/java/org/hyperledger/fabric/client/GatewayClient.java +++ b/java/src/main/java/org/hyperledger/fabric/client/GatewayClient.java @@ -277,7 +277,8 @@ private Supplier readNext() { try { next = queue.take(); } catch (InterruptedException e) { - throw new NoSuchElementException(); + Thread.currentThread().interrupt(); + next = () -> null; } } diff --git a/java/src/main/java/org/hyperledger/fabric/client/Network.java b/java/src/main/java/org/hyperledger/fabric/client/Network.java index 9f42964e6..e042ac1aa 100644 --- a/java/src/main/java/org/hyperledger/fabric/client/Network.java +++ b/java/src/main/java/org/hyperledger/fabric/client/Network.java @@ -48,7 +48,7 @@ * // Process then checkpoint event * checkpointer.checkpointChaincodeEvent(event); * }); - * } catch (io.grpc.StatusRuntimeException e) { + * } catch (GatewayRuntimeException e) { * // Connection error * } * } @@ -67,7 +67,7 @@ * // Process then checkpoint block * checkpointer.checkpointBlock(event.getHeader().getNumber()); * }); - * } catch (io.grpc.StatusRuntimeException e) { + * } catch (GatewayRuntimeException e) { * // Connection error * } * } diff --git a/java/src/test/java/org/hyperledger/fabric/client/TestUtils.java b/java/src/test/java/org/hyperledger/fabric/client/TestUtils.java index c7e4c20d0..c7cb84f85 100644 --- a/java/src/test/java/org/hyperledger/fabric/client/TestUtils.java +++ b/java/src/test/java/org/hyperledger/fabric/client/TestUtils.java @@ -6,22 +6,6 @@ package org.hyperledger.fabric.client; -import java.io.IOException; -import java.io.Reader; -import java.io.UncheckedIOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.attribute.FileAttribute; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Stream; - import com.google.protobuf.ByteString; import io.grpc.BindableService; import io.grpc.ManagedChannel; @@ -53,18 +37,39 @@ import org.hyperledger.fabric.protos.peer.TransactionAction; import org.hyperledger.fabric.protos.peer.TxValidationCode; +import java.io.IOException; +import java.io.Reader; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Stream; + public final class TestUtils { private static final TestUtils INSTANCE = new TestUtils(); private static final String TEST_FILE_PREFIX = "fg-test-"; private final AtomicLong currentTransactionId = new AtomicLong(); private final X509Credentials credentials = new X509Credentials(); + private final ExecutorService executor = Executors.newCachedThreadPool(); public static TestUtils getInstance() { return INSTANCE; } - private TestUtils() { } + private TestUtils() { + Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdownNow)); + } public X509Credentials getCredentials() { return credentials; @@ -250,20 +255,23 @@ public StreamObserver invokeStubDuplexCall( try { Stream responses = stubCall.apply(requestQueue.stream()); // Stub invocation may throw exception - responseFuture = CompletableFuture.runAsync(() -> { - try { - requestCountLatch.await(); - responses.forEachOrdered(responseObserver::onNext); - responseObserver.onCompleted(); - } catch (Throwable t) { - responseObserver.onError(t); - } - }); + responseFuture = CompletableFuture.runAsync( + () -> { + try { + requestCountLatch.await(); + responses.forEachOrdered(responseObserver::onNext); + responseObserver.onCompleted(); + } catch (Throwable t) { + responseObserver.onError(t); + } + }, + executor + ); } catch (Exception e) { responseObserver.onError(e); } - CompletableFuture finalResponseFuture = responseFuture; + final CompletableFuture finalResponseFuture = responseFuture; responseObserver.setOnCancelHandler(() -> finalResponseFuture.cancel(true)); // Avoids gRPC error if cancel is called more than once return streamObserverFromQueue( requestQueue, diff --git a/java/src/test/java/scenario/BasicEventListener.java b/java/src/test/java/scenario/BasicEventListener.java index 2765680b3..8bda90f83 100644 --- a/java/src/test/java/scenario/BasicEventListener.java +++ b/java/src/test/java/scenario/BasicEventListener.java @@ -6,30 +6,45 @@ package scenario; +import io.grpc.Status; +import org.hyperledger.fabric.client.CloseableIterator; +import org.hyperledger.fabric.client.GatewayRuntimeException; + import java.util.Objects; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; -import org.hyperledger.fabric.client.CloseableIterator; - public final class BasicEventListener implements EventListener { private final BlockingQueue eventQueue = new SynchronousQueue<>(); - private final Runnable close; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private final CloseableIterator iterator; public BasicEventListener(final CloseableIterator iterator) { - close = iterator::close; + this.iterator = iterator; // Start reading events immediately as Java gRPC implementation may not invoke the gRPC service until the first // read attempt occurs. - CompletableFuture.runAsync(() -> iterator.forEachRemaining(event -> { - try { - eventQueue.put(event); - } catch (InterruptedException e) { - iterator.close(); + executor.execute(this::readEvents); + } + + private void readEvents() { + try { + iterator.forEachRemaining(event -> { + try { + eventQueue.put(event); + } catch (InterruptedException e) { + iterator.close(); + Thread.currentThread().interrupt(); + } + }); + } catch (GatewayRuntimeException e) { + if (e.getStatus().getCode() != Status.Code.CANCELLED) { + throw e; } - })); + } } public T next() throws InterruptedException { @@ -39,6 +54,7 @@ public T next() throws InterruptedException { } public void close() { - close.run(); + executor.shutdownNow(); + iterator.close(); } }