Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

* Refactored traces and metrics #860

Merged
merged 1 commit into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Refactored traces and metrics
* Renamed `{retry,table}.WithID` option to `{retry,table}.WithLabel`
* Added `ydb.WithTraceRetry` option
* Moved `internal/allocator.Buffers` to package `internal/xstring`
Expand Down
71 changes: 48 additions & 23 deletions metrics/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,34 @@ import (

// driver makes a trace.Driver that publishes driver metrics
func driver(config Config) (t trace.Driver) {
config = config.WithSystem("driver")
endpoints := config.WithSystem("balancer").GaugeVec("endpoints", "local_dc", "az")
balancersDiscoveries := config.WithSystem("balancer").CounterVec("discoveries", "status", "cause")
balancerUpdates := config.WithSystem("balancer").CounterVec("updates", "cause")
conns := config.GaugeVec("conns", "endpoint", "node_id")
banned := config.WithSystem("conn").GaugeVec("banned", "endpoint", "node_id", "cause")
requests := config.WithSystem("conn").CounterVec("requests", "status", "method", "endpoint", "node_id")
tli := config.CounterVec("transaction_locks_invalidated")

type endpointKey struct {
localDC bool
az string
}

config = config.WithSystem("driver")
endpoints := config.WithSystem("balancer").GaugeVec("endpoints", "local_dc", "az")
balancerUpdates := config.WithSystem("balancer").CounterVec("updates", "force")
conns := config.GaugeVec("conns", "address", "node_id")
banned := config.WithSystem("conn").GaugeVec("banned", "address", "node_id", "cause")
requests := config.WithSystem("conn").CounterVec("requests", "status", "method")
tli := config.CounterVec("transaction_locks_invalidated")
knownEndpoints := make(map[endpointKey]struct{})

t.OnConnInvoke = func(info trace.DriverConnInvokeStartInfo) func(trace.DriverConnInvokeDoneInfo) {
method := info.Method
var (
method = info.Method
endpoint = info.Endpoint.Address()
nodeID = info.Endpoint.NodeID()
)
return func(info trace.DriverConnInvokeDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
requests.With(map[string]string{
"status": errorBrief(info.Error),
"method": string(method),
"status": errorBrief(info.Error),
"method": string(method),
"endpoint": endpoint,
"node_id": strconv.FormatUint(uint64(nodeID), 10),
}).Inc()
if xerrors.IsOperationErrorTransactionLocksInvalidated(info.Error) {
tli.With(nil).Inc()
Expand All @@ -42,13 +50,19 @@ func driver(config Config) (t trace.Driver) {
) func(
trace.DriverConnNewStreamDoneInfo,
) {
method := info.Method
var (
method = info.Method
endpoint = info.Endpoint.Address()
nodeID = info.Endpoint.NodeID()
)
return func(info trace.DriverConnNewStreamRecvInfo) func(trace.DriverConnNewStreamDoneInfo) {
return func(info trace.DriverConnNewStreamDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
requests.With(map[string]string{
"status": errorBrief(info.Error),
"method": string(method),
"status": errorBrief(info.Error),
"method": string(method),
"endpoint": endpoint,
"node_id": strconv.FormatUint(uint64(nodeID), 10),
}).Inc()
}
}
Expand All @@ -57,19 +71,30 @@ func driver(config Config) (t trace.Driver) {
t.OnConnBan = func(info trace.DriverConnBanStartInfo) func(trace.DriverConnBanDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
banned.With(map[string]string{
"address": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
"cause": errorBrief(info.Cause),
"endpoint": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
"cause": errorBrief(info.Cause),
}).Add(1)
}
return nil
}
t.OnBalancerClusterDiscoveryAttempt = func(info trace.DriverBalancerClusterDiscoveryAttemptStartInfo) func(
trace.DriverBalancerClusterDiscoveryAttemptDoneInfo,
) {
eventType := repeater.EventType(*info.Context)
return func(info trace.DriverBalancerClusterDiscoveryAttemptDoneInfo) {
balancersDiscoveries.With(map[string]string{
"status": errorBrief(info.Error),
"cause": eventType,
}).Inc()
}
}
t.OnBalancerUpdate = func(info trace.DriverBalancerUpdateStartInfo) func(trace.DriverBalancerUpdateDoneInfo) {
eventType := repeater.EventType(*info.Context)
return func(info trace.DriverBalancerUpdateDoneInfo) {
if config.Details()&trace.DriverBalancerEvents != 0 {
balancerUpdates.With(map[string]string{
"force": strconv.FormatBool(eventType == repeater.EventForce),
"cause": eventType,
}).Inc()
newEndpoints := make(map[endpointKey]int, len(info.Endpoints))
for _, e := range info.Endpoints {
Expand Down Expand Up @@ -99,14 +124,14 @@ func driver(config Config) (t trace.Driver) {
}
}
t.OnConnDial = func(info trace.DriverConnDialStartInfo) func(trace.DriverConnDialDoneInfo) {
address := info.Endpoint.Address()
endpoint := info.Endpoint.Address()
nodeID := info.Endpoint.NodeID()
return func(info trace.DriverConnDialDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
if info.Error == nil {
conns.With(map[string]string{
"address": address,
"node_id": idToString(nodeID),
"endpoint": endpoint,
"node_id": idToString(nodeID),
}).Add(1)
}
}
Expand All @@ -115,8 +140,8 @@ func driver(config Config) (t trace.Driver) {
t.OnConnClose = func(info trace.DriverConnCloseStartInfo) func(trace.DriverConnCloseDoneInfo) {
if config.Details()&trace.DriverConnEvents != 0 {
conns.With(map[string]string{
"address": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
"endpoint": info.Endpoint.Address(),
"node_id": idToString(info.Endpoint.NodeID()),
}).Add(-1)
}
return nil
Expand Down
38 changes: 38 additions & 0 deletions metrics/retry.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,47 @@
package metrics

import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

// retry builds a trace.Retry whose OnRetry hook publishes retry-loop metrics
// under the "retry" system:
//   - "errors":   counter labelled by status, retry_label and final
//   - "attempts": histogram of the attempt count, labelled by retry_label
//   - "latency":  timer labelled by status and retry_label
//
// Metrics are recorded only while trace.RetryEvents is enabled in
// config.Details(); the flag is re-read on every callback invocation.
func retry(config Config) (t trace.Retry) {
	config = config.WithSystem("retry")

	errorsCounter := config.CounterVec("errors", "status", "retry_label", "final")
	attemptsHistogram := config.HistogramVec("attempts", []float64{0, 1, 2, 3, 4, 5, 7, 10}, "retry_label")
	latencyTimer := config.TimerVec("latency", "status", "retry_label")

	// enabled reports whether retry events are currently switched on.
	enabled := func() bool {
		return config.Details()&trace.RetryEvents != 0
	}

	t.OnRetry = func(startInfo trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
		retryLabel := startInfo.Label
		// Retry loops without a label are not measured at all.
		if retryLabel == "" {
			return nil
		}
		startedAt := time.Now()

		// onDone records the final outcome of the whole retry loop.
		onDone := func(doneInfo trace.RetryLoopDoneInfo) {
			if !enabled() {
				return
			}
			attemptsHistogram.With(map[string]string{
				"retry_label": retryLabel,
			}).Record(float64(doneInfo.Attempts))

			finalStatus := errorBrief(doneInfo.Error)
			errorsCounter.With(map[string]string{
				"status":      finalStatus,
				"retry_label": retryLabel,
				"final":       "true",
			}).Inc()
			latencyTimer.With(map[string]string{
				"status":      finalStatus,
				"retry_label": retryLabel,
			}).Record(time.Since(startedAt))
		}

		// The intermediate callback counts a non-final error (if any) and
		// hands back the done callback.
		return func(stepInfo trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
			if stepInfo.Error != nil && enabled() {
				errorsCounter.With(map[string]string{
					"status":      errorBrief(stepInfo.Error),
					"retry_label": retryLabel,
					"final":       "false",
				}).Inc()
			}
			return onDone
		}
	}

	return t
}
89 changes: 73 additions & 16 deletions metrics/sql.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
package metrics

import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

// databaseSQL makes a trace.DatabaseSQL that measures `database/sql` events
func databaseSQL(config Config) (t trace.DatabaseSQL) {
config = config.WithSystem("database").WithSystem("sql")
conns := config.GaugeVec("conns")
txs := config.GaugeVec("txs")
inflight := config.WithSystem("conns").GaugeVec("inflight")
query := config.CounterVec("query", "status", "query_mode")
queryLatency := config.WithSystem("query").TimerVec("latency", "status", "query_mode")
exec := config.CounterVec("exec", "status", "query_label", "query_mode")
execLatency := config.WithSystem("exec").TimerVec("latency", "status", "query_mode")
txBegin := config.WithSystem("tx").CounterVec("begin", "status")
txCommit := config.WithSystem("tx").CounterVec("commit", "status")
txRollback := config.WithSystem("tx").CounterVec("rollback", "status")
t.OnConnectorConnect = func(info trace.DatabaseSQLConnectorConnectStartInfo) func(
trace.DatabaseSQLConnectorConnectDoneInfo,
) {
Expand All @@ -32,32 +41,80 @@ func databaseSQL(config Config) (t trace.DatabaseSQL) {
t.OnConnBegin = func(info trace.DatabaseSQLConnBeginStartInfo) func(trace.DatabaseSQLConnBeginDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
return func(info trace.DatabaseSQLConnBeginDoneInfo) {
if info.Tx != nil {
txs.With(nil).Add(1)
}
txBegin.With(map[string]string{
"status": errorBrief(info.Error),
}).Inc()
}
}
return nil
}
t.OnTxCommit = func(info trace.DatabaseSQLTxCommitStartInfo) func(trace.DatabaseSQLTxCommitDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
return func(info trace.DatabaseSQLTxCommitDoneInfo) {
if info.Error == nil {
txs.With(nil).Add(-1)
}
return func(info trace.DatabaseSQLTxCommitDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
txCommit.With(map[string]string{
"status": errorBrief(info.Error),
}).Inc()
}
}
return nil
}
t.OnTxRollback = func(info trace.DatabaseSQLTxRollbackStartInfo) func(trace.DatabaseSQLTxRollbackDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
return func(info trace.DatabaseSQLTxRollbackDoneInfo) {
if info.Error == nil {
txs.With(nil).Add(-1)
}
return func(info trace.DatabaseSQLTxRollbackDoneInfo) {
if config.Details()&trace.DatabaseSQLTxEvents != 0 {
txRollback.With(map[string]string{
"status": errorBrief(info.Error),
}).Inc()
}
}
}
t.OnConnExec = func(info trace.DatabaseSQLConnExecStartInfo) func(trace.DatabaseSQLConnExecDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(1)
}
var (
mode = info.Mode
start = time.Now()
)
return func(info trace.DatabaseSQLConnExecDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(-1)
}
if config.Details()&trace.DatabaseSQLConnEvents != 0 {
status := errorBrief(info.Error)
exec.With(map[string]string{
"status": status,
"query_mode": mode,
}).Inc()
execLatency.With(map[string]string{
"status": status,
"query_mode": mode,
}).Record(time.Since(start))
}
}
}
t.OnConnQuery = func(info trace.DatabaseSQLConnQueryStartInfo) func(trace.DatabaseSQLConnQueryDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(1)
}
var (
mode = info.Mode
start = time.Now()
)
return func(info trace.DatabaseSQLConnQueryDoneInfo) {
if config.Details()&trace.DatabaseSQLEvents != 0 {
inflight.With(nil).Add(-1)
}
if config.Details()&trace.DatabaseSQLConnEvents != 0 {
status := errorBrief(info.Error)
query.With(map[string]string{
"status": status,
"query_mode": mode,
}).Inc()
queryLatency.With(map[string]string{
"status": status,
"query_mode": mode,
}).Record(time.Since(start))
}
}
return nil
}
return t
}
70 changes: 0 additions & 70 deletions metrics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,81 +17,11 @@ func table(config Config) (t trace.Table) {
wait := config.WithSystem("pool").GaugeVec("wait")
waitLatency := config.WithSystem("pool").WithSystem("wait").TimerVec("latency")
alive := config.GaugeVec("sessions", "node_id")
doAttempts := config.WithSystem("do").HistogramVec("attempts", []float64{0, 1, 2, 5, 10}, "name")
doErrors := config.WithSystem("do").CounterVec("errors", "status", "name")
doIntermediateErrors := config.WithSystem("do").WithSystem("intermediate").CounterVec("errors", "status", "name")
doLatency := config.WithSystem("do").TimerVec("latency", "status", "name")
doTxAttempts := config.WithSystem("doTx").HistogramVec("attempts", []float64{0, 1, 2, 5, 10}, "name")
doTxIntermediateErrors := config.WithSystem("doTx").WithSystem("intermediate").CounterVec("errors", "status", "name")
doTxErrors := config.WithSystem("doTx").CounterVec("errors", "status", "name")
doTxLatency := config.WithSystem("doTx").TimerVec("latency", "status", "name")
t.OnInit = func(info trace.TableInitStartInfo) func(trace.TableInitDoneInfo) {
return func(info trace.TableInitDoneInfo) {
limit.With(nil).Set(float64(info.Limit))
}
}
t.OnDo = func(info trace.TableDoStartInfo) func(
info trace.TableDoIntermediateInfo,
) func(
trace.TableDoDoneInfo,
) {
var (
label = info.Label
start = time.Now()
)
return func(info trace.TableDoIntermediateInfo) func(trace.TableDoDoneInfo) {
if info.Error != nil && config.Details()&trace.TableEvents != 0 {
doIntermediateErrors.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Inc()
}
return func(info trace.TableDoDoneInfo) {
if config.Details()&trace.TableEvents != 0 {
doAttempts.With(nil).Record(float64(info.Attempts))
doErrors.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Inc()
doLatency.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Record(time.Since(start))
}
}
}
}
t.OnDoTx = func(info trace.TableDoTxStartInfo) func(
info trace.TableDoTxIntermediateInfo,
) func(
trace.TableDoTxDoneInfo,
) {
var (
label = info.Label
start = time.Now()
)
return func(info trace.TableDoTxIntermediateInfo) func(trace.TableDoTxDoneInfo) {
if info.Error != nil && config.Details()&trace.TableEvents != 0 {
doTxIntermediateErrors.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Inc()
}
return func(info trace.TableDoTxDoneInfo) {
if config.Details()&trace.TableEvents != 0 {
doTxAttempts.With(nil).Record(float64(info.Attempts))
doTxErrors.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Inc()
doTxLatency.With(map[string]string{
"status": errorBrief(info.Error),
"label": label,
}).Record(time.Since(start))
}
}
}
}
t.OnSessionNew = func(info trace.TableSessionNewStartInfo) func(trace.TableSessionNewDoneInfo) {
return func(info trace.TableSessionNewDoneInfo) {
if info.Error == nil && config.Details()&trace.TableSessionEvents != 0 {
Expand Down
Loading