diff --git a/server/config/config.go b/server/config/config.go index b4a8f61a575..2229ca5cb58 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -363,6 +363,6 @@ func (c *ServerConfig) BootstrapTimeoutEffective() time.Duration { func (c *ServerConfig) BackendPath() string { return datadir.ToBackendFileName(c.DataDir) } -func (c *ServerConfig) MaxRequestBytesWithOverhead() uint { - return c.MaxRequestBytes + grpcOverheadBytes +func (c *ServerConfig) MaxRequestBytesWithOverhead() int { + return int(c.MaxRequestBytes) + grpcOverheadBytes } diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index 32949207805..044666d6eb3 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -61,7 +61,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer opts = append(opts, grpc.ChainUnaryInterceptor(chainUnaryInterceptors...)) opts = append(opts, grpc.ChainStreamInterceptor(chainStreamInterceptors...)) - opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytesWithOverhead()))) + opts = append(opts, grpc.MaxRecvMsgSize(s.Cfg.MaxRequestBytesWithOverhead())) opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams)) diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go index b0a7e4a1926..a33250a40f3 100644 --- a/server/etcdserver/api/v3rpc/watch.go +++ b/server/etcdserver/api/v3rpc/watch.go @@ -43,7 +43,7 @@ type watchServer struct { clusterID int64 memberID int64 - maxRequestBytes uint + maxRequestBytes int sg apply.RaftStatusGetter watchable mvcc.WatchableKV @@ -126,7 +126,7 @@ type serverWatchStream struct { clusterID int64 memberID int64 - maxRequestBytes uint + maxRequestBytes int sg apply.RaftStatusGetter watchable mvcc.WatchableKV @@ -544,12 +544,12 @@ func IsCreateEvent(e mvccpb.Event) bool { func sendFragments( wr *pb.WatchResponse, - maxRequestBytes uint, + maxRequestBytes int, sendFunc func(*pb.WatchResponse) error, ) error { // no need to fragment if total request size is smaller // than max request limit or response contains only one event - if uint(wr.Size()) < maxRequestBytes || len(wr.Events) < 2 { + if wr.Size() < maxRequestBytes || len(wr.Events) < 2 { return sendFunc(wr) } @@ -562,7 +562,7 @@ func sendFragments( cur := ow for _, ev := range wr.Events[idx:] { cur.Events = append(cur.Events, ev) - if len(cur.Events) > 1 && uint(cur.Size()) >= maxRequestBytes { + if len(cur.Events) > 1 && cur.Size() >= maxRequestBytes { cur.Events = cur.Events[:len(cur.Events)-1] break } diff --git a/server/etcdserver/api/v3rpc/watch_test.go b/server/etcdserver/api/v3rpc/watch_test.go index caa86f91ad7..e7868ddf8d2 100644 --- a/server/etcdserver/api/v3rpc/watch_test.go +++ b/server/etcdserver/api/v3rpc/watch_test.go @@ -27,7 +27,7 @@ import ( func TestSendFragment(t *testing.T) { tt := []struct { wr *pb.WatchResponse - maxRequestBytes uint + maxRequestBytes int fragments int werr error }{ diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index 6001542d348..fe1f192146a 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -516,7 +516,7 @@ func TestHashKVHandler(t *testing.T) { etcdSrv.cluster.SetID(types.ID(localClusterID), types.ID(localClusterID)) be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be) - etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{}) defer func() { assert.NoError(t, etcdSrv.kv.Close()) }() diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 2677e929f98..824b6328be8 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -368,9 +368,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return nil, err } - mvccStoreConfig := mvcc.StoreConfig{ - CompactionBatchLimit: cfg.CompactionBatchLimit, - CompactionSleepInterval: cfg.CompactionSleepInterval, + mvccStoreConfig := mvcc.WatchableStoreConfig{ + StoreConfig: mvcc.StoreConfig{ + CompactionBatchLimit: cfg.CompactionBatchLimit, + CompactionSleepInterval: cfg.CompactionSleepInterval, + }, + WatchBatchMaxSize: cfg.MaxRequestBytesWithOverhead(), } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage()) diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 265bce38f56..a804bb06146 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -652,7 +652,7 @@ func TestSnapshotDisk(t *testing.T) { v2store: st, consistIndex: cindex.NewConsistentIndex(be), } - srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{}) defer func() { assert.NoError(t, srv.kv.Close()) }() @@ -703,7 +703,7 @@ func TestSnapshotMemory(t *testing.T) { v2store: st, consistIndex: cindex.NewConsistentIndex(be), } - srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{}) defer func() { assert.NoError(t, srv.kv.Close()) }() @@ -775,7 +775,7 @@ func TestSnapshotOrdering(t *testing.T) { beHooks: serverstorage.NewBackendHooks(lg, ci), } - s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{}) s.be = be s.start() @@ -869,7 +869,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 1), } - s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{}) s.be = be s.start() diff --git a/server/storage/mvcc/kv_test.go b/server/storage/mvcc/kv_test.go index c727b444af7..6296b9c375b 100644 --- a/server/storage/mvcc/kv_test.go +++ b/server/storage/mvcc/kv_test.go @@ -757,7 +757,7 @@ func TestKVSnapshot(t *testing.T) { func TestWatchableKVWatch(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index ee47c2c6d72..19cd3d48138 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -50,6 +50,7 @@ type watchable interface { type watchableStore struct { *store + watchBatchMaxSize int // mu protects watcher groups and batches. It should never be locked // before locking store.mu to avoid deadlock. @@ -76,7 +77,7 @@ var _ WatchableKV = (*watchableStore)(nil) // cancel operations. type cancelFunc func() -func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore { +func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg WatchableStoreConfig) *watchableStore { s := newWatchableStore(lg, b, le, cfg) s.wg.Add(2) go s.syncWatchersLoop() @@ -84,16 +85,22 @@ func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *w return s } -func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore { +type WatchableStoreConfig struct { + StoreConfig + WatchBatchMaxSize int +} + +func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg WatchableStoreConfig) *watchableStore { if lg == nil { lg = zap.NewNop() } s := &watchableStore{ - store: NewStore(lg, b, le, cfg), - victimc: make(chan struct{}, 1), - unsynced: newWatcherGroup(), - synced: newWatcherGroup(), - stopc: make(chan struct{}), + store: NewStore(lg, b, le, cfg.StoreConfig), + victimc: make(chan struct{}, 1), + unsynced: newWatcherGroup(), + synced: newWatcherGroup(), + stopc: make(chan struct{}), + watchBatchMaxSize: cfg.WatchBatchMaxSize, } s.store.ReadView = &readView{s} s.store.WriteView = &writeView{s} @@ -237,7 +244,7 @@ func (s *watchableStore) syncWatchersLoop() { delayTicker.Reset(waitDuration) // more work pending? - if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers { + if unsyncedWatchers != 0 && lastUnsyncedWatchers >= unsyncedWatchers { // be fair to other store operations by yielding time taken delayTicker.Reset(syncDuration) } @@ -361,7 +368,7 @@ func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event) evs = rangeEventsWithReuse(s.store.lg, s.store.b, evs, minRev, curRev+1) victims := make(watcherBatch) - wb := newWatcherBatch(wg, evs) + wb := newWatcherBatch(wg, evs, s.watchBatchMaxSize) for w := range wg.watchers { if w.minRev < compactionRev { // Skip the watcher that failed to send compacted watch response due to w.ch is full. @@ -484,13 +491,7 @@ func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) { // watchers that watch on the key of the event. func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { victim := make(watcherBatch) - for w, eb := range newWatcherBatch(&s.synced, evs) { - if eb.revs != 1 { - s.store.lg.Panic( - "unexpected multiple revisions in watch notification", - zap.Int("number-of-revisions", eb.revs), - ) - } + for w, eb := range newWatcherBatch(&s.synced, evs, s.watchBatchMaxSize) { if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) { pendingEventsGauge.Add(float64(len(eb.evs))) } else { diff --git a/server/storage/mvcc/watchable_store_bench_test.go b/server/storage/mvcc/watchable_store_bench_test.go index c8990576b30..07bde585944 100644 --- a/server/storage/mvcc/watchable_store_bench_test.go +++ b/server/storage/mvcc/watchable_store_bench_test.go @@ -27,7 +27,7 @@ import ( func BenchmarkWatchableStorePut(b *testing.B) { be, _ := betesting.NewDefaultTmpBackend(b) - s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, be) // arbitrary number of bytes @@ -47,7 +47,7 @@ func BenchmarkWatchableStorePut(b *testing.B) { // some synchronization operations, such as mutex locking. func BenchmarkWatchableStoreTxnPut(b *testing.B) { be, _ := betesting.NewDefaultTmpBackend(b) - s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, be) // arbitrary number of bytes @@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) { func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) { be, _ := betesting.NewDefaultTmpBackend(b) - s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, be) k := []byte("testkey") @@ -122,7 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) { // we should put to simulate the real-world use cases. func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { be, _ := betesting.NewDefaultTmpBackend(b) - ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(ws, be) @@ -164,7 +164,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { be, _ := betesting.NewDefaultTmpBackend(b) - s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, be) diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go index a418c6c78fe..28ea6babfde 100644 --- a/server/storage/mvcc/watchable_store_test.go +++ b/server/storage/mvcc/watchable_store_test.go @@ -33,7 +33,7 @@ import ( func TestWatch(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -52,7 +52,7 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -81,7 +81,7 @@ func TestCancelUnsynced(t *testing.T) { // because newWatchableStore automatically calls syncWatchers // method to sync watchers in unsynced map. We want to keep watchers // in unsynced to test if syncWatchers works as expected. - s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) // Put a key so that we can spawn watchers on that key. @@ -125,7 +125,7 @@ func TestCancelUnsynced(t *testing.T) { // and moves these watchers to synced. func TestSyncWatchers(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -271,7 +271,7 @@ func TestRangeEvents(t *testing.T) { // TestWatchCompacted tests a watcher that watches on a compacted revision. func TestWatchCompacted(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -309,7 +309,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) lg := zaptest.NewLogger(t) - s := New(lg, b, &lease.FakeLessor{}, StoreConfig{}) + s := New(lg, b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer func() { cleanup(s, b) @@ -363,7 +363,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -402,7 +402,7 @@ func TestWatchRestore(t *testing.T) { test := func(delay time.Duration) func(t *testing.T) { return func(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -448,11 +448,11 @@ func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) { // 5. choose the watcher from step 1, without panic func TestWatchRestoreSyncedWatcher(t *testing.T) { b1, _ := betesting.NewDefaultTmpBackend(t) - s1 := New(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, StoreConfig{}) + s1 := New(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s1, b1) b2, _ := betesting.NewDefaultTmpBackend(t) - s2 := New(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, StoreConfig{}) + s2 := New(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s2, b2) testKey, testValue := []byte("foo"), []byte("bar") @@ -503,49 +503,69 @@ func TestWatchBatchUnsynced(t *testing.T) { tcs := []struct { name string revisions int - watchBatchMaxRevs int + eventSize int + watchBatchMaxSize int eventsPerRevision int expectRevisionBatches [][]int64 }{ { - name: "3 revisions, 4 revs per batch, 1 events per revision", - revisions: 12, - watchBatchMaxRevs: 4, + name: "Fits into a single batch", + revisions: 10, + eventSize: 100, + watchBatchMaxSize: 1000, eventsPerRevision: 1, expectRevisionBatches: [][]int64{ - {2, 3, 4, 5}, - {6, 7, 8, 9}, - {10, 11, 12, 13}, + {2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, }, }, { - name: "3 revisions, 4 revs per batch, 3 events per revision", - revisions: 12, - watchBatchMaxRevs: 4, + name: "Spills to second batch", + revisions: 15, + eventSize: 100, + watchBatchMaxSize: 1000, + eventsPerRevision: 1, + expectRevisionBatches: [][]int64{ + {2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + {12, 13, 14, 15, 16}, + }, + }, + { + name: "Spills to second batch, but maintains revision pairs", + revisions: 8, + eventSize: 100, + watchBatchMaxSize: 1000, + eventsPerRevision: 2, + expectRevisionBatches: [][]int64{ + {2, 2, 3, 3, 4, 4, 5, 5, 6, 6}, + {7, 7, 8, 8, 9, 9}, + }, + }, + { + name: "Spills to second batch, but maintains revision triples", + revisions: 6, + eventSize: 100, + watchBatchMaxSize: 1000, eventsPerRevision: 3, expectRevisionBatches: [][]int64{ {2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5}, - {6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9}, - {10, 10, 10, 11, 11, 11, 12, 12, 12, 13, 13, 13}, + {6, 6, 6, 7, 7, 7}, }, }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - oldMaxRevs := watchBatchMaxRevs - defer func() { - watchBatchMaxRevs = oldMaxRevs - cleanup(s, b) - }() - watchBatchMaxRevs = tc.watchBatchMaxRevs + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{ + WatchBatchMaxSize: tc.watchBatchMaxSize, + }) - v := []byte("foo") + k := []byte("k") + eventProtoOverhead := 13 + v := make([]byte, tc.eventSize-eventProtoOverhead) for i := 0; i < tc.revisions; i++ { txn := s.Write(traceutil.TODO()) for j := 0; j < tc.eventsPerRevision; j++ { - txn.Put(v, v, lease.NoLease) + txn.Put(k, v, lease.NoLease) } txn.End() } @@ -553,7 +573,7 @@ func TestWatchBatchUnsynced(t *testing.T) { w := s.NewWatchStream() defer w.Close() - w.Watch(0, v, nil, 1) + w.Watch(0, k, nil, 1) var revisionBatches [][]int64 eventCount := 0 for eventCount < tc.revisions*tc.eventsPerRevision { @@ -561,6 +581,7 @@ func TestWatchBatchUnsynced(t *testing.T) { for _, e := range (<-w.Chan()).Events { revisions = append(revisions, e.Kv.ModRevision) eventCount++ + assert.Equal(t, tc.eventSize, e.Size()) } revisionBatches = append(revisionBatches, revisions) } @@ -655,7 +676,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) { wg.add(w) } - gwe := newWatcherBatch(&wg, tt.evs) + gwe := newWatcherBatch(&wg, tt.evs, 0) if len(gwe) != len(tt.wwe) { t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe)) } @@ -677,7 +698,7 @@ func TestWatchVictims(t *testing.T) { oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer func() { cleanup(s, b) @@ -754,7 +775,7 @@ func TestWatchVictims(t *testing.T) { // canceling its watches. func TestStressWatchCancelClose(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey, testValue := []byte("foo"), []byte("bar") diff --git a/server/storage/mvcc/watcher_bench_test.go b/server/storage/mvcc/watcher_bench_test.go index 3d0dccea342..170075ab2a9 100644 --- a/server/storage/mvcc/watcher_bench_test.go +++ b/server/storage/mvcc/watcher_bench_test.go @@ -26,7 +26,7 @@ import ( func BenchmarkKVWatcherMemoryUsage(b *testing.B) { be, _ := betesting.NewDefaultTmpBackend(b) - watchable := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + watchable := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(watchable, be) diff --git a/server/storage/mvcc/watcher_group.go b/server/storage/mvcc/watcher_group.go index c9db0e2bd9b..6db755a497e 100644 --- a/server/storage/mvcc/watcher_group.go +++ b/server/storage/mvcc/watcher_group.go @@ -22,53 +22,55 @@ import ( "go.etcd.io/etcd/pkg/v3/adt" ) -// watchBatchMaxRevs is the maximum distinct revisions that -// may be sent to an unsynced watcher at a time. Declared as -// var instead of const for testing purposes. -var watchBatchMaxRevs = 1000 +func newEventBatch(watchBatchMaxSize int) *eventBatch { + return &eventBatch{ + watchBatchMaxSize: watchBatchMaxSize, + } +} type eventBatch struct { // evs is a batch of revision-ordered events evs []mvccpb.Event - // revs is the minimum unique revisions observed for this batch - revs int + // evsSize is total size of events in the batch. + evsSize int // moreRev is first revision with more events following this batch moreRev int64 + // watchBatchMaxSize is maximum size of batch + watchBatchMaxSize int } func (eb *eventBatch) add(ev mvccpb.Event) { - if eb.revs > watchBatchMaxRevs { - // maxed out batch size - return - } - if len(eb.evs) == 0 { - // base case - eb.revs = 1 + eb.evsSize = ev.Size() eb.evs = append(eb.evs, ev) return } - - // revision accounting ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision evRev := ev.Kv.ModRevision - if evRev > ebRev { - eb.revs++ - if eb.revs > watchBatchMaxRevs { - eb.moreRev = evRev - return - } + if evRev == ebRev { + eb.evsSize += ev.Size() + eb.evs = append(eb.evs, ev) + return + } + if eb.moreRev != 0 { + return } + size := ev.Size() + if eb.watchBatchMaxSize != 0 && eb.evsSize+size > eb.watchBatchMaxSize { + eb.moreRev = ev.Kv.ModRevision + return + } + eb.evsSize += size eb.evs = append(eb.evs, ev) } type watcherBatch map[*watcher]*eventBatch -func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) { +func (wb watcherBatch) add(w *watcher, ev mvccpb.Event, watchBatchMaxSize int) { eb := wb[w] if eb == nil { - eb = &eventBatch{} + eb = newEventBatch(watchBatchMaxSize) wb[w] = eb } eb.add(ev) @@ -76,7 +78,7 @@ func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) { // newWatcherBatch maps watchers to their matched events. It enables quick // events look up by watcher. -func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch { +func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event, watchBatchMaxSize int) watcherBatch { if len(wg.watchers) == 0 { return nil } @@ -86,7 +88,7 @@ func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch { for w := range wg.watcherSetByKey(string(ev.Kv.Key)) { if ev.Kv.ModRevision >= w.minRev { // don't double notify - wb.add(w, ev) + wb.add(w, ev, watchBatchMaxSize) } } } diff --git a/server/storage/mvcc/watcher_test.go b/server/storage/mvcc/watcher_test.go index e774c70cfac..507a9079a17 100644 --- a/server/storage/mvcc/watcher_test.go +++ b/server/storage/mvcc/watcher_test.go @@ -35,7 +35,7 @@ import ( // and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() @@ -85,7 +85,7 @@ func TestWatcherWatchID(t *testing.T) { func TestWatcherRequestsCustomID(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() @@ -122,7 +122,7 @@ func TestWatcherRequestsCustomID(t *testing.T) { // and returns events with matching prefixes. func TestWatcherWatchPrefix(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() @@ -196,7 +196,7 @@ func TestWatcherWatchPrefix(t *testing.T) { // does not create watcher, which panics when canceling in range tree. func TestWatcherWatchWrongRange(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() @@ -216,7 +216,7 @@ func TestWatcherWatchWrongRange(t *testing.T) { func TestWatchDeleteRange(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer func() { b.Close() @@ -256,7 +256,7 @@ func TestWatchDeleteRange(t *testing.T) { // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() @@ -293,7 +293,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) { // report its correct progress. func TestWatcherRequestProgress(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) @@ -336,7 +336,7 @@ func TestWatcherRequestProgress(t *testing.T) { func TestWatcherRequestProgressAll(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) @@ -375,7 +375,7 @@ func TestWatcherRequestProgressAll(t *testing.T) { func TestWatcherWatchWithFilter(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() diff --git a/tests/integration/clientv3/watch_fragment_test.go b/tests/integration/clientv3/watch_fragment_test.go index 81450f5f9aa..a3c6f87d501 100644 --- a/tests/integration/clientv3/watch_fragment_test.go +++ b/tests/integration/clientv3/watch_fragment_test.go @@ -28,34 +28,18 @@ import ( integration2 "go.etcd.io/etcd/tests/v3/framework/integration" ) -// TestWatchFragmentDisable ensures that large watch -// response exceeding server-side request limit can -// arrive even without watch response fragmentation. func TestWatchFragmentDisable(t *testing.T) { testWatchFragment(t, false, false) } -// TestWatchFragmentDisableWithGRPCLimit verifies -// large watch response exceeding server-side request -// limit and client-side gRPC response receive limit -// cannot arrive without watch events fragmentation, -// because multiple events exceed client-side gRPC -// response receive limit. func TestWatchFragmentDisableWithGRPCLimit(t *testing.T) { testWatchFragment(t, false, true) } -// TestWatchFragmentEnable ensures that large watch -// response exceeding server-side request limit arrive -// with watch response fragmentation. func TestWatchFragmentEnable(t *testing.T) { testWatchFragment(t, true, false) } -// TestWatchFragmentEnableWithGRPCLimit verifies -// large watch response exceeding server-side request -// limit and client-side gRPC response receive limit -// can arrive only when watch events are fragmented. func TestWatchFragmentEnableWithGRPCLimit(t *testing.T) { testWatchFragment(t, true, true) } @@ -99,29 +83,21 @@ func testWatchFragment(t *testing.T, fragment, exceedRecvLimit bool) { wch := cli.Watch(context.TODO(), "foo", opts...) // expect 10 MiB watch response - select { - case ws := <-wch: - // without fragment, should exceed gRPC client receive limit - if !fragment && exceedRecvLimit { - if len(ws.Events) != 0 { - t.Fatalf("expected 0 events with watch fragmentation, got %d", len(ws.Events)) + eventCount := 0 + for eventCount < 10 { + select { + case ws := <-wch: + // still expect merged watch events + if ws.Err() != nil { + t.Fatalf("unexpected error %v", ws.Err()) } - exp := "code = ResourceExhausted desc = grpc: received message larger than max (" - if !strings.Contains(ws.Err().Error(), exp) { - t.Fatalf("expected 'ResourceExhausted' error, got %v", ws.Err()) - } - return - } + eventCount += len(ws.Events) - // still expect merged watch events - if len(ws.Events) != 10 { - t.Fatalf("expected 10 events with watch fragmentation, got %d", len(ws.Events)) - } - if ws.Err() != nil { - t.Fatalf("unexpected error %v", ws.Err()) + case <-time.After(testutil.RequestTimeout): + t.Fatalf("took too long to receive events") } - - case <-time.After(testutil.RequestTimeout): - t.Fatalf("took too long to receive events") + } + if eventCount != 10 { + t.Fatalf("expected 10 events with watch fragmentation, got %d", eventCount) } }