From 16f5e54b66615443b27653006fb68db43d32b57d Mon Sep 17 00:00:00 2001 From: Christophe De Troyer Date: Mon, 23 Apr 2018 22:08:33 +0200 Subject: [PATCH] moves switch to seperate file --- lib/obs.ex | 25 +------------------------ lib/observables/switch.ex | 35 ++++++++++++++++++++++------------- test/observables_test.exs | 1 + 3 files changed, 24 insertions(+), 37 deletions(-) diff --git a/lib/obs.ex b/lib/obs.ex index a8d52e5..685d057 100644 --- a/lib/obs.ex +++ b/lib/obs.ex @@ -158,31 +158,8 @@ defmodule Observables.Obs do end def switch({observable_fn, _parent_pid}) do - action = fn new_obs, s -> - switcher = self() - - # Unsubscribe to the previous observer we were forwarding. - if s != nil do - {:forwarder, forwarder, :sender, observable} = s - {_f, pidf} = forwarder - GenObservable.stop_send_to(pidf, self()) - {_f, pids} = observable - GenObservable.stop_send_to(pids, pidf) - end - - # We subscribe to this observable. - # {_, obsvpid} = observable - # GenObservable.send_to(obsvpid, self()) - - forwarder = - new_obs - |> map(fn v -> GenObservable.send_event(switcher, {:forward, v}) end) - - {:novalue, {:forwarder, forwarder, :sender, new_obs}} - end - # Start the producer/consumer server. - {:ok, pid} = GenObservable.start_link(Switch, [action, nil]) + {:ok, pid} = GenObservable.start_link(Switch, []) observable_fn.(pid) diff --git a/lib/observables/switch.ex b/lib/observables/switch.ex index 1280046..bedbc0e 100644 --- a/lib/observables/switch.ex +++ b/lib/observables/switch.ex @@ -3,29 +3,38 @@ defmodule Observables.Switch do A GenServer template for a "singleton" process. """ use Observables.GenObservable + alias Observables.GenObservable + alias Observables.Obs - def init([action, state]) do - {:ok, %{:state => state, :action => action}} + def init([]) do + {:ok, nil} end def handle_event({:forward, v}, state) do {:value, v, state} end - def handle_event(e, %{:state => s, :action => a}) do - case a.(e, s) do - {:value, v, new_s} -> - {:value, v, %{:state => new_s, :action => a}} + def handle_event(new_obs, s) do + switcher = self() - {:novalue, new_s} -> - {:novalue, %{:state => new_s, :action => a}} + # Unsubscribe to the previous observer we were forwarding. + if s != nil do + {:forwarder, forwarder, :sender, observable} = s + {_f, pidf} = forwarder + GenObservable.stop_send_to(pidf, self()) + {_f, pids} = observable + GenObservable.stop_send_to(pids, pidf) + end - {:done, new_s} -> - {:done, new_s} + # We subscribe to this observable. + # {_, obsvpid} = observable + # GenObservable.send_to(obsvpid, self()) - _ -> - Logger.error("Invalid return value from Observable action!") - end + forwarder = + new_obs + |> Obs.map(fn v -> GenObservable.send_event(switcher, {:forward, v}) end) + + {:novalue, {:forwarder, forwarder, :sender, new_obs}} end def handle_done(_pid, state) do diff --git a/test/observables_test.exs b/test/observables_test.exs index d24d072..6c8f287 100644 --- a/test/observables_test.exs +++ b/test/observables_test.exs @@ -215,6 +215,7 @@ defmodule ObservablesTest do end end + @tag :switch test "switch" do testproc = self()