Skip to content

Commit

Permalink
fix: update based on feedbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Sep 28, 2024
1 parent b947993 commit fbaab53
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 20 deletions.
20 changes: 13 additions & 7 deletions node/pkg/dal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package dal

import (
"context"
"errors"
"fmt"
"os"
"strings"
"time"

"bisonai.com/miko/node/pkg/common/types"
Expand All @@ -13,13 +12,16 @@ import (
"bisonai.com/miko/node/pkg/dal/hub"
"bisonai.com/miko/node/pkg/dal/utils/keycache"
"bisonai.com/miko/node/pkg/dal/utils/stats"
errorsentinel "bisonai.com/miko/node/pkg/error"
"bisonai.com/miko/node/pkg/utils/request"

"github.com/rs/zerolog/log"
)

type Config = types.Config

const baseMikoConfigUrl = "https://config.orakl.network/%s_configs.json"

func Run(ctx context.Context) error {
log.Debug().Msg("Starting DAL API server")

Expand All @@ -31,7 +33,8 @@ func Run(ctx context.Context) error {

chain := os.Getenv("CHAIN")
if chain == "" {
return errors.New("CHAIN is not set")
log.Error().Msg("CHAIN environment variable not set")
return errorsentinel.ErrDalChainEnvNotFound
}

symbols, err := fetchSymbols(chain)
Expand Down Expand Up @@ -64,15 +67,18 @@ func fetchSymbols(chain string) ([]string, error) {
Name string `json:"name"`
}

url := "https://config.orakl.network/" + strings.ToLower(chain) + "_configs.json"

results, err := request.Request[[]ConfigEntry](request.WithEndpoint(url), request.WithTimeout(5*time.Second))
results, err := request.Request[[]ConfigEntry](
request.WithEndpoint(fmt.Sprintf(baseMikoConfigUrl, chain)),
request.WithTimeout(5*time.Second))
if err != nil {
return nil, err
}

var symbols []string
if len(results) == 0 {
return nil, errorsentinel.ErrDalSymbolsNotFound
}

var symbols []string
for _, result := range results {
symbols = append(symbols, result.Name)
}
Expand Down
8 changes: 7 additions & 1 deletion node/pkg/dal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data *aggreg
c.mu.RLock()
whitelist := c.CachedWhitelist
c.mu.RUnlock()

feedHashBytes, ok := c.FeedHashes[data.Symbol]
if !ok {
return nil, errorsentinel.ErrDalFeedHashNotFound
}

orderedProof, err := orderProof(
ctx,
data.Proof.Proof,
Expand Down Expand Up @@ -267,7 +273,7 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data *aggreg
Value: strconv.FormatInt(data.GlobalAggregate.Value, 10),
AggregateTime: strconv.FormatInt(data.GlobalAggregate.Timestamp.UnixMilli(), 10),
Proof: formatBytesToHex(orderedProof),
FeedHash: formatBytesToHex(c.FeedHashes[data.Symbol]),
FeedHash: formatBytesToHex(feedHashBytes),
Decimals: DefaultDecimals,
}, nil
}
Expand Down
25 changes: 13 additions & 12 deletions node/pkg/dal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Subscription struct {
}

type Hub struct {
Symbols map[string]any
Clients map[*websocket.Conn]map[string]any
Symbols map[string]struct{}
Clients map[*websocket.Conn]map[string]struct{}
Register chan *websocket.Conn
Unregister chan *websocket.Conn
broadcast map[string]chan *dalcommon.OutgoingSubmissionData
Expand All @@ -34,7 +34,7 @@ const (
)

func HubSetup(ctx context.Context, symbols []string) *Hub {
symbolsMap := make(map[string]any)
symbolsMap := make(map[string]struct{})
for _, symbol := range symbols {
symbolsMap[symbol] = struct{}{}
}
Expand All @@ -43,10 +43,10 @@ func HubSetup(ctx context.Context, symbols []string) *Hub {
return hub
}

func NewHub(symbols map[string]any) *Hub {
func NewHub(symbols map[string]struct{}) *Hub {
return &Hub{
Symbols: symbols,
Clients: make(map[*websocket.Conn]map[string]any),
Clients: make(map[*websocket.Conn]map[string]struct{}),
Register: make(chan *websocket.Conn),
Unregister: make(chan *websocket.Conn),
broadcast: make(map[string]chan *dalcommon.OutgoingSubmissionData),
Expand All @@ -59,7 +59,8 @@ func (h *Hub) Start(ctx context.Context, collector *collector.Collector) {
h.initializeBroadcastChannels(collector)

for symbol := range h.Symbols {
go h.broadcastDataForSymbol(ctx, symbol)
sym := symbol // Capture loop variable to avoid potential race condition
go h.broadcastDataForSymbol(ctx, sym)
}

go h.cleanupJob(ctx)
Expand All @@ -71,7 +72,7 @@ func (h *Hub) HandleSubscription(ctx context.Context, client *websocket.Conn, ms

subscriptions, ok := h.Clients[client]
if !ok {
subscriptions = map[string]any{}
subscriptions = map[string]struct{}{}
}

valid := []string{}
Expand Down Expand Up @@ -113,7 +114,7 @@ func (h *Hub) addClient(client *websocket.Conn) {
if _, ok := h.Clients[client]; ok {
return
}
h.Clients[client] = make(map[string]any)
h.Clients[client] = make(map[string]struct{})
}

func (h *Hub) removeClient(client *websocket.Conn) {
Expand Down Expand Up @@ -143,18 +144,18 @@ func (h *Hub) initializeBroadcastChannels(collector *collector.Collector) {

func (h *Hub) broadcastDataForSymbol(ctx context.Context, symbol string) {
for data := range h.broadcast[symbol] {
go h.castSubmissionData(ctx, data, &symbol)
go h.castSubmissionData(ctx, data, symbol)
}
}

func (h *Hub) castSubmissionData(ctx context.Context, data *dalcommon.OutgoingSubmissionData, symbol *string) {
func (h *Hub) castSubmissionData(ctx context.Context, data *dalcommon.OutgoingSubmissionData, symbol string) {
var wg sync.WaitGroup

h.mu.RLock()
defer h.mu.RUnlock()

for client, subscriptions := range h.Clients {
if _, ok := subscriptions[*symbol]; ok {
if _, ok := subscriptions[symbol]; ok {
wg.Add(1)
go func(entry *websocket.Conn) {
defer wg.Done()
Expand Down Expand Up @@ -186,7 +187,7 @@ func (h *Hub) cleanup() {
h.mu.Lock()
defer h.mu.Unlock()

newClients := make(map[*websocket.Conn]map[string]any, len(h.Clients))
newClients := make(map[*websocket.Conn]map[string]struct{}, len(h.Clients))
for client, subscriptions := range h.Clients {
if len(subscriptions) > 0 {
newClients[client] = subscriptions
Expand Down
3 changes: 3 additions & 0 deletions node/pkg/error/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ var (
ErrDalCollectorNotFound = &CustomError{Service: Dal, Code: InternalError, Message: "Collector not found"}
ErrDalHubNotFound = &CustomError{Service: Dal, Code: InternalError, Message: "Hub not found"}
ErrDalKeyCacheNotFound = &CustomError{Service: Dal, Code: InternalError, Message: "Key cache not found"}
ErrDalFeedHashNotFound = &CustomError{Service: Dal, Code: InternalError, Message: "Feed hash not found"}
ErrDalSymbolsNotFound = &CustomError{Service: Dal, Code: InternalError, Message: "Symbols not found"}
ErrDalChainEnvNotFound = &CustomError{Service: Dal, Code: InternalError, Message: "Chain env not found"}

ErrReducerCastToFloatFail = &CustomError{Service: Others, Code: InternalError, Message: "Failed to cast to float"}
ErrReducerIndexCastToInterfaceFail = &CustomError{Service: Others, Code: InternalError, Message: "Failed to cast to interface from INDEX"}
Expand Down

0 comments on commit fbaab53

Please sign in to comment.