diff --git a/ftdc/custom_format.go b/ftdc/custom_format.go index 877d7a7b61a..40b4e92a917 100644 --- a/ftdc/custom_format.go +++ b/ftdc/custom_format.go @@ -9,6 +9,7 @@ import ( "io" "math" "reflect" + "sort" "strings" "time" @@ -124,8 +125,6 @@ func writeDatum(time int64, prev, curr []float32, output io.Writer) error { return nil } -var errNotStruct = errors.New("stats object is not a struct") - func isNumeric(kind reflect.Kind) bool { return kind == reflect.Bool || kind == reflect.Int || @@ -135,23 +134,122 @@ func isNumeric(kind reflect.Kind) bool { kind == reflect.Float32 || kind == reflect.Float64 } -func flattenStruct(item reflect.Value) ([]float32, error) { - flattenPtr := func(inp reflect.Value) reflect.Value { - for inp.Kind() == reflect.Pointer || inp.Kind() == reflect.Interface { - if inp.IsNil() { - return inp +func flattenPtr(inp reflect.Value) reflect.Value { + for inp.Kind() == reflect.Pointer || inp.Kind() == reflect.Interface { + if inp.IsNil() { + return inp + } + + inp = inp.Elem() + } + return inp +} + +func flatten(value reflect.Value) ([]string, []float32, error) { + value = flattenPtr(value) + + // why is the default case not sufficient to be considered exhaustive? + //nolint:exhaustive + switch value.Kind() { + case reflect.Struct: + return flattenStruct(value) + case reflect.Map: + return flattenMap(value) + default: + // We can get here, for example, if a struct member is typed as an `any`, but the value is + // nil. More antagonistically, this also catches weird types such as channels. + return []string{}, []float32{}, nil + } +} + +type mapSorter struct { + fields []string + values []float32 +} + +func (ms mapSorter) Len() int { + return len(ms.fields) +} + +func (ms mapSorter) Less(left, right int) bool { + return ms.fields[left] < ms.fields[right] +} + +func (ms mapSorter) Swap(left, right int) { + ms.fields[left], ms.fields[right] = ms.fields[right], ms.fields[left] + ms.values[left], ms.values[right] = ms.values[right], ms.values[left] +} + +// flattenMap must be passed in a map where the keys are explicitly typed as strings. The values can +// be any terminal type (e.g: numbers) or more maps of strings. +func flattenMap(mValue reflect.Value) ([]string, []float32, error) { + if mValue.Type().Key().Kind() != reflect.String { + // We ignore types we refuse to serialize into ftdc. + return []string{}, []float32{}, nil + } + + fields := make([]string, 0) + numbers := make([]float32, 0) + + // Map iteration order is not predictable. This means that consecutive calls to a `Statser` that + // returns a map may yield: {"X": 1, "Y": 2} for one stat followed by {"Y": 2, "X": 1}. That + // sequence would result in us rewriting out the schema. We will build up results in map + // iteration order here and sort them later. + for iter := mValue.MapRange(); iter.Next(); { + key := iter.Key() + value := flattenPtr(iter.Value()) + + switch { + case value.CanUint(): + fields = append(fields, key.String()) + numbers = append(numbers, float32(value.Uint())) + case value.CanInt(): + fields = append(fields, key.String()) + numbers = append(numbers, float32(value.Int())) + case value.CanFloat(): + fields = append(fields, key.String()) + numbers = append(numbers, float32(value.Float())) + case value.Kind() == reflect.Bool: + fields = append(fields, key.String()) + if value.Bool() { + numbers = append(numbers, 1) + } else { + numbers = append(numbers, 0) + } + case value.Kind() == reflect.Struct || + value.Kind() == reflect.Pointer || + value.Kind() == reflect.Interface || + value.Kind() == reflect.Map: + subFields, subNumbers, err := flatten(value) + if err != nil { + return nil, nil, err } - inp = inp.Elem() + for _, subField := range subFields { + fields = append(fields, fmt.Sprintf("%v.%v", key.String(), subField)) + } + numbers = append(numbers, subNumbers...) + case isNumeric(value.Kind()): + //nolint:stylecheck + return nil, nil, fmt.Errorf("A numeric type was forgotten to be included. Kind: %v", value.Kind()) + default: + // Getting the keys for a structure will ignore these types. Such as the antagonistic + // `channel`, or `string`. We follow suit in ignoring these types. } - return inp } - rVal := flattenPtr(item) - if rVal.Kind() != reflect.Struct { - return []float32{}, nil - } + // Sort `fields` in-place to ascending order to combat random map iteration order. This will + // also make the corresponding swaps on the `numbers` slice. + sort.Sort(mapSorter{fields, numbers}) + + return fields, numbers, nil +} + +func flattenStruct(value reflect.Value) ([]string, []float32, error) { + value = flattenPtr(value) + rType := value.Type() + var fields []string var numbers []float32 // Use reflection to walk the member fields of an individual set of metric readings. We rely // on reflection always walking fields in the same order. @@ -160,99 +258,51 @@ func flattenStruct(item reflect.Value) ([]float32, error) { // function calls and allocations than some more raw alternatives. For example, we can have // the "schema" keep a (field, offset, type) index and we instead access get a single unsafe // pointer to each structure and walk out index to pull out the relevant numbers. - for memberIdx := 0; memberIdx < rVal.NumField(); memberIdx++ { - rField := flattenPtr(rVal.Field(memberIdx)) + for memberIdx := 0; memberIdx < value.NumField(); memberIdx++ { + rField := flattenPtr(value.Field(memberIdx)) switch { case rField.CanUint(): + fields = append(fields, rType.Field(memberIdx).Name) numbers = append(numbers, float32(rField.Uint())) case rField.CanInt(): + fields = append(fields, rType.Field(memberIdx).Name) numbers = append(numbers, float32(rField.Int())) case rField.CanFloat(): + fields = append(fields, rType.Field(memberIdx).Name) numbers = append(numbers, float32(rField.Float())) case rField.Kind() == reflect.Bool: if rField.Bool() { + fields = append(fields, rType.Field(memberIdx).Name) numbers = append(numbers, 1) } else { + fields = append(fields, rType.Field(memberIdx).Name) numbers = append(numbers, 0) } case rField.Kind() == reflect.Struct || rField.Kind() == reflect.Pointer || - rField.Kind() == reflect.Interface: - subNumbers, err := flattenStruct(rField) + rField.Kind() == reflect.Interface || + rField.Kind() == reflect.Map: + subFields, subNumbers, err := flatten(rField) if err != nil { - return nil, err + return nil, nil, err + } + + thisFieldName := rType.Field(memberIdx).Name + for _, subField := range subFields { + fields = append(fields, fmt.Sprintf("%v.%v", thisFieldName, subField)) } + numbers = append(numbers, subNumbers...) case isNumeric(rField.Kind()): //nolint:stylecheck - return nil, fmt.Errorf("A numeric type was forgotten to be included. Kind: %v", rField.Kind()) + return nil, nil, fmt.Errorf("A numeric type was forgotten to be included. Kind: %v", rField.Kind()) default: // Getting the keys for a structure will ignore these types. Such as the antagonistic // `channel`, or `string`. We follow suit in ignoring these types. } } - return numbers, nil -} - -// getFieldsForStruct returns the (flattened) list of strings for a metric structure. For example the -// following type: -// -// type Foo { -// PowerPct float64 -// Pos int -// } -// -// Will return `["PowerPct", "Pos"]`. -// -// Nested structures will walk and return a "dot delimited" name. E.g: -// -// type ParentFoo { -// Healthy Bool -// FooField Foo -// } -// -// Will return `["Healthy", "FooField.PowerPct", "FooField.Pos"]`. -func getFieldsForStruct(item reflect.Value) ([]string, error) { - flattenPtr := func(inp reflect.Value) reflect.Value { - for inp.Kind() == reflect.Pointer || inp.Kind() == reflect.Interface { - if inp.IsNil() { - return inp - } - inp = inp.Elem() - } - return inp - } - - rVal := flattenPtr(item) - if rVal.Kind() != reflect.Struct { - return nil, errNotStruct - } - - rType := rVal.Type() - var fields []string - for memberIdx := 0; memberIdx < rVal.NumField(); memberIdx++ { - structField := rType.Field(memberIdx) - fieldVal := rVal.Field(memberIdx) - derefedVal := flattenPtr(fieldVal) - if isNumeric(derefedVal.Kind()) { - fields = append(fields, structField.Name) - continue - } - - if derefedVal.Kind() == reflect.Struct { - subFields, err := getFieldsForStruct(derefedVal) - if err != nil { - return nil, err - } - - for _, subField := range subFields { - fields = append(fields, fmt.Sprintf("%v.%v", structField.Name, subField)) - } - } - } - - return fields, nil + return fields, numbers, nil } type schemaError struct { @@ -264,62 +314,6 @@ func (err *schemaError) Error() string { return fmt.Sprintf("SchemaError: %s StatserName: %s", err.err.Error(), err.statserName) } -// getSchema returns a schema for a full FTDC datum. It immortalizes two properties: -// - mapOrder: The order to iterate future input `map[string]any` data. -// - fieldOrder: The order diff bits and values are to be written in. -// -// For correctness, it must be the case that the `mapOrder` and `fieldOrder` are consistent. I.e: if -// the `mapOrder` is `A` then `B`, the `fieldOrder` must list all of the fields of `A` first, -// followed by all the fields of `B`. -func getSchema(data map[string]any) (*schema, *schemaError) { - var mapOrder []string - var fields []string - - for name, stats := range data { - mapOrder = append(mapOrder, name) - fieldsForItem, err := getFieldsForStruct(reflect.ValueOf(stats)) - if err != nil { - return nil, &schemaError{name, err} - } - - for _, field := range fieldsForItem { - // We insert a `.` into every metric/field name we get a recording for. This property is - // assumed elsewhere. - fields = append(fields, fmt.Sprintf("%v.%v", name, field)) - } - } - - return &schema{ - mapOrder: mapOrder, - fieldOrder: fields, - }, nil -} - -// flatten takes an input `Datum` and a `mapOrder` from the current `Schema` and returns a list of -// `float32`s representing the readings. Similar to `getFieldsForItem`, there are constraints on -// input data shape that this code currently does not validate. -func flatten(datum datum, schema *schema) ([]float32, error) { - ret := make([]float32, 0, len(schema.fieldOrder)) - - for _, key := range schema.mapOrder { - // Walk over the datum in `mapOrder` to ensure we gather values in the order consistent with - // the current schema. - stats, exists := datum.Data[key] - if !exists { - //nolint - return nil, fmt.Errorf("Missing statser name. Name: %v", key) - } - - numbers, err := flattenStruct(reflect.ValueOf(stats)) - if err != nil { - return nil, err - } - ret = append(ret, numbers...) - } - - return ret, nil -} - // FlatDatum has the same information as a `datum`, but without the arbitrarily nested `Data` // map. Using dots to join keys as per the disk format. So where a `Data` map might be: // diff --git a/ftdc/custom_format_test.go b/ftdc/custom_format_test.go index d55edb3c3ec..a5f8170f21c 100644 --- a/ftdc/custom_format_test.go +++ b/ftdc/custom_format_test.go @@ -26,6 +26,8 @@ type Basic struct { Foo int } +// TestCustomFormatRoundtripBasic is a test of simple FTDC input/output. This tests schema changes, +// but where the "stats" object payload both times is a single integer. func TestCustomFormatRoundtripBasic(t *testing.T) { // This FTDC test will write to this `serializedData`. serializedData := bytes.NewBuffer(nil) @@ -39,7 +41,6 @@ func TestCustomFormatRoundtripBasic(t *testing.T) { Data: map[string]any{ "s1": &Basic{0}, }, - generationID: 1, } ftdc.writeDatum(datumV1) @@ -53,7 +54,6 @@ func TestCustomFormatRoundtripBasic(t *testing.T) { Data: map[string]any{ "s2": &Basic{2}, }, - generationID: 2, } ftdc.writeDatum(datumV2) datumV2.Time = 3 @@ -85,6 +85,9 @@ func TestCustomFormatRoundtripBasic(t *testing.T) { } } +// TestCustomFormatRoundtripRich is a test of simple FTDC input/output. In contrast to the "basic" +// test, this test has datum's with a combination of integers and floats that change in more dynamic +// ways. func TestCustomFormatRoundtripRich(t *testing.T) { // This FTDC test will write to this `serializedData`. serializedData := bytes.NewBuffer(nil) @@ -99,7 +102,6 @@ func TestCustomFormatRoundtripRich(t *testing.T) { Data: map[string]any{ "s1": Statser1{0, idx, 1.0}, }, - generationID: 1, } ftdc.writeDatum(datumV1) @@ -113,7 +115,6 @@ func TestCustomFormatRoundtripRich(t *testing.T) { // The second metric here is to test a value that flips between a diff and no diff. "s2": Statser2{0, 1 + (idx / 3), 100.0}, }, - generationID: 2, } ftdc.writeDatum(datumV2) @@ -154,12 +155,12 @@ func TestCustomFormatRoundtripRich(t *testing.T) { } func TestReflection(t *testing.T) { - fields, err := getFieldsForStruct(reflect.ValueOf(&Basic{100})) + fields, _, err := flatten(reflect.ValueOf(&Basic{100})) test.That(t, err, test.ShouldBeNil) test.That(t, fields, test.ShouldResemble, []string{"Foo"}) - fields, err = getFieldsForStruct(reflect.ValueOf(Statser1{100, 0, 44.4})) + fields, _, err = flatten(reflect.ValueOf(Statser1{100, 0, 44.4})) test.That(t, err, test.ShouldBeNil) test.That(t, fields, test.ShouldResemble, []string{"Metric1", "Metric2", "Metric3"}) @@ -179,7 +180,7 @@ type Nested struct { func TestNestedReflection(t *testing.T) { val := &TopLevel{100, Nested{200, struct{ Z uint8 }{255}}} - fields, err := getFieldsForStruct(reflect.ValueOf(val)) + fields, _, err := flatten(reflect.ValueOf(val)) test.That(t, err, test.ShouldBeNil) test.That(t, fields, test.ShouldResemble, []string{"X", "Nested.Y", "Nested.Deeper.Z"}) @@ -256,12 +257,12 @@ func TestNestedReflectionParity(t *testing.T) { }, } - fields, err := getFieldsForStruct(reflect.ValueOf(complexObj)) + fields, _, err := flatten(reflect.ValueOf(complexObj)) test.That(t, err, test.ShouldBeNil) // There will be one "field" for each number in the above `Complex` structure. test.That(t, fields, test.ShouldResemble, []string{"F1", "F2.F3", "F2.F4", "F5.F6", "F7", "F9", "F10.F11", "F10.F12", "F10.F13", "F14.F15.F16.F17"}) - values, err := flattenStruct(reflect.ValueOf(complexObj)) + _, values, err := flatten(reflect.ValueOf(complexObj)) test.That(t, err, test.ShouldBeNil) // For convenience, the number values match the field name. test.That(t, values, test.ShouldResemble, @@ -277,20 +278,20 @@ func TestNestedAny(t *testing.T) { logger := logging.NewTestLogger(t) stat := nestsAny{10, struct{ X int }{5}} - fields, err := getFieldsForStruct(reflect.ValueOf(stat)) + fields, _, err := flatten(reflect.ValueOf(stat)) logger.Info("Fields:", fields, "Err:", err) test.That(t, fields, test.ShouldResemble, []string{"Number", "Struct.X"}) - values, err := flattenStruct(reflect.ValueOf(stat)) + _, values, err := flatten(reflect.ValueOf(stat)) logger.Info("Values:", values, "Err:", err) test.That(t, values, test.ShouldResemble, []float32{10, 5}) stat = nestsAny{10, nil} - fields, err = getFieldsForStruct(reflect.ValueOf(stat)) + fields, _, err = flatten(reflect.ValueOf(stat)) logger.Info("Fields:", fields, "Err:", err) test.That(t, fields, test.ShouldResemble, []string{"Number"}) - values, err = flattenStruct(reflect.ValueOf(stat)) + _, values, err = flatten(reflect.ValueOf(stat)) logger.Info("Values:", values, "Err:", err) test.That(t, values, test.ShouldResemble, []float32{10}) } @@ -311,11 +312,11 @@ func TestWeirdStats(t *testing.T) { anArray: [5]int{5, 4, 3, 2, 1}, }} - fields, err := getFieldsForStruct(reflect.ValueOf(stat)) + fields, _, err := flatten(reflect.ValueOf(stat)) logger.Info("Fields:", fields, " Err:", err) test.That(t, fields, test.ShouldResemble, []string{"Number", "Struct.hiddenNumeric"}) - values, err := flattenStruct(reflect.ValueOf(stat)) + _, values, err := flatten(reflect.ValueOf(stat)) logger.Info("Values:", values, " Err:", err) test.That(t, values, test.ShouldResemble, []float32{10, 1}) } @@ -325,11 +326,72 @@ func TestNilNestedStats(t *testing.T) { stat := nestsAny{10, nil} - fields, err := getFieldsForStruct(reflect.ValueOf(stat)) + fields, _, err := flatten(reflect.ValueOf(stat)) logger.Info("Fields:", fields, " Err:", err) test.That(t, fields, test.ShouldResemble, []string{"Number"}) - values, err := flattenStruct(reflect.ValueOf(stat)) + _, values, err := flatten(reflect.ValueOf(stat)) logger.Info("Values:", values, " Err:", err) test.That(t, values, test.ShouldResemble, []float32{10}) } + +func TestFlattenMaps(t *testing.T) { + mp := map[string]any{ + "X": 42, + } + + keys, values, err := flatten(reflect.ValueOf(mp)) + test.That(t, err, test.ShouldBeNil) + test.That(t, keys, test.ShouldResemble, []string{"X"}) + test.That(t, values, test.ShouldResemble, []float32{42.0}) + + mp["Y"] = struct { + Foo int + Bar int + }{10, 20} + + keys, values, err = flatten(reflect.ValueOf(mp)) + test.That(t, err, test.ShouldBeNil) + // While iterating maps happens in a non-deterministic order, `flatten` will sort the outputs in + // ascending key order. + test.That(t, keys, test.ShouldResemble, []string{"X", "Y.Bar", "Y.Foo"}) + test.That(t, values, test.ShouldResemble, []float32{42.0, 20.0, 10.0}) +} + +func TestFlattenTheWorld(t *testing.T) { + mp := map[string]any{ + "X": 42, + "Y": struct { + Foo string + Bar int + mp map[int]int + mp2 map[string]any + }{ + "foo", + 5, + map[int]int{1: 2}, + map[string]any{ + "eli": 2, + "patriots": 0, + }, + }, + "Z": map[string]any{"zelda": 64}, + } + + keys, values, err := flatten(reflect.ValueOf(mp)) + test.That(t, err, test.ShouldBeNil) + // While iterating maps happens in a non-deterministic order, `flatten` will sort the outputs in + // ascending key order. + test.That(t, keys, test.ShouldResemble, []string{"X", "Y.Bar", "Y.mp2.eli", "Y.mp2.patriots", "Z.zelda"}) + test.That(t, values, test.ShouldResemble, []float32{42.0, 5.0, 2.0, 0.0, 64.0}) + + mp["Z"] = struct { + Foo int + Bar int + }{10, 20} + + keys, values, err = flatten(reflect.ValueOf(mp)) + test.That(t, err, test.ShouldBeNil) + test.That(t, keys, test.ShouldResemble, []string{"X", "Y.Bar", "Y.mp2.eli", "Y.mp2.patriots", "Z.Bar", "Z.Foo"}) + test.That(t, values, test.ShouldResemble, []float32{42.0, 5.0, 2.0, 0.0, 20.0, 10.0}) +} diff --git a/ftdc/ftdc.go b/ftdc/ftdc.go index 050f05bbada..14e86fc9189 100644 --- a/ftdc/ftdc.go +++ b/ftdc/ftdc.go @@ -9,6 +9,7 @@ import ( "os" "path" "path/filepath" + "reflect" "regexp" "runtime" "slices" @@ -62,8 +63,6 @@ type datum struct { // Time in nanoseconds since the epoch. Time int64 Data map[string]any - - generationID int } // Statser implements Stats. @@ -94,13 +93,6 @@ type FTDC struct { mu sync.Mutex statsers []namedStatser - // Fields used to generate and serialize FTDC output to bytes. - // - // inputGenerationID changes when new pieces are added to FTDC at runtime that change the - // schema. - inputGenerationID int - // outputGenerationID represents the last schema written to the FTDC `outputWriter`. - outputGenerationID int // The schema used describe how new Datums are serialized. currSchema *schema // The serialization format compares new metrics to the prior metric reading to determine what @@ -169,21 +161,18 @@ func (ftdc *FTDC) Add(name string, statser Statser) { for _, statser := range ftdc.statsers { if statser.name == name { - ftdc.logger.Warnw("Trying to add conflicting ftdc section", "name", name, - "generationId", ftdc.inputGenerationID) + ftdc.logger.Warnw("Trying to add conflicting ftdc section", "name", name) // FTDC output is broken down into separate "sections". The `name` is used to label each // section. We return here to predictably include one of the `Add`ed statsers. return } } - ftdc.logger.Debugw("Added statser", "name", name, - "type", fmt.Sprintf("%T", statser), "generationId", ftdc.inputGenerationID) + ftdc.logger.Debugw("Added statser", "name", name, "type", fmt.Sprintf("%T", statser)) ftdc.statsers = append(ftdc.statsers, namedStatser{ name: name, statser: statser, }) - ftdc.inputGenerationID++ } // Remove removes a statser that was previously `Add`ed with the given `name`. @@ -193,16 +182,13 @@ func (ftdc *FTDC) Remove(name string) { for idx, statser := range ftdc.statsers { if statser.name == name { - ftdc.logger.Debugw("Removed statser", "name", name, - "type", fmt.Sprintf("%T", statser.statser), "generationId", ftdc.inputGenerationID) + ftdc.logger.Debugw("Removed statser", "name", name, "type", fmt.Sprintf("%T", statser.statser)) ftdc.statsers = slices.Delete(ftdc.statsers, idx, idx+1) - ftdc.inputGenerationID++ return } } - ftdc.logger.Warnw("Did not find statser to remove", - "name", name, "generationId", ftdc.inputGenerationID) + ftdc.logger.Warnw("Did not find statser to remove", "name", name) } // Start spins off the background goroutine for collecting + writing FTDC data. It's normal for tests @@ -227,10 +213,6 @@ func (ftdc *FTDC) Start() { func (ftdc *FTDC) statsReader(ctx context.Context) { datum := ftdc.constructDatum() - if datum.generationID == 0 { - // No "statsers" were `Add`ed. No data to write out. - return - } // `Debugw` does not seem to serialize any of the `datum` value. ftdc.logger.Debugf("Metrics collected. Datum: %+v", datum) @@ -300,37 +282,6 @@ func (ftdc *FTDC) StopAndJoin(ctx context.Context) { } } -// conditionalRemoveStatser first checks the generation matches before removing the `name` Statser. -func (ftdc *FTDC) conditionalRemoveStatser(name string, generationID int) { - ftdc.mu.Lock() - defer ftdc.mu.Unlock() - - // This function gets called by the "write ftdc" actor. Which is concurrent to a user - // adding/removing `Statser`s. If the datum/name that created a problem came from a different - // "generation", optimistically guess that the user fixed the problem, and avoid removing a - // perhaps working `Statser`. - // - // In the (honestly, more likely) event, the `Statser` is still bad, we will eventually succeed - // in removing it. As later `Datum` objects to write will have an updated `generationId`. - if generationID != ftdc.inputGenerationID { - ftdc.logger.Debugw("Not removing statser due to concurrent operation", - "datumGenerationId", generationID, "ftdcGenerationId", ftdc.inputGenerationID) - return - } - - for idx, statser := range ftdc.statsers { - if statser.name == name { - ftdc.logger.Debugw("Removed statser", "name", name, - "type", fmt.Sprintf("%T", statser.statser), "generationId", ftdc.inputGenerationID) - ftdc.statsers = slices.Delete(ftdc.statsers, idx, idx+1) - ftdc.inputGenerationID++ - return - } - } - - ftdc.logger.Warnw("Did not find statser to remove", "name", name, "generationId", ftdc.inputGenerationID) -} - // constructDatum walks all of the registered `statser`s to construct a `datum`. func (ftdc *FTDC) constructDatum() datum { datum := datum{ @@ -344,7 +295,6 @@ func (ftdc *FTDC) constructDatum() datum { // resource graph. Which is the starting point for creating a deadlock scenario. ftdc.mu.Lock() statsers := make([]namedStatser, len(ftdc.statsers)) - datum.generationID = ftdc.inputGenerationID copy(statsers, ftdc.statsers) ftdc.mu.Unlock() @@ -356,61 +306,139 @@ func (ftdc *FTDC) constructDatum() datum { return datum } -// writeDatum takes an ftdc reading ("Datum") as input and serializes + writes it to the backing -// medium (e.g: a file). See `writeSchema`s documentation for a full description of the file format. -func (ftdc *FTDC) writeDatum(datum datum) error { - toWrite, err := ftdc.getWriter() - if err != nil { - return err +// walk accepts a datum and the previous schema and will return: +// - the new schema. If the schema is unchanged, this will be the same pointer value as `previousSchema`. +// - the flattened float32 data points. +// - an error. All errors (for now) are terminal -- the input datum cannot be output. +func walk(datum map[string]any, previousSchema *schema) (*schema, []float32, error) { + schemaChanged := false + + var ( + fields []string + values []float32 + iterationOrder []string + ) + + // In the steady state, we will have an existing schema. Use that for a `datum` iteration order. + if previousSchema != nil { + fields = make([]string, 0, len(previousSchema.fieldOrder)) + values = make([]float32, 0, len(previousSchema.fieldOrder)) + iterationOrder = previousSchema.mapOrder + } else { + // If this is the first data point, we'll walk the map in... map order. + schemaChanged = true + iterationOrder = make([]string, 0, len(datum)) + for key := range datum { + iterationOrder = append(iterationOrder, key) + } } - // The input `datum` being processed is for a different schema than we were previously using. - if datum.generationID != ftdc.outputGenerationID { - // Compute the new schema and write that to disk. - newSchema, schemaErr := getSchema(datum.Data) - if schemaErr != nil { - ftdc.logger.Warnw("Could not generate schema for statser", - "statser", schemaErr.statserName, "err", schemaErr.err) - // We choose to remove the misbehaving statser such that subsequent datums will be - // well-formed. - ftdc.conditionalRemoveStatser(schemaErr.statserName, datum.generationID) - return schemaErr + // Record the order we iterate through the keys in the input `datum`. We return this in the case + // we learn the schema changed. + datumMapOrder := make([]string, 0, len(datum)) + + // Create a set out of the `inputSchema.mapOrder` as we iterate over it. This will be used to + // see if new keys have been added to the `datum` map that were not in the `previousSchema`. + mapOrderSet := make(map[string]struct{}) + for _, key := range iterationOrder { + mapOrderSet[key] = struct{}{} + + // Walk over the datum in `mapOrder` to ensure we gather values in the order consistent with + // the current schema. + stats, exists := datum[key] + if !exists { + // There was a `Statser` in the previous `datum` that no longer exists. Note the schema + // changed and move on. + schemaChanged = true + continue } - ftdc.currSchema = newSchema - if err = writeSchema(ftdc.currSchema, toWrite); err != nil { - return err + // Get all of the field names and values from the `stats` object. + itemFields, itemNumbers, err := flatten(reflect.ValueOf(stats)) + if err != nil { + return nil, nil, err + } + + datumMapOrder = append(datumMapOrder, key) + // For each field we found, prefix it with the `datum` key (the `Statser` name). + for idx := range itemFields { + fields = append(fields, fmt.Sprintf("%v.%v", key, itemFields[idx])) } + values = append(values, itemNumbers...) + } - // Update the `outputGenerationId` to reflect the new schema. - ftdc.outputGenerationID = datum.generationID + // Check for a schema change by walking all of the keys (`Statser`s) in the input `datum`. Look + // for anything new. + for dataKey, stats := range datum { + if _, exists := mapOrderSet[dataKey]; exists { + // The steady-state is that every key in the input `datum` matches the prior + // `datum`/schema. + continue + } - data, err := flatten(datum, ftdc.currSchema) + // We found a statser that did not exist before. Let's add it to our results. + schemaChanged = true + itemFields, itemNumbers, err := flatten(reflect.ValueOf(stats)) if err != nil { - return err + return nil, nil, err } - // Write the new data point to disk. When schema changes, we do not do any diffing. We write - // a raw value for each metric. - if err = writeDatum(datum.Time, nil, data, toWrite); err != nil { - return err + datumMapOrder = append(datumMapOrder, dataKey) + // Similarly, prefix fields with the `Statser` name. + for idx := range itemFields { + fields = append(fields, fmt.Sprintf("%v.%v", dataKey, itemFields[idx])) } - ftdc.prevFlatData = data + values = append(values, itemNumbers...) + } - return nil + // Even if the keys in the `datum` stayed the same, the values returned by an individual `Stats` + // call may have changed. This ought to be rare, as this results in writing out a new schema and + // is consequently inefficient. But we prefer to have less FTDC data than inaccurate data, or + // more simply, failing. + if previousSchema != nil && !slices.Equal(previousSchema.fieldOrder, fields) { + schemaChanged = true } - // The input `datum` is for the same schema as the prior datum. Flatten the values and write a - // datum entry diffed against the `prevFlatData`. - data, err := flatten(datum, ftdc.currSchema) + // If the schema changed, return a new schema object with the updated schema. + if schemaChanged { + return &schema{datumMapOrder, fields}, values, nil + } + + return previousSchema, values, nil +} + +func (ftdc *FTDC) writeDatum(datum datum) error { + toWrite, err := ftdc.getWriter() if err != nil { return err } - if err = writeDatum(datum.Time, ftdc.prevFlatData, data, toWrite); err != nil { + // walk will return the schema it found alongside the flattened data. Errors are terminal. If + // the schema is the same, the `newSchema` pointer will match `ftdc.currSchema` and `err` will + // be nil. + newSchema, flatData, err := walk(datum.Data, ftdc.currSchema) + if err != nil { return err } - ftdc.prevFlatData = data + + // In the happy path where the schema hasn't changed, the `walk` function is guaranteed to + // return the same schema object. + if ftdc.currSchema != newSchema { + ftdc.currSchema = newSchema + if err = writeSchema(ftdc.currSchema, toWrite); err != nil { + return err + } + + // Write the new data point to disk. When schema changes, we do not do any diffing. We write + // a raw value for each metric. + ftdc.prevFlatData = nil + } + + if err = writeDatum(datum.Time, ftdc.prevFlatData, flatData, toWrite); err != nil { + return err + } + ftdc.prevFlatData = flatData + return nil } @@ -487,18 +515,18 @@ func (ftdc *FTDC) getWriter() (io.Writer, error) { // New file, reset the bytes written counter. ftdc.bytesWrittenCounter.count = 0 - // When we create a new file, we must rewrite the schema. If we do not, a file may be useless - // without its "ancestors". - // - // As a hack, we decrement the `outputGenerationID` to force a new schema to be written. - ftdc.outputGenerationID-- - // Assign the `outputWriter`. The `outputWriter` is an abstraction for where FTDC formatted // bytes go. Testing often prefers to just write bytes into memory (and consequently construct // an FTDC with `NewWithWriter`). While in production we obviously want to persist bytes on // disk. ftdc.outputWriter = io.MultiWriter(&ftdc.bytesWrittenCounter, ftdc.currOutputFile) + // The schema was last persisted in the prior FTDC file. To ensure this file can be understood + // without it, we start it with a copy of the schema. We achieve this by erasing the + // `currSchema` value. Such that the caller/`writeDatum` will behave as if this is a "schema + // change". + ftdc.currSchema = nil + return ftdc.outputWriter, nil } diff --git a/ftdc/ftdc_test.go b/ftdc/ftdc_test.go index 3c7bf115e2e..896540fc41c 100644 --- a/ftdc/ftdc_test.go +++ b/ftdc/ftdc_test.go @@ -12,7 +12,6 @@ import ( "time" "go.viam.com/test" - "go.viam.com/utils/testutils" "go.viam.com/rdk/logging" ) @@ -31,7 +30,19 @@ func (foo *foo) Stats() any { return fooStats{X: foo.x, Y: foo.y} } -func TestFTDCSchemaGenerations(t *testing.T) { +type mockStatser struct { + stats any +} + +func (ms *mockStatser) Stats() any { + return ms.stats +} + +// TestCopeWithChangingSchema asserts that FTDC copes with schema's that change. Originally FTDC was +// designed such that an explicit call to `ftdc.Add` was required to allow for a schema change. But +// it became clear over time that we had to allow for deviations. Such as how network stats +// reporting the list of network interfaces can change. +func TestCopeWithChangingSchema(t *testing.T) { logger := logging.NewTestLogger(t) // ftdcData will be appended to on each call to `writeDatum`. At the end of the test we can pass @@ -39,91 +50,82 @@ func TestFTDCSchemaGenerations(t *testing.T) { ftdcData := bytes.NewBuffer(nil) ftdc := NewWithWriter(ftdcData, logger.Sublogger("ftdc")) - // `foo` implements `Statser`. - foo1 := &foo{} + // `mockStatser` returns whatever we ask it to. Such that we can change the schema without a + // call to `ftdc.Add`. + statser := mockStatser{ + stats: struct { + X int + }{5}, + } - // Generations are a way of keeping track of when schema's change. - preAddGenerationID := ftdc.inputGenerationID - // In the initial and steady states, the input and output generations are equal. - test.That(t, ftdc.outputGenerationID, test.ShouldEqual, preAddGenerationID) + ftdc.Add("mock", &statser) + datum := ftdc.constructDatum() + err := ftdc.writeDatum(datum) + test.That(t, err, test.ShouldBeNil) - // Calling `Add` changes the schema. The `inputGenerationID` is incremented (to "1") to denote - // this. - ftdc.Add("foo1", foo1) - test.That(t, ftdc.inputGenerationID, test.ShouldEqual, preAddGenerationID+1) - // The `outputGenerationID` is still at "0". - test.That(t, ftdc.outputGenerationID, test.ShouldEqual, preAddGenerationID) + statser.stats = struct { + X int + Y float64 + }{3, 4.0} + datum = ftdc.constructDatum() + err = ftdc.writeDatum(datum) + test.That(t, err, test.ShouldBeNil) - // Constructing a datum will: - // - Have data for the `foo1` Statser. - // - Be stamped with the `inputGenerationID` of 1. - datum := ftdc.constructDatum() - test.That(t, datum.generationID, test.ShouldEqual, ftdc.inputGenerationID) - test.That(t, ftdc.outputGenerationID, test.ShouldEqual, preAddGenerationID) + datums, err := Parse(ftdcData) + test.That(t, err, test.ShouldBeNil) + test.That(t, len(datums), test.ShouldEqual, 2) + + test.That(t, datums[0].asDatum().Data["mock"], test.ShouldResemble, map[string]float32{"X": 5}) + test.That(t, datums[1].asDatum().Data["mock"], test.ShouldResemble, map[string]float32{"X": 3, "Y": 4}) +} + +// TestCopeWithSubtleSchemaChange is similar to TestCopeWithChangingSchema except that it keeps the +// number of flattened fields the same. Only the field names changed. +func TestCopeWithSubtleSchemaChange(t *testing.T) { + logger := logging.NewTestLogger(t) + + // ftdcData will be appended to on each call to `writeDatum`. At the end of the test we can pass + // this to `parse` to assert we have the expected results. + ftdcData := bytes.NewBuffer(nil) + ftdc := NewWithWriter(ftdcData, logger.Sublogger("ftdc")) + + // `mockStatser` returns whatever we ask it to. Such that we can change the schema without a + // call to `ftdc.Add`. + statser := mockStatser{ + stats: struct { + X int + }{5}, + } - // writeDatum will serialize the datum in the "custom format". Part of the bytes written will be - // the new schema at generation "1". The `outputGenerationID` will be updated to reflect that - // "1" is the "current schema". + ftdc.Add("mock", &statser) + datum := ftdc.constructDatum() err := ftdc.writeDatum(datum) test.That(t, err, test.ShouldBeNil) - test.That(t, ftdc.outputGenerationID, test.ShouldEqual, ftdc.inputGenerationID) - - // We are going to add another `Statser`. Assert that the `inputGenerationID` gets incremented after calling `Add`. - foo2 := &foo{} - preAddGenerationID = ftdc.inputGenerationID - ftdc.Add("foo2", foo2) - test.That(t, ftdc.inputGenerationID, test.ShouldEqual, preAddGenerationID+1) - - // Updating the values on the `foo` objects changes the output of the "stats" object they return - // as part of `constructDatum`. - foo1.x = 1 - foo2.x = 2 - // Constructing a datum will (again) copy the `inputGenerationID` as its own `generationID`. The - // `outputGenerationID` has not been changed yet and still matches the value prior* to adding - // `foo2`. - datum = ftdc.constructDatum() - test.That(t, datum.generationID, test.ShouldEqual, ftdc.inputGenerationID) - test.That(t, ftdc.outputGenerationID, test.ShouldEqual, preAddGenerationID) - // Writing the second datum updates the `outputGenerationID` and we are again in the steady - // state. + statser.stats = struct { + Y int + }{3} + datum = ftdc.constructDatum() err = ftdc.writeDatum(datum) test.That(t, err, test.ShouldBeNil) - test.That(t, ftdc.outputGenerationID, test.ShouldEqual, ftdc.inputGenerationID) - // Go back and parse the written data. There two be two datum objects due to two calls to - // `writeDatum`. datums, err := Parse(ftdcData) test.That(t, err, test.ShouldBeNil) test.That(t, len(datums), test.ShouldEqual, 2) - // Assert the datum1.Time <= datum2.Time. Don't trust consecutive clock reads to get an updated - // value. - datum1, datum2 := datums[0].asDatum(), datums[1].asDatum() - test.That(t, datum1.Time, test.ShouldBeLessThanOrEqualTo, datum2.Time) - - // Assert the contents of the datum. While we seemingly only assert on the "values" of X and Y - // for `foo1` and `foo2`, we're indirectly depending on the schema information being - // persisted/parsed correctly. - test.That(t, len(datum1.Data), test.ShouldEqual, 1) - // When we support nesting of structures, the `float32` type on these map assertions will be - // wrong. It will become `any`s. - test.That(t, datum1.Data["foo1"], test.ShouldResemble, map[string]float32{"X": 0, "Y": 0}) - - // Before writing the second datum, we changed the values of `X` on `foo1` and `foo2`. - test.That(t, len(datum2.Data), test.ShouldEqual, 2) - test.That(t, datum2.Data["foo1"], test.ShouldResemble, map[string]float32{"X": 1, "Y": 0}) - test.That(t, datum2.Data["foo2"], test.ShouldResemble, map[string]float32{"X": 2, "Y": 0}) + test.That(t, datums[0].asDatum().Data["mock"], test.ShouldResemble, map[string]float32{"X": 5}) + test.That(t, datums[1].asDatum().Data["mock"], test.ShouldResemble, map[string]float32{"Y": 3}) } -type badStatser struct{} +type mapStatser struct { + KeyName string +} -func (badStatser badStatser) Stats() any { - // Returning maps are disallowed. - return map[string]float32{"X": 42} +func (mapStatser *mapStatser) Stats() any { + return map[string]float32{mapStatser.KeyName: 42} } -func TestRemoveBadStatser(t *testing.T) { +func TestMapStatser(t *testing.T) { logger := logging.NewTestLogger(t) // ftdcData will be appended to on each call to `writeDatum`. At the end of the test we can pass @@ -135,32 +137,23 @@ func TestRemoveBadStatser(t *testing.T) { foo1 := &foo{x: 1, y: 2} ftdc.Add("foo1", foo1) - // `badStatser` implements `Statser`, but returns a map instead of a struct. This will fail at - // `writeDatum`. - ftdc.Add("badStatser", badStatser{}) + mapStatser1 := mapStatser{"A"} + // `mapStatser` implements `Statser`, but returns a map instead of a struct. + ftdc.Add("mapStatser", &mapStatser1) - // constructDatum should succeed as it does not perform validation. datum := ftdc.constructDatum() test.That(t, len(datum.Data), test.ShouldEqual, 2) test.That(t, datum.Data["foo1"], test.ShouldNotBeNil) - test.That(t, datum.Data["badStatser"], test.ShouldNotBeNil) + test.That(t, datum.Data["mapStatser"], test.ShouldResemble, map[string]float32{"A": 42}) - // writeDatum will discover the map and error out. err := ftdc.writeDatum(datum) - test.That(t, err, test.ShouldNotBeNil) - - // We can additionally verify the error is a schemaError that identifies the statser that - // misbehaved. - var schemaError *schemaError - test.That(t, errors.As(err, &schemaError), test.ShouldBeTrue) - test.That(t, schemaError.statserName, test.ShouldEqual, "badStatser") + test.That(t, err, test.ShouldBeNil) - // The `writeDatum` error should auto-remove `badStatser`. Verify only `foo1` is returned on a - // following call to `constructDatum`. + mapStatser1.KeyName = "B" datum = ftdc.constructDatum() - test.That(t, len(datum.Data), test.ShouldEqual, 1) + test.That(t, len(datum.Data), test.ShouldEqual, 2) test.That(t, datum.Data["foo1"], test.ShouldNotBeNil) - test.That(t, datum.Data["badStatser"], test.ShouldBeNil) + test.That(t, datum.Data["mapStatser"], test.ShouldResemble, map[string]float32{"B": 42}) // This time writing the datum works. err = ftdc.writeDatum(datum) @@ -170,9 +163,11 @@ func TestRemoveBadStatser(t *testing.T) { datums, err := ParseWithLogger(ftdcData, logger) test.That(t, err, test.ShouldBeNil) - // We called `writeDatum` twice, but only the second succeeded. - test.That(t, len(datums), test.ShouldEqual, 1) + test.That(t, len(datums), test.ShouldEqual, 2) test.That(t, datums[0].asDatum().Data["foo1"], test.ShouldResemble, map[string]float32{"X": 1, "Y": 2}) + test.That(t, datums[0].asDatum().Data["mapStatser"], test.ShouldResemble, map[string]float32{"A": 42}) + test.That(t, datums[1].asDatum().Data["foo1"], test.ShouldResemble, map[string]float32{"X": 1, "Y": 2}) + test.That(t, datums[1].asDatum().Data["mapStatser"], test.ShouldResemble, map[string]float32{"B": 42}) } type nestedStatser struct { @@ -221,11 +216,9 @@ func TestNestedStructs(t *testing.T) { test.That(t, len(datum.Data), test.ShouldEqual, 1) test.That(t, datum.Data["nested"], test.ShouldNotBeNil) - schema, schemaErr := getSchema(datum.Data) - test.That(t, schemaErr, test.ShouldBeNil) - flattened, err := flatten(datum, schema) + _, values, err := walk(datum.Data, nil) test.That(t, err, test.ShouldBeNil) - test.That(t, flattened, test.ShouldResemble, []float32{1, 2}) + test.That(t, values, test.ShouldResemble, []float32{1, 2}) err = ftdc.writeDatum(datum) test.That(t, err, test.ShouldBeNil) @@ -245,52 +238,6 @@ func TestNestedStructs(t *testing.T) { }) } -// TestStatsWriterContinuesOnSchemaError asserts that "schema errors" are handled by removing the -// violating statser, but otherwise FTDC keeps going. -func TestStatsWriterContinuesOnSchemaError(t *testing.T) { - logger := logging.NewTestLogger(t) - - // ftdcData will be appended to on each call to `writeDatum`. At the end of the test we can pass - // this to `parse` to assert we have the expected results. - ftdcData := bytes.NewBuffer(nil) - ftdc := NewWithWriter(ftdcData, logger.Sublogger("ftdc")) - - // `badStatser` implements `Statser` but returns a `map` which is disallowed. - badStatser := &badStatser{} - ftdc.Add("badStatser", badStatser) - - // Construct a datum with a `badStatser` reading that contains a map. - datum := ftdc.constructDatum() - test.That(t, len(datum.Data), test.ShouldEqual, 1) - test.That(t, datum.Data["badStatser"], test.ShouldNotBeNil) - - // Start the `statsWriter` and manually push it the bad `datum`. - go ftdc.statsWriter() - ftdc.datumCh <- datum - testutils.WaitForAssertion(t, func(tb testing.TB) { - ftdc.mu.Lock() - defer ftdc.mu.Unlock() - - // Assert that the `statsWriter`, via calls to `writeDatum` identify the bad `map` value and - // remove the `badStatser`. - test.That(tb, len(ftdc.statsers), test.ShouldEqual, 0) - }) - - // Assert that `statsWriter` is still operating by waiting for 1 second. - select { - case <-ftdc.outputWorkerDone: - t.Fatalf("A bad statser caused FTDC to abort") - case <-time.After(time.Second): - break - } - - // Closing the `datumCh` will cause the `statsWriter` to exit as it no longer can get input. - close(ftdc.datumCh) - - // Wait for the `statsWriter` goroutine to exit. - <-ftdc.outputWorkerDone -} - func TestCountingBytes(t *testing.T) { logger := logging.NewTestLogger(t)