Skip to content

Commit

Permalink
* Added trace.Retry into driver instance for usage in call `retry.R…
Browse files Browse the repository at this point in the history
…etry` func

* Refactored traces and metrics
* Renamed retry option `WithID` to `WithLabel`
  • Loading branch information
asmyasnikov committed Oct 18, 2023
1 parent 899fac3 commit 4a45a7e
Show file tree
Hide file tree
Showing 30 changed files with 884 additions and 314 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
* Added `trace.Retry` into driver instance for usage in call `retry.Retry` func
* Refactored traces and metrics
* Renamed retry option `WithID` to `WithLabel`

## v3.53.3
* Refactored credentials options (from funcs to interfaces and types)
* Fixed stringification of credentials object
Expand All @@ -8,6 +12,8 @@
## v3.53.1
* Bumps `github.com/ydb-platform/ydb-go-genproto` for support `query` service
* Bumps `golang.org/x/net` from `0.7.0` to `0.17.0`
* Bumps `golang.org/x/sys` from `v0.5.0` to `v0.13.0`
* Bumps `golang.org/x/text` from `v0.7.0` to `v0.13.0`

## v3.53.0
* Removed `internal/backoff.Backoff.Wait` interface method for exclude resource leak with bug-provoked usage of `time.After` method
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ func WithTrace(t trace.Driver, opts ...trace.DriverComposeOption) Option { //nol
}
}

func WithTraceRetry(t *trace.Retry, opts ...trace.RetryComposeOption) Option {
return func(c *Config) {
config.SetTraceRetry(&c.Common, t, opts...)
}
}

