From 2036e1049f9eabe35ccc6a2c746839cf8e14235d Mon Sep 17 00:00:00 2001 From: nick Date: Thu, 22 Feb 2024 17:59:27 +0900 Subject: [PATCH] wip --- node/pkg/admin/adapter/controller.go | 25 ++++++++ node/pkg/admin/adapter/queries.go | 4 ++ node/pkg/admin/adapter/route.go | 5 ++ node/pkg/admin/fetcher/controller.go | 21 +++++++ node/pkg/admin/fetcher/route.go | 13 ++++ node/pkg/admin/tests/adapter_test.go | 60 +++++++++++++++++++ node/pkg/admin/tests/fetcher_test.go | 89 ++++++++++++++++++++++++++++ node/pkg/admin/tests/main_test.go | 9 ++- node/pkg/admin/tests/test_helper.go | 30 +++++++--- node/pkg/admin/utils/utils.go | 26 +++++++- node/pkg/bus/bus.go | 9 ++- node/pkg/bus/bus_test.go | 18 +++--- node/pkg/fetcher/main_test.go | 5 +- 13 files changed, 295 insertions(+), 19 deletions(-) create mode 100644 node/pkg/admin/fetcher/controller.go create mode 100644 node/pkg/admin/fetcher/route.go create mode 100644 node/pkg/admin/tests/fetcher_test.go diff --git a/node/pkg/admin/adapter/controller.go b/node/pkg/admin/adapter/controller.go index de596c3fe..a39515ce1 100644 --- a/node/pkg/admin/adapter/controller.go +++ b/node/pkg/admin/adapter/controller.go @@ -3,6 +3,7 @@ package adapter import ( "encoding/json" + "bisonai.com/orakl/node/pkg/admin/utils" "bisonai.com/orakl/node/pkg/db" "github.com/go-playground/validator" "github.com/gofiber/fiber/v2" @@ -112,3 +113,27 @@ func deleteById(c *fiber.Ctx) error { } return c.JSON(result) } + +func activate(c *fiber.Ctx) error { + id := c.Params("id") + result, err := db.QueryRow[AdapterModel](c.Context(), ActivateAdapter, map[string]any{"id": id}) + if err != nil { + panic(err) + } + + utils.SendMessage(c, "fetcher", "activate", map[string]any{"id": id}) + + return c.JSON(result) +} + +func deactivate(c *fiber.Ctx) error { + id := c.Params("id") + result, err := db.QueryRow[AdapterModel](c.Context(), DeactivateAdapter, map[string]any{"id": id}) + if err != nil { + panic(err) + } + + utils.SendMessage(c, "fetcher", "deactivate", map[string]any{"id": id}) + + return c.JSON(result) +} diff --git a/node/pkg/admin/adapter/queries.go b/node/pkg/admin/adapter/queries.go index 7551814f5..9fd1c43c0 100644 --- a/node/pkg/admin/adapter/queries.go +++ b/node/pkg/admin/adapter/queries.go @@ -12,4 +12,8 @@ const ( GetFeedsByAdapterId = `SELECT * FROM feeds WHERE adapter_id = @id;` DeleteAdapterById = `DELETE FROM adapters WHERE id = @id RETURNING *;` + + ActivateAdapter = `UPDATE adapters SET active = true WHERE id = @id RETURNING *;` + + DeactivateAdapter = `UPDATE adapters SET active = false WHERE id = @id RETURNING *;` ) diff --git a/node/pkg/admin/adapter/route.go b/node/pkg/admin/adapter/route.go index 4d2cdbe3d..fe0ccbc18 100644 --- a/node/pkg/admin/adapter/route.go +++ b/node/pkg/admin/adapter/route.go @@ -9,7 +9,12 @@ func Routes(router fiber.Router) { adapter.Post("", insert) adapter.Get("", get) + + adapter.Post("/activate/:id", activate) + adapter.Post("/deactivate/:id", deactivate) + adapter.Get("/detail/:id", getDetailById) adapter.Get("/:id", getById) adapter.Delete("/:id", deleteById) + } diff --git a/node/pkg/admin/fetcher/controller.go b/node/pkg/admin/fetcher/controller.go new file mode 100644 index 000000000..c4aeb87d2 --- /dev/null +++ b/node/pkg/admin/fetcher/controller.go @@ -0,0 +1,21 @@ +package fetcher + +import ( + "bisonai.com/orakl/node/pkg/admin/utils" + "github.com/gofiber/fiber/v2" +) + +func start(c *fiber.Ctx) error { + utils.SendMessage(c, "fetcher", "start", nil) + return c.SendString("fetcher started") +} + +func stop(c *fiber.Ctx) error { + utils.SendMessage(c, "fetcher", "stop", nil) + return c.SendString("fetcher stopped") +} + +func refresh(c *fiber.Ctx) error { + utils.SendMessage(c, "fetcher", "refresh", nil) + return c.SendString("fetcher refreshed") +} diff --git a/node/pkg/admin/fetcher/route.go b/node/pkg/admin/fetcher/route.go new file mode 100644 index 000000000..15a3ab0b6 --- /dev/null +++ b/node/pkg/admin/fetcher/route.go @@ -0,0 +1,13 @@ +package fetcher + +import ( + "github.com/gofiber/fiber/v2" +) + +func Routes(router fiber.Router) { + fetcher := router.Group("/fetcher") + + fetcher.Post("/start", start) + fetcher.Post("/stop", stop) + fetcher.Post("/refresh", refresh) +} diff --git a/node/pkg/admin/tests/adapter_test.go b/node/pkg/admin/tests/adapter_test.go index ccb0cf333..cf21c12f7 100644 --- a/node/pkg/admin/tests/adapter_test.go +++ b/node/pkg/admin/tests/adapter_test.go @@ -128,3 +128,63 @@ func TestAdapterDeleteById(t *testing.T) { assert.Lessf(t, len(readResultAfter), len(readResultBefore), "expected to have less adapters after deletion") } + +func TestAdapterDeactivate(t *testing.T) { + app, err := setup() + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer cleanup() + defer app.Shutdown() + + channel := appBus.Subscribe("fetcher", 10) + + deactivateResult, err := PostRequest[adapter.AdapterModel](app, "/api/v1/adapter/deactivate/"+strconv.FormatInt(*insertedAdapter.Id, 10), nil) + if err != nil { + t.Fatalf("error deactivating adapter: %v", err) + } + assert.False(t, deactivateResult.Active) + + select { + case msg := <-channel: + if msg.From != "admin" || msg.To != "fetcher" || msg.Content.Command != "deactivate" { + t.Errorf("Message did not match expected. Got %v", msg) + } + default: + t.Errorf("No message received on channel") + } +} + +func TestAdapterActivate(t *testing.T) { + app, err := setup() + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer cleanup() + defer app.Shutdown() + + channel := appBus.Subscribe("fetcher", 10) + + //first deactivate before activate + _, err = PostRequest[adapter.AdapterModel](app, "/api/v1/adapter/deactivate/"+strconv.FormatInt(*insertedAdapter.Id, 10), nil) + if err != nil { + t.Fatalf("error deactivating adapter: %v", err) + } + <-channel + + // activate + activateResult, err := PostRequest[adapter.AdapterModel](app, "/api/v1/adapter/activate/"+strconv.FormatInt(*insertedAdapter.Id, 10), nil) + if err != nil { + t.Fatalf("error activating adapter: %v", err) + } + assert.True(t, activateResult.Active) + + select { + case msg := <-channel: + if msg.From != "admin" || msg.To != "fetcher" || msg.Content.Command != "activate" { + t.Errorf("Message did not match expected. Got %v", msg) + } + default: + t.Errorf("No message received on channel") + } +} diff --git a/node/pkg/admin/tests/fetcher_test.go b/node/pkg/admin/tests/fetcher_test.go new file mode 100644 index 000000000..1128ab626 --- /dev/null +++ b/node/pkg/admin/tests/fetcher_test.go @@ -0,0 +1,89 @@ +//nolint:all +package tests + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFetcherStart(t *testing.T) { + app, err := setup() + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer cleanup() + defer app.Shutdown() + + channel := appBus.Subscribe("fetcher", 10) + + result, err := RawPostRequest(app, "/api/v1/fetcher/start", nil) + if err != nil { + t.Fatalf("error starting fetcher: %v", err) + } + + assert.Equal(t, string(result), "fetcher started") + + select { + case msg := <-channel: + if msg.From != "admin" || msg.To != "fetcher" || msg.Content.Command != "start" { + t.Fatalf("unexpected message: %v", msg) + } + default: + t.Fatalf("no message received on channel") + } +} + +func TestFetcherStop(t *testing.T) { + app, err := setup() + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer cleanup() + defer app.Shutdown() + + channel := appBus.Subscribe("fetcher", 10) + + result, err := RawPostRequest(app, "/api/v1/fetcher/stop", nil) + if err != nil { + t.Fatalf("error stopping fetcher: %v", err) + } + + assert.Equal(t, string(result), "fetcher stopped") + + select { + case msg := <-channel: + if msg.From != "admin" || msg.To != "fetcher" || msg.Content.Command != "stop" { + t.Fatalf("unexpected message: %v", msg) + } + default: + t.Fatalf("no message received on channel") + } +} + +func TestFetcherRefresh(t *testing.T) { + app, err := setup() + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer cleanup() + defer app.Shutdown() + + channel := appBus.Subscribe("fetcher", 10) + + result, err := RawPostRequest(app, "/api/v1/fetcher/refresh", nil) + if err != nil { + t.Fatalf("error refreshing fetcher: %v", err) + } + + assert.Equal(t, string(result), "fetcher refreshed") + + select { + case msg := <-channel: + if msg.From != "admin" || msg.To != "fetcher" || msg.Content.Command != "refresh" { + t.Fatalf("unexpected message: %v", msg) + } + default: + t.Fatalf("no message received on channel") + } +} diff --git a/node/pkg/admin/tests/main_test.go b/node/pkg/admin/tests/main_test.go index 2f578ffbb..3e03bab80 100644 --- a/node/pkg/admin/tests/main_test.go +++ b/node/pkg/admin/tests/main_test.go @@ -7,16 +7,22 @@ import ( "bisonai.com/orakl/node/pkg/admin/adapter" "bisonai.com/orakl/node/pkg/admin/feed" + "bisonai.com/orakl/node/pkg/admin/fetcher" "bisonai.com/orakl/node/pkg/admin/utils" + "bisonai.com/orakl/node/pkg/bus" "bisonai.com/orakl/node/pkg/db" "github.com/gofiber/fiber/v2" ) var insertedAdapter adapter.AdapterModel var insertedFeed feed.FeedModel +var appBus *bus.MessageBus func setup() (*fiber.App, error) { - app, err := utils.Setup("") + _bus := bus.NewMessageBus() + appBus = _bus + + app, err := utils.Setup("", appBus) if err != nil { return nil, err } @@ -27,6 +33,7 @@ func setup() (*fiber.App, error) { v1 := app.Group("/api/v1") adapter.Routes(v1) feed.Routes(v1) + fetcher.Routes(v1) return app, nil } diff --git a/node/pkg/admin/tests/test_helper.go b/node/pkg/admin/tests/test_helper.go index 0fec97bfc..53ce796f0 100644 --- a/node/pkg/admin/tests/test_helper.go +++ b/node/pkg/admin/tests/test_helper.go @@ -13,6 +13,24 @@ import ( func req[T any](app *fiber.App, method string, endpoint string, requestBody interface{}) (T, error) { var result T + + resultBody, err := rawReq(app, method, endpoint, requestBody) + if err != nil { + fmt.Println("failed to raw request:", err) + return result, err + } + + err = json.Unmarshal(resultBody, &result) + if err != nil { + fmt.Println("failed Unmarshal result body:" + string(resultBody)) + return result, err + } + + return result, nil +} + +func rawReq(app *fiber.App, method string, endpoint string, requestBody interface{}) ([]byte, error) { + var result []byte var body io.Reader if requestBody != nil { @@ -48,13 +66,7 @@ func req[T any](app *fiber.App, method string, endpoint string, requestBody inte return result, err } - err = json.Unmarshal(resultBody, &result) - if err != nil { - fmt.Println("failed Unmarshal result body:" + string(resultBody)) - return result, err - } - - return result, nil + return resultBody, nil } func GetRequest[T any](app *fiber.App, endpoint string, requestBody interface{}) (T, error) { @@ -65,6 +77,10 @@ func PostRequest[T any](app *fiber.App, endpoint string, requestBody interface{} return req[T](app, "POST", endpoint, requestBody) } +func RawPostRequest(app *fiber.App, endpoint string, requestBody interface{}) ([]byte, error) { + return rawReq(app, "POST", endpoint, requestBody) +} + func PatchRequest[T any](app *fiber.App, endpoint string, requestBody interface{}) (T, error) { return req[T](app, "PATCH", endpoint, requestBody) } diff --git a/node/pkg/admin/utils/utils.go b/node/pkg/admin/utils/utils.go index 1ac036a5c..7056a7944 100644 --- a/node/pkg/admin/utils/utils.go +++ b/node/pkg/admin/utils/utils.go @@ -8,15 +8,17 @@ import ( "runtime/debug" "strings" + "bisonai.com/orakl/node/pkg/bus" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/recover" ) -func Setup(version string) (*fiber.App, error) { +func Setup(version string, bus *bus.MessageBus) (*fiber.App, error) { if version == "" { version = "test" } + app := fiber.New(fiber.Config{ AppName: "Node API " + version, EnablePrintRoutes: true, @@ -32,6 +34,11 @@ func Setup(version string) (*fiber.App, error) { app.Use(cors.New()) + app.Use(func(c *fiber.Ctx) error { + c.Locals("bus", bus) + return c.Next() + }) + return app, nil } @@ -71,3 +78,20 @@ func CustomStackTraceHandler(_ *fiber.Ctx, e interface{}) { log.Printf("| (%s) panic: %v \n", failPoint, e) _, _ = os.Stderr.WriteString(fmt.Sprintf("%s\n", debug.Stack())) //nolint:errcheck // This will never fail } + +func SendMessage(c *fiber.Ctx, to string, command string, args map[string]interface{}) error { + messageBus, ok := c.Locals("bus").(*bus.MessageBus) + if !ok { + return errors.New("bus is not found, failed to message fetcher") + } + msg := bus.Message{ + From: "admin", + To: to, + Content: bus.MessageContent{ + Command: command, + Args: args, + }, + } + messageBus.Publish(msg) + return nil +} diff --git a/node/pkg/bus/bus.go b/node/pkg/bus/bus.go index f77ea1e64..fabf9878b 100644 --- a/node/pkg/bus/bus.go +++ b/node/pkg/bus/bus.go @@ -5,7 +5,12 @@ package bus type Message struct { From string To string - Content interface{} + Content MessageContent +} + +type MessageContent struct { + Command string + Args map[string]interface{} } type MessageBus struct { @@ -21,7 +26,7 @@ func NewMessageBus() *MessageBus { func (mb *MessageBus) Subscribe(id string, buffer int) <-chan Message { ch := make(chan Message, buffer) mb.channels[id] = ch - return ch + return mb.channels[id] } func (mb *MessageBus) Publish(msg Message) { diff --git a/node/pkg/bus/bus_test.go b/node/pkg/bus/bus_test.go index 85dc1f15b..499f5f397 100644 --- a/node/pkg/bus/bus_test.go +++ b/node/pkg/bus/bus_test.go @@ -5,24 +5,28 @@ import ( ) func TestSubscribeAndPublish(t *testing.T) { - bus := NewMessageBus() + messageBus := NewMessageBus() // Test Subscribe - channel := bus.Subscribe("test", 10) + channel := messageBus.Subscribe("test", 10) // Test Publish - bus.Publish(Message{ - From: "testFrom", - To: "test", - Content: "testContent", + messageBus.Publish(Message{ + From: "testFrom", + To: "test", + Content: MessageContent{ + Command: "testCommand", + Args: map[string]any{"testArg": "testArg"}, + }, }) select { case msg := <-channel: - if msg.From != "testFrom" || msg.To != "test" || msg.Content.(string) != "testContent" { + if msg.From != "testFrom" || msg.To != "test" || msg.Content.Command != "testCommand" { t.Errorf("Message did not match expected. Got %v", msg) } default: t.Errorf("No message received on channel") } + } diff --git a/node/pkg/fetcher/main_test.go b/node/pkg/fetcher/main_test.go index 25ce3ec8c..064b2cff0 100644 --- a/node/pkg/fetcher/main_test.go +++ b/node/pkg/fetcher/main_test.go @@ -10,6 +10,7 @@ import ( "bisonai.com/orakl/node/pkg/admin/adapter" "bisonai.com/orakl/node/pkg/admin/tests" "bisonai.com/orakl/node/pkg/admin/utils" + "bisonai.com/orakl/node/pkg/bus" "bisonai.com/orakl/node/pkg/db" "github.com/gofiber/fiber/v2" ) @@ -220,7 +221,9 @@ var sampleData = `{ var insertResult adapter.AdapterModel func setup() (*fiber.App, error) { - app, err := utils.Setup("") + messageBus := bus.NewMessageBus() + + app, err := utils.Setup("", messageBus) if err != nil { return nil, err }