Skip to content

Commit

Permalink
moves switch to seperate file
Browse files Browse the repository at this point in the history
  • Loading branch information
cdetroye committed Apr 23, 2018
1 parent 2e7031c commit 16f5e54
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 37 deletions.
25 changes: 1 addition & 24 deletions lib/obs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -158,31 +158,8 @@ defmodule Observables.Obs do
end

def switch({observable_fn, _parent_pid}) do
action = fn new_obs, s ->
switcher = self()

# Unsubscribe to the previous observer we were forwarding.
if s != nil do
{:forwarder, forwarder, :sender, observable} = s
{_f, pidf} = forwarder
GenObservable.stop_send_to(pidf, self())
{_f, pids} = observable
GenObservable.stop_send_to(pids, pidf)
end

# We subscribe to this observable.
# {_, obsvpid} = observable
# GenObservable.send_to(obsvpid, self())

forwarder =
new_obs
|> map(fn v -> GenObservable.send_event(switcher, {:forward, v}) end)

{:novalue, {:forwarder, forwarder, :sender, new_obs}}
end

# Start the producer/consumer server.
{:ok, pid} = GenObservable.start_link(Switch, [action, nil])
{:ok, pid} = GenObservable.start_link(Switch, [])

observable_fn.(pid)

Expand Down
35 changes: 22 additions & 13 deletions lib/observables/switch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,38 @@ defmodule Observables.Switch do
A GenServer template for a "singleton" process.
"""
use Observables.GenObservable
alias Observables.GenObservable
alias Observables.Obs

def init([action, state]) do
{:ok, %{:state => state, :action => action}}
def init([]) do
{:ok, nil}
end

def handle_event({:forward, v}, state) do
{:value, v, state}
end

def handle_event(e, %{:state => s, :action => a}) do
case a.(e, s) do
{:value, v, new_s} ->
{:value, v, %{:state => new_s, :action => a}}
def handle_event(new_obs, s) do
switcher = self()

{:novalue, new_s} ->
{:novalue, %{:state => new_s, :action => a}}
# Unsubscribe to the previous observer we were forwarding.
if s != nil do
{:forwarder, forwarder, :sender, observable} = s
{_f, pidf} = forwarder
GenObservable.stop_send_to(pidf, self())
{_f, pids} = observable
GenObservable.stop_send_to(pids, pidf)
end

{:done, new_s} ->
{:done, new_s}
# We subscribe to this observable.
# {_, obsvpid} = observable
# GenObservable.send_to(obsvpid, self())

_ ->
Logger.error("Invalid return value from Observable action!")
end
forwarder =
new_obs
|> Obs.map(fn v -> GenObservable.send_event(switcher, {:forward, v}) end)

{:novalue, {:forwarder, forwarder, :sender, new_obs}}
end

def handle_done(_pid, state) do
Expand Down
1 change: 1 addition & 0 deletions test/observables_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ defmodule ObservablesTest do
end
end

@tag :switch
test "switch" do
testproc = self()

Expand Down

0 comments on commit 16f5e54

Please sign in to comment.