From c83ed8957d9d7f06bb29c0fce563fe2e3462993e Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 5 Mar 2024 16:17:50 -0500 Subject: [PATCH] ARTEMIS-4668 Moving AMQP Large Message file handling away from Netty thread --- .../amqp/broker/AMQPSessionCallback.java | 4 + .../amqp/proton/AMQPConnectionContext.java | 13 ++++ .../amqp/proton/AMQPLargeMessageReader.java | 78 ++++++++++++------- .../amqp/proton/AMQPLargeMessageWriter.java | 58 ++++++++++---- .../AMQPTunneledCoreLargeMessageWriter.java | 2 +- .../protocol/amqp/proton/MessageReader.java | 3 + .../protocol/amqp/proton/MessageWriter.java | 2 +- .../amqp/proton/ProtonAbstractReceiver.java | 65 +++++++++++----- .../proton/ProtonServerSenderContext.java | 2 +- .../amqp/proton/handler/ProtonHandler.java | 18 +++-- ...MQPTunneledCoreLargeMessageWriterTest.java | 7 +- .../AMQPTunneledCoreMessageWriterTest.java | 5 +- 12 files changed, 180 insertions(+), 77 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index c8f081b30bb..c67cac0a1df 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -172,6 +172,10 @@ public void withinContext(Runnable run) throws Exception { } } + public void execute(Runnable run) { + sessionExecutor.execute(run); + } + public void afterIO(IOCallback ioCallback) { OperationContext context = recoverContext(); try { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index b84823ee2d7..e1f6fe192c2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -94,6 +94,19 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public void disableAutoRead() { + handler.requireHandler(); + connectionCallback.getTransportConnection().setAutoRead(false); + handler.setReadable(false); + } + + public void enableAutoRead() { + handler.requireHandler(); + connectionCallback.getTransportConnection().setAutoRead(true); + getHandler().setReadable(true); + flush(); + } + public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); public static final String AMQP_CONTAINER_ID = "amqp-container-id"; private static final FutureTask VOID_FUTURE = new FutureTask<>(() -> { }, null); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java index 6a44c6339f7..63af7b1418e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java @@ -34,7 +34,7 @@ public class AMQPLargeMessageReader implements MessageReader { private final ProtonAbstractReceiver serverReceiver; - private AMQPLargeMessage currentMessage; + private volatile AMQPLargeMessage currentMessage; private DeliveryAnnotations deliveryAnnotations; private boolean closed = true; @@ -50,14 +50,15 @@ public DeliveryAnnotations getDeliveryAnnotations() { @Override public void close() { if (!closed) { - if (currentMessage != null) { - try { - currentMessage.deleteFile(); - } catch (Throwable error) { - ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); - } finally { - currentMessage = null; + try { + AMQPLargeMessage localCurrentMessage = currentMessage; + if (localCurrentMessage != null) { + localCurrentMessage.deleteFile(); } + } catch (Throwable error) { + ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); + } finally { + currentMessage = null; } deliveryAnnotations = null; @@ -82,34 +83,53 @@ public Message readBytes(Delivery delivery) throws Exception { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } - final Receiver receiver = ((Receiver) delivery.getLink()); - final ReadableBuffer dataBuffer = receiver.recv(); + try { + serverReceiver.connection.requireInHandler(); + + final Receiver receiver = ((Receiver) delivery.getLink()); + final ReadableBuffer dataBuffer = receiver.recv(); - if (currentMessage == null) { final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); - final long id = sessionSPI.getStorageManager().generateID(); - currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, - sessionSPI.getCoreMessageObjectPools(), - sessionSPI.getStorageManager()); - currentMessage.parseHeader(dataBuffer); - sessionSPI.getStorageManager().onLargeMessageCreate(id, currentMessage); - } + if (currentMessage == null) { + final long id = sessionSPI.getStorageManager().generateID(); + AMQPLargeMessage localCurrentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); + localCurrentMessage.parseHeader(dataBuffer); + + sessionSPI.getStorageManager().onLargeMessageCreate(id, localCurrentMessage); + currentMessage = localCurrentMessage; + } + + serverReceiver.getConnection().disableAutoRead(); - currentMessage.addBytes(dataBuffer); + boolean partial = delivery.isPartial(); - final AMQPLargeMessage result; + sessionSPI.execute(() -> addBytes(delivery, dataBuffer, partial)); - if (!delivery.isPartial()) { - currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); - result = currentMessage; - // We don't want a close to delete the file now, we've released the resources. - currentMessage = null; - deliveryAnnotations = result.getDeliveryAnnotations(); - } else { - result = null; + return null; + } catch (Exception e) { + // if an exception happened we must enable it back + serverReceiver.getConnection().enableAutoRead(); + throw e; } + } + + private void addBytes(Delivery delivery, ReadableBuffer dataBuffer, boolean isPartial) { + final AMQPLargeMessage localCurrentMessage = currentMessage; + + try { + localCurrentMessage.addBytes(dataBuffer); - return result; + if (!isPartial) { + localCurrentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); + // We don't want a close to delete the file now, we've released the resources. + currentMessage = null; + serverReceiver.connection.runNow(() -> serverReceiver.onMessageComplete(delivery, localCurrentMessage, localCurrentMessage.getDeliveryAnnotations())); + } + } catch (Throwable e) { + serverReceiver.onExceptionWhileReading(e); + } finally { + serverReceiver.connection.runNow(serverReceiver.getConnection()::enableAutoRead); + } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java index f6ef4ad20b2..d6fccce451c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java @@ -59,6 +59,9 @@ public class AMQPLargeMessageWriter implements MessageWriter { private MessageReference reference; private AMQPLargeMessage message; + + private LargeBodyReader largeBodyReader; + private Delivery delivery; private long position; private boolean initialPacketHandled; @@ -81,33 +84,59 @@ public boolean isWriting() { public void close() { if (!closed) { try { + try { + if (largeBodyReader != null) { + largeBodyReader.close(); + } + } catch (Exception e) { + // if we get an error only at this point, there's nothing else we could do other than log.warn + logger.warn("{}", e.getMessage(), e); + } if (message != null) { message.usageDown(); } } finally { - reset(true); + resetClosed(); } } } @Override - public AMQPLargeMessageWriter open() { + public AMQPLargeMessageWriter open(MessageReference reference) { if (!closed) { throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed"); } - reset(false); + this.reference = reference; + this.message = (AMQPLargeMessage) reference.getMessage(); + this.message.usageUp(); + + try { + largeBodyReader = message.getLargeBodyReader(); + largeBodyReader.open(); + } catch (Exception e) { + serverSender.reportDeliveryError(this, reference, e); + } + + resetOpen(); return this; } - private void reset(boolean closedState) { + private void resetClosed() { message = null; reference = null; delivery = null; + largeBodyReader = null; position = 0; initialPacketHandled = false; - closed = closedState; + closed = true; + } + + private void resetOpen() { + position = 0; + initialPacketHandled = false; + closed = false; } @Override @@ -121,17 +150,15 @@ public void writeBytes(MessageReference messageReference) { throw new IllegalStateException("Cannot write to an AMQP Large Message Writer that has been closed"); } - this.reference = messageReference; - this.message = (AMQPLargeMessage) messageReference.getMessage(); - if (sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()) != null) { + // an interceptor rejected the delivery + // since we opened the message as part of the queue executor we must close it now + close(); return; } this.delivery = serverSender.createDelivery(messageReference, (int) this.message.getMessageFormat()); - message.usageUp(); - tryDelivering(); } @@ -150,15 +177,14 @@ private void tryDelivering() { final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize); final NettyReadable frameView = new NettyReadable(frameBuffer); - try (LargeBodyReader context = message.getLargeBodyReader()) { - context.open(); - context.position(position); - long bodySize = context.getSize(); + try { + largeBodyReader.position(position); + long bodySize = largeBodyReader.getSize(); // materialize it so we can use its internal NIO buffer frameBuffer.ensureWritable(frameSize); if (!initialPacketHandled && protonSender.getLocalState() != EndpointState.CLOSED) { - if (!deliverInitialPacket(context, frameBuffer)) { + if (!deliverInitialPacket(largeBodyReader, frameBuffer)) { return; } @@ -171,7 +197,7 @@ private void tryDelivering() { } frameBuffer.clear(); - final int readSize = context.readInto(frameBuffer.internalNioBuffer(0, frameSize)); + final int readSize = largeBodyReader.readInto(frameBuffer.internalNioBuffer(0, frameSize)); frameBuffer.writerIndex(readSize); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java index 94b118db128..04a3ad2f1ba 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java @@ -129,7 +129,7 @@ public void close() { } @Override - public AMQPTunneledCoreLargeMessageWriter open() { + public AMQPTunneledCoreLargeMessageWriter open(MessageReference reference) { if (state != State.CLOSED) { throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed"); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java index 8b4db7a04ee..020dc6d1422 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java @@ -49,6 +49,9 @@ public interface MessageReader { * and is no longer partial the readBytes method will return the decoded message * for dispatch. * + * Notice that asynchronous Readers will never return the Message but will rather call a complete operation on the + * Server Receiver. + * * @param delivery * The delivery that has pending incoming bytes. */ diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java index 8afac2c8b37..911ffcc5905 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java @@ -90,7 +90,7 @@ default void close() { * be called on every handler by the sender context as it doesn't know which instances need * opened. */ - default MessageWriter open() { + default MessageWriter open(MessageReference reference) { // Default for stateless handlers is to do nothing here. return this; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java index 62dc963634f..9dbfe5406da 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import java.lang.invoke.MethodHandles; + import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -26,12 +28,17 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.transaction.TransactionalState; +import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class ProtonAbstractReceiver extends ProtonInitializable implements ProtonDeliveryHandler { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected final AMQPConnectionContext connection; protected final AMQPSessionContext protonSession; @@ -302,8 +309,6 @@ protected MessageReader trySelectMessageReader(Receiver receiver, Delivery deliv public final void onMessage(Delivery delivery) throws ActiveMQAMQPException { connection.requireInHandler(); - final Receiver receiver = ((Receiver) delivery.getLink()); - if (receiver.current() != delivery) { return; } @@ -320,28 +325,41 @@ public final void onMessage(Delivery delivery) throws ActiveMQAMQPException { return; } - final Message message = messageReader.readBytes(delivery); - - if (message != null) { - // Fetch this before the close of the reader as that will clear any read message - // delivery annotations. - final DeliveryAnnotations deliveryAnnotations = messageReader.getDeliveryAnnotations(); + Message completeMessage; + if ((completeMessage = messageReader.readBytes(delivery)) != null) { + // notice the AMQP Large Message Reader will always return null + // and call the onMessageComplete directly + // since that happens asynchronously + onMessageComplete(delivery, completeMessage, messageReader.getDeliveryAnnotations()); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); + } + } - this.messageReader.close(); - this.messageReader = null; + public void onMessageComplete(Delivery delivery, + Message message, DeliveryAnnotations deliveryAnnotations) { + connection.requireInHandler(); - receiver.advance(); + try { + receiver.advance(); - Transaction tx = null; - if (delivery.getRemoteState() instanceof TransactionalState) { - TransactionalState txState = (TransactionalState) delivery.getRemoteState(); + Transaction tx = null; + if (delivery.getRemoteState() instanceof TransactionalState) { + TransactionalState txState = (TransactionalState) delivery.getRemoteState(); + try { tx = this.sessionSPI.getTransaction(txState.getTxnId(), false); + } catch (Exception e) { + this.onExceptionWhileReading(e); } - - actualDelivery(message, delivery, deliveryAnnotations, receiver, tx); } - } catch (Exception e) { - throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); + + actualDelivery(message, delivery, deliveryAnnotations, receiver, tx); + } finally { + // reader is complete, we give it up now + this.messageReader.close(); + this.messageReader = null; } } @@ -351,6 +369,17 @@ public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { closeCurrentReader(); } + public void onExceptionWhileReading(Throwable e) { + logger.warn(e.getMessage(), e); + connection.runNow(() -> { + // setting it enabled just in case a large message reader disabled it + connection.enableAutoRead(); + ErrorCondition ec = new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()); + connection.close(ec); + connection.flush(); + }); + } + @Override public void close(ErrorCondition condition) throws ActiveMQAMQPException { receiver.setCondition(condition); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 9359f7fe94b..ad0d42c20e2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -483,7 +483,7 @@ public int deliverMessage(final MessageReference messageReference, final ServerC credits--; } - final MessageWriter messageWriter = controller.selectOutgoingMessageWriter(this, messageReference).open(); + final MessageWriter messageWriter = controller.selectOutgoingMessageWriter(this, messageReference).open(messageReference); // Preserve for hasCredits to check for busy state and possible abort on close this.messageWriter = messageWriter; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 5bd3537ef54..17f17b0c706 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -95,6 +95,8 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { boolean flushInstantly = false; + volatile boolean readable = true; + /** afterFlush and afterFlushSet properties * are set by afterFlush methods. * This is to be called after the flush loop. @@ -108,6 +110,15 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { private Runnable afterFlush; protected Set afterFlushSet; + public boolean isReadable() { + return readable; + } + + public ProtonHandler setReadable(boolean readable) { + this.readable = readable; + return this; + } + @Override public void initialize() throws Exception { initialized = true; @@ -381,11 +392,6 @@ public void close(ErrorCondition errorCondition, AMQPConnectionContext connectio flush(); }); - /*try { - Thread.sleep(1000); - } catch (Exception e) { - e.printStackTrace(); - } */ // this needs to be done in two steps // we first flush what we have to the client // after flushed, we close the local connection @@ -562,7 +568,7 @@ private void dispatch() { AuditLogger.setRemoteAddress(h.getRemoteAddress()); } } - while ((ev = collector.peek()) != null) { + while (isReadable() && (ev = collector.peek()) != null) { for (EventHandler h : handlers) { logger.trace("Handling {} towards {}", ev, h); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java index 44e85d358db..c686a593848 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java @@ -55,6 +55,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.Spy; @@ -150,7 +151,7 @@ public void testNoWritesWhenProtonSenderIsLocallyClosed() throws Exception { when(protonSender.getLocalState()).thenReturn(EndpointState.CLOSED); - writer.open(); + writer.open(Mockito.mock(MessageReference.class)); try { writer.writeBytes(reference); @@ -177,7 +178,7 @@ public void testMessageEncodingWrittenToDeliveryWithAnnotations() throws Excepti private void doTestMessageEncodingWrittenToDeliveryWithAnnotations(boolean deliveryAnnotations) throws Exception { AMQPTunneledCoreLargeMessageWriter writer = new AMQPTunneledCoreLargeMessageWriter(serverSender); - writer.open(); + writer.open(Mockito.mock(MessageReference.class)); final ByteBuf expectedEncoding = Unpooled.buffer(); @@ -276,7 +277,7 @@ private void doTestMessageEncodingWrittenToDeliveryWithAnnotations(boolean deliv public void testLargeMessageUsageLoweredOnCloseWhenWriteNotCompleted() throws Exception { AMQPTunneledCoreLargeMessageWriter writer = new AMQPTunneledCoreLargeMessageWriter(serverSender); - writer.open(); + writer.open(Mockito.mock(MessageReference.class)); when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE); when(protonDelivery.isPartial()).thenReturn(true); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java index 9245fa095bd..7a09c0911be 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java @@ -52,6 +52,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.Spy; @@ -110,7 +111,7 @@ public void testNoWritesWhenProtonSenderIsLocallyClosed() throws Exception { when(protonSender.getLocalState()).thenReturn(EndpointState.CLOSED); - writer.open(); + writer.open(Mockito.mock(MessageReference.class)); try { writer.writeBytes(reference); @@ -171,7 +172,7 @@ private void doTestMessageEncodingWrittenToDeliveryWithAnnotations(boolean deliv return null; }).when(message).persist(any(ActiveMQBuffer.class)); - writer.open(); + writer.open(Mockito.mock(MessageReference.class)); try { writer.writeBytes(reference);