Skip to content

Commit

Permalink
Basic components - done
Browse files Browse the repository at this point in the history
antonmi committed Jan 13, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 9a56e31 commit 01cc65d
Showing 19 changed files with 571 additions and 621 deletions.
3 changes: 2 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
@@ -4,4 +4,5 @@
- rename from to topology
- Different words for flow as data-structure and Flow module (maybe Topology with the call function)
- tick source with timeout
- :__all__ for calling components on all streams in flow
- :__all__ for calling components on all streams in flow
- source and sink parameterization (opts)
24 changes: 12 additions & 12 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
@@ -1,55 +1,55 @@
defmodule Strom.DSL do
defmacro source(names, origin) do
defmacro source(name, origin) do
quote do
unless is_struct(unquote(origin)) or is_list(unquote(origin)) do
raise "Source origin must be a struct or just simple list, given: #{inspect(unquote(origin))}"
end

%Strom.Source{origin: unquote(origin), names: unquote(names)}
%Strom.Source{origin: unquote(origin), name: unquote(name)}
end
end

defmacro sink(names, origin, sync \\ false) do
defmacro sink(name, origin, sync \\ false) do
quote do
unless is_struct(unquote(origin)) do
raise "Sink origin must be a struct, given: #{inspect(unquote(origin))}"
end

%Strom.Sink{origin: unquote(origin), names: unquote(names), sync: unquote(sync)}
%Strom.Sink{origin: unquote(origin), name: unquote(name), sync: unquote(sync)}
end
end

defmacro mix(inputs, output, opts \\ []) do
quote do
unless is_list(unquote(inputs)) do
raise "Mixer sources must be a list, given: #{inspect(unquote(inputs))}"
unless is_list(unquote(inputs)) or is_map(unquote(inputs)) do
raise "Mixer sources must be a list or map, given: #{inspect(unquote(inputs))}"
end

%Strom.Mixer{inputs: unquote(inputs), output: unquote(output), opts: unquote(opts)}
end
end

defmacro split(input, partitions, opts \\ []) do
defmacro split(input, outputs, opts \\ []) do
quote do
unless is_map(unquote(partitions)) and map_size(unquote(partitions)) > 0 do
raise "Branches in splitter must be a map, given: #{inspect(unquote(partitions))}"
unless is_map(unquote(outputs)) and map_size(unquote(outputs)) > 0 do
raise "Branches in splitter must be a map, given: #{inspect(unquote(outputs))}"
end

%Strom.Splitter{
input: unquote(input),
partitions: unquote(partitions),
outputs: unquote(outputs),
opts: unquote(opts)
}
end
end

defmacro transform(inputs, function, acc \\ nil, opts \\ []) do
defmacro transform(names, function, acc \\ nil, opts \\ []) do
quote do
%Strom.Transformer{
function: unquote(function),
acc: unquote(acc),
opts: unquote(opts),
inputs: unquote(inputs)
names: unquote(names)
}
end
end
78 changes: 34 additions & 44 deletions lib/gen_mix.ex
Original file line number Diff line number Diff line change
@@ -4,6 +4,8 @@ defmodule Strom.GenMix do
@buffer 1000

defstruct pid: nil,
inputs: [],
outputs: [],
opts: [],
flow_pid: nil,
sup_pid: nil,
@@ -22,17 +24,11 @@ defmodule Strom.GenMix do
| buffer: Keyword.get(opts, :buffer, @buffer)
}

{:ok, pid} = DynamicSupervisor.start_child(gen_mix.sup_pid, {__MODULE__, gen_mix})
__state__(pid)
end

def start(opts) when is_list(opts) do
state = %__MODULE__{
buffer: Keyword.get(opts, :buffer, @buffer)
}

{:ok, pid} = GenServer.start_link(__MODULE__, state)
__state__(pid)
if gen_mix.sup_pid do
DynamicSupervisor.start_child(gen_mix.sup_pid, {__MODULE__, gen_mix})
else
start_link(gen_mix)
end
end

def start_link(%__MODULE__{} = state) do
@@ -44,39 +40,18 @@ defmodule Strom.GenMix do
{:ok, %{mix | pid: self()}}
end

def call(flow, %__MODULE__{} = mix, inputs, outputs)
when is_map(flow) and is_map(inputs) and is_map(outputs) do
input_streams =
Enum.reduce(inputs, %{}, fn {name, fun}, acc ->
Map.put(acc, {name, fun}, Map.fetch!(flow, name))
end)

sub_flow =
outputs
|> Enum.reduce(%{}, fn {name, fun}, flow ->
consumer = Consumer.start({name, fun}, mix.pid)
:ok = GenServer.call(mix.pid, {:register_consumer, {{name, fun}, consumer}})
stream = Consumer.call(consumer)
Map.put(flow, name, stream)
end)

:ok = GenServer.call(mix.pid, {:run_inputs, input_streams})

flow
|> Map.drop(Map.keys(inputs))
|> Map.merge(sub_flow)
def call(flow, pid) do
GenServer.call(pid, {:call, flow})
end

def stop(%__MODULE__{pid: pid, sup_pid: sup_pid}) do
def stop(pid, sup_pid) do
if sup_pid do
:ok
else
GenServer.call(pid, :stop)
end
end

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

