Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ewollesen committed Oct 22, 2024
1 parent e6945d3 commit 48739f7
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions asyncevents/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import (
"github.com/IBM/sarama"
)

// SaramaEventsConsumer consumes Kafka messages for asynchronous event
// handling.
// SaramaEventsConsumer consumes Kafka messages for asynchronous event handling.
type SaramaEventsConsumer struct {
Handler sarama.ConsumerGroupHandler
ConsumerGroup sarama.ConsumerGroup
Expand All @@ -32,24 +31,30 @@ func NewSaramaEventsConsumer(consumerGroup sarama.ConsumerGroup,

// Run the consumer, to begin consuming Kafka messages.
//
// Run is stopped by its context being canceled. When its context is canceled,
// it returns nil.
// Run is stopped by its context being canceled. When its context is canceled, it returns
// nil.
func (p *SaramaEventsConsumer) Run(ctx context.Context) (err error) {
defer canceledContextReturnsNil(&err)
//defer canceledContextReturnsNil(&err)

for {
err := p.ConsumerGroup.Consume(ctx, p.Topics, p.Handler)
//fmt.Println("SaramaEventsConsumer.Run's call to ConsumerGroup.Consume has returned")
if err != nil {
//fmt.Println("SaramaEventsConsumer.Run's call to ConsumerGroup.Consume has returned",err)
return err
}
if ctxErr := ctx.Err(); ctxErr != nil {
//fmt.Println("SaramaEventsConsumer.Run's call to ConsumerGroup.Consume has returned ctx",ctxErr)
return ctxErr
}
//fmt.Println("SaramaEventsConsumer.Run's LOOP")
}
//fmt.Println("SaramaEventsConsumer.Run's returning")
return nil
}

// canceledContextReturnsNil checks for a context.Canceled error, and when
// found, returns nil instead.
// canceledContextReturnsNil checks for a context.Canceled error, and when found, returns
// nil instead.
//
// It is meant to be called via defer.
func canceledContextReturnsNil(err *error) {
Expand Down

0 comments on commit 48739f7

Please sign in to comment.