From c910b2f43f1281d0e0b456ba2f702e4e39f9022b Mon Sep 17 00:00:00 2001 From: Tony Pitale Date: Fri, 29 Oct 2021 21:31:45 -0500 Subject: [PATCH] Handle partition response format for newer brod Resolve #97 --- lib/elsa/util.ex | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/lib/elsa/util.ex b/lib/elsa/util.ex index a739797..e75a5c9 100644 --- a/lib/elsa/util.ex +++ b/lib/elsa/util.ex @@ -109,23 +109,33 @@ defmodule Elsa.Util do def partition_count(endpoints, topic) when is_list(endpoints) do {:ok, metadata} = :brod.get_metadata(reformat_endpoints(endpoints), [topic]) - metadata.topic_metadata - |> Enum.map(fn topic_metadata -> - Enum.count(topic_metadata.partition_metadata) - end) - |> hd() + count_partitions(metadata) end def partition_count(connection, topic) when is_atom(connection) or is_pid(connection) do {:ok, metadata} = :brod_client.get_metadata(connection, topic) - metadata.topic_metadata + count_partitions(metadata) + end + + # Handle brod < 3.16 + defp count_partitions(%{topic_metada: topic_metadata}) do + topic_metadata |> Enum.map(fn topic_metadata -> Enum.count(topic_metadata.partition_metadata) end) |> hd() end + # Handle brod 3.16+ + defp count_partitions(%{topics: topics}) do + topics + |> Enum.map(fn topic -> + Enum.count(topic.partitions) + end) + |> hd() + end + defp connect(endpoints, :controller), do: :kpro.connect_controller(endpoints, []) defp connect(endpoints, _type), do: :kpro.connect_any(endpoints, [])