Skip to content

Commit

Permalink
fetch all logs in one trip
Browse files Browse the repository at this point in the history
  • Loading branch information
Intizar-T committed Aug 12, 2024
1 parent d34db70 commit d3a90e8
Showing 1 changed file with 58 additions and 57 deletions.
115 changes: 58 additions & 57 deletions node/pkg/logscribe/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
logsChannelSize = 10_000
dbReadBatchSize = 1000
DefaultBulkLogsCopyInterval = 30 * time.Second
readDeleteLogsQuery = `DELETE FROM logs WHERE id IN (SELECT id FROM logs LIMIT @limit) RETURNING *;`
readDeleteLogsQuery = `DELETE FROM logs RETURNING *;`
topOccurrencesForGithubIssue = 5
logAlreadyProcessedQuery = `SELECT COUNT(*) FROM processed_logs WHERE log_hash = @hash`
)
Expand Down Expand Up @@ -92,7 +92,7 @@ func (a *App) Run(ctx context.Context) error {
go a.bulkCopyLogs(ctx, logsChannel)

cron := cron.New()
_, err = cron.AddFunc("@every 3s", func() {
_, err = cron.AddFunc("@weekly", func() { // Run once a week, midnight between Sat/Sun
a.processLogs(ctx)
})
if err != nil {
Expand Down Expand Up @@ -145,45 +145,48 @@ func (a *App) bulkCopyLogs(ctx context.Context, logsChannel <-chan *[]api.LogIns
func (a *App) processLogs(ctx context.Context) {
processedLogs := make(map[string][]LogInsertModelWithIDWithCount)
logMap := make(map[string]map[string][]LogInsertModelWithID) // {"service": {hashedLog: []logs}}
for {
logs, err := fetchDeleteLogs(ctx)
if err != nil || len(logs) == 0 {
break
}
logs, err := fetchDeleteLogs(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to fetch logs")
return
}
if len(logs) == 0 {
log.Debug().Msgf("No logs to process")
return
}

for _, log := range logs {
hash := hashLog(log)
if logMap[log.Service] == nil {
logMap[log.Service] = make(map[string][]LogInsertModelWithID)
}
if logMap[log.Service][hash] == nil {
logMap[log.Service][hash] = make([]LogInsertModelWithID, 0)
}
// uniquely identify logs by hash and store them in a slice to count occurrences later
logMap[log.Service][hash] = append(logMap[log.Service][hash], log)
for _, log := range logs {
hash := hashLog(log)
if logMap[log.Service] == nil {
logMap[log.Service] = make(map[string][]LogInsertModelWithID)
}
if logMap[log.Service][hash] == nil {
logMap[log.Service][hash] = make([]LogInsertModelWithID, 0)
}
// uniquely identify logs by hash and store them in a slice to count occurrences later
logMap[log.Service][hash] = append(logMap[log.Service][hash], log)
}

for service, hashLogPairs := range logMap {
pairs := make([]HashLogPairs, 0, len(hashLogPairs))
for hash, logs := range hashLogPairs {
pairs = append(pairs, HashLogPairs{hash, logs})
}
sort.Slice(pairs, func(i, j int) bool {
return len(pairs[i].logs) > len(pairs[j].logs)
})
for service, hashLogPairs := range logMap {
pairs := make([]HashLogPairs, 0, len(hashLogPairs))
for hash, logs := range hashLogPairs {
pairs = append(pairs, HashLogPairs{hash, logs})
}
sort.Slice(pairs, func(i, j int) bool {
return len(pairs[i].logs) > len(pairs[j].logs)
})

topOccurrences := pairs
if len(pairs) > topOccurrencesForGithubIssue {
topOccurrences = pairs[:topOccurrencesForGithubIssue]
}
processedLogs[service] = make([]LogInsertModelWithIDWithCount, 0, len(topOccurrences))
for _, pair := range topOccurrences {
// all logs in pair.logs are same (due to hashing), so only need to store one
processedLogs[service] = append(
processedLogs[service],
LogInsertModelWithIDWithCount{OccurrenceCount: len(pair.logs), LogInsertModelWithID: pair.logs[0]},
)
}
topOccurrences := pairs
if len(pairs) > topOccurrencesForGithubIssue {
topOccurrences = pairs[:topOccurrencesForGithubIssue]
}
processedLogs[service] = make([]LogInsertModelWithIDWithCount, 0, len(topOccurrences))
for _, pair := range topOccurrences {
// all logs in pair.logs are same (due to hashing), so only need to store one
processedLogs[service] = append(
processedLogs[service],
LogInsertModelWithIDWithCount{OccurrenceCount: len(pair.logs), LogInsertModelWithID: pair.logs[0]},
)
}
}

Expand All @@ -192,6 +195,24 @@ func (a *App) processLogs(ctx context.Context) {
}
}

func fetchDeleteLogs(ctx context.Context) ([]LogInsertModelWithID, error) {
logs, err := db.QueryRows[LogInsertModelWithID](ctx, readDeleteLogsQuery, nil)
if err != nil {
log.Error().Err(err).Msg("Failed to read logs")
return nil, err
}
return logs, nil
}

func hashLog(log LogInsertModelWithID) string {
hash := sha256.New()
hash.Write([]byte(log.Service))
hash.Write([]byte(fmt.Sprintf("%d", log.Level)))
hash.Write([]byte(log.Message))
hash.Write(log.Fields)
return hex.EncodeToString(hash.Sum(nil))
}

func (a *App) createGithubIssue(ctx context.Context, processedLogs map[string][]LogInsertModelWithIDWithCount) {
if a.githubOwner == "test" {
return
Expand Down Expand Up @@ -244,23 +265,3 @@ func (a *App) createGithubIssue(ctx context.Context, processedLogs map[string][]
}
log.Debug().Msgf("Created %d github issues", issueCount)
}

func fetchDeleteLogs(ctx context.Context) ([]LogInsertModelWithID, error) {
logs, err := db.QueryRows[LogInsertModelWithID](ctx, readDeleteLogsQuery, map[string]any{
"limit": dbReadBatchSize,
})
if err != nil {
log.Error().Err(err).Msg("Failed to read logs")
return nil, err
}
return logs, nil
}

func hashLog(log LogInsertModelWithID) string {
hash := sha256.New()
hash.Write([]byte(log.Service))
hash.Write([]byte(fmt.Sprintf("%d", log.Level)))
hash.Write([]byte(log.Message))
hash.Write(log.Fields)
return hex.EncodeToString(hash.Sum(nil))
}

0 comments on commit d3a90e8

Please sign in to comment.