Skip to content

Commit

Permalink
[Separate Reporter Part 2] base report logic with rest api (#1873)
Browse files Browse the repository at this point in the history
* comment out / remove unused code from reporter

* comment out tests and create reporter entrypoint

* temp comment out deviation reporter

* initial dal fetch and submit logic

* report logic implemented

* temp disable reporter tests

* remove reporter from node entrypoint

* replace zerolog with zeropglog in reporter entrypoing

* shared chain helper and mutex
  • Loading branch information
Intizar-T committed Jul 24, 2024
1 parent 36ff097 commit ea6656a
Show file tree
Hide file tree
Showing 12 changed files with 1,155 additions and 1,484 deletions.
13 changes: 0 additions & 13 deletions node/cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"bisonai.com/orakl/node/pkg/fetcher"
"bisonai.com/orakl/node/pkg/libp2p/helper"
libp2pSetup "bisonai.com/orakl/node/pkg/libp2p/setup"
"bisonai.com/orakl/node/pkg/reporter"
"bisonai.com/orakl/node/pkg/utils/retrier"
"bisonai.com/orakl/node/pkg/zeropglog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -87,18 +86,6 @@ func main() {
}()
log.Info().Msg("Aggregator started")

wg.Add(1)
go func() {
defer wg.Done()
r := reporter.New(mb, host, ps)
reporterErr := r.Run(ctx)
if reporterErr != nil {
log.Error().Err(reporterErr).Msg("Failed to start reporter")
os.Exit(1)
}
}()
log.Info().Msg("Reporter started")

