Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor sink and source #10

Merged
merged 3 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/composite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ defmodule Strom.Composite do
end

defp timestamp_postfix do
:erlang.system_time()
System.os_time()
|> rem(round(1.0e9))
|> to_string()
end
Expand Down
12 changes: 7 additions & 5 deletions lib/loop.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule Strom.Loop do
end
end

# call as a source
def call(%__MODULE__{name: name} = loop) do
Agent.get_and_update(name, fn data ->
case data do
Expand All @@ -40,24 +41,25 @@ defmodule Strom.Loop do
case loop.last_empty_call_at do
nil ->
Process.sleep(loop.sleep)
{:ok, {[], %{loop | last_empty_call_at: System.os_time(:millisecond)}}}
{[], %{loop | last_empty_call_at: System.os_time(:millisecond)}}

last_empty_call_at ->
if System.os_time(:millisecond) - last_empty_call_at > loop.timeout do
{:error, {:halt, loop}}
{:halt, loop}
else
{:ok, {[], loop}}
{[], loop}
end
end

datum ->
{:ok, {[datum], %{loop | last_empty_call_at: nil}}}
{[datum], %{loop | last_empty_call_at: nil}}
end
end

# call as a sink
def call(%__MODULE__{name: name} = loop, data) do
:ok = Agent.update(name, fn prev_data -> prev_data ++ [data] end)
{:ok, {[], loop}}
loop
end

def stop(%__MODULE__{name: name} = loop) do
Expand Down
24 changes: 8 additions & 16 deletions lib/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Strom.Sink do
See `Strom.Sink.Writeline`, `Strom.Sink.IOPuts`, `Strom.Sink.Null`
"""
@callback start(map) :: map
@callback call(map, term) :: {:ok, {term, map}} | {:error, {term, map}}
@callback call(map, term) :: map | no_return()
@callback stop(map) :: map

use GenServer
Expand Down Expand Up @@ -67,12 +67,12 @@ defmodule Strom.Sink do
Map.delete(flow, name)
end

defp async_run_sink(sink, stream) do
defp async_run_sink(%__MODULE__{origin: origin} = sink, stream) do
Task.Supervisor.async_nolink(
{:via, PartitionSupervisor, {Strom.TaskSupervisor, self()}},
fn ->
Stream.transform(stream, sink, fn el, sink ->
call_sink(sink, el)
Stream.transform(stream, sink, fn el, _sink ->
sink = apply(origin.__struct__, :call, [origin, el])
{[], sink}
end)
|> Stream.run()
Expand All @@ -82,16 +82,6 @@ defmodule Strom.Sink do
)
end

defp call_sink(%__MODULE__{origin: origin} = sink, data) do
case apply(origin.__struct__, :call, [origin, data]) do
{:ok, {[], origin}} ->
{[], %{sink | origin: origin}}

{:error, {:halt, origin}} ->
{:halt, %{sink | origin: origin}}
end
end

@spec stop(__MODULE__.t()) :: :ok
def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)

Expand All @@ -106,8 +96,10 @@ defmodule Strom.Sink do
{:reply, :ok, %{sink | task: task, stream: stream}}
end

def handle_call({:call, data}, _from, %__MODULE__{} = sink) do
{:reply, call_sink(sink, data), sink}
def handle_call({:call, data}, _from, %__MODULE__{origin: origin} = sink) do
origin = apply(origin.__struct__, :call, [origin, data])
sink = %{sink | origin: origin}
{:reply, {[], sink}, sink}
end

def handle_call(:stop, _from, %__MODULE__{origin: origin, task: task} = sink) do
Expand Down
2 changes: 1 addition & 1 deletion lib/sink/io_puts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Strom.Sink.IOPuts do
def call(%__MODULE__{} = io_puts, data) do
IO.puts(io_puts.prefix <> "#{data}" <> io_puts.line_sep)

{:ok, {[], io_puts}}
io_puts
end

@impl true
Expand Down
2 changes: 1 addition & 1 deletion lib/sink/null.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Strom.Sink.Null do
def start(%__MODULE__{} = null), do: null

@impl true
def call(%__MODULE__{} = null, _data), do: {:ok, {[], null}}
def call(%__MODULE__{} = null, _data), do: null

@impl true
def stop(%__MODULE__{} = null), do: null
Expand Down
2 changes: 1 addition & 1 deletion lib/sink/write_lines.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Strom.Sink.WriteLines do
def call(%__MODULE__{} = write_lines, data) do
:ok = IO.write(write_lines.file, data <> write_lines.line_sep)

{:ok, {[], write_lines}}
write_lines
end

@impl true
Expand Down
34 changes: 10 additions & 24 deletions lib/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule Strom.Source do
"""

@callback start(map) :: map
@callback call(map) :: {:ok, {[term], map}} | {:error, {:halt, map}}
@callback call(map) :: {[term], map} | {:halt, map} | no_return()
@callback stop(map) :: map
@callback infinite?(map) :: true | false

Expand Down Expand Up @@ -159,36 +159,20 @@ defmodule Strom.Source do
)
end

