Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce sequential processing of session events #4493

Merged
merged 9 commits into from
Sep 3, 2024
8 changes: 8 additions & 0 deletions lib/plausible/cache/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,12 @@ defmodule Plausible.Cache.Adapter do
Logger.error("Error retrieving key from '#{inspect(cache_name)}'")
[]
end

@spec with_lock!(atom(), any(), pos_integer(), (-> result)) :: result when result: any()
def with_lock!(cache_name, key, timeout, fun) do
ConCache.isolated(cache_name, key, timeout, fun)
catch
:exit, {:timeout, _} ->
Logger.error("Timeout while executing with lock on key in '#{inspect(cache_name)}'")
end
end
31 changes: 20 additions & 11 deletions lib/plausible/session/cache_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,27 @@ defmodule Plausible.Session.CacheStore do
require Logger
alias Plausible.Session.WriteBuffer

def on_event(event, session_attributes, prev_user_id, buffer \\ WriteBuffer) do
found_session = find_session(event, event.user_id) || find_session(event, prev_user_id)
@lock_timeout 500

if found_session do
updated_session = update_session(found_session, event)
buffer.insert([%{found_session | sign: -1}, %{updated_session | sign: 1}])
persist_session(updated_session)
else
new_session = new_session_from_event(event, session_attributes)
buffer.insert([new_session])
persist_session(new_session)
end
def on_event(event, session_attributes, prev_user_id, buffer_insert \\ &WriteBuffer.insert/1) do
Plausible.Cache.Adapter.with_lock!(
zoldar marked this conversation as resolved.
Show resolved Hide resolved
:sessions,
{event.site_id, event.user_id},
@lock_timeout,
fn ->
found_session = find_session(event, event.user_id) || find_session(event, prev_user_id)

if found_session do
updated_session = update_session(found_session, event)
buffer_insert.([%{found_session | sign: -1}, %{updated_session | sign: 1}])
persist_session(updated_session)
else
new_session = new_session_from_event(event, session_attributes)
buffer_insert.([new_session])
persist_session(new_session)
end
end
)
end

defp find_session(_domain, nil), do: nil
Expand Down
181 changes: 173 additions & 8 deletions test/plausible/session/cache_store_test.exs
Original file line number Diff line number Diff line change
@@ -1,16 +1,181 @@
defmodule Plausible.Session.CacheStoreTest do
use Plausible.DataCase

import ExUnit.CaptureLog

alias Plausible.Session.CacheStore

defmodule FakeBuffer do
def insert(sessions) do
send(self(), {WriteBuffer, :insert, [sessions]})
setup do
current_pid = self()

buffer = fn sessions ->
send(current_pid, {:buffer, :insert, [sessions]})
{:ok, sessions}
end

slow_buffer = fn sessions ->
Process.sleep(200)
send(current_pid, {:slow_buffer, :insert, [sessions]})
{:ok, sessions}
end

[buffer: buffer, slow_buffer: slow_buffer]
end

setup do
[buffer: FakeBuffer]
test "event processing is sequential within session", %{
buffer: buffer,
slow_buffer: slow_buffer
} do
event1 = build(:event, name: "pageview")
event2 = build(:event, name: "pageview", user_id: event1.user_id, site_id: event1.site_id)
event3 = build(:event, name: "pageview", user_id: event1.user_id, site_id: event1.site_id)

session_params = %{
referrer: "ref",
referrer_source: "refsource",
utm_medium: "medium",
utm_source: "source",
utm_campaign: "campaign",
utm_content: "content",
utm_term: "term",
browser: "browser",
browser_version: "55",
country_code: "EE",
screen_size: "Desktop",
operating_system: "Mac",
operating_system_version: "11"
}

CacheStore.on_event(event1, session_params, nil, buffer)

assert_receive({:buffer, :insert, [[session1]]})

[event2, event3]
|> Enum.map(fn e ->
Task.async(fn ->
CacheStore.on_event(e, session_params, nil, slow_buffer)
end)
end)
|> Task.await_many()

assert_receive({:slow_buffer, :insert, [[removed_session11, updated_session12]]})
assert_receive({:slow_buffer, :insert, [[removed_session12, updated_session13]]})

