Skip to content

Commit

Permalink
chore: Sanitize job logging
Browse files Browse the repository at this point in the history
Cleans up job logs.
Removes job_args from most of the job logs except Enqueue and Dequeue.
Removes double logging of job id and type.
  • Loading branch information
ezr-ondrej committed Oct 16, 2023
1 parent 0f3abc7 commit b16c5c6
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 20 deletions.
3 changes: 1 addition & 2 deletions pkg/worker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func initJobContext(origCtx context.Context, job *Job) (context.Context, *zerolo
Str("account_number", job.Identity.Identity.AccountNumber).
Str("request_id", job.EdgeID).
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Interface("job_args", job.Args)
Str("job_type", job.Type.String())

if config.Telemetry.Enabled {
ctx = otel.GetTextMapPropagator().Extract(ctx, job.TraceContext)
Expand Down
8 changes: 8 additions & 0 deletions pkg/worker/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,16 @@ func (w *MemoryWorker) Enqueue(ctx context.Context, job *Job) error {
return fmt.Errorf("unable to enqueue job: %w", ErrJobNotFound)
}

logger := zerolog.Ctx(ctx).With().
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Logger()
logger.Info().Interface("job_args", job.Args).Msg("Enqueuing job via memory")

if job.ID == uuid.Nil {
job.ID, err = uuid.NewRandom()
if err != nil {
logger.Error().Err(err).Msg("Unable to generate a job id")
return fmt.Errorf("unable to generate UUID: %w", err)
}
}
Expand Down Expand Up @@ -72,6 +79,7 @@ func (w *MemoryWorker) processJob(origCtx context.Context, job *Job) {

ctx, logger, span := initJobContext(origCtx, job)
defer span.End()
logger.Info().Interface("job_args", job.Args).Msgf("Dequeued job from memory")

if h, ok := w.handlers[job.Type]; ok {
cCtx, cFunc := context.WithTimeout(ctx, config.Worker.Timeout)
Expand Down
39 changes: 21 additions & 18 deletions pkg/worker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/RHEnVision/provisioning-backend/internal/config"
"github.com/RHEnVision/provisioning-backend/internal/metrics"
"github.com/RHEnVision/provisioning-backend/internal/ptr"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -78,20 +77,20 @@ func (w *RedisWorker) Enqueue(ctx context.Context, job *Job) error {
return fmt.Errorf("unable to enqueue job: %w", ErrJobNotFound)
}

logger := zerolog.Ctx(ctx).With().
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Logger()
logger.Info().Interface("job_args", job.Args).Msg("Enqueuing job via Redis")

if job.ID == uuid.Nil {
job.ID, err = uuid.NewRandom()
if err != nil {
logger.Error().Err(err).Msg("Unable to generate a job id")
return fmt.Errorf("unable to generate UUID: %w", err)
}
}

logger := ptr.To(zerolog.Ctx(ctx).With().
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Interface("job_args", job.Args).
Logger())
logger.Info().Msgf("Enqueuing job type %s via Redis", job.Type)

var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)

Expand All @@ -102,20 +101,22 @@ func (w *RedisWorker) Enqueue(ctx context.Context, job *Job) error {

err = enc.Encode(&job)
if err != nil {
logger.Error().Err(err).Msg("Unable to encode the job")
return fmt.Errorf("unable to encode args: %w", err)
}

cmd := w.client.LPush(ctx, w.queueName, buffer.Bytes())
if cmd.Err() != nil {
logger.Error().Err(err).Msg("Unable to push job into Redis")
logger.Error().Err(cmd.Err()).Msg("Unable to push job into Redis")
return fmt.Errorf("unable to push job into Redis: %w", cmd.Err())
}

result, err := cmd.Result()
if err != nil {
logger.Error().Err(err).Msg("Unable to process redis push result")
return fmt.Errorf("unable to process result: %w", err)
}
logger.Info().Int64("job_result", result).Msg("Pushed job successfully")
logger.Info().Int64("redis_push_result", result).Msg("Pushed job successfully")
return nil
}

Expand Down Expand Up @@ -188,29 +189,31 @@ func (w *RedisWorker) fetchJob(ctx context.Context) {
var job Job
dec := gob.NewDecoder(strings.NewReader(res[1]))
err = dec.Decode(&job)
logger := ptr.To(zerolog.Ctx(ctx).With().
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Interface("job_args", job.Args).
Logger())
if err != nil {
logger.Error().Err(err).Msg("Unable to unmarshal job payload, skipping")
zerolog.Ctx(ctx).Error().
Err(err).
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Interface("job_args", job.Args).
Msg("Unable to unmarshal job payload, skipping")
return
}

atomic.AddInt64(&w.inFlight, 1)
defer atomic.AddInt64(&w.inFlight, -1)

w.processJob(ctx, &job)
}

func (w *RedisWorker) processJob(origCtx context.Context, job *Job) {
if job == nil {
return
}
defer atomic.AddInt64(&w.inFlight, -1)

ctx, logger, span := initJobContext(origCtx, job)
defer span.End()
defer recoverAndLog(ctx)
logger.Info().Msgf("Dequeued job %s %s from Redis", job.Type.String(), job.ID)
logger.Info().Interface("job_args", job.Args).Msgf("Dequeued job from Redis")

if h, ok := w.handlers[job.Type]; ok {
cCtx, cFunc := context.WithTimeout(ctx, config.Worker.Timeout)
Expand Down

0 comments on commit b16c5c6

Please sign in to comment.