Skip to content

Commit

Permalink
Keep the minRev between syncWatchers loops
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Dec 3, 2024
1 parent b038739 commit c8fee01
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,21 @@ func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event)
curRev := s.store.currentRev
compactionRev := s.store.compactMainRev

wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minRev := s.unsynced.chooseAll(curRev, compactionRev)
for _, batch := range s.victims {
for w, b := range batch {
watcherRev := max(w.minRev, b.moreRev)
if len(b.evs) > 0 {
watcherRev = max(watcherRev, b.evs[len(b.evs)-1].Kv.ModRevision)

Check warning on line 365 in server/storage/mvcc/watchable_store.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/watchable_store.go#L362-L365

Added lines #L362 - L365 were not covered by tests
}
minRev = min(minRev, watcherRev)

Check warning on line 367 in server/storage/mvcc/watchable_store.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/watchable_store.go#L367

Added line #L367 was not covered by tests
}
}
evs = rangeEventsWithReuse(s.store.lg, s.store.b, evs, minRev, curRev+1)

victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
wb := newWatcherBatch(&s.unsynced, evs)
for w := range s.unsynced.watchers {
if w.minRev < compactionRev {
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
Expand Down

0 comments on commit c8fee01

Please sign in to comment.