From 72a9e96be6834c59da1902f42203c99be1ad7c90 Mon Sep 17 00:00:00 2001 From: Samir Ketema Date: Fri, 19 Jul 2024 19:02:48 -0700 Subject: [PATCH] initial progress --- destination/format/csv.go | 188 +++++++++++++++++++++-------------- destination/writer/writer.go | 33 +++--- 2 files changed, 123 insertions(+), 98 deletions(-) diff --git a/destination/format/csv.go b/destination/format/csv.go index 882373a..5609a03 100644 --- a/destination/format/csv.go +++ b/destination/format/csv.go @@ -17,6 +17,7 @@ package format import ( "bytes" "context" + "crypto/sha256" "encoding/csv" "encoding/json" "fmt" @@ -27,6 +28,7 @@ import ( sdk "github.com/conduitio/conduit-connector-sdk" "github.com/go-errors/errors" "github.com/hamba/avro/v2" + "golang.org/x/exp/maps" ) // TODO: just create the table with the types on the left to make this simpler. @@ -131,6 +133,13 @@ type ConnectorColumns struct { DeletedAtColumn string } +type SchemaRecords struct { + Schema map[string]string + Records []*sdk.Record + ConnColumns ConnectorColumns + CsvColumnOrder []string +} + type AvroRecordSchema struct { Name string `json:"name"` Type string `json:"type"` @@ -140,102 +149,117 @@ type AvroRecordSchema struct { } `json:"fields"` } -func GetDataSchema( +func GetDataSchemas( ctx context.Context, records []sdk.Record, - schema map[string]string, prefix string, -) ([]string, *ConnectorColumns, error) { - // we need to store the operation in a column, to detect updates & deletes - connectorColumns := ConnectorColumns{ - OperationColumn: fmt.Sprintf("%s_operation", prefix), - CreatedAtColumn: fmt.Sprintf("%s_created_at", prefix), - UpdatedAtColumn: fmt.Sprintf("%s_updated_at", prefix), - DeletedAtColumn: fmt.Sprintf("%s_deleted_at", prefix), +) ([]SchemaRecords, error) { + if len(records) == 0 { + return nil, errors.New("unexpected empty slice of records") } - schema[connectorColumns.OperationColumn] = SnowflakeVarchar - schema[connectorColumns.CreatedAtColumn] = SnowflakeTimestampTZ - schema[connectorColumns.UpdatedAtColumn] = SnowflakeTimestampTZ - schema[connectorColumns.DeletedAtColumn] = SnowflakeTimestampTZ + schemaCache := map[string]SchemaRecords{} - csvColumnOrder := []string{} + for _, r := range records { + // we need to store the operation in a column, to detect updates & deletes + connectorColumns := ConnectorColumns{ + OperationColumn: fmt.Sprintf("%s_operation", prefix), + CreatedAtColumn: fmt.Sprintf("%s_created_at", prefix), + UpdatedAtColumn: fmt.Sprintf("%s_updated_at", prefix), + DeletedAtColumn: fmt.Sprintf("%s_deleted_at", prefix), + } - // TODO: see whether we need to support a compound key here - // TODO: what if the key field changes? e.g. from `id` to `name`? we need to think about this + schema := map[string]string{} - // Grab the schema from the first record. - // TODO: support schema evolution. - if len(records) == 0 { - return nil, nil, errors.New("unexpected empty slice of records") - } + schema[connectorColumns.OperationColumn] = SnowflakeVarchar + schema[connectorColumns.CreatedAtColumn] = SnowflakeTimestampTZ + schema[connectorColumns.UpdatedAtColumn] = SnowflakeTimestampTZ + schema[connectorColumns.DeletedAtColumn] = SnowflakeTimestampTZ - r := records[0] - data, err := extractPayload(r.Operation, r.Payload) - if err != nil { - return nil, nil, errors.Errorf("failed to extract payload data: %w", err) - } + csvColumnOrder := []string{} + + // TODO: see whether we need to support a compound key here + // TODO: what if the key field changes? e.g. from `id` to `name`? 
we need to think about this
 
-	avroStr, okAvro := r.Metadata["postgres.avro.schema"]
-	// if we have an avro schema in the metadata, interpret the schema from it
-	if okAvro {
-		sdk.Logger(ctx).Debug().Msgf("avro schema string: %s", avroStr)
-		avroSchema, err := avro.Parse(avroStr)
+		data, err := extractPayload(r.Operation, r.Payload)
 		if err != nil {
-			return nil, nil, errors.Errorf("could not parse avro schema: %w", err)
+			return nil, errors.Errorf("failed to extract payload data: %w", err)
 		}
-		avroRecordSchema, ok := avroSchema.(*avro.RecordSchema)
-		if !ok {
-			return nil, nil, errors.New("could not coerce avro schema into recordSchema")
-		}
-		for _, field := range avroRecordSchema.Fields() {
-			csvColumnOrder = append(csvColumnOrder, field.Name())
-			schema[field.Name()], err = mapAvroToSnowflake(ctx, field)
+
+		avroStr, okAvro := r.Metadata["postgres.avro.schema"]
+		// if we have an avro schema in the metadata, interpret the schema from it
+		if okAvro {
+			sdk.Logger(ctx).Debug().Msgf("avro schema string: %s", avroStr)
+			avroSchema, err := avro.Parse(avroStr)
 			if err != nil {
-				return nil, nil, fmt.Errorf("failed to map avro field %s: %w", field.Name(), err)
+				return nil, errors.Errorf("could not parse avro schema: %w", err)
 			}
-		}
-	} else {
-		// TODO (BEFORE MERGE): move to function
-		for key, val := range data {
-			if schema[key] == "" {
-				csvColumnOrder = append(csvColumnOrder, key)
-				switch val.(type) {
-				case int, int8, int16, int32, int64:
-					schema[key] = SnowflakeInteger
-				case float32, float64:
-					schema[key] = SnowflakeFloat
-				case bool:
-					schema[key] = SnowflakeBoolean
-				case time.Time, *time.Time:
-					schema[key] = SnowflakeTimestampTZ
-				case nil:
-					// WE SHOULD KEEP TRACK OF VARIANTS SEPERATELY IN CASE WE RUN INTO CONCRETE TYPE LATER ON
-					// IF WE RAN INTO NONE NULL VALUE OF THIS VARIANT COL, WE CAN EXECUTE AN ALTER TO DEST TABLE
-					schema[key] = SnowflakeVariant
-				default:
-					schema[key] = SnowflakeVarchar
+			avroRecordSchema, ok := avroSchema.(*avro.RecordSchema)
+			if !ok {
+				return nil, errors.New("could not coerce avro schema into recordSchema")
+			}
+			for _, field := range avroRecordSchema.Fields() {
+				csvColumnOrder = append(csvColumnOrder, field.Name())
+				schema[field.Name()], err = mapAvroToSnowflake(ctx, field)
+				if err != nil {
+					return nil, fmt.Errorf("failed to map avro field %s: %w", field.Name(), err)
+				}
+			}
+		} else {
+			// TODO (BEFORE MERGE): move to function
+			for key, val := range data {
+				if schema[key] == "" {
+					csvColumnOrder = append(csvColumnOrder, key)
+					switch val.(type) {
+					case int, int8, int16, int32, int64:
+						schema[key] = SnowflakeInteger
+					case float32, float64:
+						schema[key] = SnowflakeFloat
+					case bool:
+						schema[key] = SnowflakeBoolean
+					case time.Time, *time.Time:
+						schema[key] = SnowflakeTimestampTZ
+					case nil:
+						// WE SHOULD KEEP TRACK OF VARIANTS SEPARATELY IN CASE WE RUN INTO A CONCRETE TYPE LATER ON
+						// IF WE RUN INTO A NON-NULL VALUE OF THIS VARIANT COL, WE CAN EXECUTE AN ALTER ON THE DEST TABLE
+						schema[key] = SnowflakeVariant
+					default:
+						schema[key] = SnowflakeVarchar
+					}
 				}
 			}
 		}
+
+		// sort data column order alphabetically to make it deterministic,
+		// but keep conduit connector columns at the front for ease of use
+		sort.Strings(csvColumnOrder)
+		csvColumnOrder = append(
+			[]string{
+				connectorColumns.OperationColumn,
+				connectorColumns.CreatedAtColumn,
+				connectorColumns.UpdatedAtColumn,
+				connectorColumns.DeletedAtColumn,
+			},
+			csvColumnOrder...,
+		)
+
+		// if we have detected this schema before, simply add the record to the existing group
+		hash := schemaHash(schema)
+		// copy the loop variable so the stored pointer does not alias r across iterations
+		rec := r
+		if sr, ok := schemaCache[hash]; ok {
+			sr.Records = append(sr.Records, &rec)
+			schemaCache[hash] = sr
+		} else {
+			schemaCache[hash] = SchemaRecords{
+				Schema:         schema,
+				Records:        []*sdk.Record{&rec},
+				CsvColumnOrder: csvColumnOrder,
+				ConnColumns:    connectorColumns,
+			}
+		}
+
+		sdk.Logger(ctx).Debug().Msgf("schema detected: %+v", schema)
 	}
 
-	// sort data column order alphabetically to make deterministic
-	// but keep conduit connector columns at the front for ease of use
-	sort.Strings(csvColumnOrder)
-	csvColumnOrder = append(
-		[]string{
-			connectorColumns.OperationColumn,
-			connectorColumns.CreatedAtColumn,
-			connectorColumns.UpdatedAtColumn,
-			connectorColumns.DeletedAtColumn,
-		},
-		csvColumnOrder...,
-	)
-
-	sdk.Logger(ctx).Debug().Msgf("schema detected: %+v", schema)
-
-	return csvColumnOrder, &connectorColumns, nil
+	return maps.Values(schemaCache), nil
 }
 
 // TODO: refactor this function, make it more modular and readable.
@@ -558,3 +582,13 @@ func mapAvroToSnowflake(ctx context.Context, field *avro.Field) (string, error)
 
 	return "", fmt.Errorf("could not find snowflake mapping for avro type %s", field.Name())
 }
+
+func schemaHash(s map[string]string) string {
+	hasher := sha256.New()
+
+	// hash over sorted keys so the same schema always produces the same hash,
+	// regardless of map iteration order
+	keys := maps.Keys(s)
+	sort.Strings(keys)
+
+	for _, k := range keys {
+		fmt.Fprintf(hasher, "%s:%s,", k, s[k])
+	}
+
+	return fmt.Sprintf("%x", hasher.Sum(nil))
+}
diff --git a/destination/writer/writer.go b/destination/writer/writer.go
index d832922..97a5d3a 100644
--- a/destination/writer/writer.go
+++ b/destination/writer/writer.go
@@ -163,10 +163,18 @@ func (s *SnowflakeCSV) Write(ctx context.Context, records []sdk.Record) (int, er
 	// assign request id to the write cycle
 	ctx = withRequestID(ctx)
 
-	// if s.schema == nil {
-	// 	if err := s.initSchema(ctx, records); err != nil {
-	// 		return 0, errors.Errorf("failed to initialize schema from records: %w", err)
-	// 	}
+	// extract the schemas from the records
+	schemaRecords, err := format.GetDataSchemas(ctx, records, s.Prefix)
+	if err != nil {
+		return 0, errors.Errorf("failed to extract schemas from records: %w", err)
+	}
+
+	// Samir Ketema:
+	// now that we have grouped schemas, we could make the calls out to evolver.Migrate() below,
+	// but we need to be careful about _when_ we do this.
+	// in hindsight, I could also have detected an ordering in these records, to help get the initial batch in before the schema change.
+	// however, we need to make sure this is all transactional as far as Snowflake is concerned,
+	// and I'm not sure whether that's possible in Snowflake, or what the implications are.
 	// // N.B. Disable until table is created by the migrator
 	// //
-	// // 	return 0, errors.Errorf("failed to evolve schema during boot: %w", err)
 	// // }
 
-	// sdk.Logger(ctx).Debug().
-	// 	// Bool("success", migrated).
-	// 		Msg("schema initialized and migration completed")
-	// }
-
-	// log first record temporarily for debugging
-	sdk.Logger(ctx).Debug().Msgf("payload=%+v", records[0].Payload)
-	sdk.Logger(ctx).Debug().Msgf("payload.before=%+v", records[0].Payload.Before)
-	sdk.Logger(ctx).Debug().Msgf("payload.after=%+v", records[0].Payload.After)
-	sdk.Logger(ctx).Debug().Msgf("key=%+v", records[0].Key)
-	// extract schema from payload
-	schema := make(map[string]string)
-	csvColumnOrder, meroxaColumns, err := format.GetDataSchema(ctx, records, schema, s.Prefix)
-	if err != nil {
-		return 0, errors.Errorf("failed to convert records to CSV: %w", err)
-	}
-
 	// check if table already exists on snowflake, if yes, compare schema
-	err = s.CheckTable(ctx, records[0].Operation, s.PrimaryKey, schema)
+	// TODO: a batch can now carry multiple schemas; for now, check the table against the first group only
+	err = s.CheckTable(ctx, records[0].Operation, s.PrimaryKey, schemaRecords[0].Schema)
 	if err != nil {
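
Usage sketch (not part of the patch): how a mixed batch comes back from GetDataSchemas as per-schema groups. The record literals, the "meroxa" prefix, and the logging calls below are illustrative assumptions only; the sketch assumes structured payloads with no avro schema in the metadata, so the Go value types drive the Snowflake column types.

// exampleSchemaGroups is a hypothetical helper, shown only to illustrate the shape
// of GetDataSchemas' output. It assumes imports of "context", the destination/format
// package, and the conduit-connector-sdk.
func exampleSchemaGroups(ctx context.Context) error {
	recs := []sdk.Record{
		{
			Operation: sdk.OperationCreate,
			Payload:   sdk.Change{After: sdk.StructuredData{"id": 1, "name": "alice"}},
		},
		{
			Operation: sdk.OperationCreate,
			Payload:   sdk.Change{After: sdk.StructuredData{"id": 2, "email": "bob@example.com"}},
		},
	}

	// "meroxa" is an example prefix for the connector columns
	groups, err := format.GetDataSchemas(ctx, recs, "meroxa")
	if err != nil {
		return err
	}

	// expected: two groups, one for {id, name} and one for {id, email}; each group
	// carries its own schema map, CSV column order, and the records that share it
	for _, g := range groups {
		sdk.Logger(ctx).Debug().
			Strs("columns", g.CsvColumnOrder).
			Int("records", len(g.Records)).
			Msg("schema group")
	}

	return nil
}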
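On the ordering and transactionality question raised in the writer comment above, here is a minimal sketch of one way Write could consume the grouped schemas once the migrator is enabled. It is not part of this change: the evolver.Migrate signature, s.config.Table, and the writeRecordGroup helper are assumptions for illustration only.

// hedged sketch: migrate, then write, one schema group at a time; ideally each
// migrate-then-write pair would be wrapped in a Snowflake transaction (or the whole
// batch rejected on failure), which is the open question in the comment above.
for _, group := range schemaRecords {
	// evolve the destination table so it can hold this group's columns
	// (Migrate's signature and return values are assumed here)
	if _, err := s.evolver.Migrate(ctx, s.config.Table, group.Schema); err != nil {
		return 0, errors.Errorf("failed to evolve schema for group: %w", err)
	}

	// stage and merge only the records that share this schema, using the group's
	// own CsvColumnOrder for the CSV header; writeRecordGroup is a hypothetical
	// helper that does not exist in this patch
	if err := s.writeRecordGroup(ctx, group); err != nil {
		return 0, errors.Errorf("failed to write record group: %w", err)
	}
}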