# Without isolation enforced in `CacheStore.on_event/4`,
# event2 and event3 would both get executed in parallel
# and treat event1 as event to be updated. This would result
# in a following set of entries in Clickhouse sessions
# table for _the same_ session:
#
# session_id | is_bounce | sign
# (event1) 123 1 1
# (event2) 123 1 -1
# 123 0 1
# (event3) 123 1 -1
# 123 0 1
#
# Once collapsing merge tree table does collapsing, we'd end up with:
#
# session_id | is_bounce | sign
# 123 0 1
# 123 1 -1
# 123 0 1
#
# This in turn led to sum(sign * is_bounce) < 0 which, after underflowed casting,
# ended up with 2^32-(small n) for bounce_rate value.

assert removed_session11 == %{session1 | sign: -1}
assert updated_session12.sign == 1
assert updated_session12.events == 2
assert updated_session12.pageviews == 2
assert removed_session12 == %{updated_session12 | sign: -1}
assert updated_session13.sign == 1
assert updated_session13.events == 3
assert updated_session13.pageviews == 3
end

@tag :slow
test "in case of lock kicking in, the slow event finishes processing", %{buffer: buffer} do
# FIXME: for some reason sometimes `very_slow_buffer` is called twice,
# as if there was some implicit retry? Happens once per multiple runs
# of this test.
current_pid = self()

very_slow_buffer = fn sessions ->
Process.sleep(1000)
send(current_pid, {:very_slow_buffer, :insert, [sessions]})
{:ok, sessions}
end

event1 = build(:event, name: "pageview")
event2 = build(:event, name: "pageview", user_id: event1.user_id, site_id: event1.site_id)
event3 = build(:event, name: "pageview", user_id: event1.user_id, site_id: event1.site_id)

session_params = %{
referrer: "ref",
referrer_source: "refsource",
utm_medium: "medium",
utm_source: "source",
utm_campaign: "campaign",
utm_content: "content",
utm_term: "term",
browser: "browser",
browser_version: "55",
country_code: "EE",
screen_size: "Desktop",
operating_system: "Mac",
operating_system_version: "11"
}

async1 =
Task.async(fn ->
CacheStore.on_event(event1, session_params, nil, very_slow_buffer)
end)

async2 =
Task.async(fn ->
CacheStore.on_event(event2, session_params, nil, buffer)
end)

async3 =
Task.async(fn ->
CacheStore.on_event(event3, session_params, nil, buffer)
end)

capture_log(fn ->
Task.await_many([async1, async2, async3])
end) =~ "Timeout while executing with lock on key in ':sessions'"

assert_receive({:very_slow_buffer, :insert, [[_session]]})
refute_receive({:buffer, :insert, [[_updated_session]]})
end

test "exploding event processing is passed through by locking mechanism" do
crashing_buffer = fn _sessions ->
raise "boom"
end

event = build(:event, name: "pageview")

session_params = %{
referrer: "ref",
referrer_source: "refsource",
utm_medium: "medium",
utm_source: "source",
utm_campaign: "campaign",
utm_term: "term",
utm_content: "content",
browser: "browser",
browser_version: "55",
country_code: "EE",
screen_size: "Desktop",
operating_system: "Mac",
operating_system_version: "11"
}

assert_raise RuntimeError, "boom", fn ->
CacheStore.on_event(event, session_params, nil, crashing_buffer)
end
end

test "creates a session from an event", %{buffer: buffer} do
Expand Down Expand Up @@ -39,7 +204,7 @@ defmodule Plausible.Session.CacheStoreTest do

CacheStore.on_event(event, session_params, nil, buffer)

assert_receive({WriteBuffer, :insert, [sessions]})
assert_receive({:buffer, :insert, [sessions]})
assert [session] = sessions
assert session.hostname == event.hostname

Expand Down Expand Up @@ -82,7 +247,7 @@ defmodule Plausible.Session.CacheStoreTest do

CacheStore.on_event(event1, %{}, nil, buffer)
CacheStore.on_event(event2, %{}, nil, buffer)
assert_receive({WriteBuffer, :insert, [[_negative_record, session]]})
assert_receive({:buffer, :insert, [[_negative_record, session]]})
assert session.is_bounce == false
assert session.duration == 10
assert session.pageviews == 2
Expand Down Expand Up @@ -286,7 +451,7 @@ defmodule Plausible.Session.CacheStoreTest do
CacheStore.on_event(event1, %{}, nil, buffer)
CacheStore.on_event(event2, %{}, nil, buffer)

assert_receive({WriteBuffer, :insert, [[_negative_record, session]]})
assert_receive({:buffer, :insert, [[_negative_record, session]]})
assert session.duration == 10
end

Expand Down
Loading