diff --git a/lib/dsl.ex b/lib/dsl.ex index 89e4dfc..d80d13d 100644 --- a/lib/dsl.ex +++ b/lib/dsl.ex @@ -16,7 +16,7 @@ defmodule Strom.DSL do end defmodule Transform do - defstruct function: nil, acc: nil, inputs: [], call: nil + defstruct function: nil, acc: nil, opts: nil, inputs: [], call: nil end defmodule Rename do @@ -67,6 +67,17 @@ defmodule Strom.DSL do end end + defmacro transform(inputs, function, acc, opts) do + quote do + %Strom.DSL.Transform{ + function: unquote(function), + acc: unquote(acc), + opts: unquote(opts), + inputs: unquote(inputs) + } + end + end + defmacro transform(inputs, function, acc) do quote do %Strom.DSL.Transform{ diff --git a/lib/flow.ex b/lib/flow.ex index 6348f4e..ef91ad2 100644 --- a/lib/flow.ex +++ b/lib/flow.ex @@ -46,9 +46,12 @@ defmodule Strom.Flow do %DSL.Splitter{opts: opts} = splitter -> %{splitter | splitter: Strom.Splitter.start(opts)} - %DSL.Transform{} = fun -> + %DSL.Transform{opts: nil} = fun -> %{fun | call: Strom.Transformer.start()} + %DSL.Transform{opts: opts} = fun when is_list(opts) -> + %{fun | call: Strom.Transformer.start(opts)} + %DSL.Rename{names: names} = ren -> rename = Strom.Renamer.start(names) %{ren | rename: rename} @@ -87,10 +90,10 @@ defmodule Strom.Flow do Strom.Splitter.call(flow, splitter, input, partitions) %DSL.Transform{call: call, function: function, acc: acc, inputs: inputs} -> - if is_function(function, 2) do - Strom.Transformer.call(flow, call, inputs, {function, acc}) - else + if is_function(function, 1) do Strom.Transformer.call(flow, call, inputs, function) + else + Strom.Transformer.call(flow, call, inputs, {function, acc}) end %DSL.Rename{rename: rename, names: names} -> @@ -126,13 +129,17 @@ defmodule Strom.Flow do end @impl true - def handle_info({_task_ref, :ok}, mixer) do + def handle_info(:continue, flow) do + {:noreply, flow} + end + + def handle_info({_task_ref, :ok}, flow) do # do nothing for now - {:noreply, mixer} + {:noreply, flow} end - def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, mixer) do + def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, flow) do # do nothing for now - {:noreply, mixer} + {:noreply, flow} end end diff --git a/mix.exs b/mix.exs index 2c138ad..5ed1e43 100644 --- a/mix.exs +++ b/mix.exs @@ -16,7 +16,7 @@ defmodule Strom.MixProject do def application do [ - extra_applications: [:logger, :observer, :runtime_tools, :wx], + extra_applications: [:logger], mod: {Strom.Application, []} ] end diff --git a/test/data/even.txt b/test/data/even.txt index e69de29..e2ba1ef 100644 --- a/test/data/even.txt +++ b/test/data/even.txt @@ -0,0 +1,3 @@ +2 +4 +6 diff --git a/test/data/odd.txt b/test/data/odd.txt index e69de29..443b0bc 100644 --- a/test/data/odd.txt +++ b/test/data/odd.txt @@ -0,0 +1,7 @@ +11 +21 +31 +41 +51 +3 +5 diff --git a/test/data/output.csv b/test/data/output.csv index e69de29..f65c9b7 100644 --- a/test/data/output.csv +++ b/test/data/output.csv @@ -0,0 +1,107 @@ +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 +ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 +ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 diff --git a/test/dsl_test.exs b/test/dsl_test.exs index cdc465a..6753b41 100644 --- a/test/dsl_test.exs +++ b/test/dsl_test.exs @@ -4,56 +4,35 @@ defmodule Strom.DSLTest do alias Strom.Source.ReadLines alias Strom.Sink.WriteLines - defmodule Pipeline do - use ALF.DSL - - @components [ - stage(:to_integer), - stage(:add_one) - ] - - def to_integer(event, _), do: String.to_integer(event) - def add_one(event, _), do: event + 1 - end - - defmodule ToString do - use ALF.DSL - - @components [ - stage(:to_string) - ] - - def to_string(event, _), do: "#{event}" - end - defmodule MyFlow do use Strom.DSL def odd_fun(event), do: rem(event, 2) == 1 + def even_fun(event), do: rem(event, 2) == 0 def to_string(el), do: "#{el}" - def topology(opts) do - source1 = %ReadLines{path: "test/data/numbers1.txt"} - source2 = %ReadLines{path: "test/data/numbers2.txt"} - sink_odd = %WriteLines{path: "test/data/odd.txt"} - sink_even = %WriteLines{path: "test/data/even.txt"} + def to_integer(event), do: String.to_integer(event) + def add_one(event), do: event + 1 + + def topology(opts) do partitions = %{ odd: &__MODULE__.odd_fun/1, even: &__MODULE__.even_fun/1 } [ - source(:numbers1, source1), - source(:numbers2, source2), + source(:numbers1, %ReadLines{path: "test/data/numbers1.txt"}), + source(:numbers2, %ReadLines{path: "test/data/numbers2.txt"}), mixer([:numbers1, :numbers2], :mixed), - module(:mixed, Pipeline, sync: true), + transform(:mixed, &__MODULE__.to_integer/1), + transform(:mixed, &__MODULE__.add_one/1), splitter(:mixed, partitions), - function([:odd, :even], opts[:to_string_fun]), - sink(:odd, sink_odd, true), - sink(:even, sink_even, true) + transform([:odd, :even], opts[:to_string_fun]), + sink(:odd, %WriteLines{path: "test/data/odd.txt"}, true), + sink(:even, %WriteLines{path: "test/data/even.txt"}, true) ] end end @@ -83,6 +62,29 @@ defmodule Strom.DSLTest do MyFlow.stop() end + describe "transform with options" do + defmodule FlowTransform do + use Strom.DSL + + def fun(event, acc, opts) do + {[event + acc + opts[:add]], acc + opts[:add]} + end + + def topology(_opts) do + [ + source(:s1, [1, 2, 3]), + transform(:s1, &__MODULE__.fun/3, 1000, opts: %{add: 1}) + ] + end + end + + test "transform with options" do + FlowTransform.start() + %{s1: stream} = FlowTransform.call(%{}) + assert Enum.to_list(stream) == [1002, 1004, 1006] + end + end + describe "combining several flows" do defmodule Flow1 do use Strom.DSL @@ -105,8 +107,8 @@ defmodule Strom.DSLTest do def topology(_) do [ - function(:stream1, &__MODULE__.add_one/1), - function(:stream2, &__MODULE__.add_one/1) + transform(:stream1, &__MODULE__.add_one/1), + transform(:stream2, &__MODULE__.add_one/1) ] end end diff --git a/test/examples/parcels/old_test.exs b/test/examples/parcels/old_test.exs deleted file mode 100644 index af9040d..0000000 --- a/test/examples/parcels/old_test.exs +++ /dev/null @@ -1,209 +0,0 @@ -defmodule Strom.Examples.Parcels.BuildPipeline do - use ALF.DSL - - @components [ - stage(:build_event) - ] - - def build_event(event, _) do - list = String.split(event, ",") - type = Enum.at(list, 0) - {:ok, occurred_at, _} = DateTime.from_iso8601(Enum.at(list, 1)) - order_number = String.to_integer(Enum.at(list, 2)) - - case type do - "ORDER_CREATED" -> - %{ - type: type, - occurred_at: occurred_at, - order_number: order_number, - to_ship: String.to_integer(Enum.at(list, 3)) - } - - "PARCEL_SHIPPED" -> - %{type: type, occurred_at: occurred_at, order_number: order_number} - end - end -end - -defmodule Strom.Examples.Parcels.OrderingPipeline do - use ALF.DSL - - @components [ - composer(:check_order, memo: MapSet.new()), - composer(:wait, memo: %{}) - ] - - def check_order(event, order_numbers, _) do - order_number = event[:order_number] - - case event[:type] do - "ORDER_CREATED" -> - {[event], MapSet.put(order_numbers, order_number)} - - "PARCEL_SHIPPED" -> - if MapSet.member?(order_numbers, order_number) do - {[event], order_numbers} - else - {[Map.put(event, :wait, order_number)], order_numbers} - end - end - end - - def wait(event, waiting, _) do - case event[:type] do - "ORDER_CREATED" -> - order_number = event[:order_number] - {[event | Map.get(waiting, order_number, [])], Map.delete(waiting, order_number)} - - "PARCEL_SHIPPED" -> - if event[:wait] do - other_waiting = Map.get(waiting, event[:wait], []) - {[], Map.put(waiting, event[:wait], [event | other_waiting])} - else - {[event], waiting} - end - end - end -end - -defmodule Strom.Examples.Parcels.Pipeline do - use ALF.DSL - - @components [ - composer(:check_expired, memo: []), - composer(:check_count, memo: %{}) - ] - - @seconds_in_week 3600 * 24 * 7 - - def check_expired(event, memo, _) do - order_number = event[:order_number] - - case event[:type] do - "ORDER_CREATED" -> - memo = [{order_number, event[:occurred_at]} | memo] - {[event], memo} - - "PARCEL_SHIPPED" -> - {expired, still_valid} = - Enum.split_while(Enum.reverse(memo), fn {_, order_time} -> - DateTime.diff(event[:occurred_at], order_time, :second) > @seconds_in_week - end) - - expired_events = - Enum.map(expired, fn {order_number, time} -> - %{type: "THRESHOLD_EXCEEDED", order_number: order_number, occurred_at: time} - end) - - {expired_events ++ [event], still_valid} - end - end - - def check_count(event, memo, _) do - order_number = event[:order_number] - - case event[:type] do - "ORDER_CREATED" -> - # putting order time here, it's always less than parcels time - memo = Map.put(memo, order_number, {event[:to_ship], event[:occurred_at]}) - {[], memo} - - "PARCEL_SHIPPED" -> - case Map.get(memo, order_number) do - # was deleted in THRESHOLD_EXCEEDED - nil -> - {[], memo} - - {1, last_occurred_at} -> - last_occurred_at = latest_occurred_at(event[:occurred_at], last_occurred_at) - - ok_event = %{ - type: "ALL_PARCELS_SHIPPED", - order_number: order_number, - occurred_at: last_occurred_at - } - - memo = Map.put(memo, order_number, :all_parcels_shipped) - {[ok_event], memo} - - {amount, last_occurred_at} when amount > 1 -> - last_occurred_at = latest_occurred_at(event[:occurred_at], last_occurred_at) - memo = Map.put(memo, order_number, {amount - 1, last_occurred_at}) - {[], memo} - end - - "THRESHOLD_EXCEEDED" -> - case Map.get(memo, order_number) do - :all_parcels_shipped -> - {[], Map.delete(memo, order_number)} - - _count -> - {[event], Map.delete(memo, order_number)} - end - end - end - - def latest_occurred_at(occurred_at, last_occurred_at) do - case DateTime.compare(occurred_at, last_occurred_at) do - :gt -> - occurred_at - - _ -> - last_occurred_at - end - end -end - -defmodule Strom.Examples.Parcels.OldParcelsTest do - use ExUnit.Case, async: true - - alias Strom.Examples.Parcels.BuildPipeline - alias Strom.Examples.Parcels.OrderingPipeline - alias Strom.Examples.Parcels.Pipeline - alias Strom.Source.ReadLines - - def expected_results do - [ - %{ - order_number: 111, - type: "ALL_PARCELS_SHIPPED", - occurred_at: ~U[2017-04-21T08:00:00.000Z] - }, - %{ - order_number: 222, - type: "THRESHOLD_EXCEEDED", - occurred_at: ~U[2017-04-20 09:00:00.000Z] - }, - %{ - order_number: 333, - type: "THRESHOLD_EXCEEDED", - occurred_at: ~U[2017-04-21 09:00:00.000Z] - } - ] - end - - describe "with several pipelines" do - defmodule SeveralPipelinesFlow do - use Strom.DSL - - def topology(_opts) do - [ - source(:parcels, %ReadLines{path: "test/examples/parcels/parcels.csv"}), - source(:orders, %ReadLines{path: "test/examples/parcels/orders.csv"}), - mixer([:orders, :parcels], :mixed), - module(:mixed, BuildPipeline), - module(:mixed, OrderingPipeline), - module(:mixed, Pipeline) - ] - end - end - - test "with several pipelines" do - SeveralPipelinesFlow.start() - %{mixed: mixed} = SeveralPipelinesFlow.call(%{}) - - assert Enum.sort(Enum.to_list(mixed)) == Enum.sort(expected_results()) - end - end -end diff --git a/test/examples/parcels_data_test.exs b/test/examples/parcels/parcels_data_test.exs similarity index 72% rename from test/examples/parcels_data_test.exs rename to test/examples/parcels/parcels_data_test.exs index 025a6e6..ea77fa1 100644 --- a/test/examples/parcels_data_test.exs +++ b/test/examples/parcels/parcels_data_test.exs @@ -5,25 +5,11 @@ defmodule Strom.Examples.ParcelsDataTest do use Strom.DSL defmodule BuildEvent do - def start(_opts) do - %{ - occurred_at: DateTime.add(DateTime.now!("Etc/UTC"), -(3600 * 24 * 30), :second), - order_number: 0 - } - end - - def stop(_opts, _acc), do: :ok - - def call(:tick, last_order, _opts) do + def call(:tick, last_order) do occurred_at = DateTime.add(last_order[:occurred_at], :rand.uniform(10), :second) to_ship = :rand.uniform(5) order_number = last_order[:order_number] + 1 - if order_number > 10_010 do - Process.sleep(5000) - raise("done") - end - order = %{ type: "ORDER_CREATED", occurred_at: occurred_at, @@ -62,19 +48,25 @@ defmodule Strom.Examples.ParcelsDataTest do parcels: &(&1[:type] == "PARCEL_SHIPPED") } + acc = %{ + occurred_at: DateTime.add(DateTime.now!("Etc/UTC"), -(3600 * 24 * 30), :second), + order_number: 0 + } + [ - module(:stream, BuildEvent), + transform(:stream, &BuildEvent.call/2, acc), splitter(:stream, partitions), - function(:orders, &__MODULE__.order_to_string/1), - function(:parcels, &__MODULE__.parcel_to_string/1), + transform(:orders, &__MODULE__.order_to_string/1), + transform(:parcels, &__MODULE__.parcel_to_string/1), sink(:orders, %Strom.Sink.WriteLines{path: "test_data/orders.csv"}), sink(:parcels, %Strom.Sink.WriteLines{path: "test_data/parcels.csv"}, true) ] end end - # test "test" do - # GenData.start() - # GenData.call(%{stream: Stream.cycle([:tick])}) - # end + test "test" do + GenData.start() + # GenData.call(%{stream: List.duplicate(:tick, 10_000)}) + GenData.stop() + end end diff --git a/test/examples/parcels_test.exs b/test/examples/parcels/parcels_test.exs similarity index 99% rename from test/examples/parcels_test.exs rename to test/examples/parcels/parcels_test.exs index 9965dba..d62cbe1 100644 --- a/test/examples/parcels_test.exs +++ b/test/examples/parcels/parcels_test.exs @@ -3,7 +3,6 @@ defmodule Strom.Examples.ParcelsTest do defmodule ParcelsFlow do alias Strom.Source.ReadLines - alias Strom.Sink.WriteLines use Strom.DSL diff --git a/test/examples/simple_numbers_test.exs b/test/examples/simple_numbers_test.exs index 33210d3..48395ae 100644 --- a/test/examples/simple_numbers_test.exs +++ b/test/examples/simple_numbers_test.exs @@ -1,7 +1,7 @@ defmodule Strom.Examples.SimpleNumbersTest do use ExUnit.Case - alias Strom.{Mixer, Splitter, Function} + alias Strom.{Mixer, Splitter, Transformer} test "simple numbers" do flow = %{numbers1: [1, 2, 3, 4, 5], numbers2: [6, 7, 8, 9, 10]} @@ -14,12 +14,12 @@ defmodule Strom.Examples.SimpleNumbersTest do even: fn el -> rem(el, 2) == 0 end } - function = Function.start(&(&1 + 1)) + transformer = Transformer.start() %{odd: odd, even: even} = flow |> Mixer.call(mixer, [:numbers1, :numbers2], :number) - |> Function.call(function, :number) + |> Transformer.call(transformer, :number, &(&1 + 1)) |> Splitter.call(splitter, :number, partitions) assert Enum.sort(Enum.to_list(odd)) == [3, 5, 7, 9, 11] @@ -30,35 +30,29 @@ defmodule Strom.Examples.SimpleNumbersTest do defmodule RoundRobin do use Strom.DSL - def add_label(event, label), do: {event, label} - - defmodule DoMix do - def start(names) do - Enum.reduce(names, %{}, &Map.put(&2, &1, [])) - end + def add_label(event, label) do + {[{event, label}], label} + end - def call({number, label}, acc, names) do - [another] = Enum.reject(names, &(&1 == label)) + def call({number, label}, acc) do + [another] = Enum.reject(Map.keys(acc), &(&1 == label)) - case Map.fetch!(acc, another) do - [hd | tl] -> - {[hd, number], Map.put(acc, another, tl)} + case Map.fetch!(acc, another) do + [hd | tl] -> + {[hd, number], Map.put(acc, another, tl)} - [] -> - numbers = Map.fetch!(acc, label) - {[], Map.put(acc, label, numbers ++ [number])} - end + [] -> + numbers = Map.fetch!(acc, label) + {[], Map.put(acc, label, numbers ++ [number])} end - - def stop(_acc, _opts), do: :ok end def topology(_opts) do [ - function(:first, &__MODULE__.add_label/2, :first), - function(:second, &__MODULE__.add_label/2, :second), + transform(:first, &__MODULE__.add_label/2, :first), + transform(:second, &__MODULE__.add_label/2, :second), mixer([:first, :second], :mixed), - module(:mixed, DoMix, [:first, :second]) + transform(:mixed, &__MODULE__.call/2, %{first: [], second: []}) ] end end @@ -84,37 +78,31 @@ defmodule Strom.Examples.SimpleNumbersTest do defmodule RoundRobinMany do use Strom.DSL - def add_label(event, label), do: {event, label} - - defmodule DoMix do - def start(names) do - Enum.reduce(names, %{}, &Map.put(&2, &1, [])) - end - - def call({number, label}, acc, names) do - others = Enum.reject(names, &(&1 == label)) + def add_label(event, label) do + {[{event, label}], label} + end - if Enum.all?(others, &(length(Map.fetch!(acc, &1)) > 0)) do - Enum.reduce(others, {[number], acc}, fn other, {nums, acc} -> - [hd | tl] = Map.fetch!(acc, other) - {[hd | nums], Map.put(acc, other, tl)} - end) - else - numbers = Map.fetch!(acc, label) - {[], Map.put(acc, label, numbers ++ [number])} - end + def call({number, label}, acc) do + others = Enum.reject(Map.keys(acc), &(&1 == label)) + + if Enum.all?(others, &(length(Map.fetch!(acc, &1)) > 0)) do + Enum.reduce(others, {[number], acc}, fn other, {nums, acc} -> + [hd | tl] = Map.fetch!(acc, other) + {[hd | nums], Map.put(acc, other, tl)} + end) + else + numbers = Map.fetch!(acc, label) + {[], Map.put(acc, label, numbers ++ [number])} end - - def stop(_acc, _opts), do: :ok end def topology(names) do Enum.map(names, fn name -> - function(name, &__MODULE__.add_label/2, name) + transform(name, &__MODULE__.add_label/2, name) end) ++ [ mixer(names, :mixed), - module(:mixed, DoMix, names) + transform(:mixed, &__MODULE__.call/2, Enum.reduce(names, %{}, &Map.put(&2, &1, []))) ] end end diff --git a/test/examples/telegram_test.exs b/test/examples/telegram_test.exs index d5dea5a..f9241dc 100644 --- a/test/examples/telegram_test.exs +++ b/test/examples/telegram_test.exs @@ -8,21 +8,15 @@ defmodule Strom.Integration.TelegramTest do alias Strom.Sink.WriteLines defmodule Decompose do - def start([]), do: nil - - def call(event, nil, []) do + def call(event, nil) do {String.split(event, ","), nil} end - - def stop(nil, []), do: :ok end defmodule Recompose do @length 100 - def start([]), do: [] - - def call(event, words, []) do + def call(event, words) do line = Enum.join(words, " ") new_line = line <> " " <> event @@ -32,15 +26,13 @@ defmodule Strom.Integration.TelegramTest do {[], words ++ [event]} end end - - def stop(_acc, []), do: :ok end def topology(_opts) do [ source(:input, %ReadLines{path: "test/data/orders.csv"}), - module(:input, Decompose), - module(:input, Recompose), + transform(:input, &Decompose.call/2, nil), + transform(:input, &Recompose.call/2, []), sink(:input, %WriteLines{path: "test/data/telegram.txt"}, true) ] end diff --git a/test/examples/words_count_test.exs b/test/examples/words_count_test.exs index bac4cec..24d7ebf 100644 --- a/test/examples/words_count_test.exs +++ b/test/examples/words_count_test.exs @@ -7,11 +7,9 @@ defmodule Strom.Examples.WordsCountTest do alias Strom.Source.ReadLines defmodule DoCount do - def start(_opts), do: %{} + def call(:done, acc), do: {[acc], %{}} - def call(:done, acc, _), do: {[acc], %{}} - - def call(string, acc, _) do + def call(string, acc) do acc = string |> String.downcase() @@ -23,16 +21,12 @@ defmodule Strom.Examples.WordsCountTest do {[], acc} end - - def stop(_acc, _opts), do: :ok end defmodule SumAll do - def start(_opts), do: %{} + def call(:done, acc), do: {[acc], %{}} - def call(:done, acc, _), do: {[acc], %{}} - - def call(sums, acc, _) do + def call(sums, acc) do acc = sums |> Enum.reduce(acc, fn {word, count}, acc -> @@ -42,8 +36,6 @@ defmodule Strom.Examples.WordsCountTest do {[], acc} end - - def stop(_acc, _opts), do: :ok end def topology({file_name, count}) do @@ -59,10 +51,10 @@ defmodule Strom.Examples.WordsCountTest do ] ++ dones ++ [ - module(all_names, DoCount), + transform(all_names, &DoCount.call/2, %{}), mixer(all_names, :mixed), source(:mixed, [:done]), - module(:mixed, SumAll) + transform(:mixed, &SumAll.call/2, %{}) ] end end diff --git a/test/gen_mix_test.exs b/test/gen_mix_test.exs index 4a85626..20c13f2 100644 --- a/test/gen_mix_test.exs +++ b/test/gen_mix_test.exs @@ -71,43 +71,43 @@ defmodule Strom.GenMixTest do Task.await(task2, :infinity) end - test "huge files" do - :observer.start() - source1 = Strom.Source.start(%Strom.Source.ReadLines{path: "test_data/orders.csv"}) - source2 = Strom.Source.start(%Strom.Source.ReadLines{path: "test_data/parcels.csv"}) - - sink1 = Strom.Sink.start(%Strom.Sink.WriteLines{path: "test_data/odd.csv"}) - sink2 = Strom.Sink.start(%Strom.Sink.WriteLines{path: "test_data/even.csv"}) - - flow = - %{} - |> Strom.Source.call(source1, :source1) - |> Strom.Source.call(source2, :source2) - - mix1 = GenMix.start() - mix2 = GenMix.start() - call1 = Strom.GenCall.start() - call2 = Strom.GenCall.start() - - inputs = %{ - source1: fn el -> el end, - source2: fn el -> el end - } - - outputs = %{ - odd: fn el -> rem(el, 2) == 1 end, - even: fn el -> rem(el, 2) == 0 end - } - - function1 = fn el -> String.length(el) end - function2 = fn el -> "#{el}" end - - flow - |> GenMix.call(mix1, inputs, inputs) - |> Strom.GenCall.call(call1, [:source1, :source2], function1) - |> GenMix.call(mix2, inputs, outputs) - |> Strom.GenCall.call(call2, [:odd, :even], function2) - |> Strom.Sink.call(sink1, [:odd]) - |> Strom.Sink.call(sink2, [:even], true) - end +# test "huge files" do +# :observer.start() +# source1 = Strom.Source.start(%Strom.Source.ReadLines{path: "test_data/orders.csv"}) +# source2 = Strom.Source.start(%Strom.Source.ReadLines{path: "test_data/parcels.csv"}) +# +# sink1 = Strom.Sink.start(%Strom.Sink.WriteLines{path: "test_data/odd.csv"}) +# sink2 = Strom.Sink.start(%Strom.Sink.WriteLines{path: "test_data/even.csv"}) +# +# flow = +# %{} +# |> Strom.Source.call(source1, :source1) +# |> Strom.Source.call(source2, :source2) +# +# mix1 = GenMix.start() +# mix2 = GenMix.start() +# call1 = Strom.Transformer.start() +# call2 = Strom.Transformer.start() +# +# inputs = %{ +# source1: fn el -> el end, +# source2: fn el -> el end +# } +# +# outputs = %{ +# odd: fn el -> rem(el, 2) == 1 end, +# even: fn el -> rem(el, 2) == 0 end +# } +# +# function1 = fn el -> String.length(el) end +# function2 = fn el -> "#{el}" end +# +# flow +# |> GenMix.call(mix1, inputs, inputs) +# |> Strom.Transformer.call(call1, [:source1, :source2], function1) +# |> GenMix.call(mix2, inputs, outputs) +# |> Strom.Transformer.call(call2, [:odd, :even], function2) +# |> Strom.Sink.call(sink1, [:odd]) +# |> Strom.Sink.call(sink2, [:even], true) +# end end diff --git a/test/integration/split_and_mix_test.exs b/test/integration/split_and_mix_test.exs index 77afa28..ddf315c 100644 --- a/test/integration/split_and_mix_test.exs +++ b/test/integration/split_and_mix_test.exs @@ -1,7 +1,7 @@ defmodule Strom.Integration.SplitAndMixTest do use ExUnit.Case - alias Strom.{Source, Mixer, Splitter, Function} + alias Strom.{Source, Mixer, Splitter, Transformer} alias Strom.Source.ReadLines setup do @@ -13,7 +13,7 @@ defmodule Strom.Integration.SplitAndMixTest do splitter = Splitter.start() mixer = Mixer.start() - function = Function.start(&"foo-#{&1}") + transformer = Transformer.start() partitions = %{ "111" => fn el -> String.contains?(el, ",111,") end, @@ -25,7 +25,7 @@ defmodule Strom.Integration.SplitAndMixTest do %{} |> Source.call(orders_source, :orders) |> Splitter.call(splitter, :orders, partitions) - |> Function.call(function, ["111", "222", "333"]) + |> Transformer.call(transformer, ["111", "222", "333"], &"foo-#{&1}") |> Mixer.call(mixer, ["111", "222", "333"], :modified) modified = Enum.to_list(stream)