diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 9b0f8aeded..7f3175f92a 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -1,14 +1,15 @@ import Config import Dotenvy +alias Electric.ConfigParser + config :elixir, :time_zone_database, Tz.TimeZoneDatabase if config_env() in [:dev, :test] do source!([".env.#{config_env()}", ".env.#{config_env()}.local", System.get_env()]) end -config :logger, - level: env!("ELECTRIC_LOG_LEVEL", &Electric.ConfigParser.parse_log_level!/1, :info) +config :logger, level: env!("ELECTRIC_LOG_LEVEL", &ConfigParser.parse_log_level!/1, :info) config :logger, :default_formatter, # Doubled line breaks serve as long message boundaries @@ -46,8 +47,6 @@ service_name = env!("ELECTRIC_SERVICE_NAME", :string, "electric") instance_id = env!("ELECTRIC_INSTANCE_ID", :string, Electric.Utils.uuid4()) version = Electric.version() -config :telemetry_poller, :default, period: 500 - config :opentelemetry, resource_detectors: [:otel_resource_env_var, :otel_resource_app_env], resource: %{service: %{name: service_name, version: version}, instance: %{id: instance_id}} @@ -107,13 +106,11 @@ config :opentelemetry, local_parent_not_sampled: :always_off }} -database_url = env!("DATABASE_URL", :string!) +database_url_config = env!("DATABASE_URL", &ConfigParser.parse_postgresql_uri!/1) database_ipv6_config = env!("ELECTRIC_DATABASE_USE_IPV6", :boolean, false) -{:ok, database_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_url) - connection_opts = database_url_config ++ [ipv6: database_ipv6_config] config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts) @@ -194,6 +191,21 @@ replication_stream_id = storage = {storage_mod, storage_opts} prometheus_port = env!("ELECTRIC_PROMETHEUS_PORT", :integer, nil) + +call_home_telemetry_url = + env!( + "ELECTRIC_TELEMETRY_URL", + &ConfigParser.parse_telemetry_url!/1, + "https://checkpoint.electric-sql.com" + ) + +system_metrics_poll_interval = + env!( + "ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL", + &ConfigParser.parse_human_readable_time!/1, + :timer.seconds(5) + ) + # The provided database id is relevant if you had used v0.8 and want to keep the storage # instead of having hanging files. We use a provided value as stack id, but nothing else. provided_database_id = env!("ELECTRIC_DATABASE_ID", :string, "single_stack") @@ -206,14 +218,15 @@ config :electric, chunk_bytes_threshold: chunk_bytes_threshold, # Used in telemetry instance_id: instance_id, + call_home_telemetry?: env!("ELECTRIC_USAGE_REPORTING", :boolean, config_env() == :prod), + telemetry_url: call_home_telemetry_url, + system_metrics_poll_interval: system_metrics_poll_interval, telemetry_statsd_host: statsd_host, + prometheus_port: prometheus_port, db_pool_size: env!("ELECTRIC_DB_POOL_SIZE", :integer, 20), replication_stream_id: replication_stream_id, replication_slot_temporary?: env!("CLEANUP_REPLICATION_SLOTS_ON_SHUTDOWN", :boolean, false), service_port: env!("ELECTRIC_PORT", :integer, 3000), - prometheus_port: prometheus_port, storage: storage, persistent_kv: persistent_kv, - listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false), - call_home_telemetry: env!("ELECTRIC_USAGE_REPORTING", :boolean, config_env() == :prod), - telemetry_url: "https://checkpoint.electric-sql.com" + listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false) diff --git a/packages/sync-service/lib/electric/config_parser.ex b/packages/sync-service/lib/electric/config_parser.ex index f43a13a717..6d8c7ddd67 100644 --- a/packages/sync-service/lib/electric/config_parser.ex +++ b/packages/sync-service/lib/electric/config_parser.ex @@ -201,6 +201,49 @@ defmodule Electric.ConfigParser do def parse_log_level!(str) when str in @log_levels, do: String.to_existing_atom(str) - def parse_log_level!(_str), - do: raise(Dotenvy.Error, message: "Must be one of #{inspect(@public_log_levels)}") + def parse_log_level!(_str) do + raise Dotenvy.Error, message: "Must be one of #{inspect(@public_log_levels)}" + end + + @spec parse_telemetry_url(binary) :: {:ok, binary} | {:error, binary} + def parse_telemetry_url(str) do + case URI.new(str) do + {:ok, %URI{scheme: scheme}} when scheme in ["http", "https"] -> {:ok, str} + _ -> {:error, "has invalid URL format: \"#{str}\""} + end + end + + def parse_telemetry_url!(str) do + case parse_telemetry_url(str) do + {:ok, url} -> url + {:error, message} -> raise Dotenvy.Error, message: message + end + end + + @time_units ~w[ms msec s sec m min] + + @spec parse_human_readable_time(binary | nil) :: {:ok, pos_integer} | {:error, binary} + + def parse_human_readable_time(str) do + with {num, suffix} <- Float.parse(str), + true <- num > 0, + suffix = String.trim(suffix), + true <- suffix == "" or suffix in @time_units do + {:ok, trunc(num * time_multiplier(suffix))} + else + _ -> {:error, "has invalid value: #{inspect(str)}. Must be one of #{inspect(@time_units)}"} + end + end + + defp time_multiplier(""), do: 1 + defp time_multiplier(millisecond) when millisecond in ["ms", "msec"], do: 1 + defp time_multiplier(second) when second in ["s", "sec"], do: 1000 + defp time_multiplier(minute) when minute in ["m", "min"], do: 1000 * 60 + + def parse_human_readable_time!(str) do + case parse_human_readable_time(str) do + {:ok, result} -> result + {:error, message} -> raise Dotenvy.Error, message: message + end + end end diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index ed1ac4f43e..ded8707935 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -702,7 +702,7 @@ defmodule Electric.Connection.Manager do defp query_and_report_retained_wal_size(pool, slot_name, stack_id) do query = """ SELECT - pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) + pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::int8 FROM pg_replication_slots WHERE @@ -710,8 +710,11 @@ defmodule Electric.Connection.Manager do """ case Postgrex.query(pool, query, [slot_name]) do + # The query above can return `-1` which I'm assuming means "up-to-date". + # This is a confusing stat if we're measuring in bytes, so normalise to + # [0, :infinity) {:ok, %Postgrex.Result{rows: [[wal_size]]}} -> - :telemetry.execute([:electric, :postgres, :replication], %{wal_size: wal_size}, %{ + :telemetry.execute([:electric, :postgres, :replication], %{wal_size: max(0, wal_size)}, %{ stack_id: stack_id }) diff --git a/packages/sync-service/lib/electric/telemetry.ex b/packages/sync-service/lib/electric/telemetry.ex index 79a8bbb10e..fbd8d40ca7 100644 --- a/packages/sync-service/lib/electric/telemetry.ex +++ b/packages/sync-service/lib/electric/telemetry.ex @@ -1,5 +1,6 @@ defmodule Electric.Telemetry do use Supervisor + import Telemetry.Metrics def start_link(init_arg) do @@ -7,29 +8,34 @@ defmodule Electric.Telemetry do end def init(opts) do - children = [ - {:telemetry_poller, measurements: periodic_measurements(opts), period: 2_000} - ] + system_metrics_poll_interval = Application.get_env(:electric, :system_metrics_poll_interval) - children - |> add_statsd_reporter(Application.fetch_env!(:electric, :telemetry_statsd_host)) - |> add_prometheus_reporter(Application.fetch_env!(:electric, :prometheus_port)) - |> add_call_home_reporter(Application.fetch_env!(:electric, :call_home_telemetry)) + statsd_host = Application.fetch_env!(:electric, :telemetry_statsd_host) + prometheus? = not is_nil(Application.fetch_env!(:electric, :prometheus_port)) + call_home_telemetry? = Application.fetch_env!(:electric, :call_home_telemetry?) + + [ + {:telemetry_poller, + measurements: periodic_measurements(opts), + period: system_metrics_poll_interval, + init_delay: :timer.seconds(5)}, + statsd_reporter_child_spec(statsd_host), + prometheus_reporter_child_spec(prometheus?), + call_home_reporter_child_spec(call_home_telemetry?) + ] + |> Enum.reject(&is_nil/1) |> Supervisor.init(strategy: :one_for_one) end - defp add_call_home_reporter(children, false), do: children - - defp add_call_home_reporter(children, true) do - children ++ - [ - {Electric.Telemetry.CallHomeReporter, - static_info: static_info(), - metrics: call_home_metrics(), - first_report_in: {2, :minute}, - reporting_period: {30, :minute}, - reporter_fn: &Electric.Telemetry.CallHomeReporter.report_home/1} - ] + defp call_home_reporter_child_spec(false), do: nil + + defp call_home_reporter_child_spec(true) do + {Electric.Telemetry.CallHomeReporter, + static_info: static_info(), + metrics: call_home_metrics(), + first_report_in: {2, :minute}, + reporting_period: {30, :minute}, + reporter_fn: &Electric.Telemetry.CallHomeReporter.report_home/1} end def static_info() do @@ -69,6 +75,19 @@ defmodule Electric.Telemetry do run_queue_cpu: summary("vm.total_run_queue_lengths.cpu"), run_queue_io: summary("vm.total_run_queue_lengths.io") ], + system: [ + load_avg1: last_value("system.load_percent.avg1"), + load_avg5: last_value("system.load_percent.avg5"), + load_avg15: last_value("system.load_percent.avg15"), + memory_free: last_value("system.memory.free_memory"), + memory_used: last_value("system.memory.used_memory"), + memory_free_percent: last_value("system.memory_percent.free_memory"), + memory_used_percent: last_value("system.memory_percent.used_memory"), + swap_free: last_value("system.swap.free"), + swap_used: last_value("system.swap.used"), + swap_free_percent: last_value("system.swap_percent.free"), + swap_used_percent: last_value("system.swap_percent.used") + ], usage: [ inbound_bytes: sum("electric.postgres.replication.transaction_received.bytes", unit: :byte), @@ -105,23 +124,20 @@ defmodule Electric.Telemetry do ] end - defp add_statsd_reporter(children, nil), do: children + defp statsd_reporter_child_spec(nil), do: nil - defp add_statsd_reporter(children, host) do - children ++ - [ - {TelemetryMetricsStatsd, - host: host, - formatter: :datadog, - global_tags: [instance_id: Electric.instance_id()], - metrics: statsd_metrics()} - ] + defp statsd_reporter_child_spec(host) do + {TelemetryMetricsStatsd, + host: host, + formatter: :datadog, + global_tags: [instance_id: Electric.instance_id()], + metrics: statsd_metrics()} end - defp add_prometheus_reporter(children, nil), do: children + defp prometheus_reporter_child_spec(false), do: nil - defp add_prometheus_reporter(children, _) do - children ++ [{TelemetryMetricsPrometheus.Core, metrics: prometheus_metrics()}] + defp prometheus_reporter_child_spec(true) do + {TelemetryMetricsPrometheus.Core, metrics: prometheus_metrics()} end defp statsd_metrics() do @@ -147,7 +163,14 @@ defmodule Electric.Telemetry do summary("electric.storage.make_new_snapshot.stop.duration", unit: {:native, :millisecond}), summary("electric.querying.stream_initial_data.stop.duration", unit: {:native, :millisecond} - ) + ), + last_value("system.load_percent.avg1"), + last_value("system.load_percent.avg5"), + last_value("system.load_percent.avg15"), + last_value("system.memory.free_memory"), + last_value("system.memory.used_memory"), + last_value("system.swap.free"), + last_value("system.swap.used") ] |> Enum.map(&%{&1 | tags: [:instance_id | &1.tags]}) end @@ -185,6 +208,8 @@ defmodule Electric.Telemetry do {__MODULE__, :uptime_event, []}, {__MODULE__, :count_shapes, [stack_id]}, {__MODULE__, :get_total_disk_usage, [opts]}, + {__MODULE__, :get_system_load_average, [opts]}, + {__MODULE__, :get_system_memory_usage, [opts]}, {Electric.Connection.Manager, :report_retained_wal_size, [Electric.Connection.Manager.name(stack_id)]} ] @@ -217,4 +242,92 @@ defmodule Electric.Telemetry do :exit, {:noproc, _} -> :ok end + + def get_system_load_average(opts) do + cores = :erlang.system_info(:logical_processors) + + # > The load values are proportional to how long time a runnable Unix + # > process has to spend in the run queue before it is scheduled. + # > Accordingly, higher values mean more system load. The returned value + # > divided by 256 produces the figure displayed by rup and top. + # + # I'm going one step further and dividing by the number of CPUs so in a 4 + # core system, a load of 4.0 (in top) will show as 100%. + # Since load can go above num cores, we can to 200%, 300% but + # I think this makes sense. + # + # Certainly the formula in the erlang docs: + # + # > the following simple mathematical transformation can produce the load + # > value as a percentage: + # > + # > PercentLoad = 100 * (1 - D/(D + Load)) + # > + # > D determines which load value should be associated with which + # > percentage. Choosing D = 50 means that 128 is 60% load, 256 is 80%, 512 + # > is 90%, and so on. + # + # Makes little sense. Setting `D` as they say and plugging in a avg1 value + # of 128 does not give 60% so I'm not sure how to square what they say with + # the numbers... + # + # e.g. my machine currently has a cpu util (:cpu_sup.util()) of 4% and an + # avg1() of 550 ish across 24 cores (so doing very little) but that formula + # would give a `PercentLoad` of ~92%. + # + # My version would give value of 550 / 256 / 24 = 9% + [:avg1, :avg5, :avg15] + |> Map.new(fn probe -> + {probe, 100 * (apply(:cpu_sup, probe, []) / 256 / cores)} + end) + |> then( + &:telemetry.execute([:system, :load_percent], &1, %{ + stack_id: opts[:stack_id] + }) + ) + end + + def get_system_memory_usage(opts) do + {total, stats} = + :memsup.get_system_memory_data() + |> Keyword.delete(:total_memory) + |> Keyword.pop(:system_total_memory) + + mem_stats = + Keyword.take(stats, [:free_memory, :available_memory, :buffered_memory, :cached_memory]) + + {total_swap, stats} = Keyword.pop(stats, :total_swap) + + used_memory = total - Keyword.fetch!(mem_stats, :free_memory) + resident_memory = total - Keyword.fetch!(mem_stats, :available_memory) + + memory_stats = + mem_stats + |> Map.new() + |> Map.put(:used_memory, used_memory) + |> Map.put(:resident_memory, resident_memory) + + memory_percent_stats = + mem_stats + |> Map.new(fn {k, v} -> {k, 100 * v / total} end) + |> Map.put(:used_memory, 100 * used_memory / total) + |> Map.put(:resident_memory, 100 * resident_memory / total) + + :telemetry.execute([:system, :memory], memory_stats, %{ + stack_id: opts[:stack_id] + }) + + :telemetry.execute([:system, :memory_percent], memory_percent_stats, %{ + stack_id: opts[:stack_id] + }) + + free_swap = Keyword.get(stats, :free_swap, 0) + used_swap = total_swap - free_swap + + swap_stats = %{total: total_swap, free: free_swap, used: used_swap} + swap_percent_stats = %{free: 100 * free_swap / total_swap, used: 100 * used_swap / total_swap} + + :telemetry.execute([:system, :swap], swap_stats, %{stack_id: opts[:stack_id]}) + :telemetry.execute([:system, :swap_percent], swap_percent_stats, %{stack_id: opts[:stack_id]}) + end end diff --git a/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex b/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex index b2d1ae09bd..f698d7bf46 100644 --- a/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex +++ b/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex @@ -28,12 +28,12 @@ defmodule Electric.Telemetry.CallHomeReporter do end def report_home(results) do - url = Application.fetch_env!(:electric, :telemetry_url) - - Req.post!(url, json: results, retry: :transient) + Req.post!(telemetry_url(), json: results, retry: :transient) :ok end + defp telemetry_url, do: Application.fetch_env!(:electric, :telemetry_url) + def print_stats(name \\ __MODULE__) do GenServer.call(name, :print_stats) end @@ -49,7 +49,9 @@ defmodule Electric.Telemetry.CallHomeReporter do Process.set_label({:call_home_reporter, name}) Logger.notice( - "Starting telemetry reporter. Electric will send anonymous usage data to #{Application.fetch_env!(:electric, :telemetry_url)}. You can configure this with `ELECTRIC_USAGE_REPORTING` environment variable, see https://electric-sql.com/docs/reference/telemetry for more information." + "Starting telemetry reporter. Electric will send anonymous usage data to #{telemetry_url()}. " <> + "You can configure this with `ELECTRIC_USAGE_REPORTING` environment variable, " <> + "see https://electric-sql.com/docs/reference/telemetry for more information." ) metrics = save_target_path_to_options(metrics)