From bd087969a607908376ede0ed95ac0a2943589c1f Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 1 Jul 2024 17:47:45 +0900 Subject: [PATCH 01/25] feat: rest api based dal api wip feat: encapsulate wip wip feat: rest api --- node/cmd/dal/main.go | 17 +++ node/pkg/dalapi/api/controller.go | 167 +++++++++++++++++++++ node/pkg/dalapi/api/route.go | 12 ++ node/pkg/dalapi/app.go | 37 +++++ node/pkg/dalapi/collector/collector.go | 192 +++++++++++++++++++++++++ node/pkg/dalapi/collector/utils.go | 90 ++++++++++++ node/pkg/dalapi/common/types.go | 10 ++ node/pkg/dalapi/utils/utils.go | 97 +++++++++++++ node/taskfiles/taskfile.local.yml | 4 + 9 files changed, 626 insertions(+) create mode 100644 node/cmd/dal/main.go create mode 100644 node/pkg/dalapi/api/controller.go create mode 100644 node/pkg/dalapi/api/route.go create mode 100644 node/pkg/dalapi/app.go create mode 100644 node/pkg/dalapi/collector/collector.go create mode 100644 node/pkg/dalapi/collector/utils.go create mode 100644 node/pkg/dalapi/common/types.go create mode 100644 node/pkg/dalapi/utils/utils.go diff --git a/node/cmd/dal/main.go b/node/cmd/dal/main.go new file mode 100644 index 000000000..1bcfb7479 --- /dev/null +++ b/node/cmd/dal/main.go @@ -0,0 +1,17 @@ +package main + +import ( + "context" + + "bisonai.com/orakl/node/pkg/dalapi" + "github.com/rs/zerolog/log" +) + +func main() { + ctx := context.Background() + err := dalapi.Run(ctx) + if err != nil { + log.Fatal().Err(err).Msg("Failed to start dalapi") + return + } +} diff --git a/node/pkg/dalapi/api/controller.go b/node/pkg/dalapi/api/controller.go new file mode 100644 index 000000000..ee7363a07 --- /dev/null +++ b/node/pkg/dalapi/api/controller.go @@ -0,0 +1,167 @@ +package api + +import ( + "context" + "errors" + "strings" + + "bisonai.com/orakl/node/pkg/aggregator" + "bisonai.com/orakl/node/pkg/common/keys" + "bisonai.com/orakl/node/pkg/common/types" + "bisonai.com/orakl/node/pkg/dalapi/collector" + dalcommon "bisonai.com/orakl/node/pkg/dalapi/common" + "bisonai.com/orakl/node/pkg/db" + "github.com/gofiber/contrib/websocket" + "github.com/gofiber/fiber/v2" + "github.com/rs/zerolog/log" +) + +type Controller struct { + configs map[string]types.Config + Collector *collector.Collector + clients map[*websocket.Conn]map[string]bool + register chan *websocket.Conn + unregister chan *websocket.Conn + broadcast chan dalcommon.OutgoingSubmissionData +} + +var ApiController Controller + +func NewController(configs map[string]types.Config, internalCollector *collector.Collector) *Controller { + return &Controller{ + Collector: internalCollector, + configs: configs, + + clients: make(map[*websocket.Conn]map[string]bool), + register: make(chan *websocket.Conn), + unregister: make(chan *websocket.Conn), + broadcast: make(chan dalcommon.OutgoingSubmissionData), + } +} + +func (c *Controller) getLatestSubmissionData(ctx context.Context) ([]aggregator.SubmissionData, error) { + globalAggregateKeyList := make([]string, 0, len(c.configs)) + for _, config := range c.configs { + globalAggregateKeyList = append(globalAggregateKeyList, keys.GlobalAggregateKey(config.ID)) + } + + globalAggregates, err := db.MGetObject[aggregator.GlobalAggregate](ctx, globalAggregateKeyList) + if err != nil { + return nil, err + } + + proofKeyList := make([]string, 0, len(globalAggregates)) + for _, globalAggregate := range globalAggregates { + proofKeyList = append(proofKeyList, keys.ProofKey(globalAggregate.ConfigID, globalAggregate.Round)) + } + + proofs, err := db.MGetObject[aggregator.Proof](ctx, proofKeyList) + if err != nil { + return nil, err + } + + proofMap := make(map[int32]aggregator.Proof) + for _, proof := range proofs { + proofMap[proof.ConfigID] = proof + } + + result := make([]aggregator.SubmissionData, 0, len(globalAggregates)) + for _, globalAggregate := range globalAggregates { + proof, ok := proofMap[globalAggregate.ConfigID] + if !ok { + continue + } + + result = append(result, aggregator.SubmissionData{ + GlobalAggregate: globalAggregate, + Proof: proof, + }) + } + + return result, nil +} + +func (c *Controller) getLatestSubmissionDataSingle(ctx context.Context, symbol string) (*aggregator.SubmissionData, error) { + if symbol == "" { + return nil, errors.New("invalid symbol: empty symbol") + } + if strings.Contains(symbol, "-") { + return nil, errors.New("symbol should be in {BASE}-{QUOTE} format") + } + + symbol = strings.ToUpper(symbol) + + config, ok := c.configs[symbol] + if !ok { + return nil, errors.New("invalid symbol") + } + + globalAggregate, err := db.GetObject[aggregator.GlobalAggregate](ctx, keys.GlobalAggregateKey(config.ID)) + if err != nil { + return nil, err + } + + proof, err := db.GetObject[aggregator.Proof](ctx, keys.ProofKey(config.ID, globalAggregate.Round)) + if err != nil { + return nil, err + } + + return &aggregator.SubmissionData{ + GlobalAggregate: globalAggregate, + Proof: proof, + }, nil +} + +func init() { + ctx := context.Background() + configs, err := db.QueryRows[types.Config](ctx, "SELECT * FROM configs", nil) + if err != nil { + log.Error().Err(err).Msg("failed to get configs") + panic(err) + } + configMap := make(map[string]types.Config) + for _, config := range configs { + configMap[config.Name] = config + } + collector, err := collector.NewCollector(ctx, configs) + if err != nil { + log.Error().Err(err).Msg("failed to create collector") + panic(err) + } + + ApiController = *NewController(configMap, collector) + +} + +func getLatestFeeds(c *fiber.Ctx) error { + submissionData, err := ApiController.getLatestSubmissionData(c.Context()) + if err != nil { + return err + } + + result := make([]dalcommon.OutgoingSubmissionData, 0, len(submissionData)) + + for _, data := range submissionData { + outgoingData, err := ApiController.Collector.IncomingDataToOutgoingData(c.Context(), data) + if err != nil { + return err + } + result = append(result, *outgoingData) + } + return c.JSON(result) +} + +func getLatestFeed(c *fiber.Ctx) error { + symbol := c.Params("symbol") + + submissionData, err := ApiController.getLatestSubmissionDataSingle(c.Context(), symbol) + if err != nil { + return err + } + + outgoingData, err := ApiController.Collector.IncomingDataToOutgoingData(c.Context(), *submissionData) + if err != nil { + return err + } + return c.JSON(outgoingData) +} diff --git a/node/pkg/dalapi/api/route.go b/node/pkg/dalapi/api/route.go new file mode 100644 index 000000000..44dae4c99 --- /dev/null +++ b/node/pkg/dalapi/api/route.go @@ -0,0 +1,12 @@ +package api + +import ( + "github.com/gofiber/fiber/v2" +) + +func Routes(router fiber.Router) { + api := router.Group("/dal") + + api.Get("/latest-data-feeds/all", getLatestFeeds) + api.Get("/latest-data-feeds/:symbol", getLatestFeed) +} diff --git a/node/pkg/dalapi/app.go b/node/pkg/dalapi/app.go new file mode 100644 index 000000000..0b5bfeb63 --- /dev/null +++ b/node/pkg/dalapi/app.go @@ -0,0 +1,37 @@ +package dalapi + +import ( + "context" + "os" + + "bisonai.com/orakl/node/pkg/dalapi/api" + "bisonai.com/orakl/node/pkg/dalapi/utils" + + "github.com/gofiber/fiber/v2" + "github.com/rs/zerolog/log" +) + +func Run(ctx context.Context) error { + log.Debug().Msg("Starting DAL API server") + app, err := utils.Setup(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to setup DAL API server") + return err + } + + go api.ApiController.Collector.Start(ctx) + log.Debug().Str("Player", "DAL API").Msg("DAL API collector started") + v1 := app.Group("/api/v1") + v1.Get("/", func(c *fiber.Ctx) error { + return c.SendString("Orakl Node DAL API") + }) + + api.Routes(v1) + + port := os.Getenv("DAL_API_PORT") + if port == "" { + port = "8090" + } + + return app.Listen(":" + port) +} diff --git a/node/pkg/dalapi/collector/collector.go b/node/pkg/dalapi/collector/collector.go new file mode 100644 index 000000000..2140c70ca --- /dev/null +++ b/node/pkg/dalapi/collector/collector.go @@ -0,0 +1,192 @@ +package collector + +import ( + "context" + "errors" + "os" + "strconv" + "sync" + + "bisonai.com/orakl/node/pkg/aggregator" + "bisonai.com/orakl/node/pkg/chain/websocketchainreader" + "bisonai.com/orakl/node/pkg/common/keys" + "bisonai.com/orakl/node/pkg/common/types" + dalcommon "bisonai.com/orakl/node/pkg/dalapi/common" + "bisonai.com/orakl/node/pkg/db" + errorSentinel "bisonai.com/orakl/node/pkg/error" + klaytncommon "github.com/klaytn/klaytn/common" + "github.com/klaytn/klaytn/crypto" + "github.com/rs/zerolog/log" +) + +const ( + DefaultDecimals = "8" + GetAllOracles = "function getAllOracles() public view returns (address[])" + OracleAdded = "event OracleAdded(address oracle, uint256 expirationTime)" +) + +type Collector struct { + IncomingStream map[int32]chan aggregator.SubmissionData + OutgoingStream map[int32]chan dalcommon.OutgoingSubmissionData + Symbols map[int32]string + FeedHashes map[int32][]byte + CachedWhitelist []klaytncommon.Address + + chainReader *websocketchainreader.ChainReader + submissionProxyContractAddr string + ctx context.Context + cancelFunc context.CancelFunc + + mu sync.RWMutex +} + +func NewCollector(ctx context.Context, configs []types.Config) (*Collector, error) { + kaiaWebsocketUrl := os.Getenv("KAIA_WEBSOCKET_URL") + if kaiaWebsocketUrl == "" { + return nil, errors.New("KAIA_WEBSOCKET_URL is not set") + } + + submissionProxyContractAddr := os.Getenv("SUBMISSION_PROXY_CONTRACT") + if submissionProxyContractAddr == "" { + return nil, errors.New("SUBMISSION_PROXY_CONTRACT is not set") + } + + chainReader, err := websocketchainreader.New(websocketchainreader.WithKaiaWebsocketUrl(kaiaWebsocketUrl)) + if err != nil { + return nil, err + } + + initialWhitelist, err := getAllOracles(ctx, chainReader, submissionProxyContractAddr) + if err != nil { + return nil, err + } + + collector := &Collector{ + IncomingStream: make(map[int32]chan aggregator.SubmissionData, len(configs)), + OutgoingStream: make(map[int32]chan dalcommon.OutgoingSubmissionData, len(configs)), + Symbols: make(map[int32]string, len(configs)), + FeedHashes: make(map[int32][]byte, len(configs)), + chainReader: chainReader, + CachedWhitelist: initialWhitelist, + submissionProxyContractAddr: submissionProxyContractAddr, + } + + for _, config := range configs { + collector.IncomingStream[config.ID] = make(chan aggregator.SubmissionData) + collector.OutgoingStream[config.ID] = make(chan dalcommon.OutgoingSubmissionData) + collector.Symbols[config.ID] = config.Name + collector.FeedHashes[config.ID] = crypto.Keccak256([]byte(config.Name)) + } + + return collector, nil +} + +func (c *Collector) Start(ctx context.Context) { + if c.ctx != nil { + log.Debug().Str("Player", "DalCollector").Msg("Collector already running") + return + } + + ctxWithCancel, cancel := context.WithCancel(ctx) + c.cancelFunc = cancel + c.ctx = ctxWithCancel + + c.receive(ctxWithCancel) + c.trackOracleAdded(ctxWithCancel) +} + +func (c *Collector) Stop() { + if c.cancelFunc != nil { + c.cancelFunc() + } +} + +func (c *Collector) receive(ctx context.Context) { + for id := range c.IncomingStream { + go c.receiveEach(ctx, id) + } +} + +func (c *Collector) receiveEach(ctx context.Context, configId int32) { + err := db.Subscribe(ctx, keys.SubmissionDataStreamKey(configId), c.IncomingStream[configId]) + if err != nil { + log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to subscribe to submission stream") + } + for { + select { + case <-ctx.Done(): + return + case data := <-c.IncomingStream[configId]: + go c.processIncomingData(ctx, data) + } + } +} + +func (c *Collector) processIncomingData(ctx context.Context, data aggregator.SubmissionData) { + result, err := c.IncomingDataToOutgoingData(ctx, data) + if err != nil { + log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to convert incoming data to outgoing data") + return + } + + c.OutgoingStream[data.GlobalAggregate.ConfigID] <- *result +} + +func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data aggregator.SubmissionData) (*dalcommon.OutgoingSubmissionData, error) { + c.mu.RLock() + whitelist := c.CachedWhitelist + c.mu.RUnlock() + orderedProof, err := orderProof( + ctx, + data.Proof.Proof, + data.GlobalAggregate.Value, + data.GlobalAggregate.Timestamp, + c.Symbols[data.GlobalAggregate.ConfigID], + whitelist) + if err != nil { + log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to order proof") + if errors.Is(err, errorSentinel.ErrReporterSignerNotWhitelisted) { + newList, err := getAllOracles(ctx, c.chainReader, c.submissionProxyContractAddr) + if err != nil { + log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to refresh oracles") + return nil, err + } + c.mu.Lock() + c.CachedWhitelist = newList + c.mu.Unlock() + } + return nil, err + } + return &dalcommon.OutgoingSubmissionData{ + Symbol: c.Symbols[data.GlobalAggregate.ConfigID], + Value: strconv.FormatInt(data.GlobalAggregate.Value, 10), + AggregateTime: strconv.FormatInt(data.GlobalAggregate.Timestamp.Unix(), 10), + Proof: orderedProof, + FeedHash: c.FeedHashes[data.GlobalAggregate.ConfigID], + Decimals: DefaultDecimals, + }, nil +} + +func (c *Collector) trackOracleAdded(ctx context.Context) { + var eventTriggered chan any + err := subscribeAddOracleEvent(ctx, c.chainReader, c.submissionProxyContractAddr, eventTriggered) + if err != nil { + log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to subscribe to oracle added event") + } + go func() { + for { + select { + case <-ctx.Done(): + return + case <-eventTriggered: + newList, err := getAllOracles(ctx, c.chainReader, c.submissionProxyContractAddr) + if err != nil { + log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to get all oracles") + } + c.mu.Lock() + c.CachedWhitelist = newList + c.mu.Unlock() + } + } + }() +} diff --git a/node/pkg/dalapi/collector/utils.go b/node/pkg/dalapi/collector/utils.go new file mode 100644 index 000000000..dbd299245 --- /dev/null +++ b/node/pkg/dalapi/collector/utils.go @@ -0,0 +1,90 @@ +package collector + +import ( + "context" + "errors" + "time" + + chainutils "bisonai.com/orakl/node/pkg/chain/utils" + "bisonai.com/orakl/node/pkg/chain/websocketchainreader" + "bisonai.com/orakl/node/pkg/reporter" + + "github.com/klaytn/klaytn/blockchain/types" + klaytncommon "github.com/klaytn/klaytn/common" + "github.com/rs/zerolog/log" +) + +func getAllOracles(ctx context.Context, chainReader *websocketchainreader.ChainReader, submissionProxyContractAddr string) ([]klaytncommon.Address, error) { + rawResult, err := chainReader.ReadContractOnce(ctx, websocketchainreader.Kaia, submissionProxyContractAddr, GetAllOracles) + if err != nil { + log.Error().Err(err).Msg("failed to get all oracles") + return nil, err + } + rawResultSlice, ok := rawResult.([]interface{}) + if !ok { + return nil, errors.New("failed to cast result to []interface{}") + } + + return rawResultSlice[0].([]klaytncommon.Address), nil +} + +func subscribeAddOracleEvent(ctx context.Context, chainReader *websocketchainreader.ChainReader, submissionProxyContractAddr string, isUpdated chan any) error { + logChannel := make(chan types.Log) + err := chainReader.Subscribe( + ctx, + websocketchainreader.WithAddress(submissionProxyContractAddr), + websocketchainreader.WithChainType(websocketchainreader.Kaia), + websocketchainreader.WithChannel(logChannel), + ) + if err != nil { + return err + } + + eventName, input, _, eventParseErr := chainutils.ParseMethodSignature(OracleAdded) + if eventParseErr != nil { + return eventParseErr + } + + oracleAddedEventABI, err := chainutils.GenerateEventABI(eventName, input) + if err != nil { + return err + } + + for eventLog := range logChannel { + result, err := oracleAddedEventABI.Unpack(eventName, eventLog.Data) + if err != nil { + continue + } + + _, ok := result[0].(klaytncommon.Address) + if !ok { + continue + } + + isUpdated <- true + } + + return nil +} + +func orderProof(ctx context.Context, proof []byte, value int64, timestamp time.Time, symbol string, cachedWhitelist []klaytncommon.Address) ([]byte, error) { + proof = reporter.RemoveDuplicateProof(proof) + hash := chainutils.Value2HashForSign(value, timestamp.Unix(), symbol) + proofChunks, err := reporter.SplitProofToChunk(proof) + if err != nil { + return nil, err + } + + signers, err := reporter.GetSignerListFromProofs(hash, proofChunks) + if err != nil { + return nil, err + } + + err = reporter.CheckForNonWhitelistedSigners(signers, cachedWhitelist) + if err != nil { + return nil, err + } + + signerMap := reporter.GetSignerMap(signers, proofChunks) + return reporter.OrderProof(signerMap, cachedWhitelist) +} diff --git a/node/pkg/dalapi/common/types.go b/node/pkg/dalapi/common/types.go new file mode 100644 index 000000000..9bad76d6f --- /dev/null +++ b/node/pkg/dalapi/common/types.go @@ -0,0 +1,10 @@ +package common + +type OutgoingSubmissionData struct { + Symbol string `json:"symbol"` + Value string `json:"value"` + AggregateTime string `json:"aggregateTime"` + Proof []byte `json:"proof"` + FeedHash []byte `json:"feedHash"` + Decimals string `json:"decimals"` +} diff --git a/node/pkg/dalapi/utils/utils.go b/node/pkg/dalapi/utils/utils.go new file mode 100644 index 000000000..655c3c3a2 --- /dev/null +++ b/node/pkg/dalapi/utils/utils.go @@ -0,0 +1,97 @@ +package utils + +import ( + "context" + "errors" + "fmt" + "os" + "runtime/debug" + "strings" + + "bisonai.com/orakl/node/pkg/db" + errorSentinel "bisonai.com/orakl/node/pkg/error" + + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/cors" + "github.com/gofiber/fiber/v2/middleware/recover" + "github.com/rs/zerolog/log" +) + +func Setup(ctx context.Context) (*fiber.App, error) { + _, err := db.GetPool(ctx) + if err != nil { + log.Error().Err(err).Msg("error getting db pool") + return nil, errorSentinel.ErrAdminDbPoolNotFound + } + + _, err = db.GetRedisClient(ctx) + if err != nil { + log.Error().Err(err).Msg("error getting redis conn") + return nil, errorSentinel.ErrAdminRedisConnNotFound + } + + app := fiber.New(fiber.Config{ + AppName: "Data Availability Layer API 0.1.0", + EnablePrintRoutes: true, + ErrorHandler: CustomErrorHandler, + }) + + app.Use(recover.New( + recover.Config{ + EnableStackTrace: true, + StackTraceHandler: CustomStackTraceHandler, + }, + )) + + app.Use(cors.New()) + return app, nil +} + +func CustomErrorHandler(c *fiber.Ctx, err error) error { + // Status code defaults to 500 + code := fiber.StatusInternalServerError + + // Retrieve the custom status code if it's a *fiber.Error + var e *fiber.Error + if errors.As(err, &e) { + code = e.Code + } + + // Set Content-Type: text/plain; charset=utf-8 + c.Set(fiber.HeaderContentType, fiber.MIMETextPlainCharsetUTF8) + + // Return status code with error message + // | ${status} | ${ip} | ${method} | ${path} | ${error}", + + log. + Info(). + Err(err). + Int("status", code). + Str("ip", c.IP()). + Str("method", c.Method()). + Str("path", c.Path()). + Msg("error") + + return c.Status(code).SendString(err.Error()) +} + +func CustomStackTraceHandler(_ *fiber.Ctx, e interface{}) { + stackTrace := strings.Split(string(debug.Stack()), "\n") + var failPoint string + + for _, line := range stackTrace { + if strings.Contains(line, "controller.go") { + path := strings.Split(strings.TrimSpace(line), " ")[0] + splitted := strings.Split(path, "/") + failPoint = splitted[len(splitted)-2] + "/" + splitted[len(splitted)-1] + + break + } + } + log. + Info(). + Str("failPoint", failPoint). + Msgf("panic: %v", e) + + _, _ = os.Stderr.WriteString(fmt.Sprintf("%s\n", debug.Stack())) //nolint:errcheck // This will never fail +} diff --git a/node/taskfiles/taskfile.local.yml b/node/taskfiles/taskfile.local.yml index 2d55f52ac..979d8fdad 100644 --- a/node/taskfiles/taskfile.local.yml +++ b/node/taskfiles/taskfile.local.yml @@ -25,6 +25,10 @@ tasks: dotenv: [".env"] cmds: - go run ./cmd/aggregator/main.go + dal: + dotenv: [".env"] + cmds: + - go run ./cmd/dal/main.go script-submission: dotenv: [".env"] From 57d0a6adb0ef78d1b71db8cfd75147361d2c1b71 Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 2 Jul 2024 17:09:00 +0900 Subject: [PATCH 02/25] feat: websocket fix: linter err and vet err fix feat: rename and separate types fix: change function order, remove err return fix: update based on feedbacks docs: update .env.example --- node/.env.example | 10 ++ node/cmd/dal/main.go | 4 +- node/pkg/{dalapi => dal}/api/controller.go | 139 +++++++++++++----- node/pkg/{dalapi => dal}/api/route.go | 2 + node/pkg/dal/api/types.go | 22 +++ node/pkg/{dalapi => dal}/app.go | 8 +- .../{dalapi => dal}/collector/collector.go | 10 +- node/pkg/{dalapi => dal}/collector/utils.go | 0 node/pkg/{dalapi => dal}/common/types.go | 0 node/pkg/{dalapi => dal}/utils/utils.go | 2 +- 10 files changed, 149 insertions(+), 48 deletions(-) rename node/pkg/{dalapi => dal}/api/controller.go (66%) rename node/pkg/{dalapi => dal}/api/route.go (69%) create mode 100644 node/pkg/dal/api/types.go rename node/pkg/{dalapi => dal}/app.go (81%) rename node/pkg/{dalapi => dal}/collector/collector.go (94%) rename node/pkg/{dalapi => dal}/collector/utils.go (100%) rename node/pkg/{dalapi => dal}/common/types.go (100%) rename node/pkg/{dalapi => dal}/utils/utils.go (99%) diff --git a/node/.env.example b/node/.env.example index 797918247..a4b5eb203 100644 --- a/node/.env.example +++ b/node/.env.example @@ -52,3 +52,13 @@ POR_PROVIDER_URL= # (optional) defaults to 3000 POR_PORT= +# DAL + +# (optoinal) defaults to 8090 +DAL_API_PORT= + +# (required) +# KAIA_WEBSOCKET_URL= + +# (required) +# SUBMISSION_PROXY_CONTRACT= \ No newline at end of file diff --git a/node/cmd/dal/main.go b/node/cmd/dal/main.go index 1bcfb7479..e71a5e4e2 100644 --- a/node/cmd/dal/main.go +++ b/node/cmd/dal/main.go @@ -3,13 +3,13 @@ package main import ( "context" - "bisonai.com/orakl/node/pkg/dalapi" + "bisonai.com/orakl/node/pkg/dal" "github.com/rs/zerolog/log" ) func main() { ctx := context.Background() - err := dalapi.Run(ctx) + err := dal.Run(ctx) if err != nil { log.Fatal().Err(err).Msg("Failed to start dalapi") return diff --git a/node/pkg/dalapi/api/controller.go b/node/pkg/dal/api/controller.go similarity index 66% rename from node/pkg/dalapi/api/controller.go rename to node/pkg/dal/api/controller.go index ee7363a07..f02c4d986 100644 --- a/node/pkg/dalapi/api/controller.go +++ b/node/pkg/dal/api/controller.go @@ -8,24 +8,62 @@ import ( "bisonai.com/orakl/node/pkg/aggregator" "bisonai.com/orakl/node/pkg/common/keys" "bisonai.com/orakl/node/pkg/common/types" - "bisonai.com/orakl/node/pkg/dalapi/collector" - dalcommon "bisonai.com/orakl/node/pkg/dalapi/common" + "bisonai.com/orakl/node/pkg/dal/collector" + dalcommon "bisonai.com/orakl/node/pkg/dal/common" "bisonai.com/orakl/node/pkg/db" "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" "github.com/rs/zerolog/log" ) -type Controller struct { - configs map[string]types.Config - Collector *collector.Collector - clients map[*websocket.Conn]map[string]bool - register chan *websocket.Conn - unregister chan *websocket.Conn - broadcast chan dalcommon.OutgoingSubmissionData +var ApiController Controller + +func init() { + ctx := context.Background() + configs, err := db.QueryRows[types.Config](ctx, "SELECT * FROM configs", nil) + if err != nil { + log.Error().Err(err).Msg("failed to get configs") + panic(err) + } + configMap := make(map[string]types.Config) + for _, config := range configs { + configMap[config.Name] = config + } + collector, err := collector.NewCollector(ctx, configs) + if err != nil { + log.Error().Err(err).Msg("failed to create collector") + panic(err) + } + + ApiController = *NewController(configMap, collector) } -var ApiController Controller +func (c *Controller) Run(ctx context.Context) { + go c.Collector.Start(ctx) + go func() { + for { + select { + case conn := <-c.register: + c.clients[conn] = make(map[string]bool) + case conn := <-c.unregister: + delete(c.clients, conn) + conn.Close() + } + } + }() + + for configId, stream := range c.Collector.OutgoingStream { + symbol := c.configIdToSymbol(configId) + c.broadcast[symbol] = make(chan dalcommon.OutgoingSubmissionData) + c.broadcast[symbol] = stream + } + + go func() { + for symbol := range c.configs { + go c.broadcastDataForSymbol(symbol) + } + }() +} func NewController(configs map[string]types.Config, internalCollector *collector.Collector) *Controller { return &Controller{ @@ -35,7 +73,7 @@ func NewController(configs map[string]types.Config, internalCollector *collector clients: make(map[*websocket.Conn]map[string]bool), register: make(chan *websocket.Conn), unregister: make(chan *websocket.Conn), - broadcast: make(chan dalcommon.OutgoingSubmissionData), + broadcast: make(map[string]chan dalcommon.OutgoingSubmissionData), } } @@ -82,15 +120,6 @@ func (c *Controller) getLatestSubmissionData(ctx context.Context) ([]aggregator. } func (c *Controller) getLatestSubmissionDataSingle(ctx context.Context, symbol string) (*aggregator.SubmissionData, error) { - if symbol == "" { - return nil, errors.New("invalid symbol: empty symbol") - } - if strings.Contains(symbol, "-") { - return nil, errors.New("symbol should be in {BASE}-{QUOTE} format") - } - - symbol = strings.ToUpper(symbol) - config, ok := c.configs[symbol] if !ok { return nil, errors.New("invalid symbol") @@ -112,25 +141,54 @@ func (c *Controller) getLatestSubmissionDataSingle(ctx context.Context, symbol s }, nil } -func init() { - ctx := context.Background() - configs, err := db.QueryRows[types.Config](ctx, "SELECT * FROM configs", nil) - if err != nil { - log.Error().Err(err).Msg("failed to get configs") - panic(err) - } - configMap := make(map[string]types.Config) - for _, config := range configs { - configMap[config.Name] = config - } - collector, err := collector.NewCollector(ctx, configs) - if err != nil { - log.Error().Err(err).Msg("failed to create collector") - panic(err) +func (c *Controller) handleWebsocket(conn *websocket.Conn) { + c.register <- conn + defer func() { + c.unregister <- conn + conn.Close() + }() + + for { + var msg Subscription + if err := conn.ReadJSON(&msg); err != nil { + log.Error().Err(err).Msg("failed to read message") + return + } + + if msg.Method == "SUBSCRIBE" { + if c.clients[conn] == nil { + c.clients[conn] = make(map[string]bool) + } + for _, param := range msg.Params { + symbol := strings.TrimPrefix(param, "submission@") + if _, ok := c.configs[symbol]; !ok { + continue + } + c.clients[conn][symbol] = true + } + } } +} - ApiController = *NewController(configMap, collector) +func (c *Controller) configIdToSymbol(id int32) string { + for symbol, config := range c.configs { + if config.ID == id { + return symbol + } + } + return "" +} +func (c *Controller) broadcastDataForSymbol(symbol string) { + for data := range c.broadcast[symbol] { + for conn := range c.clients { + if _, ok := c.clients[conn][symbol]; ok { + if err := conn.WriteJSON(data); err != nil { + log.Error().Err(err).Msg("failed to write message") + } + } + } + } } func getLatestFeeds(c *fiber.Ctx) error { @@ -154,6 +212,15 @@ func getLatestFeeds(c *fiber.Ctx) error { func getLatestFeed(c *fiber.Ctx) error { symbol := c.Params("symbol") + if symbol == "" { + return errors.New("invalid symbol: empty symbol") + } + if !strings.Contains(symbol, "-") { + return errors.New("symbol should be in {BASE}-{QUOTE} format") + } + + symbol = strings.ToUpper(symbol) + submissionData, err := ApiController.getLatestSubmissionDataSingle(c.Context(), symbol) if err != nil { return err diff --git a/node/pkg/dalapi/api/route.go b/node/pkg/dal/api/route.go similarity index 69% rename from node/pkg/dalapi/api/route.go rename to node/pkg/dal/api/route.go index 44dae4c99..251b10403 100644 --- a/node/pkg/dalapi/api/route.go +++ b/node/pkg/dal/api/route.go @@ -1,6 +1,7 @@ package api import ( + "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" ) @@ -9,4 +10,5 @@ func Routes(router fiber.Router) { api.Get("/latest-data-feeds/all", getLatestFeeds) api.Get("/latest-data-feeds/:symbol", getLatestFeed) + api.Get("/ws", websocket.New(ApiController.handleWebsocket)) } diff --git a/node/pkg/dal/api/types.go b/node/pkg/dal/api/types.go new file mode 100644 index 000000000..3e16967e2 --- /dev/null +++ b/node/pkg/dal/api/types.go @@ -0,0 +1,22 @@ +package api + +import ( + "bisonai.com/orakl/node/pkg/common/types" + "bisonai.com/orakl/node/pkg/dal/collector" + dalcommon "bisonai.com/orakl/node/pkg/dal/common" + "github.com/gofiber/contrib/websocket" +) + +type Subscription struct { + Method string `json:"method"` + Params []string `json:"params"` +} + +type Controller struct { + Collector *collector.Collector + configs map[string]types.Config + clients map[*websocket.Conn]map[string]bool + register chan *websocket.Conn + unregister chan *websocket.Conn + broadcast map[string]chan dalcommon.OutgoingSubmissionData +} diff --git a/node/pkg/dalapi/app.go b/node/pkg/dal/app.go similarity index 81% rename from node/pkg/dalapi/app.go rename to node/pkg/dal/app.go index 0b5bfeb63..e1217734e 100644 --- a/node/pkg/dalapi/app.go +++ b/node/pkg/dal/app.go @@ -1,11 +1,11 @@ -package dalapi +package dal import ( "context" "os" - "bisonai.com/orakl/node/pkg/dalapi/api" - "bisonai.com/orakl/node/pkg/dalapi/utils" + "bisonai.com/orakl/node/pkg/dal/api" + "bisonai.com/orakl/node/pkg/dal/utils" "github.com/gofiber/fiber/v2" "github.com/rs/zerolog/log" @@ -19,7 +19,7 @@ func Run(ctx context.Context) error { return err } - go api.ApiController.Collector.Start(ctx) + api.ApiController.Run(ctx) log.Debug().Str("Player", "DAL API").Msg("DAL API collector started") v1 := app.Group("/api/v1") v1.Get("/", func(c *fiber.Ctx) error { diff --git a/node/pkg/dalapi/collector/collector.go b/node/pkg/dal/collector/collector.go similarity index 94% rename from node/pkg/dalapi/collector/collector.go rename to node/pkg/dal/collector/collector.go index 2140c70ca..c0417eb61 100644 --- a/node/pkg/dalapi/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -11,7 +11,7 @@ import ( "bisonai.com/orakl/node/pkg/chain/websocketchainreader" "bisonai.com/orakl/node/pkg/common/keys" "bisonai.com/orakl/node/pkg/common/types" - dalcommon "bisonai.com/orakl/node/pkg/dalapi/common" + dalcommon "bisonai.com/orakl/node/pkg/dal/common" "bisonai.com/orakl/node/pkg/db" errorSentinel "bisonai.com/orakl/node/pkg/error" klaytncommon "github.com/klaytn/klaytn/common" @@ -146,10 +146,10 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data aggrega if err != nil { log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to order proof") if errors.Is(err, errorSentinel.ErrReporterSignerNotWhitelisted) { - newList, err := getAllOracles(ctx, c.chainReader, c.submissionProxyContractAddr) - if err != nil { - log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to refresh oracles") - return nil, err + newList, getAllOraclesErr := getAllOracles(ctx, c.chainReader, c.submissionProxyContractAddr) + if getAllOraclesErr != nil { + log.Error().Err(getAllOraclesErr).Str("Player", "DalCollector").Msg("failed to refresh oracles") + return nil, getAllOraclesErr } c.mu.Lock() c.CachedWhitelist = newList diff --git a/node/pkg/dalapi/collector/utils.go b/node/pkg/dal/collector/utils.go similarity index 100% rename from node/pkg/dalapi/collector/utils.go rename to node/pkg/dal/collector/utils.go diff --git a/node/pkg/dalapi/common/types.go b/node/pkg/dal/common/types.go similarity index 100% rename from node/pkg/dalapi/common/types.go rename to node/pkg/dal/common/types.go diff --git a/node/pkg/dalapi/utils/utils.go b/node/pkg/dal/utils/utils.go similarity index 99% rename from node/pkg/dalapi/utils/utils.go rename to node/pkg/dal/utils/utils.go index 655c3c3a2..ac02e230c 100644 --- a/node/pkg/dalapi/utils/utils.go +++ b/node/pkg/dal/utils/utils.go @@ -64,7 +64,7 @@ func CustomErrorHandler(c *fiber.Ctx, err error) error { // | ${status} | ${ip} | ${method} | ${path} | ${error}", log. - Info(). + Error(). Err(err). Int("status", code). Str("ip", c.IP()). From c5de6338469a6319da1c94069e700d947c7372c6 Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 2 Jul 2024 21:41:00 +0900 Subject: [PATCH 03/25] fix: remove unnecessary thread --- node/cmd/dal/main.go | 2 +- node/pkg/dal/api/controller.go | 53 +++++++++++++++++----------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/node/cmd/dal/main.go b/node/cmd/dal/main.go index e71a5e4e2..14449113c 100644 --- a/node/cmd/dal/main.go +++ b/node/cmd/dal/main.go @@ -11,7 +11,7 @@ func main() { ctx := context.Background() err := dal.Run(ctx) if err != nil { - log.Fatal().Err(err).Msg("Failed to start dalapi") + log.Fatal().Err(err).Msg("Failed to start dal") return } } diff --git a/node/pkg/dal/api/controller.go b/node/pkg/dal/api/controller.go index f02c4d986..8279ca585 100644 --- a/node/pkg/dal/api/controller.go +++ b/node/pkg/dal/api/controller.go @@ -38,6 +38,18 @@ func init() { ApiController = *NewController(configMap, collector) } +func NewController(configs map[string]types.Config, internalCollector *collector.Collector) *Controller { + return &Controller{ + Collector: internalCollector, + configs: configs, + + clients: make(map[*websocket.Conn]map[string]bool), + register: make(chan *websocket.Conn), + unregister: make(chan *websocket.Conn), + broadcast: make(map[string]chan dalcommon.OutgoingSubmissionData), + } +} + func (c *Controller) Run(ctx context.Context) { go c.Collector.Start(ctx) go func() { @@ -58,22 +70,23 @@ func (c *Controller) Run(ctx context.Context) { c.broadcast[symbol] = stream } - go func() { - for symbol := range c.configs { - go c.broadcastDataForSymbol(symbol) - } - }() -} + for symbol := range c.configs { + go c.broadcastDataForSymbol(symbol) + } -func NewController(configs map[string]types.Config, internalCollector *collector.Collector) *Controller { - return &Controller{ - Collector: internalCollector, - configs: configs, +} - clients: make(map[*websocket.Conn]map[string]bool), - register: make(chan *websocket.Conn), - unregister: make(chan *websocket.Conn), - broadcast: make(map[string]chan dalcommon.OutgoingSubmissionData), +func (c *Controller) broadcastDataForSymbol(symbol string) { + for data := range c.broadcast[symbol] { + for conn := range c.clients { + if _, ok := c.clients[conn][symbol]; ok { + if err := conn.WriteJSON(data); err != nil { + log.Error().Err(err).Msg("failed to write message") + delete(c.clients, conn) + conn.Close() + } + } + } } } @@ -179,18 +192,6 @@ func (c *Controller) configIdToSymbol(id int32) string { return "" } -func (c *Controller) broadcastDataForSymbol(symbol string) { - for data := range c.broadcast[symbol] { - for conn := range c.clients { - if _, ok := c.clients[conn][symbol]; ok { - if err := conn.WriteJSON(data); err != nil { - log.Error().Err(err).Msg("failed to write message") - } - } - } - } -} - func getLatestFeeds(c *fiber.Ctx) error { submissionData, err := ApiController.getLatestSubmissionData(c.Context()) if err != nil { From 784a5b1334066d7f5ccdf42272932c3eb0a9f1c8 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 00:45:42 +0900 Subject: [PATCH 04/25] feat: test codes, minor fixes --- node/pkg/dal/api/controller.go | 84 ++++++------ node/pkg/dal/api/types.go | 3 +- node/pkg/dal/app.go | 4 +- node/pkg/dal/collector/collector.go | 20 +-- node/pkg/dal/collector/utils.go | 26 ++-- node/pkg/dal/tests/api_test.go | 186 +++++++++++++++++++++++++++ node/pkg/dal/tests/collector_test.go | 79 ++++++++++++ node/pkg/dal/tests/main_test.go | 124 ++++++++++++++++++ node/pkg/dal/utils/utils.go | 8 +- node/taskfiles/taskfile.local.yml | 4 + 10 files changed, 466 insertions(+), 72 deletions(-) create mode 100644 node/pkg/dal/tests/api_test.go create mode 100644 node/pkg/dal/tests/collector_test.go create mode 100644 node/pkg/dal/tests/main_test.go diff --git a/node/pkg/dal/api/controller.go b/node/pkg/dal/api/controller.go index 8279ca585..cca7adef8 100644 --- a/node/pkg/dal/api/controller.go +++ b/node/pkg/dal/api/controller.go @@ -18,8 +18,7 @@ import ( var ApiController Controller -func init() { - ctx := context.Background() +func Setup(ctx context.Context) { configs, err := db.QueryRows[types.Config](ctx, "SELECT * FROM configs", nil) if err != nil { log.Error().Err(err).Msg("failed to get configs") @@ -50,7 +49,7 @@ func NewController(configs map[string]types.Config, internalCollector *collector } } -func (c *Controller) Run(ctx context.Context) { +func (c *Controller) Start(ctx context.Context) { go c.Collector.Start(ctx) go func() { for { @@ -73,7 +72,15 @@ func (c *Controller) Run(ctx context.Context) { for symbol := range c.configs { go c.broadcastDataForSymbol(symbol) } +} +func (c *Controller) configIdToSymbol(id int32) string { + for symbol, config := range c.configs { + if config.ID == id { + return symbol + } + } + return "" } func (c *Controller) broadcastDataForSymbol(symbol string) { @@ -90,6 +97,35 @@ func (c *Controller) broadcastDataForSymbol(symbol string) { } } +func (c *Controller) handleWebsocket(conn *websocket.Conn) { + c.register <- conn + defer func() { + c.unregister <- conn + conn.Close() + }() + + for { + var msg Subscription + if err := conn.ReadJSON(&msg); err != nil { + log.Error().Err(err).Msg("failed to read message") + return + } + + if msg.Method == "SUBSCRIBE" { + if c.clients[conn] == nil { + c.clients[conn] = make(map[string]bool) + } + for _, param := range msg.Params { + symbol := strings.TrimPrefix(param, "submission@") + if _, ok := c.configs[symbol]; !ok { + continue + } + c.clients[conn][symbol] = true + } + } + } +} + func (c *Controller) getLatestSubmissionData(ctx context.Context) ([]aggregator.SubmissionData, error) { globalAggregateKeyList := make([]string, 0, len(c.configs)) for _, config := range c.configs { @@ -154,44 +190,6 @@ func (c *Controller) getLatestSubmissionDataSingle(ctx context.Context, symbol s }, nil } -func (c *Controller) handleWebsocket(conn *websocket.Conn) { - c.register <- conn - defer func() { - c.unregister <- conn - conn.Close() - }() - - for { - var msg Subscription - if err := conn.ReadJSON(&msg); err != nil { - log.Error().Err(err).Msg("failed to read message") - return - } - - if msg.Method == "SUBSCRIBE" { - if c.clients[conn] == nil { - c.clients[conn] = make(map[string]bool) - } - for _, param := range msg.Params { - symbol := strings.TrimPrefix(param, "submission@") - if _, ok := c.configs[symbol]; !ok { - continue - } - c.clients[conn][symbol] = true - } - } - } -} - -func (c *Controller) configIdToSymbol(id int32) string { - for symbol, config := range c.configs { - if config.ID == id { - return symbol - } - } - return "" -} - func getLatestFeeds(c *fiber.Ctx) error { submissionData, err := ApiController.getLatestSubmissionData(c.Context()) if err != nil { @@ -220,7 +218,9 @@ func getLatestFeed(c *fiber.Ctx) error { return errors.New("symbol should be in {BASE}-{QUOTE} format") } - symbol = strings.ToUpper(symbol) + if !strings.Contains(symbol, "test") { + symbol = strings.ToUpper(symbol) + } submissionData, err := ApiController.getLatestSubmissionDataSingle(c.Context(), symbol) if err != nil { diff --git a/node/pkg/dal/api/types.go b/node/pkg/dal/api/types.go index 3e16967e2..5a071aabd 100644 --- a/node/pkg/dal/api/types.go +++ b/node/pkg/dal/api/types.go @@ -13,7 +13,8 @@ type Subscription struct { } type Controller struct { - Collector *collector.Collector + Collector *collector.Collector + configs map[string]types.Config clients map[*websocket.Conn]map[string]bool register chan *websocket.Conn diff --git a/node/pkg/dal/app.go b/node/pkg/dal/app.go index e1217734e..c27512bb5 100644 --- a/node/pkg/dal/app.go +++ b/node/pkg/dal/app.go @@ -19,7 +19,9 @@ func Run(ctx context.Context) error { return err } - api.ApiController.Run(ctx) + api.Setup(ctx) + api.ApiController.Start(ctx) + log.Debug().Str("Player", "DAL API").Msg("DAL API collector started") v1 := app.Group("/api/v1") v1.Get("/", func(c *fiber.Ctx) error { diff --git a/node/pkg/dal/collector/collector.go b/node/pkg/dal/collector/collector.go index c0417eb61..066f168b1 100644 --- a/node/pkg/dal/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -21,8 +21,8 @@ import ( const ( DefaultDecimals = "8" - GetAllOracles = "function getAllOracles() public view returns (address[])" - OracleAdded = "event OracleAdded(address oracle, uint256 expirationTime)" + GetAllOracles = "getAllOracles() public view returns (address[] memory)" + OracleAdded = "OracleAdded(address oracle, uint256 expirationTime)" ) type Collector struct { @@ -32,10 +32,11 @@ type Collector struct { FeedHashes map[int32][]byte CachedWhitelist []klaytncommon.Address + Ctx context.Context + CancelFunc context.CancelFunc + chainReader *websocketchainreader.ChainReader submissionProxyContractAddr string - ctx context.Context - cancelFunc context.CancelFunc mu sync.RWMutex } @@ -82,22 +83,23 @@ func NewCollector(ctx context.Context, configs []types.Config) (*Collector, erro } func (c *Collector) Start(ctx context.Context) { - if c.ctx != nil { + if c.Ctx != nil { log.Debug().Str("Player", "DalCollector").Msg("Collector already running") return } ctxWithCancel, cancel := context.WithCancel(ctx) - c.cancelFunc = cancel - c.ctx = ctxWithCancel + c.CancelFunc = cancel + c.Ctx = ctxWithCancel c.receive(ctxWithCancel) c.trackOracleAdded(ctxWithCancel) } func (c *Collector) Stop() { - if c.cancelFunc != nil { - c.cancelFunc() + if c.CancelFunc != nil { + c.CancelFunc() + c.Ctx = nil } } diff --git a/node/pkg/dal/collector/utils.go b/node/pkg/dal/collector/utils.go index dbd299245..e3722b1e4 100644 --- a/node/pkg/dal/collector/utils.go +++ b/node/pkg/dal/collector/utils.go @@ -50,19 +50,21 @@ func subscribeAddOracleEvent(ctx context.Context, chainReader *websocketchainrea return err } - for eventLog := range logChannel { - result, err := oracleAddedEventABI.Unpack(eventName, eventLog.Data) - if err != nil { - continue + go func() { + for eventLog := range logChannel { + result, err := oracleAddedEventABI.Unpack(eventName, eventLog.Data) + if err != nil { + continue + } + + _, ok := result[0].(klaytncommon.Address) + if !ok { + continue + } + + isUpdated <- true } - - _, ok := result[0].(klaytncommon.Address) - if !ok { - continue - } - - isUpdated <- true - } + }() return nil } diff --git a/node/pkg/dal/tests/api_test.go b/node/pkg/dal/tests/api_test.go new file mode 100644 index 000000000..30359fcf8 --- /dev/null +++ b/node/pkg/dal/tests/api_test.go @@ -0,0 +1,186 @@ +//nolint:all +package test + +import ( + "context" + "testing" + "time" + + "bisonai.com/orakl/node/pkg/admin/tests" + "bisonai.com/orakl/node/pkg/aggregator" + "bisonai.com/orakl/node/pkg/dal/api" + "bisonai.com/orakl/node/pkg/dal/common" + wsfcommon "bisonai.com/orakl/node/pkg/websocketfetcher/common" + "bisonai.com/orakl/node/pkg/wss" + "github.com/stretchr/testify/assert" +) + +func TestApiControllerRun(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + assert.Equal(t, nil, testItems.Controller.Collector.Ctx) + + testItems.Controller.Start(ctx) + time.Sleep(10 * time.Millisecond) + assert.NotEqual(t, nil, testItems.Controller.Collector.Ctx) +} + +func TestApiGetLatestAll(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + testItems.Controller.Start(ctx) + + sampleSubmissionData, err := generateSampleSubmissionData( + testItems.TmpConfig.ID, + int64(15), + time.Now(), + 1, + "test-aggregate", + ) + if err != nil { + t.Fatalf("error generating sample submission data: %v", err) + } + + aggregator.SetLatestGlobalAggregateAndProof(ctx, testItems.TmpConfig.ID, sampleSubmissionData.GlobalAggregate, sampleSubmissionData.Proof) + + result, err := tests.GetRequest[[]common.OutgoingSubmissionData](testItems.App, "/api/v1/dal/latest-data-feeds/all", nil) + if err != nil { + t.Fatalf("error getting latest data: %v", err) + } + + expected, err := testItems.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData) + if err != nil { + t.Fatalf("error converting sample submission data to outgoing data: %v", err) + } + + assert.Greater(t, len(result), 0) + if len(result) > 0 { + assert.Equal(t, *expected, result[0]) + } +} + +func TestApiGetLatest(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + testItems.Controller.Start(ctx) + + sampleSubmissionData, err := generateSampleSubmissionData( + testItems.TmpConfig.ID, + int64(15), + time.Now(), + 1, + "test-aggregate", + ) + if err != nil { + t.Fatalf("error generating sample submission data: %v", err) + } + + aggregator.SetLatestGlobalAggregateAndProof(ctx, testItems.TmpConfig.ID, sampleSubmissionData.GlobalAggregate, sampleSubmissionData.Proof) + + result, err := tests.GetRequest[common.OutgoingSubmissionData](testItems.App, "/api/v1/dal/latest-data-feeds/test-aggregate", nil) + if err != nil { + t.Fatalf("error getting latest data: %v", err) + } + + expected, err := testItems.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData) + if err != nil { + t.Fatalf("error converting sample submission data to outgoing data: %v", err) + } + + assert.Equal(t, *expected, result) +} + +func TestApiWebsocket(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + testItems.Controller.Start(ctx) + + go testItems.App.Listen(":8090") + + conn, err := wss.NewWebsocketHelper(ctx, wss.WithEndpoint("ws://localhost:8090/api/v1/dal/ws")) + if err != nil { + t.Fatalf("error creating websocket helper: %v", err) + } + + err = conn.Dial(ctx) + if err != nil { + t.Fatalf("error dialing websocket: %v", err) + } + + err = conn.Write(ctx, api.Subscription{ + Method: "SUBSCRIBE", + Params: []string{"submission@test-aggregate"}, + }) + if err != nil { + t.Fatalf("error subscribing to websocket: %v", err) + } + + sampleSubmissionData, err := generateSampleSubmissionData( + testItems.TmpConfig.ID, + int64(15), + time.Now(), + 1, + "test-aggregate", + ) + if err != nil { + t.Fatalf("error generating sample submission data: %v", err) + } + + testPublishData(ctx, *sampleSubmissionData) + + expected, err := testItems.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData) + if err != nil { + t.Fatalf("error converting sample submission data to outgoing data: %v", err) + } + + ch := make(chan any) + go conn.Read(ctx, ch) + sample := <-ch + + result, err := wsfcommon.MessageToStruct[common.OutgoingSubmissionData](sample.(map[string]any)) + if err != nil { + t.Fatalf("error converting sample to struct: %v", err) + } + assert.Equal(t, *expected, result) + + err = conn.Close() + if err != nil { + t.Fatalf("error closing websocket: %v", err) + } +} diff --git a/node/pkg/dal/tests/collector_test.go b/node/pkg/dal/tests/collector_test.go new file mode 100644 index 000000000..1c6033fa3 --- /dev/null +++ b/node/pkg/dal/tests/collector_test.go @@ -0,0 +1,79 @@ +//nolint:all + +package test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestCollectorStartAndStop(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + collector := testItems.Collector + collector.Start(ctx) + assert.NotEqual(t, nil, collector.Ctx) + assert.Greater(t, len(collector.Symbols), 0) + assert.Greater(t, len(collector.Symbols), 0) + + collector.Stop() + assert.Equal(t, nil, collector.Ctx) +} + +func TestCollectorStream(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + collector := testItems.Collector + collector.Start(ctx) + time.Sleep(10 * time.Millisecond) + + sampleSubmissionData, err := generateSampleSubmissionData( + testItems.TmpConfig.ID, + int64(15), + time.Now(), + 1, + "test-aggregate", + ) + + if err != nil { + t.Fatalf("error generating sample submission data: %v", err) + } + + testPublishData(ctx, *sampleSubmissionData) + + time.Sleep(10 * time.Millisecond) + + expected, err := collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData) + if err != nil { + t.Fatalf("error converting incoming data to outgoing data: %v", err) + } + + select { + case sample := <-collector.OutgoingStream[testItems.TmpConfig.ID]: + assert.NotEqual(t, nil, sample) + assert.Equal(t, *expected, sample) + default: + t.Fatalf("no data received") + } +} diff --git a/node/pkg/dal/tests/main_test.go b/node/pkg/dal/tests/main_test.go new file mode 100644 index 000000000..05d55d399 --- /dev/null +++ b/node/pkg/dal/tests/main_test.go @@ -0,0 +1,124 @@ +//nolint:all + +package test + +import ( + "context" + "os" + "testing" + "time" + + "bisonai.com/orakl/node/pkg/aggregator" + "bisonai.com/orakl/node/pkg/chain/helper" + "bisonai.com/orakl/node/pkg/common/keys" + "bisonai.com/orakl/node/pkg/common/types" + "bisonai.com/orakl/node/pkg/dal/api" + "bisonai.com/orakl/node/pkg/dal/collector" + "bisonai.com/orakl/node/pkg/dal/utils" + "bisonai.com/orakl/node/pkg/db" + "github.com/gofiber/fiber/v2" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +type TestItems struct { + App *fiber.App + Controller *api.Controller + Collector *collector.Collector + TmpConfig types.Config +} + +func testPublishData(ctx context.Context, submissionData aggregator.SubmissionData) { + db.Publish(ctx, keys.SubmissionDataStreamKey(submissionData.GlobalAggregate.ConfigID), submissionData) +} + +func generateSampleSubmissionData(configId int32, value int64, timestamp time.Time, round int32, symbol string) (*aggregator.SubmissionData, error) { + sampleGlobalAggregate := aggregator.GlobalAggregate{ + ConfigID: configId, + Value: value, + Timestamp: timestamp, + Round: round, + } + + signHelper, err := helper.NewSignHelper("") + if err != nil { + return nil, err + } + + rawProof, err := signHelper.MakeGlobalAggregateProof(value, timestamp, symbol) + if err != nil { + return nil, err + } + + proof := aggregator.Proof{ + ConfigID: configId, + Round: round, + Proof: rawProof, + } + + return &aggregator.SubmissionData{ + GlobalAggregate: sampleGlobalAggregate, + Proof: proof, + }, nil +} + +func setup(ctx context.Context) (func() error, *TestItems, error) { + var testItems = new(TestItems) + + tmpConfig, err := db.QueryRow[types.Config]( + ctx, + `INSERT INTO configs (name, fetch_interval, aggregate_interval, submit_interval) VALUES (@name, @fetch_interval, @aggregate_interval, @submit_interval) RETURNING name, id, submit_interval, aggregate_interval, fetch_interval;`, + map[string]any{"name": "test-aggregate", "submit_interval": 15000, "fetch_interval": 15000, "aggregate_interval": 15000}) + if err != nil { + log.Error().Err(err).Msg("error inserting config 0") + return nil, nil, err + } + testItems.TmpConfig = tmpConfig + + app, err := utils.Setup(ctx) + if err != nil { + return nil, nil, err + } + testItems.App = app + api.Setup(ctx) + testItems.Controller = &api.ApiController + testItems.Collector = api.ApiController.Collector + + v1 := app.Group("/api/v1") + api.Routes(v1) + + return cleanup(ctx, testItems), testItems, nil +} + +func cleanup(ctx context.Context, testItems *TestItems) func() error { + return func() error { + err := db.QueryWithoutResult(ctx, "DELETE FROM configs", nil) + if err != nil { + log.Error().Err(err).Msg("error deleting config") + return err + } + err = testItems.App.Shutdown() + if err != nil { + log.Error().Err(err).Msg("error shutting down app") + return err + } + + testItems.Collector.Stop() + + testItems.Controller = nil + testItems.Collector = nil + return nil + } +} + +func TestMain(m *testing.M) { + zerolog.SetGlobalLevel(zerolog.InfoLevel) + // setup + code := m.Run() + + db.ClosePool() + db.CloseRedis() + + // teardown + os.Exit(code) +} diff --git a/node/pkg/dal/utils/utils.go b/node/pkg/dal/utils/utils.go index ac02e230c..21a101f93 100644 --- a/node/pkg/dal/utils/utils.go +++ b/node/pkg/dal/utils/utils.go @@ -18,13 +18,7 @@ import ( ) func Setup(ctx context.Context) (*fiber.App, error) { - _, err := db.GetPool(ctx) - if err != nil { - log.Error().Err(err).Msg("error getting db pool") - return nil, errorSentinel.ErrAdminDbPoolNotFound - } - - _, err = db.GetRedisClient(ctx) + _, err := db.GetRedisClient(ctx) if err != nil { log.Error().Err(err).Msg("error getting redis conn") return nil, errorSentinel.ErrAdminRedisConnNotFound diff --git a/node/taskfiles/taskfile.local.yml b/node/taskfiles/taskfile.local.yml index 979d8fdad..5b9a237aa 100644 --- a/node/taskfiles/taskfile.local.yml +++ b/node/taskfiles/taskfile.local.yml @@ -113,6 +113,10 @@ tasks: dotenv: [".env"] cmds: - go test ./pkg/websocketfetcher/tests -v + test-dal: + dotenv: [".env"] + cmds: + - go test ./pkg/dal/tests -v check-api: dotenv: [".env"] From 3e867853888612f1f625ea9f6060152357e8290f Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 10:14:57 +0900 Subject: [PATCH 05/25] fix: lint fix --- node/pkg/dal/tests/main_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/pkg/dal/tests/main_test.go b/node/pkg/dal/tests/main_test.go index 05d55d399..5c1a36527 100644 --- a/node/pkg/dal/tests/main_test.go +++ b/node/pkg/dal/tests/main_test.go @@ -29,7 +29,7 @@ type TestItems struct { } func testPublishData(ctx context.Context, submissionData aggregator.SubmissionData) { - db.Publish(ctx, keys.SubmissionDataStreamKey(submissionData.GlobalAggregate.ConfigID), submissionData) + _ = db.Publish(ctx, keys.SubmissionDataStreamKey(submissionData.GlobalAggregate.ConfigID), submissionData) } func generateSampleSubmissionData(configId int32, value int64, timestamp time.Time, round int32, symbol string) (*aggregator.SubmissionData, error) { From 68c8eead0cf9d40314bcf1f2486483821cf7504b Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 11:07:57 +0900 Subject: [PATCH 06/25] fix: updates based on feedbacks --- node/.env.example | 7 ++++++- node/pkg/dal/api/controller.go | 26 ++++++++++++++++---------- node/pkg/dal/app.go | 6 +++++- node/pkg/dal/collector/collector.go | 16 ++++++++++------ node/pkg/dal/collector/utils.go | 15 +++++++++++++-- node/pkg/dal/tests/main_test.go | 5 ++++- node/pkg/dal/utils/utils.go | 16 +++++++++++----- 7 files changed, 65 insertions(+), 26 deletions(-) diff --git a/node/.env.example b/node/.env.example index a4b5eb203..df50b73aa 100644 --- a/node/.env.example +++ b/node/.env.example @@ -61,4 +61,9 @@ DAL_API_PORT= # KAIA_WEBSOCKET_URL= # (required) -# SUBMISSION_PROXY_CONTRACT= \ No newline at end of file +# SUBMISSION_PROXY_CONTRACT= + +# (required) +# DATABASE_URL= +# REDIS_HOST= +# REDIS_PORT= \ No newline at end of file diff --git a/node/pkg/dal/api/controller.go b/node/pkg/dal/api/controller.go index cca7adef8..5b05a347c 100644 --- a/node/pkg/dal/api/controller.go +++ b/node/pkg/dal/api/controller.go @@ -18,11 +18,11 @@ import ( var ApiController Controller -func Setup(ctx context.Context) { +func Setup(ctx context.Context) error { configs, err := db.QueryRows[types.Config](ctx, "SELECT * FROM configs", nil) if err != nil { log.Error().Err(err).Msg("failed to get configs") - panic(err) + return err } configMap := make(map[string]types.Config) for _, config := range configs { @@ -31,10 +31,11 @@ func Setup(ctx context.Context) { collector, err := collector.NewCollector(ctx, configs) if err != nil { log.Error().Err(err).Msg("failed to create collector") - panic(err) + return err } ApiController = *NewController(configMap, collector) + return nil } func NewController(configs map[string]types.Config, internalCollector *collector.Collector) *Controller { @@ -85,13 +86,18 @@ func (c *Controller) configIdToSymbol(id int32) string { func (c *Controller) broadcastDataForSymbol(symbol string) { for data := range c.broadcast[symbol] { - for conn := range c.clients { - if _, ok := c.clients[conn][symbol]; ok { - if err := conn.WriteJSON(data); err != nil { - log.Error().Err(err).Msg("failed to write message") - delete(c.clients, conn) - conn.Close() - } + go c.castSubmissionData(&data, &symbol) + } +} + +// pass by pointer to reduce memory copy time +func (c *Controller) castSubmissionData(data *dalcommon.OutgoingSubmissionData, symbol *string) { + for conn := range c.clients { + if _, ok := c.clients[conn][*symbol]; ok { + if err := conn.WriteJSON(*data); err != nil { + log.Error().Err(err).Msg("failed to write message") + delete(c.clients, conn) + conn.Close() } } } diff --git a/node/pkg/dal/app.go b/node/pkg/dal/app.go index c27512bb5..24048b2c4 100644 --- a/node/pkg/dal/app.go +++ b/node/pkg/dal/app.go @@ -19,7 +19,11 @@ func Run(ctx context.Context) error { return err } - api.Setup(ctx) + err = api.Setup(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to setup DAL API server") + return err + } api.ApiController.Start(ctx) log.Debug().Str("Player", "DAL API").Msg("DAL API collector started") diff --git a/node/pkg/dal/collector/collector.go b/node/pkg/dal/collector/collector.go index 066f168b1..f7e9d6dd1 100644 --- a/node/pkg/dal/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -84,7 +84,7 @@ func NewCollector(ctx context.Context, configs []types.Config) (*Collector, erro func (c *Collector) Start(ctx context.Context) { if c.Ctx != nil { - log.Debug().Str("Player", "DalCollector").Msg("Collector already running") + log.Warn().Str("Player", "DalCollector").Msg("Collector already running, skipping start") return } @@ -105,19 +105,23 @@ func (c *Collector) Stop() { func (c *Collector) receive(ctx context.Context) { for id := range c.IncomingStream { - go c.receiveEach(ctx, id) + go func(id int32) { + if err := c.receiveEach(ctx, id); err != nil { + log.Error().Err(err).Str("Player", "DalCollector").Msg("Error in receiveEach goroutine") + } + }(id) } } -func (c *Collector) receiveEach(ctx context.Context, configId int32) { +func (c *Collector) receiveEach(ctx context.Context, configId int32) error { err := db.Subscribe(ctx, keys.SubmissionDataStreamKey(configId), c.IncomingStream[configId]) if err != nil { - log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to subscribe to submission stream") + return err } for { select { case <-ctx.Done(): - return + return nil case data := <-c.IncomingStream[configId]: go c.processIncomingData(ctx, data) } @@ -146,7 +150,7 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data aggrega c.Symbols[data.GlobalAggregate.ConfigID], whitelist) if err != nil { - log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to order proof") + log.Error().Err(err).Str("Player", "DalCollector").Str("Symbol", c.Symbols[data.GlobalAggregate.ConfigID]).Msg("failed to order proof") if errors.Is(err, errorSentinel.ErrReporterSignerNotWhitelisted) { newList, getAllOraclesErr := getAllOracles(ctx, c.chainReader, c.submissionProxyContractAddr) if getAllOraclesErr != nil { diff --git a/node/pkg/dal/collector/utils.go b/node/pkg/dal/collector/utils.go index e3722b1e4..211426c94 100644 --- a/node/pkg/dal/collector/utils.go +++ b/node/pkg/dal/collector/utils.go @@ -22,10 +22,15 @@ func getAllOracles(ctx context.Context, chainReader *websocketchainreader.ChainR } rawResultSlice, ok := rawResult.([]interface{}) if !ok { - return nil, errors.New("failed to cast result to []interface{}") + return nil, errors.New("failed to cast result to []interface{} in getAllOracles") } - return rawResultSlice[0].([]klaytncommon.Address), nil + addresses, ok := rawResultSlice[0].([]klaytncommon.Address) + if !ok { + return nil, errors.New("failed to cast first element to []klaytncommon.Address") + } + + return addresses, nil } func subscribeAddOracleEvent(ctx context.Context, chainReader *websocketchainreader.ChainReader, submissionProxyContractAddr string, isUpdated chan any) error { @@ -51,14 +56,17 @@ func subscribeAddOracleEvent(ctx context.Context, chainReader *websocketchainrea } go func() { + defer close(logChannel) for eventLog := range logChannel { result, err := oracleAddedEventABI.Unpack(eventName, eventLog.Data) if err != nil { + log.Error().Err(err).Msg("failed to unpack event log data in subscribeAddOracleEvent") continue } _, ok := result[0].(klaytncommon.Address) if !ok { + log.Error().Msg("failed to cast result to klaytncommon.Address in subscribeAddOracleEvent") continue } @@ -74,16 +82,19 @@ func orderProof(ctx context.Context, proof []byte, value int64, timestamp time.T hash := chainutils.Value2HashForSign(value, timestamp.Unix(), symbol) proofChunks, err := reporter.SplitProofToChunk(proof) if err != nil { + log.Error().Err(err).Msg("failed to split proof to chunks in orderProof") return nil, err } signers, err := reporter.GetSignerListFromProofs(hash, proofChunks) if err != nil { + log.Error().Err(err).Msg("failed to get signer list from proofs in orderProof") return nil, err } err = reporter.CheckForNonWhitelistedSigners(signers, cachedWhitelist) if err != nil { + log.Error().Err(err).Msg("non-whitelisted signers found in orderProof") return nil, err } diff --git a/node/pkg/dal/tests/main_test.go b/node/pkg/dal/tests/main_test.go index 5c1a36527..1139b3dfe 100644 --- a/node/pkg/dal/tests/main_test.go +++ b/node/pkg/dal/tests/main_test.go @@ -80,7 +80,10 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { return nil, nil, err } testItems.App = app - api.Setup(ctx) + err = api.Setup(ctx) + if err != nil { + return nil, nil, err + } testItems.Controller = &api.ApiController testItems.Collector = api.ApiController.Collector diff --git a/node/pkg/dal/utils/utils.go b/node/pkg/dal/utils/utils.go index 21a101f93..48883361f 100644 --- a/node/pkg/dal/utils/utils.go +++ b/node/pkg/dal/utils/utils.go @@ -18,9 +18,15 @@ import ( ) func Setup(ctx context.Context) (*fiber.App, error) { - _, err := db.GetRedisClient(ctx) + _, err := db.GetPool(ctx) if err != nil { - log.Error().Err(err).Msg("error getting redis conn") + log.Error().Err(err).Msg("error getting pgs conn in Setup") + return nil, errorSentinel.ErrAdminDbPoolNotFound + } + + _, err = db.GetRedisClient(ctx) + if err != nil { + log.Error().Err(err).Msg("error getting redis conn in Setup") return nil, errorSentinel.ErrAdminRedisConnNotFound } @@ -66,7 +72,8 @@ func CustomErrorHandler(c *fiber.Ctx, err error) error { Str("path", c.Path()). Msg("error") - return c.Status(code).SendString(err.Error()) + c.Set(fiber.HeaderContentType, fiber.MIMEApplicationJSONCharsetUTF8) + return c.Status(code).JSON(fiber.Map{"error": err.Error()}) } func CustomStackTraceHandler(_ *fiber.Ctx, e interface{}) { @@ -74,11 +81,10 @@ func CustomStackTraceHandler(_ *fiber.Ctx, e interface{}) { var failPoint string for _, line := range stackTrace { - if strings.Contains(line, "controller.go") { + if strings.Contains(line, ".go") { path := strings.Split(strings.TrimSpace(line), " ")[0] splitted := strings.Split(path, "/") failPoint = splitted[len(splitted)-2] + "/" + splitted[len(splitted)-1] - break } } From 8d5acdc563c7baf8c17a65a4f3a058b67ab9c41b Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 11:10:51 +0900 Subject: [PATCH 07/25] fix: update based on feedback --- node/pkg/dal/tests/api_test.go | 5 ++++- node/pkg/dal/tests/collector_test.go | 5 ++++- node/pkg/dal/tests/main_test.go | 4 ++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/node/pkg/dal/tests/api_test.go b/node/pkg/dal/tests/api_test.go index 30359fcf8..9b55dbf68 100644 --- a/node/pkg/dal/tests/api_test.go +++ b/node/pkg/dal/tests/api_test.go @@ -162,7 +162,10 @@ func TestApiWebsocket(t *testing.T) { t.Fatalf("error generating sample submission data: %v", err) } - testPublishData(ctx, *sampleSubmissionData) + err = testPublishData(ctx, *sampleSubmissionData) + if err != nil { + t.Fatalf("error publishing sample submission data: %v", err) + } expected, err := testItems.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData) if err != nil { diff --git a/node/pkg/dal/tests/collector_test.go b/node/pkg/dal/tests/collector_test.go index 1c6033fa3..83ee71e8c 100644 --- a/node/pkg/dal/tests/collector_test.go +++ b/node/pkg/dal/tests/collector_test.go @@ -60,7 +60,10 @@ func TestCollectorStream(t *testing.T) { t.Fatalf("error generating sample submission data: %v", err) } - testPublishData(ctx, *sampleSubmissionData) + err = testPublishData(ctx, *sampleSubmissionData) + if err != nil { + t.Fatalf("error publishing data: %v", err) + } time.Sleep(10 * time.Millisecond) diff --git a/node/pkg/dal/tests/main_test.go b/node/pkg/dal/tests/main_test.go index 1139b3dfe..2ab448f4a 100644 --- a/node/pkg/dal/tests/main_test.go +++ b/node/pkg/dal/tests/main_test.go @@ -28,8 +28,8 @@ type TestItems struct { TmpConfig types.Config } -func testPublishData(ctx context.Context, submissionData aggregator.SubmissionData) { - _ = db.Publish(ctx, keys.SubmissionDataStreamKey(submissionData.GlobalAggregate.ConfigID), submissionData) +func testPublishData(ctx context.Context, submissionData aggregator.SubmissionData) error { + return db.Publish(ctx, keys.SubmissionDataStreamKey(submissionData.GlobalAggregate.ConfigID), submissionData) } func generateSampleSubmissionData(configId int32, value int64, timestamp time.Time, round int32, symbol string) (*aggregator.SubmissionData, error) { From 6316eecc8de072d08a8d837a877443891e5488fc Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 11:11:16 +0900 Subject: [PATCH 08/25] feat: include dal into test list --- node/taskfiles/taskfile.local.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/node/taskfiles/taskfile.local.yml b/node/taskfiles/taskfile.local.yml index 5b9a237aa..d16c1640a 100644 --- a/node/taskfiles/taskfile.local.yml +++ b/node/taskfiles/taskfile.local.yml @@ -206,3 +206,4 @@ tasks: - task: test-chain - task: test-wss - task: test-websocketfetcher + - task: test-dal From 540d0efc78727bb59c4e0f701c4a98aef09db1cd Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 11:14:08 +0900 Subject: [PATCH 09/25] test: clear db before testrun --- node/pkg/dal/tests/main_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/node/pkg/dal/tests/main_test.go b/node/pkg/dal/tests/main_test.go index 2ab448f4a..ee3664985 100644 --- a/node/pkg/dal/tests/main_test.go +++ b/node/pkg/dal/tests/main_test.go @@ -65,6 +65,12 @@ func generateSampleSubmissionData(configId int32, value int64, timestamp time.Ti func setup(ctx context.Context) (func() error, *TestItems, error) { var testItems = new(TestItems) + err := db.QueryWithoutResult(ctx, "DELETE FROM configs", nil) + if err != nil { + log.Error().Err(err).Msg("error deleting config") + return nil, nil, err + } + tmpConfig, err := db.QueryRow[types.Config]( ctx, `INSERT INTO configs (name, fetch_interval, aggregate_interval, submit_interval) VALUES (@name, @fetch_interval, @aggregate_interval, @submit_interval) RETURNING name, id, submit_interval, aggregate_interval, fetch_interval;`, From c9855eea9bafe4327fec5924acb4968cf9c8b421 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 11:35:51 +0900 Subject: [PATCH 10/25] test: separate github action test --- .github/workflows/dal.test.yaml | 97 +++++++++++++++++++++++++++++++ node/taskfiles/taskfile.local.yml | 1 - 2 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/dal.test.yaml diff --git a/.github/workflows/dal.test.yaml b/.github/workflows/dal.test.yaml new file mode 100644 index 000000000..ed748fd7b --- /dev/null +++ b/.github/workflows/dal.test.yaml @@ -0,0 +1,97 @@ +name: "orakl node test" + +on: + push: + branches-ignore: + - "master" + paths: + - "node/pkg/dal/**" + workflow_dispatch: + +jobs: + core-build: + strategy: + fail-fast: false + runs-on: ubuntu-latest + timeout-minutes: 10 + + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + POSTGRES_DB: orakl-test + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + redis: + image: redis + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 6379:6379 + + steps: + - uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v4 + with: + go-version: "1.22.3" + check-latest: true + cache-dependency-path: | + ./node/go.sum + + - name: Run lint + uses: golangci/golangci-lint-action@v3 + with: + version: v1.54 + working-directory: node + skip-pkg-cache: true + skip-build-cache: true + args: --timeout=10m + + - name: Run Vet + run: | + cd ./node + go install golang.org/x/tools/go/analysis/passes/shadow/cmd/shadow@latest + go vet ./... + go vet -vettool=$(which shadow) ./... + + - name: Install golang-migrate + run: | + curl -L https://github.com/golang-migrate/migrate/releases/download/v4.17.0/migrate.linux-amd64.tar.gz | tar xvz + sudo mv ./migrate /usr/bin + + - name: Migrate up + run: | + cd ./node + migrate -database "postgresql://postgres:postgres@localhost:5432/orakl-test?search_path=public&sslmode=disable" -verbose -path ./migrations/node up + + - name: Install dependencies + run: | + cd ./node + go mod tidy + + - name: Install Task + uses: arduino/setup-task@v2 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + + - name: Run test + run: | + cd ./node + task local:test-dal + env: + DATABASE_URL: "postgresql://postgres:postgres@localhost:5432/orakl-test?search_path=public" + REDIS_HOST: "localhost" + REDIS_PORT: "6379" + KAIA_WEBSOCKET_URL: "wss://public-en-baobab.klaytn.net/ws" + SUBMISSION_PROXY_CONTRACT: "0x35bA1102A4954147272782302856BD8440227B85" diff --git a/node/taskfiles/taskfile.local.yml b/node/taskfiles/taskfile.local.yml index d16c1640a..5b9a237aa 100644 --- a/node/taskfiles/taskfile.local.yml +++ b/node/taskfiles/taskfile.local.yml @@ -206,4 +206,3 @@ tasks: - task: test-chain - task: test-wss - task: test-websocketfetcher - - task: test-dal From 32d1c26d194f7c51add39578412f219f3216edf7 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 11:37:02 +0900 Subject: [PATCH 11/25] fix: rename github action name --- .github/workflows/dal.test.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/dal.test.yaml b/.github/workflows/dal.test.yaml index ed748fd7b..a848f2e56 100644 --- a/.github/workflows/dal.test.yaml +++ b/.github/workflows/dal.test.yaml @@ -1,4 +1,4 @@ -name: "orakl node test" +name: "orakl dal test" on: push: From 37312dc6db22783412722450306aebc04c884fe1 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 14:02:59 +0900 Subject: [PATCH 12/25] feat: add dal consumer script,update feedHash type --- node/pkg/dal/collector/collector.go | 2 +- node/pkg/dal/common/types.go | 12 ++-- node/script/test_dal_consumer/main.go | 79 +++++++++++++++++++++++++++ node/taskfiles/taskfile.local.yml | 4 ++ 4 files changed, 90 insertions(+), 7 deletions(-) create mode 100644 node/script/test_dal_consumer/main.go diff --git a/node/pkg/dal/collector/collector.go b/node/pkg/dal/collector/collector.go index f7e9d6dd1..390a649a6 100644 --- a/node/pkg/dal/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -168,7 +168,7 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data aggrega Value: strconv.FormatInt(data.GlobalAggregate.Value, 10), AggregateTime: strconv.FormatInt(data.GlobalAggregate.Timestamp.Unix(), 10), Proof: orderedProof, - FeedHash: c.FeedHashes[data.GlobalAggregate.ConfigID], + FeedHash: [32]byte(c.FeedHashes[data.GlobalAggregate.ConfigID]), Decimals: DefaultDecimals, }, nil } diff --git a/node/pkg/dal/common/types.go b/node/pkg/dal/common/types.go index 9bad76d6f..ea965877e 100644 --- a/node/pkg/dal/common/types.go +++ b/node/pkg/dal/common/types.go @@ -1,10 +1,10 @@ package common type OutgoingSubmissionData struct { - Symbol string `json:"symbol"` - Value string `json:"value"` - AggregateTime string `json:"aggregateTime"` - Proof []byte `json:"proof"` - FeedHash []byte `json:"feedHash"` - Decimals string `json:"decimals"` + Symbol string `json:"symbol"` + Value string `json:"value"` + AggregateTime string `json:"aggregateTime"` + Proof []byte `json:"proof"` + FeedHash [32]byte `json:"feedHash"` + Decimals string `json:"decimals"` } diff --git a/node/script/test_dal_consumer/main.go b/node/script/test_dal_consumer/main.go new file mode 100644 index 000000000..60bdd2baa --- /dev/null +++ b/node/script/test_dal_consumer/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "context" + "fmt" + "math/big" + "os" + + "bisonai.com/orakl/node/pkg/chain/helper" + "bisonai.com/orakl/node/pkg/dal/common" + "bisonai.com/orakl/node/pkg/utils/request" + "github.com/rs/zerolog/log" +) + +const ( + SINGLE_PAIR = "ADA-USDT" + SUBMIT_WITH_PROOFS = "submit(bytes32[] calldata _feedHashes, int256[] calldata _answers, uint256[] calldata _timestamps, bytes[] calldata _proofs)" +) + +func main() { + ctx := context.Background() + url := fmt.Sprintf("http://localhost:8090/api/v1/dal/latest-data-feeds/%s", SINGLE_PAIR) + contractAddr := os.Getenv("SUBMISSION_PROXY_CONTRACT") + if contractAddr == "" { + log.Error().Msg("Missing SUBMISSION_PROXY_CONTRACT") + panic("Missing SUBMISSION_PROXY_CONTRACT") + } + + kaiaHelper, err := helper.NewChainHelper(ctx) + if err != nil { + log.Error().Err(err).Msg("NewTxHelper") + panic(err) + } + + result, err := request.Request[common.OutgoingSubmissionData](request.WithEndpoint(url)) + if err != nil { + log.Error().Err(err).Str("Player", "TestConsumer").Msg("failed to get data feed") + panic(err) + } + + var submissionVal big.Int + _, success := submissionVal.SetString(result.Value, 10) + if !success { + log.Error().Str("Player", "TestConsumer").Msg("failed to convert string to big int") + panic("failed to convert string to big int") + } + + var submissionTime big.Int + _, success = submissionTime.SetString(result.AggregateTime, 10) + if !success { + log.Error().Str("Player", "TestConsumer").Msg("failed to convert string to big int") + panic("failed to convert string to big int") + } + + feedHashes := [][32]byte{result.FeedHash} + values := []*big.Int{&submissionVal} + timestamps := []*big.Int{&submissionTime} + proofs := [][]byte{result.Proof} + + rawTx, err := kaiaHelper.MakeDirectTx(ctx, contractAddr, SUBMIT_WITH_PROOFS, feedHashes, values, timestamps, proofs) + if err != nil { + log.Error().Err(err).Msg("MakeDirect") + panic(err) + } + + log.Debug().Any("feedHashes", feedHashes).Msg("feedHashes") + log.Debug().Any("values", values).Msg("values") + log.Debug().Any("timestamps", timestamps).Msg("timestamps") + log.Debug().Any("proofs", proofs).Msg("proofs") + + log.Debug().Any("tx", rawTx).Msg("tx") + + err = kaiaHelper.SubmitRawTx(ctx, rawTx) + if err != nil { + log.Error().Err(err).Msg("SubmitRawTx") + panic(err) + } + +} diff --git a/node/taskfiles/taskfile.local.yml b/node/taskfiles/taskfile.local.yml index 5b9a237aa..e569b3423 100644 --- a/node/taskfiles/taskfile.local.yml +++ b/node/taskfiles/taskfile.local.yml @@ -62,6 +62,10 @@ tasks: dotenv: [".env"] cmds: - go run ./script/test_websocketdexfetcher/main.go + script-test-dal: + dotenv: [".env"] + cmds: + - go run ./script/test_dal_consumer/main.go test-db: dotenv: [".env"] From 98865b57420205bc2894ec5e6e40846c28896c5c Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 14:06:48 +0900 Subject: [PATCH 13/25] feat: update github action --- .github/workflows/node.test.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/node.test.yaml b/.github/workflows/node.test.yaml index d653afb09..59c5c400c 100644 --- a/.github/workflows/node.test.yaml +++ b/.github/workflows/node.test.yaml @@ -6,6 +6,10 @@ on: - "master" paths: - "node/**" + paths-ignore: + - "node/pkg/dal/**" + - "node/pkg/boot/**" + - "node/migrations/boot/**" workflow_dispatch: jobs: From c9cdb7b9efe4054b9b4e25b3c57ceb55bd9d9d90 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 14:07:50 +0900 Subject: [PATCH 14/25] feat: include signer pk --- .github/workflows/dal.test.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/dal.test.yaml b/.github/workflows/dal.test.yaml index a848f2e56..843e629ce 100644 --- a/.github/workflows/dal.test.yaml +++ b/.github/workflows/dal.test.yaml @@ -95,3 +95,4 @@ jobs: REDIS_PORT: "6379" KAIA_WEBSOCKET_URL: "wss://public-en-baobab.klaytn.net/ws" SUBMISSION_PROXY_CONTRACT: "0x35bA1102A4954147272782302856BD8440227B85" + SIGNER_PK: ${{ secrets.TEST_DELEGATOR_REPORTER_PK}} From fd98173b1b266f6a93f79d22328b221c7839bb8c Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jul 2024 15:57:30 +0900 Subject: [PATCH 15/25] fix: rollback github action condition --- .github/workflows/node.test.yaml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/node.test.yaml b/.github/workflows/node.test.yaml index 59c5c400c..d653afb09 100644 --- a/.github/workflows/node.test.yaml +++ b/.github/workflows/node.test.yaml @@ -6,10 +6,6 @@ on: - "master" paths: - "node/**" - paths-ignore: - - "node/pkg/dal/**" - - "node/pkg/boot/**" - - "node/migrations/boot/**" workflow_dispatch: jobs: From e658ddcb38205f5ce432b01b9d6727428b7d1545 Mon Sep 17 00:00:00 2001 From: Nick <148735107+nick-bisonai@users.noreply.github.com> Date: Fri, 5 Jul 2024 21:32:18 +0900 Subject: [PATCH 16/25] (DAL) Module update and add dockerfile (#1722) * fix: module update and add dockerfile * feat: add dal into application options (#1723) --- .github/workflows/deployment.yaml | 1 + dockerfiles/orakl-dal.Dockerfile | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+) create mode 100644 dockerfiles/orakl-dal.Dockerfile diff --git a/.github/workflows/deployment.yaml b/.github/workflows/deployment.yaml index 172e6e373..966b440fd 100644 --- a/.github/workflows/deployment.yaml +++ b/.github/workflows/deployment.yaml @@ -25,6 +25,7 @@ on: - "node" - "boot-api" - "sentinel" + - "dal" image: description: "Image Version" required: true diff --git a/dockerfiles/orakl-dal.Dockerfile b/dockerfiles/orakl-dal.Dockerfile new file mode 100644 index 000000000..c4e52de96 --- /dev/null +++ b/dockerfiles/orakl-dal.Dockerfile @@ -0,0 +1,22 @@ +FROM golang:1.22.3-bullseye as builder + +RUN apt-get update && apt-get install -y curl g++-x86-64-linux-gnu libc6-dev-amd64-cross && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY node node + +WORKDIR /app/node + +RUN CGO_ENABLED=1 CGO_CFLAGS="-O -D__BLST_PORTABLE__" CGO_CFLAGS_ALLOW="-O -D__BLST_PORTABLE__" CC=x86_64-linux-gnu-gcc GOOS=linux GOARCH=amd64 go build -o dalbin -ldflags="-w -s" ./cmd/dal/main.go + +# debian:bullseye-slim +FROM debian@sha256:4b48997afc712259da850373fdbc60315316ee72213a4e77fc5a66032d790b2a + +RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY --from=builder /app/node/dalbin /usr/bin + +CMD ["dalbin"] From b891b0fce2d7502b720d1d8c8f72d5e7f132ffe8 Mon Sep 17 00:00:00 2001 From: Nick <148735107+nick-bisonai@users.noreply.github.com> Date: Sun, 7 Jul 2024 16:04:09 +0900 Subject: [PATCH 17/25] (DAL) Implement basic api key (#1740) * feat: implement basic api key * fix: init header * test: trigger dal test * fix: add log based on coderabbit feedback * test: start subscription before publish from test --- .github/workflows/dal.test.yaml | 1 + node/.env.example | 3 +++ node/pkg/admin/tests/test_helper.go | 11 +++++++-- node/pkg/dal/tests/api_test.go | 15 ++++++++---- node/pkg/dal/utils/utils.go | 18 +++++++++++++++ node/pkg/wss/utils.go | 36 +++++++++++++++++++++-------- 6 files changed, 68 insertions(+), 16 deletions(-) diff --git a/.github/workflows/dal.test.yaml b/.github/workflows/dal.test.yaml index 843e629ce..d83d6b654 100644 --- a/.github/workflows/dal.test.yaml +++ b/.github/workflows/dal.test.yaml @@ -96,3 +96,4 @@ jobs: KAIA_WEBSOCKET_URL: "wss://public-en-baobab.klaytn.net/ws" SUBMISSION_PROXY_CONTRACT: "0x35bA1102A4954147272782302856BD8440227B85" SIGNER_PK: ${{ secrets.TEST_DELEGATOR_REPORTER_PK}} + API_KEY: "MikoTestApiKey" diff --git a/node/.env.example b/node/.env.example index df50b73aa..732edcefb 100644 --- a/node/.env.example +++ b/node/.env.example @@ -63,6 +63,9 @@ DAL_API_PORT= # (required) # SUBMISSION_PROXY_CONTRACT= +# (required) +# API_KEY= + # (required) # DATABASE_URL= # REDIS_HOST= diff --git a/node/pkg/admin/tests/test_helper.go b/node/pkg/admin/tests/test_helper.go index ed443afbf..7933b744d 100644 --- a/node/pkg/admin/tests/test_helper.go +++ b/node/pkg/admin/tests/test_helper.go @@ -6,6 +6,7 @@ import ( "io" "net/http" "net/url" + "os" "testing" "time" @@ -51,12 +52,18 @@ func rawReq(app *fiber.App, method string, endpoint string, requestBody interfac endpoint, body, ) - - req.Header.Set("Content-Type", "application/json") if err != nil { log.Error().Err(err).Msg("failed to create request") return result, err } + + req.Header.Set("Content-Type", "application/json") + + apiKey := os.Getenv("API_KEY") + if apiKey != "" { + req.Header.Set("X-API-Key", apiKey) + } + res, err := app.Test(req, -1) if err != nil { log.Error().Err(err).Msg("failed to call test") diff --git a/node/pkg/dal/tests/api_test.go b/node/pkg/dal/tests/api_test.go index 9b55dbf68..4adc0e9ae 100644 --- a/node/pkg/dal/tests/api_test.go +++ b/node/pkg/dal/tests/api_test.go @@ -3,6 +3,7 @@ package test import ( "context" + "os" "testing" "time" @@ -113,7 +114,6 @@ func TestApiGetLatest(t *testing.T) { if err != nil { t.Fatalf("error converting sample submission data to outgoing data: %v", err) } - assert.Equal(t, *expected, result) } @@ -129,11 +129,17 @@ func TestApiWebsocket(t *testing.T) { } }() + apiKey := os.Getenv("API_KEY") + if apiKey == "" { + t.Fatalf("apiKey required") + } + headers := map[string]string{"X-API-Key": apiKey} + testItems.Controller.Start(ctx) go testItems.App.Listen(":8090") - conn, err := wss.NewWebsocketHelper(ctx, wss.WithEndpoint("ws://localhost:8090/api/v1/dal/ws")) + conn, err := wss.NewWebsocketHelper(ctx, wss.WithEndpoint("ws://localhost:8090/api/v1/dal/ws"), wss.WithRequestHeaders(headers)) if err != nil { t.Fatalf("error creating websocket helper: %v", err) } @@ -167,13 +173,14 @@ func TestApiWebsocket(t *testing.T) { t.Fatalf("error publishing sample submission data: %v", err) } + ch := make(chan any) + go conn.Read(ctx, ch) + expected, err := testItems.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData) if err != nil { t.Fatalf("error converting sample submission data to outgoing data: %v", err) } - ch := make(chan any) - go conn.Read(ctx, ch) sample := <-ch result, err := wsfcommon.MessageToStruct[common.OutgoingSubmissionData](sample.(map[string]any)) diff --git a/node/pkg/dal/utils/utils.go b/node/pkg/dal/utils/utils.go index 48883361f..b1b0e24e3 100644 --- a/node/pkg/dal/utils/utils.go +++ b/node/pkg/dal/utils/utils.go @@ -17,6 +17,23 @@ import ( "github.com/rs/zerolog/log" ) +func APIKeyMiddleware() fiber.Handler { + return func(c *fiber.Ctx) error { + apiKey := c.Get("X-API-Key") + + validAPIKey := os.Getenv("API_KEY") + + if apiKey != validAPIKey { + log.Warn().Msg("Unauthorized access attempt") + return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{ + "error": "Unauthorized", + }) + } + + return c.Next() + } +} + func Setup(ctx context.Context) (*fiber.App, error) { _, err := db.GetPool(ctx) if err != nil { @@ -44,6 +61,7 @@ func Setup(ctx context.Context) (*fiber.App, error) { )) app.Use(cors.New()) + app.Use(APIKeyMiddleware()) return app, nil } diff --git a/node/pkg/wss/utils.go b/node/pkg/wss/utils.go index cb43fc17f..43ef23ad5 100644 --- a/node/pkg/wss/utils.go +++ b/node/pkg/wss/utils.go @@ -22,15 +22,17 @@ type WebsocketHelper struct { Compression bool CustomDialFunc *func(context.Context, string, *websocket.DialOptions) (*websocket.Conn, *http.Response, error) CustomReadFunc *func(context.Context, *websocket.Conn) (map[string]interface{}, error) + RequestHeaders map[string]string } type ConnectionConfig struct { - Endpoint string - Proxy string - Subscriptions []any - Compression bool - DialFunc func(context.Context, string, *websocket.DialOptions) (*websocket.Conn, *http.Response, error) - ReadFunc func(context.Context, *websocket.Conn) (map[string]interface{}, error) + Endpoint string + Proxy string + Subscriptions []any + Compression bool + DialFunc func(context.Context, string, *websocket.DialOptions) (*websocket.Conn, *http.Response, error) + ReadFunc func(context.Context, *websocket.Conn) (map[string]interface{}, error) + RequestHeaders map[string]string } type ConnectionOption func(*ConnectionConfig) @@ -74,6 +76,12 @@ func WithCompressionMode() ConnectionOption { } } +func WithRequestHeaders(headers map[string]string) ConnectionOption { + return func(c *ConnectionConfig) { + c.RequestHeaders = headers + } +} + func NewWebsocketHelper(ctx context.Context, opts ...ConnectionOption) (*WebsocketHelper, error) { config := &ConnectionConfig{} for _, opt := range opts { @@ -90,10 +98,11 @@ func NewWebsocketHelper(ctx context.Context, opts ...ConnectionOption) (*Websock } ws := &WebsocketHelper{ - Endpoint: config.Endpoint, - Subscriptions: config.Subscriptions, - Proxy: config.Proxy, - Compression: config.Compression, + Endpoint: config.Endpoint, + Subscriptions: config.Subscriptions, + Proxy: config.Proxy, + Compression: config.Compression, + RequestHeaders: config.RequestHeaders, } if config.DialFunc != nil { @@ -125,6 +134,13 @@ func (ws *WebsocketHelper) Dial(ctx context.Context) error { } } + if len(ws.RequestHeaders) > 0 { + dialOption.HTTPHeader = http.Header{} + for key, value := range ws.RequestHeaders { + dialOption.HTTPHeader.Add(key, value) + } + } + if ws.Compression { dialOption.CompressionMode = websocket.CompressionContextTakeover } From 593c0908fb5095fbe33e2f2c1dc0ff17d3cb1cf8 Mon Sep 17 00:00:00 2001 From: nick Date: Sun, 7 Jul 2024 16:05:56 +0900 Subject: [PATCH 18/25] fix: update based on feedback --- node/pkg/dal/collector/collector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/pkg/dal/collector/collector.go b/node/pkg/dal/collector/collector.go index 390a649a6..86a88f6da 100644 --- a/node/pkg/dal/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -174,7 +174,7 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data aggrega } func (c *Collector) trackOracleAdded(ctx context.Context) { - var eventTriggered chan any + eventTriggered := make(chan any) err := subscribeAddOracleEvent(ctx, c.chainReader, c.submissionProxyContractAddr, eventTriggered) if err != nil { log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to subscribe to oracle added event") From 25443d3e17081da5002a39c5f5674746decfa31b Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 8 Jul 2024 11:24:26 +0900 Subject: [PATCH 19/25] fix: go mod tidy --- node/go.mod | 13 ++++++++----- node/go.sum | 26 ++++++++++++++++---------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/node/go.mod b/node/go.mod index e6a6f208a..12cb011f8 100644 --- a/node/go.mod +++ b/node/go.mod @@ -6,8 +6,9 @@ require ( github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5 github.com/go-playground/validator v9.31.0+incompatible github.com/go-playground/validator/v10 v10.18.0 - github.com/gofiber/fiber/v2 v2.52.0 - github.com/google/uuid v1.5.0 + github.com/gofiber/contrib/websocket v1.3.2 + github.com/gofiber/fiber/v2 v2.52.5 + github.com/google/uuid v1.6.0 github.com/hashicorp/vault/api v1.12.0 github.com/hashicorp/vault/api/auth/kubernetes v0.6.0 github.com/jackc/pgx/v5 v5.5.3 @@ -32,7 +33,7 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/VictoriaMetrics/fastcache v1.12.2 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect - github.com/andybalholm/brotli v1.0.5 // indirect + github.com/andybalholm/brotli v1.1.0 // indirect github.com/aristanetworks/goarista v0.0.0-20191001182449-186a6201b8ef // indirect github.com/aws/aws-sdk-go v1.34.28 // indirect github.com/benbjohnson/clock v1.3.5 // indirect @@ -60,6 +61,7 @@ require ( github.com/edsrzf/mmap-go v1.0.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect github.com/ethereum/c-kzg-4844 v0.4.0 // indirect + github.com/fasthttp/websocket v1.5.8 // indirect github.com/fatih/color v1.16.0 // indirect github.com/fjl/memsize v0.0.2 // indirect github.com/flynn/noise v1.0.0 // indirect @@ -103,7 +105,7 @@ require ( github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect - github.com/klauspost/compress v1.17.2 // indirect + github.com/klauspost/compress v1.17.7 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/koron/go-ssdp v0.0.4 // indirect github.com/leodido/go-urn v1.4.0 // indirect @@ -163,6 +165,7 @@ require ( github.com/rs/cors v1.7.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect + github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 // indirect github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect @@ -171,7 +174,7 @@ require ( github.com/tinylib/msgp v1.1.8 // indirect github.com/urfave/cli/v2 v2.25.7 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.51.0 // indirect + github.com/valyala/fasthttp v1.52.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect github.com/wealdtech/go-eth2-wallet-encryptor-keystorev4 v1.4.1 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect diff --git a/node/go.sum b/node/go.sum index 88aa7f925..56ba60b81 100644 --- a/node/go.sum +++ b/node/go.sum @@ -36,8 +36,8 @@ github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAu github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= -github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= -github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/aristanetworks/fsnotify v1.4.2/go.mod h1:D/rtu7LpjYM8tRJphJ0hUBYpjai8SfX+aSNsWDTq/Ks= github.com/aristanetworks/glog v0.0.0-20180419172825-c15b03b3054f/go.mod h1:KASm+qXFKs/xjSoWn30NrWBBvdTTQq+UjkhjEJHfSFA= @@ -146,6 +146,8 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ethereum/c-kzg-4844 v0.4.0 h1:3MS1s4JtA868KpJxroZoepdV0ZKBp3u/O5HcZ7R3nlY= github.com/ethereum/c-kzg-4844 v0.4.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= +github.com/fasthttp/websocket v1.5.8 h1:k5DpirKkftIF/w1R8ZzjSgARJrs54Je9YJK37DL/Ah8= +github.com/fasthttp/websocket v1.5.8/go.mod h1:d08g8WaT6nnyvg9uMm8K9zMYyDjfKyj3170AtPRuVU0= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= @@ -198,8 +200,10 @@ github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/gofiber/fiber/v2 v2.52.0 h1:S+qXi7y+/Pgvqq4DrSmREGiFwtB7Bu6+QFLuIHYw/UE= -github.com/gofiber/fiber/v2 v2.52.0/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= +github.com/gofiber/contrib/websocket v1.3.2 h1:AUq5PYeKwK50s0nQrnluuINYeep1c4nRCJ0NWsV3cvg= +github.com/gofiber/contrib/websocket v1.3.2/go.mod h1:07u6QGMsvX+sx7iGNCl5xhzuUVArWwLQ3tBIH24i+S8= +github.com/gofiber/fiber/v2 v2.52.5 h1:tWoP1MJQjGEe4GB5TUGOi7P2E0ZMMRx5ZTG4rT+yGMo= +github.com/gofiber/fiber/v2 v2.52.5/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -253,8 +257,8 @@ github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OI github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b h1:RMpPgZTSApbPf7xaVel+QkoGPRLFLrwFO89uDUHEGf0= github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= -github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= -github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= @@ -341,8 +345,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= +github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= @@ -577,6 +581,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511 h1:KanIMPX0QdEdB4R3CiimCAbxFrhB3j7h0/OvpYGVQa8= +github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511/go.mod h1:sM7Mt7uEoCeFSCBM+qBrqvEo+/9vdmj19wzp3yzUhmg= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY= github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM= @@ -650,8 +656,8 @@ github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA= -github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= +github.com/valyala/fasthttp v1.52.0 h1:wqBQpxH71XW0e2g+Og4dzQM8pk34aFYlA1Ga8db7gU0= +github.com/valyala/fasthttp v1.52.0/go.mod h1:hf5C4QnVMkNXMspnsUlfM3WitlgYflyhHYoKol/szxQ= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= From 965c1ac208a00e0ef903ca7fb1cfe07e880e42cb Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 8 Jul 2024 11:50:04 +0900 Subject: [PATCH 20/25] fix: update based on feedback --- node/cmd/dal/main.go | 17 ++++++++++++++--- node/pkg/dal/tests/main_test.go | 3 ++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/node/cmd/dal/main.go b/node/cmd/dal/main.go index 14449113c..786e3d1ab 100644 --- a/node/cmd/dal/main.go +++ b/node/cmd/dal/main.go @@ -2,16 +2,27 @@ package main import ( "context" + "os" + "os/signal" + "syscall" "bisonai.com/orakl/node/pkg/dal" "github.com/rs/zerolog/log" ) func main() { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + cancel() + }() + err := dal.Run(ctx) if err != nil { - log.Fatal().Err(err).Msg("Failed to start dal") - return + log.Fatal().Err(err).Msg("Failed to start DAL") } } diff --git a/node/pkg/dal/tests/main_test.go b/node/pkg/dal/tests/main_test.go index ee3664985..9d28f0f37 100644 --- a/node/pkg/dal/tests/main_test.go +++ b/node/pkg/dal/tests/main_test.go @@ -33,6 +33,7 @@ func testPublishData(ctx context.Context, submissionData aggregator.SubmissionDa } func generateSampleSubmissionData(configId int32, value int64, timestamp time.Time, round int32, symbol string) (*aggregator.SubmissionData, error) { + ctx := context.Background() sampleGlobalAggregate := aggregator.GlobalAggregate{ ConfigID: configId, Value: value, @@ -40,7 +41,7 @@ func generateSampleSubmissionData(configId int32, value int64, timestamp time.Ti Round: round, } - signHelper, err := helper.NewSignHelper("") + signHelper, err := helper.NewSigner(ctx) if err != nil { return nil, err } From af335917335779014e0aa0d6558dd35f473ec936 Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 8 Jul 2024 12:47:09 +0900 Subject: [PATCH 21/25] fix: update based on feedback --- node/pkg/dal/collector/collector.go | 9 +++++---- node/pkg/dal/tests/api_test.go | 4 ++-- node/pkg/dal/tests/collector_test.go | 5 +++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/node/pkg/dal/collector/collector.go b/node/pkg/dal/collector/collector.go index 86a88f6da..9f2680f33 100644 --- a/node/pkg/dal/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -32,7 +32,7 @@ type Collector struct { FeedHashes map[int32][]byte CachedWhitelist []klaytncommon.Address - Ctx context.Context + IsRunning bool CancelFunc context.CancelFunc chainReader *websocketchainreader.ChainReader @@ -83,14 +83,15 @@ func NewCollector(ctx context.Context, configs []types.Config) (*Collector, erro } func (c *Collector) Start(ctx context.Context) { - if c.Ctx != nil { + if c.IsRunning { log.Warn().Str("Player", "DalCollector").Msg("Collector already running, skipping start") return } + c.IsRunning = true ctxWithCancel, cancel := context.WithCancel(ctx) c.CancelFunc = cancel - c.Ctx = ctxWithCancel + c.IsRunning = true c.receive(ctxWithCancel) c.trackOracleAdded(ctxWithCancel) @@ -99,7 +100,7 @@ func (c *Collector) Start(ctx context.Context) { func (c *Collector) Stop() { if c.CancelFunc != nil { c.CancelFunc() - c.Ctx = nil + c.IsRunning = false } } diff --git a/node/pkg/dal/tests/api_test.go b/node/pkg/dal/tests/api_test.go index 4adc0e9ae..9b596d3a4 100644 --- a/node/pkg/dal/tests/api_test.go +++ b/node/pkg/dal/tests/api_test.go @@ -28,11 +28,11 @@ func TestApiControllerRun(t *testing.T) { } }() - assert.Equal(t, nil, testItems.Controller.Collector.Ctx) + assert.False(t, testItems.Controller.Collector.IsRunning) testItems.Controller.Start(ctx) time.Sleep(10 * time.Millisecond) - assert.NotEqual(t, nil, testItems.Controller.Collector.Ctx) + assert.True(t, testItems.Controller.Collector.IsRunning) } func TestApiGetLatestAll(t *testing.T) { diff --git a/node/pkg/dal/tests/collector_test.go b/node/pkg/dal/tests/collector_test.go index 83ee71e8c..01060a7a2 100644 --- a/node/pkg/dal/tests/collector_test.go +++ b/node/pkg/dal/tests/collector_test.go @@ -24,12 +24,13 @@ func TestCollectorStartAndStop(t *testing.T) { collector := testItems.Collector collector.Start(ctx) - assert.NotEqual(t, nil, collector.Ctx) + assert.True(t, collector.IsRunning) + assert.Greater(t, len(collector.Symbols), 0) assert.Greater(t, len(collector.Symbols), 0) collector.Stop() - assert.Equal(t, nil, collector.Ctx) + assert.False(t, collector.IsRunning) } func TestCollectorStream(t *testing.T) { From 66186cbf95fb3eb741ebb171e87919ff09077276 Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 8 Jul 2024 12:54:33 +0900 Subject: [PATCH 22/25] feat: add missing env into github action test --- .github/workflows/dal.test.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/dal.test.yaml b/.github/workflows/dal.test.yaml index d83d6b654..2416de9de 100644 --- a/.github/workflows/dal.test.yaml +++ b/.github/workflows/dal.test.yaml @@ -93,6 +93,7 @@ jobs: DATABASE_URL: "postgresql://postgres:postgres@localhost:5432/orakl-test?search_path=public" REDIS_HOST: "localhost" REDIS_PORT: "6379" + KAIA_PROVIDER_URL: "https://public-en.kairos.node.kaia.io" KAIA_WEBSOCKET_URL: "wss://public-en-baobab.klaytn.net/ws" SUBMISSION_PROXY_CONTRACT: "0x35bA1102A4954147272782302856BD8440227B85" SIGNER_PK: ${{ secrets.TEST_DELEGATOR_REPORTER_PK}} From e3600eaf08347e4d55c1dc6763ffe233b633f7b4 Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 8 Jul 2024 12:59:58 +0900 Subject: [PATCH 23/25] fix: uncomment .env.example `API_KEY` --- node/.env.example | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/.env.example b/node/.env.example index 732edcefb..32c2813b3 100644 --- a/node/.env.example +++ b/node/.env.example @@ -64,7 +64,7 @@ DAL_API_PORT= # SUBMISSION_PROXY_CONTRACT= # (required) -# API_KEY= +API_KEY= # (required) # DATABASE_URL= From 32f1ada7663428da10b83ebce6b1cd0fb35d332b Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 8 Jul 2024 13:03:23 +0900 Subject: [PATCH 24/25] test: tmp file to trigger test --- node/pkg/dal/tmp.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 node/pkg/dal/tmp.txt diff --git a/node/pkg/dal/tmp.txt b/node/pkg/dal/tmp.txt new file mode 100644 index 000000000..e69de29bb From 90bf7e888c0bc4132e1045733550722fd6345045 Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 8 Jul 2024 13:13:44 +0900 Subject: [PATCH 25/25] fix: remove tmp file --- node/pkg/dal/tmp.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 node/pkg/dal/tmp.txt diff --git a/node/pkg/dal/tmp.txt b/node/pkg/dal/tmp.txt deleted file mode 100644 index e69de29bb..000000000