Skip to content

Commit

Permalink
combinelatest
Browse files Browse the repository at this point in the history
  • Loading branch information
m1dnight committed Apr 29, 2018
1 parent 64040eb commit dd48b28
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 26 deletions.
7 changes: 7 additions & 0 deletions lib/behavior/gen_observable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ defmodule Observables.GenObservable do
cast(producer, {:delay, delay})
end

@doc """
Makes an observable stop and gracefully shut down.
"""
def stop(producer, reason \\ :normal) do
cast(producer, {:stop, reason})
end

###########
# Helpers #
###########
Expand Down
38 changes: 37 additions & 1 deletion lib/obs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ defmodule Observables.Obs do
Buffer,
Chunk,
Scan,
Take
Take,
CombineLatest
}

alias Enum
Expand Down Expand Up @@ -333,6 +334,41 @@ defmodule Observables.Obs do
end, pid}
end

@doc """
Given two observables, merges them together and always merges the last result of on of both, and
reuses the last value from the other.
E.g.
1 -> 2 ------> 3
A -----> B ------> C
=
1A --> 2A -> 2B -> 3B -> 3C
More information: http://reactivex.io/documentation/operators/combinelatest.html
"""
def combinelatest(l, r) do
# We tag each value from left and right with their respective label.
{f_l, _pid_l} =
l
|> map(fn v -> {:left, v} end)

{f_r, _pid_r} =
r
|> map(fn v -> {:right, v} end)

# Start our zipper observable.
{:ok, pid} = GenObservable.start(CombineLatest, [])

# Make left and right send to us.
f_l.(pid)
f_r.(pid)

# Creat the continuation.
{fn observer ->
GenObservable.send_to(pid, observer)
end, pid}
end


# TERMINATORS ##################################################################

Expand Down
42 changes: 42 additions & 0 deletions lib/observables/combineLatest.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Observables.CombineLatest do
@moduledoc false
use Observables.GenObservable

def init([]) do
Logger.debug("CombineLatest: #{inspect(self())}")
{:ok, {:left, nil, :right, nil}}
end

def handle_event(value, state) do
case {value, state} do
# No values at all, and got a left.
{{:left, vl}, {:left, nil, :right, nil}} ->
{:novalue, {:left, vl, :right, nil}}

# No values yet, and got a right.
{{:right, vr}, {:left, nil, :right, nil}} ->
{:novalue, {:left, nil, :right, vr}}

# Already have left, now got right.
{{:right, vr}, {:left, vl, :right, nil}} ->
{:value, {vl, vr}, {:left, nil, :right, nil}}

# Already have a left, and received a left.
{{:left, vl}, {:left, _vl, :right, nil}} ->
{:novalue, {:left, vl, :right, nil}}

# Already have a right value, and now received left.
{{:left, vl}, {:left, nil, :right, vr}} ->
{:value, {vl, vr}, {:left, nil, :right, nil}}

# Already have a right, and received a right.
{{:right, vr}, {:left, nil, :right, _vr}} ->
{:novalue, {:left, nil, :right, vr}}
end
end

def handle_done(_pid, _state) do
Logger.debug("#{inspect(self())}: zip has one dead dependency, stopping.")
{:ok, :done}
end
end
66 changes: 41 additions & 25 deletions test/observables_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ defmodule ObservablesTest do
require Logger

def sleep(ms) do

receive do
:nevergonnahappen -> :ok
after
ms -> :ok
end
end


test "Test from pid" do
testproc = self()
{:ok, pid1} = GenObservable.spawn_supervised(Subject, 0)
Expand Down Expand Up @@ -307,9 +305,13 @@ defmodule ObservablesTest do
test "repeat" do
testproc = self()

Obs.repeat(fn ->
send(testproc, :hello)
end, [interval: 500, times: 5])
Obs.repeat(
fn ->
send(testproc, :hello)
end,
interval: 500,
times: 5
)

