diff --git a/lib/dsl.ex b/lib/dsl.ex index a7c7f87..89e4dfc 100644 --- a/lib/dsl.ex +++ b/lib/dsl.ex @@ -15,18 +15,10 @@ defmodule Strom.DSL do defstruct splitter: nil, opts: [], input: nil, partitions: %{} end - defmodule Function do - defstruct function: nil, opts: [], inputs: [] - end - defmodule Transform do defstruct function: nil, acc: nil, inputs: [], call: nil end - defmodule Module do - defstruct module: nil, opts: [], inputs: [], state: nil - end - defmodule Rename do defstruct names: nil, rename: nil end @@ -75,16 +67,6 @@ defmodule Strom.DSL do end end - defmacro function(inputs, function, opts \\ []) do - quote do - %Strom.DSL.Function{ - function: unquote(function), - opts: unquote(opts), - inputs: unquote(inputs) - } - end - end - defmacro transform(inputs, function, acc) do quote do %Strom.DSL.Transform{ @@ -105,16 +87,6 @@ defmodule Strom.DSL do end end - defmacro module(inputs, module, opts \\ []) do - quote do - %Strom.DSL.Module{ - module: unquote(module), - opts: unquote(opts), - inputs: unquote(inputs) - } - end - end - defmacro from(module, opts \\ []) do quote do unless is_atom(unquote(module)) do diff --git a/lib/flow.ex b/lib/flow.ex index fa4e24d..6348f4e 100644 --- a/lib/flow.ex +++ b/lib/flow.ex @@ -46,16 +46,9 @@ defmodule Strom.Flow do %DSL.Splitter{opts: opts} = splitter -> %{splitter | splitter: Strom.Splitter.start(opts)} - %DSL.Function{function: function, opts: opts} = fun -> - %{fun | function: Strom.Function.start(function, opts)} - %DSL.Transform{} = fun -> %{fun | call: Strom.Transformer.start()} - %DSL.Module{module: module, opts: opts} = mod -> - module = Strom.Module.start(module, opts) - %{mod | module: module} - %DSL.Rename{names: names} = ren -> rename = Strom.Renamer.start(names) %{ren | rename: rename} @@ -93,9 +86,6 @@ defmodule Strom.Flow do %DSL.Splitter{splitter: splitter, input: input, partitions: partitions} -> Strom.Splitter.call(flow, splitter, input, partitions) - %DSL.Function{function: function, inputs: inputs} -> - Strom.Function.call(flow, function, inputs) - %DSL.Transform{call: call, function: function, acc: acc, inputs: inputs} -> if is_function(function, 2) do Strom.Transformer.call(flow, call, inputs, {function, acc}) @@ -103,9 +93,6 @@ defmodule Strom.Flow do Strom.Transformer.call(flow, call, inputs, function) end - %DSL.Module{module: module, inputs: inputs} -> - Strom.Module.call(flow, module, inputs) - %DSL.Rename{rename: rename, names: names} -> Strom.Renamer.call(flow, rename, names) end @@ -130,14 +117,8 @@ defmodule Strom.Flow do %DSL.Splitter{splitter: splitter} -> Strom.Splitter.stop(splitter) - %DSL.Function{function: function} -> - Strom.Function.stop(function) - %DSL.Transform{call: call} -> Strom.Transformer.stop(call) - - %DSL.Module{module: module} -> - Strom.Module.stop(module) end end) diff --git a/lib/function.ex b/lib/function.ex deleted file mode 100644 index 51e0312..0000000 --- a/lib/function.ex +++ /dev/null @@ -1,59 +0,0 @@ -defmodule Strom.Function do - use GenServer - - defstruct function: nil, opts: nil, pid: nil - - def start(function, opts \\ nil) do - state = %__MODULE__{function: function, opts: opts} - - {:ok, pid} = GenServer.start_link(__MODULE__, state) - __state__(pid) - end - - @impl true - def init(%__MODULE__{} = state), do: {:ok, %{state | pid: self()}} - - def call(flow, %__MODULE__{function: function, pid: pid}, names) - when is_map(flow) and is_function(function) and is_list(names) do - streams = - Enum.reduce(names, %{}, fn name, acc -> - Map.put(acc, name, Map.fetch!(flow, name)) - end) - - sub_flow = - Enum.reduce(streams, %{}, fn {name, stream}, acc -> - stream = - Stream.map(stream, fn event -> - GenServer.call(pid, {:call, event}, :infinity) - end) - - Map.put(acc, name, stream) - end) - - Map.merge(flow, sub_flow) - end - - def call(flow, function, name) when is_map(flow), do: call(flow, function, [name]) - - def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop) - - def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__) - - @impl true - def handle_call({:call, event}, _from, state) do - new_event = - if is_function(state.function, 1) do - state.function.(event) - else - state.function.(event, state.opts) - end - - {:reply, new_event, state} - end - - def handle_call(:stop, _from, state) do - {:stop, :normal, :ok, state} - end - - def handle_call(:__state__, _from, state), do: {:reply, state, state} -end diff --git a/lib/loop.ex b/lib/loop.ex deleted file mode 100644 index 659ff84..0000000 --- a/lib/loop.ex +++ /dev/null @@ -1,67 +0,0 @@ -defmodule Strom.Loop do - use GenServer - - @default_timeout 5_000 - defstruct data: [], pid: nil, infinite: false, last_data_at: nil, timeout: @default_timeout - - def start, do: start([]) - - def start(%__MODULE__{} = loop), do: loop - - def start(opts) do - loop = %__MODULE__{ - timeout: Keyword.get(opts, :timeout, @default_timeout) - } - - {:ok, pid} = GenServer.start_link(__MODULE__, loop) - __state__(pid) - end - - @impl true - def init(%__MODULE__{} = loop), do: {:ok, %{loop | pid: self()}} - - def call(%__MODULE__{} = loop), do: GenServer.call(loop.pid, :get_data) - - def call(%__MODULE__{} = loop, data), do: GenServer.call(loop.pid, {:put_data, data}) - - def stop(%__MODULE__{} = loop), do: GenServer.call(loop.pid, :stop) - - def infinite?(%__MODULE__{infinite: infinite}), do: infinite - - def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__) - - @impl true - def handle_call(:get_data, _from, %__MODULE__{data: data} = loop) do - last_data_at = if is_nil(loop.last_data_at), do: time_now(), else: loop.last_data_at - loop = %{loop | data: [], last_data_at: last_data_at} - - case data do - [] -> - if time_now() - last_data_at > loop.timeout do - {:reply, {:error, {:halt, loop}}, loop} - else - {:reply, {:ok, {[], loop}}, loop} - end - - data -> - {:reply, {:ok, {data, loop}}, loop} - end - end - - def handle_call({:put_data, data}, _from, %__MODULE__{} = loop) do - loop = %{loop | data: loop.data ++ [data], last_data_at: time_now()} - {:reply, {:ok, {[], loop}}, loop} - end - - def handle_call(:stop, _from, %__MODULE__{} = loop) do - {:stop, :normal, :ok, loop} - end - - def handle_call(:__state__, _from, state), do: {:reply, state, state} - - defp time_now do - "Etc/UTC" - |> DateTime.now!() - |> DateTime.to_unix(:millisecond) - end -end diff --git a/lib/module.ex b/lib/module.ex deleted file mode 100644 index de74e02..0000000 --- a/lib/module.ex +++ /dev/null @@ -1,73 +0,0 @@ -defmodule Strom.Module do - # TODO define behaviour - use GenServer - - defstruct module: nil, pid: nil, opts: [], state: nil - - def start(module, opts \\ []) when is_atom(module) do - state = apply(module, :start, [opts]) - state = %__MODULE__{module: module, opts: opts, state: state} - {:ok, pid} = GenServer.start_link(__MODULE__, state) - __state__(pid) - end - - @impl true - def init(%__MODULE__{} = state), do: {:ok, %{state | pid: self()}} - - def call(flow, %__MODULE__{pid: pid} = state, names) - when is_map(flow) and is_list(names) do - streams = - Enum.reduce(names, %{}, fn name, acc -> - Map.put(acc, name, Map.fetch!(flow, name)) - end) - - sub_flow = - Enum.reduce(streams, %{}, fn {name, stream}, acc -> - stream = - if is_pipeline_module?(state.module) do - apply(state.module, :stream, [stream]) - else - Stream.transform(stream, state.state, fn event, acc -> - GenServer.call(pid, {:call, event, acc}, :infinity) - end) - end - - Map.put(acc, name, stream) - end) - - Map.merge(flow, sub_flow) - end - - def call(flow, state, name) when is_map(flow), do: call(flow, state, [name]) - - def stop(%__MODULE__{module: module, state: state, opts: opts, pid: pid}) do - if is_pipeline_module?(module) do - apply(module, :stop, []) - else - apply(module, :stop, [state, opts]) - end - - GenServer.call(pid, :stop) - end - - def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__) - - @impl true - def handle_call({:call, event, acc}, _from, state) do - {events, acc} = apply(state.module, :call, [event, acc, state.opts]) - - {:reply, {events, acc}, state} - end - - def handle_call(:stop, _from, state) do - {:stop, :normal, :ok, state} - end - - def handle_call(:__state__, _from, state), do: {:reply, state, state} - - defp is_pipeline_module?(module) when is_atom(module) do - is_list(module.alf_components()) - rescue - _error -> false - end -end diff --git a/lib/transformer.ex b/lib/transformer.ex index ea9a8ac..3bdf3f8 100644 --- a/lib/transformer.ex +++ b/lib/transformer.ex @@ -28,7 +28,9 @@ defmodule Strom.Transformer do end def call(flow, %__MODULE__{} = call, names, {function, acc}) - when is_map(flow) and is_list(names) and is_function(function, 3) do + when is_map(flow) and is_function(function, 3) do + names = if is_list(names), do: names, else: [names] + input_streams = Enum.reduce(names, %{}, fn name, streams -> Map.put(streams, {name, function, acc}, Map.fetch!(flow, name)) @@ -72,13 +74,13 @@ defmodule Strom.Transformer do end def call(flow, %__MODULE__{} = call, names, {function, acc}) - when is_map(flow) and is_list(names) and is_function(function, 2) do + when is_map(flow) and is_function(function, 2) do fun = fn el, acc, nil -> function.(el, acc) end call(flow, %__MODULE__{} = call, names, {fun, acc}) end def call(flow, %__MODULE__{} = call, names, function) - when is_map(flow) and is_list(names) and is_function(function, 1) do + when is_map(flow) and is_function(function, 1) do fun = fn el, nil, nil -> {[function.(el)], nil} end call(flow, %__MODULE__{} = call, names, {fun, nil}) end diff --git a/test/function_test.exs b/test/function_test.exs deleted file mode 100644 index 5d801a6..0000000 --- a/test/function_test.exs +++ /dev/null @@ -1,65 +0,0 @@ -defmodule Strom.FunctionTest do - use ExUnit.Case, async: true - - alias Strom.Function - alias Strom.Source - alias Strom.Source.ReadLines - - setup do - path = "test/data/orders.csv" - source = Source.start(%ReadLines{path: path}) - flow = Source.call(%{}, source, :orders) - %{flow: flow} - end - - test "start and stop" do - function = Function.start(&"foo-#{&1}") - assert Process.alive?(function.pid) - :ok = Function.stop(function) - refute Process.alive?(function.pid) - end - - test "function", %{flow: flow} do - function = Function.start(&"foo-#{&1}") - - %{orders: orders} = Function.call(flow, function, [:orders]) - orders = Enum.to_list(orders) - Enum.each(orders, fn line -> assert String.starts_with?(line, "foo-") end) - assert length(orders) == length(String.split(File.read!("test/data/orders.csv"), "\n")) - end - - test "with several streams", %{flow: flow} do - path = "test/data/parcels.csv" - source2 = Source.start(%ReadLines{path: path}) - - function = - Function.start(&"foo-#{&1}") - - %{orders: orders, parcels: parcels} = - flow - |> Source.call(source2, :parcels) - |> Function.call(function, [:parcels]) - - parcels = Enum.to_list(parcels) - Enum.each(parcels, fn line -> assert String.starts_with?(line, "foo-") end) - assert length(parcels) == length(String.split(File.read!("test/data/parcels.csv"), "\n")) - - orders = Enum.to_list(orders) - assert Enum.join(orders, "\n") == File.read!("test/data/orders.csv") - end - - test "when applied to empty flow" do - function = Function.start(&"foo-#{&1}") - - assert_raise KeyError, fn -> - Function.call(%{}, function, [:orders]) - end - end - - test "with extra argument" do - function = Function.start(fn event, extra -> "#{extra}-#{event}" end, "foo") - - %{events: stream} = Function.call(%{events: [1, 2, 3]}, function, :events) - assert Enum.to_list(stream) == ["foo-1", "foo-2", "foo-3"] - end -end diff --git a/test/integration/sleep_test.exs b/test/integration/sleep_test.exs deleted file mode 100644 index a52182b..0000000 --- a/test/integration/sleep_test.exs +++ /dev/null @@ -1,26 +0,0 @@ -defmodule Strom.Integration.SleepTest do - use ExUnit.Case, async: false - - # test "sleep in mixer" do - # flow = %{s1: Stream.cycle([1, 2, 3]), s2: Stream.cycle([10, 20, 30])} - # - # sleep_fun = - # Strom.Function.start( - # &Stream.map(&1, fn el -> - # Process.sleep(100) - # el - # end) - # ) - # - # to_string = Strom.Function.start(&Stream.map(&1, fn el -> "#{el}" end)) - # - # sink = Strom.Sink.start(%Strom.Sink.WriteLines{path: "test_data/sleep.txt"}) - # - # flow - # |> Strom.Function.call(sleep_fun, :s1) - # |> Strom.Mixer.call(Strom.Mixer.start(), [:s1, :s2], :stream) - # |> Strom.Function.call(to_string, :stream) - # |> Strom.Function.call(sleep_fun, :stream) - # |> Strom.Sink.call(sink, :stream, true) - # end -end diff --git a/test/loop_test.exs b/test/loop_test.exs deleted file mode 100644 index 28a2948..0000000 --- a/test/loop_test.exs +++ /dev/null @@ -1,30 +0,0 @@ -defmodule Strom.LoopTest do - use ExUnit.Case - - test "loop" do - flow = %{stream: [1, 2, 3]} - - plus_one = - Strom.Function.start(&(&1 + 1)) - - mixer = Strom.Mixer.start() - splitter = Strom.Splitter.start() - - loop = Strom.Loop.start(timeout: 100) - source_loop = Strom.Source.start(loop) - sink_loop = Strom.Sink.start(loop) - - flow = - flow - |> Strom.Source.call(source_loop, :looped) - |> Strom.Mixer.call(mixer, [:looped, :stream], :merged) - |> Strom.Function.call(plus_one, :merged) - |> Strom.Splitter.call(splitter, :merged, %{ - ok: fn el -> el >= 10 end, - not_ok: fn el -> el < 10 end - }) - |> Strom.Sink.call(sink_loop, :not_ok, true) - - assert Enum.to_list(flow[:ok]) == [10, 10, 10] - end -end diff --git a/test/module_test.exs b/test/module_test.exs deleted file mode 100644 index 426a944..0000000 --- a/test/module_test.exs +++ /dev/null @@ -1,70 +0,0 @@ -defmodule Strom.ModuleTest do - use ExUnit.Case, async: true - - alias Strom.Module - alias Strom.Source - alias Strom.Source.ReadLines - - defmodule MyModule do - defstruct state: nil - - def start(_opts) do - :memo - end - - def call(event, memo, opts) do - {["#{opts[:prefix]}-#{event}"], memo} - end - - def stop(:memo, opts), do: opts - end - - setup do - path = "test/data/orders.csv" - source = Source.start(%ReadLines{path: path}) - flow = Source.call(%{}, source, :orders) - %{flow: flow} - end - - test "start and stop" do - module = Module.start(MyModule, prefix: "foo") - assert Process.alive?(module.pid) - :ok = Module.stop(module) - refute Process.alive?(module.pid) - end - - test "module", %{flow: flow} do - module = Module.start(MyModule, prefix: "foo") - %{orders: orders} = Module.call(flow, module, [:orders]) - orders = Enum.to_list(orders) - Enum.each(orders, fn line -> assert String.starts_with?(line, "foo-") end) - assert length(orders) == length(String.split(File.read!("test/data/orders.csv"), "\n")) - end - - test "with several streams", %{flow: flow} do - path = "test/data/parcels.csv" - source2 = Source.start(%ReadLines{path: path}) - - module = Module.start(MyModule, prefix: "foo") - - %{orders: orders, parcels: parcels} = - flow - |> Source.call(source2, :parcels) - |> Module.call(module, [:parcels]) - - parcels = Enum.to_list(parcels) - Enum.each(parcels, fn line -> assert String.starts_with?(line, "foo-") end) - assert length(parcels) == length(String.split(File.read!("test/data/parcels.csv"), "\n")) - - orders = Enum.to_list(orders) - assert Enum.join(orders, "\n") == File.read!("test/data/orders.csv") - end - - test "when applied to empty flow" do - module = Module.start(MyModule, prefix: "foo") - - assert_raise KeyError, fn -> - Module.call(%{}, module, [:orders]) - end - end -end diff --git a/test/transformer_test.exs b/test/transformer_test.exs index aacc7e8..0187917 100644 --- a/test/transformer_test.exs +++ b/test/transformer_test.exs @@ -4,25 +4,32 @@ defmodule Strom.TransformerTest do alias Strom.Transformer test "start and stop" do - call = Transformer.start() - assert Process.alive?(call.pid) - :ok = Transformer.stop(call) - refute Process.alive?(call.pid) + transformer = Transformer.start() + assert Process.alive?(transformer.pid) + :ok = Transformer.stop(transformer) + refute Process.alive?(transformer.pid) end test "call" do - call = Transformer.start() + transformer = Transformer.start() flow = %{numbers1: [1, 2, 3, 4, 5], numbers2: [6, 7, 8, 9, 10], numbers3: [0, 0, 0, 0, 0]} fun = &(&1 * &1) - flow = Transformer.call(flow, call, [:numbers1, :numbers2], fun) + flow = Transformer.call(flow, transformer, [:numbers1, :numbers2], fun) assert Enum.sort(Enum.to_list(flow[:numbers1])) == [1, 4, 9, 16, 25] assert Enum.sort(Enum.to_list(flow[:numbers2])) == [36, 49, 64, 81, 100] assert Enum.sort(Enum.to_list(flow[:numbers3])) == [0, 0, 0, 0, 0] end + test "call with one stream" do + transformer = Transformer.start() + flow = %{numbers1: [1, 2, 3, 4, 5]} + flow = Transformer.call(flow, transformer, :numbers1, &(&1 * &1)) + assert Enum.sort(Enum.to_list(flow[:numbers1])) == [1, 4, 9, 16, 25] + end + test "call with accumulator" do - call = Transformer.start() + transformer = Transformer.start() flow = %{numbers1: [1, 2, 3, 4, 5], numbers2: [6, 7, 8, 9, 10], numbers3: [0, 0, 0, 0, 0]} @@ -30,7 +37,7 @@ defmodule Strom.TransformerTest do {[el, acc], acc + 1} end - flow = Transformer.call(flow, call, [:numbers1, :numbers2], {fun, 100}) + flow = Transformer.call(flow, transformer, [:numbers1, :numbers2], {fun, 100}) assert Enum.sort(Enum.to_list(flow[:numbers1])) == [1, 2, 3, 4, 5, 100, 101, 102, 103, 104] assert Enum.sort(Enum.to_list(flow[:numbers2])) == [6, 7, 8, 9, 10, 100, 101, 102, 103, 104] @@ -38,7 +45,7 @@ defmodule Strom.TransformerTest do end test "call with opts and accumulator" do - call = Transformer.start(opts: %{add: 1}) + transformer = Transformer.start(opts: %{add: 1}) flow = %{numbers1: [1, 2, 3, 4, 5], numbers2: [6, 7, 8, 9, 10], numbers3: [0, 0, 0, 0, 0]} @@ -46,7 +53,7 @@ defmodule Strom.TransformerTest do {[el, acc], acc + opts[:add]} end - flow = Transformer.call(flow, call, [:numbers1, :numbers2], {fun, 100}) + flow = Transformer.call(flow, transformer, [:numbers1, :numbers2], {fun, 100}) assert Enum.sort(Enum.to_list(flow[:numbers1])) == [1, 2, 3, 4, 5, 100, 101, 102, 103, 104] assert Enum.sort(Enum.to_list(flow[:numbers2])) == [6, 7, 8, 9, 10, 100, 101, 102, 103, 104]