diff --git a/internal/scheduleringester/dbops.go b/internal/scheduleringester/dbops.go index b4300cc868a..794bab984e7 100644 --- a/internal/scheduleringester/dbops.go +++ b/internal/scheduleringester/dbops.go @@ -1,6 +1,8 @@ package scheduleringester import ( + "time" + "github.com/apache/pulsar-client-go/pulsar" "github.com/google/uuid" "golang.org/x/exp/maps" @@ -22,6 +24,7 @@ func (d *DbOperationsWithMessageIds) GetMessageIDs() []pulsar.MessageID { type JobRunFailed struct { LeaseReturned bool RunAttempted bool + FailureTime time.Time } type JobSchedulingInfoUpdate struct { @@ -119,15 +122,15 @@ type ( UpdateJobSetPriorities map[JobSetKey]int64 MarkJobSetsCancelRequested map[JobSetKey]*JobSetCancelAction MarkJobsCancelRequested map[string]bool - MarkJobsCancelled map[string]bool + MarkJobsCancelled map[string]time.Time MarkJobsSucceeded map[string]bool MarkJobsFailed map[string]bool UpdateJobPriorities map[string]int64 UpdateJobSchedulingInfo map[string]*JobSchedulingInfoUpdate UpdateJobQueuedState map[string]*JobQueuedStateUpdate - MarkRunsSucceeded map[uuid.UUID]bool + MarkRunsSucceeded map[uuid.UUID]time.Time MarkRunsFailed map[uuid.UUID]*JobRunFailed - MarkRunsRunning map[uuid.UUID]bool + MarkRunsRunning map[uuid.UUID]time.Time InsertJobRunErrors map[uuid.UUID]*schedulerdb.JobRunError InsertPartitionMarker struct { markers []*schedulerdb.Marker diff --git a/internal/scheduleringester/dbops_test.go b/internal/scheduleringester/dbops_test.go index 385ec158a9c..0dc9897373f 100644 --- a/internal/scheduleringester/dbops_test.go +++ b/internal/scheduleringester/dbops_test.go @@ -3,6 +3,7 @@ package scheduleringester import ( "fmt" "testing" + "time" "github.com/google/uuid" "github.com/pkg/errors" @@ -171,37 +172,37 @@ func TestDbOperationOptimisation(t *testing.T) { }}, "MarkJobsCancelled": {N: 2, Ops: []DbOperation{ InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 - MarkJobsCancelled{jobIds[0]: true}, // 2 + MarkJobsCancelled{jobIds[0]: time.Time{}}, // 2 InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 2 - MarkJobsCancelled{jobIds[1]: true}, // 2 + MarkJobsCancelled{jobIds[1]: time.Time{}}, // 2 InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 2 }}, "MarkRunsSucceeded": {N: 3, Ops: []DbOperation{ InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 InsertRuns{runIds[0]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}}, // 2 - MarkRunsSucceeded{runIds[0]: true}, // 3 + MarkRunsSucceeded{runIds[0]: time.Time{}}, // 3 InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 3 InsertRuns{runIds[1]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[1]}}}, // 3 - MarkRunsSucceeded{runIds[1]: true}, // 3 + MarkRunsSucceeded{runIds[1]: time.Time{}}, // 3 InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 3 }}, "MarkRunsFailed": {N: 3, Ops: []DbOperation{ InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 InsertRuns{runIds[0]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}}, // 2 - MarkRunsFailed{runIds[0]: &JobRunFailed{true, true}}, // 3 + MarkRunsFailed{runIds[0]: &JobRunFailed{true, true, time.Time{}}}, // 3 InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 3 InsertRuns{runIds[1]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[1]}}}, // 3 - MarkRunsFailed{runIds[1]: &JobRunFailed{true, true}}, // 3 + MarkRunsFailed{runIds[1]: &JobRunFailed{true, true, time.Time{}}}, // 3 InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 3 }}, "MarkRunsRunning": {N: 3, Ops: []DbOperation{ InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 InsertRuns{runIds[0]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}}, // 2 - MarkRunsRunning{runIds[0]: true}, // 3 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 3 + MarkRunsRunning{runIds[0]: time.Time{}}, // 3 + InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 3 InsertRuns{runIds[1]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[1]}}}, // 3 - MarkRunsRunning{runIds[1]: true}, // 3 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 3 + MarkRunsRunning{runIds[1]: time.Time{}}, // 3 + InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 3 }}, "InsertPartitionMarker": {N: 2, Ops: []DbOperation{ InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 diff --git a/internal/scheduleringester/instructions.go b/internal/scheduleringester/instructions.go index d4d3e1c3245..8204ea8cbbd 100644 --- a/internal/scheduleringester/instructions.go +++ b/internal/scheduleringester/instructions.go @@ -78,13 +78,13 @@ func (c *InstructionConverter) dbOperationsFromEventSequence(es *armadaevents.Ev case *armadaevents.EventSequence_Event_SubmitJob: operationsFromEvent, err = c.handleSubmitJob(event.GetSubmitJob(), eventTime, meta) case *armadaevents.EventSequence_Event_JobRunLeased: - operationsFromEvent, err = c.handleJobRunLeased(event.GetJobRunLeased(), meta) + operationsFromEvent, err = c.handleJobRunLeased(event.GetJobRunLeased(), eventTime, meta) case *armadaevents.EventSequence_Event_JobRunRunning: - operationsFromEvent, err = c.handleJobRunRunning(event.GetJobRunRunning()) + operationsFromEvent, err = c.handleJobRunRunning(event.GetJobRunRunning(), eventTime) case *armadaevents.EventSequence_Event_JobRunSucceeded: - operationsFromEvent, err = c.handleJobRunSucceeded(event.GetJobRunSucceeded()) + operationsFromEvent, err = c.handleJobRunSucceeded(event.GetJobRunSucceeded(), eventTime) case *armadaevents.EventSequence_Event_JobRunErrors: - operationsFromEvent, err = c.handleJobRunErrors(event.GetJobRunErrors()) + operationsFromEvent, err = c.handleJobRunErrors(event.GetJobRunErrors(), eventTime) case *armadaevents.EventSequence_Event_JobSucceeded: operationsFromEvent, err = c.handleJobSucceeded(event.GetJobSucceeded()) case *armadaevents.EventSequence_Event_JobErrors: @@ -98,7 +98,7 @@ func (c *InstructionConverter) dbOperationsFromEventSequence(es *armadaevents.Ev case *armadaevents.EventSequence_Event_CancelJobSet: operationsFromEvent, err = c.handleCancelJobSet(event.GetCancelJobSet(), meta) case *armadaevents.EventSequence_Event_CancelledJob: - operationsFromEvent, err = c.handleCancelledJob(event.GetCancelledJob()) + operationsFromEvent, err = c.handleCancelledJob(event.GetCancelledJob(), eventTime) case *armadaevents.EventSequence_Event_JobRequeued: operationsFromEvent, err = c.handleJobRequeued(event.GetJobRequeued()) case *armadaevents.EventSequence_Event_PartitionMarker: @@ -177,7 +177,7 @@ func (c *InstructionConverter) handleSubmitJob(job *armadaevents.SubmitJob, subm }}}, nil } -func (c *InstructionConverter) handleJobRunLeased(jobRunLeased *armadaevents.JobRunLeased, meta eventSequenceCommon) ([]DbOperation, error) { +func (c *InstructionConverter) handleJobRunLeased(jobRunLeased *armadaevents.JobRunLeased, leaseTime time.Time, meta eventSequenceCommon) ([]DbOperation, error) { runId := armadaevents.UuidFromProtoUuid(jobRunLeased.GetRunId()) jobId, err := armadaevents.UlidStringFromProtoUuid(jobRunLeased.GetJobId()) if err != nil { @@ -197,6 +197,7 @@ func (c *InstructionConverter) handleJobRunLeased(jobRunLeased *armadaevents.Job Executor: jobRunLeased.GetExecutorId(), Node: jobRunLeased.GetNodeId(), ScheduledAtPriority: scheduledAtPriority, + LeasedTimestamp: &leaseTime, }, }}, UpdateJobQueuedState{jobId: &JobQueuedStateUpdate{ @@ -227,17 +228,17 @@ func (c *InstructionConverter) handleJobRequeued(jobRequeued *armadaevents.JobRe }, nil } -func (c *InstructionConverter) handleJobRunRunning(jobRunRunning *armadaevents.JobRunRunning) ([]DbOperation, error) { +func (c *InstructionConverter) handleJobRunRunning(jobRunRunning *armadaevents.JobRunRunning, runningTime time.Time) ([]DbOperation, error) { runId := armadaevents.UuidFromProtoUuid(jobRunRunning.GetRunId()) - return []DbOperation{MarkRunsRunning{runId: true}}, nil + return []DbOperation{MarkRunsRunning{runId: runningTime}}, nil } -func (c *InstructionConverter) handleJobRunSucceeded(jobRunSucceeded *armadaevents.JobRunSucceeded) ([]DbOperation, error) { +func (c *InstructionConverter) handleJobRunSucceeded(jobRunSucceeded *armadaevents.JobRunSucceeded, successTime time.Time) ([]DbOperation, error) { runId := armadaevents.UuidFromProtoUuid(jobRunSucceeded.GetRunId()) - return []DbOperation{MarkRunsSucceeded{runId: true}}, nil + return []DbOperation{MarkRunsSucceeded{runId: successTime}}, nil } -func (c *InstructionConverter) handleJobRunErrors(jobRunErrors *armadaevents.JobRunErrors) ([]DbOperation, error) { +func (c *InstructionConverter) handleJobRunErrors(jobRunErrors *armadaevents.JobRunErrors, failureTime time.Time) ([]DbOperation, error) { runId := armadaevents.UuidFromProtoUuid(jobRunErrors.GetRunId()) jobId, err := armadaevents.UlidStringFromProtoUuid(jobRunErrors.JobId) if err != nil { @@ -264,6 +265,7 @@ func (c *InstructionConverter) handleJobRunErrors(jobRunErrors *armadaevents.Job markRunsFailed[runId] = &JobRunFailed{ LeaseReturned: runError.GetPodLeaseReturned() != nil, RunAttempted: runAttempted, + FailureTime: failureTime, } return []DbOperation{insertJobRunErrors, markRunsFailed}, nil } @@ -338,13 +340,13 @@ func (c *InstructionConverter) handleCancelJobSet(cancelJobSet *armadaevents.Can }}, nil } -func (c *InstructionConverter) handleCancelledJob(cancelledJob *armadaevents.CancelledJob) ([]DbOperation, error) { +func (c *InstructionConverter) handleCancelledJob(cancelledJob *armadaevents.CancelledJob, cancelTime time.Time) ([]DbOperation, error) { jobId, err := armadaevents.UlidStringFromProtoUuid(cancelledJob.GetJobId()) if err != nil { return nil, err } return []DbOperation{MarkJobsCancelled{ - jobId: true, + jobId: cancelTime, }}, nil } diff --git a/internal/scheduleringester/instructions_test.go b/internal/scheduleringester/instructions_test.go index 9fd9e759b49..bdb228d8b55 100644 --- a/internal/scheduleringester/instructions_test.go +++ b/internal/scheduleringester/instructions_test.go @@ -3,6 +3,7 @@ package scheduleringester import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -59,6 +60,7 @@ func TestConvertSequence(t *testing.T) { Executor: f.ExecutorId, Node: f.NodeName, ScheduledAtPriority: &f.ScheduledAtPriority, + LeasedTimestamp: &f.BaseTime, }}}, UpdateJobQueuedState{f.JobIdString: &JobQueuedStateUpdate{ Queued: false, @@ -68,11 +70,11 @@ func TestConvertSequence(t *testing.T) { }, "job run running": { events: []*armadaevents.EventSequence_Event{f.Running}, - expected: []DbOperation{MarkRunsRunning{f.RunIdUuid: true}}, + expected: []DbOperation{MarkRunsRunning{f.RunIdUuid: f.BaseTime}}, }, "job run succeeded": { events: []*armadaevents.EventSequence_Event{f.JobRunSucceeded}, - expected: []DbOperation{MarkRunsSucceeded{f.RunIdUuid: true}}, + expected: []DbOperation{MarkRunsSucceeded{f.RunIdUuid: f.BaseTime}}, }, "lease returned": { events: []*armadaevents.EventSequence_Event{f.LeaseReturned}, @@ -82,7 +84,7 @@ func TestConvertSequence(t *testing.T) { JobID: f.JobIdString, Error: protoutil.MustMarshallAndCompress(f.LeaseReturned.GetJobRunErrors().Errors[0], compressor), }}, - MarkRunsFailed{f.RunIdUuid: &JobRunFailed{LeaseReturned: true, RunAttempted: true}}, + MarkRunsFailed{f.RunIdUuid: &JobRunFailed{LeaseReturned: true, RunAttempted: true, FailureTime: f.BaseTime}}, }, }, "job failed": { @@ -93,7 +95,7 @@ func TestConvertSequence(t *testing.T) { JobID: f.JobIdString, Error: protoutil.MustMarshallAndCompress(f.JobRunFailed.GetJobRunErrors().Errors[0], compressor), }}, - MarkRunsFailed{f.RunIdUuid: &JobRunFailed{LeaseReturned: false, RunAttempted: true}}, + MarkRunsFailed{f.RunIdUuid: &JobRunFailed{LeaseReturned: false, RunAttempted: true, FailureTime: f.BaseTime}}, }, }, "job errors terminal": { @@ -153,7 +155,7 @@ func TestConvertSequence(t *testing.T) { "JobCancelled": { events: []*armadaevents.EventSequence_Event{f.JobCancelled}, expected: []DbOperation{ - MarkJobsCancelled{f.JobIdString: true}, + MarkJobsCancelled{f.JobIdString: f.BaseTime}, }, }, "JobRequeued": { @@ -185,14 +187,24 @@ func TestConvertSequence(t *testing.T) { events: []*armadaevents.EventSequence_Event{f.JobSetCancelRequested, f.Running, f.JobSucceeded}, expected: []DbOperation{ MarkJobSetsCancelRequested{JobSetKey{queue: f.Queue, jobSet: f.JobSetName}: &JobSetCancelAction{cancelQueued: true, cancelLeased: true}}, - MarkRunsRunning{f.RunIdUuid: true}, + MarkRunsRunning{f.RunIdUuid: f.BaseTime}, MarkJobsSucceeded{f.JobIdString: true}, }, }, + "multiple events - multiple timestamps": { + events: multipleEventsMultipleTimeStamps(), + expected: []DbOperation{ + MarkJobsCancelled{f.JobIdString: f.BaseTime}, + MarkRunsSucceeded{f.RunIdUuid: f.BaseTime}, + MarkRunsRunning{f.RunIdUuid: f.BaseTime}, + MarkJobsCancelled{f.JobIdString: f.BaseTime.Add(time.Hour)}, + MarkRunsSucceeded{f.RunIdUuid: f.BaseTime.Add(time.Hour)}, + }, + }, "ignored events": { events: []*armadaevents.EventSequence_Event{f.Running, f.JobPreempted, f.JobSucceeded}, expected: []DbOperation{ - MarkRunsRunning{f.RunIdUuid: true}, + MarkRunsRunning{f.RunIdUuid: f.BaseTime}, MarkJobsSucceeded{f.JobIdString: true}, }, }, @@ -306,3 +318,13 @@ func getExpectedSubmitMessageSchedulingInfo(t *testing.T) *schedulerobjects.JobS } return expectedSubmitSchedulingInfo } + +func multipleEventsMultipleTimeStamps() []*armadaevents.EventSequence_Event { + events := []*armadaevents.EventSequence_Event{f.JobCancelled, f.JobRunSucceeded, f.Running} + created := f.BaseTime.Add(time.Hour) + anotherCancelled, _ := f.DeepCopy(f.JobCancelled) + anotherSucceeded, _ := f.DeepCopy(f.JobRunSucceeded) + anotherCancelled.Created = &created + anotherSucceeded.Created = &created + return append(events, anotherCancelled, anotherSucceeded) +} diff --git a/internal/scheduleringester/schedulerdb.go b/internal/scheduleringester/schedulerdb.go index 058f0f4778b..b57db915b18 100644 --- a/internal/scheduleringester/schedulerdb.go +++ b/internal/scheduleringester/schedulerdb.go @@ -1,6 +1,7 @@ package scheduleringester import ( + "fmt" "time" "github.com/google/uuid" @@ -180,12 +181,18 @@ func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper } case MarkJobsCancelled: jobIds := maps.Keys(o) - err := queries.MarkJobsCancelledById(ctx, jobIds) - if err != nil { + if err := queries.MarkJobsCancelledById(ctx, jobIds); err != nil { return errors.WithStack(err) } - err = queries.MarkRunsCancelledByJobId(ctx, jobIds) - if err != nil { + cancelTimes := make([]interface{}, 0, len(jobIds)) + canceled := make([]bool, 0, len(jobIds)) + for _, jobId := range jobIds { + cancelTimes = append(cancelTimes, o[jobId]) + canceled = append(canceled, true) + } + sqlStmt := multiColumnRunsUpdateStmt("job_id", "cancelled", "terminated_timestamp") + // order of arguments is important. See multiColumnRunsUpdateStmt function for details + if _, err := tx.Exec(ctx, sqlStmt, jobIds, canceled, cancelTimes); err != nil { return errors.WithStack(err) } case MarkJobsSucceeded: @@ -213,16 +220,29 @@ func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper } } case MarkRunsSucceeded: - runIds := maps.Keys(o) - err := queries.MarkJobRunsSucceededById(ctx, runIds) - if err != nil { + successTimes := make([]interface{}, 0, len(o)) + succeeded := make([]bool, 0, len(o)) + runIds := make([]uuid.UUID, 0, len(o)) + for runId, successTime := range o { + successTimes = append(successTimes, successTime) + runIds = append(runIds, runId) + succeeded = append(succeeded, true) + } + sqlStmt := multiColumnRunsUpdateStmt("run_id", "succeeded", "terminated_timestamp") + // order of arguments is important. See multiColumnRunsUpdateStmt function for details + if _, err := tx.Exec(ctx, sqlStmt, runIds, succeeded, successTimes); err != nil { return errors.WithStack(err) } case MarkRunsFailed: - runIds := maps.Keys(o) - returned := make([]uuid.UUID, 0, len(runIds)) - runAttempted := make([]uuid.UUID, 0, len(runIds)) + runIds := make([]uuid.UUID, 0, len(o)) + failTimes := make([]interface{}, 0, len(o)) + failed := make([]bool, 0, len(o)) + returned := make([]uuid.UUID, 0) + runAttempted := make([]uuid.UUID, 0) for k, v := range o { + runIds = append(runIds, k) + failTimes = append(failTimes, v.FailureTime) + failed = append(failed, true) if v.LeaseReturned { returned = append(returned, k) } @@ -230,22 +250,30 @@ func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper runAttempted = append(runAttempted, k) } } - err := queries.MarkJobRunsFailedById(ctx, runIds) - if err != nil { + sqlStmt := multiColumnRunsUpdateStmt("run_id", "failed", "terminated_timestamp") + // order of arguments is important. See multiColumnRunsUpdateStmt function for details + if _, err := tx.Exec(ctx, sqlStmt, runIds, failed, failTimes); err != nil { return errors.WithStack(err) } - err = queries.MarkJobRunsReturnedById(ctx, returned) - if err != nil { + + if err := queries.MarkJobRunsReturnedById(ctx, returned); err != nil { return errors.WithStack(err) } - err = queries.MarkJobRunsAttemptedById(ctx, runAttempted) - if err != nil { + if err := queries.MarkJobRunsAttemptedById(ctx, runAttempted); err != nil { return errors.WithStack(err) } case MarkRunsRunning: - runIds := maps.Keys(o) - err := queries.MarkJobRunsRunningById(ctx, runIds) - if err != nil { + runIds := make([]uuid.UUID, 0, len(o)) + runningTimes := make([]interface{}, 0, len(runIds)) + running := make([]bool, 0, len(runIds)) + for runId, failTime := range o { + runIds = append(runIds, runId) + runningTimes = append(runningTimes, failTime) + running = append(running, true) + } + sqlStmt := multiColumnRunsUpdateStmt("run_id", "running", "running_timestamp") + // order of arguments is important. See multiColumnRunsUpdateStmt function for details + if _, err := tx.Exec(ctx, sqlStmt, runIds, running, runningTimes); err != nil { return errors.WithStack(err) } case InsertJobRunErrors: @@ -289,3 +317,17 @@ func execBatch(ctx *armadacontext.Context, tx pgx.Tx, batch *pgx.Batch) error { } return nil } + +func multiColumnRunsUpdateStmt(id, phaseColumn, timeStampColumn string) string { + idPgType := "uuid" + if id == "job_id" { + idPgType = "text" + } + return fmt.Sprintf(`update runs set + %[2]v = runs_temp.%[2]v, + %[3]v = runs_temp.%[3]v + from (select * from unnest($1::%[4]v[], $2::boolean[] ,$3::timestamptz[])) + as runs_temp(%[1]v, %[2]v, %[3]v) + where runs.%[1]v = runs_temp.%[1]v;`, + id, phaseColumn, timeStampColumn, idPgType) +} diff --git a/internal/scheduleringester/schedulerdb_test.go b/internal/scheduleringester/schedulerdb_test.go index 6789aea5bc9..7adfa98a7ef 100644 --- a/internal/scheduleringester/schedulerdb_test.go +++ b/internal/scheduleringester/schedulerdb_test.go @@ -15,6 +15,7 @@ import ( "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/ingest/metrics" + "github.com/armadaproject/armada/internal/common/ingest/testfixtures" "github.com/armadaproject/armada/internal/common/util" schedulerdb "github.com/armadaproject/armada/internal/scheduler/database" ) @@ -140,8 +141,8 @@ func TestWriteOps(t *testing.T) { runIds[3]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[3], RunID: runIds[3]}}, }, MarkJobsCancelled{ - jobIds[0]: true, - jobIds[1]: true, + jobIds[0]: testfixtures.BaseTime, + jobIds[1]: testfixtures.BaseTime.Add(time.Hour), }, }}, "MarkJobsSucceeded": {Ops: []DbOperation{ @@ -182,8 +183,8 @@ func TestWriteOps(t *testing.T) { runIds[3]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[3], RunID: runIds[3]}}, }, MarkRunsSucceeded{ - runIds[0]: true, - runIds[1]: true, + runIds[0]: testfixtures.BaseTime, + runIds[1]: testfixtures.BaseTime.Add(time.Hour), }, }}, "UpdateJobSchedulingInfo": {Ops: []DbOperation{ @@ -230,9 +231,9 @@ func TestWriteOps(t *testing.T) { runIds[3]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[3], RunID: runIds[3]}}, }, MarkRunsFailed{ - runIds[0]: &JobRunFailed{LeaseReturned: true}, - runIds[1]: &JobRunFailed{LeaseReturned: true, RunAttempted: true}, - runIds[2]: &JobRunFailed{LeaseReturned: false}, + runIds[0]: &JobRunFailed{LeaseReturned: true, FailureTime: testfixtures.BaseTime}, + runIds[1]: &JobRunFailed{LeaseReturned: true, RunAttempted: true, FailureTime: testfixtures.BaseTime.Add(time.Hour)}, + runIds[2]: &JobRunFailed{LeaseReturned: false, FailureTime: testfixtures.BaseTime}, }, }}, "MarkRunsRunning": {Ops: []DbOperation{ @@ -249,8 +250,8 @@ func TestWriteOps(t *testing.T) { runIds[3]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[3], RunID: runIds[3]}}, }, MarkRunsRunning{ - runIds[0]: true, - runIds[1]: true, + runIds[0]: testfixtures.BaseTime, + runIds[1]: testfixtures.BaseTime.Add(time.Hour), }, }}, "Insert PositionMarkers": {Ops: []DbOperation{ @@ -478,6 +479,7 @@ func assertOpSuccess(t *testing.T, schedulerDb *SchedulerDb, serials map[string] for _, run := range runs { if _, ok := expected[run.JobID]; ok { assert.True(t, run.Cancelled) + assert.Equal(t, expected[run.JobID], run.TerminatedTimestamp.UTC()) runsChanged++ } } @@ -542,6 +544,7 @@ func assertOpSuccess(t *testing.T, schedulerDb *SchedulerDb, serials map[string] for _, run := range runs { if _, ok := expected[run.RunID]; ok { assert.True(t, run.Succeeded) + assert.Equal(t, expected[run.RunID], run.TerminatedTimestamp.UTC()) numChanged++ } } @@ -569,6 +572,7 @@ func assertOpSuccess(t *testing.T, schedulerDb *SchedulerDb, serials map[string] assert.True(t, run.Failed) assert.Equal(t, expectedRun.LeaseReturned, run.Returned) assert.Equal(t, expectedRun.RunAttempted, run.RunAttempted) + assert.Equal(t, expectedRun.FailureTime, run.TerminatedTimestamp.UTC()) numChanged++ } } @@ -594,6 +598,7 @@ func assertOpSuccess(t *testing.T, schedulerDb *SchedulerDb, serials map[string] for _, run := range runs { if _, ok := expected[run.RunID]; ok { assert.True(t, run.Running) + assert.Equal(t, expected[run.RunID], run.RunningTimestamp.UTC()) numChanged++ } }