From 61ec6cc3d7a4346abc0d7a55aaa09a5e007fdf57 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Sun, 22 Oct 2023 15:50:29 +0300 Subject: [PATCH] * Added `ydb.WithTraceRetry` option --- CHANGELOG.md | 1 + config/config.go | 6 ++ connection.go | 1 + internal/balancer/balancer.go | 39 ++++---- internal/config/config.go | 11 +++ internal/coordination/client.go | 62 ++++++++----- internal/ratelimiter/client.go | 45 ++++++--- internal/scheme/client.go | 39 +++++--- internal/scripting/client.go | 22 +++-- internal/table/client.go | 34 +++---- internal/table/retry.go | 13 +-- internal/topic/topicclientinternal/client.go | 36 ++++--- internal/xsql/connector.go | 20 +++- metrics/retry.go | 4 +- metrics/traces.go | 1 + options.go | 23 ++++- retry/retry.go | 12 +-- retry/retry_test.go | 6 +- retry/sql.go | 16 ++++ retry/sql_test.go | 2 +- sql.go | 1 + tests/integration/with_trace_retry_test.go | 98 ++++++++++++++++++++ tests/slo/database/sql/storage.go | 4 +- tests/slo/gorm/storage.go | 4 +- tests/slo/xorm/storage.go | 4 +- 25 files changed, 367 insertions(+), 137 deletions(-) create mode 100644 tests/integration/with_trace_retry_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c39139c4..14144137b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Added `ydb.WithTraceRetry` option * Bumped `golang.org/x/sync` to `v0.3.0` * Bumped `google.golang.org/protobuf` to `v1.31.0` * Bumped `google.golang.org/grpc` to `v1.57.1` diff --git a/config/config.go b/config/config.go index 20dc44bb6..5cf9a0a09 100644 --- a/config/config.go +++ b/config/config.go @@ -155,6 +155,12 @@ func WithTrace(t trace.Driver, opts ...trace.DriverComposeOption) Option { //nol } } +func WithTraceRetry(t *trace.Retry, opts ...trace.RetryComposeOption) Option { + return func(c *Config) { + config.SetTraceRetry(&c.Common, t, opts...) + } +} + func WithUserAgent(userAgent string) Option { return func(c *Config) { c.metaOptions = append(c.metaOptions, meta.WithUserAgentOption(userAgent)) diff --git a/connection.go b/connection.go index fd2822a1f..28b81788d 100644 --- a/connection.go +++ b/connection.go @@ -427,6 +427,7 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e WithTraceDiscovery(log.Discovery(d.logger, d.loggerDetails, d.loggerOpts...)), WithTraceTopic(log.Topic(d.logger, d.loggerDetails, d.loggerOpts...)), WithTraceDatabaseSQL(log.DatabaseSQL(d.logger, d.loggerDetails, d.loggerOpts...)), + WithTraceRetry(log.Retry(d.logger, d.loggerDetails, d.loggerOpts...)), } { if opt != nil { err = opt(ctx, d) diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index 23c07b710..60f1d4abf 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -63,26 +63,27 @@ func (b *Balancer) OnUpdate(onApplyDiscoveredEndpoints func(ctx context.Context, } func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) { - if err = retry.Retry(ctx, func(childCtx context.Context) (err error) { - if err = b.clusterDiscoveryAttempt(childCtx); err != nil { - if credentials.IsAccessError(err) { - return credentials.AccessError("cluster discovery failed", err, - credentials.WithEndpoint(b.driverConfig.Endpoint()), - credentials.WithDatabase(b.driverConfig.Database()), - credentials.WithCredentials(b.driverConfig.Credentials()), - ) + return retry.Retry(ctx, + func(childCtx context.Context) (err error) { + if err = b.clusterDiscoveryAttempt(childCtx); err != nil { + if credentials.IsAccessError(err) { + return credentials.AccessError("cluster discovery failed", err, + credentials.WithEndpoint(b.driverConfig.Endpoint()), + credentials.WithDatabase(b.driverConfig.Database()), + credentials.WithCredentials(b.driverConfig.Credentials()), + ) + } + // if got err but parent context is not done - mark error as retryable + if ctx.Err() == nil && xerrors.IsTimeoutError(err) { + return xerrors.WithStackTrace(xerrors.Retryable(err)) + } + return xerrors.WithStackTrace(err) } - // if got err but parent context is not done - mark error as retryable - if ctx.Err() == nil && xerrors.IsTimeoutError(err) { - return xerrors.WithStackTrace(xerrors.Retryable(err)) - } - return xerrors.WithStackTrace(err) - } - return nil - }, retry.WithIdempotent(true)); err != nil { - return xerrors.WithStackTrace(err) - } - return nil + return nil + }, + retry.WithIdempotent(true), + retry.WithTrace(b.driverConfig.TraceRetry()), + ) } func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) { diff --git a/internal/config/config.go b/internal/config/config.go index f7a094d7e..dcf89c4b7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,12 +2,15 @@ package config import ( "time" + + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) type Common struct { operationTimeout time.Duration operationCancelAfter time.Duration disableAutoRetry bool + traceRetry trace.Retry panicCallback func(e interface{}) } @@ -41,6 +44,10 @@ func (c *Common) OperationCancelAfter() time.Duration { return c.operationCancelAfter } +func (c *Common) TraceRetry() *trace.Retry { + return &c.traceRetry +} + // SetOperationTimeout define the maximum amount of time a YDB server will process // an operation. After timeout exceeds YDB will try to cancel operation and // regardless of the cancellation appropriate error will be returned to @@ -70,3 +77,7 @@ func SetPanicCallback(c *Common, panicCallback func(e interface{})) { func SetAutoRetry(c *Common, autoRetry bool) { c.disableAutoRetry = !autoRetry } + +func SetTraceRetry(c *Common, t *trace.Retry, opts ...trace.RetryComposeOption) { + c.traceRetry = *c.traceRetry.Compose(t, opts...) +} diff --git a/internal/coordination/client.go b/internal/coordination/client.go index a73a81d45..b709a0f9b 100644 --- a/internal/coordination/client.go +++ b/internal/coordination/client.go @@ -34,7 +34,7 @@ func New(cc grpc.ClientConnInterface, config config.Config) *Client { } } -func (c *Client) CreateNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) { +func (c *Client) CreateNode(ctx context.Context, path string, config coordination.NodeConfig) error { if c == nil { return xerrors.WithStackTrace(errNilClient) } @@ -42,13 +42,17 @@ func (c *Client) CreateNode(ctx context.Context, path string, config coordinatio return xerrors.WithStackTrace(c.createNode(ctx, path, config)) } if !c.config.AutoRetry() { - return call(ctx) + return xerrors.WithStackTrace(call(ctx)) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry(ctx, + call, retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + ) } -func (c *Client) createNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) { - _, err = c.service.CreateNode( +func (c *Client) createNode(ctx context.Context, path string, config coordination.NodeConfig) error { + _, err := c.service.CreateNode( ctx, &Ydb_Coordination.CreateNodeRequest{ Path: path, @@ -71,7 +75,7 @@ func (c *Client) createNode(ctx context.Context, path string, config coordinatio return xerrors.WithStackTrace(err) } -func (c *Client) AlterNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) { +func (c *Client) AlterNode(ctx context.Context, path string, config coordination.NodeConfig) error { if c == nil { return xerrors.WithStackTrace(errNilClient) } @@ -79,13 +83,18 @@ func (c *Client) AlterNode(ctx context.Context, path string, config coordination return xerrors.WithStackTrace(c.alterNode(ctx, path, config)) } if !c.config.AutoRetry() { - return call(ctx) + return xerrors.WithStackTrace(call(ctx)) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry(ctx, + call, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + ) } -func (c *Client) alterNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) { - _, err = c.service.AlterNode( +func (c *Client) alterNode(ctx context.Context, path string, config coordination.NodeConfig) error { + _, err := c.service.AlterNode( ctx, &Ydb_Coordination.AlterNodeRequest{ Path: path, @@ -108,7 +117,7 @@ func (c *Client) alterNode(ctx context.Context, path string, config coordination return xerrors.WithStackTrace(err) } -func (c *Client) DropNode(ctx context.Context, path string) (err error) { +func (c *Client) DropNode(ctx context.Context, path string) error { if c == nil { return xerrors.WithStackTrace(errNilClient) } @@ -116,13 +125,17 @@ func (c *Client) DropNode(ctx context.Context, path string) (err error) { return xerrors.WithStackTrace(c.dropNode(ctx, path)) } if !c.config.AutoRetry() { - return call(ctx) + return xerrors.WithStackTrace(call(ctx)) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + ) } -func (c *Client) dropNode(ctx context.Context, path string) (err error) { - _, err = c.service.DropNode( +func (c *Client) dropNode(ctx context.Context, path string) error { + _, err := c.service.DropNode( ctx, &Ydb_Coordination.DropNodeRequest{ Path: path, @@ -143,22 +156,25 @@ func (c *Client) DescribeNode( ) ( entry *scheme.Entry, config *coordination.NodeConfig, - err error, + _ error, ) { if c == nil { - err = xerrors.WithStackTrace(errNilClient) - return + return nil, nil, xerrors.WithStackTrace(errNilClient) } - call := func(ctx context.Context) error { + call := func(ctx context.Context) (err error) { entry, config, err = c.describeNode(ctx, path) return xerrors.WithStackTrace(err) } if !c.config.AutoRetry() { - err = call(ctx) - return + err := call(ctx) + return entry, config, xerrors.WithStackTrace(err) } - err = retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) - return + err := retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + ) + return entry, config, xerrors.WithStackTrace(err) } // DescribeNode describes a coordination node diff --git a/internal/ratelimiter/client.go b/internal/ratelimiter/client.go index 6b51a21c5..df8286a2b 100644 --- a/internal/ratelimiter/client.go +++ b/internal/ratelimiter/client.go @@ -57,7 +57,11 @@ func (c *Client) CreateResource( if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + ) } func (c *Client) createResource( @@ -100,7 +104,11 @@ func (c *Client) AlterResource( if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + ) } func (c *Client) alterResource( @@ -143,7 +151,11 @@ func (c *Client) DropResource( if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + ) } func (c *Client) dropResource( @@ -169,20 +181,24 @@ func (c *Client) ListResource( coordinationNodePath string, resourcePath string, recursive bool, -) (list []string, err error) { +) (list []string, _ error) { if c == nil { return list, xerrors.WithStackTrace(errNilClient) } - call := func(ctx context.Context) error { + call := func(ctx context.Context) (err error) { list, err = c.listResource(ctx, coordinationNodePath, resourcePath, recursive) return xerrors.WithStackTrace(err) } if !c.config.AutoRetry() { - err = call(ctx) - return + err := call(ctx) + return list, err } - err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace()) - return + err := retry.Retry(ctx, call, + retry.WithIdempotent(true), + retry.WithStackTrace(), + retry.WithTrace(c.config.TraceRetry()), + ) + return list, err } func (c *Client) listResource( @@ -232,7 +248,11 @@ func (c *Client) DescribeResource( err = call(ctx) return } - err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace()) + err = retry.Retry(ctx, call, + retry.WithIdempotent(true), + retry.WithStackTrace(), + retry.WithTrace(c.config.TraceRetry()), + ) return } @@ -295,7 +315,10 @@ func (c *Client) AcquireResource( if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace()) + return retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithTrace(c.config.TraceRetry()), + ) } func (c *Client) acquireResource( diff --git a/internal/scheme/client.go b/internal/scheme/client.go index d345e47e2..a3acaa9ed 100644 --- a/internal/scheme/client.go +++ b/internal/scheme/client.go @@ -52,7 +52,11 @@ func (c *Client) MakeDirectory(ctx context.Context, path string) (err error) { if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + ) } func (c *Client) makeDirectory(ctx context.Context, path string) (err error) { @@ -81,7 +85,11 @@ func (c *Client) RemoveDirectory(ctx context.Context, path string) (err error) { if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + ) } func (c *Client) removeDirectory(ctx context.Context, path string) (err error) { @@ -110,10 +118,14 @@ func (c *Client) ListDirectory(ctx context.Context, path string) (d scheme.Direc } if !c.config.AutoRetry() { err = call(ctx) - return + return d, xerrors.WithStackTrace(err) } - err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace()) - return + err = retry.Retry(ctx, call, + retry.WithIdempotent(true), + retry.WithStackTrace(), + retry.WithTrace(c.config.TraceRetry()), + ) + return d, err } func (c *Client) listDirectory(ctx context.Context, path string) (scheme.Directory, error) { @@ -163,11 +175,12 @@ func (c *Client) DescribePath(ctx context.Context, path string) (e scheme.Entry, err = call(ctx) return } - err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace()) - if err != nil { - return e, xerrors.WithStackTrace(err) - } - return e, nil + err = retry.Retry(ctx, call, + retry.WithIdempotent(true), + retry.WithStackTrace(), + retry.WithTrace(c.config.TraceRetry()), + ) + return e, err } func (c *Client) describePath(ctx context.Context, path string) (e scheme.Entry, err error) { @@ -208,7 +221,11 @@ func (c *Client) ModifyPermissions(ctx context.Context, path string, opts ...sch if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + ) } func (c *Client) modifyPermissions(ctx context.Context, path string, opts ...scheme.PermissionsOption) (err error) { diff --git a/internal/scripting/client.go b/internal/scripting/client.go index b79c2bea1..086c9ee9a 100644 --- a/internal/scripting/client.go +++ b/internal/scripting/client.go @@ -52,8 +52,11 @@ func (c *Client) Execute( err = call(ctx) return } - err = retry.Retry(ctx, call, retry.WithStackTrace()) - return + err = retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithTrace(c.config.TraceRetry()), + ) + return r, xerrors.WithStackTrace(err) } func (c *Client) execute( @@ -120,8 +123,12 @@ func (c *Client) Explain( err = call(ctx) return } - err = retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) - return + err = retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + ) + return e, xerrors.WithStackTrace(err) } func (c *Client) explain( @@ -184,8 +191,11 @@ func (c *Client) StreamExecute( err = call(ctx) return } - err = retry.Retry(ctx, call, retry.WithStackTrace()) - return + err = retry.Retry(ctx, call, + retry.WithStackTrace(), + retry.WithTrace(c.config.TraceRetry()), + ) + return r, xerrors.WithStackTrace(err) } func (c *Client) streamExecute( diff --git a/internal/table/client.go b/internal/table/client.go index 63c2b5102..fa7148f6a 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -252,8 +252,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab append( []retry.Option{ retry.WithIdempotent(true), - retry.WithID("CreateSession"), - retry.WithTrace(trace.Retry{ + retry.WithTrace(&trace.Retry{ OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { onIntermediate := trace.TableOnCreateSession(c.config.Trace(), info.Context) return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { @@ -264,13 +263,10 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab } }, }), - }, retryOptions(c.config.Trace(), opts...).RetryOptions..., + }, c.retryOptions(opts...).RetryOptions..., )..., ) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - return s, nil + return s, xerrors.WithStackTrace(err) } func (c *Client) isClosed() bool { @@ -615,36 +611,28 @@ func (c *Client) Close(ctx context.Context) (err error) { // - deadline was canceled or deadlined // - retry operation returned nil as error // Warning: if deadline without deadline or cancellation func Retry will be worked infinite -func (c *Client) Do(ctx context.Context, op table.Operation, opts ...table.Option) (err error) { +func (c *Client) Do(ctx context.Context, op table.Operation, opts ...table.Option) error { if c == nil { return xerrors.WithStackTrace(errNilClient) } if c.isClosed() { return xerrors.WithStackTrace(errClosedClient) } - return do( - ctx, - c, - c.config, - op, - retryOptions(c.config.Trace(), opts...), - ) + err := do(ctx, c, c.config, op, c.retryOptions(opts...)) + if err != nil { + return xerrors.WithStackTrace(err) + } + return nil } -func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.Option) (err error) { +func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.Option) error { if c == nil { return xerrors.WithStackTrace(errNilClient) } if c.isClosed() { return xerrors.WithStackTrace(errClosedClient) } - err = doTx( - ctx, - c, - c.config, - op, - retryOptions(c.config.Trace(), opts...), - ) + err := doTx(ctx, c, c.config, op, c.retryOptions(opts...)) if err != nil { return xerrors.WithStackTrace(err) } diff --git a/internal/table/retry.go b/internal/table/retry.go index 764764bd6..7bd6d79a9 100644 --- a/internal/table/retry.go +++ b/internal/table/retry.go @@ -3,7 +3,6 @@ package table import ( "context" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/retry" @@ -57,7 +56,7 @@ func doTx( defer func() { onIntermediate(err)(attempts, err) }() - err = retryBackoff(ctx, c, + return retryBackoff(ctx, c, func(ctx context.Context, s table.Session) (err error) { attempts++ @@ -108,10 +107,6 @@ func doTx( }, opts.RetryOptions..., ) - if err != nil { - return xerrors.WithStackTrace(err) - } - return nil } func do( @@ -188,14 +183,14 @@ func retryBackoff( ) } -func retryOptions(trace *trace.Table, opts ...table.Option) *table.Options { +func (c *Client) retryOptions(opts ...table.Option) *table.Options { options := &table.Options{ - Trace: trace, + Trace: c.config.Trace(), TxSettings: table.TxSettings( table.WithSerializableReadWrite(), ), RetryOptions: []retry.Option{ - retry.WithID(stack.Record(1, stack.Lambda(false), stack.FileName(false))), + retry.WithTrace(c.config.TraceRetry()), }, } for _, opt := range opts { diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index 6b2c44f50..535dbc954 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -12,6 +12,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreaderinternal" "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicwriterinternal" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" @@ -76,10 +77,13 @@ func (c *Client) Alter(ctx context.Context, path string, opts ...topicoptions.Al return alterErr } - if c.cfg.AutoRetry() { - return retry.Retry(ctx, call, retry.WithIdempotent(true)) + if !c.cfg.AutoRetry() { + return xerrors.WithStackTrace(call(ctx)) } - return call(ctx) + return retry.Retry(ctx, call, + retry.WithIdempotent(true), + retry.WithTrace(c.cfg.TraceRetry()), + ) } // Create new topic @@ -103,10 +107,13 @@ func (c *Client) Create( return createErr } - if c.cfg.AutoRetry() { - return retry.Retry(ctx, call, retry.WithIdempotent(true)) + if !c.cfg.AutoRetry() { + return xerrors.WithStackTrace(call(ctx)) } - return call(ctx) + return retry.Retry(ctx, call, + retry.WithIdempotent(true), + retry.WithTrace(c.cfg.TraceRetry()), + ) } // Describe topic @@ -136,9 +143,12 @@ func (c *Client) Describe( var err error if c.cfg.AutoRetry() { - err = retry.Retry(ctx, call, retry.WithIdempotent(true)) + err = xerrors.WithStackTrace(call(ctx)) } else { - err = call(ctx) + err = retry.Retry(ctx, call, + retry.WithIdempotent(true), + retry.WithTrace(c.cfg.TraceRetry()), + ) } if err != nil { @@ -166,11 +176,13 @@ func (c *Client) Drop(ctx context.Context, path string, opts ...topicoptions.Dro return removeErr } - if c.cfg.AutoRetry() { - return retry.Retry(ctx, call, retry.WithIdempotent(true)) + if !c.cfg.AutoRetry() { + return xerrors.WithStackTrace(call(ctx)) } - - return call(ctx) + return retry.Retry(ctx, call, + retry.WithIdempotent(true), + retry.WithTrace(c.cfg.TraceRetry()), + ) } // StartReader create new topic reader and start pull messages from server diff --git a/internal/xsql/connector.go b/internal/xsql/connector.go index 9735d3109..b81bcf8bd 100644 --- a/internal/xsql/connector.go +++ b/internal/xsql/connector.go @@ -150,6 +150,19 @@ func WithOnClose(f func(connector *Connector)) ConnectorOption { return onCloseConnectorOption(f) } +type traceRetryConnectorOption struct { + t *trace.Retry +} + +func (t traceRetryConnectorOption) Apply(c *Connector) error { + c.traceRetry = t.t + return nil +} + +func WithTraceRetry(t *trace.Retry) ConnectorOption { + return traceRetryConnectorOption{t: t} +} + type fakeTxConnectorOption QueryMode func (m fakeTxConnectorOption) Apply(c *Connector) error { @@ -221,7 +234,8 @@ type Connector struct { disableServerBalancer bool idleThreshold time.Duration - trace *trace.DatabaseSQL + trace *trace.DatabaseSQL + traceRetry *trace.Retry } var ( @@ -312,6 +326,10 @@ type driverWrapper struct { c *Connector } +func (d *driverWrapper) TraceRetry() *trace.Retry { + return d.c.traceRetry +} + func (d *driverWrapper) Open(_ string) (driver.Conn, error) { return nil, ErrUnsupported } diff --git a/metrics/retry.go b/metrics/retry.go index a0eaca54f..8d3a059ff 100644 --- a/metrics/retry.go +++ b/metrics/retry.go @@ -4,7 +4,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) -// Retry makes table.RetryTrace with New publishing -func Retry(config Config) (t trace.Retry) { +// retry makes table.RetryTrace with New publishing +func retry(config Config) (t trace.Retry) { return t } diff --git a/metrics/traces.go b/metrics/traces.go index a601b5bcc..5fc978606 100644 --- a/metrics/traces.go +++ b/metrics/traces.go @@ -18,5 +18,6 @@ func WithTraces(config Config) ydb.Option { ydb.WithTraceRatelimiter(ratelimiter(config)), ydb.WithTraceDiscovery(discovery(config)), ydb.WithTraceDatabaseSQL(databaseSQL(config)), + ydb.WithTraceRetry(retry(config)), ) } diff --git a/options.go b/options.go index 1f92d4ce0..c6e14cfa9 100644 --- a/options.go +++ b/options.go @@ -249,10 +249,25 @@ func WithDiscoveryInterval(discoveryInterval time.Duration) Option { } } -// WithTraceDriver returns deadline which has associated Driver with it. -func WithTraceDriver(trace trace.Driver, opts ...trace.DriverComposeOption) Option { //nolint:gocritic +// WithTraceDriver appends trace.Driver into driver traces +func WithTraceDriver(t trace.Driver, opts ...trace.DriverComposeOption) Option { //nolint:gocritic return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithTrace(trace, opts...)) + c.options = append(c.options, config.WithTrace(t, opts...)) + return nil + } +} + +// WithTraceRetry appends trace.Retry into retry traces +func WithTraceRetry(t trace.Retry, opts ...trace.RetryComposeOption) Option { + return func(ctx context.Context, c *Driver) error { + c.options = append(c.options, + config.WithTraceRetry(&t, append( + []trace.RetryComposeOption{ + trace.WithRetryPanicCallback(c.panicCallback), + }, + opts..., + )...), + ) return nil } } @@ -394,7 +409,7 @@ func WithPanicCallback(panicCallback func(e interface{})) Option { } } -// WithTraceTable returns table trace option +// WithTraceTable appends trace.Table into table traces func WithTraceTable(t trace.Table, opts ...trace.TableComposeOption) Option { //nolint:gocritic return func(ctx context.Context, c *Driver) error { c.tableOptions = append( diff --git a/retry/retry.go b/retry/retry.go index 2c2589a07..2cada7b09 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -76,24 +76,24 @@ func WithStackTrace() stackTraceOption { var _ Option = traceOption{} type traceOption struct { - trace *trace.Retry + t *trace.Retry } func (t traceOption) ApplyRetryOption(opts *retryOptions) { - opts.trace = t.trace + opts.trace = opts.trace.Compose(t.t) } func (t traceOption) ApplyDoOption(opts *doOptions) { - opts.retryOptions = append(opts.retryOptions, WithTrace(*t.trace)) + opts.retryOptions = append(opts.retryOptions, WithTrace(t.t)) } func (t traceOption) ApplyDoTxOption(opts *doTxOptions) { - opts.retryOptions = append(opts.retryOptions, WithTrace(*t.trace)) + opts.retryOptions = append(opts.retryOptions, WithTrace(t.t)) } // WithTrace returns trace option -func WithTrace(trace trace.Retry) traceOption { - return traceOption{trace: &trace} +func WithTrace(t *trace.Retry) traceOption { + return traceOption{t: t} } var _ Option = idempotentOption(false) diff --git a/retry/retry_test.go b/retry/retry_test.go index f467b3dcc..bb9818775 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -121,7 +121,7 @@ func TestRetryWithCustomErrors(t *testing.T) { } { t.Run(tt.error.Error(), func(t *testing.T) { i := 0 - err := Retry(ctx, func(ctx context.Context) (err error) { + err := Retry(ctx, func(ctx context.Context) error { i++ if i < limit { return tt.error @@ -149,7 +149,7 @@ func TestRetryTransportDeadlineExceeded(t *testing.T) { } { counter := 0 ctx, cancel := xcontext.WithTimeout(context.Background(), time.Hour) - err := Retry(ctx, func(ctx context.Context) (err error) { + err := Retry(ctx, func(ctx context.Context) error { counter++ if !(counter < cancelCounterValue) { cancel() @@ -169,7 +169,7 @@ func TestRetryTransportCancelled(t *testing.T) { } { counter := 0 ctx, cancel := xcontext.WithCancel(context.Background()) - err := Retry(ctx, func(ctx context.Context) (err error) { + err := Retry(ctx, func(ctx context.Context) error { counter++ if !(counter < cancelCounterValue) { cancel() diff --git a/retry/sql.go b/retry/sql.go index f6e6dd3c9..8df0ad1cd 100644 --- a/retry/sql.go +++ b/retry/sql.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) type doOptions struct { @@ -40,6 +41,13 @@ func Do(ctx context.Context, db *sql.DB, f func(ctx context.Context, cc *sql.Con options = doOptions{} attempts = 0 ) + if tracer, has := db.Driver().(interface { + TraceRetry() *trace.Retry + }); has { + options.retryOptions = append(options.retryOptions, nil) + copy(options.retryOptions[1:], options.retryOptions) + options.retryOptions[0] = WithTrace(tracer.TraceRetry()) + } for _, opt := range opts { if opt != nil { opt.ApplyDoOption(&options) @@ -112,6 +120,7 @@ func WithTxOptions(txOptions *sql.TxOptions) txOptionsOption { func DoTx(ctx context.Context, db *sql.DB, f func(context.Context, *sql.Tx) error, opts ...doTxOption) error { var ( options = doTxOptions{ + retryOptions: []Option{}, txOptions: &sql.TxOptions{ Isolation: sql.LevelDefault, ReadOnly: false, @@ -119,6 +128,13 @@ func DoTx(ctx context.Context, db *sql.DB, f func(context.Context, *sql.Tx) erro } attempts = 0 ) + if tracer, has := db.Driver().(interface { + TraceRetry() *trace.Retry + }); has { + options.retryOptions = append(options.retryOptions, nil) + copy(options.retryOptions[1:], options.retryOptions) + options.retryOptions[0] = WithTrace(tracer.TraceRetry()) + } for _, opt := range opts { if opt != nil { opt.ApplyDoTxOption(&options) diff --git a/retry/sql_test.go b/retry/sql_test.go index 2b968fb33..6b91367ab 100644 --- a/retry/sql_test.go +++ b/retry/sql_test.go @@ -196,7 +196,7 @@ func TestDoTx(t *testing.T) { WithIdempotent(bool(idempotentType)), WithFastBackoff(backoff.New(backoff.WithSlotDuration(time.Nanosecond))), WithSlowBackoff(backoff.New(backoff.WithSlotDuration(time.Nanosecond))), - WithTrace(trace.Retry{ + WithTrace(&trace.Retry{ //nolint:lll OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { t.Logf("attempt %d, conn %d, mode: %+v", attempts, m.conns, Check(m.queryErr)) diff --git a/sql.go b/sql.go index 31638ee49..eac5314ed 100644 --- a/sql.go +++ b/sql.go @@ -164,6 +164,7 @@ func Connector(parent *Driver, opts ...ConnectorOption) (SQLConnector, error) { opts..., ), xsql.WithOnClose(d.detach), + xsql.WithTraceRetry(parent.config.TraceRetry()), )..., ) if err != nil { diff --git a/tests/integration/with_trace_retry_test.go b/tests/integration/with_trace_retry_test.go new file mode 100644 index 000000000..0d27ecf06 --- /dev/null +++ b/tests/integration/with_trace_retry_test.go @@ -0,0 +1,98 @@ +//go:build integration +// +build integration + +package integration + +import ( + "context" + "database/sql" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +func TestWithTraceRetry(t *testing.T) { + ctx := xtest.Context(t) + + t.Run("table", func(t *testing.T) { + var ( + retryCalled = make(map[string]bool, 2) + scope = newScope(t) + db = scope.Driver( + ydb.WithTraceRetry(trace.Retry{ + OnRetry: func( + info trace.RetryLoopStartInfo, + ) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { + retryCalled[info.ID] = true + return nil + }, + }), + ) + ) + + require.NoError(t, db.Table().Do(ctx, + func(ctx context.Context, s table.Session) error { + return nil + }, + table.WithID("db.Table().Do"), + )) + + require.NoError(t, db.Table().DoTx(ctx, + func(ctx context.Context, tx table.TransactionActor) error { + return nil + }, + table.WithID("db.Table().DoTx"), + )) + + for _, key := range []string{ + "db.Table().Do", + "db.Table().DoTx", + } { + require.True(t, retryCalled[key], key) + } + }) + + t.Run("database/sql", func(t *testing.T) { + var ( + retryCalled = make(map[string]bool, 2) + scope = newScope(t) + nativeDb = scope.Driver( + ydb.WithTraceRetry(trace.Retry{ + OnRetry: func( + info trace.RetryLoopStartInfo, + ) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { + retryCalled[info.ID] = true + return nil + }, + }), + ) + db = sql.OpenDB(ydb.MustConnector(nativeDb)) + ) + require.NoError(t, retry.Do(ctx, db, + func(ctx context.Context, cc *sql.Conn) error { + return nil + }, + retry.WithID("retry.Do"), + )) + + require.NoError(t, retry.DoTx(ctx, db, + func(ctx context.Context, tx *sql.Tx) error { + return nil + }, + retry.WithID("retry.DoTx"), + )) + + for _, key := range []string{ + "retry.Do", + "retry.DoTx", + } { + require.True(t, retryCalled[key], key) + } + }) +} diff --git a/tests/slo/database/sql/storage.go b/tests/slo/database/sql/storage.go index 0fcd90d82..d757920eb 100755 --- a/tests/slo/database/sql/storage.go +++ b/tests/slo/database/sql/storage.go @@ -137,7 +137,7 @@ func (s *Storage) Read(ctx context.Context, entryID generator.RowID) (res genera }, retry.WithIdempotent(true), retry.WithTrace( - trace.Retry{ + &trace.Retry{ OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopDoneInfo) { @@ -177,7 +177,7 @@ func (s *Storage) Write(ctx context.Context, e generator.Row) (attempts int, err }, retry.WithIdempotent(true), retry.WithTrace( - trace.Retry{ + &trace.Retry{ OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopDoneInfo) { diff --git a/tests/slo/gorm/storage.go b/tests/slo/gorm/storage.go index 2b12dad37..2481014f1 100644 --- a/tests/slo/gorm/storage.go +++ b/tests/slo/gorm/storage.go @@ -109,7 +109,7 @@ func (s *Storage) Read(ctx context.Context, id generator.RowID) (r generator.Row }, retry.WithIdempotent(true), retry.WithTrace( - trace.Retry{ + &trace.Retry{ OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopDoneInfo) { @@ -157,7 +157,7 @@ func (s *Storage) Write(ctx context.Context, row generator.Row) (attempts int, e }, retry.WithIdempotent(true), retry.WithTrace( - trace.Retry{ + &trace.Retry{ OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopDoneInfo) { diff --git a/tests/slo/xorm/storage.go b/tests/slo/xorm/storage.go index daabe8320..b6bc813df 100644 --- a/tests/slo/xorm/storage.go +++ b/tests/slo/xorm/storage.go @@ -140,7 +140,7 @@ func (s *Storage) Read(ctx context.Context, id generator.RowID) (row generator.R }, retry.WithIdempotent(true), retry.WithTrace( - trace.Retry{ + &trace.Retry{ OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopDoneInfo) { @@ -174,7 +174,7 @@ func (s *Storage) Write(ctx context.Context, row generator.Row) (attempts int, e }, retry.WithIdempotent(true), retry.WithTrace( - trace.Retry{ + &trace.Retry{ OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { return func(info trace.RetryLoopDoneInfo) {