Skip to content

Commit

Permalink
feat: bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Aug 19, 2024
1 parent 1bde9e8 commit e78db8b
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
3 changes: 3 additions & 0 deletions node/pkg/dal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"bisonai.com/orakl/node/pkg/dal/collector"
"bisonai.com/orakl/node/pkg/dal/hub"
"bisonai.com/orakl/node/pkg/dal/utils/keycache"
"bisonai.com/orakl/node/pkg/dal/utils/stats"
"bisonai.com/orakl/node/pkg/utils/request"

"github.com/rs/zerolog/log"
Expand All @@ -21,6 +22,8 @@ type Config = types.Config
func Run(ctx context.Context) error {
log.Debug().Msg("Starting DAL API server")

stats.Start(ctx)

keyCache := keycache.NewAPIKeyCache(1 * time.Hour)
keyCache.CleanupLoop(10 * time.Minute)

Expand Down
59 changes: 53 additions & 6 deletions node/pkg/dal/utils/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,55 @@ const (
`
)

type websocketId struct {
const (
DefaultBulkLogsCopyInterval = 10 * time.Minute
DefaultBufferSize = 20000
)

var restEntryBuffer chan *RestEntry

type WebsocketId struct {
Id int32 `db:"id"`
}

func InsertRestCall(ctx context.Context, apiKey string, endpoint string, statusCode int, responseTime time.Duration) error {
type RestEntry struct {
ApiKey string
Endpoint string
StatusCode int
ResponseTime time.Duration
}

func Start(ctx context.Context) {
restEntryBuffer = make(chan *RestEntry, DefaultBufferSize)
ticker := time.NewTicker(DefaultBufferSize)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
bulkCopyEntries := [][]any{}
loop:
for {
select {
case entry := <-restEntryBuffer:
bulkCopyEntries = append(bulkCopyEntries, []any{entry.ApiKey, entry.Endpoint, entry.StatusCode, entry.ResponseTime.Microseconds()})
default:
break loop
}
}

if len(bulkCopyEntries) > 0 {
_, err := db.BulkCopy(ctx, "rest_calls", []string{"api_key", "endpoint", "status_code", "response_time"}, bulkCopyEntries)
if err != nil {
log.Error().Err(err).Msg("failed to bulk copy rest calls")
}
}
}
}
}

func InsertRestCall(ctx context.Context, apiKey string, endpoint string, statusCode int, responseTime time.Duration) error {
responseTimeMicro := responseTime.Microseconds()
return db.QueryWithoutResult(ctx, INSERT_REST_CALLS, map[string]any{
"api_key": apiKey,
Expand All @@ -47,7 +90,7 @@ func InsertRestCall(ctx context.Context, apiKey string, endpoint string, statusC
}

func InsertWebsocketConnection(ctx context.Context, apiKey string) (int32, error) {
result, err := db.QueryRow[websocketId](ctx, INSERT_WEBSOCKET_CONNECTIONS, map[string]any{
result, err := db.QueryRow[WebsocketId](ctx, INSERT_WEBSOCKET_CONNECTIONS, map[string]any{
"api_key": apiKey,
})
if err != nil {
Expand Down Expand Up @@ -89,9 +132,13 @@ func RequestLoggerMiddleware(next http.Handler) http.Handler {
}

statusCode := sl.statusCode
duration := time.Since(start)
if err := InsertRestCall(r.Context(), key, endpoint, *statusCode, duration); err != nil {
log.Error().Err(err).Msg("failed to insert rest call")
responseTime := time.Since(start)

restEntryBuffer <- &RestEntry{
ApiKey: key,
Endpoint: endpoint,
StatusCode: *statusCode,
ResponseTime: responseTime,
}
}()
next.ServeHTTP(sl, r)
Expand Down

0 comments on commit e78db8b

Please sign in to comment.