diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java index 2d6744f7435..06366052e32 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java @@ -81,29 +81,6 @@ public void run() { }); } - @Test - public void testTransactional() throws Throwable { - runTest(new TestRunner() { - volatile boolean running = false; - - @Override - public void run() { - try { - assertFalse(running); - running = true; - try { - doTestTransactional(this); - } finally { - running = false; - } - } catch (Throwable e) { - logger.error("Test failed", e); - addException(e); - } - } - }); - } - abstract class TestRunner implements Runnable { volatile boolean failed; @@ -323,228 +300,6 @@ private void doTestNonTransactional(final TestRunner runner) throws Exception { } } - private void doTestTransactional(final TestRunner runner) throws Throwable { - // For duplication detection - int executionId = 0; - - while (!runner.isFailed()) { - ClientSession session = null; - - executionId++; - - logger.debug("#test doTestTransactional starting now. Execution {}", executionId); - - try { - - boolean retry = false; - - final int numMessages = 1000; - - int retryCreateSession = 4; - //session creation may fail in the middle of failover - while (session == null) { - try { - //if autoCommitSends is false, send will be non-blocking - session = sf.createSession(true, false); - } catch (ActiveMQException e) { - if (retryCreateSession == 0) { - throw e; - } - retryCreateSession--; - Thread.sleep(2000); - } - } - - listener = new CountDownSessionFailureListener(session); - session.addFailureListener(listener); - - do { - if (runner.isFailed()) { - //test ends, return - return; - } - try { - ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS); - - for (int i = 0; i < numMessages; i++) { - ClientMessage message = session.createMessage(true); - - message.getBodyBuffer().writeString("message" + i); - - message.putIntProperty("counter", i); - - message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.of("id:" + i + - ",exec:" + - executionId)); - - addPayload(message); - - logger.debug("Sending message {}", message); - - producer.send(message); - } - - logger.debug("Sending commit"); - session.commit(); - - retry = false; - } catch (ActiveMQDuplicateIdException die) { - logAndSystemOut("#test duplicate id rejected on sending"); - break; - } catch (ActiveMQTransactionRolledBackException trbe) { - logger.debug("#test transaction rollback retrying on sending"); - // OK - retry = true; - } catch (ActiveMQUnBlockedException ube) { - logger.debug("#test transaction rollback retrying on sending"); - // OK - retry = true; - } catch (ActiveMQTransactionOutcomeUnknownException toue) { - logger.debug("#test transaction rollback retrying on sending"); - // OK - retry = true; - } catch (ActiveMQObjectClosedException closedException) { - logger.debug("#test producer closed, retrying on sending..."); - Thread.sleep(2000); - // OK - retry = true; - } catch (ActiveMQConnectionTimedOutException timedoutEx) { - //commit timedout because of server crash. retry - //will be ok after failover - Thread.sleep(2000); - retry = true; - } catch (ActiveMQException e) { - logger.debug("#test Exception {}", e.getMessage(), e); - throw e; - } - } - while (retry); - - logAndSystemOut("#test Finished sending, starting consumption now"); - - boolean blocked = false; - - retry = false; - - ClientConsumer consumer = null; - do { - if (runner.isFailed()) { - //test ends, return - return; - } - ArrayList msgs = new ArrayList<>(); - try { - int retryCreate = 4; - while (consumer == null) { - try { - consumer = session.createConsumer(FailoverTestBase.ADDRESS); - } catch (ActiveMQObjectClosedException closedEx) { - //the session may just crashed and failover not done yet - if (retryCreate == 0) { - throw closedEx; - } - Thread.sleep(2000); - retryCreate--; - } - } - session.start(); - - for (int i = 0; i < numMessages; i++) { - logger.debug("Consumer receiving message {}", i); - - ClientMessage message = consumer.receive(60000); - if (message == null) { - break; - } - - logger.debug("Received message {}", message); - - int count = message.getIntProperty("counter"); - - if (count != i) { - logger.warn("count was received out of order, {}!={}", count, i); - } - - msgs.add(count); - - message.acknowledge(); - } - - logger.debug("#test commit"); - try { - session.commit(); - } catch (ActiveMQTransactionRolledBackException trbe) { - //we know the tx has been rolled back so we just consume again - retry = true; - continue; - } catch (ActiveMQException e) { - // This could eventually happen - // We will get rid of this when we implement 2 phase commit on failover - logger.warn("exception during commit, continue {}", e.getMessage(), e); - continue; - } - - try { - if (blocked) { - assertTrue(msgs.size() == 0 || msgs.size() == numMessages, "msgs.size is expected to be 0 or " + numMessages + " but it was " + msgs.size()); - } else { - assertTrue(msgs.size() == numMessages, "msgs.size is expected to be " + numMessages + " but it was " + msgs.size()); - } - } catch (Throwable e) { - if (logger.isDebugEnabled()) { - String dumpMessage = "Thread dump, messagesReceived = " + msgs.size(); - logger.debug(threadDump(dumpMessage)); - } - logAndSystemOut(e.getMessage() + " messages received"); - for (Integer msg : msgs) { - logAndSystemOut(msg.toString()); - } - throw e; - } - - int i = 0; - for (Integer msg : msgs) { - assertEquals(i++, (int) msg); - } - - retry = false; - blocked = false; - } catch (ActiveMQTransactionRolledBackException trbe) { - logAndSystemOut("Transaction rolled back with " + msgs.size(), trbe); - // TODO: https://jira.jboss.org/jira/browse/HORNETQ-369 - // ATM RolledBack exception is being called with the transaction is committed. - // the test will fail if you remove this next line - blocked = true; - retry = true; - } catch (ActiveMQTransactionOutcomeUnknownException tou) { - logAndSystemOut("Transaction rolled back with " + msgs.size(), tou); - // TODO: https://jira.jboss.org/jira/browse/HORNETQ-369 - // ATM RolledBack exception is being called with the transaction is committed. - // the test will fail if you remove this next line - blocked = true; - retry = true; - } catch (ActiveMQUnBlockedException ube) { - logAndSystemOut("Unblocked with " + msgs.size(), ube); - // TODO: https://jira.jboss.org/jira/browse/HORNETQ-369 - // This part of the test is never being called. - blocked = true; - retry = true; - } catch (ActiveMQException e) { - logAndSystemOut(e.getMessage(), e); - throw e; - } - } - while (retry); - } finally { - if (session != null) { - session.close(); - } - } - - listener = null; - } - } - @Override protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { return TransportConfigurationUtils.getInVMAcceptor(live);