Skip to content

Commit

Permalink
Add flow interface
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Dec 6, 2023
1 parent ca8ca22 commit ae26b24
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 15 deletions.
2 changes: 0 additions & 2 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
TODOs:
- Mixed streams see ALF.MixStreamsTest ??? Do I need that?
- Limit number of tasks in manager
- Flow interface!!!
- Composer docs
18 changes: 18 additions & 0 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,24 @@ defmodule ALF.DSL do
ALF.Manager.stream(stream, __MODULE__, opts)
end

@spec flow(map(), list(), Keyword.t()) :: Enumerable.t()
def flow(flow, names, opts \\ [debug: false])

def flow(flow, names, opts) when is_map(flow) and is_list(names) do
Enum.reduce(flow, %{}, fn {name, stream}, acc ->
if name in names do
stream = ALF.Manager.stream(stream, __MODULE__, opts)
Map.put(acc, name, stream)
else
Map.put(acc, name, stream)
end
end)
end

def flow(flow, name, opts) when is_map(flow) do
flow(flow, [name], opts)
end

@spec components() :: list(map())
def components() do
ALF.Manager.components(__MODULE__)
Expand Down
16 changes: 8 additions & 8 deletions lib/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -402,14 +402,6 @@ defmodule ALF.Manager do
{:reply, {ips, tasks_count}, state}
end

defp maybe_wait(tasks_count) do
if tasks_count > 2 * @wait_tasks_count do
div = div(tasks_count, @wait_tasks_count)
to_sleep = trunc(:math.pow(2, div))
Process.sleep(to_sleep)
end
end

def handle_call({:done?, stream_ref}, _from, state) do
tasks_set = Map.fetch!(state.tasks, stream_ref)
{:reply, {MapSet.size(tasks_set) == 0, state.ips[stream_ref]}, state}
Expand Down Expand Up @@ -522,6 +514,14 @@ defmodule ALF.Manager do
end
end

defp maybe_wait(tasks_count) do
if tasks_count > 2 * @wait_tasks_count do
div = div(tasks_count, @wait_tasks_count)
to_sleep = trunc(:math.pow(2, div))
Process.sleep(to_sleep)
end
end

defp maybe_resubscribe(removed_stages, set_ref, component_pid) do
case Map.get(removed_stages, set_ref) do
nil ->
Expand Down
6 changes: 1 addition & 5 deletions test/examples/add_mult_minus_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ defmodule ALF.Examples.AddMultMinus.Pipeline do
stage(:minus_three)
]

def add_one(event, _) do
# Process.sleep(500)
event + 1
end

def add_one(event, _), do: event + 1
def mult_by_two(event, _), do: event * 2
def minus_three(event, _), do: event - 3
end
Expand Down
39 changes: 39 additions & 0 deletions test/integration/flow_interface.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule ALF.Integration.FlowInterfaceTest do
use ExUnit.Case, async: true

defmodule Pipeline do
use ALF.DSL

@components [
stage(:add_one),
stage(:mult_by_two),
stage(:minus_three)
]

def add_one(event, _), do: event + 1
def mult_by_two(event, _), do: event * 2
def minus_three(event, _), do: event - 3
end

setup do
Pipeline.start()
on_exit(&Pipeline.stop/0)
end

test "flow with list of names" do
flow = %{s1: [1, 2, 3], s2: [4, 5, 6], s3: [7, 8, 9]}

flow = Pipeline.flow(flow, [:s1, :s2])

assert Enum.sort(Enum.to_list(flow[:s1])) == [1, 3, 5]
assert Enum.sort(Enum.to_list(flow[:s2])) == [7, 9, 11]
assert Enum.sort(Enum.to_list(flow[:s3])) == [7, 8, 9]
end

test "flow with one name" do
flow = %{s1: [1, 2, 3], s2: [4, 5, 6]}
flow = Pipeline.flow(flow, :s1)
assert Enum.sort(Enum.to_list(flow[:s1])) == [1, 3, 5]
assert Enum.sort(Enum.to_list(flow[:s2])) == [4, 5, 6]
end
end

0 comments on commit ae26b24

Please sign in to comment.