From 1019de4c757a4c8ad37bfc3f34bef0aa704421f3 Mon Sep 17 00:00:00 2001 From: Anton Mishchuk Date: Tue, 12 Dec 2023 13:36:23 +0100 Subject: [PATCH] Fix splitter logic --- lib/splitter.ex | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/lib/splitter.ex b/lib/splitter.ex index 6ce754d..582e8c2 100644 --- a/lib/splitter.ex +++ b/lib/splitter.ex @@ -7,7 +7,7 @@ defmodule Strom.Splitter do def start(opts \\ []) when is_list(opts) do state = %__MODULE__{ - running: MapSet.new(), + running: false, partitions: %{}, chunk_every: Keyword.get(opts, :chunk_every, @chunk_every) } @@ -33,12 +33,14 @@ defmodule Strom.Splitter do GenServer.call(splitter.pid, {:set_partitions, partitions}) stream_to_run = Map.fetch!(flow, name) + :ok = GenServer.call(splitter.pid, {:run_stream, stream_to_run}) + sub_flow = partitions |> Enum.reduce(%{}, fn {name, fun}, flow -> stream = Stream.resource( - fn -> GenServer.call(splitter.pid, {:run_stream, stream_to_run, {name, fun}}) end, + fn -> splitter end, fn splitter -> case GenServer.call(splitter.pid, {:get_data, {name, fun}}) do {:ok, data} -> @@ -63,7 +65,7 @@ defmodule Strom.Splitter do def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__) - defp async_run_stream(stream, {name, fun}, chunk_every, pid) do + defp async_run_stream(stream, chunk_every, pid) do Task.async(fn -> stream |> Stream.chunk_every(chunk_every) @@ -73,7 +75,7 @@ defmodule Strom.Splitter do end) |> Stream.run() - GenServer.call(pid, {:done, {name, fun}}) + GenServer.call(pid, :done) end) end @@ -81,6 +83,7 @@ defmodule Strom.Splitter do if data_size > 10 * chunk_every do div = div(data_size, 10 * chunk_every) to_sleep = trunc(:math.pow(2, div)) + Process.sleep(to_sleep) end end @@ -104,10 +107,9 @@ defmodule Strom.Splitter do {:reply, data_size, %{splitter | partitions: new_partitions}} end - def handle_call({:run_stream, stream, {name, fun}}, _from, %__MODULE__{} = splitter) do - async_run_stream(stream, {name, fun}, splitter.chunk_every, splitter.pid) - splitter = %{splitter | running: MapSet.put(splitter.running, {name, fun})} - {:reply, splitter, splitter} + def handle_call({:run_stream, stream}, _from, %__MODULE__{} = splitter) do + async_run_stream(stream, splitter.chunk_every, splitter.pid) + {:reply, :ok, %{splitter | running: true}} end def handle_call({:set_partitions, partitions}, _from, %__MODULE__{} = splitter) do @@ -118,9 +120,8 @@ defmodule Strom.Splitter do {:reply, splitter, splitter} end - def handle_call({:done, {name, fun}}, _from, %__MODULE__{} = splitter) do - running = MapSet.delete(splitter.running, {name, fun}) - {:reply, :ok, %{splitter | running: running}} + def handle_call(:done, _from, %__MODULE__{} = splitter) do + {:reply, :ok, %{splitter | running: false}} end def handle_call( @@ -130,7 +131,7 @@ defmodule Strom.Splitter do ) do data = Map.get(partitions, partition_fun) - if length(data) == 0 && MapSet.size(running) == 0 do + if length(data) == 0 && !running do {:reply, {:error, :done}, splitter} else {:reply, {:ok, data}, %{splitter | partitions: Map.put(partitions, partition_fun, [])}} @@ -138,7 +139,7 @@ defmodule Strom.Splitter do end def handle_call(:stop, _from, %__MODULE__{} = splitter) do - {:stop, :normal, :ok, %{splitter | running: MapSet.new(), partitions: %{}}} + {:stop, :normal, :ok, %{splitter | running: false, partitions: %{}}} end def handle_call(:__state__, _from, splitter), do: {:reply, splitter, splitter}