Skip to content

Commit

Permalink
No chunks for now
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Dec 14, 2023
1 parent e441f8c commit a3f1ff0
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 270 deletions.
45 changes: 21 additions & 24 deletions lib/mixer.ex
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
defmodule Strom.Mixer do
use GenServer

@chunk_every 100
@buffer 1000

defstruct streams: %{},
pid: nil,
running: false,
data: %{},
chunk_every: @chunk_every,
buffer: @buffer,
no_data_counter: 0

def start(opts \\ []) when is_list(opts) do
state = %__MODULE__{
chunk_every: Keyword.get(opts, :chunk_every, @chunk_every)
buffer: Keyword.get(opts, :buffer, @buffer)
}

{:ok, pid} = GenServer.start_link(__MODULE__, state)
Expand Down Expand Up @@ -46,11 +46,7 @@ defmodule Strom.Mixer do
fn mixer ->
case GenServer.call(mixer.pid, :get_data) do
{:ok, {data, no_data_counter}} ->
if no_data_counter > 0 do
to_sleep = trunc(:math.pow(2, no_data_counter))
Process.sleep(to_sleep)
end

maybe_wait(no_data_counter, 0)
{data, mixer}

{:error, :done} ->
Expand All @@ -69,45 +65,46 @@ defmodule Strom.Mixer do

# Makes a synchronous GenServer.call to `pid` with the `:__state__` message and
# returns the server's reply. The `is_pid/1` guard rejects any non-pid argument.
# NOTE(review): the matching handle_call(:__state__, ...) clause is not visible in
# this excerpt — presumably it replies with the mixer struct; confirm.
def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

# Maps over `streams` (entries shaped `{{name, fun}, stream}`) and calls
# async_run_stream/4 for each one, passing the mixer `pid` along.
# NOTE(review): this page is a rendered diff without +/- markers, so the old and
# new versions of changed lines appear back to back. The "old"/"new" labels below
# are inferred from the parameter rename (chunk_every -> buffer) — confirm against
# the repository.
# old:
defp run_streams(streams, pid, chunk_every) do
# new:
defp run_streams(streams, pid, buffer) do
Enum.map(streams, fn {{name, fun}, stream} ->
# old:
async_run_stream({name, fun}, stream, chunk_every, pid)
# new:
async_run_stream({name, fun}, stream, buffer, pid)
end)
end

