Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Jan 5, 2024
1 parent 5747e17 commit ede8090
Show file tree
Hide file tree
Showing 15 changed files with 281 additions and 390 deletions.
13 changes: 12 additions & 1 deletion lib/dsl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Strom.DSL do
end

defmodule Transform do
defstruct function: nil, acc: nil, inputs: [], call: nil
defstruct function: nil, acc: nil, opts: nil, inputs: [], call: nil
end

defmodule Rename do
Expand Down Expand Up @@ -67,6 +67,17 @@ defmodule Strom.DSL do
end
end

defmacro transform(inputs, function, acc, opts) do
quote do
%Strom.DSL.Transform{
function: unquote(function),
acc: unquote(acc),
opts: unquote(opts),
inputs: unquote(inputs)
}
end
end

defmacro transform(inputs, function, acc) do
quote do
%Strom.DSL.Transform{
Expand Down
23 changes: 15 additions & 8 deletions lib/flow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ defmodule Strom.Flow do
%DSL.Splitter{opts: opts} = splitter ->
%{splitter | splitter: Strom.Splitter.start(opts)}

%DSL.Transform{} = fun ->
%DSL.Transform{opts: nil} = fun ->
%{fun | call: Strom.Transformer.start()}

%DSL.Transform{opts: opts} = fun when is_list(opts) ->
%{fun | call: Strom.Transformer.start(opts)}

%DSL.Rename{names: names} = ren ->
rename = Strom.Renamer.start(names)
%{ren | rename: rename}
Expand Down Expand Up @@ -87,10 +90,10 @@ defmodule Strom.Flow do
Strom.Splitter.call(flow, splitter, input, partitions)

%DSL.Transform{call: call, function: function, acc: acc, inputs: inputs} ->
if is_function(function, 2) do
Strom.Transformer.call(flow, call, inputs, {function, acc})
else
if is_function(function, 1) do
Strom.Transformer.call(flow, call, inputs, function)
else
Strom.Transformer.call(flow, call, inputs, {function, acc})
end

%DSL.Rename{rename: rename, names: names} ->
Expand Down Expand Up @@ -126,13 +129,17 @@ defmodule Strom.Flow do
end

@impl true
def handle_info({_task_ref, :ok}, mixer) do
def handle_info(:continue, flow) do
{:noreply, flow}
end

def handle_info({_task_ref, :ok}, flow) do
# do nothing for now
{:noreply, mixer}
{:noreply, flow}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, mixer) do
def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, flow) do
# do nothing for now
{:noreply, mixer}
{:noreply, flow}
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Strom.MixProject do

def application do
[
extra_applications: [:logger, :observer, :runtime_tools, :wx],
extra_applications: [:logger],
mod: {Strom.Application, []}
]
end
Expand Down
3 changes: 3 additions & 0 deletions test/data/even.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
2
4
6
7 changes: 7 additions & 0 deletions test/data/odd.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
11
21
31
41
51
3
5
107 changes: 107 additions & 0 deletions test/data/output.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
72 changes: 37 additions & 35 deletions test/dsl_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,56 +4,35 @@ defmodule Strom.DSLTest do
alias Strom.Source.ReadLines
alias Strom.Sink.WriteLines

defmodule Pipeline do
use ALF.DSL

@components [
stage(:to_integer),
stage(:add_one)
]

def to_integer(event, _), do: String.to_integer(event)
def add_one(event, _), do: event + 1
end

defmodule ToString do
use ALF.DSL

@components [
stage(:to_string)
]

def to_string(event, _), do: "#{event}"
end

defmodule MyFlow do
use Strom.DSL

def odd_fun(event), do: rem(event, 2) == 1

def even_fun(event), do: rem(event, 2) == 0

def to_string(el), do: "#{el}"

def topology(opts) do
source1 = %ReadLines{path: "test/data/numbers1.txt"}
source2 = %ReadLines{path: "test/data/numbers2.txt"}
sink_odd = %WriteLines{path: "test/data/odd.txt"}
sink_even = %WriteLines{path: "test/data/even.txt"}
def to_integer(event), do: String.to_integer(event)

def add_one(event), do: event + 1

def topology(opts) do
partitions = %{
odd: &__MODULE__.odd_fun/1,
even: &__MODULE__.even_fun/1
}

[
source(:numbers1, source1),
source(:numbers2, source2),
source(:numbers1, %ReadLines{path: "test/data/numbers1.txt"}),
source(:numbers2, %ReadLines{path: "test/data/numbers2.txt"}),
mixer([:numbers1, :numbers2], :mixed),
module(:mixed, Pipeline, sync: true),
transform(:mixed, &__MODULE__.to_integer/1),
transform(:mixed, &__MODULE__.add_one/1),
splitter(:mixed, partitions),
function([:odd, :even], opts[:to_string_fun]),
sink(:odd, sink_odd, true),
sink(:even, sink_even, true)
transform([:odd, :even], opts[:to_string_fun]),
sink(:odd, %WriteLines{path: "test/data/odd.txt"}, true),
sink(:even, %WriteLines{path: "test/data/even.txt"}, true)
]
end
end
Expand Down Expand Up @@ -83,6 +62,29 @@ defmodule Strom.DSLTest do
MyFlow.stop()
end

describe "transform with options" do
defmodule FlowTransform do
use Strom.DSL

def fun(event, acc, opts) do
{[event + acc + opts[:add]], acc + opts[:add]}
end

def topology(_opts) do
[
source(:s1, [1, 2, 3]),
transform(:s1, &__MODULE__.fun/3, 1000, opts: %{add: 1})
]
end
end

test "transform with options" do
FlowTransform.start()
%{s1: stream} = FlowTransform.call(%{})
assert Enum.to_list(stream) == [1002, 1004, 1006]
end
end

describe "combining several flows" do
defmodule Flow1 do
use Strom.DSL
Expand All @@ -105,8 +107,8 @@ defmodule Strom.DSLTest do

def topology(_) do
[
function(:stream1, &__MODULE__.add_one/1),
function(:stream2, &__MODULE__.add_one/1)
transform(:stream1, &__MODULE__.add_one/1),
transform(:stream2, &__MODULE__.add_one/1)
]
end
end
Expand Down
Loading

0 comments on commit ede8090

Please sign in to comment.