Skip to content

Commit

Permalink
Do not exit on error (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
raz-adroll authored Jul 27, 2023
1 parent 1f45292 commit 5969969
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions lib/exmld/kinesis_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ defmodule Exmld.KinesisWorker do
{:done, []},
{:max_pending, 1000},
{:await_sleep_interval, 1000},
{:pending, %{}}]
{:pending, %{}},
{:on_duplicate, :exit}]

@type flusher_token :: any
@type t :: %__MODULE__{# list of identifiers which can be `GenStage.call/3`ed:
Expand All @@ -39,9 +40,11 @@ defmodule Exmld.KinesisWorker do
#
# otherwise, we store {token, []} and await all outstanding
# dispositions for items extracted from the originating record.
pending: %{optional(Exmld.sequence_number) =>
pending: %{optional({Exmld.shard_id, Exmld.sequence_number}) =>
flusher_token | {flusher_token, [{non_neg_integer,
non_neg_integer}]}}}
non_neg_integer}]}},
# What we should do if we find a duplicate sequence number
on_duplicate: :exit | :skip}

@moduledoc """
An [erlmld_flusher](https://github.com/AdRoll/erlmld/blob/HEAD/src/erlmld_flusher.erl)
Expand Down Expand Up @@ -226,15 +229,23 @@ defmodule Exmld.KinesisWorker do
# don't know how many), and we'll expect multiple dispositions for it (each containing a
# faked sequence number also containing populated base, user_sub, and user_total); once we receive
# all of those, it's done.
defp note_pending(%__MODULE__{pending: pending} = state, record, token) do
defp note_pending(%__MODULE__{pending: pending, shard_id: shard_id} = state, record, token) do
sn = Exmld.stream_record(record, :sequence_number)
if pending[sn] do
# we received the same sequence number for two records; this should not happen.
exit({:duplicate_seqno, sn})
end
note_pending(state, token, sn, Map.has_key?(pending, {shard_id, sn}))
end
# Duplicate handling for the `on_duplicate: :exit` policy. This clause is selected when
# the fourth argument is `true`, i.e. the caller found the sequence number already
# present in `pending`. It logs a warning and terminates the calling process with
# the exit reason `{:duplicate_seqno, sn}`; it never returns.
defp note_pending(%__MODULE__{on_duplicate: :exit, shard_id: shard_id}, _token, sn, true) do
  # we received the same sequence number for two records; this should not happen.
  # `sn` is handled elsewhere in this module through the `Exmld.sequence_number`
  # record macro, so it is a tuple. Tuples do not implement String.Chars, so it must
  # be rendered with inspect/1; interpolating it directly would raise
  # Protocol.UndefinedError and replace the intended exit reason below.
  Logger.warning("Duplicate sequence number for two records #{inspect(sn)} on #{shard_id} exiting...")
  exit({:duplicate_seqno, sn})
end
# Duplicate handling for the `on_duplicate: :skip` policy. This clause is selected when
# the fourth argument is `true`, i.e. the caller found the sequence number already
# present in `pending`. It logs a warning and returns `state` unchanged, so the
# duplicate record is not registered a second time.
defp note_pending(%__MODULE__{on_duplicate: :skip, shard_id: shard_id} = state, _token, sn, true) do
  # `sn` is handled elsewhere in this module through the `Exmld.sequence_number`
  # record macro, so it is a tuple. Tuples do not implement String.Chars, so it must
  # be rendered with inspect/1; interpolating it directly would raise
  # Protocol.UndefinedError and crash the worker instead of skipping the duplicate.
  Logger.warning("Duplicate sequence number for two records #{inspect(sn)} on #{shard_id} skipping...")
  state
end
defp note_pending(%__MODULE__{pending: pending, shard_id: shard_id} = state, token, sn, false) do
expect_multiple = :undefined == Exmld.sequence_number(sn, :user_sub)
stored_token = maybe_standard_token(token, sn)
%{state | pending: Map.put(pending, sn, case expect_multiple do
%{state | pending: Map.put(pending, {shard_id, sn}, case expect_multiple do
true ->
{stored_token, []}
false ->
Expand Down Expand Up @@ -276,8 +287,8 @@ defmodule Exmld.KinesisWorker do
|> Enum.reduce(state, &update_pending_1/2)
end

defp update_pending_1(sn, %__MODULE__{pending: pending, done: done} = state) do
case Map.pop(pending, sn) do
defp update_pending_1(sn, %__MODULE__{pending: pending, done: done, shard_id: shard_id} = state) do
case Map.pop(pending, {shard_id, sn}) do
{nil, pending} ->
# the sequence number doesn't exist in pending. this will happen if the sequence
# number has user_sub and user_total fields populated and a non-aggregate record was
Expand All @@ -288,7 +299,7 @@ defmodule Exmld.KinesisWorker do
if :undefined == sub do
exit({:missing_pending, sn})
end
key = Exmld.sequence_number(sn, user_sub: :undefined, user_total: :undefined)
key = {shard_id, Exmld.sequence_number(sn, user_sub: :undefined, user_total: :undefined)}
{{token, seen}, pending} = Map.pop(pending, key)
seen = [{sub, total} | seen]
# if all expected items have been received, move token to done. otherwise,
Expand Down

0 comments on commit 5969969

Please sign in to comment.