Skip to content

Commit

Permalink
distinct until changed
Browse files Browse the repository at this point in the history
  • Loading branch information
m1dnight committed Jun 6, 2018
1 parent 99f7201 commit 9493732
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 4 deletions.
21 changes: 21 additions & 0 deletions lib/obs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,27 @@ defmodule Observables.Obs do
pid
end

@doc """
Filters out values that have already been produced by any given observable.
Allows duplicate values as soon as a different value has been produces.
Uses the default `==` function if none is given.
The expected function should take 2 arguments, and return a boolean indication
the equality.
More information: http://reactivex.io/documentation/operators/distinct.html and http://rxmarbles.com/#distinctUntilChanged
"""
def distinctuntilchanged(observable, f \\ fn x, y -> x == y end) do
{:ok, pid} = GenObservable.start_link(Distinct, [f])

GenObservable.send_to(observable, pid)

# Creat the continuation.
pid
end

@doc """
Same as map, but returns the original value. Typically used for side effects.
Expand Down
2 changes: 1 addition & 1 deletion lib/observables/delay.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Observables.Operator.Delay do
{:value, x, state}

v ->
Logger.warn "Sending after #{delay}: #{inspect v}"
Logger.warn("Sending after #{delay}: #{inspect(v)}")
Process.send_after(self(), {:event, {:delayed, v}}, delay)
{:novalue, state}
end
Expand Down
24 changes: 24 additions & 0 deletions lib/observables/distinct_until_changed.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Observables.Operator.DistinctUntilChanged do
@moduledoc false
use Observables.GenObservable

def init([comparator]) do
Logger.debug("Distinct Until Changed: #{inspect(self())}")
{:ok, %{:comp => comparator, :seen => []}}
end

def handle_event(v, state = %{:comp => f, :seen => xs}) do
seen? = Enum.any?(xs, fn seen -> f.(v, seen) end)

if not seen? do
{:value, v, %{:comp => f, :seen => []}}
else
{:novalue, state}
end
end

def handle_done(pid, _state) do
Logger.debug("#{inspect(self())}: dependency stopping: #{inspect(pid)}")
{:ok, :continue}
end
end
3 changes: 1 addition & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ defmodule Observables.MixProject do
"Observables in the spirit of Reactive Extensions for Elixir."
end


defp package() do
[
# This option is only needed when you don't want to use the OTP application name
Expand All @@ -44,5 +43,5 @@ defmodule Observables.MixProject do
licenses: ["MIT"],
links: %{"GitHub" => "https://github.com/m1dnight/observables"}
]
end
end
end
2 changes: 1 addition & 1 deletion test/delay_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule DelayTest do
Logger.error("Got #{v}")
send(testproc, v)
end)

# Receive no other values.
receive do
_x ->
Expand All @@ -29,6 +30,5 @@ defmodule DelayTest do
|> Enum.map(fn v ->
assert_receive(^v, 5000, "did not get this message #{v}")
end)

end
end
36 changes: 36 additions & 0 deletions test/distinctuntilchanged_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule DistinctUntilChangedTest do
use ExUnit.Case
alias Observables.{Obs}
require Logger

@tag :distinctuntilchanged
test "distinctuntilchanged" do
testproc = self()

xs = [1, 1, 2, 2, 3, 3]

expected = [1, 2, 3]

xs
|> Obs.from_enum(100)
|> Obs.distinctuntilchanged()
|> Obs.map(fn v -> send(testproc, v) end)

expected
|> Enum.map(fn x ->
assert_receive(^x, 1000, "did not get this message!")

receive do
^x -> assert "duplicates" == ""
after
1000 -> :ok
end
end)

receive do
x -> flunk("Mailbox was supposed to be empty, got: #{inspect(x)}")
after
0 -> :ok
end
end
end

0 comments on commit 9493732

Please sign in to comment.