diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index ee47c2c6d72..004962e4ba3 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -357,12 +357,21 @@ func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event) curRev := s.store.currentRev compactionRev := s.store.compactMainRev - wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) + minRev := s.unsynced.chooseAll(curRev, compactionRev) + for _, batch := range s.victims { + for w, b := range batch { + watcherRev := max(w.minRev, b.moreRev) + if len(b.evs) > 0 { + watcherRev = max(watcherRev, b.evs[len(b.evs)-1].Kv.ModRevision) + } + minRev = min(minRev, watcherRev) + } + } evs = rangeEventsWithReuse(s.store.lg, s.store.b, evs, minRev, curRev+1) victims := make(watcherBatch) - wb := newWatcherBatch(wg, evs) - for w := range wg.watchers { + wb := newWatcherBatch(&s.unsynced, evs) + for w := range s.unsynced.watchers { if w.minRev < compactionRev { // Skip the watcher that failed to send compacted watch response due to w.ch is full. // Next retry of syncWatchers would try to resend the compacted watch response to w.ch