Skip to content

Commit

Permalink
clarifies comments
Browse files Browse the repository at this point in the history
  • Loading branch information
coltmcnealy-lh committed Oct 17, 2024
1 parent 0effaa0 commit dcbd984
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ public static boolean awaitReady(KafkaClient client, Node node, Time time, long
}
long pollTimeout = timeoutMs - (attemptStartTime - startTime); // initialize in this order to avoid overflow

// If the network client is delayed for some reason (eg. throttling or retry backoff), polling longer than
// that is potentially dangerous as the producer will get stuck waiting with potential for some pending
// requests to just not get sent. This fixes KAFKA-17455. This is the way.
long timeUntilCanSendDataAgain = client.pollDelayMs(node, startTime);
if (timeUntilCanSendDataAgain > 0 && pollTimeout > timeUntilCanSendDataAgain) {
pollTimeout = timeUntilCanSendDataAgain;
// If the network client is waiting to send data for some reason (eg. throttling or retry backoff),
// polling longer than that is potentially dangerous as the producer will not attempt to send
// any pending requests.
long waitingTime = client.pollDelayMs(node, startTime);
if (waitingTime > 0 && pollTimeout > waitingTime) {
// Block only until the next-scheduled time that it's okay to send data to the producer,
// wake up, and try again. This is the way.
pollTimeout = waitingTime;
}

client.poll(pollTimeout, attemptStartTime);
Expand Down

0 comments on commit dcbd984

Please sign in to comment.