From d9b5f4eae5d5f889ac68cac468f3c9684acb8824 Mon Sep 17 00:00:00 2001 From: Songlin Yang Date: Tue, 12 Dec 2023 01:21:02 +0800 Subject: [PATCH] =?UTF-8?q?chore(*):=20enable=20contextcheck=E3=80=81noctx?= =?UTF-8?q?=E3=80=81gomoddirectives=20linter=20(#405)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit enable linters: - contextcheck - noctx - gomoddirectives --------- Signed-off-by: Soren Yang --- .golangci.yaml | 17 ++- cmd/main.go | 29 ++-- common/container/container.go | 29 ++-- common/metrics/metrics.go | 27 ++-- common/pprof.go | 31 +++-- coordinator/impl/coordinator.go | 10 +- coordinator/impl/node_controller.go | 72 ++++++---- coordinator/impl/shard_controller.go | 197 ++++++++++++++------------- oxia/cache.go | 10 +- oxia/cache_test.go | 4 + oxia/internal/shard_manager.go | 10 +- oxia/notifications.go | 38 ++++-- oxia/sessions.go | 82 ++++++----- perf/perf.go | 2 +- server/follower_controller.go | 48 ++++--- server/follower_cursor.go | 33 +++-- server/kv/notifications_trimmer.go | 14 +- server/leader_controller.go | 142 ++++++++++--------- server/util/stream_reader.go | 6 +- server/wal/wal_impl.go | 14 +- server/wal/wal_trimmer.go | 12 +- 21 files changed, 482 insertions(+), 345 deletions(-) diff --git a/.golangci.yaml b/.golangci.yaml index e9d8b807..a88aab81 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -31,18 +31,18 @@ linters: - asasalint - asciicheck - bodyclose - # - contextcheck + - contextcheck - errname - errorlint - exportloopref # - gocritic - godot - # - gomoddirectives + - gomoddirectives - gosec - nakedret - nilerr - nilnil - # - noctx + - noctx - nolintlint - prealloc - predeclared @@ -80,4 +80,13 @@ issues: - path: _test\.go linters: - unparam - - gosec \ No newline at end of file + - gosec + - contextcheck + - noctx + - path: \.go + linters: + - contextcheck + text: "Non-inherited new context" + - linters: + - gomoddirectives + text: "github.com/samber/slog-common" \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go index a3a08269..0dc8effa 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -15,6 +15,7 @@ package main import ( + "context" "fmt" "os" @@ -73,16 +74,20 @@ func configureLogLevel(cmd *cobra.Command, args []string) error { } func main() { - common.DoWithLabels(map[string]string{ - "oxia": "main", - }, func() { - if _, err := maxprocs.Set(); err != nil { - _, _ = fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - if err := rootCmd.Execute(); err != nil { - _, _ = fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - }) + common.DoWithLabels( + context.Background(), + map[string]string{ + "oxia": "main", + }, + func() { + if _, err := maxprocs.Set(); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + if err := rootCmd.Execute(); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + }, + ) } diff --git a/common/container/container.go b/common/container/container.go index 1bff82e9..fe5db517 100644 --- a/common/container/container.go +++ b/common/container/container.go @@ -15,6 +15,7 @@ package container import ( + "context" "io" "log/slog" "net" @@ -81,18 +82,22 @@ func newDefaultGrpcProvider(name, bindAddress string, registerFunc func(grpc.Ser slog.String("bindAddress", listener.Addr().String()), ) - go common.DoWithLabels(map[string]string{ - "oxia": name, - "bind": listener.Addr().String(), - }, func() { - if err := c.server.Serve(listener); err != nil { - c.log.Error( - "Failed to start serving", - slog.Any("error", err), - ) - os.Exit(1) - } - }) + go common.DoWithLabels( + context.Background(), + map[string]string{ + "oxia": name, + "bind": listener.Addr().String(), + }, + func() { + if err := c.server.Serve(listener); err != nil { + c.log.Error( + "Failed to start serving", + slog.Any("error", err), + ) + os.Exit(1) + } + }, + ) c.log.Info("Started Grpc server") diff --git a/common/metrics/metrics.go b/common/metrics/metrics.go index 68179195..7560a8ab 100644 --- a/common/metrics/metrics.go +++ b/common/metrics/metrics.go @@ -15,6 +15,7 @@ package metrics import ( + "context" "fmt" "io" "log/slog" @@ -110,17 +111,21 @@ func Start(bindAddress string) (*PrometheusMetrics, error) { slog.Info(fmt.Sprintf("Serving Prometheus metrics at http://localhost:%d/metrics", p.port)) - go common.DoWithLabels(map[string]string{ - "oxia": "metrics", - }, func() { - if err = p.server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { - slog.Error( - "Failed to serve metrics", - slog.Any("error", err), - ) - os.Exit(1) - } - }) + go common.DoWithLabels( + context.Background(), + map[string]string{ + "oxia": "metrics", + }, + func() { + if err = p.server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { + slog.Error( + "Failed to serve metrics", + slog.Any("error", err), + ) + os.Exit(1) + } + }, + ) return p, nil } diff --git a/common/pprof.go b/common/pprof.go index 17cb787f..6f7d8343 100644 --- a/common/pprof.go +++ b/common/pprof.go @@ -33,14 +33,14 @@ var ( // DoWithLabels attaches the labels to the current go-routine Pprof context, // for the duration of the call to f. -func DoWithLabels(labels map[string]string, f func()) { +func DoWithLabels(ctx context.Context, labels map[string]string, f func()) { l := make([]string, 0, len(labels)*2) for k, v := range labels { l = append(l, k, v) } pprof.Do( - context.Background(), + ctx, pprof.Labels(l...), func(_ context.Context) { f() @@ -68,18 +68,21 @@ func RunProfiling() io.Closer { slog.Info(fmt.Sprintf(" use `go tool pprof http://%s/debug/pprof/heap` to get inuse_space file", s.Addr)) slog.Info("") - go DoWithLabels(map[string]string{ - "oxia": "pprof", - }, func() { - if err := s.ListenAndServe(); err != nil && err != http.ErrServerClosed { - slog.Error( - "Unable to start debug profiling server", - slog.Any("error", err), - slog.String("component", "pprof"), - ) - os.Exit(1) - } - }) + go DoWithLabels( + context.Background(), + map[string]string{ + "oxia": "pprof", + }, + func() { + if err := s.ListenAndServe(); err != nil && err != http.ErrServerClosed { + slog.Error( + "Unable to start debug profiling server", + slog.Any("error", err), + slog.String("component", "pprof"), + ) + os.Exit(1) + } + }) return s } diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go index c8dfd800..32f8ad9b 100644 --- a/coordinator/impl/coordinator.go +++ b/coordinator/impl/coordinator.go @@ -140,9 +140,13 @@ func NewCoordinator(metadataProvider MetadataProvider, } } - go common.DoWithLabels(map[string]string{ - "oxia": "coordinator-wait-for-events", - }, c.waitForExternalEvents) + go common.DoWithLabels( + c.ctx, + map[string]string{ + "oxia": "coordinator-wait-for-events", + }, + c.waitForExternalEvents, + ) return c, nil } diff --git a/coordinator/impl/node_controller.go b/coordinator/impl/node_controller.go index e1c8e872..969e6e1c 100644 --- a/coordinator/impl/node_controller.go +++ b/coordinator/impl/node_controller.go @@ -111,15 +111,23 @@ func newNodeController(addr model.ServerAddress, return 0 }) - go common.DoWithLabels(map[string]string{ - "oxia": "node-controller", - "addr": nc.addr.Internal, - }, nc.healthCheckWithRetries) - - go common.DoWithLabels(map[string]string{ - "oxia": "node-controller-send-updates", - "addr": nc.addr.Internal, - }, nc.sendAssignmentsUpdatesWithRetries) + go common.DoWithLabels( + nc.ctx, + map[string]string{ + "oxia": "node-controller", + "addr": nc.addr.Internal, + }, + nc.healthCheckWithRetries, + ) + + go common.DoWithLabels( + nc.ctx, + map[string]string{ + "oxia": "node-controller-send-updates", + "addr": nc.addr.Internal, + }, + nc.sendAssignmentsUpdatesWithRetries, + ) return nc } @@ -163,30 +171,34 @@ func (n *nodeController) healthCheck(backoff backoff.BackOff) error { ctx, cancel := context.WithCancel(n.ctx) defer cancel() - go common.DoWithLabels(map[string]string{ - "oxia": "node-controller-health-check-ping", - "addr": n.addr.Internal, - }, func() { - ticker := time.NewTicker(healthCheckProbeInterval) - - for { - select { - case <-ticker.C: - pingCtx, pingCancel := context.WithTimeout(ctx, healthCheckProbeTimeout) - - res, err := health.Check(pingCtx, &grpc_health_v1.HealthCheckRequest{Service: ""}) - pingCancel() - if err2 := n.processHealthCheckResponse(res, err); err2 != nil { - n.log.Warn("Node stopped responding to ping") - cancel() + go common.DoWithLabels( + n.ctx, + map[string]string{ + "oxia": "node-controller-health-check-ping", + "addr": n.addr.Internal, + }, + func() { + ticker := time.NewTicker(healthCheckProbeInterval) + + for { + select { + case <-ticker.C: + pingCtx, pingCancel := context.WithTimeout(ctx, healthCheckProbeTimeout) + + res, err := health.Check(pingCtx, &grpc_health_v1.HealthCheckRequest{Service: ""}) + pingCancel() + if err2 := n.processHealthCheckResponse(res, err); err2 != nil { + n.log.Warn("Node stopped responding to ping") + cancel() + return + } + + case <-ctx.Done(): return } - - case <-ctx.Done(): - return } - } - }) + }, + ) watch, err := health.Watch(ctx, &grpc_health_v1.HealthCheckRequest{Service: ""}) if err != nil { diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 111ba11a..2fcf80a9 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -210,25 +210,29 @@ func (s *shardController) verifyCurrentEnsemble() bool { } func (s *shardController) electLeaderWithRetries() { - go common.DoWithLabels(map[string]string{ - "oxia": "shard-controller-leader-election", - "namespace": s.namespace, - "shard": fmt.Sprintf("%d", s.shard), - }, func() { - _ = backoff.RetryNotify(func() error { - s.Lock() - defer s.Unlock() - return s.electLeader() - }, common.NewBackOff(s.ctx), - func(err error, duration time.Duration) { - s.leaderElectionsFailed.Inc() - s.log.Warn( - "Leader election has failed, retrying later", - slog.Any("error", err), - slog.Duration("retry-after", duration), - ) - }) - }) + go common.DoWithLabels( + s.ctx, + map[string]string{ + "oxia": "shard-controller-leader-election", + "namespace": s.namespace, + "shard": fmt.Sprintf("%d", s.shard), + }, + func() { + _ = backoff.RetryNotify(func() error { + s.Lock() + defer s.Unlock() + return s.electLeader() + }, common.NewBackOff(s.ctx), + func(err error, duration time.Duration) { + s.leaderElectionsFailed.Inc() + s.log.Warn( + "Leader election has failed, retrying later", + slog.Any("error", err), + slog.Duration("retry-after", duration), + ) + }) + }, + ) } func (s *shardController) electLeader() error { @@ -361,37 +365,41 @@ func (s *shardController) keepFencingFollower(ctx context.Context, node model.Se slog.Any("follower", node), ) - go common.DoWithLabels(map[string]string{ - "oxia": "shard-controller-retry-failed-follower", - "shard": fmt.Sprintf("%d", s.shard), - "follower": node.Internal, - }, func() { - backOff := common.NewBackOffWithInitialInterval(ctx, 1*time.Second) - - _ = backoff.RetryNotify(func() error { - err := s.newTermAndAddFollower(ctx, node) - if status.Code(err) == common.CodeInvalidTerm { - // If we're receiving invalid term error, it would mean - // there's already a new term generated, and we don't have - // to keep trying with this old term + go common.DoWithLabels( + s.ctx, + map[string]string{ + "oxia": "shard-controller-retry-failed-follower", + "shard": fmt.Sprintf("%d", s.shard), + "follower": node.Internal, + }, + func() { + backOff := common.NewBackOffWithInitialInterval(ctx, 1*time.Second) + + _ = backoff.RetryNotify(func() error { + err := s.newTermAndAddFollower(ctx, node) + if status.Code(err) == common.CodeInvalidTerm { + // If we're receiving invalid term error, it would mean + // there's already a new term generated, and we don't have + // to keep trying with this old term + s.log.Warn( + "Failed to newTerm, invalid term. Stop trying", + slog.Any("follower", node), + slog.Int64("term", s.Term()), + ) + return nil + } + return err + }, backOff, func(err error, duration time.Duration) { s.log.Warn( - "Failed to newTerm, invalid term. Stop trying", + "Failed to newTerm, retrying later", + slog.Any("error", err), slog.Any("follower", node), slog.Int64("term", s.Term()), + slog.Duration("retry-after", duration), ) - return nil - } - return err - }, backOff, func(err error, duration time.Duration) { - s.log.Warn( - "Failed to newTerm, retrying later", - slog.Any("error", err), - slog.Any("follower", node), - slog.Int64("term", s.Term()), - slog.Duration("retry-after", duration), - ) - }) - }) + }) + }, + ) } func (s *shardController) newTermAndAddFollower(ctx context.Context, node model.ServerAddress) error { @@ -445,32 +453,35 @@ func (s *shardController) newTermQuorum() (map[model.ServerAddress]*proto.EntryI for _, sa := range fencingQuorum { // We need to save the address because it gets modified in the loop serverAddress := sa - go common.DoWithLabels(map[string]string{ - "oxia": "shard-controller-leader-election", - "shard": fmt.Sprintf("%d", s.shard), - "node": sa.Internal, - }, func() { - entryId, err := s.newTerm(ctx, serverAddress) - if err != nil { - s.log.Warn( - "Failed to newTerm node", - slog.Any("error", err), - slog.String("node", serverAddress.Internal), - ) - } else { - s.log.Info( - "Processed newTerm response", - slog.Any("server-address", serverAddress), - slog.Any("entry-id", entryId), - ) - } + go common.DoWithLabels( + s.ctx, + map[string]string{ + "oxia": "shard-controller-leader-election", + "shard": fmt.Sprintf("%d", s.shard), + "node": sa.Internal, + }, func() { + entryId, err := s.newTerm(ctx, serverAddress) + if err != nil { + s.log.Warn( + "Failed to newTerm node", + slog.Any("error", err), + slog.String("node", serverAddress.Internal), + ) + } else { + s.log.Info( + "Processed newTerm response", + slog.Any("server-address", serverAddress), + slog.Any("entry-id", entryId), + ) + } - ch <- struct { - model.ServerAddress - *proto.EntryId - error - }{serverAddress, entryId, err} - }) + ch <- struct { + model.ServerAddress + *proto.EntryId + error + }{serverAddress, entryId, err} + }, + ) } successResponses := 0 @@ -610,27 +621,31 @@ func (s *shardController) addFollower(leader model.ServerAddress, follower strin } func (s *shardController) DeleteShard() { - go common.DoWithLabels(map[string]string{ - "oxia": "shard-controller-delete-shard", - "namespace": s.namespace, - "shard": fmt.Sprintf("%d", s.shard), - }, func() { - s.Lock() - defer s.Unlock() - - s.log.Info("Deleting shard") - - _ = backoff.RetryNotify(s.deleteShard, common.NewBackOff(s.ctx), - func(err error, duration time.Duration) { - s.log.Warn( - "Delete shard failed, retrying later", - slog.Duration("retry-after", duration), - slog.Any("error", err), - ) - }) + go common.DoWithLabels( + s.ctx, + map[string]string{ + "oxia": "shard-controller-delete-shard", + "namespace": s.namespace, + "shard": fmt.Sprintf("%d", s.shard), + }, + func() { + s.Lock() + defer s.Unlock() - s.cancel() - }) + s.log.Info("Deleting shard") + + _ = backoff.RetryNotify(s.deleteShard, common.NewBackOff(s.ctx), + func(err error, duration time.Duration) { + s.log.Warn( + "Delete shard failed, retrying later", + slog.Duration("retry-after", duration), + slog.Any("error", err), + ) + }) + + s.cancel() + }, + ) } func (s *shardController) deleteShard() error { diff --git a/oxia/cache.go b/oxia/cache.go index f8ca7283..498e0da7 100644 --- a/oxia/cache.go +++ b/oxia/cache.go @@ -126,9 +126,13 @@ func newCacheManager(client SyncClient) (*cacheManager, error) { return nil, errors.Wrap(err, "failed to create notifications client") } - go common.DoWithLabels(map[string]string{ - "oxia": "cache-manager", - }, cm.run) + go common.DoWithLabels( + cm.ctx, + map[string]string{ + "oxia": "cache-manager", + }, + cm.run, + ) return cm, nil } diff --git a/oxia/cache_test.go b/oxia/cache_test.go index d11faf6d..61a91702 100644 --- a/oxia/cache_test.go +++ b/oxia/cache_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "os" "sync/atomic" "testing" @@ -38,6 +39,9 @@ var standalone *server.Standalone var serviceAddress string func TestMain(m *testing.M) { + // Disable zerolog ConsoleWriter to avoid DATA RACE at os.Stdout + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{}))) + dir, _ := os.MkdirTemp(os.TempDir(), "oxia-test-*") config := server.NewTestConfig(dir) standalone, _ = server.NewStandalone(config) diff --git a/oxia/internal/shard_manager.go b/oxia/internal/shard_manager.go index 6d87bcf1..90ea4bbe 100644 --- a/oxia/internal/shard_manager.go +++ b/oxia/internal/shard_manager.go @@ -84,9 +84,13 @@ func (s *shardManagerImpl) Close() error { func (s *shardManagerImpl) start() error { s.Lock() - go common.DoWithLabels(map[string]string{ - "oxia": "receive-shard-updates", - }, s.receiveWithRecovery) + go common.DoWithLabels( + s.ctx, + map[string]string{ + "oxia": "receive-shard-updates", + }, + s.receiveWithRecovery, + ) ctx, cancel := context.WithTimeout(s.ctx, s.requestTimeout) defer cancel() diff --git a/oxia/notifications.go b/oxia/notifications.go index e9cdd12b..5029e2f7 100644 --- a/oxia/notifications.go +++ b/oxia/notifications.go @@ -62,18 +62,22 @@ func newNotifications(options clientOptions, ctx context.Context, clientPool com newShardNotificationsManager(shard, nm) } - go common.DoWithLabels(map[string]string{ - "oxia": "notifications-manager-close", - }, func() { - // Wait until all the shards managers are done before - // closing the user-facing channel - for i := 0; i < len(shards); i++ { - <-nm.closeCh - } + go common.DoWithLabels( + nm.ctx, + map[string]string{ + "oxia": "notifications-manager-close", + }, + func() { + // Wait until all the shards managers are done before + // closing the user-facing channel + for i := 0; i < len(shards); i++ { + <-nm.closeCh + } - close(nm.multiplexCh) - nm.cancelMultiplexChanClosed() - }) + close(nm.multiplexCh) + nm.cancelMultiplexChanClosed() + }, + ) // Wait for the notifications on all the shards to be initialized timeoutCtx, cancel := context.WithTimeout(nm.ctx, options.requestTimeout) @@ -132,10 +136,14 @@ func newShardNotificationsManager(shard int64, nm *notifications) *shardNotifica ), } - go common.DoWithLabels(map[string]string{ - "oxia": "notifications-manager", - "shard": fmt.Sprintf("%d", shard), - }, snm.getNotificationsWithRetries) + go common.DoWithLabels( + snm.ctx, + map[string]string{ + "oxia": "notifications-manager", + "shard": fmt.Sprintf("%d", shard), + }, + snm.getNotificationsWithRetries, + ) return snm } diff --git a/oxia/sessions.go b/oxia/sessions.go index abd53f7a..7d5e6888 100644 --- a/oxia/sessions.go +++ b/oxia/sessions.go @@ -79,10 +79,14 @@ func (s *sessions) startSession(shardId int64) *clientSession { ), } cs.log.Debug("Creating session") - go common.DoWithLabels(map[string]string{ - "oxia": "session-start", - "shard": fmt.Sprintf("%d", cs.shardId), - }, func() { cs.createSessionWithRetries() }) + go common.DoWithLabels( + cs.ctx, + map[string]string{ + "oxia": "session-start", + "shard": fmt.Sprintf("%d", cs.shardId), + }, + func() { cs.createSessionWithRetries() }, + ) return cs } @@ -164,44 +168,48 @@ func (cs *clientSession) createSession() error { close(cs.started) cs.log.Debug("Successfully created session") - go common.DoWithLabels(map[string]string{ - "oxia": "session-keep-alive", - "shard": fmt.Sprintf("%d", cs.shardId), - "session": fmt.Sprintf("%x016", cs.sessionId), - }, func() { + go common.DoWithLabels( + cs.ctx, + map[string]string{ + "oxia": "session-keep-alive", + "shard": fmt.Sprintf("%d", cs.shardId), + "session": fmt.Sprintf("%x016", cs.sessionId), + }, + func() { - backOff := common.NewBackOff(cs.sessions.ctx) - err := backoff.RetryNotify(func() error { - err := cs.keepAlive() - if status.Code(err) == common.CodeInvalidSession { - cs.log.Error( - "Session is no longer valid", + backOff := common.NewBackOff(cs.sessions.ctx) + err := backoff.RetryNotify(func() error { + err := cs.keepAlive() + if status.Code(err) == common.CodeInvalidSession { + cs.log.Error( + "Session is no longer valid", + slog.Any("error", err), + ) + + cs.sessions.Lock() + defer cs.sessions.Unlock() + cs.Lock() + defer cs.Unlock() + delete(cs.sessions.sessionsByShard, cs.shardId) + return backoff.Permanent(err) + } + return err + }, backOff, func(err error, duration time.Duration) { + slog.Debug( + "Failed to send session heartbeat, retrying later", slog.Any("error", err), + slog.Duration("retry-after", duration), ) - cs.sessions.Lock() - defer cs.sessions.Unlock() - cs.Lock() - defer cs.Unlock() - delete(cs.sessions.sessionsByShard, cs.shardId) - return backoff.Permanent(err) + }) + if !errors.Is(err, context.Canceled) { + cs.log.Error( + "Failed to keep alive session.", + slog.Any("error", err), + ) } - return err - }, backOff, func(err error, duration time.Duration) { - slog.Debug( - "Failed to send session heartbeat, retrying later", - slog.Any("error", err), - slog.Duration("retry-after", duration), - ) - - }) - if !errors.Is(err, context.Canceled) { - cs.log.Error( - "Failed to keep alive session.", - slog.Any("error", err), - ) - } - }) + }, + ) return nil } diff --git a/perf/perf.go b/perf/perf.go index 2f03cfbd..57a7fc07 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -70,7 +70,7 @@ func (p *perf) Run(ctx context.Context) { p.keys[i] = fmt.Sprintf("key-%d", i) } - client, err := oxia.NewAsyncClient(p.config.ServiceAddr, + client, err := oxia.NewAsyncClient(p.config.ServiceAddr, //nolint:contextcheck oxia.WithNamespace(p.config.Namespace), oxia.WithBatchLinger(p.config.BatchLinger), oxia.WithMaxRequestsPerBatch(p.config.MaxRequestsPerBatch), diff --git a/server/follower_controller.go b/server/follower_controller.go index 8909764a..9b0eb88d 100644 --- a/server/follower_controller.go +++ b/server/follower_controller.go @@ -150,10 +150,14 @@ func NewFollowerController(config Config, namespace string, shardId int64, wf wa fc.setLogger() - go common.DoWithLabels(map[string]string{ - "oxia": "follower-apply-committed-entries", - "shard": fmt.Sprintf("%d", fc.shardId), - }, fc.applyAllCommittedEntries) + go common.DoWithLabels( + fc.ctx, + map[string]string{ + "oxia": "follower-apply-committed-entries", + "shard": fmt.Sprintf("%d", fc.shardId), + }, + fc.applyAllCommittedEntries, + ) fc.log.Info( "Created follower", @@ -346,15 +350,23 @@ func (fc *followerController) Replicate(stream proto.OxiaLogReplication_Replicat fc.closeStreamWg = closeStreamWg fc.Unlock() - go common.DoWithLabels(map[string]string{ - "oxia": "add-entries", - "shard": fmt.Sprintf("%d", fc.shardId), - }, func() { fc.handleServerStream(stream) }) + go common.DoWithLabels( + stream.Context(), + map[string]string{ + "oxia": "add-entries", + "shard": fmt.Sprintf("%d", fc.shardId), + }, + func() { fc.handleServerStream(stream) }, + ) - go common.DoWithLabels(map[string]string{ - "oxia": "add-entries-sync", - "shard": fmt.Sprintf("%d", fc.shardId), - }, func() { fc.handleReplicateSync(stream) }) + go common.DoWithLabels( + stream.Context(), + map[string]string{ + "oxia": "add-entries-sync", + "shard": fmt.Sprintf("%d", fc.shardId), + }, + func() { fc.handleReplicateSync(stream) }, + ) return closeStreamWg.Wait(fc.ctx) } @@ -578,10 +590,14 @@ func (fc *followerController) SendSnapshot(stream proto.OxiaLogReplication_SendS fc.closeStreamWg = closeStreamWg fc.Unlock() - go common.DoWithLabels(map[string]string{ - "oxia": "receive-snapshot", - "shard": fmt.Sprintf("%d", fc.shardId), - }, func() { fc.handleSnapshot(stream) }) + go common.DoWithLabels( + stream.Context(), + map[string]string{ + "oxia": "receive-snapshot", + "shard": fmt.Sprintf("%d", fc.shardId), + }, + func() { fc.handleSnapshot(stream) }, + ) return closeStreamWg.Wait(fc.ctx) } diff --git a/server/follower_cursor.go b/server/follower_cursor.go index 50dc81d0..32e481ad 100644 --- a/server/follower_cursor.go +++ b/server/follower_cursor.go @@ -148,13 +148,17 @@ func NewFollowerCursor( return nil, err } - go common.DoWithLabels(map[string]string{ - "oxia": "follower-cursor-send", - "namespace": namespace, - "shard": fmt.Sprintf("%d", fc.shardId), - }, func() { - fc.run() - }) + go common.DoWithLabels( + context.Background(), + map[string]string{ + "oxia": "follower-cursor-send", + "namespace": namespace, + "shard": fmt.Sprintf("%d", fc.shardId), + }, + func() { + fc.run() + }, + ) return fc, nil } @@ -336,12 +340,15 @@ func (fc *followerCursor) streamEntries() error { } defer reader.Close() - go common.DoWithLabels(map[string]string{ - "oxia": "follower-cursor-receive", - "shard": fmt.Sprintf("%d", fc.shardId), - }, func() { - fc.receiveAcks(cancel, fc.stream) - }) + go common.DoWithLabels( + ctx, + map[string]string{ + "oxia": "follower-cursor-receive", + "shard": fmt.Sprintf("%d", fc.shardId), + }, func() { + fc.receiveAcks(cancel, fc.stream) + }, + ) fc.log.Info( "Successfully attached cursor follower", diff --git a/server/kv/notifications_trimmer.go b/server/kv/notifications_trimmer.go index 857891be..16fcc5b2 100644 --- a/server/kv/notifications_trimmer.go +++ b/server/kv/notifications_trimmer.go @@ -65,11 +65,15 @@ func newNotificationsTrimmer(ctx context.Context, namespace string, shardId int6 ), } - go common.DoWithLabels(map[string]string{ - "oxia": "notifications-trimmer", - "namespace": namespace, - "shard": fmt.Sprintf("%d", shardId), - }, t.run) + go common.DoWithLabels( + t.ctx, + map[string]string{ + "oxia": "notifications-trimmer", + "namespace": namespace, + "shard": fmt.Sprintf("%d", shardId), + }, + t.run, + ) return t } diff --git a/server/leader_controller.go b/server/leader_controller.go index 7c268fd7..ed2083ef 100644 --- a/server/leader_controller.go +++ b/server/leader_controller.go @@ -326,7 +326,7 @@ func (lc *leaderController) BecomeLeader(ctx context.Context, req *proto.BecomeL lc.sessionManager = NewSessionManager(lc.ctx, lc.namespace, lc.shardId, lc) for follower, followerHeadEntryId := range req.FollowerMaps { - if err := lc.addFollower(follower, followerHeadEntryId); err != nil { + if err := lc.addFollower(follower, followerHeadEntryId); err != nil { //nolint:contextcheck return nil, err } } @@ -573,26 +573,30 @@ func (lc *leaderController) Read(ctx context.Context, request *proto.ReadRequest } func (lc *leaderController) read(ctx context.Context, request *proto.ReadRequest, ch chan<- GetResult) { - common.DoWithLabels(map[string]string{ - "oxia": "read", - "shard": fmt.Sprintf("%d", lc.shardId), - "peer": common.GetPeer(ctx), - }, func() { - lc.log.Debug("Received read request") - - for _, get := range request.Gets { - response, err := lc.db.Get(get) - if err != nil { - return - } - ch <- GetResult{Response: response} - if ctx.Err() != nil { - ch <- GetResult{Err: ctx.Err()} - break + common.DoWithLabels( + ctx, + map[string]string{ + "oxia": "read", + "shard": fmt.Sprintf("%d", lc.shardId), + "peer": common.GetPeer(ctx), + }, + func() { + lc.log.Debug("Received read request") + + for _, get := range request.Gets { + response, err := lc.db.Get(get) + if err != nil { + return + } + ch <- GetResult{Response: response} + if ctx.Err() != nil { + ch <- GetResult{Err: ctx.Err()} + break + } } - } - close(ch) - }) + close(ch) + }, + ) } func (lc *leaderController) List(ctx context.Context, request *proto.ListRequest) (<-chan string, error) { @@ -611,39 +615,43 @@ func (lc *leaderController) List(ctx context.Context, request *proto.ListRequest } func (lc *leaderController) list(ctx context.Context, request *proto.ListRequest, ch chan<- string) { - common.DoWithLabels(map[string]string{ - "oxia": "list", - "shard": fmt.Sprintf("%d", lc.shardId), - "peer": common.GetPeer(ctx), - }, func() { - lc.log.Debug("Received list request") - - it, err := lc.db.List(request) - if err != nil { - lc.log.Warn( - "Failed to process list request", - slog.Any("error", err), - ) - close(ch) - return - } + common.DoWithLabels( + ctx, + map[string]string{ + "oxia": "list", + "shard": fmt.Sprintf("%d", lc.shardId), + "peer": common.GetPeer(ctx), + }, + func() { + lc.log.Debug("Received list request") - defer func() { - _ = it.Close() - // NOTE: - // we must close the channel after iterator is closed, to avoid the - // iterator keep open when caller is trying to process the next step (for example db.Close) - // because this is execute in another goroutine. - close(ch) - }() + it, err := lc.db.List(request) + if err != nil { + lc.log.Warn( + "Failed to process list request", + slog.Any("error", err), + ) + close(ch) + return + } - for ; it.Valid(); it.Next() { - ch <- it.Key() - if ctx.Err() != nil { - break + defer func() { + _ = it.Close() + // NOTE: + // we must close the channel after iterator is closed, to avoid the + // iterator keep open when caller is trying to process the next step (for example db.Close) + // because this is execute in another goroutine. + close(ch) + }() + + for ; it.Valid(); it.Next() { + ch <- it.Key() + if ctx.Err() != nil { + break + } } - } - }) + }, + ) } func (lc *leaderController) ListSliceNoMutex(ctx context.Context, request *proto.ListRequest) ([]string, error) { @@ -678,7 +686,7 @@ func (lc *leaderController) Write(ctx context.Context, request *proto.WriteReque func (lc *leaderController) write(ctx context.Context, request func(int64) *proto.WriteRequest) (int64, *proto.WriteResponse, error) { timer := lc.writeLatencyHisto.Timer() - defer timer.Done() + defer timer.Done() //nolint:contextcheck lc.log.Debug("Write operation") @@ -750,20 +758,24 @@ func (lc *leaderController) GetNotifications(req *proto.NotificationsRequest, st // Create a context for handling this stream ctx, cancel := context.WithCancel(stream.Context()) - go common.DoWithLabels(map[string]string{ - "oxia": "dispatch-notifications", - "shard": fmt.Sprintf("%d", lc.shardId), - "peer": common.GetPeer(stream.Context()), - }, func() { - if err := lc.dispatchNotifications(ctx, req, stream); err != nil && !errors.Is(err, context.Canceled) { - lc.log.Warn( - "Failed to dispatch notifications", - slog.Any("error", err), - slog.String("peer", common.GetPeer(stream.Context())), - ) - cancel() - } - }) + go common.DoWithLabels( + ctx, + map[string]string{ + "oxia": "dispatch-notifications", + "shard": fmt.Sprintf("%d", lc.shardId), + "peer": common.GetPeer(stream.Context()), + }, + func() { + if err := lc.dispatchNotifications(ctx, req, stream); err != nil && !errors.Is(err, context.Canceled) { + lc.log.Warn( + "Failed to dispatch notifications", + slog.Any("error", err), + slog.String("peer", common.GetPeer(stream.Context())), + ) + cancel() + } + }, + ) select { case <-lc.ctx.Done(): diff --git a/server/util/stream_reader.go b/server/util/stream_reader.go index f5ce1f10..8d55152a 100644 --- a/server/util/stream_reader.go +++ b/server/util/stream_reader.go @@ -49,7 +49,11 @@ type streamReader[T any, U any] struct { } func (s *streamReader[T, U]) Run() error { - go common.DoWithLabels(s.labels, func() { s.handleServerStream() }) + go common.DoWithLabels( + s.ctx, + s.labels, + func() { s.handleServerStream() }, + ) select { case err := <-s.closeCh: diff --git a/server/wal/wal_impl.go b/server/wal/wal_impl.go index 1295b470..b15c3bc9 100644 --- a/server/wal/wal_impl.go +++ b/server/wal/wal_impl.go @@ -148,11 +148,15 @@ func newWal(namespace string, shard int64, options *WalFactoryOptions, commitOff w.trimmer = newTrimmer(namespace, shard, w, options.Retention, trimmerCheckInterval, clock, commitOffsetProvider) if options.SyncData { - go common.DoWithLabels(map[string]string{ - "oxia": "wal-sync", - "namespace": namespace, - "shard": fmt.Sprintf("%d", shard), - }, w.runSync) + go common.DoWithLabels( + w.ctx, + map[string]string{ + "oxia": "wal-sync", + "namespace": namespace, + "shard": fmt.Sprintf("%d", shard), + }, + w.runSync, + ) } return w, nil diff --git a/server/wal/wal_trimmer.go b/server/wal/wal_trimmer.go index 059681e0..e0da8fe3 100644 --- a/server/wal/wal_trimmer.go +++ b/server/wal/wal_trimmer.go @@ -60,10 +60,14 @@ func newTrimmer(namespace string, shard int64, wal *wal, retention time.Duration } t.ctx, t.cancel = context.WithCancel(context.Background()) - go common.DoWithLabels(map[string]string{ - "oxia": "wal-trimmer", - "shard": fmt.Sprintf("%d", shard), - }, t.run) + go common.DoWithLabels( + t.ctx, + map[string]string{ + "oxia": "wal-trimmer", + "shard": fmt.Sprintf("%d", shard), + }, + t.run, + ) return t }