Skip to content

Commit

Permalink
Add new option consume jobs (#11)
Browse files Browse the repository at this point in the history
* chore: update dependency versions

* add: :jobs enum to replace :only_new and :only_latest

* chore: update GH actions runner from broadway repo

* fix: typo

* add: update ubuntu version for GH actions

* add: loosen up on otp-version

* add: don't need to test ancient versions
  • Loading branch information
rhblind authored Aug 20, 2024
1 parent 2b52189 commit cd04a82
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 63 deletions.
67 changes: 36 additions & 31 deletions .github/workflows/elixir.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
name: CI Pipeline

on:
push:
branches: ["main"]
pull_request:
push:
branches: ["main"]

env:
Expand All @@ -16,46 +15,52 @@ permissions:
jobs:
build:
name: Build and test
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
strategy:
fail-fast: true
fail-fast: false
matrix:
otp: ["24.3", "25.1", "26.1"]
elixir: ["1.14.5", "1.15.6"]
include:
- elixir: "1.14"
otp: "23.0"

# Latest versions.
- elixir: "1.17"
otp: "27.0"
lint: lint
coverage: coverage
steps:
- uses: actions/checkout@v3
- name: Set up Elixir
- name: Check out this repository
uses: actions/checkout@v4

- name: Set up Erlang and Elixir
uses: erlef/setup-beam@v1
with:
elixir-version: ${{matrix.elixir}}
otp-version: ${{matrix.otp}}
elixir-version: ${{matrix.elixir}}

- name: Restore dependencies cache
- name: Cache Mix dependencies
uses: actions/cache@v3
id: dependency-cache
id: cache-deps
with:
path: deps
key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }}
restore-keys: ${{ runner.os }}-mix-

- name: Install dependencies
if: steps.dependency-cache.outputs.cache-hit != 'true'
run: |
mix local.rebar --force
mix local.hex --force
mix deps.get
path: |
deps
_build
key: |
mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}-${{ hashFiles('**/mix.lock') }}
restore-keys: |
mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}-
# - name: Run the formatter
# run: mix format --check-formatted
- run: mix do deps.get --check-locked, deps.compile
if: steps.cache-deps.outputs.cache-hit != 'true'

- name: Check for unused dependencies
run: mix deps.unlock --check-unused
- run: mix format --check-formatted
if: ${{ matrix.lint }}

- name: Compile Mix dependencies
run: mix deps.compile
- run: mix deps.unlock --check-unused
if: ${{ matrix.lint }}

- name: Compile code and check for warnings
run: mix compile --warnings-as-errors
- run: mix compile --warnings-as-errors
if: ${{ matrix.lint }}

- name: Run test suite with coverage
run: mix coveralls
- run: mix coveralls
if: ${{matrix.coverage}}
10 changes: 10 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## 2.1.2 - Patch release

_Released 2024-08-20_

- Replace the `:only_new` and `:only_latest` options with a single `:jobs` option. Set `:jobs` to `:new` to only process new jobs,
  or to `:latest` to only process the latest available job.
- Add deprecation warnings to the `:only_new` and `:only_latest` options. The options are still accepted but will be removed in the
  next major release.
- If the client receives an `{:error, reason}` tuple, reschedule another fetch instead of blowing up.

## 2.1.1 - Patch release

