From 297b29f4cf804d240bd5a289357bbae1a111e823 Mon Sep 17 00:00:00 2001 From: Anton Mishchuk Date: Mon, 10 Jun 2024 14:26:52 +0200 Subject: [PATCH 1/3] Simplify the Sink interface --- lib/sink.ex | 23 +++++++--------------- lib/sink/io_puts.ex | 2 +- lib/sink/null.ex | 2 +- lib/sink/write_lines.ex | 2 +- test/experiments_test.exs | 41 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 51 insertions(+), 19 deletions(-) create mode 100644 test/experiments_test.exs diff --git a/lib/sink.ex b/lib/sink.ex index 45f46fe..01362af 100644 --- a/lib/sink.ex +++ b/lib/sink.ex @@ -15,7 +15,7 @@ defmodule Strom.Sink do See `Strom.Sink.Writeline`, `Strom.Sink.IOPuts`, `Strom.Sink.Null` """ @callback start(map) :: map - @callback call(map, term) :: {:ok, {term, map}} | {:error, {term, map}} + @callback call(map, term) :: map | no_return() @callback stop(map) :: map use GenServer @@ -67,12 +67,12 @@ defmodule Strom.Sink do Map.delete(flow, name) end - defp async_run_sink(sink, stream) do + defp async_run_sink(%__MODULE__{origin: origin} = sink, stream) do Task.Supervisor.async_nolink( {:via, PartitionSupervisor, {Strom.TaskSupervisor, self()}}, fn -> - Stream.transform(stream, sink, fn el, sink -> - call_sink(sink, el) + Stream.transform(stream, sink, fn el, _sink -> + sink = apply(origin.__struct__, :call, [origin, el]) {[], sink} end) |> Stream.run() @@ -82,16 +82,6 @@ defmodule Strom.Sink do ) end - defp call_sink(%__MODULE__{origin: origin} = sink, data) do - case apply(origin.__struct__, :call, [origin, data]) do - {:ok, {[], origin}} -> - {[], %{sink | origin: origin}} - - {:error, {:halt, origin}} -> - {:halt, %{sink | origin: origin}} - end - end - @spec stop(__MODULE__.t()) :: :ok def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop) @@ -106,8 +96,9 @@ defmodule Strom.Sink do {:reply, :ok, %{sink | task: task, stream: stream}} end - def handle_call({:call, data}, _from, %__MODULE__{} = sink) do - {:reply, call_sink(sink, data), sink} + def handle_call({:call, data}, _from, %__MODULE__{origin: origin}) do + sink = apply(origin.__struct__, :call, [origin, data]) + {:reply, {[], sink}, sink} end def handle_call(:stop, _from, %__MODULE__{origin: origin, task: task} = sink) do diff --git a/lib/sink/io_puts.ex b/lib/sink/io_puts.ex index c43059a..9d98657 100644 --- a/lib/sink/io_puts.ex +++ b/lib/sink/io_puts.ex @@ -12,7 +12,7 @@ defmodule Strom.Sink.IOPuts do def call(%__MODULE__{} = io_puts, data) do IO.puts(io_puts.prefix <> "#{data}" <> io_puts.line_sep) - {:ok, {[], io_puts}} + io_puts end @impl true diff --git a/lib/sink/null.ex b/lib/sink/null.ex index db35a0d..55934ec 100644 --- a/lib/sink/null.ex +++ b/lib/sink/null.ex @@ -9,7 +9,7 @@ defmodule Strom.Sink.Null do def start(%__MODULE__{} = null), do: null @impl true - def call(%__MODULE__{} = null, _data), do: {:ok, {[], null}} + def call(%__MODULE__{} = null, _data), do: null @impl true def stop(%__MODULE__{} = null), do: null diff --git a/lib/sink/write_lines.ex b/lib/sink/write_lines.ex index 0590740..6a44183 100644 --- a/lib/sink/write_lines.ex +++ b/lib/sink/write_lines.ex @@ -19,7 +19,7 @@ defmodule Strom.Sink.WriteLines do def call(%__MODULE__{} = write_lines, data) do :ok = IO.write(write_lines.file, data <> write_lines.line_sep) - {:ok, {[], write_lines}} + write_lines end @impl true diff --git a/test/experiments_test.exs b/test/experiments_test.exs new file mode 100644 index 0000000..33844f8 --- /dev/null +++ b/test/experiments_test.exs @@ -0,0 +1,41 @@ +defmodule Strom.ExperimentsTest do + use ExUnit.Case, async: true + + # alias Strom.{Composite, Transformer} + + # @tag timeout: :infinity + # test "event speed" do + # :observer.start() + # print_time = + # Transformer.new(:stream, fn event -> + # IO.inspect(:erlang.system_time(:millisecond)) + # event + # end) + # + # transformer = Transformer.new(:stream, &(&1 + 1)) + # transformers = List.duplicate(transformer, 200_000) + # + # composite = + # [print_time, transformers, print_time] + # |> Composite.new() + # |> Composite.start() + # + # start = :erlang.system_time(:millisecond) + # + # %{stream: [1]} + # |> Composite.call(composite) + # |> Map.get(:stream) + # |> Enum.to_list() + # |> IO.inspect + # + # IO.inspect(:erlang.system_time(:millisecond) - start, label: :total) + # end + # + # test "tasks memory consumption" do + # :observer.start() + # Enum.map(1..1_000_000, fn _i -> + # Task.async(fn -> Process.sleep(50000) end) + # end) + # |> Enum.map(&Task.await(&1, :infinity)) + # end +end From 42cc9673356ed7ad6eee46af0644c2411c9d023a Mon Sep 17 00:00:00 2001 From: Anton Mishchuk Date: Mon, 10 Jun 2024 14:48:10 +0200 Subject: [PATCH 2/3] Simplify the Source interface --- lib/composite.ex | 2 +- lib/loop.ex | 12 +++++++----- lib/sink.ex | 5 +++-- lib/source.ex | 34 ++++++++++------------------------ lib/source/io_gets.ex | 2 +- lib/source/read_lines.ex | 4 ++-- 6 files changed, 24 insertions(+), 35 deletions(-) diff --git a/lib/composite.ex b/lib/composite.ex index 7eeedb2..006d87f 100644 --- a/lib/composite.ex +++ b/lib/composite.ex @@ -152,7 +152,7 @@ defmodule Strom.Composite do end defp timestamp_postfix do - :erlang.system_time() + System.os_time() |> rem(round(1.0e9)) |> to_string() end diff --git a/lib/loop.ex b/lib/loop.ex index 45c494d..edabae8 100644 --- a/lib/loop.ex +++ b/lib/loop.ex @@ -28,6 +28,7 @@ defmodule Strom.Loop do end end + # call as a source def call(%__MODULE__{name: name} = loop) do Agent.get_and_update(name, fn data -> case data do @@ -40,24 +41,25 @@ defmodule Strom.Loop do case loop.last_empty_call_at do nil -> Process.sleep(loop.sleep) - {:ok, {[], %{loop | last_empty_call_at: System.os_time(:millisecond)}}} + {[], %{loop | last_empty_call_at: System.os_time(:millisecond)}} last_empty_call_at -> if System.os_time(:millisecond) - last_empty_call_at > loop.timeout do - {:error, {:halt, loop}} + {:halt, loop} else - {:ok, {[], loop}} + {[], loop} end end datum -> - {:ok, {[datum], %{loop | last_empty_call_at: nil}}} + {[datum], %{loop | last_empty_call_at: nil}} end end + # call as a sink def call(%__MODULE__{name: name} = loop, data) do :ok = Agent.update(name, fn prev_data -> prev_data ++ [data] end) - {:ok, {[], loop}} + loop end def stop(%__MODULE__{name: name} = loop) do diff --git a/lib/sink.ex b/lib/sink.ex index 01362af..f8c5ec0 100644 --- a/lib/sink.ex +++ b/lib/sink.ex @@ -96,8 +96,9 @@ defmodule Strom.Sink do {:reply, :ok, %{sink | task: task, stream: stream}} end - def handle_call({:call, data}, _from, %__MODULE__{origin: origin}) do - sink = apply(origin.__struct__, :call, [origin, data]) + def handle_call({:call, data}, _from, %__MODULE__{origin: origin} = sink) do + origin = apply(origin.__struct__, :call, [origin, data]) + sink = %{sink | origin: origin} {:reply, {[], sink}, sink} end diff --git a/lib/source.ex b/lib/source.ex index da1022b..a10525a 100644 --- a/lib/source.ex +++ b/lib/source.ex @@ -29,7 +29,7 @@ defmodule Strom.Source do """ @callback start(map) :: map - @callback call(map) :: {:ok, {[term], map}} | {:error, {:halt, map}} + @callback call(map) :: {[term], map} | {:halt, map} | no_return() @callback stop(map) :: map @callback infinite?(map) :: true | false @@ -159,12 +159,12 @@ defmodule Strom.Source do ) end - defp loop_call(source) do - case call_source(source) do - {:halt, _source} -> + defp loop_call(%__MODULE__{origin: origin} = source) do + case apply(origin.__struct__, :call, [origin]) do + {:halt, _origin} -> :task_done - {events, source} -> + {events, origin} -> GenServer.cast(source.pid, {:new_data, events}) receive do @@ -172,23 +172,7 @@ defmodule Strom.Source do flush(:continue_task) end - loop_call(source) - end - end - - defp call_source(%__MODULE__{origin: origin} = source) do - case apply(origin.__struct__, :call, [origin]) do - {:ok, {events, origin}} -> - source = %{source | origin: origin} - {events, source} - - {:error, {:halt, origin}} -> - source = %{source | origin: origin} - - case apply(origin.__struct__, :infinite?, [origin]) do - true -> {[], source} - false -> {:halt, source} - end + loop_call(%{source | origin: origin}) end end @@ -200,8 +184,10 @@ defmodule Strom.Source do end @impl true - def handle_call(:call, _from, %__MODULE__{} = source) do - {:reply, call_source(source), source} + def handle_call(:call, _from, %__MODULE__{origin: origin} = source) do + {events, origin} = apply(origin.__struct__, :call, [origin]) + source = %{source | origin: origin} + {:reply, {events, source}, source} end def handle_call(:stop, _from, %__MODULE__{origin: origin, task: task} = source) diff --git a/lib/source/io_gets.ex b/lib/source/io_gets.ex index 4aabfe0..7ce5eb8 100644 --- a/lib/source/io_gets.ex +++ b/lib/source/io_gets.ex @@ -11,7 +11,7 @@ defmodule Strom.Source.IOGets do @impl true def call(%__MODULE__{} = io_gets) do data = IO.gets("IOGets> ") - {:ok, {[String.trim(data)], io_gets}} + {[String.trim(data)], io_gets} end @impl true diff --git a/lib/source/read_lines.ex b/lib/source/read_lines.ex index b78649d..78f79db 100644 --- a/lib/source/read_lines.ex +++ b/lib/source/read_lines.ex @@ -12,10 +12,10 @@ defmodule Strom.Source.ReadLines do def call(%__MODULE__{} = read_lines) do case read_line(read_lines.file) do {:ok, data} -> - {:ok, {[String.trim(data)], read_lines}} + {[String.trim(data)], read_lines} {:error, :eof} -> - {:error, {:halt, read_lines}} + {:halt, read_lines} end end From 48f281804cf65d9b1f0b35b5e84f4b67cab5c668 Mon Sep 17 00:00:00 2001 From: Anton Mishchuk Date: Mon, 10 Jun 2024 14:54:30 +0200 Subject: [PATCH 3/3] Fix test --- test/crash_test.exs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/crash_test.exs b/test/crash_test.exs index c63ce0d..f5c1005 100644 --- a/test/crash_test.exs +++ b/test/crash_test.exs @@ -141,11 +141,11 @@ defmodule Strom.CrashTest do if String.trim(data) == "4" do raise "error" else - {:ok, {[String.trim(data)], read_lines}} + {[String.trim(data)], read_lines} end {:error, :eof} -> - {:error, {:halt, read_lines}} + {:halt, read_lines} end end @@ -215,7 +215,7 @@ defmodule Strom.CrashTest do :ok = IO.write(write_lines.file, data <> write_lines.line_sep) end - {:ok, {[], write_lines}} + {[], write_lines} end @impl true