From cd04a820eefe0f57c9c7758ba1b444ff8f9a6fe9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rolf=20H=C3=A5vard=20Blindheim?= Date: Tue, 20 Aug 2024 14:27:32 +0200 Subject: [PATCH] Add new option consume jobs (#11) * chore: update dependency versions * add: :jobs enum to replace :only_new and :only_latest * chore: update GH actions runner from broadway repo * fix: typo * add: update ubuntu version for GH actions * add: loosen up on otp-version * add: don't need to test ancient versions --- .github/workflows/elixir.yaml | 67 ++++++++++++---------- Changelog.md | 10 ++++ lib/off_broadway/splunk/options.ex | 11 +++- lib/off_broadway/splunk/producer.ex | 31 ++++++---- mix.lock | 32 +++++------ test/off_broadway/splunk/producer_test.exs | 32 ++++++++++- 6 files changed, 120 insertions(+), 63 deletions(-) diff --git a/.github/workflows/elixir.yaml b/.github/workflows/elixir.yaml index 61008e2..d937db0 100644 --- a/.github/workflows/elixir.yaml +++ b/.github/workflows/elixir.yaml @@ -1,9 +1,8 @@ name: CI Pipeline on: - push: - branches: ["main"] pull_request: + push: branches: ["main"] env: @@ -16,46 +15,52 @@ permissions: jobs: build: name: Build and test - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 strategy: - fail-fast: true + fail-fast: false matrix: - otp: ["24.3", "25.1", "26.1"] - elixir: ["1.14.5", "1.15.6"] + include: + - elixir: "1.14" + otp: "23.0" + + # Latest versions. + - elixir: "1.17" + otp: "27.0" + lint: lint + coverage: coverage steps: - - uses: actions/checkout@v3 - - name: Set up Elixir + - name: Check out this repository + uses: actions/checkout@v4 + + - name: Set up Erlang and Elixir uses: erlef/setup-beam@v1 with: - elixir-version: ${{matrix.elixir}} otp-version: ${{matrix.otp}} + elixir-version: ${{matrix.elixir}} - - name: Restore dependencies cache + - name: Cache Mix dependencies uses: actions/cache@v3 - id: dependency-cache + id: cache-deps with: - path: deps - key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }} - restore-keys: ${{ runner.os }}-mix- - - - name: Install dependencies - if: steps.dependency-cache.outputs.cache-hit != 'true' - run: | - mix local.rebar --force - mix local.hex --force - mix deps.get + path: | + deps + _build + key: | + mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}-${{ hashFiles('**/mix.lock') }} + restore-keys: | + mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}- - # - name: Run the formatter - # run: mix format --check-formatted + - run: mix do deps.get --check-locked, deps.compile + if: steps.cache-deps.outputs.cache-hit != 'true' - - name: Check for unused dependencies - run: mix deps.unlock --check-unused + - run: mix format --check-formatted + if: ${{ matrix.lint }} - - name: Compile Mix dependencies - run: mix deps.compile + - run: mix deps.unlock --check-unused + if: ${{ matrix.lint }} - - name: Compile code and check for warnings - run: mix compile --warnings-as-errors + - run: mix compile --warnings-as-errors + if: ${{ matrix.lint }} - - name: Run test suite with coverage - run: mix coveralls + - run: mix coveralls + if: ${{matrix.coverage}} diff --git a/Changelog.md b/Changelog.md index 3b2c163..d41b1c0 100644 --- a/Changelog.md +++ b/Changelog.md @@ -1,5 +1,15 @@ # Changelog +## 2.1.2 - Patch release + +_Released 2024-08-20_ + +- Replace `:only_new` and `:only_latest` options with a `:jobs` option. Set `:jobs` to `:new` to only process new jobs, and `:latest` +to only process the latest available job. +- Add deprecation warnings to `:only_new` and `:only_latest` option. The options are still accepted but will be removed in the +next major release. +- If the client receives an `{:error, reason}` tuple, reschedule another fetch instead of blowing up. + ## 2.1.1 - Patch release _Released 2023-12-28_ diff --git a/lib/off_broadway/splunk/options.ex b/lib/off_broadway/splunk/options.ex index 2868e6d..97a3376 100644 --- a/lib/off_broadway/splunk/options.ex +++ b/lib/off_broadway/splunk/options.ex @@ -15,6 +15,13 @@ defmodule OffBroadway.Splunk.Options do The report or alert name for the Splunk job we want to consume events from. """ ], + jobs: [ + default: :all, + type: {:in, [:all, :new, :latest]}, + doc: """ + Which jobs to add to the initial queue. Possible values are: `:all`, `:new`, `:latest`. + """ + ], receive_interval: [ type: :non_neg_integer, doc: """ @@ -35,14 +42,14 @@ defmodule OffBroadway.Splunk.Options do If set to `true`, the pipeline will skip adding any existing jobs to the initial queue. """, type: :boolean, - default: false + deprecated: "Use jobs: :new instead." ], only_latest: [ doc: """ If set to `true`, the pipeline will only add the most recent job to the initial queue. """, type: :boolean, - default: false + deprecated: "Use jobs: :latest instead." ], shutdown_timeout: [ type: :timeout, diff --git a/lib/off_broadway/splunk/producer.ex b/lib/off_broadway/splunk/producer.ex index a269c7a..8f66cb5 100644 --- a/lib/off_broadway/splunk/producer.ex +++ b/lib/off_broadway/splunk/producer.ex @@ -142,13 +142,13 @@ defmodule OffBroadway.Splunk.Producer do refetch_timer: nil, refetch_interval: opts[:refetch_interval], name: opts[:name], + jobs: opts[:jobs], current_job: nil, + first_fetch: true, completed_jobs: MapSet.new(), queue: :queue.new(), splunk_client: {client, client_opts}, broadway: opts[:broadway][:name], - only_new: opts[:only_new], - only_latest: opts[:only_latest], shutdown_timeout: opts[:shutdown_timeout] }} end @@ -156,6 +156,7 @@ defmodule OffBroadway.Splunk.Producer do @impl true def prepare_for_start(_module, broadway_opts) do {producer_module, client_opts} = broadway_opts[:producer][:module] + client_opts = preprocess_options(client_opts) case NimbleOptions.validate(client_opts, OffBroadway.Splunk.Options.definition()) do {:error, error} -> @@ -177,6 +178,15 @@ defmodule OffBroadway.Splunk.Producer do end end + # NOTE Remove next major release when :only_new and :only_latest are removed. + defp preprocess_options(opts) do + Enum.reduce(opts, [], fn + {:only_new, true}, acc -> Keyword.put_new(acc, :jobs, :new) + {:only_latest, true}, acc -> Keyword.put_new(acc, :jobs, :latest) + {key, value}, acc -> Keyword.put(acc, key, value) + end) + end + defp format_error(%ValidationError{keys_path: [], message: message}) do "invalid configuration given to OffBroadway.Splunk.Producer.prepare_for_start/2, " <> message @@ -436,11 +446,10 @@ defmodule OffBroadway.Splunk.Producer do |> Enum.filter(& &1.is_done) |> Enum.sort_by(& &1.published, {:asc, DateTime}) - # This flag can only be true *once*, on the first fetch. - # If set, add all current jobs to "completed jobs", and set the flag false - # so it will never trigger again. + # If the `:jobs` option is set to `:new`, add all existing jobs to + # "completed jobs", and wait for new to arrive. completed_jobs = - if state.only_new do + if state.first_fetch && state.jobs == :new do Enum.reduce(jobs, state.completed_jobs, fn job, acc -> MapSet.put(acc, job) end) @@ -460,18 +469,18 @@ defmodule OffBroadway.Splunk.Producer do end end, state.queue, - :queue.from_list(only_latest?(jobs, state.only_latest)) + :queue.from_list(only_latest?(jobs, state.jobs)) ) - %{state | queue: new_queue, completed_jobs: completed_jobs, only_new: false} + %{state | queue: new_queue, completed_jobs: completed_jobs, first_fetch: false} end defp update_queue_from_response({:ok, _response}, state), do: state defp update_queue_from_response({:error, _reason}, state), do: state - @spec only_latest?(list :: list(), flag :: boolean()) :: list() - defp only_latest?(list, true), do: Enum.take(list, -1) - defp only_latest?(list, false), do: list + @spec only_latest?(list :: list(), flag :: atom()) :: list() + defp only_latest?(list, :latest), do: Enum.take(list, -1) + defp only_latest?(list, _), do: list @spec merge_non_nil_fields(map_a :: map(), map_b :: map()) :: map() defp merge_non_nil_fields(map_a, map_b) do diff --git a/mix.lock b/mix.lock index 34dba26..cef2173 100644 --- a/mix.lock +++ b/mix.lock @@ -1,29 +1,29 @@ %{ - "broadway": {:hex, :broadway, "1.0.7", "7808f9e3eb6f53ca6d060f0f9d61012dd8feb0d7a82e62d087dd517b9b66fa53", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e76cfb0a7d64176c387b8b1ddbfb023e2ee8a63e92f43664d78e6d5d0b1177c6"}, - "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, + "broadway": {:hex, :broadway, "1.1.0", "8ed3aea01fd6f5640b3e1515b90eca51c4fc1fac15fb954cdcf75dc054ae719c", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "25e315ef1afe823129485d981dcc6d9b221cea30e625fd5439e9b05f44fb60e4"}, + "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, - "credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"}, + "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, - "ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"}, - "exconstructor": {:hex, :exconstructor, "1.2.10", "72a540c89b4c5af75f88c076727c0318a8f4038df6412dcf546e8771dcac118d", [:mix], [], "hexpm", "6e504f88cc56c3a7313f0d7687c5d3454b014255867232957512a61e5f90bea7"}, - "excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"}, - "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, + "ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"}, + "exconstructor": {:hex, :exconstructor, "1.2.13", "7021eed1450202dcbcd1ef021d6aacf7351854ff9d7964f166931567f9dfa9fb", [:mix], [], "hexpm", "69d3f0251a07bb7c5ef85bde22a1eee577dfbb49852d77fb7ad7b937035aeef2"}, + "excoveralls": {:hex, :excoveralls, "0.18.2", "86efd87a0676a3198ff50b8c77620ea2f445e7d414afa9ec6c4ba84c9f8bdcc2", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "230262c418f0de64077626a498bd4fdf1126d5c2559bb0e6b43deac3005225a4"}, + "file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"}, "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, - "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, - "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, - "makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, - "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, - "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, - "nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"}, + "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, + "mimerl": {:hex, :mimerl, "1.3.0", "d0cd9fc04b9061f82490f6581e0128379830e78535e017f7780f37fea7545726", [:rebar3], [], "hexpm", "a1e15a50d1887217de95f0b9b0793e32853f7c258a5cd227650889b38839fe9d"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, - "tesla": {:hex, :tesla, "1.8.0", "d511a4f5c5e42538d97eef7c40ec4f3e44effdc5068206f42ed859e09e51d1fd", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "10501f360cd926a309501287470372af1a6e1cbed0f43949203a4c13300bc79f"}, + "tesla": {:hex, :tesla, "1.12.1", "fe2bf4250868ee72e5d8b8dfa408d13a00747c41b7237b6aa3b9a24057346681", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "2391efc6243d37ead43afd0327b520314c7b38232091d4a440c1212626fdd6e7"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, } diff --git a/test/off_broadway/splunk/producer_test.exs b/test/off_broadway/splunk/producer_test.exs index 109ecde..1e6c3a3 100644 --- a/test/off_broadway/splunk/producer_test.exs +++ b/test/off_broadway/splunk/producer_test.exs @@ -6,6 +6,7 @@ defmodule OffBroadway.Splunk.ProducerTest do use ExUnit.Case, async: false import ExUnit.CaptureLog + require Logger alias Broadway.Message defmodule MessageServer do @@ -226,16 +227,41 @@ defmodule OffBroadway.Splunk.ProducerTest do assert result_module_opts[:config][:max_events] == 10 end - test ":only_latest is default false" do + test "when :only_latest is true sets jobs: :latest" do assert {[], [ producer: [ module: {OffBroadway.Splunk.Producer, result_module_opts}, concurrency: 1 ] - ]} = prepare_for_start_module_opts(name: "My fine report") + ]} = prepare_for_start_module_opts(name: "My fine report", only_latest: true) + + assert result_module_opts[:jobs] == :latest + end + + test "when :only_new is true sets jobs: :new" do + assert {[], + [ + producer: [ + module: {OffBroadway.Splunk.Producer, result_module_opts}, + concurrency: 1 + ] + ]} = prepare_for_start_module_opts(name: "My fine report", only_new: true) - assert result_module_opts[:only_latest] == false + assert result_module_opts[:jobs] == :new + end + + test "when :jobs is an invalid value" do + assert_raise( + ArgumentError, + ~r/invalid value for :jobs option: expected one of \[:all, :new, :latest\], got: :invalid/, + fn -> + prepare_for_start_module_opts( + name: "My fine report", + jobs: :invalid + ) + end + ) end test ":config is optional with default values" do