_Released 2023-12-28_
Expand Down
11 changes: 9 additions & 2 deletions lib/off_broadway/splunk/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ defmodule OffBroadway.Splunk.Options do
The report or alert name for the Splunk job we want to consume events from.
"""
],
jobs: [
default: :all,
type: {:in, [:all, :new, :latest]},
doc: """
Which jobs to add to the initial queue. Possible values are: `:all`, `:new`, `:latest`.
"""
],
receive_interval: [
type: :non_neg_integer,
doc: """
Expand All @@ -35,14 +42,14 @@ defmodule OffBroadway.Splunk.Options do
If set to `true`, the pipeline will skip adding any existing jobs to the initial queue.
""",
type: :boolean,
default: false
deprecated: "Use jobs: :new instead."
],
only_latest: [
doc: """
If set to `true`, the pipeline will only add the most recent job to the initial queue.
""",
type: :boolean,
default: false
deprecated: "Use jobs: :latest instead."
],
shutdown_timeout: [
type: :timeout,
Expand Down
31 changes: 20 additions & 11 deletions lib/off_broadway/splunk/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,21 @@ defmodule OffBroadway.Splunk.Producer do
refetch_timer: nil,
refetch_interval: opts[:refetch_interval],
name: opts[:name],
jobs: opts[:jobs],
current_job: nil,
first_fetch: true,
completed_jobs: MapSet.new(),
queue: :queue.new(),
splunk_client: {client, client_opts},
broadway: opts[:broadway][:name],
only_new: opts[:only_new],
only_latest: opts[:only_latest],
shutdown_timeout: opts[:shutdown_timeout]
}}
end

@impl true
def prepare_for_start(_module, broadway_opts) do
{producer_module, client_opts} = broadway_opts[:producer][:module]
client_opts = preprocess_options(client_opts)

case NimbleOptions.validate(client_opts, OffBroadway.Splunk.Options.definition()) do
{:error, error} ->
Expand All @@ -177,6 +178,15 @@ defmodule OffBroadway.Splunk.Producer do
end
end

# NOTE Remove next major release when :only_new and :only_latest are removed.
#
# Translates the deprecated boolean flags into the `:jobs` option before the
# options are handed to validation:
#
#   * `only_new: true`    -> `jobs: :new`
#   * `only_latest: true` -> `jobs: :latest`
#
# An explicitly given `:jobs` value always wins, regardless of where it appears
# in `opts` (`Keyword.put_new/3` never overwrites it, and a later `:jobs` entry
# replaces the derived one). If both deprecated flags are `true`, the one listed
# first decides the value.
#
# The deprecated key itself is kept in the result. Dropping it would hide the
# option from the validation step, so a `deprecated:` message declared for it in
# the option schema could never be emitted for the `true` case.
#
# Returns a keyword list without duplicate keys; the order of the entries is
# not the order of `opts`.
defp preprocess_options(opts) do
  Enum.reduce(opts, [], fn
    {:only_new, true}, acc ->
      acc |> Keyword.put_new(:jobs, :new) |> Keyword.put(:only_new, true)

    {:only_latest, true}, acc ->
      acc |> Keyword.put_new(:jobs, :latest) |> Keyword.put(:only_latest, true)

    {key, value}, acc ->
      Keyword.put(acc, key, value)
  end)
end

defp format_error(%ValidationError{keys_path: [], message: message}) do
"invalid configuration given to OffBroadway.Splunk.Producer.prepare_for_start/2, " <>
message
Expand Down Expand Up @@ -436,11 +446,10 @@ defmodule OffBroadway.Splunk.Producer do
|> Enum.filter(& &1.is_done)
|> Enum.sort_by(& &1.published, {:asc, DateTime})

# This flag can only be true *once*, on the first fetch.
# If set, add all current jobs to "completed jobs", and set the flag false
# so it will never trigger again.
# On the first fetch only: if the `:jobs` option is set to `:new`, add all
# existing jobs to "completed jobs" and wait for new ones to arrive.
completed_jobs =
if state.only_new do
if state.first_fetch && state.jobs == :new do
Enum.reduce(jobs, state.completed_jobs, fn job, acc ->
MapSet.put(acc, job)
end)
Expand All @@ -460,18 +469,18 @@ defmodule OffBroadway.Splunk.Producer do
end
end,
state.queue,
:queue.from_list(only_latest?(jobs, state.only_latest))
:queue.from_list(only_latest?(jobs, state.jobs))
)

%{state | queue: new_queue, completed_jobs: completed_jobs, only_new: false}
%{state | queue: new_queue, completed_jobs: completed_jobs, first_fetch: false}
end

defp update_queue_from_response({:ok, _response}, state), do: state
defp update_queue_from_response({:error, _reason}, state), do: state

@spec only_latest?(list :: list(), flag :: boolean()) :: list()
defp only_latest?(list, true), do: Enum.take(list, -1)
defp only_latest?(list, false), do: list
# Narrows `list` according to the value of the `:jobs` option passed as `flag`.
#
# For `:latest` only the last element of `list` is kept (an empty list stays
# empty); any other value returns `list` unchanged.
# NOTE(review): "last" is presumably the most recently published job, assuming
# the caller passes the list sorted ascending by `published` - confirm.
@spec only_latest?(list :: list(), flag :: atom()) :: list()
defp only_latest?(list, flag) do
  case flag do
    :latest -> Enum.take(list, -1)
    _other -> list
  end
end

@spec merge_non_nil_fields(map_a :: map(), map_b :: map()) :: map()
defp merge_non_nil_fields(map_a, map_b) do
Expand Down
Loading

0 comments on commit cd04a82

Please sign in to comment.