diff --git a/node/cmd/node/main.go b/node/cmd/node/main.go
index fe59331f9..6608a00ed 100644
--- a/node/cmd/node/main.go
+++ b/node/cmd/node/main.go
@@ -68,7 +68,7 @@ func main() {
 		return
 	}
 
-	syncUrl := "http://localhost:" + port + "/api/v1/sync"
+	syncUrl := "http://localhost:" + port + "/api/v1/config/sync"
 	_, err = http.Post(syncUrl, "application/json", nil)
 	if err != nil {
 		log.Error().Err(err).Msg("Failed to sync from orakl config")
diff --git a/node/migrations/000019_feeds_update.down.sql b/node/migrations/000019_feeds_update.down.sql
new file mode 100644
index 000000000..28ae8c656
--- /dev/null
+++ b/node/migrations/000019_feeds_update.down.sql
@@ -0,0 +1,15 @@
+DO $$
+BEGIN
+    IF EXISTS(SELECT 1 FROM feeds) THEN
+        DELETE FROM feeds;
+    END IF;
+
+    IF EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name = 'feeds' AND column_name = 'config_id') THEN
+        ALTER TABLE feeds DROP COLUMN config_id;
+    END IF;
+
+    IF NOT EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name = 'feeds' AND column_name = 'adapter_id') THEN
+        ALTER TABLE feeds ADD COLUMN adapter_id INT8 NOT NULL;
+        ALTER TABLE feeds ADD CONSTRAINT feeds_adapter_id_fkey FOREIGN KEY(adapter_id) REFERENCES adapters(id) ON DELETE CASCADE;
+    END IF;
+END $$;
\ No newline at end of file
diff --git a/node/migrations/000019_feeds_update.up.sql b/node/migrations/000019_feeds_update.up.sql
new file mode 100644
index 000000000..ef262d198
--- /dev/null
+++ b/node/migrations/000019_feeds_update.up.sql
@@ -0,0 +1,15 @@
+DO $$
+BEGIN
+    IF EXISTS(SELECT 1 FROM feeds) THEN
+        DELETE FROM feeds;
+    END IF;
+
+    IF EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name = 'feeds' AND column_name = 'adapter_id') THEN
+        ALTER TABLE feeds DROP COLUMN adapter_id;
+    END IF;
+
+    IF NOT EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name = 'feeds' AND column_name = 'config_id') THEN
+        ALTER TABLE feeds ADD COLUMN config_id INT8 NOT NULL;
+        ALTER TABLE feeds ADD CONSTRAINT feeds_config_id_fkey FOREIGN KEY(config_id) REFERENCES configs(id) ON DELETE CASCADE;
+    END IF;
+END $$;
\ No newline at end of file
diff --git a/node/migrations/000020_remove_tables.down.sql b/node/migrations/000020_remove_tables.down.sql
new file mode 100644
index 000000000..5b6d958fd
--- /dev/null
+++ b/node/migrations/000020_remove_tables.down.sql
@@ -0,0 +1,20 @@
+CREATE TABLE IF NOT EXISTS adapters (
+    id SERIAL PRIMARY KEY,
+    name text NOT NULL,
+    active bool NOT NULL DEFAULT true,
+    interval int4 NOT NULL DEFAULT 2000
+);
+
+CREATE TABLE IF NOT EXISTS aggregators (
+    id SERIAL PRIMARY KEY,
+    name text NOT NULL,
+    active bool NOT NULL DEFAULT true,
+    interval int4 NOT NULL DEFAULT 5000
+);
+
+CREATE TABLE IF NOT EXISTS submission_addresses (
+    id SERIAL PRIMARY KEY,
+    name text NOT NULL,
+    address text NOT NULL,
+    interval int4
+);
\ No newline at end of file
diff --git a/node/migrations/000020_remove_tables.up.sql b/node/migrations/000020_remove_tables.up.sql
new file mode 100644
index 000000000..833318fa9
--- /dev/null
+++ b/node/migrations/000020_remove_tables.up.sql
@@ -0,0 +1,3 @@
+DROP TABLE IF EXISTS adapters;
+DROP TABLE IF EXISTS aggregators;
+DROP TABLE IF EXISTS submission_addresses;
\ No newline at end of file
diff --git a/node/pkg/admin/adapter/controller.go b/node/pkg/admin/adapter/controller.go
deleted file mode 100644
index d28d0f067..000000000
--- a/node/pkg/admin/adapter/controller.go
+++ /dev/null
@@ -1,301 +0,0 @@
-package adapter
-
-import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"os"
-	"sync"
-
-	
"bisonai.com/orakl/node/pkg/admin/utils" - "bisonai.com/orakl/node/pkg/bus" - "bisonai.com/orakl/node/pkg/db" - "bisonai.com/orakl/node/pkg/utils/request" - "github.com/go-playground/validator" - "github.com/gofiber/fiber/v2" - "github.com/rs/zerolog/log" -) - -type BulkAdapters struct { - Adapters []AdapterInsertModel `json:"result"` -} - -type AdapterModel struct { - Id *int64 `db:"id" json:"id"` - Name string `db:"name" json:"name"` - Active bool `db:"active" json:"active"` - Interval int `db:"interval" json:"interval"` -} - -type FeedModel struct { - Id *int64 `db:"id" json:"id"` - Name string `db:"name" json:"name"` - Definition json.RawMessage `db:"definition" json:"definition"` - AdapterId *int64 `db:"adapter_id" json:"adapterId"` -} - -type FeedInsertModel struct { - Name string `db:"name" json:"name" validate:"required"` - Definition json.RawMessage `db:"definition" json:"definition" validate:"required"` - AdapterId *int64 `db:"adapter_id" json:"adapterId"` -} - -type AdapterInsertModel struct { - Name string `db:"name" json:"name" validate:"required"` - Feeds []FeedInsertModel `json:"feeds"` - Interval *int `db:"interval" json:"interval"` -} - -type AdapterDetailModel struct { - AdapterModel - Feeds []FeedModel `json:"feeds"` -} - -func SyncFromOraklConfig(c *fiber.Ctx) error { - configUrl := getConfigUrl() - - var adapters BulkAdapters - adapters, err := request.GetRequest[BulkAdapters](configUrl, nil, nil) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to get orakl config: " + err.Error()) - } - - errs := make(chan error, len(adapters.Adapters)) - var wg sync.WaitGroup - - validate := validator.New() - maxConcurrency := 20 - sem := make(chan struct{}, maxConcurrency) - - for _, adapter := range adapters.Adapters { - wg.Add(1) - sem <- struct{}{} - go func(adapter AdapterInsertModel) { - defer wg.Done() - defer func() { <-sem }() - - if err = validate.Struct(adapter); err != nil { - log.Error().Err(err).Msg("failed to validate orakl config adapter") - errs <- err - return - } - - row, err := insertAdapter(c.Context(), adapter) - if err != nil { - log.Error().Err(err).Msg("failed to execute adapter insert query") - errs <- err - return - } - - for _, feed := range adapter.Feeds { - feed.AdapterId = row.Id - _, err := db.QueryRow[FeedModel](c.Context(), UpsertFeed, map[string]any{ - "name": feed.Name, - "definition": feed.Definition, - "adapter_id": feed.AdapterId, - }) - if err != nil { - log.Error().Err(err).Msg("failed to execute feed insert query") - errs <- err - continue - } - } - }(adapter) - } - - wg.Wait() - close(errs) - - var errorMessages []string - for err := range errs { - errorMessages = append(errorMessages, err.Error()) - } - - if len(errorMessages) > 0 { - return c.Status(fiber.StatusInternalServerError).JSON(errorMessages) - } - - return c.Status(fiber.StatusOK).SendString("sync successful") -} - -func addFromOraklConfig(c *fiber.Ctx) error { - configUrl := getConfigUrl() - name := c.Params("name") - - if name == "" { - return c.Status(fiber.StatusBadRequest).SendString("name is required") - } - - var adapters BulkAdapters - adapters, err := request.GetRequest[BulkAdapters](configUrl, nil, map[string]string{"Content-Type": "application/json"}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to get orakl config: " + err.Error()) - } - - for _, adapter := range adapters.Adapters { - if adapter.Name == name { - validate := validator.New() - if err = validate.Struct(adapter); err != nil 
{ - log.Error().Err(err).Msg("failed to validate orakl config adapter") - return c.Status(fiber.StatusInternalServerError).SendString("failed to validate orakl config adapter: " + err.Error()) - } - - row, err := insertAdapter(c.Context(), adapter) - if err != nil { - log.Error().Err(err).Msg("failed to execute adapter insert query") - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute adapter insert query: " + err.Error()) - } - - for _, feed := range adapter.Feeds { - feed.AdapterId = row.Id - _, err := db.QueryRow[FeedModel](c.Context(), UpsertFeed, map[string]any{ - "name": feed.Name, - "definition": feed.Definition, - "adapter_id": feed.AdapterId, - }) - if err != nil { - log.Error().Err(err).Msg("failed to execute feed insert query") - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute feed insert query: " + err.Error()) - } - } - - result := AdapterModel{Id: row.Id, Name: row.Name, Active: row.Active} - return c.JSON(result) - } - } - return c.Status(fiber.StatusInternalServerError).SendString("adapter not found in orakl config") -} - -func insert(c *fiber.Ctx) error { - payload := new(AdapterInsertModel) - if err := c.BodyParser(payload); err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to parse body for adapter insert payload: " + err.Error()) - } - - validate := validator.New() - if err := validate.Struct(payload); err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to validate adapter insert payload: " + err.Error()) - } - row, err := insertAdapter(c.Context(), *payload) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute adapter insert query: " + err.Error()) - } - - for _, feed := range payload.Feeds { - feed.AdapterId = row.Id - _, err := db.QueryRow[FeedModel](c.Context(), InsertFeed, map[string]any{ - "name": feed.Name, - "definition": feed.Definition, - "adapter_id": feed.AdapterId, - }) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute feed insert query: " + err.Error()) - } - } - - result := AdapterModel{Id: row.Id, Name: row.Name, Active: row.Active} - - return c.JSON(result) -} - -func get(c *fiber.Ctx) error { - results, err := db.QueryRows[AdapterModel](c.Context(), GetAdapter, nil) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute adapter get query: " + err.Error()) - } - - return c.JSON(results) -} - -func getById(c *fiber.Ctx) error { - id := c.Params("id") - result, err := db.QueryRow[AdapterModel](c.Context(), GetAdapterById, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute adapter get by id query: " + err.Error()) - } - return c.JSON(result) -} - -func getDetailById(c *fiber.Ctx) error { - id := c.Params("id") - adapter, err := db.QueryRow[AdapterModel](c.Context(), GetAdapterById, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute adapter get by id query: " + err.Error()) - } - feeds, err := db.QueryRows[FeedModel](c.Context(), GetFeedsByAdapterId, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute feed get by adapter id query: " + err.Error()) - } - result := AdapterDetailModel{AdapterModel: adapter, Feeds: feeds} - return c.JSON(result) -} - -func deleteById(c *fiber.Ctx) error 
{ - id := c.Params("id") - result, err := db.QueryRow[AdapterModel](c.Context(), DeleteAdapterById, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute adapter delete by id query: " + err.Error()) - } - return c.JSON(result) -} - -func activate(c *fiber.Ctx) error { - id := c.Params("id") - result, err := db.QueryRow[AdapterModel](c.Context(), ActivateAdapter, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute adapter activate query: " + err.Error()) - } - - msg, err := utils.SendMessage(c, bus.FETCHER, bus.ACTIVATE_FETCHER, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to send activate message to fetcher: " + err.Error()) - } - - resp := <-msg.Response - if !resp.Success { - return c.Status(fiber.StatusInternalServerError).SendString("failed to activate adapter: " + resp.Args["error"].(string)) - } - - return c.JSON(result) -} - -func deactivate(c *fiber.Ctx) error { - id := c.Params("id") - result, err := db.QueryRow[AdapterModel](c.Context(), DeactivateAdapter, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute adapter deactivate query: " + err.Error()) - } - - msg, err := utils.SendMessage(c, bus.FETCHER, bus.DEACTIVATE_FETCHER, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to send deactivate message to fetcher: " + err.Error()) - } - - resp := <-msg.Response - if !resp.Success { - return c.Status(fiber.StatusInternalServerError).SendString("failed to deactivate adapter: " + resp.Args["error"].(string)) - } - - return c.JSON(result) -} - -func getConfigUrl() string { - // TODO: add chain validation (currently only supporting baobab and cypress) - chain := os.Getenv("CHAIN") - if chain == "" { - chain = "baobab" - } - return fmt.Sprintf("https://config.orakl.network/%s_adapters.json", chain) -} - -func insertAdapter(ctx context.Context, adapter AdapterInsertModel) (AdapterModel, error) { - if adapter.Interval == nil { - newInterval := 2000 - adapter.Interval = &newInterval - } - - return db.QueryRow[AdapterModel](ctx, UpsertAdapterWithInterval, map[string]any{ - "name": adapter.Name, - "interval": adapter.Interval, - }) -} diff --git a/node/pkg/admin/adapter/queries.go b/node/pkg/admin/adapter/queries.go deleted file mode 100644 index 6026d90ad..000000000 --- a/node/pkg/admin/adapter/queries.go +++ /dev/null @@ -1,40 +0,0 @@ -package adapter - -const ( - InsertAdapter = `INSERT INTO adapters (name) VALUES (@name) RETURNING *;` - - UpsertAdapter = ` - INSERT INTO adapters (name) VALUES (@name) - ON CONFLICT (name) DO UPDATE SET active = true - RETURNING *;` - - UpsertAdapterWithInterval = ` - INSERT INTO adapters (name, interval) VALUES (@name, @interval) - ON CONFLICT (name) DO UPDATE SET active = true, interval = @interval - RETURNING *; - ` - - InsertFeed = `INSERT INTO feeds (name, definition, adapter_id) VALUES (@name, @definition, @adapter_id) RETURNING *;` - - UpsertFeed = ` - INSERT INTO feeds (name, definition, adapter_id) - VALUES (@name, @definition, @adapter_id) - ON CONFLICT (name) DO UPDATE SET - definition = EXCLUDED.definition, - adapter_id = EXCLUDED.adapter_id - RETURNING *;` - - GetAdapter = `SELECT * FROM adapters;` - - GetAdapterByName = `SELECT * FROM adapters WHERE name = @name LIMIT 1;` - - GetAdapterById = 
`SELECT * FROM adapters WHERE id = @id;` - - GetFeedsByAdapterId = `SELECT * FROM feeds WHERE adapter_id = @id;` - - DeleteAdapterById = `DELETE FROM adapters WHERE id = @id RETURNING *;` - - ActivateAdapter = `UPDATE adapters SET active = true WHERE id = @id RETURNING *;` - - DeactivateAdapter = `UPDATE adapters SET active = false WHERE id = @id RETURNING *;` -) diff --git a/node/pkg/admin/adapter/route.go b/node/pkg/admin/adapter/route.go deleted file mode 100644 index 32446faa7..000000000 --- a/node/pkg/admin/adapter/route.go +++ /dev/null @@ -1,19 +0,0 @@ -package adapter - -import ( - "github.com/gofiber/fiber/v2" -) - -func Routes(router fiber.Router) { - adapter := router.Group("/adapter") - - adapter.Post("", insert) - adapter.Get("", get) - adapter.Post("/sync", SyncFromOraklConfig) - adapter.Post("/sync/:name", addFromOraklConfig) - adapter.Get("/detail/:id", getDetailById) - adapter.Get("/:id", getById) - adapter.Delete("/:id", deleteById) - adapter.Post("/activate/:id", activate) - adapter.Post("/deactivate/:id", deactivate) -} diff --git a/node/pkg/admin/admin.go b/node/pkg/admin/admin.go index 27878b066..e5cfd0ea8 100644 --- a/node/pkg/admin/admin.go +++ b/node/pkg/admin/admin.go @@ -4,14 +4,14 @@ import ( "fmt" "os" - "bisonai.com/orakl/node/pkg/admin/adapter" "bisonai.com/orakl/node/pkg/admin/aggregator" + "bisonai.com/orakl/node/pkg/admin/config" "bisonai.com/orakl/node/pkg/admin/feed" "bisonai.com/orakl/node/pkg/admin/fetcher" "bisonai.com/orakl/node/pkg/admin/providerUrl" "bisonai.com/orakl/node/pkg/admin/proxy" "bisonai.com/orakl/node/pkg/admin/reporter" - "bisonai.com/orakl/node/pkg/admin/submissionAddress" + "bisonai.com/orakl/node/pkg/admin/utils" "bisonai.com/orakl/node/pkg/admin/wallet" "bisonai.com/orakl/node/pkg/bus" @@ -35,17 +35,14 @@ func Run(bus *bus.MessageBus) error { return c.SendString("Orakl Node Admin API") }) - v1.Post("/sync", syncAll) - - adapter.Routes(v1) feed.Routes(v1) proxy.Routes(v1) fetcher.Routes(v1) aggregator.Routes(v1) reporter.Routes(v1) wallet.Routes(v1) - submissionAddress.Routes(v1) providerUrl.Routes(v1) + config.Routes(v1) port := os.Getenv("APP_PORT") if port == "" { @@ -59,22 +56,3 @@ func Run(bus *bus.MessageBus) error { } return nil } - -func syncAll(c *fiber.Ctx) error { - err := adapter.SyncFromOraklConfig(c) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString(err.Error()) - } - - err = aggregator.SyncFromOraklConfig(c) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString(err.Error()) - } - - err = submissionAddress.SyncFromOraklConfig(c) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString(err.Error()) - } - - return c.SendString("synced") -} diff --git a/node/pkg/admin/aggregator/controller.go b/node/pkg/admin/aggregator/controller.go index 65657ae5c..a96114d2b 100644 --- a/node/pkg/admin/aggregator/controller.go +++ b/node/pkg/admin/aggregator/controller.go @@ -1,36 +1,11 @@ package aggregator import ( - "context" - "fmt" - "os" - "sync" - "bisonai.com/orakl/node/pkg/admin/utils" "bisonai.com/orakl/node/pkg/bus" - "bisonai.com/orakl/node/pkg/db" - "bisonai.com/orakl/node/pkg/utils/request" - "github.com/go-playground/validator" "github.com/gofiber/fiber/v2" - "github.com/rs/zerolog/log" ) -type AggregatorModel struct { - Id *int64 `db:"id" json:"id"` - Name string `db:"name" json:"name"` - Active bool `db:"active" json:"active"` - Interval int `db:"interval" json:"interval"` -} - -type AggregatorInsertModel struct { - Name string `db:"name" 
json:"name" validate:"required"` - Interval *int `db:"interval" json:"aggregateHeartbeat"` -} - -type BulkAggregators struct { - Aggregators []AggregatorInsertModel `json:"result"` -} - func start(c *fiber.Ctx) error { msg, err := utils.SendMessage(c, bus.AGGREGATOR, bus.START_AGGREGATOR_APP, nil) if err != nil { @@ -66,190 +41,3 @@ func refresh(c *fiber.Ctx) error { } return c.SendString("aggregator refreshed") } - -func insert(c *fiber.Ctx) error { - payload := new(AggregatorInsertModel) - if err := c.BodyParser(payload); err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to parse body for aggregator insert payload: " + err.Error()) - } - - validate := validator.New() - if err := validate.Struct(payload); err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to validate aggregator insert payload: " + err.Error()) - } - result, err := insertAggregator(c.Context(), *payload) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute aggregator insert query: " + err.Error()) - } - - return c.JSON(result) -} - -func get(c *fiber.Ctx) error { - result, err := db.QueryRows[AggregatorModel](c.Context(), GetAggregator, nil) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute aggregator get query: " + err.Error()) - } - - return c.JSON(result) -} - -func getById(c *fiber.Ctx) error { - id := c.Params("id") - result, err := db.QueryRow[AggregatorModel](c.Context(), GetAggregatorById, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute aggregator get by id query: " + err.Error()) - } - return c.JSON(result) -} - -func deleteById(c *fiber.Ctx) error { - id := c.Params("id") - result, err := db.QueryRow[AggregatorModel](c.Context(), DeleteAggregatorById, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute aggregator delete by id query: " + err.Error()) - } - return c.JSON(result) -} - -func SyncFromOraklConfig(c *fiber.Ctx) error { - configUrl := getConfigUrl() - - var aggregators BulkAggregators - aggregators, err := request.GetRequest[BulkAggregators](configUrl, nil, nil) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to get aggregators from config: " + err.Error()) - } - - errs := make(chan error, len(aggregators.Aggregators)) - var wg sync.WaitGroup - - validate := validator.New() - maxConcurrency := 20 - sem := make(chan struct{}, maxConcurrency) - - for _, aggregator := range aggregators.Aggregators { - wg.Add(1) - sem <- struct{}{} - go func(aggregator AggregatorInsertModel) { - defer wg.Done() - defer func() { <-sem }() - - if err = validate.Struct(aggregator); err != nil { - log.Error().Err(err).Msg("failed to validate orakl config aggregator") - errs <- err - return - } - _, err := insertAggregator(c.Context(), aggregator) - if err != nil { - log.Error().Err(err).Msg("failed to execute aggregator insert query") - errs <- err - return - } - }(aggregator) - } - wg.Wait() - close(errs) - - var errorMessages []string - for err := range errs { - errorMessages = append(errorMessages, err.Error()) - } - - if len(errorMessages) > 0 { - return c.Status(fiber.StatusInternalServerError).JSON(errorMessages) - } - - return c.Status(fiber.StatusOK).SendString("sync successful") -} - -func addFromOraklConfig(c *fiber.Ctx) error { - configUrl := getConfigUrl() - name := 
c.Params("name") - - if name == "" { - return c.Status(fiber.StatusBadRequest).SendString("name is required") - } - - var aggregators BulkAggregators - aggregators, err := request.GetRequest[BulkAggregators](configUrl, nil, nil) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to get orakl config: " + err.Error()) - } - - validate := validator.New() - for _, aggregator := range aggregators.Aggregators { - if aggregator.Name == name { - if err := validate.Struct(aggregator); err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to validate orakl config aggregator: " + err.Error()) - } - result, err := insertAggregator(c.Context(), aggregator) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute aggregator insert query: " + err.Error()) - } - return c.JSON(result) - } - } - return c.Status(fiber.StatusNotFound).SendString("aggregator not found in orakl config") -} - -func activate(c *fiber.Ctx) error { - id := c.Params("id") - result, err := db.QueryRow[AggregatorModel](c.Context(), ActivateAggregator, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute aggregator activate query: " + err.Error()) - } - - msg, err := utils.SendMessage(c, bus.AGGREGATOR, bus.ACTIVATE_AGGREGATOR, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to send message to aggregator: " + err.Error()) - } - - resp := <-msg.Response - if !resp.Success { - return c.Status(fiber.StatusInternalServerError).SendString("failed to activate aggregator: " + resp.Args["error"].(string)) - } - - return c.JSON(result) -} - -func deactivate(c *fiber.Ctx) error { - id := c.Params("id") - result, err := db.QueryRow[AggregatorModel](c.Context(), DeactivateAggregator, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute aggregator deactivate query: " + err.Error()) - } - - msg, err := utils.SendMessage(c, bus.AGGREGATOR, bus.DEACTIVATE_AGGREGATOR, map[string]any{"id": id}) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to send message to aggregator: " + err.Error()) - } - - resp := <-msg.Response - if !resp.Success { - return c.Status(fiber.StatusInternalServerError).SendString("failed to deactivate aggregator: " + resp.Args["error"].(string)) - } - - return c.JSON(result) -} - -func getConfigUrl() string { - // TODO: add chain validation (currently only supporting baobab and cypress) - chain := os.Getenv("CHAIN") - if chain == "" { - chain = "baobab" - } - return fmt.Sprintf("https://config.orakl.network/%s_aggregators.json", chain) -} - -func insertAggregator(ctx context.Context, aggregator AggregatorInsertModel) (AggregatorModel, error) { - if aggregator.Interval == nil { - newInterval := 5000 - aggregator.Interval = &newInterval - } - - return db.QueryRow[AggregatorModel](ctx, UpsertAggregatorWithInterval, map[string]any{ - "name": aggregator.Name, - "interval": aggregator.Interval, - }) -} diff --git a/node/pkg/admin/aggregator/queries.go b/node/pkg/admin/aggregator/queries.go deleted file mode 100644 index 276dd8520..000000000 --- a/node/pkg/admin/aggregator/queries.go +++ /dev/null @@ -1,30 +0,0 @@ -package aggregator - -const ( - InsertAggregator = `INSERT INTO aggregators (name) VALUES (@name) RETURNING *;` - - UpsertAggregator = `INSERT INTO aggregators (name) VALUES 
(@name) - ON CONFLICT (name) DO UPDATE SET active = true - RETURNING *;` - - UpsertAggregatorWithInterval = `INSERT INTO aggregators (name, interval) VALUES (@name, @interval) - ON CONFLICT (name) DO UPDATE SET active = true, interval = @interval - RETURNING *; - ` - - GetAggregator = `SELECT * FROM aggregators;` - - GetAggregatorById = `SELECT * FROM aggregators WHERE id = @id;` - - DeleteAggregatorById = `DELETE FROM aggregators WHERE id = @id RETURNING *;` - - ActivateAggregator = `UPDATE aggregators SET active = true WHERE id = @id RETURNING *;` - - DeactivateAggregator = `UPDATE aggregators SET active = false WHERE id = @id RETURNING *;` - - SyncAggregator = `INSERT INTO aggregators (name) - SELECT name FROM adapters - WHERE NOT EXISTS ( - SELECT 1 FROM aggregators WHERE aggregators.name = adapters.name - ) RETURNING *;` -) diff --git a/node/pkg/admin/aggregator/route.go b/node/pkg/admin/aggregator/route.go index f5320a61f..fdf1c47ce 100644 --- a/node/pkg/admin/aggregator/route.go +++ b/node/pkg/admin/aggregator/route.go @@ -7,18 +7,8 @@ import ( func Routes(router fiber.Router) { aggregator := router.Group("/aggregator") - aggregator.Post("", insert) - aggregator.Get("", get) - aggregator.Post("/start", start) aggregator.Post("/stop", stop) aggregator.Post("/refresh", refresh) - aggregator.Post("/sync/config", SyncFromOraklConfig) - aggregator.Post("/sync/config/:name", addFromOraklConfig) - aggregator.Get("/:id", getById) - aggregator.Delete("/:id", deleteById) - aggregator.Post("/activate/:id", activate) - aggregator.Post("/deactivate/:id", deactivate) - } diff --git a/node/pkg/admin/config/controller.go b/node/pkg/admin/config/controller.go new file mode 100644 index 000000000..430abc60f --- /dev/null +++ b/node/pkg/admin/config/controller.go @@ -0,0 +1,114 @@ +package config + +import ( + "context" + "encoding/json" + "fmt" + "os" + + "bisonai.com/orakl/node/pkg/db" + "bisonai.com/orakl/node/pkg/utils/request" + "github.com/gofiber/fiber/v2" +) + +type BulkConfigs struct { + Configs []ConfigInsertModel `json:"result"` +} + +type FeedInsertModel struct { + Name string `db:"name" json:"name" validate:"required"` + Definition json.RawMessage `db:"definition" json:"definition" validate:"required"` + ConfigId *int64 `db:"config_id" json:"configId"` +} + +type ConfigInsertModel struct { + Name string `db:"name" json:"name"` + Address string `db:"address" json:"address"` + FetchInterval *int `db:"fetch_interval" json:"fetchInterval"` + AggregateInterval *int `db:"aggregate_interval" json:"aggregateInterval"` + SubmitInterval *int `db:"submit_interval" json:"submitInterval"` + Feeds []FeedInsertModel `json:"feeds"` +} + +type ConfigModel struct { + Id int64 `db:"id" json:"id"` + Name string `db:"name" json:"name"` + Address string `db:"address" json:"address"` + FetchInterval *int `db:"fetch_interval" json:"fetchInterval"` + AggregateInterval *int `db:"aggregate_interval" json:"aggregateInterval"` + SubmitInterval *int `db:"submit_interval" json:"submitInterval"` +} + +type ConfigNameIdModel struct { + Name string `db:"name" json:"name"` + Id int64 `db:"id" json:"id"` +} + +func Sync(c *fiber.Ctx) error { + configUrl := getConfigUrl() + bulkConfigs, err := request.GetRequest[BulkConfigs](configUrl, nil, nil) + if err != nil { + return err + } + + err = bulkUpsertConfigs(c.Context(), bulkConfigs.Configs) + if err != nil { + return err + } + + whereValues := make([]interface{}, 0, len(bulkConfigs.Configs)) + for _, config := range bulkConfigs.Configs { + whereValues = append(whereValues, 
config.Name) + } + + configIds, err := db.BulkSelect[ConfigNameIdModel](c.Context(), "configs", []string{"name", "id"}, []string{"name"}, whereValues) + if err != nil { + return err + } + + configNameIdMap := map[string]int64{} + for _, configId := range configIds { + configNameIdMap[configId.Name] = configId.Id + } + + upsertRows := make([][]any, 0) + for _, config := range bulkConfigs.Configs { + for _, feed := range config.Feeds { + configId, ok := configNameIdMap[config.Name] + if !ok { + continue + } + upsertRows = append(upsertRows, []any{feed.Name, feed.Definition, configId}) + } + } + + return db.BulkUpsert(c.Context(), "feeds", []string{"name", "definition", "config_id"}, upsertRows, []string{"name"}, []string{"definition", "config_id"}) +} + +func Get(c *fiber.Ctx) error { + configs, err := db.QueryRows[ConfigModel](c.Context(), "SELECT * FROM configs", nil) + if err != nil { + return err + } + + return c.JSON(configs) + +} + +func getConfigUrl() string { + chain := os.Getenv("CHAIN") + if chain == "" { + chain = "baobab" + } + + return fmt.Sprintf("https://config.orakl.network/%s_configs.json", chain) +} + +func bulkUpsertConfigs(ctx context.Context, configs []ConfigInsertModel) error { + upsertRows := make([][]any, 0, len(configs)) + for _, config := range configs { + upsertRows = append(upsertRows, []any{config.Name, config.Address, config.FetchInterval, config.AggregateInterval, config.SubmitInterval}) + } + + return db.BulkUpsert(ctx, "configs", []string{"name", "address", "fetch_interval", "aggregate_interval", "submit_interval"}, upsertRows, []string{"name"}, []string{"address", "fetch_interval", "aggregate_interval", "submit_interval"}) +} diff --git a/node/pkg/admin/config/queries.go b/node/pkg/admin/config/queries.go new file mode 100644 index 000000000..7ccd00dda --- /dev/null +++ b/node/pkg/admin/config/queries.go @@ -0,0 +1,13 @@ +package config + +import "strings" + +func generateGetConfigIdsQuery(configs []ConfigInsertModel) (string, []interface{}) { + baseQuery := "SELECT id, name FROM configs WHERE name IN (" + queryArgs := make([]interface{}, 0, len(configs)) + for _, config := range configs { + queryArgs = append(queryArgs, config.Name) + } + + return baseQuery + strings.Repeat("?,", len(configs)-1) + "?" 
+ ")", queryArgs +} diff --git a/node/pkg/admin/config/route.go b/node/pkg/admin/config/route.go new file mode 100644 index 000000000..2bceced82 --- /dev/null +++ b/node/pkg/admin/config/route.go @@ -0,0 +1,10 @@ +package config + +import ( + "github.com/gofiber/fiber/v2" +) + +func Routes(router fiber.Router) { + config := router.Group("/config") + config.Post("/sync", Sync) +} diff --git a/node/pkg/admin/feed/controller.go b/node/pkg/admin/feed/controller.go index 38838288e..63d8f7978 100644 --- a/node/pkg/admin/feed/controller.go +++ b/node/pkg/admin/feed/controller.go @@ -32,9 +32,9 @@ func getById(c *fiber.Ctx) error { return c.JSON(result) } -func getByAdpaterId(c *fiber.Ctx) error { +func getByConfigId(c *fiber.Ctx) error { id := c.Params("id") - results, err := db.QueryRows[FeedModel](c.Context(), GetFeedsByAdapterId, map[string]any{"adapter_id": id}) + results, err := db.QueryRows[FeedModel](c.Context(), GetFeedsByConfigId, map[string]any{"config_id": id}) if err != nil { return c.Status(fiber.StatusInternalServerError).SendString("failed to execute feed get by adapter id query: " + err.Error()) } diff --git a/node/pkg/admin/feed/queries.go b/node/pkg/admin/feed/queries.go index 33faf402e..9a82d4f43 100644 --- a/node/pkg/admin/feed/queries.go +++ b/node/pkg/admin/feed/queries.go @@ -5,5 +5,5 @@ const ( GetFeedById = `SELECT * FROM feeds WHERE id = @id;` - GetFeedsByAdapterId = `SELECT * FROM feeds WHERE adapter_id = @adapter_id;` + GetFeedsByConfigId = `SELECT * FROM feeds WHERE config_id = @config_id;` ) diff --git a/node/pkg/admin/feed/route.go b/node/pkg/admin/feed/route.go index 5aad61b3a..7275dd8a8 100644 --- a/node/pkg/admin/feed/route.go +++ b/node/pkg/admin/feed/route.go @@ -7,7 +7,7 @@ import ( func Routes(router fiber.Router) { feed := router.Group("/feed") feed.Get("", get) - feed.Get("/adapter/:id", getByAdpaterId) + feed.Get("/config/:id", getByConfigId) feed.Get("/:id", getById) } diff --git a/node/pkg/admin/submissionAddress/controller.go b/node/pkg/admin/submissionAddress/controller.go deleted file mode 100644 index 16fac6830..000000000 --- a/node/pkg/admin/submissionAddress/controller.go +++ /dev/null @@ -1,243 +0,0 @@ -package submissionAddress - -import ( - "context" - "fmt" - "os" - "sync" - - "bisonai.com/orakl/node/pkg/db" - "bisonai.com/orakl/node/pkg/utils/request" - "github.com/go-playground/validator" - "github.com/gofiber/fiber/v2" - "github.com/rs/zerolog/log" -) - -type SubmissionAddressModel struct { - Id *int64 `db:"id" json:"id"` - Name string `db:"name" json:"name"` - Address string `db:"address" json:"address"` - Interval *int `db:"interval" json:"heartbeat"` -} - -type SubmissionAddressInsertModel struct { - Name string `db:"name" json:"name" validate:"required"` - Address string `db:"address" json:"address" validate:"required"` - Interval *int `db:"interval" json:"heartbeat"` -} - -type BulkAddresses struct { - Addresses []SubmissionAddressInsertModel `json:"result"` -} - -type AggregatorName struct { - Name string `json:"name"` -} - -func SyncFromOraklConfig(c *fiber.Ctx) error { - configUrl := getConfigUrl() - - var submissionAddresses BulkAddresses - submissionAddresses, err := request.GetRequest[BulkAddresses](configUrl, nil, nil) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to get orakl config: " + err.Error()) - } - - errs := make(chan error, len(submissionAddresses.Addresses)) - var wg sync.WaitGroup - - validate := validator.New() - maxConcurrency := 20 - sem := make(chan struct{}, 
maxConcurrency) - - for _, address := range submissionAddresses.Addresses { - wg.Add(1) - sem <- struct{}{} - go func(address SubmissionAddressInsertModel) { - defer wg.Done() - defer func() { <-sem }() - - if err := validate.Struct(address); err != nil { - log.Error().Err(err).Str("Name", address.Name).Str("Address", address.Address).Msg("failed to validate submission address") - errs <- err - return - } - - _, err := insertSubmissionAddress(c.Context(), address) - if err != nil { - log.Error().Err(err).Msg("failed to execute submission address insert query") - errs <- err - return - } - }(address) - } - wg.Wait() - close(errs) - - var errorMessages []string - for err := range errs { - errorMessages = append(errorMessages, err.Error()) - } - - if len(errorMessages) > 0 { - return c.Status(fiber.StatusInternalServerError).JSON(errorMessages) - } - - return c.Status(fiber.StatusOK).SendString("sync successful") -} - -func syncWithAggregator(c *fiber.Ctx) error { - pairNames, err := db.QueryRows[AggregatorName](c.Context(), GetAggregatorNames, nil) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute aggregator sync with adapter query: " + err.Error()) - } - - configUrl := getConfigUrl() - - var submissionAddresses BulkAddresses - submissionAddresses, err = request.GetRequest[BulkAddresses](configUrl, nil, nil) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to get orakl config: " + err.Error()) - } - - for _, address := range submissionAddresses.Addresses { - if containsName(pairNames, address.Name) { - _, err := insertSubmissionAddress(c.Context(), address) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute submission address insert query: " + err.Error()) - } - } - } - - return c.Status(fiber.StatusOK).SendString("sync successful") -} - -func addFromOraklConfig(c *fiber.Ctx) error { - configUrl := getConfigUrl() - name := c.Params("name") - - if name == "" { - return c.Status(fiber.StatusBadRequest).SendString("name is required") - } - - var submissionAddresses BulkAddresses - submissionAddresses, err := request.GetRequest[BulkAddresses](configUrl, nil, nil) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to get orakl config: " + err.Error()) - } - - for _, address := range submissionAddresses.Addresses { - if address.Name == name { - result, err := insertSubmissionAddress(c.Context(), address) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute submission address insert query: " + err.Error()) - } - return c.JSON(result) - } - } - return c.Status(fiber.StatusNotFound).SendString("name not found in orakl config") -} - -func insert(c *fiber.Ctx) error { - payload := new(SubmissionAddressInsertModel) - if err := c.BodyParser(payload); err != nil { - return c.Status(fiber.StatusBadRequest).SendString(err.Error()) - } - - validate := validator.New() - if err := validate.Struct(payload); err != nil { - return c.Status(fiber.StatusBadRequest).SendString(err.Error()) - } - - result, err := insertSubmissionAddress(c.Context(), *payload) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute submission address insert query: " + err.Error()) - } - - return c.JSON(result) -} - -func get(c *fiber.Ctx) error { - result, err := db.QueryRows[SubmissionAddressModel](c.Context(), GetSubmissionAddress, nil) - if err != nil { - return 
c.Status(fiber.StatusInternalServerError).SendString("failed to execute submission address get query: " + err.Error()) - } - - return c.JSON(result) -} - -func getById(c *fiber.Ctx) error { - id := c.Params("id") - result, err := db.QueryRow[SubmissionAddressModel](c.Context(), GetSubmissionAddressById, map[string]any{ - "id": id, - }) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute submission address get by id query: " + err.Error()) - } - return c.JSON(result) -} - -func deleteById(c *fiber.Ctx) error { - id := c.Params("id") - result, err := db.QueryRow[SubmissionAddressModel](c.Context(), DeleteSubmissionAddressById, map[string]any{ - "id": id, - }) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute submission address delete by id query: " + err.Error()) - } - return c.JSON(result) -} - -func updateById(c *fiber.Ctx) error { - id := c.Params("id") - payload := new(SubmissionAddressInsertModel) - if err := c.BodyParser(payload); err != nil { - return c.Status(fiber.StatusBadRequest).SendString(err.Error()) - } - - validate := validator.New() - if err := validate.Struct(payload); err != nil { - return c.Status(fiber.StatusBadRequest).SendString(err.Error()) - } - - result, err := db.QueryRow[SubmissionAddressModel](c.Context(), UpdateSubmissionAddressById, map[string]any{ - "id": id, - "name": payload.Name, - "address": payload.Address, - }) - if err != nil { - return c.Status(fiber.StatusInternalServerError).SendString("failed to execute submission address update by id query: " + err.Error()) - } - - return c.JSON(result) -} - -func insertSubmissionAddress(ctx context.Context, address SubmissionAddressInsertModel) (SubmissionAddressModel, error) { - result, err := db.QueryRow[SubmissionAddressModel](ctx, UpsertSubmissionAddress, map[string]any{ - "name": address.Name, - "address": address.Address, - "interval": address.Interval, - }) - if err != nil { - return SubmissionAddressModel{}, err - } - return result, nil -} - -func getConfigUrl() string { - // TODO: add chain validation (currently only supporting baobab and cypress) - chain := os.Getenv("CHAIN") - if chain == "" { - chain = "baobab" - } - return fmt.Sprintf("https://config.orakl.network/%s_aggregators.json", chain) -} - -func containsName(names []AggregatorName, target string) bool { - for _, name := range names { - if name.Name == target { - return true - } - } - return false -} diff --git a/node/pkg/admin/submissionAddress/queries.go b/node/pkg/admin/submissionAddress/queries.go deleted file mode 100644 index 65dee84bf..000000000 --- a/node/pkg/admin/submissionAddress/queries.go +++ /dev/null @@ -1,17 +0,0 @@ -package submissionAddress - -const ( - InsertSubmissionAddress = `INSERT INTO submission_addresses (name, address, interval) VALUES (@name, @address, @interval) RETURNING *;` - - UpsertSubmissionAddress = `INSERT INTO submission_addresses (name, address, interval) VALUES (@name, @address, @interval) ON CONFLICT (name) DO UPDATE SET address = @address, interval = @interval RETURNING *;` - - GetAggregatorNames = `SELECT name FROM aggregators WHERE active = true;` - - GetSubmissionAddress = `SELECT * FROM submission_addresses;` - - GetSubmissionAddressById = `SELECT * FROM submission_addresses WHERE id = @id;` - - DeleteSubmissionAddressById = `DELETE FROM submission_addresses WHERE id = @id RETURNING *;` - - UpdateSubmissionAddressById = `UPDATE submission_addresses SET name = @name, address = @address WHERE id = @id 
RETURNING *;` -) diff --git a/node/pkg/admin/submissionAddress/route.go b/node/pkg/admin/submissionAddress/route.go deleted file mode 100644 index 93d606837..000000000 --- a/node/pkg/admin/submissionAddress/route.go +++ /dev/null @@ -1,18 +0,0 @@ -package submissionAddress - -import ( - "github.com/gofiber/fiber/v2" -) - -func Routes(router fiber.Router) { - submissionAddress := router.Group("/submission-address") - - submissionAddress.Post("/sync/aggregator", syncWithAggregator) - submissionAddress.Post("/sync/config", SyncFromOraklConfig) - submissionAddress.Post("/sync/config/:name", addFromOraklConfig) - submissionAddress.Post("", insert) - submissionAddress.Get("", get) - submissionAddress.Get("/:id", getById) - submissionAddress.Delete("/:id", deleteById) - submissionAddress.Patch("/:id", updateById) -} diff --git a/node/pkg/admin/tests/adapter_test.go b/node/pkg/admin/tests/adapter_test.go deleted file mode 100644 index 80d567482..000000000 --- a/node/pkg/admin/tests/adapter_test.go +++ /dev/null @@ -1,201 +0,0 @@ -//nolint:all -package tests - -import ( - "context" - "strconv" - "testing" - - "bisonai.com/orakl/node/pkg/admin/adapter" - "bisonai.com/orakl/node/pkg/bus" - "bisonai.com/orakl/node/pkg/db" - "github.com/stretchr/testify/assert" -) - -func TestAdapterInsert(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - mockAdapter1 := adapter.AdapterInsertModel{ - Name: "test_adapter_2", - } - - readResultBefore, err := GetRequest[[]adapter.AdapterModel](testItems.app, "/api/v1/adapter", nil) - if err != nil { - t.Fatalf("error getting adapters before: %v", err) - } - - insertResult, err := PostRequest[adapter.AdapterModel](testItems.app, "/api/v1/adapter", mockAdapter1) - if err != nil { - t.Fatalf("error inserting adapter: %v", err) - } - assert.Equal(t, insertResult.Name, mockAdapter1.Name) - - readResultAfter, err := GetRequest[[]adapter.AdapterModel](testItems.app, "/api/v1/adapter", nil) - if err != nil { - t.Fatalf("error getting adapters after: %v", err) - } - - assert.Greaterf(t, len(readResultAfter), len(readResultBefore), "expected to have more adapters after insertion") - - //cleanup - _, err = db.QueryRow[adapter.AdapterModel](context.Background(), adapter.DeleteAdapterById, map[string]any{"id": insertResult.Id}) - if err != nil { - t.Fatalf("error cleaning up test: %v", err) - } -} - -func TestAdapterGet(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResult, err := GetRequest[[]adapter.AdapterModel](testItems.app, "/api/v1/adapter", nil) - if err != nil { - t.Fatalf("error getting adapters: %v", err) - } - assert.Greater(t, len(readResult), 0) -} - -func TestAdapterReadDetailById(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResult, err := GetRequest[adapter.AdapterDetailModel](testItems.app, "/api/v1/adapter/detail/"+strconv.FormatInt(*testItems.tmpData.adapter.Id, 10), nil) - if err != nil { - t.Fatalf("error getting adapter detail: %v", err) - } - assert.Equal(t, readResult.Id, testItems.tmpData.adapter.Id) - assert.NotEmpty(t, readResult.Feeds) -} - -func TestAdapterGetById(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - 
t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResult, err := GetRequest[adapter.AdapterModel](testItems.app, "/api/v1/adapter/"+strconv.FormatInt(*testItems.tmpData.adapter.Id, 10), nil) - if err != nil { - t.Fatalf("error getting adapter by id: %v", err) - } - assert.Equal(t, readResult.Id, testItems.tmpData.adapter.Id) -} - -func TestAdapterDeleteById(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - mockAdapter1 := adapter.AdapterInsertModel{ - Name: "test_adapter_2", - } - insertResult, err := PostRequest[adapter.AdapterModel](testItems.app, "/api/v1/adapter", mockAdapter1) - if err != nil { - t.Fatalf("error inserting adapter: %v", err) - } - - readResultBefore, err := GetRequest[[]adapter.AdapterModel](testItems.app, "/api/v1/adapter", nil) - if err != nil { - t.Fatalf("error getting adapters before: %v", err) - } - - deleteResult, err := DeleteRequest[adapter.AdapterModel](testItems.app, "/api/v1/adapter/"+strconv.FormatInt(*insertResult.Id, 10), nil) - if err != nil { - t.Fatalf("error deleting adapter: %v", err) - } - assert.Equal(t, deleteResult.Id, insertResult.Id) - - readResultAfter, err := GetRequest[[]adapter.AdapterModel](testItems.app, "/api/v1/adapter", nil) - if err != nil { - t.Fatalf("error getting adapters after: %v", err) - } - - assert.Lessf(t, len(readResultAfter), len(readResultBefore), "expected to have less adapters after deletion") -} - -func TestAdapterDeactivate(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - channel := testItems.mb.Subscribe(bus.FETCHER) - waitForMessage(t, channel, bus.ADMIN, bus.FETCHER, bus.DEACTIVATE_FETCHER) - - deactivateResult, err := PostRequest[adapter.AdapterModel](testItems.app, "/api/v1/adapter/deactivate/"+strconv.FormatInt(*testItems.tmpData.adapter.Id, 10), nil) - if err != nil { - t.Fatalf("error deactivating adapter: %v", err) - } - assert.False(t, deactivateResult.Active) -} - -func TestAdapterActivate(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - channel := testItems.mb.Subscribe(bus.FETCHER) - waitForMessage(t, channel, bus.ADMIN, bus.FETCHER, bus.ACTIVATE_FETCHER) - - // activate - activateResult, err := PostRequest[adapter.AdapterModel](testItems.app, "/api/v1/adapter/activate/"+strconv.FormatInt(*testItems.tmpData.adapter.Id, 10), nil) - if err != nil { - t.Fatalf("error activating adapter: %v", err) - } - assert.True(t, activateResult.Active) - -} - -func TestAdapterSync(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResultBefore, err := GetRequest[[]adapter.AdapterModel](testItems.app, "/api/v1/adapter", nil) - if err != nil { - t.Fatalf("error getting adapters after: %v", err) - } - - _, err = RawPostRequest(testItems.app, "/api/v1/adapter/sync", nil) - if err != nil { - t.Fatalf("error syncing adapter: %v", err) - } - - readResultAfter, err := GetRequest[[]adapter.AdapterModel](testItems.app, "/api/v1/adapter", nil) - if err != nil { - t.Fatalf("error getting adapters after: %v", err) - } - - assert.Greaterf(t, len(readResultAfter), len(readResultBefore), "expected to have more 
adapters after insertion") - - // cleanup - err = db.QueryWithoutResult(ctx, "DELETE FROM adapters", nil) - if err != nil { - t.Fatalf("error cleaning up test: %v", err) - } -} diff --git a/node/pkg/admin/tests/aggregator_test.go b/node/pkg/admin/tests/aggregator_test.go index 403ab47a2..417177a5e 100644 --- a/node/pkg/admin/tests/aggregator_test.go +++ b/node/pkg/admin/tests/aggregator_test.go @@ -3,12 +3,9 @@ package tests import ( "context" - "strconv" "testing" - "bisonai.com/orakl/node/pkg/admin/aggregator" "bisonai.com/orakl/node/pkg/bus" - "bisonai.com/orakl/node/pkg/db" "github.com/stretchr/testify/assert" ) @@ -70,214 +67,3 @@ func TestAggregatorRefresh(t *testing.T) { assert.Equal(t, string(result), "aggregator refreshed") } - -func TestAggregatorInsert(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - mockAggregator := aggregator.AggregatorInsertModel{ - Name: "test_aggregator_2", - } - - readResultBefore, err := GetRequest[[]aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator", nil) - if err != nil { - t.Fatalf("error getting aggregators before: %v", err) - } - - insertResult, err := PostRequest[aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator", mockAggregator) - if err != nil { - t.Fatalf("error inserting aggregator: %v", err) - } - - assert.Equal(t, insertResult.Name, mockAggregator.Name) - - readResultAfter, err := GetRequest[[]aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator", nil) - if err != nil { - t.Fatalf("error getting aggregators after: %v", err) - } - - assert.Greaterf(t, len(readResultAfter), len(readResultBefore), "expected to have more aggregators after insertion") - - // cleanup - _, err = db.QueryRow[aggregator.AggregatorModel](context.Background(), aggregator.DeleteAggregatorById, map[string]any{"id": insertResult.Id}) - if err != nil { - t.Fatalf("error cleaning up test: %v", err) - } -} - -func TestAggregatorGet(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResult, err := GetRequest[[]aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator", nil) - if err != nil { - t.Fatalf("error getting aggregators: %v", err) - } - - assert.Greater(t, len(readResult), 0, "expected to have at least one aggregator") -} - -func TestAggregatorGetById(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResult, err := GetRequest[aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator/"+strconv.FormatInt(*testItems.tmpData.aggregator.Id, 10), nil) - if err != nil { - t.Fatalf("error getting aggregator by id: %v", err) - } - assert.Equal(t, readResult.Id, testItems.tmpData.aggregator.Id) -} - -func TestAggregatorDeleteById(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - mockAggregator := aggregator.AggregatorInsertModel{ - Name: "test_aggregator_2", - } - - insertResult, err := PostRequest[aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator", mockAggregator) - if err != nil { - t.Fatalf("error inserting aggregator: %v", err) - } - - readResultBefore, err := GetRequest[[]aggregator.AggregatorModel](testItems.app, 
"/api/v1/aggregator", nil) - if err != nil { - t.Fatalf("error getting aggregators before: %v", err) - } - - deleteResult, err := DeleteRequest[aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator/"+strconv.FormatInt(*insertResult.Id, 10), nil) - if err != nil { - t.Fatalf("error deleting aggregator by id: %v", err) - } - - assert.Equal(t, deleteResult.Id, insertResult.Id) - - readResultAfter, err := GetRequest[[]aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator", nil) - if err != nil { - t.Fatalf("error getting aggregators after: %v", err) - } - - assert.Lessf(t, len(readResultAfter), len(readResultBefore), "expected to have less aggregators after deletion") - -} - -func TestAggregatorActivate(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - channel := testItems.mb.Subscribe(bus.AGGREGATOR) - waitForMessage(t, channel, bus.ADMIN, bus.AGGREGATOR, bus.ACTIVATE_AGGREGATOR) - - activateResult, err := PostRequest[aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator/activate/"+strconv.FormatInt(*testItems.tmpData.aggregator.Id, 10), nil) - if err != nil { - t.Fatalf("error activating aggregator: %v", err) - } - assert.True(t, activateResult.Active) - -} - -func TestAggregatorDeactivate(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - channel := testItems.mb.Subscribe(bus.AGGREGATOR) - waitForMessage(t, channel, bus.ADMIN, bus.AGGREGATOR, bus.DEACTIVATE_AGGREGATOR) - - deactivateResult, err := PostRequest[aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator/deactivate/"+strconv.FormatInt(*testItems.tmpData.aggregator.Id, 10), nil) - if err != nil { - t.Fatalf("error deactivating aggregator: %v", err) - } - assert.False(t, deactivateResult.Active) - -} - -func TestAggregatorSyncWithOraklConfig(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResultBefore, err := GetRequest[[]aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator", nil) - if err != nil { - t.Fatalf("error getting aggregators before: %v", err) - } - - _, err = RawPostRequest(testItems.app, "/api/v1/aggregator/sync/config", nil) - if err != nil { - t.Fatalf("error syncing aggregator with orakl config: %v", err) - } - - readResultAfter, err := GetRequest[[]aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator", nil) - if err != nil { - t.Fatalf("error getting aggregators before: %v", err) - } - - assert.Greaterf(t, len(readResultAfter), len(readResultBefore), "expected to have more aggregators after syncing with orakl config") - - // cleanup - _, err = db.QueryRow[aggregator.AggregatorModel](context.Background(), "DELETE FROM aggregators;", nil) - if err != nil { - t.Fatalf("error cleaning up test: %v", err) - } -} - -func TestAggregatorAddFromOraklConfig(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResultBefore, err := GetRequest[[]aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator", nil) - if err != nil { - t.Fatalf("error getting aggregators before: %v", err) - } - - result, err := PostRequest[aggregator.AggregatorModel](testItems.app, 
"/api/v1/aggregator/sync/config/ADA-USDT", nil) - if err != nil { - t.Fatalf("error adding aggregator from orakl config: %v", err) - } - - assert.Equal(t, result.Name, "ADA-USDT") - - readResultAfter, err := GetRequest[[]aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator", nil) - if err != nil { - t.Fatalf("error getting aggregators after: %v", err) - } - - assert.Greaterf(t, len(readResultAfter), len(readResultBefore), "expected to have more aggregators after adding from orakl config") - - // cleanup - _, err = db.QueryRow[aggregator.AggregatorModel](context.Background(), "DELETE FROM aggregators;", nil) - if err != nil { - t.Fatalf("error cleaning up test: %v", err) - } -} diff --git a/node/pkg/admin/tests/config_test.go b/node/pkg/admin/tests/config_test.go new file mode 100644 index 000000000..3af1c8ce9 --- /dev/null +++ b/node/pkg/admin/tests/config_test.go @@ -0,0 +1,44 @@ +package tests + +import ( + "context" + "testing" + + "bisonai.com/orakl/node/pkg/admin/config" + "github.com/stretchr/testify/assert" +) + +func TestConfigSync(t *testing.T) { + ctx := context.Background() + cleanup, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer cleanup() + + _, err = RawPostRequest(testItems.app, "/api/v1/config/sync", nil) + if err != nil { + t.Fatalf("error syncing config: %v", err) + } + + readResult, err := GetRequest[[]config.ConfigModel](testItems.app, "/api/v1/config", nil) + if err != nil { + t.Fatalf("error getting config: %v", err) + } + assert.Greater(t, len(readResult), 1) +} + +func TestConfigRead(t *testing.T) { + ctx := context.Background() + cleanup, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer cleanup() + + readResult, err := GetRequest[[]config.ConfigModel](testItems.app, "/api/v1/config", nil) + if err != nil { + t.Fatalf("error getting config: %v", err) + } + assert.Greater(t, len(readResult), 0) +} diff --git a/node/pkg/admin/tests/feed_test.go b/node/pkg/admin/tests/feed_test.go index 45b9de6b0..39de9b351 100644 --- a/node/pkg/admin/tests/feed_test.go +++ b/node/pkg/admin/tests/feed_test.go @@ -33,7 +33,7 @@ func TestFeedGetByAdapterId(t *testing.T) { } defer cleanup() - readResult, err := GetRequest[[]feed.FeedModel](testItems.app, "/api/v1/feed/adapter/"+strconv.FormatInt(*testItems.tmpData.adapter.Id, 10), nil) + readResult, err := GetRequest[[]feed.FeedModel](testItems.app, "/api/v1/feed/config/"+strconv.FormatInt(testItems.tmpData.config.Id, 10), nil) if err != nil { t.Fatalf("error getting feeds: %v", err) } diff --git a/node/pkg/admin/tests/main_test.go b/node/pkg/admin/tests/main_test.go index 5762053e1..800f92069 100644 --- a/node/pkg/admin/tests/main_test.go +++ b/node/pkg/admin/tests/main_test.go @@ -2,18 +2,16 @@ package tests import ( "context" - "database/sql" "os" "testing" - "bisonai.com/orakl/node/pkg/admin/adapter" "bisonai.com/orakl/node/pkg/admin/aggregator" + "bisonai.com/orakl/node/pkg/admin/config" "bisonai.com/orakl/node/pkg/admin/feed" "bisonai.com/orakl/node/pkg/admin/fetcher" "bisonai.com/orakl/node/pkg/admin/providerUrl" "bisonai.com/orakl/node/pkg/admin/proxy" "bisonai.com/orakl/node/pkg/admin/reporter" - "bisonai.com/orakl/node/pkg/admin/submissionAddress" "bisonai.com/orakl/node/pkg/admin/utils" "bisonai.com/orakl/node/pkg/admin/wallet" "bisonai.com/orakl/node/pkg/bus" @@ -28,13 +26,11 @@ type TestItems struct { } type TmpData struct { - aggregator aggregator.AggregatorModel - adapter adapter.AdapterModel - 
submissionAddress submissionAddress.SubmissionAddressModel - feed feed.FeedModel - proxy proxy.ProxyModel - wallet wallet.WalletModel - providerUrl providerUrl.ProviderUrlModel + config config.ConfigModel + feed feed.FeedModel + proxy proxy.ProxyModel + wallet wallet.WalletModel + providerUrl providerUrl.ProviderUrlModel } func setup(ctx context.Context) (func() error, *TestItems, error) { @@ -63,34 +59,26 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { v1 := app.Group("/api/v1") aggregator.Routes(v1) - adapter.Routes(v1) feed.Routes(v1) fetcher.Routes(v1) proxy.Routes(v1) wallet.Routes(v1) reporter.Routes(v1) - submissionAddress.Routes(v1) providerUrl.Routes(v1) - + config.Routes(v1) return adminCleanup(testItems), testItems, nil } func insertSampleData(ctx context.Context) (*TmpData, error) { var tmpData = new(TmpData) - tmpAdapter, err := db.QueryRow[adapter.AdapterModel](ctx, adapter.InsertAdapter, map[string]any{"name": "test_adapter"}) - if err != nil { - return nil, err - } - tmpData.adapter = tmpAdapter - - tmpAggregator, err := db.QueryRow[aggregator.AggregatorModel](ctx, aggregator.InsertAggregator, map[string]any{"name": "test_aggregator"}) + tmpConfig, err := db.QueryRow[config.ConfigModel](ctx, "INSERT INTO configs (name, address, fetch_interval, aggregate_interval, submit_interval) VALUES (@name, @address, @fetch_interval, @aggregate_interval, @submit_interval) RETURNING *", map[string]any{"name": "test_config", "address": "test_address", "fetch_interval": 1, "aggregate_interval": 1, "submit_interval": 1}) if err != nil { return nil, err } - tmpData.aggregator = tmpAggregator + tmpData.config = tmpConfig - tmpFeed, err := db.QueryRow[feed.FeedModel](ctx, adapter.InsertFeed, map[string]any{"name": "test_feed", "adapter_id": tmpAdapter.Id, "definition": `{"test": "test"}`}) + tmpFeed, err := db.QueryRow[feed.FeedModel](ctx, "INSERT INTO feeds (name, config_id, definition) VALUES (@name, @config_id, @definition) RETURNING *", map[string]any{"name": "test_feed", "config_id": tmpConfig.Id, "definition": `{"test": "test"}`}) if err != nil { return nil, err } @@ -108,12 +96,6 @@ func insertSampleData(ctx context.Context) (*TmpData, error) { } tmpData.wallet = tmpWallet - tmpSubmissionAddress, err := db.QueryRow[submissionAddress.SubmissionAddressModel](ctx, submissionAddress.InsertSubmissionAddress, map[string]any{"name": "test_submission_address", "address": "test_address", "interval": sql.NullInt32{Valid: false}}) - if err != nil { - return nil, err - } - tmpData.submissionAddress = tmpSubmissionAddress - tmpProviderUrl, err := db.QueryRow[providerUrl.ProviderUrlModel](ctx, providerUrl.InsertProviderUrl, map[string]any{"chain_id": 1, "url": "test_url", "priority": 1}) if err != nil { return nil, err @@ -129,27 +111,18 @@ func adminCleanup(testItems *TestItems) func() error { if err != nil { return err } - _, err = db.QueryRow[adapter.AdapterModel](context.Background(), adapter.DeleteAdapterById, map[string]any{"id": testItems.tmpData.adapter.Id}) - if err != nil { - return err - } - - _, err = db.QueryRow[aggregator.AggregatorModel](context.Background(), aggregator.DeleteAggregatorById, map[string]any{"id": testItems.tmpData.aggregator.Id}) - if err != nil { - return err - } _, err = db.QueryRow[proxy.ProxyModel](context.Background(), proxy.DeleteProxyById, map[string]any{"id": testItems.tmpData.proxy.Id}) if err != nil { return err } - _, err = db.QueryRow[submissionAddress.SubmissionAddressModel](context.Background(), submissionAddress.DeleteSubmissionAddressById, map[string]any{"id": testItems.tmpData.submissionAddress.Id}) + _, err = 
db.QueryRow[providerUrl.ProviderUrlModel](context.Background(), providerUrl.DeleteProviderUrlById, map[string]any{"id": testItems.tmpData.providerUrl.Id}) if err != nil { return err } - _, err = db.QueryRow[providerUrl.ProviderUrlModel](context.Background(), providerUrl.DeleteProviderUrlById, map[string]any{"id": testItems.tmpData.providerUrl.Id}) + err = db.QueryWithoutResult(context.Background(), "DELETE FROM configs", nil) if err != nil { return err } diff --git a/node/pkg/admin/tests/submissionAddress_test.go b/node/pkg/admin/tests/submissionAddress_test.go deleted file mode 100644 index caf2aac1c..000000000 --- a/node/pkg/admin/tests/submissionAddress_test.go +++ /dev/null @@ -1,243 +0,0 @@ -//nolint:all -package tests - -import ( - "context" - "strconv" - "testing" - - "bisonai.com/orakl/node/pkg/admin/aggregator" - "bisonai.com/orakl/node/pkg/admin/submissionAddress" - "bisonai.com/orakl/node/pkg/db" - "github.com/stretchr/testify/assert" -) - -func TestSubmissionAddressSync(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResultBefore, err := GetRequest[[]submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", nil) - if err != nil { - t.Fatalf("error getting submission addresses before: %v", err) - } - - _, err = RawPostRequest(testItems.app, "/api/v1/submission-address/sync/config", nil) - if err != nil { - t.Fatalf("error syncing submission addresses: %v", err) - } - - readResultAfter, err := GetRequest[[]submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", nil) - if err != nil { - t.Fatalf("error getting submission addresses after: %v", err) - } - - assert.Greaterf(t, len(readResultAfter), len(readResultBefore), "expected to have more submission addresses after syncing") - - //cleanup - err = db.QueryWithoutResult(ctx, "DELETE FROM submission_addresses;", nil) - if err != nil { - t.Fatalf("error cleaning up test: %v", err) - } -} - -func TestSubmissionAddressInsert(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - mockSubmissionAddress1 := submissionAddress.SubmissionAddressInsertModel{ - Name: "test_submission_address_2", - Address: "test_submission_address_2", - } - - readResultBefore, err := GetRequest[[]submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", nil) - if err != nil { - t.Fatalf("error getting submission addresses before: %v", err) - } - - insertResult, err := PostRequest[submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", mockSubmissionAddress1) - if err != nil { - t.Fatalf("error inserting submission address: %v", err) - } - assert.Equal(t, insertResult.Name, mockSubmissionAddress1.Name) - - readResultAfter, err := GetRequest[[]submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", nil) - if err != nil { - t.Fatalf("error getting submission addresses after: %v", err) - } - - assert.Greaterf(t, len(readResultAfter), len(readResultBefore), "expected to have more submission addresses after insertion") - - //cleanup - err = db.QueryWithoutResult(ctx, "DELETE FROM submission_addresses WHERE id = @id;", map[string]interface{}{"id": insertResult.Id}) - if err != nil { - t.Fatalf("error cleaning up test: %v", err) - } -} - -func TestSubmissionAddressGet(t 
*testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResult, err := GetRequest[[]submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", nil) - if err != nil { - t.Fatalf("error getting submission addresses: %v", err) - } - assert.Greater(t, len(readResult), 0) -} - -func TestSubmissionAddressGetById(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResult, err := GetRequest[submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address/"+strconv.FormatInt(*testItems.tmpData.submissionAddress.Id, 10), nil) - if err != nil { - t.Fatalf("error getting submission address by id: %v", err) - } - assert.Equal(t, readResult.Id, testItems.tmpData.submissionAddress.Id) -} - -func TestSubmissionAddressDeleteById(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResultBefore, err := GetRequest[[]submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", nil) - if err != nil { - t.Fatalf("error getting submission addresses before: %v", err) - } - - result, err := DeleteRequest[submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address/"+strconv.FormatInt(*testItems.tmpData.submissionAddress.Id, 10), nil) - if err != nil { - t.Fatalf("error deleting submission address by id: %v", err) - } - - assert.Equal(t, result.Id, testItems.tmpData.submissionAddress.Id) - - readResultAfter, err := GetRequest[[]submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", nil) - if err != nil { - t.Fatalf("error getting submission addresses after: %v", err) - } - - assert.Lessf(t, len(readResultAfter), len(readResultBefore), "expected to have less submission addresses after deletion") -} - -func TestSubmissionAddressUpdateById(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - mockSubmissionAddress1 := submissionAddress.SubmissionAddressInsertModel{ - Name: "test_submission_address_2", - Address: "test_submission_address_2", - } - - updateResult, err := PatchRequest[submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address/"+strconv.FormatInt(*testItems.tmpData.submissionAddress.Id, 10), mockSubmissionAddress1) - if err != nil { - t.Fatalf("error updating submission address by id: %v", err) - } - - assert.Equal(t, updateResult.Name, mockSubmissionAddress1.Name) - assert.Equal(t, updateResult.Address, mockSubmissionAddress1.Address) - - readResult, err := GetRequest[submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address/"+strconv.FormatInt(*testItems.tmpData.submissionAddress.Id, 10), nil) - if err != nil { - t.Fatalf("error getting submission address by id: %v", err) - } - - assert.Equal(t, readResult.Name, mockSubmissionAddress1.Name) - assert.Equal(t, readResult.Address, mockSubmissionAddress1.Address) -} - -func TestSubmissionAddressSyncWithAggregator(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - // 
first add an aggregator into table - mockAggregator := aggregator.AggregatorInsertModel{ - Name: "ADA-USDT", - } - tmpAggregatorInsertResult, err := PostRequest[aggregator.AggregatorModel](testItems.app, "/api/v1/aggregator", mockAggregator) - if err != nil { - t.Fatalf("error inserting aggregator: %v", err) - } - - readResultBefore, err := GetRequest[[]submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", nil) - if err != nil { - t.Fatalf("error getting submission addresses before: %v", err) - } - - _, err = RawPostRequest(testItems.app, "/api/v1/submission-address/sync/aggregator", nil) - if err != nil { - t.Fatalf("error syncing submission addresses with aggregator") - } - - readResultAfter, err := GetRequest[[]submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", nil) - if err != nil { - t.Fatalf("error getting submission addresses after: %v", err) - } - - assert.Greaterf(t, len(readResultAfter), len(readResultBefore), "expected to have more submission addresses after syncing") - - //cleanup - err = db.QueryWithoutResult(ctx, "DELETE FROM aggregators WHERE id = @id;", map[string]any{"id": tmpAggregatorInsertResult.Id}) - if err != nil { - t.Fatalf("error cleaning up test: %v", err) - } - - err = db.QueryWithoutResult(ctx, "DELETE FROM submission_addresses;", nil) - if err != nil { - t.Fatalf("error cleaning up test: %v", err) - } -} - -func TestSubmissionAddressAddFromOraklConfig(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - readResultBefore, err := GetRequest[[]submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", nil) - if err != nil { - t.Fatalf("error getting submission addresses before: %v", err) - } - - _, err = RawPostRequest(testItems.app, "/api/v1/submission-address/sync/config/"+"ADA-USDT", nil) - if err != nil { - t.Fatalf("error adding submission address from orakl config: %v", err) - } - - readResultAfter, err := GetRequest[[]submissionAddress.SubmissionAddressModel](testItems.app, "/api/v1/submission-address", nil) - if err != nil { - t.Fatalf("error getting submission addresses after: %v", err) - } - - assert.Greaterf(t, len(readResultAfter), len(readResultBefore), "expected to have more submission addresses after adding from orakl config") -}
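
Note: the new test fixtures in this patch reduce to two inserts, a parent row in configs and a dependent row in feeds, both issued through the project's generic db.QueryRow[T] helper with @-style named parameters and RETURNING *. The standalone sketch below illustrates that fixture pattern only; the helper name insertConfigWithFeed is hypothetical, the ConfigModel/FeedModel shapes are assumed from the models used elsewhere in this diff, and the snippet is not part of the patch itself.

// Sketch only: mirrors the configs/feeds fixture pattern used by insertSampleData above.
// Assumes the @param named-argument binding of the orakl db helper and the schema
// introduced by migrations 000019/000020 (feeds.config_id -> configs.id).
package tests

import (
	"context"

	"bisonai.com/orakl/node/pkg/admin/config"
	"bisonai.com/orakl/node/pkg/admin/feed"
	"bisonai.com/orakl/node/pkg/db"
)

// insertConfigWithFeed is a hypothetical helper, not part of the patch.
func insertConfigWithFeed(ctx context.Context) (config.ConfigModel, feed.FeedModel, error) {
	// Parent row first; RETURNING * lets QueryRow scan the inserted config back.
	cfg, err := db.QueryRow[config.ConfigModel](ctx,
		"INSERT INTO configs (name, address, fetch_interval, aggregate_interval, submit_interval) VALUES (@name, @address, @fetch_interval, @aggregate_interval, @submit_interval) RETURNING *",
		map[string]any{"name": "test_config", "address": "test_address", "fetch_interval": 1, "aggregate_interval": 1, "submit_interval": 1})
	if err != nil {
		return config.ConfigModel{}, feed.FeedModel{}, err
	}

	// Dependent row references the config through the new config_id foreign key.
	f, err := db.QueryRow[feed.FeedModel](ctx,
		"INSERT INTO feeds (name, config_id, definition) VALUES (@name, @config_id, @definition) RETURNING *",
		map[string]any{"name": "test_feed", "config_id": cfg.Id, "definition": `{"test": "test"}`})
	if err != nil {
		return cfg, feed.FeedModel{}, err
	}
	return cfg, f, nil
}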