diff --git a/node/cmd/dal/main.go b/node/cmd/dal/main.go index 786e3d1ab..df04dbd9a 100644 --- a/node/cmd/dal/main.go +++ b/node/cmd/dal/main.go @@ -2,9 +2,6 @@ package main import ( "context" - "os" - "os/signal" - "syscall" "bisonai.com/orakl/node/pkg/dal" "github.com/rs/zerolog/log" @@ -14,13 +11,6 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sigCh - cancel() - }() - err := dal.Run(ctx) if err != nil { log.Fatal().Err(err).Msg("Failed to start DAL") diff --git a/node/migrations/dal/000004_remove_log.down.sql b/node/migrations/dal/000004_remove_log.down.sql new file mode 100644 index 000000000..df10581dd --- /dev/null +++ b/node/migrations/dal/000004_remove_log.down.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS logs ( + id SERIAL PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + message TEXT NOT NULL +) \ No newline at end of file diff --git a/node/migrations/dal/000004_remove_log.up.sql b/node/migrations/dal/000004_remove_log.up.sql new file mode 100644 index 000000000..7ceef22ca --- /dev/null +++ b/node/migrations/dal/000004_remove_log.up.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS logs \ No newline at end of file diff --git a/node/pkg/dal/api/controller.go b/node/pkg/dal/api/controller.go index cb915fc95..405c5d62c 100644 --- a/node/pkg/dal/api/controller.go +++ b/node/pkg/dal/api/controller.go @@ -8,7 +8,7 @@ import ( "bisonai.com/orakl/node/pkg/common/types" "bisonai.com/orakl/node/pkg/dal/collector" dalcommon "bisonai.com/orakl/node/pkg/dal/common" - "bisonai.com/orakl/node/pkg/db" + "bisonai.com/orakl/node/pkg/dal/utils/stats" "bisonai.com/orakl/node/pkg/utils/request" "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" @@ -103,20 +103,30 @@ func (c *Controller) castSubmissionData(data *dalcommon.OutgoingSubmissionData, } } -func (c *Controller) handleWebsocket(conn *websocket.Conn) { +func (c *Controller) handleWebsocket(ctx context.Context, conn *websocket.Conn) { c.register <- conn - _ = db.QueryWithoutResult( - context.Background(), - "INSERT INTO logs (message) VALUES (@message);", - map[string]any{"message": "websocket connected from " + conn.IP()}) + apiKey := conn.Headers("X-Api-Key") + if apiKey == "" { + log.Error().Msg("X-Api-Key header not found") + return + } + + id, err := stats.InsertWebsocketConnection(ctx, apiKey) + if err != nil { + log.Error().Err(err).Msg("failed to insert websocket connection") + return + } + log.Info().Int32("id", id).Msg("inserted websocket connection") defer func() { c.unregister <- conn conn.Close() - _ = db.QueryWithoutResult( - context.Background(), - "INSERT INTO logs (message) VALUES (@message);", - map[string]any{"message": "websocket disconnected from " + conn.IP()}) + err := stats.UpdateWebsocketConnection(ctx, id) + if err != nil { + log.Error().Err(err).Msg("failed to update websocket connection") + return + } + log.Info().Int32("id", id).Msg("updated websocket connection") }() for { @@ -127,15 +137,15 @@ func (c *Controller) handleWebsocket(conn *websocket.Conn) { } if msg.Method == "SUBSCRIBE" { - _ = db.QueryWithoutResult( - context.Background(), - "INSERT INTO logs (message) VALUES (@message);", - map[string]any{"message": "websocket subscribed(" + strings.Join(msg.Params, ",") + ") from " + conn.IP()}, - ) + if c.clients[conn] == nil { c.clients[conn] = make(map[string]bool) } for _, param := range msg.Params { + err = stats.InsertWebsocketSubscription(ctx, id, param) + if err != nil { + log.Error().Err(err).Msg("failed to insert websocket subscription") + } symbol := strings.TrimPrefix(param, "submission@") if _, ok := c.configs[symbol]; !ok { continue @@ -155,13 +165,6 @@ func getSymbols(c *fiber.Ctx) error { } func getLatestFeeds(c *fiber.Ctx) error { - defer func() { - _ = db.QueryWithoutResult( - c.Context(), - "INSERT INTO logs (message) VALUES (@message);", - map[string]any{"message": "getLatestFeeds called from " + c.IP()}) - }() - result := ApiController.Collector.GetAllLatestData() return c.JSON(result) } @@ -176,13 +179,6 @@ func getLatestFeed(c *fiber.Ctx) error { return errors.New("symbol should be in {BASE}-{QUOTE} format") } - defer func() { - _ = db.QueryWithoutResult( - c.Context(), - "INSERT INTO logs (message) VALUES (@message);", - map[string]any{"message": "getLatestFeed(" + symbol + ") called from " + c.IP()}) - }() - if !strings.Contains(symbol, "test") { symbol = strings.ToUpper(symbol) } diff --git a/node/pkg/dal/api/route.go b/node/pkg/dal/api/route.go index 305fcea99..334a1e4e4 100644 --- a/node/pkg/dal/api/route.go +++ b/node/pkg/dal/api/route.go @@ -1,6 +1,8 @@ package api import ( + "context" + "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" ) @@ -11,5 +13,12 @@ func Routes(router fiber.Router) { api.Get("/symbols", getSymbols) api.Get("/latest-data-feeds/all", getLatestFeeds) api.Get("/latest-data-feeds/:symbol", getLatestFeed) - api.Get("/ws", websocket.New(ApiController.handleWebsocket)) + api.Get("/ws", func(c *fiber.Ctx) error { + if websocket.IsWebSocketUpgrade(c) { + return websocket.New(func(conn *websocket.Conn) { + ApiController.handleWebsocket(context.Background(), conn) + })(c) + } + return fiber.ErrUpgradeRequired + }) } diff --git a/node/pkg/dal/utils/initializer/initializer.go b/node/pkg/dal/utils/initializer/initializer.go index 9a137c1fc..0251a8f35 100644 --- a/node/pkg/dal/utils/initializer/initializer.go +++ b/node/pkg/dal/utils/initializer/initializer.go @@ -10,6 +10,7 @@ import ( "time" "bisonai.com/orakl/node/pkg/dal/utils/keycache" + "bisonai.com/orakl/node/pkg/dal/utils/stats" "bisonai.com/orakl/node/pkg/db" errorSentinel "bisonai.com/orakl/node/pkg/error" @@ -58,6 +59,8 @@ func Setup(ctx context.Context) (*fiber.App, error) { KeyLookup: "header:X-API-Key", Validator: validator, })) + + app.Use(stats.StatsMiddleware) return app, nil } diff --git a/node/pkg/dal/utils/stats/stats.go b/node/pkg/dal/utils/stats/stats.go index 502f4a142..bddb9f7af 100644 --- a/node/pkg/dal/utils/stats/stats.go +++ b/node/pkg/dal/utils/stats/stats.go @@ -5,6 +5,8 @@ import ( "time" "bisonai.com/orakl/node/pkg/db" + "github.com/gofiber/fiber/v2" + "github.com/rs/zerolog/log" ) const ( @@ -67,3 +69,30 @@ func InsertWebsocketSubscription(ctx context.Context, connectionId int32, topic "topic": topic, }) } + +func StatsMiddleware(c *fiber.Ctx) error { + start := time.Now() + + if err := c.Next(); err != nil { + return err + } + duration := time.Since(start) + headers := c.GetReqHeaders() + apiKeyRaw, ok := headers["X-Api-Key"] + if !ok { + log.Warn().Msg("X-Api-Key header not found") + return nil + } + apiKey := apiKeyRaw[0] + if apiKey == "" { + log.Warn().Msg("X-Api-Key header is empty") + return nil + } + + endpoint := c.Path() + statusCode := c.Response().StatusCode() + if err := InsertRestCall(c.Context(), apiKey, endpoint, statusCode, duration); err != nil { + log.Error().Err(err).Msg("failed to insert rest call") + } + return nil +}