From 4b6157bd701737e8817d22024c1ff9cd1601b59e Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 7 Nov 2023 17:59:45 +0300 Subject: [PATCH 1/4] splitted connection.go to connection.go (interface Connection only) and driver.go --- connection.go | 475 ------------------------------------------------ driver.go | 488 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 488 insertions(+), 475 deletions(-) create mode 100644 driver.go diff --git a/connection.go b/connection.go index 28b81788d..df33f42d8 100644 --- a/connection.go +++ b/connection.go @@ -2,44 +2,14 @@ package ydb import ( "context" - "errors" - "os" - "sync" - "google.golang.org/grpc" - - "github.com/ydb-platform/ydb-go-sdk/v3/config" "github.com/ydb-platform/ydb-go-sdk/v3/coordination" "github.com/ydb-platform/ydb-go-sdk/v3/discovery" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/conn" - internalCoordination "github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination" - coordinationConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination/config" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/credentials" - internalDiscovery "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery" - discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/dsn" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" - internalRatelimiter "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter" - ratelimiterConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter/config" - internalScheme "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme" - schemeConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme/config" - internalScripting "github.com/ydb-platform/ydb-go-sdk/v3/internal/scripting" - scriptingConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/scripting/config" - internalTable "github.com/ydb-platform/ydb-go-sdk/v3/internal/table" - tableConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicclientinternal" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" - "github.com/ydb-platform/ydb-go-sdk/v3/log" "github.com/ydb-platform/ydb-go-sdk/v3/ratelimiter" "github.com/ydb-platform/ydb-go-sdk/v3/scheme" "github.com/ydb-platform/ydb-go-sdk/v3/scripting" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/topic" - "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" - "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) // Connection interface provide access to YDB service clients @@ -80,448 +50,3 @@ type Connection interface { // Topic returns topic client Topic() topic.Client } - -var _ Connection = (*Driver)(nil) - -// Driver type provide access to YDB service clients -type Driver struct { //nolint:maligned - userInfo *dsn.UserInfo - - logger log.Logger - loggerOpts []log.Option - loggerDetails trace.Detailer - - opts []Option - - config *config.Config - options []config.Option - - discoveryOnce initOnce - discovery *internalDiscovery.Client - discoveryOptions []discoveryConfig.Option - - tableOnce initOnce - table *internalTable.Client - tableOptions []tableConfig.Option - - scriptingOnce initOnce - scripting *internalScripting.Client - scriptingOptions []scriptingConfig.Option - - schemeOnce initOnce - scheme *internalScheme.Client - schemeOptions []schemeConfig.Option - - coordinationOnce initOnce - coordination *internalCoordination.Client - coordinationOptions []coordinationConfig.Option - - ratelimiterOnce initOnce - ratelimiter *internalRatelimiter.Client - ratelimiterOptions []ratelimiterConfig.Option - - topicOnce initOnce - topic *topicclientinternal.Client - topicOptions []topicoptions.TopicOption - - databaseSQLOptions []xsql.ConnectorOption - - pool *conn.Pool - - mtx sync.Mutex - balancer *balancer.Balancer - - children map[uint64]*Driver - childrenMtx xsync.Mutex - onClose []func(c *Driver) - - panicCallback func(e interface{}) -} - -// Close closes Driver and clear resources -func (d *Driver) Close(ctx context.Context) error { - d.mtx.Lock() - defer d.mtx.Unlock() - - defer func() { - for _, f := range d.onClose { - f(d) - } - }() - - closers := make([]func(context.Context) error, 0) - d.childrenMtx.WithLock(func() { - for _, child := range d.children { - closers = append(closers, child.Close) - } - d.children = nil - }) - - closers = append( - closers, - d.ratelimiterOnce.Close, - d.coordinationOnce.Close, - d.schemeOnce.Close, - d.scriptingOnce.Close, - d.tableOnce.Close, - d.topicOnce.Close, - d.balancer.Close, - d.pool.Release, - ) - - var issues []error - for _, closer := range closers { - if err := closer(ctx); err != nil { - issues = append(issues, err) - } - } - - if len(issues) > 0 { - return xerrors.WithStackTrace(xerrors.NewWithIssues("close failed", issues...)) - } - - return nil -} - -// Endpoint returns initial endpoint -func (d *Driver) Endpoint() string { - return d.config.Endpoint() -} - -// Name returns database name -func (d *Driver) Name() string { - return d.config.Database() -} - -// Secure returns true if database Driver is secure -func (d *Driver) Secure() bool { - return d.config.Secure() -} - -// Table returns table client -func (d *Driver) Table() table.Client { - d.tableOnce.Init(func() closeFunc { - d.table = internalTable.New( - d.balancer, - tableConfig.New( - append( - // prepend common params from root config - []tableConfig.Option{ - tableConfig.With(d.config.Common), - }, - d.tableOptions..., - )..., - ), - ) - return d.table.Close - }) - // may be nil if driver closed early - return d.table -} - -// Scheme returns scheme client -func (d *Driver) Scheme() scheme.Client { - d.schemeOnce.Init(func() closeFunc { - d.scheme = internalScheme.New( - d.balancer, - schemeConfig.New( - append( - // prepend common params from root config - []schemeConfig.Option{ - schemeConfig.WithDatabaseName(d.Name()), - schemeConfig.With(d.config.Common), - }, - d.schemeOptions..., - )..., - ), - ) - return d.scheme.Close - }) - // may be nil if driver closed early - return d.scheme -} - -// Coordination returns coordination client -func (d *Driver) Coordination() coordination.Client { - d.coordinationOnce.Init(func() closeFunc { - d.coordination = internalCoordination.New( - d.balancer, - coordinationConfig.New( - append( - // prepend common params from root config - []coordinationConfig.Option{ - coordinationConfig.With(d.config.Common), - }, - d.coordinationOptions..., - )..., - ), - ) - return d.coordination.Close - }) - // may be nil if driver closed early - return d.coordination -} - -// Ratelimiter returns ratelimiter client -func (d *Driver) Ratelimiter() ratelimiter.Client { - d.ratelimiterOnce.Init(func() closeFunc { - d.ratelimiter = internalRatelimiter.New( - d.balancer, - ratelimiterConfig.New( - append( - // prepend common params from root config - []ratelimiterConfig.Option{ - ratelimiterConfig.With(d.config.Common), - }, - d.ratelimiterOptions..., - )..., - ), - ) - return d.ratelimiter.Close - }) - // may be nil if driver closed early - return d.ratelimiter -} - -// Discovery returns discovery client -func (d *Driver) Discovery() discovery.Client { - d.discoveryOnce.Init(func() closeFunc { - d.discovery = internalDiscovery.New( - d.pool.Get(endpoint.New(d.config.Endpoint())), - discoveryConfig.New( - append( - // prepend common params from root config - []discoveryConfig.Option{ - discoveryConfig.With(d.config.Common), - discoveryConfig.WithEndpoint(d.Endpoint()), - discoveryConfig.WithDatabase(d.Name()), - discoveryConfig.WithSecure(d.Secure()), - discoveryConfig.WithMeta(d.config.Meta()), - }, - d.discoveryOptions..., - )..., - ), - ) - return d.discovery.Close - }) - // may be nil if driver closed early - return d.discovery -} - -// Scripting returns scripting client -func (d *Driver) Scripting() scripting.Client { - d.scriptingOnce.Init(func() closeFunc { - d.scripting = internalScripting.New( - d.balancer, - scriptingConfig.New( - append( - // prepend common params from root config - []scriptingConfig.Option{ - scriptingConfig.With(d.config.Common), - }, - d.scriptingOptions..., - )..., - ), - ) - return d.scripting.Close - }) - // may be nil if driver closed early - return d.scripting -} - -// Topic returns topic client -func (d *Driver) Topic() topic.Client { - d.topicOnce.Init(func() closeFunc { - d.topic = topicclientinternal.New(d.balancer, d.config.Credentials(), - append( - // prepend common params from root config - []topicoptions.TopicOption{ - topicoptions.WithOperationTimeout(d.config.OperationTimeout()), - topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()), - }, - d.topicOptions..., - )..., - ) - return d.topic.Close - }) - return d.topic -} - -// Open connects to database by DSN and return driver runtime holder -// -// DSN accept Driver string like -// -// "grpc[s]://{endpoint}/{database}[?param=value]" -// -// See sugar.DSN helper for make dsn from endpoint and database -func Open(ctx context.Context, dsn string, opts ...Option) (_ *Driver, err error) { - return open( - ctx, - append( - []Option{ - WithConnectionString(dsn), - }, - opts..., - )..., - ) -} - -func MustOpen(ctx context.Context, dsn string, opts ...Option) *Driver { - db, err := Open(ctx, dsn, opts...) - if err != nil { - panic(err) - } - return db -} - -// New connects to database and return driver runtime holder -// -// Deprecated: use Open with required param connectionString instead -func New(ctx context.Context, opts ...Option) (_ *Driver, err error) { - return open(ctx, opts...) -} - -func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, err error) { - d := &Driver{ - children: make(map[uint64]*Driver), - } - if caFile, has := os.LookupEnv("YDB_SSL_ROOT_CERTIFICATES_FILE"); has { - d.opts = append(d.opts, - WithCertificatesFromFile(caFile), - ) - } - if logLevel, has := os.LookupEnv("YDB_LOG_SEVERITY_LEVEL"); has { - if l := log.FromString(logLevel); l < log.QUIET { - d.opts = append(d.opts, - WithLogger( - log.Default(os.Stderr, - log.WithMinLevel(log.FromString(logLevel)), - log.WithColoring(), - ), - trace.MatchDetails( - os.Getenv("YDB_LOG_DETAILS"), - trace.WithDefaultDetails(trace.DetailsAll), - ), - log.WithLogQuery(), - ), - ) - } - } - d.opts = append(d.opts, opts...) - for _, opt := range d.opts { - if opt != nil { - err = opt(ctx, d) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - } - } - if d.logger != nil { - for _, opt := range []Option{ - WithTraceDriver(log.Driver(d.logger, d.loggerDetails, d.loggerOpts...)), - WithTraceTable(log.Table(d.logger, d.loggerDetails, d.loggerOpts...)), - WithTraceScripting(log.Scripting(d.logger, d.loggerDetails, d.loggerOpts...)), - WithTraceScheme(log.Scheme(d.logger, d.loggerDetails, d.loggerOpts...)), - WithTraceCoordination(log.Coordination(d.logger, d.loggerDetails, d.loggerOpts...)), - WithTraceRatelimiter(log.Ratelimiter(d.logger, d.loggerDetails, d.loggerOpts...)), - WithTraceDiscovery(log.Discovery(d.logger, d.loggerDetails, d.loggerOpts...)), - WithTraceTopic(log.Topic(d.logger, d.loggerDetails, d.loggerOpts...)), - WithTraceDatabaseSQL(log.DatabaseSQL(d.logger, d.loggerDetails, d.loggerOpts...)), - WithTraceRetry(log.Retry(d.logger, d.loggerDetails, d.loggerOpts...)), - } { - if opt != nil { - err = opt(ctx, d) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - } - } - } - d.config = config.New(d.options...) - return d, nil -} - -func connect(ctx context.Context, c *Driver) error { - var err error - - if c.config.Endpoint() == "" { - return xerrors.WithStackTrace(errors.New("configuration: empty dial address")) - } - if c.config.Database() == "" { - return xerrors.WithStackTrace(errors.New("configuration: empty database")) - } - - onDone := trace.DriverOnInit( - c.config.Trace(), - &ctx, - c.config.Endpoint(), - c.config.Database(), - c.config.Secure(), - ) - defer func() { - onDone(err) - }() - - if c.userInfo != nil { - c.config = c.config.With(config.WithCredentials( - credentials.NewStaticCredentials( - c.userInfo.User, c.userInfo.Password, - c.config.Endpoint(), - credentials.WithGrpcDialOptions(c.config.GrpcDialOptions()...), - ), - )) - } - - if c.pool == nil { - c.pool = conn.NewPool(c.config) - } - - c.balancer, err = balancer.New(ctx, c.config, c.pool, c.discoveryOptions...) - if err != nil { - return xerrors.WithStackTrace(err) - } - - return nil -} - -func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { - c, err := newConnectionFromOptions(ctx, opts...) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - err = connect(ctx, c) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - return c, nil -} - -// GRPCConn casts *ydb.Driver to grpc.ClientConnInterface for executing -// unary and streaming RPC over internal driver balancer. -// -// Warning: for connect to driver-unsupported YDB services -func GRPCConn(cc *Driver) grpc.ClientConnInterface { - return conn.WithContextModifier(cc.balancer, conn.WithoutWrapping) -} - -// Helper types for closing lazy clients -type closeFunc func(ctx context.Context) error - -type initOnce struct { - once sync.Once - close closeFunc -} - -func (lo *initOnce) Init(f func() closeFunc) { - lo.once.Do(func() { - lo.close = f() - }) -} - -func (lo *initOnce) Close(ctx context.Context) error { - lo.once.Do(func() {}) - if lo.close == nil { - return nil - } - return lo.close(ctx) -} diff --git a/driver.go b/driver.go new file mode 100644 index 000000000..2c0ec3b08 --- /dev/null +++ b/driver.go @@ -0,0 +1,488 @@ +package ydb + +import ( + "context" + "errors" + "os" + "sync" + + "google.golang.org/grpc" + + "github.com/ydb-platform/ydb-go-sdk/v3/config" + "github.com/ydb-platform/ydb-go-sdk/v3/coordination" + "github.com/ydb-platform/ydb-go-sdk/v3/discovery" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/conn" + internalCoordination "github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination" + coordinationConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/credentials" + internalDiscovery "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery" + discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/dsn" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" + internalRatelimiter "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter" + ratelimiterConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter/config" + internalScheme "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme" + schemeConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme/config" + internalScripting "github.com/ydb-platform/ydb-go-sdk/v3/internal/scripting" + scriptingConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/scripting/config" + internalTable "github.com/ydb-platform/ydb-go-sdk/v3/internal/table" + tableConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicclientinternal" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" + "github.com/ydb-platform/ydb-go-sdk/v3/log" + "github.com/ydb-platform/ydb-go-sdk/v3/ratelimiter" + "github.com/ydb-platform/ydb-go-sdk/v3/scheme" + "github.com/ydb-platform/ydb-go-sdk/v3/scripting" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/topic" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +var _ Connection = (*Driver)(nil) + +// Driver type provide access to YDB service clients +type Driver struct { //nolint:maligned + userInfo *dsn.UserInfo + + logger log.Logger + loggerOpts []log.Option + loggerDetails trace.Detailer + + opts []Option + + config *config.Config + options []config.Option + + discoveryOnce initOnce + discovery *internalDiscovery.Client + discoveryOptions []discoveryConfig.Option + + tableOnce initOnce + table *internalTable.Client + tableOptions []tableConfig.Option + + scriptingOnce initOnce + scripting *internalScripting.Client + scriptingOptions []scriptingConfig.Option + + schemeOnce initOnce + scheme *internalScheme.Client + schemeOptions []schemeConfig.Option + + coordinationOnce initOnce + coordination *internalCoordination.Client + coordinationOptions []coordinationConfig.Option + + ratelimiterOnce initOnce + ratelimiter *internalRatelimiter.Client + ratelimiterOptions []ratelimiterConfig.Option + + topicOnce initOnce + topic *topicclientinternal.Client + topicOptions []topicoptions.TopicOption + + databaseSQLOptions []xsql.ConnectorOption + + pool *conn.Pool + + mtx sync.Mutex + balancer *balancer.Balancer + + children map[uint64]*Driver + childrenMtx xsync.Mutex + onClose []func(c *Driver) + + panicCallback func(e interface{}) +} + +// Close closes Driver and clear resources +func (d *Driver) Close(ctx context.Context) error { + d.mtx.Lock() + defer d.mtx.Unlock() + + defer func() { + for _, f := range d.onClose { + f(d) + } + }() + + closers := make([]func(context.Context) error, 0) + d.childrenMtx.WithLock(func() { + for _, child := range d.children { + closers = append(closers, child.Close) + } + d.children = nil + }) + + closers = append( + closers, + d.ratelimiterOnce.Close, + d.coordinationOnce.Close, + d.schemeOnce.Close, + d.scriptingOnce.Close, + d.tableOnce.Close, + d.topicOnce.Close, + d.balancer.Close, + d.pool.Release, + ) + + var issues []error + for _, closer := range closers { + if err := closer(ctx); err != nil { + issues = append(issues, err) + } + } + + if len(issues) > 0 { + return xerrors.WithStackTrace(xerrors.NewWithIssues("close failed", issues...)) + } + + return nil +} + +// Endpoint returns initial endpoint +func (d *Driver) Endpoint() string { + return d.config.Endpoint() +} + +// Name returns database name +func (d *Driver) Name() string { + return d.config.Database() +} + +// Secure returns true if database Driver is secure +func (d *Driver) Secure() bool { + return d.config.Secure() +} + +// Table returns table client +func (d *Driver) Table() table.Client { + d.tableOnce.Init(func() closeFunc { + d.table = internalTable.New( + d.balancer, + tableConfig.New( + append( + // prepend common params from root config + []tableConfig.Option{ + tableConfig.With(d.config.Common), + }, + d.tableOptions..., + )..., + ), + ) + return d.table.Close + }) + // may be nil if driver closed early + return d.table +} + +// Scheme returns scheme client +func (d *Driver) Scheme() scheme.Client { + d.schemeOnce.Init(func() closeFunc { + d.scheme = internalScheme.New( + d.balancer, + schemeConfig.New( + append( + // prepend common params from root config + []schemeConfig.Option{ + schemeConfig.WithDatabaseName(d.Name()), + schemeConfig.With(d.config.Common), + }, + d.schemeOptions..., + )..., + ), + ) + return d.scheme.Close + }) + // may be nil if driver closed early + return d.scheme +} + +// Coordination returns coordination client +func (d *Driver) Coordination() coordination.Client { + d.coordinationOnce.Init(func() closeFunc { + d.coordination = internalCoordination.New( + d.balancer, + coordinationConfig.New( + append( + // prepend common params from root config + []coordinationConfig.Option{ + coordinationConfig.With(d.config.Common), + }, + d.coordinationOptions..., + )..., + ), + ) + return d.coordination.Close + }) + // may be nil if driver closed early + return d.coordination +} + +// Ratelimiter returns ratelimiter client +func (d *Driver) Ratelimiter() ratelimiter.Client { + d.ratelimiterOnce.Init(func() closeFunc { + d.ratelimiter = internalRatelimiter.New( + d.balancer, + ratelimiterConfig.New( + append( + // prepend common params from root config + []ratelimiterConfig.Option{ + ratelimiterConfig.With(d.config.Common), + }, + d.ratelimiterOptions..., + )..., + ), + ) + return d.ratelimiter.Close + }) + // may be nil if driver closed early + return d.ratelimiter +} + +// Discovery returns discovery client +func (d *Driver) Discovery() discovery.Client { + d.discoveryOnce.Init(func() closeFunc { + d.discovery = internalDiscovery.New( + d.pool.Get(endpoint.New(d.config.Endpoint())), + discoveryConfig.New( + append( + // prepend common params from root config + []discoveryConfig.Option{ + discoveryConfig.With(d.config.Common), + discoveryConfig.WithEndpoint(d.Endpoint()), + discoveryConfig.WithDatabase(d.Name()), + discoveryConfig.WithSecure(d.Secure()), + discoveryConfig.WithMeta(d.config.Meta()), + }, + d.discoveryOptions..., + )..., + ), + ) + return d.discovery.Close + }) + // may be nil if driver closed early + return d.discovery +} + +// Scripting returns scripting client +func (d *Driver) Scripting() scripting.Client { + d.scriptingOnce.Init(func() closeFunc { + d.scripting = internalScripting.New( + d.balancer, + scriptingConfig.New( + append( + // prepend common params from root config + []scriptingConfig.Option{ + scriptingConfig.With(d.config.Common), + }, + d.scriptingOptions..., + )..., + ), + ) + return d.scripting.Close + }) + // may be nil if driver closed early + return d.scripting +} + +// Topic returns topic client +func (d *Driver) Topic() topic.Client { + d.topicOnce.Init(func() closeFunc { + d.topic = topicclientinternal.New(d.balancer, d.config.Credentials(), + append( + // prepend common params from root config + []topicoptions.TopicOption{ + topicoptions.WithOperationTimeout(d.config.OperationTimeout()), + topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()), + }, + d.topicOptions..., + )..., + ) + return d.topic.Close + }) + return d.topic +} + +// Open connects to database by DSN and return driver runtime holder +// +// DSN accept Driver string like +// +// "grpc[s]://{endpoint}/{database}[?param=value]" +// +// See sugar.DSN helper for make dsn from endpoint and database +func Open(ctx context.Context, dsn string, opts ...Option) (_ *Driver, err error) { + return open( + ctx, + append( + []Option{ + WithConnectionString(dsn), + }, + opts..., + )..., + ) +} + +func MustOpen(ctx context.Context, dsn string, opts ...Option) *Driver { + db, err := Open(ctx, dsn, opts...) + if err != nil { + panic(err) + } + return db +} + +// New connects to database and return driver runtime holder +// +// Deprecated: use Open with required param connectionString instead +func New(ctx context.Context, opts ...Option) (_ *Driver, err error) { + return open(ctx, opts...) +} + +func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, err error) { + d := &Driver{ + children: make(map[uint64]*Driver), + } + if caFile, has := os.LookupEnv("YDB_SSL_ROOT_CERTIFICATES_FILE"); has { + d.opts = append(d.opts, + WithCertificatesFromFile(caFile), + ) + } + if logLevel, has := os.LookupEnv("YDB_LOG_SEVERITY_LEVEL"); has { + if l := log.FromString(logLevel); l < log.QUIET { + d.opts = append(d.opts, + WithLogger( + log.Default(os.Stderr, + log.WithMinLevel(log.FromString(logLevel)), + log.WithColoring(), + ), + trace.MatchDetails( + os.Getenv("YDB_LOG_DETAILS"), + trace.WithDefaultDetails(trace.DetailsAll), + ), + log.WithLogQuery(), + ), + ) + } + } + d.opts = append(d.opts, opts...) + for _, opt := range d.opts { + if opt != nil { + err = opt(ctx, d) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + } + } + if d.logger != nil { + for _, opt := range []Option{ + WithTraceDriver(log.Driver(d.logger, d.loggerDetails, d.loggerOpts...)), + WithTraceTable(log.Table(d.logger, d.loggerDetails, d.loggerOpts...)), + WithTraceScripting(log.Scripting(d.logger, d.loggerDetails, d.loggerOpts...)), + WithTraceScheme(log.Scheme(d.logger, d.loggerDetails, d.loggerOpts...)), + WithTraceCoordination(log.Coordination(d.logger, d.loggerDetails, d.loggerOpts...)), + WithTraceRatelimiter(log.Ratelimiter(d.logger, d.loggerDetails, d.loggerOpts...)), + WithTraceDiscovery(log.Discovery(d.logger, d.loggerDetails, d.loggerOpts...)), + WithTraceTopic(log.Topic(d.logger, d.loggerDetails, d.loggerOpts...)), + WithTraceDatabaseSQL(log.DatabaseSQL(d.logger, d.loggerDetails, d.loggerOpts...)), + WithTraceRetry(log.Retry(d.logger, d.loggerDetails, d.loggerOpts...)), + } { + if opt != nil { + err = opt(ctx, d) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + } + } + } + d.config = config.New(d.options...) + return d, nil +} + +func connect(ctx context.Context, c *Driver) error { + var err error + + if c.config.Endpoint() == "" { + return xerrors.WithStackTrace(errors.New("configuration: empty dial address")) + } + if c.config.Database() == "" { + return xerrors.WithStackTrace(errors.New("configuration: empty database")) + } + + onDone := trace.DriverOnInit( + c.config.Trace(), + &ctx, + c.config.Endpoint(), + c.config.Database(), + c.config.Secure(), + ) + defer func() { + onDone(err) + }() + + if c.userInfo != nil { + c.config = c.config.With(config.WithCredentials( + credentials.NewStaticCredentials( + c.userInfo.User, c.userInfo.Password, + c.config.Endpoint(), + credentials.WithGrpcDialOptions(c.config.GrpcDialOptions()...), + ), + )) + } + + if c.pool == nil { + c.pool = conn.NewPool(c.config) + } + + c.balancer, err = balancer.New(ctx, c.config, c.pool, c.discoveryOptions...) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil +} + +func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { + c, err := newConnectionFromOptions(ctx, opts...) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + err = connect(ctx, c) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + return c, nil +} + +// GRPCConn casts *ydb.Driver to grpc.ClientConnInterface for executing +// unary and streaming RPC over internal driver balancer. +// +// Warning: for connect to driver-unsupported YDB services +func GRPCConn(cc *Driver) grpc.ClientConnInterface { + return conn.WithContextModifier(cc.balancer, conn.WithoutWrapping) +} + +// Helper types for closing lazy clients +type closeFunc func(ctx context.Context) error + +type initOnce struct { + once sync.Once + close closeFunc +} + +func (lo *initOnce) Init(f func() closeFunc) { + lo.once.Do(func() { + lo.close = f() + }) +} + +func (lo *initOnce) Close(ctx context.Context) error { + lo.once.Do(func() {}) + if lo.close == nil { + return nil + } + return lo.close(ctx) +} From 2fce79dfc5dce9d98ad25adbf68a4ac0a2b71e05 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 7 Nov 2023 18:05:56 +0300 Subject: [PATCH 2/4] Replaced lazy initialization of ydb clients (table, topic, etc.) to explicit initialization on ydb.Open step --- CHANGELOG.md | 2 + driver.go | 279 +++++++++++++++++++++------------------------------ 2 files changed, 114 insertions(+), 167 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e08be7f28..ce8977932 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Replaced lazy initialization of ydb clients (table, topic, etc.) to explicit initialization on `ydb.Open` step + ## v3.54.1 * Fixed inconsistent labels in `metrics` diff --git a/driver.go b/driver.go index 2c0ec3b08..b916fdb65 100644 --- a/driver.go +++ b/driver.go @@ -57,31 +57,24 @@ type Driver struct { //nolint:maligned config *config.Config options []config.Option - discoveryOnce initOnce discovery *internalDiscovery.Client discoveryOptions []discoveryConfig.Option - tableOnce initOnce table *internalTable.Client tableOptions []tableConfig.Option - scriptingOnce initOnce scripting *internalScripting.Client scriptingOptions []scriptingConfig.Option - schemeOnce initOnce scheme *internalScheme.Client schemeOptions []schemeConfig.Option - coordinationOnce initOnce coordination *internalCoordination.Client coordinationOptions []coordinationConfig.Option - ratelimiterOnce initOnce ratelimiter *internalRatelimiter.Client ratelimiterOptions []ratelimiterConfig.Option - topicOnce initOnce topic *topicclientinternal.Client topicOptions []topicoptions.TopicOption @@ -120,12 +113,12 @@ func (d *Driver) Close(ctx context.Context) error { closers = append( closers, - d.ratelimiterOnce.Close, - d.coordinationOnce.Close, - d.schemeOnce.Close, - d.scriptingOnce.Close, - d.tableOnce.Close, - d.topicOnce.Close, + d.ratelimiter.Close, + d.coordination.Close, + d.scheme.Close, + d.scripting.Close, + d.table.Close, + d.topic.Close, d.balancer.Close, d.pool.Release, ) @@ -161,150 +154,36 @@ func (d *Driver) Secure() bool { // Table returns table client func (d *Driver) Table() table.Client { - d.tableOnce.Init(func() closeFunc { - d.table = internalTable.New( - d.balancer, - tableConfig.New( - append( - // prepend common params from root config - []tableConfig.Option{ - tableConfig.With(d.config.Common), - }, - d.tableOptions..., - )..., - ), - ) - return d.table.Close - }) - // may be nil if driver closed early return d.table } // Scheme returns scheme client func (d *Driver) Scheme() scheme.Client { - d.schemeOnce.Init(func() closeFunc { - d.scheme = internalScheme.New( - d.balancer, - schemeConfig.New( - append( - // prepend common params from root config - []schemeConfig.Option{ - schemeConfig.WithDatabaseName(d.Name()), - schemeConfig.With(d.config.Common), - }, - d.schemeOptions..., - )..., - ), - ) - return d.scheme.Close - }) - // may be nil if driver closed early return d.scheme } // Coordination returns coordination client func (d *Driver) Coordination() coordination.Client { - d.coordinationOnce.Init(func() closeFunc { - d.coordination = internalCoordination.New( - d.balancer, - coordinationConfig.New( - append( - // prepend common params from root config - []coordinationConfig.Option{ - coordinationConfig.With(d.config.Common), - }, - d.coordinationOptions..., - )..., - ), - ) - return d.coordination.Close - }) - // may be nil if driver closed early return d.coordination } // Ratelimiter returns ratelimiter client func (d *Driver) Ratelimiter() ratelimiter.Client { - d.ratelimiterOnce.Init(func() closeFunc { - d.ratelimiter = internalRatelimiter.New( - d.balancer, - ratelimiterConfig.New( - append( - // prepend common params from root config - []ratelimiterConfig.Option{ - ratelimiterConfig.With(d.config.Common), - }, - d.ratelimiterOptions..., - )..., - ), - ) - return d.ratelimiter.Close - }) - // may be nil if driver closed early return d.ratelimiter } // Discovery returns discovery client func (d *Driver) Discovery() discovery.Client { - d.discoveryOnce.Init(func() closeFunc { - d.discovery = internalDiscovery.New( - d.pool.Get(endpoint.New(d.config.Endpoint())), - discoveryConfig.New( - append( - // prepend common params from root config - []discoveryConfig.Option{ - discoveryConfig.With(d.config.Common), - discoveryConfig.WithEndpoint(d.Endpoint()), - discoveryConfig.WithDatabase(d.Name()), - discoveryConfig.WithSecure(d.Secure()), - discoveryConfig.WithMeta(d.config.Meta()), - }, - d.discoveryOptions..., - )..., - ), - ) - return d.discovery.Close - }) - // may be nil if driver closed early return d.discovery } // Scripting returns scripting client func (d *Driver) Scripting() scripting.Client { - d.scriptingOnce.Init(func() closeFunc { - d.scripting = internalScripting.New( - d.balancer, - scriptingConfig.New( - append( - // prepend common params from root config - []scriptingConfig.Option{ - scriptingConfig.With(d.config.Common), - }, - d.scriptingOptions..., - )..., - ), - ) - return d.scripting.Close - }) - // may be nil if driver closed early return d.scripting } // Topic returns topic client func (d *Driver) Topic() topic.Client { - d.topicOnce.Init(func() closeFunc { - d.topic = topicclientinternal.New(d.balancer, d.config.Credentials(), - append( - // prepend common params from root config - []topicoptions.TopicOption{ - topicoptions.WithOperationTimeout(d.config.OperationTimeout()), - topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()), - }, - d.topicOptions..., - )..., - ) - return d.topic.Close - }) return d.topic } @@ -402,42 +281,42 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e return d, nil } -func connect(ctx context.Context, c *Driver) error { +func connect(ctx context.Context, d *Driver) error { var err error - if c.config.Endpoint() == "" { + if d.config.Endpoint() == "" { return xerrors.WithStackTrace(errors.New("configuration: empty dial address")) } - if c.config.Database() == "" { + if d.config.Database() == "" { return xerrors.WithStackTrace(errors.New("configuration: empty database")) } onDone := trace.DriverOnInit( - c.config.Trace(), + d.config.Trace(), &ctx, - c.config.Endpoint(), - c.config.Database(), - c.config.Secure(), + d.config.Endpoint(), + d.config.Database(), + d.config.Secure(), ) defer func() { onDone(err) }() - if c.userInfo != nil { - c.config = c.config.With(config.WithCredentials( + if d.userInfo != nil { + d.config = d.config.With(config.WithCredentials( credentials.NewStaticCredentials( - c.userInfo.User, c.userInfo.Password, - c.config.Endpoint(), - credentials.WithGrpcDialOptions(c.config.GrpcDialOptions()...), + d.userInfo.User, d.userInfo.Password, + d.config.Endpoint(), + credentials.WithGrpcDialOptions(d.config.GrpcDialOptions()...), ), )) } - if c.pool == nil { - c.pool = conn.NewPool(c.config) + if d.pool == nil { + d.pool = conn.NewPool(d.config) } - c.balancer, err = balancer.New(ctx, c.config, c.pool, c.discoveryOptions...) + d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...) if err != nil { return xerrors.WithStackTrace(err) } @@ -446,15 +325,103 @@ func connect(ctx context.Context, c *Driver) error { } func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { - c, err := newConnectionFromOptions(ctx, opts...) + d, err := newConnectionFromOptions(ctx, opts...) if err != nil { return nil, xerrors.WithStackTrace(err) } - err = connect(ctx, c) + err = connect(ctx, d) if err != nil { return nil, xerrors.WithStackTrace(err) } - return c, nil + d.table = internalTable.New( + d.balancer, + tableConfig.New( + append( + // prepend common params from root config + []tableConfig.Option{ + tableConfig.With(d.config.Common), + }, + d.tableOptions..., + )..., + ), + ) + d.scheme = internalScheme.New( + d.balancer, + schemeConfig.New( + append( + // prepend common params from root config + []schemeConfig.Option{ + schemeConfig.WithDatabaseName(d.Name()), + schemeConfig.With(d.config.Common), + }, + d.schemeOptions..., + )..., + ), + ) + d.coordination = internalCoordination.New( + d.balancer, + coordinationConfig.New( + append( + // prepend common params from root config + []coordinationConfig.Option{ + coordinationConfig.With(d.config.Common), + }, + d.coordinationOptions..., + )..., + ), + ) + d.ratelimiter = internalRatelimiter.New( + d.balancer, + ratelimiterConfig.New( + append( + // prepend common params from root config + []ratelimiterConfig.Option{ + ratelimiterConfig.With(d.config.Common), + }, + d.ratelimiterOptions..., + )..., + ), + ) + d.discovery = internalDiscovery.New( + d.pool.Get(endpoint.New(d.config.Endpoint())), + discoveryConfig.New( + append( + // prepend common params from root config + []discoveryConfig.Option{ + discoveryConfig.With(d.config.Common), + discoveryConfig.WithEndpoint(d.Endpoint()), + discoveryConfig.WithDatabase(d.Name()), + discoveryConfig.WithSecure(d.Secure()), + discoveryConfig.WithMeta(d.config.Meta()), + }, + d.discoveryOptions..., + )..., + ), + ) + d.scripting = internalScripting.New( + d.balancer, + scriptingConfig.New( + append( + // prepend common params from root config + []scriptingConfig.Option{ + scriptingConfig.With(d.config.Common), + }, + d.scriptingOptions..., + )..., + ), + ) + d.topic = topicclientinternal.New(d.balancer, d.config.Credentials(), + append( + // prepend common params from root config + []topicoptions.TopicOption{ + topicoptions.WithOperationTimeout(d.config.OperationTimeout()), + topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()), + }, + d.topicOptions..., + )..., + ) + + return d, nil } // GRPCConn casts *ydb.Driver to grpc.ClientConnInterface for executing @@ -464,25 +431,3 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { func GRPCConn(cc *Driver) grpc.ClientConnInterface { return conn.WithContextModifier(cc.balancer, conn.WithoutWrapping) } - -// Helper types for closing lazy clients -type closeFunc func(ctx context.Context) error - -type initOnce struct { - once sync.Once - close closeFunc -} - -func (lo *initOnce) Init(f func() closeFunc) { - lo.once.Do(func() { - lo.close = f() - }) -} - -func (lo *initOnce) Close(ctx context.Context) error { - lo.once.Do(func() {}) - if lo.close == nil { - return nil - } - return lo.close(ctx) -} From 0d043f2403b51f5363ad9862f861eeec0fc99e06 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 7 Nov 2023 18:31:21 +0300 Subject: [PATCH 3/4] added context to internal clients constructors --- driver.go | 67 +++- internal/balancer/balancer.go | 20 +- internal/coordination/client.go | 4 +- internal/discovery/discovery.go | 4 +- internal/ratelimiter/client.go | 4 +- internal/scheme/client.go | 4 +- internal/scripting/client.go | 8 +- internal/table/client.go | 21 +- internal/table/client_test.go | 10 +- internal/table/session_test.go | 362 +++++++++---------- internal/topic/topicclientinternal/client.go | 9 +- retry/retry.go | 2 +- trace/table.go | 1 + trace/table_gtrace.go | 5 +- 14 files changed, 280 insertions(+), 241 deletions(-) diff --git a/driver.go b/driver.go index b916fdb65..d87c35b10 100644 --- a/driver.go +++ b/driver.go @@ -321,19 +321,7 @@ func connect(ctx context.Context, d *Driver) error { return xerrors.WithStackTrace(err) } - return nil -} - -func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { - d, err := newConnectionFromOptions(ctx, opts...) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - err = connect(ctx, d) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - d.table = internalTable.New( + d.table, err = internalTable.New(ctx, d.balancer, tableConfig.New( append( @@ -345,7 +333,11 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { )..., ), ) - d.scheme = internalScheme.New( + if err != nil { + return xerrors.WithStackTrace(err) + } + + d.scheme, err = internalScheme.New(ctx, d.balancer, schemeConfig.New( append( @@ -358,7 +350,11 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { )..., ), ) - d.coordination = internalCoordination.New( + if err != nil { + return xerrors.WithStackTrace(err) + } + + d.coordination, err = internalCoordination.New(ctx, d.balancer, coordinationConfig.New( append( @@ -370,7 +366,11 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { )..., ), ) - d.ratelimiter = internalRatelimiter.New( + if err != nil { + return xerrors.WithStackTrace(err) + } + + d.ratelimiter, err = internalRatelimiter.New(ctx, d.balancer, ratelimiterConfig.New( append( @@ -382,7 +382,11 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { )..., ), ) - d.discovery = internalDiscovery.New( + if err != nil { + return xerrors.WithStackTrace(err) + } + + d.discovery, err = internalDiscovery.New(ctx, d.pool.Get(endpoint.New(d.config.Endpoint())), discoveryConfig.New( append( @@ -398,7 +402,11 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { )..., ), ) - d.scripting = internalScripting.New( + if err != nil { + return xerrors.WithStackTrace(err) + } + + d.scripting, err = internalScripting.New(ctx, d.balancer, scriptingConfig.New( append( @@ -410,7 +418,13 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { )..., ), ) - d.topic = topicclientinternal.New(d.balancer, d.config.Credentials(), + if err != nil { + return xerrors.WithStackTrace(err) + } + + d.topic, err = topicclientinternal.New(ctx, + d.balancer, + d.config.Credentials(), append( // prepend common params from root config []topicoptions.TopicOption{ @@ -420,7 +434,22 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { d.topicOptions..., )..., ) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil +} +func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { + d, err := newConnectionFromOptions(ctx, opts...) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + err = connect(ctx, d) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } return d, nil } diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index 4300ef11d..b0e66885e 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -186,7 +186,7 @@ func New( driverConfig *config.Config, pool *conn.Pool, opts ...discoveryConfig.Option, -) (b *Balancer, err error) { +) (b *Balancer, finalErr error) { var ( onDone = trace.DriverOnBalancerInit( driverConfig.Trace(), @@ -201,20 +201,22 @@ func New( )...) ) defer func() { - onDone(err) + onDone(finalErr) }() b = &Balancer{ driverConfig: driverConfig, pool: pool, localDCDetector: detectLocalDC, - discoveryClient: internalDiscovery.New( - pool.Get( - endpoint.New(driverConfig.Endpoint()), - ), - discoveryConfig, - ), } + d, err := internalDiscovery.New(ctx, pool.Get( + endpoint.New(driverConfig.Endpoint()), + ), discoveryConfig) + if err != nil { + return nil, err + } + + b.discoveryClient = d if config := driverConfig.Balancer(); config == nil { b.balancerConfig = balancerConfig.Config{} @@ -228,7 +230,7 @@ func New( }, "") } else { // initialization of balancer state - if err = b.clusterDiscovery(ctx); err != nil { + if err := b.clusterDiscovery(ctx); err != nil { return nil, xerrors.WithStackTrace(err) } // run background discovering diff --git a/internal/coordination/client.go b/internal/coordination/client.go index b709a0f9b..f077aabfa 100644 --- a/internal/coordination/client.go +++ b/internal/coordination/client.go @@ -27,11 +27,11 @@ type Client struct { service Ydb_Coordination_V1.CoordinationServiceClient } -func New(cc grpc.ClientConnInterface, config config.Config) *Client { +func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) { return &Client{ config: config, service: Ydb_Coordination_V1.NewCoordinationServiceClient(cc), - } + }, nil } func (c *Client) CreateNode(ctx context.Context, path string, config coordination.NodeConfig) error { diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index b79c62ba5..864508846 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -18,12 +18,12 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) -func New(cc grpc.ClientConnInterface, config *config.Config) *Client { +func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) (*Client, error) { return &Client{ config: config, cc: cc, client: Ydb_Discovery_V1.NewDiscoveryServiceClient(cc), - } + }, nil } var _ discovery.Client = &Client{} diff --git a/internal/ratelimiter/client.go b/internal/ratelimiter/client.go index df8286a2b..8705d7897 100644 --- a/internal/ratelimiter/client.go +++ b/internal/ratelimiter/client.go @@ -36,11 +36,11 @@ func (c *Client) Close(ctx context.Context) error { return nil } -func New(cc grpc.ClientConnInterface, config config.Config) *Client { +func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) { return &Client{ config: config, service: Ydb_RateLimiter_V1.NewRateLimiterServiceClient(cc), - } + }, nil } func (c *Client) CreateResource( diff --git a/internal/scheme/client.go b/internal/scheme/client.go index ee5604b6e..487c1ee4d 100644 --- a/internal/scheme/client.go +++ b/internal/scheme/client.go @@ -35,11 +35,11 @@ func (c *Client) Close(_ context.Context) error { return nil } -func New(cc grpc.ClientConnInterface, config config.Config) *Client { +func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) { return &Client{ config: config, service: Ydb_Scheme_V1.NewSchemeServiceClient(cc), - } + }, nil } func (c *Client) MakeDirectory(ctx context.Context, path string) (err error) { diff --git a/internal/scripting/client.go b/internal/scripting/client.go index 086c9ee9a..ed7ee2c10 100644 --- a/internal/scripting/client.go +++ b/internal/scripting/client.go @@ -156,11 +156,11 @@ func (c *Client) explain( }() response, err = c.service.ExplainYql(ctx, request) if err != nil { - return + return e, err } err = response.GetOperation().GetResult().UnmarshalTo(&result) if err != nil { - return + return e, err } result.GetParametersTypes() e = table.ScriptingYQLExplanation{ @@ -273,9 +273,9 @@ func (c *Client) Close(ctx context.Context) (err error) { return nil } -func New(cc grpc.ClientConnInterface, config config.Config) *Client { +func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) { return &Client{ config: config, service: Ydb_Scripting_V1.NewScriptingServiceClient(cc), - } + }, nil } diff --git a/internal/table/client.go b/internal/table/client.go index 617f83885..cd9645a32 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -33,22 +33,23 @@ type balancer interface { nodeChecker } -func New(balancer balancer, config *config.Config) *Client { - return newClient(balancer, func(ctx context.Context) (s *session, err error) { +func New(ctx context.Context, balancer balancer, config *config.Config) (*Client, error) { + return newClient(ctx, balancer, func(ctx context.Context) (s *session, err error) { return newSession(ctx, balancer, config) }, config) } func newClient( + ctx context.Context, balancer balancer, builder sessionBuilder, config *config.Config, -) *Client { - var ( - ctx = context.Background() - onDone = trace.TableOnInit(config.Trace(), &ctx) - ) - c := &Client{ +) (c *Client, finalErr error) { + onDone := trace.TableOnInit(config.Trace(), &ctx) + defer func() { + onDone(config.SizeLimit(), finalErr) + }() + c = &Client{ clock: config.Clock(), config: config, cc: balancer, @@ -70,8 +71,8 @@ func newClient( c.wg.Add(1) go c.internalPoolGC(ctx, idleThreshold) } - onDone(c.limit) - return c + + return c, nil } // Client is a set of session instances that may be reused. diff --git a/internal/table/client_test.go b/internal/table/client_test.go index 66a42a964..0ab4a0b61 100644 --- a/internal/table/client_test.go +++ b/internal/table/client_test.go @@ -403,7 +403,8 @@ func TestSessionPoolRacyGet(t *testing.T) { session *session } create := make(chan createReq) - p := newClient( + p, err := newClient( + context.Background(), nil, (&StubBuilder{ Limit: 1, @@ -422,11 +423,11 @@ func TestSessionPoolRacyGet(t *testing.T) { config.WithIdleThreshold(-1), ), ) + require.NoError(t, err) var ( expSession *session done = make(chan struct{}, 2) ) - var err error for i := 0; i < 2; i++ { go func() { defer func() { @@ -855,7 +856,8 @@ func newClientWithStubBuilder( stubLimit int, options ...config.Option, ) *Client { - return newClient( + c, err := newClient( + context.Background(), balancer, (&StubBuilder{ T: t, @@ -864,6 +866,8 @@ func newClientWithStubBuilder( }).createSession, config.New(options...), ) + require.NoError(t, err) + return c } func (s *StubBuilder) createSession(ctx context.Context) (session *session, err error) { diff --git a/internal/table/session_test.go b/internal/table/session_test.go index 8d2281209..d841c5fd4 100644 --- a/internal/table/session_test.go +++ b/internal/table/session_test.go @@ -338,52 +338,50 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { func(t *testing.T) { for _, srcDst := range fromTo { t.Run(srcDst.srcMode.String()+"->"+srcDst.dstMode.String(), func(t *testing.T) { - client := New( - testutil.NewBalancer( - testutil.WithInvokeHandlers( - testutil.InvokeHandlers{ - testutil.TableExecuteDataQuery: func(interface{}) (proto.Message, error) { - return &Ydb_Table.ExecuteQueryResult{ - TxMeta: &Ydb_Table.TransactionMeta{ - Id: "", - }, - }, nil - }, - testutil.TableBeginTransaction: func(interface{}) (proto.Message, error) { - return &Ydb_Table.BeginTransactionResult{ - TxMeta: &Ydb_Table.TransactionMeta{ - Id: "", - }, - }, nil - }, - testutil.TableExplainDataQuery: func(request interface{}) (result proto.Message, err error) { - return &Ydb_Table.ExplainQueryResult{}, nil - }, - testutil.TablePrepareDataQuery: func(request interface{}) (result proto.Message, err error) { - return &Ydb_Table.PrepareQueryResult{}, nil - }, - testutil.TableCreateSession: func(interface{}) (proto.Message, error) { - return &Ydb_Table.CreateSessionResult{ - SessionId: testutil.SessionID(), - }, nil - }, - testutil.TableDeleteSession: func(request interface{}) (result proto.Message, err error) { - return &Ydb_Table.DeleteSessionResponse{}, nil - }, - testutil.TableCommitTransaction: func(request interface{}) (result proto.Message, err error) { - return &Ydb_Table.CommitTransactionResult{}, nil - }, - testutil.TableRollbackTransaction: func(request interface{}) (result proto.Message, err error) { - return &Ydb_Table.RollbackTransactionResponse{}, nil - }, - testutil.TableKeepAlive: func(request interface{}) (result proto.Message, err error) { - return &Ydb_Table.KeepAliveResult{}, nil - }, + client, err := New(context.Background(), testutil.NewBalancer( + testutil.WithInvokeHandlers( + testutil.InvokeHandlers{ + testutil.TableExecuteDataQuery: func(interface{}) (proto.Message, error) { + return &Ydb_Table.ExecuteQueryResult{ + TxMeta: &Ydb_Table.TransactionMeta{ + Id: "", + }, + }, nil }, - ), + testutil.TableBeginTransaction: func(interface{}) (proto.Message, error) { + return &Ydb_Table.BeginTransactionResult{ + TxMeta: &Ydb_Table.TransactionMeta{ + Id: "", + }, + }, nil + }, + testutil.TableExplainDataQuery: func(request interface{}) (result proto.Message, err error) { + return &Ydb_Table.ExplainQueryResult{}, nil + }, + testutil.TablePrepareDataQuery: func(request interface{}) (result proto.Message, err error) { + return &Ydb_Table.PrepareQueryResult{}, nil + }, + testutil.TableCreateSession: func(interface{}) (proto.Message, error) { + return &Ydb_Table.CreateSessionResult{ + SessionId: testutil.SessionID(), + }, nil + }, + testutil.TableDeleteSession: func(request interface{}) (result proto.Message, err error) { + return &Ydb_Table.DeleteSessionResponse{}, nil + }, + testutil.TableCommitTransaction: func(request interface{}) (result proto.Message, err error) { + return &Ydb_Table.CommitTransactionResult{}, nil + }, + testutil.TableRollbackTransaction: func(request interface{}) (result proto.Message, err error) { + return &Ydb_Table.RollbackTransactionResponse{}, nil + }, + testutil.TableKeepAlive: func(request interface{}) (result proto.Message, err error) { + return &Ydb_Table.KeepAliveResult{}, nil + }, + }, ), - config.New(), - ) + ), config.New()) + require.NoError(t, err) ctx, cancel := xcontext.WithTimeout( context.Background(), time.Second, @@ -398,84 +396,83 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { } func TestCreateTableRegression(t *testing.T) { - client := New( - testutil.NewBalancer( - testutil.WithInvokeHandlers( - testutil.InvokeHandlers{ - testutil.TableCreateSession: func(request interface{}) (proto.Message, error) { - return &Ydb_Table.CreateSessionResult{ - SessionId: "", - }, nil - }, - testutil.TableCreateTable: func(act interface{}) (proto.Message, error) { - exp := &Ydb_Table.CreateTableRequest{ - SessionId: "", - Path: "episodes", - Columns: []*Ydb_Table.ColumnMeta{ - { - Name: "series_id", - Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ - OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }}}, - }}, - }, - { - Name: "season_id", - Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ - OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }}}, - }}, - }, - { - Name: "episode_id", - Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ - OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }}}, - }}, - }, - { - Name: "title", - Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ - OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }}}, - }}, - }, - { - Name: "air_date", - Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ - OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }}}, - }}, - }, + client, err := New(context.Background(), testutil.NewBalancer( + testutil.WithInvokeHandlers( + testutil.InvokeHandlers{ + testutil.TableCreateSession: func(request interface{}) (proto.Message, error) { + return &Ydb_Table.CreateSessionResult{ + SessionId: "", + }, nil + }, + testutil.TableCreateTable: func(act interface{}) (proto.Message, error) { + exp := &Ydb_Table.CreateTableRequest{ + SessionId: "", + Path: "episodes", + Columns: []*Ydb_Table.ColumnMeta{ + { + Name: "series_id", + Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ + OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }}}, + }}, }, - PrimaryKey: []string{ - "series_id", - "season_id", - "episode_id", + { + Name: "season_id", + Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ + OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }}}, + }}, }, - OperationParams: &Ydb_Operations.OperationParams{ - OperationMode: Ydb_Operations.OperationParams_SYNC, + { + Name: "episode_id", + Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ + OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }}}, + }}, }, - Attributes: map[string]string{ - "attr": "attr_value", + { + Name: "title", + Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ + OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }}}, + }}, }, - } - if !proto.Equal(exp, act.(proto.Message)) { - //nolint:revive - return nil, fmt.Errorf("proto's not equal: \n\nact: %v\n\nexp: %s\n\n", act, exp) - } - return &Ydb_Table.CreateTableResponse{}, nil - }, + { + Name: "air_date", + Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ + OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }}}, + }}, + }, + }, + PrimaryKey: []string{ + "series_id", + "season_id", + "episode_id", + }, + OperationParams: &Ydb_Operations.OperationParams{ + OperationMode: Ydb_Operations.OperationParams_SYNC, + }, + Attributes: map[string]string{ + "attr": "attr_value", + }, + } + if !proto.Equal(exp, act.(proto.Message)) { + //nolint:revive + return nil, fmt.Errorf("proto's not equal: \n\nact: %v\n\nexp: %s\n\n", act, exp) + } + return &Ydb_Table.CreateTableResponse{}, nil }, - ), + }, ), - config.New(), - ) + ), config.New()) + + require.NoError(t, err) ctx, cancel := xcontext.WithTimeout( context.Background(), @@ -483,7 +480,7 @@ func TestCreateTableRegression(t *testing.T) { ) defer cancel() - err := client.Do(ctx, func(ctx context.Context, s table.Session) error { + err = client.Do(ctx, func(ctx context.Context, s table.Session) error { return s.CreateTable(ctx, "episodes", options.WithColumn("series_id", types.Optional(types.TypeUint64)), options.WithColumn("season_id", types.Optional(types.TypeUint64)), @@ -499,77 +496,76 @@ func TestCreateTableRegression(t *testing.T) { } func TestDescribeTableRegression(t *testing.T) { - client := New( - testutil.NewBalancer( - testutil.WithInvokeHandlers( - testutil.InvokeHandlers{ - testutil.TableCreateSession: func(request interface{}) (proto.Message, error) { - return &Ydb_Table.CreateSessionResult{ - SessionId: "", - }, nil - }, - testutil.TableDescribeTable: func(act interface{}) (proto.Message, error) { - return &Ydb_Table.DescribeTableResult{ - Self: &Ydb_Scheme.Entry{ - Name: "episodes", + client, err := New(context.Background(), testutil.NewBalancer( + testutil.WithInvokeHandlers( + testutil.InvokeHandlers{ + testutil.TableCreateSession: func(request interface{}) (proto.Message, error) { + return &Ydb_Table.CreateSessionResult{ + SessionId: "", + }, nil + }, + testutil.TableDescribeTable: func(act interface{}) (proto.Message, error) { + return &Ydb_Table.DescribeTableResult{ + Self: &Ydb_Scheme.Entry{ + Name: "episodes", + }, + Columns: []*Ydb_Table.ColumnMeta{ + { + Name: "series_id", + Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ + OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }}}, + }}, }, - Columns: []*Ydb_Table.ColumnMeta{ - { - Name: "series_id", - Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ - OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }}}, - }}, - }, - { - Name: "season_id", - Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ - OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }}}, - }}, - }, - { - Name: "episode_id", - Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ - OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }}}, - }}, - }, - { - Name: "title", - Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ - OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }}}, - }}, - }, - { - Name: "air_date", - Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ - OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }}}, - }}, - }, + { + Name: "season_id", + Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ + OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }}}, + }}, }, - PrimaryKey: []string{ - "series_id", - "season_id", - "episode_id", + { + Name: "episode_id", + Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ + OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }}}, + }}, }, - Attributes: map[string]string{ - "attr": "attr_value", + { + Name: "title", + Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ + OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }}}, + }}, }, - }, nil - }, + { + Name: "air_date", + Type: &Ydb.Type{Type: &Ydb.Type_OptionalType{ + OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }}}, + }}, + }, + }, + PrimaryKey: []string{ + "series_id", + "season_id", + "episode_id", + }, + Attributes: map[string]string{ + "attr": "attr_value", + }, + }, nil }, - ), + }, ), - config.New(), - ) + ), config.New()) + + require.NoError(t, err) ctx, cancel := xcontext.WithTimeout( context.Background(), @@ -579,7 +575,7 @@ func TestDescribeTableRegression(t *testing.T) { var act options.Description - err := client.Do(ctx, func(ctx context.Context, s table.Session) (err error) { + err = client.Do(ctx, func(ctx context.Context, s table.Session) (err error) { act, err = s.DescribeTable(ctx, "episodes") return err }) diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index 8744c1ff3..d774a30fb 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -27,7 +27,12 @@ type Client struct { rawClient rawtopic.Client } -func New(conn grpc.ClientConnInterface, cred credentials.Credentials, opts ...topicoptions.TopicOption) *Client { +func New( + ctx context.Context, + conn grpc.ClientConnInterface, + cred credentials.Credentials, + opts ...topicoptions.TopicOption, +) (*Client, error) { rawClient := rawtopic.NewClient(Ydb_Topic_V1.NewTopicServiceClient(conn)) cfg := newTopicConfig(opts...) @@ -40,7 +45,7 @@ func New(conn grpc.ClientConnInterface, cred credentials.Credentials, opts ...to cred: cred, defaultOperationParams: defaultOperationParams, rawClient: rawClient, - } + }, nil } func newTopicConfig(opts ...topicoptions.TopicOption) topic.Config { diff --git a/retry/retry.go b/retry/retry.go index 388316b12..5bb224427 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -275,7 +275,7 @@ func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr err }() if err == nil { - return + return nil } if ctxErr := ctx.Err(); ctxErr != nil { diff --git a/trace/table.go b/trace/table.go index 049ef55d7..e4a70546e 100644 --- a/trace/table.go +++ b/trace/table.go @@ -295,6 +295,7 @@ type ( } TableInitDoneInfo struct { Limit int + Error error } TablePoolStateChangeInfo struct { Size int diff --git a/trace/table_gtrace.go b/trace/table_gtrace.go index a34fc178d..9abbd0ce7 100644 --- a/trace/table_gtrace.go +++ b/trace/table_gtrace.go @@ -1400,13 +1400,14 @@ func (t *Table) onPoolWait(t1 TablePoolWaitStartInfo) func(TablePoolWaitDoneInfo } return res } -func TableOnInit(t *Table, c *context.Context) func(limit int) { +func TableOnInit(t *Table, c *context.Context) func(limit int, _ error) { var p TableInitStartInfo p.Context = c res := t.onInit(p) - return func(limit int) { + return func(limit int, e error) { var p TableInitDoneInfo p.Limit = limit + p.Error = e res(p) } } From c5244de1dca4600479e94bf0521e6c3c096b7d01 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 7 Nov 2023 19:02:26 +0300 Subject: [PATCH 4/4] fix linter issue --- internal/xerrors/join_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/xerrors/join_test.go b/internal/xerrors/join_test.go index b970a3f61..a3db6a55c 100644 --- a/internal/xerrors/join_test.go +++ b/internal/xerrors/join_test.go @@ -46,7 +46,7 @@ func TestUnwrapJoined(t *testing.T) { var joined error = Join(err1, err2) - unwrappable := joined.(interface{ Unwrap() []error }) + unwrappable := joined.(interface{ Unwrap() []error }) //nolint:errorlint inners := unwrappable.Unwrap() assert.Contains(t, inners, err1) assert.Contains(t, inners, err2)