Skip to content

Commit

Permalink
refactored query stats: implemetation instead interface
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Jan 16, 2025
1 parent f3860a6 commit 1d838c9
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 86 deletions.
2 changes: 1 addition & 1 deletion internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
type executeSettings interface {
ExecMode() options.ExecMode
StatsMode() options.StatsMode
StatsCallback() func(stats stats.QueryStats)
StatsCallback() func(stats *stats.QueryStats)
TxControl() *query.TransactionControl
Syntax() options.Syntax
Params() params.Parameters
Expand Down
8 changes: 4 additions & 4 deletions internal/query/options/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type (
execMode ExecMode
statsMode StatsMode
resourcePool string
statsCallback func(queryStats stats.QueryStats)
statsCallback func(queryStats *stats.QueryStats)
callOptions []grpc.CallOption
txControl *tx.Control
retryOptions []retry.Option
Expand All @@ -59,7 +59,7 @@ type (
syntaxOption = Syntax
statsModeOption struct {
mode StatsMode
callback func(stats.QueryStats)
callback func(*stats.QueryStats)
}
execModeOption = ExecMode
responsePartLimitBytes int64
Expand All @@ -73,7 +73,7 @@ func (s *executeSettings) RetryOpts() []retry.Option {
return s.retryOptions
}

func (s *executeSettings) StatsCallback() func(stats.QueryStats) {
func (s *executeSettings) StatsCallback() func(*stats.QueryStats) {
return s.statsCallback
}

Expand Down Expand Up @@ -225,7 +225,7 @@ func (opt statsModeOption) applyExecuteOption(s *executeSettings) {
s.statsCallback = opt.callback
}

func WithStatsMode(mode StatsMode, callback func(stats.QueryStats)) statsModeOption {
func WithStatsMode(mode StatsMode, callback func(*stats.QueryStats)) statsModeOption {
return statsModeOption{
mode: mode,
callback: callback,
Expand Down
2 changes: 1 addition & 1 deletion internal/query/options/execute_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type (
Query string
}
Mode ExecMode
Stats stats.QueryStats
Stats *stats.QueryStats
ResultSetsMeta []struct {
Columns []struct {
Name string
Expand Down
4 changes: 2 additions & 2 deletions internal/query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type (
resultSetIndex int64
closed chan struct{}
trace *trace.Query
statsCallback func(queryStats stats.QueryStats)
statsCallback func(queryStats *stats.QueryStats)
onClose []func()
onNextPartErr []func(err error)
onTxMeta []func(txMeta *Ydb_Query.TransactionMeta)
Expand Down Expand Up @@ -93,7 +93,7 @@ func withTrace(t *trace.Query) resultOption {
}
}

func withStatsCallback(callback func(queryStats stats.QueryStats)) resultOption {
func withStatsCallback(callback func(queryStats *stats.QueryStats)) resultOption {
return func(s *streamResult) {
s.statsCallback = callback
}
Expand Down
32 changes: 16 additions & 16 deletions internal/query/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1926,8 +1926,8 @@ func TestResultStats(t *testing.T) {
},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
var s *stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -2287,8 +2287,8 @@ func TestResultStats(t *testing.T) {
},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
var s *stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -2649,8 +2649,8 @@ func TestResultStats(t *testing.T) {
},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
var s *stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -2986,8 +2986,8 @@ func TestResultStats(t *testing.T) {
},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
var s *stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -3358,8 +3358,8 @@ func TestMaterializedResultStats(t *testing.T) {
},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
var s *stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -3719,8 +3719,8 @@ func TestMaterializedResultStats(t *testing.T) {
},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
var s *stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -4081,8 +4081,8 @@ func TestMaterializedResultStats(t *testing.T) {
},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
var s *stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -4418,8 +4418,8 @@ func TestMaterializedResultStats(t *testing.T) {
},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
var s *stats.QueryStats
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down
75 changes: 26 additions & 49 deletions internal/stats/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,6 @@ import (
)

type (
// QueryStats holds query execution statistics.
QueryStats interface {
ProcessCPUTime() time.Duration
Compilation() (c *CompilationStats)
QueryPlan() string
QueryAST() string
TotalCPUTime() time.Duration
TotalDuration() time.Duration

// NextPhase returns next execution phase within query.
// If ok flag is false, then there are no more phases and p is invalid.
NextPhase() (p QueryPhase, ok bool)
}
// QueryPhase holds query execution phase statistics.
QueryPhase interface {
// NextTableAccess returns next accessed table within query execution phase.
// If ok flag is false, then there are no more accessed tables and t is invalid.
NextTableAccess() (t *TableAccess, ok bool)
Duration() time.Duration
CPUTime() time.Duration
AffectedShards() uint64
IsLiteralPhase() bool
}
OperationStats struct {
Rows uint64
Bytes uint64
Expand All @@ -55,13 +32,13 @@ type (
Duration time.Duration
CPUTime time.Duration
}
// queryStats holds query execution statistics.
queryStats struct {
// QueryStats holds query execution statistics.
QueryStats struct {
pb *Ydb_TableStats.QueryStats
pos int
}
// queryPhase holds query execution phase statistics.
queryPhase struct {
// QueryPhase holds query execution phase statistics.
QueryPhase struct {
pb *Ydb_TableStats.QueryPhaseStats
pos int
}
Expand All @@ -86,33 +63,33 @@ func fromOperationStats(pb *Ydb_TableStats.OperationStats) OperationStats {
}
}

func (s *queryStats) ProcessCPUTime() time.Duration {
func (s *QueryStats) ProcessCPUTime() time.Duration {
return fromUs(s.pb.GetProcessCpuTimeUs())
}

func (s *queryStats) Compilation() (c *CompilationStats) {
func (s *QueryStats) Compilation() (c *CompilationStats) {
return fromCompilationStats(s.pb.GetCompilation())
}

func (s *queryStats) QueryPlan() string {
func (s *QueryStats) QueryPlan() string {
return s.pb.GetQueryPlan()
}

func (s *queryStats) QueryAST() string {
func (s *QueryStats) QueryAST() string {
return s.pb.GetQueryAst()
}

func (s *queryStats) TotalCPUTime() time.Duration {
func (s *QueryStats) TotalCPUTime() time.Duration {
return fromUs(s.pb.GetTotalCpuTimeUs())
}

func (s *queryStats) TotalDuration() time.Duration {
func (s *QueryStats) TotalDuration() time.Duration {
return fromUs(s.pb.GetTotalDurationUs())
}

// NextPhase returns next execution phase within query.
// If ok flag is false, then there are no more phases and p is invalid.
func (s *queryStats) NextPhase() (p QueryPhase, ok bool) {
func (s *QueryStats) NextPhase() (p QueryPhase, ok bool) {
if s.pos >= len(s.pb.GetQueryPhases()) {
return
}
Expand All @@ -122,7 +99,7 @@ func (s *queryStats) NextPhase() (p QueryPhase, ok bool) {
}
s.pos++

return &queryPhase{
return QueryPhase{
pb: pb,
}, true
}
Expand All @@ -131,12 +108,12 @@ func (s *queryStats) NextPhase() (p QueryPhase, ok bool) {
//
// If ok flag is false, then there are no more accessed tables and t is
// invalid.
func (queryPhase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) {
if queryPhase.pos >= len(queryPhase.pb.GetTableAccess()) {
func (QueryPhase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) {
if QueryPhase.pos >= len(QueryPhase.pb.GetTableAccess()) {
return
}
pb := queryPhase.pb.GetTableAccess()[queryPhase.pos]
queryPhase.pos++
pb := QueryPhase.pb.GetTableAccess()[QueryPhase.pos]
QueryPhase.pos++

return &TableAccess{
Name: pb.GetName(),
Expand All @@ -147,28 +124,28 @@ func (queryPhase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) {
}, true
}

func (queryPhase *queryPhase) Duration() time.Duration {
return fromUs(queryPhase.pb.GetDurationUs())
func (QueryPhase *QueryPhase) Duration() time.Duration {
return fromUs(QueryPhase.pb.GetDurationUs())
}

func (queryPhase *queryPhase) CPUTime() time.Duration {
return fromUs(queryPhase.pb.GetCpuTimeUs())
func (QueryPhase *QueryPhase) CPUTime() time.Duration {
return fromUs(QueryPhase.pb.GetCpuTimeUs())
}

func (queryPhase *queryPhase) AffectedShards() uint64 {
return queryPhase.pb.GetAffectedShards()
func (QueryPhase *QueryPhase) AffectedShards() uint64 {
return QueryPhase.pb.GetAffectedShards()
}

func (queryPhase *queryPhase) IsLiteralPhase() bool {
return queryPhase.pb.GetLiteralPhase()
func (QueryPhase *QueryPhase) IsLiteralPhase() bool {
return QueryPhase.pb.GetLiteralPhase()
}

func FromQueryStats(pb *Ydb_TableStats.QueryStats) QueryStats {
func FromQueryStats(pb *Ydb_TableStats.QueryStats) *QueryStats {
if pb == nil {
return nil
}

return &queryStats{
return &QueryStats{
pb: pb,
}
}
2 changes: 1 addition & 1 deletion internal/table/scanner/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (r *baseResult) CurrentResultSet() result.Set {
}

// Stats returns query execution queryStats.
func (r *baseResult) Stats() stats.QueryStats {
func (r *baseResult) Stats() *stats.QueryStats {
r.statsMtx.RLock()
defer r.statsMtx.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion internal/xsql/propose/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *Conn) Explain(ctx context.Context, sql string, _ *params.Params) (ast s
_, err := c.session.Query(
ctx, sql,
options.WithExecMode(options.ExecModeExplain),
options.WithStatsMode(options.StatsModeNone, func(stats stats.QueryStats) {
options.WithStatsMode(options.StatsModeNone, func(stats *stats.QueryStats) {
ast = stats.QueryAST()
plan = stats.QueryPlan()
}),
Expand Down
18 changes: 9 additions & 9 deletions query/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func Example_resultStats() {
id int32 // required value
myStr string // required value
)
var stats query.Stats
var s query.Stats
// Do retry operation on errors with best effort
row, err := db.Query().QueryRow(ctx, // context manage exiting from Do
`SELECT CAST($id AS Uint64) AS id, CAST($myStr AS Text) AS myStr`,
Expand All @@ -256,8 +256,8 @@ func Example_resultStats() {
Param("$myStr").Text("123").
Build(),
),
query.WithStatsMode(query.StatsModeFull, func(s query.Stats) {
stats = s
query.WithStatsMode(query.StatsModeFull, func(stats query.Stats) {
s = stats
}),
query.WithIdempotent(),
)
Expand All @@ -275,14 +275,14 @@ func Example_resultStats() {

fmt.Printf("id=%v, myStr='%s'\n", id, myStr)
fmt.Println("Stats:")
fmt.Printf("- Compilation='%v'\n", stats.Compilation())
fmt.Printf("- TotalCPUTime='%v'\n", stats.TotalCPUTime())
fmt.Printf("- ProcessCPUTime='%v'\n", stats.ProcessCPUTime())
fmt.Printf("- QueryAST='%v'\n", stats.QueryAST())
fmt.Printf("- QueryPlan='%v'\n", stats.QueryPlan())
fmt.Printf("- Compilation='%v'\n", s.Compilation())
fmt.Printf("- TotalCPUTime='%v'\n", s.TotalCPUTime())
fmt.Printf("- ProcessCPUTime='%v'\n", s.ProcessCPUTime())
fmt.Printf("- QueryAST='%v'\n", s.QueryAST())
fmt.Printf("- QueryPlan='%v'\n", s.QueryPlan())
fmt.Println("- Phases:")
for {
phase, ok := stats.NextPhase()
phase, ok := s.NextPhase()
if !ok {
break
}
Expand Down
2 changes: 1 addition & 1 deletion query/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ type (

Begin(ctx context.Context, txSettings TransactionSettings) (Transaction, error)
}
Stats = stats.QueryStats
Stats = *stats.QueryStats
)
4 changes: 3 additions & 1 deletion table/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type BaseResult interface {
// Stats returns query execution QueryStats.
//
// If query result have no stats - returns nil
Stats() (s stats.QueryStats)
Stats() (s Stats)

// Err return scanner error
// To handle errors, do not need to check after scanning each row
Expand All @@ -130,3 +130,5 @@ type Result interface {
type StreamResult interface {
BaseResult
}

type Stats = *stats.QueryStats

0 comments on commit 1d838c9

Please sign in to comment.