Skip to content

Commit

Permalink
fix regression to allow multiple topics per producer supervisor (bbal…
Browse files Browse the repository at this point in the history
…ser#98)

* fix regression to allow multiple topics per producer supervisor

* fix CI setup beam vm action
  • Loading branch information
jeffgrunewald authored Oct 5, 2021
1 parent 4f88aef commit 8a461b8
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 38 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-elixir@v1.2.0
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
Expand All @@ -26,7 +26,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-elixir@v1.2.0
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
Expand All @@ -43,7 +43,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-elixir@v1.2.0
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-elixir@v1.2.0
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-elixir@v1.2.0
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
Expand Down
23 changes: 15 additions & 8 deletions lib/elsa/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ defmodule Elsa.Supervisor do
{Elsa.Registry, name: registry},
{DynamicSupervisor, strategy: :one_for_one, name: dynamic_supervisor(registry)},
start_client(args),
producer_spec(registry, Keyword.get(args, :producer, [])),
producer_spec(registry, Keyword.get(args, :producer)),
start_group_consumer(connection, registry, Keyword.get(args, :group_consumer)),
start_consumer(connection, registry, Keyword.get(args, :consumer))
]
Expand Down Expand Up @@ -208,19 +208,26 @@ defmodule Elsa.Supervisor do
initializer: {Elsa.Consumer.Worker.Initializer, :init, [consumer_args]}}
end

defp producer_spec(registry, args) do
initializer =
case Keyword.take(args, [:topic, :config]) do
[] -> nil
init_args -> {Elsa.Producer.Initializer, :init, [registry, init_args]}
end
defp producer_spec(registry, nil) do
[
{
Elsa.DynamicProcessManager,
id: :producer_process_manager,
dynamic_supervisor: dynamic_supervisor(registry),
initializer: nil,
poll: false,
name: via_name(registry, :producer_process_manager)
}
]
end

