diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a0ba4f1f..58055fcf7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Refactored traces and metrics * Renamed `{retry,table}.WithID` option to `{retry,table}.WithLabel` * Added `ydb.WithTraceRetry` option * Moved `internal/allocator.Buffers` to package `internal/xstring` diff --git a/metrics/driver.go b/metrics/driver.go index fb15b1681..257c2d4e4 100644 --- a/metrics/driver.go +++ b/metrics/driver.go @@ -10,26 +10,34 @@ import ( // driver makes driver with New publishing func driver(config Config) (t trace.Driver) { + config = config.WithSystem("driver") + endpoints := config.WithSystem("balancer").GaugeVec("endpoints", "local_dc", "az") + balancersDiscoveries := config.WithSystem("balancer").CounterVec("discoveries", "status", "cause") + balancerUpdates := config.WithSystem("balancer").CounterVec("updates", "cause") + conns := config.GaugeVec("conns", "endpoint", "node_id") + banned := config.WithSystem("conn").GaugeVec("banned", "endpoint", "node_id", "cause") + requests := config.WithSystem("conn").CounterVec("requests", "status", "method", "endpoint", "node_id") + tli := config.CounterVec("transaction_locks_invalidated") + type endpointKey struct { localDC bool az string } - - config = config.WithSystem("driver") - endpoints := config.WithSystem("balancer").GaugeVec("endpoints", "local_dc", "az") - balancerUpdates := config.WithSystem("balancer").CounterVec("updates", "force") - conns := config.GaugeVec("conns", "address", "node_id") - banned := config.WithSystem("conn").GaugeVec("banned", "address", "node_id", "cause") - requests := config.WithSystem("conn").CounterVec("requests", "status", "method") - tli := config.CounterVec("transaction_locks_invalidated") knownEndpoints := make(map[endpointKey]struct{}) + t.OnConnInvoke = func(info trace.DriverConnInvokeStartInfo) func(trace.DriverConnInvokeDoneInfo) { - method := info.Method + var ( + method = info.Method + endpoint = info.Endpoint.Address() + nodeID = info.Endpoint.NodeID() + ) return func(info trace.DriverConnInvokeDoneInfo) { if config.Details()&trace.DriverConnEvents != 0 { requests.With(map[string]string{ - "status": errorBrief(info.Error), - "method": string(method), + "status": errorBrief(info.Error), + "method": string(method), + "endpoint": endpoint, + "node_id": strconv.FormatUint(uint64(nodeID), 10), }).Inc() if xerrors.IsOperationErrorTransactionLocksInvalidated(info.Error) { tli.With(nil).Inc() @@ -42,13 +50,19 @@ func driver(config Config) (t trace.Driver) { ) func( trace.DriverConnNewStreamDoneInfo, ) { - method := info.Method + var ( + method = info.Method + endpoint = info.Endpoint.Address() + nodeID = info.Endpoint.NodeID() + ) return func(info trace.DriverConnNewStreamRecvInfo) func(trace.DriverConnNewStreamDoneInfo) { return func(info trace.DriverConnNewStreamDoneInfo) { if config.Details()&trace.DriverConnEvents != 0 { requests.With(map[string]string{ - "status": errorBrief(info.Error), - "method": string(method), + "status": errorBrief(info.Error), + "method": string(method), + "endpoint": endpoint, + "node_id": strconv.FormatUint(uint64(nodeID), 10), }).Inc() } } @@ -57,19 +71,30 @@ func driver(config Config) (t trace.Driver) { t.OnConnBan = func(info trace.DriverConnBanStartInfo) func(trace.DriverConnBanDoneInfo) { if config.Details()&trace.DriverConnEvents != 0 { banned.With(map[string]string{ - "address": info.Endpoint.Address(), - "node_id": idToString(info.Endpoint.NodeID()), - "cause": errorBrief(info.Cause), + "endpoint": info.Endpoint.Address(), + "node_id": idToString(info.Endpoint.NodeID()), + "cause": errorBrief(info.Cause), }).Add(1) } return nil } + t.OnBalancerClusterDiscoveryAttempt = func(info trace.DriverBalancerClusterDiscoveryAttemptStartInfo) func( + trace.DriverBalancerClusterDiscoveryAttemptDoneInfo, + ) { + eventType := repeater.EventType(*info.Context) + return func(info trace.DriverBalancerClusterDiscoveryAttemptDoneInfo) { + balancersDiscoveries.With(map[string]string{ + "status": errorBrief(info.Error), + "cause": eventType, + }).Inc() + } + } t.OnBalancerUpdate = func(info trace.DriverBalancerUpdateStartInfo) func(trace.DriverBalancerUpdateDoneInfo) { eventType := repeater.EventType(*info.Context) return func(info trace.DriverBalancerUpdateDoneInfo) { if config.Details()&trace.DriverBalancerEvents != 0 { balancerUpdates.With(map[string]string{ - "force": strconv.FormatBool(eventType == repeater.EventForce), + "cause": eventType, }).Inc() newEndpoints := make(map[endpointKey]int, len(info.Endpoints)) for _, e := range info.Endpoints { @@ -99,14 +124,14 @@ func driver(config Config) (t trace.Driver) { } } t.OnConnDial = func(info trace.DriverConnDialStartInfo) func(trace.DriverConnDialDoneInfo) { - address := info.Endpoint.Address() + endpoint := info.Endpoint.Address() nodeID := info.Endpoint.NodeID() return func(info trace.DriverConnDialDoneInfo) { if config.Details()&trace.DriverConnEvents != 0 { if info.Error == nil { conns.With(map[string]string{ - "address": address, - "node_id": idToString(nodeID), + "endpoint": endpoint, + "node_id": idToString(nodeID), }).Add(1) } } @@ -115,8 +140,8 @@ func driver(config Config) (t trace.Driver) { t.OnConnClose = func(info trace.DriverConnCloseStartInfo) func(trace.DriverConnCloseDoneInfo) { if config.Details()&trace.DriverConnEvents != 0 { conns.With(map[string]string{ - "address": info.Endpoint.Address(), - "node_id": idToString(info.Endpoint.NodeID()), + "endpoint": info.Endpoint.Address(), + "node_id": idToString(info.Endpoint.NodeID()), }).Add(-1) } return nil diff --git a/metrics/retry.go b/metrics/retry.go index 43bd1f7f1..a1dfa72ad 100644 --- a/metrics/retry.go +++ b/metrics/retry.go @@ -1,9 +1,47 @@ package metrics import ( + "time" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) func retry(config Config) (t trace.Retry) { + config = config.WithSystem("retry") + errs := config.CounterVec("errors", "status", "retry_label", "final") + attempts := config.HistogramVec("attempts", []float64{0, 1, 2, 3, 4, 5, 7, 10}, "retry_label") + latency := config.TimerVec("latency", "status", "retry_label") + t.OnRetry = func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { + label := info.Label + if label == "" { + return nil + } + start := time.Now() + return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { + if info.Error != nil && config.Details()&trace.RetryEvents != 0 { + errs.With(map[string]string{ + "status": errorBrief(info.Error), + "retry_label": label, + "final": "false", + }).Inc() + } + return func(info trace.RetryLoopDoneInfo) { + if config.Details()&trace.RetryEvents != 0 { + attempts.With(map[string]string{ + "retry_label": label, + }).Record(float64(info.Attempts)) + errs.With(map[string]string{ + "status": errorBrief(info.Error), + "retry_label": label, + "final": "true", + }).Inc() + latency.With(map[string]string{ + "status": errorBrief(info.Error), + "retry_label": label, + }).Record(time.Since(start)) + } + } + } + } return t } diff --git a/metrics/sql.go b/metrics/sql.go index d6a2a4162..76ae1ae4d 100644 --- a/metrics/sql.go +++ b/metrics/sql.go @@ -1,6 +1,8 @@ package metrics import ( + "time" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -8,7 +10,14 @@ import ( func databaseSQL(config Config) (t trace.DatabaseSQL) { config = config.WithSystem("database").WithSystem("sql") conns := config.GaugeVec("conns") - txs := config.GaugeVec("txs") + inflight := config.WithSystem("conns").GaugeVec("inflight") + query := config.CounterVec("query", "status", "query_mode") + queryLatency := config.WithSystem("query").TimerVec("latency", "status", "query_mode") + exec := config.CounterVec("exec", "status", "query_label", "query_mode") + execLatency := config.WithSystem("exec").TimerVec("latency", "status", "query_mode") + txBegin := config.WithSystem("tx").CounterVec("begin", "status") + txCommit := config.WithSystem("tx").CounterVec("commit", "status") + txRollback := config.WithSystem("tx").CounterVec("rollback", "status") t.OnConnectorConnect = func(info trace.DatabaseSQLConnectorConnectStartInfo) func( trace.DatabaseSQLConnectorConnectDoneInfo, ) { @@ -32,32 +41,80 @@ func databaseSQL(config Config) (t trace.DatabaseSQL) { t.OnConnBegin = func(info trace.DatabaseSQLConnBeginStartInfo) func(trace.DatabaseSQLConnBeginDoneInfo) { if config.Details()&trace.DatabaseSQLTxEvents != 0 { return func(info trace.DatabaseSQLConnBeginDoneInfo) { - if info.Tx != nil { - txs.With(nil).Add(1) - } + txBegin.With(map[string]string{ + "status": errorBrief(info.Error), + }).Inc() } } return nil } t.OnTxCommit = func(info trace.DatabaseSQLTxCommitStartInfo) func(trace.DatabaseSQLTxCommitDoneInfo) { - if config.Details()&trace.DatabaseSQLTxEvents != 0 { - return func(info trace.DatabaseSQLTxCommitDoneInfo) { - if info.Error == nil { - txs.With(nil).Add(-1) - } + return func(info trace.DatabaseSQLTxCommitDoneInfo) { + if config.Details()&trace.DatabaseSQLTxEvents != 0 { + txCommit.With(map[string]string{ + "status": errorBrief(info.Error), + }).Inc() } } - return nil } t.OnTxRollback = func(info trace.DatabaseSQLTxRollbackStartInfo) func(trace.DatabaseSQLTxRollbackDoneInfo) { - if config.Details()&trace.DatabaseSQLTxEvents != 0 { - return func(info trace.DatabaseSQLTxRollbackDoneInfo) { - if info.Error == nil { - txs.With(nil).Add(-1) - } + return func(info trace.DatabaseSQLTxRollbackDoneInfo) { + if config.Details()&trace.DatabaseSQLTxEvents != 0 { + txRollback.With(map[string]string{ + "status": errorBrief(info.Error), + }).Inc() + } + } + } + t.OnConnExec = func(info trace.DatabaseSQLConnExecStartInfo) func(trace.DatabaseSQLConnExecDoneInfo) { + if config.Details()&trace.DatabaseSQLEvents != 0 { + inflight.With(nil).Add(1) + } + var ( + mode = info.Mode + start = time.Now() + ) + return func(info trace.DatabaseSQLConnExecDoneInfo) { + if config.Details()&trace.DatabaseSQLEvents != 0 { + inflight.With(nil).Add(-1) + } + if config.Details()&trace.DatabaseSQLConnEvents != 0 { + status := errorBrief(info.Error) + exec.With(map[string]string{ + "status": status, + "query_mode": mode, + }).Inc() + execLatency.With(map[string]string{ + "status": status, + "query_mode": mode, + }).Record(time.Since(start)) + } + } + } + t.OnConnQuery = func(info trace.DatabaseSQLConnQueryStartInfo) func(trace.DatabaseSQLConnQueryDoneInfo) { + if config.Details()&trace.DatabaseSQLEvents != 0 { + inflight.With(nil).Add(1) + } + var ( + mode = info.Mode + start = time.Now() + ) + return func(info trace.DatabaseSQLConnQueryDoneInfo) { + if config.Details()&trace.DatabaseSQLEvents != 0 { + inflight.With(nil).Add(-1) + } + if config.Details()&trace.DatabaseSQLConnEvents != 0 { + status := errorBrief(info.Error) + query.With(map[string]string{ + "status": status, + "query_mode": mode, + }).Inc() + queryLatency.With(map[string]string{ + "status": status, + "query_mode": mode, + }).Record(time.Since(start)) } } - return nil } return t } diff --git a/metrics/table.go b/metrics/table.go index 01c2dd9f2..bff1f8c42 100644 --- a/metrics/table.go +++ b/metrics/table.go @@ -17,81 +17,11 @@ func table(config Config) (t trace.Table) { wait := config.WithSystem("pool").GaugeVec("wait") waitLatency := config.WithSystem("pool").WithSystem("wait").TimerVec("latency") alive := config.GaugeVec("sessions", "node_id") - doAttempts := config.WithSystem("do").HistogramVec("attempts", []float64{0, 1, 2, 5, 10}, "name") - doErrors := config.WithSystem("do").CounterVec("errors", "status", "name") - doIntermediateErrors := config.WithSystem("do").WithSystem("intermediate").CounterVec("errors", "status", "name") - doLatency := config.WithSystem("do").TimerVec("latency", "status", "name") - doTxAttempts := config.WithSystem("doTx").HistogramVec("attempts", []float64{0, 1, 2, 5, 10}, "name") - doTxIntermediateErrors := config.WithSystem("doTx").WithSystem("intermediate").CounterVec("errors", "status", "name") - doTxErrors := config.WithSystem("doTx").CounterVec("errors", "status", "name") - doTxLatency := config.WithSystem("doTx").TimerVec("latency", "status", "name") t.OnInit = func(info trace.TableInitStartInfo) func(trace.TableInitDoneInfo) { return func(info trace.TableInitDoneInfo) { limit.With(nil).Set(float64(info.Limit)) } } - t.OnDo = func(info trace.TableDoStartInfo) func( - info trace.TableDoIntermediateInfo, - ) func( - trace.TableDoDoneInfo, - ) { - var ( - label = info.Label - start = time.Now() - ) - return func(info trace.TableDoIntermediateInfo) func(trace.TableDoDoneInfo) { - if info.Error != nil && config.Details()&trace.TableEvents != 0 { - doIntermediateErrors.With(map[string]string{ - "status": errorBrief(info.Error), - "label": label, - }).Inc() - } - return func(info trace.TableDoDoneInfo) { - if config.Details()&trace.TableEvents != 0 { - doAttempts.With(nil).Record(float64(info.Attempts)) - doErrors.With(map[string]string{ - "status": errorBrief(info.Error), - "label": label, - }).Inc() - doLatency.With(map[string]string{ - "status": errorBrief(info.Error), - "label": label, - }).Record(time.Since(start)) - } - } - } - } - t.OnDoTx = func(info trace.TableDoTxStartInfo) func( - info trace.TableDoTxIntermediateInfo, - ) func( - trace.TableDoTxDoneInfo, - ) { - var ( - label = info.Label - start = time.Now() - ) - return func(info trace.TableDoTxIntermediateInfo) func(trace.TableDoTxDoneInfo) { - if info.Error != nil && config.Details()&trace.TableEvents != 0 { - doTxIntermediateErrors.With(map[string]string{ - "status": errorBrief(info.Error), - "label": label, - }).Inc() - } - return func(info trace.TableDoTxDoneInfo) { - if config.Details()&trace.TableEvents != 0 { - doTxAttempts.With(nil).Record(float64(info.Attempts)) - doTxErrors.With(map[string]string{ - "status": errorBrief(info.Error), - "label": label, - }).Inc() - doTxLatency.With(map[string]string{ - "status": errorBrief(info.Error), - "label": label, - }).Record(time.Since(start)) - } - } - } - } t.OnSessionNew = func(info trace.TableSessionNewStartInfo) func(trace.TableSessionNewDoneInfo) { return func(info trace.TableSessionNewDoneInfo) { if info.Error == nil && config.Details()&trace.TableSessionEvents != 0 { diff --git a/retry/retry.go b/retry/retry.go index f4f723ce1..388316b12 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/wait" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -223,7 +222,6 @@ func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr err trace: &trace.Retry{}, fastBackoff: backoff.Fast, slowBackoff: backoff.Slow, - label: stack.Record(1, stack.Lambda(false), stack.FileName(false)), } for _, opt := range opts { if opt != nil {