From 04dc4479f336aac2220cf131ebb9e7a3eb781e6f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 21 Dec 2023 10:49:15 -0500 Subject: [PATCH] NIFI-12532: Ensure that when CommunicateAction completes (exceptionally or otherwise) that it gets removed from the list of all CommunicationActions --- .../server/ConnectionLoadBalanceServer.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java index 666620b9550f..8db116bbc994 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicLong; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLPeerUnverifiedException; @@ -135,8 +136,7 @@ protected static class CommunicateAction implements Runnable { private volatile boolean stopped = false; - // This should be final but it is not to allow override during testing; no production code modifies the value - private static int EXCEPTION_THRESHOLD_MILLIS = 10_000; + private static final int EXCEPTION_THRESHOLD_MILLIS = 10_000; private volatile long tlsErrorLastSeen = -1; public CommunicateAction(final LoadBalanceProtocol loadBalanceProtocol, final Socket socket, final EventReporter eventReporter) throws IOException { @@ -187,6 +187,8 @@ public void run() { logger.error("Failed to communicate over Channel {}", channelDescription, e); eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", "Failed to receive FlowFiles for Load Balancing due to " + e); } + + return; } } } @@ -265,11 +267,18 @@ public void run() { socket.setSoTimeout(connectionTimeoutMillis); final CommunicateAction communicateAction = new CommunicateAction(loadBalanceProtocol, socket, eventReporter); - final Thread commsThread = new Thread(communicateAction); + communicationActions.add(communicateAction); + + final Thread commsThread = new Thread(() -> { + try { + communicateAction.run(); + } finally { + communicationActions.remove(communicateAction); + } + }); + commsThread.setName("Load-Balance Server Thread-" + threadCounter.getAndIncrement()); commsThread.start(); - - communicationActions.add(communicateAction); } catch (final Exception e) { logger.error("{} Failed to accept connection from other node in cluster", ConnectionLoadBalanceServer.this, e); }