Skip to content

Commit

Permalink
Add delete_job/2 and delete_all_jobs/2 operations
Browse files Browse the repository at this point in the history
This adds Oban.delete_job/2, Oban.delete_all_jobs/2, Engine callbacks,
and associated operations for all native engines. Deleting jobs is now
easier and safer, due to automatic state protections.
  • Loading branch information
sorentwo committed Dec 24, 2024
1 parent d3b154d commit 9f410d4
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 10 deletions.
72 changes: 69 additions & 3 deletions lib/oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,14 @@ defmodule Oban do
Oban.config(__MODULE__)
end

def delete_all_jobs(queryable) do
Oban.delete_all_jobs(__MODULE__, queryable)
end

def delete_job(job_or_id) do
Oban.delete_job(__MODULE__, job_or_id)
end

def drain_queue(opts) do
Oban.drain_queue(__MODULE__, opts)
end
Expand Down Expand Up @@ -267,6 +275,8 @@ defmodule Oban do
check_queue: 1,
check_all_queues: 0,
config: 0,
delete_job: 1,
delete_all_jobs: 1,
drain_queue: 1,
insert: 1,
insert: 2,
Expand Down Expand Up @@ -1322,6 +1332,11 @@ defmodule Oban do
Oban.cancel_job(job)
:ok
Cancel a job for a custom instance:
Oban.cancel_job(MyOban, job)
:ok
"""
@doc since: "1.3.0"
@spec cancel_job(name(), job_or_id :: Job.t() | integer()) :: :ok
Expand All @@ -1337,10 +1352,10 @@ defmodule Oban do

@doc """
Cancel many jobs based on a queryable and mark them as `cancelled` to prevent them from running.
Any currently `executing` jobs are killed while the others are ignored.
If executing jobs happen to fail before cancellation then the state is set to `cancelled`.
However, any that complete successfully will remain `completed`.
Any currently `executing` jobs are killed. If executing jobs happen to fail before cancellation
then the state is set to `cancelled`. However, any that complete successfully will remain
`completed`.
Only jobs with the statuses `executing`, `available`, `scheduled`, or `retryable` can be cancelled.
Expand Down Expand Up @@ -1375,6 +1390,57 @@ defmodule Oban do
{:ok, length(cancelled_jobs)}
end

@doc """
Delete a job that's not currently `executing`.
## Example
Delete a job:
Oban.delete_job(job)
:ok
Delete a job for a custom instance:
Oban.delete_job(MyOban, job)
:ok
"""
@doc since: "1.19.0"
@spec delete_job(name(), job_or_id :: Job.t() | integer()) :: :ok
def delete_job(name \\ __MODULE__, job_or_id) do
conf = config(name)

if is_integer(job_or_id) do
Engine.delete_job(conf, %Job{id: job_or_id})
else
Engine.delete_job(conf, job_or_id)
end
end

@doc """
Delete many jobs based on a queryable.
Only jobs that aren't `executing` may be deleted.
## Example
Delete all jobs for a specific worker:
Oban.Job
|> Ecto.Query.where(worker: "MyApp.MyWorker")
|> Oban.delete_all_jobs()
{:ok, 9}
"""
@doc since: "1.19.0"
@spec delete_all_jobs(name(), queryable :: Ecto.Queryable.t()) :: {:ok, non_neg_integer()}
def delete_all_jobs(name \\ __MODULE__, queryable) do
conf = config(name)

{:ok, deleted_jobs} = Engine.delete_all_jobs(conf, queryable)

{:ok, length(deleted_jobs)}
end

## Child Spec Helpers

defp plugin_child_spec({module, opts}, conf) do
Expand Down
46 changes: 39 additions & 7 deletions lib/oban/engine.ex
Original file line number Diff line number Diff line change
Expand Up @@ -131,25 +131,37 @@ defmodule Oban.Engine do
@callback cancel_job(conf(), Job.t()) :: :ok

