[CELEBORN-1820] Failing to write and flush StreamChunk data should be counted as FETCH_CHUNK_FAIL

### What changes were proposed in this pull request?

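- The current implementation uses `writeAndFlush` to send chunk data to the client asynchronously, but the success counter is incremented whether or not the write succeeds. A failure to write and flush StreamChunk data to the remote client should be counted as FETCH_CHUNK_FAIL instead.
- Add the remote client address to the log messages to make debugging easier.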
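
The counting rule described above can be sketched without Netty. This is a minimal illustration only, not the code in `FetchHandler`: it assumes a plain Scala `Future` in place of the channel future returned by `writeAndFlush`, and `FetchMetricsSketch`, `trackWrite`, `successCount`, and `failCount` are hypothetical names standing in for the worker's metrics.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FetchMetricsSketch {
  // Hypothetical stand-ins for the FETCH_CHUNK_SUCCESS / FETCH_CHUNK_FAIL counters.
  val successCount = new AtomicLong(0)
  val failCount = new AtomicLong(0)

  // Attaches a completion callback to an asynchronous write (assumed here to be
  // a scala.concurrent.Future rather than a Netty ChannelFuture). Exactly one
  // counter is incremented per write, depending on the outcome.
  def trackWrite(write: Future[Unit], remoteAddr: String)(
      implicit ec: ExecutionContext): Future[Unit] =
    write.transform { result =>
      result match {
        case Success(_) =>
          successCount.incrementAndGet()
        case Failure(cause) =>
          println(s"Sending chunk to $remoteAddr failed: ${cause.getMessage}")
          failCount.incrementAndGet()
      }
      // Bookkeeping that must run on both paths would go here.
      Success(())
    }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val ok = trackWrite(Future.successful(()), "10.0.0.1:1234")
    val bad = trackWrite(
      Future.failed(new java.io.IOException("connection reset")),
      "10.0.0.2:5678")
    Await.result(ok, 5.seconds)
    Await.result(bad, 5.seconds)
    println(s"success=${successCount.get}, fail=${failCount.get}") // prints "success=1, fail=1"
  }
}
```

The point of the sketch is that the success counter lives inside the success branch, so a failed write can never inflate it.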

### Why are the changes needed?

It is important to monitor the FETCH_CHUNK_FAIL count correctly.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #3051 from Z1Wu/fix/fetch_handler_metrics.

Authored-by: wuziyi <[email protected]>
Signed-off-by: mingji <[email protected]>
Z1Wu authored and FMX committed Jan 7, 2025
1 parent 61c90e3 commit eb9e164
Showing 1 changed file with 6 additions and 4 deletions.
@@ -511,7 +511,8 @@ class FetchHandler(
       client: TransportClient,
       streamChunkSlice: StreamChunkSlice,
       req: RequestMessage): Unit = {
-    logDebug(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" +
+    lazy val remoteAddr = NettyUtils.getRemoteAddress(client.getChannel)
+    logDebug(s"Received req from ${remoteAddr}" +
       s" to fetch block $streamChunkSlice")

     workerSource.recordAppActiveConnection(
@@ -548,14 +549,15 @@ class FetchHandler(
                 if (future.isSuccess) {
                   if (log.isDebugEnabled) {
                     logDebug(
-                      s"Sending ChunkFetchSuccess operation succeeded, chunk $streamChunkSlice")
+                      s"Sending ChunkFetchSuccess to $remoteAddr succeeded, chunk $streamChunkSlice")
                   }
+                  workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT)
                 } else {
                   logWarning(
-                    s"Sending ChunkFetchSuccess operation failed, chunk $streamChunkSlice",
+                    s"Sending ChunkFetchSuccess to $remoteAddr failed, chunk $streamChunkSlice",
                     future.cause())
+                  workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
                 }
-                workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT)
                 chunkStreamManager.chunkSent(streamChunkSlice.streamId)
                 if (fetchTimeMetric != null) {
                   fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
