Skip to content

Commit

Permalink
[v1.8.x] Customize Deserializer in kafka:consume Command (#140)
Browse files Browse the repository at this point in the history
* Resolve consumer via service container

* Use app() helper function

* Optional deserializer property

* Support optional deserializer option
  • Loading branch information
cragonnyunt authored Sep 12, 2022
1 parent 136f948 commit 2ef6aa8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
8 changes: 7 additions & 1 deletion src/Console/Commands/KafkaConsumer/Options.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class Options
{
private ?array $topics = null;
private ?string $consumer = null;
private ?string $deserializer = null;
private ?string $groupId = null;
private ?int $commit = 1;
private ?string $dlq = null;
Expand Down Expand Up @@ -36,14 +37,19 @@ public function __construct(array $options, array $config)

public function getTopics(): array
{
return ! empty($this->topics) ? $this->topics : [];
return !empty($this->topics) ? $this->topics : [];
}

public function getConsumer(): ?string
{
return $this->consumer;
}

public function getDeserializer(): ?string
{
return $this->deserializer;
}

public function getGroupId(): ?string
{
return strlen($this->groupId) > 1 ? $this->groupId : $this->config['groupId'];
Expand Down
6 changes: 4 additions & 2 deletions src/Console/Commands/KafkaConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@

use Illuminate\Console\Command;
use Junges\Kafka\Config\Config;
use Junges\Kafka\Console\Commands\KafkaConsumer\Options;
use Junges\Kafka\Consumers\Consumer;
use Junges\Kafka\Contracts\MessageDeserializer;
use Junges\Kafka\Console\Commands\KafkaConsumer\Options;

class KafkaConsumerCommand extends Command
{
protected $signature = 'kafka:consume
{--topics= : The topics to listen for messages (topic1,topic2,...,topicN)}
{--consumer= : The consumer which will consume messages in the specified topic}
{--deserializer= : The deserializer class to use when consuming message}
{--groupId=anonymous : The consumer group id}
{--commit=1}
{--dlq=? : The Dead Letter Queue}
Expand Down Expand Up @@ -55,6 +56,7 @@ public function handle()
$options = new Options($this->options(), $this->config);

$consumer = $options->getConsumer();
$deserializer = $options->getDeserializer();

$config = new Config(
broker: $options->getBroker(),
Expand All @@ -71,7 +73,7 @@ public function handle()
/** @var Consumer $consumer */
$consumer = app(Consumer::class, [
'config' => $config,
'deserializer' => app(MessageDeserializer::class),
'deserializer' => app($deserializer ?? MessageDeserializer::class),
]);

$consumer->consume();
Expand Down

0 comments on commit 2ef6aa8

Please sign in to comment.