defp loop_call(source) do
case call_source(source) do
{:halt, _source} ->
defp loop_call(%__MODULE__{origin: origin} = source) do
case apply(origin.__struct__, :call, [origin]) do
{:halt, _origin} ->
:task_done

{events, source} ->
{events, origin} ->
GenServer.cast(source.pid, {:new_data, events})

receive do
:continue_task ->
flush(:continue_task)
end

loop_call(source)
end
end

defp call_source(%__MODULE__{origin: origin} = source) do
case apply(origin.__struct__, :call, [origin]) do
{:ok, {events, origin}} ->
source = %{source | origin: origin}
{events, source}

{:error, {:halt, origin}} ->
source = %{source | origin: origin}

case apply(origin.__struct__, :infinite?, [origin]) do
true -> {[], source}
false -> {:halt, source}
end
loop_call(%{source | origin: origin})
end
end

Expand All @@ -200,8 +184,10 @@ defmodule Strom.Source do
end

@impl true
def handle_call(:call, _from, %__MODULE__{} = source) do
{:reply, call_source(source), source}
def handle_call(:call, _from, %__MODULE__{origin: origin} = source) do
{events, origin} = apply(origin.__struct__, :call, [origin])
source = %{source | origin: origin}
{:reply, {events, source}, source}
end

def handle_call(:stop, _from, %__MODULE__{origin: origin, task: task} = source)
Expand Down
2 changes: 1 addition & 1 deletion lib/source/io_gets.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Strom.Source.IOGets do
@impl true
def call(%__MODULE__{} = io_gets) do
data = IO.gets("IOGets> ")
{:ok, {[String.trim(data)], io_gets}}
{[String.trim(data)], io_gets}
end

@impl true
Expand Down
4 changes: 2 additions & 2 deletions lib/source/read_lines.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ defmodule Strom.Source.ReadLines do
def call(%__MODULE__{} = read_lines) do
case read_line(read_lines.file) do
{:ok, data} ->
{:ok, {[String.trim(data)], read_lines}}
{[String.trim(data)], read_lines}

{:error, :eof} ->
{:error, {:halt, read_lines}}
{:halt, read_lines}
end
end

Expand Down
6 changes: 3 additions & 3 deletions test/crash_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ defmodule Strom.CrashTest do
if String.trim(data) == "4" do
raise "error"
else
{:ok, {[String.trim(data)], read_lines}}
{[String.trim(data)], read_lines}
end

{:error, :eof} ->
{:error, {:halt, read_lines}}
{:halt, read_lines}
end
end

Expand Down Expand Up @@ -215,7 +215,7 @@ defmodule Strom.CrashTest do
:ok = IO.write(write_lines.file, data <> write_lines.line_sep)
end

{:ok, {[], write_lines}}
{[], write_lines}
end

@impl true
Expand Down
41 changes: 41 additions & 0 deletions test/experiments_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule Strom.ExperimentsTest do
use ExUnit.Case, async: true

# alias Strom.{Composite, Transformer}

# @tag timeout: :infinity
# test "event speed" do
# :observer.start()
# print_time =
# Transformer.new(:stream, fn event ->
# IO.inspect(:erlang.system_time(:millisecond))
# event
# end)
#
# transformer = Transformer.new(:stream, &(&1 + 1))
# transformers = List.duplicate(transformer, 200_000)
#
# composite =
# [print_time, transformers, print_time]
# |> Composite.new()
# |> Composite.start()
#
# start = :erlang.system_time(:millisecond)
#
# %{stream: [1]}
# |> Composite.call(composite)
# |> Map.get(:stream)
# |> Enum.to_list()
# |> IO.inspect
#
# IO.inspect(:erlang.system_time(:millisecond) - start, label: :total)
# end
#
# test "tasks memory consumption" do
# :observer.start()
# Enum.map(1..1_000_000, fn _i ->
# Task.async(fn -> Process.sleep(50000) end)
# end)
# |> Enum.map(&Task.await(&1, :infinity))
# end
end
Loading