Skip to content

Commit

Permalink
ARTEMIS-5010 Addressing deadlock on AckManager
Browse files Browse the repository at this point in the history
AckManager.flush would hold a lock on ackManager, There was a possible deadlock with MirrorTarget:

Thread 1:

        at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.addRetry(AckManager.java:393)
        - waiting to lock <0x00000007990a13e8> (a org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager)
        at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.ack(AckManager.java:418)
        at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.performAck(AMQPMirrorControllerTarget.java:479)
        at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.postAcknowledge(AMQPMirrorControllerTarget.java:461)
        at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.actualDelivery(AMQPMirrorControllerTarget.java:318)
        at org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver.onMessageComplete(ProtonAbstractReceiver.java:361)

Thread 2:

        at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
        - parking to wait for  <0x000000079de0af38> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.parkNanos([email protected]/LockSupport.java:234)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos([email protected]/AbstractQueuedSynchronizer.java:1079)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos([email protected]/AbstractQueuedSynchronizer.java:1369)
        at java.util.concurrent.CountDownLatch.await([email protected]/CountDownLatch.java:278)
        at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.flush(AMQPMirrorControllerTarget.java:230)
        at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager$$Lambda$601/0x00000008005c3040.accept(Unknown Source)
        at java.lang.Iterable.forEach([email protected]/Iterable.java:75)
        at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.flushMirrorTargets(AckManager.java:184)
        - locked <0x00000007990a13e8> (a org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager)
        at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.initRetry(AckManager.java:162)
  • Loading branch information
clebertsuconic committed Sep 4, 2024
1 parent 74691b8 commit 7fb9aa5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.Set;
Expand Down Expand Up @@ -178,9 +180,14 @@ public synchronized void unregisterMirror(AMQPMirrorControllerTarget mirrorTarge
this.mirrorControllerTargets.remove(mirrorTarget);
}

private synchronized void flushMirrorTargets() {
private void flushMirrorTargets() {
logger.debug("scanning and flushing mirror targets");
mirrorControllerTargets.forEach(AMQPMirrorControllerTarget::flush);
List<AMQPMirrorControllerTarget> targetCopy = copyTargets();
targetCopy.forEach(AMQPMirrorControllerTarget::flush);
}

private synchronized List<AMQPMirrorControllerTarget> copyTargets() {
return new ArrayList<>(mirrorControllerTargets);
}

// Sort the ACK list by address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ private static void createMirroredServer(boolean paging, String serverName,
File brokerXml = new File(serverLocation, "/etc/broker.xml");
assertTrue(brokerXml.exists());
assertTrue(FileUtil.findReplace(brokerXml, "<page-size-bytes>10M</page-size-bytes>", "<page-size-bytes>100K</page-size-bytes>"));
assertTrue(FileUtil.findReplace(brokerXml, "amqpDuplicateDetection=true;", "amqpDuplicateDetection=true;ackManagerFlushTimeout=" + TimeUnit.MINUTES.toMillis(10) + ";"));

if (TRACE_LOGS) {
replaceLogs(serverLocation);
Expand Down Expand Up @@ -271,6 +272,7 @@ private static void createMirroredBackupServer(boolean paging, String serverName

File brokerXml = new File(serverLocation, "/etc/broker.xml");
assertTrue(brokerXml.exists());
assertTrue(FileUtil.findReplace(brokerXml, "amqpDuplicateDetection=true;", "amqpDuplicateDetection=true;ackManagerFlushTimeout=" + TimeUnit.MINUTES.toMillis(10) + ";"));

if (TRACE_LOGS) {
replaceLogs(serverLocation);
Expand Down Expand Up @@ -369,7 +371,7 @@ private void testQuickACK(final String protocol) throws Exception {

final int startAt = 300;
final int killAt = 800;
final int totalMessages = 1000;
final int totalMessages = 1_600;
String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";

try (Connection connection = connectionFactoryDC1A.createConnection()) {
Expand Down

0 comments on commit 7fb9aa5

Please sign in to comment.