From f14096341ce48a1ec117d2f4bcff949d17c8b109 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 30 Oct 2024 12:49:31 -0400 Subject: [PATCH] ARTEMIS-5119 Expired Messages on Cluster SNF should to to the original Expiry Queue --- .../artemis/core/server/ActiveMQServer.java | 4 + .../core/server/impl/ActiveMQServerImpl.java | 25 +++ .../artemis/core/server/impl/QueueImpl.java | 181 ++++++++++-------- .../cluster/expiry/ClusteredExpiryTest.java | 118 ++++++++++++ .../server/ExpireQueueSuffixTest.java | 101 ++++++++++ 5 files changed, 345 insertions(+), 84 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index e491706d802..d35cad4304e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -682,6 +682,10 @@ default Queue locateQueue(String queueName) { return locateQueue(SimpleString.of(queueName)); } + default Queue locateQueue(String address, String queue) throws Exception { + return null; + } + default BindingQueryResult bindingQuery(SimpleString address) throws Exception { return bindingQuery(address, true); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 287db949c36..8a1956f2b51 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -111,6 +111,7 @@ import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding; @@ -2385,6 +2386,30 @@ public Queue locateQueue(SimpleString queueName) { return (Queue) binding.getBindable(); } + @Override + public Queue locateQueue(String address, String queue) throws Exception { + Bindings bindings = postOffice.getBindingsForAddress(SimpleString.of(address)); + if (bindings == null) { + return null; + } + + Binding binding = bindings.getBinding(queue); + if (binding == null) { + return null; + } + + Bindable bindingContent = binding.getBindable(); + + if (!(bindingContent instanceof Queue)) { + if (logger.isDebugEnabled()) { + logger.debug("localQueue({}. {}) found non Queue ( {} ) on binding table, returning null instead", address, queue, bindingContent); + } + return null; + } + + return (Queue) bindingContent; + } + @Deprecated @Override public Queue deployQueue(final SimpleString address, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 06876e8dad6..b3a57d1af89 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -274,7 +274,8 @@ private void checkIDSupplier(NodeStoreFactory nodeStoreFactory private final StorageManager storageManager; - private volatile AddressSettings addressSettings; + // Instead of looking up the AddressSettings every time, we cache and monitor it through onChange + private volatile AddressSettings cachedAddressSettings; private final ActiveMQServer server; @@ -733,9 +734,9 @@ public QueueImpl(final QueueConfiguration queueConfiguration, if (addressSettingsRepository != null) { addressSettingsRepositoryListener = new AddressSettingsRepositoryListener(addressSettingsRepository); addressSettingsRepository.registerListener(addressSettingsRepositoryListener); - this.addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch()); + this.cachedAddressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch()); } else { - this.addressSettings = new AddressSettings(); + this.cachedAddressSettings = new AddressSettings(); } if (pageSubscription != null) { @@ -757,9 +758,9 @@ public QueueImpl(final QueueConfiguration queueConfiguration, this.ringSize = queueConfiguration.getRingSize() == null ? ActiveMQDefaultConfiguration.getDefaultRingSize() : queueConfiguration.getRingSize(); - this.initialQueueBufferSize = this.addressSettings.getInitialQueueBufferSize() == null + this.initialQueueBufferSize = this.cachedAddressSettings.getInitialQueueBufferSize() == null ? ActiveMQDefaultConfiguration.INITIAL_QUEUE_BUFFER_SIZE - : this.addressSettings.getInitialQueueBufferSize(); + : this.cachedAddressSettings.getInitialQueueBufferSize(); this.intermediateMessageReferences = new MpscUnboundedArrayQueue<>(initialQueueBufferSize); } @@ -2129,36 +2130,92 @@ public void expire(final MessageReference ref) throws Exception { * hence no information about delivering statistics should be updated. */ @Override public void expire(final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception { - if (addressSettings.getExpiryAddress() != null) { - createExpiryResources(); + expire(null, ref, consumer, delivering); + } - if (logger.isTraceEnabled()) { - logger.trace("moving expired reference {} to address = {} from queue={}", ref, addressSettings.getExpiryAddress(), name); + private void expire(final Transaction tx, final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception { + AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage()); + SimpleString expiryAddress = settingsToUse.getExpiryAddress(); + + if (logger.isDebugEnabled()) { + logger.debug("expire on {}/{}, consumer={}, expiryAddress={}", this.address, this.name, consumer, expiryAddress); + } + + if (expiryAddress != null && expiryAddress.length() != 0) { + String messageAddress = ref.getMessage().getAddress(); + + if (messageAddress == null) { + // in the unlikely event where a message does not have an address stored on the message itself, + // we will get the address from the current queue + messageAddress = String.valueOf(getAddress()); } + createExpiryResources(messageAddress, settingsToUse); - move(null, addressSettings.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer, null, delivering); + Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress); + + if (bindingList == null || bindingList.getBindings().isEmpty()) { + if (!printErrorExpiring) { + // print this only once + ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress); + printErrorExpiring = true; + } + acknowledge(tx, ref, AckReason.EXPIRED, null, delivering); + } else { + move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, consumer, null, delivering); + } } else { - logger.trace("expiry is null, just acking expired message for reference {} from queue={}", ref, name); + if (!printErrorExpiring) { + printErrorExpiring = true; + // print this only once + ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name); + } - acknowledge(null, ref, AckReason.EXPIRED, consumer, delivering); + acknowledge(tx, ref, AckReason.EXPIRED, consumer, delivering); } // potentially auto-delete this queue if this expired the last message refCountForConsumers.check(); if (server != null && server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer)); + if (tx == null) { + server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer)); + } else { + ExpiryLogger expiryLogger = (ExpiryLogger) tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER); + if (expiryLogger == null) { + expiryLogger = new ExpiryLogger(); + tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger); + tx.addOperation(expiryLogger); + } + + expiryLogger.addExpiry(address, ref); + + // potentially auto-delete this queue if this expired the last message + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + refCountForConsumers.check(); + } + }); + } + } + } + + private AddressSettings getMessageAddressSettings(Message message) { + if (message.getAddress() == null || message.getAddress().equals(String.valueOf(address))) { + return cachedAddressSettings; + } else { + return server.getAddressSettingsRepository().getMatch(message.getAddress()); } } @Override public SimpleString getExpiryAddress() { - return this.addressSettings.getExpiryAddress(); + return this.cachedAddressSettings.getExpiryAddress(); } @Override public SimpleString getDeadLetterAddress() { - return this.addressSettings.getDeadLetterAddress(); + return this.cachedAddressSettings.getDeadLetterAddress(); } @Override @@ -2510,7 +2567,7 @@ public synchronized int expireReferences(final Filter filter) throws Exception { MessageReference ref = iter.next(); if (filter == null || filter.match(ref.getMessage())) { incDelivering(ref); - expire(tx, ref, true); + expire(tx, ref, null, true); iter.remove(); refRemoved(ref); count++; @@ -2543,7 +2600,7 @@ public void expireReferences(Runnable done) { } private boolean isExpiryDisabled() { - final SimpleString expiryAddress = addressSettings.getExpiryAddress(); + final SimpleString expiryAddress = cachedAddressSettings.getExpiryAddress(); if (expiryAddress != null && expiryAddress.equals(this.address)) { // check expire with itself would be silly (waste of time) logger.trace("Redundant expiration from {} to {}", address, expiryAddress); @@ -2637,7 +2694,7 @@ public void run() { final Transaction tx = new TransactionImpl(storageManager); for (MessageReference ref : expiredMessages) { try { - expire(tx, ref, true); + expire(tx, ref, null, true); refRemoved(ref); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(ref, e); @@ -3598,23 +3655,23 @@ public Pair checkRedelivery(final MessageReference reference, storageManager.updateDeliveryCount(reference); } - int maxDeliveries = addressSettings.getMaxDeliveryAttempts(); + int maxDeliveries = cachedAddressSettings.getMaxDeliveryAttempts(); int deliveryCount = reference.getDeliveryCount(); // First check DLA if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) { if (logger.isTraceEnabled()) { logger.trace("Sending reference {} to DLA = {} since ref.getDeliveryCount={} and maxDeliveries={} from queue={}", - reference, addressSettings.getDeadLetterAddress(), reference.getDeliveryCount(), maxDeliveries, name); + reference, cachedAddressSettings.getDeadLetterAddress(), reference.getDeliveryCount(), maxDeliveries, name); } - boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress()); + boolean dlaResult = sendToDeadLetterAddress(null, reference, cachedAddressSettings.getDeadLetterAddress()); return new Pair<>(false, dlaResult); } else { // Second check Redelivery Delay - long redeliveryDelay = addressSettings.getRedeliveryDelay(); + long redeliveryDelay = cachedAddressSettings.getRedeliveryDelay(); if (!ignoreRedeliveryDelay && redeliveryDelay > 0) { - redeliveryDelay = calculateRedeliveryDelay(addressSettings, deliveryCount); + redeliveryDelay = calculateRedeliveryDelay(cachedAddressSettings, deliveryCount); if (logger.isTraceEnabled()) { logger.trace("Setting redeliveryDelay={} on reference={}", redeliveryDelay, reference); @@ -3896,51 +3953,6 @@ private Message makeCopy(final MessageReference ref, return LargeServerMessageImpl.checkLargeMessage(copy, storageManager); } - private void expire(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception { - SimpleString expiryAddress = addressSettings.getExpiryAddress(); - - if (expiryAddress != null && expiryAddress.length() != 0) { - - createExpiryResources(); - - Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress); - - if (bindingList == null || bindingList.getBindings().isEmpty()) { - ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress); - acknowledge(tx, ref, AckReason.EXPIRED, null, delivering); - } else { - move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, null, null, delivering); - } - } else { - if (!printErrorExpiring) { - printErrorExpiring = true; - // print this only once - ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name); - } - - acknowledge(tx, ref, AckReason.EXPIRED, null, delivering); - } - - if (server != null && server.hasBrokerMessagePlugins()) { - ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER); - if (expiryLogger == null) { - expiryLogger = new ExpiryLogger(); - tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger); - tx.addOperation(expiryLogger); - } - - expiryLogger.addExpiry(address, ref); - } - - // potentially auto-delete this queue if this expired the last message - tx.addOperation(new TransactionOperationAbstract() { - @Override - public void afterCommit(Transaction tx) { - refCountForConsumers.check(); - } - }); - } - private class ExpiryLogger extends TransactionOperationAbstract { List> expiries = new LinkedList<>(); @@ -3964,7 +3976,7 @@ public void afterCommit(Transaction tx) { @Override public boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception { - return sendToDeadLetterAddress(tx, ref, addressSettings.getDeadLetterAddress()); + return sendToDeadLetterAddress(tx, ref, cachedAddressSettings.getDeadLetterAddress()); } private boolean sendToDeadLetterAddress(final Transaction tx, @@ -3999,22 +4011,23 @@ private boolean sendToDeadLetterAddress(final Transaction tx, private void createDeadLetterResources() throws Exception { AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString()); - createResources(addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix()); + createResources(String.valueOf(getAddress()), addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix()); } - private void createExpiryResources() throws Exception { - AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString()); - createResources(addressSettings.isAutoCreateExpiryResources(), addressSettings.getExpiryAddress(), addressSettings.getExpiryQueuePrefix(), addressSettings.getExpiryQueueSuffix()); + private void createExpiryResources(String address, AddressSettings messageAddressSettings) throws Exception { + createResources(address, messageAddressSettings.isAutoCreateExpiryResources(), messageAddressSettings.getExpiryAddress(), messageAddressSettings.getExpiryQueuePrefix(), messageAddressSettings.getExpiryQueueSuffix()); } - private void createResources(boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception { - if (isAutoCreate && !getAddress().equals(destinationAddress)) { + private void createResources(String address, boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception { + if (isAutoCreate && !address.equals(destinationAddress)) { if (destinationAddress != null && destinationAddress.length() != 0) { - SimpleString destinationQueueName = prefix.concat(getAddress()).concat(suffix); - SimpleString filter = SimpleString.of(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress())); + SimpleString destinationQueueName = prefix.concat(address).concat(suffix); + SimpleString filter = SimpleString.of(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, address)); try { + logger.debug("Creating Resource queue {}", destinationQueueName); server.createQueue(QueueConfiguration.of(destinationQueueName).setAddress(destinationAddress).setFilterString(filter).setAutoCreated(true).setAutoCreateAddress(true), true); } catch (ActiveMQQueueExistsException e) { + logger.debug("resource {} already existed, ignoring outcome", destinationQueueName); // ignore } } @@ -4770,7 +4783,7 @@ private long getPersistentSize(final MessageReference reference) { } private void configureSlowConsumerReaper() { - if (addressSettings == null || addressSettings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) { + if (cachedAddressSettings == null || cachedAddressSettings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) { if (slowConsumerReaperFuture != null) { slowConsumerReaperFuture.cancel(false); slowConsumerReaperFuture = null; @@ -4780,13 +4793,13 @@ private void configureSlowConsumerReaper() { } } else { if (slowConsumerReaperRunnable == null) { - scheduleSlowConsumerReaper(addressSettings); - } else if (slowConsumerReaperRunnable.checkPeriod != addressSettings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != addressSettings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(addressSettings.getSlowConsumerPolicy())) { + scheduleSlowConsumerReaper(cachedAddressSettings); + } else if (slowConsumerReaperRunnable.checkPeriod != cachedAddressSettings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != cachedAddressSettings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(cachedAddressSettings.getSlowConsumerPolicy())) { if (slowConsumerReaperFuture != null) { slowConsumerReaperFuture.cancel(false); slowConsumerReaperFuture = null; } - scheduleSlowConsumerReaper(addressSettings); + scheduleSlowConsumerReaper(cachedAddressSettings); } } } @@ -4847,7 +4860,7 @@ private class AddressSettingsRepositoryListener implements HierarchicalRepositor @Override public void onChange() { - addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch()); + cachedAddressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch()); checkDeadLetterAddressAndExpiryAddress(); configureSlowConsumerReaper(); } @@ -4863,10 +4876,10 @@ private String getAddressSettingsMatch() { private void checkDeadLetterAddressAndExpiryAddress() { if (!Env.isTestEnv() && !internalQueue && !address.equals(server.getConfiguration().getManagementNotificationAddress())) { - if (addressSettings.getDeadLetterAddress() == null) { + if (cachedAddressSettings.getDeadLetterAddress() == null) { ActiveMQServerLogger.LOGGER.AddressSettingsNoDLA(name); } - if (addressSettings.getExpiryAddress() == null) { + if (cachedAddressSettings.getExpiryAddress() == null) { ActiveMQServerLogger.LOGGER.AddressSettingsNoExpiryAddress(name); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java new file mode 100644 index 00000000000..43d8ca357c9 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.expiry; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ClusteredExpiryTest extends ClusterTestBase { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + Queue snfPaused; + + @Test + public void testExpiryOnSNF() throws Exception { + setupServer(0, true, true); + setupServer(1, true, true); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0); + + servers[0].getConfiguration().setMessageExpiryScanPeriod(10); + + startServers(0, 1); + + final String queuesPrefix = "queues."; + final String queueName = queuesPrefix + getName(); + final String expirySuffix = ".Expiry"; + final String expiryPrefix = "myEXP."; + final String expiryAddress = "ExpiryAddress"; + final String resultingExpiryQueue = expiryPrefix + queueName + expirySuffix; + + servers[0].getAddressSettingsRepository().clear(); + servers[0].getAddressSettingsRepository().addMatch(queuesPrefix + "#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(expirySuffix)).setExpiryQueuePrefix(SimpleString.of(expiryPrefix)).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of(expiryAddress))); + servers[0].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + servers[1].getAddressSettingsRepository().clear(); + servers[1].getAddressSettingsRepository().addMatch(queuesPrefix + "#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(expirySuffix)).setExpiryQueuePrefix(SimpleString.of(expiryPrefix)).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of(expiryAddress))); + servers[1].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + Queue serverQueue0 = servers[0].createQueue(QueueConfiguration.of(queueName).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[1].createQueue(QueueConfiguration.of(queueName).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + + waitForBindings(0, queueName, 1, 0, true); + waitForBindings(1, queueName, 1, 0, true); + + waitForBindings(0, queueName, 1, 0, false); + waitForBindings(1, queueName, 1, 0, false); + + // pausing the SNF queue to keep messages stuck on the queue + servers[0].getPostOffice().getAllBindings().filter(f -> f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue); + assertNotNull(snfPaused); + + long NUMBER_OF_MESSAGES = 100; + + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session1.createProducer(session1.createQueue(queueName)); + producer.setTimeToLive(500); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session1.createTextMessage("hello")); + } + session1.commit(); + } + Wait.assertEquals(0L, serverQueue0::getMessageCount, 5_000, 100); + Wait.assertEquals(0L, snfPaused::getMessageCount, 5_000, 100); + Queue expiryQueue = servers[0].locateQueue(expiryAddress, resultingExpiryQueue); + assertNotNull(expiryQueue); + Wait.assertEquals(NUMBER_OF_MESSAGES, expiryQueue::getMessageCount, 5_000, 100); + + } + + private void pauseQueue(Binding binding) { + assertNull(snfPaused); + if (binding instanceof LocalQueueBinding) { + logger.info("Pausing {}", binding.getUniqueName()); + snfPaused = ((LocalQueueBinding) binding).getQueue(); + snfPaused.pause(); + } + } + +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java new file mode 100644 index 00000000000..9193f7f64ed --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.server; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class ExpireQueueSuffixTest extends ActiveMQTestBase { + + public final SimpleString queueA = SimpleString.of("queueA"); + public final SimpleString queueB = SimpleString.of("queueB"); + public final SimpleString expiryAddress = SimpleString.of("myExpiry"); + + public final SimpleString expirySuffix = SimpleString.of(".expSuffix"); + public final long EXPIRY_DELAY = 10L; + + private ActiveMQServer server; + + @Override + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, true); + server.getConfiguration().setAddressQueueScanPeriod(50L).setMessageExpiryScanPeriod(50L); + + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY).setExpiryQueueSuffix(expirySuffix)); + + server.start(); + + server.createQueue(QueueConfiguration.of(queueA).setRoutingType(RoutingType.ANYCAST)); + server.createQueue(QueueConfiguration.of(queueB).setRoutingType(RoutingType.ANYCAST)); + } + + @Test + public void testAutoCreationOfExpiryResources() throws Exception { + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + + long sendA = 7; + long sendB = 11; + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(queueA.toString())); + producer.setTimeToLive(100); + + for (int i = 0; i < sendA; i++) { + producer.send(session.createTextMessage("queueA")); + } + session.commit(); + + producer = session.createProducer(session.createQueue(queueB.toString())); + producer.setTimeToLive(100); + for (int i = 0; i < sendB; i++) { + producer.send(session.createTextMessage("queueB")); + } + session.commit(); + } + + Wait.waitFor(() -> server.locateQueue(expiryAddress.toString(), "EXP." + queueA + expirySuffix) != null, 5000); + Queue expA = server.locateQueue(expiryAddress.toString(), "EXP." + queueA + expirySuffix); + assertNotNull(expA); + + Wait.waitFor(() -> server.locateQueue(expiryAddress.toString(), "EXP." + queueB + expirySuffix) != null, 5000); + Queue expB = server.locateQueue(expiryAddress.toString(), "EXP." + queueB + expirySuffix); + assertNotNull(expB); + + Wait.assertEquals(sendA, expA::getMessageCount, 5000, 100); + Wait.assertEquals(sendB, expB::getMessageCount, 5000, 100); + } +} +