Skip to content

Commit

Permalink
deviation reporter implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
Intizar-T committed Jul 25, 2024
1 parent 9f77e0b commit 6f59d13
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 12 deletions.
46 changes: 45 additions & 1 deletion node/pkg/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

errorSentinel "bisonai.com/orakl/node/pkg/error"
"bisonai.com/orakl/node/pkg/utils/retrier"

"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -66,7 +67,12 @@ func (r *Reporter) Run(ctx context.Context) {
}
}()
} else {
log.Info().Str("Player", "Reporter").Msg("reporter job type temp not supported")
go func() {
err := r.deviationJob(ctx)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("deviation job failed")
}
}()
}
}
}
Expand Down Expand Up @@ -155,6 +161,44 @@ func (r *Reporter) report(ctx context.Context, submissionPairs map[int32]Submiss
return nil
}

func (r *Reporter) deviationJob(ctx context.Context) error {
deviatingAggregates := GetDeviatingAggregates(r.SubmissionPairs, r.LatestData, r.deviationThreshold)
if len(deviatingAggregates) == 0 {
log.Info().Msg("no deviating aggregates found")
return nil
}

log.Info().Msgf("deviating aggregates found: %v", deviatingAggregates)

reportJob := func() error {
err := r.report(ctx, deviatingAggregates)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("DeviationReport")
return err
}
return nil
}

err := retrier.Retry(
reportJob,
MAX_RETRY,
INITIAL_FAILURE_TIMEOUT,
MAX_RETRY_DELAY,
)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to report deviation, resigning from leader")
return errorSentinel.ErrReporterDeviationReportFail
}

for configId, agg := range deviatingAggregates {
pair := r.SubmissionPairs[configId]
pair.LastSubmission = agg.LastSubmission
r.SubmissionPairs[configId] = pair
}

return nil
}

func processDalWsRawData(data any) (SubmissionData, error) {
rawSubmissionData := RawSubmissionData{}

Expand Down
30 changes: 19 additions & 11 deletions node/pkg/reporter/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"math"
"math/big"
"sync"
"time"

"bisonai.com/orakl/node/pkg/chain/helper"
Expand All @@ -17,20 +18,27 @@ import (
"github.com/rs/zerolog/log"
)

func GetDeviatingAggregates(lastSubmitted []GlobalAggregate, newAggregates []GlobalAggregate, threshold float64) []GlobalAggregate {
submittedAggregates := make(map[int32]GlobalAggregate)
for _, aggregate := range lastSubmitted {
submittedAggregates[aggregate.ConfigID] = aggregate
}
func GetDeviatingAggregates(submissionPairs map[int32]SubmissionPair, latestData *sync.Map, threshold float64) map[int32]SubmissionPair {
deviatingSubmissionPairs := make(map[int32]SubmissionPair)
for configID, submissionPair := range submissionPairs {
latestDataValue, ok := latestData.Load(configID)
if !ok {
continue
}

latestDataValueInt64, ok := latestDataValue.(int64)
if !ok {
continue
}

result := make([]GlobalAggregate, 0, len(newAggregates))
for _, newAggregate := range newAggregates {
submittedAggregate, ok := submittedAggregates[newAggregate.ConfigID]
if !ok || ShouldReportDeviation(submittedAggregate.Value, newAggregate.Value, threshold) {
result = append(result, newAggregate)
if ShouldReportDeviation(submissionPair.LastSubmission, latestDataValueInt64, threshold) {
deviatingSubmissionPairs[configID] = SubmissionPair{
LastSubmission: latestDataValueInt64,
Name: submissionPair.Name,
}
}
}
return result
return deviatingSubmissionPairs
}

func ShouldReportDeviation(oldValue int64, newValue int64, threshold float64) bool {
Expand Down

0 comments on commit 6f59d13

Please sign in to comment.