From 6b4e8eafd18c714ddc7e812f5fb0bf3f8ebf501a Mon Sep 17 00:00:00 2001 From: Karol Galanciak Date: Thu, 25 Jan 2024 14:29:00 +0100 Subject: [PATCH] add limit for unprocessed_causality_keys --- .rubocop.yml | 3 +++ CHANGELOG.md | 4 ++++ README.md | 1 + .../configuration.rb | 8 +++++++- .../outbox_model.rb | 3 ++- .../active_record_processor.rb | 4 +--- .../configuration_spec.rb | 20 +++++++++++++++++++ .../outbox_model_spec.rb | 15 ++++++++++++++ 8 files changed, 53 insertions(+), 5 deletions(-) diff --git a/.rubocop.yml b/.rubocop.yml index 0b84f4c..39dedad 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -151,3 +151,6 @@ RSpec/AnyInstance: RSpec/VerifiedDoubles: Exclude: - 'spec/rails_transactional_outbox/runner_spec.rb' + +Layout/LineLength: + Max: 125 diff --git a/CHANGELOG.md b/CHANGELOG.md index 393da73..84c2461 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## [Unreleased] +## [0.4.0] - 2024-01-25 + +- add config option to specify causality keys limit + ## [0.3.1] - 2023-05-24 - add config option whether to raise error when outbox entry record is not found diff --git a/README.md b/README.md index b0f47be..a641516 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ Rails.application.config.to_prepare do config.lock_expiry_time = 10_000 # not required, defaults to 10_000, the unit is milliseconds config.outbox_entries_processor = `RailsTransactionalOutbox::OutboxEntriesProcessors::OrderedByCausalityKeyProcessor`.new # not required, defaults to RailsTransactionalOutbox::OutboxEntriesProcessors::NonOrderedProcessor.new config.outbox_entry_causality_key_resolver = ->(model) { model.tenant_id } # not required, defaults to a lambda returning nil. Needed when using `outbox_entry_causality_key_resolver` + config.unprocessed_causality_keys_limit = 100_000 # not required, defaults to 10_000. Might be a good idea to decrease the value when you start experiencing OOMs - they are likely to be caused by fetching too many causality keys. It is likely to happen when you have huge amount of records to process. end end ``` diff --git a/lib/rails_transactional_outbox/configuration.rb b/lib/rails_transactional_outbox/configuration.rb index a5c59e5..37f89bc 100644 --- a/lib/rails_transactional_outbox/configuration.rb +++ b/lib/rails_transactional_outbox/configuration.rb @@ -6,7 +6,7 @@ class Configuration attr_writer :error_handler, :transactional_outbox_worker_sleep_seconds, :transactional_outbox_worker_idle_delay_multiplier, :outbox_batch_size, :outbox_entries_processor, :lock_client, :lock_expiry_time, :outbox_entry_causality_key_resolver, - :raise_not_found_model_error + :raise_not_found_model_error, :unprocessed_causality_keys_limit def error_handler @error_handler || RailsTransactionalOutbox::ErrorHandlers::NullErrorHandler @@ -55,5 +55,11 @@ def lock_expiry_time def outbox_entry_causality_key_resolver @outbox_entry_causality_key_resolver || ->(_model) {} end + + def unprocessed_causality_keys_limit + return @unprocessed_causality_keys_limit.to_i if defined?(@unprocessed_causality_keys_limit) + + 10_000 + end end end diff --git a/lib/rails_transactional_outbox/outbox_model.rb b/lib/rails_transactional_outbox/outbox_model.rb index d17d7e9..424ddfe 100644 --- a/lib/rails_transactional_outbox/outbox_model.rb +++ b/lib/rails_transactional_outbox/outbox_model.rb @@ -24,9 +24,10 @@ module OutboxModel .where("retry_at IS NULL OR retry_at <= ?", Time.current) } - def self.unprocessed_causality_keys + def self.unprocessed_causality_keys(limit: RailsTransactionalOutbox.configuration.unprocessed_causality_keys_limit) processable_now .select("causality_key") + .limit(limit) .distinct .pluck(:causality_key) end diff --git a/lib/rails_transactional_outbox/record_processors/active_record_processor.rb b/lib/rails_transactional_outbox/record_processors/active_record_processor.rb index d521907..16cdfd6 100644 --- a/lib/rails_transactional_outbox/record_processors/active_record_processor.rb +++ b/lib/rails_transactional_outbox/record_processors/active_record_processor.rb @@ -17,9 +17,7 @@ def applies?(record) def call(record) model = record.infer_model if model.nil? - if RailsTransactionalOutbox.configuration.raise_not_found_model_error? - raise CouldNotFindModelError.new(record) - end + raise CouldNotFindModelError.new(record) if RailsTransactionalOutbox.configuration.raise_not_found_model_error? return end diff --git a/spec/rails_transactional_outbox/configuration_spec.rb b/spec/rails_transactional_outbox/configuration_spec.rb index 3d4beb4..f4c0f1d 100644 --- a/spec/rails_transactional_outbox/configuration_spec.rb +++ b/spec/rails_transactional_outbox/configuration_spec.rb @@ -294,4 +294,24 @@ it { is_expected.to be_nil } end end + + describe "#unprocessed_causality_keys_limit" do + subject(:unprocessed_causality_keys_limit) { configuration.unprocessed_causality_keys_limit } + + let(:configuration) { described_class.new } + + context "when set" do + let(:custom_unprocessed_causality_keys_limit) { "101" } + + before do + configuration.unprocessed_causality_keys_limit = custom_unprocessed_causality_keys_limit + end + + it { is_expected.to eq 101 } + end + + context "when not set" do + it { is_expected.to eq 10_000 } + end + end end diff --git a/spec/rails_transactional_outbox/outbox_model_spec.rb b/spec/rails_transactional_outbox/outbox_model_spec.rb index fe70d28..c2b94bd 100644 --- a/spec/rails_transactional_outbox/outbox_model_spec.rb +++ b/spec/rails_transactional_outbox/outbox_model_spec.rb @@ -171,6 +171,21 @@ it "returns unique unprocessed causality_keys" do expect(unprocessed_causality_keys).to match_array([causality_key, "other"]) end + + context "when limit is specified" do + around do |example| + original_limit = RailsTransactionalOutbox.configuration.unprocessed_causality_keys_limit + RailsTransactionalOutbox.configuration.unprocessed_causality_keys_limit = 1 + + example.run + + RailsTransactionalOutbox.configuration.unprocessed_causality_keys_limit = original_limit + end + + it "returns unique unprocessed causality_keys up to the limit" do + expect(unprocessed_causality_keys).to match_array(["other"]) + end + end end describe ".any_records_to_process?" do