From e59cbf9ad6403c26b122b6ccb44f8c811bbe0ee3 Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 24 Aug 2024 21:07:22 +0900 Subject: [PATCH] fix: fix wrong implementation for redis sub --- node/pkg/aggregator/aggregator.go | 2 +- node/pkg/aggregator/aggregator_test.go | 4 ++-- node/pkg/aggregator/app.go | 6 ++--- .../aggregator/globalaggregatebulkwriter.go | 24 +++++++++---------- .../globalaggregatebulkwriter_test.go | 10 ++++---- node/pkg/aggregator/utils.go | 5 ++-- node/pkg/common/keys/keys.go | 8 ++----- node/pkg/dal/collector/collector.go | 13 +++++++--- node/pkg/dal/tests/api_test.go | 12 +++++----- node/pkg/dal/tests/collector_test.go | 2 +- node/pkg/dal/tests/main_test.go | 4 ++-- node/pkg/reporter/app_test.go | 2 +- 12 files changed, 47 insertions(+), 45 deletions(-) diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index dba22ab62..c9d45cb6d 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -282,7 +282,7 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e concatProof := bytes.Join(n.roundProofs.proofs[proofMessage.RoundID], nil) proof := Proof{ConfigID: n.ID, Round: proofMessage.RoundID, Proof: concatProof} - err := PublishGlobalAggregateAndProof(ctx, globalAggregate, proof) + err := PublishGlobalAggregateAndProof(ctx, n.Name, globalAggregate, proof) if err != nil { log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to publish global aggregate and proof") } diff --git a/node/pkg/aggregator/aggregator_test.go b/node/pkg/aggregator/aggregator_test.go index f5baca788..5f5a47ea8 100644 --- a/node/pkg/aggregator/aggregator_test.go +++ b/node/pkg/aggregator/aggregator_test.go @@ -121,12 +121,12 @@ func TestPublishGlobalAggregateAndProof(t *testing.T) { } ch := make(chan SubmissionData) - err = db.Subscribe(ctx, keys.SubmissionDataStreamKey(node.ID), ch) + err = db.Subscribe(ctx, keys.SubmissionDataStreamKeyV2(node.Name), ch) if err != nil { t.Fatal("error subscribing to stream") } - err = PublishGlobalAggregateAndProof(ctx, testItems.tmpData.globalAggregate, proof) + err = PublishGlobalAggregateAndProof(ctx, "test_pair", testItems.tmpData.globalAggregate, proof) if err != nil { t.Fatal("error publishing global aggregate and proof") } diff --git a/node/pkg/aggregator/app.go b/node/pkg/aggregator/app.go index e3b64d089..7058b5833 100644 --- a/node/pkg/aggregator/app.go +++ b/node/pkg/aggregator/app.go @@ -59,12 +59,12 @@ func (a *App) setGlobalAggregateBulkWriter(configs []Config) { a.stopGlobalAggregateBulkWriter() } - configIds := make([]int32, len(configs)) + configNames := make([]string, len(configs)) for i, config := range configs { - configIds[i] = config.ID + configNames[i] = config.Name } - a.GlobalAggregateBulkWriter = NewGlobalAggregateBulkWriter(WithConfigIds(configIds)) + a.GlobalAggregateBulkWriter = NewGlobalAggregateBulkWriter(WithConfigNames(configNames)) } func (a *App) startGlobalAggregateBulkWriter(ctx context.Context) { diff --git a/node/pkg/aggregator/globalaggregatebulkwriter.go b/node/pkg/aggregator/globalaggregatebulkwriter.go index 68ed46057..e318f0e83 100644 --- a/node/pkg/aggregator/globalaggregatebulkwriter.go +++ b/node/pkg/aggregator/globalaggregatebulkwriter.go @@ -14,7 +14,7 @@ bulk insert proofs and aggregates into pgsql */ type GlobalAggregateBulkWriter struct { - ReceiveChannels map[int32]chan SubmissionData + ReceiveChannels map[string]chan SubmissionData Buffer chan SubmissionData LatestDataUpdateInterval time.Duration @@ -30,7 +30,7 @@ const DefaultBufferSize = 2000 type GlobalAggregateBulkWriterConfig struct { PgsqlBulkInsertInterval time.Duration BufferSize int - ConfigIds []int32 + ConfigNames []string } type GlobalAggregateBulkWriterOption func(*GlobalAggregateBulkWriterConfig) @@ -47,9 +47,9 @@ func WithBufferSize(size int) GlobalAggregateBulkWriterOption { } } -func WithConfigIds(configIds []int32) GlobalAggregateBulkWriterOption { +func WithConfigNames(configNames []string) GlobalAggregateBulkWriterOption { return func(config *GlobalAggregateBulkWriterConfig) { - config.ConfigIds = configIds + config.ConfigNames = configNames } } @@ -63,14 +63,14 @@ func NewGlobalAggregateBulkWriter(opts ...GlobalAggregateBulkWriterOption) *Glob } result := &GlobalAggregateBulkWriter{ - ReceiveChannels: make(map[int32]chan SubmissionData, len(config.ConfigIds)), + ReceiveChannels: make(map[string]chan SubmissionData, len(config.ConfigNames)), Buffer: make(chan SubmissionData, config.BufferSize), PgsqlBulkInsertInterval: config.PgsqlBulkInsertInterval, } - for _, configId := range config.ConfigIds { - result.ReceiveChannels[configId] = make(chan SubmissionData) + for _, configName := range config.ConfigNames { + result.ReceiveChannels[configName] = make(chan SubmissionData) } return result @@ -102,13 +102,13 @@ func (s *GlobalAggregateBulkWriter) Stop() { } func (s *GlobalAggregateBulkWriter) receive(ctx context.Context) { - for id := range s.ReceiveChannels { - go s.receiveEach(ctx, id) + for name := range s.ReceiveChannels { + go s.receiveEach(ctx, name) } } -func (s *GlobalAggregateBulkWriter) receiveEach(ctx context.Context, configId int32) { - err := db.Subscribe(ctx, keys.SubmissionDataStreamKey(configId), s.ReceiveChannels[configId]) +func (s *GlobalAggregateBulkWriter) receiveEach(ctx context.Context, configName string) { + err := db.Subscribe(ctx, keys.SubmissionDataStreamKeyV2(configName), s.ReceiveChannels[configName]) if err != nil { log.Error().Err(err).Str("Player", "Aggregator").Msg("failed to subscribe to submission stream") } @@ -116,7 +116,7 @@ func (s *GlobalAggregateBulkWriter) receiveEach(ctx context.Context, configId in select { case <-ctx.Done(): return - case data := <-s.ReceiveChannels[configId]: + case data := <-s.ReceiveChannels[configName]: s.Buffer <- data } } diff --git a/node/pkg/aggregator/globalaggregatebulkwriter_test.go b/node/pkg/aggregator/globalaggregatebulkwriter_test.go index 8c57bdbe7..2dbd409c6 100644 --- a/node/pkg/aggregator/globalaggregatebulkwriter_test.go +++ b/node/pkg/aggregator/globalaggregatebulkwriter_test.go @@ -22,7 +22,7 @@ func TestNewGlobalAggregateBulkWriter(t *testing.T) { } }() - _ = NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID})) + _ = NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name})) if err != nil { t.Fatal("error creating new node") } @@ -40,7 +40,7 @@ func TestGlobalAggregateBulkWriterStart(t *testing.T) { } }() - bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID})) + bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name})) bulkWriter.Start(ctx) @@ -59,7 +59,7 @@ func TestGlobalAggregateBulkWriterStop(t *testing.T) { } }() - bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID})) + bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name})) bulkWriter.Start(ctx) @@ -80,7 +80,7 @@ func TestGlobalAggregateBulkWriterDataStore(t *testing.T) { } }() - bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.globalAggregate.ConfigID})) + bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name})) bulkWriter.Start(ctx) defer bulkWriter.Stop() @@ -109,7 +109,7 @@ func TestGlobalAggregateBulkWriterDataStore(t *testing.T) { } time.Sleep(time.Millisecond * 50) - err = PublishGlobalAggregateAndProof(ctx, testItems.tmpData.globalAggregate, proof) + err = PublishGlobalAggregateAndProof(ctx, "test_pair", testItems.tmpData.globalAggregate, proof) if err != nil { t.Fatal("error publishing global aggregate and proof") } diff --git a/node/pkg/aggregator/utils.go b/node/pkg/aggregator/utils.go index 137e2338b..da1f8fa9a 100644 --- a/node/pkg/aggregator/utils.go +++ b/node/pkg/aggregator/utils.go @@ -18,7 +18,7 @@ func FilterNegative(values []int64) []int64 { return result } -func PublishGlobalAggregateAndProof(ctx context.Context, globalAggregate GlobalAggregate, proof Proof) error { +func PublishGlobalAggregateAndProof(ctx context.Context, name string, globalAggregate GlobalAggregate, proof Proof) error { if globalAggregate.Value == 0 || globalAggregate.Timestamp.IsZero() { return nil } @@ -26,8 +26,7 @@ func PublishGlobalAggregateAndProof(ctx context.Context, globalAggregate GlobalA GlobalAggregate: globalAggregate, Proof: proof, } - - return db.Publish(ctx, keys.SubmissionDataStreamKey(globalAggregate.ConfigID), data) + return db.Publish(ctx, keys.SubmissionDataStreamKeyV2(name), data) } func getLatestRoundId(ctx context.Context, configId int32) (int32, error) { diff --git a/node/pkg/common/keys/keys.go b/node/pkg/common/keys/keys.go index 35d414e5c..1b3287b45 100644 --- a/node/pkg/common/keys/keys.go +++ b/node/pkg/common/keys/keys.go @@ -1,9 +1,5 @@ package keys -import ( - "strconv" -) - -func SubmissionDataStreamKey(configId int32) string { - return "submissionDataStream:" + strconv.Itoa(int(configId)) +func SubmissionDataStreamKeyV2(name string) string { + return "submissionDataSteram:" + name } diff --git a/node/pkg/dal/collector/collector.go b/node/pkg/dal/collector/collector.go index 2a980e988..202a17663 100644 --- a/node/pkg/dal/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -107,17 +107,24 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) { collector.OutgoingStream[config.ID] = make(chan *dalcommon.OutgoingSubmissionData, 1000) collector.Symbols[config.ID] = config.Name collector.FeedHashes[config.ID] = crypto.Keccak256([]byte(config.Name)) - redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.ID)) + redisTopics = append(redisTopics, keys.SubmissionDataStreamKeyV2(config.Name)) } - baseRediscribe, err := db.NewRediscribe(ctx, db.WithRedisHost(baseRedisHost), db.WithRedisPort(baseRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter)) + baseRediscribe, err := db.NewRediscribe( + ctx, + db.WithRedisHost(baseRedisHost), + db.WithRedisPort(baseRedisPort), + db.WithRedisTopics(redisTopics), + db.WithRedisRouter(collector.redisRouter)) if err != nil { return nil, err } collector.baseRediscribe = baseRediscribe if subRedisHost != "" && subRedisPort != "" { - subRediscribe, err := db.NewRediscribe(ctx, db.WithRedisHost(subRedisHost), db.WithRedisPort(subRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter)) + subRediscribe, err := db.NewRediscribe( + ctx, + db.WithRedisHost(subRedisHost), db.WithRedisPort(subRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter)) if err != nil { return nil, err } diff --git a/node/pkg/dal/tests/api_test.go b/node/pkg/dal/tests/api_test.go index 8357952f0..31ab1b5fb 100644 --- a/node/pkg/dal/tests/api_test.go +++ b/node/pkg/dal/tests/api_test.go @@ -96,7 +96,7 @@ func TestApiGetLatestAll(t *testing.T) { t.Fatalf("error generating sample submission data: %v", err) } - err = testPublishData(ctx, *sampleSubmissionData) + err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData) if err != nil { t.Fatalf("error publishing sample submission data: %v", err) } @@ -168,7 +168,7 @@ func TestApiGetLatest(t *testing.T) { t.Fatalf("error generating sample submission data: %v", err) } - err = testPublishData(ctx, *sampleSubmissionData) + err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData) if err != nil { t.Fatalf("error publishing sample submission data: %v", err) } @@ -209,7 +209,7 @@ func TestApiGetLatestTransposeAll(t *testing.T) { t.Fatalf("error generating sample submission data: %v", err) } - err = testPublishData(ctx, *sampleSubmissionData) + err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData) if err != nil { t.Fatalf("error publishing sample submission data: %v", err) } @@ -257,7 +257,7 @@ func TestApiGetLatestTranspose(t *testing.T) { t.Fatalf("error generating sample submission data: %v", err) } - err = testPublishData(ctx, *sampleSubmissionData) + err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData) if err != nil { t.Fatalf("error publishing sample submission data: %v", err) } @@ -327,7 +327,7 @@ func TestApiWebsocket(t *testing.T) { t.Fatalf("error generating sample submission data: %v", err) } - err = testPublishData(ctx, *sampleSubmissionData) + err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData) if err != nil { t.Fatalf("error publishing sample submission data: %v", err) } @@ -473,7 +473,7 @@ func TestApiWebsocket(t *testing.T) { } // Publish data - err = testPublishData(ctx, *expectedData) + err = testPublishData(ctx, "test-aggregate", *expectedData) if err != nil { t.Fatalf("error publishing sample submission data: %v", err) } diff --git a/node/pkg/dal/tests/collector_test.go b/node/pkg/dal/tests/collector_test.go index 1aab2aa3b..157ee77cf 100644 --- a/node/pkg/dal/tests/collector_test.go +++ b/node/pkg/dal/tests/collector_test.go @@ -85,7 +85,7 @@ func TestCollectorStream(t *testing.T) { } log.Debug().Msg("Publishing data") - err = testPublishData(ctx, *sampleSubmissionData) + err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData) if err != nil { t.Fatalf("error publishing data: %v", err) } diff --git a/node/pkg/dal/tests/main_test.go b/node/pkg/dal/tests/main_test.go index 053b2c377..dea876c93 100644 --- a/node/pkg/dal/tests/main_test.go +++ b/node/pkg/dal/tests/main_test.go @@ -36,8 +36,8 @@ type TestItems struct { StatsApp *stats.StatsApp } -func testPublishData(ctx context.Context, submissionData aggregator.SubmissionData) error { - return db.Publish(ctx, keys.SubmissionDataStreamKey(submissionData.GlobalAggregate.ConfigID), submissionData) +func testPublishData(ctx context.Context, name string, submissionData aggregator.SubmissionData) error { + return db.Publish(ctx, keys.SubmissionDataStreamKeyV2(name), submissionData) } func generateSampleSubmissionData(configId int32, value int64, timestamp time.Time, round int32, symbol string) (*aggregator.SubmissionData, error) { diff --git a/node/pkg/reporter/app_test.go b/node/pkg/reporter/app_test.go index 228eb1e79..352d0d1c5 100644 --- a/node/pkg/reporter/app_test.go +++ b/node/pkg/reporter/app_test.go @@ -61,7 +61,7 @@ func TestWsDataHandling(t *testing.T) { t.Fatalf("error generating sample submission data: %v", err) } - err = db.Publish(ctx, keys.SubmissionDataStreamKey(sampleSubmissionData.GlobalAggregate.ConfigID), sampleSubmissionData) + err = db.Publish(ctx, keys.SubmissionDataStreamKeyV2("test-aggregate"), sampleSubmissionData) if err != nil { t.Fatalf("error publishing sample submission data: %v", err) }