Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Initiate Join #29

Merged
merged 28 commits into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ac8e103
feat: add record to Protobuf struct converter
farbodahm Jun 11, 2024
cd813ae
tests: add scenario for converting non known value type
farbodahm Jun 12, 2024
433ff13
feat: add record data to protocol buffers serializer
farbodahm Jun 13, 2024
05fa7a8
feat: add proto struct to record data converter
farbodahm Jun 14, 2024
ae49c1f
refactor: rename RecordData to ValueMap
farbodahm Jun 15, 2024
237b219
feat: add protocol buffers to valuemap deserializer
farbodahm Jun 15, 2024
dfa25fe
feat: add state store interface
farbodahm Jun 24, 2024
9d0213b
feat: implement state store for pebble
farbodahm Jun 25, 2024
9898c82
fix: test failure for protobuf
farbodahm Jun 25, 2024
b7fcbd0
tests: add more tests for Pebble state store
farbodahm Jun 26, 2024
34fc11d
tests: add benchmark for Pebble
farbodahm Jun 29, 2024
35028e6
feat: add join interface
farbodahm Jul 6, 2024
a377ded
feat: add merge schema functionality
farbodahm Jul 9, 2024
4d113c3
feat: add stream name to record
farbodahm Aug 21, 2024
7530e7a
feat: add record join functionality
farbodahm Aug 24, 2024
1716362
feat: add MergeChannel utility function
farbodahm Aug 29, 2024
67c85cc
append joined suffix to the merge record metadata
farbodahm Aug 31, 2024
ff95815
feat: add record to Protobuf serializer
farbodahm Sep 1, 2024
8a279be
feat: add Protobuf to Record deserializer
farbodahm Sep 1, 2024
13eb677
refactor: Use RecordToProtobuf serializer for Pebble state store
farbodahm Sep 1, 2024
2289915
benchmark: add metadata value to benchmarks
farbodahm Sep 1, 2024
583ec7f
benchmark: add benchmark for protobuf serializing
farbodahm Sep 1, 2024
dcd117a
feat: add function for validating join condition
farbodahm Sep 2, 2024
fa9f382
feat: add InMemory state store for test scenarios
farbodahm Sep 3, 2024
771c859
feat: add stream-table inner join functionality
farbodahm Sep 3, 2024
d82fae1
feat: Add StreamTable Inner join functionality to the main SDF
farbodahm Sep 4, 2024
92aed70
tests: add more tests for join
farbodahm Sep 7, 2024
7264d1c
benchmark: add benchmark for StreamTable Inner Join
farbodahm Sep 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/add_static_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func heavy_static_column_stages(number_of_stages int, number_of_records int) {
output := make(chan Record)
errors := make(chan error)

sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(),
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(), "test-stream",
core.WithLogLevel(slog.LevelError))

// Create stages
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/filter_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func heavy_filter_stages(number_of_stages int, number_of_records int) {
output := make(chan Record)
errors := make(chan error)

sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(),
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(), "test-stream",
core.WithLogLevel(slog.LevelError))

// Create stages
Expand Down
70 changes: 70 additions & 0 deletions benchmarks/inner_join_stream_table_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package benchmarks

import (
"context"
"log/slog"
"sync"
"testing"

"github.com/farbodahm/streame/pkg/core"
"github.com/farbodahm/streame/pkg/functions/join"
. "github.com/farbodahm/streame/pkg/types"
"github.com/farbodahm/streame/pkg/utils"
"golang.org/x/exp/rand"
)

func inner_join_stream_table(number_of_records int) {
// Stream1
stream1_input := make(chan Record)
stream1_output := make(chan Record)
stream1_errors := make(chan error)
stream1_sdf := core.NewStreamDataFrame(stream1_input, stream1_output, stream1_errors, utils.HeavyRecordSchema(), "benchmark1", core.WithLogLevel(slog.LevelError))

// Stream2
stream2_input := make(chan Record)
stream2_output := make(chan Record)
stream2_errors := make(chan error)
stream2_sdf := core.NewStreamDataFrame(stream2_input, stream2_output, stream2_errors, utils.HeavyRecordSchemaV2(), "benchmark2 ", core.WithLogLevel(slog.LevelError))

// Logic to test
joined_sdf := stream2_sdf.Join(&stream1_sdf, join.Inner, join.JoinCondition{LeftKey: "field_21", RightKey: "field_1"}, join.StreamTable).(*core.StreamDataFrame)

// shared_keys is used to make sure all stream records will match to a table record
shared_keys := []string{}
// wg is used to make sure table is populated first
var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
for i := 0; i < number_of_records; i++ {
heavy_record := utils.NewHeavyRecord(100)
shared_keys = append(shared_keys, heavy_record.Data["field_1"].ToString())
stream1_input <- heavy_record
}
}()

go func() {
wg.Wait()
for i := 0; i < number_of_records; i++ {
heavy_record_v2 := utils.NewHeavyRecordV2(100)
heavy_record_v2.Data["field_21"] = String{Val: shared_keys[rand.Intn(len(shared_keys))]}
stream2_input <- heavy_record_v2
}
}()

