Skip to content

Commit

Permalink
Handle partition response format for newer brod
Browse files Browse the repository at this point in the history
Resolve bbalser#97
  • Loading branch information
tpitale authored Oct 30, 2021
1 parent 8a461b8 commit c910b2f
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions lib/elsa/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,33 @@ defmodule Elsa.Util do
def partition_count(endpoints, topic) when is_list(endpoints) do
{:ok, metadata} = :brod.get_metadata(reformat_endpoints(endpoints), [topic])

metadata.topic_metadata
|> Enum.map(fn topic_metadata ->
Enum.count(topic_metadata.partition_metadata)
end)
|> hd()
count_partitions(metadata)
end

def partition_count(connection, topic) when is_atom(connection) or is_pid(connection) do
{:ok, metadata} = :brod_client.get_metadata(connection, topic)

metadata.topic_metadata
count_partitions(metadata)
end

# Handle brod < 3.16
defp count_partitions(%{topic_metada: topic_metadata}) do
topic_metadata
|> Enum.map(fn topic_metadata ->
Enum.count(topic_metadata.partition_metadata)
end)
|> hd()
end

# Handle brod 3.16+
defp count_partitions(%{topics: topics}) do
topics
|> Enum.map(fn topic ->
Enum.count(topic.partitions)
end)
|> hd()
end

defp connect(endpoints, :controller), do: :kpro.connect_controller(endpoints, [])
defp connect(endpoints, _type), do: :kpro.connect_any(endpoints, [])

Expand Down

0 comments on commit c910b2f

Please sign in to comment.