Skip to content

Commit

Permalink
ARTEMIS-5119 Expired Messages on Cluster SNF should to to the origina…
Browse files Browse the repository at this point in the history
…l Expiry Queue
  • Loading branch information
clebertsuconic committed Jan 13, 2025
1 parent 9f7ecf3 commit f140963
Show file tree
Hide file tree
Showing 5 changed files with 345 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,10 @@ default Queue locateQueue(String queueName) {
return locateQueue(SimpleString.of(queueName));
}

default Queue locateQueue(String address, String queue) throws Exception {
return null;
}

default BindingQueryResult bindingQuery(SimpleString address) throws Exception {
return bindingQuery(address, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
Expand Down Expand Up @@ -2385,6 +2386,30 @@ public Queue locateQueue(SimpleString queueName) {
return (Queue) binding.getBindable();
}

@Override
public Queue locateQueue(String address, String queue) throws Exception {
Bindings bindings = postOffice.getBindingsForAddress(SimpleString.of(address));
if (bindings == null) {
return null;
}

Binding binding = bindings.getBinding(queue);
if (binding == null) {
return null;
}

Bindable bindingContent = binding.getBindable();

if (!(bindingContent instanceof Queue)) {
if (logger.isDebugEnabled()) {
logger.debug("localQueue({}. {}) found non Queue ( {} ) on binding table, returning null instead", address, queue, bindingContent);
}
return null;
}

return (Queue) bindingContent;
}

@Deprecated
@Override
public Queue deployQueue(final SimpleString address,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ private void checkIDSupplier(NodeStoreFactory<MessageReference> nodeStoreFactory

private final StorageManager storageManager;

private volatile AddressSettings addressSettings;
// Instead of looking up the AddressSettings every time, we cache and monitor it through onChange
private volatile AddressSettings cachedAddressSettings;

private final ActiveMQServer server;

Expand Down Expand Up @@ -733,9 +734,9 @@ public QueueImpl(final QueueConfiguration queueConfiguration,
if (addressSettingsRepository != null) {
addressSettingsRepositoryListener = new AddressSettingsRepositoryListener(addressSettingsRepository);
addressSettingsRepository.registerListener(addressSettingsRepositoryListener);
this.addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch());
this.cachedAddressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch());
} else {
this.addressSettings = new AddressSettings();
this.cachedAddressSettings = new AddressSettings();
}

if (pageSubscription != null) {
Expand All @@ -757,9 +758,9 @@ public QueueImpl(final QueueConfiguration queueConfiguration,

this.ringSize = queueConfiguration.getRingSize() == null ? ActiveMQDefaultConfiguration.getDefaultRingSize() : queueConfiguration.getRingSize();

this.initialQueueBufferSize = this.addressSettings.getInitialQueueBufferSize() == null
this.initialQueueBufferSize = this.cachedAddressSettings.getInitialQueueBufferSize() == null
? ActiveMQDefaultConfiguration.INITIAL_QUEUE_BUFFER_SIZE
: this.addressSettings.getInitialQueueBufferSize();
: this.cachedAddressSettings.getInitialQueueBufferSize();
this.intermediateMessageReferences = new MpscUnboundedArrayQueue<>(initialQueueBufferSize);
}

Expand Down Expand Up @@ -2129,36 +2130,92 @@ public void expire(final MessageReference ref) throws Exception {
* hence no information about delivering statistics should be updated. */
@Override
public void expire(final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception {
if (addressSettings.getExpiryAddress() != null) {
createExpiryResources();
expire(null, ref, consumer, delivering);
}

if (logger.isTraceEnabled()) {
logger.trace("moving expired reference {} to address = {} from queue={}", ref, addressSettings.getExpiryAddress(), name);
private void expire(final Transaction tx, final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception {
AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage());
SimpleString expiryAddress = settingsToUse.getExpiryAddress();

if (logger.isDebugEnabled()) {
logger.debug("expire on {}/{}, consumer={}, expiryAddress={}", this.address, this.name, consumer, expiryAddress);
}

if (expiryAddress != null && expiryAddress.length() != 0) {
String messageAddress = ref.getMessage().getAddress();

if (messageAddress == null) {
// in the unlikely event where a message does not have an address stored on the message itself,
// we will get the address from the current queue
messageAddress = String.valueOf(getAddress());
}
createExpiryResources(messageAddress, settingsToUse);

move(null, addressSettings.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer, null, delivering);
Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress);

if (bindingList == null || bindingList.getBindings().isEmpty()) {
if (!printErrorExpiring) {
// print this only once
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
printErrorExpiring = true;
}
acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
} else {
move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, consumer, null, delivering);
}
} else {
logger.trace("expiry is null, just acking expired message for reference {} from queue={}", ref, name);
if (!printErrorExpiring) {
printErrorExpiring = true;
// print this only once
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name);
}

acknowledge(null, ref, AckReason.EXPIRED, consumer, delivering);
acknowledge(tx, ref, AckReason.EXPIRED, consumer, delivering);
}

// potentially auto-delete this queue if this expired the last message
refCountForConsumers.check();

if (server != null && server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer));
if (tx == null) {
server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer));
} else {
ExpiryLogger expiryLogger = (ExpiryLogger) tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
if (expiryLogger == null) {
expiryLogger = new ExpiryLogger();
tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger);
tx.addOperation(expiryLogger);
}

expiryLogger.addExpiry(address, ref);

// potentially auto-delete this queue if this expired the last message
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
refCountForConsumers.check();
}
});
}
}
}

