Skip to content

Commit

Permalink
NIFI-12532: Ensure that when CommunicateAction completes (exceptional…
Browse files Browse the repository at this point in the history
…ly or otherwise) that it gets removed from the list of all CommunicationActions
  • Loading branch information
markap14 committed Dec 21, 2023
1 parent 02d563e commit 04dc447
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
Expand Down Expand Up @@ -135,8 +136,7 @@ protected static class CommunicateAction implements Runnable {

private volatile boolean stopped = false;

// This should be final but it is not to allow override during testing; no production code modifies the value
private static int EXCEPTION_THRESHOLD_MILLIS = 10_000;
private static final int EXCEPTION_THRESHOLD_MILLIS = 10_000;
private volatile long tlsErrorLastSeen = -1;

public CommunicateAction(final LoadBalanceProtocol loadBalanceProtocol, final Socket socket, final EventReporter eventReporter) throws IOException {
Expand Down Expand Up @@ -187,6 +187,8 @@ public void run() {
logger.error("Failed to communicate over Channel {}", channelDescription, e);
eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", "Failed to receive FlowFiles for Load Balancing due to " + e);
}

return;
}
}
}
Expand Down Expand Up @@ -265,11 +267,18 @@ public void run() {
socket.setSoTimeout(connectionTimeoutMillis);

final CommunicateAction communicateAction = new CommunicateAction(loadBalanceProtocol, socket, eventReporter);
final Thread commsThread = new Thread(communicateAction);
communicationActions.add(communicateAction);

final Thread commsThread = new Thread(() -> {
try {
communicateAction.run();
} finally {
communicationActions.remove(communicateAction);
}
});

commsThread.setName("Load-Balance Server Thread-" + threadCounter.getAndIncrement());
commsThread.start();

communicationActions.add(communicateAction);
} catch (final Exception e) {
logger.error("{} Failed to accept connection from other node in cluster", ConnectionLoadBalanceServer.this, e);
}
Expand Down

0 comments on commit 04dc447

Please sign in to comment.