Skip to content

Commit

Permalink
add pkg/pipeline/transform
Browse files Browse the repository at this point in the history
Signed-off-by: hmoazzem <[email protected]>
  • Loading branch information
hmoazzem committed Jan 4, 2025
1 parent 613f9c7 commit 81d91d4
Show file tree
Hide file tree
Showing 8 changed files with 432 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.10.0
github.com/tidwall/gjson v1.18.0
github.com/xdg-go/scram v1.1.2
github.com/zitadel/oidc/v3 v3.33.1
go.uber.org/zap v1.27.0
Expand Down Expand Up @@ -71,6 +72,8 @@ require (
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/zitadel/logging v0.6.1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,13 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo=
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs=
github.com/uptrace/bun v1.1.12 h1:sOjDVHxNTuM6dNGaba0wUuz7KvDE1BmNu9Gqs2gJSXQ=
Expand Down
188 changes: 188 additions & 0 deletions internal/testutil/sample.cdc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "PostgreSQL_server.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "PostgreSQL_server.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "int64",
"optional": false,
"field": "ts_ns"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "PostgreSQL_server.inventory.customers.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "[email protected]"
},
"source": {
"version": "3.0.5.Final",
"connector": "postgresql",
"name": "PostgreSQL_server",
"ts_ms": 1559033904863,
"ts_us": 1559033904863123,
"ts_ns": 1559033904863123000,
"snapshot": true,
"db": "postgres",
"sequence": "[\"24023119\",\"24023128\"]",
"schema": "public",
"table": "customers",
"txId": 555,
"lsn": 24023128,
"xmin": null
},
"op": "c",
"ts_ms": 1559033904863,
"ts_us": 1559033904863841,
"ts_ns": 1559033904863841257
}
}
31 changes: 31 additions & 0 deletions internal/testutil/testdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package testutil

import (
"encoding/json"
"os"
"path/filepath"
"runtime"

"github.com/edgeflare/pgo/pkg/pglogrepl"
)

// LoadCDC returns a CDC object in Debezium format
func LoadCDC() (pglogrepl.CDC, error) {
var cdc pglogrepl.CDC

// Get the directory containing this file
_, currentFile, _, _ := runtime.Caller(0)
dir := filepath.Dir(currentFile)

data, err := os.ReadFile(filepath.Join(dir, "sample.cdc.json"))
if err != nil {
return cdc, err
}

err = json.Unmarshal(data, &cdc)
if err != nil {
return cdc, err
}

return cdc, nil
}
4 changes: 4 additions & 0 deletions pkg/pipeline/transform/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Package transform provides utilities for applying transformations to change data capture (CDC) events in pipelines.
// It's inspired by Debezium's [Single Message Transformations (SMTs)](https://docs.confluent.io/platform/current/connect/transforms/overview.html) usage.

package transform
75 changes: 75 additions & 0 deletions pkg/pipeline/transform/extract.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package transform

import (
"fmt"

"github.com/edgeflare/pgo/pkg/pglogrepl"
"github.com/tidwall/gjson"
)

// ExtractConfig holds the configuration for the extract transformation
type ExtractConfig struct {
Fields []string `json:"fields"`
}

// Validate validates the ExtractConfig
func (c *ExtractConfig) Validate() error {
if len(c.Fields) == 0 {
return fmt.Errorf("at least one field is required")
}
return nil
}

// Type returns the type of the transformation
func (c *ExtractConfig) Type() string {
return "extract"
}

// Extract creates a TransformFunc that extracts specified fields from the CDC event
func Extract(config *ExtractConfig) TransformFunc {
return func(cdc *pglogrepl.CDC) (*pglogrepl.CDC, error) {
if err := config.Validate(); err != nil {
return cdc, fmt.Errorf("invalid extract configuration: %w", err)
}
current := cdc
fields := config.Fields
newBefore := make(map[string]interface{})
newAfter := make(map[string]interface{})

if before, ok := current.Payload.Before.(map[string]interface{}); ok {
for _, field := range fields {
if value, exists := before[field]; exists {
newBefore[field] = value
}
}
}
if after, ok := current.Payload.After.(map[string]interface{}); ok {
for _, field := range fields {
if value, exists := after[field]; exists {
newAfter[field] = value
}
}
}

current.Payload.Before = newBefore
current.Payload.After = newAfter

return current, nil
}
}

// extractValueFromMap extracts a value from a map using a dot-notated field
func extractValueFromMap(data map[string]interface{}, field string) (string, bool) {
// Try direct access to the map
if value, exists := data[field]; exists {
return fmt.Sprintf("%v", value), true
}

// Fallback to gjson for dot-notated nested fields
result := gjson.Get(fmt.Sprintf("%v", data), field)
if result.Exists() {
return result.String(), true
}

return "", false
}
Loading

0 comments on commit 81d91d4

Please sign in to comment.