Skip to content

Commit

Permalink
does not work
Browse files Browse the repository at this point in the history
  • Loading branch information
m1dnight committed Jun 18, 2019
1 parent 8ddf653 commit b0f4289
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 24 deletions.
3 changes: 1 addition & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
# and its dependencies with the aid of the Mix.Config module.
use Mix.Config

config :logger, level:
:info
config :logger, level: :error

# This configuration is loaded before any dependency and is restricted
# to this project. If another project depends on this project, this
Expand Down
20 changes: 20 additions & 0 deletions lib/obs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule Observables.Obs do
Take,
CombineLatest,
CombineLatestSilent,
ToList,
Delay,
DistinctUntilChanged
}
Expand Down Expand Up @@ -447,4 +448,23 @@ defmodule Observables.Obs do
v
end)
end

@doc """
Aggregates all values produces by its dependency, and stops and returns them in a list
as soon as the dependency has signaled it's stopping.
Do not use this on an infinite stream, for obvious reasons.
"""
def to_list({observable_fn, parent_pid}) do
ref = :erlang.make_ref()
{:ok, pid} = GenObservable.start_link(ToList, [{self(), ref}])

observable_fn.(pid)

# Our process is now buffering.
# When the dependency stops, we will stop as well, and we need to getg
receive do
{^ref, vs} ->
vs
end
end
end
33 changes: 14 additions & 19 deletions lib/observables/to_list.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,23 @@ defmodule Observables.Operator.ToList do
@moduledoc false
use Observables.GenObservable

def init([action, state]) do
{:ok, %{:state => state, :action => action}}
def init([waiter]) do
Logger.debug("ToList: #{inspect(self())}")
{:ok, %{:buffer => [], :waiter => waiter}}
end

def handle_event(e, %{:state => s, :action => a}) do
case a.(e, s) do
{:value, v, new_s} ->
{:value, v, %{:state => new_s, :action => a}}

{:novalue, new_s} ->
{:novalue, %{:state => new_s, :action => a}}

{:buffer, v, new_s} ->
{:buffer, v, %{:state => new_s, :action => a}}

{:done, new_s} ->
Logger.debug("done")
{:done, new_s}
end
# A regular value from the dependencies.
def handle_event(v, %{:waiter => waiter, :buffer => bs}) do
{:novalue, %{:waiter => waiter, :buffer => bs ++ [v]}}
end

def handle_done(_pid, state) do
{:ok, state}
def handle_done(pid, %{:waiter => waiter, :buffer => bs}) do
# The dependency has signaled it will no longer produce.
# Send the buffer over to the waiting pid.
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")

{pid, ref} = waiter
send(pid, {ref, bs})
{:ok, :done}
end
end
6 changes: 3 additions & 3 deletions test/distinctuntilchanged_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ defmodule DistinctUntilChangedTest do
test "distinctuntilchanged" do
testproc = self()

xs = [1,2,3,4,1,2,3,4]
xs = [1, 2, 3, 4, 1, 2, 3, 4]

expected = [1,2,3,4,1,2,3,4]
expected = [1, 2, 3, 4, 1, 2, 3, 4]

xs
|> Obs.from_enum(100)
Expand All @@ -19,7 +19,7 @@ defmodule DistinctUntilChangedTest do

expected
|> Enum.map(fn x ->
assert_receive(^x, 1000, "did not get this message! #{inspect x}")
assert_receive(^x, 1000, "did not get this message! #{inspect(x)}")
end)

receive do
Expand Down
30 changes: 30 additions & 0 deletions test/to_list_test.exs
Original file line number Diff line number Diff line change
@@ -1,2 +1,32 @@
defmodule ToListTest do
use ExUnit.Case
alias Observables.{Obs}
require Logger
@tag :take
test "ToList" do
testproc = self()

xs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]

res =
xs
|> Obs.from_enum(100)
|> Obs.to_list()

res == xs
end

test "ToList take a few" do
testproc = self()

xs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]

res =
xs
|> Obs.from_enum(100)
|> Obs.take(2)
|> Obs.to_list()

res == [1, 2]
end
end

0 comments on commit b0f4289

Please sign in to comment.