Skip to content
This repository has been archived by the owner on Mar 15, 2021. It is now read-only.

Commit

Permalink
Merge pull request #109 from spotify/serializer
Browse files Browse the repository at this point in the history
Revert sendMetrics
  • Loading branch information
lmuhlha authored Mar 5, 2019
2 parents 19ee1b3 + acfbc64 commit cefa6ec
Showing 1 changed file with 24 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -101,20 +102,32 @@ public void onSuccess(String messageId) { }
}, executorService);
}

// The pubsub plugin only supports sending batches of metrics.
@Override
public AsyncFuture<Void> sendMetrics(Collection<Metric> metrics) {
log.info("Sending {} metrics", metrics.size());

int size = 0;
try {
final ByteString m = ByteString.copyFrom(serializer.serialize(metrics));
publishPubSub(m);
size += m.size();
} catch (Exception e) {
log.error("Failed to serialize batch of metrics {}", e);
final UUID traceId = UUID.randomUUID();
log.info("{}: Start sending metrics", traceId);

for (Metric metric : metrics) {
try {
final ApiFuture<String> publish = publisher.publish(PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(serializer.serialize(metric)))
.build()
);
ApiFutures.addCallback(publish, new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable t) {
log.error("Failed sending metrics {}", t.getMessage());
}

@Override
public void onSuccess(String messageId) { }

}, executorService);
} catch (Exception e) {
log.error("Failed to publish metric {}", e);
}
}
log.debug("Total size of sent metrics {} bytes", size);
log.info("{}: Finished sending metrics", traceId);
return async.resolved();
}

Expand Down

0 comments on commit cefa6ec

Please sign in to comment.