Skip to content

Commit

Permalink
Simplify and unify components (#6)
Browse files Browse the repository at this point in the history
* Delete test data

* Simplify gen_mix and transformer

* Use Task.Supervisor in Transformer

* Crash tests for Transformer, Mixer and Splitter

* Move back test files

* Move back test files

* Crashes in source and sink

* Properly handle of tasks

* Fix flaky tests

* More todos
  • Loading branch information
antonmi authored Apr 24, 2024
1 parent c187a5f commit 25cd403
Show file tree
Hide file tree
Showing 22 changed files with 1,229 additions and 814 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ strom-*.tar
/tmp/

/test/examples/parcels/*.csv
/test/data/*.csv
/test_data

.DS_Store
11 changes: 5 additions & 6 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
- restart strategy
- renamer can be just a simple module
- composite component
- rename from to topology
- Different words for flow as data-structure and Flow module (maybe Topology with the call function)
- tick source with timeout
- loop for sink and source
- :__all__ for calling components on all streams in flow
- source and sink parameterization (opts)
- source and sink parameterization (opts)
- https://complexevents.com/2020/06/15/whats-the-difference-between-esp-and-cep-2/
- https://getindata.com/blog/lesson-learned-apache-flink-complex-event-processing/
- https://www.researchgate.net/publication/343994577_Complex_Event_Processing_for_the_Internet_of_Things
4 changes: 0 additions & 4 deletions lib/composite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,6 @@ defmodule Strom.Composite do
end

@impl true
def handle_info(:continue, composite) do
{:noreply, composite}
end

def handle_info({_task_ref, :ok}, composite) do
# do nothing for now
{:noreply, composite}
Expand Down
4 changes: 2 additions & 2 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ defmodule Strom.DSL do
"""
alias Strom.{Transformer, Mixer, Renamer, Sink, Source, Splitter}

defmacro source(name, origin) do
defmacro source(name, origin, opts \\ []) do
quote do
Source.new(unquote(name), unquote(origin))
Source.new(unquote(name), unquote(origin), unquote(opts))
end
end

Expand Down
167 changes: 112 additions & 55 deletions lib/gen_mix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,24 @@ defmodule Strom.GenMix do
use GenServer

@chunk 1
@buffer 1000

defstruct pid: nil,
inputs: [],
outputs: [],
opts: [],
running: false,
chunk: @chunk,
producers: %{},
consumers: %{}

alias Strom.GenMix.Consumer
buffer: @buffer,
input_streams: %{},
tasks: %{},
data: %{},
waiting_clients: %{}

def start(%__MODULE__{opts: opts} = gen_mix) when is_list(opts) do
gen_mix = %{
gen_mix
| chunk: Keyword.get(opts, :chunk, @chunk)
| chunk: Keyword.get(opts, :chunk, @chunk),
buffer: Keyword.get(opts, :buffer, @buffer)
}

start_link(gen_mix)
Expand All @@ -44,36 +46,43 @@ defmodule Strom.GenMix do
GenServer.call(pid, :stop)
end

defp run_inputs(streams, pid, chunk) do
defp run_inputs(streams, mix) do
Enum.reduce(streams, %{}, fn {{name, fun}, stream}, acc ->
task = async_run_stream({name, fun}, stream, chunk, pid)
Map.put(acc, {name, fun}, task)
task = async_run_stream({name, fun}, stream, mix)
Map.put(acc, name, task)
end)
end

defp async_run_stream({name, fun}, stream, chunk, pid) do
Task.async(fn ->
defp async_run_stream({name, fun}, stream, mix) do
Task.Supervisor.async_nolink(Strom.TaskSupervisor, fn ->
stream
|> Stream.chunk_every(chunk)
|> Stream.chunk_every(mix.chunk)
|> Stream.each(fn chunk ->
{chunk, _} = Enum.split_with(chunk, fun)
GenServer.cast(pid, {:new_data, {name, fun}, chunk})

new_data =
Enum.reduce(mix.outputs, %{}, fn {name, fun}, acc ->
{data, _} = Enum.split_with(chunk, fun)
Map.put(acc, name, data)
end)

GenServer.cast(mix.pid, {:new_data, name, new_data})

receive do
:continue ->
flush()
:continue_task ->
flush(:continue_task)
end
end)
|> Stream.run()

GenServer.cast(pid, {:done, {name, fun}})
{:task_done, name}
end)
end

defp flush do
defp flush(message) do
receive do
:continue ->
flush()
^message ->
flush(message)
after
0 -> :ok
end
Expand All @@ -86,70 +95,118 @@ defmodule Strom.GenMix do
Map.put(acc, {name, fun}, Map.fetch!(flow, name))
end)

{sub_flow, mix} =
mix.outputs
|> Enum.reduce({%{}, mix}, fn {name, fun}, {flow, mix} ->
consumer = Consumer.start({name, fun}, mix.pid)

mix = %{mix | consumers: Map.put(mix.consumers, {name, fun}, consumer)}
tasks = run_inputs(input_streams, mix)

stream = Consumer.call(consumer)
{Map.put(flow, name, stream), mix}
sub_flow =
mix.outputs
|> Enum.reduce(%{}, fn {name, _fun}, flow ->
stream =
Stream.resource(
fn ->
nil
end,
fn nil ->
case GenServer.call(mix.pid, {:get_data, name}, :infinity) do
{:data, data} ->
{data, nil}

:done ->
{:halt, nil}

:pause ->
receive do
:continue_client ->
flush(:continue_client)
{[], nil}
end
end
end,
fn nil -> nil end
)

Map.put(flow, name, stream)
end)

producers = run_inputs(input_streams, mix.pid, mix.chunk)

flow =
flow
|> Map.drop(Map.keys(mix.inputs))
|> Map.merge(sub_flow)

{:reply, flow, %{mix | running: true, producers: producers}}
{:reply, flow, %{mix | tasks: tasks, input_streams: input_streams}}
end

def handle_call(:stop, _from, %__MODULE__{} = mix) do
{:stop, :normal, :ok, %{mix | running: false}}
{:stop, :normal, :ok, mix}
end

@impl true
def handle_cast({:new_data, {_name, _fun}, chunk}, %__MODULE__{} = mix) do
Enum.each(mix.consumers, fn {_, cons} ->
GenServer.cast(cons.pid, {:put_data, chunk})
GenServer.cast(cons.pid, :continue)
end)

{:noreply, mix}
end
def handle_call({:get_data, name}, {pid, _ref}, mix) do
data = Map.get(mix.data, name, [])
mix = %{mix | data: Map.put(mix.data, name, [])}

def handle_cast({:done, {name, fun}}, %__MODULE__{} = mix) do
mix = %{mix | producers: Map.delete(mix.producers, {name, fun})}
total_count = Enum.reduce(mix.data, 0, fn {_name, data}, count -> count + length(data) end)

if map_size(mix.producers) == 0 do
Enum.each(mix.consumers, fn {_, cons} ->
GenServer.cast(cons.pid, :continue)
GenServer.cast(cons.pid, :stop)
end)
if total_count <= mix.buffer do
Enum.each(mix.tasks, fn {_, task} -> send(task.pid, :continue_task) end)
end

{:noreply, mix}
cond do
length(data) == 0 and map_size(mix.tasks) == 0 ->
{:reply, :done, mix}

length(data) == 0 ->
waiting_clients = Map.put(mix.waiting_clients, name, pid)
{:reply, :pause, %{mix | waiting_clients: waiting_clients}}

true ->
{:reply, {:data, data}, mix}
end
end

def handle_cast({:consumer_got_data, {_name, _fun}}, %__MODULE__{} = mix) do
Enum.each(mix.producers, fn {_, task} ->
send(task.pid, :continue)
@impl true
def handle_cast({:new_data, name, new_data}, %__MODULE__{} = mix) do
{all_mix_data, total_count} =
Enum.reduce(new_data, {mix.data, 0}, fn {name, data}, {all_mix_data, count} ->
prev_data = Map.get(all_mix_data, name, [])
all_data = prev_data ++ data
{Map.put(all_mix_data, name, all_data), count + length(all_data)}
end)

Enum.each(mix.waiting_clients, fn {_name, client_pid} ->
send(client_pid, :continue_client)
end)

if total_count < mix.buffer do
task = Map.fetch!(mix.tasks, name)
send(task.pid, :continue_task)
end

mix = %{mix | data: all_mix_data, waiting_clients: %{}}

{:noreply, mix}
end

@impl true
def handle_info({_task_ref, :ok}, mix) do
# do nothing for now
{:noreply, mix}
def handle_info({_task_ref, {:task_done, name}}, mix) do
mix = %{mix | tasks: Map.delete(mix.tasks, name)}

Enum.each(mix.waiting_clients, fn {_name, client_pid} ->
send(client_pid, :continue_client)
end)

{:noreply, %{mix | waiting_clients: %{}}}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, mix) do
# do nothing for now
# do nothing
{:noreply, mix}
end

def handle_info({:DOWN, _task_ref, :process, task_pid, _not_normal}, mix) do
{name, _task} = Enum.find(mix.tasks, fn {_name, task} -> task.pid == task_pid end)
{{^name, function}, stream} = Enum.find(mix.input_streams, fn {{n, _}, _} -> n == name end)
new_task = async_run_stream({name, function}, stream, mix)
tasks = Map.put(mix.tasks, name, new_task)

{:noreply, %{mix | tasks: tasks}}
end
end
Loading

0 comments on commit 25cd403

Please sign in to comment.