Skip to content

Commit

Permalink
offset
Browse files Browse the repository at this point in the history
Signed-off-by: Meng Yan <[email protected]>
  • Loading branch information
yanmxa committed Dec 19, 2023
1 parent 6b1fe72 commit 195a55e
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 6 deletions.
16 changes: 15 additions & 1 deletion protocol/kafka_confluent/v2/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"context"
"fmt"

cloudevents "github.com/cloudevents/sdk-go/v2"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

Expand Down Expand Up @@ -94,7 +96,7 @@ func WithReceiver(consumer *kafka.Consumer) Option {
}
}

// commitOffsetType is an opaque context-key type used to carry Kafka
// offsets through a context.Context: assign offsets from the context when
// opening the consumer, and commit offsets taken from the context when sending.
type commitOffsetType struct{}

// offsetKey is the single context key under which []kafka.TopicPartition is stored.
var offsetKey = commitOffsetType{}
Expand All @@ -114,3 +116,15 @@ func CommitOffsetFrom(ctx context.Context) []kafka.TopicPartition {
}
return nil
}

const (
	// OffsetEventSource is the CloudEvents source attribute set on offset-commit events.
	OffsetEventSource = "io.cloudevents.kafka.confluent.consumer"
	// OffsetEventType is the CloudEvents type attribute set on offset-commit events.
	OffsetEventType = "io.cloudevents.kafka.confluent.consumer.offsets"
)

// NewOffsetEvent returns a CloudEvent pre-populated with the offset-commit
// source and type attributes, for use as the carrier event when committing
// Kafka offsets through the protocol.
func NewOffsetEvent() cloudevents.Event {
	evt := cloudevents.NewEvent()
	evt.SetType(OffsetEventType)
	evt.SetSource(OffsetEventSource)
	return evt
}
8 changes: 7 additions & 1 deletion protocol/kafka_confluent/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ func (p *Protocol) OpenInbound(ctx context.Context) error {
defer p.consumerMux.Unlock()
logger := cecontext.LoggerFrom(ctx)

// Query committed offsets for each partition
if positions := CommitOffsetFrom(ctx); positions != nil {
if err := p.consumer.Assign(positions); err != nil {
return err
}
}

logger.Infof("Subscribing to topics: %v", p.consumerTopics)
err := p.consumer.SubscribeTopics(p.consumerTopics, p.consumerRebalanceCb)
if err != nil {
Expand All @@ -158,7 +165,6 @@ func (p *Protocol) OpenInbound(ctx context.Context) error {
for run {
select {
case <-ctx.Done():
logger.Info("Context canceled")
run = false
default:
ev := p.consumer.Poll(p.consumerPollTimeout)
Expand Down
57 changes: 57 additions & 0 deletions samples/kafka_confluent/receiver-assign-offset/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package main

import (
"context"
"fmt"
"log"

kafka_confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

var topic = "test-confluent-topic"

// main demonstrates starting a consumer at an explicitly assigned offset:
// it builds a Kafka protocol receiver, injects the desired starting
// TopicPartition offsets into the context, and runs the receiver until it stops.
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	receiver, err := kafka_confluent.New(ctx, kafka_confluent.WithConfigMap(&kafka.ConfigMap{
		"bootstrap.servers": "127.0.0.1:9092",
		"group.id":          "test-confluent-offset-id",
		// "auto.offset.reset": "earliest",
		"enable.auto.commit": "true",
	}), kafka_confluent.WithReceiverTopics([]string{topic}))
	// Check the constructor error before deferring Close: deferring on a nil
	// receiver would panic when main returns.
	if err != nil {
		log.Fatalf("failed to create the protocol: %v", err)
	}
	defer receiver.Close(ctx)

	c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1))
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}

	// Start consuming from a fixed position rather than the committed group offset.
	offsetToStart := []kafka.TopicPartition{
		{Topic: &topic, Partition: 0, Offset: 495},
	}

	log.Printf("will listen consuming topic %s\n", topic)
	err = c.StartReceiver(kafka_confluent.CommitOffsetCtx(ctx, offsetToStart), receive)
	if err != nil {
		log.Fatalf("failed to start receiver: %s", err)
	} else {
		log.Printf("receiver stopped\n")
	}
	cancel()
}

// receive prints the Kafka topic, partition, and offset extensions carried
// by each consumed event, in the form "topic[partition:offset]".
func receive(ctx context.Context, event cloudevents.Event) {
	extensions := event.Extensions()
	topicVal := extensions[kafka_confluent.KafkaTopicKey]
	partitionVal := extensions[kafka_confluent.KafkaPartitionKey]
	offsetVal := extensions[kafka_confluent.KafkaOffsetKey]
	fmt.Printf("%s[%s:%s] \n", topicVal, partitionVal, offsetVal)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ func main() {
"bootstrap.servers": "127.0.0.1:9092",
"group.id": "test-confluent-offset-id",
// "auto.offset.reset": "earliest",
"auto.offset.reset": "latest",
"enable.auto.commit": "false",
}), kafka_confluent.WithReceiverTopics([]string{topic}))

defer receiver.Close(ctx)

c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1))
c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1), client.WithUUIDs())
if err != nil {
log.Fatalf("failed to create client, %v", err)
}
Expand All @@ -48,8 +47,13 @@ func main() {
return
default:
if lastCommitOffset < int64(offsetToCommit.Offset) {
offsetCtx := kafka_confluent.CommitOffsetCtx(ctx, []kafka.TopicPartition{offsetToCommit})
res := c.Send(offsetCtx, kafka_confluent.NewOffsetEvent())
if res != nil {
log.Printf("failed to commit offset: %v", res)
return
}
fmt.Printf(">> commit offset %s[%d:%d] \n", *offsetToCommit.Topic, offsetToCommit.Partition, offsetToCommit.Offset)
c.Send(kafka_confluent.CommitOffsetCtx(ctx, []kafka.TopicPartition{offsetToCommit}), cloudevents.NewEvent())
lastCommitOffset = int64(offsetToCommit.Offset)
}
}
Expand All @@ -73,7 +77,7 @@ func receive(ctx context.Context, event cloudevents.Event) {
log.Printf("failed to parse offset: %s", err)
return
}
if offset%5 == 0 {
if offset%3 == 0 {
offsetToCommit.Offset = kafka.Offset(offset)
}
fmt.Printf("%s[%s:%s] \n", ext[kafka_confluent.KafkaTopicKey],
Expand Down

0 comments on commit 195a55e

Please sign in to comment.