func WithUserAgent(userAgent string) Option {
return func(c *Config) {
c.metaOptions = append(c.metaOptions, meta.WithUserAgentOption(userAgent))
Expand Down
170 changes: 87 additions & 83 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,34 +139,34 @@ type Driver struct { //nolint:maligned
}

// Close closes Driver and clear resources
func (c *Driver) Close(ctx context.Context) error {
c.mtx.Lock()
defer c.mtx.Unlock()
func (d *Driver) Close(ctx context.Context) error {
d.mtx.Lock()
defer d.mtx.Unlock()

defer func() {
for _, f := range c.onClose {
f(c)
for _, f := range d.onClose {
f(d)
}
}()

closers := make([]func(context.Context) error, 0)
c.childrenMtx.WithLock(func() {
for _, child := range c.children {
d.childrenMtx.WithLock(func() {
for _, child := range d.children {
closers = append(closers, child.Close)
}
c.children = nil
d.children = nil
})

closers = append(
closers,
c.ratelimiterOnce.Close,
c.coordinationOnce.Close,
c.schemeOnce.Close,
c.scriptingOnce.Close,
c.tableOnce.Close,
c.topicOnce.Close,
c.balancer.Close,
c.pool.Release,
d.ratelimiterOnce.Close,
d.coordinationOnce.Close,
d.schemeOnce.Close,
d.scriptingOnce.Close,
d.tableOnce.Close,
d.topicOnce.Close,
d.balancer.Close,
d.pool.Release,
)

var issues []error
Expand All @@ -184,167 +184,171 @@ func (c *Driver) Close(ctx context.Context) error {
}

// Endpoint returns initial endpoint
func (c *Driver) Endpoint() string {
return c.config.Endpoint()
func (d *Driver) Endpoint() string {
return d.config.Endpoint()
}

// Name returns database name
func (c *Driver) Name() string {
return c.config.Database()
func (d *Driver) Name() string {
return d.config.Database()
}

// Secure returns true if database Driver is secure
func (c *Driver) Secure() bool {
return c.config.Secure()
func (d *Driver) Secure() bool {
return d.config.Secure()
}

// Table returns table client
func (c *Driver) Table() table.Client {
c.tableOnce.Init(func() closeFunc {
c.table = internalTable.New(
c.balancer,
func (d *Driver) Table() table.Client {
d.tableOnce.Init(func() closeFunc {
d.table = internalTable.New(
d.balancer,
tableConfig.New(
append(
// prepend common params from root config
[]tableConfig.Option{
tableConfig.With(c.config.Common),
tableConfig.With(d.config.Common),
},
c.tableOptions...,
d.tableOptions...,
)...,
),
)
return c.table.Close
return d.table.Close
})
// may be nil if driver closed early
return c.table
return d.table
}

// Scheme returns scheme client
func (c *Driver) Scheme() scheme.Client {
c.schemeOnce.Init(func() closeFunc {
c.scheme = internalScheme.New(
c.balancer,
func (d *Driver) Scheme() scheme.Client {
d.schemeOnce.Init(func() closeFunc {
d.scheme = internalScheme.New(
d.balancer,
schemeConfig.New(
append(
// prepend common params from root config
[]schemeConfig.Option{
schemeConfig.WithDatabaseName(c.Name()),
schemeConfig.With(c.config.Common),
schemeConfig.WithDatabaseName(d.Name()),
schemeConfig.With(d.config.Common),
},
c.schemeOptions...,
d.schemeOptions...,
)...,
),
)
return c.scheme.Close
return d.scheme.Close
})
// may be nil if driver closed early
return c.scheme
return d.scheme
}

func (d *Driver) TraceRetry() *trace.Retry {
return d.config.TraceRetry()
}

// Coordination returns coordination client
func (c *Driver) Coordination() coordination.Client {
c.coordinationOnce.Init(func() closeFunc {
c.coordination = internalCoordination.New(
c.balancer,
func (d *Driver) Coordination() coordination.Client {
d.coordinationOnce.Init(func() closeFunc {
d.coordination = internalCoordination.New(
d.balancer,
coordinationConfig.New(
append(
// prepend common params from root config
[]coordinationConfig.Option{
coordinationConfig.With(c.config.Common),
coordinationConfig.With(d.config.Common),
},
c.coordinationOptions...,
d.coordinationOptions...,
)...,
),
)
return c.coordination.Close
return d.coordination.Close
})
// may be nil if driver closed early
return c.coordination
return d.coordination
}

// Ratelimiter returns ratelimiter client
func (c *Driver) Ratelimiter() ratelimiter.Client {
c.ratelimiterOnce.Init(func() closeFunc {
c.ratelimiter = internalRatelimiter.New(
c.balancer,
func (d *Driver) Ratelimiter() ratelimiter.Client {
d.ratelimiterOnce.Init(func() closeFunc {
d.ratelimiter = internalRatelimiter.New(
d.balancer,
ratelimiterConfig.New(
append(
// prepend common params from root config
[]ratelimiterConfig.Option{
ratelimiterConfig.With(c.config.Common),
ratelimiterConfig.With(d.config.Common),
},
c.ratelimiterOptions...,
d.ratelimiterOptions...,
)...,
),
)
return c.ratelimiter.Close
return d.ratelimiter.Close
})
// may be nil if driver closed early
return c.ratelimiter
return d.ratelimiter
}

// Discovery returns discovery client
func (c *Driver) Discovery() discovery.Client {
c.discoveryOnce.Init(func() closeFunc {
c.discovery = internalDiscovery.New(
c.pool.Get(endpoint.New(c.config.Endpoint())),
func (d *Driver) Discovery() discovery.Client {
d.discoveryOnce.Init(func() closeFunc {
d.discovery = internalDiscovery.New(
d.pool.Get(endpoint.New(d.config.Endpoint())),
discoveryConfig.New(
append(
// prepend common params from root config
[]discoveryConfig.Option{
discoveryConfig.With(c.config.Common),
discoveryConfig.WithEndpoint(c.Endpoint()),
discoveryConfig.WithDatabase(c.Name()),
discoveryConfig.WithSecure(c.Secure()),
discoveryConfig.WithMeta(c.config.Meta()),
discoveryConfig.With(d.config.Common),
discoveryConfig.WithEndpoint(d.Endpoint()),
discoveryConfig.WithDatabase(d.Name()),
discoveryConfig.WithSecure(d.Secure()),
discoveryConfig.WithMeta(d.config.Meta()),
},
c.discoveryOptions...,
d.discoveryOptions...,
)...,
),
)
return c.discovery.Close
return d.discovery.Close
})
// may be nil if driver closed early
return c.discovery
return d.discovery
}

// Scripting returns scripting client
func (c *Driver) Scripting() scripting.Client {
c.scriptingOnce.Init(func() closeFunc {
c.scripting = internalScripting.New(
c.balancer,
func (d *Driver) Scripting() scripting.Client {
d.scriptingOnce.Init(func() closeFunc {
d.scripting = internalScripting.New(
d.balancer,
scriptingConfig.New(
append(
// prepend common params from root config
[]scriptingConfig.Option{
scriptingConfig.With(c.config.Common),
scriptingConfig.With(d.config.Common),
},
c.scriptingOptions...,
d.scriptingOptions...,
)...,
),
)
return c.scripting.Close
return d.scripting.Close
})
// may be nil if driver closed early
return c.scripting
return d.scripting
}

// Topic returns topic client
func (c *Driver) Topic() topic.Client {
c.topicOnce.Init(func() closeFunc {
c.topic = topicclientinternal.New(c.balancer, c.config.Credentials(),
func (d *Driver) Topic() topic.Client {
d.topicOnce.Init(func() closeFunc {
d.topic = topicclientinternal.New(d.balancer, d.config.Credentials(),
append(
// prepend common params from root config
[]topicoptions.TopicOption{
topicoptions.WithOperationTimeout(c.config.OperationTimeout()),
topicoptions.WithOperationCancelAfter(c.config.OperationCancelAfter()),
topicoptions.WithOperationTimeout(d.config.OperationTimeout()),
topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()),
},
c.topicOptions...,
d.topicOptions...,
)...,
)
return c.topic.Close
return d.topic.Close
})
return c.topic
return d.topic
}

// Open connects to database by DSN and return driver runtime holder
Expand Down
35 changes: 19 additions & 16 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,26 @@ func (b *Balancer) OnUpdate(onApplyDiscoveredEndpoints func(ctx context.Context,
}

func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
if err = retry.Retry(ctx, func(childCtx context.Context) (err error) {
if err = b.clusterDiscoveryAttempt(childCtx); err != nil {
if xerrors.IsTransportError(err, grpcCodes.Unauthenticated) {
return credentials.UnauthenticatedError("cluster discovery failed", err,
credentials.WithEndpoint(b.driverConfig.Endpoint()),
credentials.WithDatabase(b.driverConfig.Database()),
credentials.WithCredentials(b.driverConfig.Credentials()),
)
if err = retry.Retry(xcontext.WithTraceRetry(ctx, b.driverConfig.TraceRetry()),
func(childCtx context.Context) (err error) {
if err = b.clusterDiscoveryAttempt(childCtx); err != nil {
if xerrors.IsTransportError(err, grpcCodes.Unauthenticated) {
return credentials.UnauthenticatedError("cluster discovery failed", err,
credentials.WithEndpoint(b.driverConfig.Endpoint()),
credentials.WithDatabase(b.driverConfig.Database()),
credentials.WithCredentials(b.driverConfig.Credentials()),
)
}
// if got err but parent context is not done - mark error as retryable
if ctx.Err() == nil && xerrors.IsTimeoutError(err) {
return xerrors.WithStackTrace(xerrors.Retryable(err))
}
return xerrors.WithStackTrace(err)
}
// if got err but parent context is not done - mark error as retryable
if ctx.Err() == nil && xerrors.IsTimeoutError(err) {
return xerrors.WithStackTrace(xerrors.Retryable(err))
}
return xerrors.WithStackTrace(err)
}
return nil
}, retry.WithIdempotent(true)); err != nil {
return nil
},
retry.WithIdempotent(true),
); err != nil {
return xerrors.WithStackTrace(err)
}
return nil
Expand Down
11 changes: 11 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package config

import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

type Common struct {
operationTimeout time.Duration
operationCancelAfter time.Duration
disableAutoRetry bool
traceRetry trace.Retry

panicCallback func(e interface{})
}
Expand Down Expand Up @@ -41,6 +44,10 @@ func (c *Common) OperationCancelAfter() time.Duration {
return c.operationCancelAfter
}

func (c *Common) TraceRetry() *trace.Retry {
return &c.traceRetry
}

// SetOperationTimeout define the maximum amount of time a YDB server will process
// an operation. After timeout exceeds YDB will try to cancel operation and
// regardless of the cancellation appropriate error will be returned to
Expand Down Expand Up @@ -70,3 +77,7 @@ func SetPanicCallback(c *Common, panicCallback func(e interface{})) {
func SetAutoRetry(c *Common, autoRetry bool) {
c.disableAutoRetry = !autoRetry
}

func SetTraceRetry(c *Common, t *trace.Retry, opts ...trace.RetryComposeOption) {
c.traceRetry = *c.traceRetry.Compose(t, opts...)
}
Loading

0 comments on commit 4a45a7e

Please sign in to comment.