Skip to content

Commit

Permalink
Merge pull request #1 from antonmi/new_approach
Browse files Browse the repository at this point in the history
New approach
  • Loading branch information
antonmi authored Jan 5, 2024
2 parents d5ec583 + ede8090 commit c5575a7
Show file tree
Hide file tree
Showing 29 changed files with 937 additions and 1,397 deletions.
31 changes: 19 additions & 12 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,8 @@ defmodule Strom.DSL do
defstruct splitter: nil, opts: [], input: nil, partitions: %{}
end

defmodule Function do
defstruct function: nil, opts: [], inputs: []
end

defmodule Module do
defstruct module: nil, opts: [], inputs: [], state: nil
defmodule Transform do
defstruct function: nil, acc: nil, opts: nil, inputs: [], call: nil
end

defmodule Rename do
Expand Down Expand Up @@ -71,21 +67,32 @@ defmodule Strom.DSL do
end
end

defmacro function(inputs, function, opts \\ []) do
defmacro transform(inputs, function, acc, opts) do
quote do
%Strom.DSL.Function{
%Strom.DSL.Transform{
function: unquote(function),
acc: unquote(acc),
opts: unquote(opts),
inputs: unquote(inputs)
}
end
end

defmacro module(inputs, module, opts \\ []) do
defmacro transform(inputs, function, acc) do
quote do
%Strom.DSL.Module{
module: unquote(module),
opts: unquote(opts),
%Strom.DSL.Transform{
function: unquote(function),
acc: unquote(acc),
inputs: unquote(inputs)
}
end
end

defmacro transform(inputs, function) do
quote do
%Strom.DSL.Transform{
function: unquote(function),
acc: nil,
inputs: unquote(inputs)
}
end
Expand Down
43 changes: 22 additions & 21 deletions lib/flow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,14 @@ defmodule Strom.Flow do
%DSL.Splitter{opts: opts} = splitter ->
%{splitter | splitter: Strom.Splitter.start(opts)}

%DSL.Function{function: function, opts: opts} = fun ->
%{fun | function: Strom.Function.start(function, opts)}
%DSL.Transform{opts: nil} = fun ->
%{fun | call: Strom.Transformer.start()}

%DSL.Module{module: module, opts: opts} = mod ->
module = Strom.Module.start(module, opts)
%{mod | module: module}
%DSL.Transform{opts: opts} = fun when is_list(opts) ->
%{fun | call: Strom.Transformer.start(opts)}

%DSL.Rename{names: names} = ren ->
rename = Strom.Rename.start(names)
rename = Strom.Renamer.start(names)
%{ren | rename: rename}
end
end)
Expand Down Expand Up @@ -90,14 +89,15 @@ defmodule Strom.Flow do
%DSL.Splitter{splitter: splitter, input: input, partitions: partitions} ->
Strom.Splitter.call(flow, splitter, input, partitions)

%DSL.Function{function: function, inputs: inputs} ->
Strom.Function.call(flow, function, inputs)

%DSL.Module{module: module, inputs: inputs} ->
Strom.Module.call(flow, module, inputs)
%DSL.Transform{call: call, function: function, acc: acc, inputs: inputs} ->
if is_function(function, 1) do
Strom.Transformer.call(flow, call, inputs, function)
else
Strom.Transformer.call(flow, call, inputs, {function, acc})
end

%DSL.Rename{rename: rename, names: names} ->
Strom.Rename.call(flow, rename, names)
Strom.Renamer.call(flow, rename, names)
end
end)

Expand All @@ -120,25 +120,26 @@ defmodule Strom.Flow do
%DSL.Splitter{splitter: splitter} ->
Strom.Splitter.stop(splitter)

%DSL.Function{function: function} ->
Strom.Function.stop(function)

%DSL.Module{module: module} ->
Strom.Module.stop(module)
%DSL.Transform{call: call} ->
Strom.Transformer.stop(call)
end
end)

{:stop, :normal, :ok, state}
end

@impl true
def handle_info({_task_ref, :ok}, mixer) do
def handle_info(:continue, flow) do
{:noreply, flow}
end

def handle_info({_task_ref, :ok}, flow) do
# do nothing for now
{:noreply, mixer}
{:noreply, flow}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, mixer) do
def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, flow) do
# do nothing for now
{:noreply, mixer}
{:noreply, flow}
end
end
59 changes: 0 additions & 59 deletions lib/function.ex

