From 9a700589f6f1c09e92f4ade23e9488d37049127e Mon Sep 17 00:00:00 2001 From: wuziyi Date: Sat, 4 Jan 2025 16:55:12 +0800 Subject: [PATCH 1/2] [fix] Failing to write and flush StreamChunk data should be counted as FETCH_CHUNK_FAIL. --- .../celeborn/service/deploy/worker/FetchHandler.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index f040f433434..81a0bc1496d 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -511,7 +511,8 @@ class FetchHandler( client: TransportClient, streamChunkSlice: StreamChunkSlice, req: RequestMessage): Unit = { - logDebug(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" + + val remoteAddr = NettyUtils.getRemoteAddress(client.getChannel) + logDebug(s"Received req from ${remoteAddr}" + s" to fetch block $streamChunkSlice") workerSource.recordAppActiveConnection( @@ -548,14 +549,15 @@ class FetchHandler( if (future.isSuccess) { if (log.isDebugEnabled) { logDebug( - s"Sending ChunkFetchSuccess operation succeeded, chunk $streamChunkSlice") + s"Sending ChunkFetchSuccess to $remoteAddr succeeded, chunk $streamChunkSlice") } + workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT) } else { logWarning( - s"Sending ChunkFetchSuccess operation failed, chunk $streamChunkSlice", + s"Sending ChunkFetchSuccess to $remoteAddr failed, chunk $streamChunkSlice", future.cause()) + workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT) } - workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT) chunkStreamManager.chunkSent(streamChunkSlice.streamId) if (fetchTimeMetric != null) { fetchTimeMetric.update(System.nanoTime() - fetchBeginTime) From 359437e26cc4feac7b95884962b562c701ef02dc Mon Sep 17 00:00:00 2001 From: wuziyi Date: Mon, 6 Jan 2025 12:23:03 +0800 Subject: [PATCH 2/2] fix --- .../apache/celeborn/service/deploy/worker/FetchHandler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 81a0bc1496d..ff32b940173 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -511,7 +511,7 @@ class FetchHandler( client: TransportClient, streamChunkSlice: StreamChunkSlice, req: RequestMessage): Unit = { - val remoteAddr = NettyUtils.getRemoteAddress(client.getChannel) + lazy val remoteAddr = NettyUtils.getRemoteAddress(client.getChannel) logDebug(s"Received req from ${remoteAddr}" + s" to fetch block $streamChunkSlice")