Skip to content

Commit

Permalink
buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
cdetroye committed Apr 27, 2018
1 parent 91ac3d0 commit fb31abe
Show file tree
Hide file tree
Showing 5 changed files with 461 additions and 2 deletions.
6 changes: 6 additions & 0 deletions lib/behavior/gen_observable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ defmodule Observables.GenObservable do
cast(self(), :stop)
{:noreply, state}

{:ok, :done, {:value, value}} ->
Logger.debug("#{inspect(self())} stopping.")
cast(self(), {:notify_all, value})
cast(self(), :stop)
{:noreply, state}

{:ok, :continue} ->
Logger.debug("#{inspect(self())} going on.")
# Remove observee.
Expand Down
22 changes: 20 additions & 2 deletions lib/obs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ defmodule Observables.Obs do
Distinct,
Each,
Filter,
StartsWith
StartsWith,
Buffer
}

alias Enum
Expand All @@ -20,7 +21,7 @@ defmodule Observables.Obs do
# GENERATORS ###################################################################

@doc """
from_pid/1 can be considered to be a subject. Any process that implements
from_pid/1 can be considered to be a subject. Any process that implements
"""
def from_pid(producer) do
{fn consumer ->
Expand Down Expand Up @@ -184,6 +185,23 @@ defmodule Observables.Obs do
end, pid}
end

@doc """
Periodically gather items emitted by an Observable into bundles of size `size` and emit
these bundles rather than emitting the items one at a time.
Source: http://reactivex.io/documentation/operators/buffer.html
"""
def buffer({observable_fn, _parent_pid}, size) do
{:ok, pid} = GenObservable.start_link(Buffer, [size])

observable_fn.(pid)

# Create the continuation.
{fn observer ->
GenObservable.send_to(pid, observer)
end, pid}
end

# TERMINATORS ##################################################################

def print({observable_fn, parent_pid}) do
Expand Down
31 changes: 31 additions & 0 deletions lib/observables/buffer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Observables.Buffer do
@moduledoc false
use Observables.GenObservable

def init([size]) do
Logger.debug("Buffer: #{inspect(self())}")
# We don't keep state in merge.
{:ok, %{:size => size, :buffer => []}}
end

def handle_event(v, %{:size => size, :buffer => bs}) do
count = Enum.count(bs)

case {bs, size, count} do
{b, s, c} when s <= c ->
{:value, b, %{:size => s, :buffer => [v]}}

{b, s, c} when s > c ->
{:novalue, %{:size => s, :buffer => b ++ [v]}}
# {[], max_size, c} ->
# {:novalue, %{:max_size => max_size, :buffer => [v]}}
end
end

def handle_done(pid, %{:size => size, :buffer => bs}) do

# We are buffering, and if our dependency stops we need to flush our buffer or we are losing values.
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :done, {:value, bs}}
end
end
Loading

0 comments on commit fb31abe

Please sign in to comment.