Skip to content

Commit

Permalink
Notify global peers when the leader terminates
Browse files Browse the repository at this point in the history
Now the `Global` leader sends a `down` message to all connected nodes
when the process terminates cleanly. This behaviour prevents up to 30s
of downtime without a leader and matches how the Postgres peer operates.

Closes #847
  • Loading branch information
sorentwo committed Feb 17, 2023
1 parent bc43ce5 commit 8a65b00
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
32 changes: 29 additions & 3 deletions lib/oban/peers/global.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Oban.Peers.Global do

use GenServer

alias Oban.Backoff
alias Oban.{Backoff, Notifier}

defmodule State do
@moduledoc false
Expand Down Expand Up @@ -52,13 +52,23 @@ defmodule Oban.Peers.Global do
@impl GenServer
def terminate(_reason, %State{timer: timer} = state) do
if is_reference(timer), do: Process.cancel_timer(timer)
if state.leader?, do: :global.del_lock(key(state), nodes())

if state.leader? do
try do
delete_self(state)
notify_down(state)
catch
:exit, _reason -> :ok
end
end

:ok
end

@impl GenServer
def handle_continue(:start, %State{} = state) do
Notifier.listen(state.conf.name, [:leader])

handle_info(:election, state)
end

Expand All @@ -81,13 +91,29 @@ defmodule Oban.Peers.Global do
{:noreply, schedule_election(%{state | leader?: locked?})}
end

def handle_info({:notification, :leader, %{"down" => name}}, %State{conf: conf} = state) do
if name == inspect(conf.name) do
handle_info(:election, state)
else
{:noreply, state}
end
end

# Helpers

defp schedule_election(%State{interval: interval} = state) do
time = Backoff.jitter(interval, mode: :dec)

%{state | timer: Process.send_after(self(), :election, time)}
end

# Helpers
defp delete_self(state) do
:global.del_lock(key(state), nodes())
end

defp notify_down(%{conf: conf}) do
Notifier.notify(conf, :leader, %{down: inspect(conf.name)})
end

defp key(state), do: {state.conf.name, state.conf.node}

Expand Down
35 changes: 33 additions & 2 deletions test/oban/peers/global_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ defmodule Oban.Peers.GlobalTest do
alias Oban.TelemetryHandler

test "only a single peer is leader" do
TelemetryHandler.attach_events()
# Start an instance just to provide a notifier under the Oban name
start_supervised_oban!(name: Oban, peer: false)

name_1 = start_supervised_oban!(peer: Global, node: "worker.1")
name_2 = start_supervised_oban!(peer: Global, node: "worker.2")
Expand All @@ -17,7 +18,37 @@ defmodule Oban.Peers.GlobalTest do
peer_1 = start_supervised!({Peer, conf: conf_1, name: A})
peer_2 = start_supervised!({Peer, conf: conf_2, name: B})

assert [_leader] = Enum.filter([peer_1, peer_2], &Global.leader?/1)
assert [_pid] = Enum.filter([peer_1, peer_2], &Global.leader?/1)
end

test "leadership changes when a peer terminates" do
start_supervised_oban!(name: Oban, peer: false)

:ok = Oban.Notifier.listen(Oban, :leader)

conf =
[peer: false, node: "worker.1"]
|> start_supervised_oban!()
|> Oban.config()
|> Map.put(:peer, Global)

peer_1 = start_supervised!({Peer, name: A, conf: %{conf | name: Oban, node: "web.1"}})
peer_2 = start_supervised!({Peer, name: B, conf: %{conf | name: Oban, node: "web.2"}})

assert leader = Enum.find([peer_1, peer_2], &Global.leader?/1)
assert :ok = GenServer.stop(leader)

assert_receive {:notification, :leader, %{"down" => _}}

with_backoff(fn ->
assert Enum.find([peer_1, peer_2] -- [leader], &Global.leader?/1)
end)
end

test "emitting elemetry events for elections" do
TelemetryHandler.attach_events()

start_supervised_oban!(peer: Global, node: "worker.1")

assert_receive {:event, [:election, :start], _measure, %{leader: _, peer: Oban.Peers.Global}}
assert_receive {:event, [:election, :stop], _measure, %{leader: _, peer: Oban.Peers.Global}}
Expand Down

0 comments on commit 8a65b00

Please sign in to comment.