Skip to content

Commit

Permalink
* Added environment variable `YDB_EXECUTE_DATA_QUERY_OVER_QUERY_SERVI…
Browse files Browse the repository at this point in the history
…CE` for execute data queries from table service client using query client API
  • Loading branch information
asmyasnikov committed Jan 23, 2025
1 parent 6c58f66 commit e640e6f
Show file tree
Hide file tree
Showing 20 changed files with 518 additions and 216 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ jobs:
YDB_SSL_ROOT_CERTIFICATES_FILE: /tmp/ydb_certs/ca.pem
YDB_SESSIONS_SHUTDOWN_URLS: http://localhost:8765/actors/kqp_proxy?force_shutdown=all
YDB_DATABASE_SQL_OVER_QUERY_SERVICE: 1
YDB_TABLE_CLIENT_USE_QUERY_SESSION: 1
YDB_EXECUTE_DATA_QUERY_OVER_QUERY_SERVICE: 1
HIDE_APPLICATION_OUTPUT: 1
steps:
- name: Checkout code
Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
* Added environment variable `YDB_TABLE_CLIENT_USE_QUERY_SESSION` for create session in table client using query service client API
* Added environment variable `YDB_EXECUTE_DATA_QUERY_OVER_QUERY_SERVICE` for execute data queries from table service client using query client API

## v3.98.0
* Supported pool of encoders, which implement ResetableWriter interface
Expand Down
8 changes: 4 additions & 4 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
onDone(finalErr)
}()

row, err := clientQueryRow(ctx, c.pool, q, options.ExecuteSettings(opts...), withTrace(c.config.Trace()))
row, err := clientQueryRow(ctx, c.pool, q, options.ExecuteSettings(opts...), WithTrace(c.config.Trace()))
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand All @@ -337,7 +337,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) {
settings := options.ExecuteSettings(opts...)
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace))
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, WithTrace(s.trace))
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -382,7 +382,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
settings := options.ExecuteSettings(opts...)
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
streamResult, err := execute(ctx, s.ID(), s.client, q,
options.ExecuteSettings(opts...), withTrace(s.trace),
options.ExecuteSettings(opts...), WithTrace(s.trace),
)
if err != nil {
return xerrors.WithStackTrace(err)
Expand Down Expand Up @@ -467,7 +467,7 @@ func (c *Client) QueryResultSet(
onDone(finalErr)
}()

rs, err := clientQueryResultSet(ctx, c.pool, q, options.ExecuteSettings(opts...), withTrace(c.config.Trace()))
rs, err := clientQueryResultSet(ctx, c.pool, q, options.ExecuteSettings(opts...), WithTrace(c.config.Trace()))
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func execute(
}

r, err := newResult(ctx, stream, append(opts,
withStatsCallback(settings.StatsCallback()),
withOnClose(executeCancel),
WithStatsCallback(settings.StatsCallback()),
WithOnClose(executeCancel),
)...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
Expand Down
6 changes: 3 additions & 3 deletions internal/query/execute_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func TestExecute(t *testing.T) {
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
var txID string
r, err := execute(ctx, "123", client, "", options.ExecuteSettings(),
onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
txID = txMeta.GetId()
}),
)
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestExecute(t *testing.T) {
t.Log("execute")
var txID string
r, err := execute(ctx, "123", client, "", options.ExecuteSettings(),
onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
txID = txMeta.GetId()
}),
)
Expand Down Expand Up @@ -718,7 +718,7 @@ func TestExecute(t *testing.T) {
t.Log("execute")
var txID string
r, err := execute(ctx, "123", client, "", options.ExecuteSettings(),
onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
txID = txMeta.GetId()
}),
)
Expand Down
10 changes: 5 additions & 5 deletions internal/query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,31 +87,31 @@ func (r *materializedResult) NextResultSet(ctx context.Context) (result.Set, err
return r.resultSets[r.idx], nil
}

