Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement telemetry API v2 #62

Merged
merged 40 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
6618c75
WIP. Working API, limited metrics, missing tags for metrics
cgilmour Sep 27, 2023
890490d
Redesign to support tagged metrics
cgilmour Sep 29, 2023
f41f4d4
Add tags to metrics payloads. Ignore unused counters.
cgilmour Sep 29, 2023
b91869f
Remove comforting developer noise
cgilmour Oct 2, 2023
0d4e82f
Additions for building via bazel
cgilmour Oct 2, 2023
94bfc20
Adding comments
cgilmour Oct 3, 2023
e13e5d1
first pass at a review of cgilmour/telemetry-api:
dgoffredo Sep 29, 2023
9e432cb
Store value of hostname in member field
cgilmour Oct 3, 2023
0005e65
Refactor runtime_id into span defaults, available to tracer telemetry
cgilmour Oct 3, 2023
e54bd30
Change count metrics to report absolute instead of cumulative value
cgilmour Oct 3, 2023
19e3e43
Allow telemetry to be enabled/disabled using DD_INSTRUMENTATION_TELEM…
cgilmour Oct 6, 2023
5603789
Add debug option, for developer convenience
cgilmour Oct 6, 2023
1fd88bd
Call capture_and_reset_value() instead of having reset behavior insid…
cgilmour Oct 6, 2023
8cf6d49
Omit zero values, and sending empty generate-metrics payloads
cgilmour Oct 6, 2023
70ad77c
Comments
cgilmour Oct 8, 2023
32b7885
Add app-closing event
cgilmour Oct 10, 2023
16d532b
Temporary fix for datadog agent tests
cgilmour Oct 10, 2023
9811f8c
Unit tests
cgilmour Oct 10, 2023
dc94da4
CI reported this
cgilmour Oct 10, 2023
1603c0e
Additional comments
cgilmour Oct 10, 2023
09fc24d
Additional comments
cgilmour Oct 10, 2023
28cc724
Refactor repeated telemetry body construction
cgilmour Oct 11, 2023
31d88a3
Refactor http client callbacks
cgilmour Oct 11, 2023
879cfe3
separate runtime_id from class SpanDefaults, and introduce class Runt…
dgoffredo Oct 12, 2023
26a5eac
doc 'till you drop
dgoffredo Oct 12, 2023
03a9921
don't use the real clock in SpanSampler's limiter test
dgoffredo Oct 12, 2023
04e1a17
Disable telemetry for curl test that's intended for single requests
cgilmour Oct 12, 2023
688906d
Add mutex to MockLogger, seems to prevent SIGSEGV now that telemet…
dgoffredo Oct 12, 2023
1ad8f70
Bump coverage percentage
cgilmour Oct 12, 2023
c839b3a
Only pass `this` when calling methods
cgilmour Oct 12, 2023
95c5d0b
Log errors with a prefix
cgilmour Oct 12, 2023
5ec1769
Unconstification
cgilmour Oct 12, 2023
a92cdf3
Reordering things
cgilmour Oct 12, 2023
d9c2288
std::time_t
cgilmour Oct 12, 2023
6e9e28d
Change datadog_agent test
cgilmour Oct 12, 2023
feee3fe
Merge branch 'david.goffredo/fix-flaky-limiter-test' into cgilmour/te…
cgilmour Oct 12, 2023
bdb9d9f
Only send interval for gauge metrics.
cgilmour Oct 13, 2023
28dd1c4
Fix logged error message
cgilmour Oct 16, 2023
eb04fe8
Fix the app-closing metrics payload
cgilmour Oct 17, 2023
c57a039
Formatting.
cgilmour Oct 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ cc_library(
"src/datadog/id_generator.cpp",
"src/datadog/limiter.cpp",
"src/datadog/logger.cpp",
"src/datadog/metrics.cpp",
"src/datadog/msgpack.cpp",
"src/datadog/null_collector.cpp",
"src/datadog/parse_util.cpp",
"src/datadog/platform_util.cpp",
"src/datadog/propagation_style.cpp",
"src/datadog/random.cpp",
"src/datadog/rate.cpp",
"src/datadog/runtime_id.cpp",
"src/datadog/span.cpp",
"src/datadog/span_data.cpp",
"src/datadog/span_defaults.cpp",
Expand All @@ -32,6 +34,7 @@ cc_library(
"src/datadog/tags.cpp",
"src/datadog/threaded_event_scheduler.cpp",
"src/datadog/tracer_config.cpp",
"src/datadog/tracer_telemetry.cpp",
"src/datadog/tracer.cpp",
"src/datadog/trace_id.cpp",
"src/datadog/trace_sampler_config.cpp",
Expand Down Expand Up @@ -64,6 +67,7 @@ cc_library(
"src/datadog/json_fwd.hpp",
"src/datadog/limiter.h",
"src/datadog/logger.h",
"src/datadog/metrics.h",
"src/datadog/msgpack.h",
"src/datadog/null_collector.h",
"src/datadog/optional.h",
Expand All @@ -72,6 +76,7 @@ cc_library(
"src/datadog/propagation_style.h",
"src/datadog/random.h",
"src/datadog/rate.h",
"src/datadog/runtime_id.h",
"src/datadog/sampling_decision.h",
"src/datadog/sampling_mechanism.h",
"src/datadog/sampling_priority.h",
Expand All @@ -88,6 +93,7 @@ cc_library(
"src/datadog/tags.h",
"src/datadog/threaded_event_scheduler.h",
"src/datadog/tracer_config.h",
"src/datadog/tracer_telemetry.h",
"src/datadog/tracer.h",
"src/datadog/trace_id.h",
"src/datadog/trace_sampler_config.h",
Expand All @@ -110,4 +116,4 @@ cc_library(
"@com_google_absl//absl/strings",
"@com_google_absl//absl/types:optional",
],
)
)
18 changes: 12 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,15 @@ target_sources(dd_trace_cpp-objects PRIVATE
src/datadog/id_generator.cpp
src/datadog/limiter.cpp
src/datadog/logger.cpp
src/datadog/metrics.cpp
src/datadog/msgpack.cpp
src/datadog/null_collector.cpp
src/datadog/parse_util.cpp
src/datadog/platform_util.cpp
src/datadog/propagation_style.cpp
src/datadog/random.cpp
src/datadog/rate.cpp
src/datadog/runtime_id.cpp
src/datadog/span.cpp
src/datadog/span_data.cpp
src/datadog/span_defaults.cpp
Expand All @@ -127,6 +129,7 @@ target_sources(dd_trace_cpp-objects PRIVATE
src/datadog/tags.cpp
src/datadog/threaded_event_scheduler.cpp
src/datadog/tracer_config.cpp
src/datadog/tracer_telemetry.cpp
src/datadog/tracer.cpp
src/datadog/trace_id.cpp
src/datadog/trace_sampler_config.cpp
Expand Down Expand Up @@ -165,6 +168,7 @@ target_sources(dd_trace_cpp-objects PUBLIC
src/datadog/json.hpp
src/datadog/limiter.h
src/datadog/logger.h
src/datadog/metrics.h
src/datadog/msgpack.h
src/datadog/null_collector.h
src/datadog/optional.h
Expand All @@ -173,6 +177,7 @@ target_sources(dd_trace_cpp-objects PUBLIC
src/datadog/propagation_style.h
src/datadog/random.h
src/datadog/rate.h
src/datadog/runtime_id.h
src/datadog/sampling_decision.h
src/datadog/sampling_mechanism.h
src/datadog/sampling_priority.h
Expand All @@ -189,6 +194,7 @@ target_sources(dd_trace_cpp-objects PUBLIC
src/datadog/tags.h
src/datadog/threaded_event_scheduler.h
src/datadog/tracer_config.h
src/datadog/tracer_telemetry.h
src/datadog/tracer.h
src/datadog/trace_id.h
src/datadog/trace_sampler_config.h
Expand All @@ -205,12 +211,12 @@ include_directories(${CMAKE_BINARY_DIR}/include)

# Linking this library requires libcurl and threads.
find_package(Threads REQUIRED)
target_link_libraries(dd_trace_cpp-objects
PUBLIC
${CMAKE_BINARY_DIR}/lib/libcurl.a
PUBLIC
Threads::Threads
${COVERAGE_LIBRARIES}
target_link_libraries(dd_trace_cpp-objects
PUBLIC
${CMAKE_BINARY_DIR}/lib/libcurl.a
PUBLIC
Threads::Threads
${COVERAGE_LIBRARIES}
${COREFOUNDATION_LIBRARY}
${SYSTEMCONFIGURATION_LIBRARY}
)
Expand Down
125 changes: 114 additions & 11 deletions src/datadog/datadog_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,20 @@ namespace tracing {
namespace {

const StringView traces_api_path = "/v0.4/traces";
const StringView telemetry_v2_path = "/telemetry/proxy/api/v2/apmtelemetry";

// Return a copy of `agent_url` whose path has `traces_api_path` appended,
// i.e. the URL to which trace payloads are posted.
HTTPClient::URL traces_endpoint(const HTTPClient::URL& agent_url) {
  HTTPClient::URL endpoint = agent_url;
  append(endpoint.path, traces_api_path);
  return endpoint;
}

// Return a copy of `agent_url` whose path has `telemetry_v2_path` appended,
// i.e. the URL to which telemetry payloads are posted.
HTTPClient::URL telemetry_endpoint(const HTTPClient::URL& agent_url) {
  HTTPClient::URL endpoint = agent_url;
  append(endpoint.path, telemetry_v2_path);
  return endpoint;
}

Expected<void> msgpack_encode(
std::string& destination,
const std::vector<DatadogAgent::TraceChunk>& trace_chunks) {
Expand Down Expand Up @@ -124,24 +131,71 @@ std::variant<CollectorResponse, std::string> parse_agent_traces_response(

} // namespace

DatadogAgent::DatadogAgent(const FinalizedDatadogAgentConfig& config,
const Clock& clock,
const std::shared_ptr<Logger>& logger)
: clock_(clock),
DatadogAgent::DatadogAgent(
const FinalizedDatadogAgentConfig& config,
const std::shared_ptr<TracerTelemetry>& tracer_telemetry,
const Clock& clock, const std::shared_ptr<Logger>& logger)
: tracer_telemetry_(tracer_telemetry),
clock_(clock),
logger_(logger),
traces_endpoint_(traces_endpoint(config.url)),
telemetry_endpoint_(telemetry_endpoint(config.url)),
http_client_(config.http_client),
event_scheduler_(config.event_scheduler),
cancel_scheduled_flush_(event_scheduler_->schedule_recurring_event(
config.flush_interval, [this]() { flush(); })),
flush_interval_(config.flush_interval) {
assert(logger_);
assert(tracer_telemetry_);
if (tracer_telemetry_->enabled()) {
// Only schedule this if telemetry is enabled.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A detail, but the line above is self-explanatory.

// Every 10 seconds, have the tracer telemetry capture the metrics values.
// Every 60 seconds, also report those values to the datadog agent.
cancel_telemetry_timer_ = event_scheduler_->schedule_recurring_event(
std::chrono::seconds(10), [this, n = 0]() mutable {
n++;
tracer_telemetry_->capture_metrics();
if (n % 6 == 0) {
send_heartbeat_and_telemetry();
}
});
// Callback for setting telemetry request headers.
telemetry_set_request_headers_ = [](DictWriter& headers) {
headers.set("Content-Type", "application/json");
};
// Callback for successful telemetry HTTP requests, to examine HTTP status.
telemetry_on_response_ = [logger = logger_](
int response_status,
const DictReader& /*response_headers*/,
std::string response_body) {
if (response_status < 200 || response_status >= 300) {
logger->log_error([&](auto& stream) {
stream << "Unexpected telemetry response status " << response_status
<< " with body (starts on next line):\n"
<< response_body;
});
}
};
// Callback for unsuccessful telemetry HTTP requests.
telemetry_on_error_ = [logger = logger_](Error error) {
logger->log_error(error.with_prefix(
"Error occurred during HTTP request for telemetry: "));
};
}
}

// Shut down the collector: stop the recurring events, submit whatever is
// still pending (traces, and final telemetry when enabled), then hand the
// HTTP client a deadline via `drain`.
DatadogAgent::~DatadogAgent() {
// The deadline is fixed before any shutdown work begins, so time spent in
// the calls below counts against the two-second budget.
const auto deadline = clock_().tick + std::chrono::seconds(2);
// Stop the recurring flush scheduled in the constructor, then flush once
// more by hand.
cancel_scheduled_flush_();
flush();
if (tracer_telemetry_->enabled()) {
// `cancel_telemetry_timer_` is only assigned by the constructor when
// telemetry is enabled, so it is only invoked under that same condition.
cancel_telemetry_timer_();
// Capture the metrics one last time before building the closing payload.
tracer_telemetry_->capture_metrics();
// The app-closing message is bundled with a message containing the final
// metric values.
send_app_closing();
}
// NOTE(review): presumably waits for outstanding requests until `deadline`
// -- confirm against HTTPClient::drain.
http_client_->drain(deadline);
}

Expand All @@ -154,7 +208,6 @@ Expected<void> DatadogAgent::send(
}

nlohmann::json DatadogAgent::config_json() const {
const auto& url = traces_endpoint_; // brevity
const auto flush_interval_milliseconds =
std::chrono::duration_cast<std::chrono::milliseconds>(flush_interval_)
.count();
Expand All @@ -163,7 +216,8 @@ nlohmann::json DatadogAgent::config_json() const {
return nlohmann::json::object({
{"type", "datadog::tracing::DatadogAgent"},
{"config", nlohmann::json::object({
{"url", (url.scheme + "://" + url.authority + url.path)},
{"traces_url", (traces_endpoint_.scheme + "://" + traces_endpoint_.authority + traces_endpoint_.path)},
{"telemetry_url", (telemetry_endpoint_.scheme + "://" + telemetry_endpoint_.authority + telemetry_endpoint_.path)},
{"flush_interval_milliseconds", flush_interval_milliseconds},
{"http_client", http_client_->config_json()},
{"event_scheduler", event_scheduler_->config_json()},
Expand Down Expand Up @@ -211,10 +265,22 @@ void DatadogAgent::flush() {

// This is the callback for the HTTP response. It's invoked
// asynchronously.
auto on_response = [samplers = std::move(response_handlers),
auto on_response = [telemetry = tracer_telemetry_,
samplers = std::move(response_handlers),
logger = logger_](int response_status,
const DictReader& /*response_headers*/,
std::string response_body) {
if (response_status >= 500) {
telemetry->metrics().trace_api.responses_5xx.inc();
} else if (response_status >= 400) {
telemetry->metrics().trace_api.responses_4xx.inc();
} else if (response_status >= 300) {
telemetry->metrics().trace_api.responses_3xx.inc();
} else if (response_status >= 200) {
telemetry->metrics().trace_api.responses_2xx.inc();
} else if (response_status >= 100) {
telemetry->metrics().trace_api.responses_1xx.inc();
}
if (response_status != 200) {
logger->log_error([&](auto& stream) {
stream << "Unexpected response status " << response_status
Expand Down Expand Up @@ -250,16 +316,53 @@ void DatadogAgent::flush() {
// This is the callback for if something goes wrong sending the
// request or retrieving the response. It's invoked
// asynchronously.
auto on_error = [logger = logger_](Error error) {
logger->log_error(
error.with_prefix("Error occurred during HTTP request: "));
auto on_error = [telemetry = tracer_telemetry_,
logger = logger_](Error error) {
telemetry->metrics().trace_api.errors_network.inc();
logger->log_error(error.with_prefix(
"Error occurred during HTTP request for submitting traces: "));
};

tracer_telemetry_->metrics().trace_api.requests.inc();
auto post_result = http_client_->post(
traces_endpoint_, std::move(set_request_headers), std::move(body),
std::move(on_response), std::move(on_error));
if (auto* error = post_result.if_error()) {
logger_->log_error(*error);
logger_->log_error(
error->with_prefix("Unexpected error submitting traces: "));
}
}

// Build the payload returned by `tracer_telemetry_->app_started(...)` from
// `tracer_config` and post it to the telemetry endpoint. Response and
// transport errors are handled by the shared telemetry callbacks; a failure
// to submit the request at all is logged here.
void DatadogAgent::send_app_started(nlohmann::json&& tracer_config) {
  auto body = tracer_telemetry_->app_started(std::move(tracer_config));
  auto submitted = http_client_->post(
      telemetry_endpoint_, telemetry_set_request_headers_, std::move(body),
      telemetry_on_response_, telemetry_on_error_);

  auto* failure = submitted.if_error();
  if (failure == nullptr) {
    return;
  }
  logger_->log_error(
      failure->with_prefix("Unexpected error submitting telemetry: "));
}

// Post the payload returned by `tracer_telemetry_->heartbeat_and_telemetry()`
// to the telemetry endpoint. Response and transport errors are handled by
// the shared telemetry callbacks (`telemetry_on_response_`,
// `telemetry_on_error_`); a failure to submit the request at all is logged
// here.
void DatadogAgent::send_heartbeat_and_telemetry() {
  auto payload = tracer_telemetry_->heartbeat_and_telemetry();
  auto post_result = http_client_->post(
      telemetry_endpoint_, telemetry_set_request_headers_, std::move(payload),
      telemetry_on_response_, telemetry_on_error_);
  if (auto* error = post_result.if_error()) {
    // This is a telemetry request, not a trace submission; the prefix must
    // say so, matching send_app_started, or the log points at the wrong
    // subsystem.
    logger_->log_error(
        error->with_prefix("Unexpected error submitting telemetry: "));
  }
}

// Post the payload returned by `tracer_telemetry_->app_closing()` to the
// telemetry endpoint. Called from the destructor when telemetry is enabled.
// Response and transport errors are handled by the shared telemetry
// callbacks; a failure to submit the request at all is logged here.
void DatadogAgent::send_app_closing() {
  auto payload = tracer_telemetry_->app_closing();
  auto post_result = http_client_->post(
      telemetry_endpoint_, telemetry_set_request_headers_, std::move(payload),
      telemetry_on_response_, telemetry_on_error_);
  if (auto* error = post_result.if_error()) {
    // This is a telemetry request, not a trace submission; the prefix must
    // say so, matching send_app_started, or the log points at the wrong
    // subsystem.
    logger_->log_error(
        error->with_prefix("Unexpected error submitting telemetry: "));
  }
}

Expand Down
16 changes: 15 additions & 1 deletion src/datadog/datadog_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include "collector.h"
#include "event_scheduler.h"
#include "http_client.h"
#include "metrics.h"
#include "tracer_telemetry.h"

namespace datadog {
namespace tracing {
Expand All @@ -32,26 +34,38 @@ class DatadogAgent : public Collector {

private:
std::mutex mutex_;
std::shared_ptr<TracerTelemetry> tracer_telemetry_;
Clock clock_;
std::shared_ptr<Logger> logger_;
std::vector<TraceChunk> trace_chunks_;
HTTPClient::URL traces_endpoint_;
HTTPClient::URL telemetry_endpoint_;
std::shared_ptr<HTTPClient> http_client_;
std::shared_ptr<EventScheduler> event_scheduler_;
EventScheduler::Cancel cancel_scheduled_flush_;
EventScheduler::Cancel cancel_telemetry_timer_;
std::chrono::steady_clock::duration flush_interval_;
// Callbacks for submitting telemetry data
HTTPClient::HeadersSetter telemetry_set_request_headers_;
HTTPClient::ResponseHandler telemetry_on_response_;
HTTPClient::ErrorHandler telemetry_on_error_;

void flush();
void send_heartbeat_and_telemetry();
void send_app_closing();

public:
DatadogAgent(const FinalizedDatadogAgentConfig&, const Clock& clock,
DatadogAgent(const FinalizedDatadogAgentConfig&,
const std::shared_ptr<TracerTelemetry>&, const Clock& clock,
const std::shared_ptr<Logger>&);
~DatadogAgent();

Expected<void> send(
std::vector<std::unique_ptr<SpanData>>&& spans,
const std::shared_ptr<TraceSampler>& response_handler) override;

void send_app_started(nlohmann::json&& tracer_config);

nlohmann::json config_json() const override;
};

Expand Down
1 change: 1 addition & 0 deletions src/datadog/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace environment {
#define LIST_ENVIRONMENT_VARIABLES(MACRO) \
MACRO(DD_AGENT_HOST) \
MACRO(DD_ENV) \
MACRO(DD_INSTRUMENTATION_TELEMETRY_ENABLED) \
MACRO(DD_PROPAGATION_STYLE_EXTRACT) \
MACRO(DD_PROPAGATION_STYLE_INJECT) \
MACRO(DD_TRACE_PROPAGATION_STYLE_EXTRACT) \
Expand Down
Loading