Skip to content

Commit

Permalink
Fix tests, refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Jan 13, 2024
1 parent 01cc65d commit 7ffb199
Show file tree
Hide file tree
Showing 25 changed files with 5,446 additions and 5,905 deletions.
127 changes: 127 additions & 0 deletions lib/composite.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
defmodule Strom.Composite do
defstruct pid: nil,
components: []

use GenServer

@type t :: %__MODULE__{}

def start(components) when is_list(components) do
state = %__MODULE__{components: components}

{:ok, pid} = GenServer.start_link(__MODULE__, state)
__state__(pid)
end

@impl true
def init(%__MODULE__{components: components} = composite) do
components =
components
|> List.flatten()
|> build()

{:ok, %{composite | pid: self(), components: components}}
end

defp build(components) do
components
|> Enum.map(fn component ->
case component do
%Strom.Source{} = source ->
Strom.Source.start(source)

%Strom.Sink{} = sink ->
Strom.Sink.start(sink)

%Strom.Mixer{opts: opts} = mixer ->
Strom.Mixer.start(mixer, opts)

%Strom.Splitter{opts: opts} = splitter ->
Strom.Splitter.start(splitter, opts)

%Strom.Transformer{opts: opts} = transformer when is_list(opts) ->
Strom.Transformer.start(transformer, opts)

%Strom.Renamer{} = renamer ->
Strom.Renamer.start(renamer)
end
end)
end

def call(flow, %__MODULE__{pid: pid}), do: GenServer.call(pid, {:call, flow}, :infinity)

def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

@impl true
def handle_call(:__state__, _from, composite), do: {:reply, composite, composite}

def handle_call({:call, init_flow}, _from, %__MODULE__{} = composite) do
flow =
Enum.reduce(composite.components, init_flow, fn component, flow ->
case component do
%Strom.Source{} = source ->
Strom.Source.call(flow, source)

%Strom.Sink{} = sink ->
Strom.Sink.call(flow, sink)

%Strom.Mixer{} = mixer ->
Strom.Mixer.call(flow, mixer)

%Strom.Splitter{} = splitter ->
Strom.Splitter.call(flow, splitter)

%Strom.Transformer{} = transformer ->
Strom.Transformer.call(flow, transformer)

%Strom.Renamer{} = renamer ->
Strom.Renamer.call(flow, renamer)
end
end)

{:reply, flow, composite}
end

def handle_call(:stop, _from, %__MODULE__{components: components} = composite) do
Enum.each(components, fn component ->
case component do
%Strom.Source{} = source ->
Strom.Source.stop(source)

%Strom.Sink{} = sink ->
Strom.Sink.stop(sink)

%Strom.Mixer{} = mixer ->
Strom.Mixer.stop(mixer)

%Strom.Splitter{} = splitter ->
Strom.Splitter.stop(splitter)

%Strom.Transformer{} = transformer ->
Strom.Transformer.stop(transformer)

%Strom.Renamer{} = renamer ->
Strom.Renamer.stop(renamer)
end
end)

{:stop, :normal, :ok, composite}
end

@impl true
def handle_info(:continue, composite) do
{:noreply, composite}
end

def handle_info({_task_ref, :ok}, composite) do
# do nothing for now
{:noreply, composite}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, composite) do
# do nothing for now
{:noreply, composite}
end
end
26 changes: 0 additions & 26 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,30 +73,4 @@ defmodule Strom.DSL do
%Strom.Renamer{names: unquote(names)}
end
end

defmacro __using__(_opts) do
quote do
import Strom.DSL

@spec start(term) :: Strom.Flow.t()
def start(opts \\ []) do
Strom.Flow.start(__MODULE__, opts)
end

@spec call(map) :: map()
def call(flow) when is_map(flow) do
Strom.Flow.call(__MODULE__, flow)
end

@spec stop() :: :ok
def stop do
Strom.Flow.stop(__MODULE__)
end

@spec info() :: list()
def info do
Strom.Flow.info(__MODULE__)
end
end
end
end
185 changes: 0 additions & 185 deletions lib/flow.ex

This file was deleted.

12 changes: 0 additions & 12 deletions lib/flow_supervsor.ex

This file was deleted.

1 change: 0 additions & 1 deletion lib/gen_mix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ defmodule Strom.GenMix do
{:reply, flow, %{mix | running: true, producers: producers}}
end


def handle_call(:stop, _from, %__MODULE__{} = mix) do
{:stop, :normal, :ok, %{mix | running: false}}
end
Expand Down
20 changes: 10 additions & 10 deletions lib/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,6 @@ defmodule Strom.Sink do

def call(%__MODULE__{pid: pid}, data), do: GenServer.call(pid, {:call, data})

def stop(%__MODULE__{origin: origin, pid: pid, sup_pid: sup_pid}) do
apply(origin.__struct__, :stop, [origin])

if sup_pid do
:ok
else
GenServer.call(pid, :stop)
end
end

def call(flow, %__MODULE__{name: name, sync: sync} = sink) when is_map(flow) do
stream = Map.fetch!(flow, name)

Expand All @@ -71,6 +61,16 @@ defmodule Strom.Sink do
Map.delete(flow, name)
end

def stop(%__MODULE__{origin: origin, pid: pid, sup_pid: sup_pid}) do
apply(origin.__struct__, :stop, [origin])

if sup_pid do
:ok
else
GenServer.call(pid, :stop)
end
end

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

@impl true
Expand Down
Loading

0 comments on commit 7ffb199

Please sign in to comment.