private AddressSettings getMessageAddressSettings(Message message) {
if (message.getAddress() == null || message.getAddress().equals(String.valueOf(address))) {
return cachedAddressSettings;
} else {
return server.getAddressSettingsRepository().getMatch(message.getAddress());
}
}

@Override
public SimpleString getExpiryAddress() {
return this.addressSettings.getExpiryAddress();
return this.cachedAddressSettings.getExpiryAddress();
}

@Override
public SimpleString getDeadLetterAddress() {
return this.addressSettings.getDeadLetterAddress();
return this.cachedAddressSettings.getDeadLetterAddress();
}

@Override
Expand Down Expand Up @@ -2510,7 +2567,7 @@ public synchronized int expireReferences(final Filter filter) throws Exception {
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage())) {
incDelivering(ref);
expire(tx, ref, true);
expire(tx, ref, null, true);
iter.remove();
refRemoved(ref);
count++;
Expand Down Expand Up @@ -2543,7 +2600,7 @@ public void expireReferences(Runnable done) {
}

private boolean isExpiryDisabled() {
final SimpleString expiryAddress = addressSettings.getExpiryAddress();
final SimpleString expiryAddress = cachedAddressSettings.getExpiryAddress();
if (expiryAddress != null && expiryAddress.equals(this.address)) {
// check expire with itself would be silly (waste of time)
logger.trace("Redundant expiration from {} to {}", address, expiryAddress);
Expand Down Expand Up @@ -2637,7 +2694,7 @@ public void run() {
final Transaction tx = new TransactionImpl(storageManager);
for (MessageReference ref : expiredMessages) {
try {
expire(tx, ref, true);
expire(tx, ref, null, true);
refRemoved(ref);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(ref, e);
Expand Down Expand Up @@ -3598,23 +3655,23 @@ public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
storageManager.updateDeliveryCount(reference);
}

int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
int maxDeliveries = cachedAddressSettings.getMaxDeliveryAttempts();
int deliveryCount = reference.getDeliveryCount();

// First check DLA
if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) {
if (logger.isTraceEnabled()) {
logger.trace("Sending reference {} to DLA = {} since ref.getDeliveryCount={} and maxDeliveries={} from queue={}",
reference, addressSettings.getDeadLetterAddress(), reference.getDeliveryCount(), maxDeliveries, name);
reference, cachedAddressSettings.getDeadLetterAddress(), reference.getDeliveryCount(), maxDeliveries, name);
}
boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());
boolean dlaResult = sendToDeadLetterAddress(null, reference, cachedAddressSettings.getDeadLetterAddress());

return new Pair<>(false, dlaResult);
} else {
// Second check Redelivery Delay
long redeliveryDelay = addressSettings.getRedeliveryDelay();
long redeliveryDelay = cachedAddressSettings.getRedeliveryDelay();
if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
redeliveryDelay = calculateRedeliveryDelay(addressSettings, deliveryCount);
redeliveryDelay = calculateRedeliveryDelay(cachedAddressSettings, deliveryCount);

if (logger.isTraceEnabled()) {
logger.trace("Setting redeliveryDelay={} on reference={}", redeliveryDelay, reference);
Expand Down Expand Up @@ -3896,51 +3953,6 @@ private Message makeCopy(final MessageReference ref,
return LargeServerMessageImpl.checkLargeMessage(copy, storageManager);
}

private void expire(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception {
SimpleString expiryAddress = addressSettings.getExpiryAddress();

if (expiryAddress != null && expiryAddress.length() != 0) {

createExpiryResources();

Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress);

if (bindingList == null || bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
} else {
move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, null, null, delivering);
}
} else {
if (!printErrorExpiring) {
printErrorExpiring = true;
// print this only once
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name);
}

acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
}

if (server != null && server.hasBrokerMessagePlugins()) {
ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
if (expiryLogger == null) {
expiryLogger = new ExpiryLogger();
tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger);
tx.addOperation(expiryLogger);
}

expiryLogger.addExpiry(address, ref);
}

// potentially auto-delete this queue if this expired the last message
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
refCountForConsumers.check();
}
});
}

