Skip to content

Commit

Permalink
feat: statsApp implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Aug 20, 2024
1 parent 5629c05 commit 336fe59
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 42 deletions.
6 changes: 3 additions & 3 deletions node/pkg/dal/apiv2/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Start(ctx context.Context, opts ...ServerV2Option) error {
return err
}

wsServer := NewServer(config.Collector, config.KeyCache, config.Hub)
wsServer := NewServer(config.Collector, config.KeyCache, config.Hub, config.StatsApp)
httpServer := &http.Server{
Handler: wsServer,
BaseContext: func(_ net.Listener) context.Context {
Expand All @@ -65,7 +65,7 @@ func Start(ctx context.Context, opts ...ServerV2Option) error {
return nil
}

func NewServer(collector *collector.Collector, keyCache *keycache.KeyCache, hub *hub.Hub) *ServerV2 {
func NewServer(collector *collector.Collector, keyCache *keycache.KeyCache, hub *hub.Hub, statsApp *stats.StatsApp) *ServerV2 {
s := &ServerV2{
collector: collector,
keyCache: keyCache,
Expand All @@ -83,7 +83,7 @@ func NewServer(collector *collector.Collector, keyCache *keycache.KeyCache, hub
serveMux.HandleFunc("GET /latest-data-feeds/{symbols}", s.LatestFeedsHandler)

// Apply the RequestLoggerMiddleware to the ServeMux
loggedMux := stats.RequestLoggerMiddleware(serveMux)
loggedMux := statsApp.RequestLoggerMiddleware(serveMux)

s.handler = loggedMux

Expand Down
8 changes: 8 additions & 0 deletions node/pkg/dal/apiv2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bisonai.com/orakl/node/pkg/dal/collector"
"bisonai.com/orakl/node/pkg/dal/hub"
"bisonai.com/orakl/node/pkg/dal/utils/keycache"
"bisonai.com/orakl/node/pkg/dal/utils/stats"
)

type BulkResponse struct {
Expand All @@ -29,6 +30,7 @@ type ServerV2Config struct {
Collector *collector.Collector
Hub *hub.Hub
KeyCache *keycache.KeyCache
StatsApp *stats.StatsApp
}

type ServerV2Option func(*ServerV2Config)
Expand All @@ -51,6 +53,12 @@ func WithHub(h *hub.Hub) ServerV2Option {
}
}

func WithStatsApp(s *stats.StatsApp) ServerV2Option {
return func(config *ServerV2Config) {
config.StatsApp = s
}
}

func WithKeyCache(k *keycache.KeyCache) ServerV2Option {
return func(config *ServerV2Config) {
config.KeyCache = k
Expand Down
5 changes: 3 additions & 2 deletions node/pkg/dal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type Config = types.Config
func Run(ctx context.Context) error {
log.Debug().Msg("Starting DAL API server")

go stats.Start(ctx)
statsApp := stats.Start(ctx)
defer statsApp.Stop()

keyCache := keycache.NewAPIKeyCache(1 * time.Hour)
keyCache.CleanupLoop(10 * time.Minute)
Expand All @@ -48,7 +49,7 @@ func Run(ctx context.Context) error {
hub := hub.HubSetup(ctx, configs)
go hub.Start(ctx, collector)

err = apiv2.Start(ctx, apiv2.WithCollector(collector), apiv2.WithHub(hub), apiv2.WithKeyCache(keyCache))
err = apiv2.Start(ctx, apiv2.WithCollector(collector), apiv2.WithHub(hub), apiv2.WithKeyCache(keyCache), apiv2.WithStatsApp(statsApp))
if err != nil {
log.Error().Err(err).Msg("Failed to start DAL WS server")
return err
Expand Down
9 changes: 8 additions & 1 deletion node/pkg/dal/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bisonai.com/orakl/node/pkg/dal/collector"
"bisonai.com/orakl/node/pkg/dal/hub"
"bisonai.com/orakl/node/pkg/dal/utils/keycache"
"bisonai.com/orakl/node/pkg/dal/utils/stats"
"bisonai.com/orakl/node/pkg/db"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand All @@ -32,6 +33,7 @@ type TestItems struct {
MockAdmin *httptest.Server
MockDal *httptest.Server
ApiKey string
StatsApp *stats.StatsApp
}

func testPublishData(ctx context.Context, submissionData aggregator.SubmissionData) error {
Expand Down Expand Up @@ -113,7 +115,10 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {
hub := hub.HubSetup(ctx, configs)
go hub.Start(ctx, collector)

server := apiv2.NewServer(collector, keyCache, hub)
statsApp := stats.NewStatsApp(ctx, stats.WithBulkLogsCopyInterval(1*time.Second))
go statsApp.Run(ctx)

server := apiv2.NewServer(collector, keyCache, hub, statsApp)

mockDal := httptest.NewServer(server)

Expand All @@ -122,13 +127,15 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {
testItems.Controller = hub
testItems.MockAdmin = mockAdminServer
testItems.MockDal = mockDal
testItems.StatsApp = statsApp

return cleanup(ctx, testItems), testItems, nil
}

func cleanup(ctx context.Context, testItems *TestItems) func() error {
return func() error {
testItems.MockDal.Close()
testItems.StatsApp.Stop()

testItems.Collector.Stop()
testItems.Controller = nil
Expand Down
124 changes: 88 additions & 36 deletions node/pkg/dal/utils/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,30 @@ const (
DefaultBufferSize = 20000
)

var restEntryBuffer chan *RestEntry
type StatsAppConfig struct {
BulkLogsCopyInterval time.Duration
BufferSize int
}

type StatsOption func(*StatsAppConfig)

func WithBulkLogsCopyInterval(interval time.Duration) StatsOption {
return func(config *StatsAppConfig) {
config.BulkLogsCopyInterval = interval
}
}

func WithBufferSize(size int) StatsOption {
return func(config *StatsAppConfig) {
config.BufferSize = size
}
}

type StatsApp struct {
BulkLogsCopyInterval time.Duration
RestEntryBuffer chan *RestEntry
Cancel context.CancelFunc
}

type WebsocketId struct {
Id int32 `db:"id"`
Expand All @@ -49,10 +72,39 @@ type RestEntry struct {
ResponseTime time.Duration
}

func Start(ctx context.Context) {
restEntryBuffer = make(chan *RestEntry, DefaultBufferSize)
ticker := time.NewTicker(DefaultBufferSize)
func NewStatsApp(ctx context.Context, opts ...StatsOption) *StatsApp {
_, cancel := context.WithCancel(ctx)

config := &StatsAppConfig{
BulkLogsCopyInterval: DefaultBulkLogsCopyInterval,
BufferSize: DefaultBufferSize,
}

for _, opt := range opts {
opt(config)
}

return &StatsApp{
BulkLogsCopyInterval: config.BulkLogsCopyInterval,
RestEntryBuffer: make(chan *RestEntry, config.BufferSize),
Cancel: cancel,
}
}

func Start(ctx context.Context) *StatsApp {
app := NewStatsApp(ctx)
go app.Run(ctx)
return app
}

func (a *StatsApp) Stop() {
a.Cancel()
}

func (a *StatsApp) Run(ctx context.Context) {
ticker := time.NewTicker(a.BulkLogsCopyInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
Expand All @@ -62,7 +114,7 @@ func Start(ctx context.Context) {
loop:
for {
select {
case entry := <-restEntryBuffer:
case entry := <-a.RestEntryBuffer:
bulkCopyEntries = append(bulkCopyEntries, []any{entry.ApiKey, entry.Endpoint, entry.StatusCode, entry.ResponseTime.Microseconds()})
default:
break loop
Expand All @@ -79,6 +131,37 @@ func Start(ctx context.Context) {
}
}

func (a *StatsApp) RequestLoggerMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
sl := NewStatsLogger(w)
w.Header()
defer func() {
key := r.Header.Get("X-API-Key")
if key == "" {
log.Warn().Msg("X-API-Key header is empty")
return
}

endpoint := r.RequestURI
if endpoint == "/" {
return
}

statusCode := sl.statusCode
responseTime := time.Since(start)

a.RestEntryBuffer <- &RestEntry{
ApiKey: key,
Endpoint: endpoint,
StatusCode: *statusCode,
ResponseTime: responseTime,
}
}()
next.ServeHTTP(sl, r)
})
}

func InsertRestCall(ctx context.Context, apiKey string, endpoint string, statusCode int, responseTime time.Duration) error {
responseTimeMicro := responseTime.Microseconds()
return db.QueryWithoutResult(ctx, INSERT_REST_CALLS, map[string]any{
Expand Down Expand Up @@ -114,37 +197,6 @@ func InsertWebsocketSubscriptions(ctx context.Context, connectionId int32, topic
return db.BulkInsert(ctx, "websocket_subscriptions", []string{"connection_id", "topic"}, entries)
}

func RequestLoggerMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
sl := NewStatsLogger(w)
w.Header()
defer func() {
key := r.Header.Get("X-API-Key")
if key == "" {
log.Warn().Msg("X-API-Key header is empty")
return
}

endpoint := r.RequestURI
if endpoint == "/" {
return
}

statusCode := sl.statusCode
responseTime := time.Since(start)

restEntryBuffer <- &RestEntry{
ApiKey: key,
Endpoint: endpoint,
StatusCode: *statusCode,
ResponseTime: responseTime,
}
}()
next.ServeHTTP(sl, r)
})
}

type StatsLogger struct {
w *http.ResponseWriter
body *bytes.Buffer
Expand Down

0 comments on commit 336fe59

Please sign in to comment.