Skip to content

Commit

Permalink
Monitor downstreams and call stop callback in case of termination
Browse files Browse the repository at this point in the history
  • Loading branch information
m1dnight committed Mar 15, 2021
1 parent cf5b723 commit 683c13a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 21 deletions.
29 changes: 28 additions & 1 deletion lib/behavior/gen_observable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,16 @@ defmodule Observables.GenObservable do
################

def handle_cast({:send_to, pid}, state) do
# Add the dependency to our downstreams.
{:noreply, %{state | listeners: [pid | state.listeners]}}
end

def handle_cast({:listen_to, pid}, state) do
# Monitor the upstream.
Process.monitor(pid)
Logger.debug("#{inspect(self())} monitoring upstream #{inspect(pid)}")

# Add the process to our upstreams.
{:noreply, %{state | listeningto: [pid | state.listeningto]}}
end

Expand Down Expand Up @@ -205,7 +211,28 @@ defmodule Observables.GenObservable do
{:noreply, state}
end

def terminate(_reason, _state) do
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
cond do
Enum.member?(state.listeners, pid) ->
Logger.debug("#{inspect(self())} Downstream operator died: #{inspect(pid)}")
{:noreply, state}

Enum.member?(state.listeningto, pid) ->
Logger.debug("#{inspect(self())} Upstream operator died: #{inspect(pid)}")
cast(self(), {:dependency_stopping, pid})
{:noreply, state}

true ->
Logger.error(
"#{inspect(self())} Operator died, but not an upstream or downstream? This should not happen."
)

{:noreply, state}
end
end

def terminate(reason, _state) do
Logger.debug "Terminating: #{inspect reason}"
:ok
end

Expand Down
40 changes: 20 additions & 20 deletions lib/obs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,23 @@ defmodule Observables.Obs do
More information: http://reactivex.io/documentation/operators/merge.html
"""
def merge(left, right) do
{:ok, pid} = GenObservable.start_link(Merge, [])
{:ok, pid} = GenObservable.start(Merge, [])

GenObservable.send_to(left, pid)
GenObservable.send_to(right, pid)

# Creat the continuation.
pid
end
end

@doc """
Given an observable which emits observables, mergeContinuous will
merge all of them and output their values.
More information: http://reactivex.io/documentation/operators/merge.html
"""
def mergeContinuous(observable) do
{:ok, pid} = GenObservable.start_link(MergeContinuous, [])
{:ok, pid} = GenObservable.start(MergeContinuous, [])

GenObservable.send_to(observable, pid)

Expand All @@ -147,7 +147,7 @@ defmodule Observables.Obs do
More information: http://reactivex.io/documentation/operators/merge.html
"""
def mergen(observables) do
{:ok, pid} = GenObservable.start_link(Merge, [])
{:ok, pid} = GenObservable.start(Merge, [])

