Skip to content

Commit

Permalink
Refactor Flow
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Dec 2, 2023
1 parent af6aec6 commit 852964c
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 131 deletions.
13 changes: 7 additions & 6 deletions lib/builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,27 @@ defmodule Strom.Builder do
case component do
%DSL.Source{origin: origin} ->
source = Strom.Source.start(origin)
Flow.add_component(flow_pid, {:source, source})
Flow.add_component(flow_pid, source)
Strom.Source.stream(source)

%DSL.Sink{origin: origin} ->
sink = Strom.Sink.start(origin)
Flow.add_component(flow_pid, {:sink, sink})
Flow.add_component(flow_pid, sink)
Strom.Sink.stream(stream, sink)

%DSL.Mixer{sources: sources} ->
sources = Enum.map(sources, &do_build(&1, nil, flow_pid))
mixer = Strom.Mixer.start(sources)
Flow.add_component(flow_pid, {:mixer, mixer})
Flow.add_component(flow_pid, mixer)
Strom.Mixer.stream(mixer)

%DSL.Function{function: function} ->
function.(stream)

%DSL.Module{module: module, opts: opts} ->
%DSL.Module{module: module, opts: opts} = mod ->
state = apply(module, :start, [opts])
Flow.add_component(flow_pid, {:module, {module, state}})
mod = %{mod | state: state}
Flow.add_component(flow_pid, mod)

if DSL.Module.is_pipeline_module?(module) do
apply(module, :stream, [stream])
Expand All @@ -43,7 +44,7 @@ defmodule Strom.Builder do
partitions = Map.keys(branches)

splitter = Strom.Splitter.start(stream, partitions)
Flow.add_component(flow_pid, {:splitter, splitter})
Flow.add_component(flow_pid, splitter)

splitter
|> Strom.Splitter.stream()
Expand Down
2 changes: 1 addition & 1 deletion lib/dsl.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Strom.DSL do
defmodule Module do
defstruct module: nil, opts: []
defstruct module: nil, opts: [], state: nil

def is_pipeline_module?(module) when is_atom(module) do
is_list(module.alf_components())
Expand Down
20 changes: 10 additions & 10 deletions lib/flow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ defmodule Strom.Flow do

def add_stream(pid, stream), do: GenServer.call(pid, {:add_stream, stream})

def add_component(pid, {type, component}),
do: GenServer.call(pid, {:add_component, {type, component}})
def add_component(pid, component),
do: GenServer.call(pid, {:add_component, component})

def run(flow_module), do: GenServer.call(flow_module, :run)

Expand All @@ -46,22 +46,22 @@ defmodule Strom.Flow do
{:reply, :ok, state}
end

def handle_call({:add_component, {type, component}}, _from, %__MODULE__{} = state) do
def handle_call({:add_component, component}, _from, %__MODULE__{} = state) do
state =
case type do
:mixer ->
case component do
%Strom.Mixer{} ->
%{state | mixers: [component | state.mixers]}

:splitter ->
%Strom.Splitter{} ->
%{state | splitters: [component | state.splitters]}

:source ->
%Strom.Source{} ->
%{state | sources: [component | state.sources]}

:sink ->
%Strom.Sink{} ->
%{state | sinks: [component | state.sinks]}

:module ->
%Strom.DSL.Module{} ->
%{state | modules: [component | state.modules]}
end

Expand All @@ -86,7 +86,7 @@ defmodule Strom.Flow do
Enum.each(state.mixers, & &1.__struct__.stop(&1))
Enum.each(state.splitters, & &1.__struct__.stop(&1))

Enum.each(state.modules, fn {module, state} ->
Enum.each(state.modules, fn %{module: module, state: state} ->
if Strom.DSL.Module.is_pipeline_module?(module) do
apply(module, :stop, [])
else
Expand Down
4 changes: 2 additions & 2 deletions test/data/odd.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
3
5
11
21
31
41
51
3
5
107 changes: 0 additions & 107 deletions test/data/output.csv
Original file line number Diff line number Diff line change
@@ -1,107 +0,0 @@
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
14 changes: 9 additions & 5 deletions test/flow_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,16 @@ defmodule Strom.FlowTest do
name: Strom.FlowTest.MyFlow,
streams: [stream1, _stream2],
modules: [
"Elixir.Strom.FlowTest.MyFlow.ToString": %Strom.FlowTest.MyFlow.ToString{
state: nil
%Strom.DSL.Module{
module: Strom.FlowTest.MyFlow.ToString,
opts: [],
state: %Strom.FlowTest.MyFlow.ToString{state: nil}
},
"Elixir.Strom.FlowTest.Pipeline": :ok,
"Elixir.Strom.FlowTest.MyFlow.MyModule": %Strom.FlowTest.MyFlow.MyModule{
state: nil
%Strom.DSL.Module{module: Strom.FlowTest.Pipeline, opts: [], state: :ok},
%Strom.DSL.Module{
module: Strom.FlowTest.MyFlow.MyModule,
opts: [],
state: %Strom.FlowTest.MyFlow.MyModule{state: nil}
}
],
sources: [
Expand Down

0 comments on commit 852964c

Please sign in to comment.