Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FlowSupervisor #2

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
- Proper supervision tree!!!
- Module behaviour
- restart strategy
- tick source with timeout
- remove chunk_by, introduce buffer
- explicit "map" and "reduce" with function/1,2, and "transform" as module
22 changes: 1 addition & 21 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Strom.DSL do
end
end

defmacro transform(inputs, function, acc, opts) do
defmacro transform(inputs, function, acc \\ nil, opts \\ []) do
quote do
%Strom.DSL.Transform{
function: unquote(function),
Expand All @@ -78,26 +78,6 @@ defmodule Strom.DSL do
end
end

defmacro transform(inputs, function, acc) do
quote do
%Strom.DSL.Transform{
function: unquote(function),
acc: unquote(acc),
inputs: unquote(inputs)
}
end
end

defmacro transform(inputs, function) do
quote do
%Strom.DSL.Transform{
function: unquote(function),
acc: nil,
inputs: unquote(inputs)
}
end
end

defmacro from(module, opts \\ []) do
quote do
unless is_atom(unquote(module)) do
Expand Down
78 changes: 58 additions & 20 deletions lib/flow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,92 @@ defmodule Strom.Flow do
defstruct pid: nil,
name: nil,
module: nil,
sup_pid: nil,
opts: [],
topology: []

use GenServer
alias Strom.DSL
alias Strom.FlowSupervisor

@type t :: %__MODULE__{}

# TODO Supervisor
def start(flow_module, opts \\ []) when is_atom(flow_module) do
state = %__MODULE__{name: flow_module, module: flow_module, opts: opts}

{:ok, pid} = GenServer.start_link(__MODULE__, state, name: flow_module)
strom_sup_pid = Process.whereis(Strom.DynamicSupervisor)

pid =
case DynamicSupervisor.start_child(
strom_sup_pid,
%{
id: __MODULE__,
start:
{__MODULE__, :start_link,
[%__MODULE__{name: flow_module, module: flow_module, opts: opts}]},
restart: :transient
}
) do
{:ok, pid} ->
pid

{:error, {:already_started, pid}} ->
pid
end

__state__(pid)
end

def start_link(%__MODULE__{} = flow) do
GenServer.start_link(__MODULE__, flow, name: flow.name)
end

@impl true
def init(%__MODULE__{module: module} = state) do
def init(%__MODULE__{module: module} = flow) do
sup_pid = start_flow_supervisor(flow.name)

topology =
state.opts
flow.opts
|> module.topology()
|> List.flatten()
|> build()
|> build(self(), sup_pid)

{:ok, %{state | pid: self(), topology: topology}}
{:ok, %{flow | pid: self(), sup_pid: sup_pid, topology: topology}}
end

defp build(components) do
defp start_flow_supervisor(name) do
sup_pid =
case FlowSupervisor.start_link(%{name: :"#{name}_Supervisor"}) do
{:ok, pid} -> pid
{:error, {:already_started, pid}} -> pid
end

Process.unlink(sup_pid)
Process.monitor(sup_pid)
sup_pid
end

defp build(components, flow_pid, sup_pid) do
components
|> Enum.map(fn component ->
case component do
%DSL.Source{origin: origin} = source ->
%{source | source: Strom.Source.start(origin)}
src = %Strom.Source{origin: origin, flow_pid: flow_pid, sup_pid: sup_pid}
%{source | source: Strom.Source.start(src)}

%DSL.Sink{origin: origin} = sink ->
%{sink | sink: Strom.Sink.start(origin)}
snk = %Strom.Sink{origin: origin, flow_pid: flow_pid, sup_pid: sup_pid}
%{sink | sink: Strom.Sink.start(snk)}

%DSL.Mix{opts: opts} = mix ->
%{mix | mixer: Strom.Mixer.start(opts)}
mixer = %Strom.Mixer{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
%{mix | mixer: Strom.Mixer.start(mixer)}

%DSL.Split{opts: opts} = split ->
%{split | splitter: Strom.Splitter.start(opts)}

%DSL.Transform{opts: nil} = transform ->
%{transform | transformer: Strom.Transformer.start()}
splitter = %Strom.Splitter{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
%{split | splitter: Strom.Splitter.start(splitter)}

%DSL.Transform{opts: opts} = transform when is_list(opts) ->
%{transform | transformer: Strom.Transformer.start(opts)}
transformer = %Strom.Transformer{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
%{transform | transformer: Strom.Transformer.start(transformer)}

%DSL.Rename{names: names} = ren ->
rename = Strom.Renamer.start(names)
Expand Down Expand Up @@ -104,8 +141,8 @@ defmodule Strom.Flow do
{:reply, flow, state}
end

def handle_call(:stop, _from, %__MODULE__{} = state) do
state.topology
def handle_call(:stop, _from, %__MODULE__{} = flow) do
flow.topology
|> Enum.each(fn component ->
case component do
%DSL.Source{source: source} ->
Expand All @@ -125,7 +162,8 @@ defmodule Strom.Flow do
end
end)

{:stop, :normal, :ok, state}
Supervisor.stop(flow.sup_pid)
{:stop, :normal, :ok, flow}
end

@impl true
Expand Down
12 changes: 12 additions & 0 deletions lib/flow_supervsor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Strom.FlowSupervisor do
use DynamicSupervisor

def start_link(state) do
DynamicSupervisor.start_link(__MODULE__, state, name: state[:name])
end

@impl true
def init(_state) do
DynamicSupervisor.init(strategy: :one_for_one, max_restarts: 3, max_seconds: 5)
end
end
30 changes: 27 additions & 3 deletions lib/gen_mix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,29 @@ defmodule Strom.GenMix do
@buffer 1000

defstruct pid: nil,
opts: [],
flow_pid: nil,
sup_pid: nil,
running: false,
buffer: @buffer,
producers: %{},
consumers: %{}

alias Strom.GenMix.Consumer

# TODO supervisor
def start(opts \\ []) when is_list(opts) do
def start(opts \\ [])

def start(%__MODULE__{opts: opts} = gen_mix) when is_list(opts) do
gen_mix = %{
gen_mix
| buffer: Keyword.get(opts, :buffer, @buffer)
}

{:ok, pid} = DynamicSupervisor.start_child(gen_mix.sup_pid, {__MODULE__, gen_mix})
__state__(pid)
end

def start(opts) when is_list(opts) do
state = %__MODULE__{
buffer: Keyword.get(opts, :buffer, @buffer)
}
Expand All @@ -21,6 +35,10 @@ defmodule Strom.GenMix do
__state__(pid)
end

def start_link(%__MODULE__{} = state) do
GenServer.start_link(__MODULE__, state)
end

@impl true
def init(%__MODULE__{} = mix) do
{:ok, %{mix | pid: self()}}
Expand Down Expand Up @@ -49,7 +67,13 @@ defmodule Strom.GenMix do
|> Map.merge(sub_flow)
end

def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)
def stop(%__MODULE__{pid: pid, sup_pid: sup_pid}) do
if sup_pid do
:ok
else
GenServer.call(pid, :stop)
end
end

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

Expand Down
2 changes: 1 addition & 1 deletion lib/gen_mix/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ defmodule Strom.GenMix.Consumer do
GenServer.call(cons.pid, :register_client)
end,
fn cons ->
case GenServer.call(cons.pid, :get_data) do
case GenServer.call(cons.pid, :get_data, :infinity) do
{:ok, data} ->
if length(data) == 0 do
receive do
Expand Down
11 changes: 10 additions & 1 deletion lib/mixer.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
defmodule Strom.Mixer do
alias Strom.GenMix

def start(opts \\ []) when is_list(opts) do
defstruct [:opts, :flow_pid, :sup_pid]

def start(args \\ [])

def start(%__MODULE__{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}) do
gen_mix = %GenMix{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
GenMix.start(gen_mix)
end

def start(opts) when is_list(opts) do
GenMix.start(opts)
end

Expand Down
23 changes: 21 additions & 2 deletions lib/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ defmodule Strom.Sink do

use GenServer

defstruct [:origin, :pid]
defstruct [:origin, :pid, :flow_pid, :sup_pid]

def start(%__MODULE__{origin: origin} = sink) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
sink = %{sink | origin: origin}
{:ok, pid} = DynamicSupervisor.start_child(sink.sup_pid, {__MODULE__, sink})
__state__(pid)
end

def start(origin) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
Expand All @@ -15,12 +22,24 @@ defmodule Strom.Sink do
__state__(pid)
end

def start_link(%__MODULE__{} = state) do
GenServer.start_link(__MODULE__, state)
end

@impl true
def init(%__MODULE__{} = state), do: {:ok, %{state | pid: self()}}

def call(%__MODULE__{pid: pid}, data), do: GenServer.call(pid, {:call, data})

def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)
def stop(%__MODULE__{origin: origin, pid: pid, sup_pid: sup_pid}) do
apply(origin.__struct__, :stop, [origin])

if sup_pid do
:ok
else
GenServer.call(pid, :stop)
end
end

def call(flow, sink, names, sync \\ false)

Expand Down
2 changes: 1 addition & 1 deletion lib/sink/io_puts.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Strom.Sink.IOPuts do
@behaviour Strom.Sink

defstruct line_sep: "\n", prefix: ""
defstruct line_sep: "", prefix: ""

@impl true
def start(%__MODULE__{} = io_puts), do: io_puts
Expand Down
27 changes: 25 additions & 2 deletions lib/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@ defmodule Strom.Source do

use GenServer

defstruct [:origin, :pid]
defstruct [:origin, :pid, :flow_pid, :sup_pid]

def start(%__MODULE__{origin: list} = source) when is_list(list) do
start(%{source | origin: %Strom.Source.Events{events: list}})
end

def start(%__MODULE__{origin: origin} = source) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
source = %{source | origin: origin}
{:ok, pid} = DynamicSupervisor.start_child(source.sup_pid, {__MODULE__, source})
__state__(pid)
end

def start(list) when is_list(list) do
start(%Strom.Source.Events{events: list})
Expand All @@ -20,14 +31,26 @@ defmodule Strom.Source do
__state__(pid)
end

def start_link(%__MODULE__{} = state) do
GenServer.start_link(__MODULE__, state)
end

@impl true
def init(%__MODULE__{} = state), do: {:ok, %{state | pid: self()}}

def call(%__MODULE__{pid: pid}), do: GenServer.call(pid, :call, :infinity)

def infinite?(%__MODULE__{pid: pid}), do: GenServer.call(pid, :infinite)

def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)
def stop(%__MODULE__{origin: origin, pid: pid, sup_pid: sup_pid}) do
apply(origin.__struct__, :stop, [origin])

if sup_pid do
:ok
else
GenServer.call(pid, :stop)
end
end

def call(flow, %__MODULE__{} = source, names) when is_map(flow) and is_list(names) do
sub_flow =
Expand Down
11 changes: 10 additions & 1 deletion lib/splitter.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
defmodule Strom.Splitter do
alias Strom.GenMix

def start(opts \\ []) when is_list(opts) do
defstruct [:opts, :flow_pid, :sup_pid]

def start(args \\ [])

def start(%__MODULE__{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}) do
gen_mix = %GenMix{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
GenMix.start(gen_mix)
end

def start(opts) when is_list(opts) do
GenMix.start(opts)
end

Expand Down
Loading
Loading