diff --git a/engine/engine.go b/engine/engine.go index f8130ca4..bbd3e43a 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -69,6 +69,9 @@ type Opts struct { // This will default to false. EnableXFunctions bool + // EnableAnalysis enables query analysis. + EnableAnalysis bool + // FallbackEngine Engine v1.QueryEngine } @@ -211,6 +214,7 @@ func New(opts Opts) *compatibilityEngine { timeout: opts.Timeout, metrics: metrics, extLookbackDelta: opts.ExtLookbackDelta, + enableAnalysis: opts.EnableAnalysis, } } @@ -226,6 +230,8 @@ type compatibilityEngine struct { timeout time.Duration metrics *engineMetrics + enableAnalysis bool + extLookbackDelta time.Duration } @@ -260,7 +266,7 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que }) lplan = lplan.Optimize(e.logicalOptimizers) - exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta) + exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta, e.enableAnalysis) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewInstantQuery(ctx, q, opts, qs, ts) @@ -275,12 +281,13 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que } return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, - engine: e, - expr: expr, - ts: ts, - t: InstantQuery, - resultSort: resultSort, + Query: &Query{exec: exec, opts: opts}, + engine: e, + expr: expr, + ts: ts, + t: InstantQuery, + resultSort: resultSort, + debugWriter: e.debugWriter, }, nil } @@ -311,7 +318,7 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query }) lplan = lplan.Optimize(e.logicalOptimizers) - exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta) + exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta, e.enableAnalysis) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step) @@ -326,10 +333,11 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query } return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, - engine: e, - expr: expr, - t: RangeQuery, + Query: &Query{exec: exec, opts: opts}, + engine: e, + expr: expr, + t: RangeQuery, + debugWriter: e.debugWriter, }, nil } @@ -337,7 +345,11 @@ type ExplainableQuery interface { promql.Query Explain() *ExplainOutputNode - Profile() + Analyze() *AnalyzeOutputNode +} + +type AnalyzeOutputNode struct { + // TODO: Add fields. } type ExplainOutputNode struct { @@ -358,8 +370,11 @@ func (q *Query) Explain() *ExplainOutputNode { return explainVector(q.exec) } -func (q *Query) Profile() { - // TODO(bwplotka): Return profile. +// Analyze returns human-readable query analysis of the created exector. +// This must always be called after Exec, in order to populate telemetry data. +func (q *Query) Analyze() *AnalyzeOutputNode { + // TODO: Implement by calling Analyze on ObservableVectorOperators. + return &AnalyzeOutputNode{} } func explainVector(v model.VectorOperator) *ExplainOutputNode { @@ -478,13 +493,17 @@ type compatibilityQuery struct { t QueryType resultSort resultSorter - cancel context.CancelFunc + cancel context.CancelFunc + debugWriter io.Writer } func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { // Handle case with strings early on as this does not need us to process samples. switch e := q.expr.(type) { case *parser.StringLiteral: + if q.debugWriter != nil { + analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "") + } return &promql.Result{Value: promql.String{V: e.Val, T: q.ts.UnixMilli()}} } ret = &promql.Result{ @@ -567,6 +586,9 @@ loop: } sort.Sort(resultMatrix) ret.Value = resultMatrix + if q.debugWriter != nil { + analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "") + } return ret } @@ -610,6 +632,10 @@ loop: } ret.Value = result + + if q.debugWriter != nil { + analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "") + } return ret } @@ -688,6 +714,12 @@ func recoverEngine(logger log.Logger, expr parser.Expr, errp *error) { } } +// Useful for local testing. + +func analyze(w io.Writer, o model.ObservableVectorOperator, indent, indentNext string) { + // TODO: Implement. +} + func explain(w io.Writer, o model.VectorOperator, indent, indentNext string) { me, next := o.Explain() _, _ = w.Write([]byte(indent)) diff --git a/execution/execution.go b/execution/execution.go index 58e2916d..3d0ee945 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -52,7 +52,7 @@ const stepsBatch = 10 // New creates new physical query execution for a given query expression which represents logical plan. // TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan. -func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration) (model.VectorOperator, error) { +func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration, enableAnalysis bool) (model.VectorOperator, error) { opts := &query.Options{ Context: ctx, Start: mint, @@ -61,6 +61,7 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min LookbackDelta: lookbackDelta, StepsBatch: stepsBatch, ExtLookbackDelta: extLookbackDelta, + EnableAnalysis: enableAnalysis, } selectorPool := engstore.NewSelectorPool(queryable) hints := storage.SelectHints{ diff --git a/execution/function/scalar.go b/execution/function/scalar.go index 4335b6fe..57cbf1be 100644 --- a/execution/function/scalar.go +++ b/execution/function/scalar.go @@ -15,6 +15,7 @@ import ( type scalarFunctionOperator struct { pool *model.VectorPool next model.VectorOperator + model.TimingInformation } func (o *scalarFunctionOperator) Explain() (me string, next []model.VectorOperator) { diff --git a/execution/model/operator.go b/execution/model/operator.go index 40d04eb6..ee08687b 100644 --- a/execution/model/operator.go +++ b/execution/model/operator.go @@ -5,10 +5,34 @@ package model import ( "context" + "time" "github.com/prometheus/prometheus/model/labels" ) +type NoopTimingInformation struct{} + +func (ti *NoopTimingInformation) AddCPUTimeTaken(t time.Duration) {} + +type TimingInformation struct { + CPUTime time.Duration +} + +func (ti *TimingInformation) AddCPUTimeTaken(t time.Duration) { + ti.CPUTime += t +} + +type OperatorTelemetry interface { + AddCPUTimeTaken(time.Duration) +} + +type ObservableVectorOperator interface { + VectorOperator + OperatorTelemetry + + Analyze() (*TimingInformation, []ObservableVectorOperator) +} + // VectorOperator performs operations on series in step by step fashion. type VectorOperator interface { // Next yields vectors of samples from all series for one or more execution steps. diff --git a/execution/scan/literal_selector.go b/execution/scan/literal_selector.go index 802f067a..c62ac65a 100644 --- a/execution/scan/literal_selector.go +++ b/execution/scan/literal_selector.go @@ -27,10 +27,11 @@ type numberLiteralSelector struct { once sync.Once val float64 + t model.OperatorTelemetry } func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val float64) *numberLiteralSelector { - return &numberLiteralSelector{ + op := &numberLiteralSelector{ vectorPool: pool, numSteps: opts.NumSteps(), mint: opts.Start.UnixMilli(), @@ -39,6 +40,13 @@ func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val f currentStep: opts.Start.UnixMilli(), val: val, } + + op.t = &model.NoopTimingInformation{} + if opts.EnableAnalysis { + op.t = &model.TimingInformation{} + } + + return op } func (o *numberLiteralSelector) Explain() (me string, next []model.VectorOperator) { diff --git a/query/options.go b/query/options.go index 0143eea9..fed4fa26 100644 --- a/query/options.go +++ b/query/options.go @@ -16,7 +16,8 @@ type Options struct { LookbackDelta time.Duration ExtLookbackDelta time.Duration - StepsBatch int64 + StepsBatch int64 + EnableAnalysis bool } func (o *Options) NumSteps() int {