Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
AntonRoskvist committed Aug 8, 2024
1 parent 8250e61 commit 3f5831c
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;

Expand Down Expand Up @@ -83,4 +85,30 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
buffer.release();
}
}

//Don't know if this belongs here but addresses a Netty WARN:
//WARN [io.netty.channel.DefaultChannelPipeline] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
//Copied from the CORE equivalent
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
if (!active) {
return;
}
// We don't want to log this - since it is normal for this to happen during failover/reconnect
// and we don't want to spew out stack traces in that event
// The user has access to this exeception anyway via the ActiveMQException initial cause

ActiveMQException me = new ActiveMQException(cause.getMessage());
me.initCause(cause);

synchronized (listener) {
try {
listenerExecutor.execute(() -> listener.connectionException(channelId(ctx.channel()), me));
active = false;
} catch (Exception ex) {
ActiveMQClientLogger.LOGGER.errorCallingLifeCycleListener(ex);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.BytesMessage;
import javax.jms.Connection;
Expand All @@ -39,6 +45,7 @@
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand All @@ -50,6 +57,7 @@
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.json.JsonValue;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
Expand Down Expand Up @@ -1383,5 +1391,149 @@ private void doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNod
assertNull(consumerR.receiveNoWait());
}
}

@Test
@Timeout(200)
public void testConsumerCleanupOnLinkFailAMQPRemote() throws Exception {
testConsumerCleanupOnLinkFail("AMQP", false);
}

@Test
@Timeout(200)
public void testConsumerCleanupOnLinkFailAMQPLocal() throws Exception {
testConsumerCleanupOnLinkFail("AMQP", true);
}

@Test
@Timeout(200)
public void testConsumerCleanupOnLinkFailCoreRemote() throws Exception {
testConsumerCleanupOnLinkFail("CORE", false);
}

@Test
@Timeout(200)
public void testConsumerCleanupOnLinkFailCoreLocal() throws Exception {
testConsumerCleanupOnLinkFail("CORE", true);
}

private void testConsumerCleanupOnLinkFail(String clientProtocol, boolean local) throws Exception {
final String queueName = "test";
final int messageCount = 100;
logger.info("Test started: {}", getTestName());

final AMQPFederationQueuePolicyElement localQueuePolicy1 = new AMQPFederationQueuePolicyElement();
localQueuePolicy1.setName("local-test-policy");
localQueuePolicy1.addToIncludes("#", "#");
localQueuePolicy1.setPriorityAdjustment(-5);
localQueuePolicy1.setIncludeFederated(false);

final AMQPFederationQueuePolicyElement localQueuePolicy2 = new AMQPFederationQueuePolicyElement();
localQueuePolicy2.setName("remote-test-policy");
localQueuePolicy2.addToIncludes("#", "#");
localQueuePolicy2.setPriorityAdjustment(-5);
localQueuePolicy2.setIncludeFederated(false);

final AMQPFederatedBrokerConnectionElement element1 = new AMQPFederatedBrokerConnectionElement();
element1.setName(getTestName() + "_1");
if (local) {
element1.addLocalQueuePolicy(localQueuePolicy1);
} else {
element1.addRemoteQueuePolicy(localQueuePolicy1);
}

final AMQPFederatedBrokerConnectionElement element2 = new AMQPFederatedBrokerConnectionElement();
element2.setName(getTestName() + "_2");
if (local) {
element2.addLocalQueuePolicy(localQueuePolicy2);
} else {
element2.addRemoteQueuePolicy(localQueuePolicy2);
}

final AMQPBrokerConnectConfiguration amqpConnection1 =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
amqpConnection1.setReconnectAttempts(10);// Limit reconnects
amqpConnection1.addFederation(element1);

final AMQPBrokerConnectConfiguration amqpConnection2 =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT);
amqpConnection2.setReconnectAttempts(10);// Limit reconnects
amqpConnection2.addFederation(element2);

server.getConfiguration().addAMQPConnection(amqpConnection1).setName("Local");
remoteServer.getConfiguration().addAMQPConnection(amqpConnection2).setName("Remote");

remoteServer.start();
server.start();

server.createQueue(QueueConfiguration.of("test").setRoutingType(RoutingType.ANYCAST));
remoteServer.createQueue(QueueConfiguration.of("test").setRoutingType(RoutingType.ANYCAST));

final ConnectionFactory factoryLocal = CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + SERVER_PORT);
final ConnectionFactory factoryRemote = CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + SERVER_PORT_REMOTE);

ExecutorService executorService = Executors.newCachedThreadPool();
AtomicInteger count = new AtomicInteger(0);
AtomicBoolean run = new AtomicBoolean(true);

runAfter(executorService::shutdownNow);

executorService.submit(() -> {
while (run.get()) {
try (Connection connection = factoryRemote.createConnection()) {
connection.start();

Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);

while (run.get()) {
if (consumer.receive(10) != null) {
count.incrementAndGet();
}
}

} catch (Exception ignore) { }
}
});

try (Connection connection = factoryLocal.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)
) {

Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);

producer.send(session.createTextMessage("msg"));
Wait.assertTrue(() -> remoteServer.locateQueue(queueName).getMessagesAcknowledged() == 1L);

//Simulate intermittent network failure
//running this multiple times creates more orphaned consumers
remoteServer.getActiveMQServerControl().closeConsumerConnectionsForAddress(queueName);

for (int i = 1; i < messageCount; i++) {
producer.send(session.createTextMessage("msg"));
}

Wait.assertTrue(() -> count.get() >= messageCount);

String localConsumersJSON = server.getActiveMQServerControl().listAllConsumersAsJSON();
boolean failed = false;

for (JsonValue value : JsonUtil.readJsonArray(localConsumersJSON)) {
if (value.asJsonObject().getString("status").equals("Orphaned")) {
System.out.println("Orphaned: " + value);
failed = true;
}
}

run.set(false);
executorService.shutdown();
assertTrue(executorService.awaitTermination(5000, TimeUnit.MILLISECONDS));

assertFalse(failed);
}

}

}

0 comments on commit 3f5831c

Please sign in to comment.