Skip to content

Commit

Permalink
Additional handling and test for diverting redistributed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
AntonRoskvist committed Apr 8, 2024
1 parent 87fc689 commit 93f47b6
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public void route(final Message message, final RoutingContext context) throws Ex

copy.setExpiration(message.getExpiration());

//This header could be set if the message is redistributed from a clustered broker.
//It needs to be removed as it will interfere with upcoming routing
copy.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);

switch (routingType) {
case ANYCAST:
copy.setRoutingType(RoutingType.ANYCAST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Ignore;
Expand Down Expand Up @@ -546,4 +549,50 @@ public void testSimpleRoundRobbinNoFailure() throws Exception {

}

@Test
public void testDivertRedistributedMessage() throws Exception {
final String queue = "queue0";
final String divertedQueueName = "divertedQueue";
final int messageCount = 10;

setupServer(0, true, isNetty());
setupServer(1, true, isNetty());
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);

servers[0].getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
servers[1].getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));

startServers(0, 1);

servers[0].deployDivert(new DivertConfiguration()
.setName("myDivert")
.setAddress(queue)
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
.setForwardingAddress(divertedQueueName)
.setExclusive(true));

setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());

createQueue(0, queue, queue, null, true, RoutingType.ANYCAST);
createQueue(1, queue, queue, null, true, RoutingType.ANYCAST);
createQueue(0, divertedQueueName, divertedQueueName, null, true, RoutingType.ANYCAST);
createQueue(1, divertedQueueName, divertedQueueName, null, true, RoutingType.ANYCAST);

addConsumer(0, 0, queue, null);

waitForBindings(0, queue, 1, 1, true);
waitForBindings(1, queue, 1, 1, false);

send(1, queue, messageCount, true, null);

Wait.assertEquals((long) messageCount, () -> servers[0].locateQueue(divertedQueueName).getMessageCount(), 2000, 100);

addConsumer(1, 1, divertedQueueName, null);

verifyReceiveAll(messageCount, 1);
closeAllConsumers();
}

}

0 comments on commit 93f47b6

Please sign in to comment.