Skip to content

Commit

Permalink
[CELEBORN-1818] Fix incorrect timeout exception when waiting on no pe…
Browse files Browse the repository at this point in the history
…nding writes

### What changes were proposed in this pull request?
Do not throw "Wait pending actions timeout" exception when waiting pending writes times out.

### Why are the changes needed?
When pendingWrites is reduced to zero, the method waitOnNoPending will jump out of the while loop. Meanwhile, if new PushData/PushMergedData request comes, pendingWrites will increment and be larger then zero. As a result, "wait pending actions timeout" exception will be thrown in waitOnNoPending.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
manual test

Closes #3049 from littlexyw/fix_wait_pending_writes_timeout.

Authored-by: xinyuwang1 <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
xinyuwang1 authored and FMX committed Jan 7, 2025
1 parent ca60613 commit 61c90e3
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ protected synchronized long close(
}

try {
waitOnNoPending(numPendingWrites);
waitOnNoPending(numPendingWrites, false);
closed = true;

synchronized (flushLock) {
Expand All @@ -520,7 +520,7 @@ protected synchronized long close(
}

tryClose.run();
waitOnNoPending(notifier.numPendingFlushes);
waitOnNoPending(notifier.numPendingFlushes, true);
} finally {
returnBuffer(false);
try {
Expand Down Expand Up @@ -582,7 +582,7 @@ public void evict(boolean checkClose) throws IOException {
if (memoryFileInfo != null) {
evictInternal();
if (isClosed()) {
waitOnNoPending(notifier.numPendingFlushes);
waitOnNoPending(notifier.numPendingFlushes, true);
storageManager.notifyFileInfoCommitted(shuffleKey, getFile().getName(), diskFileInfo);
}
}
Expand Down Expand Up @@ -636,7 +636,8 @@ public IOException getException() {
}
}

protected void waitOnNoPending(AtomicInteger counter) throws IOException {
protected void waitOnNoPending(AtomicInteger counter, boolean failWhenTimeout)
throws IOException {
long waitTime = writerCloseTimeoutMs;
while (counter.get() > 0 && waitTime > 0) {
try {
Expand All @@ -649,7 +650,7 @@ protected void waitOnNoPending(AtomicInteger counter) throws IOException {
}
waitTime -= WAIT_INTERVAL_MS;
}
if (counter.get() > 0) {
if (counter.get() > 0 && failWhenTimeout) {
IOException ioe = new IOException("Wait pending actions timeout, Counter: " + counter.get());
notifier.setException(ioe);
throw ioe;
Expand Down

0 comments on commit 61c90e3

Please sign in to comment.