Skip to content

Commit

Permalink
(OraklNode) Update redis publish key and related codes (#2222)
Browse files Browse the repository at this point in the history
* fix: fix wrong implementation for redis sub

* fix: fix typo, rename key func

* fix: minor refactoring to reduce unnecessary indexing
  • Loading branch information
nick-bisonai authored Aug 26, 2024
1 parent b36fc2d commit 9d138dc
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 66 deletions.
2 changes: 1 addition & 1 deletion node/pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e
concatProof := bytes.Join(n.roundProofs.proofs[proofMessage.RoundID], nil)
proof := Proof{ConfigID: n.ID, Round: proofMessage.RoundID, Proof: concatProof}

err := PublishGlobalAggregateAndProof(ctx, globalAggregate, proof)
err := PublishGlobalAggregateAndProof(ctx, n.Name, globalAggregate, proof)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to publish global aggregate and proof")
}
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ func TestPublishGlobalAggregateAndProof(t *testing.T) {
}

ch := make(chan SubmissionData)
err = db.Subscribe(ctx, keys.SubmissionDataStreamKey(node.ID), ch)
err = db.Subscribe(ctx, keys.SubmissionDataStreamKey(node.Name), ch)
if err != nil {
t.Fatal("error subscribing to stream")
}

err = PublishGlobalAggregateAndProof(ctx, testItems.tmpData.globalAggregate, proof)
err = PublishGlobalAggregateAndProof(ctx, "test_pair", testItems.tmpData.globalAggregate, proof)
if err != nil {
t.Fatal("error publishing global aggregate and proof")
}
Expand Down
6 changes: 3 additions & 3 deletions node/pkg/aggregator/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func (a *App) setGlobalAggregateBulkWriter(configs []Config) {
a.stopGlobalAggregateBulkWriter()
}

configIds := make([]int32, len(configs))
configNames := make([]string, len(configs))
for i, config := range configs {
configIds[i] = config.ID
configNames[i] = config.Name
}

a.GlobalAggregateBulkWriter = NewGlobalAggregateBulkWriter(WithConfigIds(configIds))
a.GlobalAggregateBulkWriter = NewGlobalAggregateBulkWriter(WithConfigNames(configNames))
}

func (a *App) startGlobalAggregateBulkWriter(ctx context.Context) {
Expand Down
24 changes: 12 additions & 12 deletions node/pkg/aggregator/globalaggregatebulkwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ bulk insert proofs and aggregates into pgsql
*/

type GlobalAggregateBulkWriter struct {
ReceiveChannels map[int32]chan SubmissionData
ReceiveChannels map[string]chan SubmissionData
Buffer chan SubmissionData

LatestDataUpdateInterval time.Duration
Expand All @@ -30,7 +30,7 @@ const DefaultBufferSize = 2000
type GlobalAggregateBulkWriterConfig struct {
PgsqlBulkInsertInterval time.Duration
BufferSize int
ConfigIds []int32
ConfigNames []string
}

type GlobalAggregateBulkWriterOption func(*GlobalAggregateBulkWriterConfig)
Expand All @@ -47,9 +47,9 @@ func WithBufferSize(size int) GlobalAggregateBulkWriterOption {
}
}

func WithConfigIds(configIds []int32) GlobalAggregateBulkWriterOption {
func WithConfigNames(configNames []string) GlobalAggregateBulkWriterOption {
return func(config *GlobalAggregateBulkWriterConfig) {
config.ConfigIds = configIds
config.ConfigNames = configNames
}
}

Expand All @@ -63,14 +63,14 @@ func NewGlobalAggregateBulkWriter(opts ...GlobalAggregateBulkWriterOption) *Glob
}

result := &GlobalAggregateBulkWriter{
ReceiveChannels: make(map[int32]chan SubmissionData, len(config.ConfigIds)),
ReceiveChannels: make(map[string]chan SubmissionData, len(config.ConfigNames)),
Buffer: make(chan SubmissionData, config.BufferSize),

PgsqlBulkInsertInterval: config.PgsqlBulkInsertInterval,
}

for _, configId := range config.ConfigIds {
result.ReceiveChannels[configId] = make(chan SubmissionData)
for _, configName := range config.ConfigNames {
result.ReceiveChannels[configName] = make(chan SubmissionData)
}

return result
Expand Down Expand Up @@ -102,21 +102,21 @@ func (s *GlobalAggregateBulkWriter) Stop() {
}

func (s *GlobalAggregateBulkWriter) receive(ctx context.Context) {
for id := range s.ReceiveChannels {
go s.receiveEach(ctx, id)
for name := range s.ReceiveChannels {
go s.receiveEach(ctx, name)
}
}

func (s *GlobalAggregateBulkWriter) receiveEach(ctx context.Context, configId int32) {
err := db.Subscribe(ctx, keys.SubmissionDataStreamKey(configId), s.ReceiveChannels[configId])
func (s *GlobalAggregateBulkWriter) receiveEach(ctx context.Context, configName string) {
err := db.Subscribe(ctx, keys.SubmissionDataStreamKey(configName), s.ReceiveChannels[configName])
if err != nil {
log.Error().Err(err).Str("Player", "Aggregator").Msg("failed to subscribe to submission stream")
}
for {
select {
case <-ctx.Done():
return
case data := <-s.ReceiveChannels[configId]:
case data := <-s.ReceiveChannels[configName]:
s.Buffer <- data
}
}
Expand Down
10 changes: 5 additions & 5 deletions node/pkg/aggregator/globalaggregatebulkwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestNewGlobalAggregateBulkWriter(t *testing.T) {
}
}()

_ = NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID}))
_ = NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))
if err != nil {
t.Fatal("error creating new node")
}
Expand All @@ -40,7 +40,7 @@ func TestGlobalAggregateBulkWriterStart(t *testing.T) {
}
}()

bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID}))
bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))

bulkWriter.Start(ctx)

Expand All @@ -59,7 +59,7 @@ func TestGlobalAggregateBulkWriterStop(t *testing.T) {
}
}()

bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID}))
bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))

bulkWriter.Start(ctx)

Expand All @@ -80,7 +80,7 @@ func TestGlobalAggregateBulkWriterDataStore(t *testing.T) {
}
}()

bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.globalAggregate.ConfigID}))
bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))

bulkWriter.Start(ctx)
defer bulkWriter.Stop()
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestGlobalAggregateBulkWriterDataStore(t *testing.T) {
}

time.Sleep(time.Millisecond * 50)
err = PublishGlobalAggregateAndProof(ctx, testItems.tmpData.globalAggregate, proof)
err = PublishGlobalAggregateAndProof(ctx, "test_pair", testItems.tmpData.globalAggregate, proof)
if err != nil {
t.Fatal("error publishing global aggregate and proof")
}
Expand Down
5 changes: 2 additions & 3 deletions node/pkg/aggregator/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ func FilterNegative(values []int64) []int64 {
return result
}

func PublishGlobalAggregateAndProof(ctx context.Context, globalAggregate GlobalAggregate, proof Proof) error {
func PublishGlobalAggregateAndProof(ctx context.Context, name string, globalAggregate GlobalAggregate, proof Proof) error {
if globalAggregate.Value == 0 || globalAggregate.Timestamp.IsZero() {
return nil
}
data := SubmissionData{
GlobalAggregate: globalAggregate,
Proof: proof,
}

return db.Publish(ctx, keys.SubmissionDataStreamKey(globalAggregate.ConfigID), data)
return db.Publish(ctx, keys.SubmissionDataStreamKey(name), data)
}

func getLatestRoundId(ctx context.Context, configId int32) (int32, error) {
Expand Down
8 changes: 2 additions & 6 deletions node/pkg/common/keys/keys.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package keys

import (
"strconv"
)

func SubmissionDataStreamKey(configId int32) string {
return "submissionDataStream:" + strconv.Itoa(int(configId))
func SubmissionDataStreamKey(name string) string {
return "submissionDataStream:" + name
}
25 changes: 16 additions & 9 deletions node/pkg/dal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
type Config = types.Config

type Collector struct {
OutgoingStream map[int32]chan *dalcommon.OutgoingSubmissionData
OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData
Symbols map[int32]string
FeedHashes map[int32][]byte
LatestTimestamps map[int32]time.Time
Expand Down Expand Up @@ -92,7 +92,7 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {
}

collector := &Collector{
OutgoingStream: make(map[int32]chan *dalcommon.OutgoingSubmissionData, len(configs)),
OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(configs)),
Symbols: make(map[int32]string, len(configs)),
FeedHashes: make(map[int32][]byte, len(configs)),
LatestTimestamps: make(map[int32]time.Time),
Expand All @@ -104,20 +104,27 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {

redisTopics := []string{}
for _, config := range configs {
collector.OutgoingStream[config.ID] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.OutgoingStream[config.Name] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.Symbols[config.ID] = config.Name
collector.FeedHashes[config.ID] = crypto.Keccak256([]byte(config.Name))
redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.ID))
redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.Name))
}

baseRediscribe, err := db.NewRediscribe(ctx, db.WithRedisHost(baseRedisHost), db.WithRedisPort(baseRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter))
baseRediscribe, err := db.NewRediscribe(
ctx,
db.WithRedisHost(baseRedisHost),
db.WithRedisPort(baseRedisPort),
db.WithRedisTopics(redisTopics),
db.WithRedisRouter(collector.redisRouter))
if err != nil {
return nil, err
}
collector.baseRediscribe = baseRediscribe

if subRedisHost != "" && subRedisPort != "" {
subRediscribe, err := db.NewRediscribe(ctx, db.WithRedisHost(subRedisHost), db.WithRedisPort(subRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter))
subRediscribe, err := db.NewRediscribe(
ctx,
db.WithRedisHost(subRedisHost), db.WithRedisPort(subRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -221,12 +228,12 @@ func (c *Collector) processIncomingData(ctx context.Context, data *aggregator.Su
return
}

defer func(data *dalcommon.OutgoingSubmissionData) {
defer func(result *dalcommon.OutgoingSubmissionData) {
c.mu.Lock()
defer c.mu.Unlock()
c.LatestData[data.Symbol] = data
c.LatestData[result.Symbol] = result
}(result)
c.OutgoingStream[data.GlobalAggregate.ConfigID] <- result
c.OutgoingStream[result.Symbol] <- result
}
}

Expand Down
16 changes: 1 addition & 15 deletions node/pkg/dal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,25 +139,11 @@ func (h *Hub) removeClient(client *websocket.Conn) {
}

func (h *Hub) initializeBroadcastChannels(collector *collector.Collector) {
for configId, stream := range collector.OutgoingStream {
symbol := h.configIdToSymbol(configId)
if symbol == "" {
continue
}

for symbol, stream := range collector.OutgoingStream {
h.broadcast[symbol] = stream
}
}

func (h *Hub) configIdToSymbol(id int32) string {
for symbol, config := range h.Configs {
if config.ID == id {
return symbol
}
}
return ""
}

func (h *Hub) broadcastDataForSymbol(ctx context.Context, symbol string) {
for data := range h.broadcast[symbol] {
go h.castSubmissionData(ctx, data, &symbol)
Expand Down
12 changes: 6 additions & 6 deletions node/pkg/dal/tests/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestApiGetLatestAll(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestApiGetLatest(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestApiGetLatestTransposeAll(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestApiGetLatestTranspose(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestApiWebsocket(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down Expand Up @@ -473,7 +473,7 @@ func TestApiWebsocket(t *testing.T) {
}

// Publish data
err = testPublishData(ctx, *expectedData)
err = testPublishData(ctx, "test-aggregate", *expectedData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/dal/tests/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestCollectorStream(t *testing.T) {
}

log.Debug().Msg("Publishing data")
err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing data: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/dal/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type TestItems struct {
StatsApp *stats.StatsApp
}

func testPublishData(ctx context.Context, submissionData aggregator.SubmissionData) error {
return db.Publish(ctx, keys.SubmissionDataStreamKey(submissionData.GlobalAggregate.ConfigID), submissionData)
func testPublishData(ctx context.Context, name string, submissionData aggregator.SubmissionData) error {
return db.Publish(ctx, keys.SubmissionDataStreamKey(name), submissionData)
}

func generateSampleSubmissionData(configId int32, value int64, timestamp time.Time, round int32, symbol string) (*aggregator.SubmissionData, error) {
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/reporter/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestWsDataHandling(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = db.Publish(ctx, keys.SubmissionDataStreamKey(sampleSubmissionData.GlobalAggregate.ConfigID), sampleSubmissionData)
err = db.Publish(ctx, keys.SubmissionDataStreamKey("test-aggregate"), sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down

0 comments on commit 9d138dc

Please sign in to comment.