diff --git a/lib/behavior/gen_observable.ex b/lib/behavior/gen_observable.ex index b840f85..bd33e01 100644 --- a/lib/behavior/gen_observable.ex +++ b/lib/behavior/gen_observable.ex @@ -134,10 +134,10 @@ defmodule Observables.GenObservable do state.listeningto |> Enum.filter(fn sub -> sub != pid end) - # if count(new_subs) == 0 do - # Logger.warn("#{inspect(self())} all dependencies done, stopping ourselves.") - # cast(self(), :stop) - # end + if count(new_subs) == 0 do + Logger.warn("#{inspect(self())} all dependencies done, stopping ourselves.") + cast(self(), :stop) + end {:noreply, %{state | listeningto: new_subs}} end @@ -248,7 +248,7 @@ defmodule Observables.GenObservable do @doc """ Makes an observable stop and gracefully shut down. """ - def stop(producer, reason \\ :normal) do + def stop(producer, _reason \\ :normal) do # cast(producer, {:stop, reason}) cast(producer, :stop) end diff --git a/lib/observables/delay.ex b/lib/observables/delay.ex index 0d9bc81..3206c25 100644 --- a/lib/observables/delay.ex +++ b/lib/observables/delay.ex @@ -23,6 +23,6 @@ defmodule Observables.Operator.Delay do def handle_done(pid, _state) do Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}") - {:ok, :continue} + {:ok, :done} end end diff --git a/lib/observables/distinct.ex b/lib/observables/distinct.ex index 7c69ce6..8d3c48e 100644 --- a/lib/observables/distinct.ex +++ b/lib/observables/distinct.ex @@ -19,6 +19,6 @@ defmodule Observables.Operator.Distinct do def handle_done(pid, _state) do Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}") - {:ok, :continue} + {:ok, :done} end end diff --git a/lib/observables/distinct_until_changed.ex b/lib/observables/distinct_until_changed.ex index 3960914..6fed7b0 100644 --- a/lib/observables/distinct_until_changed.ex +++ b/lib/observables/distinct_until_changed.ex @@ -19,6 +19,6 @@ defmodule Observables.Operator.DistinctUntilChanged do def handle_done(pid, _state) do Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}") - {:ok, :continue} + {:ok, :done} end end diff --git a/lib/observables/each.ex b/lib/observables/each.ex index 70cb40e..7d342a4 100644 --- a/lib/observables/each.ex +++ b/lib/observables/each.ex @@ -15,6 +15,6 @@ defmodule Observables.Operator.Each do def handle_done(pid, _state) do Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}") - {:ok, :continue} + {:ok, :done} end end diff --git a/lib/observables/filter.ex b/lib/observables/filter.ex index 96d3155..06ff102 100644 --- a/lib/observables/filter.ex +++ b/lib/observables/filter.ex @@ -18,6 +18,6 @@ defmodule Observables.Operator.Filter do def handle_done(pid, _state) do Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}") - {:ok, :continue} + {:ok, :done} end end diff --git a/lib/observables/from_enum.ex b/lib/observables/from_enum.ex index 0ae512c..419223e 100644 --- a/lib/observables/from_enum.ex +++ b/lib/observables/from_enum.ex @@ -19,6 +19,6 @@ defmodule Observables.Operator.FromEnum do def handle_done(pid, _state) do Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}") - {:ok, :continue} + {:ok, :done} end end diff --git a/lib/observables/map.ex b/lib/observables/map.ex index ee33aed..367c2f4 100644 --- a/lib/observables/map.ex +++ b/lib/observables/map.ex @@ -14,6 +14,6 @@ defmodule Observables.Operator.Map do def handle_done(pid, _state) do Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}") - {:ok, :continue} + {:ok, :done} end end diff --git a/lib/observables/merge_continuous.ex b/lib/observables/merge_continuous.ex index edbaf54..f2b7557 100644 --- a/lib/observables/merge_continuous.ex +++ b/lib/observables/merge_continuous.ex @@ -24,6 +24,7 @@ defmodule Observables.Operator.MergeContinuous do end def handle_done(_pid, state) do + # TODO {:ok, state} end end diff --git a/lib/observables/range.ex b/lib/observables/range.ex index 986e8f1..808e716 100644 --- a/lib/observables/range.ex +++ b/lib/observables/range.ex @@ -31,6 +31,6 @@ defmodule Observables.Operator.Range do def handle_done(pid, _state) do Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}") - {:ok, :continue} + {:ok, :done} end end diff --git a/lib/observables/starts_with.ex b/lib/observables/starts_with.ex index 8fd97f0..8fa8be8 100644 --- a/lib/observables/starts_with.ex +++ b/lib/observables/starts_with.ex @@ -14,6 +14,6 @@ defmodule Observables.Operator.StartsWith do def handle_done(pid, _state) do Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}") - {:ok, :continue} + {:ok, :done} end end diff --git a/lib/observables/take.ex b/lib/observables/take.ex index c55b7ff..e94c606 100644 --- a/lib/observables/take.ex +++ b/lib/observables/take.ex @@ -18,6 +18,6 @@ defmodule Observables.Operator.Take do def handle_done(pid, _state) do Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}") - {:ok, :continue} + {:ok, :done} end end diff --git a/lib/subject.ex b/lib/subject.ex index e43fef0..8fe1251 100644 --- a/lib/subject.ex +++ b/lib/subject.ex @@ -18,4 +18,8 @@ defmodule Observables.Subject do def next(pid, v) do GenObservable.send_event(pid, v) end + + def complete(pid) do + GenObservable.stop(pid, "complete") + end end diff --git a/mix.lock b/mix.lock index c7889ca..e35ff9a 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,4 @@ %{ - "earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm"}, - "ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"}, + "earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm", "c57508ddad47dfb8038ca6de1e616e66e9b87313220ac5d9817bc4a4dc2257b9"}, + "ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm", "33d7b70d87d45ed899180fb0413fb77c7c48843188516e15747e00fdecf572b6"}, }