Skip to content

Commit

Permalink
retry getting events on NotFound and ResourceExhausted
Browse files Browse the repository at this point in the history
  • Loading branch information
peterargue committed Feb 3, 2025
1 parent 97f5f93 commit 1e977a9
Showing 1 changed file with 21 additions and 12 deletions.
33 changes: 21 additions & 12 deletions services/ingestion/block_tracking_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,7 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6
case codes.NotFound:
// we can get not found when reconnecting after a disconnect/restart before the
// next block is finalized. just wait briefly and try again
select {
case <-ctx.Done():
return
case <-time.After(200 * time.Millisecond):
}
time.Sleep(200 * time.Millisecond)
case codes.DeadlineExceeded, codes.Internal:
// these are sometimes returned when the stream is disconnected by a middleware or the server
default:
Expand Down Expand Up @@ -272,13 +268,26 @@ func (r *RPCBlockTrackingSubscriber) getEventsByType(
blockHeader flow.BlockHeader,
eventType string,
) (flow.BlockEvents, error) {
evts, err := r.client.GetEventsForBlockHeader(
ctx,
eventType,
blockHeader,
)
if err != nil {
return flow.BlockEvents{}, err
var evts []flow.BlockEvents
var err error

// retry until we get the block from an execution node that has the events
for {
evts, err = r.client.GetEventsForBlockHeader(
ctx,
eventType,
blockHeader,
)
if err != nil {
// retry after a short pause
if status.Code(err) == codes.NotFound || status.Code(err) == codes.ResourceExhausted {
time.Sleep(200 * time.Millisecond)
continue
}

return flow.BlockEvents{}, err
}
break
}

if len(evts) != 1 {
Expand Down

0 comments on commit 1e977a9

Please sign in to comment.