diff --git a/pkg/core/config.go b/pkg/core/config.go index dc3c2c0..37cc458 100644 --- a/pkg/core/config.go +++ b/pkg/core/config.go @@ -1,13 +1,18 @@ package core -import "log/slog" +import ( + "log/slog" + + "github.com/farbodahm/streame/pkg/state_store" +) // Option implements the Functional Option pattern for StreamDataFrame type Option func(*Config) // Config is the configuration options for StreamDataFrame type Config struct { - LogLevel slog.Level + LogLevel slog.Level + StateStore state_store.StateStore } // WithLogLevel sets the log level for StreamDataFrame @@ -16,3 +21,12 @@ func WithLogLevel(level slog.Level) Option { c.LogLevel = level } } + +// WithStateStore sets the state store for StreamDataFrame. +// If not set, the default in-memory state store will be used +// which is not recommended for production use +func WithStateStore(ss state_store.StateStore) Option { + return func(c *Config) { + c.StateStore = ss + } +} diff --git a/pkg/core/config_test.go b/pkg/core/config_test.go index 10a2c83..764b2e3 100644 --- a/pkg/core/config_test.go +++ b/pkg/core/config_test.go @@ -2,9 +2,12 @@ package core_test import ( "log/slog" + "os" "testing" + "github.com/cockroachdb/pebble" . "github.com/farbodahm/streame/pkg/core" + "github.com/farbodahm/streame/pkg/state_store" "github.com/farbodahm/streame/pkg/types" "github.com/stretchr/testify/assert" ) @@ -12,11 +15,9 @@ import ( func TestStreamDataFrame_ConfigDefaultValues_DefaultValuesAssignedCorrectly(t *testing.T) { sdf := NewStreamDataFrame(nil, nil, nil, types.Schema{}, "test-stream") - default_config := Config{ - LogLevel: slog.LevelInfo, - } - - assert.Equal(t, default_config, *sdf.Configs) + assert.Equal(t, sdf.Configs.LogLevel, slog.LevelInfo) + _, in_memory_ss := sdf.Configs.StateStore.(*state_store.InMemorySS) + assert.True(t, in_memory_ss) } func TestStreamDataFrame_ConfigWithLogLevel_LogLevelAssignedCorrectly(t *testing.T) { @@ -26,3 +27,14 @@ func TestStreamDataFrame_ConfigWithLogLevel_LogLevelAssignedCorrectly(t *testing assert.Equal(t, slog.LevelError, sdf.Configs.LogLevel) } + +func TestStreamDataFrame_WithStateStore_StateStoreAssignedCorrectly(t *testing.T) { + warehouse_path := "./test-path" + defer os.RemoveAll(warehouse_path) + ss, _ := state_store.NewPebbleStateStore(warehouse_path, &pebble.Options{}) + sdf := NewStreamDataFrame(nil, nil, nil, types.Schema{}, "test-stream", + WithStateStore(ss), + ) + + assert.Equal(t, ss, sdf.Configs.StateStore) +} diff --git a/pkg/core/dataframe.go b/pkg/core/dataframe.go index 8bcc393..d480cfa 100644 --- a/pkg/core/dataframe.go +++ b/pkg/core/dataframe.go @@ -14,7 +14,7 @@ type DataFrame interface { Select(columns ...string) DataFrame AddStaticColumn(name string, value types.ColumnValue) DataFrame Rename(old_name string, new_name string) DataFrame - Join(other DataFrame, how join.JoinType, on join.JoinCondition) DataFrame + Join(other *StreamDataFrame, how join.JoinType, on join.JoinCondition, mode join.JoinMode) DataFrame Execute(ctx context.Context) error GetSchema() types.Schema } diff --git a/pkg/core/stream_dataframe.go b/pkg/core/stream_dataframe.go index add8842..02cda8e 100644 --- a/pkg/core/stream_dataframe.go +++ b/pkg/core/stream_dataframe.go @@ -4,9 +4,11 @@ import ( "context" "errors" "log/slog" + "strings" "github.com/farbodahm/streame/pkg/functions" "github.com/farbodahm/streame/pkg/functions/join" + "github.com/farbodahm/streame/pkg/state_store" "github.com/farbodahm/streame/pkg/types" "github.com/farbodahm/streame/pkg/utils" "github.com/google/uuid" @@ -24,6 +26,15 @@ type StreamDataFrame struct { Stages []Stage Schema types.Schema Configs *Config + + stateStore state_store.StateStore + // previousExecutors holds all of the SDFs which current SDF is relying on. + // Currently only `Join` operation requires this structure so that it can first run + // all of the previous SDFs before running itself. + previousExecutors []*StreamDataFrame + + // TODO: Refactor for a better way for storing runtime configs + runtimeConfig map[string]any } // NewStreamDataFrame creates a new StreamDataFrame with the given options @@ -37,13 +48,20 @@ func NewStreamDataFrame( ) StreamDataFrame { // Create config with default values config := Config{ - LogLevel: slog.LevelInfo, + LogLevel: slog.LevelInfo, + StateStore: state_store.NewInMemorySS(), } // Functional Option pattern for _, option := range options { option(&config) } + utils.InitLogger(config.LogLevel) + + if _, ok := config.StateStore.(*state_store.InMemorySS); ok { + utils.Logger.Warn("Using in-memory state store. This is not suitable for production use.") + } + sdf := StreamDataFrame{ SourceStream: sourceStream, OutputStream: outputStream, @@ -52,10 +70,18 @@ func NewStreamDataFrame( Stages: []Stage{}, Schema: schema, Configs: &config, + + runtimeConfig: make(map[string]any), + stateStore: config.StateStore, + previousExecutors: []*StreamDataFrame{}, + } + + // Only source streams need to have schema validation. When a SDF + // is created by joining 2 other streams, it doesn't need any schema validation stage. + if !strings.HasSuffix(streamName, join.JoinedStreamSuffix) { + sdf.validateSchema() } - sdf.validateSchema() - utils.InitLogger(config.LogLevel) return sdf } @@ -85,8 +111,45 @@ func (sdf *StreamDataFrame) Select(columns ...string) DataFrame { } // Join joins the DataFrame with another DataFrame based on the given join type and condition -func (sdf *StreamDataFrame) Join(other DataFrame, how join.JoinType, on join.JoinCondition) DataFrame { - panic("Not Implemented") +func (sdf *StreamDataFrame) Join(other *StreamDataFrame, how join.JoinType, on join.JoinCondition, mode join.JoinMode) DataFrame { + // Validate join condition + err := join.ValidateJoinCondition(sdf.Schema, other.Schema, on) + if err != nil { + panic(err) + } + + // Merge schemas + new_schema, err := join.MergeSchema(sdf.Schema, other.GetSchema()) + if err != nil { + panic(err) + } + + // Fan-In pattern to join 2 streams into 1 stream + merged_sources := utils.MergeChannels(sdf.OutputStream, other.OutputStream) + merged_errors := utils.MergeChannels(sdf.ErrorStream, other.ErrorStream) + + out := make(chan (types.Record)) + new_sdf := NewStreamDataFrame( + merged_sources, + out, + merged_errors, + new_schema, + sdf.Name+"-"+other.Name+join.JoinedStreamSuffix, + ) + // TODO: Decide on configs + new_sdf.Configs = sdf.Configs + + new_sdf.runtimeConfig[sdf.Name] = join.Stream + new_sdf.runtimeConfig[other.Name] = join.Table + + executor := func(ctx context.Context, record types.Record) ([]types.Record, error) { + record_type := new_sdf.runtimeConfig[record.Metadata.Stream].(join.RecordType) + return join.InnerJoinStreamTable(new_sdf.stateStore, record_type, record, on), nil + } + + new_sdf.previousExecutors = append(new_sdf.previousExecutors, sdf, other) + new_sdf.addToStages(executor) + return &new_sdf } // Filter applies filter function to each record of the DataFrame @@ -206,11 +269,17 @@ func (sdf *StreamDataFrame) addToStages(executor StageExecutor) { // It simply runs all of the stages. // It's a blocking call and returns when the context is cancelled or panics when an error occurs. func (sdf *StreamDataFrame) Execute(ctx context.Context) error { - utils.Logger.Info("Executing processor with", "len(stages)", len(sdf.Stages)) + utils.Logger.Info("Executing processor", "name", sdf.Name, "len(stages)", len(sdf.Stages)) if len(sdf.Stages) == 0 { return errors.New("no stages are created") } + // Execute previous SDFs which current SDF depends on first (if there are any) + for _, previous_sdf := range sdf.previousExecutors { + utils.Logger.Info("Executing previous SDF", "name", previous_sdf.Name) + go previous_sdf.Execute(ctx) + } + for _, stage := range sdf.Stages { go stage.Run(ctx) } @@ -220,7 +289,7 @@ func (sdf *StreamDataFrame) Execute(ctx context.Context) error { case err := <-sdf.ErrorStream: panic(err) case <-ctx.Done(): - utils.Logger.Info("Processor execution completed") + utils.Logger.Info("Processor execution completed", "name", sdf.Name) return nil // Exit the loop if the context is cancelled } } diff --git a/pkg/functions/join/join_test.go b/pkg/functions/join/join_test.go index 88132e5..040698d 100644 --- a/pkg/functions/join/join_test.go +++ b/pkg/functions/join/join_test.go @@ -1,8 +1,10 @@ package join_test import ( + "context" "testing" + "github.com/farbodahm/streame/pkg/core" "github.com/farbodahm/streame/pkg/functions/join" "github.com/farbodahm/streame/pkg/state_store" . "github.com/farbodahm/streame/pkg/types" @@ -135,3 +137,77 @@ func TestInnerJoinStreamTable_WithStreamRecord_JoinSuccessfully(t *testing.T) { err = ss.Close() assert.Nil(t, err) } + +// Integration tests +func TestJoin_SimpleStreamTableJoin_ShouldJoinStreamRecordToTableRecord(t *testing.T) { + // User Data + user_input := make(chan Record) + user_output := make(chan Record) + user_errors := make(chan error) + user_schema := Schema{ + Columns: Fields{ + "email": StringType, + "first_name": StringType, + "last_name": StringType, + }, + } + user_sdf := core.NewStreamDataFrame(user_input, user_output, user_errors, user_schema, "user-stream") + + // Order Data + order_input := make(chan Record) + orders_output := make(chan Record) + orders_errors := make(chan error) + orders_schema := Schema{ + Columns: Fields{ + "user_email": StringType, + "amount": IntType, + }, + } + orders_sdf := core.NewStreamDataFrame(order_input, orders_output, orders_errors, orders_schema, "orders-stream") + + // Logic to test + joined_sdf := orders_sdf.Join(&user_sdf, join.Inner, join.JoinCondition{LeftKey: "user_email", RightKey: "email"}, join.StreamTable).(*core.StreamDataFrame) + + go func() { + user_input <- Record{ + Key: "key1", + Data: ValueMap{ + "first_name": String{Val: "foo"}, + "last_name": String{Val: "bar"}, + "email": String{Val: "test@test.com"}, + }, + } + order_input <- Record{ + Key: "key2", + Data: ValueMap{ + "user_email": String{Val: "test@test.com"}, + "amount": Integer{Val: 100}, + }, + } + }() + + ctx, cancel := context.WithCancel(context.Background()) + go joined_sdf.Execute(ctx) + + result := <-joined_sdf.OutputStream + cancel() + // Assertions + expected_record := Record{ + Key: "key2-key1", + Data: ValueMap{ + "user_email": String{Val: "test@test.com"}, + "amount": Integer{Val: 100}, + "first_name": String{Val: "foo"}, + "last_name": String{Val: "bar"}, + "email": String{Val: "test@test.com"}, + }, + Metadata: Metadata{ + Stream: orders_sdf.Name + "-" + user_sdf.Name + join.JoinedStreamSuffix, + }, + } + assert.Equal(t, expected_record, result) + assert.Equal(t, 0, len(user_errors)) + assert.Equal(t, 0, len(orders_errors)) + assert.Equal(t, 0, len(joined_sdf.ErrorStream)) + assert.Equal(t, 0, len(joined_sdf.OutputStream)) +}