Skip to content

Commit

Permalink
Emit telemetry and logs on sonar status change
Browse files Browse the repository at this point in the history
The notifier switch telemetry event is provided for diagnostic
instrumentation, and it's included in the default telemetry logger.
  • Loading branch information
sorentwo committed Feb 7, 2024
1 parent 8a2470d commit 1986d13
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 26 deletions.
12 changes: 7 additions & 5 deletions lib/oban/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ defmodule Oban.Notifier do

@type channel :: atom()
@type name_or_conf :: Oban.name() | Config.t()
@type pubsub_status :: :isolated | :solitary | :clustered
@type pubsub_status :: :unknown | :isolated | :solitary | :clustered

@doc """
Starts a notifier instance.
Expand Down Expand Up @@ -214,19 +214,21 @@ defmodule Oban.Notifier do
## Statuses
* `:isolated` — the notifier isn't receiving any messages. This is the default state on start,
and holds when the current node receives absolutely _no_ messages.
* `:unknown` — This is the default state on start before the notifier has time to determine the
appropriate status.
* `:isolated` — The notifier isn't receiving any messages.
The notifier may be connected to a database but still be `:isolated` and unable to receive
outside messages. Typically, this is the case for the default
`Postgres` notifier while testing or behind a connection pooler.
* `:solitary` — the notifier is only receiving messages from itself. This may be the case for
* `:solitary` — The notifier is only receiving messages from itself. This may be the case for
the `PG` notifier when Distributed Erlang nodes aren't connected, in development, or in
production deployments that only run a single node. If you're running multiple nodes in production
and the status is `:solitary`, there's a connectivity issue.
* `:clustered` — the notifier is connected and able to receive messages from other nodes. The
* `:clustered` — The notifier is connected and able to receive messages from other nodes. The
`Postgres` notifier is considered clustered if it can receive notifications, while the PG
notifier requires a functional Distributed Erlang cluster.
Expand Down
11 changes: 8 additions & 3 deletions lib/oban/sonar.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ defmodule Oban.Sonar do
defstruct [
:conf,
:timer,
interval: :timer.seconds(30),
interval: :timer.seconds(15),
nodes: %{},
stale_mult: 2,
status: :isolated
status: :unknown
]

@spec start_link(keyword()) :: GenServer.on_start()
Expand All @@ -39,8 +39,9 @@ defmodule Oban.Sonar do
@impl GenServer
def handle_continue(:start, state) do
:ok = Notifier.listen(state.conf.name, :sonar)
:ok = Notifier.notify(state.conf, :sonar, %{node: state.conf.node, ping: :ping})

handle_info(:ping, state)
{:noreply, schedule_ping(state)}
end

@impl GenServer
Expand Down Expand Up @@ -88,6 +89,10 @@ defmodule Oban.Sonar do
[_ | _] -> :clustered
end

if status != state.status do
:telemetry.execute([:oban, :notifier, :switch], %{}, %{conf: state.conf, status: status})
end

%{state | status: status}
end

Expand Down
59 changes: 56 additions & 3 deletions lib/oban/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,23 @@ defmodule Oban.Telemetry do
* `:conf` — see the explanation in metadata above
* `:mode` — either `local` for polling mode or `global` in the more efficient pub-sub mode
## Notifier Events
Oban emits an event when the notifier's sonar, responsible for tracking notifier connectivity,
switches connectivity status:
* `[:oban, :notifier, :switch]`
| event | measures | metadata |
| ------------ | --------- | ------------------ |
| `:switch` | | `:conf`, `:status` |
### Metadata
* `:conf` — see the explanation in metadata above
* `:status` — one of `:isolated`, `:solitary`, or `:clustered`, see
`Oban.Notifier.status/1` for details
## Default Logger
A default log handler that emits structured JSON is provided, see `attach_default_logger/0` for
Expand Down Expand Up @@ -273,13 +290,20 @@ defmodule Oban.Telemetry do
* `tags` — the job's tags
* `worker` — the job's worker module
And the following fields for stager events:
For stager events:
* `event` — always `stager:switch`
* `message` — information about the mode switch
* `mode` — either `"local"` or `"global"`
* `source` — always "oban"
For notifier events:
* `event` — always `notifier:switch`
* `message` — information about the status switch
* `source` — always "oban"
* `status` — either `"isolated"`, `"solitary"`, or `"clustered"`
## Options
* `:level` — The log level to use for logging output, defaults to `:info`
Expand Down Expand Up @@ -312,6 +336,7 @@ defmodule Oban.Telemetry do
[:oban, :job, :start],
[:oban, :job, :stop],
[:oban, :job, :exception],
[:oban, :notifier, :switch],
[:oban, :stager, :switch]
]

Expand Down Expand Up @@ -376,13 +401,41 @@ defmodule Oban.Telemetry do
end)
end

# Handles the `[:oban, :notifier, :switch]` event: builds a log payload carrying the new
# status plus a human-readable message and hands it to `log/2`. Only the three statuses
# matched below are expected; any other value raises a `CaseClauseError` when the payload
# function is evaluated.
def handle_event([:oban, :notifier, :switch], _measure, %{status: status}, opts) do
  log(opts, fn ->
    message =
      case status do
        :isolated ->
          "notifier can't receive messages from any nodes, functionality degraded"

        :solitary ->
          "notifier only receiving messages from its own node, functionality may be degraded"

        :clustered ->
          "notifier is receiving messages from other nodes"
      end

    %{event: "notifier:switch", status: status, message: message}
  end)
end

def handle_event([:oban, :stager, :switch], _measure, %{mode: mode}, opts) do
log(opts, fn ->
case mode do
:local ->
%{
event: "stager:switch",
mode: "local",
mode: mode,
message:
"job staging switched to local mode. local mode polls for jobs for every queue; " <>
"restore global mode with a functional notifier"
Expand All @@ -391,7 +444,7 @@ defmodule Oban.Telemetry do
:global ->
%{
event: "stager:switch",
mode: "global",
mode: mode,
message: "job staging switched back to global mode"
}
end
Expand Down
28 changes: 26 additions & 2 deletions test/oban/sonar_test.exs
Original file line number Diff line number Diff line change
@@ -1,13 +1,33 @@
defmodule Oban.SonarTest do
use Oban.Case, async: true

alias Oban.Notifier
alias Oban.{Notifier, Registry, Sonar}

describe "integration" do
setup do
  # Subscribe the test process to notifier switch events so that each test can
  # assert on the status carried in the event metadata.
  switch_events = [[:oban, :notifier, :switch]]

  :telemetry_test.attach_event_handlers(self(), switch_events)

  :ok
end

test "starting with an :unknown status" do
  oban_name = start_supervised_oban!(notifier: Oban.Notifiers.Postgres)

  # Checked immediately after start, before the sonar has had a chance to
  # update its status from the default.
  assert :unknown = Notifier.status(oban_name)
end

test "remaining isolated without any notifications" do
name = start_supervised_oban!(notifier: Oban.Notifiers.Postgres)

assert :isolated = Notifier.status(name)
name
|> Registry.whereis(Sonar)
|> send(:ping)

with_backoff(fn ->
assert :isolated = Notifier.status(name)
end)

assert_received {_event, _ref, _timing, %{status: :isolated}}
end

test "reporting a solitary status with only a single node" do
Expand All @@ -16,6 +36,8 @@ defmodule Oban.SonarTest do
with_backoff(fn ->
assert :solitary = Notifier.status(name)
end)

assert_received {_event, _ref, _timing, %{status: :solitary}}
end

test "reporting a clustered status with multiple nodes" do
Expand All @@ -26,6 +48,8 @@ defmodule Oban.SonarTest do
with_backoff(fn ->
assert :clustered = Notifier.status(name)
end)

assert_received {_event, _ref, _timing, %{status: :clustered}}
end
end
end
33 changes: 20 additions & 13 deletions test/oban/telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ defmodule Oban.TelemetryTest do
test "the default handler logs detailed event information" do
:ok = Telemetry.attach_default_logger(:warning)

start_supervised_oban!(stage_interval: 10, queues: [alpha: 3])
start_supervised_oban!(
notifier: Oban.Notifiers.Postgres,
queues: [alpha: 3],
stage_interval: 10
)

logged =
capture_log(fn ->
Expand Down Expand Up @@ -153,25 +157,28 @@ defmodule Oban.TelemetryTest do
Telemetry.detach_default_logger()
end

test "detaching the logger prevents logging" do
test "the default handler logs notifier switch events" do
:ok = Telemetry.attach_default_logger(:warning)

start_supervised_oban!(stage_interval: 10, queues: [alpha: 3])

:ok = Telemetry.detach_default_logger()

logged =
capture_log(fn ->
insert!(ref: 1, action: "OK")
insert!(ref: 2, action: "ERROR")
:telemetry.execute([:oban, :notifier, :switch], %{}, %{status: :isolated})
end)

assert_receive {:ok, 1}
assert_receive {:error, 2}
assert logged =~ ~s("source":"oban")
assert logged =~ ~s("event":"notifier:switch")
assert logged =~ ~s("message":"notifier can't receive messages)
after
Telemetry.detach_default_logger()
end

Logger.flush()
end)
test "detaching the logger prevents logging" do
:ok = Telemetry.attach_default_logger(:warning)
:ok = Telemetry.detach_default_logger()

assert logged == ""
assert capture_log(fn ->
:telemetry.execute([:oban, :notifier, :switch], %{}, %{status: :isolated})
end) == ""
end

test "disabling encoding on the default logger" do
Expand Down

0 comments on commit 1986d13

Please sign in to comment.