Skip to content

Commit

Permalink
Demo.
Browse files Browse the repository at this point in the history
Signed-off-by: bwplotka <[email protected]>
  • Loading branch information
bwplotka committed Oct 26, 2022
1 parent 52d728e commit efa02e5
Show file tree
Hide file tree
Showing 12 changed files with 358 additions and 46 deletions.
64 changes: 58 additions & 6 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math"
"runtime"
"sort"
"strings"
"time"

"github.com/efficientgo/core/errors"
Expand Down Expand Up @@ -92,9 +93,10 @@ func (e *compatibilityEngine) NewInstantQuery(q storage.Queryable, opts *promql.
return nil, err
}

optimizationLog := &logicalplan.Log{}
lplan := logicalplan.New(expr, ts, ts)
if !e.disableOptimizers {
lplan = lplan.Optimize(logicalplan.DefaultOptimizers)
lplan = lplan.Optimize(logicalplan.DefaultOptimizers, optimizationLog)
}

exec, err := execution.New(lplan.Expr(), q, ts, ts, 0, e.lookbackDelta)
Expand All @@ -112,7 +114,22 @@ func (e *compatibilityEngine) NewInstantQuery(q storage.Queryable, opts *promql.
}

return &compatibilityQuery{
Query: &Query{exec: exec},
Query: &Query{
exec: exec,
optimizationLog: optimizationLog,
preOptimizations: func() model.VectorOperator {
// Yolo for demo.
expr, err := parser.ParseExpr(qs)
if err != nil {
panic(err)
}
lplan := logicalplan.New(expr, ts, ts)
exec, err := execution.New(lplan.Expr(), q, ts, ts, 0, e.lookbackDelta)
if err != nil {
panic(err)
}
return exec
}},
engine: e,
expr: expr,
ts: ts,
Expand All @@ -130,9 +147,10 @@ func (e *compatibilityEngine) NewRangeQuery(q storage.Queryable, opts *promql.Qu
return nil, errors.Newf("invalid expression type %q for range Query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
}

optimizationLog := &logicalplan.Log{}
lplan := logicalplan.New(expr, start, end)
if !e.disableOptimizers {
lplan = lplan.Optimize(logicalplan.DefaultOptimizers)
lplan = lplan.Optimize(logicalplan.DefaultOptimizers, optimizationLog)
}

exec, err := execution.New(lplan.Expr(), q, start, end, step, e.lookbackDelta)
Expand All @@ -150,20 +168,54 @@ func (e *compatibilityEngine) NewRangeQuery(q storage.Queryable, opts *promql.Qu
}

return &compatibilityQuery{
Query: &Query{exec: exec},
Query: &Query{
exec: exec,
optimizationLog: optimizationLog,
preOptimizations: func() model.VectorOperator {
// Yolo for demo.
expr, err := parser.ParseExpr(qs)
if err != nil {
panic(err)
}
lplan := logicalplan.New(expr, start, end)
exec, err := execution.New(lplan.Expr(), q, start, end, step, e.lookbackDelta)
if err != nil {
panic(err)
}
return exec
}},
engine: e,
expr: expr,
}, nil
}

type Debuggable interface {
Explain() string
}

type Query struct {
exec model.VectorOperator

optimizationLog *logicalplan.Log
preOptimizations func() model.VectorOperator
}

// Explain returns human-readable explanation of the created executor.
func (q *Query) Explain() string {
// TODO(bwplotka): Explain plan and steps.
return "not implemented"
str := strings.Builder{}
str.WriteString("EXPLAIN:\n")
opts := q.optimizationLog.Elems()
if len(opts) > 0 {
str.WriteString("PromQL physical plan before optimization:\n")
explain(&str, q.preOptimizations(), "", "")
for _, o := range opts {
str.WriteString(fmt.Sprintf("--> Logical Optimization: %v\n", o))
}
}

str.WriteString("Final PromQL physical plan:\n")
explain(&str, q.exec, "", "")
return str.String()
}

func (q *Query) Profile() {
Expand Down
122 changes: 122 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2021,5 +2021,127 @@ func TestEngineRecoversFromPanic(t *testing.T) {
r := q.Exec(context.Background())
testutil.Assert(t, r.Err.Error() == "unexpected error: panic!")
})
}

