diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index f040f433434..ff32b940173 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -511,7 +511,8 @@ class FetchHandler( client: TransportClient, streamChunkSlice: StreamChunkSlice, req: RequestMessage): Unit = { - logDebug(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" + + lazy val remoteAddr = NettyUtils.getRemoteAddress(client.getChannel) + logDebug(s"Received req from ${remoteAddr}" + s" to fetch block $streamChunkSlice") workerSource.recordAppActiveConnection( @@ -548,14 +549,15 @@ class FetchHandler( if (future.isSuccess) { if (log.isDebugEnabled) { logDebug( - s"Sending ChunkFetchSuccess operation succeeded, chunk $streamChunkSlice") + s"Sending ChunkFetchSuccess to $remoteAddr succeeded, chunk $streamChunkSlice") } + workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT) } else { logWarning( - s"Sending ChunkFetchSuccess operation failed, chunk $streamChunkSlice", + s"Sending ChunkFetchSuccess to $remoteAddr failed, chunk $streamChunkSlice", future.cause()) + workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT) } - workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT) chunkStreamManager.chunkSent(streamChunkSlice.streamId) if (fetchTimeMetric != null) { fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)