Skip to content

Commit

Permalink
ARTEMIS-4668 Moving AMQP Large Message file handling away from Netty …
Browse files Browse the repository at this point in the history
…thread
  • Loading branch information
clebertsuconic committed Mar 6, 2024
1 parent 5ce70f9 commit c83ed89
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public void withinContext(Runnable run) throws Exception {
}
}

public void execute(Runnable run) {
sessionExecutor.execute(run);
}

public void afterIO(IOCallback ioCallback) {
OperationContext context = recoverContext();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public void disableAutoRead() {
handler.requireHandler();
connectionCallback.getTransportConnection().setAutoRead(false);
handler.setReadable(false);
}

public void enableAutoRead() {
handler.requireHandler();
connectionCallback.getTransportConnection().setAutoRead(true);
getHandler().setReadable(true);
flush();
}

public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
public static final String AMQP_CONTAINER_ID = "amqp-container-id";
private static final FutureTask<Void> VOID_FUTURE = new FutureTask<>(() -> { }, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class AMQPLargeMessageReader implements MessageReader {

private final ProtonAbstractReceiver serverReceiver;

private AMQPLargeMessage currentMessage;
private volatile AMQPLargeMessage currentMessage;
private DeliveryAnnotations deliveryAnnotations;
private boolean closed = true;

Expand All @@ -50,14 +50,15 @@ public DeliveryAnnotations getDeliveryAnnotations() {
@Override
public void close() {
if (!closed) {
if (currentMessage != null) {
try {
currentMessage.deleteFile();
} catch (Throwable error) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
} finally {
currentMessage = null;
try {
AMQPLargeMessage localCurrentMessage = currentMessage;
if (localCurrentMessage != null) {
localCurrentMessage.deleteFile();
}
} catch (Throwable error) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
} finally {
currentMessage = null;
}

deliveryAnnotations = null;
Expand All @@ -82,34 +83,53 @@ public Message readBytes(Delivery delivery) throws Exception {
throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed");
}

final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();
try {
serverReceiver.connection.requireInHandler();

final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();

if (currentMessage == null) {
final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI();
final long id = sessionSPI.getStorageManager().generateID();
currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null,
sessionSPI.getCoreMessageObjectPools(),
sessionSPI.getStorageManager());
currentMessage.parseHeader(dataBuffer);

sessionSPI.getStorageManager().onLargeMessageCreate(id, currentMessage);
}
if (currentMessage == null) {
final long id = sessionSPI.getStorageManager().generateID();
AMQPLargeMessage localCurrentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager());
localCurrentMessage.parseHeader(dataBuffer);

sessionSPI.getStorageManager().onLargeMessageCreate(id, localCurrentMessage);
currentMessage = localCurrentMessage;
}

serverReceiver.getConnection().disableAutoRead();

currentMessage.addBytes(dataBuffer);
boolean partial = delivery.isPartial();

final AMQPLargeMessage result;
sessionSPI.execute(() -> addBytes(delivery, dataBuffer, partial));

if (!delivery.isPartial()) {
currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
result = currentMessage;
// We don't want a close to delete the file now, we've released the resources.
currentMessage = null;
deliveryAnnotations = result.getDeliveryAnnotations();
} else {
result = null;
return null;
} catch (Exception e) {
// if an exception happened we must enable it back
serverReceiver.getConnection().enableAutoRead();
throw e;
}
}

private void addBytes(Delivery delivery, ReadableBuffer dataBuffer, boolean isPartial) {
final AMQPLargeMessage localCurrentMessage = currentMessage;

try {
localCurrentMessage.addBytes(dataBuffer);

return result;
if (!isPartial) {
localCurrentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
// We don't want a close to delete the file now, we've released the resources.
currentMessage = null;
serverReceiver.connection.runNow(() -> serverReceiver.onMessageComplete(delivery, localCurrentMessage, localCurrentMessage.getDeliveryAnnotations()));
}
} catch (Throwable e) {
serverReceiver.onExceptionWhileReading(e);
} finally {
serverReceiver.connection.runNow(serverReceiver.getConnection()::enableAutoRead);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class AMQPLargeMessageWriter implements MessageWriter {

private MessageReference reference;
private AMQPLargeMessage message;

private LargeBodyReader largeBodyReader;

private Delivery delivery;
private long position;
private boolean initialPacketHandled;
Expand All @@ -81,33 +84,59 @@ public boolean isWriting() {
public void close() {
if (!closed) {
try {
try {
if (largeBodyReader != null) {
largeBodyReader.close();
}
} catch (Exception e) {
// if we get an error only at this point, there's nothing else we could do other than log.warn
logger.warn("{}", e.getMessage(), e);
}
if (message != null) {
message.usageDown();
}
} finally {
reset(true);
resetClosed();
}
}
}

@Override
public AMQPLargeMessageWriter open() {
public AMQPLargeMessageWriter open(MessageReference reference) {
if (!closed) {
throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed");
}

reset(false);
this.reference = reference;
this.message = (AMQPLargeMessage) reference.getMessage();
this.message.usageUp();

try {
largeBodyReader = message.getLargeBodyReader();
largeBodyReader.open();
} catch (Exception e) {
serverSender.reportDeliveryError(this, reference, e);
}

resetOpen();

return this;
}

private void reset(boolean closedState) {
private void resetClosed() {
message = null;
reference = null;
delivery = null;
largeBodyReader = null;
position = 0;
initialPacketHandled = false;
closed = closedState;
closed = true;
}

private void resetOpen() {
position = 0;
initialPacketHandled = false;
closed = false;
}

@Override
Expand All @@ -121,17 +150,15 @@ public void writeBytes(MessageReference messageReference) {
throw new IllegalStateException("Cannot write to an AMQP Large Message Writer that has been closed");
}

this.reference = messageReference;
this.message = (AMQPLargeMessage) messageReference.getMessage();

if (sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()) != null) {
// an interceptor rejected the delivery
// since we opened the message as part of the queue executor we must close it now
close();
return;
}

this.delivery = serverSender.createDelivery(messageReference, (int) this.message.getMessageFormat());

message.usageUp();

tryDelivering();
}

Expand All @@ -150,15 +177,14 @@ private void tryDelivering() {
final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
final NettyReadable frameView = new NettyReadable(frameBuffer);

try (LargeBodyReader context = message.getLargeBodyReader()) {
context.open();
context.position(position);
long bodySize = context.getSize();
try {
largeBodyReader.position(position);
long bodySize = largeBodyReader.getSize();
// materialize it so we can use its internal NIO buffer
frameBuffer.ensureWritable(frameSize);

if (!initialPacketHandled && protonSender.getLocalState() != EndpointState.CLOSED) {
if (!deliverInitialPacket(context, frameBuffer)) {
if (!deliverInitialPacket(largeBodyReader, frameBuffer)) {
return;
}

Expand All @@ -171,7 +197,7 @@ private void tryDelivering() {
}
frameBuffer.clear();

final int readSize = context.readInto(frameBuffer.internalNioBuffer(0, frameSize));
final int readSize = largeBodyReader.readInto(frameBuffer.internalNioBuffer(0, frameSize));

frameBuffer.writerIndex(readSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void close() {
}

@Override
public AMQPTunneledCoreLargeMessageWriter open() {
public AMQPTunneledCoreLargeMessageWriter open(MessageReference reference) {
if (state != State.CLOSED) {
throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public interface MessageReader {
* and is no longer partial the readBytes method will return the decoded message
* for dispatch.
*
* Notice that asynchronous Readers will never return the Message but will rather call a complete operation on the
* Server Receiver.
*
* @param delivery
* The delivery that has pending incoming bytes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ default void close() {
* be called on every handler by the sender context as it doesn't know which instances need
* opened.
*/
default MessageWriter open() {
default MessageWriter open(MessageReference reference) {
// Default for stateless handlers is to do nothing here.
return this;
}
Expand Down
Loading

0 comments on commit c83ed89

Please sign in to comment.