From 2b9d001ea9cdae1355bc9a92a43f908590ca0dd8 Mon Sep 17 00:00:00 2001 From: fairyhunter13 Date: Wed, 24 Jun 2020 11:52:19 +0700 Subject: [PATCH] -Adding manual assignment in the application :sparkles: --- const.go | 5 +++-- consumer.go | 8 ++++++++ container_consumer.go | 9 ++++++--- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/const.go b/const.go index 9e8166b..c8bdf75 100644 --- a/const.go +++ b/const.go @@ -2,8 +2,9 @@ package kafkaclient // List of const for consumers const ( - GoEventsChannelEnable = "go.events.channel.enable" - ClientID = "client.id" + GoEventsChannelEnable = "go.events.channel.enable" + GoApplicationRebalanceEnable = "go.application.rebalance.enable" + ClientID = "client.id" ) // List of resources type diff --git a/consumer.go b/consumer.go index 5939757..8cad03a 100644 --- a/consumer.go +++ b/consumer.go @@ -34,6 +34,10 @@ func (c *Consumer) consume(args ConsumeArgs) (err error) { switch realType := event.(type) { case *kafka.Message: c.handleMessage(realType, &args) + case kafka.AssignedPartitions: + c.Assign(realType.Partitions) + case kafka.RevokedPartitions: + c.Unassign() } } }(c, args) @@ -67,6 +71,10 @@ func (c *Consumer) consumeBatch(args ConsumeArgs) (err error) { switch realType := event.(type) { case *kafka.Message: c.handleMessage(realType, &args) + case kafka.AssignedPartitions: + c.Assign(realType.Partitions) + case kafka.RevokedPartitions: + c.Unassign() } } }(c, args) diff --git a/container_consumer.go b/container_consumer.go index d53dad2..00d90c9 100644 --- a/container_consumer.go +++ b/container_consumer.go @@ -4,9 +4,11 @@ import "github.com/confluentinc/confluent-kafka-go/kafka" // Consume create consumers based on per thread and directly consume messages from the Kafka broker. func (c *Container) Consume(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error) { + newConfig := c.cloneConfig(config) + newConfig[GoApplicationRebalanceEnable] = true for numWorker := uint64(1); numWorker <= args.Workers; numWorker++ { var cons *Consumer - cons, err = c.NewConsumer(config) + cons, err = c.NewConsumer(newConfig) if err != nil { return } @@ -39,10 +41,11 @@ func (c *Container) ConsumeEvent(config kafka.ConfigMap, args ConsumeArgs) (cons // ConsumeBatch create consumers based on per thread and directly consume messages from the Kafka broker. // ConsumeBatch is an improved version of Consume but polling in a batch manner. func (c *Container) ConsumeBatch(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error) { + newConfig := c.cloneConfig(config) + newConfig[GoEventsChannelEnable] = true + newConfig[GoApplicationRebalanceEnable] = true for numWorker := uint64(1); numWorker <= args.Workers; numWorker++ { var cons *Consumer - newConfig := c.cloneConfig(config) - newConfig[GoEventsChannelEnable] = true cons, err = c.NewConsumer(newConfig) if err != nil { return