diff --git a/README.md b/README.md index d7e788d..a4dcfec 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ #### ALF is NOT a general-purpose "language", so implementing a complex domain logic with it might be questionable (although it is possible). -#### ALF is a successor of the [Flowex](https://github.com/antonmi/flowex) project. Check it's [README](https://github.com/antonmi/flowex#readme) to get the general idea. ALF adds conditional branching, packet cloning, goto statement, decomposer/recomposer and other functionalities. Therefore, one can create application trees (graphs) of arbitrary complexity. +#### ALF is a successor of the [Flowex](https://github.com/antonmi/flowex) project. Check its [README](https://github.com/antonmi/flowex#readme) to get the general idea. ALF adds conditional branching, packet cloning, goto statement and other functionalities. Therefore, one can create application trees (graphs) of arbitrary complexity. ### Something to read and watch @@ -330,58 +330,10 @@ Event won't propagate further. It's used alongside with the `Clone` component to ```elixir dead_end(:dead_end) ``` -### Decomposer and Recomposer +### Composer -These components transform IPs. Decomposer creates several IPs based on one input IP. Recomposer does the opposite - creates a single IP based on a list of previously received IPs. -Decomposer must implement a 2-arity function (or a module with the `call` function) that return either a list of new events: +TODO -```elixir -def decomposer_function(event, _opts) do - [event + 1, event + 2, event + 3] -end -``` - -or the the `{list(event), event}` tuple: - -```elixir -def decomposer_function(event, _opts) do - {[event + 1, event + 2, event + 3], event * 100} -end -``` - -In the first case the initial IP will disappear and a list of new IPs will be created based on athe returned events. -In the second case, the `event` of original IP will be replaced by the event from the second value in the tuple. 
- -Recomposer is a bit more tricky. It is a 3-arity function (module with the `call` function). The first argument is incoming `event`, the second one is a list of previously accumulated events, and the last one is `opts` -The function must return `event`, `{event, list(event)}` or `:continue` atom. -For example: - -```elixir - def recomposer_function(event, prev_events, _opts) do - sum = Enum.reduce(prev_events, 0, &(&1 + &2)) + event - - case sum > 5 do - true -> sum - false -> :continue - end - end -``` - -The component will return one event (`sum`) if the sum of previously received events is more than 5. If no, the events are just store in the component. -`{event, list(event)}` tuple allows to return a event and also specify what to store till another call. - -```elixir -def recomposer_function_tuple(event, prev_events, _opts) do - sum = Enum.reduce(prev_events, 0, &(&1 + &2)) + event - - case sum > 5 do - true -> {sum, [hd(prev_events)]} - false -> :continue - end -end -``` - -In that case, the sum will be returned and the first `event` from `prev_events` will be stored. See the [telegram_test.exs](https://github.com/antonmi/ALF/tree/main/test/examples/telegram_test.exs) example which solves the famous "Telegram Problem". 
## Implicit components diff --git a/lib/components/basic.ex b/lib/components/basic.ex index cfbdb28..5f4ba39 100644 --- a/lib/components/basic.ex +++ b/lib/components/basic.ex @@ -110,7 +110,7 @@ defmodule ALF.Components.Basic do end end - @type result :: :cloned | :destroyed | :created_decomposer | :created_recomposer + @type result :: :cloned | :destroyed | :composed @spec send_result(IP.t() | ErrorIP.t(), result | IP.t() | ErrorIP.t()) :: IP.t() | ErrorIP.t() diff --git a/lib/components/decomposer.ex b/lib/components/decomposer.ex deleted file mode 100644 index ae62456..0000000 --- a/lib/components/decomposer.ex +++ /dev/null @@ -1,167 +0,0 @@ -defmodule ALF.Components.Decomposer do - use ALF.Components.Basic - - defstruct Basic.common_attributes() ++ - [ - type: :decomposer, - module: nil, - function: nil, - source_code: nil - ] - - alias ALF.DSLError - @dsl_options [:opts, :name, :count] - - @spec start_link(t()) :: GenServer.on_start() - def start_link(%__MODULE__{} = state) do - GenStage.start_link(__MODULE__, state) - end - - @impl true - def init(state) do - state = %{ - state - | pid: self(), - opts: init_opts(state.module, state.opts), - source_code: state.source_code || read_source_code(state.module, state.function) - } - - component_added(state) - {:producer_consumer, state} - end - - @spec init_sync(t(), boolean) :: t() - def init_sync(state, telemetry) do - %{ - state - | pid: make_ref(), - opts: init_opts(state.module, state.opts), - source_code: state.source_code || read_source_code(state.module, state.function), - telemetry: telemetry - } - end - - @impl true - def handle_events([%ALF.IP{} = ip], _from, %__MODULE__{telemetry: true} = state) do - :telemetry.span( - [:alf, :component], - telemetry_data(ip, state), - fn -> - case process_ip(ip, state) do - {[], state} -> - {{:noreply, [], state}, telemetry_data(nil, state)} - - {ips, state} -> - {{:noreply, ips, state}, telemetry_data(ips, state)} - end - end - ) - end - - def 
handle_events([%ALF.IP{} = ip], _from, %__MODULE__{telemetry: false} = state) do - case process_ip(ip, state) do - {[], state} -> - {:noreply, [], state} - - {ips, state} -> - {:noreply, ips, state} - end - end - - defp process_ip(ip, state) do - case call_function(state.module, state.function, ip.event, state.opts) do - {:ok, events} when is_list(events) -> - ips = build_ips(events, ip, history(ip, state)) - - Enum.each(ips, &send_result(&1, :created_decomposer)) - send_result(ip, :destroyed) - - {ips, state} - - {:ok, {events, event}} when is_list(events) -> - ips = build_ips(events, ip, history(ip, state)) - - Enum.each(ips, &send_result(&1, :created_decomposer)) - - ip = %{ip | event: event, history: history(ip, state)} - {ips ++ [ip], state} - - {:error, error, stacktrace} -> - send_error_result(ip, error, stacktrace, state) - {[], state} - end - end - - def sync_process(ip, %__MODULE__{telemetry: false} = state) do - do_sync_process(ip, state) - end - - def sync_process(ip, %__MODULE__{telemetry: true} = state) do - :telemetry.span( - [:alf, :component], - telemetry_data(ip, state), - fn -> - ips = do_sync_process(ip, state) - {ips, telemetry_data(ips, state)} - end - ) - end - - defp do_sync_process(ip, state) do - case call_function(state.module, state.function, ip.event, state.opts) do - {:ok, events} when is_list(events) -> - build_ips(events, ip, history(ip, state)) - - {:ok, {events, event}} when is_list(events) -> - ips = build_ips(events, ip, history(ip, state)) - - ip = %{ip | event: event, history: history(ip, state)} - [ip | ips] - - {:error, error, stacktrace} -> - build_error_ip(ip, error, stacktrace, state) - end - end - - defp build_ips(events, ip, history) do - events - |> Enum.map(fn event -> - %IP{ - stream_ref: ip.stream_ref, - destination: ip.destination, - ref: ip.ref, - init_event: event, - event: event, - pipeline_module: ip.pipeline_module, - decomposed: true, - debug: ip.debug, - history: history, - sync_path: ip.sync_path - } - end) 
- end - - def validate_options(name, options) do - wrong_options = Keyword.keys(options) -- @dsl_options - - unless is_atom(name) do - raise DSLError, "Decomposer name must be an atom: #{inspect(name)}" - end - - if Enum.any?(wrong_options) do - raise DSLError, - "Wrong options for the #{name} decomposer: #{inspect(wrong_options)}. " <> - "Available options are #{inspect(@dsl_options)}" - end - end - - defp call_function(module, function, event, opts) when is_atom(module) and is_atom(function) do - {:ok, apply(module, function, [event, opts])} - rescue - error -> - {:error, error, __STACKTRACE__} - catch - kind, value -> - {:error, kind, value} - end -end diff --git a/lib/components/recomposer.ex b/lib/components/recomposer.ex deleted file mode 100644 index ac48c33..0000000 --- a/lib/components/recomposer.ex +++ /dev/null @@ -1,254 +0,0 @@ -defmodule ALF.Components.Recomposer do - use ALF.Components.Basic - - defstruct Basic.common_attributes() ++ - [ - type: :recomposer, - module: nil, - function: nil, - source_code: nil, - collected_ips: [], - new_collected_ips: %{} - ] - - alias ALF.{DSLError, IP, ErrorIP} - - @dsl_options [:name, :opts, :count] - - @spec start_link(t()) :: GenServer.on_start() - def start_link(%__MODULE__{} = state) do - GenStage.start_link(__MODULE__, state) - end - - @impl true - def init(state) do - state = %{ - state - | pid: self(), - opts: init_opts(state.module, state.opts), - source_code: state.source_code || read_source_code(state.module, state.function) - } - - component_added(state) - {:producer_consumer, state} - end - - def init_sync(state, telemetry) do - %{ - state - | pid: make_ref(), - opts: init_opts(state.module, state.opts), - source_code: state.source_code || read_source_code(state.module, state.function), - telemetry: telemetry - } - end - - @impl true - def handle_events([%ALF.IP{} = ip], _from, %__MODULE__{telemetry: true} = state) do - :telemetry.span( - [:alf, :component], - telemetry_data(ip, state), - fn -> - case 
process_ip(ip, state) do - {nil, state} -> - {{:noreply, [], state}, telemetry_data(nil, state)} - - {%IP{} = ip, state} -> - {{:noreply, [ip], state}, telemetry_data(ip, state)} - - {%ErrorIP{} = error_ip, state} -> - {{:noreply, [], state}, telemetry_data(error_ip, state)} - end - end - ) - end - - def handle_events([%ALF.IP{} = ip], _from, %__MODULE__{telemetry: false} = state) do - case process_ip(ip, state) do - {nil, state} -> - {:noreply, [], state} - - {%IP{} = ip, state} -> - {:noreply, [ip], state} - - {%ErrorIP{}, state} -> - {:noreply, [], state} - end - end - - defp process_ip(current_ip, state) do - collected_data = - Enum.map(Map.get(state.new_collected_ips, current_ip.stream_ref, []), & &1.event) - - history = history(current_ip, state) - - case call_function( - state.module, - state.function, - current_ip.event, - collected_data, - state.opts - ) do - {:ok, :continue} -> - send_result(current_ip, :destroyed) - - collected = Map.get(state.new_collected_ips, current_ip.stream_ref, []) ++ [current_ip] - - {nil, - %{ - state - | new_collected_ips: Map.put(state.new_collected_ips, current_ip.stream_ref, collected) - }} - - {:ok, {nil, events}} -> - send_result(current_ip, :destroyed) - - collected = - Enum.map(events, fn event -> - build_ip(event, current_ip, history) - end) - - {nil, - %{ - state - | new_collected_ips: Map.put(state.new_collected_ips, current_ip.stream_ref, collected) - }} - - {:ok, {event, events}} -> - ip = build_ip(event, current_ip, history) - - send_result(ip, :created_recomposer) - - collected = - Enum.map(events, fn event -> - build_ip(event, ip, history) - end) - - {ip, - %{ - state - | new_collected_ips: Map.put(state.new_collected_ips, current_ip.stream_ref, collected) - }} - - {:ok, event} -> - ip = build_ip(event, current_ip, history) - - send_result(ip, :created_recomposer) - - {ip, - %{ - state - | new_collected_ips: Map.put(state.new_collected_ips, current_ip.stream_ref, []) - }} - - {:error, error, stacktrace} -> - 
error_ip = send_error_result(current_ip, error, stacktrace, state) - {error_ip, state} - end - end - - def sync_process(ip, %__MODULE__{telemetry: false} = state) do - do_sync_process(ip, state) - end - - def sync_process(ip, %__MODULE__{telemetry: true} = state) do - :telemetry.span( - [:alf, :component], - telemetry_data(ip, state), - fn -> - ip = do_sync_process(ip, state) - {ip, telemetry_data(ip, state)} - end - ) - end - - defp do_sync_process(ip, state) do - collected_ips = get_from_process_dict({state.pid, ip.stream_ref}) - collected_data = Enum.map(collected_ips, & &1.event) - history = history(ip, state) - - case call_function( - state.module, - state.function, - ip.event, - collected_data, - state.opts - ) do - {:ok, :continue} -> - collected_ips = collected_ips ++ [ip] - put_to_process_dict({state.pid, ip.stream_ref}, collected_ips) - nil - - {:ok, {nil, events}} -> - collected = - Enum.map(events, fn event -> - build_ip(event, ip, history) - end) - - put_to_process_dict({state.pid, ip.stream_ref}, collected) - nil - - {:ok, {event, events}} -> - ip = build_ip(event, ip, history) - - collected = - Enum.map(events, fn event -> - build_ip(event, ip, history) - end) - - put_to_process_dict({state.pid, ip.stream_ref}, collected) - ip - - {:ok, event} -> - put_to_process_dict({state.pid, ip.stream_ref}, []) - build_ip(event, ip, history) - - {:error, error, stacktrace} -> - send_error_result(ip, error, stacktrace, state) - end - end - - defp build_ip(event, ip, history) do - %IP{ - stream_ref: ip.stream_ref, - ref: ip.ref, - destination: ip.destination, - init_event: event, - event: event, - pipeline_module: ip.pipeline_module, - recomposed: true, - debug: ip.debug, - history: history, - sync_path: ip.sync_path - } - end - - def validate_options(name, options) do - wrong_options = Keyword.keys(options) -- @dsl_options - - unless is_atom(name) do - raise DSLError, "Recomposer name must be an atom: #{inspect(name)}" - end - - if Enum.any?(wrong_options) do - 
raise DSLError, - "Wrong options for the #{name} recomposer: #{inspect(wrong_options)}. " <> - "Available options are #{inspect(@dsl_options)}" - end - end - - defp call_function(module, function, event, collected_data, opts) - when is_atom(module) and is_atom(function) do - {:ok, apply(module, function, [event, collected_data, opts])} - rescue - error -> - {:error, error, __STACKTRACE__} - catch - kind, value -> - {:error, kind, value} - end - - defp get_from_process_dict(key), do: Process.get(key, []) - - defp put_to_process_dict(key, ips), do: Process.put(key, ips) -end diff --git a/lib/dsl.ex b/lib/dsl.ex index 45ad623..879edf0 100644 --- a/lib/dsl.ex +++ b/lib/dsl.ex @@ -10,8 +10,6 @@ defmodule ALF.DSL do Done, Plug, Unplug, - Decomposer, - Recomposer, Composer, Tbd } @@ -93,26 +91,6 @@ defmodule ALF.DSL do end end - defmacro decomposer(name, options \\ [opts: []]) do - opts = options[:opts] || [] - count = options[:count] || 1 - - quote do - Decomposer.validate_options(unquote(name), unquote(options)) - Basic.build_component(Decomposer, unquote(name), unquote(count), unquote(opts), __MODULE__) - end - end - - defmacro recomposer(name, options \\ [opts: []]) do - opts = options[:opts] || [] - count = options[:count] || 1 - - quote do - Recomposer.validate_options(unquote(name), unquote(options)) - Basic.build_component(Recomposer, unquote(name), unquote(count), unquote(opts), __MODULE__) - end - end - defmacro composer(name, options \\ [opts: []]) do opts = options[:opts] || [] memo = options[:memo] diff --git a/lib/error_ip.ex b/lib/error_ip.ex index b4f3246..a52eac9 100644 --- a/lib/error_ip.ex +++ b/lib/error_ip.ex @@ -11,8 +11,6 @@ defmodule ALF.ErrorIP do stacktrace: list(), component: map(), pipeline_module: atom(), - decomposed: boolean(), - recomposed: boolean(), debug: boolean(), history: list(), plugs: map() @@ -27,8 +25,6 @@ defmodule ALF.ErrorIP do stacktrace: nil, component: nil, pipeline_module: nil, - decomposed: false, - recomposed: false, 
debug: false, history: [], plugs: %{} diff --git a/lib/ip.ex b/lib/ip.ex index e3d855f..679f971 100644 --- a/lib/ip.ex +++ b/lib/ip.ex @@ -11,8 +11,6 @@ defmodule ALF.IP do pipeline_module: atom(), debug: boolean(), history: list(), - decomposed: boolean(), - recomposed: boolean(), composed: boolean(), plugs: map(), sync_path: nil | list() @@ -27,8 +25,6 @@ defmodule ALF.IP do pipeline_module: nil, debug: false, history: [], - decomposed: false, - recomposed: false, composed: false, plugs: %{}, sync_path: nil diff --git a/lib/manager.ex b/lib/manager.ex index 2dc8e47..14f3b55 100644 --- a/lib/manager.ex +++ b/lib/manager.ex @@ -248,10 +248,7 @@ defmodule ALF.Manager do defp wait_result(ref, acc, {timeout, initial_ip}, count \\ 0) do receive do - {^ref, :created_recomposer} -> - wait_result(ref, acc, {timeout, initial_ip}, count + 1) - - {^ref, reason} when reason in [:created_decomposer, :cloned, :composed] -> + {^ref, reason} when reason in [:cloned, :composed] -> wait_result( ref, acc ++ wait_result(ref, [], {timeout, initial_ip}, count + 1), diff --git a/test/dsl/decomposer_test.exs b/test/dsl/decomposer_test.exs deleted file mode 100644 index 53abaff..0000000 --- a/test/dsl/decomposer_test.exs +++ /dev/null @@ -1,93 +0,0 @@ -defmodule ALF.DSL.DecomposerTest do - use ExUnit.Case, async: true - - alias ALF.Builder - - alias ALF.Components.{ - Decomposer - } - - setup do - sup_pid = Process.whereis(ALF.DynamicSupervisor) - %{sup_pid: sup_pid} - end - - describe "Decomposer with function as a name" do - defmodule PipelineDecomposer1 do - use ALF.DSL - - @components [ - decomposer(:the_decomposer, opts: [foo: :bar]) - ] - end - - test "build PipelineDecomposer1", %{sup_pid: sup_pid} do - {:ok, pipeline} = Builder.build(PipelineDecomposer1, sup_pid, false) - - [decomposer] = pipeline.components - - assert %Decomposer{ - name: :the_decomposer, - module: PipelineDecomposer1, - function: :the_decomposer, - opts: [foo: :bar], - pipeline_module: PipelineDecomposer1 - } = 
decomposer - end - end - - describe "Decomposer with module as a name" do - defmodule PipelineDecomposer2 do - use ALF.DSL - - defmodule DecomposerModule do - def init(opts), do: Keyword.put(opts, :baz, :qux) - - def call(event, _), do: [event] - end - - @components [ - decomposer(DecomposerModule, opts: [foo: :bar]) - ] - end - - test "build PipelineDecomposer2", %{sup_pid: sup_pid} do - {:ok, pipeline} = Builder.build(PipelineDecomposer2, sup_pid, false) - - [decomposer] = pipeline.components - decomposer = Decomposer.__state__(decomposer.pid) - - assert %Decomposer{ - name: PipelineDecomposer2.DecomposerModule, - module: PipelineDecomposer2.DecomposerModule, - function: :call, - opts: [baz: :qux, foo: :bar], - pipeline_module: PipelineDecomposer2 - } = decomposer - end - end - - describe "Decomposer with custom name" do - defmodule PipelineDecomposer3 do - use ALF.DSL - - @components [ - decomposer(:the_decomposer, opts: [foo: :bar]) - ] - end - - test "build PipelineDecomposer3", %{sup_pid: sup_pid} do - {:ok, pipeline} = Builder.build(PipelineDecomposer3, sup_pid, false) - - [decomposer] = pipeline.components - - assert %Decomposer{ - name: :the_decomposer, - module: PipelineDecomposer3, - function: :the_decomposer, - opts: [foo: :bar], - pipeline_module: PipelineDecomposer3 - } = decomposer - end - end -end diff --git a/test/dsl/dsl_test.exs b/test/dsl/dsl_test.exs index f31b886..95237e9 100644 --- a/test/dsl/dsl_test.exs +++ b/test/dsl/dsl_test.exs @@ -11,9 +11,7 @@ defmodule ALF.DSLTest do GotoPoint, Goto, Plug, - Unplug, - Decomposer, - Recomposer + Unplug } defmodule PipelineA do @@ -63,29 +61,6 @@ defmodule ALF.DSLTest do ] end - defmodule PipelineCompose do - use ALF.DSL - - @components [ - decomposer(:decomposer_function, opts: [foo: :bar]), - recomposer(:recomposer_function, opts: [foo: :bar]) - ] - - def decomposer_function(event, _) do - [event + 1, event + 2, event + 3] - end - - def recomposer_function(event, prev_events, _) do - sum = 
Enum.reduce(prev_events, 0, &(&1 + &2)) + event - - if sum > 5 do - sum - else - :continue - end - end - end - setup do sup_pid = Process.whereis(ALF.DynamicSupervisor) %{sup_pid: sup_pid} @@ -184,41 +159,4 @@ defmodule ALF.DSLTest do } = ALF.Components.Basic.__state__(another_unplug.pid) end end - - describe "PipelineCompose" do - test "build PipelineCompose", %{sup_pid: sup_pid} do - {:ok, pipeline} = Builder.build(PipelineCompose, sup_pid, false) - - Process.sleep(10) - [decomposer, recomposer] = pipeline.components - - assert %Decomposer{ - module: PipelineCompose, - function: :decomposer_function, - name: :decomposer_function, - opts: [foo: :bar], - pid: decomposer_pid, - pipeline_module: ALF.DSLTest.PipelineCompose, - subscribed_to: [{{producer_pid, _ref1}, _opts1}], - subscribers: [{{recomposer_pid, _ref2}, _opts2}] - } = ALF.Components.Basic.__state__(decomposer.pid) - - assert is_pid(decomposer_pid) - assert is_pid(producer_pid) - assert is_pid(recomposer_pid) - - assert %Recomposer{ - module: PipelineCompose, - function: :recomposer_function, - name: :recomposer_function, - opts: [foo: :bar], - pid: ^recomposer_pid, - pipeline_module: ALF.DSLTest.PipelineCompose, - subscribed_to: [{{^decomposer_pid, _ref1}, _opts1}], - subscribers: [{{consumer_pid, _ref2}, _opts2}] - } = ALF.Components.Basic.__state__(recomposer.pid) - - assert is_pid(consumer_pid) - end - end end diff --git a/test/dsl/recomposer_test.exs b/test/dsl/recomposer_test.exs deleted file mode 100644 index b550b37..0000000 --- a/test/dsl/recomposer_test.exs +++ /dev/null @@ -1,93 +0,0 @@ -defmodule ALF.DSL.RecomposerTest do - use ExUnit.Case, async: true - - alias ALF.Builder - - alias ALF.Components.{ - Recomposer - } - - setup do - sup_pid = Process.whereis(ALF.DynamicSupervisor) - %{sup_pid: sup_pid} - end - - describe "Recomposer with function as a name" do - defmodule PipelineRecomposer1 do - use ALF.DSL - - @components [ - recomposer(:the_recomposer, opts: [foo: :bar]) - ] - end - - test 
"build PipelineRecomposer1", %{sup_pid: sup_pid} do - {:ok, pipeline} = Builder.build(PipelineRecomposer1, sup_pid, false) - - [recomposer] = pipeline.components - - assert %Recomposer{ - name: :the_recomposer, - module: PipelineRecomposer1, - function: :the_recomposer, - opts: [foo: :bar], - pipeline_module: PipelineRecomposer1 - } = recomposer - end - end - - describe "Recomposer with module as a name" do - defmodule PipelineRecomposer2 do - use ALF.DSL - - defmodule RecomposerModule do - def init(opts), do: Keyword.put(opts, :baz, :qux) - - def call(event, _), do: [event] - end - - @components [ - recomposer(RecomposerModule, opts: [foo: :bar]) - ] - end - - test "build PipelineRecomposer2", %{sup_pid: sup_pid} do - {:ok, pipeline} = Builder.build(PipelineRecomposer2, sup_pid, false) - - [recomposer] = pipeline.components - recomposer = Recomposer.__state__(recomposer.pid) - - assert %Recomposer{ - name: PipelineRecomposer2.RecomposerModule, - module: PipelineRecomposer2.RecomposerModule, - function: :call, - opts: [baz: :qux, foo: :bar], - pipeline_module: PipelineRecomposer2 - } = recomposer - end - end - - describe "Recomposer with custom name" do - defmodule PipelineRecomposer3 do - use ALF.DSL - - @components [ - recomposer(:the_recomposer, opts: [foo: :bar]) - ] - end - - test "build PipelineRecomposer3", %{sup_pid: sup_pid} do - {:ok, pipeline} = Builder.build(PipelineRecomposer3, sup_pid, false) - - [recomposer] = pipeline.components - - assert %Recomposer{ - name: :the_recomposer, - module: PipelineRecomposer3, - function: :the_recomposer, - opts: [foo: :bar], - pipeline_module: PipelineRecomposer3 - } = recomposer - end - end -end diff --git a/test/examples/telegram_test.exs b/test/examples/telegram_test.exs index d755157..e3d10ee 100644 --- a/test/examples/telegram_test.exs +++ b/test/examples/telegram_test.exs @@ -1,17 +1,20 @@ -defmodule ALF.Examples.Telegram.Pipeline do +defmodule ALF.Examples.TelegramWithComposer.Pipeline do use ALF.DSL 
@components [ - decomposer(:split_to_words), - recomposer(:create_lines) + composer(:split_to_words), + composer(:create_lines, memo: []) ] @length_limit 50 - def split_to_words(line, _) do - line - |> String.trim() - |> String.split() + def split_to_words(line, nil, _) do + words = + line + |> String.trim() + |> String.split() + + {words, nil} end def create_lines(word, words, _) do @@ -20,21 +23,21 @@ defmodule ALF.Examples.Telegram.Pipeline do cond do String.length(string_after) == @length_limit -> - string_after + {[string_after], []} String.length(string_after) > @length_limit -> - {string_before, [word]} + {[string_before], [word]} true -> - :continue + {[], words ++ [word]} end end end -defmodule ALF.Examples.TelegramTest do +defmodule ALF.Examples.TelegramWithComposerTest do use ExUnit.Case, async: true - alias ALF.Examples.Telegram.Pipeline + alias ALF.Examples.TelegramWithComposer.Pipeline setup do file_stream = File.stream!("test/examples/telegram_input.txt") diff --git a/test/examples/telegram_with_composer_test.exs b/test/examples/telegram_with_composer_test.exs deleted file mode 100644 index e3d10ee..0000000 --- a/test/examples/telegram_with_composer_test.exs +++ /dev/null @@ -1,58 +0,0 @@ -defmodule ALF.Examples.TelegramWithComposer.Pipeline do - use ALF.DSL - - @components [ - composer(:split_to_words), - composer(:create_lines, memo: []) - ] - - @length_limit 50 - - def split_to_words(line, nil, _) do - words = - line - |> String.trim() - |> String.split() - - {words, nil} - end - - def create_lines(word, words, _) do - string_before = Enum.join(words, " ") - string_after = Enum.join(words ++ [word], " ") - - cond do - String.length(string_after) == @length_limit -> - {[string_after], []} - - String.length(string_after) > @length_limit -> - {[string_before], [word]} - - true -> - {[], words ++ [word]} - end - end -end - -defmodule ALF.Examples.TelegramWithComposerTest do - use ExUnit.Case, async: true - - alias 
ALF.Examples.TelegramWithComposer.Pipeline - - setup do - file_stream = File.stream!("test/examples/telegram_input.txt") - Pipeline.start() - on_exit(&Pipeline.stop/0) - %{file_stream: file_stream} - end - - test "process input", %{file_stream: file_stream} do - lines = - file_stream - |> Pipeline.stream() - |> Enum.to_list() - - assert Enum.count(lines) > 100 - assert String.length(hd(lines)) <= 50 - end -end diff --git a/test/integration/crash_pipeline_test.exs b/test/integration/crash_pipeline_test.exs index be461ac..7aeec77 100644 --- a/test/integration/crash_pipeline_test.exs +++ b/test/integration/crash_pipeline_test.exs @@ -233,69 +233,4 @@ defmodule ALF.CrashPipelineTest do assert BubbleSortWithSwitchPipeline.call([3, 1, 2]) == [1, 2, 3] end end - - describe "crashes in decomposer and recomposer" do - defmodule DeRePipeline do - use ALF.DSL - - @components [ - decomposer(:decomposer_function), - recomposer(:recomposer_function) - ] - - def decomposer_function(event, _) do - String.split(event) - end - - def recomposer_function(event, prev_events, _) do - string = Enum.join(prev_events ++ [event], " ") - - if String.length(string) > 10 do - string - else - # testing identical behaviour - if Enum.random([true, false]) do - :continue - else - {nil, prev_events ++ [event]} - end - end - end - end - - setup do - DeRePipeline.start() - Process.sleep(10) - on_exit(&DeRePipeline.stop/0) - %{components: DeRePipeline.components()} - end - - def it_works! 
do - [ip1, ip2] = - ["foo foo", "bar bar", "baz baz"] - |> DeRePipeline.stream(debug: true) - |> Enum.to_list() - - assert ip1.event == "foo foo bar" - assert ip2.event == "bar baz baz" - end - - test "returns strings" do - it_works!() - end - - test "kill decomposer", %{components: components} do - decomposer = Enum.find(components, &(&1.name == :decomposer_function)) - kill(decomposer.pid) - Process.sleep(50) - it_works!() - end - - test "kill recomposer", %{components: components} do - recomposer = Enum.find(components, &(&1.name == :recomposer_function)) - kill(recomposer.pid) - Process.sleep(10) - it_works!() - end - end end diff --git a/test/integration/decompose_recompose_test.exs b/test/integration/decompose_recompose_test.exs deleted file mode 100644 index 5900a65..0000000 --- a/test/integration/decompose_recompose_test.exs +++ /dev/null @@ -1,119 +0,0 @@ -defmodule ALF.DecomposeRecomposeTest do - use ExUnit.Case, async: true - - describe "decompose an recompose" do - defmodule Pipeline do - use ALF.DSL - - @components [ - decomposer(:decomposer_function), - recomposer(:recomposer_function) - ] - - def decomposer_function(event, _) do - String.split(event) - end - - def recomposer_function(event, prev_events, _) do - string = Enum.join(prev_events ++ [event], " ") - - if String.length(string) > 10 do - string - else - # testing identical behaviour - if Enum.random([true, false]) do - :continue - else - {nil, prev_events ++ [event]} - end - end - end - end - - setup do - Pipeline.start() - on_exit(&Pipeline.stop/0) - end - - test "returns strings" do - [ip1, ip2] = - ["foo foo", "bar bar", "baz baz"] - |> Pipeline.stream(debug: true) - |> Enum.to_list() - - assert ip1.event == "foo foo bar" - assert ip2.event == "bar baz baz" - - assert ip1.history == [ - {{:recomposer_function, 0}, "bar"}, - {{:decomposer_function, 0}, "bar bar"} - ] - end - - test "several streams returns strings" do - stream1 = Pipeline.stream(["111 foo", "bar bar", "baz baz"]) - stream2 
= Pipeline.stream(["222 foo", "bar bar", "baz baz"]) - stream3 = Pipeline.stream(["333 foo", "bar bar", "baz baz"]) - - [result1, result2, result3] = - [stream1, stream2, stream3] - |> Enum.map(&Task.async(fn -> Enum.to_list(&1) end)) - |> Task.await_many() - - assert [ - ["111 foo bar", "bar baz baz"], - ["222 foo bar", "bar baz baz"], - ["333 foo bar", "bar baz baz"] - ] = [result1, result2, result3] - end - end - - describe "telegram" do - defmodule TelegramPipeline do - use ALF.DSL - - @components [ - decomposer(:split_to_words), - recomposer(:create_lines) - ] - - @length_limit 10 - - def split_to_words(line, _) do - line - |> String.trim() - |> String.split() - end - - def create_lines(word, words, _) do - string_before = Enum.join(words, " ") - string_after = Enum.join(words ++ [word], " ") - - cond do - String.length(string_after) == @length_limit -> - string_after - - String.length(string_after) > @length_limit -> - {string_before, [word]} - - true -> - :continue - end - end - end - - setup do - TelegramPipeline.start() - on_exit(&TelegramPipeline.stop/0) - end - - test "with call" do - assert is_nil(TelegramPipeline.call("aaa")) - assert is_nil(TelegramPipeline.call("bbb")) - assert TelegramPipeline.call("ccc") == "aaa bbb" - assert is_nil(TelegramPipeline.call("ddd")) - assert TelegramPipeline.call("12345678") == "ccc ddd" - assert TelegramPipeline.call("z") == "12345678 z" - end - end -end diff --git a/test/manager_test.exs b/test/manager_test.exs index 03a3945..db0a59a 100644 --- a/test/manager_test.exs +++ b/test/manager_test.exs @@ -413,76 +413,6 @@ defmodule ALF.ManagerTest do end end - describe "stream with decomposer" do - defmodule DecomposerPipeline do - use ALF.DSL - - @components [ - decomposer(:decomposer_function) - ] - - def decomposer_function(event, _) do - String.split(event) - end - end - - setup do - DecomposerPipeline.start() - on_exit(&DecomposerPipeline.stop/0) - end - - test "call" do - assert DecomposerPipeline.call("aaa") == 
"aaa" - assert DecomposerPipeline.call("aaa bbb ccc") == ["aaa", "bbb", "ccc"] - end - - test "stream" do - results = - ["aaa bbb ccc", "ddd eee", "xxx"] - |> DecomposerPipeline.stream() - |> Enum.to_list() - - assert length(results) == 6 - end - end - - describe "stream with recomposer" do - defmodule RecomposerPipeline do - use ALF.DSL - - @components [ - recomposer(:recomposer_function) - ] - - def recomposer_function(event, prev_events, _) do - string = Enum.join(prev_events ++ [event], " ") - - if String.length(string) >= 5 do - string - else - :continue - end - end - end - - setup do - RecomposerPipeline.start() - on_exit(&RecomposerPipeline.stop/0) - end - - test "call" do - assert RecomposerPipeline.call("aaaaa") == "aaaaa" - assert is_nil(RecomposerPipeline.call("aaa")) - assert RecomposerPipeline.call("bbb") == "aaa bbb" - end - - test "stream" do - ["aa", "bb", "xxxxx"] - |> RecomposerPipeline.stream() - |> Enum.to_list() - end - end - describe "timeout with call" do defmodule TimeoutPipeline do use ALF.DSL diff --git a/test/sync_run/decompose_recompose_test.exs b/test/sync_run/decompose_recompose_test.exs deleted file mode 100644 index e5f63fc..0000000 --- a/test/sync_run/decompose_recompose_test.exs +++ /dev/null @@ -1,115 +0,0 @@ -defmodule ALF.SyncRun.DecomposeRecomposeTest do - use ExUnit.Case, async: true - - describe "decompose an recompose" do - defmodule Pipeline do - use ALF.DSL - - @components [ - decomposer(:decomposer_function), - recomposer(:recomposer_function) - ] - - def decomposer_function(event, _) do - String.split(event) - end - - def recomposer_function(event, prev_events, _) do - string = Enum.join(prev_events ++ [event], " ") - - if String.length(string) > 10 do - string - else - # testing identical behaviour - if Enum.random([true, false]) do - :continue - else - {nil, prev_events ++ [event]} - end - end - end - end - - setup do - Pipeline.start(sync: true) - on_exit(&Pipeline.stop/0) - end - - test "returns strings" do - [ip1, 
ip2] = - ["foo foo", "bar bar", "baz baz"] - |> Pipeline.stream(debug: true) - |> Enum.to_list() - - assert ip1.event == "foo foo bar" - assert ip2.event == "bar baz baz" - - assert ip1.history == [ - {{:recomposer_function, 0}, "bar"}, - {{:decomposer_function, 0}, "bar bar"} - ] - end - - test "several streams returns strings" do - stream1 = Pipeline.stream(["foo foo", "bar bar", "baz baz"]) - stream2 = Pipeline.stream(["foo foo", "bar bar", "baz baz"]) - stream3 = Pipeline.stream(["foo foo", "bar bar", "baz baz"]) - - [result1, result2, result3] = - [stream1, stream2, stream3] - |> Enum.map(&Task.async(fn -> Enum.to_list(&1) end)) - |> Task.await_many() - - assert ^result1 = ^result2 = ^result3 = ["foo foo bar", "bar baz baz"] - end - end - - describe "telegram" do - defmodule TelegramPipeline do - use ALF.DSL - - @components [ - decomposer(:split_to_words), - recomposer(:create_lines) - ] - - @length_limit 10 - - def split_to_words(line, _) do - line - |> String.trim() - |> String.split() - end - - def create_lines(word, words, _) do - string_before = Enum.join(words, " ") - string_after = Enum.join(words ++ [word], " ") - - cond do - String.length(string_after) == @length_limit -> - string_after - - String.length(string_after) > @length_limit -> - {string_before, [word]} - - true -> - :continue - end - end - end - - setup do - TelegramPipeline.start(sync: true) - on_exit(&TelegramPipeline.stop/0) - end - - test "with call" do - assert is_nil(TelegramPipeline.call("aaa")) - assert is_nil(TelegramPipeline.call("bbb")) - assert TelegramPipeline.call("ccc") == "aaa bbb" - assert is_nil(TelegramPipeline.call("ddd")) - assert TelegramPipeline.call("12345678") == "ccc ddd" - assert TelegramPipeline.call("z") == "12345678 z" - end - end -end diff --git a/test/telemetry/composers_telemery_test.exs b/test/telemetry/composers_telemery_test.exs deleted file mode 100644 index 00573e9..0000000 --- a/test/telemetry/composers_telemery_test.exs +++ /dev/null @@ -1,178 +0,0 @@ 
-defmodule ALF.Telemetry.ComposersTelemetryTest do - use ExUnit.Case - - defmodule Pipeline do - use ALF.DSL - - @components [ - decomposer(:the_decomposer), - recomposer(:the_recomposer) - ] - - def the_decomposer(event, _) do - {[event + 1], event} - end - - def the_recomposer(event, prev_events, _) do - sum = Enum.reduce(prev_events, 0, &(&1 + &2)) + event - - case sum >= 5 do - true -> {sum, [hd(prev_events)]} - false -> :continue - end - end - end - - setup do - {:ok, agent} = Agent.start_link(fn -> [] end) - %{agent: agent} - end - - describe "telemetry events" do - defmodule Handler do - def handle_event([:alf, :component, type], measurements, metadata, %{agent: agent}) do - Agent.update(agent, fn list -> [{type, measurements, metadata} | list] end) - end - end - - setup %{agent: agent} do - Pipeline.start(telemetry: true) - - :ok = - :telemetry.attach_many( - "test-events-handler", - [ - [:alf, :component, :start], - [:alf, :component, :stop], - [:alf, :component, :exception] - ], - &Handler.handle_event/4, - %{agent: agent} - ) - - on_exit(fn -> - :telemetry.detach("test-events-handler") - Pipeline.stop() - end) - end - - test "check recomposer events", %{agent: agent} do - [result] = - [2] - |> Pipeline.stream() - |> Enum.to_list() - - assert result == 5 - Process.sleep(5) - - [ - _consumer_stop, - _consumer_start, - recomposer_stop2, - recomposer_start2, - recomposer_stop1, - recomposer_start1, - decomposer_stop, - decomposer_start, - _producer_stop, - _producer_start - ] = Agent.get(agent, & &1) - - check_recomposer_events1(recomposer_stop1, recomposer_start1) - check_recomposer_events2(recomposer_stop2, recomposer_start2) - check_decomposer_events(decomposer_stop, decomposer_start) - end - - def check_recomposer_events1(recomposer_stop, recomposer_start) do - assert { - :stop, - %{duration: _duration}, - %{ - component: %{ - name: :the_recomposer, - pipeline_module: __MODULE__.Pipeline - }, - ip: nil, - telemetry_span_context: _ref - } - } = 
recomposer_stop - - assert { - :start, - %{system_time: _system_time}, - %{ - component: %{ - name: :the_recomposer, - pipeline_module: __MODULE__.Pipeline - }, - ip: %{ - event: 3 - }, - telemetry_span_context: _ref - } - } = recomposer_start - end - - def check_recomposer_events2(recomposer_stop, recomposer_start) do - assert { - :stop, - %{duration: _duration}, - %{ - component: %{ - name: :the_recomposer, - pipeline_module: __MODULE__.Pipeline - }, - ip: %{ - event: 5 - }, - telemetry_span_context: _ref - } - } = recomposer_stop - - assert { - :start, - %{system_time: _system_time}, - %{ - component: %{ - name: :the_recomposer, - pipeline_module: __MODULE__.Pipeline - }, - ip: %{ - event: 2 - }, - telemetry_span_context: _ref - } - } = recomposer_start - end - - def check_decomposer_events(decomposer_stop, decomposer_start) do - assert { - :stop, - %{duration: _duration}, - %{ - component: %{ - name: :the_decomposer, - pipeline_module: __MODULE__.Pipeline - }, - ips: [%{event: 3}, %{event: 2}], - telemetry_span_context: _ref - } - } = decomposer_stop - - assert { - :start, - %{system_time: _system_time}, - %{ - component: %{ - name: :the_decomposer, - pipeline_module: __MODULE__.Pipeline - }, - ip: %{ - event: 2 - }, - telemetry_span_context: _ref - } - } = decomposer_start - end - end -end