Skip to content

Commit

Permalink
delay observable - beta
Browse files Browse the repository at this point in the history
  • Loading branch information
m1dnight committed May 19, 2018
1 parent 4d07921 commit 8aa7add
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 3 deletions.
18 changes: 17 additions & 1 deletion lib/obs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ defmodule Observables.Obs do
Scan,
Take,
CombineLatest,
CombineLatestSilent
CombineLatestSilent,
Delay
}

alias Enum
Expand Down Expand Up @@ -420,6 +421,21 @@ defmodule Observables.Obs do
end, pid}
end

@doc """
Delays the given observable for n miliseconds.
More information: http://reactivex.io/documentation/operators/delay.html
"""
def delay({observable_fn, _parent_pid}, n) do
{:ok, pid} = GenObservable.start(Delay, [n])

observable_fn.(pid)

{fn observer ->
GenObservable.send_to(pid, observer)
end, pid}
end

# TERMINATORS ##################################################################

@doc """
Expand Down
25 changes: 25 additions & 0 deletions lib/observables/delay.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defmodule Observables.Operator.Delay do
@moduledoc false
use Observables.GenObservable

def init([delay]) do
{:ok, %{:delay => delay}}
end

def handle_event(v, state = %{:delay => delay}) do
Logger.warn "Got this: #{inspect v}"
case v do
{:delayed, x} ->
{:value, x, state}
v ->
Process.send_after(self(), {:event, {:delayed, v}}, delay)
{:novalue, state}
end

end

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
end
end
36 changes: 36 additions & 0 deletions test/delay_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule DelayTest do
use ExUnit.Case
alias Observables.{Obs}
require Logger

@tag :delay
test "delay" do
Code.load_file("test/util.ex")
testproc = self()

# Create a range.
Obs.range(1, 5, 100)
|> Obs.delay(1000)
|> Obs.each(fn v ->
Logger.error "Got #{v}"
send(testproc, v)
end)

# Receive no other values.
# receive do
# _x ->
# Logger.error("Received another value, did not want")
# assert "received another value:" == ""
# after
# 5000 ->
# :ok
# end

# 1..5
# |> Enum.map(fn v ->
# assert_receive(^v, 5000, "did not get this message #{v}")
# end)
Test.Util.sleep(100000)

end
end
4 changes: 2 additions & 2 deletions test/unsubscribe_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ defmodule UnsubscribeTest do
@tag :unsubscribe
test "ubsubscribe" do
Code.load_file("test/util.ex")

testproc = self()

Obs.range(1, :infinity, 500)
|> Obs.map(fn v ->
if v > 5 do
Observables.Observable.unsubscribe()
else
else
send(testproc, v)
end
end)
Expand Down

0 comments on commit 8aa7add

Please sign in to comment.