Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(DAL) Remove config id reference #2290

Merged
merged 3 commits into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions node/pkg/aggregator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Proof = types.Proof
type GlobalAggregate = types.GlobalAggregate

type SubmissionData struct {
Symbol string
GlobalAggregate GlobalAggregate
Proof Proof
}
Expand Down
1 change: 1 addition & 0 deletions node/pkg/aggregator/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func PublishGlobalAggregateAndProof(ctx context.Context, name string, globalAggr
return nil
}
data := SubmissionData{
Symbol: name,
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved
GlobalAggregate: globalAggregate,
Proof: proof,
}
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/dal/apiv2/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func (s *ServerV2) HealthCheckHandler(w http.ResponseWriter, r *http.Request) {
}

func (s *ServerV2) SymbolsHandler(w http.ResponseWriter, r *http.Request) {
result := make([]string, 0, len(s.hub.Configs))
for key := range s.hub.Configs {
result := make([]string, 0, len(s.hub.Symbols))
for key := range s.hub.Symbols {
result = append(result, key)
}

Expand Down
44 changes: 34 additions & 10 deletions node/pkg/dal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package dal

import (
"context"
"errors"
"fmt"
"os"
"time"

Expand All @@ -12,13 +12,16 @@ import (
"bisonai.com/miko/node/pkg/dal/hub"
"bisonai.com/miko/node/pkg/dal/utils/keycache"
"bisonai.com/miko/node/pkg/dal/utils/stats"
errorsentinel "bisonai.com/miko/node/pkg/error"
"bisonai.com/miko/node/pkg/utils/request"

"github.com/rs/zerolog/log"
)

type Config = types.Config

const baseMikoConfigUrl = "https://config.orakl.network/%s_configs.json"
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved

func Run(ctx context.Context) error {
log.Debug().Msg("Starting DAL API server")

Expand All @@ -28,25 +31,26 @@ func Run(ctx context.Context) error {
keyCache := keycache.NewAPIKeyCache(1 * time.Hour)
keyCache.CleanupLoop(10 * time.Minute)

adminEndpoint := os.Getenv("ORAKL_NODE_ADMIN_URL")
if adminEndpoint == "" {
return errors.New("ORAKL_NODE_ADMIN_URL is not set")
chain := os.Getenv("CHAIN")
if chain == "" {
log.Error().Msg("CHAIN environment variable not set")
return errorsentinel.ErrDalChainEnvNotFound
}

configs, err := fetchConfigs(ctx, adminEndpoint)
symbols, err := fetchSymbols(chain)
if err != nil {
log.Error().Err(err).Msg("Failed to fetch configs")
log.Error().Err(err).Msg("Failed to fetch symbols")
return err
}

collector, err := collector.NewCollector(ctx, configs)
collector, err := collector.NewCollector(ctx, symbols)
if err != nil {
log.Error().Err(err).Msg("Failed to setup collector")
return err
}
collector.Start(ctx)

hub := hub.HubSetup(ctx, configs)
hub := hub.HubSetup(ctx, symbols)
go hub.Start(ctx, collector)

err = apiv2.Start(ctx, apiv2.WithCollector(collector), apiv2.WithHub(hub), apiv2.WithKeyCache(keyCache), apiv2.WithStatsApp(statsApp))
Expand All @@ -58,6 +62,26 @@ func Run(ctx context.Context) error {
return nil
}

func fetchConfigs(ctx context.Context, endpoint string) ([]Config, error) {
return request.Request[[]Config](request.WithEndpoint(endpoint + "/config"))
func fetchSymbols(chain string) ([]string, error) {
type ConfigEntry struct {
Name string `json:"name"`
}

results, err := request.Request[[]ConfigEntry](
request.WithEndpoint(fmt.Sprintf(baseMikoConfigUrl, chain)),
request.WithTimeout(5*time.Second))
if err != nil {
return nil, err
}

if len(results) == 0 {
return nil, errorsentinel.ErrDalSymbolsNotFound
}

var symbols []string
for _, result := range results {
symbols = append(symbols, result.Name)
}

return symbols, nil
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved
}
48 changes: 26 additions & 22 deletions node/pkg/dal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ const (
type Config = types.Config

type Collector struct {
OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData
Symbols map[int32]string
FeedHashes map[int32][]byte
LatestTimestamps map[int32]time.Time
OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData

FeedHashes map[string][]byte
LatestTimestamps map[string]time.Time
LatestData map[string]*dalcommon.OutgoingSubmissionData
CachedWhitelist []klaytncommon.Address

Expand All @@ -50,7 +50,7 @@ type Collector struct {
mu sync.RWMutex
}

func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {
func NewCollector(ctx context.Context, symbols []string) (*Collector, error) {
kaiaWebsocketUrl := os.Getenv("KAIA_WEBSOCKET_URL")
if kaiaWebsocketUrl == "" {
return nil, errors.New("KAIA_WEBSOCKET_URL is not set")
Expand Down Expand Up @@ -92,22 +92,20 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {
}

collector := &Collector{
OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(configs)),
Symbols: make(map[int32]string, len(configs)),
FeedHashes: make(map[int32][]byte, len(configs)),
LatestTimestamps: make(map[int32]time.Time),
OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(symbols)),
FeedHashes: make(map[string][]byte, len(symbols)),
LatestTimestamps: make(map[string]time.Time),
LatestData: make(map[string]*dalcommon.OutgoingSubmissionData),
chainReader: chainReader,
CachedWhitelist: initialWhitelist,
submissionProxyContractAddr: submissionProxyContractAddr,
}

redisTopics := []string{}
for _, config := range configs {
collector.OutgoingStream[config.Name] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.Symbols[config.ID] = config.Name
collector.FeedHashes[config.ID] = crypto.Keccak256([]byte(config.Name))
redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.Name))
for _, symbol := range symbols {
collector.OutgoingStream[symbol] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.FeedHashes[symbol] = crypto.Keccak256([]byte(symbol))
redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(symbol))
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved
}

baseRediscribe, err := db.NewRediscribe(
Expand Down Expand Up @@ -162,7 +160,7 @@ func (c *Collector) GetLatestData(symbol string) (*dalcommon.OutgoingSubmissionD
func (c *Collector) GetAllLatestData() []dalcommon.OutgoingSubmissionData {
c.mu.RLock()
defer c.mu.RUnlock()
result := make([]dalcommon.OutgoingSubmissionData, 0, len(c.Symbols))
result := make([]dalcommon.OutgoingSubmissionData, 0, len(c.FeedHashes))
for _, value := range c.LatestData {
result = append(result, *value)
}
Expand Down Expand Up @@ -202,9 +200,9 @@ func (c *Collector) compareAndSwapLatestTimestamp(data *aggregator.SubmissionDat
c.mu.Lock()
defer c.mu.Unlock()

old, ok := c.LatestTimestamps[data.GlobalAggregate.ConfigID]
old, ok := c.LatestTimestamps[data.Symbol]
if !ok || data.GlobalAggregate.Timestamp.After(old) {
c.LatestTimestamps[data.GlobalAggregate.ConfigID] = data.GlobalAggregate.Timestamp
c.LatestTimestamps[data.Symbol] = data.GlobalAggregate.Timestamp
return true
}

Expand All @@ -218,7 +216,7 @@ func (c *Collector) processIncomingData(ctx context.Context, data *aggregator.Su
default:
valid := c.compareAndSwapLatestTimestamp(data)
if !valid {
log.Debug().Str("Player", "DalCollector").Str("Symbol", c.Symbols[data.GlobalAggregate.ConfigID]).Msg("old data recieved")
log.Debug().Str("Player", "DalCollector").Str("Symbol", data.Symbol).Msg("old data recieved")
return
}

Expand All @@ -241,15 +239,21 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data *aggreg
c.mu.RLock()
whitelist := c.CachedWhitelist
c.mu.RUnlock()

feedHashBytes, ok := c.FeedHashes[data.Symbol]
if !ok {
return nil, errorsentinel.ErrDalFeedHashNotFound
}

orderedProof, err := orderProof(
ctx,
data.Proof.Proof,
data.GlobalAggregate.Value,
data.GlobalAggregate.Timestamp,
c.Symbols[data.GlobalAggregate.ConfigID],
data.Symbol,
whitelist)
if err != nil {
log.Error().Err(err).Str("Player", "DalCollector").Str("Symbol", c.Symbols[data.GlobalAggregate.ConfigID]).Msg("failed to order proof")
log.Error().Err(err).Str("Player", "DalCollector").Str("Symbol", data.Symbol).Msg("failed to order proof")
if errors.Is(err, errorsentinel.ErrDalSignerNotWhitelisted) {
go func(ctx context.Context, chainHelper *websocketchainreader.ChainReader, contractAddress string) {
newList, getAllOraclesErr := getAllOracles(ctx, chainHelper, contractAddress)
Expand All @@ -265,11 +269,11 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data *aggreg
return nil, err
}
return &dalcommon.OutgoingSubmissionData{
Symbol: c.Symbols[data.GlobalAggregate.ConfigID],
Symbol: data.Symbol,
Value: strconv.FormatInt(data.GlobalAggregate.Value, 10),
AggregateTime: strconv.FormatInt(data.GlobalAggregate.Timestamp.UnixMilli(), 10),
Proof: formatBytesToHex(orderedProof),
FeedHash: formatBytesToHex(c.FeedHashes[data.GlobalAggregate.ConfigID]),
FeedHash: formatBytesToHex(feedHashBytes),
Decimals: DefaultDecimals,
}, nil
}
Expand Down
42 changes: 20 additions & 22 deletions node/pkg/dal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"
"time"

"bisonai.com/miko/node/pkg/common/types"
"bisonai.com/miko/node/pkg/dal/collector"
dalcommon "bisonai.com/miko/node/pkg/dal/common"
"bisonai.com/miko/node/pkg/dal/utils/stats"
Expand All @@ -15,16 +14,14 @@ import (
"nhooyr.io/websocket/wsjson"
)

type Config = types.Config

type Subscription struct {
Method string `json:"method"`
Params []string `json:"params"`
}

type Hub struct {
Configs map[string]Config
Clients map[*websocket.Conn]map[string]any
Symbols map[string]struct{}
Clients map[*websocket.Conn]map[string]struct{}
Register chan *websocket.Conn
Unregister chan *websocket.Conn
broadcast map[string]chan *dalcommon.OutgoingSubmissionData
Expand All @@ -36,20 +33,20 @@ const (
CleanupInterval = time.Hour
)

func HubSetup(ctx context.Context, configs []Config) *Hub {
configMap := make(map[string]Config)
for _, config := range configs {
configMap[config.Name] = config
func HubSetup(ctx context.Context, symbols []string) *Hub {
symbolsMap := make(map[string]struct{})
for _, symbol := range symbols {
symbolsMap[symbol] = struct{}{}
}

hub := NewHub(configMap)
hub := NewHub(symbolsMap)
return hub
}

func NewHub(configs map[string]Config) *Hub {
func NewHub(symbols map[string]struct{}) *Hub {
return &Hub{
Configs: configs,
Clients: make(map[*websocket.Conn]map[string]any),
Symbols: symbols,
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved
Clients: make(map[*websocket.Conn]map[string]struct{}),
Register: make(chan *websocket.Conn),
Unregister: make(chan *websocket.Conn),
broadcast: make(map[string]chan *dalcommon.OutgoingSubmissionData),
Expand All @@ -61,8 +58,9 @@ func (h *Hub) Start(ctx context.Context, collector *collector.Collector) {

h.initializeBroadcastChannels(collector)

for symbol := range h.Configs {
go h.broadcastDataForSymbol(ctx, symbol)
for symbol := range h.Symbols {
sym := symbol // Capture loop variable to avoid potential race condition
go h.broadcastDataForSymbol(ctx, sym)
}

go h.cleanupJob(ctx)
Expand All @@ -74,13 +72,13 @@ func (h *Hub) HandleSubscription(ctx context.Context, client *websocket.Conn, ms

subscriptions, ok := h.Clients[client]
if !ok {
subscriptions = map[string]any{}
subscriptions = map[string]struct{}{}
}

valid := []string{}
for _, param := range msg.Params {
symbol := strings.TrimPrefix(param, "submission@")
if _, ok := h.Configs[symbol]; !ok {
if _, ok := h.Symbols[symbol]; !ok {
continue
}
subscriptions[symbol] = struct{}{}
Expand Down Expand Up @@ -116,7 +114,7 @@ func (h *Hub) addClient(client *websocket.Conn) {
if _, ok := h.Clients[client]; ok {
return
}
h.Clients[client] = make(map[string]any)
h.Clients[client] = make(map[string]struct{})
}

func (h *Hub) removeClient(client *websocket.Conn) {
Expand Down Expand Up @@ -146,18 +144,18 @@ func (h *Hub) initializeBroadcastChannels(collector *collector.Collector) {

func (h *Hub) broadcastDataForSymbol(ctx context.Context, symbol string) {
for data := range h.broadcast[symbol] {
go h.castSubmissionData(ctx, data, &symbol)
go h.castSubmissionData(ctx, data, symbol)
}
}

func (h *Hub) castSubmissionData(ctx context.Context, data *dalcommon.OutgoingSubmissionData, symbol *string) {
func (h *Hub) castSubmissionData(ctx context.Context, data *dalcommon.OutgoingSubmissionData, symbol string) {
var wg sync.WaitGroup

h.mu.RLock()
defer h.mu.RUnlock()

for client, subscriptions := range h.Clients {
if _, ok := subscriptions[*symbol]; ok {
if _, ok := subscriptions[symbol]; ok {
wg.Add(1)
go func(entry *websocket.Conn) {
defer wg.Done()
Expand Down Expand Up @@ -189,7 +187,7 @@ func (h *Hub) cleanup() {
h.mu.Lock()
defer h.mu.Unlock()

newClients := make(map[*websocket.Conn]map[string]any, len(h.Clients))
newClients := make(map[*websocket.Conn]map[string]struct{}, len(h.Clients))
for client, subscriptions := range h.Clients {
if len(subscriptions) > 0 {
newClients[client] = subscriptions
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/dal/tests/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCollectorStartAndStop(t *testing.T) {
collector := testItems.Collector
assert.True(t, collector.IsRunning)

assert.Greater(t, len(collector.Symbols), 0)
assert.Greater(t, len(collector.FeedHashes), 0)
collector.Stop()
assert.False(t, collector.IsRunning)
}
Expand All @@ -51,7 +51,7 @@ func TestCollectorStream(t *testing.T) {
time.Sleep(20 * time.Millisecond)

collector := testItems.Collector
assert.Greater(t, len(collector.Symbols), 0)
assert.Greater(t, len(collector.FeedHashes), 0)
assert.True(t, collector.IsRunning)

headers := map[string]string{"X-API-Key": testItems.ApiKey}
Expand Down
Loading