diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 2297109b14836..a458f06d00fcc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -567,6 +567,37 @@ public void testMetadataTopicExpiry() throws Exception { assertTrue(future.isDone(), "Request should be completed"); } + @Test + public void senderThreadShouldNotBlockWhenBackingOffAndAddingPartitionsToTxn() { + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); + apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); + TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions); + + setupWithTransactionState(txnManager); + doInitTransactions(txnManager, producerIdAndEpoch); + + int backoffTimeMs = 10; + long now = time.milliseconds(); + Node nodeToThrottle = metadata.fetch().nodeById(0); + // client.throttle(nodeToThrottle, backoffTimeMs); + client.backoff(nodeToThrottle, backoffTimeMs); + assertTrue(client.isConnected(nodeToThrottle.idString())); + client.disconnect(nodeToThrottle.idString()); + assertFalse(client.isConnected(nodeToThrottle.idString())); + + // Verify node is throttled about 10ms. In real-life Apache Kafka, we observe that this can happen + // as done above (with a disconnect and a backoff) or if the producer receives a response with + // throttleTimeMs > 0. + long currentPollDelay = client.pollDelayMs(nodeToThrottle, now); + assertTrue(currentPollDelay > 0); + assertTrue(currentPollDelay <= backoffTimeMs); + + txnManager.beginTransaction(); + txnManager.maybeAddPartition(tp0); + + // sender.runOnce(); + } + @Test public void testNodeLatencyStats() throws Exception { try (Metrics m = new Metrics()) {