ctx, cancel := context.WithCancel(context.Background())
go joined_sdf.Execute(ctx)

for i := 0; i < number_of_records; i++ {
<-joined_sdf.OutputStream
}
cancel()

}

func BenchmarkStreamTableInnerJoin(b *testing.B) {
for i := 0; i < b.N; i++ {
inner_join_stream_table(1000)
}
}
110 changes: 110 additions & 0 deletions benchmarks/pebble_state_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package benchmarks

import (
"os"
"testing"

"github.com/cockroachdb/pebble"
"github.com/farbodahm/streame/pkg/state_store"
"github.com/farbodahm/streame/pkg/utils"
)

var warehouse_path = "./test"

func write(number_of_repeats int, ss *state_store.PebbleStateStore) {

for i := 0; i < number_of_repeats; i++ {
record := utils.NewHeavyRecord(100)
err := ss.Set(record.Key, record)
if err != nil {
panic(err)
}
}

}

func read_with_cache(number_of_repeats int, key string, ss *state_store.PebbleStateStore) {
for i := 0; i < number_of_repeats; i++ {
_, err := ss.Get(key)
if err != nil {
panic(err)
}
}

}

func BenchmarkPebbleStateStore_Set(b *testing.B) {
defer os.RemoveAll(warehouse_path)
ss, err := state_store.NewPebbleStateStore(warehouse_path, &pebble.Options{})
if err != nil {
panic(err)
}

for i := 0; i < b.N; i++ {
write(500, ss)
}

err = ss.Close()
if err != nil {
panic(err)
}
}

func BenchmarkPebbleStateStore_GetWithCache(b *testing.B) {
defer os.RemoveAll(warehouse_path)
ss, err := state_store.NewPebbleStateStore(warehouse_path, &pebble.Options{})
if err != nil {
b.Fatal(err)
}

record := utils.NewHeavyRecord(100)
err = ss.Set(record.Key, record)
if err != nil {
b.Fatal(err)
}

for i := 0; i < b.N; i++ {
read_with_cache(500, record.Key, ss)
}

err = ss.Close()
if err != nil {
b.Fatal(err)
}
}

func BenchmarkPebbleStateStore_GetWithoutCache(b *testing.B) {
defer os.RemoveAll(warehouse_path)
ss, err := state_store.NewPebbleStateStore(warehouse_path, &pebble.Options{})
if err != nil {
b.Fatal(err)
}

number_of_repeats := 500
generated_keys := make([]string, 0, number_of_repeats)

for i := 0; i < number_of_repeats; i++ {
record := utils.NewHeavyRecord(100)
err := ss.Set(record.Key, record)
if err != nil {
b.Fatal(err)
}
generated_keys = append(generated_keys, record.Key)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, key := range generated_keys {
_, err := ss.Get(key)
if err != nil {
b.Fatal(err)
}
}

}

err = ss.Close()
if err != nil {
b.Fatal(err)
}
}
33 changes: 33 additions & 0 deletions benchmarks/protobuf_serialization_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package benchmarks

import (
"testing"

"github.com/farbodahm/streame/pkg/messaging"
"github.com/farbodahm/streame/pkg/utils"
)

// serializeAndDeserializeRecords serializes and deserializes a set number of records
func serializeAndDeserializeRecords(number_of_records int) {
for i := 0; i < number_of_records; i++ {
record := utils.NewHeavyRecord(100)

// Serialize the record to protobuf
data, err := messaging.RecordToProtocolBuffers(record)
if err != nil {
panic(err)
}

// Deserialize the protobuf back to a record
_, err = messaging.ProtocolBuffersToRecord(data)
if err != nil {
panic(err)
}
}
}

func BenchmarkProtobufSerialization(b *testing.B) {
for i := 0; i < b.N; i++ {
serializeAndDeserializeRecords(1000)
}
}
2 changes: 1 addition & 1 deletion benchmarks/rename_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func heavy_rename_column_stages(number_of_records int) {
output := make(chan Record)
errors := make(chan error)

sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(),
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(), "test-stream",
core.WithLogLevel(slog.LevelError))

// Create stages
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/schema_validation_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func heavy_schema_validation_stages(number_of_records int) {
output := make(chan Record)
errors := make(chan error)

sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(),
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(), "test-stream",
core.WithLogLevel(slog.LevelError))

go func() {
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/select_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func heavy_select_stages(number_of_stages int, number_of_records int) {
output := make(chan Record)
errors := make(chan error)

sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(),
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(), "test-stream",
core.WithLogLevel(slog.LevelError))

result_df := sdf.Select("field_1", "field_2", "field_3", "field_4",
Expand Down
31 changes: 30 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,44 @@ go 1.22.1
require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.9.0
)

require (
github.com/DataDog/zstd v1.4.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.11.3 // indirect
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/getsentry/sentry-go v0.27.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.0 // indirect
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/text v0.14.0 // indirect
)

require (
github.com/chzyer/readline v1.5.1 // indirect
github.com/cockroachdb/pebble v1.1.1
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/pprof v0.0.0-20240327155427-868f304927ed // indirect
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.18.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading