Skip to content

Commit

Permalink
sync-service: Add cpu and memory usage info (#2163)
Browse files Browse the repository at this point in the history
and tidy up telemetry config a little and make the call-home url
configurable

---------

Co-authored-by: Oleksii Sholik <[email protected]>
  • Loading branch information
magnetised and alco authored Dec 17, 2024
1 parent 1b8dce0 commit 0565b46
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 52 deletions.
35 changes: 24 additions & 11 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import Config
import Dotenvy

alias Electric.ConfigParser

config :elixir, :time_zone_database, Tz.TimeZoneDatabase

if config_env() in [:dev, :test] do
source!([".env.#{config_env()}", ".env.#{config_env()}.local", System.get_env()])
end

config :logger,
level: env!("ELECTRIC_LOG_LEVEL", &Electric.ConfigParser.parse_log_level!/1, :info)
config :logger, level: env!("ELECTRIC_LOG_LEVEL", &ConfigParser.parse_log_level!/1, :info)

config :logger, :default_formatter,
# Doubled line breaks serve as long message boundaries
Expand Down Expand Up @@ -46,8 +47,6 @@ service_name = env!("ELECTRIC_SERVICE_NAME", :string, "electric")
instance_id = env!("ELECTRIC_INSTANCE_ID", :string, Electric.Utils.uuid4())
version = Electric.version()

config :telemetry_poller, :default, period: 500

config :opentelemetry,
resource_detectors: [:otel_resource_env_var, :otel_resource_app_env],
resource: %{service: %{name: service_name, version: version}, instance: %{id: instance_id}}
Expand Down Expand Up @@ -107,13 +106,11 @@ config :opentelemetry,
local_parent_not_sampled: :always_off
}}

database_url = env!("DATABASE_URL", :string!)
database_url_config = env!("DATABASE_URL", &ConfigParser.parse_postgresql_uri!/1)

database_ipv6_config =
env!("ELECTRIC_DATABASE_USE_IPV6", :boolean, false)

{:ok, database_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_url)

connection_opts = database_url_config ++ [ipv6: database_ipv6_config]

config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts)
Expand Down Expand Up @@ -194,6 +191,21 @@ replication_stream_id =
storage = {storage_mod, storage_opts}

prometheus_port = env!("ELECTRIC_PROMETHEUS_PORT", :integer, nil)

call_home_telemetry_url =
env!(
"ELECTRIC_TELEMETRY_URL",
&ConfigParser.parse_telemetry_url!/1,
"https://checkpoint.electric-sql.com"
)

system_metrics_poll_interval =
env!(
"ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL",
&ConfigParser.parse_human_readable_time!/1,
:timer.seconds(5)
)

# The provided database id is relevant if you had used v0.8 and want to keep the storage
# instead of having hanging files. We use a provided value as stack id, but nothing else.
provided_database_id = env!("ELECTRIC_DATABASE_ID", :string, "single_stack")
Expand All @@ -206,14 +218,15 @@ config :electric,
chunk_bytes_threshold: chunk_bytes_threshold,
# Used in telemetry
instance_id: instance_id,
call_home_telemetry?: env!("ELECTRIC_USAGE_REPORTING", :boolean, config_env() == :prod),
telemetry_url: call_home_telemetry_url,
system_metrics_poll_interval: system_metrics_poll_interval,
telemetry_statsd_host: statsd_host,
prometheus_port: prometheus_port,
db_pool_size: env!("ELECTRIC_DB_POOL_SIZE", :integer, 20),
replication_stream_id: replication_stream_id,
replication_slot_temporary?: env!("CLEANUP_REPLICATION_SLOTS_ON_SHUTDOWN", :boolean, false),
service_port: env!("ELECTRIC_PORT", :integer, 3000),
prometheus_port: prometheus_port,
storage: storage,
persistent_kv: persistent_kv,
listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false),
call_home_telemetry: env!("ELECTRIC_USAGE_REPORTING", :boolean, config_env() == :prod),
telemetry_url: "https://checkpoint.electric-sql.com"
listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false)
47 changes: 45 additions & 2 deletions packages/sync-service/lib/electric/config_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,49 @@ defmodule Electric.ConfigParser do

