Skip to content

Commit

Permalink
encapsulate bulk copy logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Intizar-T committed Aug 5, 2024
1 parent ab4c420 commit ccef8f7
Showing 1 changed file with 32 additions and 30 deletions.
62 changes: 32 additions & 30 deletions node/pkg/logscribe/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,36 +45,7 @@ func Run(ctx context.Context) error {
}
}()

go func() {
ticker := time.NewTicker(DefaultBulkLogsCopyInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
bulkCopyEntries := [][]interface{}{}
loop:
for {
select {
case logs := <-logsChannel:
for _, log := range logs {
bulkCopyEntries = append(bulkCopyEntries, []interface{}{log.Service, log.Timestamp, log.Level, log.Message, log.Fields})
}
default:
break loop
}
}

if len(bulkCopyEntries) > 0 {
_, err := db.BulkCopy(ctx, "logs", []string{"service", "timestamp", "level", "message", "fields"}, bulkCopyEntries)
if err != nil {
log.Error().Err(err).Msg("Failed to bulk copy logs")
}
}
}
}
}()
go bulkCopyLogs(ctx, logsChannel)

<-ctx.Done()

Expand All @@ -85,3 +56,34 @@ func Run(ctx context.Context) error {

return nil
}

func bulkCopyLogs(ctx context.Context, logsChannel <-chan []api.LogInsertModel) {
ticker := time.NewTicker(DefaultBulkLogsCopyInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
bulkCopyEntries := [][]interface{}{}
loop:
for {
select {
case logs := <-logsChannel:
for _, log := range logs {
bulkCopyEntries = append(bulkCopyEntries, []interface{}{log.Service, log.Timestamp, log.Level, log.Message, log.Fields})
}
default:
break loop
}
}

if len(bulkCopyEntries) > 0 {
_, err := db.BulkCopy(ctx, "logs", []string{"service", "timestamp", "level", "message", "fields"}, bulkCopyEntries)
if err != nil {
log.Error().Err(err).Msg("Failed to bulk copy logs")
}
}
}
}
}

0 comments on commit ccef8f7

Please sign in to comment.