Skip to content

Commit

Permalink
Support passing changeset streams to insert_all
Browse files Browse the repository at this point in the history
Accepting streams makes `Oban.insert_all` more flexible and may, with
alternative engines, make it possible to reduce memory usage for streams
of large resources.
  • Loading branch information
sorentwo committed Jan 8, 2024
1 parent 61e30f7 commit baf3d8e
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 6 deletions.
15 changes: 12 additions & 3 deletions lib/oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ defmodule Oban do

defguardp is_list_or_wrapper(cw)
when is_list(cw) or
(is_map(cw) and is_map_key(cw, :changesets) and is_list(cw.changesets)) or
is_function(cw, 1)
is_struct(cw, Stream) or
is_function(cw, 1) or
(is_map_key(cw, :changesets) and is_list(cw.changesets)) or
(is_map_key(cw, :changesets) and is_struct(cw.changesets, Stream))

@doc """
Creates a facade for `Oban` functions and automates fetching configuration from the application
Expand Down Expand Up @@ -602,12 +604,19 @@ defmodule Oban do
## Example
Insert 100 jobs with a single operation:
Insert a list of 100 jobs at once:
1..100
|> Enum.map(&MyApp.Worker.new(%{id: &1}))
|> Oban.insert_all()
Insert a stream of jobs at once (be sure the stream terminates!):
(fn -> MyApp.Worker.new(%{}))
|> Stream.repeatedly()
|> Stram.take(100)
|> Oban.insert_all()
Insert with a custom timeout:
1..100
Expand Down
1 change: 1 addition & 0 deletions lib/oban/engine.ex
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ defmodule Oban.Engine do

defp expand(fun, changes) when is_function(fun, 1), do: expand(fun.(changes), changes)
defp expand(%{changesets: changesets}, _), do: expand(changesets, %{})
defp expand(changesets, _) when is_struct(changesets, Stream), do: changesets
defp expand(changesets, _) when is_list(changesets), do: changesets

defp with_span(event, %Config{} = conf, base_meta, fun) do
Expand Down
2 changes: 1 addition & 1 deletion lib/oban/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ defmodule Oban.Job do

@type changeset :: Ecto.Changeset.t(t())
@type changeset_fun :: (map() -> changeset())
@type changeset_list :: [changeset()]
@type changeset_list :: Enumerable.t(changeset())
@type changeset_list_fun :: (map() -> changeset_list())

schema "oban_jobs" do
Expand Down
16 changes: 14 additions & 2 deletions test/oban/engine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,22 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do
assert_receive {:event, [:insert_all_jobs, :stop], _, %{jobs: _, opts: []}}
end

test "inserting multiple jobs with a stream", %{name: name} do
changesets = Stream.map(0..1, &Worker.new(%{ref: &1}))

[_job_1, _job_2] = Oban.insert_all(name, changesets)
end

test "inserting multiple jobs from a changeset wrapper", %{name: name} do
wrap = %{changesets: [Worker.new(%{ref: 0}), Worker.new(%{ref: 1})]}
changesets = [Worker.new(%{ref: 0}), Worker.new(%{ref: 1})]

[_job_1, _job_2] = Oban.insert_all(name, %{changesets: changesets})
end

test "inserting multiple jobs from a changeset wrapped stream", %{name: name} do
changesets = Stream.map(0..1, &Worker.new(%{ref: &1}))

[_job_1, _job_2] = Oban.insert_all(name, wrap)
[_job_1, _job_2] = Oban.insert_all(name, %{changesets: changesets})
end

test "handling empty changesets list from a wrapper", %{name: name} do
Expand Down
6 changes: 6 additions & 0 deletions test/support/exercise.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ defmodule Oban.Test.Exercise do

def check_insert_all do
changeset = changeset()
stream = Stream.duplicate(changeset, 1)

Check warning on line 23 in test/support/exercise.ex

View workflow job for this annotation

GitHub Actions / ci (1.13, 23.3, 12.13-alpine, lite)

Stream.duplicate/2 is undefined or private
wrapper = %{changesets: [changeset]}

[_ | _] = Oban.insert_all([changeset])
[_ | _] = Oban.insert_all(Oban, [changeset])
[_ | _] = Oban.insert_all([changeset], timeout: 500)
[_ | _] = Oban.insert_all(Oban, [changeset], timeout: 500)

[_ | _] = Oban.insert_all(stream)
[_ | _] = Oban.insert_all(Oban, stream)
[_ | _] = Oban.insert_all(stream, timeout: 500)
[_ | _] = Oban.insert_all(Oban, stream, timeout: 500)

[_ | _] = Oban.insert_all(wrapper)
[_ | _] = Oban.insert_all(Oban, wrapper)
[_ | _] = Oban.insert_all(wrapper, timeout: 500)
Expand Down

0 comments on commit baf3d8e

Please sign in to comment.