From 48739f79745f9ec8cc757204303087bf40499cc4 Mon Sep 17 00:00:00 2001 From: Eric Wollesen Date: Tue, 22 Oct 2024 09:17:12 -0600 Subject: [PATCH] WIP --- asyncevents/sarama.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/asyncevents/sarama.go b/asyncevents/sarama.go index 94d4f5c..93a044f 100644 --- a/asyncevents/sarama.go +++ b/asyncevents/sarama.go @@ -12,8 +12,7 @@ import ( "github.com/IBM/sarama" ) -// SaramaEventsConsumer consumes Kafka messages for asynchronous event -// handling. +// SaramaEventsConsumer consumes Kafka messages for asynchronous event handling. type SaramaEventsConsumer struct { Handler sarama.ConsumerGroupHandler ConsumerGroup sarama.ConsumerGroup @@ -32,24 +31,30 @@ func NewSaramaEventsConsumer(consumerGroup sarama.ConsumerGroup, // Run the consumer, to begin consuming Kafka messages. // -// Run is stopped by its context being canceled. When its context is canceled, -// it returns nil. +// Run is stopped by its context being canceled. When its context is canceled, it returns +// nil. func (p *SaramaEventsConsumer) Run(ctx context.Context) (err error) { - defer canceledContextReturnsNil(&err) + //defer canceledContextReturnsNil(&err) for { err := p.ConsumerGroup.Consume(ctx, p.Topics, p.Handler) + //fmt.Println("SaramaEventsConsumer.Run's call to ConsumerGroup.Consume has returned") if err != nil { + //fmt.Println("SaramaEventsConsumer.Run's call to ConsumerGroup.Consume has returned",err) return err } if ctxErr := ctx.Err(); ctxErr != nil { + //fmt.Println("SaramaEventsConsumer.Run's call to ConsumerGroup.Consume has returned ctx",ctxErr) return ctxErr } + //fmt.Println("SaramaEventsConsumer.Run's LOOP") } + //fmt.Println("SaramaEventsConsumer.Run's returning") + return nil } -// canceledContextReturnsNil checks for a context.Canceled error, and when -// found, returns nil instead. +// canceledContextReturnsNil checks for a context.Canceled error, and when found, returns +// nil instead. // // It is meant to be called via defer. func canceledContextReturnsNil(err *error) {