Skip to content

Commit

Permalink
Merge pull request #130 from leroy-merlin-br/feat/improve-config-to-a…
Browse files Browse the repository at this point in the history
…ccept-max-pool-interval-ms

feat: improve config to accept max poll interval ms
  • Loading branch information
GetulioMR authored Sep 4, 2023
2 parents f06105e + 3676365 commit 530f03b
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 0 deletions.
4 changes: 4 additions & 0 deletions config/kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@
// time we need to wait until receiving a message?
'timeout' => 20000,

// A max interval for consumer to make poll calls. That means: how much
// time we need to wait for poll calls until consider the consumer has inactive.
'max_poll_interval_ms' => 300000,

// Once you've enabled this, the Kafka consumer will commit the
// offset of the last message received in response to its poll() call
'auto_commit' => true,
Expand Down
6 changes: 6 additions & 0 deletions src/Connectors/Consumer/HighLevel.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@ class HighLevel implements ConnectorInterface
public function getConsumer(bool $autoCommit, AbstractConfigManager $configManager): ConsumerInterface
{
$conf = $this->getConf($configManager);
$maxPollIntervalMs = (int) $configManager->get('max_poll_interval_ms');

$conf->set('group.id', $configManager->get('consumer_group'));
$conf->set('auto.offset.reset', $configManager->get('offset_reset'));
$conf->set(
'max.poll.interval.ms',
$maxPollIntervalMs ?: 300000
);

if (!$autoCommit) {
$conf->set('enable.auto.commit', 'false');
}
Expand Down
7 changes: 7 additions & 0 deletions src/Connectors/Consumer/LowLevel.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ class LowLevel implements ConnectorInterface
public function getConsumer(bool $autoCommit, AbstractConfigManager $configManager): ConsumerInterface
{
$conf = $this->getConf();
$maxPollIntervalMs = (int) $configManager->get('max_poll_interval_ms');

$conf->set('group.id', $configManager->get('consumer_group'));
$conf->set(
'max.poll.interval.ms',
$maxPollIntervalMs ?: 300000
);

if (!$autoCommit) {
$conf->set('enable.auto.commit', 'false');
}
Expand Down
1 change: 1 addition & 0 deletions tests/Unit/Connectors/Consumer/HighLevelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public function testItShouldMakeConnectorSetup(): void
'topic_id' => 'some_topic',
'offset_reset' => 'earliest',
'timeout' => 1000,
'max_poll_interval_ms' => 900000,
]);
$connector = new HighLevel();

Expand Down
1 change: 1 addition & 0 deletions tests/Unit/Connectors/Consumer/LowLevelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public function testItShouldMakeConnectorSetup(): void
'consumer_group' => 'some-group',
'topic' => 'some_topic',
'offset_reset' => 'earliest',
'max_poll_interval_ms' => 900000,
'offset' => 0,
'partition' => 1,
]);
Expand Down

0 comments on commit 530f03b

Please sign in to comment.