diff --git a/node/pkg/aggregator/types.go b/node/pkg/aggregator/types.go index fcd689ecb..626bb683a 100644 --- a/node/pkg/aggregator/types.go +++ b/node/pkg/aggregator/types.go @@ -33,6 +33,7 @@ type Proof = types.Proof type GlobalAggregate = types.GlobalAggregate type SubmissionData struct { + Symbol string GlobalAggregate GlobalAggregate Proof Proof } diff --git a/node/pkg/aggregator/utils.go b/node/pkg/aggregator/utils.go index f9406413d..1ec5931b1 100644 --- a/node/pkg/aggregator/utils.go +++ b/node/pkg/aggregator/utils.go @@ -23,6 +23,7 @@ func PublishGlobalAggregateAndProof(ctx context.Context, name string, globalAggr return nil } data := SubmissionData{ + Symbol: name, GlobalAggregate: globalAggregate, Proof: proof, } diff --git a/node/pkg/dal/apiv2/controller.go b/node/pkg/dal/apiv2/controller.go index e5ab0bda1..ac7dcdabd 100644 --- a/node/pkg/dal/apiv2/controller.go +++ b/node/pkg/dal/apiv2/controller.go @@ -172,8 +172,8 @@ func (s *ServerV2) HealthCheckHandler(w http.ResponseWriter, r *http.Request) { } func (s *ServerV2) SymbolsHandler(w http.ResponseWriter, r *http.Request) { - result := make([]string, 0, len(s.hub.Configs)) - for key := range s.hub.Configs { + result := make([]string, 0, len(s.hub.Symbols)) + for key := range s.hub.Symbols { result = append(result, key) } diff --git a/node/pkg/dal/app.go b/node/pkg/dal/app.go index 4e21ef9ae..b015f5619 100644 --- a/node/pkg/dal/app.go +++ b/node/pkg/dal/app.go @@ -4,6 +4,7 @@ import ( "context" "errors" "os" + "strings" "time" "bisonai.com/miko/node/pkg/common/types" @@ -28,25 +29,25 @@ func Run(ctx context.Context) error { keyCache := keycache.NewAPIKeyCache(1 * time.Hour) keyCache.CleanupLoop(10 * time.Minute) - adminEndpoint := os.Getenv("ORAKL_NODE_ADMIN_URL") - if adminEndpoint == "" { - return errors.New("ORAKL_NODE_ADMIN_URL is not set") + chain := os.Getenv("CHAIN") + if chain == "" { + return errors.New("CHAIN is not set") } - configs, err := fetchConfigs(ctx, adminEndpoint) + symbols, err := fetchSymbols(chain) if err != nil { - log.Error().Err(err).Msg("Failed to fetch configs") + log.Error().Err(err).Msg("Failed to fetch symbols") return err } - collector, err := collector.NewCollector(ctx, configs) + collector, err := collector.NewCollector(ctx, symbols) if err != nil { log.Error().Err(err).Msg("Failed to setup collector") return err } collector.Start(ctx) - hub := hub.HubSetup(ctx, configs) + hub := hub.HubSetup(ctx, symbols) go hub.Start(ctx, collector) err = apiv2.Start(ctx, apiv2.WithCollector(collector), apiv2.WithHub(hub), apiv2.WithKeyCache(keyCache), apiv2.WithStatsApp(statsApp)) @@ -58,6 +59,23 @@ func Run(ctx context.Context) error { return nil } -func fetchConfigs(ctx context.Context, endpoint string) ([]Config, error) { - return request.Request[[]Config](request.WithEndpoint(endpoint + "/config")) +func fetchSymbols(chain string) ([]string, error) { + type ConfigEntry struct { + Name string `json:"name"` + } + + url := "https://config.orakl.network/" + strings.ToLower(chain) + "_configs.json" + + results, err := request.Request[[]ConfigEntry](request.WithEndpoint(url), request.WithTimeout(5*time.Second)) + if err != nil { + return nil, err + } + + var symbols []string + + for _, result := range results { + symbols = append(symbols, result.Name) + } + + return symbols, nil } diff --git a/node/pkg/dal/collector/collector.go b/node/pkg/dal/collector/collector.go index ce2acbb09..002e16f23 100644 --- a/node/pkg/dal/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -31,10 +31,10 @@ const ( type Config = types.Config type Collector struct { - OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData - Symbols map[int32]string - FeedHashes map[int32][]byte - LatestTimestamps map[int32]time.Time + OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData + + FeedHashes map[string][]byte + LatestTimestamps map[string]time.Time LatestData map[string]*dalcommon.OutgoingSubmissionData CachedWhitelist []klaytncommon.Address @@ -50,7 +50,7 @@ type Collector struct { mu sync.RWMutex } -func NewCollector(ctx context.Context, configs []Config) (*Collector, error) { +func NewCollector(ctx context.Context, symbols []string) (*Collector, error) { kaiaWebsocketUrl := os.Getenv("KAIA_WEBSOCKET_URL") if kaiaWebsocketUrl == "" { return nil, errors.New("KAIA_WEBSOCKET_URL is not set") @@ -92,10 +92,9 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) { } collector := &Collector{ - OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(configs)), - Symbols: make(map[int32]string, len(configs)), - FeedHashes: make(map[int32][]byte, len(configs)), - LatestTimestamps: make(map[int32]time.Time), + OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(symbols)), + FeedHashes: make(map[string][]byte, len(symbols)), + LatestTimestamps: make(map[string]time.Time), LatestData: make(map[string]*dalcommon.OutgoingSubmissionData), chainReader: chainReader, CachedWhitelist: initialWhitelist, @@ -103,11 +102,10 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) { } redisTopics := []string{} - for _, config := range configs { - collector.OutgoingStream[config.Name] = make(chan *dalcommon.OutgoingSubmissionData, 1000) - collector.Symbols[config.ID] = config.Name - collector.FeedHashes[config.ID] = crypto.Keccak256([]byte(config.Name)) - redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.Name)) + for _, symbol := range symbols { + collector.OutgoingStream[symbol] = make(chan *dalcommon.OutgoingSubmissionData, 1000) + collector.FeedHashes[symbol] = crypto.Keccak256([]byte(symbol)) + redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(symbol)) } baseRediscribe, err := db.NewRediscribe( @@ -162,7 +160,7 @@ func (c *Collector) GetLatestData(symbol string) (*dalcommon.OutgoingSubmissionD func (c *Collector) GetAllLatestData() []dalcommon.OutgoingSubmissionData { c.mu.RLock() defer c.mu.RUnlock() - result := make([]dalcommon.OutgoingSubmissionData, 0, len(c.Symbols)) + result := make([]dalcommon.OutgoingSubmissionData, 0, len(c.FeedHashes)) for _, value := range c.LatestData { result = append(result, *value) } @@ -202,9 +200,9 @@ func (c *Collector) compareAndSwapLatestTimestamp(data *aggregator.SubmissionDat c.mu.Lock() defer c.mu.Unlock() - old, ok := c.LatestTimestamps[data.GlobalAggregate.ConfigID] + old, ok := c.LatestTimestamps[data.Symbol] if !ok || data.GlobalAggregate.Timestamp.After(old) { - c.LatestTimestamps[data.GlobalAggregate.ConfigID] = data.GlobalAggregate.Timestamp + c.LatestTimestamps[data.Symbol] = data.GlobalAggregate.Timestamp return true } @@ -218,7 +216,7 @@ func (c *Collector) processIncomingData(ctx context.Context, data *aggregator.Su default: valid := c.compareAndSwapLatestTimestamp(data) if !valid { - log.Debug().Str("Player", "DalCollector").Str("Symbol", c.Symbols[data.GlobalAggregate.ConfigID]).Msg("old data recieved") + log.Debug().Str("Player", "DalCollector").Str("Symbol", data.Symbol).Msg("old data recieved") return } @@ -246,10 +244,10 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data *aggreg data.Proof.Proof, data.GlobalAggregate.Value, data.GlobalAggregate.Timestamp, - c.Symbols[data.GlobalAggregate.ConfigID], + data.Symbol, whitelist) if err != nil { - log.Error().Err(err).Str("Player", "DalCollector").Str("Symbol", c.Symbols[data.GlobalAggregate.ConfigID]).Msg("failed to order proof") + log.Error().Err(err).Str("Player", "DalCollector").Str("Symbol", data.Symbol).Msg("failed to order proof") if errors.Is(err, errorsentinel.ErrDalSignerNotWhitelisted) { go func(ctx context.Context, chainHelper *websocketchainreader.ChainReader, contractAddress string) { newList, getAllOraclesErr := getAllOracles(ctx, chainHelper, contractAddress) @@ -265,11 +263,11 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data *aggreg return nil, err } return &dalcommon.OutgoingSubmissionData{ - Symbol: c.Symbols[data.GlobalAggregate.ConfigID], + Symbol: data.Symbol, Value: strconv.FormatInt(data.GlobalAggregate.Value, 10), AggregateTime: strconv.FormatInt(data.GlobalAggregate.Timestamp.UnixMilli(), 10), Proof: formatBytesToHex(orderedProof), - FeedHash: formatBytesToHex(c.FeedHashes[data.GlobalAggregate.ConfigID]), + FeedHash: formatBytesToHex(c.FeedHashes[data.Symbol]), Decimals: DefaultDecimals, }, nil } diff --git a/node/pkg/dal/hub/hub.go b/node/pkg/dal/hub/hub.go index 1b5f69f52..d9864c815 100644 --- a/node/pkg/dal/hub/hub.go +++ b/node/pkg/dal/hub/hub.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "bisonai.com/miko/node/pkg/common/types" "bisonai.com/miko/node/pkg/dal/collector" dalcommon "bisonai.com/miko/node/pkg/dal/common" "bisonai.com/miko/node/pkg/dal/utils/stats" @@ -15,15 +14,13 @@ import ( "nhooyr.io/websocket/wsjson" ) -type Config = types.Config - type Subscription struct { Method string `json:"method"` Params []string `json:"params"` } type Hub struct { - Configs map[string]Config + Symbols map[string]any Clients map[*websocket.Conn]map[string]any Register chan *websocket.Conn Unregister chan *websocket.Conn @@ -36,19 +33,19 @@ const ( CleanupInterval = time.Hour ) -func HubSetup(ctx context.Context, configs []Config) *Hub { - configMap := make(map[string]Config) - for _, config := range configs { - configMap[config.Name] = config +func HubSetup(ctx context.Context, symbols []string) *Hub { + symbolsMap := make(map[string]any) + for _, symbol := range symbols { + symbolsMap[symbol] = struct{}{} } - hub := NewHub(configMap) + hub := NewHub(symbolsMap) return hub } -func NewHub(configs map[string]Config) *Hub { +func NewHub(symbols map[string]any) *Hub { return &Hub{ - Configs: configs, + Symbols: symbols, Clients: make(map[*websocket.Conn]map[string]any), Register: make(chan *websocket.Conn), Unregister: make(chan *websocket.Conn), @@ -61,7 +58,7 @@ func (h *Hub) Start(ctx context.Context, collector *collector.Collector) { h.initializeBroadcastChannels(collector) - for symbol := range h.Configs { + for symbol := range h.Symbols { go h.broadcastDataForSymbol(ctx, symbol) } @@ -80,7 +77,7 @@ func (h *Hub) HandleSubscription(ctx context.Context, client *websocket.Conn, ms valid := []string{} for _, param := range msg.Params { symbol := strings.TrimPrefix(param, "submission@") - if _, ok := h.Configs[symbol]; !ok { + if _, ok := h.Symbols[symbol]; !ok { continue } subscriptions[symbol] = struct{}{} diff --git a/node/pkg/dal/tests/collector_test.go b/node/pkg/dal/tests/collector_test.go index 157ee77cf..5eb03f8e2 100644 --- a/node/pkg/dal/tests/collector_test.go +++ b/node/pkg/dal/tests/collector_test.go @@ -31,7 +31,7 @@ func TestCollectorStartAndStop(t *testing.T) { collector := testItems.Collector assert.True(t, collector.IsRunning) - assert.Greater(t, len(collector.Symbols), 0) + assert.Greater(t, len(collector.FeedHashes), 0) collector.Stop() assert.False(t, collector.IsRunning) } @@ -51,7 +51,7 @@ func TestCollectorStream(t *testing.T) { time.Sleep(20 * time.Millisecond) collector := testItems.Collector - assert.Greater(t, len(collector.Symbols), 0) + assert.Greater(t, len(collector.FeedHashes), 0) assert.True(t, collector.IsRunning) headers := map[string]string{"X-API-Key": testItems.ApiKey} diff --git a/node/pkg/dal/tests/main_test.go b/node/pkg/dal/tests/main_test.go index 1cece9b87..03ad720e1 100644 --- a/node/pkg/dal/tests/main_test.go +++ b/node/pkg/dal/tests/main_test.go @@ -100,19 +100,19 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { SubmitInterval: 15000, } - configs := []Config{testItems.TmpConfig} + symbols := []string{testItems.TmpConfig.Name} keyCache := keycache.NewAPIKeyCache(1 * time.Hour) keyCache.CleanupLoop(10 * time.Minute) - collector, err := collector.NewCollector(ctx, configs) + collector, err := collector.NewCollector(ctx, symbols) if err != nil { log.Error().Err(err).Msg("Failed to setup DAL API server") return nil, nil, err } collector.Start(ctx) - hub := hub.HubSetup(ctx, configs) + hub := hub.HubSetup(ctx, symbols) go hub.Start(ctx, collector) statsApp := stats.NewStatsApp(ctx, stats.WithBulkLogsCopyInterval(1*time.Second)) diff --git a/node/pkg/reporter/app_test.go b/node/pkg/reporter/app_test.go index 7617c67ba..3a98b3249 100644 --- a/node/pkg/reporter/app_test.go +++ b/node/pkg/reporter/app_test.go @@ -33,7 +33,7 @@ func TestWsDataHandling(t *testing.T) { app := New() - conn, tmpConfig, configs, err := mockDalWsServer(ctx) + conn, tmpConfig, symbols, err := mockDalWsServer(ctx) if err != nil { t.Fatalf("error mocking dal ws server: %v", err) } @@ -76,17 +76,17 @@ func TestWsDataHandling(t *testing.T) { case <-ticker.C: if app.WsHelper != nil && app.WsHelper.IsRunning { submissionDataCount = 0 - for _, config := range configs { - if _, ok := app.LatestDataMap.Load(config.Name); ok { + for _, symbol := range symbols { + if _, ok := app.LatestDataMap.Load(symbol); ok { submissionDataCount++ } } - if submissionDataCount == len(configs) { + if submissionDataCount == len(symbols) { return } } case <-timeout: - if submissionDataCount != len(configs) { + if submissionDataCount != len(symbols) { t.Fatal("not all submission data received from websocket") } } diff --git a/node/pkg/reporter/main_test.go b/node/pkg/reporter/main_test.go index 41bd8888e..1b4a13cc6 100644 --- a/node/pkg/reporter/main_test.go +++ b/node/pkg/reporter/main_test.go @@ -61,7 +61,7 @@ func generateSampleSubmissionData(configId int32, value int64, timestamp time.Ti }, nil } -func mockDalWsServer(ctx context.Context) (*wss.WebsocketHelper, *types.Config, []types.Config, error) { +func mockDalWsServer(ctx context.Context) (*wss.WebsocketHelper, *types.Config, []string, error) { apiKey := "testApiKey" err := db.QueryWithoutResult( ctx, @@ -80,18 +80,18 @@ func mockDalWsServer(ctx context.Context) (*wss.WebsocketHelper, *types.Config, SubmitInterval: 15000, } - configs := []types.Config{tmpConfig} + symbols := []string{tmpConfig.Name} keyCache := keycache.NewAPIKeyCache(1 * time.Hour) keyCache.CleanupLoop(10 * time.Minute) - collector, err := collector.NewCollector(ctx, configs) + collector, err := collector.NewCollector(ctx, symbols) if err != nil { return nil, nil, nil, err } collector.Start(ctx) - h := hub.HubSetup(ctx, configs) + h := hub.HubSetup(ctx, symbols) go h.Start(ctx, collector) statsApp := stats.NewStatsApp(ctx, stats.WithBulkLogsCopyInterval(1*time.Second)) @@ -108,5 +108,5 @@ func mockDalWsServer(ctx context.Context) (*wss.WebsocketHelper, *types.Config, return nil, nil, nil, err } - return conn, &tmpConfig, configs, nil + return conn, &tmpConfig, symbols, nil }