Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updating to brod 3.16 and new kafka resp schemas #100

Merged
merged 5 commits into from
Nov 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ jobs:
- uses: actions/checkout@v2
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
otp-version: 22.3
elixir-version: 1.10.4
- name: Get depedencies
run: |
mix local.rebar --force
Expand All @@ -28,8 +28,8 @@ jobs:
- uses: actions/checkout@v2
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
otp-version: 22.3
elixir-version: 1.10.4
- name: Get dependencies
run: |
mix local.rebar --force
Expand All @@ -45,8 +45,8 @@ jobs:
- uses: actions/checkout@v2
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
otp-version: 22.3
elixir-version: 1.10.4
- name: Retrieve cached PLT
uses: actions/cache@v1
with:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ jobs:
- uses: actions/checkout@v2
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
otp-version: 22.3
elixir-version: 1.10.4
- name: Get dependencies
run: |
mix local.rebar --force
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ jobs:
- uses: actions/checkout@v2
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
otp-version: 22.3
elixir-version: 1.10.4
- name: Build
run: |
mix local.rebar --force
Expand Down
2 changes: 2 additions & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
erlang 22.3.4.21
elixir 1.10.4-otp-22
2 changes: 1 addition & 1 deletion lib/elsa/partitioner/random.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ defmodule Elsa.Partitioner.Random do
@behaviour Elsa.Partitioner

def partition(count, _key) do
:crypto.rand_uniform(0, count)
:rand.uniform(count) - 1
end
end
27 changes: 15 additions & 12 deletions lib/elsa/topic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ defmodule Elsa.Topic do
{:ok, metadata} = :brod.get_metadata(reformat_endpoints(endpoints), :all)

topics =
metadata.topic_metadata
metadata.topics
|> Enum.map(fn topic_metadata ->
{topic_metadata.topic, Enum.count(topic_metadata.partition_metadata)}
{topic_metadata.name, Enum.count(topic_metadata.partitions)}
end)

{:ok, topics}
Expand Down Expand Up @@ -48,14 +48,14 @@ defmodule Elsa.Topic do
config =
opts
|> Keyword.get(:config, [])
|> Enum.map(fn {key, val} -> %{config_name: to_string(key), config_value: val} end)
|> Enum.map(fn {key, val} -> %{name: to_string(key), value: val} end)

create_topic_args = %{
topic: topic,
name: topic,
num_partitions: Keyword.get(opts, :partitions, 1),
replication_factor: Keyword.get(opts, :replicas, 1),
replica_assignment: [],
config_entries: config
assignments: [],
configs: config
}

version = Elsa.Util.get_api_version(connection, :create_topics)
Expand Down Expand Up @@ -88,15 +88,18 @@ defmodule Elsa.Topic do
defp check_response(response) do
message = kpro_rsp(response, :msg)

error_key =
case Map.has_key?(message, :topic_errors) do
true -> :topic_errors
false -> :topic_error_codes
response_key =
case Map.has_key?(message, :topics) do
true -> :topics
false -> :responses
end

case Enum.find(message[error_key], fn error -> error.error_code != :no_error end) do
case Enum.find(message[response_key], fn response -> response.error_code != :no_error end) do
nil -> :ok
error -> {:error, {error.error_code, error[:error_message]}}
response -> {:error, {response.error_code, resp_error_msg(response, response_key)}}
end
end

defp resp_error_msg(response, :topics), do: response.error_message
defp resp_error_msg(_response, :responses), do: :delete_topic_error
end
24 changes: 14 additions & 10 deletions lib/elsa/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,25 @@ defmodule Elsa.Util do
def partition_count(endpoints, topic) when is_list(endpoints) do
{:ok, metadata} = :brod.get_metadata(reformat_endpoints(endpoints), [topic])

metadata.topic_metadata
|> Enum.map(fn topic_metadata ->
Enum.count(topic_metadata.partition_metadata)
end)
|> hd()
count_partitions(metadata)
end

def partition_count(connection, topic) when is_atom(connection) or is_pid(connection) do
{:ok, metadata} = :brod_client.get_metadata(connection, topic)

metadata.topic_metadata
|> Enum.map(fn topic_metadata ->
Enum.count(topic_metadata.partition_metadata)
end)
|> hd()
count_partitions(metadata)
end

# Handle brod < 3.16
defp count_partitions(%{topic_metadata: topic_metadatas}) do
[count | _] = for %{partition_metadata: metadata} <- topic_metadatas, do: Enum.count(metadata)
count
end

# Handle brod 3.16+
defp count_partitions(%{topics: topics}) do
[count | _] = for %{partitions: partitions} <- topics, do: Enum.count(partitions)
count
end

defp connect(endpoints, :controller), do: :kpro.connect_controller(endpoints, [])
Expand Down
6 changes: 3 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
defmodule Elsa.MixProject do
use Mix.Project

@version "1.0.0-rc.2"
@version "1.0.0-rc.3"
@github "https://github.com/bbalser/elsa"