[:hello, :hello, :hello, :hello, :hello]
|> Enum.map(fn x ->
Expand Down Expand Up @@ -342,16 +344,17 @@ defmodule ObservablesTest do
Obs.range(start, tend, 100)
|> Obs.buffer(size)
|> Obs.each(fn v ->
Logger.debug "Sending #{inspect v}"
Logger.debug("Sending #{inspect(v)}")
send(testproc, v)
end)

# Receive all the values.
Enum.chunk_every(1..100, size)
|> Enum.map(fn list ->
Logger.debug "Waiting for #{inspect list}"
Logger.debug("Waiting for #{inspect(list)}")

receive do
^list -> Logger.debug "Got list #{inspect list}"
^list -> Logger.debug("Got list #{inspect(list)}")
end
end)

Expand All @@ -376,36 +379,37 @@ defmodule ObservablesTest do

# Create a range.
Obs.range(start, tend, 100)
|> Obs.chunk(size * 100 + 10) # We should consume 5 values each time.
# We should consume 5 values each time.
|> Obs.chunk(size * 100 + 10)
|> Obs.each(fn v ->
Logger.debug "Sending #{inspect v, charlists: :as_lists}"
Logger.debug("Sending #{inspect(v, charlists: :as_lists)}")
send(testproc, v)
end)

# Receive all the values.
Enum.chunk_every(1..50, size)
|> Enum.map(fn list ->
Logger.debug "Waiting for #{inspect list, charlists: :as_lists}"
Logger.debug("Waiting for #{inspect(list, charlists: :as_lists)}")

receive do
^list -> Logger.debug "Got list #{inspect list, charlists: :as_lists}"
^list -> Logger.debug("Got list #{inspect(list, charlists: :as_lists)}")
after
10000 ->
assert "Did not receive list in time: #{inspect list, charlists: :as_lists}" == ""
assert "Did not receive list in time: #{inspect(list, charlists: :as_lists)}" == ""
end
end)

# Receive no other values.
receive do
x ->
Logger.error("Received another value, did not want")
assert "received another value: #{inspect x, charlists: :as_lists} " == ""
assert "received another value: #{inspect(x, charlists: :as_lists)} " == ""
after
1000 ->
:ok
end
end


@tag :scan
test "scan" do
testproc = self()
Expand All @@ -415,28 +419,28 @@ defmodule ObservablesTest do

# Create a range.
Obs.range(start, tend, 100)
|> Obs.scan(fn (x,y) -> x + y end)
|> Obs.scan(fn x, y -> x + y end)
|> Obs.each(fn v ->
Logger.debug "Sending #{inspect v, charlists: :as_lists}"
Logger.debug("Sending #{inspect(v, charlists: :as_lists)}")
send(testproc, v)
end)

# Receive all the values.
Enum.scan(1..50, fn(x,y) -> x + y end)
Enum.scan(1..50, fn x, y -> x + y end)
|> Enum.map(fn v ->
receive do
^v -> :ok
after
10000 ->
assert "Did not receive item in time: #{inspect v}" == ""
assert "Did not receive item in time: #{inspect(v)}" == ""
end
end)

# Receive no other values.
receive do
x ->
Logger.error("Received another value, did not want")
assert "received another value: #{inspect x, charlists: :as_lists} " == ""
assert "received another value: #{inspect(x, charlists: :as_lists)} " == ""
after
1000 ->
:ok
Expand All @@ -447,31 +451,43 @@ defmodule ObservablesTest do
test "Take" do
testproc = self()

xs = [1,2,3,4,5,6,7,8,9,0]
xs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]

xs
|> Obs.from_enum(100)
|> Obs.take(4)
|> Obs.map(fn v -> send(testproc, v) end)

[1,2,3,4]
[1, 2, 3, 4]
|> Enum.map(fn x ->
receive do
^x -> :ok
end
end)

# Receive no other values.
receive do
x ->
Logger.error("Received another value, did not want")
assert "received another value: #{inspect x, charlists: :as_lists} " == ""
assert "received another value: #{inspect(x, charlists: :as_lists)} " == ""
after
1000 ->
:ok
end

end

@tag :combinelatest
test "Combine Latest" do
testproc = self()

Obs.combinelatest(Obs.range(1, 10, 1000), Obs.range(11, 20, 2000))
|> Obs.map(fn v -> send(testproc, v) end)

[1..100]
|> Enum.map(fn x ->
receive do
^x -> Logger.debug("Got #{inspect(x)}")
end
end)
end
end

0 comments on commit dd48b28

Please sign in to comment.