Skip to content

Commit

Permalink
rolled back changes
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Jan 16, 2025
1 parent 62cd2d6 commit bb89f1e
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 78 deletions.
115 changes: 87 additions & 28 deletions internal/stats/query.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,41 @@
package stats

import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
"time"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats"
)

type (
// QueryStats holds query execution statistics.
QueryStats interface {
ProcessCPUTime() time.Duration
Compilation() (c *CompilationStats)
QueryPlan() string
QueryAST() string
TotalCPUTime() time.Duration
TotalDuration() time.Duration

// NextPhase returns next execution phase within query.
// If ok flag is false, then there are no more phases and p is invalid.
NextPhase() (p QueryPhase, ok bool)

// QueryPhases is a range iterator over query phases.
QueryPhases() xiter.Seq[QueryPhase]
}
// QueryPhase holds query execution phase statistics.
QueryPhase interface {
// NextTableAccess returns next accessed table within query execution phase.
// If ok flag is false, then there are no more accessed tables and t is invalid.
NextTableAccess() (t *TableAccess, ok bool)
// TableAccess is a range iterator over query execution phase's accessed tables.
TableAccess() xiter.Seq[*TableAccess]
Duration() time.Duration
CPUTime() time.Duration
AffectedShards() uint64
IsLiteralPhase() bool
}
OperationStats struct {
Rows uint64
Bytes uint64
Expand All @@ -32,13 +61,13 @@ type (
Duration time.Duration
CPUTime time.Duration
}
// QueryStats holds query execution statistics.
QueryStats struct {
// queryStats holds query execution statistics.
queryStats struct {
pb *Ydb_TableStats.QueryStats
pos int
}
// QueryPhase holds query execution phase statistics.
QueryPhase struct {
// queryPhase holds query execution phase statistics.
queryPhase struct {
pb *Ydb_TableStats.QueryPhaseStats
pos int
}
Expand All @@ -63,52 +92,65 @@ func fromOperationStats(pb *Ydb_TableStats.OperationStats) OperationStats {
}
}

func (s *QueryStats) ProcessCPUTime() time.Duration {
return fromUs(s.pb.GetProcessCpuTimeUs())
func (stats *queryStats) ProcessCPUTime() time.Duration {
return fromUs(stats.pb.GetProcessCpuTimeUs())
}

func (s *QueryStats) Compilation() (c *CompilationStats) {
return fromCompilationStats(s.pb.GetCompilation())
func (stats *queryStats) Compilation() (c *CompilationStats) {
return fromCompilationStats(stats.pb.GetCompilation())
}

func (s *QueryStats) QueryPlan() string {
return s.pb.GetQueryPlan()
func (stats *queryStats) QueryPlan() string {
return stats.pb.GetQueryPlan()
}

func (s *QueryStats) QueryAST() string {
return s.pb.GetQueryAst()
func (stats *queryStats) QueryAST() string {
return stats.pb.GetQueryAst()
}

func (s *QueryStats) TotalCPUTime() time.Duration {
return fromUs(s.pb.GetTotalCpuTimeUs())
func (stats *queryStats) TotalCPUTime() time.Duration {
return fromUs(stats.pb.GetTotalCpuTimeUs())
}

func (s *QueryStats) TotalDuration() time.Duration {
return fromUs(s.pb.GetTotalDurationUs())
func (stats *queryStats) TotalDuration() time.Duration {
return fromUs(stats.pb.GetTotalDurationUs())
}

// NextPhase returns next execution phase within query.
// If ok flag is false, then there are no more phases and p is invalid.
func (s *QueryStats) NextPhase() (p QueryPhase, ok bool) {
if s.pos >= len(s.pb.GetQueryPhases()) {
func (stats *queryStats) NextPhase() (p QueryPhase, ok bool) {
if stats.pos >= len(stats.pb.GetQueryPhases()) {
return
}
pb := s.pb.GetQueryPhases()[s.pos]
pb := stats.pb.GetQueryPhases()[stats.pos]
if pb == nil {
return
}
s.pos++
stats.pos++

return QueryPhase{
return &queryPhase{
pb: pb,
}, true
}

func (stats *queryStats) QueryPhases() xiter.Seq[QueryPhase] {
return func(yield func(p QueryPhase) bool) {
for _, pb := range stats.pb.GetQueryPhases() {
cont := yield(&queryPhase{
pb: pb,
})
if !cont {
return
}
}
}
}

// NextTableAccess returns next accessed table within query execution phase.
//
// If ok flag is false, then there are no more accessed tables and t is
// invalid.
func (phase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) {
func (phase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) {
if phase.pos >= len(phase.pb.GetTableAccess()) {
return
}
Expand All @@ -124,28 +166,45 @@ func (phase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) {
}, true
}

func (phase *QueryPhase) Duration() time.Duration {
func (phase *queryPhase) TableAccess() xiter.Seq[*TableAccess] {
return func(yield func(access *TableAccess) bool) {
for _, pb := range phase.pb.GetTableAccess() {
cont := yield(&TableAccess{
Name: pb.GetName(),
Reads: fromOperationStats(pb.GetReads()),
Updates: fromOperationStats(pb.GetUpdates()),
Deletes: fromOperationStats(pb.GetDeletes()),
PartitionsCount: pb.GetPartitionsCount(),
})
if !cont {
return
}
}
}
}

func (phase *queryPhase) Duration() time.Duration {
return fromUs(phase.pb.GetDurationUs())
}

func (phase *QueryPhase) CPUTime() time.Duration {
func (phase *queryPhase) CPUTime() time.Duration {
return fromUs(phase.pb.GetCpuTimeUs())
}

func (phase *QueryPhase) AffectedShards() uint64 {
func (phase *queryPhase) AffectedShards() uint64 {
return phase.pb.GetAffectedShards()
}

func (phase *QueryPhase) IsLiteralPhase() bool {
func (phase *queryPhase) IsLiteralPhase() bool {
return phase.pb.GetLiteralPhase()
}

func FromQueryStats(pb *Ydb_TableStats.QueryStats) *QueryStats {
func FromQueryStats(pb *Ydb_TableStats.QueryStats) QueryStats {
if pb == nil {
return nil
}

return &QueryStats{
return &queryStats{
pb: pb,
}
}
96 changes: 47 additions & 49 deletions internal/stats/query_go1.23_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,85 +5,84 @@ package stats
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats"
)

func TestIterateOverQueryPhases(t *testing.T) {
s := &QueryStats{
pb: &Ydb_TableStats.QueryStats{
QueryPhases: []*Ydb_TableStats.QueryPhaseStats{
{
DurationUs: 1,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "a",
},
{
Name: "b",
},
{
Name: "c",
},
s := FromQueryStats(&Ydb_TableStats.QueryStats{
QueryPhases: []*Ydb_TableStats.QueryPhaseStats{
{
DurationUs: 1,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "a",
},
{
Name: "b",
},
{
Name: "c",
},
},
{
DurationUs: 2,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "d",
},
{
Name: "e",
},
{
Name: "f",
},
},
{
DurationUs: 2,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "d",
},
{
Name: "e",
},
{
Name: "f",
},
},
{
DurationUs: 3,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "g",
},
{
Name: "h",
},
{
Name: "i",
},
},
{
DurationUs: 3,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "g",
},
{
Name: "h",
},
{
Name: "i",
},
},
},
},
}
})
t.Run("ImmutableIteration", func(t *testing.T) {
for i := range make([]struct{}, 3) {
t.Run(fmt.Sprintf("Pass#%d", i), func(t *testing.T) {
durations := make([]uint64, 0, 3)
durations := make([]time.Duration, 0, 3)
tables := make([]string, 0, 9)
for phase := range s.RangeQueryPhases() {
durations = append(durations, phase.pb.GetDurationUs())
for access := range phase.RangeTableAccess() {
for phase := range s.QueryPhases() {
durations = append(durations, phase.Duration())
for access := range phase.TableAccess() {
tables = append(tables, access.Name)
}
}
require.Equal(t, []uint64{1, 2, 3}, durations)
require.Equal(t, []time.Duration{1000, 2000, 3000}, durations)
require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables)
})
}
})
t.Run("MutableIteration", func(t *testing.T) {
durations := make([]uint64, 0, 3)
durations := make([]time.Duration, 0, 3)
tables := make([]string, 0, 9)
for {
phase, ok := s.NextPhase()
if !ok {
break
}
durations = append(durations, phase.pb.GetDurationUs())
durations = append(durations, phase.Duration())
for {
access, ok := phase.NextTableAccess()
if !ok {
Expand All @@ -92,9 +91,8 @@ func TestIterateOverQueryPhases(t *testing.T) {
tables = append(tables, access.Name)
}
}
require.Equal(t, []uint64{1, 2, 3}, durations)
require.Equal(t, []time.Duration{1000, 2000, 3000}, durations)
require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables)
require.Equal(t, 3, s.pos)

_, ok := s.NextPhase()
require.False(t, ok)
Expand Down
2 changes: 1 addition & 1 deletion query/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ type (

Begin(ctx context.Context, txSettings TransactionSettings) (Transaction, error)
}
Stats = *stats.QueryStats
Stats = stats.QueryStats
)

0 comments on commit bb89f1e

Please sign in to comment.