Skip to content

Commit

Permalink
test that passes on my branch and fails on trunk
Browse files Browse the repository at this point in the history
  • Loading branch information
coltmcnealy-lh committed Oct 19, 2024
1 parent ec9dd11 commit 05200f7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
11 changes: 8 additions & 3 deletions clients/src/test/java/org/apache/kafka/clients/MockClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public FutureResponse(Node node,

private int correlation;
private Runnable wakeupHook;
private boolean advanceTimeDuringPoll;
private final Time time;
private final MockMetadataUpdater metadataUpdater;
private final Map<String, ConnectionState> connections = new HashMap<>();
Expand Down Expand Up @@ -141,6 +142,10 @@ public long pollDelayMs(Node node, long now) {
return connectionState(node.idString()).pollDelayMs(now);
}

public void setAdvanceTimeDuringPoll(boolean advanceTimeDuringPoll) {
this.advanceTimeDuringPoll = advanceTimeDuringPoll;
}

public void backoff(Node node, long durationMs) {
connectionState(node.idString()).backoff(time.milliseconds() + durationMs);
}
Expand Down Expand Up @@ -336,9 +341,9 @@ public List<ClientResponse> poll(long timeoutMs, long now) {
copy.add(response);
}

if (copy.isEmpty()) {
// Simulate time advancing. If no responses are received, then we know that
// we waited for the whole timeoutMs.
// In real life, if poll() is called and we get to the end with no responses,
// time equal to timeoutMs would have passed.
if (advanceTimeDuringPoll) {
time.sleep(timeoutMs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,24 +568,27 @@ public void testMetadataTopicExpiry() throws Exception {
}

@Test
public void senderThreadShouldNotBlockWhenBackingOffAndAddingPartitionsToTxn() {
public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() {
// We want MockClient#poll() to advance time so that eventually the backoff expires.
client.setAdvanceTimeDuringPoll(true);

ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);

setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);

int backoffTimeMs = 10;
int throttleTimeMs = 1000;
long startTime = time.milliseconds();
Node nodeToThrottle = metadata.fetch().nodeById(0);
client.throttle(nodeToThrottle, backoffTimeMs);
client.throttle(nodeToThrottle, throttleTimeMs);

// Verify node is throttled about 10ms. In real-life Apache Kafka, we observe that this can happen
// Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen
// as done above by throttling or with a disconnect / backoff.
long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime);
assertTrue(currentPollDelay > 0);
assertTrue(currentPollDelay <= backoffTimeMs);
assertTrue(currentPollDelay <= throttleTimeMs);

txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);
Expand All @@ -598,6 +601,8 @@ public void senderThreadShouldNotBlockWhenBackingOffAndAddingPartitionsToTxn() {

// It should have blocked roughly only the backoffTimeMs and some change.
assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT);

client.setAdvanceTimeDuringPoll(false);
}

@Test
Expand Down

0 comments on commit 05200f7

Please sign in to comment.