wg.Add(1)
go func() {
defer wg.Done()
Expand Down
38 changes: 38 additions & 0 deletions node/cmd/reporter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"context"
"os"
"os/signal"
"syscall"

"bisonai.com/orakl/node/pkg/reporter"
"bisonai.com/orakl/node/pkg/zeropglog"
"github.com/rs/zerolog/log"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

zeropglog := zeropglog.New()
go zeropglog.Run(ctx)

r := reporter.New()
err := r.Run(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to start reporter")
cancel()
return
}

<-sigChan
log.Info().Msg("Reporter termination signal received")

cancel()

log.Info().Msg("Reporter service has stopped")
}
1 change: 1 addition & 0 deletions node/pkg/error/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ var (
ErrReporterStart = &CustomError{Service: Reporter, Code: InternalError, Message: "Failed to start reporters"}
ErrReporterStop = &CustomError{Service: Reporter, Code: InternalError, Message: "Failed to stop reporters"}
ErrReporterValidateAggregateTimestampValues = &CustomError{Service: Reporter, Code: InternalError, Message: "Failed to validate aggregate timestamp values"}
ErrReporterDalApiKeyNotFound = &CustomError{Service: Reporter, Code: InternalError, Message: "DAL API key not found in reporter"}

ErrDalEmptyProofParam = &CustomError{Service: Dal, Code: InvalidInputError, Message: "Empty proof param"}
ErrDalInvalidProofLength = &CustomError{Service: Dal, Code: InvalidInputError, Message: "Invalid proof length"}
Expand Down
206 changes: 36 additions & 170 deletions node/pkg/reporter/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,43 @@ package reporter

import (
"context"
"fmt"
"os"
"time"

"bisonai.com/orakl/node/pkg/bus"
"bisonai.com/orakl/node/pkg/chain/helper"
"bisonai.com/orakl/node/pkg/db"
errorSentinel "bisonai.com/orakl/node/pkg/error"
"github.com/klaytn/klaytn/common"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/rs/zerolog/log"
)

func New(bus *bus.MessageBus, h host.Host, ps *pubsub.PubSub) *App {
func New() *App {
return &App{
Reporters: []*Reporter{},
Bus: bus,
Host: h,
Pubsub: ps,
}
}

func (a *App) Run(ctx context.Context) error {
err := a.setReporters(ctx, a.Host, a.Pubsub)
err := a.setReporters(ctx)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to set reporters")
return err
}
a.subscribe(ctx)

return a.startReporters(ctx)
}

func (a *App) setReporters(ctx context.Context, h host.Host, ps *pubsub.PubSub) error {
err := a.clearReporters()
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to clear reporters")
return err
func (a *App) setReporters(ctx context.Context) error {
dalApiKey := os.Getenv("API_KEY")
if dalApiKey == "" {
return errorSentinel.ErrReporterDalApiKeyNotFound
}

chain := os.Getenv("CHAIN")
if chain == "" {
log.Warn().Str("Player", "Reporter").Msg("chain not set, defaulting to baobab")
chain = "baobab"
}

contractAddress := os.Getenv("SUBMISSION_PROXY_CONTRACT")
Expand Down Expand Up @@ -70,12 +69,12 @@ func (a *App) setReporters(ctx context.Context, h host.Host, ps *pubsub.PubSub)
for groupInterval, configs := range groupedConfigs {
reporter, errNewReporter := NewReporter(
ctx,
WithHost(h),
WithPubsub(ps),
WithConfigs(configs),
WithInterval(groupInterval),
WithContractAddress(contractAddress),
WithCachedWhitelist(cachedWhitelist),
WithDalEndpoint(fmt.Sprintf("https://dal.%s.orakl.network", chain)),
WithDalApiKey(dalApiKey),
)
if errNewReporter != nil {
log.Error().Str("Player", "Reporter").Err(errNewReporter).Msg("failed to set reporter")
Expand All @@ -88,55 +87,35 @@ func (a *App) setReporters(ctx context.Context, h host.Host, ps *pubsub.PubSub)
return errorSentinel.ErrReporterNotFound
}

deviationReporter, errNewDeviationReporter := NewReporter(
ctx,
WithHost(h),
WithPubsub(ps),
WithConfigs(configs),
WithInterval(DEVIATION_INTERVAL),
WithContractAddress(contractAddress),
WithCachedWhitelist(cachedWhitelist),
WithJobType(DeviationJob),
)
if errNewDeviationReporter != nil {
log.Error().Str("Player", "Reporter").Err(errNewDeviationReporter).Msg("failed to set deviation reporter")
return errNewDeviationReporter
}
a.Reporters = append(a.Reporters, deviationReporter)
// deviationReporter, errNewDeviationReporter := NewReporter(
// ctx,
// WithConfigs(configs),
// WithInterval(DEVIATION_INTERVAL),
// WithContractAddress(contractAddress),
// WithCachedWhitelist(cachedWhitelist),
// WithJobType(DeviationJob),
// )
// if errNewDeviationReporter != nil {
// log.Error().Str("Player", "Reporter").Err(errNewDeviationReporter).Msg("failed to set deviation reporter")
// return errNewDeviationReporter
// }
// a.Reporters = append(a.Reporters, deviationReporter)

log.Info().Str("Player", "Reporter").Msgf("%d reporters set", len(a.Reporters))
return nil
}

func (a *App) clearReporters() error {
if a.Reporters == nil {
return nil
}

func (a *App) startReporters(ctx context.Context) error {
var errs []string
for _, reporter := range a.Reporters {
if reporter.isRunning {
err := stopReporter(reporter)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to stop reporter")
errs = append(errs, err.Error())
}
}
}
a.Reporters = make([]*Reporter, 0)

if len(errs) > 0 {
return errorSentinel.ErrReporterClear
chainHelper, chainHelperErr := helper.NewChainHelper(ctx)
if chainHelperErr != nil {
return chainHelperErr
}

return nil
}

func (a *App) startReporters(ctx context.Context) error {
var errs []string
a.chainHelper = chainHelper

for _, reporter := range a.Reporters {
err := startReporter(ctx, reporter)
err := a.startReporter(ctx, reporter)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to start reporter")
errs = append(errs, err.Error())
Expand All @@ -150,97 +129,6 @@ func (a *App) startReporters(ctx context.Context) error {
return nil
}

func (a *App) stopReporters() error {
var errs []string

for _, reporter := range a.Reporters {
err := stopReporter(reporter)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to stop reporter")
errs = append(errs, err.Error())
}
}

if len(errs) > 0 {
return errorSentinel.ErrReporterStop
}

return nil
}

func (a *App) subscribe(ctx context.Context) {
log.Debug().Str("Player", "Reporter").Msg("subscribing to reporter topic")
channel := a.Bus.Subscribe(bus.REPORTER)
if channel == nil {
log.Error().Str("Player", "Reporter").Msg("failed to subscribe to reporter topic")
return
}

go func() {
log.Debug().Str("Player", "Reporter").Msg("start reporter subscription goroutine")
for {
select {
case msg := <-channel:
log.Debug().Str("Player", "Reporter").Str("command", msg.Content.Command).Msg("received message from reporter topic")
go a.handleMessage(ctx, msg)
case <-ctx.Done():
log.Debug().Str("Player", "Reporter").Msg("stopping reporter subscription goroutine")
return
}
}
}()
}

func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
switch msg.Content.Command {
case bus.ACTIVATE_REPORTER:
if msg.From != bus.ADMIN {
bus.HandleMessageError(errorSentinel.ErrBusNonAdmin, msg, "reporter received message from non-admin")
return
}
err := a.startReporters(ctx)
if err != nil {
bus.HandleMessageError(err, msg, "failed to start reporter")
return
}
msg.Response <- bus.MessageResponse{Success: true}
case bus.DEACTIVATE_REPORTER:
if msg.From != bus.ADMIN {
bus.HandleMessageError(errorSentinel.ErrBusNonAdmin, msg, "reporter received message from non-admin")
return
}
err := a.stopReporters()
if err != nil {
bus.HandleMessageError(err, msg, "failed to stop reporter")
return
}
msg.Response <- bus.MessageResponse{Success: true}
case bus.REFRESH_REPORTER:
if msg.From != bus.ADMIN {
bus.HandleMessageError(errorSentinel.ErrBusNonAdmin, msg, "reporter received message from non-admin")
return
}
err := a.stopReporters()
if err != nil {
bus.HandleMessageError(err, msg, "failed to stop reporter")
return
}

err = a.setReporters(ctx, a.Host, a.Pubsub)
if err != nil {
bus.HandleMessageError(err, msg, "failed to set reporters")
return
}

err = a.startReporters(ctx)
if err != nil {
bus.HandleMessageError(err, msg, "failed to start reporter")
return
}
msg.Response <- bus.MessageResponse{Success: true}
}
}

func (a *App) GetReporterWithInterval(interval int) (*Reporter, error) {
for _, reporter := range a.Reporters {
if reporter.SubmissionInterval == time.Duration(interval)*time.Millisecond {
Expand All @@ -250,17 +138,13 @@ func (a *App) GetReporterWithInterval(interval int) (*Reporter, error) {
return nil, errorSentinel.ErrReporterNotFound
}

func startReporter(ctx context.Context, reporter *Reporter) error {
func (a *App) startReporter(ctx context.Context, reporter *Reporter) error {
if reporter.isRunning {
log.Debug().Str("Player", "Reporter").Msg("reporter already running")
return errorSentinel.ErrReporterAlreadyRunning
}

err := reporter.SetKaiaHelper(ctx)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to set kaia helper")
return err
}
reporter.KaiaHelper = a.chainHelper

nodeCtx, cancel := context.WithCancel(ctx)
reporter.nodeCtx = nodeCtx
Expand All @@ -271,24 +155,6 @@ func startReporter(ctx context.Context, reporter *Reporter) error {
return nil
}

func stopReporter(reporter *Reporter) error {
if !reporter.isRunning {
log.Debug().Str("Player", "Reporter").Msg("reporter not running")
return nil
}

if reporter.nodeCancel == nil {
log.Error().Str("Player", "Reporter").Msg("reporter cancel function not found")
return errorSentinel.ErrReporterCancelNotFound
}

reporter.nodeCancel()
reporter.isRunning = false
reporter.KaiaHelper.Close()
<-reporter.nodeCtx.Done()
return nil
}

func getConfigs(ctx context.Context) ([]Config, error) {
reporterConfigs, err := db.QueryRows[Config](ctx, GET_REPORTER_CONFIGS, nil)
if err != nil {
Expand Down
Loading

0 comments on commit ea6656a

Please sign in to comment.