diff --git a/internal/armada/repository/job.go b/internal/armada/repository/job.go index 9484ae3ead3..d7ab59a4fda 100644 --- a/internal/armada/repository/job.go +++ b/internal/armada/repository/job.go @@ -84,7 +84,7 @@ type JobRepository interface { GetNumberOfRetryAttempts(jobId string) (int, error) StorePulsarSchedulerJobDetails(jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error GetPulsarSchedulerJobDetails(jobIds string) (*schedulerobjects.PulsarSchedulerJobDetails, error) - DeletePulsarSchedulerJobDetails(jobId []string) error + ExpirePulsarSchedulerJobDetails(jobId []string) error } type RedisJobRepository struct { @@ -1017,10 +1017,15 @@ func (repo *RedisJobRepository) GetPulsarSchedulerJobDetails(jobId string) (*sch return details, nil } -func (repo *RedisJobRepository) DeletePulsarSchedulerJobDetails(jobIds []string) error { +func (repo *RedisJobRepository) ExpirePulsarSchedulerJobDetails(jobIds []string) error { + if len(jobIds) == 0 { + return nil + } pipe := repo.db.Pipeline() for _, jobId := range jobIds { - pipe.Del(pulsarJobPrefix + jobId) + key := fmt.Sprintf("%s%s", pulsarJobPrefix, jobId) + // Expire as opposed to delete so that we are permissive of race conditions. + pipe.Expire(key, 1*time.Hour) } if _, err := pipe.Exec(); err != nil { return errors.Wrap(err, "failed to delete pulsar job details in Redis") diff --git a/internal/armada/server/lease_test.go b/internal/armada/server/lease_test.go index d95dfc73105..7cab8109a05 100644 --- a/internal/armada/server/lease_test.go +++ b/internal/armada/server/lease_test.go @@ -213,8 +213,9 @@ func makeAggregatedQueueServerWithTestDoubles(maxRetries uint) (*mockJobReposito } type mockJobRepository struct { - jobs map[string]*api.Job - jobRetries map[string]int + jobs map[string]*api.Job + jobRetries map[string]int + pulsarDetails map[string]*schedulerobjects.PulsarSchedulerJobDetails returnLeaseCalls int deleteJobsCalls int @@ -229,14 +230,37 @@ type mockJobRepository struct { } func (repo *mockJobRepository) StorePulsarSchedulerJobDetails(jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error { + for _, job := range jobDetails { + key := job.JobId + value := &schedulerobjects.PulsarSchedulerJobDetails{ + JobId: job.JobId, + Queue: job.Queue, + JobSet: job.JobSet, + } + repo.pulsarDetails[key] = value + } return nil } func (repo *mockJobRepository) GetPulsarSchedulerJobDetails(jobIds string) (*schedulerobjects.PulsarSchedulerJobDetails, error) { - return nil, nil + key := jobIds + value, ok := repo.pulsarDetails[key] + if !ok { + return nil, fmt.Errorf("details for jobId %s not stored", jobIds) + } + + return value, nil } -func (repo *mockJobRepository) DeletePulsarSchedulerJobDetails(jobId []string) error { +func (repo *mockJobRepository) ExpirePulsarSchedulerJobDetails(jobId []string) error { + for _, id := range jobId { + key := id + if _, ok := repo.pulsarDetails[key]; !ok { + return fmt.Errorf("could not expire details for jobId %s - details not stored", id) + } + delete(repo.pulsarDetails, key) + } + return nil } @@ -244,6 +268,7 @@ func newMockJobRepository() *mockJobRepository { return &mockJobRepository{ jobs: make(map[string]*api.Job), jobRetries: make(map[string]int), + pulsarDetails: make(map[string]*schedulerobjects.PulsarSchedulerJobDetails), returnLeaseCalls: 0, deleteJobsCalls: 0, returnLeaseArg1: "", diff --git a/internal/armada/server/submit_from_log.go b/internal/armada/server/submit_from_log.go index c32060bf15a..cd686f71d0c 100644 --- a/internal/armada/server/submit_from_log.go +++ b/internal/armada/server/submit_from_log.go @@ -111,7 +111,7 @@ func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error { // If this message isn't for us we can simply ack it // and go to the next message - if !schedulers.ForLegacyScheduler(msg) { + if !schedulers.ForLegacyScheduler(msg) && !schedulers.ForPulsarScheduler(msg) { srv.ack(ctx, msg) break } @@ -132,6 +132,19 @@ func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error { } ctxWithLogger.WithField("numEvents", len(sequence.Events)).Info("processing sequence") + + // This file exists primarily for the legacy scheduler - the pulsar scheduler only need to read from + // pulsar to delete redis PulsarJob:jobId entries + if schedulers.ForPulsarScheduler(msg) { + if err := srv.handlePulsarSchedulerEventSequence(ctxWithLogger, sequence); err != nil { + logging.WithStacktrace(ctx, err).Warnf("could not expire PulsarJobDetails; ignoring") + numErrored++ + } + + srv.ack(ctx, msg) + break + } + // TODO: Improve retry logic. srv.ProcessSequence(ctxWithLogger, sequence) srv.ack(ctx, msg) @@ -139,6 +152,33 @@ func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error { } } +func (srv *SubmitFromLog) handlePulsarSchedulerEventSequence(ctx *armadacontext.Context, sequence *armadaevents.EventSequence) error { + idsOfJobsToExpireMappingFor := make([]string, 0) + for _, event := range sequence.GetEvents() { + var jobId string + var err error + switch e := event.Event.(type) { + case *armadaevents.EventSequence_Event_JobSucceeded: + jobId, err = armadaevents.UlidStringFromProtoUuid(e.JobSucceeded.JobId) + case *armadaevents.EventSequence_Event_JobErrors: + if ok := armadaslices.AnyFunc(e.JobErrors.Errors, func(e *armadaevents.Error) bool { return e.Terminal }); ok { + jobId, err = armadaevents.UlidStringFromProtoUuid(e.JobErrors.JobId) + } + case *armadaevents.EventSequence_Event_CancelledJob: + jobId, err = armadaevents.UlidStringFromProtoUuid(e.CancelledJob.JobId) + default: + // Non-terminal event + continue + } + if err != nil { + logging.WithStacktrace(ctx, err).Warnf("failed to determine jobId from event of type %T; ignoring", event.Event) + continue + } + idsOfJobsToExpireMappingFor = append(idsOfJobsToExpireMappingFor, jobId) + } + return srv.SubmitServer.jobRepository.ExpirePulsarSchedulerJobDetails(idsOfJobsToExpireMappingFor) +} + // ProcessSequence processes all events in a particular sequence. // For efficiency, we may process several events at a time. // To maintain ordering, we only do so for subsequences of consecutive events of equal type. diff --git a/internal/common/slices/slices.go b/internal/common/slices/slices.go index 1d3065dad2b..2cfaf7c5710 100644 --- a/internal/common/slices/slices.go +++ b/internal/common/slices/slices.go @@ -168,3 +168,13 @@ func Repeat[T any](n int, vs ...T) []T { } return rv } + +// AnyFunc returns true if predicate(v) returns true for any value v in s. +func AnyFunc[S ~[]T, T any](s S, predicate func(val T) bool) bool { + for _, v := range s { + if predicate(v) { + return true + } + } + return false +} diff --git a/internal/common/slices/slices_test.go b/internal/common/slices/slices_test.go index 08e74a297ed..4d57dbf7fac 100644 --- a/internal/common/slices/slices_test.go +++ b/internal/common/slices/slices_test.go @@ -298,3 +298,31 @@ func TestRepeat(t *testing.T) { }) } } + +func TestAny(t *testing.T) { + var nilSlice []int + assert.Equal( + t, + false, + AnyFunc(nilSlice, func(v int) bool { return true }), + ) + + emptySlice := make([]int, 0) + assert.Equal( + t, + false, + AnyFunc(emptySlice, func(v int) bool { return true }), + ) + + assert.Equal( + t, + true, + AnyFunc([]int{1, 2, 3}, func(v int) bool { return v > 2 }), + ) + + assert.Equal( + t, + false, + AnyFunc([]int{1, 2, 3}, func(v int) bool { return v > 3 }), + ) +}