Skip to content

Commit

Permalink
take
Browse files Browse the repository at this point in the history
  • Loading branch information
m1dnight committed Apr 29, 2018
1 parent 19c1512 commit 64040eb
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 2 deletions.
5 changes: 5 additions & 0 deletions lib/behavior/gen_observable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ defmodule Observables.GenObservable do
GenServer.cast(self(), {:event, value})
{:noreply, %{state | state: s}}

{:done, value, s} ->
cast(self(), {:notify_all, value}) # We are done, but produced a final value.
cast(self(), :stop)
{:noreply, %{state | state: s}}

{:done, s} ->
cast(self(), :stop)
{:noreply, %{state | state: s}}
Expand Down
21 changes: 20 additions & 1 deletion lib/obs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ defmodule Observables.Obs do
StartsWith,
Buffer,
Chunk,
Scan
Scan,
Take
}

alias Enum
Expand Down Expand Up @@ -315,6 +316,24 @@ defmodule Observables.Obs do
GenObservable.send_to(pid, observer)
end, pid}
end

@doc """
Takes the n first element of the observable, and then stops.
More information: http://reactivex.io/documentation/operators/take.html
"""
def take({observable_fn, parent_pid}, n) do
{:ok, pid} = GenObservable.start_link(Take, [n])

observable_fn.(pid)

# Creat the continuation.
{fn observer ->
GenObservable.send_to(pid, observer)
end, pid}
end


# TERMINATORS ##################################################################

@doc """
Expand Down
24 changes: 24 additions & 0 deletions lib/observables/take.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Observables.Take do
@moduledoc false
use Observables.GenObservable

def init([n]) do
Logger.debug("Take: #{inspect(self())}")
# We don't keep state in merge.
{:ok, %{:n => n}}
end

def handle_event(v, state = %{:n => n}) do

case n do
n when n <= 0 -> {:done, state}
1 -> {:done, v, state}
n -> {:value, v, %{:n => (n - 1)}}
end
end

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
end
end
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%{
"earmark": {:hex, :earmark, "1.2.4", "99b637c62a4d65a20a9fb674b8cffb8baa771c04605a80c911c4418c69b75439", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
}
31 changes: 31 additions & 0 deletions test/observables_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -443,4 +443,35 @@ defmodule ObservablesTest do
end
end

@tag :take
test "Take" do
testproc = self()

xs = [1,2,3,4,5,6,7,8,9,0]

xs
|> Obs.from_enum(100)
|> Obs.take(4)
|> Obs.map(fn v -> send(testproc, v) end)

[1,2,3,4]
|> Enum.map(fn x ->
receive do
^x -> :ok
end
end)

# Receive no other values.
receive do
x ->
Logger.error("Received another value, did not want")
assert "received another value: #{inspect x, charlists: :as_lists} " == ""
after
1000 ->
:ok
end

end


end

0 comments on commit 64040eb

Please sign in to comment.