func TestExplain(t *testing.T) {
t.Skip("TODO(bwplotka): Fix - memory address is now part of info, so can't assert easily.")

start := time.Unix(0, 0)
end := time.Unix(120, 0)
step := time.Second * 30

for _, tcase := range []struct {
query string
load string

expectedExplain string
}{
{
query: "yolo",
expectedExplain: `EXPLAIN:
Final PromQL physical plan:
[*CancellableOperator]: [*coalesceOperator]:
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="yolo"]} @0[2m0s] 0 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="yolo"]} @0[2m0s] 1 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="yolo"]} @0[2m0s] 2 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="yolo"]} @0[2m0s] 3 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="yolo"]} @0[2m0s] 4 mod 6
└──[*concurrencyOperator(buff=2)]:
└──[*CancellableOperator]: [*vectorSelector] {[__name__="yolo"]} @0[2m0s] 5 mod 6
`,
},
{
query: "count(cluster_version{from_version=\"4.1.9\"}) / count(cluster_version)",
expectedExplain: `EXPLAIN:
PromQL physical plan before optimization:
[*CancellableOperator]: [*vectorOperator] / one-to-one ignoring false group []:
├──[*CancellableOperator]: [*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*aggregate] count by ([]):
│ └──[*CancellableOperator]: [*coalesceOperator]:
│ ├──[*concurrencyOperator(buff=2)]:
│ │ └──[*CancellableOperator]: [*vectorSelector] {[from_version="4.1.9" __name__="cluster_version"]} @0[2m0s] 0 mod 6
│ ├──[*concurrencyOperator(buff=2)]:
│ │ └──[*CancellableOperator]: [*vectorSelector] {[from_version="4.1.9" __name__="cluster_version"]} @0[2m0s] 1 mod 6
│ ├──[*concurrencyOperator(buff=2)]:
│ │ └──[*CancellableOperator]: [*vectorSelector] {[from_version="4.1.9" __name__="cluster_version"]} @0[2m0s] 2 mod 6
│ ├──[*concurrencyOperator(buff=2)]:
│ │ └──[*CancellableOperator]: [*vectorSelector] {[from_version="4.1.9" __name__="cluster_version"]} @0[2m0s] 3 mod 6
│ ├──[*concurrencyOperator(buff=2)]:
│ │ └──[*CancellableOperator]: [*vectorSelector] {[from_version="4.1.9" __name__="cluster_version"]} @0[2m0s] 4 mod 6
│ └──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[from_version="4.1.9" __name__="cluster_version"]} @0[2m0s] 5 mod 6
└──[*CancellableOperator]: [*concurrencyOperator(buff=2)]:
└──[*CancellableOperator]: [*aggregate] count by ([]):
└──[*CancellableOperator]: [*coalesceOperator]:
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 0 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 1 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 2 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 3 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 4 mod 6
└──[*concurrencyOperator(buff=2)]:
└──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 5 mod 6
--> Logical Optimization: SortMatchers: sorting matchers for cluster_version{from_version="4.1.9"}
--> Logical Optimization: MergeSelectsOptimizer: Replacing cluster_version with FilteredSelector{}
Final PromQL physical plan:
[*CancellableOperator]: [*vectorOperator] / one-to-one ignoring false group []:
├──[*CancellableOperator]: [*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*aggregate] count by ([]):
│ └──[*CancellableOperator]: [*coalesceOperator]:
│ ├──[*concurrencyOperator(buff=2)]:
│ │ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version" from_version="4.1.9"]} @0[2m0s] 0 mod 6
│ ├──[*concurrencyOperator(buff=2)]:
│ │ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version" from_version="4.1.9"]} @0[2m0s] 1 mod 6
│ ├──[*concurrencyOperator(buff=2)]:
│ │ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version" from_version="4.1.9"]} @0[2m0s] 2 mod 6
│ ├──[*concurrencyOperator(buff=2)]:
│ │ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version" from_version="4.1.9"]} @0[2m0s] 3 mod 6
│ ├──[*concurrencyOperator(buff=2)]:
│ │ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version" from_version="4.1.9"]} @0[2m0s] 4 mod 6
│ └──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version" from_version="4.1.9"]} @0[2m0s] 5 mod 6
└──[*CancellableOperator]: [*concurrencyOperator(buff=2)]:
└──[*CancellableOperator]: [*aggregate] count by ([]):
└──[*CancellableOperator]: [*coalesceOperator]:
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 0 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 1 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 2 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 3 mod 6
├──[*concurrencyOperator(buff=2)]:
│ └──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 4 mod 6
└──[*concurrencyOperator(buff=2)]:
└──[*CancellableOperator]: [*vectorSelector] {[__name__="cluster_version"]} @0[2m0s] 5 mod 6
`,
},
} {
t.Run("", func(t *testing.T) {
test, err := promql.NewTest(t, tcase.load)
testutil.Ok(t, err)
defer test.Close()

testutil.Ok(t, test.Run())
opts := promql.EngineOpts{
Timeout: 2 * time.Second,
MaxSamples: math.MaxInt64,
}
newEngine := engine.New(engine.Opts{DisableFallback: true, EngineOpts: opts})
q, err := newEngine.NewRangeQuery(test.Storage(), nil, tcase.query, start, end, step)
testutil.Ok(t, err)
testutil.Equals(t, tcase.expectedExplain, q.(engine.Debuggable).Explain())
})
}

}
8 changes: 8 additions & 0 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,17 @@ func New(expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, st
// TODO(fpetkovski): Adjust the step for sub-queries once they are supported.
Step: step.Milliseconds(),
}

