Skip to content

Commit

Permalink
Merge pull request #860 from ydb-platform/metrics
Browse files Browse the repository at this point in the history
* Refactored traces and metrics
  • Loading branch information
asmyasnikov authored Oct 25, 2023
2 parents 63560a5 + 72a0f51 commit 7e6371f
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 111 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Refactored traces and metrics
* Renamed `{retry,table}.WithID` option to `{retry,table}.WithLabel`
* Added `ydb.WithTraceRetry` option
* Moved `internal/allocator.Buffers` to package `internal/xstring`
Expand Down
71 changes: 48 additions & 23 deletions metrics/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,34 @@ import (

// driver makes trace.Driver with publishing of driver metrics
func driver(config Config) (t trace.Driver) {
config = config.WithSystem("driver")
endpoints := config.WithSystem("balancer").GaugeVec("endpoints", "local_dc", "az")
balancersDiscoveries := config.WithSystem("balancer").CounterVec("discoveries", "status", "cause")
balancerUpdates := config.WithSystem("balancer").CounterVec("updates", "cause")
conns := config.GaugeVec("conns", "endpoint", "node_id")
banned := config.WithSystem("conn").GaugeVec("banned", "endpoint", "node_id", "cause")
requests := config.WithSystem("conn").CounterVec("requests", "status", "method", "endpoint", "node_id")
tli := config.CounterVec("transaction_locks_invalidated")

type endpointKey struct {
localDC bool
az string
}

config = config.WithSystem("driver")
endpoints := config.WithSystem("balancer").GaugeVec("endpoints", "local_dc", "az")
balancerUpdates := config.WithSystem("balancer").CounterVec("updates", "force")
conns := config.GaugeVec("conns", "address", "node_id")
banned := config.WithSystem("conn").GaugeVec("banned", "address", "node_id", "cause")
requests := config.WithSystem("conn").CounterVec("requests", "status", "method")
tli := config.CounterVec("transaction_locks_invalidated")
knownEndpoints := make(map[endpointKey]struct{})

t.OnConnInvoke = func(info trace.DriverConnInvokeStartInfo) func(trace.DriverConnInvokeDoneInfo) {
method := info.Method
var (
method = info.Method
endpoint = info.Endpoint.Address()
nodeID = info.Endpoint.NodeID()
)
return func(info trace.DriverConnInvokeDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
requests.With(map[string]string{
"status": errorBrief(info.Error),
"method": string(method),
"status": errorBrief(info.Error),
"method": string(method),
"endpoint": endpoint,
"node_id": strconv.FormatUint(uint64(nodeID), 10),
}).Inc()
if xerrors.IsOperationErrorTransactionLocksInvalidated(info.Error) {
tli.With(nil).Inc()
Expand All @@ -42,13 +50,19 @@ func driver(config Config) (t trace.Driver) {
) func(
trace.DriverConnNewStreamDoneInfo,
) {
method := info.Method
var (
method = info.Method
endpoint = info.Endpoint.Address()
nodeID = info.Endpoint.NodeID()
)
return func(info trace.DriverConnNewStreamRecvInfo) func(trace.DriverConnNewStreamDoneInfo) {
return func(info trace.DriverConnNewStreamDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
requests.With(map[string]string{
"status": errorBrief(info.Error),
"method": string(method),
"status": errorBrief(info.Error),
"method": string(method),
"endpoint": endpoint,
"node_id": strconv.FormatUint(uint64(nodeID), 10),
}).Inc()
}
}
Expand All @@ -57,19 +71,30 @@ func driver(config Config) (t trace.Driver) {
t.OnConnBan = func(info trace.DriverConnBanStartInfo) func(trace.DriverConnBanDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
banned.With(map[string]string{
"address": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
"cause": errorBrief(info.Cause),
"endpoint": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
"cause": errorBrief(info.Cause),
}).Add(1)
}
return nil
}
t.OnBalancerClusterDiscoveryAttempt = func(info trace.DriverBalancerClusterDiscoveryAttemptStartInfo) func(
trace.DriverBalancerClusterDiscoveryAttemptDoneInfo,
) {
eventType := repeater.EventType(*info.Context)
return func(info trace.DriverBalancerClusterDiscoveryAttemptDoneInfo) {
balancersDiscoveries.With(map[string]string{
"status": errorBrief(info.Error),
"cause": eventType,
}).Inc()
}
}
t.OnBalancerUpdate = func(info trace.DriverBalancerUpdateStartInfo) func(trace.DriverBalancerUpdateDoneInfo) {
eventType := repeater.EventType(*info.Context)
return func(info trace.DriverBalancerUpdateDoneInfo) {
if config.Details()&trace.DriverBalancerEvents != 0 {
balancerUpdates.With(map[string]string{
"force": strconv.FormatBool(eventType == repeater.EventForce),
"cause": eventType,
}).Inc()
newEndpoints := make(map[endpointKey]int, len(info.Endpoints))
for _, e := range info.Endpoints {
Expand Down Expand Up @@ -99,14 +124,14 @@ func driver(config Config) (t trace.Driver) {
}
}
t.OnConnDial = func(info trace.DriverConnDialStartInfo) func(trace.DriverConnDialDoneInfo) {
address := info.Endpoint.Address()
endpoint := info.Endpoint.Address()
nodeID := info.Endpoint.NodeID()
return func(info trace.DriverConnDialDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
if info.Error == nil {
conns.With(map[string]string{
"address": address,
"node_id": idToString(nodeID),
"endpoint": endpoint,
"node_id": idToString(nodeID),
}).Add(1)
}
}
Expand All @@ -115,8 +140,8 @@ func driver(config Config) (t trace.Driver) {
t.OnConnClose = func(info trace.DriverConnCloseStartInfo) func(trace.DriverConnCloseDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
conns.With(map[string]string{
"address": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
"endpoint": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
}).Add(-1)
}
return nil
Expand Down
38 changes: 38 additions & 0 deletions metrics/retry.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,47 @@
package metrics

import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

// retry makes trace.Retry with measuring of retry loops.
//
// All metrics are registered under the "retry" system:
//   - "errors" counter, labelled with "status", "retry_label" and "final";
//   - "attempts" histogram, labelled with "retry_label";
//   - "latency" timer, labelled with "status" and "retry_label".
//
// Retry loops started with an empty label are not measured: OnRetry returns
// a nil callback for them.
func retry(config Config) (t trace.Retry) {
	config = config.WithSystem("retry")

	var (
		errs     = config.CounterVec("errors", "status", "retry_label", "final")
		attempts = config.HistogramVec("attempts", []float64{0, 1, 2, 3, 4, 5, 7, 10}, "retry_label")
		latency  = config.TimerVec("latency", "status", "retry_label")
	)

	t.OnRetry = func(
		startInfo trace.RetryLoopStartInfo,
	) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
		label := startInfo.Label
		if label == "" {
			// Nothing to attribute the measurements to.
			return nil
		}
		// The latency timer covers the whole loop, from start to the done callback.
		startedAt := time.Now()

		return func(step trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
			// Intermediate errors are counted with final="false".
			if step.Error != nil && config.Details()&trace.RetryEvents != 0 {
				errs.With(map[string]string{
					"status":      errorBrief(step.Error),
					"retry_label": label,
					"final":       "false",
				}).Inc()
			}

			return func(done trace.RetryLoopDoneInfo) {
				if config.Details()&trace.RetryEvents == 0 {
					return
				}
				attempts.With(map[string]string{
					"retry_label": label,
				}).Record(float64(done.Attempts))
				// The final outcome is always counted with final="true",
				// whatever status errorBrief reports for it.
				errs.With(map[string]string{
					"status":      errorBrief(done.Error),
					"retry_label": label,
					"final":       "true",
				}).Inc()
				latency.With(map[string]string{
					"status":      errorBrief(done.Error),
					"retry_label": label,
				}).Record(time.Since(startedAt))
			}
		}
	}

	return t
}
89 changes: 73 additions & 16 deletions metrics/sql.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
package metrics

import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

// databaseSQL makes trace.DatabaseSQL with measuring `database/sql` events
func databaseSQL(config Config) (t trace.DatabaseSQL) {
config = config.WithSystem("database").WithSystem("sql")
conns := config.GaugeVec("conns")
txs := config.GaugeVec("txs")
inflight := config.WithSystem("conns").GaugeVec("inflight")
query := config.CounterVec("query", "status", "query_mode")
queryLatency := config.WithSystem("query").TimerVec("latency", "status", "query_mode")
exec := config.CounterVec("exec", "status", "query_label", "query_mode")
execLatency := config.WithSystem("exec").TimerVec("latency", "status", "query_mode")
txBegin := config.WithSystem("tx").CounterVec("begin", "status")
txCommit := config.WithSystem("tx").CounterVec("commit", "status")
txRollback := config.WithSystem("tx").CounterVec("rollback", "status")
t.OnConnectorConnect = func(info trace.DatabaseSQLConnectorConnectStartInfo) func(
trace.DatabaseSQLConnectorConnectDoneInfo,
) {
Expand All @@ -32,32 +41,80 @@ func databaseSQL(config Config) (t trace.DatabaseSQL) {
t.OnConnBegin = func(info trace.DatabaseSQLConnBeginStartInfo) func(trace.DatabaseSQLConnBeginDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
return func(info trace.DatabaseSQLConnBeginDoneInfo) {
if info.Tx != nil {
txs.With(nil).Add(1)
}
txBegin.With(map[string]string{
"status": errorBrief(info.Error),
}).Inc()
}
}
return nil
}
t.OnTxCommit = func(info trace.DatabaseSQLTxCommitStartInfo) func(trace.DatabaseSQLTxCommitDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
return func(info trace.DatabaseSQLTxCommitDoneInfo) {
if info.Error == nil {
txs.With(nil).Add(-1)
}
return func(info trace.DatabaseSQLTxCommitDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
txCommit.With(map[string]string{
"status": errorBrief(info.Error),
}).Inc()
}
}
return nil
}
t.OnTxRollback = func(info trace.DatabaseSQLTxRollbackStartInfo) func(trace.DatabaseSQLTxRollbackDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
return func(info trace.DatabaseSQLTxRollbackDoneInfo) {
if info.Error == nil {
txs.With(nil).Add(-1)
}
return func(info trace.DatabaseSQLTxRollbackDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
txRollback.With(map[string]string{
"status": errorBrief(info.Error),
}).Inc()
}
}
}
t.OnConnExec = func(info trace.DatabaseSQLConnExecStartInfo) func(trace.DatabaseSQLConnExecDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(1)
}
var (
mode = info.Mode
start = time.Now()
)
return func(info trace.DatabaseSQLConnExecDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(-1)
}
if config.Details()&trace.DatabaseSQLConnEvents != 0 {
status := errorBrief(info.Error)
exec.With(map[string]string{
"status": status,
"query_mode": mode,
}).Inc()
execLatency.With(map[string]string{
"status": status,
"query_mode": mode,
}).Record(time.Since(start))
}
}
}
t.OnConnQuery = func(info trace.DatabaseSQLConnQueryStartInfo) func(trace.DatabaseSQLConnQueryDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(1)
}
var (
mode = info.Mode
start = time.Now()
)
return func(info trace.DatabaseSQLConnQueryDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(-1)
}
if config.Details()&trace.DatabaseSQLConnEvents != 0 {
status := errorBrief(info.Error)
query.With(map[string]string{
"status": status,
"query_mode": mode,
}).Inc()
queryLatency.With(map[string]string{
"status": status,
"query_mode": mode,
}).Record(time.Since(start))
}
}
return nil
}
return t
}
70 changes: 0 additions & 70 deletions metrics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,81 +17,11 @@ func table(config Config) (t trace.Table) {
wait := config.WithSystem("pool").GaugeVec("wait")
waitLatency := config.WithSystem("pool").WithSystem("wait").TimerVec("latency")
alive := config.GaugeVec("sessions", "node_id")
doAttempts := config.WithSystem("do").HistogramVec("attempts", []float64{0, 1, 2, 5, 10}, "name")
doErrors := config.WithSystem("do").CounterVec("errors", "status", "name")
doIntermediateErrors := config.WithSystem("do").WithSystem("intermediate").CounterVec("errors", "status", "name")
doLatency := config.WithSystem("do").TimerVec("latency", "status", "name")
doTxAttempts := config.WithSystem("doTx").HistogramVec("attempts", []float64{0, 1, 2, 5, 10}, "name")
doTxIntermediateErrors := config.WithSystem("doTx").WithSystem("intermediate").CounterVec("errors", "status", "name")
doTxErrors := config.WithSystem("doTx").CounterVec("errors", "status", "name")
doTxLatency := config.WithSystem("doTx").TimerVec("latency", "status", "name")
t.OnInit = func(info trace.TableInitStartInfo) func(trace.TableInitDoneInfo) {
return func(info trace.TableInitDoneInfo) {
limit.With(nil).Set(float64(info.Limit))
}
}
t.OnDo = func(info trace.TableDoStartInfo) func(
info trace.TableDoIntermediateInfo,
) func(
trace.TableDoDoneInfo,
) {
var (
label = info.Label
start = time.Now()
)
return func(info trace.TableDoIntermediateInfo) func(trace.TableDoDoneInfo) {
if info.Error != nil && config.Details()&trace.TableEvents != 0 {
doIntermediateErrors.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Inc()
}
return func(info trace.TableDoDoneInfo) {
if config.Details()&trace.TableEvents != 0 {
doAttempts.With(nil).Record(float64(info.Attempts))
doErrors.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Inc()
doLatency.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Record(time.Since(start))
}
}
}
}
t.OnDoTx = func(info trace.TableDoTxStartInfo) func(
info trace.TableDoTxIntermediateInfo,
) func(
trace.TableDoTxDoneInfo,
) {
var (
label = info.Label
start = time.Now()
)
return func(info trace.TableDoTxIntermediateInfo) func(trace.TableDoTxDoneInfo) {
if info.Error != nil && config.Details()&trace.TableEvents != 0 {
doTxIntermediateErrors.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Inc()
}
return func(info trace.TableDoTxDoneInfo) {
if config.Details()&trace.TableEvents != 0 {
doTxAttempts.With(nil).Record(float64(info.Attempts))
doTxErrors.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Inc()
doTxLatency.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Record(time.Since(start))
}
}
}
}
t.OnSessionNew = func(info trace.TableSessionNewStartInfo) func(trace.TableSessionNewDoneInfo) {
return func(info trace.TableSessionNewDoneInfo) {
if info.Error == nil && config.Details()&trace.TableSessionEvents != 0 {
Expand Down
Loading

0 comments on commit 7e6371f

Please sign in to comment.