Skip to content

Commit

Permalink
Automatically expire Pulsar scheduler Redis map entries (#3177)
Browse files Browse the repository at this point in the history
* ARMADA-2346 Delete Pulsar scheduler Redis map entries (#29)

* Redis map entry deletion. Separating submit_from_log.go logic depending on scheduler

* Revert "Redis map entry deletion. Separating submit_from_log.go logic depending on scheduler"

This reverts commit 9d12da33d2b61a158d2b45793bc20a1490b1fbc1.

* Simpler implementation of redis entry deletion

* Calling correct method on event to avoid panics

* Checked if event is terminal before expiring entry, checked for preempted events

* Made returned error more explicit

* Cleanup

---------

Co-authored-by: Mustafa Ilyas <[email protected]>
  • Loading branch information
severinson and mustafai-gr authored Dec 5, 2023
1 parent 1e336ea commit a86caa4
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 8 deletions.
11 changes: 8 additions & 3 deletions internal/armada/repository/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type JobRepository interface {
GetNumberOfRetryAttempts(jobId string) (int, error)
StorePulsarSchedulerJobDetails(jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error
GetPulsarSchedulerJobDetails(jobIds string) (*schedulerobjects.PulsarSchedulerJobDetails, error)
DeletePulsarSchedulerJobDetails(jobId []string) error
ExpirePulsarSchedulerJobDetails(jobId []string) error
}

type RedisJobRepository struct {
Expand Down Expand Up @@ -1017,10 +1017,15 @@ func (repo *RedisJobRepository) GetPulsarSchedulerJobDetails(jobId string) (*sch
return details, nil
}

func (repo *RedisJobRepository) DeletePulsarSchedulerJobDetails(jobIds []string) error {
func (repo *RedisJobRepository) ExpirePulsarSchedulerJobDetails(jobIds []string) error {
if len(jobIds) == 0 {
return nil
}
pipe := repo.db.Pipeline()
for _, jobId := range jobIds {
pipe.Del(pulsarJobPrefix + jobId)
key := fmt.Sprintf("%s%s", pulsarJobPrefix, jobId)
// Expire as opposed to delete so that we are permissive of race conditions.
pipe.Expire(key, 1*time.Hour)
}
if _, err := pipe.Exec(); err != nil {
return errors.Wrap(err, "failed to delete pulsar job details in Redis")
Expand Down
33 changes: 29 additions & 4 deletions internal/armada/server/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,9 @@ func makeAggregatedQueueServerWithTestDoubles(maxRetries uint) (*mockJobReposito
}

type mockJobRepository struct {
jobs map[string]*api.Job
jobRetries map[string]int
jobs map[string]*api.Job
jobRetries map[string]int
pulsarDetails map[string]*schedulerobjects.PulsarSchedulerJobDetails

returnLeaseCalls int
deleteJobsCalls int
Expand All @@ -229,21 +230,45 @@ type mockJobRepository struct {
}

// StorePulsarSchedulerJobDetails records each of the given details in the mock's
// in-memory pulsarDetails map, keyed by job id. A fresh object is stored per entry
// carrying only the JobId, Queue and JobSet fields of the input. It always returns nil.
func (repo *mockJobRepository) StorePulsarSchedulerJobDetails(jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error {
	for i := range jobDetails {
		src := jobDetails[i]
		// Store a copy rather than the caller's pointer.
		repo.pulsarDetails[src.JobId] = &schedulerobjects.PulsarSchedulerJobDetails{
			JobId:  src.JobId,
			Queue:  src.Queue,
			JobSet: src.JobSet,
		}
	}
	return nil
}

func (repo *mockJobRepository) GetPulsarSchedulerJobDetails(jobIds string) (*schedulerobjects.PulsarSchedulerJobDetails, error) {
return nil, nil
key := jobIds
value, ok := repo.pulsarDetails[key]
if !ok {
return nil, fmt.Errorf("details for jobId %s not stored", jobIds)
}

return value, nil
}

func (repo *mockJobRepository) DeletePulsarSchedulerJobDetails(jobId []string) error {
// ExpirePulsarSchedulerJobDetails stands in for expiry by removing the stored details
// of each given job id from the pulsarDetails map right away.
//
// Ids are processed in order. The first id with no stored details aborts the call with
// an error; ids handled before it have already been removed.
func (repo *mockJobRepository) ExpirePulsarSchedulerJobDetails(jobId []string) error {
	for i := 0; i < len(jobId); i++ {
		id := jobId[i]
		_, stored := repo.pulsarDetails[id]
		if !stored {
			return fmt.Errorf("could not expire details for jobId %s - details not stored", id)
		}
		delete(repo.pulsarDetails, id)
	}

	return nil
}

func newMockJobRepository() *mockJobRepository {
return &mockJobRepository{
jobs: make(map[string]*api.Job),
jobRetries: make(map[string]int),
pulsarDetails: make(map[string]*schedulerobjects.PulsarSchedulerJobDetails),
returnLeaseCalls: 0,
deleteJobsCalls: 0,
returnLeaseArg1: "",
Expand Down
42 changes: 41 additions & 1 deletion internal/armada/server/submit_from_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error {

// If this message isn't for us we can simply ack it
// and go to the next message
if !schedulers.ForLegacyScheduler(msg) {
if !schedulers.ForLegacyScheduler(msg) && !schedulers.ForPulsarScheduler(msg) {
srv.ack(ctx, msg)
break
}
Expand All @@ -132,13 +132,53 @@ func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error {
}

ctxWithLogger.WithField("numEvents", len(sequence.Events)).Info("processing sequence")

// This file exists primarily for the legacy scheduler - the pulsar scheduler only needs to read from
// pulsar to expire redis PulsarJob:jobId entries.
if schedulers.ForPulsarScheduler(msg) {
if err := srv.handlePulsarSchedulerEventSequence(ctxWithLogger, sequence); err != nil {
logging.WithStacktrace(ctx, err).Warnf("could not expire PulsarJobDetails; ignoring")
numErrored++
}

srv.ack(ctx, msg)
break
}

// TODO: Improve retry logic.
srv.ProcessSequence(ctxWithLogger, sequence)
srv.ack(ctx, msg)
}
}
}

// handlePulsarSchedulerEventSequence expires the stored Pulsar scheduler job details of
// every job that the given sequence reports as finished.
//
// An event is treated as finishing its job when it is a JobSucceeded, a CancelledJob, or
// a JobErrors carrying at least one error marked Terminal. All other events are skipped.
// Events whose job id cannot be converted to a string are logged and skipped.
//
// The collected job ids are passed to the job repository in a single call, and the error
// from that call (if any) is returned.
func (srv *SubmitFromLog) handlePulsarSchedulerEventSequence(ctx *armadacontext.Context, sequence *armadaevents.EventSequence) error {
	idsOfJobsToExpireMappingFor := make([]string, 0)
	for _, event := range sequence.GetEvents() {
		var jobId string
		var err error
		switch e := event.Event.(type) {
		case *armadaevents.EventSequence_Event_JobSucceeded:
			jobId, err = armadaevents.UlidStringFromProtoUuid(e.JobSucceeded.JobId)
		case *armadaevents.EventSequence_Event_JobErrors:
			isTerminal := armadaslices.AnyFunc(e.JobErrors.Errors, func(jobErr *armadaevents.Error) bool { return jobErr.Terminal })
			if !isTerminal {
				// The job has not finished yet. Skip it here: falling through would append
				// an empty job id and issue an expire for the bare key prefix.
				continue
			}
			jobId, err = armadaevents.UlidStringFromProtoUuid(e.JobErrors.JobId)
		case *armadaevents.EventSequence_Event_CancelledJob:
			jobId, err = armadaevents.UlidStringFromProtoUuid(e.CancelledJob.JobId)
		default:
			// Non-terminal event
			continue
		}
		if err != nil {
			logging.WithStacktrace(ctx, err).Warnf("failed to determine jobId from event of type %T; ignoring", event.Event)
			continue
		}
		idsOfJobsToExpireMappingFor = append(idsOfJobsToExpireMappingFor, jobId)
	}
	return srv.SubmitServer.jobRepository.ExpirePulsarSchedulerJobDetails(idsOfJobsToExpireMappingFor)
}

// ProcessSequence processes all events in a particular sequence.
// For efficiency, we may process several events at a time.
// To maintain ordering, we only do so for subsequences of consecutive events of equal type.
Expand Down
10 changes: 10 additions & 0 deletions internal/common/slices/slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,13 @@ func Repeat[T any](n int, vs ...T) []T {
}
return rv
}

// AnyFunc reports whether predicate holds for at least one element of s.
// Elements are visited in order and the scan stops at the first match.
// A nil or empty slice yields false without calling predicate.
func AnyFunc[S ~[]T, T any](s S, predicate func(val T) bool) bool {
	for i := 0; i < len(s); i++ {
		if matched := predicate(s[i]); matched {
			return true
		}
	}
	return false
}
28 changes: 28 additions & 0 deletions internal/common/slices/slices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,31 @@ func TestRepeat(t *testing.T) {
})
}
}

// TestAny exercises AnyFunc with a nil slice, an empty slice, a slice containing a
// matching element, and a slice containing none.
func TestAny(t *testing.T) {
	alwaysTrue := func(v int) bool { return true }

	cases := []struct {
		input     []int
		predicate func(v int) bool
		expected  bool
	}{
		// A nil slice never matches, even with an always-true predicate.
		{input: nil, predicate: alwaysTrue, expected: false},
		// Likewise for an allocated but empty slice.
		{input: make([]int, 0), predicate: alwaysTrue, expected: false},
		{input: []int{1, 2, 3}, predicate: func(v int) bool { return v > 2 }, expected: true},
		{input: []int{1, 2, 3}, predicate: func(v int) bool { return v > 3 }, expected: false},
	}

	for _, tc := range cases {
		assert.Equal(
			t,
			tc.expected,
			AnyFunc(tc.input, tc.predicate),
		)
	}
}

0 comments on commit a86caa4

Please sign in to comment.