@doc """
Mark a job as `available`, adding attempts if already maxed out. If the job is currently
`available`, `executing` or `scheduled` it should be ignored.
Mark many `executing`, `available`, `scheduled` or `retryable` job as `cancelled` to prevent them
from running.
"""
@callback retry_job(conf(), Job.t()) :: :ok
@callback cancel_all_jobs(conf(), queryable()) :: {:ok, [map()]}

@doc """
Mark many `executing`, `available`, `scheduled` or `retryable` job as `cancelled` to prevent them
from running.
Delete a job that isn't currently `executing`.
"""
@callback delete_job(conf(), Job.t()) :: :ok

@doc """
Delete many jobs that aren't currently `executing`.
"""
@callback delete_all_jobs(conf(), queryable()) :: {:ok, [map()]}

@doc """
Mark a job as `available`, adding attempts if already maxed out. If the job is currently
`available`, `executing` or `scheduled` it should be ignored.
"""
@callback cancel_all_jobs(conf(), queryable()) :: {:ok, [Job.t()]}
@callback retry_job(conf(), Job.t()) :: :ok

@doc """
Mark many jobs as `available`, adding attempts if already maxed out. Any jobs currently
`available`, `executing` or `scheduled` should be ignored.
"""
@callback retry_all_jobs(conf(), queryable()) :: {:ok, [Job.t()]}
@callback retry_all_jobs(conf(), queryable()) :: {:ok, [map()]}

@optional_callbacks [
check_available: 1,
delete_job: 2,
delete_all_jobs: 2,
insert_all_jobs: 5,
insert_job: 5,
prune_jobs: 3,
Expand Down Expand Up @@ -326,6 +338,26 @@ defmodule Oban.Engine do
end)
end

@doc false
def delete_job(%Config{} = conf, %Job{} = job) do
conf = with_compatible_engine(conf, :delete_job, 2)

with_span(:delete_job, conf, %{job: job}, fn engine ->
engine.delete_job(conf, job)
end)
end

@doc false
def delete_all_jobs(%Config{} = conf, queryable) do
conf = with_compatible_engine(conf, :delete_all_jobs, 2)

with_span(:delete_all_jobs, conf, %{queryable: queryable}, fn engine ->
with {:ok, jobs} <- engine.delete_all_jobs(conf, queryable) do
{:meta, {:ok, jobs}, %{jobs: jobs}}
end
end)
end

@doc false
def retry_job(%Config{} = conf, %Job{} = job) do
with_span(:retry_job, conf, %{job: job}, fn engine ->
Expand Down
21 changes: 21 additions & 0 deletions lib/oban/engines/basic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,27 @@ defmodule Oban.Engines.Basic do
{:ok, jobs}
end

@impl Engine
def delete_job(%Config{} = conf, %Job{id: id}) do
delete_all_jobs(conf, where(Job, [j], j.id == ^id))

:ok
end

@impl Engine
def delete_all_jobs(%Config{} = conf, queryable) do
subquery = where(queryable, [j], j.state not in ["executing"])

query =
Job
|> join(:inner, [j], x in subquery(subquery), on: j.id == x.id)
|> select([_, x], map(x, [:id, :queue, :state]))

{_, deleted} = Repo.delete_all(conf, query)

{:ok, deleted}
end

@impl Engine
def retry_job(%Config{} = conf, %Job{id: id}) do
retry_all_jobs(conf, where(Job, [j], j.id == ^id))
Expand Down
23 changes: 23 additions & 0 deletions lib/oban/engines/dolphin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,29 @@ defmodule Oban.Engines.Dolphin do
end)
end

@impl Engine
def delete_job(%Config{} = conf, %Job{id: id}) do
delete_all_jobs(conf, where(Job, [j], j.id == ^id))

:ok
end

@impl Engine
def delete_all_jobs(%Config{} = conf, queryable) do
select_query =
queryable
|> select([j], map(j, [:id, :queue, :state]))
|> where([j], j.state != "executing")

deleted = Repo.all(conf, select_query)

if Enum.any?(deleted) do
Repo.delete_all(conf, where(Job, [j], j.id in ^Enum.map(deleted, & &1.id)))
end

{:ok, deleted}
end

