Skip to content

Commit

Permalink
Merge branch 'combinelatestsilent'
Browse files Browse the repository at this point in the history
  • Loading branch information
m1dnight committed May 10, 2018
2 parents 6b0fe96 + 08daf27 commit 983b312
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 2 deletions.
48 changes: 47 additions & 1 deletion lib/obs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule Observables.Obs do
Chunk,
Scan,
Take,
CombineLatest
CombineLatest,
CombineLatestSilent
}

alias Enum
Expand Down Expand Up @@ -374,6 +375,51 @@ defmodule Observables.Obs do
end, pid}
end

@doc """
Given two observables, merges them together and always merges the last result of on of both, and
reuses the last value from the other.
The nuance with combinelatest here is that one of both observables will not trigger an update,
but will update silently.
E.g.
1 -> 2 ------> 3
A -----> B ------> C
=
1A --> 2A ----> 3B
More information: http://reactivex.io/documentation/operators/combinelatest.html
"""
def combineLatestSilent(l, r, opts \\ [left: nil, right: nil, silent: :right]) do
left_initial = Keyword.get(opts, :left, nil)
right_initial = Keyword.get(opts, :right, nil)
left_initial = Keyword.get(opts, :silent, :right)

# We tag each value from left and right with their respective label.
{f_l, _pid_l} =
l
|> Observables.Obs.inspect()
|> map(fn v -> {:left, v} end)

{f_r, _pid_r} =
r
|> Observables.Obs.inspect()
|> map(fn v -> {:right, v} end)

# Start our zipper observable.
{:ok, pid} = GenObservable.start(CombineLatestSilent, [left_initial, right_initial, :left])

# Make left and right send to us.
f_l.(pid)
f_r.(pid)

# Creat the continuation.
{fn observer ->
GenObservable.send_to(pid, observer)
end, pid}
end

# TERMINATORS ##################################################################

@doc """
Expand Down
61 changes: 61 additions & 0 deletions lib/observables/combineLatestSilent.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule Observables.CombineLatestSilent do
@moduledoc false
use Observables.GenObservable

# silent == :left or :right
def init([left_initial, right_initial, silent]) do
Logger.debug("CombineLatest: #{inspect(self())}")
{:ok, {:left, left_initial, :right, right_initial, :silent, silent}}
end

def handle_event(value, state) do
{:left, l, :right, r, :silent, s} = state

case {value, l, r, s} do
# No values yet.
{{:left, vl}, nil, nil, _} ->
{:novalue, {:left, vl, :right, nil, :silent, s}}

{{:right, vr}, nil, nil, _} ->
{:novalue, {:left, nil, :right, vr, :silent, s}}

# Have one value, and got a newever version of that value.
{{:left, vl}, _, nil, _} ->
{:novalue, {:left, vl, :right, nil, :silent, s}}

{{:right, vr}, nil, _, _} ->
{:novalue, {:left, nil, :right, vr, :silent, s}}

# Already have the other value.
{{:left, vl}, nil, vr, :right} ->
{:value, {vl, vr}, {:left, vl, :right, vr, :silent, s}}

{{:left, vl}, nil, vr, :left} ->
{:novalue, {:left, vl, :right, vr, :silent, s}}

{{:right, vr}, vl, nil, :right} ->
{:novalue, {:left, vl, :right, vr, :silent, s}}

{{:right, vr}, vl, nil, :left} ->
{:value, {vl, vr}, {:left, vl, :right, vr, :silent, s}}

# We have a history for both, and now we got a new one.
{{:left, vl}, _, vr, :left} ->
{:novalue, {:left, vl, :right, vr, :silent, s}}

{{:left, vl}, _, vr, :right} ->
{:value, {vl, vr}, {:left, vl, :right, vr, :silent, s}}

{{:right, vr}, vl, _, :left} ->
{:value, {vl, vr}, {:left, vl, :right, vr, :silent, s}}

{{:right, vr}, vl, _, :right} ->
{:novalue, {:left, vl, :right, vr, :silent, s}}
end
end

def handle_done(_pid, _state) do
Logger.debug("#{inspect(self())}: combinelatest has one dead dependency, going on.")
{:ok, :continue}
end
end
40 changes: 39 additions & 1 deletion test/observables_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ defmodule ObservablesTest do
{:ok, pid2} = GenObservable.spawn_supervised(Observables.Subject)
ys = Obs.from_pid(pid2)

Obs.combinelatest(xs, ys)
Obs.combinelatest(xs, ys, [left: nil, right: nil])
|> Obs.inspect()
|> Obs.map(fn v -> send(testproc, v) end)

Expand Down Expand Up @@ -521,4 +521,42 @@ defmodule ObservablesTest do
0 -> :ok
end
end


@tag :combinelatestsilent
test "Combine Latest Silent" do
testproc = self()

xs = Obs.range(1, 3, 500)
ys = Obs.range(11, 15, 1200)

# 1 2 3
# 11 12 13 14 15
# ===================================
# 1/11 2/11 3/12

Obs.combineLatestSilent(xs, ys, [left: nil, right: nil, silent: :right])
|> Obs.inspect()
|> Obs.map(fn v -> send(testproc, v) end)

[{2, 11}, {2, 11}, {3, 12}]
|> Enum.map(fn x ->
receive do
^x -> Logger.debug("Got #{inspect(x)}")
after
5000 ->
assert "did not get expected value #{inspect(x)}"
end
end)

# Receive no other values.
receive do
x ->
Logger.error("Received another value, did not want")
assert "received another value: #{inspect(x, charlists: :as_lists)} " == ""
after
1000 ->
:ok
end
end
end

0 comments on commit 983b312

Please sign in to comment.