private class ExpiryLogger extends TransactionOperationAbstract {

List<Pair<SimpleString, MessageReference>> expiries = new LinkedList<>();
Expand All @@ -3964,7 +3976,7 @@ public void afterCommit(Transaction tx) {

@Override
public boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
return sendToDeadLetterAddress(tx, ref, addressSettings.getDeadLetterAddress());
return sendToDeadLetterAddress(tx, ref, cachedAddressSettings.getDeadLetterAddress());
}

private boolean sendToDeadLetterAddress(final Transaction tx,
Expand Down Expand Up @@ -3999,22 +4011,23 @@ private boolean sendToDeadLetterAddress(final Transaction tx,

private void createDeadLetterResources() throws Exception {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
createResources(addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix());
createResources(String.valueOf(getAddress()), addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix());
}

private void createExpiryResources() throws Exception {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
createResources(addressSettings.isAutoCreateExpiryResources(), addressSettings.getExpiryAddress(), addressSettings.getExpiryQueuePrefix(), addressSettings.getExpiryQueueSuffix());
private void createExpiryResources(String address, AddressSettings messageAddressSettings) throws Exception {
createResources(address, messageAddressSettings.isAutoCreateExpiryResources(), messageAddressSettings.getExpiryAddress(), messageAddressSettings.getExpiryQueuePrefix(), messageAddressSettings.getExpiryQueueSuffix());
}

private void createResources(boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception {
if (isAutoCreate && !getAddress().equals(destinationAddress)) {
private void createResources(String address, boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception {
if (isAutoCreate && !address.equals(destinationAddress)) {
if (destinationAddress != null && destinationAddress.length() != 0) {
SimpleString destinationQueueName = prefix.concat(getAddress()).concat(suffix);
SimpleString filter = SimpleString.of(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress()));
SimpleString destinationQueueName = prefix.concat(address).concat(suffix);
SimpleString filter = SimpleString.of(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, address));
try {
logger.debug("Creating Resource queue {}", destinationQueueName);
server.createQueue(QueueConfiguration.of(destinationQueueName).setAddress(destinationAddress).setFilterString(filter).setAutoCreated(true).setAutoCreateAddress(true), true);
} catch (ActiveMQQueueExistsException e) {
logger.debug("resource {} already existed, ignoring outcome", destinationQueueName);
// ignore
}
}
Expand Down Expand Up @@ -4770,7 +4783,7 @@ private long getPersistentSize(final MessageReference reference) {
}

private void configureSlowConsumerReaper() {
if (addressSettings == null || addressSettings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) {
if (cachedAddressSettings == null || cachedAddressSettings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) {
if (slowConsumerReaperFuture != null) {
slowConsumerReaperFuture.cancel(false);
slowConsumerReaperFuture = null;
Expand All @@ -4780,13 +4793,13 @@ private void configureSlowConsumerReaper() {
}
} else {
if (slowConsumerReaperRunnable == null) {
scheduleSlowConsumerReaper(addressSettings);
} else if (slowConsumerReaperRunnable.checkPeriod != addressSettings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != addressSettings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(addressSettings.getSlowConsumerPolicy())) {
scheduleSlowConsumerReaper(cachedAddressSettings);
} else if (slowConsumerReaperRunnable.checkPeriod != cachedAddressSettings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != cachedAddressSettings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(cachedAddressSettings.getSlowConsumerPolicy())) {
if (slowConsumerReaperFuture != null) {
slowConsumerReaperFuture.cancel(false);
slowConsumerReaperFuture = null;
}
scheduleSlowConsumerReaper(addressSettings);
scheduleSlowConsumerReaper(cachedAddressSettings);
}
}
}
Expand Down Expand Up @@ -4847,7 +4860,7 @@ private class AddressSettingsRepositoryListener implements HierarchicalRepositor

@Override
public void onChange() {
addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch());
cachedAddressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch());
checkDeadLetterAddressAndExpiryAddress();
configureSlowConsumerReaper();
}
Expand All @@ -4863,10 +4876,10 @@ private String getAddressSettingsMatch() {

private void checkDeadLetterAddressAndExpiryAddress() {
if (!Env.isTestEnv() && !internalQueue && !address.equals(server.getConfiguration().getManagementNotificationAddress())) {
if (addressSettings.getDeadLetterAddress() == null) {
if (cachedAddressSettings.getDeadLetterAddress() == null) {
ActiveMQServerLogger.LOGGER.AddressSettingsNoDLA(name);
}
if (addressSettings.getExpiryAddress() == null) {
if (cachedAddressSettings.getExpiryAddress() == null) {
ActiveMQServerLogger.LOGGER.AddressSettingsNoExpiryAddress(name);
}
}
Expand Down
Loading

0 comments on commit f140963

Please sign in to comment.