From e78db8bfe25657e3233da5ca9e0e3a3c4e27c643 Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 19 Aug 2024 21:05:02 +0900 Subject: [PATCH] feat: bulk --- node/pkg/dal/app.go | 3 ++ node/pkg/dal/utils/stats/stats.go | 59 +++++++++++++++++++++++++++---- 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/node/pkg/dal/app.go b/node/pkg/dal/app.go index 6e18e9dbd..440e7c523 100644 --- a/node/pkg/dal/app.go +++ b/node/pkg/dal/app.go @@ -11,6 +11,7 @@ import ( "bisonai.com/orakl/node/pkg/dal/collector" "bisonai.com/orakl/node/pkg/dal/hub" "bisonai.com/orakl/node/pkg/dal/utils/keycache" + "bisonai.com/orakl/node/pkg/dal/utils/stats" "bisonai.com/orakl/node/pkg/utils/request" "github.com/rs/zerolog/log" @@ -21,6 +22,8 @@ type Config = types.Config func Run(ctx context.Context) error { log.Debug().Msg("Starting DAL API server") + stats.Start(ctx) + keyCache := keycache.NewAPIKeyCache(1 * time.Hour) keyCache.CleanupLoop(10 * time.Minute) diff --git a/node/pkg/dal/utils/stats/stats.go b/node/pkg/dal/utils/stats/stats.go index c63359728..ed69cf72f 100644 --- a/node/pkg/dal/utils/stats/stats.go +++ b/node/pkg/dal/utils/stats/stats.go @@ -31,12 +31,55 @@ const ( ` ) -type websocketId struct { +const ( + DefaultBulkLogsCopyInterval = 10 * time.Minute + DefaultBufferSize = 20000 +) + +var restEntryBuffer chan *RestEntry + +type WebsocketId struct { Id int32 `db:"id"` } -func InsertRestCall(ctx context.Context, apiKey string, endpoint string, statusCode int, responseTime time.Duration) error { +type RestEntry struct { + ApiKey string + Endpoint string + StatusCode int + ResponseTime time.Duration +} + +func Start(ctx context.Context) { + restEntryBuffer = make(chan *RestEntry, DefaultBufferSize) + ticker := time.NewTicker(DefaultBufferSize) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + bulkCopyEntries := [][]any{} + loop: + for { + select { + case entry := <-restEntryBuffer: + bulkCopyEntries = append(bulkCopyEntries, []any{entry.ApiKey, entry.Endpoint, entry.StatusCode, entry.ResponseTime.Microseconds()}) + default: + break loop + } + } + if len(bulkCopyEntries) > 0 { + _, err := db.BulkCopy(ctx, "rest_calls", []string{"api_key", "endpoint", "status_code", "response_time"}, bulkCopyEntries) + if err != nil { + log.Error().Err(err).Msg("failed to bulk copy rest calls") + } + } + } + } +} + +func InsertRestCall(ctx context.Context, apiKey string, endpoint string, statusCode int, responseTime time.Duration) error { responseTimeMicro := responseTime.Microseconds() return db.QueryWithoutResult(ctx, INSERT_REST_CALLS, map[string]any{ "api_key": apiKey, @@ -47,7 +90,7 @@ func InsertRestCall(ctx context.Context, apiKey string, endpoint string, statusC } func InsertWebsocketConnection(ctx context.Context, apiKey string) (int32, error) { - result, err := db.QueryRow[websocketId](ctx, INSERT_WEBSOCKET_CONNECTIONS, map[string]any{ + result, err := db.QueryRow[WebsocketId](ctx, INSERT_WEBSOCKET_CONNECTIONS, map[string]any{ "api_key": apiKey, }) if err != nil { @@ -89,9 +132,13 @@ func RequestLoggerMiddleware(next http.Handler) http.Handler { } statusCode := sl.statusCode - duration := time.Since(start) - if err := InsertRestCall(r.Context(), key, endpoint, *statusCode, duration); err != nil { - log.Error().Err(err).Msg("failed to insert rest call") + responseTime := time.Since(start) + + restEntryBuffer <- &RestEntry{ + ApiKey: key, + Endpoint: endpoint, + StatusCode: *statusCode, + ResponseTime: responseTime, } }() next.ServeHTTP(sl, r)