Skip to content

Commit

Permalink
ZipkinReporter: add trace logging for batching and encoded spans
Browse files Browse the repository at this point in the history
Motivation:

In order to trace batching, trace level logging will be helpful.

Modifications:

- Log every accumulation;
- Log every received span/list after encoding;

Result:

Easier to troubleshoot if there are any issues.
  • Loading branch information
idelpivnitskiy committed Oct 27, 2023
1 parent d539ff4 commit a8f13f8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019-2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -127,7 +127,12 @@ private PublisherSource.Processor<Span, Span> initReporter(final Builder builder
if (!builder.batchingEnabled) {
buffer = newPublisherProcessorDropHeadOnOverflow(builder.maxConcurrentReports);
spans = fromSource(buffer)
.map(span -> allocator.wrap(spanEncoder.encodeList(Collections.singletonList(span))));
.map(span -> {
// Always encode spans as list: https://github.com/apple/servicetalk/pull/2092
final byte[] bytes = spanEncoder.encodeList(Collections.singletonList(span));
LOGGER.trace("Encoded received span: {}, result={} bytes", span, bytes.length);
return allocator.wrap(bytes);
});
} else {
// As we send maxConcurrentReports number of parallel requests, each with roughly batchSizeHint number of
// spans, we hold a maximum of that many Spans in-memory that we can send in parallel to the collector.
Expand All @@ -136,7 +141,12 @@ private PublisherSource.Processor<Span, Span> initReporter(final Builder builder
.buffer(forCountOrTime(builder.batchSizeHint, builder.maxBatchDuration,
() -> new ListAccumulator(builder.batchSizeHint), client.executionContext().executor()))
.filter(accumulate -> !accumulate.isEmpty())
.map(bufferedSpans -> allocator.wrap(spanEncoder.encodeList(bufferedSpans)));
.map(bufferedSpans -> {
final byte[] bytes = spanEncoder.encodeList(bufferedSpans);
LOGGER.trace("Encoded received list of spans (size={}): {}, result={} bytes",
bufferedSpans.size(), bufferedSpans, bytes.length);
return allocator.wrap(bytes);
});
}

final CompletableSource.Processor spansTerminated = newCompletableProcessor();
Expand Down Expand Up @@ -287,6 +297,7 @@ private static final class ListAccumulator implements Accumulator<Span, List<Spa

@Override
public void accumulate(@Nonnull final Span item) {
LOGGER.trace("Accumulating received span: {}", item);
accumulate.add(requireNonNull(item));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019-2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -197,6 +197,7 @@ protected void initChannel(final Channel ch) {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof Span) {
byte[] bytes = codec.spanBytesEncoder().encode((Span) msg);
LOGGER.trace("Encoded received span: {}, result={} bytes", msg, bytes.length);
ByteBuf buf = ctx.alloc().buffer(bytes.length).writeBytes(bytes);
ctx.write(new DatagramPacket(buf, (InetSocketAddress) collectorAddress), promise);
} else {
Expand Down

0 comments on commit a8f13f8

Please sign in to comment.