Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export query itself together with queryId in stat_statement metrics #940

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ This will build the docker image as `prometheuscommunity/postgres_exporter:${bra
* `[no-]collector.stat_statements`
Enable the `stat_statements` collector (default: disabled).

* `[no-]collector.stat_statements.include_query`
Enable selecting statement query together with queryId. (default: disabled)

* `[no-]collector.stat_user_tables`
Enable the `stat_user_tables` collector (default: enabled).

Expand Down
73 changes: 65 additions & 8 deletions collector/pg_stat_statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package collector
import (
"context"
"database/sql"
"fmt"
"github.com/alecthomas/kingpin/v2"
"strings"

"github.com/blang/semver/v4"
"github.com/go-kit/log"
Expand All @@ -24,19 +27,31 @@ import (

const statStatementsSubsystem = "stat_statements"

var includeQueryFlag *bool = nil

func init() {
// WARNING:
// Disabled by default because this set of metrics can be quite expensive on a busy server
// Every unique query will cause a new timeseries to be created
registerCollector(statStatementsSubsystem, defaultDisabled, NewPGStatStatementsCollector)

flagName := fmt.Sprintf("collector.%s.include_query", statStatementsSubsystem)
flagEnvName := fmt.Sprintf("PG_EXPORTER_COLLECTOR_%s_INCLUDE_QUERY", strings.ToUpper(statStatementsSubsystem))
SuperQ marked this conversation as resolved.
Show resolved Hide resolved
flagHelp := "Enable selecting statement query together with queryId. (default: false)"
defaultValue := fmt.Sprintf("%v", defaultDisabled)
includeQueryFlag = kingpin.Flag(flagName, flagHelp).Default(defaultValue).Envar(flagEnvName).Bool()
}

type PGStatStatementsCollector struct {
log log.Logger
log log.Logger
includeQueryStatement bool
}

func NewPGStatStatementsCollector(config collectorConfig) (Collector, error) {
return &PGStatStatementsCollector{log: config.logger}, nil
return &PGStatStatementsCollector{
log: config.logger,
includeQueryStatement: *includeQueryFlag,
}, nil
}

var (
Expand Down Expand Up @@ -71,10 +86,20 @@ var (
prometheus.Labels{},
)

statStatementsQuery = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statStatementsSubsystem, "query_id"),
"SQL Query to queryid mapping",
[]string{"queryid", "query"},
prometheus.Labels{},
)

pgStatStatementQuerySelect = "pg_stat_statements.query,"

pgStatStatementsQuery = `SELECT
pg_get_userbyid(userid) as user,
pg_database.datname,
pg_stat_statements.queryid,
%s
pg_stat_statements.calls as calls_total,
pg_stat_statements.total_time / 1000.0 as seconds_total,
pg_stat_statements.rows as rows_total,
Expand All @@ -96,6 +121,7 @@ var (
pg_get_userbyid(userid) as user,
pg_database.datname,
pg_stat_statements.queryid,
%s
pg_stat_statements.calls as calls_total,
pg_stat_statements.total_exec_time / 1000.0 as seconds_total,
pg_stat_statements.rows as rows_total,
Expand All @@ -114,25 +140,37 @@ var (
LIMIT 100;`
)

func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
query := pgStatStatementsQuery
func (c PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
queryTemplate := pgStatStatementsQuery
if instance.version.GE(semver.MustParse("13.0.0")) {
query = pgStatStatementsNewQuery
queryTemplate = pgStatStatementsNewQuery
}
var querySelect = ""
if c.includeQueryStatement {
querySelect = pgStatStatementQuerySelect
}
query := fmt.Sprintf(queryTemplate, querySelect)

db := instance.getDB()
rows, err := db.QueryContext(ctx, query)

var presentQueryIds = make(map[string]struct{})

if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var user, datname, queryid sql.NullString
var user, datname, queryid, statement sql.NullString
var callsTotal, rowsTotal sql.NullInt64
var secondsTotal, blockReadSecondsTotal, blockWriteSecondsTotal sql.NullFloat64

if err := rows.Scan(&user, &datname, &queryid, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal); err != nil {
var columns []any
if c.includeQueryStatement {
columns = []any{&user, &datname, &queryid, &statement, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal}
} else {
columns = []any{&user, &datname, &queryid, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal}
}
if err := rows.Scan(columns...); err != nil {
return err
}

Expand Down Expand Up @@ -203,6 +241,25 @@ func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance,
blockWriteSecondsTotalMetric,
userLabel, datnameLabel, queryidLabel,
)

if c.includeQueryStatement {
_, ok := presentQueryIds[queryidLabel]
if !ok {
presentQueryIds[queryidLabel] = struct{}{}

queryLabel := "unknown"
if statement.Valid {
queryLabel = statement.String
}

ch <- prometheus.MustNewConstMetric(
statStatementsQuery,
prometheus.CounterValue,
1,
queryidLabel, queryLabel,
)
}
}
}
if err := rows.Err(); err != nil {
return err
Expand Down
139 changes: 136 additions & 3 deletions collector/pg_stat_statements_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package collector

import (
"context"
"fmt"
"testing"

"github.com/DATA-DOG/go-sqlmock"
Expand All @@ -35,7 +36,7 @@ func TestPGStateStatementsCollector(t *testing.T) {
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2)
mock.ExpectQuery(sanitizeQuery(pgStatStatementsQuery)).WillReturnRows(rows)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, ""))).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
Expand Down Expand Up @@ -66,6 +67,50 @@ func TestPGStateStatementsCollector(t *testing.T) {
}
}

func TestPGStateStatementsCollectorWithStatement(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db, version: semver.MustParse("12.0.0")}

columns := []string{"user", "datname", "queryid", "query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, pgStatStatementQuerySelect))).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatStatementsCollector{includeQueryStatement: true}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2},
{labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1},
}

convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPGStateStatementsCollectorNull(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
Expand All @@ -78,7 +123,7 @@ func TestPGStateStatementsCollectorNull(t *testing.T) {
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil, nil, nil, nil, nil)
mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, ""))).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
Expand Down Expand Up @@ -109,6 +154,50 @@ func TestPGStateStatementsCollectorNull(t *testing.T) {
}
}

func TestPGStateStatementsCollectorNullWithStatement(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db, version: semver.MustParse("13.3.7")}

columns := []string{"user", "datname", "queryid", "query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, pgStatStatementQuerySelect))).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatStatementsCollector{includeQueryStatement: true}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"queryid": "unknown", "query": "unknown"}, metricType: dto.MetricType_COUNTER, value: 1},
}

convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPGStateStatementsCollectorNewPG(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
Expand All @@ -121,7 +210,7 @@ func TestPGStateStatementsCollectorNewPG(t *testing.T) {
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2)
mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, ""))).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
Expand Down Expand Up @@ -151,3 +240,47 @@ func TestPGStateStatementsCollectorNewPG(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPGStateStatementsCollectorNewPGWithStatement(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db, version: semver.MustParse("13.3.7")}

columns := []string{"user", "datname", "queryid", "query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, pgStatStatementQuerySelect))).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatStatementsCollector{includeQueryStatement: true}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2},
{labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1},
}

convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}