@impl Engine
def retry_job(%Config{} = conf, %Job{id: id}) do
retry_all_jobs(conf, where(Job, [j], j.id == ^id))
Expand Down
23 changes: 23 additions & 0 deletions lib/oban/engines/lite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,29 @@ defmodule Oban.Engines.Lite do
:ok
end

@impl Engine
def delete_job(%Config{} = conf, %Job{id: id}) do
delete_all_jobs(conf, where(Job, [j], j.id == ^id))

:ok
end

@impl Engine
def delete_all_jobs(%Config{} = conf, queryable) do
select_query =
queryable
|> select([j], map(j, [:id, :queue, :state]))
|> where([j], j.state != "executing")

deleted = Repo.all(conf, select_query)

if Enum.any?(deleted) do
Repo.delete_all(conf, where(Job, [j], j.id in ^Enum.map(deleted, & &1.id)))
end

{:ok, deleted}
end

@impl Engine
def cancel_all_jobs(%Config{} = conf, queryable) do
base_query = where(queryable, [j], j.state not in ["cancelled", "completed", "discarded"])
Expand Down
3 changes: 3 additions & 0 deletions lib/oban/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ defmodule Oban.Telemetry do
Events for bulk operations also include `:jobs` for the `:stop` event:
* `[:oban, :engine, :cancel_all_jobs, :start | :stop | :exception]`
* `[:oban, :engine, :delete_all_jobs, :start | :stop | :exception]`
* `[:oban, :engine, :fetch_jobs, :start | :stop | :exception]`
* `[:oban, :engine, :insert_all_jobs, :start | :stop | :exception]`
* `[:oban, :engine, :prune_jobs, :start | :stop | :exception]`
* `[:oban, :engine, :rescue_jobs, :start | :stop | :exception]`
* `[:oban, :engine, :retry_all_jobs, :start | :stop | :exception]`
* `[:oban, :engine, :stage_jobs, :start | :stop | :exception]`
Expand All @@ -86,6 +88,7 @@ defmodule Oban.Telemetry do
* `[:oban, :engine, :cancel_job, :start | :stop | :exception]`
* `[:oban, :engine, :complete_job, :start | :stop | :exception]`
* `[:oban, :engine, :delete_job, :start | :stop | :exception]`
* `[:oban, :engine, :discard_job, :start | :stop | :exception]`
* `[:oban, :engine, :error_job, :start | :stop | :exception]`
* `[:oban, :engine, :insert_job, :start | :stop | :exception]`
Expand Down
39 changes: 39 additions & 0 deletions test/oban/engine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,45 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite, Oban.Engines.Dolphin] do
end
end

describe "delete_job/2" do
setup :start_supervised_oban

test "deleting a single job", %{name: name} do
TelemetryHandler.attach_events(span_type: [:job, [:engine, :delete_job]])

job_1 = insert!(name, %{ref: 1}, [])
job_2 = insert!(name, %{ref: 2}, [])

assert :ok = Oban.delete_job(name, job_1)
assert :ok = Oban.delete_job(name, job_2.id)

refute reload(name, job_1)
refute reload(name, job_2)

assert_receive {:event, [:delete_job, :stop], _, %{job: _}}
end
end

describe "delete_all_jobs/2" do
setup :start_supervised_oban

test "deleting multiple jobs based on a query", %{name: name} do
TelemetryHandler.attach_events(span_type: [:job, [:engine, :delete_all_jobs]])

job_1 = insert!(name, %{ref: 1}, state: "available")
job_2 = insert!(name, %{ref: 2}, state: "scheduled")
job_3 = insert!(name, %{ref: 3}, state: "executing")

assert {:ok, 2} = Oban.delete_all_jobs(name, Job)

refute reload(name, job_1)
refute reload(name, job_2)
assert reload(name, job_3)

assert_receive {:event, [:delete_all_jobs, :stop], _, %{jobs: _}}
end
end

describe "retry_job/2" do
setup :start_supervised_oban

Expand Down

0 comments on commit 9f410d4

Please sign in to comment.