Skip to content

Commit

Permalink
combine latest silent
Browse files Browse the repository at this point in the history
  • Loading branch information
m1dnight committed May 10, 2018
1 parent 983b312 commit 7f3483b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 38 deletions.
6 changes: 3 additions & 3 deletions lib/obs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,10 @@ defmodule Observables.Obs do
More information: http://reactivex.io/documentation/operators/combinelatest.html
"""
def combineLatestSilent(l, r, opts \\ [left: nil, right: nil, silent: :right]) do
def combinelatestsilent(l, r, opts \\ [left: nil, right: nil, silent: :right]) do
left_initial = Keyword.get(opts, :left, nil)
right_initial = Keyword.get(opts, :right, nil)
left_initial = Keyword.get(opts, :silent, :right)
silent = Keyword.get(opts, :silent, :right)

# We tag each value from left and right with their respective label.
{f_l, _pid_l} =
Expand All @@ -408,7 +408,7 @@ defmodule Observables.Obs do
|> map(fn v -> {:right, v} end)

# Start our zipper observable.
{:ok, pid} = GenObservable.start(CombineLatestSilent, [left_initial, right_initial, :left])
{:ok, pid} = GenObservable.start(CombineLatestSilent, [left_initial, right_initial, silent])

# Make left and right send to us.
f_l.(pid)
Expand Down
14 changes: 7 additions & 7 deletions lib/observables/combineLatestSilent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@ defmodule Observables.CombineLatestSilent do

case {value, l, r, s} do
# No values yet.
{{:left, vl}, nil, nil, _} ->
{{:left, vl}, nil, nil, _} ->
{:novalue, {:left, vl, :right, nil, :silent, s}}

{{:right, vr}, nil, nil, _} ->
{{:right, vr}, nil, nil, _} ->
{:novalue, {:left, nil, :right, vr, :silent, s}}

# Have one value, and got a newever version of that value.
{{:left, vl}, _, nil, _} ->
{{:left, vl}, _, nil, _} ->
{:novalue, {:left, vl, :right, nil, :silent, s}}

{{:right, vr}, nil, _, _} ->
{{:right, vr}, nil, _, _} ->
{:novalue, {:left, nil, :right, vr, :silent, s}}

# Already have the other value.
{{:left, vl}, nil, vr, :right} ->
{{:left, vl}, nil, vr, :right} ->
{:value, {vl, vr}, {:left, vl, :right, vr, :silent, s}}

{{:left, vl}, nil, vr, :left} ->
{{:left, vl}, nil, vr, :left} ->
{:novalue, {:left, vl, :right, vr, :silent, s}}

{{:right, vr}, vl, nil, :right} ->
{{:right, vr}, vl, nil, :right} ->
{:novalue, {:left, vl, :right, vr, :silent, s}}

{{:right, vr}, vl, nil, :left} ->
Expand Down
90 changes: 62 additions & 28 deletions test/observables_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule ObservablesTest do
ms -> :ok
end
end

@tag :frompid
test "Test from pid" do
testproc = self()
Expand Down Expand Up @@ -480,23 +481,22 @@ defmodule ObservablesTest do
test "Combine Latest" do
testproc = self()


{:ok, pid1} = GenObservable.spawn_supervised(Observables.Subject)
xs = Obs.from_pid(pid1)

{:ok, pid2} = GenObservable.spawn_supervised(Observables.Subject)
ys = Obs.from_pid(pid2)

Obs.combinelatest(xs, ys, [left: nil, right: nil])
Obs.combinelatest(xs, ys, left: nil, right: nil)
|> Obs.inspect()
|> Obs.map(fn v -> send(testproc, v) end)

# Send first value, should not produce.
GenObservable.send_event(pid1, :x0)

receive do
x -> flunk "Mailbox was supposed to be empty, got: #{inspect x}"
after
x -> flunk("Mailbox was supposed to be empty, got: #{inspect(x)}")
after
0 -> :ok
end

Expand All @@ -516,47 +516,81 @@ defmodule ObservablesTest do
assert_receive({:xfinal, :y1}, 1000, "did not get this message!")

receive do
x -> flunk "Mailbox was supposed to be empty, got: #{inspect x}"
after
x -> flunk("Mailbox was supposed to be empty, got: #{inspect(x)}")
after
0 -> :ok
end
end


@tag :combinelatestsilent
test "Combine Latest Silent" do
testproc = self()

xs = Obs.range(1, 3, 500)
ys = Obs.range(11, 15, 1200)

# 1 2 3
# 11 12 13 14 15
# ===================================
# 1/11 2/11 3/12

Obs.combineLatestSilent(xs, ys, [left: nil, right: nil, silent: :right])
testproc = self()

{:ok, pid1} = GenObservable.spawn_supervised(Observables.Subject)
xs = Obs.from_pid(pid1)

{:ok, pid2} = GenObservable.spawn_supervised(Observables.Subject)
ys = Obs.from_pid(pid2)

Obs.combinelatestsilent(xs, ys, [left: nil, right: nil, silent: :right])
|> Obs.inspect()
|> Obs.map(fn v -> send(testproc, v) end)

[{2, 11}, {2, 11}, {3, 12}]
|> Enum.map(fn x ->
receive do
^x -> Logger.debug("Got #{inspect(x)}")
after
5000 ->
assert "did not get expected value #{inspect(x)}"
end
end)
# Send first value, should not produce.
GenObservable.send_event(pid1, :x0)
receive do
x -> flunk("Mailbox was supposed to be empty, got: #{inspect(x)}")
after
100 -> :ok
end

# Receive no other values.
# Send second value, should not produce because silent.
GenObservable.send_event(pid2, :y0)
receive do
x ->
Logger.error("Received another value, did not want")
assert "received another value: #{inspect(x, charlists: :as_lists)} " == ""
x -> flunk("Mailbox was supposed to be empty, got: #{inspect(x)}")
after
1000 ->
:ok
100 -> :ok
end

# Update the left observable. Should produce with history.
GenObservable.send_event(pid1, :x1)
assert_receive({:x1, :y0}, 5000, "did not get this message {:x1, :y0}!")

# Update the right observable, should be silent.
GenObservable.send_event(pid2, :y1)
receive do
x -> flunk("Mailbox was supposed to be empty, got: #{inspect(x)}")
after
100 -> :ok
end

GenObservable.send_event(pid2, :y2)
receive do
x -> flunk("Mailbox was supposed to be empty, got: #{inspect(x)}")
after
100 -> :ok
end

GenObservable.send_event(pid2, :y3)
receive do
x -> flunk("Mailbox was supposed to be empty, got: #{inspect(x)}")
after
100 -> :ok
end
# Send a final value, should produce.
GenObservable.send_event(pid1, :x2)
assert_receive({:x2, :y3}, 1000, "did not get this message {:x2, :y3}!")

# Mailbox should be empty.
receive do
x -> flunk("Mailbox was supposed to be empty, got: #{inspect(x)}")
after
100 -> :ok
end
end
end

0 comments on commit 7f3483b

Please sign in to comment.