Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(OraklNode) Update redis publish key and related codes #2222

Merged
merged 3 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e
concatProof := bytes.Join(n.roundProofs.proofs[proofMessage.RoundID], nil)
proof := Proof{ConfigID: n.ID, Round: proofMessage.RoundID, Proof: concatProof}

err := PublishGlobalAggregateAndProof(ctx, globalAggregate, proof)
err := PublishGlobalAggregateAndProof(ctx, n.Name, globalAggregate, proof)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to publish global aggregate and proof")
}
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ func TestPublishGlobalAggregateAndProof(t *testing.T) {
}

ch := make(chan SubmissionData)
err = db.Subscribe(ctx, keys.SubmissionDataStreamKey(node.ID), ch)
err = db.Subscribe(ctx, keys.SubmissionDataStreamKey(node.Name), ch)
if err != nil {
t.Fatal("error subscribing to stream")
}

err = PublishGlobalAggregateAndProof(ctx, testItems.tmpData.globalAggregate, proof)
err = PublishGlobalAggregateAndProof(ctx, "test_pair", testItems.tmpData.globalAggregate, proof)
if err != nil {
t.Fatal("error publishing global aggregate and proof")
}
Expand Down
6 changes: 3 additions & 3 deletions node/pkg/aggregator/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func (a *App) setGlobalAggregateBulkWriter(configs []Config) {
a.stopGlobalAggregateBulkWriter()
}

configIds := make([]int32, len(configs))
configNames := make([]string, len(configs))
for i, config := range configs {
configIds[i] = config.ID
configNames[i] = config.Name
}

a.GlobalAggregateBulkWriter = NewGlobalAggregateBulkWriter(WithConfigIds(configIds))
a.GlobalAggregateBulkWriter = NewGlobalAggregateBulkWriter(WithConfigNames(configNames))
}

func (a *App) startGlobalAggregateBulkWriter(ctx context.Context) {
Expand Down
24 changes: 12 additions & 12 deletions node/pkg/aggregator/globalaggregatebulkwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ bulk insert proofs and aggregates into pgsql
*/

type GlobalAggregateBulkWriter struct {
ReceiveChannels map[int32]chan SubmissionData
ReceiveChannels map[string]chan SubmissionData
Buffer chan SubmissionData

LatestDataUpdateInterval time.Duration
Expand All @@ -30,7 +30,7 @@ const DefaultBufferSize = 2000
type GlobalAggregateBulkWriterConfig struct {
PgsqlBulkInsertInterval time.Duration
BufferSize int
ConfigIds []int32
ConfigNames []string
}

type GlobalAggregateBulkWriterOption func(*GlobalAggregateBulkWriterConfig)
Expand All @@ -47,9 +47,9 @@ func WithBufferSize(size int) GlobalAggregateBulkWriterOption {
}
}

func WithConfigIds(configIds []int32) GlobalAggregateBulkWriterOption {
func WithConfigNames(configNames []string) GlobalAggregateBulkWriterOption {
return func(config *GlobalAggregateBulkWriterConfig) {
config.ConfigIds = configIds
config.ConfigNames = configNames
}
}

Expand All @@ -63,14 +63,14 @@ func NewGlobalAggregateBulkWriter(opts ...GlobalAggregateBulkWriterOption) *Glob
}

result := &GlobalAggregateBulkWriter{
ReceiveChannels: make(map[int32]chan SubmissionData, len(config.ConfigIds)),
ReceiveChannels: make(map[string]chan SubmissionData, len(config.ConfigNames)),
Buffer: make(chan SubmissionData, config.BufferSize),

PgsqlBulkInsertInterval: config.PgsqlBulkInsertInterval,
}

for _, configId := range config.ConfigIds {
result.ReceiveChannels[configId] = make(chan SubmissionData)
for _, configName := range config.ConfigNames {
result.ReceiveChannels[configName] = make(chan SubmissionData)
}

return result
Expand Down Expand Up @@ -102,21 +102,21 @@ func (s *GlobalAggregateBulkWriter) Stop() {
}

func (s *GlobalAggregateBulkWriter) receive(ctx context.Context) {
for id := range s.ReceiveChannels {
go s.receiveEach(ctx, id)
for name := range s.ReceiveChannels {
go s.receiveEach(ctx, name)
}
}

func (s *GlobalAggregateBulkWriter) receiveEach(ctx context.Context, configId int32) {
err := db.Subscribe(ctx, keys.SubmissionDataStreamKey(configId), s.ReceiveChannels[configId])
func (s *GlobalAggregateBulkWriter) receiveEach(ctx context.Context, configName string) {
err := db.Subscribe(ctx, keys.SubmissionDataStreamKey(configName), s.ReceiveChannels[configName])
if err != nil {
log.Error().Err(err).Str("Player", "Aggregator").Msg("failed to subscribe to submission stream")
}
for {
select {
case <-ctx.Done():
return
case data := <-s.ReceiveChannels[configId]:
case data := <-s.ReceiveChannels[configName]:
s.Buffer <- data
}
}
Expand Down
10 changes: 5 additions & 5 deletions node/pkg/aggregator/globalaggregatebulkwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestNewGlobalAggregateBulkWriter(t *testing.T) {
}
}()

_ = NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID}))
_ = NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))
if err != nil {
t.Fatal("error creating new node")
}
Expand All @@ -40,7 +40,7 @@ func TestGlobalAggregateBulkWriterStart(t *testing.T) {
}
}()

bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID}))
bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))

bulkWriter.Start(ctx)

Expand All @@ -59,7 +59,7 @@ func TestGlobalAggregateBulkWriterStop(t *testing.T) {
}
}()

bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID}))
bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))

bulkWriter.Start(ctx)

Expand All @@ -80,7 +80,7 @@ func TestGlobalAggregateBulkWriterDataStore(t *testing.T) {
}
}()

bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.globalAggregate.ConfigID}))
bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))

bulkWriter.Start(ctx)
defer bulkWriter.Stop()
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestGlobalAggregateBulkWriterDataStore(t *testing.T) {
}

time.Sleep(time.Millisecond * 50)
err = PublishGlobalAggregateAndProof(ctx, testItems.tmpData.globalAggregate, proof)
err = PublishGlobalAggregateAndProof(ctx, "test_pair", testItems.tmpData.globalAggregate, proof)
if err != nil {
t.Fatal("error publishing global aggregate and proof")
}
Expand Down
5 changes: 2 additions & 3 deletions node/pkg/aggregator/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ func FilterNegative(values []int64) []int64 {
return result
}

func PublishGlobalAggregateAndProof(ctx context.Context, globalAggregate GlobalAggregate, proof Proof) error {
func PublishGlobalAggregateAndProof(ctx context.Context, name string, globalAggregate GlobalAggregate, proof Proof) error {
if globalAggregate.Value == 0 || globalAggregate.Timestamp.IsZero() {
return nil
}
data := SubmissionData{
GlobalAggregate: globalAggregate,
Proof: proof,
}

return db.Publish(ctx, keys.SubmissionDataStreamKey(globalAggregate.ConfigID), data)
return db.Publish(ctx, keys.SubmissionDataStreamKey(name), data)
}

func getLatestRoundId(ctx context.Context, configId int32) (int32, error) {
Expand Down
8 changes: 2 additions & 6 deletions node/pkg/common/keys/keys.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package keys

import (
"strconv"
)

func SubmissionDataStreamKey(configId int32) string {
return "submissionDataStream:" + strconv.Itoa(int(configId))
func SubmissionDataStreamKey(name string) string {
return "submissionDataStream:" + name
}
25 changes: 16 additions & 9 deletions node/pkg/dal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
type Config = types.Config

type Collector struct {
OutgoingStream map[int32]chan *dalcommon.OutgoingSubmissionData
OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData
Symbols map[int32]string
FeedHashes map[int32][]byte
LatestTimestamps map[int32]time.Time
Expand Down Expand Up @@ -92,7 +92,7 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {
}

collector := &Collector{
OutgoingStream: make(map[int32]chan *dalcommon.OutgoingSubmissionData, len(configs)),
OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(configs)),
Symbols: make(map[int32]string, len(configs)),
FeedHashes: make(map[int32][]byte, len(configs)),
LatestTimestamps: make(map[int32]time.Time),
Expand All @@ -104,20 +104,27 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {

redisTopics := []string{}
for _, config := range configs {
collector.OutgoingStream[config.ID] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.OutgoingStream[config.Name] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.Symbols[config.ID] = config.Name
collector.FeedHashes[config.ID] = crypto.Keccak256([]byte(config.Name))
redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.ID))
redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.Name))
}

baseRediscribe, err := db.NewRediscribe(ctx, db.WithRedisHost(baseRedisHost), db.WithRedisPort(baseRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter))
baseRediscribe, err := db.NewRediscribe(
ctx,
db.WithRedisHost(baseRedisHost),
db.WithRedisPort(baseRedisPort),
db.WithRedisTopics(redisTopics),
db.WithRedisRouter(collector.redisRouter))
if err != nil {
return nil, err
}
collector.baseRediscribe = baseRediscribe

if subRedisHost != "" && subRedisPort != "" {
subRediscribe, err := db.NewRediscribe(ctx, db.WithRedisHost(subRedisHost), db.WithRedisPort(subRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter))
subRediscribe, err := db.NewRediscribe(
ctx,
db.WithRedisHost(subRedisHost), db.WithRedisPort(subRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -221,12 +228,12 @@ func (c *Collector) processIncomingData(ctx context.Context, data *aggregator.Su
return
}

defer func(data *dalcommon.OutgoingSubmissionData) {
defer func(result *dalcommon.OutgoingSubmissionData) {
c.mu.Lock()
defer c.mu.Unlock()
c.LatestData[data.Symbol] = data
c.LatestData[result.Symbol] = result
}(result)
c.OutgoingStream[data.GlobalAggregate.ConfigID] <- result
c.OutgoingStream[result.Symbol] <- result
}
}

Expand Down
16 changes: 1 addition & 15 deletions node/pkg/dal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,25 +139,11 @@ func (h *Hub) removeClient(client *websocket.Conn) {
}

func (h *Hub) initializeBroadcastChannels(collector *collector.Collector) {
for configId, stream := range collector.OutgoingStream {
symbol := h.configIdToSymbol(configId)
if symbol == "" {
continue
}

for symbol, stream := range collector.OutgoingStream {
h.broadcast[symbol] = stream
}
}

func (h *Hub) configIdToSymbol(id int32) string {
for symbol, config := range h.Configs {
if config.ID == id {
return symbol
}
}
return ""
}

func (h *Hub) broadcastDataForSymbol(ctx context.Context, symbol string) {
for data := range h.broadcast[symbol] {
go h.castSubmissionData(ctx, data, &symbol)
Expand Down
12 changes: 6 additions & 6 deletions node/pkg/dal/tests/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestApiGetLatestAll(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestApiGetLatest(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestApiGetLatestTransposeAll(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestApiGetLatestTranspose(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestApiWebsocket(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down Expand Up @@ -473,7 +473,7 @@ func TestApiWebsocket(t *testing.T) {
}

// Publish data
err = testPublishData(ctx, *expectedData)
err = testPublishData(ctx, "test-aggregate", *expectedData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/dal/tests/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestCollectorStream(t *testing.T) {
}

log.Debug().Msg("Publishing data")
err = testPublishData(ctx, *sampleSubmissionData)
err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing data: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/dal/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type TestItems struct {
StatsApp *stats.StatsApp
}

func testPublishData(ctx context.Context, submissionData aggregator.SubmissionData) error {
return db.Publish(ctx, keys.SubmissionDataStreamKey(submissionData.GlobalAggregate.ConfigID), submissionData)
func testPublishData(ctx context.Context, name string, submissionData aggregator.SubmissionData) error {
return db.Publish(ctx, keys.SubmissionDataStreamKey(name), submissionData)
}

func generateSampleSubmissionData(configId int32, value int64, timestamp time.Time, round int32, symbol string) (*aggregator.SubmissionData, error) {
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/reporter/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestWsDataHandling(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

err = db.Publish(ctx, keys.SubmissionDataStreamKey(sampleSubmissionData.GlobalAggregate.ConfigID), sampleSubmissionData)
err = db.Publish(ctx, keys.SubmissionDataStreamKey("test-aggregate"), sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
Expand Down