From 36763656bb5c1bea6f0081669f1b4573d9565e32 Mon Sep 17 00:00:00 2001 From: GetulioMR Date: Thu, 24 Aug 2023 15:10:07 -0300 Subject: [PATCH] feat: improve config to accept max pool interval ms --- config/kafka.php | 4 ++++ src/Connectors/Consumer/HighLevel.php | 6 ++++++ src/Connectors/Consumer/LowLevel.php | 7 +++++++ tests/Unit/Connectors/Consumer/HighLevelTest.php | 1 + tests/Unit/Connectors/Consumer/LowLevelTest.php | 1 + 5 files changed, 19 insertions(+) diff --git a/config/kafka.php b/config/kafka.php index 9f9ae370..00912858 100644 --- a/config/kafka.php +++ b/config/kafka.php @@ -102,6 +102,10 @@ // time we need to wait until receiving a message? 'timeout' => 20000, + // A max interval for consumer to make poll calls. That means: how much + // time we need to wait for poll calls until consider the consumer has inactive. + 'max_poll_interval_ms' => 300000, + // Once you've enabled this, the Kafka consumer will commit the // offset of the last message received in response to its poll() call 'auto_commit' => true, diff --git a/src/Connectors/Consumer/HighLevel.php b/src/Connectors/Consumer/HighLevel.php index 7a61a171..27d5f3a3 100644 --- a/src/Connectors/Consumer/HighLevel.php +++ b/src/Connectors/Consumer/HighLevel.php @@ -14,9 +14,15 @@ class HighLevel implements ConnectorInterface public function getConsumer(bool $autoCommit, AbstractConfigManager $configManager): ConsumerInterface { $conf = $this->getConf($configManager); + $maxPollIntervalMs = (int) $configManager->get('max_poll_interval_ms'); $conf->set('group.id', $configManager->get('consumer_group')); $conf->set('auto.offset.reset', $configManager->get('offset_reset')); + $conf->set( + 'max.poll.interval.ms', + $maxPollIntervalMs ?: 300000 + ); + if (!$autoCommit) { $conf->set('enable.auto.commit', 'false'); } diff --git a/src/Connectors/Consumer/LowLevel.php b/src/Connectors/Consumer/LowLevel.php index 6c4d3993..c270fc74 100644 --- a/src/Connectors/Consumer/LowLevel.php +++ b/src/Connectors/Consumer/LowLevel.php @@ -15,7 +15,14 @@ class LowLevel implements ConnectorInterface public function getConsumer(bool $autoCommit, AbstractConfigManager $configManager): ConsumerInterface { $conf = $this->getConf(); + $maxPollIntervalMs = (int) $configManager->get('max_poll_interval_ms'); + $conf->set('group.id', $configManager->get('consumer_group')); + $conf->set( + 'max.poll.interval.ms', + $maxPollIntervalMs ?: 300000 + ); + if (!$autoCommit) { $conf->set('enable.auto.commit', 'false'); } diff --git a/tests/Unit/Connectors/Consumer/HighLevelTest.php b/tests/Unit/Connectors/Consumer/HighLevelTest.php index 52d58fc4..681c7ce5 100644 --- a/tests/Unit/Connectors/Consumer/HighLevelTest.php +++ b/tests/Unit/Connectors/Consumer/HighLevelTest.php @@ -20,6 +20,7 @@ public function testItShouldMakeConnectorSetup(): void 'topic_id' => 'some_topic', 'offset_reset' => 'earliest', 'timeout' => 1000, + 'max_poll_interval_ms' => 900000, ]); $connector = new HighLevel(); diff --git a/tests/Unit/Connectors/Consumer/LowLevelTest.php b/tests/Unit/Connectors/Consumer/LowLevelTest.php index 039d096c..b3a081df 100644 --- a/tests/Unit/Connectors/Consumer/LowLevelTest.php +++ b/tests/Unit/Connectors/Consumer/LowLevelTest.php @@ -19,6 +19,7 @@ public function testItShouldMakeConnectorSetup(): void 'consumer_group' => 'some-group', 'topic' => 'some_topic', 'offset_reset' => 'earliest', + 'max_poll_interval_ms' => 900000, 'offset' => 0, 'partition' => 1, ]);