Skip to content

Commit

Permalink
Add no_wait option to GenMix
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed May 5, 2024
1 parent 3223f65 commit 6788f71
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
19 changes: 14 additions & 5 deletions lib/gen_mix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Strom.GenMix do
opts: [],
chunk: @chunk,
buffer: @buffer,
no_wait: false,
input_streams: %{},
tasks: %{},
data: %{},
Expand All @@ -23,7 +24,8 @@ defmodule Strom.GenMix do
gen_mix = %{
gen_mix
| chunk: Keyword.get(opts, :chunk, @chunk),
buffer: Keyword.get(opts, :buffer, @buffer)
buffer: Keyword.get(opts, :buffer, @buffer),
no_wait: Keyword.get(opts, :no_wait, false)
}

start_link(gen_mix)
Expand Down Expand Up @@ -149,12 +151,12 @@ defmodule Strom.GenMix do

total_count = Enum.reduce(mix.data, 0, fn {_name, data}, count -> count + length(data) end)

if total_count <= mix.buffer do
if total_count <= mix.buffer and mix.no_wait != :first_stream_done do
Enum.each(mix.tasks, fn {_, task} -> send(task.pid, :continue_task) end)
end

cond do
length(data) == 0 and map_size(mix.tasks) == 0 ->
length(data) == 0 and (map_size(mix.tasks) == 0 or mix.no_wait == :first_stream_done) ->
{:reply, :done, mix}

length(data) == 0 ->
Expand All @@ -179,8 +181,8 @@ defmodule Strom.GenMix do
send(client_pid, :continue_client)
end)

if total_count < mix.buffer do
task = Map.fetch!(mix.tasks, name)
if total_count < mix.buffer and mix.no_wait != :first_stream_done do
task = Map.get(mix.tasks, name)
send(task.pid, :continue_task)
end

Expand All @@ -197,6 +199,13 @@ defmodule Strom.GenMix do
send(client_pid, :continue_client)
end)

mix =
if mix.no_wait do
%{mix | no_wait: :first_stream_done}
else
mix
end

{:noreply, %{mix | waiting_clients: %{}}}
end

Expand Down
25 changes: 23 additions & 2 deletions test/mixer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ defmodule Strom.MixerTest do
use ExUnit.Case, async: true
doctest Strom.Mixer

alias Strom.Source
alias Strom.{Composite, Mixer, Source}
alias Strom.Source.ReadLines
alias Strom.Mixer

setup do
mixer =
Expand Down Expand Up @@ -156,4 +155,26 @@ defmodule Strom.MixerTest do
|> Enum.each(fn el -> assert String.contains?(el, "111,3") end)
end
end

describe "halt mix stream if one of the stream ends" do
setup do
source_finite = Source.new(:finite, [1, 2, 3, 4, 5])
source_infinite = Source.new(:infinite, Stream.cycle([9, 8, 7]))

mixer = Mixer.new([:finite, :infinite], :stream, no_wait: true)

composite =
[source_finite, source_infinite, mixer]
|> Composite.new()
|> Composite.start()

%{composite: composite}
end

test "halt when one stream halts", %{composite: composite} do

Check failure on line 174 in test/mixer_test.exs

View workflow job for this annotation

GitHub Actions / Build and test

test halt mix stream if one of the stream ends halt when one stream halts (Strom.MixerTest)
%{stream: stream} = Composite.call(%{}, composite)
results = Enum.to_list(stream)
Enum.each([1, 2, 3, 4, 5], fn num -> assert Enum.member?(results, num) end)
end
end
end

0 comments on commit 6788f71

Please sign in to comment.