diff --git a/lib/behavior/gen_observable.ex b/lib/behavior/gen_observable.ex index bd33e01..e59f733 100644 --- a/lib/behavior/gen_observable.ex +++ b/lib/behavior/gen_observable.ex @@ -75,10 +75,16 @@ defmodule Observables.GenObservable do ################ def handle_cast({:send_to, pid}, state) do + # Add the dependency to our downstreams. {:noreply, %{state | listeners: [pid | state.listeners]}} end def handle_cast({:listen_to, pid}, state) do + # Monitor the upstream. + Process.monitor(pid) + Logger.debug("#{inspect(self())} monitoring upstream #{inspect(pid)}") + + # Add the process to our upstreams. {:noreply, %{state | listeningto: [pid | state.listeningto]}} end @@ -205,7 +211,28 @@ defmodule Observables.GenObservable do {:noreply, state} end - def terminate(_reason, _state) do + def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + cond do + Enum.member?(state.listeners, pid) -> + Logger.debug("#{inspect(self())} Downstream operator died: #{inspect(pid)}") + {:noreply, state} + + Enum.member?(state.listeningto, pid) -> + Logger.debug("#{inspect(self())} Upstream operator died: #{inspect(pid)}") + cast(self(), {:dependency_stopping, pid}) + {:noreply, state} + + true -> + Logger.error( + "#{inspect(self())} Operator died, but not an upstream or downstream? This should not happen." + ) + + {:noreply, state} + end + end + + def terminate(reason, _state) do + Logger.debug "Terminating: #{inspect reason}" :ok end diff --git a/lib/obs.ex b/lib/obs.ex index 197cd70..52b44d0 100644 --- a/lib/obs.ex +++ b/lib/obs.ex @@ -115,15 +115,15 @@ defmodule Observables.Obs do More information: http://reactivex.io/documentation/operators/merge.html """ def merge(left, right) do - {:ok, pid} = GenObservable.start_link(Merge, []) + {:ok, pid} = GenObservable.start(Merge, []) GenObservable.send_to(left, pid) GenObservable.send_to(right, pid) # Creat the continuation. pid - end - + end + @doc """ Given an observable which emits observables, mergeContinuous will merge all of them and output their values. @@ -131,7 +131,7 @@ defmodule Observables.Obs do More information: http://reactivex.io/documentation/operators/merge.html """ def mergeContinuous(observable) do - {:ok, pid} = GenObservable.start_link(MergeContinuous, []) + {:ok, pid} = GenObservable.start(MergeContinuous, []) GenObservable.send_to(observable, pid) @@ -147,7 +147,7 @@ defmodule Observables.Obs do More information: http://reactivex.io/documentation/operators/merge.html """ def mergen(observables) do - {:ok, pid} = GenObservable.start_link(Merge, []) + {:ok, pid} = GenObservable.start(Merge, []) observables |> Enum.map(fn o -> @@ -164,7 +164,7 @@ defmodule Observables.Obs do More information: http://reactivex.io/documentation/operators/map.html """ def map(observable, f) do - {:ok, pid} = GenObservable.start_link(Map, [f]) + {:ok, pid} = GenObservable.start(Map, [f]) GenObservable.send_to(observable, pid) @@ -182,7 +182,7 @@ defmodule Observables.Obs do More information: http://reactivex.io/documentation/operators/distinct.html """ def distinct(observable, f \\ fn x, y -> x == y end) do - {:ok, pid} = GenObservable.start_link(Distinct, [f]) + {:ok, pid} = GenObservable.start(Distinct, [f]) GenObservable.send_to(observable, pid) @@ -203,7 +203,7 @@ defmodule Observables.Obs do More information: http://reactivex.io/documentation/operators/distinct.html and http://rxmarbles.com/#distinctUntilChanged """ def distinctuntilchanged(observable, f \\ fn x, y -> x == y end) do - {:ok, pid} = GenObservable.start_link(DistinctUntilChanged, [f]) + {:ok, pid} = GenObservable.start(DistinctUntilChanged, [f]) GenObservable.send_to(observable, pid) @@ -217,7 +217,7 @@ defmodule Observables.Obs do More information: http://reactivex.io/documentation/operators/subscribe.html """ def each(observable, f) do - {:ok, pid} = GenObservable.start_link(Each, [f]) + {:ok, pid} = GenObservable.start(Each, [f]) GenObservable.send_to(observable, pid) @@ -234,7 +234,7 @@ defmodule Observables.Obs do More information: http://reactivex.io/documentation/operators/filter.html """ def filter(observable, f) do - {:ok, pid} = GenObservable.start_link(Filter, [f]) + {:ok, pid} = GenObservable.start(Filter, [f]) GenObservable.send_to(observable, pid) # Creat the continuation. pid @@ -247,7 +247,7 @@ defmodule Observables.Obs do """ def starts_with(observable, start_vs) do # Start the producer/consumer server. - {:ok, pid} = GenObservable.start_link(StartsWith, []) + {:ok, pid} = GenObservable.start(StartsWith, []) # After the subscription has been made, send all the start values to the producers # so he can start pushing them out to our dependees. @@ -272,7 +272,7 @@ defmodule Observables.Obs do """ def switch(observable) do # Start the producer/consumer server. - {:ok, pid} = GenObservable.start_link(Switch, []) + {:ok, pid} = GenObservable.start(Switch, []) GenObservable.send_to(observable, pid) @@ -291,7 +291,7 @@ defmodule Observables.Obs do Source: http://reactivex.io/documentation/operators/buffer.html """ def chunk(observable, interval) do - {:ok, pid} = GenObservable.start_link(Chunk, [interval]) + {:ok, pid} = GenObservable.start(Chunk, [interval]) GenObservable.send_to(observable, pid) @@ -308,7 +308,7 @@ defmodule Observables.Obs do Source: http://reactivex.io/documentation/operators/buffer.html """ def buffer(observable, size) do - {:ok, pid} = GenObservable.start_link(Buffer, [size]) + {:ok, pid} = GenObservable.start(Buffer, [size]) GenObservable.send_to(observable, pid) @@ -326,7 +326,7 @@ defmodule Observables.Obs do More information: http://reactivex.io/documentation/operators/scan.html """ def scan(observable, f, default \\ nil) do - {:ok, pid} = GenObservable.start_link(Scan, [f, default]) + {:ok, pid} = GenObservable.start(Scan, [f, default]) GenObservable.send_to(observable, pid) @@ -340,7 +340,7 @@ defmodule Observables.Obs do More information: http://reactivex.io/documentation/operators/take.html """ def take(observable, n) do - {:ok, pid} = GenObservable.start_link(Take, [n]) + {:ok, pid} = GenObservable.start(Take, [n]) GenObservable.send_to(observable, pid) @@ -466,21 +466,21 @@ defmodule Observables.Obs do end @doc """ - Aggregates all values produces by its dependency, and stops and returns them in a list + Aggregates all values produces by its dependency, and stops and returns them in a list as soon as the dependency has signaled it's stopping. Do not use this on an infinite stream, for obvious reasons. """ def to_list(observable) do ref = :erlang.make_ref() - {:ok, pid} = GenObservable.start_link(ToList, [{self(), ref}]) + {:ok, pid} = GenObservable.start(ToList, [{self(), ref}]) GenObservable.send_to(observable, pid) - # Our process is now buffering. + # Our process is now buffering. # When the dependency stops, we will stop as well, and we need to getg receive do {^ref, vs} -> vs end -end + end end