defp producer_spec(registry, args) do
[
{
Elsa.DynamicProcessManager,
id: :producer_process_manager,
dynamic_supervisor: dynamic_supervisor(registry),
initializer: initializer,
initializer: {Elsa.Producer.Initializer, :init, [registry, args]},
poll: Keyword.get(args, :poll, false),
name: via_name(registry, :producer_process_manager)
}
Expand Down
12 changes: 6 additions & 6 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ defmodule Elsa.MixProject do

defp deps do
[
{:brod, "~> 3.14"},
{:brod, "~> 3.14.0"},
{:patiently, "~> 0.2", only: [:dev, :test, :integration]},
{:divo, "~> 1.1", only: [:dev, :test, :integration], override: true},
{:divo_kafka, "~> 0.1.0", only: [:dev, :test, :integration]},
{:placebo, "~> 2.0.0-rc.2", only: [:dev, :test]},
{:divo, "~> 1.3", only: [:dev, :test, :integration], override: true},
{:divo_kafka, "~> 0.1.7", only: [:dev, :test, :integration]},
{:placebo, "~> 2.0", only: [:dev, :test]},
{:checkov, "~> 1.0", only: [:test, :integration]},
{:ex_doc, "~> 0.22.1", only: [:dev]},
{:dialyxir, "~> 1.0.0", only: [:dev], runtime: false}
{:ex_doc, "~> 0.25.3", only: [:dev]},
{:dialyxir, "~> 1.1.0", only: [:dev], runtime: false}
]
end

Expand Down
22 changes: 12 additions & 10 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@
"brod": {:hex, :brod, "3.14.0", "f959408e88acd0feca22f6a43ca26e70201c6e5c57dc74b87f08ef65a5e7fe18", [:rebar3], [{:kafka_protocol, "2.3.6", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.11", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "a0153437835b810d93e79c7000f55b0c4bd62c281a3224a2615f6aa72b52d484"},
"checkov": {:hex, :checkov, "1.0.0", "cecf1be22ea506b2fbd6741d7c00f4876bb2be76ea1b95493c25b51028f24410", [:mix], [], "hexpm", "9fa85e6fdf1bcec2dd0d996d0c1e5a83e336dafb97c931232af1cb1e7ef4420a"},
"crc32cer": {:hex, :crc32cer, "0.1.4", "a656dff19474d1a1fc5bb0081610ab6b0695b23affc47fa90abeb079a8ef9752", [:rebar3], [], "hexpm", "964735a5422cf65bbc5354860a560fff546f0026f83f8860525bd58ab5bade5d"},
"dialyxir": {:hex, :dialyxir, "1.0.0", "6a1fa629f7881a9f5aaf3a78f094b2a51a0357c843871b8bc98824e7342d00a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "aeb06588145fac14ca08d8061a142d52753dbc2cf7f0d00fc1013f53f8654654"},
"divo": {:hex, :divo, "1.1.9", "6f91b0a02bd97800eb9a99abd771b4c9b67d282b67abc223eb2832b93f557b7e", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:patiently, "~> 0.2", [hex: :patiently, repo: "hexpm", optional: false]}], "hexpm", "b0edcd689089d723802c2d582bab54a77725f673445aa474eea259448910c252"},
"divo_kafka": {:hex, :divo_kafka, "0.1.6", "dffaa5d419d75e6607b581187347e6fef18b9d06d517a0f7a49772b52f60115f", [:mix], [{:divo, "~> 1.1", [hex: :divo, repo: "hexpm", optional: false]}], "hexpm", "cbc408a8b6593784524b5fee09aae0e9cc58328a174fc6d3f337d9fb34b4bc62"},
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
"divo": {:hex, :divo, "1.3.1", "a7cdb05d4525a9703e11dbcf40567d426b546f8e816b9c9465232c10bc6a257b", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:patiently, "~> 0.2", [hex: :patiently, repo: "hexpm", optional: false]}], "hexpm", "b3edef7baf068bbf864c01c8f3ed06fcf81c08e8d4adcc1cfc4b7d6eb69c6a18"},
"divo_kafka": {:hex, :divo_kafka, "0.1.7", "e8253bb735e001c41f35645ac0429740b6b6350ceb0ae268609f769f0b3883c5", [:mix], [{:divo, "~> 1.1", [hex: :divo, repo: "hexpm", optional: false]}], "hexpm", "25f9b89a1f59f6801b8b1e044eaa8cdce4e0756b4a8512458ea31f9c99ec338f"},
"earmark": {:hex, :earmark, "1.4.5", "62ffd3bd7722fb7a7b1ecd2419ea0b458c356e7168c1f5d65caf09b4fbdd13c8", [:mix], [], "hexpm", "b7d0e6263d83dc27141a523467799a685965bf8b13b6743413f19a7079843f4f"},
"earmark_parser": {:hex, :earmark_parser, "1.4.15", "b29e8e729f4aa4a00436580dcc2c9c5c51890613457c193cc8525c388ccb2f06", [:mix], [], "hexpm", "044523d6438ea19c1b8ec877ec221b008661d3c27e3b848f4c879f500421ca5c"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.22.1", "9bb6d51508778193a4ea90fa16eac47f8b67934f33f8271d5e1edec2dc0eee4c", [:mix], [{:earmark, "~> 1.4.0", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "d957de1b75cb9f78d3ee17820733dc4460114d8b1e11f7ee4fd6546e69b1db60"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
"ex_doc": {:hex, :ex_doc, "0.25.3", "3edf6a0d70a39d2eafde030b8895501b1c93692effcbd21347296c18e47618ce", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "9ebebc2169ec732a38e9e779fd0418c9189b3ca93f4a676c961be6c1527913f5"},
"jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
"kafka_protocol": {:hex, :kafka_protocol, "2.3.6", "df076a8ef49fffae3535c805cb00f3a057ce1895e63398bf8a10569eeeac02f8", [:rebar, :rebar3], [{:crc32cer, "0.1.4", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "1.2.5", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "7cb061fe46babc7fd269d2c0e5b4dba5d1efc4f7dacce85b17a9cca973106b23"},
"makeup": {:hex, :makeup, "1.0.3", "e339e2f766d12e7260e6672dd4047405963c5ec99661abdc432e6ec67d29ef95", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "2e9b4996d11832947731f7608fed7ad2f9443011b3b479ae288011265cdd3dad"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"},
"meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm", "d34f013c156db51ad57cc556891b9720e6a1c1df5fe2e15af999c84d6cebeb1a"},
"nimble_parsec": {:hex, :nimble_parsec, "0.6.0", "32111b3bf39137144abd7ba1cce0914533b2d16ef35e8abc5ec8be6122944263", [:mix], [], "hexpm", "27eac315a94909d4dc68bc07a4a83e06c8379237c5ea528a9acff4ca1c873c52"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"patiently": {:hex, :patiently, "0.2.0", "67eb139591e10c4b363ae0198e832552f191c58894731efd3bf124ec4722267a", [:mix], [], "hexpm", "c08cc5edc27def565647a9b55a0bea8025a5f81a4472e57692f28f2292c44c94"},
"placebo": {:hex, :placebo, "2.0.0-rc.2", "e148f8b313e75978cdf7ebc762124ec64322d7a092b5d9fdf907d3d6a7b6e0b8", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "46ebd45c7786e92807cd8652e522a236088e71cb43c90a8587edb2a70b8acd5a"},
"placebo": {:hex, :placebo, "2.0.0", "c0e773dec77e941bcbcc14d10b759f2d66775aff9b75051f3e41939b64300e81", [:mix], [{:meck, "~> 0.9", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "e0872cec8848d7e59ba96396f45ee1ad34662c689c86ba6190694d38b4289844"},
"snappyer": {:hex, :snappyer, "1.2.5", "9154b9ac84031f0a799f72a4aa87df23ab2193b5631475fa2cdc304382d2df77", [:rebar3], [], "hexpm", "d2adc26a81efd5f138397a38a0bb545188d302972721f8be0de37fa452c8aed7"},
"supervisor3": {:hex, :supervisor3, "1.1.11", "d81cdec31d102fde407423e1d05b569572850deebed86b951d5233c387cba80b", [:rebar3], [], "hexpm", "e6c2dedbcabcba24995a218aca12db5e208b80d3252692b22ef0f1a266104b50"},
}
28 changes: 21 additions & 7 deletions test/integration/elsa/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,35 @@ defmodule Elsa.ProducerTest do
describe "producer managers" do
setup do
topic = "producer-manager-test"
topic2 = "producer-test-secondary"
connection = :elsa_producer_test2

Elsa.create_topic(@brokers, topic)
Elsa.create_topic(@brokers, topic2)

{:ok, supervisor} =
Elsa.Supervisor.start_link(endpoints: @brokers, connection: connection, producer: [topic: topic])
Elsa.Supervisor.start_link(
endpoints: @brokers,
connection: connection,
producer: [[topic: topic], [topic: topic2]]
)

Elsa.Producer.ready?(connection)

on_exit(fn ->
assert_down(supervisor)
end)

[connection: connection, topic: topic, registry: Elsa.Supervisor.registry(connection)]
[connection: connection, topics: [topic, topic2], registry: Elsa.Supervisor.registry(connection)]
end

test "restarts producers when the client is dropped", %{connection: connection, topic: topic, registry: registry} do
test "restarts producers when the client is dropped", %{
connection: connection,
topics: [topic, topic2],
registry: registry
} do
message = "everything's fine here"
message2 = "also over here"
client_pid = Elsa.Registry.whereis_name({registry, :brod_client})
Process.exit(client_pid, :kill)

Expand All @@ -43,13 +54,16 @@ defmodule Elsa.ProducerTest do
)

Producer.produce(connection, topic, message)
Producer.produce(connection, topic2, message2)

Patiently.wait_for!(
fn ->
case Elsa.fetch(@brokers, topic) do
{:ok, 1, [%Elsa.Message{value: result}]} ->
message == result

with {:ok, 1, [%Elsa.Message{value: result}]} <- Elsa.fetch(@brokers, topic),
true <- message == result,
{:ok, 1, [%Elsa.Message{value: result2}]} <- Elsa.fetch(@brokers, topic2),
true <- message2 == result2 do
true
else
_ ->
false
end
Expand Down
4 changes: 2 additions & 2 deletions test/unit/elsa/dynamic_process_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule Elsa.DynamicProcessManagerTest do
Process.sleep(1_000)
assert 0 == Agent.get(:agent1, fn s -> s end)

assert {:ok, test_server} = Elsa.DynamicProcessManager.start_child(:pm, TestServer)
assert {:ok, _test_server} = Elsa.DynamicProcessManager.start_child(:pm, TestServer)
assert "hello" == TestServer.echo(TestServer, "hello")

Process.whereis(:dyn_sup)
Expand All @@ -44,7 +44,7 @@ defmodule Elsa.DynamicProcessManagerTest do
Process.sleep(1_000)
assert 0 == Agent.get(:agent1, fn s -> s end)

assert {:ok, test_server} = Elsa.DynamicProcessManager.start_child(:pm, TestServer)
assert {:ok, _test_server} = Elsa.DynamicProcessManager.start_child(:pm, TestServer)
assert "hello" == TestServer.echo(TestServer, "hello")

Process.whereis(:dyn_sup)
Expand Down

0 comments on commit 8a461b8

Please sign in to comment.