Skip to content

Commit

Permalink
Adds a test but my changes to MockClient.java broke all sorts of stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
coltmcnealy-lh committed Oct 17, 2024
1 parent 8635aed commit ec9dd11
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
15 changes: 14 additions & 1 deletion clients/src/test/java/org/apache/kafka/clients/MockClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public long connectionDelay(Node node, long now) {

@Override
public long pollDelayMs(Node node, long now) {
return connectionDelay(node, now);
return connectionState(node.idString()).pollDelayMs(now);
}

public void backoff(Node node, long durationMs) {
Expand Down Expand Up @@ -336,6 +336,12 @@ public List<ClientResponse> poll(long timeoutMs, long now) {
copy.add(response);
}

if (copy.isEmpty()) {
// Simulate time advancing. If no responses are received, then we know that
// we waited for the whole timeoutMs.
time.sleep(timeoutMs);
}

return copy;
}

Expand Down Expand Up @@ -795,6 +801,13 @@ long connectionDelay(long now) {
return 0;
}

long pollDelayMs(long now) {
if (notThrottled(now))
return connectionDelay(now);

return throttledUntilMs - now;
}

boolean ready(long now) {
switch (state) {
case CONNECTED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,25 +577,27 @@ public void senderThreadShouldNotBlockWhenBackingOffAndAddingPartitionsToTxn() {
doInitTransactions(txnManager, producerIdAndEpoch);

int backoffTimeMs = 10;
long now = time.milliseconds();
long startTime = time.milliseconds();
Node nodeToThrottle = metadata.fetch().nodeById(0);
// client.throttle(nodeToThrottle, backoffTimeMs);
client.backoff(nodeToThrottle, backoffTimeMs);
assertTrue(client.isConnected(nodeToThrottle.idString()));
client.disconnect(nodeToThrottle.idString());
assertFalse(client.isConnected(nodeToThrottle.idString()));
client.throttle(nodeToThrottle, backoffTimeMs);

// Verify node is throttled about 10ms. In real-life Apache Kafka, we observe that this can happen
// as done above (with a disconnect and a backoff) or if the producer receives a response with
// throttleTimeMs > 0.
long currentPollDelay = client.pollDelayMs(nodeToThrottle, now);
// as done above by throttling or with a disconnect / backoff.
long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime);
assertTrue(currentPollDelay > 0);
assertTrue(currentPollDelay <= backoffTimeMs);

txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);

// sender.runOnce();
assertFalse(txnManager.hasInFlightRequest());
sender.runOnce();
assertTrue(txnManager.hasInFlightRequest());

long totalTimeToRunOnce = time.milliseconds() - startTime;

// It should have blocked roughly only the backoffTimeMs and some change.
assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT);
}

@Test
Expand Down

0 comments on commit ec9dd11

Please sign in to comment.