Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Feb 22, 2024
1 parent 7d03c56 commit 2036e10
Show file tree
Hide file tree
Showing 13 changed files with 295 additions and 19 deletions.
25 changes: 25 additions & 0 deletions node/pkg/admin/adapter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package adapter
import (
"encoding/json"

"bisonai.com/orakl/node/pkg/admin/utils"
"bisonai.com/orakl/node/pkg/db"
"github.com/go-playground/validator"
"github.com/gofiber/fiber/v2"
Expand Down Expand Up @@ -112,3 +113,27 @@ func deleteById(c *fiber.Ctx) error {
}
return c.JSON(result)
}

func activate(c *fiber.Ctx) error {
id := c.Params("id")
result, err := db.QueryRow[AdapterModel](c.Context(), ActivateAdapter, map[string]any{"id": id})
if err != nil {
panic(err)
}

utils.SendMessage(c, "fetcher", "activate", map[string]any{"id": id})

Check failure on line 124 in node/pkg/admin/adapter/controller.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value of `utils.SendMessage` is not checked (errcheck)

return c.JSON(result)
}

func deactivate(c *fiber.Ctx) error {
id := c.Params("id")
result, err := db.QueryRow[AdapterModel](c.Context(), DeactivateAdapter, map[string]any{"id": id})
if err != nil {
panic(err)
}

utils.SendMessage(c, "fetcher", "deactivate", map[string]any{"id": id})

Check failure on line 136 in node/pkg/admin/adapter/controller.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value of `utils.SendMessage` is not checked (errcheck)

return c.JSON(result)
}
4 changes: 4 additions & 0 deletions node/pkg/admin/adapter/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ const (
GetFeedsByAdapterId = `SELECT * FROM feeds WHERE adapter_id = @id;`

DeleteAdapterById = `DELETE FROM adapters WHERE id = @id RETURNING *;`

ActivateAdapter = `UPDATE adapters SET active = true WHERE id = @id RETURNING *;`

DeactivateAdapter = `UPDATE adapters SET active = false WHERE id = @id RETURNING *;`
)
5 changes: 5 additions & 0 deletions node/pkg/admin/adapter/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ func Routes(router fiber.Router) {

adapter.Post("", insert)
adapter.Get("", get)

adapter.Post("/activate/:id", activate)
adapter.Post("/deactivate/:id", deactivate)

adapter.Get("/detail/:id", getDetailById)
adapter.Get("/:id", getById)
adapter.Delete("/:id", deleteById)

}
21 changes: 21 additions & 0 deletions node/pkg/admin/fetcher/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package fetcher

import (
"bisonai.com/orakl/node/pkg/admin/utils"
"github.com/gofiber/fiber/v2"
)

func start(c *fiber.Ctx) error {
utils.SendMessage(c, "fetcher", "start", nil)

Check failure on line 9 in node/pkg/admin/fetcher/controller.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value of `utils.SendMessage` is not checked (errcheck)
return c.SendString("fetcher started")
}

func stop(c *fiber.Ctx) error {
utils.SendMessage(c, "fetcher", "stop", nil)
return c.SendString("fetcher stopped")
}

func refresh(c *fiber.Ctx) error {
utils.SendMessage(c, "fetcher", "refresh", nil)
return c.SendString("fetcher refreshed")
}
13 changes: 13 additions & 0 deletions node/pkg/admin/fetcher/route.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package fetcher

import (
"github.com/gofiber/fiber/v2"
)

func Routes(router fiber.Router) {
fetcher := router.Group("/fetcher")

fetcher.Post("/start", start)
fetcher.Post("/stop", stop)
fetcher.Post("/refresh", refresh)
}
60 changes: 60 additions & 0 deletions node/pkg/admin/tests/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,63 @@ func TestAdapterDeleteById(t *testing.T) {

assert.Lessf(t, len(readResultAfter), len(readResultBefore), "expected to have less adapters after deletion")
}

