diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index b4bda35294..905bbdcb84 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -24,7 +24,9 @@ import ( "encoding/json" "fmt" "net/http" - _ "net/http/pprof" // needed for pprof handler registration + + // needed for pprof handler registration + _ "net/http/pprof" "time" "github.com/gorilla/mux" @@ -495,6 +497,7 @@ func (h *Handler) RegisterRoutes() error { ResolutionMultiplier: h.middlewareConfig.Prometheus.ResolutionMultiplier, DefaultLookback: h.options.DefaultLookback(), Storage: h.options.Storage(), + PrometheusEngineFn: h.options.PrometheusEngineFn(), }, } override := h.registry.MiddlewareOpts(route) diff --git a/src/query/api/v1/middleware/rewrite.go b/src/query/api/v1/middleware/rewrite.go index bdcb823c2e..a55ca812f7 100644 --- a/src/query/api/v1/middleware/rewrite.go +++ b/src/query/api/v1/middleware/rewrite.go @@ -22,21 +22,28 @@ package middleware import ( "bytes" + "context" + "errors" "io/ioutil" "net/http" "net/url" "time" "github.com/gorilla/mux" + "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" + promstorage "github.com/prometheus/prometheus/storage" "go.uber.org/zap" "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/storage" xhttp "github.com/m3db/m3/src/x/net/http" + xtime "github.com/m3db/m3/src/x/time" ) +var errIgnorableQuerierError = errors.New("ignorable error") + // PrometheusRangeRewriteOptions are the options for the prometheus range rewriting middleware. type PrometheusRangeRewriteOptions struct { // nolint:maligned Enabled bool @@ -45,6 +52,11 @@ type PrometheusRangeRewriteOptions struct { // nolint:maligned ResolutionMultiplier int DefaultLookback time.Duration Storage storage.Storage + + // TODO(marcus): There's a conversation with Prometheus about supporting dynamic lookback. + // We can replace this with a single engine reference if that work is ever completed. + // https://groups.google.com/g/prometheus-developers/c/9wzuobfLMV8 + PrometheusEngineFn func(time.Duration) (*promql.Engine, error) } // PrometheusRangeRewrite is middleware that, when enabled, will rewrite the query parameter @@ -67,7 +79,7 @@ func PrometheusRangeRewrite(opts Options) mux.MiddlewareFunc { } logger := opts.InstrumentOpts.Logger() - if err := rewriteRangeDuration(r, mwOpts, logger); err != nil { + if err := RewriteRangeDuration(r, mwOpts, logger); err != nil { logger.Error("could not rewrite range", zap.Error(err)) xhttp.WriteError(w, err) return @@ -84,7 +96,8 @@ const ( lookbackParam = handleroptions.LookbackParam ) -func rewriteRangeDuration( +// RewriteRangeDuration is the driver function for the PrometheusRangeRewrite middleware +func RewriteRangeDuration( r *http.Request, opts PrometheusRangeRewriteOptions, logger *zap.Logger, @@ -118,33 +131,39 @@ func rewriteRangeDuration( return err } - attrs, err := store.QueryStorageMetadataAttributes(ctx, params.start, params.end, fetchOpts) + // Get the appropriate time range before updating the lookback + // This is necessary to cover things like the offset and `@` modifiers. + startTime, endTime := getQueryBounds(opts, params, fetchOpts, logger) + res, err := findLargestQueryResolution(ctx, store, fetchOpts, startTime, endTime) if err != nil { return err } - - // Find the largest resolution - var res time.Duration - for _, attr := range attrs { - if attr.Resolution > res { - res = attr.Resolution - } - } - // Largest resolution is 0 which means we're routing to the unaggregated namespace. // Unaggregated namespace can service all requests, so return. if res == 0 { return nil } - // Rewrite ranges within the query, if necessary + updatedLookback, updateLookback := maybeUpdateLookback(params, res, opts) + originalLookback := params.lookback + + // We use the lookback as a part of bounds calculation + // If the lookback had changed, we need to recalculate the bounds + if updateLookback { + params.lookback = updatedLookback + startTime, endTime = getQueryBounds(opts, params, fetchOpts, logger) + res, err = findLargestQueryResolution(ctx, store, fetchOpts, startTime, endTime) + if err != nil { + return err + } + } + + // parse the query so that we can manipulate it expr, err := parser.ParseExpr(params.query) if err != nil { return err } - - updateQuery, updatedQuery := maybeRewriteRangeInQuery(params.query, expr, res, opts.ResolutionMultiplier) - updateLookback, updatedLookback := maybeUpdateLookback(params, res, opts) + updatedQuery, updateQuery := maybeRewriteRangeInQuery(params.query, expr, res, opts.ResolutionMultiplier) if !updateQuery && !updateLookback { return nil @@ -183,12 +202,77 @@ func rewriteRangeDuration( logger.Debug("rewrote duration values in request", zap.String("originalQuery", params.query), zap.String("updatedQuery", updatedQuery), - zap.Duration("originalLookback", params.lookback), + zap.Duration("originalLookback", originalLookback), zap.Duration("updatedLookback", updatedLookback)) return nil } +func findLargestQueryResolution(ctx context.Context, + store storage.Storage, + fetchOpts *storage.FetchOptions, + startTime time.Time, + endTime time.Time, +) (time.Duration, error) { + attrs, err := store.QueryStorageMetadataAttributes(ctx, startTime, endTime, fetchOpts) + if err != nil { + return 0, err + } + + // Find the largest resolution + var res time.Duration + for _, attr := range attrs { + if attr.Resolution > res { + res = attr.Resolution + } + } + return res, nil +} + +// Using the prometheus engine in this way should be considered +// optional and best effort. Fall back to the frequently accurate logic +// of using the start and end time in the request +func getQueryBounds( + opts PrometheusRangeRewriteOptions, + params params, + fetchOpts *storage.FetchOptions, + logger *zap.Logger, +) (start time.Time, end time.Time) { + start = params.start + end = params.end + if opts.PrometheusEngineFn == nil { + return start, end + } + + lookback := opts.DefaultLookback + if params.isLookbackSet { + lookback = params.lookback + } + engine, err := opts.PrometheusEngineFn(lookback) + if err != nil { + logger.Debug("Found an error when getting a Prom engine to "+ + "calculate start/end time for query rewriting. Falling back to request start/end time", + zap.String("originalQuery", params.query), + zap.Duration("lookbackDuration", lookback)) + return start, end + } + + queryable := fakeQueryable{ + engine: engine, + instant: opts.Instant, + } + err = queryable.calculateQueryBounds(params.query, params.start, params.end, fetchOpts.Step) + if err != nil { + logger.Debug("Found an error when using the Prom engine to "+ + "calculate start/end time for query rewriting. Falling back to request start/end time", + zap.String("originalQuery", params.query)) + return start, end + } + // calculates the query boundaries in roughly the same way as prometheus + start, end = queryable.getQueryBounds() + return start, end +} + type params struct { query string start, end time.Time @@ -226,7 +310,7 @@ func extractParams(r *http.Request, instant bool) (params, error) { }, nil } -func maybeRewriteRangeInQuery(query string, expr parser.Node, res time.Duration, multiplier int) (bool, string) { +func maybeRewriteRangeInQuery(query string, expr parser.Node, res time.Duration, multiplier int) (string, bool) { updated := false // nolint: ifshort parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { // nolint:gocritic @@ -241,16 +325,16 @@ func maybeRewriteRangeInQuery(query string, expr parser.Node, res time.Duration, }) if updated { - return true, expr.String() + return expr.String(), true } - return false, query + return query, false } func maybeUpdateLookback( params params, maxResolution time.Duration, opts PrometheusRangeRewriteOptions, -) (bool, time.Duration) { +) (time.Duration, bool) { var ( lookback = params.lookback resolutionBasedLookback = maxResolution * time.Duration(opts.ResolutionMultiplier) // nolint: durationcheck @@ -259,7 +343,48 @@ func maybeUpdateLookback( lookback = opts.DefaultLookback } if lookback < resolutionBasedLookback { - return true, resolutionBasedLookback + return resolutionBasedLookback, true + } + return lookback, false +} + +type fakeQueryable struct { + engine *promql.Engine + instant bool + calculatedStartTime time.Time + calculatedEndTime time.Time +} + +func (f *fakeQueryable) Querier(ctx context.Context, mint, maxt int64) (promstorage.Querier, error) { + f.calculatedStartTime = xtime.FromUnixMillis(mint) + f.calculatedEndTime = xtime.FromUnixMillis(maxt) + // fail here to cause prometheus to give up on query execution + return nil, errIgnorableQuerierError +} + +func (f *fakeQueryable) calculateQueryBounds( + q string, + start time.Time, + end time.Time, + step time.Duration, +) (err error) { + var query promql.Query + if f.instant { + // startTime and endTime are the same for instant queries + query, err = f.engine.NewInstantQuery(f, q, start) + } else { + query, err = f.engine.NewRangeQuery(f, q, start, end, step) + } + if err != nil { + return err + } + // The result returned by Exec will be an error, but that's expected + if res := query.Exec(context.Background()); !errors.Is(res.Err, errIgnorableQuerierError) { + return err } - return false, lookback + return nil +} + +func (f *fakeQueryable) getQueryBounds() (startTime time.Time, endTime time.Time) { + return f.calculatedStartTime, f.calculatedEndTime } diff --git a/src/query/api/v1/middleware/rewrite_test.go b/src/query/api/v1/middleware/rewrite_test.go index 469f525abb..0c79615682 100644 --- a/src/query/api/v1/middleware/rewrite_test.go +++ b/src/query/api/v1/middleware/rewrite_test.go @@ -29,8 +29,12 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" + kitlogzap "github.com/go-kit/kit/log/zap" "github.com/gorilla/mux" + "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/storage/m3/storagemetadata" @@ -51,6 +55,7 @@ func TestPrometheusRangeRewrite(t *testing.T) { expectedQuery string expectedLookback *time.Duration + usePromEngine bool }{ { name: "query with range to unagg", @@ -214,12 +219,53 @@ func TestPrometheusRangeRewrite(t *testing.T) { expectedQuery: "foo", expectedLookback: durationPtr(15 * time.Minute), }, + { + name: "instant query; rewrite w/ prom engine", + attrs: aggregatedAttrs(5 * time.Minute), + enabled: true, + mult: 3, + instant: true, + usePromEngine: true, + query: "rate(foo[30s])", + lookback: durationPtr(15 * time.Minute), + + expectedQuery: "rate(foo[15m])", + expectedLookback: durationPtr(15 * time.Minute), + }, + { + name: "instant query; rewrite w/ prom engine & offset", + // Just testing the parsing code paths since this is a fake storage + attrs: aggregatedAttrs(5 * time.Minute), + enabled: true, + usePromEngine: true, + mult: 3, + instant: true, + query: "rate(foo[30s] offset 1w)", + lookback: durationPtr(15 * time.Minute), + + expectedQuery: "rate(foo[15m] offset 1w)", + expectedLookback: durationPtr(15 * time.Minute), + }, + { + name: "range query; rewrite w/ prom engine & offset", + // Just testing the parsing code paths since this is a fake storage + attrs: aggregatedAttrs(5 * time.Minute), + enabled: true, + usePromEngine: true, + mult: 3, + instant: false, + query: "rate(foo[30s] offset 1w)", + lookback: durationPtr(15 * time.Minute), + + expectedQuery: "rate(foo[15m] offset 1w)", + expectedLookback: durationPtr(15 * time.Minute), + }, } for _, tt := range queryTests { t.Run(tt.name, func(t *testing.T) { r := mux.NewRouter() - opts := makeBaseOpts(t, r) + opts := makeBaseOpts(t, r, tt.usePromEngine) store := opts.PrometheusRangeRewrite.Storage.(mock.Storage) store.SetQueryStorageMetadataAttributesResult(tt.attrs, nil) @@ -303,7 +349,24 @@ func TestPrometheusRangeRewrite(t *testing.T) { } } -func makeBaseOpts(t *testing.T, r *mux.Router) Options { +func durationMilliseconds(d time.Duration) int64 { + return int64(d / (time.Millisecond / time.Nanosecond)) +} + +func makeBaseOpts(t *testing.T, r *mux.Router, addPromEngine bool) Options { + var ( + instrumentOpts = instrument.NewOptions() + kitLogger = kitlogzap.NewZapSugarLogger(instrumentOpts.Logger(), zapcore.InfoLevel) + engineOpts = promql.EngineOpts{ + Logger: log.With(kitLogger, "component", "query engine"), + MaxSamples: 100, + Timeout: 1 * time.Minute, + NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { + return durationMilliseconds(1 * time.Minute) + }, + } + ) + engine := promql.NewEngine(engineOpts) route := r.NewRoute() mockStorage := mock.NewMockStorage() @@ -314,7 +377,7 @@ func makeBaseOpts(t *testing.T, r *mux.Router) Options { fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder(fetchOptsBuilderCfg) require.NoError(t, err) - return Options{ + opts := Options{ InstrumentOpts: instrument.NewOptions(), Route: route, PrometheusRangeRewrite: PrometheusRangeRewriteOptions{ @@ -325,6 +388,12 @@ func makeBaseOpts(t *testing.T, r *mux.Router) Options { Storage: mockStorage, }, } + if addPromEngine { + opts.PrometheusRangeRewrite.PrometheusEngineFn = func(duration time.Duration) (*promql.Engine, error) { + return engine, nil + } + } + return opts } func unaggregatedAttrs() []storagemetadata.Attributes { diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index 5635a73ffe..047bbd9a6f 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -96,7 +96,7 @@ func (s *m3storage) QueryStorageMetadataAttributes( s.clusters, opts.FanoutOptions, opts.RestrictQueryOptions, - nil) + opts.RelatedQueryOptions) if err != nil { return nil, err }