Skip to content

Commit

Permalink
RSDK-9870: Check for schema changes all the time. Allow for FTDC to c…
Browse files Browse the repository at this point in the history
…onsume maps. (#4756)
  • Loading branch information
dgottlieb authored Jan 31, 2025
1 parent 7824694 commit 8acdd7f
Show file tree
Hide file tree
Showing 4 changed files with 417 additions and 386 deletions.
266 changes: 130 additions & 136 deletions ftdc/custom_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"math"
"reflect"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -124,8 +125,6 @@ func writeDatum(time int64, prev, curr []float32, output io.Writer) error {
return nil
}

var errNotStruct = errors.New("stats object is not a struct")

func isNumeric(kind reflect.Kind) bool {
return kind == reflect.Bool ||
kind == reflect.Int ||
Expand All @@ -135,23 +134,122 @@ func isNumeric(kind reflect.Kind) bool {
kind == reflect.Float32 || kind == reflect.Float64
}

func flattenStruct(item reflect.Value) ([]float32, error) {
flattenPtr := func(inp reflect.Value) reflect.Value {
for inp.Kind() == reflect.Pointer || inp.Kind() == reflect.Interface {
if inp.IsNil() {
return inp
func flattenPtr(inp reflect.Value) reflect.Value {
for inp.Kind() == reflect.Pointer || inp.Kind() == reflect.Interface {
if inp.IsNil() {
return inp
}

inp = inp.Elem()
}
return inp
}

func flatten(value reflect.Value) ([]string, []float32, error) {
value = flattenPtr(value)

// why is the default case not sufficient to be considered exhaustive?
//nolint:exhaustive
switch value.Kind() {
case reflect.Struct:
return flattenStruct(value)
case reflect.Map:
return flattenMap(value)
default:
// We can get here, for example, if a struct member is typed as an `any`, but the value is
// nil. More antagonistically, this also catches weird types such as channels.
return []string{}, []float32{}, nil
}
}

type mapSorter struct {
fields []string
values []float32
}

func (ms mapSorter) Len() int {
return len(ms.fields)
}

func (ms mapSorter) Less(left, right int) bool {
return ms.fields[left] < ms.fields[right]
}

func (ms mapSorter) Swap(left, right int) {
ms.fields[left], ms.fields[right] = ms.fields[right], ms.fields[left]
ms.values[left], ms.values[right] = ms.values[right], ms.values[left]
}

// flattenMap must be passed in a map where the keys are explicitly typed as strings. The values can
// be any terminal type (e.g: numbers) or more maps of strings.
func flattenMap(mValue reflect.Value) ([]string, []float32, error) {
if mValue.Type().Key().Kind() != reflect.String {
// We ignore types we refuse to serialize into ftdc.
return []string{}, []float32{}, nil
}

fields := make([]string, 0)
numbers := make([]float32, 0)

// Map iteration order is not predictable. This means that consecutive calls to a `Statser` that
// returns a map may yield: {"X": 1, "Y": 2} for one stat followed by {"Y": 2, "X": 1}. That
// sequence would result in us rewriting out the schema. We will build up results in map
// iteration order here and sort them later.
for iter := mValue.MapRange(); iter.Next(); {
key := iter.Key()
value := flattenPtr(iter.Value())

switch {
case value.CanUint():
fields = append(fields, key.String())
numbers = append(numbers, float32(value.Uint()))
case value.CanInt():
fields = append(fields, key.String())
numbers = append(numbers, float32(value.Int()))
case value.CanFloat():
fields = append(fields, key.String())
numbers = append(numbers, float32(value.Float()))
case value.Kind() == reflect.Bool:
fields = append(fields, key.String())
if value.Bool() {
numbers = append(numbers, 1)
} else {
numbers = append(numbers, 0)
}
case value.Kind() == reflect.Struct ||
value.Kind() == reflect.Pointer ||
value.Kind() == reflect.Interface ||
value.Kind() == reflect.Map:
subFields, subNumbers, err := flatten(value)
if err != nil {
return nil, nil, err
}

inp = inp.Elem()
for _, subField := range subFields {
fields = append(fields, fmt.Sprintf("%v.%v", key.String(), subField))
}
numbers = append(numbers, subNumbers...)
case isNumeric(value.Kind()):
//nolint:stylecheck
return nil, nil, fmt.Errorf("A numeric type was forgotten to be included. Kind: %v", value.Kind())
default:
// Getting the keys for a structure will ignore these types. Such as the antagonistic
// `channel`, or `string`. We follow suit in ignoring these types.
}
return inp
}

rVal := flattenPtr(item)
if rVal.Kind() != reflect.Struct {
return []float32{}, nil
}
// Sort `fields` in-place to ascending order to combat random map iteration order. This will
// also make the corresponding swaps on the `numbers` slice.
sort.Sort(mapSorter{fields, numbers})

return fields, numbers, nil
}

func flattenStruct(value reflect.Value) ([]string, []float32, error) {
value = flattenPtr(value)
rType := value.Type()

var fields []string
var numbers []float32
// Use reflection to walk the member fields of an individual set of metric readings. We rely
// on reflection always walking fields in the same order.
Expand All @@ -160,99 +258,51 @@ func flattenStruct(item reflect.Value) ([]float32, error) {
// function calls and allocations than some more raw alternatives. For example, we can have
// the "schema" keep a (field, offset, type) index and we instead access get a single unsafe
// pointer to each structure and walk out index to pull out the relevant numbers.
for memberIdx := 0; memberIdx < rVal.NumField(); memberIdx++ {
rField := flattenPtr(rVal.Field(memberIdx))
for memberIdx := 0; memberIdx < value.NumField(); memberIdx++ {
rField := flattenPtr(value.Field(memberIdx))
switch {
case rField.CanUint():
fields = append(fields, rType.Field(memberIdx).Name)
numbers = append(numbers, float32(rField.Uint()))
case rField.CanInt():
fields = append(fields, rType.Field(memberIdx).Name)
numbers = append(numbers, float32(rField.Int()))
case rField.CanFloat():
fields = append(fields, rType.Field(memberIdx).Name)
numbers = append(numbers, float32(rField.Float()))
case rField.Kind() == reflect.Bool:
if rField.Bool() {
fields = append(fields, rType.Field(memberIdx).Name)
numbers = append(numbers, 1)
} else {
fields = append(fields, rType.Field(memberIdx).Name)
numbers = append(numbers, 0)
}
case rField.Kind() == reflect.Struct ||
rField.Kind() == reflect.Pointer ||
rField.Kind() == reflect.Interface:
subNumbers, err := flattenStruct(rField)
rField.Kind() == reflect.Interface ||
rField.Kind() == reflect.Map:
subFields, subNumbers, err := flatten(rField)
if err != nil {
return nil, err
return nil, nil, err
}

thisFieldName := rType.Field(memberIdx).Name
for _, subField := range subFields {
fields = append(fields, fmt.Sprintf("%v.%v", thisFieldName, subField))
}

numbers = append(numbers, subNumbers...)
case isNumeric(rField.Kind()):
//nolint:stylecheck
return nil, fmt.Errorf("A numeric type was forgotten to be included. Kind: %v", rField.Kind())
return nil, nil, fmt.Errorf("A numeric type was forgotten to be included. Kind: %v", rField.Kind())
default:
// Getting the keys for a structure will ignore these types. Such as the antagonistic
// `channel`, or `string`. We follow suit in ignoring these types.
}
}

return numbers, nil
}

// getFieldsForStruct returns the (flattened) list of strings for a metric structure. For example the
// following type:
//
// type Foo {
// PowerPct float64
// Pos int
// }
//
// Will return `["PowerPct", "Pos"]`.
//
// Nested structures will walk and return a "dot delimited" name. E.g:
//
// type ParentFoo {
// Healthy Bool
// FooField Foo
// }
//
// Will return `["Healthy", "FooField.PowerPct", "FooField.Pos"]`.
func getFieldsForStruct(item reflect.Value) ([]string, error) {
flattenPtr := func(inp reflect.Value) reflect.Value {
for inp.Kind() == reflect.Pointer || inp.Kind() == reflect.Interface {
if inp.IsNil() {
return inp
}
inp = inp.Elem()
}
return inp
}

rVal := flattenPtr(item)
if rVal.Kind() != reflect.Struct {
return nil, errNotStruct
}

rType := rVal.Type()
var fields []string
for memberIdx := 0; memberIdx < rVal.NumField(); memberIdx++ {
structField := rType.Field(memberIdx)
fieldVal := rVal.Field(memberIdx)
derefedVal := flattenPtr(fieldVal)
if isNumeric(derefedVal.Kind()) {
fields = append(fields, structField.Name)
continue
}

if derefedVal.Kind() == reflect.Struct {
subFields, err := getFieldsForStruct(derefedVal)
if err != nil {
return nil, err
}

for _, subField := range subFields {
fields = append(fields, fmt.Sprintf("%v.%v", structField.Name, subField))
}
}
}

return fields, nil
return fields, numbers, nil
}

type schemaError struct {
Expand All @@ -264,62 +314,6 @@ func (err *schemaError) Error() string {
return fmt.Sprintf("SchemaError: %s StatserName: %s", err.err.Error(), err.statserName)
}

// getSchema returns a schema for a full FTDC datum. It immortalizes two properties:
// - mapOrder: The order to iterate future input `map[string]any` data.
// - fieldOrder: The order diff bits and values are to be written in.
//
// For correctness, it must be the case that the `mapOrder` and `fieldOrder` are consistent. I.e: if
// the `mapOrder` is `A` then `B`, the `fieldOrder` must list all of the fields of `A` first,
// followed by all the fields of `B`.
func getSchema(data map[string]any) (*schema, *schemaError) {
var mapOrder []string
var fields []string

for name, stats := range data {
mapOrder = append(mapOrder, name)
fieldsForItem, err := getFieldsForStruct(reflect.ValueOf(stats))
if err != nil {
return nil, &schemaError{name, err}
}

for _, field := range fieldsForItem {
// We insert a `.` into every metric/field name we get a recording for. This property is
// assumed elsewhere.
fields = append(fields, fmt.Sprintf("%v.%v", name, field))
}
}

return &schema{
mapOrder: mapOrder,
fieldOrder: fields,
}, nil
}

// flatten takes an input `Datum` and a `mapOrder` from the current `Schema` and returns a list of
// `float32`s representing the readings. Similar to `getFieldsForItem`, there are constraints on
// input data shape that this code currently does not validate.
func flatten(datum datum, schema *schema) ([]float32, error) {
ret := make([]float32, 0, len(schema.fieldOrder))

for _, key := range schema.mapOrder {
// Walk over the datum in `mapOrder` to ensure we gather values in the order consistent with
// the current schema.
stats, exists := datum.Data[key]
if !exists {
//nolint
return nil, fmt.Errorf("Missing statser name. Name: %v", key)
}

numbers, err := flattenStruct(reflect.ValueOf(stats))
if err != nil {
return nil, err
}
ret = append(ret, numbers...)
}

return ret, nil
}

// FlatDatum has the same information as a `datum`, but without the arbitrarily nested `Data`
// map. Using dots to join keys as per the disk format. So where a `Data` map might be:
//
Expand Down
Loading

0 comments on commit 8acdd7f

Please sign in to comment.