Skip to content

Commit

Permalink
feat: store detailed usages
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Jul 11, 2024
1 parent e697827 commit 18d830e
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 40 deletions.
10 changes: 0 additions & 10 deletions node/cmd/dal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package main

import (
"context"
"os"
"os/signal"
"syscall"

"bisonai.com/orakl/node/pkg/dal"
"github.com/rs/zerolog/log"
Expand All @@ -14,13 +11,6 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
cancel()
}()

err := dal.Run(ctx)
if err != nil {
log.Fatal().Err(err).Msg("Failed to start DAL")
Expand Down
5 changes: 5 additions & 0 deletions node/migrations/dal/000004_remove_log.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS logs (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
message TEXT NOT NULL
)
1 change: 1 addition & 0 deletions node/migrations/dal/000004_remove_log.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS logs
54 changes: 25 additions & 29 deletions node/pkg/dal/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"bisonai.com/orakl/node/pkg/common/types"
"bisonai.com/orakl/node/pkg/dal/collector"
dalcommon "bisonai.com/orakl/node/pkg/dal/common"
"bisonai.com/orakl/node/pkg/db"
"bisonai.com/orakl/node/pkg/dal/utils/stats"
"bisonai.com/orakl/node/pkg/utils/request"
"github.com/gofiber/contrib/websocket"
"github.com/gofiber/fiber/v2"
Expand Down Expand Up @@ -103,20 +103,30 @@ func (c *Controller) castSubmissionData(data *dalcommon.OutgoingSubmissionData,
}
}

func (c *Controller) handleWebsocket(conn *websocket.Conn) {
func (c *Controller) handleWebsocket(ctx context.Context, conn *websocket.Conn) {
c.register <- conn
_ = db.QueryWithoutResult(
context.Background(),
"INSERT INTO logs (message) VALUES (@message);",
map[string]any{"message": "websocket connected from " + conn.IP()})
apiKey := conn.Headers("X-Api-Key")
if apiKey == "" {
log.Error().Msg("X-Api-Key header not found")
return
}

id, err := stats.InsertWebsocketConnection(ctx, apiKey)
if err != nil {
log.Error().Err(err).Msg("failed to insert websocket connection")
return
}
log.Info().Int32("id", id).Msg("inserted websocket connection")

defer func() {
c.unregister <- conn
conn.Close()
_ = db.QueryWithoutResult(
context.Background(),
"INSERT INTO logs (message) VALUES (@message);",
map[string]any{"message": "websocket disconnected from " + conn.IP()})
err := stats.UpdateWebsocketConnection(ctx, id)
if err != nil {
log.Error().Err(err).Msg("failed to update websocket connection")
return
}
log.Info().Int32("id", id).Msg("updated websocket connection")
}()

for {
Expand All @@ -127,15 +137,15 @@ func (c *Controller) handleWebsocket(conn *websocket.Conn) {
}

if msg.Method == "SUBSCRIBE" {
_ = db.QueryWithoutResult(
context.Background(),
"INSERT INTO logs (message) VALUES (@message);",
map[string]any{"message": "websocket subscribed(" + strings.Join(msg.Params, ",") + ") from " + conn.IP()},
)

if c.clients[conn] == nil {
c.clients[conn] = make(map[string]bool)
}
for _, param := range msg.Params {
err = stats.InsertWebsocketSubscription(ctx, id, param)
if err != nil {
log.Error().Err(err).Msg("failed to insert websocket subscription")
}
symbol := strings.TrimPrefix(param, "submission@")
if _, ok := c.configs[symbol]; !ok {
continue
Expand All @@ -155,13 +165,6 @@ func getSymbols(c *fiber.Ctx) error {
}

func getLatestFeeds(c *fiber.Ctx) error {
defer func() {
_ = db.QueryWithoutResult(
c.Context(),
"INSERT INTO logs (message) VALUES (@message);",
map[string]any{"message": "getLatestFeeds called from " + c.IP()})
}()

result := ApiController.Collector.GetAllLatestData()
return c.JSON(result)
}
Expand All @@ -176,13 +179,6 @@ func getLatestFeed(c *fiber.Ctx) error {
return errors.New("symbol should be in {BASE}-{QUOTE} format")
}

defer func() {
_ = db.QueryWithoutResult(
c.Context(),
"INSERT INTO logs (message) VALUES (@message);",
map[string]any{"message": "getLatestFeed(" + symbol + ") called from " + c.IP()})
}()

if !strings.Contains(symbol, "test") {
symbol = strings.ToUpper(symbol)
}
Expand Down
11 changes: 10 additions & 1 deletion node/pkg/dal/api/route.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package api

import (
"context"

"github.com/gofiber/contrib/websocket"
"github.com/gofiber/fiber/v2"
)
Expand All @@ -11,5 +13,12 @@ func Routes(router fiber.Router) {
api.Get("/symbols", getSymbols)
api.Get("/latest-data-feeds/all", getLatestFeeds)
api.Get("/latest-data-feeds/:symbol", getLatestFeed)
api.Get("/ws", websocket.New(ApiController.handleWebsocket))
api.Get("/ws", func(c *fiber.Ctx) error {
if websocket.IsWebSocketUpgrade(c) {
return websocket.New(func(conn *websocket.Conn) {
ApiController.handleWebsocket(context.Background(), conn)
})(c)
}
return fiber.ErrUpgradeRequired
})
}
3 changes: 3 additions & 0 deletions node/pkg/dal/utils/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"bisonai.com/orakl/node/pkg/dal/utils/keycache"
"bisonai.com/orakl/node/pkg/dal/utils/stats"
"bisonai.com/orakl/node/pkg/db"
errorSentinel "bisonai.com/orakl/node/pkg/error"

Expand Down Expand Up @@ -58,6 +59,8 @@ func Setup(ctx context.Context) (*fiber.App, error) {
KeyLookup: "header:X-API-Key",
Validator: validator,
}))

app.Use(stats.StatsMiddleware)
return app, nil
}

Expand Down
29 changes: 29 additions & 0 deletions node/pkg/dal/utils/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

"bisonai.com/orakl/node/pkg/db"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
)

const (
Expand Down Expand Up @@ -67,3 +69,30 @@ func InsertWebsocketSubscription(ctx context.Context, connectionId int32, topic
"topic": topic,
})
}

func StatsMiddleware(c *fiber.Ctx) error {
start := time.Now()

if err := c.Next(); err != nil {
return err
}
duration := time.Since(start)
headers := c.GetReqHeaders()
apiKeyRaw, ok := headers["X-Api-Key"]
if !ok {
log.Warn().Msg("X-Api-Key header not found")
return nil
}
apiKey := apiKeyRaw[0]
if apiKey == "" {
log.Warn().Msg("X-Api-Key header is empty")
return nil
}

endpoint := c.Path()
statusCode := c.Response().StatusCode()
if err := InsertRestCall(c.Context(), apiKey, endpoint, statusCode, duration); err != nil {
log.Error().Err(err).Msg("failed to insert rest call")
}
return nil
}

0 comments on commit 18d830e

Please sign in to comment.