use pdata for sql data #5546

Draft · wants to merge 3 commits into base: main
24 changes: 15 additions & 9 deletions internal/signalfx-agent/pkg/monitors/sql/expressions.go
@@ -2,11 +2,8 @@ package sql

import (
"database/sql"
"go.opentelemetry.io/collector/pdata/pmetric"
"reflect"

"github.com/signalfx/golib/v3/datapoint"
"github.com/signalfx/golib/v3/sfxclient"
"github.com/signalfx/signalfx-agent/pkg/utils"
)

type ExprEnv map[string]interface{}
@@ -80,11 +77,20 @@ func convertToFloatOrPanic(val interface{}) float64 {
return rVal.Convert(floatType).Float()
}

func (e ExprEnv) GAUGE(metric string, dims map[string]interface{}, val interface{}) *datapoint.Datapoint {

return sfxclient.GaugeF(metric, utils.StringInterfaceMapToStringMap(dims), convertToFloatOrPanic(val))
func (e ExprEnv) GAUGE(metric string, dims map[string]interface{}, val interface{}) pmetric.Metric {
m := pmetric.NewMetric()
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.SetDoubleValue(convertToFloatOrPanic(val))
dp.Attributes().FromRaw(dims)
m.SetName(metric)
return m
}

func (e ExprEnv) CUMULATIVE(metric string, dims map[string]interface{}, val interface{}) *datapoint.Datapoint {
return sfxclient.CumulativeF(metric, utils.StringInterfaceMapToStringMap(dims), convertToFloatOrPanic(val))
func (e ExprEnv) CUMULATIVE(metric string, dims map[string]interface{}, val interface{}) pmetric.Metric {
m := pmetric.NewMetric()
dp := m.SetEmptySum().DataPoints().AppendEmpty()
dp.SetDoubleValue(convertToFloatOrPanic(val))
dp.Attributes().FromRaw(dims)
m.SetName(metric)
return m
}
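For context, here is a minimal standalone sketch of what the new expression helpers produce, using only the pdata calls that appear in this diff; the metric name, dimension map, and value are made up for illustration.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// buildGauge mirrors the body of the new ExprEnv.GAUGE helper (with the value
// already converted to float64): a single pmetric.Metric carrying one gauge
// data point, with the dims map attached as attributes.
func buildGauge(name string, dims map[string]interface{}, val float64) pmetric.Metric {
	m := pmetric.NewMetric()
	dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
	dp.SetDoubleValue(val)
	// FromRaw returns an error for unsupported value types; the helper above ignores it.
	_ = dp.Attributes().FromRaw(dims)
	m.SetName(name)
	return m
}

func main() {
	m := buildGauge("connections.active", map[string]interface{}{"db": "orders"}, 42)
	fmt.Println(m.Name(), m.Gauge().DataPoints().At(0).DoubleValue()) // connections.active 42
}
```

CUMULATIVE is identical except that it calls SetEmptySum instead of SetEmptyGauge, so the value lands on a Sum metric rather than a Gauge.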
12 changes: 12 additions & 0 deletions internal/signalfx-agent/pkg/monitors/sql/monitor.go
@@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"go.opentelemetry.io/collector/pdata/pmetric"
"strings"
"time"

@@ -75,6 +76,17 @@ func (m *Metric) NewDatapoint() *datapoint.Datapoint {
return datapoint.New(m.MetricName, map[string]string{}, nil, typ, time.Time{})
}

func (m *Metric) NewMetric() pmetric.Metric {
met := pmetric.NewMetric()
if m.IsCumulative {
met.SetEmptySum()
} else {
met.SetEmptyGauge()
}
met.SetName(m.MetricName)
return met
}

// Config for this monitor
type Config struct {
config.MonitorConfig `yaml:",inline" acceptsEndpoints:"true"`
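A test-style sketch (hypothetical, not part of this PR) of how the new Metric.NewMetric helper behaves, relying only on the MetricName and IsCumulative fields used above:

```go
package sql

import (
	"testing"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// TestNewMetricType sketches the expected mapping: IsCumulative selects an
// empty Sum, anything else an empty Gauge, and the configured name is carried over.
func TestNewMetricType(t *testing.T) {
	cumulative := (&Metric{MetricName: "queries.total", IsCumulative: true}).NewMetric()
	if cumulative.Type() != pmetric.MetricTypeSum || cumulative.Name() != "queries.total" {
		t.Fatalf("expected a Sum named queries.total, got %v %q", cumulative.Type(), cumulative.Name())
	}

	gauge := (&Metric{MetricName: "queries.inflight"}).NewMetric()
	if gauge.Type() != pmetric.MetricTypeGauge {
		t.Fatalf("expected a Gauge, got %v", gauge.Type())
	}
}
```

Note that NewMetric only sets the type and name; data points are appended later in querier.go as rows are read.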
91 changes: 59 additions & 32 deletions internal/signalfx-agent/pkg/monitors/sql/querier.go
@@ -3,15 +3,16 @@
import (
"context"
"database/sql"
"errors"
"fmt"
"go.opentelemetry.io/collector/pdata/pmetric"
"reflect"
"strings"
"time"

"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
"github.com/davecgh/go-spew/spew"
"github.com/signalfx/golib/v3/datapoint"
"github.com/sirupsen/logrus"

"github.com/signalfx/signalfx-agent/pkg/monitors/types"
@@ -95,13 +96,13 @@ func (q *querier) doQuery(ctx context.Context, database *sql.DB, output types.Ou
return fmt.Errorf("error executing statement %s: %v", q.query.Query, err)
}
for rows.Next() {
dps, dims, err := q.convertCurrentRowToDatapointAndDimensions(rows)
metrics, dims, err := q.convertCurrentRowToDatapointAndDimensions(rows)
if err != nil {
rows.Close()
return err
}

output.SendDatapoints(dps...)
output.SendMetrics(metrics...)

for i := range dims {
for _, dim := range dims[i] {
@@ -112,7 +113,7 @@
return rows.Close()
}

func (q *querier) convertCurrentRowToDatapointAndDimensions(rows *sql.Rows) ([]*datapoint.Datapoint, [][]*types.Dimension, error) {
func (q *querier) convertCurrentRowToDatapointAndDimensions(rows *sql.Rows) ([]pmetric.Metric, [][]*types.Dimension, error) {
rowSlice, err := q.getRowSlice(rows)
if err != nil {
return nil, nil, err
@@ -128,31 +129,30 @@ func (q *querier) convertCurrentRowToDatapointAndDimensions(rows *sql.Rows) ([]*
if q.logQueries {
q.logger.Info("Got results %s", spew.Sdump(rowSlice))
}
m := make([]pmetric.Metric, 0, len(q.query.Metrics)+len(q.query.DatapointExpressions))

dps := make([]*datapoint.Datapoint, 0, len(q.query.Metrics)+len(q.query.DatapointExpressions))
var dims [][]*types.Dimension

if len(q.query.Metrics) > 0 {
var err error
var structuredDPs []*datapoint.Datapoint
structuredDPs, dims, err = q.convertCurrentRowStructured(rowSlice, columnNames)
var structuredMetrics []pmetric.Metric
structuredMetrics, dims, err = q.convertCurrentRowStructured(rowSlice, columnNames)
m = append(m, structuredMetrics...)
if err != nil {
q.logger.WithError(err).Warn("Failed to convert row to datapoints and dimensions")
}

dps = append(dps, structuredDPs...)
}

if len(q.query.DatapointExpressions) > 0 {
exprDPs := q.convertCurrentRowExpressions(rowSlice, columnNames)
dps = append(dps, exprDPs...)
exprMetrics := q.convertCurrentRowExpressions(rowSlice, columnNames)
m = append(m, exprMetrics...)
}

return dps, dims, nil
return m, dims, nil
}

func (q *querier) convertCurrentRowExpressions(rowSlice []interface{}, columnNames []string) []*datapoint.Datapoint {
dps := make([]*datapoint.Datapoint, 0, len(q.compiledExprs))
func (q *querier) convertCurrentRowExpressions(rowSlice []interface{}, columnNames []string) []pmetric.Metric {
result := make([]pmetric.Metric, 0, len(q.compiledExprs))

for _, compiled := range q.compiledExprs {
env := newExprEnv(rowSlice, columnNames)
@@ -176,18 +176,17 @@ func (q *querier) convertCurrentRowExpressions(rowSlice []interface{}, columnNam
continue
}

if v, ok := dp.(*datapoint.Datapoint); ok {
dps = append(dps, v)
if v, ok := dp.(pmetric.Metric); ok {
result = append(result, v)
} else {
q.logger.WithField("expression", compiled.Source.Content()).WithField("result", dp).Warn("Result of expression is not a datapoint")
q.logger.WithField("expression", compiled.Source.Content()).WithField("result", dp).Warn("Result of expression is not a pmetric.Metric")
continue
}
}

return dps
return result
}

func (q *querier) convertCurrentRowStructured(rowSlice []interface{}, columnNames []string) ([]*datapoint.Datapoint, [][]*types.Dimension, error) {
func (q *querier) convertCurrentRowStructured(rowSlice []interface{}, columnNames []string) ([]pmetric.Metric, [][]*types.Dimension, error) {
// Clone all properties before updating them
for i := range q.dimensions {
for j := range q.dimensions[i] {
@@ -202,11 +201,13 @@ func (q *querier) convertCurrentRowStructured(rowSlice []interface{}, columnName
}
}

dps := make([]*datapoint.Datapoint, 0, len(q.query.Metrics))
for i := range q.query.Metrics {
dps = append(dps, q.query.Metrics[i].NewDatapoint())
exprMetrics := make([]pmetric.Metric, 0, len(q.query.Metrics))
for _, m := range q.query.Metrics {
exprMetrics = append(exprMetrics, m.NewMetric())
}

var emptyValues []int

for i := range rowSlice {
switch v := rowSlice[i].(type) {
case *sql.NullFloat64:
Expand All @@ -219,9 +220,16 @@ func (q *querier) convertCurrentRowStructured(rowSlice []interface{}, columnName
// This is a logical error in the code, not user input error
panic("valueColumn was not properly mapped to metric")
}

dp := dps[q.metricToIndex[metric]]
dp.Value = datapoint.NewFloatValue(v.Float64)
q.query.Metrics[i].NewDatapoint()
dp := exprMetrics[q.metricToIndex[metric]]
switch dp.Type() {
case pmetric.MetricTypeSum:
dp.Sum().DataPoints().AppendEmpty().SetDoubleValue(v.Float64)
case pmetric.MetricTypeGauge:
dp.Gauge().DataPoints().AppendEmpty().SetDoubleValue(v.Float64)
default:
return nil, nil, errors.New("invalid metric type")
}

case *sql.NullString:
dimVal := v.String
@@ -245,23 +253,42 @@
if !q.dimensionColumnSets[j][strings.ToLower(columnNames[i])] {
continue
}

dps[j].Dimensions[columnNames[i]] = dimVal
dp := exprMetrics[j]
switch dp.Type() {
case pmetric.MetricTypeSum:
dp.Sum().DataPoints().At(0).Attributes().PutStr(columnNames[i], dimVal)
case pmetric.MetricTypeGauge:
dp.Gauge().DataPoints().At(0).Attributes().PutStr(columnNames[i], dimVal)
default:
return nil, nil, errors.New("invalid metric type")
}
}
default:
emptyValues = append(emptyValues, i)
}
}

var n int
for i := range dps {
if dps[i].Value == nil {
for i := range exprMetrics {
dp := exprMetrics[i]
var empty bool
switch dp.Type() {
case pmetric.MetricTypeSum:
empty = dp.Sum().DataPoints().Len() == 0
case pmetric.MetricTypeGauge:
empty = dp.Gauge().DataPoints().Len() == 0
default:
empty = true
}
if empty {
q.logger.Warnf("Metric %s's value column '%s' did not correspond to a value\nrowSlice: %s", q.query.Metrics[i].MetricName, q.query.Metrics[i].ValueColumn, spew.Sdump(rowSlice))
continue
}
dps[n] = dps[i]
exprMetrics[n] = exprMetrics[i]
n++
}

return dps[:n], q.dimensions, nil
return exprMetrics, q.dimensions, nil
}

func (q *querier) getRowSlice(rows *sql.Rows) ([]interface{}, error) {
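The structured-row conversion above repeats the same Sum/Gauge type switch in three places (value assignment, dimension attributes, and the empty-value check). A small helper along these lines, a sketch rather than anything in this PR, could centralize that:

```go
package sql

import "go.opentelemetry.io/collector/pdata/pmetric"

// numberDataPoints returns the data point slice behind a Sum or Gauge metric,
// plus a flag reporting whether the metric type is one this monitor can fill.
func numberDataPoints(m pmetric.Metric) (pmetric.NumberDataPointSlice, bool) {
	switch m.Type() {
	case pmetric.MetricTypeSum:
		return m.Sum().DataPoints(), true
	case pmetric.MetricTypeGauge:
		return m.Gauge().DataPoints(), true
	default:
		return pmetric.NewNumberDataPointSlice(), false
	}
}
```

With that helper, setting a value becomes roughly `if dps, ok := numberDataPoints(dp); ok { dps.AppendEmpty().SetDoubleValue(v.Float64) }`, and the dimension and empty-value paths can reuse the same lookup instead of switching on the metric type again.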