Skip to content

Commit

Permalink
Improve writer buffer overflow messaging and handling
Browse files Browse the repository at this point in the history
  • Loading branch information
pnv1 committed Dec 14, 2023
1 parent 7bfc71c commit 0d70af8
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,22 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
logger.info("[{}] Message queue in-flight limit of {} reached. Putting the message into incoming " +
"waiting queue", id, settings.getMaxSendBufferMessagesCount());
}
} else if (availableSizeBytes <= message.getMessage().getData().length) {
} else if (availableSizeBytes < message.getMessage().getData().length) {
if (instant) {
String errorMessage = "[" + id +
"] Rejecting a message due to reaching message queue size limit of " +
settings.getMaxSendBufferMemorySize() + " bytes. Buffer currently has " +
currentInFlightCount + " messages with " + availableSizeBytes +
String errorMessage = "[" + id + "] Rejecting a message of " +
message.getMessage().getData().length +
" bytes: not enough space in message queue. Buffer currently has " + currentInFlightCount +
" messages with " + availableSizeBytes + " / " + settings.getMaxSendBufferMemorySize() +
" bytes available";
logger.info(errorMessage);
CompletableFuture<Void> result = new CompletableFuture<>();
result.completeExceptionally(new QueueOverflowException(errorMessage));
return result;
} else {
logger.info("[{}] Message queue size limit of {} bytes reached. Putting the message into incoming" +
" waiting queue. Buffer currently has {} messages with {} bytes available", id,
settings.getMaxSendBufferMemorySize(), currentInFlightCount, availableSizeBytes);
logger.info("[{}] Can't accept a message of {} bytes into message queue. Buffer currently has " +
"{} messages with {} / {} bytes available. Putting the message into incoming " +
"waiting queue.", id, message.getMessage().getData().length, currentInFlightCount,
availableSizeBytes, settings.getMaxSendBufferMemorySize());
}
} else if (incomingQueue.isEmpty()) {
acceptMessageIntoSendingQueue(message);
Expand Down

0 comments on commit 0d70af8

Please sign in to comment.