Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Commit

Permalink
refactor: nats broker retrying logic
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Nov 2, 2023
1 parent fd96a6b commit ed7ea37
Showing 1 changed file with 29 additions and 26 deletions.
55 changes: 29 additions & 26 deletions broker/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,8 @@ createConsumer:
})

if cerr != nil {
if context.DeadlineExceeded == cerr {
if attempts > 0 {
attempts--
n.log.Warnf("failed to create consumer for stream %s, retrying in 500ms...", stream)
time.Sleep(500 * time.Millisecond)
goto createConsumer
}
if n.shouldRetryOnError(cerr, &attempts, 500*time.Millisecond) {
goto createConsumer
}
}
}
Expand Down Expand Up @@ -469,13 +464,8 @@ createStream:
}, func(stream string) {})

if err != nil {
if context.DeadlineExceeded == err {
if attempts > 0 {
attempts--
n.log.Warnf("failed to create stream %s, retrying in 500ms...", stream)
time.Sleep(500 * time.Millisecond)
goto createStream
}
if n.shouldRetryOnError(err, &attempts, 500*time.Millisecond) {
goto createStream
}
}

Expand Down Expand Up @@ -508,14 +498,10 @@ fetchEpoch:

return string(entry.Value()), nil
} else if err != nil {
if context.DeadlineExceeded == err {
if attempts > 0 {
attempts--
n.log.Warnf("failed to retrieve epoch, retrying in 1s...")
time.Sleep(1 * time.Second)
goto fetchEpoch
}
if n.shouldRetryOnError(err, &attempts, 1*time.Second) {
goto fetchEpoch
}

return "", errorx.Decorate(err, "failed to create key: %s", epochKey)
}

Expand Down Expand Up @@ -543,7 +529,7 @@ bucketSetup:
goto bucketSetup
}

return nil, errorx.Decorate(err, "failed to create JetStream KV bucket: %s", key)
return nil, errorx.Decorate(err, "failed to create bucket: %s", key)
}

// That means that bucket has been already created
Expand All @@ -552,25 +538,29 @@ bucketSetup:
bucket, err = n.js.KeyValue(context.Background(), key)

if err != nil {
return nil, errorx.Decorate(err, "Failed to retrieve JetStream KV bucket: %s", key)
return nil, errorx.Decorate(err, "Failed to retrieve bucket: %s", key)
}
}
}

if err != nil {
return nil, errorx.Decorate(err, "Failed to create bucket: %s", key)
}

// Invalidate TTL settings if the bucket is the new one.
// We discard the previous bucket and create a new one with the default TTL.
if !newBucket {
status, serr := bucket.Status(context.Background())

if serr != nil {
return nil, errorx.Decorate(serr, "Failed to retrieve JetStream KV bucket status: %s", key)
return nil, errorx.Decorate(serr, "Failed to retrieve bucket status: %s", key)
}

if status.TTL() != ttl {
n.log.Warnf("JetStream KV bucket TTL has been changed, recreating the bucket: key=%s, old=%s, new=%s", key, status.TTL().String(), ttl.String())
n.log.Warnf("bucket TTL has been changed, recreating the bucket: key=%s, old=%s, new=%s", key, status.TTL().String(), ttl.String())
derr := n.js.DeleteKeyValue(context.Background(), key)
if derr != nil {
return nil, errorx.Decorate(derr, "Failed to delete JetStream KV bucket: %s", key)
return nil, errorx.Decorate(derr, "Failed to delete bucket: %s", key)
}

goto bucketSetup
Expand Down Expand Up @@ -767,3 +757,16 @@ func (s *streamSync) idle() {
s.active = false
close(s.cv)
}

func (n *NATS) shouldRetryOnError(err error, attempts *int, cooldown time.Duration) bool {
if context.DeadlineExceeded == err || jetstream.ErrNoStreamResponse == err {
if *attempts > 0 {
(*attempts)--
n.log.Warnf("operation failed with %s, retrying in %s...", err.Error(), cooldown.String())
time.Sleep(cooldown)
return true
}
}

return false
}

0 comments on commit ed7ea37

Please sign in to comment.