This file was deleted.

149 changes: 149 additions & 0 deletions lib/gen_mix.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
defmodule Strom.GenMix do
use GenServer

@buffer 1000

defstruct pid: nil,
running: false,
buffer: @buffer,
producers: %{},
consumers: %{}

alias Strom.GenMix.Consumer

# TODO supervisor
def start(opts \\ []) when is_list(opts) do
state = %__MODULE__{
buffer: Keyword.get(opts, :buffer, @buffer)
}

{:ok, pid} = GenServer.start_link(__MODULE__, state)
__state__(pid)
end

@impl true
def init(%__MODULE__{} = mix) do
{:ok, %{mix | pid: self()}}
end

def call(flow, %__MODULE__{} = mix, inputs, outputs)
when is_map(flow) and is_map(inputs) and is_map(outputs) do
input_streams =
Enum.reduce(inputs, %{}, fn {name, fun}, acc ->
Map.put(acc, {name, fun}, Map.fetch!(flow, name))
end)

sub_flow =
outputs
|> Enum.reduce(%{}, fn {name, fun}, flow ->
consumer = Consumer.start({name, fun}, mix.pid)
:ok = GenServer.call(mix.pid, {:register_consumer, {{name, fun}, consumer}})
stream = Consumer.call(consumer)
Map.put(flow, name, stream)
end)

:ok = GenServer.call(mix.pid, {:run_inputs, input_streams})

flow
|> Map.drop(Map.keys(inputs))
|> Map.merge(sub_flow)
end

def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

defp run_inputs(streams, pid, buffer) do
Enum.reduce(streams, %{}, fn {{name, fun}, stream}, acc ->
task = async_run_stream({name, fun}, stream, buffer, pid)
Map.put(acc, {name, fun}, task)
end)
end

defp async_run_stream({name, fun}, stream, buffer, pid) do
Task.async(fn ->
stream
|> Stream.chunk_every(buffer)
|> Stream.each(fn chunk ->
{chunk, _} = Enum.split_with(chunk, fun)
GenServer.cast(pid, {:new_data, {name, fun}, chunk})

receive do
:continue ->
flush()
end
end)
|> Stream.run()

GenServer.cast(pid, {:done, {name, fun}})
end)
end

defp flush do
receive do
_ -> flush()
after
0 -> :ok
end
end

@impl true
def handle_call({:run_inputs, streams_to_mix}, _from, %__MODULE__{} = mix) do
producers = run_inputs(streams_to_mix, mix.pid, mix.buffer)

{:reply, :ok, %{mix | running: true, producers: producers}}
end

def handle_call({:register_consumer, {{name, fun}, cons}}, _from, %__MODULE__{} = mix) do
mix = %{mix | consumers: Map.put(mix.consumers, {name, fun}, cons)}
{:reply, :ok, mix}
end

def handle_call(:stop, _from, %__MODULE__{} = mix) do
{:stop, :normal, :ok, %{mix | running: false}}
end

def handle_call(:__state__, _from, mix), do: {:reply, mix, mix}

@impl true
def handle_cast({:new_data, {_name, _fun}, chunk}, %__MODULE__{} = mix) do
Enum.each(mix.consumers, fn {_, cons} ->
GenServer.cast(cons.pid, {:put_data, chunk})
GenServer.cast(cons.pid, :continue)
end)

{:noreply, mix}
end

def handle_cast({:done, {name, fun}}, %__MODULE__{} = mix) do
mix = %{mix | producers: Map.delete(mix.producers, {name, fun})}

if map_size(mix.producers) == 0 do
Enum.each(mix.consumers, fn {_, cons} ->
GenServer.cast(cons.pid, :continue)
GenServer.cast(cons.pid, :stop)
end)
end

{:noreply, mix}
end

def handle_cast({:consumer_got_data, {_name, _fun}}, %__MODULE__{} = mix) do
Enum.each(mix.producers, fn {_, task} ->
send(task.pid, :continue)
end)

{:noreply, mix}
end

@impl true
def handle_info({_task_ref, :ok}, mix) do
# do nothing for now
{:noreply, mix}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, mix) do
# do nothing for now
{:noreply, mix}
end
end
Loading

0 comments on commit c5575a7

Please sign in to comment.