From f89286fcb3e82df491fdfd76ae340fa9d5154b3b Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Wed, 18 Dec 2024 15:47:40 +0100 Subject: [PATCH] Marco's suggestions --- go.mod | 19 ++++--- go.sum | 29 ++++------ pkg/kine/server/maintenance.go | 6 +- pkg/kine/server/server.go | 8 +-- pkg/kine/server/types.go | 10 +--- pkg/kine/server/watch.go | 101 ++++++++++++++++++++------------- pkg/kine/sqllog/sqllog.go | 60 +++++++++----------- 7 files changed, 116 insertions(+), 117 deletions(-) diff --git a/go.mod b/go.mod index 67faf97f..52ca0cc0 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.22.10 require ( github.com/canonical/go-dqlite/v2 v2.0.0 github.com/mattn/go-sqlite3 v1.14.22 - github.com/onsi/gomega v1.27.10 + github.com/onsi/gomega v1.33.1 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 github.com/sirupsen/logrus v1.9.3 @@ -23,11 +23,9 @@ require ( go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 - golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d - golang.org/x/sys v0.26.0 + golang.org/x/sys v0.28.0 google.golang.org/grpc v1.67.1 gopkg.in/yaml.v2 v2.4.0 - k8s.io/apimachinery v0.31.3 ) require ( @@ -37,6 +35,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -45,6 +44,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.1.2 // indirect github.com/google/go-cmp v0.6.0 // indirect + github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af // indirect github.com/google/renameio v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.1 // indirect @@ -57,6 +57,8 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/onsi/ginkgo/v2 v2.19.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/common v0.50.0 // indirect github.com/prometheus/procfs v0.13.0 // indirect @@ -73,18 +75,17 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.28.0 // indirect - golang.org/x/net v0.30.0 // indirect + golang.org/x/crypto v0.30.0 // indirect + golang.org/x/net v0.32.0 // indirect golang.org/x/sync v0.10.0 // indirect - golang.org/x/text v0.19.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.5.0 // indirect + golang.org/x/tools v0.28.0 // indirect google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect google.golang.org/protobuf v1.35.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/klog/v2 v2.130.1 // indirect - k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) diff --git a/go.sum b/go.sum index 00424172..5a52c69f 100644 --- a/go.sum +++ b/go.sum @@ -42,7 +42,6 @@ github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ4 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -111,8 +110,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= -github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= -github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= +github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/peterh/liner v1.2.2/go.mod h1:xFwJyiKIXJZUKItq5dGHZSTBRAuG/CpeNpWLyiNRNwI= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -208,11 +207,9 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= -golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= +golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d h1:0olWaB5pg3+oychR51GUVCEsGkeCU/2JxjBgIo4f3M0= -golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -231,8 +228,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= -golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= +golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -257,14 +254,14 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -321,11 +318,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -k8s.io/apimachinery v0.31.3 h1:6l0WhcYgasZ/wk9ktLq5vLaoXJJr5ts6lkaQzgeYPq4= -k8s.io/apimachinery v0.31.3/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= -k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= -k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= -k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/pkg/kine/server/maintenance.go b/pkg/kine/server/maintenance.go index 5e79cd9e..cc6a26b5 100644 --- a/pkg/kine/server/maintenance.go +++ b/pkg/kine/server/maintenance.go @@ -9,6 +9,10 @@ import ( var _ etcdserverpb.MaintenanceServer = (*KVServerBridge)(nil) +// The emulated etcd version is returned on a call to the status endpoint. The version 3.5.13, indicates support for the watch progress notifications. +// See: https://github.com/kubernetes/kubernetes/blob/beb696c2c9467dbc44cbaf35c5a4a3daf0321db3/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go#L157 +const emulatedEtcdVersion = "3.5.13" + func (s *KVServerBridge) Alarm(context.Context, *etcdserverpb.AlarmRequest) (*etcdserverpb.AlarmResponse, error) { return nil, fmt.Errorf("alarm is not supported") } @@ -21,7 +25,7 @@ func (s *KVServerBridge) Status(ctx context.Context, r *etcdserverpb.StatusReque return &etcdserverpb.StatusResponse{ Header: &etcdserverpb.ResponseHeader{}, DbSize: size, - Version: s.emulatedEtcdVersion, + Version: emulatedEtcdVersion, }, nil } diff --git a/pkg/kine/server/server.go b/pkg/kine/server/server.go index 7881bb83..e2eed0a6 100644 --- a/pkg/kine/server/server.go +++ b/pkg/kine/server/server.go @@ -13,23 +13,17 @@ import ( healthpb "google.golang.org/grpc/health/grpc_health_v1" ) -// The emulated etcd version is returned on a call to the status endpoint. The version 3.5.13, indicates support for the watch progress notifications. -// See: https://github.com/kubernetes/kubernetes/blob/beb696c2c9467dbc44cbaf35c5a4a3daf0321db3/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go#L157 -const emulatedEtcdVersion = "3.5.13" - var ( _ etcdserverpb.KVServer = (*KVServerBridge)(nil) _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) ) type KVServerBridge struct { - limited *LimitedServer - emulatedEtcdVersion string + limited *LimitedServer } func New(backend Backend, notifyInterval time.Duration) *KVServerBridge { return &KVServerBridge{ - emulatedEtcdVersion: emulatedEtcdVersion, limited: &LimitedServer{ backend: backend, notifyInterval: notifyInterval, diff --git a/pkg/kine/server/types.go b/pkg/kine/server/types.go index 82ee4aeb..dd9fc883 100644 --- a/pkg/kine/server/types.go +++ b/pkg/kine/server/types.go @@ -20,9 +20,10 @@ type Backend interface { List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, bool, error) - Watch(ctx context.Context, key string, revision int64) (WatchResult, error) + Watch(ctx context.Context, key string, revision int64) (<-chan []*Event, error) DbSize(ctx context.Context) (int64, error) CurrentRevision(ctx context.Context) (int64, error) + GetCompactRevision(ctx context.Context) (int64, int64, error) DoCompact(ctx context.Context) error Close() error } @@ -41,10 +42,3 @@ type Event struct { KV *KeyValue PrevKV *KeyValue } - -type WatchResult struct { - CurrentRevision int64 - CompactRevision int64 - Events <-chan []*Event - Errorc <-chan error -} diff --git a/pkg/kine/server/watch.go b/pkg/kine/server/watch.go index 559dfe26..761fe653 100644 --- a/pkg/kine/server/watch.go +++ b/pkg/kine/server/watch.go @@ -10,8 +10,6 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" - "golang.org/x/exp/rand" - "k8s.io/apimachinery/pkg/util/wait" ) var ( @@ -21,15 +19,6 @@ var ( // explicit interface check var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) -// getProgressReportInterval returns the configured progress report interval, with some jitter -func (s *KVServerBridge) getProgressReportInterval() time.Duration { - // add rand(1/10*notifyInterval) as jitter so that kine will not - // send progress notifications to watchers at the same time even when watchers - // are created at the same time. - jitter := time.Duration(rand.Int63n(int64(s.limited.notifyInterval) / 10)) - return s.limited.notifyInterval + jitter -} - func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { w := watcher{ server: ws, @@ -39,7 +28,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { } defer w.Close() - go wait.PollInfiniteWithContext(ws.Context(), s.getProgressReportInterval(), w.ProgressIfSynced) + w.pollProgressNotify(ws.Context(), s.limited.notifyInterval) for { msg, err := ws.Recv() @@ -52,7 +41,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { } if cr := msg.GetCancelRequest(); cr != nil { logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) - w.Cancel(cr.WatchId, 0, 0, nil) + w.Cancel(cr.WatchId, nil, ws.Context()) } if pr := msg.GetProgressRequest(); pr != nil { w.Progress(ws.Context()) @@ -60,8 +49,38 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { } } +// pollProgressNotify periodically sends progress notifications to all watchers. +func (w *watcher) pollProgressNotify(ctx context.Context, interval time.Duration) { + ch := make(chan struct{}, 1) + + go func() { + defer close(ch) + + tick := time.NewTicker(interval) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + // Skip this tick if ProgressIfSynced is still running. + select { + case ch <- struct{}{}: + if err := w.ProgressIfSynced(ctx); err != nil { + logrus.Errorf("Failed to send progress notification: %v", err) + } + <-ch + default: + logrus.Warn("Skipping progress notification: still busy.") + } + } + } + }() +} + type watcher struct { - sync.RWMutex + sync.Mutex wg sync.WaitGroup backend Backend @@ -72,7 +91,7 @@ type watcher struct { func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) { if r.WatchId != clientv3.AutoWatchID { - logrus.Warnf("WATCH START id=%d ignoring request with client-provided id", r.WatchId) + logrus.Errorf("WATCH START id=%d ignoring request with client-provided id", r.WatchId) return } @@ -103,15 +122,14 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) Created: true, WatchId: id, }); err != nil { - w.Cancel(id, 0, 0, err) + w.Cancel(id, err, ctx) return } - wr, err := w.backend.Watch(ctx, key, startRevision) - // the requested start revision is compacted. Pass the current and and compact - // revision to the client via the cancel response, along with the correct error message. - if err == ErrCompacted { - w.Cancel(id, wr.CurrentRevision, wr.CompactRevision, ErrCompacted) + watchCh, err := w.backend.Watch(ctx, key, startRevision) + if err != nil { + logrus.Errorf("Failed to start watch: %v", err) + w.Cancel(id, err, ctx) return } @@ -124,13 +142,13 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) // Wait for events or progress notifications select { - case events = <-wr.Events: + case events = <-watchCh: // We received events; batch any additional queued events reads++ inner := true for inner { select { - case e, ok := <-wr.Events: + case e, ok := <-watchCh: reads++ events = append(events, e...) if !ok { @@ -166,17 +184,11 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) } logrus.Tracef("WATCH SEND id=%d, key=%s, revision=%d, events=%d, size=%d, reads=%d", id, key, revision, len(wr.Events), wr.Size(), reads) if err := w.server.Send(wr); err != nil { - w.Cancel(id, 0, 0, err) + w.Cancel(id, err, ctx) } } } - // handle errors from the channel or gracefully cancel the watch - select { - case err := <-wr.Errorc: - w.Cancel(id, 0, 0, err) - default: - w.Cancel(id, 0, 0, nil) - } + logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key) }() } @@ -204,7 +216,7 @@ func toEvent(event *Event) *mvccpb.Event { return e } -func (w *watcher) Cancel(watchID int64, revision, compactRev int64, err error) { +func (w *watcher) Cancel(watchID int64, err error, ctx context.Context) { w.Lock() if progressCh, ok := w.progress[watchID]; ok { close(progressCh) @@ -216,9 +228,20 @@ func (w *watcher) Cancel(watchID int64, revision, compactRev int64, err error) { } w.Unlock() + revision := int64(0) + compactRev := int64(0) reason := "" if err != nil { reason = err.Error() + if err == ErrCompacted { + // the requested start revision is compacted. Pass the current and and compact + // revision to the client via the cancel response, along with the correct error message. + compactRev, revision, err = w.backend.GetCompactRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get compact and current revision for cancel response: %v", err) + compactRev = 0 + } + } } logrus.Tracef("WATCH CANCEL id=%d, reason=%s, compactRev=%d", watchID, reason, compactRev) @@ -252,8 +275,8 @@ func (w *watcher) Close() { // Progress sends a progress report if all watchers are synced. // Ref: https://github.com/etcd-io/etcd/blob/v3.5.11/server/mvcc/watchable_store.go#L500-L504 func (w *watcher) Progress(ctx context.Context) { - w.RLock() - defer w.RUnlock() + w.Lock() + defer w.Unlock() logrus.Tracef("WATCH REQUEST PROGRESS") @@ -282,16 +305,16 @@ func (w *watcher) Progress(ctx context.Context) { } // ProgressIfSynced sends a progress report on any channels that are synced and blocked on the outer loop -func (w *watcher) ProgressIfSynced(ctx context.Context) (bool, error) { +func (w *watcher) ProgressIfSynced(ctx context.Context) error { logrus.Tracef("WATCH PROGRESS TICK") revision, err := w.backend.CurrentRevision(ctx) if err != nil { logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) - return false, nil + return err } - w.RLock() - defer w.RUnlock() + w.Lock() + defer w.Unlock() // Send revision to all synced channels for _, progressCh := range w.progress { @@ -300,5 +323,5 @@ func (w *watcher) ProgressIfSynced(ctx context.Context) (bool, error) { default: } } - return false, nil + return nil } diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index fa020aa7..6535a1b3 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -72,7 +72,6 @@ type SQLLog struct { d Dialect broadcaster broadcaster.Broadcaster[[]*server.Event] notify chan int64 - currentRev int64 wg sync.WaitGroup } @@ -194,7 +193,7 @@ func (s *SQLLog) DoCompact(ctx context.Context) (err error) { // small batches. Given that this logic runs every second, // on regime it should take usually just a couple batches // to keep the pace. - start, target, err := s.d.GetCompactRevision(ctx) + start, target, err := s.GetCompactRevision(ctx) if err != nil { return err } @@ -219,12 +218,13 @@ func (s *SQLLog) DoCompact(ctx context.Context) (err error) { } func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) { - if s.currentRev != 0 { - return s.currentRev, nil - } return s.d.CurrentRevision(ctx) } +func (s *SQLLog) GetCompactRevision(ctx context.Context) (int64, int64, error) { + return s.d.GetCompactRevision(ctx) +} + func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) { var err error ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.After", otelName)) @@ -238,7 +238,7 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64 attribute.Int64("limit", limit), ) - compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.GetCompactRevision(ctx) if err != nil { return 0, nil, err } @@ -275,7 +275,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis attribute.Int64("revision", revision), ) - compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.GetCompactRevision(ctx) if err != nil { return 0, nil, err } @@ -347,13 +347,13 @@ func (s *SQLLog) ttl(ctx context.Context) { go run(ctx, key, revision, time.Duration(lease)*time.Second) } - wr, err := s.Watch(ctx, "/", startRevision) + watchCh, err := s.Watch(ctx, "/", startRevision) if err != nil { logrus.Errorf("failed to watch events for ttl: %v", err) return } - for events := range wr.Events { + for events := range watchCh { for _, event := range events { if event.KV.Lease > 0 { go run(ctx, event.KV.Key, event.KV.ModRevision, time.Duration(event.KV.Lease)*time.Second) @@ -363,7 +363,7 @@ func (s *SQLLog) ttl(ctx context.Context) { }() } -func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (server.WatchResult, error) { +func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<-chan []*server.Event, error) { ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Watch", otelName)) defer span.End() span.SetAttributes( @@ -371,16 +371,12 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se attribute.Int64("startRevision", startRevision), ) - res := make(chan []*server.Event, 100) - errc := make(chan error, 1) - wr := server.WatchResult{Events: res, Errorc: errc} - // starting watching right away so we don't miss anything ctx, cancel := context.WithCancel(ctx) values, err := s.broadcaster.Subscribe(ctx) if err != nil { cancel() - return wr, err + return nil, err } if startRevision > 0 { @@ -390,23 +386,19 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se initialRevision, initialEvents, err := s.After(ctx, key, startRevision, 0) if err != nil { if !errors.Is(err, context.Canceled) { - // In case the key has been compacted we need to inform the api-server about the current-revision and the compact revision in the cancel watch response to the api-server. - if err == server.ErrCompacted { - logrus.Errorf("Failed to list %s for revision %d: %v", key, startRevision, err) - span.RecordError(err) - compact, _, _ := s.d.GetCompactRevision(ctx) - wr.CompactRevision = compact - wr.CurrentRevision = initialRevision - } else { - // If the After query fails because k8s-dqlite restarts we cancel the watch and return an error message that the api-server understands: server.ErrGRPCUnhealthy - // See fix: https://github.com/k3s-io/kine/pull/373 - errc <- server.ErrGRPCUnhealthy + span.RecordError(err) + logrus.Errorf("Failed to list %s for revision %d: %v", key, startRevision, err) + // We return an error message that the api-server understands: server.ErrGRPCUnhealthy + if err != server.ErrCompacted { + err = server.ErrGRPCUnhealthy } } // Cancel the watcher by cancelling the context of its subscription to the broadcaster cancel() + return nil, err } + res := make(chan []*server.Event, 100) if len(initialEvents) > 0 { res <- initialEvents } @@ -428,7 +420,7 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se } }() - return wr, nil + return res, nil } func filterEvents(events []*server.Event, key string, startRevision int64) []*server.Event { @@ -453,7 +445,7 @@ func (s *SQLLog) startWatch(ctx context.Context) (chan []*server.Event, error) { return nil, err } - pollStart, _, err := s.d.GetCompactRevision(ctx) + pollStart, _, err := s.GetCompactRevision(ctx) if err != nil { return nil, err } @@ -489,8 +481,8 @@ func (s *SQLLog) startWatch(ctx context.Context) (chan []*server.Event, error) { } func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStart int64) { - s.currentRev = pollStart var ( + last = pollStart skip int64 skipTime time.Time waitForMore = true @@ -506,7 +498,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar case <-ctx.Done(): return case check := <-s.notify: - if check <= s.currentRev { + if check <= last { continue } case <-wait.C: @@ -516,7 +508,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar watchCtx, cancel := context.WithTimeout(ctx, s.d.GetWatchQueryTimeout()) defer cancel() - rows, err := s.d.After(watchCtx, s.currentRev, pollBatchSize) + rows, err := s.d.After(watchCtx, last, pollBatchSize) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { logrus.Errorf("fail to list latest changes: %v", err) @@ -536,7 +528,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar waitForMore = len(events) < 100 - rev := s.currentRev + rev := last var ( sequential []*server.Event saveLast bool @@ -586,7 +578,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar } if saveLast { - s.currentRev = rev + last = rev if len(sequential) > 0 { result <- sequential } @@ -611,7 +603,7 @@ func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision in attribute.Int64("revision", revision), ) - compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.GetCompactRevision(ctx) if err != nil { return 0, 0, err }