Skip to content

Commit

Permalink
drop consumers if their out buffer fills up (#417)
Browse files Browse the repository at this point in the history
right now if these buffers fill up we just start dropping events, and
then when the buffers drain we just resume sending events. this seems
quite wrong.
  • Loading branch information
whyrusleeping authored Nov 9, 2023
2 parents 32b3dce + 92bcf49 commit d47a672
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
label "github.com/bluesky-social/indigo/api/label"
Expand Down Expand Up @@ -73,11 +74,20 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) {
select {
case s.outgoing <- evt:
case <-s.done:
default:
log.Warnw("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident)
go func(torem *Subscriber) {
em.rmSubscriber(torem)
select {
case torem.outgoing <- &XRPCStreamEvent{
Error: &ErrorFrame{
Error: "ConsumerTooSlow",
},
}:
case <-time.After(time.Second * 5):
log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident)
}
torem.cleanup()
}(s)
default:
log.Warnf("event overflow (%d)", len(s.outgoing))
}
s.broadcastCounter.Inc()
}
Expand All @@ -100,6 +110,8 @@ type Subscriber struct {

done chan struct{}

cleanup func()

ident string
enqueuedCounter prometheus.Counter
broadcastCounter prometheus.Counter
Expand Down Expand Up @@ -164,15 +176,15 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func
broadcastCounter: eventsBroadcast.WithLabelValues(ident),
}

cleanup := func() {
sub.cleanup = sync.OnceFunc(func() {
close(done)
em.rmSubscriber(sub)
close(sub.outgoing)
}
})

if since == nil {
em.addSubscriber(sub)
return sub.outgoing, cleanup, nil
return sub.outgoing, sub.cleanup, nil
}

out := make(chan *XRPCStreamEvent, em.bufferSize)
Expand Down Expand Up @@ -243,7 +255,7 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func
}
}()

return out, cleanup, nil
return out, sub.cleanup, nil
}

func sequenceForEvent(evt *XRPCStreamEvent) int64 {
Expand Down

0 comments on commit d47a672

Please sign in to comment.