Skip to content

Commit

Permalink
Separate memory for different streams in composer
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Nov 25, 2023
1 parent 6344a43 commit 8892378
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 21 deletions.
7 changes: 5 additions & 2 deletions lib/components/composer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule ALF.Components.Composer do
function: nil,
source_code: nil,
acc: nil,
stream_accs: %{},
collected_ips: [],
new_collected_ips: %{}
]
Expand Down Expand Up @@ -63,8 +64,9 @@ defmodule ALF.Components.Composer do

defp process_ip(current_ip, state) do
history = history(current_ip, state)
acc = Map.get(state.stream_accs, current_ip.stream_ref, state.acc)

case call_function(state.module, state.function, current_ip.event, state.acc, state.opts) do
case call_function(state.module, state.function, current_ip.event, acc, state.opts) do
{:ok, {events, acc}} when is_list(events) ->
ips =
Enum.map(events, fn event ->
Expand All @@ -74,7 +76,8 @@ defmodule ALF.Components.Composer do
end)

send_result(current_ip, :destroyed)
{ips, %{state | acc: acc}}
stream_accs = Map.put(state.stream_accs, current_ip.stream_ref, acc)
{ips, %{state | stream_accs: stream_accs}}

{:error, error, stacktrace} ->
send_error_result(current_ip, error, stacktrace, state)
Expand Down
38 changes: 19 additions & 19 deletions test/mixer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,23 @@ defmodule ALF.Sink.MixerTest do
assert (orders ++ parcels) -- lines == []
end

# @tag timeout: 300_000
# test "memory" do
## add :observer, :runtime_tools, :wx to extra_applications
# :observer.start()
#
# source1 = FileSource.open("test_data/input.csv")
# source2 = FileSource.open("test_data/input.csv")
#
# stream1 = FileSource.stream(source1)
# stream2 = FileSource.stream(source2)
# sink = ALF.Sink.FileSink.open("test_data/output.csv")
#
# stream1
# |> Mixer.new()
# |> Mixer.add(stream2)
# |> Mixer.stream()
# |> ALF.Sink.FileSink.stream(sink)
# |> Stream.run()
# end
# @tag timeout: 300_000
# test "memory" do
## add :observer, :runtime_tools, :wx to extra_applications
# :observer.start()
#
# source1 = FileSource.open("test_data/input.csv")
# source2 = FileSource.open("test_data/input.csv")
#
# stream1 = FileSource.stream(source1)
# stream2 = FileSource.stream(source2)
# sink = ALF.Sink.FileSink.open("test_data/output.csv")
#
# stream1
# |> Mixer.new()
# |> Mixer.add(stream2)
# |> Mixer.stream()
# |> ALF.Sink.FileSink.stream(sink)
# |> Stream.run()
# end
end

0 comments on commit 8892378

Please sign in to comment.