func withTrace(t *trace.Query) resultOption {
func WithTrace(t *trace.Query) resultOption {
return func(s *streamResult) {
s.trace = t
}
}

func withStatsCallback(callback func(queryStats stats.QueryStats)) resultOption {
func WithStatsCallback(callback func(queryStats stats.QueryStats)) resultOption {
return func(s *streamResult) {
s.statsCallback = callback
}
}

func withOnClose(onClose func()) resultOption {
func WithOnClose(onClose func()) resultOption {
return func(s *streamResult) {
s.onClose = append(s.onClose, onClose)
}
}

func onNextPartErr(callback func(err error)) resultOption {
func OnNextPartErr(callback func(err error)) resultOption {
return func(s *streamResult) {
s.onNextPartErr = append(s.onNextPartErr, callback)
}
}

func onTxMeta(callback func(txMeta *Ydb_Query.TransactionMeta)) resultOption {
func OnTxMeta(callback func(txMeta *Ydb_Query.TransactionMeta)) resultOption {
return func(s *streamResult) {
s.onTxMeta = append(s.onTxMeta, callback)
}
Expand Down
18 changes: 9 additions & 9 deletions internal/query/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,7 +1544,7 @@ func TestCloseResultOnCloseClosableResultSet(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var closed bool
r, err := newResult(ctx, stream, withTrace(&trace.Query{
r, err := newResult(ctx, stream, WithTrace(&trace.Query{
OnResultClose: func(info trace.QueryResultCloseStartInfo) func(info trace.QueryResultCloseDoneInfo) {
require.False(t, closed)
closed = true
Expand Down Expand Up @@ -1927,7 +1927,7 @@ func TestResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -2288,7 +2288,7 @@ func TestResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -2650,7 +2650,7 @@ func TestResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -2987,7 +2987,7 @@ func TestResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -3359,7 +3359,7 @@ func TestMaterializedResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -3720,7 +3720,7 @@ func TestMaterializedResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -4082,7 +4082,7 @@ func TestMaterializedResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -4419,7 +4419,7 @@ func TestMaterializedResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions internal/query/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *Session) QueryResultSet(
onDone(finalErr)
}()

r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace))
r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), WithTrace(s.trace))
if err != nil {
s.setStatusFromError(err)

Expand Down Expand Up @@ -77,7 +77,7 @@ func (s *Session) QueryRow(ctx context.Context, q string, opts ...options.Execut
onDone(finalErr)
}()

row, err := s.queryRow(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace))
row, err := s.queryRow(ctx, q, options.ExecuteSettings(opts...), WithTrace(s.trace))
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func (s *Session) Exec(
onDone(finalErr)
}()

r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace))
r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), WithTrace(s.trace))
if err != nil {
s.setStatusFromError(err)

Expand All @@ -169,7 +169,7 @@ func (s *Session) Query(
onDone(finalErr)
}()

r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace))
r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), WithTrace(s.trace))
if err != nil {
s.setStatusFromError(err)

Expand Down
24 changes: 12 additions & 12 deletions internal/query/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (tx *Transaction) QueryResultSet(
}

resultOpts := []resultOption{
withTrace(tx.s.trace),
onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
WithTrace(tx.s.trace),
OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
tx.SetTxID(txMeta.GetId())
}),
}
Expand All @@ -107,7 +107,7 @@ func (tx *Transaction) QueryResultSet(
// notification about complete transaction must be sended for any error or for successfully read all result if
// it was execution with commit flag
resultOpts = append(resultOpts,
onNextPartErr(func(err error) {
OnNextPartErr(func(err error) {
tx.notifyOnCompleted(xerrors.HideEOF(err))
}),
)
Expand Down Expand Up @@ -144,8 +144,8 @@ func (tx *Transaction) QueryRow(
)

resultOpts := []resultOption{
withTrace(tx.s.trace),
onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
WithTrace(tx.s.trace),
OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
tx.SetTxID(txMeta.GetId())
}),
}
Expand All @@ -158,7 +158,7 @@ func (tx *Transaction) QueryRow(
// notification about complete transaction must be sended for any error or for successfully read all result if
// it was execution with commit flag
resultOpts = append(resultOpts,
onNextPartErr(func(err error) {
OnNextPartErr(func(err error) {
tx.notifyOnCompleted(xerrors.HideEOF(err))
}),
)
Expand Down Expand Up @@ -211,8 +211,8 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu
}

resultOpts := []resultOption{
withTrace(tx.s.trace),
onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
WithTrace(tx.s.trace),
OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
tx.SetTxID(txMeta.GetId())
}),
}
Expand All @@ -225,7 +225,7 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu
// notification about complete transaction must be sended for any error or for successfully read all result if
// it was execution with commit flag
resultOpts = append(resultOpts,
onNextPartErr(func(err error) {
OnNextPartErr(func(err error) {
tx.notifyOnCompleted(xerrors.HideEOF(err))
}),
)
Expand Down Expand Up @@ -282,8 +282,8 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec
}

resultOpts := []resultOption{
withTrace(tx.s.trace),
onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
WithTrace(tx.s.trace),
OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
tx.SetTxID(txMeta.GetId())
}),
}
Expand All @@ -296,7 +296,7 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec
// notification about complete transaction must be sended for any error or for successfully read all result if
// it was execution with commit flag
resultOpts = append(resultOpts,
onNextPartErr(func(err error) {
OnNextPartErr(func(err error) {
tx.notifyOnCompleted(xerrors.HideEOF(err))
}),
)
Expand Down
30 changes: 15 additions & 15 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

// sessionBuilder is the interface that holds logic of creating sessions.
type sessionBuilder func(ctx context.Context) (*session, error)
type sessionBuilder func(ctx context.Context) (*Session, error)

func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) *Client {
onDone := trace.TableOnInit(config.Trace(), &ctx,
Expand All @@ -30,28 +30,28 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config
clock: config.Clock(),
config: config,
cc: cc,
build: func(ctx context.Context) (s *session, err error) {
build: func(ctx context.Context) (s *Session, err error) {
return newSession(ctx, cc, config)
},
pool: pool.New[*session, session](ctx,
pool.WithLimit[*session, session](config.SizeLimit()),
pool.WithItemUsageLimit[*session, session](config.SessionUsageLimit()),
pool.WithIdleTimeToLive[*session, session](config.IdleThreshold()),
pool.WithCreateItemTimeout[*session, session](config.CreateSessionTimeout()),
pool.WithCloseItemTimeout[*session, session](config.DeleteTimeout()),
pool.WithClock[*session, session](config.Clock()),
pool.WithCreateItemFunc[*session, session](func(ctx context.Context) (*session, error) {
pool: pool.New[*Session, Session](ctx,
pool.WithLimit[*Session, Session](config.SizeLimit()),
pool.WithItemUsageLimit[*Session, Session](config.SessionUsageLimit()),
pool.WithIdleTimeToLive[*Session, Session](config.IdleThreshold()),
pool.WithCreateItemTimeout[*Session, Session](config.CreateSessionTimeout()),
pool.WithCloseItemTimeout[*Session, Session](config.DeleteTimeout()),
pool.WithClock[*Session, Session](config.Clock()),
pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) {
return newSession(ctx, cc, config)
}),
pool.WithTrace[*session, session](&pool.Trace{
pool.WithTrace[*Session, Session](&pool.Trace{
OnNew: func(ctx *context.Context, call stack.Caller) func(limit int) {
return func(limit int) {
onDone(limit)
}
},
OnPut: func(ctx *context.Context, call stack.Caller, item any) func(err error) {
onDone := trace.TableOnPoolPut( //nolint:forcetypeassert
config.Trace(), ctx, call, item.(*session),
config.Trace(), ctx, call, item.(*Session),
)

return func(err error) {
Expand All @@ -62,7 +62,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config
onDone := trace.TableOnPoolGet(config.Trace(), ctx, call)

return func(item any, attempts int, err error) {
onDone(item.(*session), attempts, err) //nolint:forcetypeassert
onDone(item.(*Session), attempts, err) //nolint:forcetypeassert
}
},
OnWith: func(ctx *context.Context, call stack.Caller) func(attempts int, err error) {
Expand Down Expand Up @@ -102,7 +102,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
if c.isClosed() {
return nil, xerrors.WithStackTrace(errClosedClient)
}
createSession := func(ctx context.Context) (*session, error) {
createSession := func(ctx context.Context) (*Session, error) {
s, err := c.build(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(err)
Expand All @@ -125,7 +125,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).CreateSession"),
)
attempts = 0
s *session
s *Session
)
defer func() {
if s != nil {
Expand Down
Loading

0 comments on commit e640e6f

Please sign in to comment.