From bb89f1e2dcad1c944ecd1555398e12d136df954f Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 18:37:51 +0300 Subject: [PATCH] rolled back changes --- internal/stats/query.go | 115 +++++++++++++++++++++------- internal/stats/query_go1.23_test.go | 96 ++++++++++++----------- query/session.go | 2 +- 3 files changed, 135 insertions(+), 78 deletions(-) diff --git a/internal/stats/query.go b/internal/stats/query.go index fc9eb4f0f..8292cbf57 100644 --- a/internal/stats/query.go +++ b/internal/stats/query.go @@ -1,12 +1,41 @@ package stats import ( + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" "time" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" ) type ( + // QueryStats holds query execution statistics. + QueryStats interface { + ProcessCPUTime() time.Duration + Compilation() (c *CompilationStats) + QueryPlan() string + QueryAST() string + TotalCPUTime() time.Duration + TotalDuration() time.Duration + + // NextPhase returns next execution phase within query. + // If ok flag is false, then there are no more phases and p is invalid. + NextPhase() (p QueryPhase, ok bool) + + // QueryPhases is a range iterator over query phases. + QueryPhases() xiter.Seq[QueryPhase] + } + // QueryPhase holds query execution phase statistics. + QueryPhase interface { + // NextTableAccess returns next accessed table within query execution phase. + // If ok flag is false, then there are no more accessed tables and t is invalid. + NextTableAccess() (t *TableAccess, ok bool) + // TableAccess is a range iterator over query execution phase's accessed tables. + TableAccess() xiter.Seq[*TableAccess] + Duration() time.Duration + CPUTime() time.Duration + AffectedShards() uint64 + IsLiteralPhase() bool + } OperationStats struct { Rows uint64 Bytes uint64 @@ -32,13 +61,13 @@ type ( Duration time.Duration CPUTime time.Duration } - // QueryStats holds query execution statistics. - QueryStats struct { + // queryStats holds query execution statistics. + queryStats struct { pb *Ydb_TableStats.QueryStats pos int } - // QueryPhase holds query execution phase statistics. - QueryPhase struct { + // queryPhase holds query execution phase statistics. + queryPhase struct { pb *Ydb_TableStats.QueryPhaseStats pos int } @@ -63,52 +92,65 @@ func fromOperationStats(pb *Ydb_TableStats.OperationStats) OperationStats { } } -func (s *QueryStats) ProcessCPUTime() time.Duration { - return fromUs(s.pb.GetProcessCpuTimeUs()) +func (stats *queryStats) ProcessCPUTime() time.Duration { + return fromUs(stats.pb.GetProcessCpuTimeUs()) } -func (s *QueryStats) Compilation() (c *CompilationStats) { - return fromCompilationStats(s.pb.GetCompilation()) +func (stats *queryStats) Compilation() (c *CompilationStats) { + return fromCompilationStats(stats.pb.GetCompilation()) } -func (s *QueryStats) QueryPlan() string { - return s.pb.GetQueryPlan() +func (stats *queryStats) QueryPlan() string { + return stats.pb.GetQueryPlan() } -func (s *QueryStats) QueryAST() string { - return s.pb.GetQueryAst() +func (stats *queryStats) QueryAST() string { + return stats.pb.GetQueryAst() } -func (s *QueryStats) TotalCPUTime() time.Duration { - return fromUs(s.pb.GetTotalCpuTimeUs()) +func (stats *queryStats) TotalCPUTime() time.Duration { + return fromUs(stats.pb.GetTotalCpuTimeUs()) } -func (s *QueryStats) TotalDuration() time.Duration { - return fromUs(s.pb.GetTotalDurationUs()) +func (stats *queryStats) TotalDuration() time.Duration { + return fromUs(stats.pb.GetTotalDurationUs()) } // NextPhase returns next execution phase within query. // If ok flag is false, then there are no more phases and p is invalid. -func (s *QueryStats) NextPhase() (p QueryPhase, ok bool) { - if s.pos >= len(s.pb.GetQueryPhases()) { +func (stats *queryStats) NextPhase() (p QueryPhase, ok bool) { + if stats.pos >= len(stats.pb.GetQueryPhases()) { return } - pb := s.pb.GetQueryPhases()[s.pos] + pb := stats.pb.GetQueryPhases()[stats.pos] if pb == nil { return } - s.pos++ + stats.pos++ - return QueryPhase{ + return &queryPhase{ pb: pb, }, true } +func (stats *queryStats) QueryPhases() xiter.Seq[QueryPhase] { + return func(yield func(p QueryPhase) bool) { + for _, pb := range stats.pb.GetQueryPhases() { + cont := yield(&queryPhase{ + pb: pb, + }) + if !cont { + return + } + } + } +} + // NextTableAccess returns next accessed table within query execution phase. // // If ok flag is false, then there are no more accessed tables and t is // invalid. -func (phase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) { +func (phase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) { if phase.pos >= len(phase.pb.GetTableAccess()) { return } @@ -124,28 +166,45 @@ func (phase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) { }, true } -func (phase *QueryPhase) Duration() time.Duration { +func (phase *queryPhase) TableAccess() xiter.Seq[*TableAccess] { + return func(yield func(access *TableAccess) bool) { + for _, pb := range phase.pb.GetTableAccess() { + cont := yield(&TableAccess{ + Name: pb.GetName(), + Reads: fromOperationStats(pb.GetReads()), + Updates: fromOperationStats(pb.GetUpdates()), + Deletes: fromOperationStats(pb.GetDeletes()), + PartitionsCount: pb.GetPartitionsCount(), + }) + if !cont { + return + } + } + } +} + +func (phase *queryPhase) Duration() time.Duration { return fromUs(phase.pb.GetDurationUs()) } -func (phase *QueryPhase) CPUTime() time.Duration { +func (phase *queryPhase) CPUTime() time.Duration { return fromUs(phase.pb.GetCpuTimeUs()) } -func (phase *QueryPhase) AffectedShards() uint64 { +func (phase *queryPhase) AffectedShards() uint64 { return phase.pb.GetAffectedShards() } -func (phase *QueryPhase) IsLiteralPhase() bool { +func (phase *queryPhase) IsLiteralPhase() bool { return phase.pb.GetLiteralPhase() } -func FromQueryStats(pb *Ydb_TableStats.QueryStats) *QueryStats { +func FromQueryStats(pb *Ydb_TableStats.QueryStats) QueryStats { if pb == nil { return nil } - return &QueryStats{ + return &queryStats{ pb: pb, } } diff --git a/internal/stats/query_go1.23_test.go b/internal/stats/query_go1.23_test.go index 1e7036f1d..ba13045e4 100644 --- a/internal/stats/query_go1.23_test.go +++ b/internal/stats/query_go1.23_test.go @@ -5,85 +5,84 @@ package stats import ( "fmt" "testing" + "time" "github.com/stretchr/testify/require" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" ) func TestIterateOverQueryPhases(t *testing.T) { - s := &QueryStats{ - pb: &Ydb_TableStats.QueryStats{ - QueryPhases: []*Ydb_TableStats.QueryPhaseStats{ - { - DurationUs: 1, - TableAccess: []*Ydb_TableStats.TableAccessStats{ - { - Name: "a", - }, - { - Name: "b", - }, - { - Name: "c", - }, + s := FromQueryStats(&Ydb_TableStats.QueryStats{ + QueryPhases: []*Ydb_TableStats.QueryPhaseStats{ + { + DurationUs: 1, + TableAccess: []*Ydb_TableStats.TableAccessStats{ + { + Name: "a", + }, + { + Name: "b", + }, + { + Name: "c", }, }, - { - DurationUs: 2, - TableAccess: []*Ydb_TableStats.TableAccessStats{ - { - Name: "d", - }, - { - Name: "e", - }, - { - Name: "f", - }, + }, + { + DurationUs: 2, + TableAccess: []*Ydb_TableStats.TableAccessStats{ + { + Name: "d", + }, + { + Name: "e", + }, + { + Name: "f", }, }, - { - DurationUs: 3, - TableAccess: []*Ydb_TableStats.TableAccessStats{ - { - Name: "g", - }, - { - Name: "h", - }, - { - Name: "i", - }, + }, + { + DurationUs: 3, + TableAccess: []*Ydb_TableStats.TableAccessStats{ + { + Name: "g", + }, + { + Name: "h", + }, + { + Name: "i", }, }, }, }, - } + }) t.Run("ImmutableIteration", func(t *testing.T) { for i := range make([]struct{}, 3) { t.Run(fmt.Sprintf("Pass#%d", i), func(t *testing.T) { - durations := make([]uint64, 0, 3) + durations := make([]time.Duration, 0, 3) tables := make([]string, 0, 9) - for phase := range s.RangeQueryPhases() { - durations = append(durations, phase.pb.GetDurationUs()) - for access := range phase.RangeTableAccess() { + for phase := range s.QueryPhases() { + durations = append(durations, phase.Duration()) + for access := range phase.TableAccess() { tables = append(tables, access.Name) } } - require.Equal(t, []uint64{1, 2, 3}, durations) + require.Equal(t, []time.Duration{1000, 2000, 3000}, durations) require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables) }) } }) t.Run("MutableIteration", func(t *testing.T) { - durations := make([]uint64, 0, 3) + durations := make([]time.Duration, 0, 3) tables := make([]string, 0, 9) for { phase, ok := s.NextPhase() if !ok { break } - durations = append(durations, phase.pb.GetDurationUs()) + durations = append(durations, phase.Duration()) for { access, ok := phase.NextTableAccess() if !ok { @@ -92,9 +91,8 @@ func TestIterateOverQueryPhases(t *testing.T) { tables = append(tables, access.Name) } } - require.Equal(t, []uint64{1, 2, 3}, durations) + require.Equal(t, []time.Duration{1000, 2000, 3000}, durations) require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables) - require.Equal(t, 3, s.pos) _, ok := s.NextPhase() require.False(t, ok) diff --git a/query/session.go b/query/session.go index fe4e9b872..1891c159c 100644 --- a/query/session.go +++ b/query/session.go @@ -18,5 +18,5 @@ type ( Begin(ctx context.Context, txSettings TransactionSettings) (Transaction, error) } - Stats = *stats.QueryStats + Stats = stats.QueryStats )