diff --git a/node/pkg/dal/apiv2/controller.go b/node/pkg/dal/apiv2/controller.go index cd69249e9..b011ccf2f 100644 --- a/node/pkg/dal/apiv2/controller.go +++ b/node/pkg/dal/apiv2/controller.go @@ -49,7 +49,7 @@ func Start(ctx context.Context, opts ...ServerV2Option) error { return err } - wsServer := NewServer(config.Collector, config.KeyCache, config.Hub) + wsServer := NewServer(config.Collector, config.KeyCache, config.Hub, config.StatsApp) httpServer := &http.Server{ Handler: wsServer, BaseContext: func(_ net.Listener) context.Context { @@ -65,7 +65,7 @@ func Start(ctx context.Context, opts ...ServerV2Option) error { return nil } -func NewServer(collector *collector.Collector, keyCache *keycache.KeyCache, hub *hub.Hub) *ServerV2 { +func NewServer(collector *collector.Collector, keyCache *keycache.KeyCache, hub *hub.Hub, statsApp *stats.StatsApp) *ServerV2 { s := &ServerV2{ collector: collector, keyCache: keyCache, @@ -83,7 +83,7 @@ func NewServer(collector *collector.Collector, keyCache *keycache.KeyCache, hub serveMux.HandleFunc("GET /latest-data-feeds/{symbols}", s.LatestFeedsHandler) // Apply the RequestLoggerMiddleware to the ServeMux - loggedMux := stats.RequestLoggerMiddleware(serveMux) + loggedMux := statsApp.RequestLoggerMiddleware(serveMux) s.handler = loggedMux diff --git a/node/pkg/dal/apiv2/types.go b/node/pkg/dal/apiv2/types.go index 764b02214..2d2ced35f 100644 --- a/node/pkg/dal/apiv2/types.go +++ b/node/pkg/dal/apiv2/types.go @@ -6,6 +6,7 @@ import ( "bisonai.com/orakl/node/pkg/dal/collector" "bisonai.com/orakl/node/pkg/dal/hub" "bisonai.com/orakl/node/pkg/dal/utils/keycache" + "bisonai.com/orakl/node/pkg/dal/utils/stats" ) type BulkResponse struct { @@ -29,6 +30,7 @@ type ServerV2Config struct { Collector *collector.Collector Hub *hub.Hub KeyCache *keycache.KeyCache + StatsApp *stats.StatsApp } type ServerV2Option func(*ServerV2Config) @@ -51,6 +53,12 @@ func WithHub(h *hub.Hub) ServerV2Option { } } +func WithStatsApp(s *stats.StatsApp) ServerV2Option { + return func(config *ServerV2Config) { + config.StatsApp = s + } +} + func WithKeyCache(k *keycache.KeyCache) ServerV2Option { return func(config *ServerV2Config) { config.KeyCache = k diff --git a/node/pkg/dal/app.go b/node/pkg/dal/app.go index c5e4ccb65..7d18eeb04 100644 --- a/node/pkg/dal/app.go +++ b/node/pkg/dal/app.go @@ -22,7 +22,8 @@ type Config = types.Config func Run(ctx context.Context) error { log.Debug().Msg("Starting DAL API server") - go stats.Start(ctx) + statsApp := stats.Start(ctx) + defer statsApp.Stop() keyCache := keycache.NewAPIKeyCache(1 * time.Hour) keyCache.CleanupLoop(10 * time.Minute) @@ -48,7 +49,7 @@ func Run(ctx context.Context) error { hub := hub.HubSetup(ctx, configs) go hub.Start(ctx, collector) - err = apiv2.Start(ctx, apiv2.WithCollector(collector), apiv2.WithHub(hub), apiv2.WithKeyCache(keyCache)) + err = apiv2.Start(ctx, apiv2.WithCollector(collector), apiv2.WithHub(hub), apiv2.WithKeyCache(keyCache), apiv2.WithStatsApp(statsApp)) if err != nil { log.Error().Err(err).Msg("Failed to start DAL WS server") return err diff --git a/node/pkg/dal/tests/main_test.go b/node/pkg/dal/tests/main_test.go index 04f4ffa73..b748a891e 100644 --- a/node/pkg/dal/tests/main_test.go +++ b/node/pkg/dal/tests/main_test.go @@ -17,6 +17,7 @@ import ( "bisonai.com/orakl/node/pkg/dal/collector" "bisonai.com/orakl/node/pkg/dal/hub" "bisonai.com/orakl/node/pkg/dal/utils/keycache" + "bisonai.com/orakl/node/pkg/dal/utils/stats" "bisonai.com/orakl/node/pkg/db" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -32,6 +33,7 @@ type TestItems struct { MockAdmin *httptest.Server MockDal *httptest.Server ApiKey string + StatsApp *stats.StatsApp } func testPublishData(ctx context.Context, submissionData aggregator.SubmissionData) error { @@ -113,7 +115,10 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { hub := hub.HubSetup(ctx, configs) go hub.Start(ctx, collector) - server := apiv2.NewServer(collector, keyCache, hub) + statsApp := stats.NewStatsApp(ctx, stats.WithBulkLogsCopyInterval(1*time.Second)) + go statsApp.Run(ctx) + + server := apiv2.NewServer(collector, keyCache, hub, statsApp) mockDal := httptest.NewServer(server) @@ -122,6 +127,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { testItems.Controller = hub testItems.MockAdmin = mockAdminServer testItems.MockDal = mockDal + testItems.StatsApp = statsApp return cleanup(ctx, testItems), testItems, nil } @@ -129,6 +135,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { func cleanup(ctx context.Context, testItems *TestItems) func() error { return func() error { testItems.MockDal.Close() + testItems.StatsApp.Stop() testItems.Collector.Stop() testItems.Controller = nil diff --git a/node/pkg/dal/utils/stats/stats.go b/node/pkg/dal/utils/stats/stats.go index ed69cf72f..413a6cf11 100644 --- a/node/pkg/dal/utils/stats/stats.go +++ b/node/pkg/dal/utils/stats/stats.go @@ -36,7 +36,30 @@ const ( DefaultBufferSize = 20000 ) -var restEntryBuffer chan *RestEntry +type StatsAppConfig struct { + BulkLogsCopyInterval time.Duration + BufferSize int +} + +type StatsOption func(*StatsAppConfig) + +func WithBulkLogsCopyInterval(interval time.Duration) StatsOption { + return func(config *StatsAppConfig) { + config.BulkLogsCopyInterval = interval + } +} + +func WithBufferSize(size int) StatsOption { + return func(config *StatsAppConfig) { + config.BufferSize = size + } +} + +type StatsApp struct { + BulkLogsCopyInterval time.Duration + RestEntryBuffer chan *RestEntry + Cancel context.CancelFunc +} type WebsocketId struct { Id int32 `db:"id"` @@ -49,10 +72,39 @@ type RestEntry struct { ResponseTime time.Duration } -func Start(ctx context.Context) { - restEntryBuffer = make(chan *RestEntry, DefaultBufferSize) - ticker := time.NewTicker(DefaultBufferSize) +func NewStatsApp(ctx context.Context, opts ...StatsOption) *StatsApp { + _, cancel := context.WithCancel(ctx) + + config := &StatsAppConfig{ + BulkLogsCopyInterval: DefaultBulkLogsCopyInterval, + BufferSize: DefaultBufferSize, + } + + for _, opt := range opts { + opt(config) + } + + return &StatsApp{ + BulkLogsCopyInterval: config.BulkLogsCopyInterval, + RestEntryBuffer: make(chan *RestEntry, config.BufferSize), + Cancel: cancel, + } +} + +func Start(ctx context.Context) *StatsApp { + app := NewStatsApp(ctx) + go app.Run(ctx) + return app +} + +func (a *StatsApp) Stop() { + a.Cancel() +} + +func (a *StatsApp) Run(ctx context.Context) { + ticker := time.NewTicker(a.BulkLogsCopyInterval) defer ticker.Stop() + for { select { case <-ctx.Done(): @@ -62,7 +114,7 @@ func Start(ctx context.Context) { loop: for { select { - case entry := <-restEntryBuffer: + case entry := <-a.RestEntryBuffer: bulkCopyEntries = append(bulkCopyEntries, []any{entry.ApiKey, entry.Endpoint, entry.StatusCode, entry.ResponseTime.Microseconds()}) default: break loop @@ -79,6 +131,37 @@ func Start(ctx context.Context) { } } +func (a *StatsApp) RequestLoggerMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + sl := NewStatsLogger(w) + w.Header() + defer func() { + key := r.Header.Get("X-API-Key") + if key == "" { + log.Warn().Msg("X-API-Key header is empty") + return + } + + endpoint := r.RequestURI + if endpoint == "/" { + return + } + + statusCode := sl.statusCode + responseTime := time.Since(start) + + a.RestEntryBuffer <- &RestEntry{ + ApiKey: key, + Endpoint: endpoint, + StatusCode: *statusCode, + ResponseTime: responseTime, + } + }() + next.ServeHTTP(sl, r) + }) +} + func InsertRestCall(ctx context.Context, apiKey string, endpoint string, statusCode int, responseTime time.Duration) error { responseTimeMicro := responseTime.Microseconds() return db.QueryWithoutResult(ctx, INSERT_REST_CALLS, map[string]any{ @@ -114,37 +197,6 @@ func InsertWebsocketSubscriptions(ctx context.Context, connectionId int32, topic return db.BulkInsert(ctx, "websocket_subscriptions", []string{"connection_id", "topic"}, entries) } -func RequestLoggerMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - start := time.Now() - sl := NewStatsLogger(w) - w.Header() - defer func() { - key := r.Header.Get("X-API-Key") - if key == "" { - log.Warn().Msg("X-API-Key header is empty") - return - } - - endpoint := r.RequestURI - if endpoint == "/" { - return - } - - statusCode := sl.statusCode - responseTime := time.Since(start) - - restEntryBuffer <- &RestEntry{ - ApiKey: key, - Endpoint: endpoint, - StatusCode: *statusCode, - ResponseTime: responseTime, - } - }() - next.ServeHTTP(sl, r) - }) -} - type StatsLogger struct { w *http.ResponseWriter body *bytes.Buffer