diff --git a/node/cmd/node/main.go b/node/cmd/node/main.go index c4e6a8446..499e9f198 100644 --- a/node/cmd/node/main.go +++ b/node/cmd/node/main.go @@ -12,7 +12,6 @@ import ( "bisonai.com/orakl/node/pkg/fetcher" "bisonai.com/orakl/node/pkg/libp2p/helper" libp2pSetup "bisonai.com/orakl/node/pkg/libp2p/setup" - "bisonai.com/orakl/node/pkg/reporter" "bisonai.com/orakl/node/pkg/utils/retrier" "bisonai.com/orakl/node/pkg/zeropglog" "github.com/rs/zerolog/log" @@ -87,18 +86,6 @@ func main() { }() log.Info().Msg("Aggregator started") - wg.Add(1) - go func() { - defer wg.Done() - r := reporter.New(mb, host, ps) - reporterErr := r.Run(ctx) - if reporterErr != nil { - log.Error().Err(reporterErr).Msg("Failed to start reporter") - os.Exit(1) - } - }() - log.Info().Msg("Reporter started") - wg.Add(1) go func() { defer wg.Done() diff --git a/node/cmd/reporter/main.go b/node/cmd/reporter/main.go new file mode 100644 index 000000000..4856610bf --- /dev/null +++ b/node/cmd/reporter/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "context" + "os" + "os/signal" + "syscall" + + "bisonai.com/orakl/node/pkg/reporter" + "bisonai.com/orakl/node/pkg/zeropglog" + "github.com/rs/zerolog/log" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + + zeropglog := zeropglog.New() + go zeropglog.Run(ctx) + + r := reporter.New() + err := r.Run(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to start reporter") + cancel() + return + } + + <-sigChan + log.Info().Msg("Reporter termination signal received") + + cancel() + + log.Info().Msg("Reporter service has stopped") +} diff --git a/node/pkg/error/sentinel.go b/node/pkg/error/sentinel.go index 4f8aa3fc3..b2eb38f7e 100644 --- a/node/pkg/error/sentinel.go +++ b/node/pkg/error/sentinel.go @@ -195,6 +195,7 @@ var ( ErrReporterStart = &CustomError{Service: Reporter, Code: InternalError, Message: "Failed to start reporters"} ErrReporterStop = &CustomError{Service: Reporter, Code: InternalError, Message: "Failed to stop reporters"} ErrReporterValidateAggregateTimestampValues = &CustomError{Service: Reporter, Code: InternalError, Message: "Failed to validate aggregate timestamp values"} + ErrReporterDalApiKeyNotFound = &CustomError{Service: Reporter, Code: InternalError, Message: "DAL API key not found in reporter"} ErrDalEmptyProofParam = &CustomError{Service: Dal, Code: InvalidInputError, Message: "Empty proof param"} ErrDalInvalidProofLength = &CustomError{Service: Dal, Code: InvalidInputError, Message: "Invalid proof length"} diff --git a/node/pkg/reporter/app.go b/node/pkg/reporter/app.go index dcd7ff275..05112b01e 100644 --- a/node/pkg/reporter/app.go +++ b/node/pkg/reporter/app.go @@ -2,44 +2,43 @@ package reporter import ( "context" + "fmt" "os" "time" - "bisonai.com/orakl/node/pkg/bus" "bisonai.com/orakl/node/pkg/chain/helper" "bisonai.com/orakl/node/pkg/db" errorSentinel "bisonai.com/orakl/node/pkg/error" "github.com/klaytn/klaytn/common" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/host" "github.com/rs/zerolog/log" ) -func New(bus *bus.MessageBus, h host.Host, ps *pubsub.PubSub) *App { +func New() *App { return &App{ Reporters: []*Reporter{}, - Bus: bus, - Host: h, - Pubsub: ps, } } func (a *App) Run(ctx context.Context) error { - err := a.setReporters(ctx, a.Host, a.Pubsub) + err := a.setReporters(ctx) if err != nil { log.Error().Str("Player", "Reporter").Err(err).Msg("failed to set reporters") return err } - a.subscribe(ctx) return a.startReporters(ctx) } -func (a *App) setReporters(ctx context.Context, h host.Host, ps *pubsub.PubSub) error { - err := a.clearReporters() - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("failed to clear reporters") - return err +func (a *App) setReporters(ctx context.Context) error { + dalApiKey := os.Getenv("API_KEY") + if dalApiKey == "" { + return errorSentinel.ErrReporterDalApiKeyNotFound + } + + chain := os.Getenv("CHAIN") + if chain == "" { + log.Warn().Str("Player", "Reporter").Msg("chain not set, defaulting to baobab") + chain = "baobab" } contractAddress := os.Getenv("SUBMISSION_PROXY_CONTRACT") @@ -70,12 +69,12 @@ func (a *App) setReporters(ctx context.Context, h host.Host, ps *pubsub.PubSub) for groupInterval, configs := range groupedConfigs { reporter, errNewReporter := NewReporter( ctx, - WithHost(h), - WithPubsub(ps), WithConfigs(configs), WithInterval(groupInterval), WithContractAddress(contractAddress), WithCachedWhitelist(cachedWhitelist), + WithDalEndpoint(fmt.Sprintf("https://dal.%s.orakl.network", chain)), + WithDalApiKey(dalApiKey), ) if errNewReporter != nil { log.Error().Str("Player", "Reporter").Err(errNewReporter).Msg("failed to set reporter") @@ -88,55 +87,35 @@ func (a *App) setReporters(ctx context.Context, h host.Host, ps *pubsub.PubSub) return errorSentinel.ErrReporterNotFound } - deviationReporter, errNewDeviationReporter := NewReporter( - ctx, - WithHost(h), - WithPubsub(ps), - WithConfigs(configs), - WithInterval(DEVIATION_INTERVAL), - WithContractAddress(contractAddress), - WithCachedWhitelist(cachedWhitelist), - WithJobType(DeviationJob), - ) - if errNewDeviationReporter != nil { - log.Error().Str("Player", "Reporter").Err(errNewDeviationReporter).Msg("failed to set deviation reporter") - return errNewDeviationReporter - } - a.Reporters = append(a.Reporters, deviationReporter) + // deviationReporter, errNewDeviationReporter := NewReporter( + // ctx, + // WithConfigs(configs), + // WithInterval(DEVIATION_INTERVAL), + // WithContractAddress(contractAddress), + // WithCachedWhitelist(cachedWhitelist), + // WithJobType(DeviationJob), + // ) + // if errNewDeviationReporter != nil { + // log.Error().Str("Player", "Reporter").Err(errNewDeviationReporter).Msg("failed to set deviation reporter") + // return errNewDeviationReporter + // } + // a.Reporters = append(a.Reporters, deviationReporter) log.Info().Str("Player", "Reporter").Msgf("%d reporters set", len(a.Reporters)) return nil } -func (a *App) clearReporters() error { - if a.Reporters == nil { - return nil - } - +func (a *App) startReporters(ctx context.Context) error { var errs []string - for _, reporter := range a.Reporters { - if reporter.isRunning { - err := stopReporter(reporter) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("failed to stop reporter") - errs = append(errs, err.Error()) - } - } - } - a.Reporters = make([]*Reporter, 0) - if len(errs) > 0 { - return errorSentinel.ErrReporterClear + chainHelper, chainHelperErr := helper.NewChainHelper(ctx) + if chainHelperErr != nil { + return chainHelperErr } - - return nil -} - -func (a *App) startReporters(ctx context.Context) error { - var errs []string + a.chainHelper = chainHelper for _, reporter := range a.Reporters { - err := startReporter(ctx, reporter) + err := a.startReporter(ctx, reporter) if err != nil { log.Error().Str("Player", "Reporter").Err(err).Msg("failed to start reporter") errs = append(errs, err.Error()) @@ -150,97 +129,6 @@ func (a *App) startReporters(ctx context.Context) error { return nil } -func (a *App) stopReporters() error { - var errs []string - - for _, reporter := range a.Reporters { - err := stopReporter(reporter) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("failed to stop reporter") - errs = append(errs, err.Error()) - } - } - - if len(errs) > 0 { - return errorSentinel.ErrReporterStop - } - - return nil -} - -func (a *App) subscribe(ctx context.Context) { - log.Debug().Str("Player", "Reporter").Msg("subscribing to reporter topic") - channel := a.Bus.Subscribe(bus.REPORTER) - if channel == nil { - log.Error().Str("Player", "Reporter").Msg("failed to subscribe to reporter topic") - return - } - - go func() { - log.Debug().Str("Player", "Reporter").Msg("start reporter subscription goroutine") - for { - select { - case msg := <-channel: - log.Debug().Str("Player", "Reporter").Str("command", msg.Content.Command).Msg("received message from reporter topic") - go a.handleMessage(ctx, msg) - case <-ctx.Done(): - log.Debug().Str("Player", "Reporter").Msg("stopping reporter subscription goroutine") - return - } - } - }() -} - -func (a *App) handleMessage(ctx context.Context, msg bus.Message) { - switch msg.Content.Command { - case bus.ACTIVATE_REPORTER: - if msg.From != bus.ADMIN { - bus.HandleMessageError(errorSentinel.ErrBusNonAdmin, msg, "reporter received message from non-admin") - return - } - err := a.startReporters(ctx) - if err != nil { - bus.HandleMessageError(err, msg, "failed to start reporter") - return - } - msg.Response <- bus.MessageResponse{Success: true} - case bus.DEACTIVATE_REPORTER: - if msg.From != bus.ADMIN { - bus.HandleMessageError(errorSentinel.ErrBusNonAdmin, msg, "reporter received message from non-admin") - return - } - err := a.stopReporters() - if err != nil { - bus.HandleMessageError(err, msg, "failed to stop reporter") - return - } - msg.Response <- bus.MessageResponse{Success: true} - case bus.REFRESH_REPORTER: - if msg.From != bus.ADMIN { - bus.HandleMessageError(errorSentinel.ErrBusNonAdmin, msg, "reporter received message from non-admin") - return - } - err := a.stopReporters() - if err != nil { - bus.HandleMessageError(err, msg, "failed to stop reporter") - return - } - - err = a.setReporters(ctx, a.Host, a.Pubsub) - if err != nil { - bus.HandleMessageError(err, msg, "failed to set reporters") - return - } - - err = a.startReporters(ctx) - if err != nil { - bus.HandleMessageError(err, msg, "failed to start reporter") - return - } - msg.Response <- bus.MessageResponse{Success: true} - } -} - func (a *App) GetReporterWithInterval(interval int) (*Reporter, error) { for _, reporter := range a.Reporters { if reporter.SubmissionInterval == time.Duration(interval)*time.Millisecond { @@ -250,17 +138,13 @@ func (a *App) GetReporterWithInterval(interval int) (*Reporter, error) { return nil, errorSentinel.ErrReporterNotFound } -func startReporter(ctx context.Context, reporter *Reporter) error { +func (a *App) startReporter(ctx context.Context, reporter *Reporter) error { if reporter.isRunning { log.Debug().Str("Player", "Reporter").Msg("reporter already running") return errorSentinel.ErrReporterAlreadyRunning } - err := reporter.SetKaiaHelper(ctx) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("failed to set kaia helper") - return err - } + reporter.KaiaHelper = a.chainHelper nodeCtx, cancel := context.WithCancel(ctx) reporter.nodeCtx = nodeCtx @@ -271,24 +155,6 @@ func startReporter(ctx context.Context, reporter *Reporter) error { return nil } -func stopReporter(reporter *Reporter) error { - if !reporter.isRunning { - log.Debug().Str("Player", "Reporter").Msg("reporter not running") - return nil - } - - if reporter.nodeCancel == nil { - log.Error().Str("Player", "Reporter").Msg("reporter cancel function not found") - return errorSentinel.ErrReporterCancelNotFound - } - - reporter.nodeCancel() - reporter.isRunning = false - reporter.KaiaHelper.Close() - <-reporter.nodeCtx.Done() - return nil -} - func getConfigs(ctx context.Context) ([]Config, error) { reporterConfigs, err := db.QueryRows[Config](ctx, GET_REPORTER_CONFIGS, nil) if err != nil { diff --git a/node/pkg/reporter/app_test.go b/node/pkg/reporter/app_test.go index 5b886ed16..6c7108cde 100644 --- a/node/pkg/reporter/app_test.go +++ b/node/pkg/reporter/app_test.go @@ -46,10 +46,10 @@ func TestStopReporter(t *testing.T) { t.Fatal("error running reporter") } - err = testItems.app.stopReporters() - if err != nil { - t.Fatal("error stopping reporter") - } + // err = testItems.app.stopReporters() + // if err != nil { + // t.Fatal("error stopping reporter") + // } assert.Equal(t, testItems.app.Reporters[0].isRunning, false) } @@ -66,8 +66,6 @@ func TestStopReporterByAdmin(t *testing.T) { } }() - testItems.app.subscribe(ctx) - err = testItems.app.Run(ctx) if err != nil { t.Fatal("error running reporter") @@ -93,8 +91,7 @@ func TestStartReporterByAdmin(t *testing.T) { } }() - testItems.app.subscribe(ctx) - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) + err = testItems.app.setReporters(ctx) if err != nil { t.Fatalf("error setting reporters: %v", err) } @@ -121,8 +118,7 @@ func TestRestartReporterByAdmin(t *testing.T) { } }() - testItems.app.subscribe(ctx) - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) + err = testItems.app.setReporters(ctx) if err != nil { t.Fatalf("error setting reporters: %v", err) } diff --git a/node/pkg/reporter/main_test.go b/node/pkg/reporter/main_test.go index 759870ef5..24724fd63 100644 --- a/node/pkg/reporter/main_test.go +++ b/node/pkg/reporter/main_test.go @@ -15,7 +15,6 @@ import ( "bisonai.com/orakl/node/pkg/chain/helper" "bisonai.com/orakl/node/pkg/common/keys" "bisonai.com/orakl/node/pkg/db" - libp2pSetup "bisonai.com/orakl/node/pkg/libp2p/setup" "github.com/gofiber/fiber/v2" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -119,17 +118,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { testItems.admin = admin - h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch()) - if err != nil { - return nil, nil, err - } - - ps, err := libp2pSetup.MakePubsub(ctx, h) - if err != nil { - return nil, nil, err - } - - app := New(mb, h, ps) + app := New() testItems.app = app tmpData, err := insertSampleData(ctx) @@ -167,11 +156,11 @@ func reporterCleanup(ctx context.Context, admin *fiber.App, app *App) func() err return err } - err = app.stopReporters() - if err != nil { - return err - } - return app.Host.Close() + // err = app.stopReporters() + // if err != nil { + // return err + // } + return nil } } diff --git a/node/pkg/reporter/reporter.go b/node/pkg/reporter/reporter.go index 25f67765c..22c0b039d 100644 --- a/node/pkg/reporter/reporter.go +++ b/node/pkg/reporter/reporter.go @@ -2,21 +2,19 @@ package reporter import ( "context" - "encoding/json" "math/big" "strconv" + "sync" "time" - "bisonai.com/orakl/node/pkg/chain/helper" - chainUtils "bisonai.com/orakl/node/pkg/chain/utils" errorSentinel "bisonai.com/orakl/node/pkg/error" - - "bisonai.com/orakl/node/pkg/raft" - "bisonai.com/orakl/node/pkg/utils/retrier" + "bisonai.com/orakl/node/pkg/utils/request" "github.com/rs/zerolog/log" ) +var mu sync.Mutex + func NewReporter(ctx context.Context, opts ...ReporterOption) (*Reporter, error) { config := &ReporterConfig{ JobType: ReportJob, @@ -40,214 +38,100 @@ func NewReporter(ctx context.Context, opts ...ReporterOption) (*Reporter, error) groupInterval := time.Duration(config.Interval) * time.Millisecond - topic, err := config.Ps.Join(topicString) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("Failed to join topic") - return nil, err - } - - raft := raft.NewRaftNode(config.Host, config.Ps, topic, MESSAGE_BUFFER, groupInterval) deviationThreshold := GetDeviationThreshold(groupInterval) reporter := &Reporter{ - Raft: raft, contractAddress: config.ContractAddress, SubmissionInterval: groupInterval, CachedWhitelist: config.CachedWhitelist, deviationThreshold: deviationThreshold, + DalEndpoint: config.DalEndpoint, + DalApiKey: config.DalApiKey, } reporter.SubmissionPairs = make(map[int32]SubmissionPair) for _, sa := range config.Configs { reporter.SubmissionPairs[sa.ID] = SubmissionPair{LastSubmission: 0, Name: sa.Name} } - reporter.Raft.HandleCustomMessage = reporter.handleCustomMessage - if config.JobType == DeviationJob { - reporter.Raft.LeaderJob = reporter.deviationJob - } else { - reporter.Raft.LeaderJob = reporter.leaderJob - } return reporter, nil } func (r *Reporter) Run(ctx context.Context) { - r.Raft.Run(ctx) -} - -func (r *Reporter) leaderJob() error { - start := time.Now() - log.Info().Str("Player", "Reporter").Time("start", start).Msg("reporter job") - r.Raft.IncreaseTerm() - ctx := context.Background() - - aggregates, err := GetLatestGlobalAggregates(ctx, r.SubmissionPairs) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("GetLatestGlobalAggregates") - return err - } - - validAggregates := FilterInvalidAggregates(aggregates, r.SubmissionPairs) - if len(validAggregates) == 0 { - log.Warn().Str("Player", "Reporter").Msg("no valid aggregates to report") - return nil - } - log.Debug().Str("Player", "Reporter").Int("validAggregates", len(validAggregates)).Msg("valid aggregates") - - reportJob := func() error { - err = r.report(ctx, validAggregates) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("report") - return err + log.Info().Msgf("Reporter ticker starting with interval: %v", r.SubmissionInterval) + ticker := time.NewTicker(r.SubmissionInterval) + + for { + select { + case <-ctx.Done(): + log.Debug().Str("Player", "Reporter").Msg("context done, stopping reporter") + return + case <-ticker.C: + err := r.report(ctx) + if err != nil { + log.Error().Str("Player", "Reporter").Err(err).Msg("reporting failed") + } } - return nil - } - - err = retrier.Retry( - reportJob, - MAX_RETRY, - INITIAL_FAILURE_TIMEOUT, - MAX_RETRY_DELAY, - ) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("failed to report, resigning from leader") - r.resignLeader() - return errorSentinel.ErrReporterReportFailed } - - err = r.PublishSubmissionMessage(validAggregates) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("PublishSubmissionMessage") - return err - } - - for _, agg := range validAggregates { - pair := r.SubmissionPairs[agg.ConfigID] - pair.LastSubmission = agg.Round - r.SubmissionPairs[agg.ConfigID] = pair - } - log.Info().Int("validAggregates", len(validAggregates)).Str("Player", "Reporter").Str("Duration", time.Since(start).String()).Msg("reporting done") - - return nil } -func (r *Reporter) report(ctx context.Context, aggregates []GlobalAggregate) error { - - log.Debug().Str("Player", "Reporter").Int("aggregates", len(aggregates)).Msg("reporting") - - if !ValidateAggregateTimestampValues(aggregates) { - log.Error().Str("Player", "Reporter").Msg("ValidateAggregateTimestampValues, zero timestamp exists") - return errorSentinel.ErrReporterValidateAggregateTimestampValues - } - - proofMap, err := GetProofsAsMap(ctx, aggregates) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("submit without proofs") - return err - } - log.Debug().Str("Player", "Reporter").Int("proofs", len(proofMap)).Msg("proof map generated") - - orderedProofMap, err := r.orderProofs(ctx, proofMap, aggregates) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("orderProofs") - return err +func (r *Reporter) report(ctx context.Context) error { + pairs := "" + for _, submissionPair := range r.SubmissionPairs { + pairs += submissionPair.Name + "," } + submissionData, err := request.Request[[]SubmissionData](request.WithEndpoint(r.DalEndpoint+"/latest-data-feeds/"+pairs), request.WithTimeout(10*time.Second), request.WithHeaders(map[string]string{"X-API-Key": r.DalApiKey, "Content-Type": "application/json"})) - log.Debug().Str("Player", "Reporter").Int("orderedProofs", len(orderedProofMap)).Msg("ordered proof map generated") - - err = UpdateProofs(ctx, aggregates, orderedProofMap) if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("updateProofs") return err } - log.Debug().Str("Player", "Reporter").Msg("proofs updated to db, reporting with proofs") - return r.reportWithProofs(ctx, aggregates, orderedProofMap) -} -func (r *Reporter) orderProof(ctx context.Context, proof []byte, aggregate GlobalAggregate) ([]byte, error) { - proof = RemoveDuplicateProof(proof) - hash := chainUtils.Value2HashForSign(aggregate.Value, aggregate.Timestamp.Unix(), r.SubmissionPairs[aggregate.ConfigID].Name) - proofChunks, err := SplitProofToChunk(proof) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("failed to split proof") - return nil, err - } - - signers, err := GetSignerListFromProofs(hash, proofChunks) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("failed to get signers from proof") - return nil, err - } - - err = CheckForNonWhitelistedSigners(signers, r.CachedWhitelist) - if err != nil { - log.Warn().Str("Player", "Reporter").Err(err).Msg("non-whitelisted signers in proof, reloading whitelist") - reloadedWhitelist, contractReadErr := ReadOnchainWhitelist(ctx, r.KaiaHelper, r.contractAddress, GET_ONCHAIN_WHITELIST) - if contractReadErr != nil { - log.Error().Str("Player", "Reporter").Err(contractReadErr).Msg("failed to reload whitelist") - return nil, contractReadErr - } - r.CachedWhitelist = reloadedWhitelist - } - - signerMap := GetSignerMap(signers, proofChunks) - orderedProof, err := OrderProof(signerMap, r.CachedWhitelist) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("failed to order proof") - return nil, err - } - return orderedProof, nil -} + var feedHashes [][32]byte + var values []*big.Int + var timestamps []*big.Int + var proofs [][]byte -func (r *Reporter) orderProofs(ctx context.Context, proofMap map[int32][]byte, aggregates []GlobalAggregate) (map[int32][]byte, error) { - orderedProofMap := make(map[int32][]byte) - for _, agg := range aggregates { - proof, ok := proofMap[agg.ConfigID] - if !ok { - log.Error().Str("Player", "Reporter").Msg("proof not found") - return nil, errorSentinel.ErrReporterProofNotFound + for _, data := range submissionData { + intValue, valueErr := strconv.ParseInt(data.Value, 10, 64) + if valueErr != nil { + log.Error().Str("Player", "Reporter").Err(valueErr).Msgf("failed to parse value in data: %v", data.Symbol) + continue } - orderedProof, err := r.orderProof(ctx, proof, agg) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("orderProof") - return nil, err + timestampValue, timestampErr := strconv.ParseInt(data.AggregateTime, 10, 64) + if timestampErr != nil { + log.Error().Str("Player", "Reporter").Err(timestampErr).Msgf("failed to parse timestamp in data: %v", data) + continue } - orderedProofMap[agg.ConfigID] = orderedProof + feedHashes = append(feedHashes, data.FeedHash) + values = append(values, big.NewInt(intValue)) + timestamps = append(timestamps, big.NewInt(timestampValue)) + proofs = append(proofs, data.Proof) } - return orderedProofMap, nil -} - -func (r *Reporter) resignLeader() { - r.Raft.ResignLeader() -} - -func (r *Reporter) handleCustomMessage(ctx context.Context, msg raft.Message) error { - switch msg.Type { - case SubmissionMsg: - return r.HandleSubmissionMessage(ctx, msg) - default: - return errorSentinel.ErrReporterUnknownMessageType - } -} + for start := 0; start < len(submissionData); start += MAX_REPORT_BATCH_SIZE { + end := min(start+MAX_REPORT_BATCH_SIZE, len(submissionData)) -func (r *Reporter) reportWithProofs(ctx context.Context, aggregates []GlobalAggregate, proofMap map[int32][]byte) error { - if r.KaiaHelper == nil { - return errorSentinel.ErrReporterKaiaHelperNotFound - } - log.Debug().Str("Player", "Reporter").Int("aggregates", len(aggregates)).Msg("reporting with proofs") + batchFeedHashes := feedHashes[start:end] + batchValues := values[start:end] + batchTimestamps := timestamps[start:end] + batchProofs := proofs[start:end] - feedHashes, values, timestamps, proofs, err := MakeContractArgsWithProofs(aggregates, r.SubmissionPairs, proofMap) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("makeContractArgsWithProofs") - return err + err := r.reportDelegated(ctx, SUBMIT_WITH_PROOFS, batchFeedHashes, batchValues, batchTimestamps, batchProofs) + if err != nil { + log.Error().Str("Player", "Reporter").Err(err).Msg("report") + err = r.reportDirect(ctx, SUBMIT_WITH_PROOFS, batchFeedHashes, batchValues, batchTimestamps, batchProofs) + if err != nil { + log.Error().Str("Player", "Reporter").Err(err).Msg("report") + } + } } - log.Debug().Str("Player", "Reporter").Int("proofs", len(proofs)).Msg("contract arguements generated") - - return r.splitReport(ctx, feedHashes, values, timestamps, proofs) + return nil } func (r *Reporter) reportDirect(ctx context.Context, functionString string, args ...interface{}) error { + mu.Lock() + defer mu.Unlock() + rawTx, err := r.KaiaHelper.MakeDirectTx(ctx, r.contractAddress, functionString, args...) if err != nil { log.Error().Str("Player", "Reporter").Err(err).Msg("MakeDirectTx") @@ -258,6 +142,9 @@ func (r *Reporter) reportDirect(ctx context.Context, functionString string, args } func (r *Reporter) reportDelegated(ctx context.Context, functionString string, args ...interface{}) error { + mu.Lock() + defer mu.Unlock() + log.Debug().Str("Player", "Reporter").Msg("reporting delegated") rawTx, err := r.KaiaHelper.MakeFeeDelegatedTx(ctx, r.contractAddress, functionString, args...) if err != nil { @@ -276,136 +163,62 @@ func (r *Reporter) reportDelegated(ctx context.Context, functionString string, a return r.KaiaHelper.SubmitRawTx(ctx, signedTx) } -func (r *Reporter) SetKaiaHelper(ctx context.Context) error { - if r.KaiaHelper != nil { - r.KaiaHelper.Close() - } - kaiaHelper, err := helper.NewChainHelper(ctx) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("failed to create kaia helper") - return err - } - r.KaiaHelper = kaiaHelper - return nil -} - -func (r *Reporter) deviationJob() error { - start := time.Now() - log.Info().Str("Player", "Reporter").Time("start", start).Msg("reporter deviation job") - r.Raft.IncreaseTerm() - ctx := context.Background() - - lastSubmissions, err := GetLastSubmission(ctx, r.SubmissionPairs) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("getLastSubmission") - return err - } - if len(lastSubmissions) == 0 { - log.Warn().Str("Player", "Reporter").Msg("no last submissions") - return nil - } - - lastAggregates, err := GetLatestGlobalAggregates(ctx, r.SubmissionPairs) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("getLatestGlobalAggregates") - return err - } - if len(lastAggregates) == 0 { - log.Warn().Str("Player", "Reporter").Msg("no last aggregates") - return nil - } - - deviatingAggregates := GetDeviatingAggregates(lastSubmissions, lastAggregates, r.deviationThreshold) - if len(deviatingAggregates) == 0 { - return nil - } - - reportJob := func() error { - err = r.report(ctx, deviatingAggregates) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("DeviationReport") - return err - } - return nil - } - - err = retrier.Retry( - reportJob, - MAX_RETRY, - INITIAL_FAILURE_TIMEOUT, - MAX_RETRY_DELAY, - ) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("failed to report deviation, resigning from leader") - r.resignLeader() - return errorSentinel.ErrReporterDeviationReportFail - } - - err = r.PublishSubmissionMessage(deviatingAggregates) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("PublishSubmissionMessage") - return err - } - - for _, agg := range deviatingAggregates { - pair := r.SubmissionPairs[agg.ConfigID] - pair.LastSubmission = agg.Round - r.SubmissionPairs[agg.ConfigID] = pair - } - - log.Info().Int("deviations", len(deviatingAggregates)).Str("Player", "Reporter").Dur("duration", time.Since(start)).Msg("reporting deviation done") - return nil -} - -func (r *Reporter) PublishSubmissionMessage(submissions []GlobalAggregate) error { - submissionMessage := SubmissionMessage{Submissions: submissions} - marshalledSubmissionMessage, err := json.Marshal(submissionMessage) - if err != nil { - return err - } - - message := raft.Message{ - Type: SubmissionMsg, - SentFrom: r.Raft.GetHostId(), - Data: json.RawMessage(marshalledSubmissionMessage), - } - - return r.Raft.PublishMessage(message) -} - -func (r *Reporter) HandleSubmissionMessage(ctx context.Context, msg raft.Message) error { - var submissionMessage SubmissionMessage - err := json.Unmarshal(msg.Data, &submissionMessage) - if err != nil { - return err - } - - err = StoreLastSubmission(ctx, submissionMessage.Submissions) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("storeLastSubmission") - return err - } - - return nil -} - -func (r *Reporter) splitReport(ctx context.Context, feedHashes [][32]byte, values []*big.Int, timestamps []*big.Int, proofs [][]byte) error { - for start := 0; start < len(feedHashes); start += MAX_REPORT_BATCH_SIZE { - end := min(start+MAX_REPORT_BATCH_SIZE, len(feedHashes)) - - batchFeedHashes := feedHashes[start:end] - batchValues := values[start:end] - batchTimestamps := timestamps[start:end] - batchProofs := proofs[start:end] - - err := r.reportDelegated(ctx, SUBMIT_WITH_PROOFS, batchFeedHashes, batchValues, batchTimestamps, batchProofs) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("splitReport") - err = r.reportDirect(ctx, SUBMIT_WITH_PROOFS, batchFeedHashes, batchValues, batchTimestamps, batchProofs) - if err != nil { - log.Error().Str("Player", "Reporter").Err(err).Msg("splitReport") - } - } - } - return nil -} +// func (r *Reporter) deviationJob() error { +// start := time.Now() +// log.Info().Str("Player", "Reporter").Time("start", start).Msg("reporter deviation job") +// ctx := context.Background() + +// lastSubmissions, err := GetLastSubmission(ctx, r.SubmissionPairs) +// if err != nil { +// log.Error().Str("Player", "Reporter").Err(err).Msg("getLastSubmission") +// return err +// } +// if len(lastSubmissions) == 0 { +// log.Warn().Str("Player", "Reporter").Msg("no last submissions") +// return nil +// } + +// lastAggregates, err := GetLatestGlobalAggregates(ctx, r.SubmissionPairs) +// if err != nil { +// log.Error().Str("Player", "Reporter").Err(err).Msg("getLatestGlobalAggregates") +// return err +// } +// if len(lastAggregates) == 0 { +// log.Warn().Str("Player", "Reporter").Msg("no last aggregates") +// return nil +// } + +// deviatingAggregates := GetDeviatingAggregates(lastSubmissions, lastAggregates, r.deviationThreshold) +// if len(deviatingAggregates) == 0 { +// return nil +// } + +// reportJob := func() error { +// err = r.report(ctx, deviatingAggregates) +// if err != nil { +// log.Error().Str("Player", "Reporter").Err(err).Msg("DeviationReport") +// return err +// } +// return nil +// } + +// err = retrier.Retry( +// reportJob, +// MAX_RETRY, +// INITIAL_FAILURE_TIMEOUT, +// MAX_RETRY_DELAY, +// ) +// if err != nil { +// log.Error().Str("Player", "Reporter").Err(err).Msg("failed to report deviation, resigning from leader") +// return errorSentinel.ErrReporterDeviationReportFail +// } + +// for _, agg := range deviatingAggregates { +// pair := r.SubmissionPairs[agg.ConfigID] +// pair.LastSubmission = agg.Round +// r.SubmissionPairs[agg.ConfigID] = pair +// } + +// log.Info().Int("deviations", len(deviatingAggregates)).Str("Player", "Reporter").Dur("duration", time.Since(start)).Msg("reporting deviation done") +// return nil +// } diff --git a/node/pkg/reporter/reporter_test.go b/node/pkg/reporter/reporter_test.go index 9638900bd..49d59c969 100644 --- a/node/pkg/reporter/reporter_test.go +++ b/node/pkg/reporter/reporter_test.go @@ -1,607 +1,607 @@ //nolint:all package reporter -import ( - "context" - "math/big" - "os" - "testing" - "time" - - "bisonai.com/orakl/node/pkg/chain/helper" - errorSentinel "bisonai.com/orakl/node/pkg/error" - "bisonai.com/orakl/node/pkg/raft" - "github.com/klaytn/klaytn/crypto" - "github.com/stretchr/testify/assert" -) - -func TestNewReporter(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - submissionPairs, err := getConfigs(ctx) - if err != nil { - t.Fatalf("error getting submission pairs: %v", err) - } - groupedSubmissionPairs := groupConfigsBySubmitIntervals(submissionPairs) - - contractAddress := os.Getenv("SUBMISSION_PROXY_CONTRACT") - if contractAddress == "" { - t.Fatal("SUBMISSION_PROXY_CONTRACT not set") - } - - tmpHelper, err := helper.NewChainHelper(ctx) - if err != nil { - t.Fatalf("error creating chain helper: %v", err) - } - defer tmpHelper.Close() - - whitelist, err := ReadOnchainWhitelist(ctx, tmpHelper, contractAddress, GET_ONCHAIN_WHITELIST) - if err != nil { - t.Fatalf("error reading onchain whitelist: %v", err) - } - - for groupInterval, pairs := range groupedSubmissionPairs { - _, err := NewReporter( - ctx, - WithHost(testItems.app.Host), - WithPubsub(testItems.app.Pubsub), - WithConfigs(pairs), - WithInterval(groupInterval), - WithContractAddress(contractAddress), - WithCachedWhitelist(whitelist), - ) - if err != nil { - t.Fatalf("error creating new reporter: %v", err) - } - } -} - -func TestLeaderJob(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - - reporter, err := testItems.app.GetReporterWithInterval(TestInterval) - if err != nil { - t.Fatalf("error getting reporter: %v", err) - } - - reporter.SetKaiaHelper(ctx) - err = reporter.leaderJob() - if err != nil { - t.Fatal("error running leader job") - } -} - -func TestResignLeader(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - reporter, err := testItems.app.GetReporterWithInterval(TestInterval) - if err != nil { - t.Fatalf("error getting reporter: %v", err) - } - reporter.resignLeader() - assert.Equal(t, reporter.Raft.GetRole(), raft.Follower) -} - -func TestHandleCustomMessage(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - reporter, err := testItems.app.GetReporterWithInterval(TestInterval) - if err != nil { - t.Fatalf("error getting reporter: %v", err) - } - - err = reporter.handleCustomMessage(ctx, raft.Message{}) - assert.ErrorIs(t, err, errorSentinel.ErrReporterUnknownMessageType) - -} - -func TestGetLatestGlobalAggregates(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - - reporter, err := testItems.app.GetReporterWithInterval(TestInterval) - if err != nil { - t.Fatalf("error getting reporter: %v", err) - } - result, err := GetLatestGlobalAggregates(ctx, reporter.SubmissionPairs) - if err != nil { - t.Fatal("error getting latest global aggregates") - } - - assert.Equal(t, result[0].ConfigID, testItems.tmpData.globalAggregate.ConfigID) - assert.Equal(t, result[0].Value, testItems.tmpData.globalAggregate.Value) -} - -func TestFilterInvalidAggregates(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - - reporter, err := testItems.app.GetReporterWithInterval(TestInterval) - if err != nil { - t.Fatalf("error getting reporter: %v", err) - } - - aggregates := []GlobalAggregate{{ - ConfigID: testItems.tmpData.config.ID, - Value: 15, - Round: 1, - }} - result := FilterInvalidAggregates(aggregates, reporter.SubmissionPairs) - assert.Equal(t, result, aggregates) - - reporter.SubmissionPairs = map[int32]SubmissionPair{testItems.tmpData.config.ID: {LastSubmission: 1, Name: "test-aggregate"}} - result = FilterInvalidAggregates(aggregates, reporter.SubmissionPairs) - assert.Equal(t, result, []GlobalAggregate{}) -} - -func TestIsAggValid(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - - reporter, err := testItems.app.GetReporterWithInterval(TestInterval) - if err != nil { - t.Fatalf("error getting reporter: %v", err) - } - - agg := GlobalAggregate{ - ConfigID: testItems.tmpData.config.ID, - Value: 15, - Round: 1, - } - result := IsAggValid(agg, reporter.SubmissionPairs) - assert.Equal(t, result, true) - - reporter.SubmissionPairs = map[int32]SubmissionPair{testItems.tmpData.config.ID: {LastSubmission: 1, Name: "test-aggregate"}} - result = IsAggValid(agg, reporter.SubmissionPairs) - assert.Equal(t, result, false) -} - -func TestMakeContractArgs(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - - reporter, err := testItems.app.GetReporterWithInterval(TestInterval) - if err != nil { - t.Fatalf("error getting reporter: %v", err) - } - - agg := GlobalAggregate{ - ConfigID: testItems.tmpData.config.ID, - Value: 15, - Round: 1, - Timestamp: testItems.tmpData.proofTime, - } - - rawProofs, err := GetProofsRdb(ctx, []GlobalAggregate{agg}) - if err != nil { - t.Fatal("error getting proofs") - } - - proofMap := ProofsToMap(rawProofs) - - feedHashes, values, timestamps, proofs, err := MakeContractArgsWithProofs([]GlobalAggregate{agg}, reporter.SubmissionPairs, proofMap) - if err != nil { - t.Fatal("error making contract args") - } - assert.Equal(t, [32]byte(crypto.Keccak256([]byte(reporter.SubmissionPairs[agg.ConfigID].Name))), feedHashes[0]) - assert.Equal(t, big.NewInt(15), values[0]) - - proofArr := make([][]byte, len(proofs)) - for i, p := range rawProofs { - proofArr[i] = p.Proof - } - - assert.EqualValues(t, proofs, proofArr) - assert.Equal(t, testItems.tmpData.proofTime.Unix(), timestamps[0].Int64()) - -} - -func TestGetLatestGlobalAggregatesRdb(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - reporter, err := testItems.app.GetReporterWithInterval(TestInterval) - if err != nil { - t.Fatalf("error getting reporter: %v", err) - } - - result, err := GetLatestGlobalAggregatesRdb(ctx, reporter.SubmissionPairs) - if err != nil { - t.Fatal("error getting latest global aggregates from rdb") - } - - assert.Equal(t, result[0].ConfigID, testItems.tmpData.globalAggregate.ConfigID) - assert.Equal(t, result[0].Value, testItems.tmpData.globalAggregate.Value) -} - -func TestGetLatestGlobalAggregatesPgsql(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - - reporter, err := testItems.app.GetReporterWithInterval(TestInterval) - if err != nil { - t.Fatalf("error getting reporter: %v", err) - } - - result, err := GetLatestGlobalAggregatesPgsql(ctx, reporter.SubmissionPairs) - if err != nil { - t.Fatal("error getting latest global aggregates from pgs") - } - - assert.Equal(t, result[0].ConfigID, testItems.tmpData.globalAggregate.ConfigID) - assert.Equal(t, result[0].Value, testItems.tmpData.globalAggregate.Value) -} - -func TestGetProofsRdb(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - - agg := testItems.tmpData.globalAggregate - result, err := GetProofsRdb(ctx, []GlobalAggregate{agg}) - if err != nil { - t.Fatal("error getting proofs from rdb") - } - assert.EqualValues(t, testItems.tmpData.proofBytes, result[0].Proof) -} - -func TestGetProofsPgsql(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - agg := testItems.tmpData.globalAggregate - result, err := GetProofsPgsql(ctx, []GlobalAggregate{agg}) - if err != nil { - t.Fatal("error getting proofs from pgsql") - } - assert.EqualValues(t, testItems.tmpData.proofBytes, result[0].Proof) -} - -func TestNewDeviationReporter(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - submissionPairs, err := getConfigs(ctx) - if err != nil { - t.Fatalf("error getting submission pairs: %v", err) - } - - contractAddress := os.Getenv("SUBMISSION_PROXY_CONTRACT") - if contractAddress == "" { - t.Fatal("SUBMISSION_PROXY_CONTRACT not set") - } - - tmpHelper, err := helper.NewChainHelper(ctx) - if err != nil { - t.Fatalf("error creating chain helper: %v", err) - } - defer tmpHelper.Close() - - whitelist, err := ReadOnchainWhitelist(ctx, tmpHelper, contractAddress, GET_ONCHAIN_WHITELIST) - if err != nil { - t.Fatalf("error reading onchain whitelist: %v", err) - } - _, err = NewReporter( - ctx, - WithHost(testItems.app.Host), - WithPubsub(testItems.app.Pubsub), - WithConfigs(submissionPairs), - WithInterval(5000), - WithContractAddress(contractAddress), - WithCachedWhitelist(whitelist), - WithJobType(DeviationJob), - ) - if err != nil { - t.Fatalf("error creating new deviation reporter: %v", err) - } -} - -func TestStoreAndGetLastSubmission(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - reporter, err := testItems.app.GetReporterWithInterval(TestInterval) - if err != nil { - t.Fatalf("error getting reporter: %v", err) - } - - aggregates, err := GetLatestGlobalAggregates(ctx, reporter.SubmissionPairs) - if err != nil { - t.Fatal("error getting latest global aggregates") - } - - err = StoreLastSubmission(ctx, aggregates) - if err != nil { - t.Fatal("error storing last submission") - } - - loadedAggregates, err := GetLastSubmission(ctx, reporter.SubmissionPairs) - if err != nil { - t.Fatal("error getting last submission") - } - - assert.EqualValues(t, aggregates, loadedAggregates) - -} - -func TestShouldReportDeviation(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) - if err != nil { - t.Fatalf("error setting reporters: %v", err) - } - - assert.False(t, ShouldReportDeviation(0, 0, 0.05)) - assert.True(t, ShouldReportDeviation(0, 100000000, 0.05)) - assert.False(t, ShouldReportDeviation(100000000000, 100100000000, 0.05)) - assert.True(t, ShouldReportDeviation(100000000000, 105100000000, 0.05)) - assert.False(t, ShouldReportDeviation(100000000000, 0, 0.05)) -} - -func TestGetDeviationThreshold(t *testing.T) { - ctx := context.Background() - cleanup, _, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - assert.Equal(t, 0.05, GetDeviationThreshold(15*time.Second)) - assert.Equal(t, 0.01, GetDeviationThreshold(60*time.Minute)) - assert.Equal(t, 0.05, GetDeviationThreshold(1*time.Second)) - assert.Equal(t, 0.01, GetDeviationThreshold(2*time.Hour)) - assert.Less(t, GetDeviationThreshold(30*time.Minute), 0.05) -} - -func TestGetDeviatingAggregates(t *testing.T) { - ctx := context.Background() - cleanup, _, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - oldAggregates := []GlobalAggregate{{ - ConfigID: 2, - Value: 15, - Round: 1, - }} - - newAggregates := []GlobalAggregate{{ - ConfigID: 2, - Value: 30, - Round: 2, - }} - - result := GetDeviatingAggregates(oldAggregates, newAggregates, 0.05) - assert.Equal(t, result, newAggregates) -} - -func TestDeviationJob(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := cleanup(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - submissionPairs, err := getConfigs(ctx) - if err != nil { - t.Fatalf("error getting submission pairs: %v", err) - } - - contractAddress := os.Getenv("SUBMISSION_PROXY_CONTRACT") - if contractAddress == "" { - t.Fatal("SUBMISSION_PROXY_CONTRACT not set") - } - - tmpHelper, err := helper.NewChainHelper(ctx) - if err != nil { - t.Fatalf("error creating chain helper: %v", err) - } - defer tmpHelper.Close() - - whitelist, err := ReadOnchainWhitelist(ctx, tmpHelper, contractAddress, GET_ONCHAIN_WHITELIST) - if err != nil { - t.Fatalf("error reading onchain whitelist: %v", err) - } - reporter, err := NewReporter( - ctx, - WithHost(testItems.app.Host), - WithPubsub(testItems.app.Pubsub), - WithConfigs(submissionPairs), - WithInterval(5000), - WithContractAddress(contractAddress), - WithCachedWhitelist(whitelist), - WithJobType(DeviationJob), - ) - if err != nil { - t.Fatalf("error creating new deviation reporter: %v", err) - } - - err = reporter.deviationJob() - if err != nil { - t.Fatal("error running deviation job") - } -} +// import ( +// "context" +// "math/big" +// "os" +// "testing" +// "time" + +// "bisonai.com/orakl/node/pkg/chain/helper" +// errorSentinel "bisonai.com/orakl/node/pkg/error" +// "bisonai.com/orakl/node/pkg/raft" +// "github.com/klaytn/klaytn/crypto" +// "github.com/stretchr/testify/assert" +// ) + +// func TestNewReporter(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() + +// submissionPairs, err := getConfigs(ctx) +// if err != nil { +// t.Fatalf("error getting submission pairs: %v", err) +// } +// groupedSubmissionPairs := groupConfigsBySubmitIntervals(submissionPairs) + +// contractAddress := os.Getenv("SUBMISSION_PROXY_CONTRACT") +// if contractAddress == "" { +// t.Fatal("SUBMISSION_PROXY_CONTRACT not set") +// } + +// tmpHelper, err := helper.NewChainHelper(ctx) +// if err != nil { +// t.Fatalf("error creating chain helper: %v", err) +// } +// defer tmpHelper.Close() + +// whitelist, err := ReadOnchainWhitelist(ctx, tmpHelper, contractAddress, GET_ONCHAIN_WHITELIST) +// if err != nil { +// t.Fatalf("error reading onchain whitelist: %v", err) +// } + +// for groupInterval, pairs := range groupedSubmissionPairs { +// _, err := NewReporter( +// ctx, +// WithHost(testItems.app.Host), +// WithPubsub(testItems.app.Pubsub), +// WithConfigs(pairs), +// WithInterval(groupInterval), +// WithContractAddress(contractAddress), +// WithCachedWhitelist(whitelist), +// ) +// if err != nil { +// t.Fatalf("error creating new reporter: %v", err) +// } +// } +// } + +// func TestLeaderJob(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } + +// reporter, err := testItems.app.GetReporterWithInterval(TestInterval) +// if err != nil { +// t.Fatalf("error getting reporter: %v", err) +// } + +// reporter.SetKaiaHelper(ctx) +// err = reporter.leaderJob() +// if err != nil { +// t.Fatal("error running leader job") +// } +// } + +// func TestResignLeader(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } +// reporter, err := testItems.app.GetReporterWithInterval(TestInterval) +// if err != nil { +// t.Fatalf("error getting reporter: %v", err) +// } +// reporter.resignLeader() +// assert.Equal(t, reporter.Raft.GetRole(), raft.Follower) +// } + +// func TestHandleCustomMessage(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } +// reporter, err := testItems.app.GetReporterWithInterval(TestInterval) +// if err != nil { +// t.Fatalf("error getting reporter: %v", err) +// } + +// err = reporter.handleCustomMessage(ctx, raft.Message{}) +// assert.ErrorIs(t, err, errorSentinel.ErrReporterUnknownMessageType) + +// } + +// func TestGetLatestGlobalAggregates(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } + +// reporter, err := testItems.app.GetReporterWithInterval(TestInterval) +// if err != nil { +// t.Fatalf("error getting reporter: %v", err) +// } +// result, err := GetLatestGlobalAggregates(ctx, reporter.SubmissionPairs) +// if err != nil { +// t.Fatal("error getting latest global aggregates") +// } + +// assert.Equal(t, result[0].ConfigID, testItems.tmpData.globalAggregate.ConfigID) +// assert.Equal(t, result[0].Value, testItems.tmpData.globalAggregate.Value) +// } + +// func TestFilterInvalidAggregates(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } + +// reporter, err := testItems.app.GetReporterWithInterval(TestInterval) +// if err != nil { +// t.Fatalf("error getting reporter: %v", err) +// } + +// aggregates := []GlobalAggregate{{ +// ConfigID: testItems.tmpData.config.ID, +// Value: 15, +// Round: 1, +// }} +// result := FilterInvalidAggregates(aggregates, reporter.SubmissionPairs) +// assert.Equal(t, result, aggregates) + +// reporter.SubmissionPairs = map[int32]SubmissionPair{testItems.tmpData.config.ID: {LastSubmission: 1, Name: "test-aggregate"}} +// result = FilterInvalidAggregates(aggregates, reporter.SubmissionPairs) +// assert.Equal(t, result, []GlobalAggregate{}) +// } + +// func TestIsAggValid(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } + +// reporter, err := testItems.app.GetReporterWithInterval(TestInterval) +// if err != nil { +// t.Fatalf("error getting reporter: %v", err) +// } + +// agg := GlobalAggregate{ +// ConfigID: testItems.tmpData.config.ID, +// Value: 15, +// Round: 1, +// } +// result := IsAggValid(agg, reporter.SubmissionPairs) +// assert.Equal(t, result, true) + +// reporter.SubmissionPairs = map[int32]SubmissionPair{testItems.tmpData.config.ID: {LastSubmission: 1, Name: "test-aggregate"}} +// result = IsAggValid(agg, reporter.SubmissionPairs) +// assert.Equal(t, result, false) +// } + +// func TestMakeContractArgs(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } + +// reporter, err := testItems.app.GetReporterWithInterval(TestInterval) +// if err != nil { +// t.Fatalf("error getting reporter: %v", err) +// } + +// agg := GlobalAggregate{ +// ConfigID: testItems.tmpData.config.ID, +// Value: 15, +// Round: 1, +// Timestamp: testItems.tmpData.proofTime, +// } + +// rawProofs, err := GetProofsRdb(ctx, []GlobalAggregate{agg}) +// if err != nil { +// t.Fatal("error getting proofs") +// } + +// proofMap := ProofsToMap(rawProofs) + +// feedHashes, values, timestamps, proofs, err := MakeContractArgsWithProofs([]GlobalAggregate{agg}, reporter.SubmissionPairs, proofMap) +// if err != nil { +// t.Fatal("error making contract args") +// } +// assert.Equal(t, [32]byte(crypto.Keccak256([]byte(reporter.SubmissionPairs[agg.ConfigID].Name))), feedHashes[0]) +// assert.Equal(t, big.NewInt(15), values[0]) + +// proofArr := make([][]byte, len(proofs)) +// for i, p := range rawProofs { +// proofArr[i] = p.Proof +// } + +// assert.EqualValues(t, proofs, proofArr) +// assert.Equal(t, testItems.tmpData.proofTime.Unix(), timestamps[0].Int64()) + +// } + +// func TestGetLatestGlobalAggregatesRdb(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() + +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } +// reporter, err := testItems.app.GetReporterWithInterval(TestInterval) +// if err != nil { +// t.Fatalf("error getting reporter: %v", err) +// } + +// result, err := GetLatestGlobalAggregatesRdb(ctx, reporter.SubmissionPairs) +// if err != nil { +// t.Fatal("error getting latest global aggregates from rdb") +// } + +// assert.Equal(t, result[0].ConfigID, testItems.tmpData.globalAggregate.ConfigID) +// assert.Equal(t, result[0].Value, testItems.tmpData.globalAggregate.Value) +// } + +// func TestGetLatestGlobalAggregatesPgsql(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() + +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } + +// reporter, err := testItems.app.GetReporterWithInterval(TestInterval) +// if err != nil { +// t.Fatalf("error getting reporter: %v", err) +// } + +// result, err := GetLatestGlobalAggregatesPgsql(ctx, reporter.SubmissionPairs) +// if err != nil { +// t.Fatal("error getting latest global aggregates from pgs") +// } + +// assert.Equal(t, result[0].ConfigID, testItems.tmpData.globalAggregate.ConfigID) +// assert.Equal(t, result[0].Value, testItems.tmpData.globalAggregate.Value) +// } + +// func TestGetProofsRdb(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() + +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } + +// agg := testItems.tmpData.globalAggregate +// result, err := GetProofsRdb(ctx, []GlobalAggregate{agg}) +// if err != nil { +// t.Fatal("error getting proofs from rdb") +// } +// assert.EqualValues(t, testItems.tmpData.proofBytes, result[0].Proof) +// } + +// func TestGetProofsPgsql(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() + +// agg := testItems.tmpData.globalAggregate +// result, err := GetProofsPgsql(ctx, []GlobalAggregate{agg}) +// if err != nil { +// t.Fatal("error getting proofs from pgsql") +// } +// assert.EqualValues(t, testItems.tmpData.proofBytes, result[0].Proof) +// } + +// func TestNewDeviationReporter(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() + +// submissionPairs, err := getConfigs(ctx) +// if err != nil { +// t.Fatalf("error getting submission pairs: %v", err) +// } + +// contractAddress := os.Getenv("SUBMISSION_PROXY_CONTRACT") +// if contractAddress == "" { +// t.Fatal("SUBMISSION_PROXY_CONTRACT not set") +// } + +// tmpHelper, err := helper.NewChainHelper(ctx) +// if err != nil { +// t.Fatalf("error creating chain helper: %v", err) +// } +// defer tmpHelper.Close() + +// whitelist, err := ReadOnchainWhitelist(ctx, tmpHelper, contractAddress, GET_ONCHAIN_WHITELIST) +// if err != nil { +// t.Fatalf("error reading onchain whitelist: %v", err) +// } +// _, err = NewReporter( +// ctx, +// WithHost(testItems.app.Host), +// WithPubsub(testItems.app.Pubsub), +// WithConfigs(submissionPairs), +// WithInterval(5000), +// WithContractAddress(contractAddress), +// WithCachedWhitelist(whitelist), +// WithJobType(DeviationJob), +// ) +// if err != nil { +// t.Fatalf("error creating new deviation reporter: %v", err) +// } +// } + +// func TestStoreAndGetLastSubmission(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() + +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } +// reporter, err := testItems.app.GetReporterWithInterval(TestInterval) +// if err != nil { +// t.Fatalf("error getting reporter: %v", err) +// } + +// aggregates, err := GetLatestGlobalAggregates(ctx, reporter.SubmissionPairs) +// if err != nil { +// t.Fatal("error getting latest global aggregates") +// } + +// err = StoreLastSubmission(ctx, aggregates) +// if err != nil { +// t.Fatal("error storing last submission") +// } + +// loadedAggregates, err := GetLastSubmission(ctx, reporter.SubmissionPairs) +// if err != nil { +// t.Fatal("error getting last submission") +// } + +// assert.EqualValues(t, aggregates, loadedAggregates) + +// } + +// func TestShouldReportDeviation(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() + +// err = testItems.app.setReporters(ctx, testItems.app.Host, testItems.app.Pubsub) +// if err != nil { +// t.Fatalf("error setting reporters: %v", err) +// } + +// assert.False(t, ShouldReportDeviation(0, 0, 0.05)) +// assert.True(t, ShouldReportDeviation(0, 100000000, 0.05)) +// assert.False(t, ShouldReportDeviation(100000000000, 100100000000, 0.05)) +// assert.True(t, ShouldReportDeviation(100000000000, 105100000000, 0.05)) +// assert.False(t, ShouldReportDeviation(100000000000, 0, 0.05)) +// } + +// func TestGetDeviationThreshold(t *testing.T) { +// ctx := context.Background() +// cleanup, _, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() + +// assert.Equal(t, 0.05, GetDeviationThreshold(15*time.Second)) +// assert.Equal(t, 0.01, GetDeviationThreshold(60*time.Minute)) +// assert.Equal(t, 0.05, GetDeviationThreshold(1*time.Second)) +// assert.Equal(t, 0.01, GetDeviationThreshold(2*time.Hour)) +// assert.Less(t, GetDeviationThreshold(30*time.Minute), 0.05) +// } + +// func TestGetDeviatingAggregates(t *testing.T) { +// ctx := context.Background() +// cleanup, _, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() + +// oldAggregates := []GlobalAggregate{{ +// ConfigID: 2, +// Value: 15, +// Round: 1, +// }} + +// newAggregates := []GlobalAggregate{{ +// ConfigID: 2, +// Value: 30, +// Round: 2, +// }} + +// result := GetDeviatingAggregates(oldAggregates, newAggregates, 0.05) +// assert.Equal(t, result, newAggregates) +// } + +// func TestDeviationJob(t *testing.T) { +// ctx := context.Background() +// cleanup, testItems, err := setup(ctx) +// if err != nil { +// t.Fatalf("error setting up test: %v", err) +// } +// defer func() { +// if cleanupErr := cleanup(); cleanupErr != nil { +// t.Logf("Cleanup failed: %v", cleanupErr) +// } +// }() + +// submissionPairs, err := getConfigs(ctx) +// if err != nil { +// t.Fatalf("error getting submission pairs: %v", err) +// } + +// contractAddress := os.Getenv("SUBMISSION_PROXY_CONTRACT") +// if contractAddress == "" { +// t.Fatal("SUBMISSION_PROXY_CONTRACT not set") +// } + +// tmpHelper, err := helper.NewChainHelper(ctx) +// if err != nil { +// t.Fatalf("error creating chain helper: %v", err) +// } +// defer tmpHelper.Close() + +// whitelist, err := ReadOnchainWhitelist(ctx, tmpHelper, contractAddress, GET_ONCHAIN_WHITELIST) +// if err != nil { +// t.Fatalf("error reading onchain whitelist: %v", err) +// } +// reporter, err := NewReporter( +// ctx, +// WithHost(testItems.app.Host), +// WithPubsub(testItems.app.Pubsub), +// WithConfigs(submissionPairs), +// WithInterval(5000), +// WithContractAddress(contractAddress), +// WithCachedWhitelist(whitelist), +// WithJobType(DeviationJob), +// ) +// if err != nil { +// t.Fatalf("error creating new deviation reporter: %v", err) +// } + +// err = reporter.deviationJob() +// if err != nil { +// t.Fatal("error running deviation job") +// } +// } diff --git a/node/pkg/reporter/types.go b/node/pkg/reporter/types.go index d04d641ea..abf998887 100644 --- a/node/pkg/reporter/types.go +++ b/node/pkg/reporter/types.go @@ -6,13 +6,11 @@ import ( "strings" "time" - "bisonai.com/orakl/node/pkg/bus" "bisonai.com/orakl/node/pkg/chain/helper" "bisonai.com/orakl/node/pkg/common/types" + dalcommon "bisonai.com/orakl/node/pkg/dal/common" "bisonai.com/orakl/node/pkg/raft" "github.com/klaytn/klaytn/common" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/host" ) const ( @@ -53,10 +51,8 @@ type SubmissionPair struct { } type App struct { - Reporters []*Reporter - Bus *bus.MessageBus - Host host.Host - Pubsub *pubsub.PubSub + Reporters []*Reporter + chainHelper *helper.ChainHelper } type JobType int @@ -67,29 +63,17 @@ const ( ) type ReporterConfig struct { - Host host.Host - Ps *pubsub.PubSub Configs []Config Interval int ContractAddress string CachedWhitelist []common.Address JobType JobType + DalEndpoint string + DalApiKey string } type ReporterOption func(*ReporterConfig) -func WithHost(h host.Host) ReporterOption { - return func(c *ReporterConfig) { - c.Host = h - } -} - -func WithPubsub(ps *pubsub.PubSub) ReporterOption { - return func(c *ReporterConfig) { - c.Ps = ps - } -} - func WithConfigs(configs []Config) ReporterOption { return func(c *ReporterConfig) { c.Configs = configs @@ -120,13 +104,27 @@ func WithJobType(jobType JobType) ReporterOption { } } +func WithDalEndpoint(endpoint string) ReporterOption { + return func(c *ReporterConfig) { + c.DalEndpoint = endpoint + } +} + +func WithDalApiKey(apiKey string) ReporterOption { + return func(c *ReporterConfig) { + c.DalApiKey = apiKey + } +} + type Reporter struct { - Raft *raft.Raft KaiaHelper *helper.ChainHelper SubmissionPairs map[int32]SubmissionPair SubmissionInterval time.Duration CachedWhitelist []common.Address + DalEndpoint string + DalApiKey string + contractAddress string deviationThreshold float64 @@ -139,6 +137,8 @@ type GlobalAggregate types.GlobalAggregate type Proof types.Proof +type SubmissionData dalcommon.OutgoingSubmissionData + type SubmissionMessage struct { Submissions []GlobalAggregate `json:"submissions"` } diff --git a/node/pkg/reporter/utils.go b/node/pkg/reporter/utils.go index 53f08ab8b..a01c45ea7 100644 --- a/node/pkg/reporter/utils.go +++ b/node/pkg/reporter/utils.go @@ -86,41 +86,35 @@ func ProofsToMap(proofs []Proof) map[int32][]byte { return m } -func MakeContractArgsWithProofs(aggregates []GlobalAggregate, submissionPairs map[int32]SubmissionPair, proofMap map[int32][]byte) ([][32]byte, []*big.Int, []*big.Int, [][]byte, error) { +func MakeContractArgsWithProofs(aggregates []GlobalAggregate, submissionPairs map[int32]SubmissionPair) ([][32]byte, []*big.Int, []*big.Int, error) { if len(aggregates) == 0 { - return nil, nil, nil, nil, errorSentinel.ErrReporterEmptyAggregatesParam + return nil, nil, nil, errorSentinel.ErrReporterEmptyAggregatesParam } if len(submissionPairs) == 0 { - return nil, nil, nil, nil, errorSentinel.ErrReporterEmptySubmissionPairsParam - } - - if len(proofMap) == 0 { - return nil, nil, nil, nil, errorSentinel.ErrReporterEmptyProofParam + return nil, nil, nil, errorSentinel.ErrReporterEmptySubmissionPairsParam } feedHash := make([][32]byte, len(aggregates)) values := make([]*big.Int, len(aggregates)) timestamps := make([]*big.Int, len(aggregates)) - proofs := make([][]byte, len(aggregates)) for i, agg := range aggregates { if agg.ConfigID == 0 || agg.Value < 0 { log.Error().Str("Player", "Reporter").Int32("configId", agg.ConfigID).Int64("value", agg.Value).Msg("skipping invalid aggregate") - return nil, nil, nil, nil, errorSentinel.ErrReporterInvalidAggregateFound + return nil, nil, nil, errorSentinel.ErrReporterInvalidAggregateFound } name := submissionPairs[agg.ConfigID].Name copy(feedHash[i][:], crypto.Keccak256([]byte(name))) values[i] = big.NewInt(agg.Value) timestamps[i] = big.NewInt(agg.Timestamp.Unix()) - proofs[i] = proofMap[agg.ConfigID] } - if len(feedHash) == 0 || len(values) == 0 || len(proofs) == 0 || len(timestamps) == 0 { - return nil, nil, nil, nil, errorSentinel.ErrReporterEmptyValidAggregates + if len(feedHash) == 0 || len(values) == 0 || len(timestamps) == 0 { + return nil, nil, nil, errorSentinel.ErrReporterEmptyValidAggregates } - return feedHash, values, timestamps, proofs, nil + return feedHash, values, timestamps, nil } func FilterInvalidAggregates(aggregates []GlobalAggregate, submissionPairs map[int32]SubmissionPair) []GlobalAggregate { @@ -252,23 +246,6 @@ func isWhitelisted(signer common.Address, whitelist []common.Address) bool { return false } -func OrderProof(signerMap map[common.Address][]byte, whitelist []common.Address) ([]byte, error) { - tmpProofs := make([][]byte, 0, len(whitelist)) - for _, signer := range whitelist { - tmpProof, ok := signerMap[signer] - if ok { - tmpProofs = append(tmpProofs, tmpProof) - } - } - - if len(tmpProofs) == 0 { - log.Error().Str("Player", "Reporter").Msg("no valid proofs") - return nil, errorSentinel.ErrReporterEmptyValidProofs - } - - return bytes.Join(tmpProofs, nil), nil -} - func GetSignerMap(signers []common.Address, proofChunks [][]byte) map[common.Address][]byte { signerMap := make(map[common.Address][]byte) for i, signer := range signers { diff --git a/node/pkg/reporter/utils_test.go b/node/pkg/reporter/utils_test.go index 45e0bdf69..e2574ff12 100644 --- a/node/pkg/reporter/utils_test.go +++ b/node/pkg/reporter/utils_test.go @@ -1,307 +1,307 @@ package reporter -import ( - "context" - "strconv" - "testing" - "time" - - chainUtils "bisonai.com/orakl/node/pkg/chain/utils" - "bisonai.com/orakl/node/pkg/db" - errorSentinel "bisonai.com/orakl/node/pkg/error" - "github.com/klaytn/klaytn/common" - "github.com/stretchr/testify/assert" -) - -func TestCheckForNonWhitelistedSigners(t *testing.T) { - whitelist := []common.Address{ - common.HexToAddress("0x1234567890abcdef"), - common.HexToAddress("0xabcdef1234567890"), - } - - t.Run("All signers whitelisted", func(t *testing.T) { - signers := []common.Address{ - common.HexToAddress("0x1234567890abcdef"), - common.HexToAddress("0xabcdef1234567890"), - } - - err := CheckForNonWhitelistedSigners(signers, whitelist) - - assert.NoError(t, err) - }) - - t.Run("One non-whitelisted signer", func(t *testing.T) { - signers := []common.Address{ - common.HexToAddress("0x1234567890abcdef"), - common.HexToAddress("0xabcdef1234567890"), - common.HexToAddress("0xdeadbeefdeadbeef"), - } - - err := CheckForNonWhitelistedSigners(signers, whitelist) - - assert.ErrorIs(t, err, errorSentinel.ErrReporterSignerNotWhitelisted) - }) - - t.Run("Empty signers", func(t *testing.T) { - signers := []common.Address{} - - err := CheckForNonWhitelistedSigners(signers, whitelist) - - assert.NoError(t, err) - }) -} - -func TestOrderProof(t *testing.T) { - t.Run("Valid proofs", func(t *testing.T) { - signerMap := map[common.Address][]byte{ - common.HexToAddress("0x1234567890abcdef"): []byte("proof1"), - common.HexToAddress("0xabcdef1234567890"): []byte("proof2"), - common.HexToAddress("0xdeadbeefdeadbeef"): []byte("proof3"), - } - - whitelist := []common.Address{ - common.HexToAddress("0x1234567890abcdef"), - common.HexToAddress("0xabcdef1234567890"), - } - - expectedProof := []byte("proof1proof2") - proof, err := OrderProof(signerMap, whitelist) - - assert.NoError(t, err) - assert.Equal(t, expectedProof, proof) - }) - - t.Run("Order should change", func(t *testing.T) { - signerMap := map[common.Address][]byte{ - common.HexToAddress("0x1234567890abcdef"): []byte("proof1"), - common.HexToAddress("0xabcdef1234567890"): []byte("proof2"), - common.HexToAddress("0xdeadbeefdeadbeef"): []byte("proof3"), - } - - whitelist := []common.Address{ - common.HexToAddress("0xabcdef1234567890"), - common.HexToAddress("0x1234567890abcdef"), - } - - expectedProof := []byte("proof2proof1") - proof, err := OrderProof(signerMap, whitelist) - - assert.NoError(t, err) - assert.Equal(t, expectedProof, proof) - }) - - t.Run("No valid proofs", func(t *testing.T) { - signerMap := map[common.Address][]byte{ - common.HexToAddress("0xdeadbeefdeadbeef"): []byte("proof3"), - } - - whitelist := []common.Address{ - common.HexToAddress("0x1234567890abcdef"), - common.HexToAddress("0xabcdef1234567890"), - } - - proof, err := OrderProof(signerMap, whitelist) - - assert.ErrorIs(t, err, errorSentinel.ErrReporterEmptyValidProofs) - assert.Nil(t, proof) - }) -} -func TestGetSignerMap(t *testing.T) { - signers := []common.Address{ - common.HexToAddress("0x1234567890abcdef"), - common.HexToAddress("0xabcdef1234567890"), - common.HexToAddress("0xdeadbeefdeadbeef"), - } - - proofChunks := [][]byte{ - []byte("proof1"), - []byte("proof2"), - []byte("proof3"), - } - - expectedSignerMap := map[common.Address][]byte{ - common.HexToAddress("0x1234567890abcdef"): []byte("proof1"), - common.HexToAddress("0xabcdef1234567890"): []byte("proof2"), - common.HexToAddress("0xdeadbeefdeadbeef"): []byte("proof3"), - } - - signerMap := GetSignerMap(signers, proofChunks) - - assert.Equal(t, expectedSignerMap, signerMap) -} - -func TestGetSignerListFromProofs(t *testing.T) { - testValue := int64(10) - testTimestamp := time.Now().Unix() - testName := "test" - - hash := chainUtils.Value2HashForSign(testValue, testTimestamp, testName) - test_pk_0 := "737ea08c90c582aafdd7644ec492ee685df711df1ca055fd351938a493058217" - test_pk_1 := "c2235dcc40306325e1e060b066edb728a1734a377a9648461526101e5365ac56" - pk_0, err := chainUtils.StringToPk(test_pk_0) - if err != nil { - t.Fatalf("Failed to convert string to pk: %v", err) - } - pk_1, err := chainUtils.StringToPk(test_pk_1) - if err != nil { - t.Fatalf("Failed to convert string to pk: %v", err) - } - - sig_0, err := chainUtils.MakeValueSignature(testValue, testTimestamp, testName, pk_0) - if err != nil { - t.Fatalf("Failed to make value signature: %v", err) - } - sig_1, err := chainUtils.MakeValueSignature(testValue, testTimestamp, testName, pk_1) - if err != nil { - t.Fatalf("Failed to make value signature: %v", err) - } - - proofChunks := [][]byte{ - sig_0, - sig_1, - } - - expectedSigners := []common.Address{ - common.HexToAddress("0x2138824ef8741add09E8680F968e1d5D0AC155E0"), - common.HexToAddress("0xd7b29E19c08d412d9c3c96D00cC53609F313D4E9"), - } - - signers, err := GetSignerListFromProofs(hash, proofChunks) - - assert.NoError(t, err) - assert.Equal(t, expectedSigners, signers) -} - -func TestSplitProofToChunk(t *testing.T) { - t.Run("Empty proof", func(t *testing.T) { - proof := []byte{} - chunks, err := SplitProofToChunk(proof) - - assert.Nil(t, chunks) - assert.ErrorIs(t, err, errorSentinel.ErrReporterEmptyProofParam) - }) - - t.Run("Invalid proof length", func(t *testing.T) { - proof := []byte("invalidproof") - chunks, err := SplitProofToChunk(proof) - - assert.Nil(t, chunks) - assert.ErrorIs(t, err, errorSentinel.ErrReporterInvalidProofLength) - }) - - t.Run("Valid proof", func(t *testing.T) { - proof := []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalidvalidproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid") - expectedChunks := [][]byte{ - []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid"), - []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid"), - } - chunks, err := SplitProofToChunk(proof) - - assert.Equal(t, expectedChunks, chunks) - assert.NoError(t, err) - }) -} - -func TestRemoveDuplicateProof(t *testing.T) { - t.Run("Empty proof", func(t *testing.T) { - proof := []byte{} - expectedResult := []byte{} - result := RemoveDuplicateProof(proof) - - assert.Equal(t, expectedResult, result) - }) - - t.Run("No duplicate proofs", func(t *testing.T) { - proof := []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid") - expectedResult := []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid") - result := RemoveDuplicateProof(proof) - - assert.Equal(t, expectedResult, result) - }) - - t.Run("Duplicate proofs", func(t *testing.T) { - proof := []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalidvalidproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid") - expectedResult := []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid") - result := RemoveDuplicateProof(proof) - - assert.Equal(t, expectedResult, result) - }) -} - -func TestUpdateProofs(t *testing.T) { - ctx := context.Background() - defer func() { - err := db.QueryWithoutResult(ctx, "DELETE FROM proofs", nil) - if err != nil { - t.Logf("QueryWithoutResult failed: %v", err) - } - - err = db.QueryWithoutResult(ctx, "DELETE FROM configs", nil) - if err != nil { - t.Logf("QueryWithoutResult failed: %v", err) - } - }() - - tmpConfigs := []Config{} - for i := 0; i < 3; i++ { - tmpConfig, err := db.QueryRow[Config](ctx, InsertConfigQuery, map[string]any{ - "name": "test-aggregate-" + strconv.Itoa(i), - "address": "0x1234" + strconv.Itoa(i), - "submit_interval": TestInterval, - "fetch_interval": TestInterval, - "aggregate_interval": TestInterval}) - if err != nil { - t.Fatalf("QueryRow failed: %v", err) - } - tmpConfigs = append(tmpConfigs, tmpConfig) - } - - aggregates := []GlobalAggregate{ - {ConfigID: tmpConfigs[0].ID, Round: 1}, - {ConfigID: tmpConfigs[1].ID, Round: 2}, - {ConfigID: tmpConfigs[2].ID, Round: 3}, - } - - proofMap := map[int32][]byte{ - tmpConfigs[0].ID: []byte("proof1"), - tmpConfigs[1].ID: []byte("proof2"), - } - - expectedUpsertRows := [][]any{ - {tmpConfigs[0].ID, int32(1), []byte("proof1")}, - {tmpConfigs[1].ID, int32(2), []byte("proof2")}, - } - err := UpsertProofs(ctx, aggregates, proofMap) - if err != nil { - t.Fatalf("UpsertProofs failed: %v", err) - } - result, err := db.QueryRows[Proof](ctx, "SELECT config_id, round, proof FROM proofs WHERE config_id IN ("+strconv.Itoa(int(tmpConfigs[0].ID))+", "+strconv.Itoa(int(tmpConfigs[1].ID))+")", nil) - if err != nil { - t.Fatalf("QueryRows failed: %v", err) - } - - for i, p := range result { - assert.Equal(t, expectedUpsertRows[i], []any{p.ConfigID, p.Round, p.Proof}) - } - - proofMap = map[int32][]byte{ - tmpConfigs[0].ID: []byte("proof3"), - tmpConfigs[1].ID: []byte("proof4"), - } - expectedUpsertRows = [][]any{ - {tmpConfigs[0].ID, int32(1), []byte("proof3")}, - {tmpConfigs[1].ID, int32(2), []byte("proof4")}, - } - err = UpdateProofs(ctx, aggregates, proofMap) - if err != nil { - t.Fatalf("UpdateProofs failed: %v", err) - } - result, err = db.QueryRows[Proof](ctx, "SELECT config_id, round, proof FROM proofs WHERE config_id IN ("+strconv.Itoa(int(tmpConfigs[0].ID))+", "+strconv.Itoa(int(tmpConfigs[1].ID))+")", nil) - if err != nil { - t.Fatalf("QueryRows failed: %v", err) - } - - for i, p := range result { - assert.Equal(t, expectedUpsertRows[i], []any{p.ConfigID, p.Round, p.Proof}) - } -} +// import ( +// "context" +// "strconv" +// "testing" +// "time" + +// chainUtils "bisonai.com/orakl/node/pkg/chain/utils" +// "bisonai.com/orakl/node/pkg/db" +// errorSentinel "bisonai.com/orakl/node/pkg/error" +// "github.com/klaytn/klaytn/common" +// "github.com/stretchr/testify/assert" +// ) + +// func TestCheckForNonWhitelistedSigners(t *testing.T) { +// whitelist := []common.Address{ +// common.HexToAddress("0x1234567890abcdef"), +// common.HexToAddress("0xabcdef1234567890"), +// } + +// t.Run("All signers whitelisted", func(t *testing.T) { +// signers := []common.Address{ +// common.HexToAddress("0x1234567890abcdef"), +// common.HexToAddress("0xabcdef1234567890"), +// } + +// err := CheckForNonWhitelistedSigners(signers, whitelist) + +// assert.NoError(t, err) +// }) + +// t.Run("One non-whitelisted signer", func(t *testing.T) { +// signers := []common.Address{ +// common.HexToAddress("0x1234567890abcdef"), +// common.HexToAddress("0xabcdef1234567890"), +// common.HexToAddress("0xdeadbeefdeadbeef"), +// } + +// err := CheckForNonWhitelistedSigners(signers, whitelist) + +// assert.ErrorIs(t, err, errorSentinel.ErrReporterSignerNotWhitelisted) +// }) + +// t.Run("Empty signers", func(t *testing.T) { +// signers := []common.Address{} + +// err := CheckForNonWhitelistedSigners(signers, whitelist) + +// assert.NoError(t, err) +// }) +// } + +// func TestOrderProof(t *testing.T) { +// t.Run("Valid proofs", func(t *testing.T) { +// signerMap := map[common.Address][]byte{ +// common.HexToAddress("0x1234567890abcdef"): []byte("proof1"), +// common.HexToAddress("0xabcdef1234567890"): []byte("proof2"), +// common.HexToAddress("0xdeadbeefdeadbeef"): []byte("proof3"), +// } + +// whitelist := []common.Address{ +// common.HexToAddress("0x1234567890abcdef"), +// common.HexToAddress("0xabcdef1234567890"), +// } + +// expectedProof := []byte("proof1proof2") +// proof, err := OrderProof(signerMap, whitelist) + +// assert.NoError(t, err) +// assert.Equal(t, expectedProof, proof) +// }) + +// t.Run("Order should change", func(t *testing.T) { +// signerMap := map[common.Address][]byte{ +// common.HexToAddress("0x1234567890abcdef"): []byte("proof1"), +// common.HexToAddress("0xabcdef1234567890"): []byte("proof2"), +// common.HexToAddress("0xdeadbeefdeadbeef"): []byte("proof3"), +// } + +// whitelist := []common.Address{ +// common.HexToAddress("0xabcdef1234567890"), +// common.HexToAddress("0x1234567890abcdef"), +// } + +// expectedProof := []byte("proof2proof1") +// proof, err := OrderProof(signerMap, whitelist) + +// assert.NoError(t, err) +// assert.Equal(t, expectedProof, proof) +// }) + +// t.Run("No valid proofs", func(t *testing.T) { +// signerMap := map[common.Address][]byte{ +// common.HexToAddress("0xdeadbeefdeadbeef"): []byte("proof3"), +// } + +// whitelist := []common.Address{ +// common.HexToAddress("0x1234567890abcdef"), +// common.HexToAddress("0xabcdef1234567890"), +// } + +// proof, err := OrderProof(signerMap, whitelist) + +// assert.ErrorIs(t, err, errorSentinel.ErrReporterEmptyValidProofs) +// assert.Nil(t, proof) +// }) +// } +// func TestGetSignerMap(t *testing.T) { +// signers := []common.Address{ +// common.HexToAddress("0x1234567890abcdef"), +// common.HexToAddress("0xabcdef1234567890"), +// common.HexToAddress("0xdeadbeefdeadbeef"), +// } + +// proofChunks := [][]byte{ +// []byte("proof1"), +// []byte("proof2"), +// []byte("proof3"), +// } + +// expectedSignerMap := map[common.Address][]byte{ +// common.HexToAddress("0x1234567890abcdef"): []byte("proof1"), +// common.HexToAddress("0xabcdef1234567890"): []byte("proof2"), +// common.HexToAddress("0xdeadbeefdeadbeef"): []byte("proof3"), +// } + +// signerMap := GetSignerMap(signers, proofChunks) + +// assert.Equal(t, expectedSignerMap, signerMap) +// } + +// func TestGetSignerListFromProofs(t *testing.T) { +// testValue := int64(10) +// testTimestamp := time.Now().Unix() +// testName := "test" + +// hash := chainUtils.Value2HashForSign(testValue, testTimestamp, testName) +// test_pk_0 := "737ea08c90c582aafdd7644ec492ee685df711df1ca055fd351938a493058217" +// test_pk_1 := "c2235dcc40306325e1e060b066edb728a1734a377a9648461526101e5365ac56" +// pk_0, err := chainUtils.StringToPk(test_pk_0) +// if err != nil { +// t.Fatalf("Failed to convert string to pk: %v", err) +// } +// pk_1, err := chainUtils.StringToPk(test_pk_1) +// if err != nil { +// t.Fatalf("Failed to convert string to pk: %v", err) +// } + +// sig_0, err := chainUtils.MakeValueSignature(testValue, testTimestamp, testName, pk_0) +// if err != nil { +// t.Fatalf("Failed to make value signature: %v", err) +// } +// sig_1, err := chainUtils.MakeValueSignature(testValue, testTimestamp, testName, pk_1) +// if err != nil { +// t.Fatalf("Failed to make value signature: %v", err) +// } + +// proofChunks := [][]byte{ +// sig_0, +// sig_1, +// } + +// expectedSigners := []common.Address{ +// common.HexToAddress("0x2138824ef8741add09E8680F968e1d5D0AC155E0"), +// common.HexToAddress("0xd7b29E19c08d412d9c3c96D00cC53609F313D4E9"), +// } + +// signers, err := GetSignerListFromProofs(hash, proofChunks) + +// assert.NoError(t, err) +// assert.Equal(t, expectedSigners, signers) +// } + +// func TestSplitProofToChunk(t *testing.T) { +// t.Run("Empty proof", func(t *testing.T) { +// proof := []byte{} +// chunks, err := SplitProofToChunk(proof) + +// assert.Nil(t, chunks) +// assert.ErrorIs(t, err, errorSentinel.ErrReporterEmptyProofParam) +// }) + +// t.Run("Invalid proof length", func(t *testing.T) { +// proof := []byte("invalidproof") +// chunks, err := SplitProofToChunk(proof) + +// assert.Nil(t, chunks) +// assert.ErrorIs(t, err, errorSentinel.ErrReporterInvalidProofLength) +// }) + +// t.Run("Valid proof", func(t *testing.T) { +// proof := []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalidvalidproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid") +// expectedChunks := [][]byte{ +// []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid"), +// []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid"), +// } +// chunks, err := SplitProofToChunk(proof) + +// assert.Equal(t, expectedChunks, chunks) +// assert.NoError(t, err) +// }) +// } + +// func TestRemoveDuplicateProof(t *testing.T) { +// t.Run("Empty proof", func(t *testing.T) { +// proof := []byte{} +// expectedResult := []byte{} +// result := RemoveDuplicateProof(proof) + +// assert.Equal(t, expectedResult, result) +// }) + +// t.Run("No duplicate proofs", func(t *testing.T) { +// proof := []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid") +// expectedResult := []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid") +// result := RemoveDuplicateProof(proof) + +// assert.Equal(t, expectedResult, result) +// }) + +// t.Run("Duplicate proofs", func(t *testing.T) { +// proof := []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalidvalidproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid") +// expectedResult := []byte("validproofvalidproofvalidproofvalidproofvalidproofvalidproofvalid") +// result := RemoveDuplicateProof(proof) + +// assert.Equal(t, expectedResult, result) +// }) +// } + +// func TestUpdateProofs(t *testing.T) { +// ctx := context.Background() +// defer func() { +// err := db.QueryWithoutResult(ctx, "DELETE FROM proofs", nil) +// if err != nil { +// t.Logf("QueryWithoutResult failed: %v", err) +// } + +// err = db.QueryWithoutResult(ctx, "DELETE FROM configs", nil) +// if err != nil { +// t.Logf("QueryWithoutResult failed: %v", err) +// } +// }() + +// tmpConfigs := []Config{} +// for i := 0; i < 3; i++ { +// tmpConfig, err := db.QueryRow[Config](ctx, InsertConfigQuery, map[string]any{ +// "name": "test-aggregate-" + strconv.Itoa(i), +// "address": "0x1234" + strconv.Itoa(i), +// "submit_interval": TestInterval, +// "fetch_interval": TestInterval, +// "aggregate_interval": TestInterval}) +// if err != nil { +// t.Fatalf("QueryRow failed: %v", err) +// } +// tmpConfigs = append(tmpConfigs, tmpConfig) +// } + +// aggregates := []GlobalAggregate{ +// {ConfigID: tmpConfigs[0].ID, Round: 1}, +// {ConfigID: tmpConfigs[1].ID, Round: 2}, +// {ConfigID: tmpConfigs[2].ID, Round: 3}, +// } + +// proofMap := map[int32][]byte{ +// tmpConfigs[0].ID: []byte("proof1"), +// tmpConfigs[1].ID: []byte("proof2"), +// } + +// expectedUpsertRows := [][]any{ +// {tmpConfigs[0].ID, int32(1), []byte("proof1")}, +// {tmpConfigs[1].ID, int32(2), []byte("proof2")}, +// } +// err := UpsertProofs(ctx, aggregates, proofMap) +// if err != nil { +// t.Fatalf("UpsertProofs failed: %v", err) +// } +// result, err := db.QueryRows[Proof](ctx, "SELECT config_id, round, proof FROM proofs WHERE config_id IN ("+strconv.Itoa(int(tmpConfigs[0].ID))+", "+strconv.Itoa(int(tmpConfigs[1].ID))+")", nil) +// if err != nil { +// t.Fatalf("QueryRows failed: %v", err) +// } + +// for i, p := range result { +// assert.Equal(t, expectedUpsertRows[i], []any{p.ConfigID, p.Round, p.Proof}) +// } + +// proofMap = map[int32][]byte{ +// tmpConfigs[0].ID: []byte("proof3"), +// tmpConfigs[1].ID: []byte("proof4"), +// } +// expectedUpsertRows = [][]any{ +// {tmpConfigs[0].ID, int32(1), []byte("proof3")}, +// {tmpConfigs[1].ID, int32(2), []byte("proof4")}, +// } +// err = UpdateProofs(ctx, aggregates, proofMap) +// if err != nil { +// t.Fatalf("UpdateProofs failed: %v", err) +// } +// result, err = db.QueryRows[Proof](ctx, "SELECT config_id, round, proof FROM proofs WHERE config_id IN ("+strconv.Itoa(int(tmpConfigs[0].ID))+", "+strconv.Itoa(int(tmpConfigs[1].ID))+")", nil) +// if err != nil { +// t.Fatalf("QueryRows failed: %v", err) +// } + +// for i, p := range result { +// assert.Equal(t, expectedUpsertRows[i], []any{p.ConfigID, p.Round, p.Proof}) +// } +// } diff --git a/node/taskfiles/taskfile.local.yml b/node/taskfiles/taskfile.local.yml index d1bf430fd..14bed5856 100644 --- a/node/taskfiles/taskfile.local.yml +++ b/node/taskfiles/taskfile.local.yml @@ -29,6 +29,10 @@ tasks: dotenv: [".env"] cmds: - go run ./cmd/dal/main.go + reporter: + dotenv: [".env"] + cmds: + - go run ./cmd/reporter/main.go script-submission: dotenv: [".env"] @@ -98,10 +102,10 @@ tasks: dotenv: [".env"] cmds: - go test ./pkg/libp2p/tests -v - test-reporter: - dotenv: [".env"] - cmds: - - go test ./pkg/reporter -v + # test-reporter: + # dotenv: [".env"] + # cmds: + # - go test ./pkg/reporter -v test-boot: dotenv: [".env"] cmds: @@ -226,7 +230,7 @@ tasks: - task: test-fetcher - task: test-utils - task: test-aggregator - - task: test-reporter + # - task: test-reporter - task: test-chain - task: test-wss - task: test-websocketfetcher