diff --git a/lib/oban/notifier.ex b/lib/oban/notifier.ex index 2820146e..c270e449 100644 --- a/lib/oban/notifier.ex +++ b/lib/oban/notifier.ex @@ -81,7 +81,7 @@ defmodule Oban.Notifier do @type channel :: atom() @type name_or_conf :: Oban.name() | Config.t() - @type pubsub_status :: :isolated | :solitary | :clustered + @type pubsub_status :: :unknown | :isolated | :solitary | :clustered @doc """ Starts a notifier instance. @@ -214,19 +214,21 @@ defmodule Oban.Notifier do ## Statuses - * `:isolated` — the notifier isn't receiving any messages. This is the default state on start, - and holds when the current node receives absolutely _no_ messages. + * `:unknown` — This is the default state on start before the notifier has time to determine the + appropriate status. + + * `:isolated` — The notifier isn't receiving any messages. The notifier may be connected to a database but `:isolated` and unable to receive other message and unable to receive outside messages. Typically, this is the case for the default `Postgres` notifier while testing or behind a connection pooler. - * `:solitary` — the notifier is only receiving messages from itself. This may be the case for + * `:solitary` — The notifier is only receiving messages from itself. This may be the case for the `PG` notifier when Distributed Erlang nodes aren't connected, in development, or in production deployments that only run a single node. If you're running multiple nodes in production and the status is `:solitary`, there's a connectivity issue. - * `:clustered` — the notifier is connected and able to receive messages from other nodes. The + * `:clustered` — The notifier is connected and able to receive messages from other nodes. The `Postgres` notifier is considered clustered if it can receive notifications, while the PG notifier requires a functional Distributed Erlang cluster. diff --git a/lib/oban/sonar.ex b/lib/oban/sonar.ex index d7f9f1cb..68afd33a 100644 --- a/lib/oban/sonar.ex +++ b/lib/oban/sonar.ex @@ -9,10 +9,10 @@ defmodule Oban.Sonar do defstruct [ :conf, :timer, - interval: :timer.seconds(30), + interval: :timer.seconds(15), nodes: %{}, stale_mult: 2, - status: :isolated + status: :unknown ] @spec start_link(keyword()) :: GenServer.on_start() @@ -39,8 +39,9 @@ defmodule Oban.Sonar do @impl GenServer def handle_continue(:start, state) do :ok = Notifier.listen(state.conf.name, :sonar) + :ok = Notifier.notify(state.conf, :sonar, %{node: state.conf.node, ping: :ping}) - handle_info(:ping, state) + {:noreply, schedule_ping(state)} end @impl GenServer @@ -88,6 +89,10 @@ defmodule Oban.Sonar do [_ | _] -> :clustered end + if status != state.status do + :telemetry.execute([:oban, :notifier, :switch], %{}, %{conf: state.conf, status: status}) + end + %{state | status: status} end diff --git a/lib/oban/telemetry.ex b/lib/oban/telemetry.ex index 16b23b6b..93990c29 100644 --- a/lib/oban/telemetry.ex +++ b/lib/oban/telemetry.ex @@ -184,6 +184,23 @@ defmodule Oban.Telemetry do * `:conf` — see the explanation in metadata above * `:mode` — either `local` for polling mode or `global` in the more efficient pub-sub mode + ## Notifier Events + + Oban emits an event when the notifier's sonar, responsible for tracking notifier connectivity, + switches connectivity status: + + * `[:oban, :notifier, :switch]` + + | event | measures | metadata | + | ------------ | --------- | ------------------ | + | `:switch` | | `:conf`, `:status` | + + ### Metadata + + * `:conf` — see the explanation in metadata above + * `:status` — one of `:isolated`, `:solitary`, or `:clustered`, see + `Oban.Notifier.status/1` for details + ## Default Logger A default log handler that emits structured JSON is provided, see `attach_default_logger/0` for @@ -273,13 +290,20 @@ defmodule Oban.Telemetry do * `tags` — the job's tags * `worker` — the job's worker module - And the following fields for stager events: + For stager events: * `event` — always `stager:switch` * `message` — information about the mode switch * `mode` — either `"local"` or `"global"` * `source` — always "oban" + For notifier events: + + * `event` — always `notifier:switch` + * `message` — information about the status switch + * `source` — always "oban" + * `status` — either `"isolated"`, `"solitary"`, or `"clustered"` + ## Options * `:level` — The log level to use for logging output, defaults to `:info` @@ -312,6 +336,7 @@ defmodule Oban.Telemetry do [:oban, :job, :start], [:oban, :job, :stop], [:oban, :job, :exception], + [:oban, :notifier, :switch], [:oban, :stager, :switch] ] @@ -376,13 +401,41 @@ defmodule Oban.Telemetry do end) end + def handle_event([:oban, :notifier, :switch], _measure, %{status: status}, opts) do + log(opts, fn -> + case status do + :isolated -> + %{ + event: "notifier:switch", + status: status, + message: "notifier can't receive messages from any nodes, functionality degraded" + } + + :solitary -> + %{ + event: "notifier:switch", + status: status, + message: + "notifier only receiving messages from its own node, functionality may be degraded" + } + + :clustered -> + %{ + event: "notifier:switch", + status: status, + message: "notifier is receiving messages from other nodes" + } + end + end) + end + def handle_event([:oban, :stager, :switch], _measure, %{mode: mode}, opts) do log(opts, fn -> case mode do :local -> %{ event: "stager:switch", - mode: "local", + mode: mode, message: "job staging switched to local mode. local mode polls for jobs for every queue; " <> "restore global mode with a functional notifier" @@ -391,7 +444,7 @@ defmodule Oban.Telemetry do :global -> %{ event: "stager:switch", - mode: "global", + mode: mode, message: "job staging switched back to global mode" } end diff --git a/test/oban/sonar_test.exs b/test/oban/sonar_test.exs index 920862d7..1d5378a2 100644 --- a/test/oban/sonar_test.exs +++ b/test/oban/sonar_test.exs @@ -1,13 +1,33 @@ defmodule Oban.SonarTest do use Oban.Case, async: true - alias Oban.Notifier + alias Oban.{Notifier, Registry, Sonar} describe "integration" do + setup do + :telemetry_test.attach_event_handlers(self(), [[:oban, :notifier, :switch]]) + + :ok + end + + test "starting with an :unknown status" do + name = start_supervised_oban!(notifier: Oban.Notifiers.Postgres) + + assert :unknown = Notifier.status(name) + end + test "remaining isolated without any notifications" do name = start_supervised_oban!(notifier: Oban.Notifiers.Postgres) - assert :isolated = Notifier.status(name) + name + |> Registry.whereis(Sonar) + |> send(:ping) + + with_backoff(fn -> + assert :isolated = Notifier.status(name) + end) + + assert_received {_event, _ref, _timing, %{status: :isolated}} end test "reporting a solitary status with only a single node" do @@ -16,6 +36,8 @@ defmodule Oban.SonarTest do with_backoff(fn -> assert :solitary = Notifier.status(name) end) + + assert_received {_event, _ref, _timing, %{status: :solitary}} end test "reporting a clustered status with multiple nodes" do @@ -26,6 +48,8 @@ defmodule Oban.SonarTest do with_backoff(fn -> assert :clustered = Notifier.status(name) end) + + assert_received {_event, _ref, _timing, %{status: :clustered}} end end end diff --git a/test/oban/telemetry_test.exs b/test/oban/telemetry_test.exs index 20927178..357be1e8 100644 --- a/test/oban/telemetry_test.exs +++ b/test/oban/telemetry_test.exs @@ -95,7 +95,11 @@ defmodule Oban.TelemetryTest do test "the default handler logs detailed event information" do :ok = Telemetry.attach_default_logger(:warning) - start_supervised_oban!(stage_interval: 10, queues: [alpha: 3]) + start_supervised_oban!( + notifier: Oban.Notifiers.Postgres, + queues: [alpha: 3], + stage_interval: 10 + ) logged = capture_log(fn -> @@ -153,25 +157,28 @@ defmodule Oban.TelemetryTest do Telemetry.detach_default_logger() end - test "detaching the logger prevents logging" do + test "the default handler logs notifier switch events" do :ok = Telemetry.attach_default_logger(:warning) - start_supervised_oban!(stage_interval: 10, queues: [alpha: 3]) - - :ok = Telemetry.detach_default_logger() - logged = capture_log(fn -> - insert!(ref: 1, action: "OK") - insert!(ref: 2, action: "ERROR") + :telemetry.execute([:oban, :notifier, :switch], %{}, %{status: :isolated}) + end) - assert_receive {:ok, 1} - assert_receive {:error, 2} + assert logged =~ ~s("source":"oban") + assert logged =~ ~s("event":"notifier:switch") + assert logged =~ ~s("message":"notifier can't receive messages) + after + Telemetry.detach_default_logger() + end - Logger.flush() - end) + test "detaching the logger prevents logging" do + :ok = Telemetry.attach_default_logger(:warning) + :ok = Telemetry.detach_default_logger() - assert logged == "" + assert capture_log(fn -> + :telemetry.execute([:oban, :notifier, :switch], %{}, %{status: :isolated}) + end) == "" end test "disabling encoding on the default logger" do