KAFKA-17455: fix stuck producer when throttling or retrying #17527

Merged
mjsax merged 8 commits into apache:trunk from kafka-17455/fix-stuck-producer on Jan 9, 2025

Conversation

coltmcnealy-lh
Contributor

Fixes a stuck producer by polling again after pollDelayMs in NetworkUtils#awaitReady().
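The idea can be illustrated with a minimal sketch. This is not the actual Kafka patch: `ReadyClient`, `ThrottledFakeClient`, and `AwaitReadySketch` are hypothetical stand-ins, and the fake `poll()` is assumed to block for its whole timeout because no response ever arrives. The loop caps each poll timeout at the node's current poll delay, so readiness is re-checked as soon as the throttle expires rather than after the full remaining timeout.

```java
// Hypothetical, simplified stand-in for a network client; not the Kafka API.
interface ReadyClient {
    boolean isReady(long nowMs);
    long pollDelayMs(long nowMs);           // time until the node may be contacted again
    long poll(long timeoutMs, long nowMs);  // returns the time at which poll() returned
}

// Fake client that is throttled until a fixed deadline. No responses ever
// arrive, so poll() always blocks for the full timeout it was given.
class ThrottledFakeClient implements ReadyClient {
    private final long throttledUntilMs;

    ThrottledFakeClient(long throttledUntilMs) {
        this.throttledUntilMs = throttledUntilMs;
    }

    public boolean isReady(long nowMs) {
        return nowMs >= throttledUntilMs;
    }

    public long pollDelayMs(long nowMs) {
        return Math.max(0L, throttledUntilMs - nowMs);
    }

    public long poll(long timeoutMs, long nowMs) {
        return nowMs + timeoutMs;
    }
}

class AwaitReadySketch {
    // Waits until the client is ready or the timeout expires and returns the
    // time at which the wait ended. With capToPollDelay set, no single poll
    // lasts longer than the node's current poll delay.
    static long awaitReady(ReadyClient client, long startMs, long timeoutMs, boolean capToPollDelay) {
        long now = startMs;
        long expiryMs = startMs + timeoutMs;
        while (!client.isReady(now) && now < expiryMs) {
            long pollTimeout = expiryMs - now;
            long delay = client.pollDelayMs(now);
            if (capToPollDelay && delay > 0 && delay < pollTimeout) {
                pollTimeout = delay;
            }
            now = client.poll(pollTimeout, now);
        }
        return now;
    }

    public static void main(String[] args) {
        ReadyClient client = new ThrottledFakeClient(1_000L);
        System.out.println("capped:   " + awaitReady(client, 0L, 30_000L, true));
        System.out.println("uncapped: " + awaitReady(client, 0L, 30_000L, false));
    }
}
```

In this toy model, with a 1,000 ms throttle and a 30,000 ms timeout, the capped wait ends at 1,000 ms while the uncapped wait ends at 30,000 ms.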

github-actions bot added the "clients" and "small" (Small PRs) labels on Oct 17, 2024
@coltmcnealy-lh
Contributor Author

We are soaking this change right now. If it doesn't cause other problems, I will need help with unit tests. Addresses this ticket: https://issues.apache.org/jira/browse/KAFKA-17455

@coltmcnealy-lh
Contributor Author

@dajac any help would be appreciated with the tests. Not sure if what I attempted to do was clear or not, but (obviously) the changes I made to MockClient.java broke tons of stuff.

I made those changes because poll() did not advance the time, so without them the test got stuck in an infinite loop. There must be a better way to test this.

if (copy.isEmpty()) {
// Simulate time advancing. If no responses are received, then we know that
// we waited for the whole timeoutMs.
time.sleep(timeoutMs);
Contributor Author

I noticed when developing my test that repeated calls to poll() with timeoutMs == 10 never resulted in time.milliseconds() advancing, so the fictitious Node in my test never became ready (time never advanced past the throttled time). That's why I made this change; however, it broke all sorts of things.

Member

I guess most tests don't expect time to advance "automatically"... I think we need to remove this.

I think there are two possibilities: start a background thread in your test before you call runOnce() and let the background thread advance mockTime (maybe too complex?), or make the MockTime object more advanced and add an "auto-time-advance" feature, i.e., each time milliseconds() is called, advance time by 1 ms (or some other value passed into MockTime). Only your test would enable this feature.
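The second option can be sketched roughly as follows. `AutoTickClock` is a hypothetical stand-alone class written for illustration, not Kafka's `MockTime` (which, as noted further down in this thread, already accepts an auto-tick value in its constructor). It assumes the clock ticks before each read is returned.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical manual clock with an optional auto-tick; not Kafka's MockTime.
class AutoTickClock {
    private final AtomicLong nowMs = new AtomicLong();
    private final long autoTickMs;

    // autoTickMs == 0 disables the auto-tick, so only sleep() moves the clock.
    AutoTickClock(long autoTickMs) {
        this.autoTickMs = autoTickMs;
    }

    // Every read first moves the clock forward by autoTickMs.
    long milliseconds() {
        return nowMs.addAndGet(autoTickMs);
    }

    // Explicit advance, as a test would normally do by hand.
    void sleep(long ms) {
        nowMs.addAndGet(ms);
    }
}

class AutoTickClockDemo {
    public static void main(String[] args) {
        AutoTickClock clock = new AutoTickClock(1L);
        long deadline = 100L;
        int reads = 0;
        // A loop that only reads the clock still terminates, because each
        // read advances time by one millisecond.
        while (clock.milliseconds() < deadline) {
            reads++;
        }
        System.out.println("reads before deadline: " + reads);
    }
}
```

Only a test that opts in by passing a non-zero tick would see time move on its own; a clock created with a tick of 0 behaves like a plain manual clock.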

Contributor Author

Thanks Matthias. I did something somewhat inspired by your comment. Ready for review.

Member

Should work, too, but why don't you just create new MockTime(1L) and use the already existing "auto-tick" feature?

Contributor Author

@mjsax the problem is that the following test still passes on trunk using new MockTime(1L). However, the test that I have in the PR as it stands passes on my branch but fails on trunk:

    @Test
    public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() {
        // We want MockClient#poll() to advance time so that eventually the backoff expires.
        // client.advanceTimeDuringPoll(true);
        time = new MockTime(1L);

        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
        TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);

        setupWithTransactionState(txnManager);
        doInitTransactions(txnManager, producerIdAndEpoch);

        int throttleTimeMs = 1000;
        long startTime = time.milliseconds();
        Node nodeToThrottle = metadata.fetch().nodeById(0);
        client.throttle(nodeToThrottle, throttleTimeMs);

        // Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen
        // as done above by throttling or with a disconnect / backoff.
        long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime);
        assertTrue(currentPollDelay > 0);
        assertTrue(currentPollDelay <= throttleTimeMs);

        txnManager.beginTransaction();
        txnManager.maybeAddPartition(tp0);

        assertFalse(txnManager.hasInFlightRequest());
        sender.runOnce();
        assertTrue(txnManager.hasInFlightRequest());

        long totalTimeToRunOnce = time.milliseconds() - startTime;

        // It should have blocked roughly only the backoffTimeMs and some change.
        assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT);
    }

Member

I suppose that the test passes with new MockTime(1L) because the new time is not used by the client. The client must be created after the time is set. Would it work? I would also prefer this over adding the sleep in the mock client.
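The point about construction order can be shown with a small, self-contained sketch. `ManualClock` and `ClockUsingClient` are hypothetical names, and the sketch assumes the client stores the clock reference it was given at construction, which is the situation this comment describes.

```java
// Hypothetical clock: each read advances time by autoTickMs.
class ManualClock {
    private long nowMs;
    private final long autoTickMs;

    ManualClock(long autoTickMs) {
        this.autoTickMs = autoTickMs;
    }

    long milliseconds() {
        nowMs += autoTickMs;
        return nowMs;
    }
}

// Hypothetical client that captures its clock once, at construction.
class ClockUsingClient {
    private final ManualClock clock;

    ClockUsingClient(ManualClock clock) {
        this.clock = clock;
    }

    long now() {
        return clock.milliseconds();
    }
}

class StaleClockDemo {
    // Stands in for a test-class field that is assigned in setup.
    static ManualClock time = new ManualClock(0L);

    public static void main(String[] args) {
        // Built during setup: holds a reference to the original clock.
        ClockUsingClient early = new ClockUsingClient(time);

        // Reassigning the field later does not change what 'early' holds.
        time = new ManualClock(1L);
        ClockUsingClient late = new ClockUsingClient(time);

        early.now();
        late.now();
        System.out.println("early client sees: " + early.now()); // clock never ticks
        System.out.println("late client sees:  " + late.now());  // clock ticks on every read
    }
}
```

In this sketch only the client created after the reassignment observes the auto-ticking clock.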

return connectionState(node.idString()).pollDelayMs(now);
}

public void setAdvanceTimeDuringPoll(boolean advanceTimeDuringPoll) {
Member

Suggested change:
- public void setAdvanceTimeDuringPoll(boolean advanceTimeDuringPoll) {
+ public void advanceTimeDuringPoll(boolean advanceTimeDuringPoll) {

@Test
public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() {
// We want MockClient#poll() to advance time so that eventually the backoff expires.
client.setAdvanceTimeDuringPoll(true);
Member

Could this be replaced with this.time = new MockTime(1L); (or some other auto-tick value)?

Contributor Author

I tried that. However, the problem is that the thing we want to test is not how many times we call time.milliseconds() but rather what values are passed into NetworkClient#poll().

I wasn't able to find a way to use this strategy to get a test that failed on trunk and passed on my branch.
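A toy model may help show why an auto-ticking clock alone gives no signal here. Everything below is hypothetical (`TickingClock`, `RecordingFakeClient`, `PollSignalSketch`) and greatly simplified compared with the real producer code: the fake `poll()` returns immediately and only records the timeout it was given, while the clock advances by 1 ms on every read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mock clock: every read advances time by autoTickMs.
class TickingClock {
    private long nowMs;
    private final long autoTickMs;

    TickingClock(long autoTickMs) {
        this.autoTickMs = autoTickMs;
    }

    long milliseconds() {
        nowMs += autoTickMs;
        return nowMs;
    }
}

// Hypothetical fake client: poll() returns immediately and only records its argument.
class RecordingFakeClient {
    final List<Long> pollTimeouts = new ArrayList<>();
    private final long throttledUntilMs;

    RecordingFakeClient(long throttledUntilMs) {
        this.throttledUntilMs = throttledUntilMs;
    }

    boolean isReady(long nowMs) {
        return nowMs >= throttledUntilMs;
    }

    long pollDelayMs(long nowMs) {
        return Math.max(0L, throttledUntilMs - nowMs);
    }

    void poll(long timeoutMs) {
        pollTimeouts.add(timeoutMs);
    }
}

class PollSignalSketch {
    // Runs one wait loop and returns the elapsed mock time.
    static long waitUntilReady(RecordingFakeClient client, TickingClock clock, long timeoutMs, boolean capped) {
        long start = clock.milliseconds();
        long now = start;
        long expiryMs = start + timeoutMs;
        while (!client.isReady(now) && now < expiryMs) {
            long pollTimeout = expiryMs - now;
            if (capped) {
                pollTimeout = Math.min(pollTimeout, client.pollDelayMs(now));
            }
            client.poll(pollTimeout);
            now = clock.milliseconds();
        }
        return now - start;
    }

    public static void main(String[] args) {
        RecordingFakeClient uncapped = new RecordingFakeClient(1_000L);
        RecordingFakeClient capped = new RecordingFakeClient(1_000L);
        long uncappedElapsed = waitUntilReady(uncapped, new TickingClock(1L), 30_000L, false);
        long cappedElapsed = waitUntilReady(capped, new TickingClock(1L), 30_000L, true);
        // Elapsed mock time is the same, so asserting on it cannot tell the variants apart.
        System.out.println("elapsed: " + uncappedElapsed + " vs " + cappedElapsed);
        // The timeout passed to the first poll differs, which is where the signal is.
        System.out.println("first poll timeout: " + uncapped.pollTimeouts.get(0) + " vs " + capped.pollTimeouts.get(0));
    }
}
```

In this model both variants report the same elapsed time, because time only moves when the clock is read. They differ only in the timeouts handed to `poll()`, so a test needs a fake whose `poll()` either records or actually consumes its timeout to distinguish them.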

coltmcnealy-lh force-pushed the kafka-17455/fix-stuck-producer branch from 05200f7 to 1bc3443 on November 10, 2024 19:42
coltmcnealy-lh requested a review from mjsax on November 12, 2024 18:12
Member

mjsax left a comment

LGTM, but I am not an expert on producer code. \cc @dajac

Member

dajac left a comment

@coltmcnealy-lh Thanks for the patch and sorry for the delay. I left a few minor comments for consideration.

Comment on lines 590 to 591
assertTrue(currentPollDelay > 0);
assertTrue(currentPollDelay <= throttleTimeMs);
Member

nit: I wonder if we could just assert that currentPollDelay equals throttleTimeMs. Time does not seem to advance before pollDelayMs is called. Would that work?

long totalTimeToRunOnce = time.milliseconds() - startTime;

// It should have blocked roughly only the backoffTimeMs and some change.
assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT);
Member

nit: Same question here. Could we assert that it equals currentPollDelay?

// It should have blocked roughly only the backoffTimeMs and some change.
assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT);

client.advanceTimeDuringPoll(false);
Member

Note that this line won't be called if any of the previous code throws (e.g. failed assertions). We should use a try..finally to ensure that we restore the state. An alternative would be to create the client in the setup in order to ensure a clean state for each test.
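A minimal sketch of the try..finally approach, using a hypothetical `FlaggedFakeClient` rather than the real MockClient. The helper name `withTimeAdvancing` is also an assumption made for illustration.

```java
// Hypothetical fake client holding the flag discussed above; not Kafka's MockClient.
class FlaggedFakeClient {
    private boolean advanceTimeDuringPoll;

    void advanceTimeDuringPoll(boolean enabled) {
        this.advanceTimeDuringPoll = enabled;
    }

    boolean advancesTimeDuringPoll() {
        return advanceTimeDuringPoll;
    }
}

class RestoreStateSketch {
    // Enables the flag for the duration of the test body and always resets it,
    // even when the body throws (for example, on a failed assertion).
    static void withTimeAdvancing(FlaggedFakeClient client, Runnable testBody) {
        client.advanceTimeDuringPoll(true);
        try {
            testBody.run();
        } finally {
            client.advanceTimeDuringPoll(false);
        }
    }

    public static void main(String[] args) {
        FlaggedFakeClient client = new FlaggedFakeClient();
        try {
            withTimeAdvancing(client, () -> {
                throw new AssertionError("simulated failed assertion");
            });
        } catch (AssertionError expected) {
            // The failure still propagates to the caller.
        }
        System.out.println("flag after failure: " + client.advancesTimeDuringPoll());
    }
}
```

The failure still reaches the test runner, but the shared client is left in its default state for the next test. Creating a fresh client in setup, the alternative mentioned above, avoids the need for the reset altogether.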

@coltmcnealy-lh
Contributor Author

@dajac writes:

I suppose that the test passes with new MockTime(1L) because the new time is not used by the client. The client must be created after the time is set. Would it work? I would also prefer this over adding the sleep in the mock client.

It does pass; however, it also passes on trunk, which means the test has no signal. I am fine with making that change; however, @mjsax told me it was best to craft a test that fails before the patch (admittedly, this was tricky).

Comment on lines 591 to 592
assertTrue(currentPollDelay > 0);
assertTrue(currentPollDelay == throttleTimeMs);
Member

nit: We could use assertEquals(throttleTimeMs, currentPollDelay) instead of those two lines.

@dajac
Member

dajac commented Jan 9, 2025

@dajac writes:

I suppose that the test passes with new MockTime(1L) because the new time is not used by the client. The client must be created after the time is set. Would it work? I would also prefer this over adding the sleep in the mock client.

It does pass; however, it also passes on trunk, which means the test has no signal. I am fine with making that change; however, @mjsax told me it was best to craft a test that fails before the patch (admittedly, this was tricky).

@coltmcnealy-lh Thanks for trying. I also gave it a shot and I could not find a better way to do it. I think that we can keep your unit test as you proposed it. I left a minor comment.

Member

dajac left a comment

lgtm

@mjsax mjsax merged commit bb22eec into apache:trunk Jan 9, 2025
9 checks passed
mjsax pushed a commit that referenced this pull request Jan 9, 2025
A producer might get stuck after it was throttled. This PR unblocks the producer by polling again
after pollDelayMs in NetworkUtils#awaitReady().

Reviewers: Matthias J. Sax <[email protected]>, David Jacot <[email protected]>
@mjsax
Member

mjsax commented Jan 9, 2025

Thanks for the fix. Merged to trunk and cherry-picked to 4.0, 3.9, and 3.8 branches.