func TestAdapterDeactivate(t *testing.T) {
app, err := setup()
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer cleanup()
defer app.Shutdown()

channel := appBus.Subscribe("fetcher", 10)

deactivateResult, err := PostRequest[adapter.AdapterModel](app, "/api/v1/adapter/deactivate/"+strconv.FormatInt(*insertedAdapter.Id, 10), nil)
if err != nil {
t.Fatalf("error deactivating adapter: %v", err)
}
assert.False(t, deactivateResult.Active)

select {
case msg := <-channel:
if msg.From != "admin" || msg.To != "fetcher" || msg.Content.Command != "deactivate" {
t.Errorf("Message did not match expected. Got %v", msg)
}
default:
t.Errorf("No message received on channel")
}
}

func TestAdapterActivate(t *testing.T) {
app, err := setup()
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer cleanup()
defer app.Shutdown()

channel := appBus.Subscribe("fetcher", 10)

//first deactivate before activate
_, err = PostRequest[adapter.AdapterModel](app, "/api/v1/adapter/deactivate/"+strconv.FormatInt(*insertedAdapter.Id, 10), nil)
if err != nil {
t.Fatalf("error deactivating adapter: %v", err)
}
<-channel

// activate
activateResult, err := PostRequest[adapter.AdapterModel](app, "/api/v1/adapter/activate/"+strconv.FormatInt(*insertedAdapter.Id, 10), nil)
if err != nil {
t.Fatalf("error activating adapter: %v", err)
}
assert.True(t, activateResult.Active)

select {
case msg := <-channel:
if msg.From != "admin" || msg.To != "fetcher" || msg.Content.Command != "activate" {
t.Errorf("Message did not match expected. Got %v", msg)
}
default:
t.Errorf("No message received on channel")
}
}
89 changes: 89 additions & 0 deletions node/pkg/admin/tests/fetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//nolint:all
package tests

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestFetcherStart(t *testing.T) {
app, err := setup()
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer cleanup()
defer app.Shutdown()

channel := appBus.Subscribe("fetcher", 10)

result, err := RawPostRequest(app, "/api/v1/fetcher/start", nil)
if err != nil {
t.Fatalf("error starting fetcher: %v", err)
}

assert.Equal(t, string(result), "fetcher started")

select {
case msg := <-channel:
if msg.From != "admin" || msg.To != "fetcher" || msg.Content.Command != "start" {
t.Fatalf("unexpected message: %v", msg)
}
default:
t.Fatalf("no message received on channel")
}
}

func TestFetcherStop(t *testing.T) {
app, err := setup()
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer cleanup()
defer app.Shutdown()

channel := appBus.Subscribe("fetcher", 10)

result, err := RawPostRequest(app, "/api/v1/fetcher/stop", nil)
if err != nil {
t.Fatalf("error stopping fetcher: %v", err)
}

assert.Equal(t, string(result), "fetcher stopped")

select {
case msg := <-channel:
if msg.From != "admin" || msg.To != "fetcher" || msg.Content.Command != "stop" {
t.Fatalf("unexpected message: %v", msg)
}
default:
t.Fatalf("no message received on channel")
}
}

func TestFetcherRefresh(t *testing.T) {
app, err := setup()
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer cleanup()
defer app.Shutdown()

channel := appBus.Subscribe("fetcher", 10)

result, err := RawPostRequest(app, "/api/v1/fetcher/refresh", nil)
if err != nil {
t.Fatalf("error refreshing fetcher: %v", err)
}

assert.Equal(t, string(result), "fetcher refreshed")

select {
case msg := <-channel:
if msg.From != "admin" || msg.To != "fetcher" || msg.Content.Command != "refresh" {
t.Fatalf("unexpected message: %v", msg)
}
default:
t.Fatalf("no message received on channel")
}
}
9 changes: 8 additions & 1 deletion node/pkg/admin/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,22 @@ import (

"bisonai.com/orakl/node/pkg/admin/adapter"
"bisonai.com/orakl/node/pkg/admin/feed"
"bisonai.com/orakl/node/pkg/admin/fetcher"
"bisonai.com/orakl/node/pkg/admin/utils"
"bisonai.com/orakl/node/pkg/bus"
"bisonai.com/orakl/node/pkg/db"
"github.com/gofiber/fiber/v2"
)

