Skip to content

Commit

Permalink
retry on topic lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-bsft committed Feb 23, 2023
1 parent 31fa3ad commit f6804a5
Showing 1 changed file with 55 additions and 2 deletions.
57 changes: 55 additions & 2 deletions lib/pulsar_ex/partition_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,66 @@ defmodule PulsarEx.PartitionManager do

@watch_interval 30_000

def lookup(cluster, topic_name) do
# this has to be larger than the hackney default timeouts
# https://github.com/benoitc/hackney/blob/master/doc/hackney.md#request-5
@timeout 10_000
@max_attempts 5

def lookup(cluster, topic_name, state \\ %{error: nil, attempts: 0})

def lookup(_cluster, _topic_name, %{attempts: @max_attempts, error: err}), do: {:error, err}

def lookup(cluster, topic_name, %{attempts: 0}) do
do_lookup(cluster, topic_name)
|> case do
{:ok, {topic, partitions}} ->
{:ok, {topic, partitions}}

{:error, err} ->
Logger.error(
"Error looking up topic #{topic_name}, on cluster #{cluster}, #{inspect(err)}"
)

:telemetry.execute(
[:pulsar_ex, :lookup, :error],
%{count: 1},
%{cluster: cluster, topic: topic_name}
)

lookup(cluster, topic_name, %{attempts: 1, error: err})
end
end

def lookup(cluster, topic_name, %{attempts: attempts}) do
Process.sleep(2 ** attempts * 500)

do_lookup(cluster, topic_name)
|> case do
{:ok, {topic, partitions}} ->
{:ok, {topic, partitions}}

{:error, err} ->
Logger.error(
"Error looking up topic #{topic_name}, on cluster #{cluster}, #{inspect(err)}"
)

:telemetry.execute(
[:pulsar_ex, :lookup, :error],
%{count: 1},
%{cluster: cluster, topic: topic_name}
)

lookup(cluster, topic_name, %{attempts: attempts + 1, error: err})
end
end

defp do_lookup(cluster, topic_name) do
case :ets.lookup(PulsarEx.Application.partitions_table(), {cluster, topic_name}) do
[{{^cluster, ^topic_name}, {topic, partitions}}] ->
{:ok, {topic, partitions}}

[] ->
GenServer.call(name(cluster), {:lookup, topic_name})
GenServer.call(name(cluster), {:lookup, topic_name}, @timeout)
end
end

Expand Down

0 comments on commit f6804a5

Please sign in to comment.