Skip to content

Commit

Permalink
ARTEMIS-5017 bridge leaks ClientSessionFactory instance on reconnect …
Browse files Browse the repository at this point in the history
…attempt
  • Loading branch information
jbertram authored and clebertsuconic committed Aug 28, 2024
1 parent 898c09c commit d0c83af
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1914,6 +1914,10 @@ public boolean isReceivedTopology() {
return receivedTopology;
}

public int getClientSessionFactoryCount() {
return factories.size();
}

private String fromInterceptors(final List<Interceptor> interceptors) {
StringBuffer buffer = new StringBuffer();
boolean first = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ public ClientSessionFactory getSessionFactory() {
return csf;
}

// for tests
public ServerLocatorInternal getServerLocator() {
return serverLocator;
}

/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.server.Consumer#getDeliveringMessages()
*/
Expand Down Expand Up @@ -1024,6 +1029,10 @@ public void run() {
if (csf == null || csf.isClosed()) {
if (state == State.STOPPING || state == State.PAUSING)
return;
if (csf != null && csf.isClosed()) {
// ensure we release any references to the existing ClientSessionFactory before creating a new one otherwise we will leak
serverLocator.factoryClosed(csf);
}
csf = createSessionFactory();
if (csf == null) {
// Retrying. This probably means the node is not available (for the cluster connection case)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
Expand Down Expand Up @@ -521,6 +522,38 @@ public void internaltestSimpleBridge(final boolean largeMessage, final boolean u
}
}

@TestTemplate
public void testClientSessionFactoryLeak() throws Exception {
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);

Map<String, Object> server1Params = new HashMap<>();
addTargetParameters(server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);

final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";

TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);

HashMap<String, TransportConfiguration> connectors = new HashMap<>();
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);

List<String> connectorConfig = List.of(server1tc.getName());
// intentionally configure a bridge that will fail to connect and attempt to reconnect multiple times quickly
server0.getConfiguration().setBridgeConfigurations(List.of(new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(0).setReconnectAttempts(-1).setStaticConnectors(connectorConfig)));
server0.getConfiguration().setQueueConfigs(List.of(QueueConfiguration.of(queueName0).setAddress(testAddress)));

server1.start();
server0.start();

ServerLocatorImpl serverLocator = (ServerLocatorImpl) ((BridgeImpl)server0.getClusterManager().getBridges().get("bridge1")).getServerLocator();
Wait.waitFor(() -> serverLocator.getClientSessionFactoryCount() > 1, 500, 10);
assertTrue(serverLocator.getClientSessionFactoryCount() <= 1);
}

/**
* @param server1Params
*/
Expand Down

0 comments on commit d0c83af

Please sign in to comment.