Skip to content

Commit

Permalink
[Separate Reporter Part 3] Websocket implementation (#1887)
Browse files Browse the repository at this point in the history
* initial dal fetch and submit logic

* fix lint issues

* replace const ticker time with submission interval

* initial implementation of dal websocket

* uncomment report delegated to pass linting

* websocket implemented

* remove manual dial from reporter app

* reflect feedback

* make reporting concurrent

* remove unnecessary comments
  • Loading branch information
Intizar-T committed Jul 24, 2024
1 parent 0a6dbe6 commit c432c8e
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 77 deletions.
1 change: 1 addition & 0 deletions node/pkg/error/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ var (
ErrReporterStop = &CustomError{Service: Reporter, Code: InternalError, Message: "Failed to stop reporters"}
ErrReporterValidateAggregateTimestampValues = &CustomError{Service: Reporter, Code: InternalError, Message: "Failed to validate aggregate timestamp values"}
ErrReporterDalApiKeyNotFound = &CustomError{Service: Reporter, Code: InternalError, Message: "DAL API key not found in reporter"}
ErrReporterDalWsDataProcessingFailed = &CustomError{Service: Reporter, Code: InternalError, Message: "Failed to process DAL WS data"}

ErrDalEmptyProofParam = &CustomError{Service: Dal, Code: InvalidInputError, Message: "Empty proof param"}
ErrDalInvalidProofLength = &CustomError{Service: Dal, Code: InvalidInputError, Message: "Invalid proof length"}
Expand Down
43 changes: 27 additions & 16 deletions node/pkg/reporter/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package reporter

import (
"context"
"fmt"
"os"
"time"

Expand Down Expand Up @@ -35,17 +34,17 @@ func (a *App) setReporters(ctx context.Context) error {
return errorSentinel.ErrReporterDalApiKeyNotFound
}

chain := os.Getenv("CHAIN")
if chain == "" {
log.Warn().Str("Player", "Reporter").Msg("chain not set, defaulting to baobab")
chain = "baobab"
dalWsEndpoint := os.Getenv("DAL_WS_URL")
if dalWsEndpoint == "" {
dalWsEndpoint = "ws://orakl-dal.orakl.svc.cluster.local/ws"
}

contractAddress := os.Getenv("SUBMISSION_PROXY_CONTRACT")
if contractAddress == "" {
return errorSentinel.ErrReporterSubmissionProxyContractNotFound
}

// TODO: probably can assign the chainHelper to app here and not reuse it in startReporters instead of closing
tmpChainHelper, err := helper.NewChainHelper(ctx)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to create chain helper")
Expand All @@ -65,6 +64,12 @@ func (a *App) setReporters(ctx context.Context) error {
return err
}

dalWsHelper, dalWsHelperErr := SetupDalWsHelper(ctx, configs, dalWsEndpoint, dalApiKey)
if dalWsHelperErr != nil {
return dalWsHelperErr
}
a.WsHelper = dalWsHelper

groupedConfigs := groupConfigsBySubmitIntervals(configs)
for groupInterval, configs := range groupedConfigs {
reporter, errNewReporter := NewReporter(
Expand All @@ -73,13 +78,12 @@ func (a *App) setReporters(ctx context.Context) error {
WithInterval(groupInterval),
WithContractAddress(contractAddress),
WithCachedWhitelist(cachedWhitelist),
WithDalEndpoint(fmt.Sprintf("https://dal.%s.orakl.network", chain)),
WithDalApiKey(dalApiKey),
)
if errNewReporter != nil {
log.Error().Str("Player", "Reporter").Err(errNewReporter).Msg("failed to set reporter")
return errNewReporter
}
reporter.LatestData = &a.LatestData
a.Reporters = append(a.Reporters, reporter)
}
if len(a.Reporters) == 0 {
Expand Down Expand Up @@ -108,6 +112,8 @@ func (a *App) setReporters(ctx context.Context) error {
func (a *App) startReporters(ctx context.Context) error {
var errs []string

go a.WsHelper.Run(ctx, a.handleWsMessage)

chainHelper, chainHelperErr := helper.NewChainHelper(ctx)
if chainHelperErr != nil {
return chainHelperErr
Expand All @@ -129,15 +135,6 @@ func (a *App) startReporters(ctx context.Context) error {
return nil
}

func (a *App) GetReporterWithInterval(interval int) (*Reporter, error) {
for _, reporter := range a.Reporters {
if reporter.SubmissionInterval == time.Duration(interval)*time.Millisecond {
return reporter, nil
}
}
return nil, errorSentinel.ErrReporterNotFound
}

func (a *App) startReporter(ctx context.Context, reporter *Reporter) error {
if reporter.isRunning {
log.Debug().Str("Player", "Reporter").Msg("reporter already running")
Expand Down Expand Up @@ -175,3 +172,17 @@ func groupConfigsBySubmitIntervals(reporterConfigs []Config) map[int][]Config {
}
return grouped
}

// handleWsMessage caches the latest submission payload received over the DAL
// websocket, keyed by its symbol, so reporters can read it at submit time.
// The ctx parameter is unused here but required by the websocket helper's
// message-handler signature.
func (a *App) handleWsMessage(ctx context.Context, data map[string]interface{}) error {
	symbol, ok := data["symbol"]
	if !ok {
		// Without a symbol there is nothing to index the payload by; storing it
		// under a nil key would make it unretrievable, so reject it instead.
		log.Error().Str("Player", "Reporter").Msg("dal ws message missing symbol field")
		return errorSentinel.ErrReporterDalWsDataProcessingFailed
	}
	a.LatestData.Store(symbol, data)
	return nil
}

// GetReporterWithInterval returns the reporter whose submission interval
// equals the given interval in milliseconds. It returns ErrReporterNotFound
// when no registered reporter matches.
func (a *App) GetReporterWithInterval(interval int) (*Reporter, error) {
	target := time.Duration(interval) * time.Millisecond
	for _, r := range a.Reporters {
		if r.SubmissionInterval == target {
			return r, nil
		}
	}
	return nil, errorSentinel.ErrReporterNotFound
}
130 changes: 88 additions & 42 deletions node/pkg/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package reporter

import (
"context"
"encoding/json"
"math/big"
"strconv"
"sync"
"time"

errorSentinel "bisonai.com/orakl/node/pkg/error"
"bisonai.com/orakl/node/pkg/utils/request"

"github.com/rs/zerolog/log"
)
Expand All @@ -29,24 +29,17 @@ func NewReporter(ctx context.Context, opts ...ReporterOption) (*Reporter, error)
return nil, errorSentinel.ErrReporterEmptyConfigs
}

topicString := TOPIC_STRING + "-"
if config.JobType == DeviationJob {
topicString += "deviation"
} else {
topicString += strconv.Itoa(config.Interval)
}

groupInterval := time.Duration(config.Interval) * time.Millisecond

deviationThreshold := GetDeviationThreshold(groupInterval)

reporter := &Reporter{
contractAddress: config.ContractAddress,
SubmissionInterval: groupInterval,
CachedWhitelist: config.CachedWhitelist,
deviationThreshold: deviationThreshold,
DalEndpoint: config.DalEndpoint,
DalApiKey: config.DalApiKey,
}

reporter.SubmissionPairs = make(map[int32]SubmissionPair)
for _, sa := range config.Configs {
reporter.SubmissionPairs[sa.ID] = SubmissionPair{LastSubmission: 0, Name: sa.Name}
Expand All @@ -65,51 +58,67 @@ func (r *Reporter) Run(ctx context.Context) {
log.Debug().Str("Player", "Reporter").Msg("context done, stopping reporter")
return
case <-ticker.C:
err := r.report(ctx)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("reporting failed")
}
go func() {
err := r.report(ctx)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("reporting failed")
}
}()
}
}
}

func (r *Reporter) report(ctx context.Context) error {
pairs := ""
for _, submissionPair := range r.SubmissionPairs {
pairs += submissionPair.Name + ","
}
submissionData, err := request.Request[[]SubmissionData](request.WithEndpoint(r.DalEndpoint+"/latest-data-feeds/"+pairs), request.WithTimeout(10*time.Second), request.WithHeaders(map[string]string{"X-API-Key": r.DalApiKey, "Content-Type": "application/json"}))

if err != nil {
return err
}

var feedHashes [][32]byte
var values []*big.Int
var timestamps []*big.Int
var proofs [][]byte

for _, data := range submissionData {
intValue, valueErr := strconv.ParseInt(data.Value, 10, 64)
if valueErr != nil {
log.Error().Str("Player", "Reporter").Err(valueErr).Msgf("failed to parse value in data: %v", data.Symbol)
continue
}
feedHashesChan := make(chan [32]byte, len(r.SubmissionPairs))
valuesChan := make(chan *big.Int, len(r.SubmissionPairs))
timestampsChan := make(chan *big.Int, len(r.SubmissionPairs))
proofsChan := make(chan []byte, len(r.SubmissionPairs))

wg := sync.WaitGroup{}

for _, pair := range r.SubmissionPairs {
wg.Add(1)
go func(pair SubmissionPair) {
defer wg.Done()
if value, ok := r.LatestData.Load(pair.Name); ok {
submissionData, err := processDalWsRawData(value)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to process dal ws raw data")
return
}

feedHashesChan <- submissionData.FeedHash
valuesChan <- big.NewInt(submissionData.Value)
timestampsChan <- big.NewInt(submissionData.AggregateTime)
proofsChan <- submissionData.Proof
} else {
log.Error().Str("Player", "Reporter").Msgf("latest data for pair %s not found", pair.Name)
}
}(pair)
}

timestampValue, timestampErr := strconv.ParseInt(data.AggregateTime, 10, 64)
if timestampErr != nil {
log.Error().Str("Player", "Reporter").Err(timestampErr).Msgf("failed to parse timestamp in data: %v", data)
continue
}
wg.Wait()

feedHashes = append(feedHashes, data.FeedHash)
values = append(values, big.NewInt(intValue))
timestamps = append(timestamps, big.NewInt(timestampValue))
proofs = append(proofs, data.Proof)
close(timestampsChan)
close(valuesChan)
close(feedHashesChan)
close(proofsChan)

for feedHash := range feedHashesChan {
feedHashes = append(feedHashes, feedHash)
values = append(values, <-valuesChan)
timestamps = append(timestamps, <-timestampsChan)
proofs = append(proofs, <-proofsChan)
}

for start := 0; start < len(submissionData); start += MAX_REPORT_BATCH_SIZE {
end := min(start+MAX_REPORT_BATCH_SIZE, len(submissionData))
dataLen := len(feedHashes)
for start := 0; start < dataLen; start += MAX_REPORT_BATCH_SIZE {
end := min(start+MAX_REPORT_BATCH_SIZE, dataLen-1)

batchFeedHashes := feedHashes[start:end]
batchValues := values[start:end]
Expand All @@ -118,11 +127,11 @@ func (r *Reporter) report(ctx context.Context) error {

err := r.reportDelegated(ctx, SUBMIT_WITH_PROOFS, batchFeedHashes, batchValues, batchTimestamps, batchProofs)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("report")
err = r.reportDirect(ctx, SUBMIT_WITH_PROOFS, batchFeedHashes, batchValues, batchTimestamps, batchProofs)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("report")
}
log.Error().Str("Player", "Reporter").Err(err).Msg("report")
}
}
return nil
Expand Down Expand Up @@ -163,6 +172,43 @@ func (r *Reporter) reportDelegated(ctx context.Context, functionString string, a
return r.KaiaHelper.SubmitRawTx(ctx, signedTx)
}

// processDalWsRawData converts a raw DAL websocket payload (a decoded JSON
// object, as cached by the ws message handler) into a typed SubmissionData.
// It round-trips the value through JSON to populate RawSubmissionData, then
// parses the string-encoded value and aggregate timestamp into int64s.
// Any failure is logged and surfaced as ErrReporterDalWsDataProcessingFailed.
func processDalWsRawData(data any) (SubmissionData, error) {
	encoded, err := json.Marshal(data)
	if err != nil {
		log.Error().Str("Player", "Reporter").Err(err).Msg("failed to marshal data")
		return SubmissionData{}, errorSentinel.ErrReporterDalWsDataProcessingFailed
	}

	var raw RawSubmissionData
	if err = json.Unmarshal(encoded, &raw); err != nil {
		log.Error().Str("Player", "Reporter").Err(err).Msg("failed to unmarshal data")
		return SubmissionData{}, errorSentinel.ErrReporterDalWsDataProcessingFailed
	}

	// Value and AggregateTime arrive as decimal strings; parse before use.
	value, err := strconv.ParseInt(raw.Value, 10, 64)
	if err != nil {
		log.Error().Str("Player", "Reporter").Err(err).Msg("failed to parse value")
		return SubmissionData{}, errorSentinel.ErrReporterDalWsDataProcessingFailed
	}

	aggregateTime, err := strconv.ParseInt(raw.AggregateTime, 10, 64)
	if err != nil {
		log.Error().Str("Player", "Reporter").Err(err).Msg("failed to parse timestamp")
		return SubmissionData{}, errorSentinel.ErrReporterDalWsDataProcessingFailed
	}

	return SubmissionData{
		FeedHash:      raw.FeedHash,
		Value:         value,
		AggregateTime: aggregateTime,
		Proof:         raw.Proof,
	}, nil
}

// func (r *Reporter) deviationJob() error {
// start := time.Now()
// log.Info().Str("Player", "Reporter").Time("start", start).Msg("reporter deviation job")
Expand Down
Loading

0 comments on commit c432c8e

Please sign in to comment.