Skip to content

Commit

Permalink
pubsub: add option to skip msg retrying and option to limit number of…
Browse files Browse the repository at this point in the history
… retries
  • Loading branch information
magicxyyz committed Nov 22, 2024
1 parent fb241d0 commit f8db4cf
Showing 1 changed file with 61 additions and 26 deletions.
87 changes: 61 additions & 26 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,31 @@ type ConsumerConfig struct {
ResponseEntryTimeout time.Duration `koanf:"response-entry-timeout"`
// Minimum idle time after which messages will be autoclaimed
IdletimeToAutoclaim time.Duration `koanf:"idletime-to-autoclaim"`
// Enables retrying too long pending messages
Retry bool `koanf:"retry"`
// Number of message retries after which we set error response
MaxRetryCount int64 `koanf:"max-retry-count"`
}

var DefaultConsumerConfig = ConsumerConfig{
ResponseEntryTimeout: time.Hour,
IdletimeToAutoclaim: 5 * time.Minute,
Retry: true,
MaxRetryCount: -1,
}

var TestConsumerConfig = ConsumerConfig{
ResponseEntryTimeout: time.Minute,
IdletimeToAutoclaim: 30 * time.Millisecond,
Retry: true,
MaxRetryCount: -1,
}

func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".response-entry-timeout", DefaultConsumerConfig.ResponseEntryTimeout, "timeout for response entry")
f.Duration(prefix+".idletime-to-autoclaim", DefaultConsumerConfig.IdletimeToAutoclaim, "After a message spends this amount of time in PEL (Pending Entries List i.e claimed by another consumer but not Acknowledged) it will be allowed to be autoclaimed by other consumers")
f.Duration(prefix+".idletime-to-autoclaim", DefaultConsumerConfig.IdletimeToAutoclaim, "After a message spends this amount of time in PEL (Pending Entries List i.e claimed by another consumer but not Acknowledged) it will be allowed to be autoclaimed by other consumers. This option should be set to the same value for all consumers and producers.")
f.Bool(prefix+".retry", DefaultConsumerConfig.Retry, "enables autoclaim for this consumer, if set to false this consumer will not check messages from PEL (Pending Entries List)")
f.Int64(prefix+".max-retry-count", DefaultConsumerConfig.MaxRetryCount, "number of message retries after which this consumer will set an error response and Acknowledge the message (-1 = no limit)")
}

// Consumer implements a consumer for redis stream provides heartbeat to
Expand Down Expand Up @@ -109,32 +119,55 @@ func decrementMsgIdByOne(msgId string) string {
// Consumer first checks it there exists pending message that is claimed by
// unresponsive consumer, if not then reads from the stream.
func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Request], error) {
// First try to XAUTOCLAIM, with start as a random messageID from PEL with MinIdle as IdletimeToAutoclaim
// this prioritizes processing PEL messages that have been waiting for more than IdletimeToAutoclaim duration
var messages []redis.XMessage
if pendingMsgs, err := c.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: c.redisStream,
Group: c.redisGroup,
Start: "-",
End: "+",
Count: 50,
Idle: c.cfg.IdletimeToAutoclaim,
}).Result(); err != nil {
if !errors.Is(err, redis.Nil) {
log.Error("Error from XpendingExt in getting PEL for auto claim", "err", err, "penindlen", len(pendingMsgs))
}
} else if len(pendingMsgs) > 0 {
idx := rand.Intn(len(pendingMsgs))
messages, _, err = c.client.XAutoClaim(ctx, &redis.XAutoClaimArgs{
Group: c.redisGroup,
Consumer: c.id,
MinIdle: c.cfg.IdletimeToAutoclaim, // Minimum idle time for messages to claim (in milliseconds)
Stream: c.redisStream,
Start: decrementMsgIdByOne(pendingMsgs[idx].ID),
Count: 1,
}).Result()
if err != nil {
log.Info("error from xautoclaim", "err", err)
if c.cfg.Retry {
// First try to XAUTOCLAIM, with start as a random messageID from PEL with MinIdle as IdletimeToAutoclaim
// this prioritizes processing PEL messages that have been waiting for more than IdletimeToAutoclaim duration
if pendingMsgs, err := c.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: c.redisStream,
Group: c.redisGroup,
Start: "-",
End: "+",
Count: 50,
Idle: c.cfg.IdletimeToAutoclaim,
}).Result(); err != nil {
if !errors.Is(err, redis.Nil) {
log.Error("Error from XpendingExt in getting PEL for auto claim", "err", err, "pendingLen", len(pendingMsgs))
}
} else if len(pendingMsgs) > 0 {
if c.cfg.MaxRetryCount != -1 {
var exceededRetries []redis.XPendingExt
var filtered []redis.XPendingExt
for _, msg := range pendingMsgs {
if msg.RetryCount > c.cfg.MaxRetryCount {
exceededRetries = append(exceededRetries, msg)
} else {
filtered = append(filtered, msg)
}
}
if len(exceededRetries) > 0 {
idx := rand.Intn(len(exceededRetries))
if err := c.SetError(ctx, exceededRetries[idx].ID, "too many retries"); err != nil {
// TODO(magic): don't log error when other consumer set the error before us
log.Error("Failed to set error response for a message that exceeded retries limit", "err", err, "retryCount", exceededRetries[idx].RetryCount)
}
}
pendingMsgs = filtered
}
if len(pendingMsgs) > 0 {
idx := rand.Intn(len(pendingMsgs))
messages, _, err = c.client.XAutoClaim(ctx, &redis.XAutoClaimArgs{
Group: c.redisGroup,
Consumer: c.id,
MinIdle: c.cfg.IdletimeToAutoclaim, // Minimum idle time for messages to claim (in milliseconds)
Stream: c.redisStream,
Start: decrementMsgIdByOne(pendingMsgs[idx].ID),
Count: 1,
}).Result()
if err != nil {
log.Info("error from xautoclaim", "err", err)
}
}
}
}
if len(messages) == 0 {
Expand Down Expand Up @@ -228,6 +261,7 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
log.Debug("consumer: xdel", "cid", c.id, "messageId", messageID)
if _, err := c.client.XDel(ctx, c.redisStream, messageID).Result(); err != nil {
return fmt.Errorf("deleting message: %v, error: %w", messageID, err)
}
Expand All @@ -249,6 +283,7 @@ func (c *Consumer[Request, Response]) SetError(ctx context.Context, messageID st
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
log.Debug("consumer: xdel", "cid", c.id, "messageId", messageID)
if _, err := c.client.XDel(ctx, c.redisStream, messageID).Result(); err != nil {
return fmt.Errorf("deleting message: %v, error: %w", messageID, err)
}
Expand Down

0 comments on commit f8db4cf

Please sign in to comment.