diff --git a/lib/oban.ex b/lib/oban.ex index 5038b154..2f7fc0d0 100644 --- a/lib/oban.ex +++ b/lib/oban.ex @@ -667,11 +667,18 @@ defmodule Oban do `* (Ecto.InvalidChangesetError) could not perform insert because changeset is invalid.` - > #### 🌟 Unique Jobs and Batching {: .warning} + #### Dolphin Engine and Generated Values + + MySQL doesn't return anything on insertion into the database. That means any values generated by + the database, namely the primary key and timestamps, aren't included in the job structs returned + from `insert_all`. + + > #### 🌟 Unique Jobs and Batching {: .tip} > > Only the [Smart Engine](https://oban.pro/docs/pro/Oban.Pro.Engines.Smart.html) in [Oban - > Pro](https://oban.pro) supports bulk unique jobs and automatic batching. With the basic - > engine, you must use `insert/3` for unique support. + > Pro](https://oban.pro) supports bulk unique jobs, automatic insert batching, and minimizes + > parameters sent over the wire. With the basic engine, you must use `insert/3` to insert unique + > jobs one at a time. ## Options diff --git a/lib/oban/engines/dolphin.ex b/lib/oban/engines/dolphin.ex index ae747502..669e6342 100644 --- a/lib/oban/engines/dolphin.ex +++ b/lib/oban/engines/dolphin.ex @@ -67,25 +67,11 @@ defmodule Oban.Engines.Dolphin do @impl Engine def insert_all_jobs(%Config{} = conf, changesets, opts) do - # MySQL doesn't return a primary key from a bulk insert, which violates the insert_all_jobs - # contract. Inserting one at a time is far less efficient, but it does what's required. - {:ok, jobs} = - Repo.transaction(conf, fn -> - Enum.map(changesets, fn changeset -> - case insert_job(conf, changeset, opts) do - {:ok, job} -> - job - - {:error, %Changeset{} = changeset} -> - raise Ecto.InvalidChangesetError, action: :insert, changeset: changeset + jobs = Enum.map(changesets, &Job.to_map/1) - {:error, reason} -> - raise RuntimeError, inspect(reason) - end - end) - end) + {_count, _jobs} = Repo.insert_all(conf, Job, jobs, opts) - jobs + Enum.map(changesets, &Ecto.Changeset.apply_action!(&1, :insert)) end @impl Engine diff --git a/test/oban/engine_test.exs b/test/oban/engine_test.exs index 702a6d71..f358f903 100644 --- a/test/oban/engine_test.exs +++ b/test/oban/engine_test.exs @@ -659,12 +659,14 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite, Oban.Engines.Dolphin] do test "inserting and executing jobs", %{name: name} do TelemetryHandler.attach_events() - changesets = + [job_1, job_2, job_3, job_4, job_5] = ~w(OK CANCEL DISCARD ERROR SNOOZE) |> Enum.with_index(1) - |> Enum.map(fn {act, ref} -> Worker.new(%{action: act, ref: ref}) end) - - [job_1, job_2, job_3, job_4, job_5] = Oban.insert_all(name, changesets) + |> Enum.map(fn {act, ref} -> + %{action: act, ref: ref} + |> Worker.new() + |> then(&Oban.insert!(name, &1)) + end) assert_receive {:event, [:fetch_jobs, :stop], _, %{jobs: _}} @@ -685,12 +687,14 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite, Oban.Engines.Dolphin] do @tag :capture_log test "safely executing jobs with any type of exit", %{name: name} do - changesets = + jobs = ~w(EXIT KILL TASK_ERROR TASK_EXIT) |> Enum.with_index(1) - |> Enum.map(fn {act, ref} -> Worker.new(%{action: act, ref: ref}) end) - - jobs = Oban.insert_all(name, changesets) + |> Enum.map(fn {act, ref} -> + %{action: act, ref: ref} + |> Worker.new() + |> then(&Oban.insert!(name, &1)) + end) assert_receive {:exit, 1} assert_receive {:kill, 2} @@ -732,11 +736,8 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite, Oban.Engines.Dolphin] do end test "discarding jobs that exceed max attempts", %{name: name} do - [job_1, job_2] = - Oban.insert_all(name, [ - Worker.new(%{action: "ERROR", ref: 1}, max_attempts: 1), - Worker.new(%{action: "ERROR", ref: 2}, max_attempts: 2) - ]) + job_1 = Oban.insert!(name, Worker.new(%{action: "ERROR", ref: 1}, max_attempts: 1)) + job_2 = Oban.insert!(name, Worker.new(%{action: "ERROR", ref: 2}, max_attempts: 2)) assert_receive {:error, 1} assert_receive {:error, 2} diff --git a/test/oban/plugins/lifeline_test.exs b/test/oban/plugins/lifeline_test.exs index 067b1658..2ae88398 100644 --- a/test/oban/plugins/lifeline_test.exs +++ b/test/oban/plugins/lifeline_test.exs @@ -78,11 +78,11 @@ defmodule Oban.Plugins.LifelineTest do repo: DolphinRepo ) - [job_1, job_2] = - Oban.insert_all(name, [ - Worker.new(%{}, state: "executing", attempted_at: seconds_ago(3)), - Worker.new(%{}, state: "executing", attempted_at: seconds_ago(8)) - ]) + job_1 = + Oban.insert!(name, Worker.new(%{}, state: "executing", attempted_at: seconds_ago(3))) + + job_2 = + Oban.insert!(name, Worker.new(%{}, state: "executing", attempted_at: seconds_ago(8))) send_rescue(name)