From 54e8e84bc082d636e0e9be292ecbdd1782ad77a4 Mon Sep 17 00:00:00 2001 From: xinyuwang1 Date: Fri, 3 Jan 2025 20:10:06 +0800 Subject: [PATCH 1/2] [CELEBORN-1818]Fix incorrect timeout exception when waiting on no pending writes --- .../deploy/worker/storage/PartitionDataWriter.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index c711752fbf7..9667628c028 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -507,7 +507,7 @@ protected synchronized long close( } try { - waitOnNoPending(numPendingWrites); + waitOnNoPending(numPendingWrites, false); closed = true; synchronized (flushLock) { @@ -520,7 +520,7 @@ protected synchronized long close( } tryClose.run(); - waitOnNoPending(notifier.numPendingFlushes); + waitOnNoPending(notifier.numPendingFlushes, true); } finally { returnBuffer(false); try { @@ -580,7 +580,7 @@ public void evict(boolean checkClose) throws IOException { if (memoryFileInfo != null) { evictInternal(); if (isClosed()) { - waitOnNoPending(notifier.numPendingFlushes); + waitOnNoPending(notifier.numPendingFlushes, true); storageManager.notifyFileInfoCommitted(shuffleKey, getFile().getName(), diskFileInfo); } } @@ -634,7 +634,7 @@ public IOException getException() { } } - protected void waitOnNoPending(AtomicInteger counter) throws IOException { + protected void waitOnNoPending(AtomicInteger counter, boolean failWhenTimeout) throws IOException { long waitTime = writerCloseTimeoutMs; while (counter.get() > 0 && waitTime > 0) { try { @@ -647,7 +647,7 @@ protected void waitOnNoPending(AtomicInteger counter) throws IOException { } waitTime -= WAIT_INTERVAL_MS; } - if (counter.get() > 0) { + if (counter.get() > 0 && failWhenTimeout) { IOException ioe = new IOException("Wait pending actions timeout, Counter: " + counter.get()); notifier.setException(ioe); throw ioe; From 521e01a27ab577c6a2fe4a523fab18bba26aa42b Mon Sep 17 00:00:00 2001 From: xinyuwang1 Date: Fri, 3 Jan 2025 20:22:24 +0800 Subject: [PATCH 2/2] fix style --- .../service/deploy/worker/storage/PartitionDataWriter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index 9667628c028..02766a9a8a2 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -634,7 +634,8 @@ public IOException getException() { } } - protected void waitOnNoPending(AtomicInteger counter, boolean failWhenTimeout) throws IOException { + protected void waitOnNoPending(AtomicInteger counter, boolean failWhenTimeout) + throws IOException { long waitTime = writerCloseTimeoutMs; while (counter.get() > 0 && waitTime > 0) { try {