Use size of events to batch Watch responses #18975

Draft · wants to merge 4 commits into base: main
4 changes: 2 additions & 2 deletions server/config/config.go
@@ -363,6 +363,6 @@ func (c *ServerConfig) BootstrapTimeoutEffective() time.Duration {
 
 func (c *ServerConfig) BackendPath() string { return datadir.ToBackendFileName(c.DataDir) }
 
-func (c *ServerConfig) MaxRequestBytesWithOverhead() uint {
-	return c.MaxRequestBytes + grpcOverheadBytes
+func (c *ServerConfig) MaxRequestBytesWithOverhead() int {
+	return int(c.MaxRequestBytes) + grpcOverheadBytes
 }
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3rpc/grpc.go
@@ -61,7 +61,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
 	opts = append(opts, grpc.ChainUnaryInterceptor(chainUnaryInterceptors...))
 	opts = append(opts, grpc.ChainStreamInterceptor(chainStreamInterceptors...))
 
-	opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytesWithOverhead())))
+	opts = append(opts, grpc.MaxRecvMsgSize(s.Cfg.MaxRequestBytesWithOverhead()))
 	opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
 	opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))
 
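Reviewer note: the `uint` to `int` change above exists so the helper's result can be passed to `grpc.MaxRecvMsgSize` (which takes an `int`) without a cast. A minimal, self-contained sketch of that wiring; the constant values below are illustrative stand-ins, not etcd's actual configuration defaults:

```go
package main

import (
	"fmt"

	"google.golang.org/grpc"
)

// Illustrative values only; etcd derives these from ServerConfig
// (MaxRequestBytes plus a fixed gRPC overhead allowance).
const (
	maxRequestBytes   = 1536 * 1024 // hypothetical client payload limit
	grpcOverheadBytes = 512 * 1024  // headroom for gRPC framing and metadata
)

// maxRequestBytesWithOverhead mirrors the int-returning helper in this PR:
// returning int lets the result feed grpc.MaxRecvMsgSize directly,
// without the int(...) cast the previous uint signature required.
func maxRequestBytesWithOverhead() int {
	return maxRequestBytes + grpcOverheadBytes
}

func main() {
	srv := grpc.NewServer(
		grpc.MaxRecvMsgSize(maxRequestBytesWithOverhead()),
	)
	defer srv.Stop()

	fmt.Println("max recv msg size:", maxRequestBytesWithOverhead())
}
```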
10 changes: 5 additions & 5 deletions server/etcdserver/api/v3rpc/watch.go
@@ -43,7 +43,7 @@ type watchServer struct {
 	clusterID int64
 	memberID  int64
 
-	maxRequestBytes uint
+	maxRequestBytes int
 
 	sg        apply.RaftStatusGetter
 	watchable mvcc.WatchableKV
@@ -126,7 +126,7 @@ type serverWatchStream struct {
 	clusterID int64
 	memberID  int64
 
-	maxRequestBytes uint
+	maxRequestBytes int
 
 	sg        apply.RaftStatusGetter
 	watchable mvcc.WatchableKV
@@ -544,12 +544,12 @@ func IsCreateEvent(e mvccpb.Event) bool {
 
 func sendFragments(
 	wr *pb.WatchResponse,
-	maxRequestBytes uint,
+	maxRequestBytes int,
 	sendFunc func(*pb.WatchResponse) error,
 ) error {
 	// no need to fragment if total request size is smaller
 	// than max request limit or response contains only one event
-	if uint(wr.Size()) < maxRequestBytes || len(wr.Events) < 2 {
+	if wr.Size() < maxRequestBytes || len(wr.Events) < 2 {
 		return sendFunc(wr)
 	}
 
@@ -562,7 +562,7 @@ func sendFragments(
 	cur := ow
 	for _, ev := range wr.Events[idx:] {
 		cur.Events = append(cur.Events, ev)
-		if len(cur.Events) > 1 && uint(cur.Size()) >= maxRequestBytes {
+		if len(cur.Events) > 1 && cur.Size() >= maxRequestBytes {
			cur.Events = cur.Events[:len(cur.Events)-1]
			break
		}
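For readers skimming the diff, here is a self-contained sketch of the fragmentation loop above after the type change. The `fakeEvent`/`fakeResponse` types and their `Size` method are hypothetical stand-ins for `mvccpb.Event` and `*pb.WatchResponse` with its proto `Size()`; this is an illustration, not the etcd code itself:

```go
package main

import "fmt"

// fakeEvent and fakeResponse stand in for mvccpb.Event and pb.WatchResponse;
// Size() approximates the encoded size with the payload length.
type fakeEvent struct{ payload []byte }

type fakeResponse struct {
	Events   []fakeEvent
	Fragment bool
}

func (r *fakeResponse) Size() int {
	n := 0
	for _, ev := range r.Events {
		n += len(ev.payload)
	}
	return n
}

// sendFragments mirrors the loop above: keep appending events until the
// fragment would exceed maxBytes, then flush it and start a new one.
func sendFragments(wr *fakeResponse, maxBytes int, send func(*fakeResponse) error) error {
	if wr.Size() < maxBytes || len(wr.Events) < 2 {
		return send(wr)
	}
	idx := 0
	for idx < len(wr.Events) {
		cur := &fakeResponse{Fragment: true}
		for _, ev := range wr.Events[idx:] {
			cur.Events = append(cur.Events, ev)
			if len(cur.Events) > 1 && cur.Size() >= maxBytes {
				cur.Events = cur.Events[:len(cur.Events)-1]
				break
			}
		}
		idx += len(cur.Events)
		if idx == len(wr.Events) {
			cur.Fragment = false // last fragment
		}
		if err := send(cur); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	wr := &fakeResponse{}
	for i := 0; i < 10; i++ {
		wr.Events = append(wr.Events, fakeEvent{payload: make([]byte, 100)})
	}
	fragments := 0
	_ = sendFragments(wr, 300, func(r *fakeResponse) error {
		fragments++
		fmt.Printf("fragment %d: %d events, %d bytes\n", fragments, len(r.Events), r.Size())
		return nil
	})
}
```

With ten 100-byte events and a 300-byte limit, the sketch emits five two-event fragments, only the last of which is marked as final.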
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3rpc/watch_test.go
@@ -27,7 +27,7 @@ import (
 func TestSendFragment(t *testing.T) {
 	tt := []struct {
 		wr              *pb.WatchResponse
-		maxRequestBytes uint
+		maxRequestBytes int
 		fragments       int
 		werr            error
 	}{
2 changes: 1 addition & 1 deletion server/etcdserver/corrupt_test.go
@@ -516,7 +516,7 @@ func TestHashKVHandler(t *testing.T) {
 	etcdSrv.cluster.SetID(types.ID(localClusterID), types.ID(localClusterID))
 	be, _ := betesting.NewDefaultTmpBackend(t)
 	defer betesting.Close(t, be)
-	etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
 	defer func() {
 		assert.NoError(t, etcdSrv.kv.Close())
 	}()
9 changes: 6 additions & 3 deletions server/etcdserver/server.go
@@ -368,9 +368,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		return nil, err
 	}
 
-	mvccStoreConfig := mvcc.StoreConfig{
-		CompactionBatchLimit:    cfg.CompactionBatchLimit,
-		CompactionSleepInterval: cfg.CompactionSleepInterval,
+	mvccStoreConfig := mvcc.WatchableStoreConfig{
+		StoreConfig: mvcc.StoreConfig{
+			CompactionBatchLimit:    cfg.CompactionBatchLimit,
+			CompactionSleepInterval: cfg.CompactionSleepInterval,
+		},
+		WatchBatchMaxSize: cfg.MaxRequestBytesWithOverhead(),
 	}
 	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
 	srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())
8 changes: 4 additions & 4 deletions server/etcdserver/server_test.go
@@ -652,7 +652,7 @@ func TestSnapshotDisk(t *testing.T) {
 		v2store:      st,
 		consistIndex: cindex.NewConsistentIndex(be),
 	}
-	srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
 	defer func() {
 		assert.NoError(t, srv.kv.Close())
 	}()
@@ -703,7 +703,7 @@ func TestSnapshotMemory(t *testing.T) {
 		v2store:      st,
 		consistIndex: cindex.NewConsistentIndex(be),
 	}
-	srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
 	defer func() {
 		assert.NoError(t, srv.kv.Close())
 	}()
@@ -775,7 +775,7 @@ func TestSnapshotOrdering(t *testing.T) {
 		beHooks: serverstorage.NewBackendHooks(lg, ci),
 	}
 
-	s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
 	s.be = be
 
 	s.start()
@@ -869,7 +869,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 		authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 1),
 	}
 
-	s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
 	s.be = be
 
 	s.start()
2 changes: 1 addition & 1 deletion server/storage/mvcc/kv_test.go
@@ -757,7 +757,7 @@ func TestKVSnapshot(t *testing.T) {
 
 func TestWatchableKVWatch(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, b)
 
 	w := s.NewWatchStream()
33 changes: 17 additions & 16 deletions server/storage/mvcc/watchable_store.go
@@ -50,6 +50,7 @@ type watchable interface {
 
 type watchableStore struct {
 	*store
+	watchBatchMaxSize int
 
 	// mu protects watcher groups and batches. It should never be locked
 	// before locking store.mu to avoid deadlock.
@@ -76,24 +77,30 @@ var _ WatchableKV = (*watchableStore)(nil)
 // cancel operations.
 type cancelFunc func()
 
-func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg WatchableStoreConfig) *watchableStore {
 	s := newWatchableStore(lg, b, le, cfg)
 	s.wg.Add(2)
 	go s.syncWatchersLoop()
 	go s.syncVictimsLoop()
 	return s
 }
 
-func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
+type WatchableStoreConfig struct {
+	StoreConfig
+	WatchBatchMaxSize int
+}
+
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg WatchableStoreConfig) *watchableStore {
 	if lg == nil {
 		lg = zap.NewNop()
 	}
 	s := &watchableStore{
-		store:    NewStore(lg, b, le, cfg),
-		victimc:  make(chan struct{}, 1),
-		unsynced: newWatcherGroup(),
-		synced:   newWatcherGroup(),
-		stopc:    make(chan struct{}),
+		store:             NewStore(lg, b, le, cfg.StoreConfig),
+		victimc:           make(chan struct{}, 1),
+		unsynced:          newWatcherGroup(),
+		synced:            newWatcherGroup(),
+		stopc:             make(chan struct{}),
+		watchBatchMaxSize: cfg.WatchBatchMaxSize,
 	}
 	s.store.ReadView = &readView{s}
 	s.store.WriteView = &writeView{s}
@@ -237,7 +244,7 @@ func (s *watchableStore) syncWatchersLoop() {
 
 		delayTicker.Reset(waitDuration)
 		// more work pending?
-		if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
+		if unsyncedWatchers != 0 && lastUnsyncedWatchers >= unsyncedWatchers {
 			// be fair to other store operations by yielding time taken
 			delayTicker.Reset(syncDuration)
 		}
@@ -361,7 +368,7 @@ func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event)
 	evs = rangeEventsWithReuse(s.store.lg, s.store.b, evs, minRev, curRev+1)
 
 	victims := make(watcherBatch)
-	wb := newWatcherBatch(wg, evs)
+	wb := newWatcherBatch(wg, evs, s.watchBatchMaxSize)
 	for w := range wg.watchers {
 		if w.minRev < compactionRev {
 			// Skip the watcher that failed to send compacted watch response due to w.ch is full.
@@ -484,13 +491,7 @@ func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event)
 // watchers that watch on the key of the event.
 func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
 	victim := make(watcherBatch)
-	for w, eb := range newWatcherBatch(&s.synced, evs) {
-		if eb.revs != 1 {
-			s.store.lg.Panic(
-				"unexpected multiple revisions in watch notification",
-				zap.Int("number-of-revisions", eb.revs),
-			)
-		}
+	for w, eb := range newWatcherBatch(&s.synced, evs, s.watchBatchMaxSize) {
 		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
 			pendingEventsGauge.Add(float64(len(eb.evs)))
 		} else {
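The extra argument to `newWatcherBatch` above carries the configured `WatchBatchMaxSize` byte budget, which is what the PR title refers to: batching watch events by their size rather than only by revision. The splitting itself lives in the watcher-group code, which is not part of this excerpt, so the sketch below only illustrates the general size-bounded batching idea with hypothetical types (`event`, `batchBySize`), not etcd's implementation:

```go
package main

import "fmt"

// event is a hypothetical stand-in for mvccpb.Event; size() approximates
// the encoded size with key plus value length.
type event struct {
	key, value []byte
}

func (e event) size() int { return len(e.key) + len(e.value) }

// batchBySize splits events into batches whose cumulative size stays under
// maxBytes, always keeping at least one event per batch so a single
// oversized event still goes out on its own.
func batchBySize(evs []event, maxBytes int) [][]event {
	var batches [][]event
	var cur []event
	curSize := 0
	for _, ev := range evs {
		if len(cur) > 0 && curSize+ev.size() > maxBytes {
			batches = append(batches, cur)
			cur, curSize = nil, 0
		}
		cur = append(cur, ev)
		curSize += ev.size()
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	var evs []event
	for i := 0; i < 7; i++ {
		evs = append(evs, event{key: []byte("k"), value: make([]byte, 400)})
	}
	for i, b := range batchBySize(evs, 1024) {
		fmt.Printf("batch %d: %d events\n", i, len(b))
	}
}
```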
10 changes: 5 additions & 5 deletions server/storage/mvcc/watchable_store_bench_test.go
@@ -27,7 +27,7 @@ import (
 
 func BenchmarkWatchableStorePut(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, be)
 
 	// arbitrary number of bytes
@@ -47,7 +47,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
 // some synchronization operations, such as mutex locking.
 func BenchmarkWatchableStoreTxnPut(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, be)
 
 	// arbitrary number of bytes
@@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {
 
 func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, be)
 
 	k := []byte("testkey")
@@ -122,7 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 // we should put to simulate the real-world use cases.
 func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
 
 	defer cleanup(ws, be)
 
@@ -164,7 +164,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 
 func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
 
 	defer cleanup(s, be)
 