def project do
[
app: :elsa,
name: "Elsa",
version: @version,
elixir: "~> 1.8",
elixir: "~> 1.10",
start_permanent: Mix.env() == :prod,
description: description(),
package: package(),
Expand All @@ -30,7 +30,7 @@ defmodule Elsa.MixProject do

defp deps do
[
{:brod, "~> 3.14.0"},
{:brod, "~> 3.16"},
{:patiently, "~> 0.2", only: [:dev, :test, :integration]},
{:divo, "~> 1.3", only: [:dev, :test, :integration], override: true},
{:divo_kafka, "~> 0.1.7", only: [:dev, :test, :integration]},
Expand Down
17 changes: 8 additions & 9 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
%{
"brod": {:hex, :brod, "3.14.0", "f959408e88acd0feca22f6a43ca26e70201c6e5c57dc74b87f08ef65a5e7fe18", [:rebar3], [{:kafka_protocol, "2.3.6", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.11", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "a0153437835b810d93e79c7000f55b0c4bd62c281a3224a2615f6aa72b52d484"},
"brod": {:hex, :brod, "3.16.1", "1c7b03f99c7cc310de5511cadad9879ab0cc5f1a2612211e68c26dad517d31b0", [:rebar3], [{:kafka_protocol, "4.0.1", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:snappyer, "1.2.8", [hex: :snappyer, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.11", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "8297c47cd1ff0657955027fa1beb62edfaab1cc5e09b714cc29bd7f1c8d40083"},
"checkov": {:hex, :checkov, "1.0.0", "cecf1be22ea506b2fbd6741d7c00f4876bb2be76ea1b95493c25b51028f24410", [:mix], [], "hexpm", "9fa85e6fdf1bcec2dd0d996d0c1e5a83e336dafb97c931232af1cb1e7ef4420a"},
"crc32cer": {:hex, :crc32cer, "0.1.4", "a656dff19474d1a1fc5bb0081610ab6b0695b23affc47fa90abeb079a8ef9752", [:rebar3], [], "hexpm", "964735a5422cf65bbc5354860a560fff546f0026f83f8860525bd58ab5bade5d"},
"crc32cer": {:hex, :crc32cer, "0.1.8", "c6c2275c5fb60a95f4935d414f30b50ee9cfed494081c9b36ebb02edfc2f48db", [:rebar3], [], "hexpm", "251499085482920deb6c9b7aadabf9fb4c432f96add97ab42aee4501e5b6f591"},
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
"divo": {:hex, :divo, "1.3.1", "a7cdb05d4525a9703e11dbcf40567d426b546f8e816b9c9465232c10bc6a257b", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:patiently, "~> 0.2", [hex: :patiently, repo: "hexpm", optional: false]}], "hexpm", "b3edef7baf068bbf864c01c8f3ed06fcf81c08e8d4adcc1cfc4b7d6eb69c6a18"},
"divo_kafka": {:hex, :divo_kafka, "0.1.7", "e8253bb735e001c41f35645ac0429740b6b6350ceb0ae268609f769f0b3883c5", [:mix], [{:divo, "~> 1.1", [hex: :divo, repo: "hexpm", optional: false]}], "hexpm", "25f9b89a1f59f6801b8b1e044eaa8cdce4e0756b4a8512458ea31f9c99ec338f"},
"earmark": {:hex, :earmark, "1.4.5", "62ffd3bd7722fb7a7b1ecd2419ea0b458c356e7168c1f5d65caf09b4fbdd13c8", [:mix], [], "hexpm", "b7d0e6263d83dc27141a523467799a685965bf8b13b6743413f19a7079843f4f"},
"earmark_parser": {:hex, :earmark_parser, "1.4.15", "b29e8e729f4aa4a00436580dcc2c9c5c51890613457c193cc8525c388ccb2f06", [:mix], [], "hexpm", "044523d6438ea19c1b8ec877ec221b008661d3c27e3b848f4c879f500421ca5c"},
"earmark_parser": {:hex, :earmark_parser, "1.4.17", "6f3c7e94170377ba45241d394389e800fb15adc5de51d0a3cd52ae766aafd63f", [:mix], [], "hexpm", "f93ac89c9feca61c165b264b5837bf82344d13bebc634cd575cb711e2e342023"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.25.3", "3edf6a0d70a39d2eafde030b8895501b1c93692effcbd21347296c18e47618ce", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "9ebebc2169ec732a38e9e779fd0418c9189b3ca93f4a676c961be6c1527913f5"},
"ex_doc": {:hex, :ex_doc, "0.25.5", "ac3c5425a80b4b7c4dfecdf51fa9c23a44877124dd8ca34ee45ff608b1c6deb9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "688cfa538cdc146bc4291607764a7f1fcfa4cce8009ecd62de03b27197528350"},
"jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
"kafka_protocol": {:hex, :kafka_protocol, "2.3.6", "df076a8ef49fffae3535c805cb00f3a057ce1895e63398bf8a10569eeeac02f8", [:rebar, :rebar3], [{:crc32cer, "0.1.4", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "1.2.5", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "7cb061fe46babc7fd269d2c0e5b4dba5d1efc4f7dacce85b17a9cca973106b23"},
"kafka_protocol": {:hex, :kafka_protocol, "4.0.1", "fc696880c73483c8b032c4bb60f2873046035c7824e1edcb924cfce643cf23dd", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "687bfd9989998ec8fbbc3ed50d1239a6c07a7dc15b52914ad477413b89ecb621"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.0", "b44d75e2a6542dcb6acf5d71c32c74ca88960421b6874777f79153bbbbd7dccc", [:mix], [], "hexpm", "52b2871a7515a5ac49b00f214e4165a40724cf99798d8e4a65e4fd64ebd002c1"},
"patiently": {:hex, :patiently, "0.2.0", "67eb139591e10c4b363ae0198e832552f191c58894731efd3bf124ec4722267a", [:mix], [], "hexpm", "c08cc5edc27def565647a9b55a0bea8025a5f81a4472e57692f28f2292c44c94"},
"placebo": {:hex, :placebo, "2.0.0", "c0e773dec77e941bcbcc14d10b759f2d66775aff9b75051f3e41939b64300e81", [:mix], [{:meck, "~> 0.9", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "e0872cec8848d7e59ba96396f45ee1ad34662c689c86ba6190694d38b4289844"},
"snappyer": {:hex, :snappyer, "1.2.5", "9154b9ac84031f0a799f72a4aa87df23ab2193b5631475fa2cdc304382d2df77", [:rebar3], [], "hexpm", "d2adc26a81efd5f138397a38a0bb545188d302972721f8be0de37fa452c8aed7"},
"snappyer": {:hex, :snappyer, "1.2.8", "201ce9067a33c71a6a5087c0c3a49a010b17112d461e6df696c722dcb6d0934a", [:rebar3], [], "hexpm", "35518e79a28548b56d8fd6aee2f565f12f51c2d3d053f9cfa817c83be88c4f3d"},
"supervisor3": {:hex, :supervisor3, "1.1.11", "d81cdec31d102fde407423e1d05b569572850deebed86b951d5233c387cba80b", [:rebar3], [], "hexpm", "e6c2dedbcabcba24995a218aca12db5e208b80d3252692b22ef0f1a266104b50"},
}
4 changes: 2 additions & 2 deletions test/integration/elsa/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ defmodule Elsa.ConsumerTest do

send_messages(topic, ["message1", "message2"])

assert_receive {:message, %{topic: topic, partition: 0, offset: _, key: "", value: "message1"}}, 5_000
assert_receive {:message, %{topic: topic, partition: 1, offset: _, key: "", value: "message2"}}, 5_000
assert_receive {:message, %{topic: ^topic, partition: 0, offset: _, key: "", value: "message1"}}, 5_000
assert_receive {:message, %{topic: ^topic, partition: 1, offset: _, key: "", value: "message2"}}, 5_000
end

test "Elsa.Consumer will hand messages to the handler without state" do
Expand Down
30 changes: 15 additions & 15 deletions test/unit/elsa/topic_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ defmodule Elsa.TopicTest do
allow :kpro_req_lib.create_topics(any(), any(), any()), return: :topic_request

message = %{
topic_errors: [
topics: [
%{
error_code: :topic_already_exists,
error_message: "Topic 'elsa-topic' already exists.",
topic: "elsa-topic"
name: "elsa-topic"
}
]
}
Expand All @@ -54,10 +54,10 @@ defmodule Elsa.TopicTest do
allow :kpro_req_lib.delete_topics(any(), any(), any()), return: :topic_request

message = %{
topic_error_codes: [
responses: [
%{
error_code: :topic_doesnt_exist,
topic: "elsa-topic"
name: "elsa-topic"
}
]
}
Expand All @@ -70,21 +70,21 @@ defmodule Elsa.TopicTest do

internal_result = function.(:connection)

assert {:error, {:topic_doesnt_exist, nil}} == internal_result
assert {:error, {:topic_doesnt_exist, :delete_topic_error}} == internal_result
end
end

describe "list_topics/1" do
test "extracts topics and partitions as a list of tuples" do
metadata = %{
topic_metadata: [
topics: [
%{
partition_metadata: [%{partition: 0}],
topic: "elsa-other-topic"
partitions: [%{partition: 0}],
name: "elsa-other-topic"
},
%{
partition_metadata: [%{partition: 0}, %{partition: 1}],
topic: "elsa-topic"
partitions: [%{partition: 0}, %{partition: 1}],
name: "elsa-topic"
}
]
}
Expand All @@ -102,14 +102,14 @@ defmodule Elsa.TopicTest do
describe "exists?/2" do
test "returns a boolean identifying the presence of a given topic" do
metadata = %{
topic_metadata: [
topics: [
%{
partition_metadata: [%{partition: 0}],
topic: "elsa-other-topic"
partitions: [%{partition: 0}],
name: "elsa-other-topic"
},
%{
partition_metadata: [%{partition: 0}, %{partition: 1}],
topic: "elsa-topic"
partitions: [%{partition: 0}, %{partition: 1}],
name: "elsa-topic"
}
]
}
Expand Down