def parse_log_level!(str) when str in @log_levels, do: String.to_existing_atom(str)

def parse_log_level!(_str),
do: raise(Dotenvy.Error, message: "Must be one of #{inspect(@public_log_levels)}")
def parse_log_level!(_str) do
raise Dotenvy.Error, message: "Must be one of #{inspect(@public_log_levels)}"
end

@spec parse_telemetry_url(binary) :: {:ok, binary} | {:error, binary}
def parse_telemetry_url(str) do
case URI.new(str) do
{:ok, %URI{scheme: scheme}} when scheme in ["http", "https"] -> {:ok, str}
_ -> {:error, "has invalid URL format: \"#{str}\""}
end
end

def parse_telemetry_url!(str) do
case parse_telemetry_url(str) do
{:ok, url} -> url
{:error, message} -> raise Dotenvy.Error, message: message
end
end

@time_units ~w[ms msec s sec m min]

@spec parse_human_readable_time(binary | nil) :: {:ok, pos_integer} | {:error, binary}

def parse_human_readable_time(str) do
with {num, suffix} <- Float.parse(str),
true <- num > 0,
suffix = String.trim(suffix),
true <- suffix == "" or suffix in @time_units do
{:ok, trunc(num * time_multiplier(suffix))}
else
_ -> {:error, "has invalid value: #{inspect(str)}. Must be one of #{inspect(@time_units)}"}
end
end

defp time_multiplier(""), do: 1
defp time_multiplier(millisecond) when millisecond in ["ms", "msec"], do: 1
defp time_multiplier(second) when second in ["s", "sec"], do: 1000
defp time_multiplier(minute) when minute in ["m", "min"], do: 1000 * 60

def parse_human_readable_time!(str) do
case parse_human_readable_time(str) do
{:ok, result} -> result
{:error, message} -> raise Dotenvy.Error, message: message
end
end
end
7 changes: 5 additions & 2 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -702,16 +702,19 @@ defmodule Electric.Connection.Manager do
defp query_and_report_retained_wal_size(pool, slot_name, stack_id) do
query = """
SELECT
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::int8
FROM
pg_replication_slots
WHERE
slot_name = $1
"""