# Consumes `stream` inside a Task.async/1 task, pushing matching data to the
# GenServer at `pid` via `{:new_data, {name, fun}, ...}` calls, and finally
# reports `{:done, {name, fun}}` once the stream has been fully run.
# NOTE(review): this page is a rendered diff without +/- markers. The "old"/"new"
# labels below are inferred (chunk-based code vs. per-element code) — confirm
# against the repository.
# old:
defp async_run_stream({name, fun}, stream, chunk_every, pid) do
# new:
defp async_run_stream({name, fun}, stream, buffer, pid) do
Task.async(fn ->
stream
# old: group elements into chunks, keep only those for which `fun` is truthy,
# and send the filtered chunk in a single call.
|> Stream.chunk_every(chunk_every)
|> Stream.each(fn chunk ->
{chunk, _} = Enum.split_with(chunk, fun)
data_length = GenServer.call(pid, {:new_data, {name, fun}, chunk})
maybe_wait(data_length, chunk_every)
# new: one GenServer.call per element that passes `fun`; the reply is handed
# to maybe_wait/2 together with `buffer`, which may sleep before continuing.
|> Stream.each(fn el ->
if fun.(el) do
data_length = GenServer.call(pid, {:new_data, {name, fun}, el})
maybe_wait(data_length, buffer)
end
end)
|> Stream.run()

# Sent after Stream.run/1 returns, i.e. when the source stream is exhausted.
GenServer.call(pid, {:done, {name, fun}})
end)
end

# Back-off helper: sleeps the calling process when a counter exceeds a threshold.
# Returns the number of milliseconds slept; when the condition is false the `if`
# has no else branch, so the result is nil.
# NOTE(review): this page is a rendered diff without +/- markers. The "old"/"new"
# labels below are inferred from the renamed parameters — confirm against the
# repository.
# old: waits only above 10 * chunk_every, for 2^(data_length div (10 * chunk_every)) ms.
defp maybe_wait(data_length, chunk_every) do
if data_length > 10 * chunk_every do
div = div(data_length, 10 * chunk_every)
to_sleep = trunc(:math.pow(2, div))
# new: waits as soon as `current` exceeds `allowed`, for 2^(current - allowed) ms.
# NOTE(review): the sleep doubles for every unit above `allowed`; a diff of 20 is
# already about 2^20 ms (~17 minutes). Presumably a cap is wanted — verify.
defp maybe_wait(current, allowed) do
if current > allowed do
diff = current - allowed
to_sleep = trunc(:math.pow(2, diff))
Process.sleep(to_sleep)
to_sleep
end
end

# Stores incoming data under the `{name, fun}` key of the mixer's `data` map and
# replies with the length of that key's accumulated list.
# NOTE(review): this page is a rendered diff without +/- markers. The "old"/"new"
# labels below are inferred (list `data` vs. single `datum`) — confirm against the
# repository.
# old:
def handle_call({:new_data, {name, fun}, data}, _from, %__MODULE__{data: prev_data} = mixer) do
# new:
def handle_call({:new_data, {name, fun}, datum}, _from, %__MODULE__{data: prev_data} = mixer) do
prev_data_from_stream = Map.get(prev_data, {name, fun}, [])
# old: append the incoming list to what was already stored.
data_from_stream = prev_data_from_stream ++ data
# new: prepend the single element, so the stored list is newest-first; the
# :get_data clause visible later in this diff applies Enum.reverse/1 when reading.
data_from_stream = [datum | prev_data_from_stream]
data = Map.put(prev_data, {name, fun}, data_from_stream)

# length/1 walks the whole list, so this reply costs time proportional to the
# number of elements currently stored for this stream.
{:reply, length(data_from_stream), %{mixer | data: data}}
end

# Starts the stream runners via run_streams/3, then replies `:ok` and records
# `running: true` plus the given streams in the mixer state. The return value of
# run_streams/3 is not bound here.
# NOTE(review): this page is a rendered diff without +/- markers. The "old"/"new"
# labels below are inferred from the chunk_every -> buffer rename — confirm
# against the repository.
def handle_call({:run_streams, streams_to_mix}, _from, %__MODULE__{} = mixer) do
# old:
run_streams(streams_to_mix, mixer.pid, mixer.chunk_every)
# new:
run_streams(streams_to_mix, mixer.pid, mixer.buffer)

{:reply, :ok, %{mixer | running: true, streams: streams_to_mix}}
end
Expand All @@ -118,7 +115,7 @@ defmodule Strom.Mixer do
end

def handle_call(:get_data, _from, %__MODULE__{data: data, streams: streams} = mixer) do
all_data = Enum.reduce(data, [], fn {_, d}, acc -> acc ++ d end)
all_data = Enum.reduce(data, [], fn {_, d}, acc -> acc ++ Enum.reverse(d) end)

if length(all_data) == 0 && map_size(streams) == 0 do
{:reply, {:error, :done}, mixer}
Expand Down
50 changes: 23 additions & 27 deletions lib/splitter.ex
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
defmodule Strom.Splitter do
use GenServer

@chunk_every 100
@buffer 1000

defstruct pid: nil,
stream: nil,
partitions: %{},
running: false,
chunk_every: 100,
buffer: @buffer,
no_data_counter: 0

def start(opts \\ []) when is_list(opts) do
state = %__MODULE__{
chunk_every: Keyword.get(opts, :chunk_every, @chunk_every)
buffer: Keyword.get(opts, :buffer, @buffer)
}

{:ok, pid} = GenServer.start_link(__MODULE__, state)
Expand Down Expand Up @@ -47,10 +47,7 @@ defmodule Strom.Splitter do
fn splitter ->
case GenServer.call(splitter.pid, {:get_data, {name, fun}}) do
{:ok, {data, no_data_counter}} ->
if no_data_counter > 0 do
to_sleep = trunc(:math.pow(2, no_data_counter))
Process.sleep(to_sleep)
end
maybe_wait(no_data_counter, 0)

{data, splitter}

Expand All @@ -73,39 +70,35 @@ defmodule Strom.Splitter do

# Makes a synchronous GenServer.call to `pid` with the `:__state__` message and
# returns the server's reply. The `is_pid/1` guard rejects any non-pid argument.
# NOTE(review): the matching handle_call(:__state__, ...) clause is not visible in
# this excerpt — presumably it replies with the splitter struct; confirm.
def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

# Consumes `stream` inside a Task.async/1 task, forwarding data to the GenServer
# at `pid` via `{:new_data, ...}` calls, and sends `:done` once the stream has
# been fully run.
# NOTE(review): this page is a rendered diff without +/- markers. The "old"/"new"
# labels below are inferred (chunk-based code vs. per-element code) — confirm
# against the repository.
# old:
defp async_run_stream(stream, chunk_every, pid) do
# new:
defp async_run_stream(stream, buffer, pid) do
Task.async(fn ->
stream
# old: send the data in chunks of `chunk_every` elements.
|> Stream.chunk_every(chunk_every)
|> Stream.each(fn chunk ->
data_size = GenServer.call(pid, {:new_data, chunk})
maybe_wait(data_size, chunk_every)
# new: one GenServer.call per element; the reply is handed to maybe_wait/2
# together with `buffer`, which may sleep before the next element is sent.
|> Stream.each(fn el ->
data_size = GenServer.call(pid, {:new_data, el})
maybe_wait(data_size, buffer)
end)
|> Stream.run()

# Sent after Stream.run/1 returns, i.e. when the source stream is exhausted.
GenServer.call(pid, :done)
end)
end

# Back-off helper: sleeps the calling process when a counter exceeds a threshold.
# Returns the number of milliseconds slept; when the condition is false the `if`
# has no else branch, so the result is nil.
# NOTE(review): this page is a rendered diff without +/- markers. The "old"/"new"
# labels below are inferred from the renamed parameters — confirm against the
# repository.
# old: waits only above 10 * chunk_every, for 2^(data_size div (10 * chunk_every)) ms.
defp maybe_wait(data_size, chunk_every) do
if data_size > 10 * chunk_every do
div = div(data_size, 10 * chunk_every)
to_sleep = trunc(:math.pow(2, div))

# new: waits as soon as `current` exceeds `allowed`, for 2^(current - allowed) ms.
# NOTE(review): the sleep doubles for every unit above `allowed`; a diff of 20 is
# already about 2^20 ms (~17 minutes). Presumably a cap is wanted — verify.
defp maybe_wait(current, allowed) do
if current > allowed do
diff = current - allowed
to_sleep = trunc(:math.pow(2, diff))
Process.sleep(to_sleep)
to_sleep
end
end

def handle_call({:new_data, data}, _from, %__MODULE__{} = splitter) do
def handle_call({:new_data, datum}, _from, %__MODULE__{} = splitter) do
new_partitions =
Enum.reduce(splitter.partitions, %{}, fn {{name, fun}, prev_data}, acc ->
case Enum.split_with(data, fun) do
{[], _} ->
Map.put(acc, {name, fun}, prev_data)

{data, _} ->
new_data = prev_data ++ data
Map.put(acc, {name, fun}, new_data)
if fun.(datum) do
Map.put(acc, {name, fun}, [datum | prev_data])
else
Map.put(acc, {name, fun}, prev_data)
end
end)

Expand All @@ -116,7 +109,7 @@ defmodule Strom.Splitter do
end

# Starts the asynchronous stream runner via async_run_stream/3, then replies `:ok`
# and marks the splitter as `running: true`. The return value of
# async_run_stream/3 is not bound here.
# NOTE(review): this page is a rendered diff without +/- markers. The "old"/"new"
# labels below are inferred from the chunk_every -> buffer rename — confirm
# against the repository.
def handle_call({:run_stream, stream}, _from, %__MODULE__{} = splitter) do
# old:
async_run_stream(stream, splitter.chunk_every, splitter.pid)
# new:
async_run_stream(stream, splitter.buffer, splitter.pid)
{:reply, :ok, %{splitter | running: true}}
end

Expand All @@ -137,7 +130,10 @@ defmodule Strom.Splitter do
_from,
%__MODULE__{partitions: partitions, running: running} = splitter
) do
data = Map.get(partitions, partition_fun)
data =
partitions
|> Map.get(partition_fun)
|> Enum.reverse()

if length(data) == 0 && !running do
{:reply, {:error, :done}, splitter}
Expand Down
Loading

0 comments on commit a3f1ff0

Please sign in to comment.