Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(DAL) Remove global variable #1813

Merged
merged 2 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 41 additions & 16 deletions node/pkg/dal/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ import (
"github.com/rs/zerolog/log"
)

var ApiController Controller

func Setup(ctx context.Context, adminEndpoint string) error {
func Setup(ctx context.Context, adminEndpoint string) (*Controller, error) {
configs, err := request.Request[[]types.Config](request.WithEndpoint(adminEndpoint + "/config"))
if err != nil {
log.Error().Err(err).Msg("failed to get configs")
return err
return nil, err
}

configMap := make(map[string]types.Config)
Expand All @@ -31,11 +29,11 @@ func Setup(ctx context.Context, adminEndpoint string) error {
collector, err := collector.NewCollector(ctx, configs)
if err != nil {
log.Error().Err(err).Msg("failed to create collector")
return err
return nil, err
}

ApiController = *NewController(configMap, collector)
return nil
ApiController := NewController(configMap, collector)
return ApiController, nil
}

func NewController(configs map[string]types.Config, internalCollector *collector.Collector) *Controller {
Expand All @@ -52,6 +50,7 @@ func NewController(configs map[string]types.Config, internalCollector *collector

func (c *Controller) Start(ctx context.Context) {
go c.Collector.Start(ctx)
log.Info().Msg("api collector started")
go func() {
for {
select {
Expand Down Expand Up @@ -103,12 +102,23 @@ func (c *Controller) castSubmissionData(data *dalcommon.OutgoingSubmissionData,
}
}

func (c *Controller) handleWebsocket(conn *websocket.Conn) {
ctx := context.Background()
func HandleWebsocket(conn *websocket.Conn) {
c, ok := conn.Locals("apiController").(*Controller)
if !ok {
log.Error().Msg("api controller not found")
return
}

ctx, ok := conn.Locals("context").(*context.Context)
if !ok {
log.Error().Msg("ctx not found")
return
}

c.register <- conn
apiKey := conn.Headers("X-Api-Key")

id, err := stats.InsertWebsocketConnection(ctx, apiKey)
id, err := stats.InsertWebsocketConnection(*ctx, apiKey)
if err != nil {
log.Error().Err(err).Msg("failed to insert websocket connection")
return
Expand All @@ -118,7 +128,7 @@ func (c *Controller) handleWebsocket(conn *websocket.Conn) {
defer func() {
c.unregister <- conn
conn.Close()
err := stats.UpdateWebsocketConnection(ctx, id)
err = stats.UpdateWebsocketConnection(*ctx, id)
if err != nil {
log.Error().Err(err).Msg("failed to update websocket connection")
return
Expand All @@ -128,7 +138,7 @@ func (c *Controller) handleWebsocket(conn *websocket.Conn) {

for {
var msg Subscription
if err := conn.ReadJSON(&msg); err != nil {
if err = conn.ReadJSON(&msg); err != nil {
log.Error().Err(err).Msg("failed to read message")
return
}
Expand All @@ -144,7 +154,7 @@ func (c *Controller) handleWebsocket(conn *websocket.Conn) {
continue
}
c.clients[conn][symbol] = true
err = stats.InsertWebsocketSubscription(ctx, id, param)
err = stats.InsertWebsocketSubscription(*ctx, id, param)
if err != nil {
log.Error().Err(err).Msg("failed to insert websocket subscription")
}
Expand All @@ -154,19 +164,34 @@ func (c *Controller) handleWebsocket(conn *websocket.Conn) {
}

func getSymbols(c *fiber.Ctx) error {
controller, ok := c.Locals("apiController").(*Controller)
if !ok {
return errors.New("api controller not found")
}

result := []string{}
for key := range ApiController.configs {
for key := range controller.configs {
result = append(result, key)
}
return c.JSON(result)
}

func getLatestFeeds(c *fiber.Ctx) error {
result := ApiController.Collector.GetAllLatestData()
controller, ok := c.Locals("apiController").(*Controller)
if !ok {
return errors.New("api controller not found")
}

result := controller.Collector.GetAllLatestData()
return c.JSON(result)
}

func getLatestFeed(c *fiber.Ctx) error {
controller, ok := c.Locals("apiController").(*Controller)
if !ok {
return errors.New("api controller not found")
}

symbol := c.Params("symbol")

if symbol == "" {
Expand All @@ -180,7 +205,7 @@ func getLatestFeed(c *fiber.Ctx) error {
symbol = strings.ToUpper(symbol)
}

result, err := ApiController.Collector.GetLatestData(symbol)
result, err := controller.Collector.GetLatestData(symbol)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/dal/api/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ func Routes(router fiber.Router) {
api.Get("/symbols", getSymbols)
api.Get("/latest-data-feeds/all", getLatestFeeds)
api.Get("/latest-data-feeds/:symbol", getLatestFeed)
api.Get("/ws", websocket.New(ApiController.handleWebsocket))
api.Get("/ws", websocket.New(HandleWebsocket))
}
27 changes: 15 additions & 12 deletions node/pkg/dal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,46 @@ import (
"context"
"errors"
"os"
"time"

"bisonai.com/orakl/node/pkg/dal/api"
"bisonai.com/orakl/node/pkg/dal/utils/initializer"
"bisonai.com/orakl/node/pkg/dal/utils/keycache"

"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
)

func Run(ctx context.Context) error {
log.Debug().Msg("Starting DAL API server")
app, err := initializer.Setup(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to setup DAL API server")
return err
}
defer func() {
_ = app.Shutdown()
}()

keyCache := keycache.NewAPIKeyCache(1 * time.Hour)
keyCache.CleanupLoop(10 * time.Minute)
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved

adminEndpoint := os.Getenv("ORAKL_NODE_ADMIN_URL")
if adminEndpoint == "" {
return errors.New("ORAKL_NODE_ADMIN_URL is not set")
}

err = api.Setup(ctx, adminEndpoint)
controller, err := api.Setup(ctx, adminEndpoint)
if err != nil {
log.Error().Err(err).Msg("Failed to setup DAL API server")
return err
}
api.ApiController.Start(ctx)

log.Debug().Str("Player", "DAL API").Msg("DAL API collector started")
app, err := initializer.Setup(ctx, controller, keyCache)
if err != nil {
log.Error().Err(err).Msg("Failed to setup DAL API server")
return err
}
defer func() {
_ = app.Shutdown()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just out of curiosity, I've seen this pattern a lot in the code where the return value is assigned to a blank identifier, which is not used. Afaik the blank identifier is useful when a function returns multiple values and you are not interested in some of them. Is it still a good practice to use the blank identifier in this way?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so,
it didn't pass the linter when I didn't assign blank identifier, so it kind of became habit I think

}()

v1 := app.Group("/api/v1")
v1.Get("/", func(c *fiber.Ctx) error {
return c.SendString("Orakl Node DAL API")
})

api.Routes(v1)

port := os.Getenv("DAL_API_PORT")
Expand Down
1 change: 1 addition & 0 deletions node/pkg/dal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (c *Collector) processIncomingData(ctx context.Context, data aggregator.Sub
log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to convert incoming data to outgoing data")
return
}

defer c.LatestData.Store(result.Symbol, result)
c.OutgoingStream[data.GlobalAggregate.ConfigID] <- *result
}
Expand Down
14 changes: 4 additions & 10 deletions node/pkg/dal/tests/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ func TestApiControllerRun(t *testing.T) {
}
}()

assert.False(t, testItems.Controller.Collector.IsRunning)

testItems.Controller.Start(ctx)
time.Sleep(10 * time.Millisecond)
assert.True(t, testItems.Controller.Collector.IsRunning)
}
Expand All @@ -44,7 +41,7 @@ func TestApiGetLatestAll(t *testing.T) {
t.Logf("Cleanup failed: %v", cleanupErr)
}
}()
testItems.Controller.Start(ctx)

go testItems.App.Listen(":8090")

sampleSubmissionData, err := generateSampleSubmissionData(
Expand All @@ -68,7 +65,7 @@ func TestApiGetLatestAll(t *testing.T) {
if err != nil {
t.Fatalf("error getting latest data: %v", err)
}
expected, err := testItems.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData)
expected, err := testItems.Controller.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData)
if err != nil {
t.Fatalf("error converting sample submission data to outgoing data: %v", err)
}
Expand All @@ -91,7 +88,6 @@ func TestShouldFailWithoutApiKey(t *testing.T) {
}
}()

testItems.Controller.Start(ctx)
go testItems.App.Listen(":8090")
resp, err := request.RequestRaw(request.WithEndpoint("http://localhost:8090/api/v1"))
if err != nil {
Expand Down Expand Up @@ -120,7 +116,6 @@ func TestApiGetLatest(t *testing.T) {
t.Logf("Cleanup failed: %v", cleanupErr)
}
}()
testItems.Controller.Start(ctx)
go testItems.App.Listen(":8090")

sampleSubmissionData, err := generateSampleSubmissionData(
Expand All @@ -145,7 +140,7 @@ func TestApiGetLatest(t *testing.T) {
if err != nil {
t.Fatalf("error getting latest data: %v", err)
}
expected, err := testItems.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData)
expected, err := testItems.Controller.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData)
if err != nil {
t.Fatalf("error converting sample submission data to outgoing data: %v", err)
}
Expand All @@ -166,7 +161,6 @@ func TestApiWebsocket(t *testing.T) {

headers := map[string]string{"X-API-Key": testItems.ApiKey}

testItems.Controller.Start(ctx)
go testItems.App.Listen(":8090")

conn, err := wss.NewWebsocketHelper(ctx, wss.WithEndpoint("ws://localhost:8090/api/v1/dal/ws"), wss.WithRequestHeaders(headers))
Expand Down Expand Up @@ -206,7 +200,7 @@ func TestApiWebsocket(t *testing.T) {
ch := make(chan any)
go conn.Read(ctx, ch)

expected, err := testItems.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData)
expected, err := testItems.Controller.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData)
if err != nil {
t.Fatalf("error converting sample submission data to outgoing data: %v", err)
}
Expand Down
67 changes: 49 additions & 18 deletions node/pkg/dal/tests/collector_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
//nolint:all

package test

import (
"context"
"testing"
"time"

"bisonai.com/orakl/node/pkg/dal/api"
"bisonai.com/orakl/node/pkg/dal/common"
wsfcommon "bisonai.com/orakl/node/pkg/websocketfetcher/common"
"bisonai.com/orakl/node/pkg/wss"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
)

Expand All @@ -22,13 +26,12 @@ func TestCollectorStartAndStop(t *testing.T) {
}
}()

collector := testItems.Collector
collector.Start(ctx)
time.Sleep(10 * time.Millisecond)

collector := testItems.Controller.Collector
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved
assert.True(t, collector.IsRunning)

assert.Greater(t, len(collector.Symbols), 0)
assert.Greater(t, len(collector.Symbols), 0)

collector.Stop()
assert.False(t, collector.IsRunning)
}
Expand All @@ -44,10 +47,32 @@ func TestCollectorStream(t *testing.T) {
t.Logf("Cleanup failed: %v", cleanupErr)
}
}()
go testItems.App.Listen(":8090")

collector := testItems.Collector
collector.Start(ctx)
time.Sleep(10 * time.Millisecond)
time.Sleep(20 * time.Millisecond)

collector := testItems.Controller.Collector
assert.Greater(t, len(collector.Symbols), 0)
assert.True(t, collector.IsRunning)

headers := map[string]string{"X-API-Key": testItems.ApiKey}
conn, err := wss.NewWebsocketHelper(ctx, wss.WithEndpoint("ws://localhost:8090/api/v1/dal/ws"), wss.WithRequestHeaders(headers))
if err != nil {
t.Fatalf("error creating websocket helper: %v", err)
}

err = conn.Dial(ctx)
if err != nil {
t.Fatalf("error dialing websocket: %v", err)
}

err = conn.Write(ctx, api.Subscription{
Method: "SUBSCRIBE",
Params: []string{"submission@test-aggregate"},
})
if err != nil {
t.Fatalf("error subscribing to websocket: %v", err)
}

sampleSubmissionData, err := generateSampleSubmissionData(
testItems.TmpConfig.ID,
Expand All @@ -56,28 +81,34 @@ func TestCollectorStream(t *testing.T) {
1,
"test-aggregate",
)

if err != nil {
t.Fatalf("error generating sample submission data: %v", err)
}

log.Info().Msg("Publishing data")
err = testPublishData(ctx, *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing data: %v", err)
}
log.Info().Int32("configId", sampleSubmissionData.GlobalAggregate.ConfigID).Msg("Published data")

time.Sleep(10 * time.Millisecond)
ch := make(chan any)
go conn.Read(ctx, ch)

expected, err := testItems.Controller.Collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData)
if err != nil {
t.Fatalf("error converting sample submission data to outgoing data: %v", err)
}

expected, err := collector.IncomingDataToOutgoingData(ctx, *sampleSubmissionData)
sample := <-ch
result, err := wsfcommon.MessageToStruct[common.OutgoingSubmissionData](sample.(map[string]any))
if err != nil {
t.Fatalf("error converting incoming data to outgoing data: %v", err)
t.Fatalf("error converting sample to struct: %v", err)
}
assert.Equal(t, *expected, result)

select {
case sample := <-collector.OutgoingStream[testItems.TmpConfig.ID]:
assert.NotEqual(t, nil, sample)
assert.Equal(t, *expected, sample)
default:
t.Fatalf("no data received")
err = conn.Close()
if err != nil {
t.Fatalf("error closing websocket: %v", err)
}
}
Loading