Skip to content

Commit

Permalink
WIP: fan out consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
ewollesen committed Jul 24, 2024
1 parent af75a36 commit 9b012dd
Showing 1 changed file with 80 additions and 24 deletions.
104 changes: 80 additions & 24 deletions asyncevents/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"log"
"log/slog"
"math"
"slices"
"strings"
"time"

"github.com/IBM/sarama"
Expand Down Expand Up @@ -254,21 +252,27 @@ func Fib(n int) int {
}

// TopicShiftingRetryingConsumer retries by moving failing messages between topics.
//
// Inspired by https://www.uber.com/blog/reliable-reprocessing/
type TopicShiftingRetryingConsumer struct {
// TopicSuffixSeparator allows calculation of the next topic.
TopicSuffixSeparator string
// TopicSuffixes of the retry process, in desired transition order.
TopicSuffixes []string
// Consumer processes messages, those that fail will be shifted.
// Delay before processing a message.
//
// The default, 0, will process immediately, and is suitable for use in the first
// consumer of a sequence.
Delay time.Duration
// NextTopic is the target topic of failing messages.
NextTopic string
// Consumer processes messages, those that fail will be shifted to the next topic.
Consumer SaramaMessageConsumer
// Producer shifts the failed messages to the next retry topic.
// Producer shifts the failed messages to the next topic.
Producer sarama.AsyncProducer
// GroupID for the sarama consumer group.
GroupID string
}

func (c *TopicShiftingRetryingConsumer) Setup(session sarama.ConsumerGroupSession) error {
// TODO start retry topic consumers
// TODO start retry topic consumers? Or is that handled somewhere else?
// if this consumer only had a single "next" topic... that might do the thing..?
// TODO ensure retry consumers are in "read committed" mode
return c.Consumer.Setup(session)
}
Expand All @@ -281,6 +285,13 @@ func (c *TopicShiftingRetryingConsumer) Cleanup(session sarama.ConsumerGroupSess
func (c *TopicShiftingRetryingConsumer) Consume(ctx context.Context,
session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error {

select {
case <-time.After(c.Delay):
// no op
case <-ctx.Done():
return nil
}

err := c.Consumer.Consume(ctx, session, msg)
if err != nil {
if shiftErr := c.shiftMessage(msg); shiftErr != nil {
Expand All @@ -292,10 +303,6 @@ func (c *TopicShiftingRetryingConsumer) Consume(ctx context.Context,
}

func (c *TopicShiftingRetryingConsumer) shiftMessage(msg *sarama.ConsumerMessage) error {
nextTopic, nextErr := c.nextTopic(msg.Topic)
if nextErr != nil {
return fmt.Errorf("getting next topic: %s", nextErr)
}
if txnErr := c.Producer.BeginTxn(); txnErr != nil {
return fmt.Errorf("beginning transaction: %s", txnErr)
}
Expand All @@ -307,7 +314,7 @@ func (c *TopicShiftingRetryingConsumer) shiftMessage(msg *sarama.ConsumerMessage
pHeaders[i] = *msg.Headers[i]
}
pMsg := &sarama.ProducerMessage{
Topic: nextTopic,
Topic: c.NextTopic,
Key: sarama.ByteEncoder(msg.Key),
Value: sarama.ByteEncoder(msg.Value),
Headers: pHeaders,
Expand All @@ -319,16 +326,65 @@ func (c *TopicShiftingRetryingConsumer) shiftMessage(msg *sarama.ConsumerMessage
return nil
}

func (c *TopicShiftingRetryingConsumer) nextTopic(topic string) (string, error) {
pieces := strings.Split(topic, c.TopicSuffixSeparator)
if len(pieces) < 2 {
return "", fmt.Errorf("no suffix found")
// SaramaClientConfig joins together the necessary configuration for a [sarama.Client].
type SaramaClientConfig struct {
Brokers []string
GroupID string
Topics []string

SaramConfig *sarama.Config
}

type FanOutConsumer struct {
Consumers []SaramaMessageConsumer
}

func NewFanOutConsumer2(consumers []SaramaMessageConsumer) *FanOutConsumer {
return &FanOutConsumer{
Consumers: consumers,
}
}

func NewFanOutConsumer(topicBase string, producer sarama.AsyncProducer, groupID string,
delays []time.Duration, consumer SaramaMessageConsumer) *FanOutConsumer {

consumers := make([]SaramaMessageConsumer, 0, len(delays))
for idx, delay := range delays {
topicSuffix := "dead"
if len(delays) > idx+1 {
topicSuffix = delays[idx+1].String()
}
consumers = append(consumers, &TopicShiftingRetryingConsumer{
NextTopic: topicBase + topicSuffix,
Delay: delay,
Producer: producer,
Consumer: consumer,
})
}
suffix := pieces[len(pieces)-1]
base, _ := strings.CutSuffix(topic, suffix)
index := slices.IndexFunc(c.TopicSuffixes, func(s string) bool { return s == suffix })
if index == -1 || index == len(c.TopicSuffixes)-1 {
return "", fmt.Errorf("no more suffixes")

return &FanOutConsumer{
Consumers: consumers,
}
}

func (c *FanOutConsumer) Consume(ctx context.Context, session sarama.ConsumerGroupSession,
msg *sarama.ConsumerMessage) error {

return c.Consumers[0].Consume(ctx, session, msg)
}

func (c *FanOutConsumer) Setup(session sarama.ConsumerGroupSession) error {
var err error
for _, consumer := range c.Consumers {
err = errors.Join(err, consumer.Setup(session))
}
return err
}

func (c *FanOutConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
var err error
for _, consumer := range c.Consumers {
err = errors.Join(err, consumer.Cleanup(session))
}
return base + c.TopicSuffixSeparator + c.TopicSuffixes[index+1], nil
return err
}

0 comments on commit 9b012dd

Please sign in to comment.