Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Sep 27, 2024
1 parent fd3d9ae commit 9210b51
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 61 deletions.
1 change: 1 addition & 0 deletions node/pkg/aggregator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Proof = types.Proof
type GlobalAggregate = types.GlobalAggregate

type SubmissionData struct {
Symbol string
GlobalAggregate GlobalAggregate
Proof Proof
}
Expand Down
1 change: 1 addition & 0 deletions node/pkg/aggregator/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func PublishGlobalAggregateAndProof(ctx context.Context, name string, globalAggr
return nil
}
data := SubmissionData{
Symbol: name,
GlobalAggregate: globalAggregate,
Proof: proof,
}
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/dal/apiv2/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func (s *ServerV2) HealthCheckHandler(w http.ResponseWriter, r *http.Request) {
}

func (s *ServerV2) SymbolsHandler(w http.ResponseWriter, r *http.Request) {
result := make([]string, 0, len(s.hub.Configs))
for key := range s.hub.Configs {
result := make([]string, 0, len(s.hub.Symbols))
for key := range s.hub.Symbols {
result = append(result, key)
}

Expand Down
36 changes: 27 additions & 9 deletions node/pkg/dal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"os"
"strings"
"time"

"bisonai.com/miko/node/pkg/common/types"
Expand All @@ -28,25 +29,25 @@ func Run(ctx context.Context) error {
keyCache := keycache.NewAPIKeyCache(1 * time.Hour)
keyCache.CleanupLoop(10 * time.Minute)

adminEndpoint := os.Getenv("ORAKL_NODE_ADMIN_URL")
if adminEndpoint == "" {
return errors.New("ORAKL_NODE_ADMIN_URL is not set")
chain := os.Getenv("CHAIN")
if chain == "" {
return errors.New("CHAIN is not set")
}

configs, err := fetchConfigs(ctx, adminEndpoint)
symbols, err := fetchSymbols(chain)
if err != nil {
log.Error().Err(err).Msg("Failed to fetch configs")
log.Error().Err(err).Msg("Failed to fetch symbols")
return err
}

collector, err := collector.NewCollector(ctx, configs)
collector, err := collector.NewCollector(ctx, symbols)
if err != nil {
log.Error().Err(err).Msg("Failed to setup collector")
return err
}
collector.Start(ctx)

hub := hub.HubSetup(ctx, configs)
hub := hub.HubSetup(ctx, symbols)
go hub.Start(ctx, collector)

err = apiv2.Start(ctx, apiv2.WithCollector(collector), apiv2.WithHub(hub), apiv2.WithKeyCache(keyCache), apiv2.WithStatsApp(statsApp))
Expand All @@ -58,6 +59,23 @@ func Run(ctx context.Context) error {
return nil
}

func fetchConfigs(ctx context.Context, endpoint string) ([]Config, error) {
return request.Request[[]Config](request.WithEndpoint(endpoint + "/config"))
func fetchSymbols(chain string) ([]string, error) {
type ConfigEntry struct {
Name string `json:"name"`
}

url := "https://config.orakl.network/" + strings.ToLower(chain) + "_configs.json"

results, err := request.Request[[]ConfigEntry](request.WithEndpoint(url), request.WithTimeout(5*time.Second))
if err != nil {
return nil, err
}

var symbols []string

for _, result := range results {
symbols = append(symbols, result.Name)
}

return symbols, nil
}
42 changes: 20 additions & 22 deletions node/pkg/dal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ const (
type Config = types.Config

type Collector struct {
OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData
Symbols map[int32]string
FeedHashes map[int32][]byte
LatestTimestamps map[int32]time.Time
OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData

FeedHashes map[string][]byte
LatestTimestamps map[string]time.Time
LatestData map[string]*dalcommon.OutgoingSubmissionData
CachedWhitelist []klaytncommon.Address

Expand All @@ -50,7 +50,7 @@ type Collector struct {
mu sync.RWMutex
}

func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {
func NewCollector(ctx context.Context, symbols []string) (*Collector, error) {
kaiaWebsocketUrl := os.Getenv("KAIA_WEBSOCKET_URL")
if kaiaWebsocketUrl == "" {
return nil, errors.New("KAIA_WEBSOCKET_URL is not set")
Expand Down Expand Up @@ -92,22 +92,20 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {
}

collector := &Collector{
OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(configs)),
Symbols: make(map[int32]string, len(configs)),
FeedHashes: make(map[int32][]byte, len(configs)),
LatestTimestamps: make(map[int32]time.Time),
OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(symbols)),
FeedHashes: make(map[string][]byte, len(symbols)),
LatestTimestamps: make(map[string]time.Time),
LatestData: make(map[string]*dalcommon.OutgoingSubmissionData),
chainReader: chainReader,
CachedWhitelist: initialWhitelist,
submissionProxyContractAddr: submissionProxyContractAddr,
}

redisTopics := []string{}
for _, config := range configs {
collector.OutgoingStream[config.Name] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.Symbols[config.ID] = config.Name
collector.FeedHashes[config.ID] = crypto.Keccak256([]byte(config.Name))
redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.Name))
for _, symbol := range symbols {
collector.OutgoingStream[symbol] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.FeedHashes[symbol] = crypto.Keccak256([]byte(symbol))
redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(symbol))
}

baseRediscribe, err := db.NewRediscribe(
Expand Down Expand Up @@ -162,7 +160,7 @@ func (c *Collector) GetLatestData(symbol string) (*dalcommon.OutgoingSubmissionD
func (c *Collector) GetAllLatestData() []dalcommon.OutgoingSubmissionData {
c.mu.RLock()
defer c.mu.RUnlock()
result := make([]dalcommon.OutgoingSubmissionData, 0, len(c.Symbols))
result := make([]dalcommon.OutgoingSubmissionData, 0, len(c.FeedHashes))
for _, value := range c.LatestData {
result = append(result, *value)
}
Expand Down Expand Up @@ -202,9 +200,9 @@ func (c *Collector) compareAndSwapLatestTimestamp(data *aggregator.SubmissionDat
c.mu.Lock()
defer c.mu.Unlock()

old, ok := c.LatestTimestamps[data.GlobalAggregate.ConfigID]
old, ok := c.LatestTimestamps[data.Symbol]
if !ok || data.GlobalAggregate.Timestamp.After(old) {
c.LatestTimestamps[data.GlobalAggregate.ConfigID] = data.GlobalAggregate.Timestamp
c.LatestTimestamps[data.Symbol] = data.GlobalAggregate.Timestamp
return true
}

Expand All @@ -218,7 +216,7 @@ func (c *Collector) processIncomingData(ctx context.Context, data *aggregator.Su
default:
valid := c.compareAndSwapLatestTimestamp(data)
if !valid {
log.Debug().Str("Player", "DalCollector").Str("Symbol", c.Symbols[data.GlobalAggregate.ConfigID]).Msg("old data recieved")
log.Debug().Str("Player", "DalCollector").Str("Symbol", data.Symbol).Msg("old data recieved")
return
}

Expand Down Expand Up @@ -246,10 +244,10 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data *aggreg
data.Proof.Proof,
data.GlobalAggregate.Value,
data.GlobalAggregate.Timestamp,
c.Symbols[data.GlobalAggregate.ConfigID],
data.Symbol,
whitelist)
if err != nil {
log.Error().Err(err).Str("Player", "DalCollector").Str("Symbol", c.Symbols[data.GlobalAggregate.ConfigID]).Msg("failed to order proof")
log.Error().Err(err).Str("Player", "DalCollector").Str("Symbol", data.Symbol).Msg("failed to order proof")
if errors.Is(err, errorsentinel.ErrDalSignerNotWhitelisted) {
go func(ctx context.Context, chainHelper *websocketchainreader.ChainReader, contractAddress string) {
newList, getAllOraclesErr := getAllOracles(ctx, chainHelper, contractAddress)
Expand All @@ -265,11 +263,11 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data *aggreg
return nil, err
}
return &dalcommon.OutgoingSubmissionData{
Symbol: c.Symbols[data.GlobalAggregate.ConfigID],
Symbol: data.Symbol,
Value: strconv.FormatInt(data.GlobalAggregate.Value, 10),
AggregateTime: strconv.FormatInt(data.GlobalAggregate.Timestamp.UnixMilli(), 10),
Proof: formatBytesToHex(orderedProof),
FeedHash: formatBytesToHex(c.FeedHashes[data.GlobalAggregate.ConfigID]),
FeedHash: formatBytesToHex(c.FeedHashes[data.Symbol]),
Decimals: DefaultDecimals,
}, nil
}
Expand Down
23 changes: 10 additions & 13 deletions node/pkg/dal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"
"time"

"bisonai.com/miko/node/pkg/common/types"
"bisonai.com/miko/node/pkg/dal/collector"
dalcommon "bisonai.com/miko/node/pkg/dal/common"
"bisonai.com/miko/node/pkg/dal/utils/stats"
Expand All @@ -15,15 +14,13 @@ import (
"nhooyr.io/websocket/wsjson"
)

type Config = types.Config

type Subscription struct {
Method string `json:"method"`
Params []string `json:"params"`
}

type Hub struct {
Configs map[string]Config
Symbols map[string]any
Clients map[*websocket.Conn]map[string]any
Register chan *websocket.Conn
Unregister chan *websocket.Conn
Expand All @@ -36,19 +33,19 @@ const (
CleanupInterval = time.Hour
)

func HubSetup(ctx context.Context, configs []Config) *Hub {
configMap := make(map[string]Config)
for _, config := range configs {
configMap[config.Name] = config
func HubSetup(ctx context.Context, symbols []string) *Hub {
symbolsMap := make(map[string]any)
for _, symbol := range symbols {
symbolsMap[symbol] = struct{}{}
}

hub := NewHub(configMap)
hub := NewHub(symbolsMap)
return hub
}

func NewHub(configs map[string]Config) *Hub {
func NewHub(symbols map[string]any) *Hub {
return &Hub{
Configs: configs,
Symbols: symbols,
Clients: make(map[*websocket.Conn]map[string]any),
Register: make(chan *websocket.Conn),
Unregister: make(chan *websocket.Conn),
Expand All @@ -61,7 +58,7 @@ func (h *Hub) Start(ctx context.Context, collector *collector.Collector) {

h.initializeBroadcastChannels(collector)

for symbol := range h.Configs {
for symbol := range h.Symbols {
go h.broadcastDataForSymbol(ctx, symbol)
}

Expand All @@ -80,7 +77,7 @@ func (h *Hub) HandleSubscription(ctx context.Context, client *websocket.Conn, ms
valid := []string{}
for _, param := range msg.Params {
symbol := strings.TrimPrefix(param, "submission@")
if _, ok := h.Configs[symbol]; !ok {
if _, ok := h.Symbols[symbol]; !ok {
continue
}
subscriptions[symbol] = struct{}{}
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/dal/tests/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCollectorStartAndStop(t *testing.T) {
collector := testItems.Collector
assert.True(t, collector.IsRunning)

assert.Greater(t, len(collector.Symbols), 0)
assert.Greater(t, len(collector.FeedHashes), 0)
collector.Stop()
assert.False(t, collector.IsRunning)
}
Expand All @@ -51,7 +51,7 @@ func TestCollectorStream(t *testing.T) {
time.Sleep(20 * time.Millisecond)

collector := testItems.Collector
assert.Greater(t, len(collector.Symbols), 0)
assert.Greater(t, len(collector.FeedHashes), 0)
assert.True(t, collector.IsRunning)

headers := map[string]string{"X-API-Key": testItems.ApiKey}
Expand Down
6 changes: 3 additions & 3 deletions node/pkg/dal/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,19 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {
SubmitInterval: 15000,
}

configs := []Config{testItems.TmpConfig}
symbols := []string{testItems.TmpConfig.Name}

keyCache := keycache.NewAPIKeyCache(1 * time.Hour)
keyCache.CleanupLoop(10 * time.Minute)

collector, err := collector.NewCollector(ctx, configs)
collector, err := collector.NewCollector(ctx, symbols)
if err != nil {
log.Error().Err(err).Msg("Failed to setup DAL API server")
return nil, nil, err
}
collector.Start(ctx)

hub := hub.HubSetup(ctx, configs)
hub := hub.HubSetup(ctx, symbols)
go hub.Start(ctx, collector)

statsApp := stats.NewStatsApp(ctx, stats.WithBulkLogsCopyInterval(1*time.Second))
Expand Down
10 changes: 5 additions & 5 deletions node/pkg/reporter/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestWsDataHandling(t *testing.T) {

app := New()

conn, tmpConfig, configs, err := mockDalWsServer(ctx)
conn, tmpConfig, symbols, err := mockDalWsServer(ctx)
if err != nil {
t.Fatalf("error mocking dal ws server: %v", err)
}
Expand Down Expand Up @@ -76,17 +76,17 @@ func TestWsDataHandling(t *testing.T) {
case <-ticker.C:
if app.WsHelper != nil && app.WsHelper.IsRunning {
submissionDataCount = 0
for _, config := range configs {
if _, ok := app.LatestDataMap.Load(config.Name); ok {
for _, symbol := range symbols {
if _, ok := app.LatestDataMap.Load(symbol); ok {
submissionDataCount++
}
}
if submissionDataCount == len(configs) {
if submissionDataCount == len(symbols) {
return
}
}
case <-timeout:
if submissionDataCount != len(configs) {
if submissionDataCount != len(symbols) {
t.Fatal("not all submission data received from websocket")
}
}
Expand Down
10 changes: 5 additions & 5 deletions node/pkg/reporter/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func generateSampleSubmissionData(configId int32, value int64, timestamp time.Ti
}, nil
}

func mockDalWsServer(ctx context.Context) (*wss.WebsocketHelper, *types.Config, []types.Config, error) {
func mockDalWsServer(ctx context.Context) (*wss.WebsocketHelper, *types.Config, []string, error) {
apiKey := "testApiKey"
err := db.QueryWithoutResult(
ctx,
Expand All @@ -80,18 +80,18 @@ func mockDalWsServer(ctx context.Context) (*wss.WebsocketHelper, *types.Config,
SubmitInterval: 15000,
}

configs := []types.Config{tmpConfig}
symbols := []string{tmpConfig.Name}

keyCache := keycache.NewAPIKeyCache(1 * time.Hour)
keyCache.CleanupLoop(10 * time.Minute)

collector, err := collector.NewCollector(ctx, configs)
collector, err := collector.NewCollector(ctx, symbols)
if err != nil {
return nil, nil, nil, err
}
collector.Start(ctx)

h := hub.HubSetup(ctx, configs)
h := hub.HubSetup(ctx, symbols)
go h.Start(ctx, collector)

statsApp := stats.NewStatsApp(ctx, stats.WithBulkLogsCopyInterval(1*time.Second))
Expand All @@ -108,5 +108,5 @@ func mockDalWsServer(ctx context.Context) (*wss.WebsocketHelper, *types.Config,
return nil, nil, nil, err
}

return conn, &tmpConfig, configs, nil
return conn, &tmpConfig, symbols, nil
}

0 comments on commit 9210b51

Please sign in to comment.