return newCancellableOperator(expr, selectorPool, opts, hints)
}

type DataInfo struct {
}

type SetupInfo struct {
CPUs int
}

func newCancellableOperator(expr parser.Expr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (*exchange.CancellableOperator, error) {
operator, err := newOperator(expr, selectorPool, opts, hints)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion execution/scan/vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func NewVectorSelector(
}

func (o *vectorSelector) Explain() (me string, next []model.VectorOperator) {
return fmt.Sprintf("[*vectorSelector] {%v} %v mod %v", o.storage.Matchers(), o.shard, o.numShards), nil
// TODO(bwplotka): Inconsistent. Move interface to (string, []Explainable)
return fmt.Sprintf("[*vectorSelector] %v mod %v: %v", o.shard, o.numShards, o.storage.Explain()), nil
}

func (o *vectorSelector) Series(ctx context.Context) ([]labels.Labels, error) {
Expand Down
36 changes: 13 additions & 23 deletions execution/storage/filtered_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,47 @@ package storage

import (
"context"
"sync"
"fmt"

"github.com/prometheus/prometheus/model/labels"
)

type filteredSelector struct {
selector *seriesSelector
selector SeriesSelector
filter Filter

once sync.Once
series []SignedSeries
}

func NewFilteredSelector(selector *seriesSelector, filter Filter) SeriesSelector {
func NewFilteredSelector(selector SeriesSelector, filter Filter) SeriesSelector {
return &filteredSelector{
selector: selector,
filter: filter,
}
}

func (f *filteredSelector) Explain() string {
return fmt.Sprintf("[*filteredSelector] {%v}: %v", f.filter.Matchers(), f.selector.Explain())
}

func (f *filteredSelector) Matchers() []*labels.Matcher {
return append(f.selector.matchers, f.filter.Matchers()...)
return append(f.selector.Matchers(), f.filter.Matchers()...)
}

func (f *filteredSelector) GetSeries(ctx context.Context, shard, numShards int) ([]SignedSeries, error) {
var err error
f.once.Do(func() { err = f.loadSeries(ctx) })
series, err := f.selector.GetSeries(ctx, shard, numShards)
if err != nil {
return nil, err
}

return seriesShard(f.series, shard, numShards), nil
}

func (f *filteredSelector) loadSeries(ctx context.Context) error {
series, err := f.selector.GetSeries(ctx, 0, 1)
if err != nil {
return err
}

var i uint64
f.series = make([]SignedSeries, 0, len(series))
i := uint64(0)
ss := make([]SignedSeries, 0, len(series))
for _, s := range series {
if f.filter.Matches(s) {
f.series = append(f.series, SignedSeries{
ss = append(ss, SignedSeries{
Series: s.Series,
Signature: i,
})
i++
}
}

return nil
return ss, nil
}
8 changes: 4 additions & 4 deletions execution/storage/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,30 @@ import (
var sep = []byte{'\xff'}

type SelectorPool struct {
selectors map[uint64]*seriesSelector
selectors map[uint64]SeriesSelector

queryable storage.Queryable
}

func NewSelectorPool(queryable storage.Queryable) *SelectorPool {
return &SelectorPool{
selectors: make(map[uint64]*seriesSelector),
selectors: make(map[uint64]SeriesSelector),
queryable: queryable,
}
}

func (p *SelectorPool) GetSelector(mint, maxt, step int64, matchers []*labels.Matcher, hints storage.SelectHints) SeriesSelector {
key := hashMatchers(matchers, mint, maxt, hints)
if _, ok := p.selectors[key]; !ok {
p.selectors[key] = newSeriesSelector(p.queryable, mint, maxt, step, matchers, hints)
p.selectors[key] = newShardedStorageSeriesSelector(p.queryable, mint, maxt, step, matchers, hints)
}
return p.selectors[key]
}

func (p *SelectorPool) GetFilteredSelector(mint, maxt, step int64, matchers, filters []*labels.Matcher, hints storage.SelectHints) SeriesSelector {
key := hashMatchers(matchers, mint, maxt, hints)
if _, ok := p.selectors[key]; !ok {
p.selectors[key] = newSeriesSelector(p.queryable, mint, maxt, step, matchers, hints)
p.selectors[key] = newShardedStorageSeriesSelector(p.queryable, mint, maxt, step, matchers, hints)
}

return NewFilteredSelector(p.selectors[key], NewFilter(filters))
Expand Down
Loading

0 comments on commit efa02e5

Please sign in to comment.