diff --git a/CHANGELOG.md b/CHANGELOG.md
index 760f3b6fd8..fc5f0ffde4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@
* [ENHANCEMENT] Ingester: More efficient CPU/memory utilization-based read request limiting. #10325
* [ENHANCEMENT] Dashboards: Add Query-Scheduler <-> Querier Inflight Requests row to Query Reads and Remote Ruler reads dashboards. #10290
* [ENHANCEMENT] OTLP: In addition to the flag `-distributor.otel-created-timestamp-zero-ingestion-enabled` there is now `-distributor.otel-start-time-quiet-zero` to convert OTel start timestamps to Prometheus QuietZeroNaNs. This flag is to make the change rollout safe between Ingesters and Distributors. #10238
+* [ENHANCEMENT] Ruler: When rule concurrency is enabled for a rule group, its rules will now be reordered and run in batches based on their dependencies. This increases the number of rules that can potentially run concurrently. Note that the global and tenant-specific limits still apply. #10400
* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154
* [BUGFIX] Query-frontend and querier: show warning/info annotations in some cases where they were missing (if a lazy querier was used). #10277
@@ -25,6 +26,8 @@
* [BUGFIX] Mimirtool: `remote-read` commands will now return data. #10286
* [BUGFIX] PromQL: Fix deriv, predict_linear and double_exponential_smoothing with histograms https://github.com/prometheus/prometheus/pull/15686 #10383
* [BUGFIX] MQE: Fix deriv with histograms #10383
+* [BUGFIX] PromQL: Fix `*_over_time` functions with histograms https://github.com/prometheus/prometheus/pull/15711 #10400
+* [BUGFIX] MQE: Fix `min_over_time` and `max_over_time` with histograms #10400
### Mixin
diff --git a/go.mod b/go.mod
index ff34ae7107..f2bd093067 100644
--- a/go.mod
+++ b/go.mod
@@ -287,7 +287,7 @@ require (
sigs.k8s.io/yaml v1.4.0 // indirect
)
-replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250109135143-114aaaadc203
+replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250110020350-a1e2bcf4a615
// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
diff --git a/go.sum b/go.sum
index de3ca81db0..918a1ec5fb 100644
--- a/go.sum
+++ b/go.sum
@@ -1283,8 +1283,8 @@ github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 h1:1TeKhyS+pvzO
github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40/go.mod h1:IGRj8oOoxwJbHBYl1+OhS9UjQR0dv6SQOep7HqmtyFU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
-github.com/grafana/mimir-prometheus v0.0.0-20250109135143-114aaaadc203 h1:gCU3GO2mZUzsLAa/JRRDJpKbYhkXy7caWnzfNqbgDig=
-github.com/grafana/mimir-prometheus v0.0.0-20250109135143-114aaaadc203/go.mod h1:KfyZCeyGxf5gvl6VZbrQsd400nJjGw+ygMEtDVZKIT4=
+github.com/grafana/mimir-prometheus v0.0.0-20250110020350-a1e2bcf4a615 h1:lr3wUcXU0mScCDn/4NXc0CYglZJfy5l35sOJFar9qE0=
+github.com/grafana/mimir-prometheus v0.0.0-20250110020350-a1e2bcf4a615/go.mod h1:KfyZCeyGxf5gvl6VZbrQsd400nJjGw+ygMEtDVZKIT4=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956
h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= github.com/grafana/prometheus-alertmanager v0.25.1-0.20240930132144-b5e64e81e8d3 h1:6D2gGAwyQBElSrp3E+9lSr7k8gLuP3Aiy20rweLWeBw= diff --git a/pkg/ruler/fixtures/rules_chain.yaml b/pkg/ruler/fixtures/rules_chain.yaml new file mode 100644 index 0000000000..00043b8d6f --- /dev/null +++ b/pkg/ruler/fixtures/rules_chain.yaml @@ -0,0 +1,22 @@ +groups: + - name: chain + rules: + # Evaluated concurrently, no dependencies + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[1m])) + + # Evaluated sequentially, dependents and dependencies + - record: job1:http_requests:rate1m + expr: job:http_requests:rate1m{job="job1"} + - record: job1_cluster1:http_requests:rate1m + expr: job1:http_requests:rate1m{cluster="cluster1"} + + # Evaluated concurrently, no dependents + - record: job1_cluster2:http_requests:rate1m + expr: job1:http_requests:rate1m{cluster="cluster2"} + - record: job1_cluster1_namespace1:http_requests:rate1m + expr: job1_cluster1:http_requests:rate1m{namespace="namespace1"} + - record: job1_cluster1_namespace2:http_requests:rate1m + expr: job1_cluster1:http_requests:rate1m{namespace="namespace2"} diff --git a/pkg/ruler/fixtures/rules_indeterminates.yaml b/pkg/ruler/fixtures/rules_indeterminates.yaml new file mode 100644 index 0000000000..a906d3b504 --- /dev/null +++ b/pkg/ruler/fixtures/rules_indeterminates.yaml @@ -0,0 +1,18 @@ +groups: + - name: indeterminate + rules: + # This shouldn't run in parallel because of the open matcher + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: job:http_requests:rate30m + expr: sum by (job)(rate(http_requests_total[30m])) + - record: job:http_requests:rate1h + expr: sum by (job)(rate(http_requests_total[1h])) + - record: job:http_requests:rate2h + expr: sum by (job)(rate(http_requests_total[2h])) + - record: matcher + expr: '{job="job1"}' diff --git a/pkg/ruler/fixtures/rules_multiple_independent.yaml b/pkg/ruler/fixtures/rules_multiple_independent.yaml new file mode 100644 index 0000000000..e071be3eff --- /dev/null +++ b/pkg/ruler/fixtures/rules_multiple_independent.yaml @@ -0,0 +1,15 @@ +groups: + - name: independents + rules: + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: job:http_requests:rate30m + expr: sum by (job)(rate(http_requests_total[30m])) + - record: job:http_requests:rate1h + expr: sum by (job)(rate(http_requests_total[1h])) + - record: job:http_requests:rate2h + expr: sum by (job)(rate(http_requests_total[2h])) diff --git a/pkg/ruler/fixtures/rules_topological_sort_needed.json b/pkg/ruler/fixtures/rules_topological_sort_needed.json new file mode 100644 index 0000000000..6f63c38c94 --- /dev/null +++ b/pkg/ruler/fixtures/rules_topological_sort_needed.json @@ -0,0 +1,245 @@ +{ + "groups": [ + { + "name": "test-group", + "rules": [ + { + "record": "pf:nginx_http_requests:rate5m", + 
"expr": "sum by (lp_service, k8scluster) (rate(nginx_http_requests_total{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt)-web\"}[5m]))" + }, + { + "record": "pf:nginx_http_requests:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:nginx_http_requests:rate5m[1w])" + }, + { + "record": "pf:nginx_http_requests:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:nginx_http_requests:rate5m[1w])" + }, + { + "record": "pf:nginx_http_requests:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:nginx_http_requests:rate5m[4h] offset 6d22h) + pf:nginx_http_requests:rate5m:avg_over_time_1w - pf:nginx_http_requests:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_requests:rate5m[4h] offset 13d22h) + pf:nginx_http_requests:rate5m:avg_over_time_1w - pf:nginx_http_requests:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_requests:rate5m[4h] offset 20d22h) + pf:nginx_http_requests:rate5m:avg_over_time_1w - pf:nginx_http_requests:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:nginx_response_time:avg_over_time_5m", + "expr": "sum by (lp_service, k8scluster) (rate(nginx_http_request_duration_seconds_sum{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt)-web\"}[5m])) / sum by (lp_service, k8scluster) (rate(nginx_http_request_duration_seconds_count{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt)-web\"}[5m]))" + }, + { + "record": "pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w", + "expr": "avg_over_time(pf:nginx_response_time:avg_over_time_5m[1w])" + }, + { + "record": "pf:nginx_response_time:avg_over_time_5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:nginx_response_time:avg_over_time_5m[1w])" + }, + { + "record": "pf:nginx_response_time:avg_over_time_5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:nginx_response_time:avg_over_time_5m[4h] offset 6d22h) + pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w - pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_response_time:avg_over_time_5m[4h] offset 13d22h) + pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w - pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_response_time:avg_over_time_5m[4h] offset 20d22h) + pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w - pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:nginx_http_4xx_responses:rate5m", + "expr": "sum by (lp_service, k8scluster) (rate(nginx_http_requests_total{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt)-web\",status=~\"4.*\"}[5m]))" + }, + { + "record": "pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:nginx_http_4xx_responses:rate5m[1w])" + }, + { + "record": "pf:nginx_http_4xx_responses:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:nginx_http_4xx_responses:rate5m[1w])" + }, + { + "record": "pf:nginx_http_4xx_responses:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:nginx_http_4xx_responses:rate5m[4h] offset 6d22h) + 
pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_4xx_responses:rate5m[4h] offset 13d22h) + pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_4xx_responses:rate5m[4h] offset 20d22h) + pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:nginx_http_5xx_responses:rate5m", + "expr": "sum by (lp_service, k8scluster) (rate(nginx_http_requests_total{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt)-web\",status=~\"5.*\"}[5m]))" + }, + { + "record": "pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:nginx_http_5xx_responses:rate5m[1w])" + }, + { + "record": "pf:nginx_http_5xx_responses:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:nginx_http_5xx_responses:rate5m[1w])" + }, + { + "record": "pf:nginx_http_5xx_responses:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:nginx_http_5xx_responses:rate5m[4h] offset 6d22h) + pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_5xx_responses:rate5m[4h] offset 13d22h) + pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_5xx_responses:rate5m[4h] offset 20d22h) + pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:app_http_requests:rate5m", + "expr": "sum by (application, k8scluster) (rate(http_server_requests_seconds_count{application=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\",k8scluster=\"sy-kube01\"}[5m]))" + }, + { + "record": "pf:app_http_requests:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:app_http_requests:rate5m[1w])" + }, + { + "record": "pf:app_http_requests:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:app_http_requests:rate5m[1w])" + }, + { + "record": "pf:app_http_requests:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:app_http_requests:rate5m[4h] offset 6d22h) + pf:app_http_requests:rate5m:avg_over_time_1w - pf:app_http_requests:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_requests:rate5m[4h] offset 13d22h) + pf:app_http_requests:rate5m:avg_over_time_1w - pf:app_http_requests:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_requests:rate5m[4h] offset 20d22h) + pf:app_http_requests:rate5m:avg_over_time_1w - pf:app_http_requests:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:app_response_time:avg_over_time_5m", + "expr": "sum by (application, k8scluster) 
(rate(http_server_requests_seconds_sum{application=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\",k8scluster=\"sy-kube01\"}[5m])) / sum by (application, k8scluster) (rate(http_server_requests_seconds_count{application=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\",k8scluster=\"sy-kube01\"}[5m]))" + }, + { + "record": "pf:app_response_time:avg_over_time_5m:avg_over_time_1w", + "expr": "avg_over_time(pf:app_response_time:avg_over_time_5m[1w])" + }, + { + "record": "pf:app_response_time:avg_over_time_5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:app_response_time:avg_over_time_5m[1w])" + }, + { + "record": "pf:app_response_time:avg_over_time_5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:app_response_time:avg_over_time_5m[4h] offset 6d22h) + pf:app_response_time:avg_over_time_5m:avg_over_time_1w - pf:app_response_time:avg_over_time_5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:app_response_time:avg_over_time_5m[4h] offset 13d22h) + pf:app_response_time:avg_over_time_5m:avg_over_time_1w - pf:app_response_time:avg_over_time_5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:app_response_time:avg_over_time_5m[4h] offset 20d22h) + pf:app_response_time:avg_over_time_5m:avg_over_time_1w - pf:app_response_time:avg_over_time_5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:app_http_4xx_responses:rate5m", + "expr": "sum by (application, k8scluster) (rate(http_server_requests_seconds_count{application=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\",k8scluster=\"sy-kube01\",status=~\"4.*\"}[5m]))" + }, + { + "record": "pf:app_http_4xx_responses:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:app_http_4xx_responses:rate5m[1w])" + }, + { + "record": "pf:app_http_4xx_responses:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:app_http_4xx_responses:rate5m[1w])" + }, + { + "record": "pf:app_http_4xx_responses:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:app_http_4xx_responses:rate5m[4h] offset 6d22h) + pf:app_http_4xx_responses:rate5m:avg_over_time_1w - pf:app_http_4xx_responses:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_4xx_responses:rate5m[4h] offset 13d22h) + pf:app_http_4xx_responses:rate5m:avg_over_time_1w - pf:app_http_4xx_responses:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_4xx_responses:rate5m[4h] offset 20d22h) + pf:app_http_4xx_responses:rate5m:avg_over_time_1w - pf:app_http_4xx_responses:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:app_http_5xx_responses:rate5m", + "expr": "sum by (application, k8scluster) (rate(http_server_requests_seconds_count{application=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\",k8scluster=\"sy-kube01\",status=~\"5.*\"}[5m]))" + }, + { + "record": 
"pf:app_http_5xx_responses:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:app_http_5xx_responses:rate5m[1w])" + }, + { + "record": "pf:app_http_5xx_responses:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:app_http_5xx_responses:rate5m[1w])" + }, + { + "record": "pf:app_http_5xx_responses:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:app_http_5xx_responses:rate5m[4h] offset 6d22h) + pf:app_http_5xx_responses:rate5m:avg_over_time_1w - pf:app_http_5xx_responses:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_5xx_responses:rate5m[4h] offset 13d22h) + pf:app_http_5xx_responses:rate5m:avg_over_time_1w - pf:app_http_5xx_responses:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_5xx_responses:rate5m[4h] offset 20d22h) + pf:app_http_5xx_responses:rate5m:avg_over_time_1w - pf:app_http_5xx_responses:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:app_log_events:rate5m", + "expr": "sum by (lp_service, level, k8scluster) (rate(log4j2_events_total{k8scluster=\"sy-kube01\",level=~\"error|warn\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\"}[5m]))" + }, + { + "record": "pf:app_log_events:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:app_log_events:rate5m[1w])" + }, + { + "record": "pf:app_log_events:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:app_log_events:rate5m[1w])" + }, + { + "record": "pf:app_log_events:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:app_log_events:rate5m[4h] offset 6d22h) + pf:app_log_events:rate5m:avg_over_time_1w - pf:app_log_events:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:app_log_events:rate5m[4h] offset 13d22h) + pf:app_log_events:rate5m:avg_over_time_1w - pf:app_log_events:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:app_log_events:rate5m[4h] offset 20d22h) + pf:app_log_events:rate5m:avg_over_time_1w - pf:app_log_events:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf_pods_restart_too_much", + "expr": "rate(kube_pod_container_status_restarts_total{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection)-(web|app)\"}[5m]) > 0" + }, + { + "record": "pf_pods_are_unhealthy", + "expr": "health{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection)-(web|app)\"} > 0" + }, + { + "record": "pf_pod_dependencies_are_unhealthy", + "expr": "health_dependency{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection)-(web|app)\"} > 0" + }, + { + "record": "pf_nginx_request_rate_is_too_low", + "expr": "pf:nginx_http_requests:rate5m == 0" + }, + { + "record": "pf_app_request_rate_is_too_low", + "expr": "pf:app_http_requests:rate5m{application!~\"(lp-encryptionmgmt-app|lp-rtbf-app)\"} == 0" + }, + { + "record": "pf_nginx_request_rate_is_too_high", + "expr": "pf:nginx_http_requests:rate5m > 10000" + }, + { + "record": "pf_app_request_rate_is_too_high", + "expr": 
"pf:app_http_requests:rate5m > 10000" + }, + { + "record": "pf_nginx_request_rate_is_outside_normal_range", + "expr": "abs((pf:nginx_http_requests:rate5m - pf:nginx_http_requests:rate5m_prediction) / pf:nginx_http_requests:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_app_request_rate_is_outside_normal_range", + "expr": "abs((pf:app_http_requests:rate5m - pf:app_http_requests:rate5m_prediction) / pf:app_http_requests:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_nginx_response_time_is_too_high", + "expr": "pf:nginx_response_time:avg_over_time_5m > 0.5" + }, + { + "record": "pf_app_response_time_is_too_high", + "expr": "pf:app_response_time:avg_over_time_5m > 0.5" + }, + { + "record": "pf_nginx_response_time_is_outside_normal_range", + "expr": "abs((pf:nginx_response_time:avg_over_time_5m - pf:nginx_response_time:avg_over_time_5m_prediction) / pf:nginx_response_time:avg_over_time_5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_app_response_time_is_outside_normal_range", + "expr": "abs((pf:app_response_time:avg_over_time_5m - pf:app_response_time:avg_over_time_5m_prediction) / pf:app_response_time:avg_over_time_5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_nginx_4xx_rate_is_outside_normal_range", + "expr": "abs((pf:nginx_http_4xx_responses:rate5m - pf:nginx_http_4xx_responses:rate5m_prediction) / pf:nginx_http_4xx_responses:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_app_4xx_rate_is_outside_normal_range", + "expr": "abs((pf:app_http_4xx_responses:rate5m - pf:app_http_4xx_responses:rate5m_prediction) / pf:app_http_4xx_responses:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_nginx_4xx_ratio_exceeds_20", + "expr": "pf:nginx_http_4xx_responses:rate5m / pf:nginx_http_requests:rate5m > 20" + }, + { + "record": "pf_app_4xx_ratio_exceeds_20", + "expr": "pf:app_http_4xx_responses:rate5m / pf:app_http_requests:rate5m > 20" + }, + { + "record": "pf_nginx_5xx_rate_is_outside_normal_range", + "expr": "abs((pf:nginx_http_5xx_responses:rate5m - pf:nginx_http_5xx_responses:rate5m_prediction) / pf:nginx_http_5xx_responses:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_app_5xx_rate_is_outside_normal_range", + "expr": "abs((pf:app_http_5xx_responses:rate5m - pf:app_http_5xx_responses:rate5m_prediction) / pf:app_http_5xx_responses:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_nginx_5xx_ratio_exceeds_20", + "expr": "pf:nginx_http_5xx_responses:rate5m / pf:nginx_http_requests:rate5m > 20" + }, + { + "record": "pf_app_5xx_ratio_exceeds_20", + "expr": "pf:app_http_5xx_responses:rate5m / pf:app_http_requests:rate5m > 20" + }, + { + "record": "pf_log_rate_is_outside_normal_range", + "expr": "abs((pf:app_log_events:rate5m - pf:app_log_events:rate5m_prediction) / pf:app_log_events:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_app_heap_usage_too_high", + "expr": "100 * (avg by (k8scluster, lp_service, kubernetes_pod_name) (container_memory_working_set_bytes{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\"}) / avg by (k8scluster, lp_service, kubernetes_pod_name) (container_spec_memory_limit_bytes{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\"})) > 90" + } + ] + } + ] +} diff --git a/pkg/ruler/rule_concurrency.go b/pkg/ruler/rule_concurrency.go 
index ba4a506c2a..3b26f20e95 100644 --- a/pkg/ruler/rule_concurrency.go +++ b/pkg/ruler/rule_concurrency.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/rules" @@ -115,6 +116,7 @@ func NewMultiTenantConcurrencyController(logger log.Logger, maxGlobalConcurrency // NewTenantConcurrencyControllerFor returns a new rules.RuleConcurrencyController to use for the input tenantID. func (c *MultiTenantConcurrencyController) NewTenantConcurrencyControllerFor(tenantID string) rules.RuleConcurrencyController { return &TenantConcurrencyController{ + logger: log.With(c.logger, "tenant", tenantID), slotsInUse: c.metrics.SlotsInUse.WithLabelValues(tenantID), attemptsStartedTotal: c.metrics.AttemptsStartedTotal.WithLabelValues(tenantID), attemptsIncompleteTotal: c.metrics.AttemptsIncompleteTotal.WithLabelValues(tenantID), @@ -132,6 +134,7 @@ func (c *MultiTenantConcurrencyController) NewTenantConcurrencyControllerFor(ten // TenantConcurrencyController is a concurrency controller that limits the number of concurrent rule evaluations per tenant. // It also takes into account the global concurrency limit. type TenantConcurrencyController struct { + logger log.Logger tenantID string thresholdRuleConcurrency float64 // Percentage of the rule interval at which we consider the rule group at risk of missing its evaluation. @@ -155,19 +158,7 @@ func (c *TenantConcurrencyController) Done(_ context.Context) { } // Allow tries to acquire a slot from the concurrency controller. -func (c *TenantConcurrencyController) Allow(_ context.Context, group *rules.Group, rule rules.Rule) bool { - // To allow a rule to be executed concurrently, we need 3 conditions: - // 1. The rule group must be at risk of missing its evaluation. - // 2. The rule must not have any rules that depend on it. - // 3. The rule itself must not depend on any other rules. - if !c.isGroupAtRisk(group) { - return false - } - - if !isRuleIndependent(rule) { - return false - } - +func (c *TenantConcurrencyController) Allow(_ context.Context, _ *rules.Group, _ rules.Rule) bool { // Next, try to acquire a global concurrency slot. c.attemptsStartedTotal.Inc() if !c.globalConcurrency.TryAcquire(1) { @@ -187,6 +178,84 @@ func (c *TenantConcurrencyController) Allow(_ context.Context, group *rules.Grou return false } +// SplitGroupIntoBatches splits the group into batches of rules that can be evaluated concurrently. +// It tries to batch rules that have no dependencies together and rules that have dependencies in separate batches. +// Returning no batches or nil means that the group should be evaluated sequentially. +func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g *rules.Group) []rules.ConcurrentRules { + if !c.isGroupAtRisk(g) { + // If the group is not at risk, we can evaluate the rules sequentially. + return nil + } + + logger := log.With(c.logger, "group", g.Name()) + + type ruleInfo struct { + ruleIdx int + unevaluatedDependencies map[rules.Rule]struct{} + } + remainingRules := make(map[rules.Rule]ruleInfo) + + // This batch holds the rules that have no dependencies and will be run first. + firstBatch := rules.ConcurrentRules{} + for i, r := range g.Rules() { + if r.NoDependencyRules() { + firstBatch = append(firstBatch, i) + continue + } + // Initialize the rule info with the rule's dependencies. 
+ // Use a copy of the dependencies to avoid mutating the rule. + info := ruleInfo{ruleIdx: i, unevaluatedDependencies: map[rules.Rule]struct{}{}} + for _, dep := range r.DependencyRules() { + info.unevaluatedDependencies[dep] = struct{}{} + } + remainingRules[r] = info + } + if len(firstBatch) == 0 { + // There are no rules without dependencies. + // Fall back to sequential evaluation. + level.Info(logger).Log("msg", "No rules without dependencies found, falling back to sequential rule evaluation.") + return nil + } + result := []rules.ConcurrentRules{firstBatch} + + // Build the order of rules to evaluate based on dependencies. + for len(remainingRules) > 0 { + previousBatch := result[len(result)-1] + // Remove the batch's rules from the dependencies of its dependents. + for _, idx := range previousBatch { + rule := g.Rules()[idx] + for _, dependent := range rule.DependentRules() { + dependentInfo := remainingRules[dependent] + delete(dependentInfo.unevaluatedDependencies, rule) + } + } + + var batch rules.ConcurrentRules + // Find rules that have no remaining dependencies. + for name, info := range remainingRules { + if len(info.unevaluatedDependencies) == 0 { + batch = append(batch, info.ruleIdx) + delete(remainingRules, name) + } + } + + if len(batch) == 0 { + // There is a cycle in the rules' dependencies. + // We can't evaluate them concurrently. + // Fall back to sequential evaluation. + level.Warn(logger).Log("msg", "Cyclic rule dependencies detected, falling back to sequential rule evaluation") + return nil + } + + result = append(result, batch) + } + + level.Info(logger).Log("msg", "Batched rules into concurrent blocks", "rules", len(g.Rules()), "batches", len(result)) + level.Debug(logger).Log("msg", "Batched rules into concurrent blocks", "batches", result) + + return result +} + // isGroupAtRisk checks if the rule group's last evaluation time is within the risk threshold. func (c *TenantConcurrencyController) isGroupAtRisk(group *rules.Group) bool { interval := group.Interval().Seconds() @@ -205,11 +274,6 @@ func (c *TenantConcurrencyController) isGroupAtRisk(group *rules.Group) bool { return false } -// isRuleIndependent checks if the rule is independent of other rules. -func isRuleIndependent(rule rules.Rule) bool { - return rule.NoDependentRules() && rule.NoDependencyRules() -} - // NoopMultiTenantConcurrencyController is a concurrency controller that does not allow for concurrency. 
type NoopMultiTenantConcurrencyController struct{} @@ -221,6 +285,10 @@ func (n *NoopMultiTenantConcurrencyController) NewTenantConcurrencyControllerFor type NoopTenantConcurrencyController struct{} func (n *NoopTenantConcurrencyController) Done(_ context.Context) {} +func (n *NoopTenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, _ *rules.Group) []rules.ConcurrentRules { + return nil +} + func (n *NoopTenantConcurrencyController) Allow(_ context.Context, _ *rules.Group, _ rules.Rule) bool { return false } diff --git a/pkg/ruler/rule_concurrency_test.go b/pkg/ruler/rule_concurrency_test.go index 64be33715c..e7c485db30 100644 --- a/pkg/ruler/rule_concurrency_test.go +++ b/pkg/ruler/rule_concurrency_test.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "fmt" + "os" "testing" "time" @@ -20,6 +21,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" "golang.org/x/sync/semaphore" + "gopkg.in/yaml.v3" "github.com/grafana/mimir/pkg/util/validation" ) @@ -106,8 +108,8 @@ func TestMultiTenantConcurrencyController(t *testing.T) { exp, err := parser.ParseExpr("vector(1)") require.NoError(t, err) rule1 := rules.NewRecordingRule("test", exp, labels.Labels{}) - rule1.SetNoDependencyRules(true) - rule1.SetNoDependentRules(true) + rule1.SetDependencyRules([]rules.Rule{}) + rule1.SetDependentRules([]rules.Rule{}) globalController := NewMultiTenantConcurrencyController(logger, 3, 50.0, reg, limits) user1Controller := globalController.NewTenantConcurrencyControllerFor("user1") @@ -171,40 +173,6 @@ cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total{us user2Controller.Done(ctx) user2Controller.Done(ctx) - // Finally, let's try a few edge cases. - rg2 := rules.NewGroup(rules.GroupOptions{ - File: "test.rules", - Name: "test", - Interval: 1 * time.Minute, // group not at risk. - Opts: &rules.ManagerOptions{}, - }) - require.False(t, user1Controller.Allow(ctx, rg2, rule1)) // Should not be allowed with a group that is not at risk. - rule1.SetNoDependencyRules(false) - require.False(t, user1Controller.Allow(ctx, rg, rule1)) // Should not be allowed as the rule is no longer independent. - - // Check the metrics one final time to ensure there are no active slots in use. 
- require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` -# HELP cortex_ruler_independent_rule_evaluation_concurrency_attempts_incomplete_total Total number of incomplete attempts to acquire concurrency slots across all tenants -# TYPE cortex_ruler_independent_rule_evaluation_concurrency_attempts_incomplete_total counter -cortex_ruler_independent_rule_evaluation_concurrency_attempts_incomplete_total{user="user1"} 2 -cortex_ruler_independent_rule_evaluation_concurrency_attempts_incomplete_total{user="user2"} 1 -# HELP cortex_ruler_independent_rule_evaluation_concurrency_attempts_started_total Total number of started attempts to acquire concurrency slots across all tenants -# TYPE cortex_ruler_independent_rule_evaluation_concurrency_attempts_started_total counter -cortex_ruler_independent_rule_evaluation_concurrency_attempts_started_total{user="user1"} 4 -cortex_ruler_independent_rule_evaluation_concurrency_attempts_started_total{user="user2"} 3 -# HELP cortex_ruler_independent_rule_evaluation_concurrency_slots_in_use Current number of concurrency slots currently in use across all tenants -# TYPE cortex_ruler_independent_rule_evaluation_concurrency_slots_in_use gauge -cortex_ruler_independent_rule_evaluation_concurrency_slots_in_use{user="user1"} 0 -cortex_ruler_independent_rule_evaluation_concurrency_slots_in_use{user="user2"} 0 -# HELP cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total Total number of concurrency slots we're done using across all tenants -# TYPE cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total counter -cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total{user="user1"} 2 -cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total{user="user2"} 2 -`))) - - // Make the rule independent again. - rule1.SetNoDependencyRules(true) - // Now let's test having a controller two times for the same tenant. 
user3Controller := globalController.NewTenantConcurrencyControllerFor("user3") user3ControllerTwo := globalController.NewTenantConcurrencyControllerFor("user3") @@ -215,91 +183,143 @@ cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total{us require.True(t, user3ControllerTwo.Allow(ctx, rg, rule1)) } -func TestIsRuleIndependent(t *testing.T) { +func TestSplitGroupIntoBatches(t *testing.T) { + limits := validation.MockOverrides(func(_ *validation.Limits, tenantLimits map[string]*validation.Limits) { + tenantLimits["user1"] = validation.MockDefaultLimits() + tenantLimits["user1"].RulerMaxIndependentRuleEvaluationConcurrencyPerTenant = 2 + }) + + mtController := NewMultiTenantConcurrencyController(log.NewNopLogger(), 3, 50.0, prometheus.NewPedanticRegistry(), limits) + controller := mtController.NewTenantConcurrencyControllerFor("user1") + + ruleManager := rules.NewManager(&rules.ManagerOptions{ + RuleConcurrencyController: controller, + }) + tests := map[string]struct { - rule rules.Rule - expected bool + inputFile string + expectedGroups []rules.ConcurrentRules }{ - "rule has neither dependencies nor dependents": { - rule: func() rules.Rule { - r := rules.NewRecordingRule("test", nil, labels.Labels{}) - r.SetNoDependentRules(true) - r.SetNoDependencyRules(true) - return r - }(), - expected: true, + "chained": { + inputFile: "fixtures/rules_chain.yaml", + expectedGroups: []rules.ConcurrentRules{ + {0, 1}, + {2}, + {3, 4}, + {5, 6}, + }, }, - "rule has both dependencies and dependents": { - rule: func() rules.Rule { - r := rules.NewRecordingRule("test", nil, labels.Labels{}) - r.SetNoDependentRules(false) - r.SetNoDependencyRules(false) - return r - }(), - expected: false, + "indeterminates": { + inputFile: "fixtures/rules_indeterminates.yaml", + expectedGroups: nil, }, - "rule has dependents": { - rule: func() rules.Rule { - r := rules.NewRecordingRule("test", nil, labels.Labels{}) - r.SetNoDependentRules(false) - r.SetNoDependencyRules(true) - return r - }(), - expected: false, + "all independent": { + inputFile: "fixtures/rules_multiple_independent.yaml", + expectedGroups: []rules.ConcurrentRules{ + {0, 1, 2, 3, 4, 5}, + }, }, - "rule has dependencies": { - rule: func() rules.Rule { - r := rules.NewRecordingRule("test", nil, labels.Labels{}) - r.SetNoDependentRules(true) - r.SetNoDependencyRules(false) - return r - }(), - expected: false, + "topological sort": { + inputFile: "fixtures/rules_topological_sort_needed.json", + expectedGroups: []rules.ConcurrentRules{ + {0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 37, 38, 58}, + {1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 21, 22, 25, 26, 29, 30, 33, 34, 39, 40, 41, 42, 45, 46, 51, 52, 55, 56}, + {3, 7, 11, 15, 19, 23, 27, 31, 35}, + {43, 44, 47, 48, 49, 50, 53, 54, 57}, + }, }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { - result := isRuleIndependent(tc.rule) - require.Equal(t, tc.expected, result) + // Load group with a -1 interval so it's always at risk. + groups, errs := ruleManager.LoadGroups(-1*time.Second, labels.EmptyLabels(), "", nil, []string{tc.inputFile}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + var group *rules.Group + for _, g := range groups { + group = g + } + + batches := controller.SplitGroupIntoBatches(context.Background(), group) + requireConcurrentRulesEqual(t, tc.expectedGroups, batches) + + // Make sure the group is not mutated and will still return the same batches when called again. 
+ batches = controller.SplitGroupIntoBatches(context.Background(), group) + requireConcurrentRulesEqual(t, tc.expectedGroups, batches) }) } } +func requireConcurrentRulesEqual(t *testing.T, expected, actual []rules.ConcurrentRules) { + t.Helper() + + if expected == nil { + require.Nil(t, actual) + return + } + + // Like require.Equals but ignores the order of elements in the slices. + require.Len(t, actual, len(expected)) + for i, expectedBatch := range expected { + actualBatch := actual[i] + require.ElementsMatch(t, expectedBatch, actualBatch) + } +} + func TestGroupAtRisk(t *testing.T) { + // Write group file with 100 independent rules. + ruleCt := 100 + dummyRules := []map[string]interface{}{} + for i := 0; i < ruleCt; i++ { + dummyRules = append(dummyRules, map[string]interface{}{ + "record": fmt.Sprintf("test_rule%d", i), + "expr": "vector(1)", + }) + } + + groupFileContent := map[string]interface{}{ + "groups": []map[string]interface{}{ + { + "name": "test", + "rules": dummyRules, + }, + }, + } + + groupFile := t.TempDir() + "/test.rules" + f, err := os.Create(groupFile) + require.NoError(t, err) + encoder := yaml.NewEncoder(f) + require.NoError(t, encoder.Encode(groupFileContent)) + require.NoError(t, f.Close()) + createAndEvalTestGroup := func(interval time.Duration, evalConcurrently bool) *rules.Group { st := teststorage.New(t) defer st.Close() - // Create 100 rules that all take 1ms to evaluate. - var createdRules []rules.Rule - ruleCt := 100 ruleWaitTime := 1 * time.Millisecond - for i := 0; i < ruleCt; i++ { - q, err := parser.ParseExpr("vector(1)") - require.NoError(t, err) - rule := rules.NewRecordingRule(fmt.Sprintf("test_rule%d", i), q, labels.Labels{}) - rule.SetNoDependencyRules(true) - rule.SetNoDependentRules(true) - createdRules = append(createdRules, rule) - } - - // Create the group and evaluate it - opts := rules.GroupOptions{ - Interval: interval, - Opts: &rules.ManagerOptions{ - Appendable: st, - QueryFunc: func(_ context.Context, _ string, _ time.Time) (promql.Vector, error) { - time.Sleep(ruleWaitTime) - return promql.Vector{}, nil - }, + opts := &rules.ManagerOptions{ + Appendable: st, + // Make the rules take 1ms to evaluate. + QueryFunc: func(_ context.Context, _ string, _ time.Time) (promql.Vector, error) { + time.Sleep(ruleWaitTime) + return promql.Vector{}, nil }, - Rules: createdRules, } if evalConcurrently { - opts.Opts.RuleConcurrencyController = &allowAllConcurrencyController{} + opts.RuleConcurrencyController = &allowAllConcurrencyController{} } - g := rules.NewGroup(opts) + manager := rules.NewManager(opts) + groups, errs := manager.LoadGroups(interval, labels.EmptyLabels(), "", nil, groupFile) + require.Empty(t, errs) + + var g *rules.Group + for _, group := range groups { + g = group + } + rules.DefaultEvalIterationFunc(context.Background(), g, time.Now()) // Sanity check that we're actually running the rules concurrently. 
@@ -371,4 +391,12 @@ func (a *allowAllConcurrencyController) Allow(_ context.Context, _ *rules.Group, return true } +func (a *allowAllConcurrencyController) SplitGroupIntoBatches(_ context.Context, g *rules.Group) []rules.ConcurrentRules { + batch := rules.ConcurrentRules{} + for i := range g.Rules() { + batch = append(batch, i) + } + return []rules.ConcurrentRules{batch} +} + func (a *allowAllConcurrencyController) Done(_ context.Context) {} diff --git a/pkg/ruler/rule_query_consistency_test.go b/pkg/ruler/rule_query_consistency_test.go index 90091f6bf3..834a8ff345 100644 --- a/pkg/ruler/rule_query_consistency_test.go +++ b/pkg/ruler/rule_query_consistency_test.go @@ -50,7 +50,7 @@ func TestWrapQueryFuncWithReadConsistency(t *testing.T) { t.Run("should inject strong read consistency if the rule has dependencies", func(t *testing.T) { r := rules.NewRecordingRule("", &parser.StringLiteral{}, labels.New()) - r.SetNoDependencyRules(false) + r.SetDependencyRules([]rules.Rule{rules.NewRecordingRule("other", &parser.StringLiteral{}, labels.New())}) ctx := rules.NewOriginContext(context.Background(), rules.NewRuleDetail(r)) hasReadConsistency, readConsistencyLevel := runWrappedFunc(ctx) @@ -60,7 +60,7 @@ func TestWrapQueryFuncWithReadConsistency(t *testing.T) { t.Run("should not inject read consistency level if the rule has no dependencies, to let run with the per-tenant default", func(t *testing.T) { r := rules.NewRecordingRule("", &parser.StringLiteral{}, labels.New()) - r.SetNoDependencyRules(true) + r.SetDependencyRules([]rules.Rule{}) ctx := rules.NewOriginContext(context.Background(), rules.NewRuleDetail(r)) hasReadConsistency, _ := runWrappedFunc(ctx) diff --git a/pkg/streamingpromql/operators/functions/range_vectors.go b/pkg/streamingpromql/operators/functions/range_vectors.go index f127014606..1b7a533267 100644 --- a/pkg/streamingpromql/operators/functions/range_vectors.go +++ b/pkg/streamingpromql/operators/functions/range_vectors.go @@ -67,17 +67,22 @@ func presentOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnn } var MaxOverTime = FunctionOverRangeVectorDefinition{ - SeriesMetadataFunction: DropSeriesName, - StepFunc: maxOverTime, + SeriesMetadataFunction: DropSeriesName, + StepFunc: maxOverTime, + NeedsSeriesNamesForAnnotations: true, } -func maxOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { +func maxOverTime(step *types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { head, tail := step.Floats.UnsafePoints() if len(head) == 0 && len(tail) == 0 { return 0, false, nil, nil } + if step.Histograms.Any() { + emitAnnotation(annotations.NewHistogramIgnoredInMixedRangeInfo) + } + maxSoFar := head[0].F head = head[1:] @@ -97,17 +102,22 @@ func maxOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotat } var MinOverTime = FunctionOverRangeVectorDefinition{ - SeriesMetadataFunction: DropSeriesName, - StepFunc: minOverTime, + SeriesMetadataFunction: DropSeriesName, + StepFunc: minOverTime, + NeedsSeriesNamesForAnnotations: true, } -func minOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { +func minOverTime(step *types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { head, tail := step.Floats.UnsafePoints() if len(head) == 0 && len(tail) == 0 { 
return 0, false, nil, nil } + if step.Histograms.Any() { + emitAnnotation(annotations.NewHistogramIgnoredInMixedRangeInfo) + } + minSoFar := head[0].F head = head[1:] @@ -475,11 +485,8 @@ var Deriv = FunctionOverRangeVectorDefinition{ func deriv(step *types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { fHead, fTail := step.Floats.UnsafePoints() - hHead, hTail := step.Histograms.UnsafePoints() - - haveHistograms := len(hHead) > 0 || len(hTail) > 0 - if len(fHead)+len(fTail) == 1 && haveHistograms { + if len(fHead)+len(fTail) == 1 && step.Histograms.Any() { emitAnnotation(annotations.NewHistogramIgnoredInMixedRangeInfo) return 0, false, nil, nil } @@ -490,7 +497,7 @@ func deriv(step *types.RangeVectorStepData, _ float64, emitAnnotation types.Emit slope, _ := linearRegression(fHead, fTail, fHead[0].T) - if haveHistograms { + if step.Histograms.Any() { emitAnnotation(annotations.NewHistogramIgnoredInMixedRangeInfo) } diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index 6a8e796441..fb0ebaef5a 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -118,7 +118,7 @@ eval range from 0 to 7m step 1m present_over_time(some_metric_count[3m1s]) eval range from 0 to 7m step 1m present_over_time(some_metric_count[6s]) {foo="bar"} 1 1 1 1 _ _ 1 1 -eval range from 0 to 7m step 1m min_over_time(some_metric_count[3m1s]) +eval_info range from 0 to 7m step 1m min_over_time(some_metric_count[3m1s]) {foo="bar"} 0 0 0 0 1 2 3 _ eval range from 0 to 7m step 1m min_over_time(some_metric_count[6s]) @@ -127,7 +127,7 @@ eval range from 0 to 7m step 1m min_over_time(some_metric_count[6s]) eval range from 0 to 16m step 1m min_over_time(some_inf_and_nan_metric[3m1s]) {foo="baz"} 0 0 0 0 1 2 3 Inf Inf Inf NaN 8 7 6 6 6 6 -eval range from 0 to 7m step 1m max_over_time(some_metric_count[3m1s]) +eval_info range from 0 to 7m step 1m max_over_time(some_metric_count[3m1s]) {foo="bar"} 0 1 2 3 3 3 3 _ eval range from 0 to 7m step 1m max_over_time(some_metric_count[6s]) diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index 4e716c1185..b6ae2b1d39 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -975,41 +975,70 @@ eval instant at 1m avg_over_time(metric[2m]) # Tests for stddev_over_time and stdvar_over_time. clear load 10s - metric 0 8 8 2 3 + metric 0 8 8 2 3 + metric_histogram{type="only_histogram"} {{schema:1 sum:2 count:3}}x5 + metric_histogram{type="mix"} 1 1 1 {{schema:1 sum:2 count:3}} {{schema:1 sum:2 count:3}} # Unsupported by streaming engine. # eval instant at 1m stdvar_over_time(metric[2m]) -# {} 10.56 +# {} 10.56 # Unsupported by streaming engine. # eval instant at 1m stddev_over_time(metric[2m]) -# {} 3.249615 +# {} 3.249615 # Unsupported by streaming engine. # eval instant at 1m stddev_over_time((metric[2m])) -# {} 3.249615 +# {} 3.249615 + +# Tests for stddev_over_time and stdvar_over_time with histograms. +# Unsupported by streaming engine. +# eval instant at 1m stddev_over_time(metric_histogram{type="only_histogram"}[2m]) +# #empty + +# Unsupported by streaming engine. +# eval_info instant at 1m stddev_over_time(metric_histogram{type="mix"}[2m]) +# {type="mix"} 0 + +# Unsupported by streaming engine. 
+# eval instant at 1m stdvar_over_time(metric_histogram{type="only_histogram"}[2m]) +# #empty + +# Unsupported by streaming engine. +# eval_info instant at 1m stdvar_over_time(metric_histogram{type="mix"}[2m]) +# {type="mix"} 0 # Tests for stddev_over_time and stdvar_over_time #4927. clear load 10s - metric 1.5990505637277868 1.5990505637277868 1.5990505637277868 + metric 1.5990505637277868 1.5990505637277868 1.5990505637277868 # Unsupported by streaming engine. # eval instant at 1m stdvar_over_time(metric[1m]) -# {} 0 +# {} 0 # Unsupported by streaming engine. # eval instant at 1m stddev_over_time(metric[1m]) -# {} 0 +# {} 0 # Tests for mad_over_time. clear load 10s - metric 4 6 2 1 999 1 2 + metric 4 6 2 1 999 1 2 + metric_histogram{type="only_histogram"} {{schema:1 sum:2 count:3}}x5 + metric_histogram{type="mix"} 1 1 1 {{schema:1 sum:2 count:3}} {{schema:1 sum:2 count:3}} # Unsupported by streaming engine. # eval instant at 70s mad_over_time(metric[70s]) -# {} 1 +# {} 1 + +# Unsupported by streaming engine. +# eval instant at 70s mad_over_time(metric_histogram{type="only_histogram"}[70s]) +# #empty + +# Unsupported by streaming engine. +# eval_info instant at 70s mad_over_time(metric_histogram{type="mix"}[70s]) +# {type="mix"} 0 # Tests for quantile_over_time clear @@ -1018,6 +1047,8 @@ load 10s data{test="two samples"} 0 1 data{test="three samples"} 0 1 2 data{test="uneven samples"} 0 1 4 + data_histogram{test="only histogram samples"} {{schema:0 sum:1 count:2}}x4 + data_histogram{test="mix samples"} 0 1 2 {{schema:0 sum:1 count:2}}x2 # Unsupported by streaming engine. # eval instant at 1m quantile_over_time(0, data[2m]) @@ -1067,6 +1098,14 @@ load 10s # {test="three samples"} +Inf # {test="uneven samples"} +Inf +# Unsupported by streaming engine. +# eval instant at 1m quantile_over_time(0.5, data_histogram{test="only histogram samples"}[2m]) +# #empty + +# Unsupported by streaming engine. +# eval_info instant at 1m quantile_over_time(0.5, data_histogram{test="mix samples"}[2m]) +# {test="mix samples"} 1 + clear # Test time-related functions. 
@@ -1212,15 +1251,17 @@ load 5m eval_fail instant at 0m changes({__name__=~'testmetric1|testmetric2'}[5m]) -# Tests for *_over_time clear +# Tests for *_over_time load 10s data{type="numbers"} 2 0 3 data{type="some_nan"} 2 0 NaN data{type="some_nan2"} 2 NaN 1 data{type="some_nan3"} NaN 0 1 data{type="only_nan"} NaN NaN NaN + data_histogram{type="only_histogram"} {{schema:0 sum:1 count:2}} {{schema:0 sum:2 count:3}} {{schema:0 sum:3 count:4}} + data_histogram{type="mix_samples"} 0 1 {{schema:0 sum:1 count:2}} {{schema:0 sum:2 count:3}} eval instant at 1m min_over_time(data[2m]) {type="numbers"} 0 @@ -1229,6 +1270,12 @@ eval instant at 1m min_over_time(data[2m]) {type="some_nan3"} 0 {type="only_nan"} NaN +eval instant at 1m min_over_time(data_histogram{type="only_histogram"}[2m]) + #empty + +eval_info instant at 1m min_over_time(data_histogram{type="mix_samples"}[2m]) + {type="mix_samples"} 0 + eval instant at 1m max_over_time(data[2m]) {type="numbers"} 3 {type="some_nan"} 2 @@ -1236,12 +1283,29 @@ eval instant at 1m max_over_time(data[2m]) {type="some_nan3"} 1 {type="only_nan"} NaN -eval instant at 1m last_over_time(data[2m]) +eval instant at 1m max_over_time(data_histogram{type="only_histogram"}[2m]) + #empty + +eval_info instant at 1m max_over_time(data_histogram{type="mix_samples"}[2m]) + {type="mix_samples"} 1 + +eval instant at 1m last_over_time({__name__=~"data(_histogram)?"}[2m]) data{type="numbers"} 3 data{type="some_nan"} NaN data{type="some_nan2"} 1 data{type="some_nan3"} 1 data{type="only_nan"} NaN + data_histogram{type="only_histogram"} {{schema:0 sum:3 count:4}} + data_histogram{type="mix_samples"} {{schema:0 sum:2 count:3}} + +eval instant at 1m count_over_time({__name__=~"data(_histogram)?"}[2m]) + {type="numbers"} 3 + {type="some_nan"} 3 + {type="some_nan2"} 3 + {type="some_nan3"} 3 + {type="only_nan"} 3 + {type="only_histogram"} 3 + {type="mix_samples"} 4 clear diff --git a/pkg/streamingpromql/testdata/upstream/native_histograms.test b/pkg/streamingpromql/testdata/upstream/native_histograms.test index 3616d45591..61123239ce 100644 --- a/pkg/streamingpromql/testdata/upstream/native_histograms.test +++ b/pkg/streamingpromql/testdata/upstream/native_histograms.test @@ -1133,6 +1133,42 @@ eval_warn range from 0 to 12m step 6m sum(metric) eval_warn range from 0 to 12m step 6m avg(metric) {} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} _ +# Test incompatible schemas with additional aggregation operators +eval range from 0 to 12m step 6m count(metric) + {} 2 2 3 + +eval range from 0 to 12m step 6m group(metric) + {} 1 1 1 + +# Unsupported by streaming engine. +# eval range from 0 to 12m step 6m count(limitk(1, metric)) +# {} 1 1 1 + +# Unsupported by streaming engine. +# eval range from 0 to 12m step 6m limitk(3, metric) +# metric{series="1"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} +# metric{series="2"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} _ {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} +# metric{series="3"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + +# Unsupported by streaming engine. 
+# eval range from 0 to 12m step 6m limit_ratio(1, metric) +# metric{series="1"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} +# metric{series="2"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} _ {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} +# metric{series="3"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + +# Test incompatible schemas with and/or +eval range from 0 to 12m step 6m metric{series="1"} and ignoring(series) metric{series="2"} + metric{series="1"} _ _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + +eval range from 0 to 12m step 6m metric{series="1"} or ignoring(series) metric{series="2"} + metric{series="1"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + metric{series="2"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} _ _ + +# Test incompatible schemas with arithmetic binary operators +eval_warn range from 0 to 12m step 6m metric{series="2"} + ignoring (series) metric{series="3"} + +eval_warn range from 0 to 12m step 6m metric{series="2"} - ignoring (series) metric{series="3"} + clear load 1m diff --git a/vendor/github.com/prometheus/prometheus/promql/functions.go b/vendor/github.com/prometheus/prometheus/promql/functions.go index 5f31a3db18..2d809571d4 100644 --- a/vendor/github.com/prometheus/prometheus/promql/functions.go +++ b/vendor/github.com/prometheus/prometheus/promql/functions.go @@ -691,9 +691,15 @@ func funcLastOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNod // === mad_over_time(Matrix parser.ValueTypeMatrix) (Vector, Annotations) === func funcMadOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - if len(vals[0].(Matrix)[0].Floats) == 0 { + samples := vals[0].(Matrix)[0] + var annos annotations.Annotations + if len(samples.Floats) == 0 { return enh.Out, nil } + if len(samples.Histograms) > 0 { + metricName := samples.Metric.Get(labels.MetricName) + annos.Add(annotations.NewHistogramIgnoredInMixedRangeInfo(metricName, args[0].PositionRange())) + } return aggrOverTime(vals, enh, func(s Series) float64 { values := make(vectorByValueHeap, 0, len(s.Floats)) for _, f := range s.Floats { @@ -705,18 +711,20 @@ func funcMadOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode values = append(values, Sample{F: math.Abs(f.F - median)}) } return quantile(0.5, values) - }), nil + }), annos } // === max_over_time(Matrix parser.ValueTypeMatrix) (Vector, Annotations) === func funcMaxOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - if len(vals[0].(Matrix)[0].Floats) == 0 { - // TODO(beorn7): The passed values only contain - // histograms. max_over_time ignores histograms for now. If - // there are only histograms, we have to return without adding - // anything to enh.Out. 
+ samples := vals[0].(Matrix)[0] + var annos annotations.Annotations + if len(samples.Floats) == 0 { return enh.Out, nil } + if len(samples.Histograms) > 0 { + metricName := samples.Metric.Get(labels.MetricName) + annos.Add(annotations.NewHistogramIgnoredInMixedRangeInfo(metricName, args[0].PositionRange())) + } return aggrOverTime(vals, enh, func(s Series) float64 { maxVal := s.Floats[0].F for _, f := range s.Floats { @@ -725,18 +733,20 @@ func funcMaxOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode } } return maxVal - }), nil + }), annos } // === min_over_time(Matrix parser.ValueTypeMatrix) (Vector, Annotations) === func funcMinOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - if len(vals[0].(Matrix)[0].Floats) == 0 { - // TODO(beorn7): The passed values only contain - // histograms. min_over_time ignores histograms for now. If - // there are only histograms, we have to return without adding - // anything to enh.Out. + samples := vals[0].(Matrix)[0] + var annos annotations.Annotations + if len(samples.Floats) == 0 { return enh.Out, nil } + if len(samples.Histograms) > 0 { + metricName := samples.Metric.Get(labels.MetricName) + annos.Add(annotations.NewHistogramIgnoredInMixedRangeInfo(metricName, args[0].PositionRange())) + } return aggrOverTime(vals, enh, func(s Series) float64 { minVal := s.Floats[0].F for _, f := range s.Floats { @@ -745,7 +755,7 @@ func funcMinOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode } } return minVal - }), nil + }), annos } // === sum_over_time(Matrix parser.ValueTypeMatrix) (Vector, Annotations) === @@ -794,10 +804,6 @@ func funcQuantileOverTime(vals []parser.Value, args parser.Expressions, enh *Eva q := vals[0].(Vector)[0].F el := vals[1].(Matrix)[0] if len(el.Floats) == 0 { - // TODO(beorn7): The passed values only contain - // histograms. quantile_over_time ignores histograms for now. If - // there are only histograms, we have to return without adding - // anything to enh.Out. return enh.Out, nil } @@ -805,7 +811,10 @@ func funcQuantileOverTime(vals []parser.Value, args parser.Expressions, enh *Eva if math.IsNaN(q) || q < 0 || q > 1 { annos.Add(annotations.NewInvalidQuantileWarning(q, args[0].PositionRange())) } - + if len(el.Histograms) > 0 { + metricName := el.Metric.Get(labels.MetricName) + annos.Add(annotations.NewHistogramIgnoredInAggregationInfo(metricName, args[0].PositionRange())) + } values := make(vectorByValueHeap, 0, len(el.Floats)) for _, f := range el.Floats { values = append(values, Sample{F: f.F}) @@ -815,13 +824,15 @@ func funcQuantileOverTime(vals []parser.Value, args parser.Expressions, enh *Eva // === stddev_over_time(Matrix parser.ValueTypeMatrix) (Vector, Annotations) === func funcStddevOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - if len(vals[0].(Matrix)[0].Floats) == 0 { - // TODO(beorn7): The passed values only contain - // histograms. stddev_over_time ignores histograms for now. If - // there are only histograms, we have to return without adding - // anything to enh.Out. 
+ samples := vals[0].(Matrix)[0] + var annos annotations.Annotations + if len(samples.Floats) == 0 { return enh.Out, nil } + if len(samples.Histograms) > 0 { + metricName := samples.Metric.Get(labels.MetricName) + annos.Add(annotations.NewHistogramIgnoredInMixedRangeInfo(metricName, args[0].PositionRange())) + } return aggrOverTime(vals, enh, func(s Series) float64 { var count float64 var mean, cMean float64 @@ -833,18 +844,20 @@ func funcStddevOverTime(vals []parser.Value, args parser.Expressions, enh *EvalN aux, cAux = kahanSumInc(delta*(f.F-(mean+cMean)), aux, cAux) } return math.Sqrt((aux + cAux) / count) - }), nil + }), annos } // === stdvar_over_time(Matrix parser.ValueTypeMatrix) (Vector, Annotations) === func funcStdvarOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - if len(vals[0].(Matrix)[0].Floats) == 0 { - // TODO(beorn7): The passed values only contain - // histograms. stdvar_over_time ignores histograms for now. If - // there are only histograms, we have to return without adding - // anything to enh.Out. + samples := vals[0].(Matrix)[0] + var annos annotations.Annotations + if len(samples.Floats) == 0 { return enh.Out, nil } + if len(samples.Histograms) > 0 { + metricName := samples.Metric.Get(labels.MetricName) + annos.Add(annotations.NewHistogramIgnoredInMixedRangeInfo(metricName, args[0].PositionRange())) + } return aggrOverTime(vals, enh, func(s Series) float64 { var count float64 var mean, cMean float64 @@ -856,7 +869,7 @@ func funcStdvarOverTime(vals []parser.Value, args parser.Expressions, enh *EvalN aux, cAux = kahanSumInc(delta*(f.F-(mean+cMean)), aux, cAux) } return (aux + cAux) / count - }), nil + }), annos } // === absent(Vector parser.ValueTypeVector) (Vector, Annotations) === diff --git a/vendor/github.com/prometheus/prometheus/promql/promqltest/README.md b/vendor/github.com/prometheus/prometheus/promql/promqltest/README.md index af34354241..25c2653ab3 100644 --- a/vendor/github.com/prometheus/prometheus/promql/promqltest/README.md +++ b/vendor/github.com/prometheus/prometheus/promql/promqltest/README.md @@ -22,7 +22,7 @@ Each test file contains a series of commands. There are three kinds of commands: * `load` * `clear` -* `eval` +* `eval` (including the variants `eval_fail`, `eval_warn`, `eval_info`, and `eval_ordered`) Each command is executed in the order given in the file. @@ -50,12 +50,12 @@ load 1m my_metric{env="prod"} 5 2+3x2 _ stale {{schema:1 sum:3 count:22 buckets:[5 10 7]}} ``` -...will create a single series with labels `my_metric{env="prod"}`, with the following points: +… will create a single series with labels `my_metric{env="prod"}`, with the following points: * t=0: value is 5 * t=1m: value is 2 * t=2m: value is 5 -* t=3m: value is 7 +* t=3m: value is 8 * t=4m: no point * t=5m: stale marker * t=6m: native histogram with schema 1, sum -3, count 22 and bucket counts 5, 10 and 7 @@ -74,6 +74,7 @@ When loading a batch of classic histogram float series, you can optionally appen ## `eval` command `eval` runs a query against the test environment and asserts that the result is as expected. +It requires the query to succeed without any (info or warn) annotations. Both instant and range queries are supported. @@ -110,11 +111,18 @@ eval range from 0 to 3m step 1m sum by (env) (my_metric) {env="test"} 10 20 30 45 ``` -Instant queries also support asserting that the series are returned in exactly the order specified: use `eval_ordered instant ...` instead of `eval instant ...`. 
-This is not supported for range queries. +To assert that a query succeeds with an info or warn annotation, use the +`eval_info` or `eval_warn` commands, respectively. -It is also possible to test that queries fail: use `eval_fail instant ...` or `eval_fail range ...`. -`eval_fail` optionally takes an expected error message string or regexp to assert that the error message is as expected. +Instant queries also support asserting that the series are returned in exactly +the order specified: use `eval_ordered instant ...` instead of `eval instant +...`. `eval_ordered` ignores any annotations. The assertion always fails for +matrix results. + +To assert that a query fails, use the `eval_fail` command. `eval_fail` does not +expect any result lines. Instead, it optionally accepts an expected error +message string or regular expression to assert that the error message is as +expected. For example: diff --git a/vendor/github.com/prometheus/prometheus/promql/promqltest/test.go b/vendor/github.com/prometheus/prometheus/promql/promqltest/test.go index efa2136f10..5e0d9083cb 100644 --- a/vendor/github.com/prometheus/prometheus/promql/promqltest/test.go +++ b/vendor/github.com/prometheus/prometheus/promql/promqltest/test.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/almost" + "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/convertnhcb" "github.com/prometheus/prometheus/util/teststorage" "github.com/prometheus/prometheus/util/testutil" @@ -692,6 +693,24 @@ func (ev *evalCmd) expectMetric(pos int, m labels.Labels, vals ...parser.Sequenc ev.expected[h] = entry{pos: pos, vals: vals} } +// checkAnnotations asserts if the annotations match the expectations. +func (ev *evalCmd) checkAnnotations(expr string, annos annotations.Annotations) error { + countWarnings, countInfo := annos.CountWarningsAndInfo() + switch { + case ev.ordered: + // Ignore annotations if testing for order. + case !ev.warn && countWarnings > 0: + return fmt.Errorf("unexpected warnings evaluating query %q (line %d): %v", expr, ev.line, annos.AsErrors()) + case ev.warn && countWarnings == 0: + return fmt.Errorf("expected warnings evaluating query %q (line %d) but got none", expr, ev.line) + case !ev.info && countInfo > 0: + return fmt.Errorf("unexpected info annotations evaluating query %q (line %d): %v", expr, ev.line, annos.AsErrors()) + case ev.info && countInfo == 0: + return fmt.Errorf("expected info annotations evaluating query %q (line %d) but got none", expr, ev.line) + } + return nil +} + // compareResult compares the result value with the defined expectation. 
func (ev *evalCmd) compareResult(result parser.Value) error { switch val := result.(type) { @@ -1131,6 +1150,7 @@ func (t *test) execRangeEval(cmd *evalCmd, engine promql.QueryEngine) error { if err != nil { return fmt.Errorf("error creating range query for %q (line %d): %w", cmd.expr, cmd.line, err) } + defer q.Close() res := q.Exec(t.context) if res.Err != nil { if cmd.fail { @@ -1142,18 +1162,9 @@ func (t *test) execRangeEval(cmd *evalCmd, engine promql.QueryEngine) error { if res.Err == nil && cmd.fail { return fmt.Errorf("expected error evaluating query %q (line %d) but got none", cmd.expr, cmd.line) } - countWarnings, countInfo := res.Warnings.CountWarningsAndInfo() - switch { - case !cmd.warn && countWarnings > 0: - return fmt.Errorf("unexpected warnings evaluating query %q (line %d): %v", cmd.expr, cmd.line, res.Warnings) - case cmd.warn && countWarnings == 0: - return fmt.Errorf("expected warnings evaluating query %q (line %d) but got none", cmd.expr, cmd.line) - case !cmd.info && countInfo > 0: - return fmt.Errorf("unexpected info annotations evaluating query %q (line %d): %v", cmd.expr, cmd.line, res.Warnings) - case cmd.info && countInfo == 0: - return fmt.Errorf("expected info annotations evaluating query %q (line %d) but got none", cmd.expr, cmd.line) + if err := cmd.checkAnnotations(cmd.expr, res.Warnings); err != nil { + return err } - defer q.Close() if err := cmd.compareResult(res.Value); err != nil { return fmt.Errorf("error in %s %s (line %d): %w", cmd, cmd.expr, cmd.line, err) @@ -1196,16 +1207,8 @@ func (t *test) runInstantQuery(iq atModifierTestCase, cmd *evalCmd, engine promq if res.Err == nil && cmd.fail { return fmt.Errorf("expected error evaluating query %q (line %d) but got none", iq.expr, cmd.line) } - countWarnings, countInfo := res.Warnings.CountWarningsAndInfo() - switch { - case !cmd.warn && countWarnings > 0: - return fmt.Errorf("unexpected warnings evaluating query %q (line %d): %v", iq.expr, cmd.line, res.Warnings) - case cmd.warn && countWarnings == 0: - return fmt.Errorf("expected warnings evaluating query %q (line %d) but got none", iq.expr, cmd.line) - case !cmd.info && countInfo > 0: - return fmt.Errorf("unexpected info annotations evaluating query %q (line %d): %v", iq.expr, cmd.line, res.Warnings) - case cmd.info && countInfo == 0: - return fmt.Errorf("expected info annotations evaluating query %q (line %d) but got none", iq.expr, cmd.line) + if err := cmd.checkAnnotations(iq.expr, res.Warnings); err != nil { + return err } err = cmd.compareResult(res.Value) if err != nil { @@ -1218,11 +1221,11 @@ func (t *test) runInstantQuery(iq atModifierTestCase, cmd *evalCmd, engine promq if err != nil { return fmt.Errorf("error creating range query for %q (line %d): %w", cmd.expr, cmd.line, err) } + defer q.Close() rangeRes := q.Exec(t.context) if rangeRes.Err != nil { return fmt.Errorf("error evaluating query %q (line %d) in range mode: %w", iq.expr, cmd.line, rangeRes.Err) } - defer q.Close() if cmd.ordered { // Range queries are always sorted by labels, so skip this test case that expects results in a particular order. return nil @@ -1416,8 +1419,8 @@ func (ll *LazyLoader) appendTill(ts int64) error { // WithSamplesTill loads the samples till given timestamp and executes the given function. 
func (ll *LazyLoader) WithSamplesTill(ts time.Time, fn func(error)) { - tsMilli := ts.Sub(time.Unix(0, 0).UTC()) / time.Millisecond - fn(ll.appendTill(int64(tsMilli))) + till := ts.Sub(time.Unix(0, 0).UTC()) / time.Millisecond + fn(ll.appendTill(int64(till))) } // QueryEngine returns the LazyLoader's query engine. diff --git a/vendor/github.com/prometheus/prometheus/promql/promqltest/testdata/functions.test b/vendor/github.com/prometheus/prometheus/promql/promqltest/testdata/functions.test index 6d2ade3abc..7fc636450f 100644 --- a/vendor/github.com/prometheus/prometheus/promql/promqltest/testdata/functions.test +++ b/vendor/github.com/prometheus/prometheus/promql/promqltest/testdata/functions.test @@ -929,35 +929,58 @@ eval instant at 1m avg_over_time(metric[2m]) # Tests for stddev_over_time and stdvar_over_time. clear load 10s - metric 0 8 8 2 3 + metric 0 8 8 2 3 + metric_histogram{type="only_histogram"} {{schema:1 sum:2 count:3}}x5 + metric_histogram{type="mix"} 1 1 1 {{schema:1 sum:2 count:3}} {{schema:1 sum:2 count:3}} eval instant at 1m stdvar_over_time(metric[2m]) - {} 10.56 + {} 10.56 eval instant at 1m stddev_over_time(metric[2m]) - {} 3.249615 + {} 3.249615 eval instant at 1m stddev_over_time((metric[2m])) - {} 3.249615 + {} 3.249615 + +# Tests for stddev_over_time and stdvar_over_time with histograms. +eval instant at 1m stddev_over_time(metric_histogram{type="only_histogram"}[2m]) + #empty + +eval_info instant at 1m stddev_over_time(metric_histogram{type="mix"}[2m]) + {type="mix"} 0 + +eval instant at 1m stdvar_over_time(metric_histogram{type="only_histogram"}[2m]) + #empty + +eval_info instant at 1m stdvar_over_time(metric_histogram{type="mix"}[2m]) + {type="mix"} 0 # Tests for stddev_over_time and stdvar_over_time #4927. clear load 10s - metric 1.5990505637277868 1.5990505637277868 1.5990505637277868 + metric 1.5990505637277868 1.5990505637277868 1.5990505637277868 eval instant at 1m stdvar_over_time(metric[1m]) - {} 0 + {} 0 eval instant at 1m stddev_over_time(metric[1m]) - {} 0 + {} 0 # Tests for mad_over_time. clear load 10s - metric 4 6 2 1 999 1 2 + metric 4 6 2 1 999 1 2 + metric_histogram{type="only_histogram"} {{schema:1 sum:2 count:3}}x5 + metric_histogram{type="mix"} 1 1 1 {{schema:1 sum:2 count:3}} {{schema:1 sum:2 count:3}} eval instant at 70s mad_over_time(metric[70s]) - {} 1 + {} 1 + +eval instant at 70s mad_over_time(metric_histogram{type="only_histogram"}[70s]) + #empty + +eval_info instant at 70s mad_over_time(metric_histogram{type="mix"}[70s]) + {type="mix"} 0 # Tests for quantile_over_time clear @@ -966,6 +989,8 @@ load 10s data{test="two samples"} 0 1 data{test="three samples"} 0 1 2 data{test="uneven samples"} 0 1 4 + data_histogram{test="only histogram samples"} {{schema:0 sum:1 count:2}}x4 + data_histogram{test="mix samples"} 0 1 2 {{schema:0 sum:1 count:2}}x2 eval instant at 1m quantile_over_time(0, data[2m]) {test="two samples"} 0 @@ -1007,6 +1032,12 @@ eval_warn instant at 1m (quantile_over_time(2, (data[2m]))) {test="three samples"} +Inf {test="uneven samples"} +Inf +eval instant at 1m quantile_over_time(0.5, data_histogram{test="only histogram samples"}[2m]) + #empty + +eval_info instant at 1m quantile_over_time(0.5, data_histogram{test="mix samples"}[2m]) + {test="mix samples"} 1 + clear # Test time-related functions. 
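The mixed float/histogram cases added to `functions.test` above rely on the new `eval_info` variant documented in the promqltest README changes. As a rough sketch of how such a script can be driven from Go (assuming the exported `promqltest.RunTest` helper and a plain `promql.NewEngine`; the package and test names here are hypothetical, not part of this change):

```go
package functionstest // hypothetical package name, for illustration only

import (
	"testing"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/promql/promqltest"
)

// TestMixedSamplesEmitInfoAnnotation mirrors the mixed float/histogram cases
// added to functions.test: histogram samples are ignored by min_over_time, so
// the query must return the float-only result and raise an info annotation,
// which is exactly what eval_info asserts.
func TestMixedSamplesEmitInfoAnnotation(t *testing.T) {
	engine := promql.NewEngine(promql.EngineOpts{
		MaxSamples: 1_000_000,
		Timeout:    30 * time.Second,
	})

	const script = `
load 10s
  data_histogram{test="mix samples"} 0 1 2 {{schema:0 sum:1 count:2}}x2

eval_info instant at 1m min_over_time(data_histogram{test="mix samples"}[2m])
  {test="mix samples"} 0
`

	promqltest.RunTest(t, script, engine)
}
```

A histogram-only series, by contrast, is expected to return an empty result without any annotation, matching the plain `eval` + `#empty` cases in the test data above.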
@@ -1120,15 +1151,17 @@ load 5m eval_fail instant at 0m changes({__name__=~'testmetric1|testmetric2'}[5m]) -# Tests for *_over_time clear +# Tests for *_over_time load 10s data{type="numbers"} 2 0 3 data{type="some_nan"} 2 0 NaN data{type="some_nan2"} 2 NaN 1 data{type="some_nan3"} NaN 0 1 data{type="only_nan"} NaN NaN NaN + data_histogram{type="only_histogram"} {{schema:0 sum:1 count:2}} {{schema:0 sum:2 count:3}} {{schema:0 sum:3 count:4}} + data_histogram{type="mix_samples"} 0 1 {{schema:0 sum:1 count:2}} {{schema:0 sum:2 count:3}} eval instant at 1m min_over_time(data[2m]) {type="numbers"} 0 @@ -1137,6 +1170,12 @@ eval instant at 1m min_over_time(data[2m]) {type="some_nan3"} 0 {type="only_nan"} NaN +eval instant at 1m min_over_time(data_histogram{type="only_histogram"}[2m]) + #empty + +eval_info instant at 1m min_over_time(data_histogram{type="mix_samples"}[2m]) + {type="mix_samples"} 0 + eval instant at 1m max_over_time(data[2m]) {type="numbers"} 3 {type="some_nan"} 2 @@ -1144,12 +1183,29 @@ eval instant at 1m max_over_time(data[2m]) {type="some_nan3"} 1 {type="only_nan"} NaN -eval instant at 1m last_over_time(data[2m]) +eval instant at 1m max_over_time(data_histogram{type="only_histogram"}[2m]) + #empty + +eval_info instant at 1m max_over_time(data_histogram{type="mix_samples"}[2m]) + {type="mix_samples"} 1 + +eval instant at 1m last_over_time({__name__=~"data(_histogram)?"}[2m]) data{type="numbers"} 3 data{type="some_nan"} NaN data{type="some_nan2"} 1 data{type="some_nan3"} 1 data{type="only_nan"} NaN + data_histogram{type="only_histogram"} {{schema:0 sum:3 count:4}} + data_histogram{type="mix_samples"} {{schema:0 sum:2 count:3}} + +eval instant at 1m count_over_time({__name__=~"data(_histogram)?"}[2m]) + {type="numbers"} 3 + {type="some_nan"} 3 + {type="some_nan2"} 3 + {type="some_nan3"} 3 + {type="only_nan"} 3 + {type="only_histogram"} 3 + {type="mix_samples"} 4 clear diff --git a/vendor/github.com/prometheus/prometheus/promql/promqltest/testdata/native_histograms.test b/vendor/github.com/prometheus/prometheus/promql/promqltest/testdata/native_histograms.test index 6be298cf7d..414619d5cd 100644 --- a/vendor/github.com/prometheus/prometheus/promql/promqltest/testdata/native_histograms.test +++ b/vendor/github.com/prometheus/prometheus/promql/promqltest/testdata/native_histograms.test @@ -1128,6 +1128,39 @@ eval_warn range from 0 to 12m step 6m sum(metric) eval_warn range from 0 to 12m step 6m avg(metric) {} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} _ +# Test incompatible schemas with additional aggregation operators +eval range from 0 to 12m step 6m count(metric) + {} 2 2 3 + +eval range from 0 to 12m step 6m group(metric) + {} 1 1 1 + +eval range from 0 to 12m step 6m count(limitk(1, metric)) + {} 1 1 1 + +eval range from 0 to 12m step 6m limitk(3, metric) + metric{series="1"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + metric{series="2"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} _ {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} + metric{series="3"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + +eval range from 0 to 12m step 6m limit_ratio(1, metric) + metric{series="1"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + 
metric{series="2"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} _ {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} + metric{series="3"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + +# Test incompatible schemas with and/or +eval range from 0 to 12m step 6m metric{series="1"} and ignoring(series) metric{series="2"} + metric{series="1"} _ _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + +eval range from 0 to 12m step 6m metric{series="1"} or ignoring(series) metric{series="2"} + metric{series="1"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + metric{series="2"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} _ _ + +# Test incompatible schemas with arithmetic binary operators +eval_warn range from 0 to 12m step 6m metric{series="2"} + ignoring (series) metric{series="3"} + +eval_warn range from 0 to 12m step 6m metric{series="2"} - ignoring (series) metric{series="3"} + clear load 1m diff --git a/vendor/github.com/prometheus/prometheus/rules/alerting.go b/vendor/github.com/prometheus/prometheus/rules/alerting.go index 4f40788e27..ec498c2f5f 100644 --- a/vendor/github.com/prometheus/prometheus/rules/alerting.go +++ b/vendor/github.com/prometheus/prometheus/rules/alerting.go @@ -143,8 +143,9 @@ type AlertingRule struct { logger *slog.Logger - noDependentRules *atomic.Bool - noDependencyRules *atomic.Bool + dependenciesMutex sync.RWMutex + dependentRules []Rule + dependencyRules []Rule } // NewAlertingRule constructs a new AlertingRule. @@ -171,8 +172,6 @@ func NewAlertingRule( evaluationTimestamp: atomic.NewTime(time.Time{}), evaluationDuration: atomic.NewDuration(0), lastError: atomic.NewError(nil), - noDependentRules: atomic.NewBool(false), - noDependencyRules: atomic.NewBool(false), } } @@ -316,20 +315,54 @@ func (r *AlertingRule) Restored() bool { return r.restored.Load() } -func (r *AlertingRule) SetNoDependentRules(noDependentRules bool) { - r.noDependentRules.Store(noDependentRules) +func (r *AlertingRule) SetDependentRules(dependents []Rule) { + r.dependenciesMutex.Lock() + defer r.dependenciesMutex.Unlock() + + r.dependentRules = make([]Rule, len(dependents)) + copy(r.dependentRules, dependents) } func (r *AlertingRule) NoDependentRules() bool { - return r.noDependentRules.Load() + r.dependenciesMutex.RLock() + defer r.dependenciesMutex.RUnlock() + + if r.dependentRules == nil { + return false // We don't know if there are dependent rules. + } + + return len(r.dependentRules) == 0 +} + +func (r *AlertingRule) DependentRules() []Rule { + r.dependenciesMutex.RLock() + defer r.dependenciesMutex.RUnlock() + return r.dependentRules } -func (r *AlertingRule) SetNoDependencyRules(noDependencyRules bool) { - r.noDependencyRules.Store(noDependencyRules) +func (r *AlertingRule) SetDependencyRules(dependencies []Rule) { + r.dependenciesMutex.Lock() + defer r.dependenciesMutex.Unlock() + + r.dependencyRules = make([]Rule, len(dependencies)) + copy(r.dependencyRules, dependencies) } func (r *AlertingRule) NoDependencyRules() bool { - return r.noDependencyRules.Load() + r.dependenciesMutex.RLock() + defer r.dependenciesMutex.RUnlock() + + if r.dependencyRules == nil { + return false // We don't know if there are dependency rules. 
+ } + + return len(r.dependencyRules) == 0 +} + +func (r *AlertingRule) DependencyRules() []Rule { + r.dependenciesMutex.RLock() + defer r.dependenciesMutex.RUnlock() + return r.dependencyRules } // resolvedRetention is the duration for which a resolved alert instance diff --git a/vendor/github.com/prometheus/prometheus/rules/group.go b/vendor/github.com/prometheus/prometheus/rules/group.go index 0965dc2763..3d1bdb22ef 100644 --- a/vendor/github.com/prometheus/prometheus/rules/group.go +++ b/vendor/github.com/prometheus/prometheus/rules/group.go @@ -75,8 +75,6 @@ type Group struct { // defaults to DefaultEvalIterationFunc. evalIterationFunc GroupEvalIterationFunc - // concurrencyController controls the rules evaluation concurrency. - concurrencyController RuleConcurrencyController appOpts *storage.AppendOptions alignEvaluationTimeOnInterval bool } @@ -130,11 +128,6 @@ func NewGroup(o GroupOptions) *Group { evalIterationFunc = DefaultEvalIterationFunc } - concurrencyController := opts.RuleConcurrencyController - if concurrencyController == nil { - concurrencyController = sequentialRuleEvalController{} - } - if opts.Logger == nil { opts.Logger = promslog.NewNopLogger() } @@ -156,7 +149,6 @@ func NewGroup(o GroupOptions) *Group { logger: opts.Logger.With("file", o.File, "group", o.Name), metrics: metrics, evalIterationFunc: evalIterationFunc, - concurrencyController: concurrencyController, appOpts: &storage.AppendOptions{DiscardOutOfOrder: true}, alignEvaluationTimeOnInterval: o.AlignEvaluationTimeOnInterval, } @@ -659,29 +651,51 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } var wg sync.WaitGroup - for i, rule := range g.rules { - select { - case <-g.done: - // There's a chance that the group is asked to return early. In that case, we should - // wait for any in-flight rules to finish evaluating before returning so that we can preserve the same semantics. - // At the time of writing, the main reason for this was to make sure we don't clear seriesInPreviousEval before we're done using it. - wg.Wait() - return - default: - } - - if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) { - wg.Add(1) + ctrl := g.opts.RuleConcurrencyController + if ctrl == nil { + ctrl = sequentialRuleEvalController{} + } - go eval(i, rule, func() { - wg.Done() - ctrl.Done(ctx) - }) - } else { + batches := ctrl.SplitGroupIntoBatches(ctx, g) + if len(batches) == 0 { + // Sequential evaluation when batches aren't set. + // This is the behaviour without a defined RuleConcurrencyController + for i, rule := range g.rules { + // Check if the group has been stopped. + select { + case <-g.done: + return + default: + } eval(i, rule, nil) } + } else { + // Concurrent evaluation. + for _, batch := range batches { + for _, ruleIndex := range batch { + // Check if the group has been stopped. + select { + case <-g.done: + wg.Wait() + return + default: + } + rule := g.rules[ruleIndex] + if len(batch) > 1 && ctrl.Allow(ctx, g, rule) { + wg.Add(1) + + go eval(ruleIndex, rule, func() { + wg.Done() + ctrl.Done(ctx) + }) + } else { + eval(ruleIndex, rule, nil) + } + } + // It is important that we finish processing any rules in this current batch - before we move into the next one. + wg.Wait() + } } - wg.Wait() g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) g.cleanupStaleSeries(ctx, ts) @@ -1076,27 +1090,25 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics { // output metric produced by another rule in its expression (i.e. as its "input"). 
type dependencyMap map[Rule][]Rule -// dependents returns the count of rules which use the output of the given rule as one of their inputs. -func (m dependencyMap) dependents(r Rule) int { - return len(m[r]) +// dependents returns the rules which use the output of the given rule as one of their inputs. +func (m dependencyMap) dependents(r Rule) []Rule { + return m[r] } -// dependencies returns the count of rules on which the given rule is dependent for input. -func (m dependencyMap) dependencies(r Rule) int { +// dependencies returns the rules on which the given rule is dependent for input. +func (m dependencyMap) dependencies(r Rule) []Rule { if len(m) == 0 { - return 0 + return []Rule{} } - var count int - for _, children := range m { - for _, child := range children { - if child == r { - count++ - } + var dependencies []Rule + for rule, dependents := range m { + if slices.Contains(dependents, r) { + dependencies = append(dependencies, rule) } } - return count + return dependencies } // isIndependent determines whether the given rule is not dependent on another rule for its input, nor is any other rule @@ -1106,7 +1118,7 @@ func (m dependencyMap) isIndependent(r Rule) bool { return false } - return m.dependents(r)+m.dependencies(r) == 0 + return len(m.dependents(r)) == 0 && len(m.dependencies(r)) == 0 } // buildDependencyMap builds a data-structure which contains the relationships between rules within a group. diff --git a/vendor/github.com/prometheus/prometheus/rules/manager.go b/vendor/github.com/prometheus/prometheus/rules/manager.go index 58020126e5..62cf625810 100644 --- a/vendor/github.com/prometheus/prometheus/rules/manager.go +++ b/vendor/github.com/prometheus/prometheus/rules/manager.go @@ -473,8 +473,8 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { // RuleDependencyController controls whether a set of rules have dependencies between each other. type RuleDependencyController interface { // AnalyseRules analyses dependencies between the input rules. For each rule that it's guaranteed - // not having any dependants and/or dependency, this function should call Rule.SetNoDependentRules(true) - // and/or Rule.SetNoDependencyRules(true). + // not having any dependants and/or dependency, this function should call Rule.SetDependentRules(...) + // and/or Rule.SetDependencyRules(...). AnalyseRules(rules []Rule) } @@ -489,15 +489,22 @@ func (c ruleDependencyController) AnalyseRules(rules []Rule) { } for _, r := range rules { - r.SetNoDependentRules(depMap.dependents(r) == 0) - r.SetNoDependencyRules(depMap.dependencies(r) == 0) + r.SetDependentRules(depMap.dependents(r)) + r.SetDependencyRules(depMap.dependencies(r)) } } +// ConcurrentRules represents a slice of indexes of rules that can be evaluated concurrently. +type ConcurrentRules []int + // RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently. // Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus // server with additional query load. Concurrency is controlled globally, not on a per-group basis. type RuleConcurrencyController interface { + // SplitGroupIntoBatches returns an ordered slice of of ConcurrentRules, which are batches of rules that can be evaluated concurrently. + // The rules are represented by their index from the input rule group. + SplitGroupIntoBatches(ctx context.Context, group *Group) []ConcurrentRules + // Allow determines if the given rule is allowed to be evaluated concurrently. 
// If Allow() returns true, then Done() must be called to release the acquired slot and corresponding cleanup is done. // It is important that both *Group and Rule are not retained and only be used for the duration of the call. @@ -519,21 +526,51 @@ func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyControlle } func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool { - // To allow a rule to be executed concurrently, we need 3 conditions: - // 1. The rule must not have any rules that depend on it. - // 2. The rule itself must not depend on any other rules. - // 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot. - if rule.NoDependentRules() && rule.NoDependencyRules() { - return c.sema.TryAcquire(1) + return c.sema.TryAcquire(1) +} + +func (c *concurrentRuleEvalController) SplitGroupIntoBatches(_ context.Context, g *Group) []ConcurrentRules { + // Using the rule dependency controller information (rules being identified as having no dependencies or no dependants), + // we can safely run the following concurrent groups: + // 1. Concurrently, all rules that have no dependencies + // 2. Sequentially, all rules that have both dependencies and dependants + // 3. Concurrently, all rules that have no dependants + + var noDependencies []int + var dependenciesAndDependants []int + var noDependants []int + + for i, r := range g.rules { + switch { + case r.NoDependencyRules(): + noDependencies = append(noDependencies, i) + case !r.NoDependentRules() && !r.NoDependencyRules(): + dependenciesAndDependants = append(dependenciesAndDependants, i) + case r.NoDependentRules(): + noDependants = append(noDependants, i) + } } - return false + var order []ConcurrentRules + if len(noDependencies) > 0 { + order = append(order, noDependencies) + } + for _, r := range dependenciesAndDependants { + order = append(order, []int{r}) + } + if len(noDependants) > 0 { + order = append(order, noDependants) + } + + return order } func (c *concurrentRuleEvalController) Done(_ context.Context) { c.sema.Release(1) } +var _ RuleConcurrencyController = &sequentialRuleEvalController{} + // sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially. type sequentialRuleEvalController struct{} @@ -541,6 +578,10 @@ func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule) return false } +func (c sequentialRuleEvalController) SplitGroupIntoBatches(_ context.Context, g *Group) []ConcurrentRules { + return nil +} + func (c sequentialRuleEvalController) Done(_ context.Context) {} // FromMaps returns new sorted Labels from the given maps, overriding each other in order. diff --git a/vendor/github.com/prometheus/prometheus/rules/recording.go b/vendor/github.com/prometheus/prometheus/rules/recording.go index 52c2a875ab..3b6db210af 100644 --- a/vendor/github.com/prometheus/prometheus/rules/recording.go +++ b/vendor/github.com/prometheus/prometheus/rules/recording.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "net/url" + "sync" "time" "go.uber.org/atomic" @@ -43,8 +44,9 @@ type RecordingRule struct { // Duration of how long it took to evaluate the recording rule. evaluationDuration *atomic.Duration - noDependentRules *atomic.Bool - noDependencyRules *atomic.Bool + dependenciesMutex sync.RWMutex + dependentRules []Rule + dependencyRules []Rule } // NewRecordingRule returns a new recording rule. 
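Downstream implementations of `RuleConcurrencyController` now have to provide `SplitGroupIntoBatches` in addition to `Allow` and `Done`. Below is a minimal sketch, not the vendored implementation, of a controller built on the dependency information exposed above; the type name, constructor, and semaphore-based limiting are assumptions for illustration:

```go
package rulesbatching // hypothetical package name, for illustration only

import (
	"context"

	"golang.org/x/sync/semaphore"

	"github.com/prometheus/prometheus/rules"
)

// dependencyAwareController batches rules with no dependencies first, runs
// rules that have both dependents and dependencies one per batch (i.e.
// sequentially), and leaves rules with no dependents for a final concurrent
// batch. This mirrors the ordering described in the vendored controller but
// is a sketch, not that implementation.
type dependencyAwareController struct {
	sema *semaphore.Weighted
}

var _ rules.RuleConcurrencyController = (*dependencyAwareController)(nil)

func newDependencyAwareController(maxConcurrency int64) *dependencyAwareController {
	return &dependencyAwareController{sema: semaphore.NewWeighted(maxConcurrency)}
}

func (c *dependencyAwareController) SplitGroupIntoBatches(_ context.Context, g *rules.Group) []rules.ConcurrentRules {
	var first, middle, last rules.ConcurrentRules
	for i, r := range g.Rules() {
		switch {
		case r.NoDependencyRules():
			first = append(first, i)
		case r.NoDependentRules():
			last = append(last, i)
		default:
			// Both-sided or unknown dependencies: keep these sequential.
			middle = append(middle, i)
		}
	}

	var batches []rules.ConcurrentRules
	if len(first) > 0 {
		batches = append(batches, first)
	}
	for _, i := range middle {
		batches = append(batches, rules.ConcurrentRules{i})
	}
	if len(last) > 0 {
		batches = append(batches, last)
	}
	return batches
}

func (c *dependencyAwareController) Allow(_ context.Context, _ *rules.Group, _ rules.Rule) bool {
	return c.sema.TryAcquire(1)
}

func (c *dependencyAwareController) Done(_ context.Context) {
	c.sema.Release(1)
}
```

Putting the "no dependencies" batch first matches how the group evaluator consumes the batches: it waits for each batch to finish before starting the next, so later batches can safely read the outputs of earlier ones.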
@@ -57,8 +59,6 @@ func NewRecordingRule(name string, vector parser.Expr, lset labels.Labels) *Reco evaluationTimestamp: atomic.NewTime(time.Time{}), evaluationDuration: atomic.NewDuration(0), lastError: atomic.NewError(nil), - noDependentRules: atomic.NewBool(false), - noDependencyRules: atomic.NewBool(false), } } @@ -172,18 +172,52 @@ func (rule *RecordingRule) GetEvaluationTimestamp() time.Time { return rule.evaluationTimestamp.Load() } -func (rule *RecordingRule) SetNoDependentRules(noDependentRules bool) { - rule.noDependentRules.Store(noDependentRules) +func (rule *RecordingRule) SetDependentRules(dependents []Rule) { + rule.dependenciesMutex.Lock() + defer rule.dependenciesMutex.Unlock() + + rule.dependentRules = make([]Rule, len(dependents)) + copy(rule.dependentRules, dependents) } func (rule *RecordingRule) NoDependentRules() bool { - return rule.noDependentRules.Load() + rule.dependenciesMutex.RLock() + defer rule.dependenciesMutex.RUnlock() + + if rule.dependentRules == nil { + return false // We don't know if there are dependent rules. + } + + return len(rule.dependentRules) == 0 +} + +func (rule *RecordingRule) DependentRules() []Rule { + rule.dependenciesMutex.RLock() + defer rule.dependenciesMutex.RUnlock() + return rule.dependentRules } -func (rule *RecordingRule) SetNoDependencyRules(noDependencyRules bool) { - rule.noDependencyRules.Store(noDependencyRules) +func (rule *RecordingRule) SetDependencyRules(dependencies []Rule) { + rule.dependenciesMutex.Lock() + defer rule.dependenciesMutex.Unlock() + + rule.dependencyRules = make([]Rule, len(dependencies)) + copy(rule.dependencyRules, dependencies) } func (rule *RecordingRule) NoDependencyRules() bool { - return rule.noDependencyRules.Load() + rule.dependenciesMutex.RLock() + defer rule.dependenciesMutex.RUnlock() + + if rule.dependencyRules == nil { + return false // We don't know if there are dependency rules. + } + + return len(rule.dependencyRules) == 0 +} + +func (rule *RecordingRule) DependencyRules() []Rule { + rule.dependenciesMutex.RLock() + defer rule.dependenciesMutex.RUnlock() + return rule.dependencyRules } diff --git a/vendor/github.com/prometheus/prometheus/rules/rule.go b/vendor/github.com/prometheus/prometheus/rules/rule.go index 687c03d000..33f1755ac5 100644 --- a/vendor/github.com/prometheus/prometheus/rules/rule.go +++ b/vendor/github.com/prometheus/prometheus/rules/rule.go @@ -62,19 +62,25 @@ type Rule interface { // NOTE: Used dynamically by rules.html template. GetEvaluationTimestamp() time.Time - // SetNoDependentRules sets whether there's no other rule in the rule group that depends on this rule. - SetNoDependentRules(bool) + // SetDependentRules sets rules which depend on the output of this rule. + SetDependentRules(rules []Rule) // NoDependentRules returns true if it's guaranteed that in the rule group there's no other rule // which depends on this one. In case this function returns false there's no such guarantee, which // means there may or may not be other rules depending on this one. NoDependentRules() bool - // SetNoDependencyRules sets whether this rule doesn't depend on the output of any rule in the rule group. - SetNoDependencyRules(bool) + // DependentRules returns the rules which depend on the output of this rule. + DependentRules() []Rule + + // SetDependencyRules sets rules on which this rule depends. + SetDependencyRules(rules []Rule) // NoDependencyRules returns true if it's guaranteed that this rule doesn't depend on the output of // any other rule in the group. 
In case this function returns false there's no such guarantee, which // means the rule may or may not depend on other rules. NoDependencyRules() bool + + // DependencyRules returns the rules on which this rule depends. + DependencyRules() []Rule } diff --git a/vendor/github.com/prometheus/prometheus/scrape/target.go b/vendor/github.com/prometheus/prometheus/scrape/target.go index d05866f863..22cde01c05 100644 --- a/vendor/github.com/prometheus/prometheus/scrape/target.go +++ b/vendor/github.com/prometheus/prometheus/scrape/target.go @@ -295,12 +295,12 @@ func (t *Target) intervalAndTimeout(defaultInterval, defaultDuration time.Durati intervalLabel := t.labels.Get(model.ScrapeIntervalLabel) interval, err := model.ParseDuration(intervalLabel) if err != nil { - return defaultInterval, defaultDuration, fmt.Errorf("Error parsing interval label %q: %w", intervalLabel, err) + return defaultInterval, defaultDuration, fmt.Errorf("error parsing interval label %q: %w", intervalLabel, err) } timeoutLabel := t.labels.Get(model.ScrapeTimeoutLabel) timeout, err := model.ParseDuration(timeoutLabel) if err != nil { - return defaultInterval, defaultDuration, fmt.Errorf("Error parsing timeout label %q: %w", timeoutLabel, err) + return defaultInterval, defaultDuration, fmt.Errorf("error parsing timeout label %q: %w", timeoutLabel, err) } return time.Duration(interval), time.Duration(timeout), nil diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/metadata_watcher.go b/vendor/github.com/prometheus/prometheus/storage/remote/metadata_watcher.go index 9306dcb4c2..d7f376c96a 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/metadata_watcher.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/metadata_watcher.go @@ -38,7 +38,7 @@ type Watchable interface { type noopScrapeManager struct{} func (noop *noopScrapeManager) Get() (*scrape.Manager, error) { - return nil, errors.New("Scrape manager not ready") + return nil, errors.New("scrape manager not ready") } // MetadataWatcher watches the Scrape Manager for a given WriteMetadataTo. diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/queue_manager.go b/vendor/github.com/prometheus/prometheus/storage/remote/queue_manager.go index 475c126eff..4b966059f6 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/queue_manager.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/queue_manager.go @@ -2119,7 +2119,7 @@ func compressPayload(tmpbuf *[]byte, inp []byte, enc Compression) (compressed [] } return compressed, nil default: - return compressed, fmt.Errorf("Unknown compression scheme [%v]", enc) + return compressed, fmt.Errorf("unknown compression scheme [%v]", enc) } } diff --git a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go index 18447f0b6f..6b1cf83502 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go @@ -863,6 +863,11 @@ func (it *ListPostings) Err() error { return nil } +// Len returns the remaining number of postings in the list. +func (it *ListPostings) Len() int { + return len(it.list) +} + // bigEndianPostings implements the Postings interface over a byte stream of // big endian numbers. 
type bigEndianPostings struct { diff --git a/vendor/github.com/prometheus/prometheus/tsdb/wlog/watcher.go b/vendor/github.com/prometheus/prometheus/tsdb/wlog/watcher.go index 6f1bc1df35..ca74a9ceaf 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/wlog/watcher.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/wlog/watcher.go @@ -679,7 +679,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err // Ensure we read the whole contents of every segment in the checkpoint dir. segs, err := listSegments(checkpointDir) if err != nil { - return fmt.Errorf("Unable to get segments checkpoint dir: %w", err) + return fmt.Errorf("unable to get segments checkpoint dir: %w", err) } for _, segRef := range segs { size, err := getSegmentSize(checkpointDir, segRef.index) diff --git a/vendor/github.com/prometheus/prometheus/web/api/v1/api.go b/vendor/github.com/prometheus/prometheus/web/api/v1/api.go index caba3900f5..5861ac23f1 100644 --- a/vendor/github.com/prometheus/prometheus/web/api/v1/api.go +++ b/vendor/github.com/prometheus/prometheus/web/api/v1/api.go @@ -144,6 +144,8 @@ type PrometheusVersion struct { type RuntimeInfo struct { StartTime time.Time `json:"startTime"` CWD string `json:"CWD"` + Hostname string `json:"hostname"` + ServerTime time.Time `json:"serverTime"` ReloadConfigSuccess bool `json:"reloadConfigSuccess"` LastConfigTime time.Time `json:"lastConfigTime"` CorruptionCount int64 `json:"corruptionCount"` @@ -437,6 +439,10 @@ func (api *API) options(*http.Request) apiFuncResult { } func (api *API) query(r *http.Request) (result apiFuncResult) { + limit, err := parseLimitParam(r.FormValue("limit")) + if err != nil { + return invalidParamError(err, "limit") + } ts, err := parseTimeParam(r, "time", api.now()) if err != nil { return invalidParamError(err, "time") @@ -478,6 +484,15 @@ func (api *API) query(r *http.Request) (result apiFuncResult) { return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} } + warnings := res.Warnings + if limit > 0 { + var isTruncated bool + + res, isTruncated = truncateResults(res, limit) + if isTruncated { + warnings = warnings.Add(errors.New("results truncated due to limit")) + } + } // Optional stats field in response if parameter "stats" is not empty. sr := api.statsRenderer if sr == nil { @@ -489,7 +504,7 @@ func (api *API) query(r *http.Request) (result apiFuncResult) { ResultType: res.Value.Type(), Result: res.Value, Stats: qs, - }, nil, res.Warnings, qry.Close} + }, nil, warnings, qry.Close} } func (api *API) formatQuery(r *http.Request) (result apiFuncResult) { @@ -525,6 +540,10 @@ func extractQueryOpts(r *http.Request) (promql.QueryOpts, error) { } func (api *API) queryRange(r *http.Request) (result apiFuncResult) { + limit, err := parseLimitParam(r.FormValue("limit")) + if err != nil { + return invalidParamError(err, "limit") + } start, err := parseTime(r.FormValue("start")) if err != nil { return invalidParamError(err, "start") @@ -589,6 +608,16 @@ func (api *API) queryRange(r *http.Request) (result apiFuncResult) { return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} } + warnings := res.Warnings + if limit > 0 { + var isTruncated bool + + res, isTruncated = truncateResults(res, limit) + if isTruncated { + warnings = warnings.Add(errors.New("results truncated due to limit")) + } + } + // Optional stats field in response if parameter "stats" is not empty. 
sr := api.statsRenderer if sr == nil { @@ -600,7 +629,7 @@ func (api *API) queryRange(r *http.Request) (result apiFuncResult) { ResultType: res.Value.Type(), Result: res.Value, Stats: qs, - }, nil, res.Warnings, qry.Close} + }, nil, warnings, qry.Close} } func (api *API) queryExemplars(r *http.Request) apiFuncResult { @@ -2015,7 +2044,7 @@ func parseTimeParam(r *http.Request, paramName string, defaultValue time.Time) ( } result, err := parseTime(val) if err != nil { - return time.Time{}, fmt.Errorf("Invalid time value for '%s': %w", paramName, err) + return time.Time{}, fmt.Errorf("invalid time value for '%s': %w", paramName, err) } return result, nil } @@ -2101,3 +2130,25 @@ func toHintLimit(limit int) int { } return limit } + +// truncateResults truncates result for queryRange() and query(). +// No truncation for other types(Scalars or Strings). +func truncateResults(result *promql.Result, limit int) (*promql.Result, bool) { + isTruncated := false + + switch v := result.Value.(type) { + case promql.Matrix: + if len(v) > limit { + result.Value = v[:limit] + isTruncated = true + } + case promql.Vector: + if len(v) > limit { + result.Value = v[:limit] + isTruncated = true + } + } + + // Return the modified result. Unchanged for other types. + return result, isTruncated +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 0f989a32b4..809e3eacc4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1040,7 +1040,7 @@ github.com/prometheus/exporter-toolkit/web github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20250109135143-114aaaadc203 +# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20250110020350-a1e2bcf4a615 ## explicit; go 1.22.7 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -1704,7 +1704,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 sigs.k8s.io/yaml/goyaml.v3 -# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250109135143-114aaaadc203 +# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250110020350-a1e2bcf4a615 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240531075221-3685f1377d7b
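The `limit` handling added to `query` and `queryRange` above surfaces to API clients as a truncated result plus a "results truncated due to limit" warning. A small client-side sketch of that behaviour (the server URL and query are placeholders, and the response struct only decodes the fields relevant here):

```go
package main // standalone client sketch, for illustration only

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// The Prometheus base URL and the query are placeholders.
	params := url.Values{}
	params.Set("query", "up")
	params.Set("limit", "10") // keep at most 10 series in the result

	resp, err := http.Get("http://localhost:9090/api/v1/query?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Decode only the fields relevant to the truncation behaviour.
	var body struct {
		Status   string   `json:"status"`
		Warnings []string `json:"warnings"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		panic(err)
	}

	// When more than `limit` series matched, Warnings is expected to contain
	// "results truncated due to limit".
	fmt.Println(body.Status, body.Warnings)
}
```

Scalar and string results pass through `truncateResults` unchanged, so the warning is only ever added for vector and matrix results.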