From 22daba42ce0f1e97dd78d0e67cec0d1428939a5a Mon Sep 17 00:00:00 2001 From: Chris Keathley Date: Mon, 25 Jan 2021 16:06:21 -0500 Subject: [PATCH 1/4] Optimize api server --- lib/spandex_datadog/api_server.ex | 339 ++------------------- lib/spandex_datadog/api_server/buffer.ex | 56 ++++ lib/spandex_datadog/api_server/client.ex | 211 +++++++++++++ lib/spandex_datadog/api_server/reporter.ex | 91 ++++++ test/api_server_test.exs | 94 +++--- 5 files changed, 417 insertions(+), 374 deletions(-) create mode 100644 lib/spandex_datadog/api_server/buffer.ex create mode 100644 lib/spandex_datadog/api_server/client.ex create mode 100644 lib/spandex_datadog/api_server/reporter.ex diff --git a/lib/spandex_datadog/api_server.ex b/lib/spandex_datadog/api_server.ex index 47694c4..e680760 100644 --- a/lib/spandex_datadog/api_server.ex +++ b/lib/spandex_datadog/api_server.ex @@ -2,38 +2,18 @@ defmodule SpandexDatadog.ApiServer do @moduledoc """ Implements worker for sending spans to datadog as GenServer in order to send traces async. """ + use Supervisor - use GenServer - require Logger + alias __MODULE__.Buffer + alias __MODULE__.Client + alias __MODULE__.Reporter alias Spandex.{ Span, Trace } - defmodule State do - @moduledoc false - - @type t :: %State{} - - defstruct [ - :asynchronous_send?, - :http, - :url, - :host, - :port, - :verbose?, - :waiting_traces, - :batch_size, - :sync_threshold, - :agent_pid - ] - end - - # Same as HTTPoison.headers - @type headers :: [{atom, binary}] | [{binary, binary}] | %{binary => binary} | any - - @headers [{"Content-Type", "application/msgpack"}] + require Logger @start_link_opts Optimal.schema( opts: [ @@ -76,39 +56,31 @@ defmodule SpandexDatadog.ApiServer do def start_link(opts) do opts = Optimal.validate!(opts, @start_link_opts) - GenServer.start_link(__MODULE__, opts, name: __MODULE__) + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) end - @doc """ - Builds server state. - """ - @spec init(opts :: Keyword.t()) :: {:ok, State.t()} def init(opts) do - {:ok, agent_pid} = Agent.start_link(fn -> 0 end) - - state = %State{ - asynchronous_send?: true, - host: opts[:host], - port: opts[:port], - verbose?: opts[:verbose?], - http: opts[:http], - waiting_traces: [], - batch_size: opts[:batch_size], - sync_threshold: opts[:sync_threshold], - agent_pid: agent_pid - } + buffer = Buffer.new() + reporter_opts = + opts + |> Map.new() + |> Map.take([:http, :verbose?, :host, :port]) + |> Map.put(:buffer, buffer) + + children = [ + {Reporter, reporter_opts}, + ] - {:ok, state} + Supervisor.init(children, strategy: :one_for_one) end @doc """ Send spans asynchronously to DataDog. """ @spec send_trace(Trace.t(), Keyword.t()) :: :ok - def send_trace(%Trace{} = trace, opts \\ []) do + def send_trace(%Trace{} = trace, _opts \\ []) do :telemetry.span([:spandex_datadog, :send_trace], %{trace: trace}, fn -> - timeout = Keyword.get(opts, :timeout, 30_000) - result = GenServer.call(__MODULE__, {:send_trace, trace}, timeout) + result = Buffer.add_trace(trace) {result, %{trace: trace}} end) end @@ -117,276 +89,15 @@ defmodule SpandexDatadog.ApiServer do @doc false @spec send_spans([Span.t()], Keyword.t()) :: :ok def send_spans(spans, opts \\ []) when is_list(spans) do - timeout = Keyword.get(opts, :timeout, 30_000) trace = %Trace{spans: spans} - GenServer.call(__MODULE__, {:send_trace, trace}, timeout) + send_trace(trace, opts) end + # Leaving these here for api versioning purposes. But this logic has + # been moved to the client module. 
@doc false - def handle_call({:send_trace, trace}, _from, state) do - state = - state - |> enqueue_trace(trace) - |> maybe_flush_traces() - - {:reply, :ok, state} - end - - @spec send_and_log([Trace.t()], State.t()) :: :ok - def send_and_log(traces, %{verbose?: verbose?} = state) do - headers = @headers ++ [{"X-Datadog-Trace-Count", length(traces)}] - - response = - traces - |> Enum.map(&format/1) - |> encode() - |> push(headers, state) - - if verbose? do - Logger.debug(fn -> "Trace response: #{inspect(response)}" end) - end - - :ok - end - - @deprecated "Please use format/3 instead" - @spec format(Trace.t()) :: map() - def format(%Trace{spans: spans, priority: priority, baggage: baggage}) do - Enum.map(spans, fn span -> format(span, priority, baggage) end) - end - - @deprecated "Please use format/3 instead" - @spec format(Span.t()) :: map() - def format(%Span{} = span), do: format(span, 1, []) - - @spec format(Span.t(), integer(), Keyword.t()) :: map() - def format(%Span{} = span, priority, _baggage) do - %{ - trace_id: span.trace_id, - span_id: span.id, - name: span.name, - start: span.start, - duration: (span.completion_time || SpandexDatadog.Adapter.now()) - span.start, - parent_id: span.parent_id, - error: error(span.error), - resource: span.resource || span.name, - service: span.service, - type: span.type, - meta: meta(span), - metrics: - metrics(span, %{ - _sampling_priority_v1: priority - }) - } - end - - # Private Helpers - - defp enqueue_trace(state, trace) do - if state.verbose? do - Logger.info(fn -> "Adding trace to stack with #{Enum.count(trace.spans)} spans" end) - end - - %State{state | waiting_traces: [trace | state.waiting_traces]} - end - - defp maybe_flush_traces(%{waiting_traces: traces, batch_size: size} = state) when length(traces) < size do - state - end - - defp maybe_flush_traces(state) do - %{ - asynchronous_send?: async?, - verbose?: verbose?, - waiting_traces: traces - } = state - - if verbose? do - span_count = Enum.reduce(traces, 0, fn trace, acc -> acc + length(trace.spans) end) - Logger.info(fn -> "Sending #{length(traces)} traces, #{span_count} spans." end) - Logger.debug(fn -> "Trace: #{inspect(traces)}" end) - end - - if async? 
do - if below_sync_threshold?(state) do - Task.start(fn -> - try do - send_and_log(traces, state) - after - Agent.update(state.agent_pid, fn count -> count - 1 end) - end - end) - else - # We get benefits from running in a separate process (like better GC) - # So we async/await here to mimic the behavour above but still apply backpressure - task = Task.async(fn -> send_and_log(traces, state) end) - Task.await(task) - end - else - send_and_log(traces, state) - end - - %State{state | waiting_traces: []} - end - - defp below_sync_threshold?(state) do - Agent.get_and_update(state.agent_pid, fn count -> - if count < state.sync_threshold do - {true, count + 1} - else - {false, count} - end - end) - end - - @spec meta(Span.t()) :: map - defp meta(span) do - %{} - |> add_datadog_meta(span) - |> add_error_data(span) - |> add_http_data(span) - |> add_sql_data(span) - |> add_tags(span) - |> Enum.reject(fn {_k, v} -> is_nil(v) end) - |> Enum.into(%{}) - end - - @spec add_datadog_meta(map, Span.t()) :: map - defp add_datadog_meta(meta, %Span{env: nil}), do: meta - - defp add_datadog_meta(meta, %Span{env: env}) do - Map.put(meta, :env, env) - end - - @spec add_error_data(map, Span.t()) :: map - defp add_error_data(meta, %{error: nil}), do: meta - - defp add_error_data(meta, %{error: error}) do - meta - |> add_error_type(error[:exception]) - |> add_error_message(error[:exception]) - |> add_error_stacktrace(error[:stacktrace]) - end - - @spec add_error_type(map, Exception.t() | nil) :: map - defp add_error_type(meta, nil), do: meta - defp add_error_type(meta, exception), do: Map.put(meta, "error.type", exception.__struct__) - - @spec add_error_message(map, Exception.t() | nil) :: map - defp add_error_message(meta, nil), do: meta - - defp add_error_message(meta, exception), - do: Map.put(meta, "error.msg", Exception.message(exception)) - - @spec add_error_stacktrace(map, list | nil) :: map - defp add_error_stacktrace(meta, nil), do: meta - - defp add_error_stacktrace(meta, stacktrace), - do: Map.put(meta, "error.stack", Exception.format_stacktrace(stacktrace)) - - @spec add_http_data(map, Span.t()) :: map - defp add_http_data(meta, %{http: nil}), do: meta - - defp add_http_data(meta, %{http: http}) do - status_code = - if http[:status_code] do - to_string(http[:status_code]) - end - - meta - |> Map.put("http.url", http[:url]) - |> Map.put("http.status_code", status_code) - |> Map.put("http.method", http[:method]) - end - - @spec add_sql_data(map, Span.t()) :: map - defp add_sql_data(meta, %{sql_query: nil}), do: meta - - defp add_sql_data(meta, %{sql_query: sql}) do - meta - |> Map.put("sql.query", sql[:query]) - |> Map.put("sql.rows", sql[:rows]) - |> Map.put("sql.db", sql[:db]) - end - - @spec add_tags(map, Span.t()) :: map - defp add_tags(meta, %{tags: nil}), do: meta + def format(trace), do: Client.format(trace) - defp add_tags(meta, %{tags: tags}) do - tags = tags |> Keyword.delete(:analytics_event) - - Map.merge( - meta, - tags - |> Enum.map(fn {k, v} -> {k, term_to_string(v)} end) - |> Enum.into(%{}) - ) - end - - @spec metrics(Span.t(), map) :: map - defp metrics(span, initial_value = %{}) do - initial_value - |> add_metrics(span) - |> Enum.reject(fn {_k, v} -> is_nil(v) end) - |> Enum.into(%{}) - end - - @spec add_metrics(map, Span.t()) :: map - defp add_metrics(metrics, %{tags: nil}), do: metrics - - defp add_metrics(metrics, %{tags: tags}) do - with analytics_event <- tags |> Keyword.get(:analytics_event), - true <- analytics_event != nil do - Map.merge( - metrics, - %{"_dd1.sr.eausr" => 1} - ) 
-    else
-      _ ->
-        metrics
-    end
-  end
-
-  @spec error(nil | Keyword.t()) :: integer
-  defp error(nil), do: 0
-
-  defp error(keyword) do
-    if Enum.any?(keyword, fn {_, v} -> not is_nil(v) end) do
-      1
-    else
-      0
-    end
-  end
-
-  @spec encode(data :: term) :: iodata | no_return
-  defp encode(data),
-    do: data |> deep_remove_nils() |> Msgpax.pack!(data)
-
-  @spec push(body :: iodata(), headers, State.t()) :: any()
-  defp push(body, headers, %State{http: http, host: host, port: port}),
-    do: http.put("#{host}:#{port}/v0.3/traces", body, headers)
-
-  @spec deep_remove_nils(term) :: term
-  defp deep_remove_nils(term) when is_map(term) do
-    term
-    |> Enum.reject(fn {_k, v} -> is_nil(v) end)
-    |> Enum.map(fn {k, v} -> {k, deep_remove_nils(v)} end)
-    |> Enum.into(%{})
-  end
-
-  defp deep_remove_nils(term) when is_list(term) do
-    if Keyword.keyword?(term) do
-      term
-      |> Enum.reject(fn {_k, v} -> is_nil(v) end)
-      |> Enum.map(fn {k, v} -> {k, deep_remove_nils(v)} end)
-    else
-      Enum.map(term, &deep_remove_nils/1)
-    end
-  end
-
-  defp deep_remove_nils(term), do: term
-
-  defp term_to_string(term) when is_binary(term), do: term
-  defp term_to_string(term) when is_atom(term), do: term
-  defp term_to_string(term), do: inspect(term)
+  def format(trace), do: Client.format(trace)
 
   @doc false
+  def format(span, priority, baggage), do: Client.format(span, priority, baggage)
 end
diff --git a/lib/spandex_datadog/api_server/buffer.ex b/lib/spandex_datadog/api_server/buffer.ex
new file mode 100644
index 0000000..0788c03
--- /dev/null
+++ b/lib/spandex_datadog/api_server/buffer.ex
@@ -0,0 +1,56 @@
+defmodule SpandexDatadog.ApiServer.Buffer do
+  @moduledoc false
+  # The buffer is designed to efficiently gather traces and spans without blocking
+  # the calling process.
+  # It stores each "trace" in an ets table chosen by the writing process's current
+  # scheduler id. This helps reduce contention on each ets table since, in theory,
+  # only one process should be writing to a given table at a time.
+  # We periodically flush all spans to datadog in the background.
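+  #
+  # As an illustration (the values here are only examples): with four schedulers
+  # there are four tables, and a caller that happens to be running on scheduler 3
+  # only ever writes to the third one:
+  #
+  #     id = :erlang.system_info(:scheduler_id)    #=> 3
+  #     :ets.insert(:"#{__MODULE__}-#{id}", {index, trace})
+  #
+  # where `index` is a per-table insert counter (see add_trace/1 below).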
+ + defstruct tabs: [] + + # Builds a bunch of ets tables, 1 per scheduler and returns them in a struct + def new() do + # 1 index erlang ftw + tabs = for s <- 1..System.schedulers() do + :ets.new(:"#{__MODULE__}-#{s}", [:named_table, :set, :public, {:write_concurrency, true}]) + end + + %__MODULE__{tabs: tabs} + end + + def add_trace(trace) do + buffer = :"#{__MODULE__}-#{:erlang.system_info(:scheduler_id)}" + index = :ets.update_counter(buffer, :index, 1, {:index, 0}) + :ets.insert(buffer, {index, trace}) + end + + # Returns the latest messages and then deletes them from the buffer + def flush_latest(buffer, f) do + Enum.flat_map(buffer.tabs, fn tab -> + case :ets.lookup(tab, :index) do + [{:index, index}] -> + records = :ets.select(tab, select_spec(index)) + f.(records) + :ets.select_delete(tab, delete_spec(index)) + records + + [] -> + [] + end + end) + end + + defp delete_spec(index) do + match_spec(index, true) + end + + def select_spec(index) do + # If we're selecting stuff we need to get the second element + match_spec(index, :"$2") + end + + defp match_spec(index, item) do + # Get integers less than the current index + [{{:"$1", :"$2"}, [{:andalso, {:is_integer, :"$1"}, {:"=<", :"$1", index}}], [item]}] + end +end diff --git a/lib/spandex_datadog/api_server/client.ex b/lib/spandex_datadog/api_server/client.ex new file mode 100644 index 0000000..820713c --- /dev/null +++ b/lib/spandex_datadog/api_server/client.ex @@ -0,0 +1,211 @@ +defmodule SpandexDatadog.ApiServer.Client do + @moduledoc false + # This client module is used to interact with datadog. + + # Same as HTTPoison.headers + @type headers :: [{atom, binary}] | [{binary, binary}] | %{binary => binary} | any + + @headers [{"Content-Type", "application/msgpack"}] + + alias Spandex.Span + alias Spandex.Trace + + require Logger + + # Accepts the client configuration and a list of traces. + @spec send(client :: module(), collector_url :: String.t(), [Trace.t()], Keyword.t()) :: :ok + def send(client, collector_url, traces, opts) do + headers = @headers ++ [{"X-Datadog-Trace-Count", length(traces)}] + + body = + traces + |> Enum.map(&format/1) + |> encode() + + response = client.put(collector_url, body, headers) + + if opts[:verbose?] 
do + Logger.debug(fn -> "Trace response: #{inspect(response)}" end) + end + + :ok + end + + @deprecated "Please use format/3 instead" + @spec format(Trace.t()) :: map() + def format(%Trace{spans: spans, priority: priority, baggage: baggage}) do + Enum.map(spans, fn span -> format(span, priority, baggage) end) + end + + @deprecated "Please use format/3 instead" + @spec format(Span.t()) :: map() + def format(%Span{} = span), do: format(span, 1, []) + + @spec format(Span.t(), integer(), Keyword.t()) :: map() + def format(%Span{} = span, priority, _baggage) do + %{ + trace_id: span.trace_id, + span_id: span.id, + name: span.name, + start: span.start, + duration: (span.completion_time || SpandexDatadog.Adapter.now()) - span.start, + parent_id: span.parent_id, + error: error(span.error), + resource: span.resource || span.name, + service: span.service, + type: span.type, + meta: meta(span), + metrics: + metrics(span, %{ + _sampling_priority_v1: priority + }) + } + end + + @spec meta(Span.t()) :: map + defp meta(span) do + %{} + |> add_datadog_meta(span) + |> add_error_data(span) + |> add_http_data(span) + |> add_sql_data(span) + |> add_tags(span) + |> Enum.reject(fn {_k, v} -> is_nil(v) end) + |> Enum.into(%{}) + end + + @spec add_datadog_meta(map, Span.t()) :: map + defp add_datadog_meta(meta, %Span{env: nil}), do: meta + + defp add_datadog_meta(meta, %Span{env: env}) do + Map.put(meta, :env, env) + end + + @spec add_error_data(map, Span.t()) :: map + defp add_error_data(meta, %{error: nil}), do: meta + + defp add_error_data(meta, %{error: error}) do + meta + |> add_error_type(error[:exception]) + |> add_error_message(error[:exception]) + |> add_error_stacktrace(error[:stacktrace]) + end + + @spec add_error_type(map, Exception.t() | nil) :: map + defp add_error_type(meta, nil), do: meta + defp add_error_type(meta, exception), do: Map.put(meta, "error.type", exception.__struct__) + + @spec add_error_message(map, Exception.t() | nil) :: map + defp add_error_message(meta, nil), do: meta + + defp add_error_message(meta, exception), + do: Map.put(meta, "error.msg", Exception.message(exception)) + + @spec add_error_stacktrace(map, list | nil) :: map + defp add_error_stacktrace(meta, nil), do: meta + + defp add_error_stacktrace(meta, stacktrace), + do: Map.put(meta, "error.stack", Exception.format_stacktrace(stacktrace)) + + @spec add_http_data(map, Span.t()) :: map + defp add_http_data(meta, %{http: nil}), do: meta + + defp add_http_data(meta, %{http: http}) do + status_code = + if http[:status_code] do + to_string(http[:status_code]) + end + + meta + |> Map.put("http.url", http[:url]) + |> Map.put("http.status_code", status_code) + |> Map.put("http.method", http[:method]) + end + + @spec add_sql_data(map, Span.t()) :: map + defp add_sql_data(meta, %{sql_query: nil}), do: meta + + defp add_sql_data(meta, %{sql_query: sql}) do + meta + |> Map.put("sql.query", sql[:query]) + |> Map.put("sql.rows", sql[:rows]) + |> Map.put("sql.db", sql[:db]) + end + + @spec add_tags(map, Span.t()) :: map + defp add_tags(meta, %{tags: nil}), do: meta + + defp add_tags(meta, %{tags: tags}) do + tags = tags |> Keyword.delete(:analytics_event) + + Map.merge( + meta, + tags + |> Enum.map(fn {k, v} -> {k, term_to_string(v)} end) + |> Enum.into(%{}) + ) + end + + @spec metrics(Span.t(), map) :: map + defp metrics(span, initial_value = %{}) do + initial_value + |> add_metrics(span) + |> Enum.reject(fn {_k, v} -> is_nil(v) end) + |> Enum.into(%{}) + end + + @spec add_metrics(map, Span.t()) :: map + defp add_metrics(metrics, 
%{tags: nil}), do: metrics + + defp add_metrics(metrics, %{tags: tags}) do + with analytics_event <- tags |> Keyword.get(:analytics_event), + true <- analytics_event != nil do + Map.merge( + metrics, + %{"_dd1.sr.eausr" => 1} + ) + else + _ -> + metrics + end + end + + @spec error(nil | Keyword.t()) :: integer + defp error(nil), do: 0 + + defp error(keyword) do + if Enum.any?(keyword, fn {_, v} -> not is_nil(v) end) do + 1 + else + 0 + end + end + + @spec encode(data :: term) :: iodata | no_return + defp encode(data), + do: data |> deep_remove_nils() |> Msgpax.pack!(data) + + @spec deep_remove_nils(term) :: term + defp deep_remove_nils(term) when is_map(term) do + term + |> Enum.reject(fn {_k, v} -> is_nil(v) end) + |> Enum.map(fn {k, v} -> {k, deep_remove_nils(v)} end) + |> Enum.into(%{}) + end + + defp deep_remove_nils(term) when is_list(term) do + if Keyword.keyword?(term) do + term + |> Enum.reject(fn {_k, v} -> is_nil(v) end) + |> Enum.map(fn {k, v} -> {k, deep_remove_nils(v)} end) + else + Enum.map(term, &deep_remove_nils/1) + end + end + + defp deep_remove_nils(term), do: term + + defp term_to_string(term) when is_binary(term), do: term + defp term_to_string(term) when is_atom(term), do: term + defp term_to_string(term), do: inspect(term) +end diff --git a/lib/spandex_datadog/api_server/reporter.ex b/lib/spandex_datadog/api_server/reporter.ex new file mode 100644 index 0000000..f45ef93 --- /dev/null +++ b/lib/spandex_datadog/api_server/reporter.ex @@ -0,0 +1,91 @@ +defmodule SpandexDatadog.ApiServer.Reporter do + @moduledoc false + # This client module periodically grabs the information from the various buffers + # and sends it to datadog. + use GenServer + + alias SpandexDatadog.ApiServer.Buffer + alias SpandexDatadog.ApiServer.Client + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @doc false + def flush do + GenServer.call(__MODULE__, :flush) + end + + @doc false + def enable_verbose_logging do + GenServer.call(__MODULE__, {:verbose_logging, true}) + end + + @doc false + def disable_verbose_logging do + GenServer.call(__MODULE__, {:verbose_logging, false}) + end + + @doc false + def set_http_client(mod) do + GenServer.call(__MODULE__, {:set_http, mod}) + end + + def init(opts) do + host = opts[:host] + port = opts[:port] + state = %{ + buffer: opts[:buffer], + collector_url: "#{host}:#{port}/v0.3/traces", + verbose?: opts[:verbose?], + http: opts[:http], + flush_period: opts[:flush_period] || 1_000, + } + + schedule(state.flush_period) + + {:ok, state} + end + + # Only used for development and testing purposes + def handle_call(:flush, _, state) do + flush(state) + {:reply, :ok, state} + end + + def handle_call({:verbose_logging, bool}, _, state) do + {:reply, :ok, %{state | verbose?: bool}} + end + + def handle_call({:set_http, mod}, _, state) do + {:reply, :ok, %{state | http: mod}} + end + + def handle_info(:flush, state) do + flush(state) + schedule(state.flush_period) + + {:noreply, state} + end + + defp flush(state) do + :telemetry.span([:spandex_datadog, :client, :flush], %{}, fn -> + Buffer.flush_latest(state.buffer, fn buffer -> + if buffer == [] do + :ok + else + Client.send(state.http, state.collector_url, buffer, verbose?: state.verbose?) 
+ end + end) + + {:ok, %{}} + end) + end + + defp schedule(timeout) do + # next time = min(max(min_time * 2, 1_000), 1_000) + # If our minimum requests are taking way longer than 1 second than don't try + # schedule another + Process.send_after(self(), :flush, timeout) + end +end diff --git a/test/api_server_test.exs b/test/api_server_test.exs index c2dd6cb..77e1768 100644 --- a/test/api_server_test.exs +++ b/test/api_server_test.exs @@ -9,17 +9,22 @@ defmodule SpandexDatadog.ApiServerTest do } alias SpandexDatadog.ApiServer + alias SpandexDatadog.ApiServer.Reporter + + @pid_key {:test_module, :test_pid} defmodule TestOkApiServer do def put(url, body, headers) do - send(self(), {:put_datadog_spans, body |> Msgpax.unpack!() |> hd(), url, headers}) + test_pid = :persistent_term.get({:test_module, :test_pid}) + send(test_pid, {:put_datadog_spans, body |> Msgpax.unpack!() |> hd(), url, headers}) {:ok, %HTTPoison.Response{status_code: 200}} end end defmodule TestErrorApiServer do def put(url, body, headers) do - send(self(), {:put_datadog_spans, body |> Msgpax.unpack!() |> hd(), url, headers}) + test_pid = :persistent_term.get({:test_module, :test_pid}) + send(test_pid, {:put_datadog_spans, body |> Msgpax.unpack!() |> hd(), url, headers}) {:error, %HTTPoison.Error{id: :foo, reason: :bar}} end end @@ -38,7 +43,6 @@ defmodule SpandexDatadog.ApiServerTest do end setup_all do - {:ok, agent_pid} = Agent.start_link(fn -> 0 end) trace_id = 4_743_028_846_331_200_905 {:ok, span_1} = @@ -78,25 +82,31 @@ defmodule SpandexDatadog.ApiServerTest do trace = %Trace{spans: [span_1, span_2, span_3]} + start_supervised({ApiServer, [ + host: "localhost", + port: "8126", + http: TestOkApiServer, + verbose?: false + ]}) + { :ok, [ trace: trace, - url: "localhost:8126/v0.3/traces", - state: %ApiServer.State{ - asynchronous_send?: false, - host: "localhost", - port: "8126", - http: TestOkApiServer, - verbose?: false, - waiting_traces: [], - batch_size: 1, - agent_pid: agent_pid - } + url: "localhost:8126/v0.3/traces" ] } end + setup do + :persistent_term.put(@pid_key, self()) + Reporter.set_http_client(TestOkApiServer) + Reporter.disable_verbose_logging() + Reporter.flush() + + :ok + end + describe "ApiServer.send_trace/2" do test "executes telemetry on success", %{trace: trace} do :telemetry.attach_many( @@ -124,43 +134,14 @@ defmodule SpandexDatadog.ApiServerTest do refute Process.get([:spandex_datadog, :send_trace, :exception]) end - - test "executes telemetry on exception", %{trace: trace} do - :telemetry.attach_many( - "log-response-handler", - [ - [:spandex_datadog, :send_trace, :start], - [:spandex_datadog, :send_trace, :stop], - [:spandex_datadog, :send_trace, :exception] - ], - &TelemetryRecorderPDict.handle_event/4, - nil - ) - - ApiServer.start_link(http: TestSlowApiServer, batch_size: 0, sync_threshold: 0) - - catch_exit(ApiServer.send_trace(trace, timeout: 1)) - - {start_measurements, start_metadata} = Process.get([:spandex_datadog, :send_trace, :start]) - assert start_measurements[:system_time] - assert trace == start_metadata[:trace] - - refute Process.get([:spandex_datadog, :send_trace, :stop]) - - {exception_measurements, exception_metadata} = Process.get([:spandex_datadog, :send_trace, :exception]) - assert exception_measurements[:duration] - assert trace == start_metadata[:trace] - assert :exit == exception_metadata[:kind] - assert nil == exception_metadata[:error] - assert is_list(exception_metadata[:stacktrace]) - end end - describe "ApiServer.handle_call/3 - :send_trace" do - test "doesn't log 
anything when verbose?: false", %{trace: trace, state: state, url: url} do + describe "flushing traces" do + test "doesn't log anything when verbose?: false", %{trace: trace, url: url} do log = capture_log(fn -> - ApiServer.handle_call({:send_trace, trace}, self(), state) + ApiServer.send_trace(trace) + Reporter.flush() end) assert log == "" @@ -230,25 +211,18 @@ defmodule SpandexDatadog.ApiServerTest do assert_received {:put_datadog_spans, ^formatted, ^url, ^headers} end - test "doesn't care about the response result", %{trace: trace, state: state, url: url} do - state = - state - |> Map.put(:verbose?, true) - |> Map.put(:http, TestErrorApiServer) + test "doesn't care about the response result", %{trace: trace, url: url} do + Reporter.set_http_client(TestErrorApiServer) + Reporter.enable_verbose_logging() - [enqueue, processing, received_spans, response] = + [response] = capture_log(fn -> - {:reply, :ok, _} = ApiServer.handle_call({:send_trace, trace}, self(), state) + ApiServer.send_trace(trace) + Reporter.flush() end) |> String.split("\n") |> Enum.reject(fn s -> s == "" end) - assert enqueue =~ ~r/Adding trace to stack with 3 spans/ - - assert processing =~ ~r/Sending 1 traces, 3 spans/ - - assert received_spans =~ ~r/Trace: \[%Spandex.Trace{/ - formatted = [ %{ "duration" => 100_000, From a7ed3459e1d3cec7af0f3b88607fe69a807a822f Mon Sep 17 00:00:00 2001 From: Chris Keathley Date: Mon, 25 Jan 2021 16:20:08 -0500 Subject: [PATCH 2/4] Fix compiler warnings --- lib/spandex_datadog/api_server.ex | 2 ++ lib/spandex_datadog/api_server/client.ex | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/spandex_datadog/api_server.ex b/lib/spandex_datadog/api_server.ex index e680760..195f2d7 100644 --- a/lib/spandex_datadog/api_server.ex +++ b/lib/spandex_datadog/api_server.ex @@ -96,6 +96,8 @@ defmodule SpandexDatadog.ApiServer do # Leaving these here for api versioning purposes. But this logic has # been moved to the client module. 
@doc false + @deprecated "Please use format/3 instead" + @spec format(Trace.t() | Span.t()) :: map() def format(trace), do: Client.format(trace) @doc false diff --git a/lib/spandex_datadog/api_server/client.ex b/lib/spandex_datadog/api_server/client.ex index 820713c..8aeefa8 100644 --- a/lib/spandex_datadog/api_server/client.ex +++ b/lib/spandex_datadog/api_server/client.ex @@ -31,13 +31,11 @@ defmodule SpandexDatadog.ApiServer.Client do :ok end - @deprecated "Please use format/3 instead" @spec format(Trace.t()) :: map() def format(%Trace{spans: spans, priority: priority, baggage: baggage}) do Enum.map(spans, fn span -> format(span, priority, baggage) end) end - @deprecated "Please use format/3 instead" @spec format(Span.t()) :: map() def format(%Span{} = span), do: format(span, 1, []) From f87749427bcc7ebbc070760e5b349d2e71492e57 Mon Sep 17 00:00:00 2001 From: Chris Keathley Date: Mon, 25 Jan 2021 17:20:59 -0500 Subject: [PATCH 3/4] Add max buffer size, run reports in a task, continue to chunk batch sizes --- lib/spandex_datadog/api_server.ex | 12 +++-- lib/spandex_datadog/api_server/buffer.ex | 63 +++++++++++++++------- lib/spandex_datadog/api_server/reporter.ex | 40 +++++++++----- 3 files changed, 78 insertions(+), 37 deletions(-) diff --git a/lib/spandex_datadog/api_server.ex b/lib/spandex_datadog/api_server.ex index 195f2d7..77e1269 100644 --- a/lib/spandex_datadog/api_server.ex +++ b/lib/spandex_datadog/api_server.ex @@ -23,14 +23,16 @@ defmodule SpandexDatadog.ApiServer do http: :atom, batch_size: :integer, sync_threshold: :integer, + max_buffer_size: :integer, api_adapter: :atom ], defaults: [ host: "localhost", port: 8126, verbose?: false, - batch_size: 10, + batch_size: 50, sync_threshold: 20, + max_buffer_size: 5_000, api_adapter: SpandexDatadog.ApiServer ], required: [:http], @@ -38,6 +40,7 @@ defmodule SpandexDatadog.ApiServer do verbose?: "Only to be used for debugging: All finished traces will be logged", host: "The host the agent can be reached at", port: "The port to use when sending traces to the agent", + max_buffer_size: "The maximum number of traces that will be buffered.", batch_size: "The number of traces that should be sent in a single batch", sync_threshold: "The maximum number of processes that may be sending traces at any one time. This adds backpressure", @@ -60,14 +63,17 @@ defmodule SpandexDatadog.ApiServer do end def init(opts) do - buffer = Buffer.new() + task_sup = __MODULE__.TaskSupervisor + buffer = Buffer.new(opts) reporter_opts = opts |> Map.new() - |> Map.take([:http, :verbose?, :host, :port]) + |> Map.take([:http, :verbose?, :host, :port, :batch_size]) |> Map.put(:buffer, buffer) + |> Map.put(:task_sup, task_sup) children = [ + {Task.Supervisor, name: task_sup}, {Reporter, reporter_opts}, ] diff --git a/lib/spandex_datadog/api_server/buffer.ex b/lib/spandex_datadog/api_server/buffer.ex index 0788c03..fe1d851 100644 --- a/lib/spandex_datadog/api_server/buffer.ex +++ b/lib/spandex_datadog/api_server/buffer.ex @@ -6,38 +6,57 @@ defmodule SpandexDatadog.ApiServer.Buffer do # id. This helps reduce contention on each ets table since, in theory, only one process should be writing to a table at a time. # We periodically flush all spans to datadog in the background. 
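+  # Each table is paired with an :atomics counter slot that tracks its write index:
+  # writers bump it with :atomics.add_get/3 and refuse to grow past max_buffer_size,
+  # and the reporter swaps it back to 0 with :atomics.exchange/3 when it flushes,
+  # all without taking any locks.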
-  defstruct tabs: []
+  defstruct tabs: [], counters: nil
+
+  @config_key {__MODULE__, :config}
 
   # Builds a bunch of ets tables, 1 per scheduler and returns them in a struct
-  def new() do
+  def new(opts) do
+    schedulers = System.schedulers()
+    counters = :atomics.new(schedulers, [signed: false])
+    config = %{
+      max_buffer_size: opts[:max_buffer_size] || 5_000,
+      counters: counters,
+    }
+    :persistent_term.put(@config_key, config)
+
     # 1 index erlang ftw
-    tabs = for s <- 1..System.schedulers() do
-      :ets.new(:"#{__MODULE__}-#{s}", [:named_table, :set, :public, {:write_concurrency, true}])
+    tabs = for s <- 1..schedulers do
+      :ets.new(tab_name(s), [:named_table, :set, :public, {:write_concurrency, true}])
     end
 
-    %__MODULE__{tabs: tabs}
+    %__MODULE__{tabs: tabs, counters: counters}
   end
 
   def add_trace(trace) do
-    buffer = :"#{__MODULE__}-#{:erlang.system_info(:scheduler_id)}"
-    index = :ets.update_counter(buffer, :index, 1, {:index, 0})
-    :ets.insert(buffer, {index, trace})
+    config = :persistent_term.get(@config_key)
+    id = :erlang.system_info(:scheduler_id)
+    buffer = :"#{__MODULE__}-#{id}"
+    index = :atomics.add_get(config.counters, id, 1)
+
+    # If we're at the buffer size we drop the new trace on the ground.
+    # TODO - This should really drop the oldest traces first, since we care more
+    # about the current data than about the old data.
+    if index > config.max_buffer_size do
+      # Remove the increment that we just made.
+      :atomics.sub(config.counters, id, 1)
+    else
+      :ets.insert(buffer, {index, trace})
+    end
   end
 
   # Returns the latest messages and then deletes them from the buffer
   def flush_latest(buffer, f) do
-    Enum.flat_map(buffer.tabs, fn tab ->
-      case :ets.lookup(tab, :index) do
-        [{:index, index}] ->
-          records = :ets.select(tab, select_spec(index))
-          f.(records)
-          :ets.select_delete(tab, delete_spec(index))
-          records
-
-        [] ->
-          []
-      end
-    end)
+    for s <- 1..System.schedulers() do
+      # Get the current latest index for this table and reset the count to 0.
+      index = :atomics.exchange(buffer.counters, s, 0)
+      # It's possible that we interleave with a different process that is adding
+      # additional traces at this point. That means that we're going to possibly
+      # allow the caller to overwrite old data (since the index is reset).
+      # This is OK for our purposes.
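+      # select_spec/1 below builds a match spec that returns the trace (the :"$2"
+      # element) for every row whose integer key is <= the index we just read, so
+      # `records` ends up as a plain list of traces handed to the callback.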
+ records = :ets.select(tab_name(s), select_spec(index)) + f.(records) + end end defp delete_spec(index) do @@ -53,4 +72,8 @@ defmodule SpandexDatadog.ApiServer.Buffer do # Get integers less than the current index [{{:"$1", :"$2"}, [{:andalso, {:is_integer, :"$1"}, {:"=<", :"$1", index}}], [item]}] end + + defp tab_name(index) do + :"#{__MODULE__}-#{index}" + end end diff --git a/lib/spandex_datadog/api_server/reporter.ex b/lib/spandex_datadog/api_server/reporter.ex index f45ef93..8761642 100644 --- a/lib/spandex_datadog/api_server/reporter.ex +++ b/lib/spandex_datadog/api_server/reporter.ex @@ -34,13 +34,11 @@ defmodule SpandexDatadog.ApiServer.Reporter do def init(opts) do host = opts[:host] port = opts[:port] - state = %{ - buffer: opts[:buffer], - collector_url: "#{host}:#{port}/v0.3/traces", - verbose?: opts[:verbose?], - http: opts[:http], - flush_period: opts[:flush_period] || 1_000, - } + collector_url = "#{host}:#{port}/v0.3/traces" + state = + opts + |> update_in([:flush_period], & &1 || 1_000) + |> put_in([:collector_url], collector_url) schedule(state.flush_period) @@ -49,7 +47,10 @@ defmodule SpandexDatadog.ApiServer.Reporter do # Only used for development and testing purposes def handle_call(:flush, _, state) do - flush(state) + # this little dance is hella weird, but it ensures that we can call this + # with backpressure in tests and we don't need to duplicate code in lots of + # places. + handle_info(:flush, state) {:reply, :ok, state} end @@ -62,7 +63,14 @@ defmodule SpandexDatadog.ApiServer.Reporter do end def handle_info(:flush, state) do - flush(state) + # Run this function in a task to avoid bloating this processes binary memory + # and generally optimize GC. We're not really protecting ourselves from failure + # here because if the task exits, we're going to exit as well. But that's OK + # and is probably what we want. + state.task_sup + |> Task.Supervisor.async(fn -> flush(state) end) + |> Task.await() + schedule(state.flush_period) {:noreply, state} @@ -70,12 +78,16 @@ defmodule SpandexDatadog.ApiServer.Reporter do defp flush(state) do :telemetry.span([:spandex_datadog, :client, :flush], %{}, fn -> - Buffer.flush_latest(state.buffer, fn buffer -> - if buffer == [] do + Buffer.flush_latest(state.buffer, fn + [] -> :ok - else - Client.send(state.http, state.collector_url, buffer, verbose?: state.verbose?) - end + + buffer -> + buffer + |> Enum.chunk_every(state.batch_size) + |> Enum.each(fn batch -> + Client.send(state.http, state.collector_url, batch, verbose?: state.verbose?) + end) end) {:ok, %{}} From 37983783c81246c6e65281c8dd123aca918c72c9 Mon Sep 17 00:00:00 2001 From: Chris Keathley Date: Mon, 25 Jan 2021 18:42:33 -0500 Subject: [PATCH 4/4] Remove dead code --- lib/spandex_datadog/api_server/buffer.ex | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/spandex_datadog/api_server/buffer.ex b/lib/spandex_datadog/api_server/buffer.ex index fe1d851..d512ac3 100644 --- a/lib/spandex_datadog/api_server/buffer.ex +++ b/lib/spandex_datadog/api_server/buffer.ex @@ -59,10 +59,6 @@ defmodule SpandexDatadog.ApiServer.Buffer do end end - defp delete_spec(index) do - match_spec(index, true) - end - def select_spec(index) do # If we're selecting stuff we need to get the second element match_spec(index, :"$2")