defp run_inputs(streams, pid, buffer) do
Enum.reduce(streams, %{}, fn {{name, fun}, stream}, acc ->
task = async_run_stream({name, fun}, stream, buffer, pid)
@@ -112,23 +87,38 @@ defmodule Strom.GenMix do
end

@impl true
def handle_call({:run_inputs, streams_to_mix}, _from, %__MODULE__{} = mix) do
producers = run_inputs(streams_to_mix, mix.pid, mix.buffer)
def handle_call({:call, flow}, _from, %__MODULE__{} = mix) do
input_streams =
Enum.reduce(mix.inputs, %{}, fn {name, fun}, acc ->
Map.put(acc, {name, fun}, Map.fetch!(flow, name))
end)

{:reply, :ok, %{mix | running: true, producers: producers}}
end
{sub_flow, mix} =
mix.outputs
|> Enum.reduce({%{}, mix}, fn {name, fun}, {flow, mix} ->
consumer = Consumer.start({name, fun}, mix.pid)

mix = %{mix | consumers: Map.put(mix.consumers, {name, fun}, consumer)}

def handle_call({:register_consumer, {{name, fun}, cons}}, _from, %__MODULE__{} = mix) do
mix = %{mix | consumers: Map.put(mix.consumers, {name, fun}, cons)}
{:reply, :ok, mix}
stream = Consumer.call(consumer)
{Map.put(flow, name, stream), mix}
end)

producers = run_inputs(input_streams, mix.pid, mix.buffer)

flow =
flow
|> Map.drop(Map.keys(mix.inputs))
|> Map.merge(sub_flow)

{:reply, flow, %{mix | running: true, producers: producers}}
end


def handle_call(:stop, _from, %__MODULE__{} = mix) do
{:stop, :normal, :ok, %{mix | running: false}}
end

def handle_call(:__state__, _from, mix), do: {:reply, mix, mix}

@impl true
def handle_cast({:new_data, {_name, _fun}, chunk}, %__MODULE__{} = mix) do
Enum.each(mix.consumers, fn {_, cons} ->
73 changes: 42 additions & 31 deletions lib/mixer.ex
Original file line number Diff line number Diff line change
@@ -1,42 +1,53 @@
defmodule Strom.Mixer do
alias Strom.GenMix

defstruct [:gen_mix, :inputs, :output, :opts, :flow_pid, :sup_pid]

def new(inputs, output, opts \\ []) do
unless is_list(inputs) do
raise "Mixer sources must be a list, given: #{inspect(inputs)}"
end

%__MODULE__{inputs: inputs, output: output, opts: opts}
end

def start(args \\ [])

def start(%__MODULE__{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid} = mixer) do
gen_mix = %GenMix{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
GenMix.start(gen_mix)
defstruct pid: nil,
inputs: [],
output: nil,
opts: [],
flow_pid: nil,
sup_pid: nil

def new(inputs, output)
when is_list(inputs) or (is_map(inputs) and map_size(inputs) > 0) do
%__MODULE__{inputs: inputs, output: output}
end

def start(opts) when is_list(opts) do
GenMix.start(opts)
end

def call(flow, %__MODULE__{gen_mix: mix}, to_mix, name) when is_map(flow) and is_list(to_mix) do
def start(
%__MODULE__{
inputs: inputs,
output: output,
flow_pid: flow_pid,
sup_pid: sup_pid
} = mixer,
opts \\ []
) do
inputs =
Enum.reduce(to_mix, %{}, fn name, acc ->
Map.put(acc, name, fn _el -> true end)
end)

outputs = %{name => fn _el -> true end}

GenMix.call(flow, mix, inputs, outputs)
if is_list(inputs) do
Enum.reduce(inputs, %{}, fn name, acc ->
Map.put(acc, name, fn _el -> true end)
end)
else
inputs
end

outputs = %{output => fn _el -> true end}

gen_mix = %GenMix{
inputs: inputs,
outputs: outputs,
opts: opts,
flow_pid: flow_pid,
sup_pid: sup_pid
}

{:ok, pid} = GenMix.start(gen_mix)
%{mixer | pid: pid, opts: opts}
end

def call(flow, %__MODULE__{gen_mix: mix}, to_mix, name) when is_map(flow) and is_map(to_mix) do
outputs = %{name => fn _el -> true end}
GenMix.call(flow, mix, to_mix, outputs)
def call(flow, %__MODULE__{pid: pid}) do
GenMix.call(flow, pid)
end

def stop(%__MODULE__{gen_mix: mix}), do: GenMix.stop(mix)
def stop(%__MODULE__{pid: pid, sup_pid: sup_pid}), do: GenMix.stop(pid, sup_pid)
end
6 changes: 5 additions & 1 deletion lib/renamer.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
defmodule Strom.Renamer do
defstruct names: %{}

def start(names) when is_map(names) do
def new(names) when is_map(names) and map_size(names) > 0 do
%__MODULE__{names: names}
end

def start(%__MODULE__{names: names} = renamer) when is_map(names) and map_size(names) > 0 do
renamer
end

def call(flow, %__MODULE__{names: names}) do
Enum.reduce(names, flow, fn {name, new_name}, acc ->
acc
79 changes: 33 additions & 46 deletions lib/sink.ex
Original file line number Diff line number Diff line change
@@ -6,41 +6,40 @@ defmodule Strom.Sink do
use GenServer

defstruct origin: nil,
names: [],
name: nil,
sync: false,
pid: nil,
flow_pid: nil,
sup_pid: nil

def new(names, origin, sync \\ false) do
def new(name, origin, sync \\ false) do
unless is_struct(origin) do
raise "Sink origin must be a struct, given: #{inspect(origin)}"
end

%__MODULE__{origin: origin, names: names, sync: sync}
%__MODULE__{origin: origin, name: name, sync: sync}
end

def start(%__MODULE__{origin: origin} = sink) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
sink = %{sink | origin: origin}
{:ok, pid} = DynamicSupervisor.start_child(sink.sup_pid, {__MODULE__, sink})
__state__(pid)
end

def start(origin) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
state = %__MODULE__{origin: origin}
{:ok, pid} =
if sink.sup_pid do
DynamicSupervisor.start_child(sink.sup_pid, {__MODULE__, sink})
else
start_link(sink)
end

{:ok, pid} = GenServer.start_link(__MODULE__, state)
__state__(pid)
end

def start_link(%__MODULE__{} = state) do
GenServer.start_link(__MODULE__, state)
def start_link(%__MODULE__{} = sink) do
GenServer.start_link(__MODULE__, sink)
end

@impl true
def init(%__MODULE__{} = state), do: {:ok, %{state | pid: self()}}
def init(%__MODULE__{} = sink), do: {:ok, %{sink | pid: self()}}

def call(%__MODULE__{pid: pid}, data), do: GenServer.call(pid, {:call, data})

@@ -54,57 +53,45 @@ defmodule Strom.Sink do
end
end

def call(flow, sink, names, sync \\ false)
def call(flow, %__MODULE__{name: name, sync: sync} = sink) when is_map(flow) do
stream = Map.fetch!(flow, name)

def call(flow, %__MODULE__{} = sink, names, sync)
when is_map(flow) and is_list(names) and is_boolean(sync) do
sub_flow =
Enum.reduce(names, %{}, fn name, acc ->
Map.put(acc, name, Map.fetch!(flow, name))
stream =
Stream.transform(stream, sink, fn el, sink ->
call(sink, el)
{[], sink}
end)

Enum.reduce(sub_flow, flow, fn {name, stream}, acc ->
stream =
Stream.transform(stream, sink, fn el, sink ->
call(sink, el)
{[], sink}
end)

if sync do
Stream.run(stream)
else
Task.async(fn -> Stream.run(stream) end)
end

Map.delete(acc, name)
end)
end
if sync do
Stream.run(stream)
else
Task.async(fn -> Stream.run(stream) end)
end

def call(flow, sink, name, sync) when is_map(flow) and is_boolean(sync) do
call(flow, sink, [name], sync)
Map.delete(flow, name)
end

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

@impl true
def handle_call({:call, data}, _from, %__MODULE__{origin: origin} = state) do
{[], state} =
def handle_call({:call, data}, _from, %__MODULE__{origin: origin} = sink) do
{[], sink} =
case apply(origin.__struct__, :call, [origin, data]) do
{:ok, {[], origin}} ->
{[], %{state | origin: origin}}
{[], %{sink | origin: origin}}

{:error, {:halt, origin}} ->
{:halt, %{state | origin: origin}}
{:halt, %{sink | origin: origin}}
end

{:reply, {[], state}, state}
{:reply, {[], sink}, sink}
end

def handle_call(:stop, _from, %__MODULE__{origin: origin} = state) do
def handle_call(:stop, _from, %__MODULE__{origin: origin} = sink) do
origin = apply(origin.__struct__, :stop, [origin])
state = %{state | origin: origin}
{:stop, :normal, :ok, state}
sink = %{sink | origin: origin}
{:stop, :normal, :ok, sink}
end

def handle_call(:__state__, _from, state), do: {:reply, state, state}
def handle_call(:__state__, _from, sink), do: {:reply, sink, sink}
end
50 changes: 18 additions & 32 deletions lib/source.ex
Original file line number Diff line number Diff line change
@@ -7,17 +7,17 @@ defmodule Strom.Source do
use GenServer

defstruct origin: nil,
names: [],
name: nil,
pid: nil,
flow_pid: nil,
sup_pid: nil

def new(names, origin) do
def new(name, origin) do
unless is_struct(origin) or is_list(origin) do
raise "Source origin must be a struct or just simple list, given: #{inspect(origin)}"
end

%__MODULE__{origin: origin, names: names}
%__MODULE__{origin: origin, name: name}
end

def start(%__MODULE__{origin: list} = source) when is_list(list) do
@@ -27,19 +27,14 @@ defmodule Strom.Source do
def start(%__MODULE__{origin: origin} = source) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
source = %{source | origin: origin}
{:ok, pid} = DynamicSupervisor.start_child(source.sup_pid, {__MODULE__, source})
__state__(pid)
end

def start(list) when is_list(list) do
start(%Strom.Source.Events{events: list})
end

def start(origin) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
state = %__MODULE__{origin: origin}
{:ok, pid} =
if source.sup_pid do
DynamicSupervisor.start_child(source.sup_pid, {__MODULE__, source})
else
start_link(source)
end

{:ok, pid} = GenServer.start_link(__MODULE__, state)
__state__(pid)
end

@@ -64,25 +59,16 @@ defmodule Strom.Source do
end
end

def call(flow, %__MODULE__{} = source, names) when is_map(flow) and is_list(names) do
sub_flow =
Enum.reduce(names, %{}, fn name, acc ->
stream =
Stream.resource(
fn -> source end,
fn source -> call(source) end,
fn source -> source end
)

prev_stream = Map.get(flow, name, [])
Map.put(acc, name, Stream.concat(prev_stream, stream))
end)

Map.merge(flow, sub_flow)
end
def call(flow, %__MODULE__{name: name} = source) when is_map(flow) do
stream =
Stream.resource(
fn -> source end,
fn source -> call(source) end,
fn source -> source end
)

def call(flow, source, name) when is_map(flow) do
call(flow, source, [name])
prev_stream = Map.get(flow, name, [])
Map.put(flow, name, Stream.concat(prev_stream, stream))
end

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)
66 changes: 36 additions & 30 deletions lib/splitter.ex
Original file line number Diff line number Diff line change
@@ -1,42 +1,48 @@
defmodule Strom.Splitter do
alias Strom.GenMix

defstruct [:gen_mix, :input, :partitions, :opts, :flow_pid, :sup_pid]

def new(input, partitions, opts \\ []) do
unless is_map(partitions) and map_size(partitions) > 0 do
raise "Branches in splitter must be a map, given: #{inspect(partitions)}"
end

%Strom.Splitter{input: input, partitions: partitions, opts: opts}
end

def start(args \\ [])

def start(%__MODULE__{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}) do
gen_mix = %GenMix{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
GenMix.start(gen_mix)
end

def start(opts) when is_list(opts) do
GenMix.start(opts)
defstruct pid: nil,
input: nil,
outputs: [],
opts: [],
flow_pid: nil,
sup_pid: nil

def new(input, outputs) when is_list(outputs) or (is_map(outputs) and map_size(outputs)) > 0 do
%Strom.Splitter{input: input, outputs: outputs}
end

def call(flow, %__MODULE__{gen_mix: mix}, name, partitions) when is_list(partitions) do
inputs = %{name => fn _el -> true end}
def start(
%__MODULE__{input: input, outputs: outputs, flow_pid: flow_pid, sup_pid: sup_pid} =
splitter,
opts \\ []
) do
inputs = %{input => fn _el -> true end}

outputs =
Enum.reduce(partitions, %{}, fn name, acc ->
Map.put(acc, name, fn _el -> true end)
end)

GenMix.call(flow, mix, inputs, outputs)
if is_list(outputs) do
Enum.reduce(outputs, %{}, fn name, acc ->
Map.put(acc, name, fn _el -> true end)
end)
else
outputs
end

gen_mix = %GenMix{
inputs: inputs,
outputs: outputs,
opts: opts,
flow_pid: flow_pid,
sup_pid: sup_pid
}

{:ok, pid} = GenMix.start(gen_mix)
%{splitter | pid: pid, opts: opts}
end

def call(flow, %__MODULE__{gen_mix: mix}, name, partitions) when is_map(partitions) do
inputs = %{name => fn _el -> true end}
GenMix.call(flow, mix, inputs, partitions)
def call(flow, %__MODULE__{pid: pid}) do
GenMix.call(flow, pid)
end

def stop(%__MODULE__{gen_mix: mix}), do: GenMix.stop(mix)
def stop(%__MODULE__{pid: pid, sup_pid: sup_pid}), do: GenMix.stop(pid, sup_pid)
end
47 changes: 21 additions & 26 deletions lib/topology.ex
Original file line number Diff line number Diff line change
@@ -28,24 +28,23 @@ defmodule Strom.Topology do
components
|> Enum.map(fn component ->
case component do
%Strom.Source{origin: origin, names: names} ->
%{Strom.Source.start(origin) | names: names}
%Strom.Source{} = source ->
Strom.Source.start(source)

%Strom.Sink{origin: origin, names: names} ->
%{Strom.Sink.start(origin) | names: names}
%Strom.Sink{} = sink ->
Strom.Sink.start(sink)

%Strom.Mixer{opts: opts} = mixer ->
%{mixer | gen_mix: Strom.Mixer.start(opts)}
Strom.Mixer.start(mixer, opts)

%Strom.Splitter{opts: opts} = split ->
%{split | gen_mix: Strom.Splitter.start(opts)}
%Strom.Splitter{opts: opts} = splitter ->
Strom.Splitter.start(splitter, opts)

%Strom.Transformer{opts: opts, function: function, acc: acc, inputs: inputs}
when is_list(opts) ->
%{Strom.Transformer.start(opts) | function: function, acc: acc, inputs: inputs}
%Strom.Transformer{opts: opts} = transformer when is_list(opts) ->
Strom.Transformer.start(transformer, opts)

%Strom.Renamer{names: names} ->
Strom.Renamer.start(names)
%Strom.Renamer{} = renamer ->
Strom.Renamer.start(renamer)
end
end)
end
@@ -63,24 +62,20 @@ defmodule Strom.Topology do
flow =
Enum.reduce(topology.components, init_flow, fn component, flow ->
case component do
%Strom.Source{names: names} = source ->
Strom.Source.call(flow, source, names)
%Strom.Source{} = source ->
Strom.Source.call(flow, source)

%Strom.Sink{names: names, sync: sync} = sink ->
Strom.Sink.call(flow, sink, names, sync)
%Strom.Sink{} = sink ->
Strom.Sink.call(flow, sink)

%Strom.Mixer{inputs: inputs, output: output} = mixer ->
Strom.Mixer.call(flow, mixer, inputs, output)
%Strom.Mixer{} = mixer ->
Strom.Mixer.call(flow, mixer)

%Strom.Splitter{input: input, partitions: partitions} = splitter ->
Strom.Splitter.call(flow, splitter, input, partitions)
%Strom.Splitter{} = splitter ->
Strom.Splitter.call(flow, splitter)

%Strom.Transformer{function: function, acc: acc, inputs: inputs} = transformer ->
if is_function(function, 1) do
Strom.Transformer.call(flow, transformer, inputs, function)
else
Strom.Transformer.call(flow, transformer, inputs, {function, acc})
end
%Strom.Transformer{} = transformer ->
Strom.Transformer.call(flow, transformer)

%Strom.Renamer{} = renamer ->
Strom.Renamer.call(flow, renamer)
47 changes: 21 additions & 26 deletions lib/transformer.ex
Original file line number Diff line number Diff line change
@@ -8,42 +8,35 @@ defmodule Strom.Transformer do
buffer: @buffer,
function: nil,
acc: nil,
opts: nil,
inputs: [],
opts: [],
names: [],
sup_pid: nil,
flow_pid: nil,
tasks: %{},
data: %{}

def new(inputs, function, acc \\ nil, opts \\ []) do
def new(names, function, acc \\ nil) do
%__MODULE__{
function: function,
acc: acc,
opts: opts,
inputs: inputs
names: names
}
end

def start(args \\ [])

def start(%__MODULE__{opts: opts, sup_pid: sup_pid} = transformer) do
def start(%__MODULE__{sup_pid: sup_pid} = transformer, opts \\ []) do
transformer = %{
transformer
| buffer: Keyword.get(opts, :buffer, @buffer),
opts: Keyword.get(opts, :opts, nil)
opts: if(length(opts) > 0, do: opts, else: Keyword.get(opts, :opts, []))
}

{:ok, pid} = DynamicSupervisor.start_child(sup_pid, {__MODULE__, transformer})
__state__(pid)
end

def start(opts) when is_list(opts) do
state = %__MODULE__{
buffer: Keyword.get(opts, :buffer, @buffer),
opts: Keyword.get(opts, :opts, nil)
}
{:ok, pid} =
if sup_pid do
DynamicSupervisor.start_child(sup_pid, {__MODULE__, transformer})
else
start_link(transformer)
end

{:ok, pid} = GenServer.start_link(__MODULE__, state)
__state__(pid)
end

@@ -56,7 +49,7 @@ defmodule Strom.Transformer do
{:ok, %{call | pid: self()}}
end

def call(flow, %__MODULE__{} = transformer, names, {function, acc})
def call(flow, %__MODULE__{names: names, function: function, acc: acc} = transformer)
when is_map(flow) and is_function(function, 3) do
names = if is_list(names), do: names, else: [names]

@@ -102,16 +95,18 @@ defmodule Strom.Transformer do
|> Map.merge(sub_flow)
end

def call(flow, %__MODULE__{} = transformer, names, {function, acc})
def call(flow, %__MODULE__{function: function} = transformer)
when is_map(flow) and is_function(function, 2) do
fun = fn el, acc, nil -> function.(el, acc) end
call(flow, transformer, names, {fun, acc})
fun = fn el, acc, [] -> function.(el, acc) end
transformer = %{transformer | function: fun}
call(flow, transformer)
end

def call(flow, %__MODULE__{} = transformer, names, function)
def call(flow, %__MODULE__{function: function} = transformer)
when is_map(flow) and is_function(function, 1) do
fun = fn el, nil, nil -> {[function.(el)], nil} end
call(flow, transformer, names, {fun, nil})
fun = fn el, nil, [] -> {[function.(el)], nil} end
transformer = %{transformer | function: fun}
call(flow, transformer)
end

def stop(%__MODULE__{pid: pid, sup_pid: sup_pid}) do
107 changes: 0 additions & 107 deletions test/data/output.csv
Original file line number Diff line number Diff line change
@@ -105,110 +105,3 @@ ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
120 changes: 71 additions & 49 deletions test/gen_mix_test.exs
Original file line number Diff line number Diff line change
@@ -4,17 +4,15 @@ defmodule Strom.GenMixTest do
alias Strom.GenMix

test "start and stop" do
mix = GenMix.start()
assert Process.alive?(mix.pid)
:ok = GenMix.stop(mix)
refute Process.alive?(mix.pid)
{:ok, pid} = GenMix.start(%GenMix{})
assert Process.alive?(pid)
:ok = GenMix.stop(pid, nil)
refute Process.alive?(pid)
end

test "call" do
flow = %{numbers1: [1, 2, 3, 4, 5], numbers2: [6, 7, 8, 9, 10], numbers3: [0, 0, 0, 0, 0]}

mix = GenMix.start()

inputs = %{
numbers1: fn el -> el < 5 end,
numbers2: fn el -> el > 6 end
@@ -25,7 +23,11 @@ defmodule Strom.GenMixTest do
even: fn el -> rem(el, 2) == 0 end
}

flow = GenMix.call(flow, mix, inputs, outputs)
gen_mix = %GenMix{inputs: inputs, outputs: outputs}

{:ok, pid} = GenMix.start(gen_mix)

flow = GenMix.call(flow, pid)

assert Enum.sort(Enum.to_list(flow[:odd])) == [1, 3, 7, 9]
assert Enum.sort(Enum.to_list(flow[:even])) == [2, 4, 8, 10]
@@ -40,8 +42,6 @@ defmodule Strom.GenMixTest do
numbers3: Enum.to_list(1..100_000)
}

mix = GenMix.start()

inputs = %{
numbers1: fn el -> rem(el, 3) == 0 end,
numbers2: fn el -> rem(el, 4) == 0 end,
@@ -53,7 +53,10 @@ defmodule Strom.GenMixTest do
even: fn el -> rem(el, 2) == 0 end
}

flow = GenMix.call(flow, mix, inputs, outputs)
gen_mix = %GenMix{inputs: inputs, outputs: outputs}
{:ok, pid} = GenMix.start(gen_mix)

flow = GenMix.call(flow, pid)

task1 =
Task.async(fn ->
@@ -71,43 +74,62 @@ defmodule Strom.GenMixTest do
Task.await(task2, :infinity)
end

# test "huge files" do
# :observer.start()
# source1 = Strom.Source.start(%Strom.Source.ReadLines{path: "test_data/orders.csv"})
# source2 = Strom.Source.start(%Strom.Source.ReadLines{path: "test_data/parcels.csv"})
#
# sink1 = Strom.Sink.start(%Strom.Sink.WriteLines{path: "test_data/odd.csv"})
# sink2 = Strom.Sink.start(%Strom.Sink.WriteLines{path: "test_data/even.csv"})
#
# flow =
# %{}
# |> Strom.Source.call(source1, :source1)
# |> Strom.Source.call(source2, :source2)
#
# mix1 = GenMix.start()
# mix2 = GenMix.start()
# call1 = Strom.Transformer.start()
# call2 = Strom.Transformer.start()
#
# inputs = %{
# source1: fn el -> el end,
# source2: fn el -> el end
# }
#
# outputs = %{
# odd: fn el -> rem(el, 2) == 1 end,
# even: fn el -> rem(el, 2) == 0 end
# }
#
# function1 = fn el -> String.length(el) end
# function2 = fn el -> "#{el}" end
#
# flow
# |> GenMix.call(mix1, inputs, inputs)
# |> Strom.Transformer.call(call1, [:source1, :source2], function1)
# |> GenMix.call(mix2, inputs, outputs)
# |> Strom.Transformer.call(call2, [:odd, :even], function2)
# |> Strom.Sink.call(sink1, [:odd])
# |> Strom.Sink.call(sink2, [:even], true)
# end
test "huge files" do
# :observer.start()
# source1 =
# :source1
# |> Strom.Source.new(%Strom.Source.ReadLines{path: "test_data/orders.csv"})
# |> Strom.Source.start()
#
# source2 =
# :source2
# |> Strom.Source.new(%Strom.Source.ReadLines{path: "test_data/parcels.csv"})
# |> Strom.Source.start()
#
# sink1 =
# :odd
# |> Strom.Sink.new(%Strom.Sink.WriteLines{path: "test_data/odd.csv"})
# |> Strom.Sink.start()
#
# sink2 =
# :even
# |> Strom.Sink.new(%Strom.Sink.WriteLines{path: "test_data/even.csv"}, true)
# |> Strom.Sink.start()
#
# flow =
# %{}
# |> Strom.Source.call(source1)
# |> Strom.Source.call(source2)
#
# inputs = %{
# source1: fn el -> el end,
# source2: fn el -> el end
# }
#
# outputs = %{
# odd: fn el -> rem(el, 2) == 1 end,
# even: fn el -> rem(el, 2) == 0 end
# }
#
# {:ok, mix1} = GenMix.start(%GenMix{inputs: inputs, outputs: inputs})
# {:ok, mix2} = GenMix.start(%GenMix{inputs: inputs, outputs: outputs})
#
# transformer1 =
# [:source1, :source2]
# |> Strom.Transformer.new(fn el -> String.length(el) end)
# |> Strom.Transformer.start()
#
# transformer2 =
# [:odd, :even]
# |> Strom.Transformer.new(fn el -> "#{el}" end)
# |> Strom.Transformer.start()
#
# flow
# |> GenMix.call(mix1)
# |> Strom.Transformer.call(transformer1)
# |> GenMix.call(mix2)
# |> Strom.Transformer.call(transformer2)
# |> Strom.Sink.call(sink1)
# |> Strom.Sink.call(sink2)
end
end
228 changes: 139 additions & 89 deletions test/mixer_test.exs
Original file line number Diff line number Diff line change
@@ -5,104 +5,154 @@ defmodule Strom.MixerTest do
alias Strom.Source.ReadLines
alias Strom.Mixer

def orders_and_parcels do
orders =
"test/data/orders.csv"
|> File.read!()
|> String.split("\n")

parcels =
"test/data/parcels.csv"
|> File.read!()
|> String.split("\n")

{orders, parcels}
end

setup do
source1 = Source.start(%ReadLines{path: "test/data/orders.csv"})
source2 = Source.start(%ReadLines{path: "test/data/parcels.csv"})
source3 = Source.start(%ReadLines{path: "test/data/parcels.csv"})

flow =
%{}
|> Source.call(source1, :source1)
|> Source.call(source2, :source2)
|> Source.call(source3, :source3)
mixer =
[:stream1, :stream2]
|> Mixer.new(:stream)
|> Mixer.start(buffer: 1)

%{flow: flow}
%{mixer: mixer}
end

test "start and stop" do
mixer = Mixer.start()
test "start and stop", %{mixer: mixer} do
assert Process.alive?(mixer.pid)
:ok = Mixer.stop(mixer)
Mixer.stop(mixer)
refute Process.alive?(mixer.pid)
end

test "call with map", %{flow: flow} do
mixer = Mixer.start()

partitions = %{
source1: fn el -> String.contains?(el, "111") end,
source2: fn el -> String.contains?(el, "111") end
}

%{mixed: mixed, source3: source3} = Mixer.call(flow, mixer, partitions, :mixed)

lines = Enum.to_list(mixed)
assert length(lines) == 99
{_orders, parcels} = orders_and_parcels()

source3_lines = Enum.to_list(source3)
assert length(source3_lines) == length(parcels)
end

test "call with list of streams", %{flow: flow} do
mixer = Mixer.start()

%{mixed: mixed, source3: source3} = Mixer.call(flow, mixer, [:source1, :source2], :mixed)

lines = Enum.to_list(mixed)
{orders, parcels} = orders_and_parcels()
assert lines -- (orders ++ parcels) == []
assert (orders ++ parcels) -- lines == []

source3_lines = Enum.to_list(source3)
assert length(source3_lines) == length(parcels)
end

test "stream two identical sources" do
source = Source.start(%ReadLines{path: "test/data/orders.csv"})
mixer = Mixer.start()

%{stream: stream} =
%{}
|> Source.call(source, :s1)
|> Source.call(source, :s2)
|> Mixer.call(mixer, [:s1, :s2], :stream)

lines = Enum.to_list(stream)

{orders, _parcels} = orders_and_parcels()
assert length(lines) == length(orders)
test "do mix", %{mixer: mixer} do
flow = %{stream1: [1, 2, 3], stream2: [4, 5, 6]}
%{stream: stream} = Mixer.call(flow, mixer)
assert Enum.sort(Enum.to_list(stream)) == [1, 2, 3, 4, 5, 6]
end

test "mixer as filter" do
source1 = Source.start(%ReadLines{path: "test/data/orders.csv"})
mixer = Mixer.start()

partitions = %{
stream: fn el -> String.contains?(el, "111,3") end
}

%{stream: stream} =
%{}
|> Source.call(source1, :stream)
|> Mixer.call(mixer, partitions, :stream)

stream
|> Enum.to_list()
|> Enum.each(fn el -> assert String.contains?(el, "111,3") end)
describe "complex cases" do
def orders_and_parcels do
orders =
"test/data/orders.csv"
|> File.read!()
|> String.split("\n")

parcels =
"test/data/parcels.csv"
|> File.read!()
|> String.split("\n")

{orders, parcels}
end

setup do
source1 =
:source1
|> Source.new(%ReadLines{path: "test/data/orders.csv"})
|> Source.start()

source2 =
:source2
|> Source.new(%ReadLines{path: "test/data/parcels.csv"})
|> Source.start()

source3 =
:source3
|> Source.new(%ReadLines{path: "test/data/parcels.csv"})
|> Source.start()

flow =
%{}
|> Source.call(source1)
|> Source.call(source2)
|> Source.call(source3)

%{flow: flow}
end

test "call with map", %{flow: flow} do
partitions = %{
source1: fn el -> String.contains?(el, "111") end,
source2: fn el -> String.contains?(el, "111") end
}

mixer =
partitions
|> Mixer.new(:mixed)
|> Mixer.start()

%{mixed: mixed, source3: source3} = Mixer.call(flow, mixer)

lines = Enum.to_list(mixed)
assert length(lines) == 99
{_orders, parcels} = orders_and_parcels()

source3_lines = Enum.to_list(source3)
assert length(source3_lines) == length(parcels)
end

test "call with list of streams", %{flow: flow} do
mixer =
[:source1, :source2]
|> Mixer.new(:mixed)
|> Mixer.start()

%{mixed: mixed, source3: source3} = Mixer.call(flow, mixer)

lines = Enum.to_list(mixed)
{orders, parcels} = orders_and_parcels()
assert lines -- (orders ++ parcels) == []
assert (orders ++ parcels) -- lines == []

source3_lines = Enum.to_list(source3)
assert length(source3_lines) == length(parcels)
end

test "stream one file into two streams" do
source1 =
:s1
|> Source.new(%ReadLines{path: "test/data/orders.csv"})
|> Source.start()

source2 =
:s2
|> Source.new(%ReadLines{path: "test/data/orders.csv"})
|> Source.start()

mixer =
[:s1, :s2]
|> Mixer.new(:stream)
|> Mixer.start()

%{stream: stream} =
%{}
|> Source.call(source1)
|> Source.call(source2)
|> Mixer.call(mixer)

lines = Enum.to_list(stream)

{orders, _parcels} = orders_and_parcels()
assert length(lines) == length(orders) * 2
end

test "mixer as simple filter" do
source1 =
:stream
|> Source.new(%ReadLines{path: "test/data/orders.csv"})
|> Source.start()

mixer =
%{
stream: fn el -> String.contains?(el, "111,3") end
}
|> Mixer.new(:stream)
|> Mixer.start()

%{stream: stream} =
%{}
|> Source.call(source1)
|> Mixer.call(mixer)

stream
|> Enum.to_list()
|> Enum.each(fn el -> assert String.contains?(el, "111,3") end)
end
end
end
22 changes: 18 additions & 4 deletions test/renamer_test.exs
Original file line number Diff line number Diff line change
@@ -4,15 +4,24 @@ defmodule Strom.RenamerTest do
alias Strom.Renamer

test "start and stop" do
assert Renamer.start() == %Renamer{}
renamer =
%{a: :b}
|> Renamer.new()
|> Renamer.start()

assert renamer == %Renamer{names: %{a: :b}}
assert :ok = Renamer.stop(%Renamer{})
end

test "rename" do
names = %{s1: :foo1, s2: :foo2}
renamer =
%{s1: :foo1, s2: :foo2}
|> Renamer.new()
|> Renamer.start()

flow = %{s1: [1], s2: [2], s3: [3]}

new_flow = Renamer.call(flow, names)
new_flow = Renamer.call(flow, renamer)

refute new_flow[:s1]
refute new_flow[:s2]
@@ -23,8 +32,13 @@ defmodule Strom.RenamerTest do
end

test "raise when there is no such name" do
renamer =
%{s2: :foo2}
|> Renamer.new()
|> Renamer.start()

assert_raise KeyError, fn ->
Renamer.call(%{s1: [1]}, %{s2: :foo2})
Renamer.call(%{s1: [1]}, renamer)
end
end
end
54 changes: 14 additions & 40 deletions test/sink_test.exs
Original file line number Diff line number Diff line change
@@ -7,59 +7,33 @@ defmodule Strom.SinkTest do
alias Strom.Sink.WriteLines

def source do
path = "test/data/orders.csv"
Source.start(%ReadLines{path: path})
:my_stream
|> Source.new(%ReadLines{path: "test/data/orders.csv"})
|> Source.start()
end

setup do
sink = Sink.start(%WriteLines{path: "test/data/output.csv"})
sink =
:my_stream
|> Sink.new(%WriteLines{path: "test/data/output.csv"}, true)
|> Sink.start()

%{sink: sink}
end

test "sink init args", %{sink: sink} do
test "start and stop", %{sink: sink} do
assert Process.alive?(sink.pid)
assert sink.origin.path == "test/data/output.csv"
Sink.stop(sink)
refute Process.alive?(sink.pid)
end

test "stream lines", %{sink: sink} do
assert %{another_stream: another_stream} =
%{}
|> Source.call(source(), :my_stream)
|> Source.call(source(), :another_stream)
|> Sink.call(sink, :my_stream)

Process.sleep(100)
lines = Enum.to_list(another_stream)

assert Enum.join(lines, "\n") <> "\n" == File.read!("test/data/output.csv")
end

test "stream lines to one_sink", %{sink: sink} do
assert %{} =
%{}
|> Source.call(source(), :my_stream)
|> Source.call(source(), :another_stream)
|> Sink.call(sink, [:my_stream, :another_stream], true)
|> Source.call(source())
|> Sink.call(sink)

original_size = String.length(File.read!("test/data/orders.csv"))
output_size = String.length(File.read!("test/data/output.csv"))
assert 2 * (original_size + 1) == output_size
end

test "with sync lines", %{sink: sink} do
assert %{another_stream: another_stream} =
%{}
|> Source.call(source(), :my_stream)
|> Source.call(source(), :another_stream)
|> Sink.call(sink, :my_stream, true)

lines = Enum.to_list(another_stream)

assert Enum.join(lines, "\n") <> "\n" == File.read!("test/data/output.csv")
end

test "stop", %{sink: sink} do
assert Sink.stop(sink) == :ok
refute Process.alive?(sink.pid)
assert File.read!("test/data/orders.csv") <> "\n" == File.read!("test/data/output.csv")
end
end
52 changes: 19 additions & 33 deletions test/source_test.exs
Original file line number Diff line number Diff line change
@@ -5,8 +5,11 @@ defmodule Strom.SourceTest do
alias Strom.Source.ReadLines

setup do
path = "test/data/orders.csv"
source = Source.start(%ReadLines{path: path})
source =
:my_stream
|> Source.new(%ReadLines{path: "test/data/orders.csv"})
|> Source.start()

%{source: source}
end

@@ -22,18 +25,21 @@ defmodule Strom.SourceTest do
end

test "stream lines", %{source: source} do
%{my_stream: stream} = Source.call(%{}, source, :my_stream)
%{my_stream: stream} = Source.call(%{}, source)
lines = Enum.to_list(stream)
assert Enum.join(lines, "\n") == File.read!("test/data/orders.csv")
end

test "several sources", %{source: source} do
another_source = Source.start(%ReadLines{path: "test/data/orders.csv"})
another_source =
:another_stream
|> Source.new(%ReadLines{path: "test/data/orders.csv"})
|> Source.start()

%{my_stream: stream, another_stream: another_stream} =
%{}
|> Source.call(source, :my_stream)
|> Source.call(another_source, :another_stream)
|> Source.call(source)
|> Source.call(another_source)

list = Enum.to_list(stream)
another_list = Enum.to_list(another_stream)
@@ -43,34 +49,14 @@ defmodule Strom.SourceTest do
end

test "several sources for one stream", %{source: source} do
%{my_stream: stream} =
Source.call(%{my_stream: [1, 2, 3]}, source, :my_stream)

lines = Enum.to_list(stream)
assert Enum.member?(lines, 1)
assert Enum.member?(lines, 2)
assert Enum.member?(lines, 3)
end

test "apply source to several streams", %{source: source} do
%{stream1: stream1, stream2: stream2} = Source.call(%{}, source, [:stream1, :stream2])
source =
:my_stream
|> Source.new([4, 5, 6])
|> Source.start()

lines1 = Enum.to_list(stream1)
lines2 = Enum.to_list(stream2)

assert Enum.join(lines1 ++ lines2, "\n") == File.read!("test/data/orders.csv")
end

test "stop", %{source: source} do
assert Source.stop(source) == :ok
refute Process.alive?(source.pid)
end
%{my_stream: stream} = Source.call(%{my_stream: [1, 2, 3]}, source)

describe "events source" do
test "events" do
source = Source.start([1, 2, 3])
%{events: events} = Source.call(%{events: [0]}, source, :events)
assert Enum.to_list(events) == [0, 1, 2, 3]
end
numbers = Enum.to_list(stream)
assert Enum.sort(numbers) == [1, 2, 3, 4, 5, 6]
end
end
53 changes: 32 additions & 21 deletions test/splitter_test.exs
Original file line number Diff line number Diff line change
@@ -20,19 +20,36 @@ defmodule Strom.SplitterTest do
end

setup do
source1 = Source.start(%ReadLines{path: "test/data/orders.csv"})
source2 = Source.start(%ReadLines{path: "test/data/parcels.csv"})
source1 =
:orders
|> Source.new(%ReadLines{path: "test/data/orders.csv"})
|> Source.start()

source2 =
:parcels
|> Source.new(%ReadLines{path: "test/data/parcels.csv"})
|> Source.start()

flow =
%{}
|> Source.call(source1, :orders)
|> Source.call(source2, :parcels)
|> Source.call(source1)
|> Source.call(source2)

%{flow: flow}
end

test "start and stop" do
splitter = Splitter.start(Splitter.new(:s, [:s1, :s2]))
assert Process.alive?(splitter.pid)
:ok = Splitter.stop(splitter)
refute Process.alive?(splitter.pid)
end

test "splitter with list of streams", %{flow: flow} do
splitter = Splitter.start([])
splitter =
:orders
|> Splitter.new(["111", "222", "333"])
|> Splitter.start()

assert %{
:parcels => parcels,
@@ -41,7 +58,7 @@ defmodule Strom.SplitterTest do
"333" => stream3
} =
flow
|> Splitter.call(splitter, :orders, ["111", "222", "333"])
|> Splitter.call(splitter)

task111 = Task.async(fn -> Enum.to_list(stream1) end)
task222 = Task.async(fn -> Enum.to_list(stream2) end)
@@ -63,20 +80,21 @@ defmodule Strom.SplitterTest do
end

test "splitter with partitions", %{flow: flow} do
splitter = Splitter.start([])
splitter =
:orders
|> Splitter.new(%{
"111" => fn el -> String.contains?(el, ",111,") end,
"222" => fn el -> String.contains?(el, ",222,") end,
"333" => fn el -> String.contains?(el, ",333,") end
})
|> Splitter.start()

assert %{
:parcels => parcels,
"111" => stream1,
"222" => stream2,
"333" => stream3
} =
flow
|> Splitter.call(splitter, :orders, %{
"111" => fn el -> String.contains?(el, ",111,") end,
"222" => fn el -> String.contains?(el, ",222,") end,
"333" => fn el -> String.contains?(el, ",333,") end
})
} = Splitter.call(flow, splitter)

orders111 = Enum.to_list(stream1)
orders222 = Enum.to_list(stream2)
@@ -91,11 +109,4 @@ defmodule Strom.SplitterTest do
assert parcels -- original_parcels == []
assert original_parcels -- parcels == []
end

test "stop" do
splitter = Splitter.start([])
assert Process.alive?(splitter.pid)
:ok = Splitter.stop(splitter)
refute Process.alive?(splitter.pid)
end
end
22 changes: 15 additions & 7 deletions test/topology_test.exs
Original file line number Diff line number Diff line change
@@ -39,19 +39,19 @@ defmodule Strom.TopologyTest do
[source1, source2, mixer, transformer, splitter, sink1] = topology.components
assert Process.alive?(source1.pid)
assert Process.alive?(source2.pid)
assert Process.alive?(mixer.gen_mix.pid)
assert Process.alive?(mixer.pid)
assert Process.alive?(transformer.pid)
assert Process.alive?(splitter.gen_mix.pid)
assert Process.alive?(splitter.pid)
assert Process.alive?(sink1.pid)
end

def check_dead(topology) do
[source1, source2, mixer, transformer, splitter, sink1] = topology.components
refute Process.alive?(source1.pid)
refute Process.alive?(source2.pid)
refute Process.alive?(mixer.gen_mix.pid)
refute Process.alive?(mixer.pid)
refute Process.alive?(transformer.pid)
refute Process.alive?(splitter.gen_mix.pid)
refute Process.alive?(splitter.pid)
refute Process.alive?(sink1.pid)
end

@@ -102,13 +102,21 @@ defmodule Strom.TopologyTest do
test "compose" do
topology = Topology.start(MyTopology.components())
another_topology = Topology.start(AnotherTopology.components())
transformer = Transformer.start()
renamer = Renamer.start(%{even: :numbers})

transformer =
:even
|> Transformer.new(&(&1 * 3))
|> Transformer.start()

renamer =
%{even: :numbers}
|> Renamer.new()
|> Renamer.start()

flow =
%{}
|> Topology.call(topology)
|> Transformer.call(transformer, :even, &(&1 * 3))
|> Transformer.call(transformer)
|> Renamer.call(renamer)
|> Topology.call(another_topology)

61 changes: 39 additions & 22 deletions test/transformer_test.exs
Original file line number Diff line number Diff line change
@@ -4,56 +4,73 @@ defmodule Strom.TransformerTest do
alias Strom.Transformer

test "start and stop" do
transformer = Transformer.start()
transformer =
:stream
|> Transformer.new(&(&1 + 1))
|> Transformer.start()

assert Process.alive?(transformer.pid)
:ok = Transformer.stop(transformer)
refute Process.alive?(transformer.pid)
end

test "call" do
transformer = Transformer.start()
test "call with one stream" do
transformer =
:numbers
|> Transformer.new(&(&1 * &1))
|> Transformer.start()

flow = %{numbers: [1, 2, 3, 4, 5]}
flow = Transformer.call(flow, transformer)
assert Enum.sort(Enum.to_list(flow[:numbers])) == [1, 4, 9, 16, 25]
end

test "call with several streams" do
transformer =
[:numbers1, :numbers2]
|> Transformer.new(&(&1 * &1))
|> Transformer.start()

flow = %{numbers1: [1, 2, 3, 4, 5], numbers2: [6, 7, 8, 9, 10], numbers3: [0, 0, 0, 0, 0]}
fun = &(&1 * &1)
flow = Transformer.call(flow, transformer, [:numbers1, :numbers2], fun)
flow = Transformer.call(flow, transformer)

assert Enum.sort(Enum.to_list(flow[:numbers1])) == [1, 4, 9, 16, 25]
assert Enum.sort(Enum.to_list(flow[:numbers2])) == [36, 49, 64, 81, 100]
assert Enum.sort(Enum.to_list(flow[:numbers3])) == [0, 0, 0, 0, 0]
end

test "call with one stream" do
transformer = Transformer.start()
flow = %{numbers1: [1, 2, 3, 4, 5]}
flow = Transformer.call(flow, transformer, :numbers1, &(&1 * &1))
assert Enum.sort(Enum.to_list(flow[:numbers1])) == [1, 4, 9, 16, 25]
end

test "call with accumulator" do
transformer = Transformer.start()

flow = %{numbers1: [1, 2, 3, 4, 5], numbers2: [6, 7, 8, 9, 10], numbers3: [0, 0, 0, 0, 0]}

fun = fn el, acc ->
{[el, acc], acc + 1}
end

flow = Transformer.call(flow, transformer, [:numbers1, :numbers2], {fun, 100})
transformer =
[:numbers1, :numbers2]
|> Transformer.new(fun, 100)
|> Transformer.start()

flow = %{numbers1: [1, 2, 3, 4, 5], numbers2: [6, 7, 8, 9, 10], numbers3: [0, 0, 0, 0, 0]}

flow = Transformer.call(flow, transformer)

assert Enum.sort(Enum.to_list(flow[:numbers1])) == [1, 2, 3, 4, 5, 100, 101, 102, 103, 104]
assert Enum.sort(Enum.to_list(flow[:numbers2])) == [6, 7, 8, 9, 10, 100, 101, 102, 103, 104]
assert Enum.sort(Enum.to_list(flow[:numbers3])) == [0, 0, 0, 0, 0]
end

test "call with opts and accumulator" do
transformer = Transformer.start(opts: %{add: 1})

flow = %{numbers1: [1, 2, 3, 4, 5], numbers2: [6, 7, 8, 9, 10], numbers3: [0, 0, 0, 0, 0]}

fun = fn el, acc, opts ->
{[el, acc], acc + opts[:add]}
end

flow = Transformer.call(flow, transformer, [:numbers1, :numbers2], {fun, 100})
transformer =
[:numbers1, :numbers2]
|> Transformer.new(fun, 100)
|> Transformer.start(opts: %{add: 1})

flow = %{numbers1: [1, 2, 3, 4, 5], numbers2: [6, 7, 8, 9, 10], numbers3: [0, 0, 0, 0, 0]}

flow = Transformer.call(flow, transformer)

assert Enum.sort(Enum.to_list(flow[:numbers1])) == [1, 2, 3, 4, 5, 100, 101, 102, 103, 104]
assert Enum.sort(Enum.to_list(flow[:numbers2])) == [6, 7, 8, 9, 10, 100, 101, 102, 103, 104]

0 comments on commit 01cc65d

Please sign in to comment.