observables
|> Enum.map(fn o ->
Expand All @@ -164,7 +164,7 @@ defmodule Observables.Obs do
More information: http://reactivex.io/documentation/operators/map.html
"""
def map(observable, f) do
{:ok, pid} = GenObservable.start_link(Map, [f])
{:ok, pid} = GenObservable.start(Map, [f])

GenObservable.send_to(observable, pid)

Expand All @@ -182,7 +182,7 @@ defmodule Observables.Obs do
More information: http://reactivex.io/documentation/operators/distinct.html
"""
def distinct(observable, f \\ fn x, y -> x == y end) do
{:ok, pid} = GenObservable.start_link(Distinct, [f])
{:ok, pid} = GenObservable.start(Distinct, [f])

GenObservable.send_to(observable, pid)

Expand All @@ -203,7 +203,7 @@ defmodule Observables.Obs do
More information: http://reactivex.io/documentation/operators/distinct.html and http://rxmarbles.com/#distinctUntilChanged
"""
def distinctuntilchanged(observable, f \\ fn x, y -> x == y end) do
{:ok, pid} = GenObservable.start_link(DistinctUntilChanged, [f])
{:ok, pid} = GenObservable.start(DistinctUntilChanged, [f])

GenObservable.send_to(observable, pid)

Expand All @@ -217,7 +217,7 @@ defmodule Observables.Obs do
More information: http://reactivex.io/documentation/operators/subscribe.html
"""
def each(observable, f) do
{:ok, pid} = GenObservable.start_link(Each, [f])
{:ok, pid} = GenObservable.start(Each, [f])

GenObservable.send_to(observable, pid)

Expand All @@ -234,7 +234,7 @@ defmodule Observables.Obs do
More information: http://reactivex.io/documentation/operators/filter.html
"""
def filter(observable, f) do
{:ok, pid} = GenObservable.start_link(Filter, [f])
{:ok, pid} = GenObservable.start(Filter, [f])
GenObservable.send_to(observable, pid)
# Creat the continuation.
pid
Expand All @@ -247,7 +247,7 @@ defmodule Observables.Obs do
"""
def starts_with(observable, start_vs) do
# Start the producer/consumer server.
{:ok, pid} = GenObservable.start_link(StartsWith, [])
{:ok, pid} = GenObservable.start(StartsWith, [])

# After the subscription has been made, send all the start values to the producers
# so he can start pushing them out to our dependees.
Expand All @@ -272,7 +272,7 @@ defmodule Observables.Obs do
"""
def switch(observable) do
# Start the producer/consumer server.
{:ok, pid} = GenObservable.start_link(Switch, [])
{:ok, pid} = GenObservable.start(Switch, [])

GenObservable.send_to(observable, pid)

Expand All @@ -291,7 +291,7 @@ defmodule Observables.Obs do
Source: http://reactivex.io/documentation/operators/buffer.html
"""
def chunk(observable, interval) do
{:ok, pid} = GenObservable.start_link(Chunk, [interval])
{:ok, pid} = GenObservable.start(Chunk, [interval])

GenObservable.send_to(observable, pid)

Expand All @@ -308,7 +308,7 @@ defmodule Observables.Obs do
Source: http://reactivex.io/documentation/operators/buffer.html
"""
def buffer(observable, size) do
{:ok, pid} = GenObservable.start_link(Buffer, [size])
{:ok, pid} = GenObservable.start(Buffer, [size])

GenObservable.send_to(observable, pid)

Expand All @@ -326,7 +326,7 @@ defmodule Observables.Obs do
More information: http://reactivex.io/documentation/operators/scan.html
"""
def scan(observable, f, default \\ nil) do
{:ok, pid} = GenObservable.start_link(Scan, [f, default])
{:ok, pid} = GenObservable.start(Scan, [f, default])

GenObservable.send_to(observable, pid)

Expand All @@ -340,7 +340,7 @@ defmodule Observables.Obs do
More information: http://reactivex.io/documentation/operators/take.html
"""
def take(observable, n) do
{:ok, pid} = GenObservable.start_link(Take, [n])
{:ok, pid} = GenObservable.start(Take, [n])

GenObservable.send_to(observable, pid)

Expand Down Expand Up @@ -466,21 +466,21 @@ defmodule Observables.Obs do
end

@doc """
Aggregates all values produces by its dependency, and stops and returns them in a list
Aggregates all values produces by its dependency, and stops and returns them in a list
as soon as the dependency has signaled it's stopping.
Do not use this on an infinite stream, for obvious reasons.
"""
def to_list(observable) do
ref = :erlang.make_ref()
{:ok, pid} = GenObservable.start_link(ToList, [{self(), ref}])
{:ok, pid} = GenObservable.start(ToList, [{self(), ref}])

GenObservable.send_to(observable, pid)

# Our process is now buffering.
# Our process is now buffering.
# When the dependency stops, we will stop as well, and we need to getg
receive do
{^ref, vs} ->
vs
end
end
end
end

0 comments on commit 683c13a

Please sign in to comment.