Skip to content

Commit

Permalink
* Added ydb.WithTraceRetry option
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Oct 23, 2023
1 parent e1a1bb5 commit 61ec6cc
Show file tree
Hide file tree
Showing 25 changed files with 367 additions and 137 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Added `ydb.WithTraceRetry` option
* Bumped `golang.org/x/sync` to `v0.3.0`
* Bumped `google.golang.org/protobuf` to `v1.31.0`
* Bumped `google.golang.org/grpc` to `v1.57.1`
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ func WithTrace(t trace.Driver, opts ...trace.DriverComposeOption) Option { //nol
}
}

func WithTraceRetry(t *trace.Retry, opts ...trace.RetryComposeOption) Option {
return func(c *Config) {
config.SetTraceRetry(&c.Common, t, opts...)
}
}

func WithUserAgent(userAgent string) Option {
return func(c *Config) {
c.metaOptions = append(c.metaOptions, meta.WithUserAgentOption(userAgent))
Expand Down
1 change: 1 addition & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e
WithTraceDiscovery(log.Discovery(d.logger, d.loggerDetails, d.loggerOpts...)),
WithTraceTopic(log.Topic(d.logger, d.loggerDetails, d.loggerOpts...)),
WithTraceDatabaseSQL(log.DatabaseSQL(d.logger, d.loggerDetails, d.loggerOpts...)),
WithTraceRetry(log.Retry(d.logger, d.loggerDetails, d.loggerOpts...)),
} {
if opt != nil {
err = opt(ctx, d)
Expand Down
39 changes: 20 additions & 19 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,27 @@ func (b *Balancer) OnUpdate(onApplyDiscoveredEndpoints func(ctx context.Context,
}

func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
if err = retry.Retry(ctx, func(childCtx context.Context) (err error) {
if err = b.clusterDiscoveryAttempt(childCtx); err != nil {
if credentials.IsAccessError(err) {
return credentials.AccessError("cluster discovery failed", err,
credentials.WithEndpoint(b.driverConfig.Endpoint()),
credentials.WithDatabase(b.driverConfig.Database()),
credentials.WithCredentials(b.driverConfig.Credentials()),
)
return retry.Retry(ctx,
func(childCtx context.Context) (err error) {
if err = b.clusterDiscoveryAttempt(childCtx); err != nil {
if credentials.IsAccessError(err) {
return credentials.AccessError("cluster discovery failed", err,
credentials.WithEndpoint(b.driverConfig.Endpoint()),
credentials.WithDatabase(b.driverConfig.Database()),
credentials.WithCredentials(b.driverConfig.Credentials()),
)
}
// if got err but parent context is not done - mark error as retryable
if ctx.Err() == nil && xerrors.IsTimeoutError(err) {
return xerrors.WithStackTrace(xerrors.Retryable(err))
}
return xerrors.WithStackTrace(err)
}
// if got err but parent context is not done - mark error as retryable
if ctx.Err() == nil && xerrors.IsTimeoutError(err) {
return xerrors.WithStackTrace(xerrors.Retryable(err))
}
return xerrors.WithStackTrace(err)
}
return nil
}, retry.WithIdempotent(true)); err != nil {
return xerrors.WithStackTrace(err)
}
return nil
return nil
},
retry.WithIdempotent(true),
retry.WithTrace(b.driverConfig.TraceRetry()),
)
}

func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
Expand Down
11 changes: 11 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package config

import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

type Common struct {
operationTimeout time.Duration
operationCancelAfter time.Duration
disableAutoRetry bool
traceRetry trace.Retry

panicCallback func(e interface{})
}
Expand Down Expand Up @@ -41,6 +44,10 @@ func (c *Common) OperationCancelAfter() time.Duration {
return c.operationCancelAfter
}

func (c *Common) TraceRetry() *trace.Retry {
return &c.traceRetry
}

// SetOperationTimeout define the maximum amount of time a YDB server will process
// an operation. After timeout exceeds YDB will try to cancel operation and
// regardless of the cancellation appropriate error will be returned to
Expand Down Expand Up @@ -70,3 +77,7 @@ func SetPanicCallback(c *Common, panicCallback func(e interface{})) {
func SetAutoRetry(c *Common, autoRetry bool) {
c.disableAutoRetry = !autoRetry
}

func SetTraceRetry(c *Common, t *trace.Retry, opts ...trace.RetryComposeOption) {
c.traceRetry = *c.traceRetry.Compose(t, opts...)
}
62 changes: 39 additions & 23 deletions internal/coordination/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,25 @@ func New(cc grpc.ClientConnInterface, config config.Config) *Client {
}
}

func (c *Client) CreateNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) {
func (c *Client) CreateNode(ctx context.Context, path string, config coordination.NodeConfig) error {
if c == nil {
return xerrors.WithStackTrace(errNilClient)
}
call := func(ctx context.Context) error {
return xerrors.WithStackTrace(c.createNode(ctx, path, config))
}
if !c.config.AutoRetry() {
return call(ctx)
return xerrors.WithStackTrace(call(ctx))
}
return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true))
return retry.Retry(ctx,
call, retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
)
}

func (c *Client) createNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) {
_, err = c.service.CreateNode(
func (c *Client) createNode(ctx context.Context, path string, config coordination.NodeConfig) error {
_, err := c.service.CreateNode(
ctx,
&Ydb_Coordination.CreateNodeRequest{
Path: path,
Expand All @@ -71,21 +75,26 @@ func (c *Client) createNode(ctx context.Context, path string, config coordinatio
return xerrors.WithStackTrace(err)
}

func (c *Client) AlterNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) {
func (c *Client) AlterNode(ctx context.Context, path string, config coordination.NodeConfig) error {
if c == nil {
return xerrors.WithStackTrace(errNilClient)
}
call := func(ctx context.Context) error {
return xerrors.WithStackTrace(c.alterNode(ctx, path, config))
}
if !c.config.AutoRetry() {
return call(ctx)
return xerrors.WithStackTrace(call(ctx))
}
return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true))
return retry.Retry(ctx,
call,
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
)
}

func (c *Client) alterNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) {
_, err = c.service.AlterNode(
func (c *Client) alterNode(ctx context.Context, path string, config coordination.NodeConfig) error {
_, err := c.service.AlterNode(
ctx,
&Ydb_Coordination.AlterNodeRequest{
Path: path,
Expand All @@ -108,21 +117,25 @@ func (c *Client) alterNode(ctx context.Context, path string, config coordination
return xerrors.WithStackTrace(err)
}

func (c *Client) DropNode(ctx context.Context, path string) (err error) {
func (c *Client) DropNode(ctx context.Context, path string) error {
if c == nil {
return xerrors.WithStackTrace(errNilClient)
}
call := func(ctx context.Context) error {
return xerrors.WithStackTrace(c.dropNode(ctx, path))
}
if !c.config.AutoRetry() {
return call(ctx)
return xerrors.WithStackTrace(call(ctx))
}
return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true))
return retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
)
}

func (c *Client) dropNode(ctx context.Context, path string) (err error) {
_, err = c.service.DropNode(
func (c *Client) dropNode(ctx context.Context, path string) error {
_, err := c.service.DropNode(
ctx,
&Ydb_Coordination.DropNodeRequest{
Path: path,
Expand All @@ -143,22 +156,25 @@ func (c *Client) DescribeNode(
) (
entry *scheme.Entry,
config *coordination.NodeConfig,
err error,
_ error,
) {
if c == nil {
err = xerrors.WithStackTrace(errNilClient)
return
return nil, nil, xerrors.WithStackTrace(errNilClient)
}
call := func(ctx context.Context) error {
call := func(ctx context.Context) (err error) {
entry, config, err = c.describeNode(ctx, path)
return xerrors.WithStackTrace(err)
}
if !c.config.AutoRetry() {
err = call(ctx)
return
err := call(ctx)
return entry, config, xerrors.WithStackTrace(err)
}
err = retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true))
return
err := retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
)
return entry, config, xerrors.WithStackTrace(err)
}

// DescribeNode describes a coordination node
Expand Down
45 changes: 34 additions & 11 deletions internal/ratelimiter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func (c *Client) CreateResource(
if !c.config.AutoRetry() {
return call(ctx)
}
return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true))
return retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
)
}

func (c *Client) createResource(
Expand Down Expand Up @@ -100,7 +104,11 @@ func (c *Client) AlterResource(
if !c.config.AutoRetry() {
return call(ctx)
}
return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true))
return retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
)
}

func (c *Client) alterResource(
Expand Down Expand Up @@ -143,7 +151,11 @@ func (c *Client) DropResource(
if !c.config.AutoRetry() {
return call(ctx)
}
return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true))
return retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
)
}

func (c *Client) dropResource(
Expand All @@ -169,20 +181,24 @@ func (c *Client) ListResource(
coordinationNodePath string,
resourcePath string,
recursive bool,
) (list []string, err error) {
) (list []string, _ error) {
if c == nil {
return list, xerrors.WithStackTrace(errNilClient)
}
call := func(ctx context.Context) error {
call := func(ctx context.Context) (err error) {
list, err = c.listResource(ctx, coordinationNodePath, resourcePath, recursive)
return xerrors.WithStackTrace(err)
}
if !c.config.AutoRetry() {
err = call(ctx)
return
err := call(ctx)
return list, err
}
err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace())
return
err := retry.Retry(ctx, call,
retry.WithIdempotent(true),
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
)
return list, err
}

func (c *Client) listResource(
Expand Down Expand Up @@ -232,7 +248,11 @@ func (c *Client) DescribeResource(
err = call(ctx)
return
}
err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace())
err = retry.Retry(ctx, call,
retry.WithIdempotent(true),
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
)
return
}

Expand Down Expand Up @@ -295,7 +315,10 @@ func (c *Client) AcquireResource(
if !c.config.AutoRetry() {
return call(ctx)
}
return retry.Retry(ctx, call, retry.WithStackTrace())
return retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
)
}

func (c *Client) acquireResource(
Expand Down
Loading

0 comments on commit 61ec6cc

Please sign in to comment.