case Postgrex.query(pool, query, [slot_name]) do
# The query above can return `-1` which I'm assuming means "up-to-date".
# This is a confusing stat if we're measuring in bytes, so normalise to
# [0, :infinity)
{:ok, %Postgrex.Result{rows: [[wal_size]]}} ->
:telemetry.execute([:electric, :postgres, :replication], %{wal_size: wal_size}, %{
:telemetry.execute([:electric, :postgres, :replication], %{wal_size: max(0, wal_size)}, %{
stack_id: stack_id
})

Expand Down
179 changes: 146 additions & 33 deletions packages/sync-service/lib/electric/telemetry.ex
Original file line number Diff line number Diff line change
@@ -1,35 +1,41 @@
defmodule Electric.Telemetry do
use Supervisor

import Telemetry.Metrics

def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def init(opts) do
children = [
{:telemetry_poller, measurements: periodic_measurements(opts), period: 2_000}
]
system_metrics_poll_interval = Application.get_env(:electric, :system_metrics_poll_interval)

children
|> add_statsd_reporter(Application.fetch_env!(:electric, :telemetry_statsd_host))
|> add_prometheus_reporter(Application.fetch_env!(:electric, :prometheus_port))
|> add_call_home_reporter(Application.fetch_env!(:electric, :call_home_telemetry))
statsd_host = Application.fetch_env!(:electric, :telemetry_statsd_host)
prometheus? = not is_nil(Application.fetch_env!(:electric, :prometheus_port))
call_home_telemetry? = Application.fetch_env!(:electric, :call_home_telemetry?)

[
{:telemetry_poller,
measurements: periodic_measurements(opts),
period: system_metrics_poll_interval,
init_delay: :timer.seconds(5)},
statsd_reporter_child_spec(statsd_host),
prometheus_reporter_child_spec(prometheus?),
call_home_reporter_child_spec(call_home_telemetry?)
]
|> Enum.reject(&is_nil/1)
|> Supervisor.init(strategy: :one_for_one)
end

defp add_call_home_reporter(children, false), do: children

defp add_call_home_reporter(children, true) do
children ++
[
{Electric.Telemetry.CallHomeReporter,
static_info: static_info(),
metrics: call_home_metrics(),
first_report_in: {2, :minute},
reporting_period: {30, :minute},
reporter_fn: &Electric.Telemetry.CallHomeReporter.report_home/1}
]
defp call_home_reporter_child_spec(false), do: nil

defp call_home_reporter_child_spec(true) do
{Electric.Telemetry.CallHomeReporter,
static_info: static_info(),
metrics: call_home_metrics(),
first_report_in: {2, :minute},
reporting_period: {30, :minute},
reporter_fn: &Electric.Telemetry.CallHomeReporter.report_home/1}
end

def static_info() do
Expand Down Expand Up @@ -69,6 +75,19 @@ defmodule Electric.Telemetry do
run_queue_cpu: summary("vm.total_run_queue_lengths.cpu"),
run_queue_io: summary("vm.total_run_queue_lengths.io")
],
system: [
load_avg1: last_value("system.load_percent.avg1"),
load_avg5: last_value("system.load_percent.avg5"),
load_avg15: last_value("system.load_percent.avg15"),
memory_free: last_value("system.memory.free_memory"),
memory_used: last_value("system.memory.used_memory"),
memory_free_percent: last_value("system.memory_percent.free_memory"),
memory_used_percent: last_value("system.memory_percent.used_memory"),
swap_free: last_value("system.swap.free"),
swap_used: last_value("system.swap.used"),
swap_free_percent: last_value("system.swap_percent.free"),
swap_used_percent: last_value("system.swap_percent.used")
],
usage: [
inbound_bytes:
sum("electric.postgres.replication.transaction_received.bytes", unit: :byte),
Expand Down Expand Up @@ -105,23 +124,20 @@ defmodule Electric.Telemetry do
]
end

defp add_statsd_reporter(children, nil), do: children
defp statsd_reporter_child_spec(nil), do: nil

defp add_statsd_reporter(children, host) do
children ++
[
{TelemetryMetricsStatsd,
host: host,
formatter: :datadog,
global_tags: [instance_id: Electric.instance_id()],
metrics: statsd_metrics()}
]
defp statsd_reporter_child_spec(host) do
{TelemetryMetricsStatsd,
host: host,
formatter: :datadog,
global_tags: [instance_id: Electric.instance_id()],
metrics: statsd_metrics()}
end

defp add_prometheus_reporter(children, nil), do: children
defp prometheus_reporter_child_spec(false), do: nil

defp add_prometheus_reporter(children, _) do
children ++ [{TelemetryMetricsPrometheus.Core, metrics: prometheus_metrics()}]
defp prometheus_reporter_child_spec(true) do
{TelemetryMetricsPrometheus.Core, metrics: prometheus_metrics()}
end

defp statsd_metrics() do
Expand All @@ -147,7 +163,14 @@ defmodule Electric.Telemetry do
summary("electric.storage.make_new_snapshot.stop.duration", unit: {:native, :millisecond}),
summary("electric.querying.stream_initial_data.stop.duration",
unit: {:native, :millisecond}
)
),
last_value("system.load_percent.avg1"),
last_value("system.load_percent.avg5"),
last_value("system.load_percent.avg15"),
last_value("system.memory.free_memory"),
last_value("system.memory.used_memory"),
last_value("system.swap.free"),
last_value("system.swap.used")
]
|> Enum.map(&%{&1 | tags: [:instance_id | &1.tags]})
end
Expand Down Expand Up @@ -185,6 +208,8 @@ defmodule Electric.Telemetry do
{__MODULE__, :uptime_event, []},
{__MODULE__, :count_shapes, [stack_id]},
{__MODULE__, :get_total_disk_usage, [opts]},
{__MODULE__, :get_system_load_average, [opts]},
{__MODULE__, :get_system_memory_usage, [opts]},
{Electric.Connection.Manager, :report_retained_wal_size,
[Electric.Connection.Manager.name(stack_id)]}
]
Expand Down Expand Up @@ -217,4 +242,92 @@ defmodule Electric.Telemetry do
:exit, {:noproc, _} ->
:ok
end

