Skip to content

Commit

Permalink
* Refactored traces and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Oct 25, 2023
1 parent 63560a5 commit 0729d76
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 111 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Refactored traces and metrics
* Renamed `{retry,table}.WithID` option to `{retry,table}.WithLabel`
* Added `ydb.WithTraceRetry` option
* Moved `internal/allocator.Buffers` to package `internal/xstring`
Expand Down
16 changes: 16 additions & 0 deletions metrics/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package metrics

import "context"

type ctxQueryLabel struct{}

func WithQueryLabel(ctx context.Context, label string) context.Context {
return context.WithValue(ctx, ctxQueryLabel{}, label)
}

func queryLabel(ctx context.Context) (label string) {
if label, has := ctx.Value(ctxQueryLabel{}).(string); has {
return label
}
return ""
}
71 changes: 48 additions & 23 deletions metrics/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,34 @@ import (

// driver makes driver with New publishing
func driver(config Config) (t trace.Driver) {
config = config.WithSystem("driver")
endpoints := config.WithSystem("balancer").GaugeVec("endpoints", "local_dc", "az")
balancersDiscoveries := config.WithSystem("balancer").CounterVec("discoveries", "status", "cause")
balancerUpdates := config.WithSystem("balancer").CounterVec("updates", "cause")
conns := config.GaugeVec("conns", "endpoint", "node_id")
banned := config.WithSystem("conn").GaugeVec("banned", "endpoint", "node_id", "cause")
requests := config.WithSystem("conn").CounterVec("requests", "status", "method", "endpoint", "node_id")
tli := config.CounterVec("transaction_locks_invalidated")

type endpointKey struct {
localDC bool
az string
}

config = config.WithSystem("driver")
endpoints := config.WithSystem("balancer").GaugeVec("endpoints", "local_dc", "az")
balancerUpdates := config.WithSystem("balancer").CounterVec("updates", "force")
conns := config.GaugeVec("conns", "address", "node_id")
banned := config.WithSystem("conn").GaugeVec("banned", "address", "node_id", "cause")
requests := config.WithSystem("conn").CounterVec("requests", "status", "method")
tli := config.CounterVec("transaction_locks_invalidated")
knownEndpoints := make(map[endpointKey]struct{})

t.OnConnInvoke = func(info trace.DriverConnInvokeStartInfo) func(trace.DriverConnInvokeDoneInfo) {
method := info.Method
var (
method = info.Method
endpoint = info.Endpoint.Address()
nodeID = info.Endpoint.NodeID()
)
return func(info trace.DriverConnInvokeDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
requests.With(map[string]string{
"status": errorBrief(info.Error),
"method": string(method),
"status": errorBrief(info.Error),
"method": string(method),
"endpoint": endpoint,
"node_id": strconv.FormatUint(uint64(nodeID), 10),
}).Inc()
if xerrors.IsOperationErrorTransactionLocksInvalidated(info.Error) {
tli.With(nil).Inc()
Expand All @@ -42,13 +50,19 @@ func driver(config Config) (t trace.Driver) {
) func(
trace.DriverConnNewStreamDoneInfo,
) {
method := info.Method
var (
method = info.Method
endpoint = info.Endpoint.Address()
nodeID = info.Endpoint.NodeID()
)
return func(info trace.DriverConnNewStreamRecvInfo) func(trace.DriverConnNewStreamDoneInfo) {
return func(info trace.DriverConnNewStreamDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
requests.With(map[string]string{
"status": errorBrief(info.Error),
"method": string(method),
"status": errorBrief(info.Error),
"method": string(method),
"endpoint": endpoint,
"node_id": strconv.FormatUint(uint64(nodeID), 10),
}).Inc()
}
}
Expand All @@ -57,19 +71,30 @@ func driver(config Config) (t trace.Driver) {
t.OnConnBan = func(info trace.DriverConnBanStartInfo) func(trace.DriverConnBanDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
banned.With(map[string]string{
"address": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
"cause": errorBrief(info.Cause),
"endpoint": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
"cause": errorBrief(info.Cause),
}).Add(1)
}
return nil
}
t.OnBalancerClusterDiscoveryAttempt = func(info trace.DriverBalancerClusterDiscoveryAttemptStartInfo) func(
trace.DriverBalancerClusterDiscoveryAttemptDoneInfo,
) {
eventType := repeater.EventType(*info.Context)
return func(info trace.DriverBalancerClusterDiscoveryAttemptDoneInfo) {
balancersDiscoveries.With(map[string]string{
"status": errorBrief(info.Error),
"cause": eventType,
}).Inc()
}
}
t.OnBalancerUpdate = func(info trace.DriverBalancerUpdateStartInfo) func(trace.DriverBalancerUpdateDoneInfo) {
eventType := repeater.EventType(*info.Context)
return func(info trace.DriverBalancerUpdateDoneInfo) {
if config.Details()&trace.DriverBalancerEvents != 0 {
balancerUpdates.With(map[string]string{
"force": strconv.FormatBool(eventType == repeater.EventForce),
"cause": eventType,
}).Inc()
newEndpoints := make(map[endpointKey]int, len(info.Endpoints))
for _, e := range info.Endpoints {
Expand Down Expand Up @@ -99,14 +124,14 @@ func driver(config Config) (t trace.Driver) {
}
}
t.OnConnDial = func(info trace.DriverConnDialStartInfo) func(trace.DriverConnDialDoneInfo) {
address := info.Endpoint.Address()
endpoint := info.Endpoint.Address()
nodeID := info.Endpoint.NodeID()
return func(info trace.DriverConnDialDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
if info.Error == nil {
conns.With(map[string]string{
"address": address,
"node_id": idToString(nodeID),
"endpoint": endpoint,
"node_id": idToString(nodeID),
}).Add(1)
}
}
Expand All @@ -115,8 +140,8 @@ func driver(config Config) (t trace.Driver) {
t.OnConnClose = func(info trace.DriverConnCloseStartInfo) func(trace.DriverConnCloseDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
conns.With(map[string]string{
"address": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
"endpoint": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
}).Add(-1)
}
return nil
Expand Down
37 changes: 37 additions & 0 deletions metrics/retry.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,46 @@
package metrics

import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

func retry(config Config) (t trace.Retry) {
config = config.WithSystem("retry")
errs := config.CounterVec("errors", "status", "retry_label", "final")
attempts := config.HistogramVec("attempts", []float64{0, 1, 2, 3, 4, 5, 7, 10}, "retry_label")
latency := config.TimerVec("latency", "status", "retry_label")
t.OnRetry = func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
var (
label = info.Label
start = time.Now()
)
return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
if info.Error != nil && config.Details()&trace.RetryEvents != 0 {
errs.With(map[string]string{
"status": errorBrief(info.Error),
"retry_label": label,
"final": "false",
}).Inc()
}
return func(info trace.RetryLoopDoneInfo) {
if config.Details()&trace.RetryEvents != 0 {
attempts.With(map[string]string{
"retry_label": label,
}).Record(float64(info.Attempts))
errs.With(map[string]string{
"status": errorBrief(info.Error),
"retry_label": label,
"final": "true",
}).Inc()
latency.With(map[string]string{
"status": errorBrief(info.Error),
"retry_label": label,
}).Record(time.Since(start))
}
}
}
}
return t
}
95 changes: 79 additions & 16 deletions metrics/sql.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
package metrics

import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

// databaseSQL makes trace.DatabaseSQL with measuring `database/sql` events
func databaseSQL(config Config) (t trace.DatabaseSQL) {
config = config.WithSystem("database").WithSystem("sql")
conns := config.GaugeVec("conns")
txs := config.GaugeVec("txs")
inflight := config.WithSystem("conns").GaugeVec("inflight")
query := config.CounterVec("query", "status", "query_label", "query_mode")
queryLatency := config.WithSystem("query").TimerVec("latency", "status", "query_label", "query_mode")
exec := config.CounterVec("exec", "status", "query_label", "query_mode")
execLatency := config.WithSystem("exec").TimerVec("latency", "status", "query_label", "query_mode")
txBegin := config.WithSystem("tx").CounterVec("begin", "status")
txCommit := config.WithSystem("tx").CounterVec("commit", "status")
txRollback := config.WithSystem("tx").CounterVec("rollback", "status")
t.OnConnectorConnect = func(info trace.DatabaseSQLConnectorConnectStartInfo) func(
trace.DatabaseSQLConnectorConnectDoneInfo,
) {
Expand All @@ -32,32 +41,86 @@ func databaseSQL(config Config) (t trace.DatabaseSQL) {
t.OnConnBegin = func(info trace.DatabaseSQLConnBeginStartInfo) func(trace.DatabaseSQLConnBeginDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
return func(info trace.DatabaseSQLConnBeginDoneInfo) {
if info.Tx != nil {
txs.With(nil).Add(1)
}
txBegin.With(map[string]string{
"status": errorBrief(info.Error),
}).Inc()
}
}
return nil
}
t.OnTxCommit = func(info trace.DatabaseSQLTxCommitStartInfo) func(trace.DatabaseSQLTxCommitDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
return func(info trace.DatabaseSQLTxCommitDoneInfo) {
if info.Error == nil {
txs.With(nil).Add(-1)
}
return func(info trace.DatabaseSQLTxCommitDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
txCommit.With(map[string]string{
"status": errorBrief(info.Error),
}).Inc()
}
}
return nil
}
t.OnTxRollback = func(info trace.DatabaseSQLTxRollbackStartInfo) func(trace.DatabaseSQLTxRollbackDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
return func(info trace.DatabaseSQLTxRollbackDoneInfo) {
if info.Error == nil {
txs.With(nil).Add(-1)
}
return func(info trace.DatabaseSQLTxRollbackDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
txRollback.With(map[string]string{
"status": errorBrief(info.Error),
}).Inc()
}
}
}
t.OnConnExec = func(info trace.DatabaseSQLConnExecStartInfo) func(trace.DatabaseSQLConnExecDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(1)
}
var (
mode = info.Mode
label = queryLabel(*info.Context)
start = time.Now()
)
return func(info trace.DatabaseSQLConnExecDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(-1)
}
if config.Details()&trace.DatabaseSQLConnEvents != 0 {
status := errorBrief(info.Error)
exec.With(map[string]string{
"status": status,
"query_label": label,
"query_mode": mode,
}).Inc()
execLatency.With(map[string]string{
"status": status,
"query_label": label,
"query_mode": mode,
}).Record(time.Since(start))
}
}
}
t.OnConnQuery = func(info trace.DatabaseSQLConnQueryStartInfo) func(trace.DatabaseSQLConnQueryDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(1)
}
var (
mode = info.Mode
label = queryLabel(*info.Context)
start = time.Now()
)
return func(info trace.DatabaseSQLConnQueryDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(-1)
}
if config.Details()&trace.DatabaseSQLConnEvents != 0 {
status := errorBrief(info.Error)
query.With(map[string]string{
"status": status,
"query_label": label,
"query_mode": mode,
}).Inc()
queryLatency.With(map[string]string{
"status": status,
"query_label": label,
"query_mode": mode,
}).Record(time.Since(start))
}
}
return nil
}
return t
}
Loading

0 comments on commit 0729d76

Please sign in to comment.