Skip to content

Commit

Permalink
make observables stop if the upstream stops
Browse files Browse the repository at this point in the history
  • Loading branch information
m1dnight committed Mar 11, 2021
1 parent d4aef85 commit cf5b723
Show file tree
Hide file tree
Showing 14 changed files with 22 additions and 17 deletions.
10 changes: 5 additions & 5 deletions lib/behavior/gen_observable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ defmodule Observables.GenObservable do
state.listeningto
|> Enum.filter(fn sub -> sub != pid end)

# if count(new_subs) == 0 do
# Logger.warn("#{inspect(self())} all dependencies done, stopping ourselves.")
# cast(self(), :stop)
# end
if count(new_subs) == 0 do
Logger.warn("#{inspect(self())} all dependencies done, stopping ourselves.")
cast(self(), :stop)
end

{:noreply, %{state | listeningto: new_subs}}
end
Expand Down Expand Up @@ -248,7 +248,7 @@ defmodule Observables.GenObservable do
@doc """
Makes an observable stop and gracefully shut down.
"""
def stop(producer, reason \\ :normal) do
def stop(producer, _reason \\ :normal) do
# cast(producer, {:stop, reason})
cast(producer, :stop)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/observables/delay.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ defmodule Observables.Operator.Delay do

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
{:ok, :done}
end
end
2 changes: 1 addition & 1 deletion lib/observables/distinct.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ defmodule Observables.Operator.Distinct do

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
{:ok, :done}
end
end
2 changes: 1 addition & 1 deletion lib/observables/distinct_until_changed.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ defmodule Observables.Operator.DistinctUntilChanged do

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
{:ok, :done}
end
end
2 changes: 1 addition & 1 deletion lib/observables/each.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ defmodule Observables.Operator.Each do

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
{:ok, :done}
end
end
2 changes: 1 addition & 1 deletion lib/observables/filter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ defmodule Observables.Operator.Filter do

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
{:ok, :done}
end
end
2 changes: 1 addition & 1 deletion lib/observables/from_enum.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ defmodule Observables.Operator.FromEnum do

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
{:ok, :done}
end
end
2 changes: 1 addition & 1 deletion lib/observables/map.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ defmodule Observables.Operator.Map do

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
{:ok, :done}
end
end
1 change: 1 addition & 0 deletions lib/observables/merge_continuous.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule Observables.Operator.MergeContinuous do
end

def handle_done(_pid, state) do
# TODO
{:ok, state}
end
end
2 changes: 1 addition & 1 deletion lib/observables/range.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ defmodule Observables.Operator.Range do

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
{:ok, :done}
end
end
2 changes: 1 addition & 1 deletion lib/observables/starts_with.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ defmodule Observables.Operator.StartsWith do

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
{:ok, :done}
end
end
2 changes: 1 addition & 1 deletion lib/observables/take.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ defmodule Observables.Operator.Take do

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
{:ok, :done}
end
end
4 changes: 4 additions & 0 deletions lib/subject.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ defmodule Observables.Subject do
def next(pid, v) do
GenObservable.send_event(pid, v)
end

def complete(pid) do
GenObservable.stop(pid, "complete")
end
end
4 changes: 2 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%{
"earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm", "c57508ddad47dfb8038ca6de1e616e66e9b87313220ac5d9817bc4a4dc2257b9"},
"ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm", "33d7b70d87d45ed899180fb0413fb77c7c48843188516e15747e00fdecf572b6"},
}

0 comments on commit cf5b723

Please sign in to comment.