def get_system_load_average(opts) do
cores = :erlang.system_info(:logical_processors)

# > The load values are proportional to how long time a runnable Unix
# > process has to spend in the run queue before it is scheduled.
# > Accordingly, higher values mean more system load. The returned value
# > divided by 256 produces the figure displayed by rup and top.
#
# I'm going one step further and dividing by the number of CPUs so in a 4
# core system, a load of 4.0 (in top) will show as 100%.
# Since load can go above num cores, we can to 200%, 300% but
# I think this makes sense.
#
# Certainly the formula in the erlang docs:
#
# > the following simple mathematical transformation can produce the load
# > value as a percentage:
# >
# > PercentLoad = 100 * (1 - D/(D + Load))
# >
# > D determines which load value should be associated with which
# > percentage. Choosing D = 50 means that 128 is 60% load, 256 is 80%, 512
# > is 90%, and so on.
#
# Makes little sense. Setting `D` as they say and plugging in a avg1 value
# of 128 does not give 60% so I'm not sure how to square what they say with
# the numbers...
#
# e.g. my machine currently has a cpu util (:cpu_sup.util()) of 4% and an
# avg1() of 550 ish across 24 cores (so doing very little) but that formula
# would give a `PercentLoad` of ~92%.
#
# My version would give value of 550 / 256 / 24 = 9%
[:avg1, :avg5, :avg15]
|> Map.new(fn probe ->
{probe, 100 * (apply(:cpu_sup, probe, []) / 256 / cores)}
end)
|> then(
&:telemetry.execute([:system, :load_percent], &1, %{
stack_id: opts[:stack_id]
})
)
end

def get_system_memory_usage(opts) do
{total, stats} =
:memsup.get_system_memory_data()
|> Keyword.delete(:total_memory)
|> Keyword.pop(:system_total_memory)

mem_stats =
Keyword.take(stats, [:free_memory, :available_memory, :buffered_memory, :cached_memory])

{total_swap, stats} = Keyword.pop(stats, :total_swap)

used_memory = total - Keyword.fetch!(mem_stats, :free_memory)
resident_memory = total - Keyword.fetch!(mem_stats, :available_memory)

memory_stats =
mem_stats
|> Map.new()
|> Map.put(:used_memory, used_memory)
|> Map.put(:resident_memory, resident_memory)

memory_percent_stats =
mem_stats
|> Map.new(fn {k, v} -> {k, 100 * v / total} end)
|> Map.put(:used_memory, 100 * used_memory / total)
|> Map.put(:resident_memory, 100 * resident_memory / total)

:telemetry.execute([:system, :memory], memory_stats, %{
stack_id: opts[:stack_id]
})

:telemetry.execute([:system, :memory_percent], memory_percent_stats, %{
stack_id: opts[:stack_id]
})

free_swap = Keyword.get(stats, :free_swap, 0)
used_swap = total_swap - free_swap

swap_stats = %{total: total_swap, free: free_swap, used: used_swap}
swap_percent_stats = %{free: 100 * free_swap / total_swap, used: 100 * used_swap / total_swap}

:telemetry.execute([:system, :swap], swap_stats, %{stack_id: opts[:stack_id]})
:telemetry.execute([:system, :swap_percent], swap_percent_stats, %{stack_id: opts[:stack_id]})
end
end
Loading

0 comments on commit 0565b46

Please sign in to comment.