Skip to content

Commit

Permalink
Add pause_all_queues/2 and resume_all_queues/2
Browse files Browse the repository at this point in the history
Pause and resume all queues with a single function call and a single
signal, rather than manually looping through all queues and issuing
separate calls.
  • Loading branch information
sorentwo committed Nov 16, 2023
1 parent c4d4068 commit 814e148
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 0 deletions.
58 changes: 58 additions & 0 deletions lib/oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,18 @@ defmodule Oban do
Oban.pause_queue(__MODULE__, opts)
end

def pause_all_queues(opts) do
Oban.pause_all_queues(__MODULE__, opts)
end

def resume_queue(opts) do
Oban.resume_queue(__MODULE__, opts)
end

def resume_all_queues(opts) do
Oban.resume_all_queues(__MODULE__, opts)
end

def scale_queue(opts) do
Oban.scale_queue(__MODULE__, opts)
end
Expand Down Expand Up @@ -860,6 +868,31 @@ defmodule Oban do
Notifier.notify(conf, :signal, data)
end

@doc """
Pause all running queues to prevent them from executing any new jobs.
See `pause_queue/2` for options and details.
## Example
Pause all queues:
Oban.pause_all_queues()
Pause all queues on the local node:
Oban.pause_all_queues(local_only: true)
Pause all queues on a specific node:
Oban.pause_all_queues(node: "worker.1")
"""
@doc since: "2.17.0"
@spec pause_all_queues(name(), opts :: [local_only: boolean(), node: String.t()]) :: :ok
def pause_all_queues(name \\ __MODULE__, opts \\ []) do
pause_queue(name, Keyword.put(opts, :queue, :*))
end

@doc """
Resume executing jobs in a paused queue.
Expand Down Expand Up @@ -904,6 +937,31 @@ defmodule Oban do
Notifier.notify(conf, :signal, data)
end

@doc """
Resume executing jobs in all paused queues.
See `resume_queue/2` for options and details.
## Example
Resume all queues:
Oban.resume_all_queues()
Resume all queues on the local node:
Oban.resume_all_queues(local_only: true)
Resume all queues on a specific node:
Oban.resume_all_queues(node: "worker.1")
"""
@doc since: "2.17.0"
@spec resume_all_queues(name(), opts :: [local_only: boolean(), node: String.t()]) :: :ok
def resume_all_queues(name \\ __MODULE__, opts \\ []) do
resume_queue(name, Keyword.put(opts, :queue, :*))
end

@doc """
Scale the concurrency for a queue.
Expand Down
6 changes: 6 additions & 0 deletions lib/oban/queue/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ defmodule Oban.Queue.Producer do

meta =
case payload do
%{"action" => "pause", "queue" => "*"} ->
Engine.put_meta(state.conf, state.meta, :paused, true)

%{"action" => "resume", "queue" => "*"} ->
Engine.put_meta(state.conf, state.meta, :paused, false)

%{"action" => "pause", "queue" => ^queue} ->
Engine.put_meta(state.conf, state.meta, :paused, true)

Expand Down
22 changes: 22 additions & 0 deletions test/oban_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,28 @@ defmodule ObanTest do
end
end

describe "pause_all_queues/2 and resume_all_queues/2" do
test "pausing and resuming all queues" do
name = start_supervised_oban!(queues: [alpha: 1, gamma: 1, delta: 1])

assert :ok = Oban.pause_all_queues(name)

with_backoff(fn ->
assert %{paused: true} = Oban.check_queue(name, queue: :alpha)
assert %{paused: true} = Oban.check_queue(name, queue: :gamma)
assert %{paused: true} = Oban.check_queue(name, queue: :delta)
end)

assert :ok = Oban.resume_all_queues(name)

with_backoff(fn ->
assert %{paused: false} = Oban.check_queue(name, queue: :alpha)
assert %{paused: false} = Oban.check_queue(name, queue: :gamma)
assert %{paused: false} = Oban.check_queue(name, queue: :delta)
end)
end
end

describe "scale_queue/2" do
test "validating options" do
name = start_supervised_oban!(testing: :manual)
Expand Down

0 comments on commit 814e148

Please sign in to comment.