var insertedAdapter adapter.AdapterModel
var insertedFeed feed.FeedModel
var appBus *bus.MessageBus

func setup() (*fiber.App, error) {
app, err := utils.Setup("")
_bus := bus.NewMessageBus()
appBus = _bus

app, err := utils.Setup("", appBus)
if err != nil {
return nil, err
}
Expand All @@ -27,6 +33,7 @@ func setup() (*fiber.App, error) {
v1 := app.Group("/api/v1")
adapter.Routes(v1)
feed.Routes(v1)
fetcher.Routes(v1)
return app, nil
}

Expand Down
30 changes: 23 additions & 7 deletions node/pkg/admin/tests/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,24 @@ import (

func req[T any](app *fiber.App, method string, endpoint string, requestBody interface{}) (T, error) {
var result T

resultBody, err := rawReq(app, method, endpoint, requestBody)
if err != nil {
fmt.Println("failed to raw request:", err)
return result, err
}

err = json.Unmarshal(resultBody, &result)
if err != nil {
fmt.Println("failed Unmarshal result body:" + string(resultBody))
return result, err
}

return result, nil
}

func rawReq(app *fiber.App, method string, endpoint string, requestBody interface{}) ([]byte, error) {
var result []byte
var body io.Reader

if requestBody != nil {
Expand Down Expand Up @@ -48,13 +66,7 @@ func req[T any](app *fiber.App, method string, endpoint string, requestBody inte
return result, err
}

err = json.Unmarshal(resultBody, &result)
if err != nil {
fmt.Println("failed Unmarshal result body:" + string(resultBody))
return result, err
}

return result, nil
return resultBody, nil
}

func GetRequest[T any](app *fiber.App, endpoint string, requestBody interface{}) (T, error) {
Expand All @@ -65,6 +77,10 @@ func PostRequest[T any](app *fiber.App, endpoint string, requestBody interface{}
return req[T](app, "POST", endpoint, requestBody)
}

func RawPostRequest(app *fiber.App, endpoint string, requestBody interface{}) ([]byte, error) {
return rawReq(app, "POST", endpoint, requestBody)
}

func PatchRequest[T any](app *fiber.App, endpoint string, requestBody interface{}) (T, error) {
return req[T](app, "PATCH", endpoint, requestBody)
}
Expand Down
26 changes: 25 additions & 1 deletion node/pkg/admin/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import (
"runtime/debug"
"strings"

"bisonai.com/orakl/node/pkg/bus"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/recover"
)

func Setup(version string) (*fiber.App, error) {
func Setup(version string, bus *bus.MessageBus) (*fiber.App, error) {
if version == "" {
version = "test"
}

app := fiber.New(fiber.Config{
AppName: "Node API " + version,
EnablePrintRoutes: true,
Expand All @@ -32,6 +34,11 @@ func Setup(version string) (*fiber.App, error) {

app.Use(cors.New())

app.Use(func(c *fiber.Ctx) error {
c.Locals("bus", bus)
return c.Next()
})

return app, nil

}
Expand Down Expand Up @@ -71,3 +78,20 @@ func CustomStackTraceHandler(_ *fiber.Ctx, e interface{}) {
log.Printf("| (%s) panic: %v \n", failPoint, e)
_, _ = os.Stderr.WriteString(fmt.Sprintf("%s\n", debug.Stack())) //nolint:errcheck // This will never fail
}

func SendMessage(c *fiber.Ctx, to string, command string, args map[string]interface{}) error {
messageBus, ok := c.Locals("bus").(*bus.MessageBus)
if !ok {
return errors.New("bus is not found, failed to message fetcher")
}
msg := bus.Message{
From: "admin",
To: to,
Content: bus.MessageContent{
Command: command,
Args: args,
},
}
messageBus.Publish(msg)
return nil
}
Loading

0 comments on commit 2036e10

Please sign in to comment.