Skip to content

Commit

Permalink
- Add running, failed, leased and success timestamps (#3054)
Browse files Browse the repository at this point in the history
- Add cancel timestamps

- Embed timestamps update with status update

- Switch to update...from syntax for timestamps queries

- Update unit tests

- Test: generate multiple events

Signed-off-by: mohamed <[email protected]>
  • Loading branch information
Mo-Fatah authored Jan 16, 2024
1 parent 7dd31a1 commit 6c6870d
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 61 deletions.
9 changes: 6 additions & 3 deletions internal/scheduleringester/dbops.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package scheduleringester

import (
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/google/uuid"
"golang.org/x/exp/maps"
Expand All @@ -22,6 +24,7 @@ func (d *DbOperationsWithMessageIds) GetMessageIDs() []pulsar.MessageID {
type JobRunFailed struct {
LeaseReturned bool
RunAttempted bool
FailureTime time.Time
}

type JobSchedulingInfoUpdate struct {
Expand Down Expand Up @@ -119,15 +122,15 @@ type (
UpdateJobSetPriorities map[JobSetKey]int64
MarkJobSetsCancelRequested map[JobSetKey]*JobSetCancelAction
MarkJobsCancelRequested map[string]bool
MarkJobsCancelled map[string]bool
MarkJobsCancelled map[string]time.Time
MarkJobsSucceeded map[string]bool
MarkJobsFailed map[string]bool
UpdateJobPriorities map[string]int64
UpdateJobSchedulingInfo map[string]*JobSchedulingInfoUpdate
UpdateJobQueuedState map[string]*JobQueuedStateUpdate
MarkRunsSucceeded map[uuid.UUID]bool
MarkRunsSucceeded map[uuid.UUID]time.Time
MarkRunsFailed map[uuid.UUID]*JobRunFailed
MarkRunsRunning map[uuid.UUID]bool
MarkRunsRunning map[uuid.UUID]time.Time
InsertJobRunErrors map[uuid.UUID]*schedulerdb.JobRunError
InsertPartitionMarker struct {
markers []*schedulerdb.Marker
Expand Down
21 changes: 11 additions & 10 deletions internal/scheduleringester/dbops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduleringester
import (
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -171,37 +172,37 @@ func TestDbOperationOptimisation(t *testing.T) {
}},
"MarkJobsCancelled": {N: 2, Ops: []DbOperation{
InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1
MarkJobsCancelled{jobIds[0]: true}, // 2
MarkJobsCancelled{jobIds[0]: time.Time{}}, // 2
InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 2
MarkJobsCancelled{jobIds[1]: true}, // 2
MarkJobsCancelled{jobIds[1]: time.Time{}}, // 2
InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 2
}},
"MarkRunsSucceeded": {N: 3, Ops: []DbOperation{
InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1
InsertRuns{runIds[0]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}}, // 2
MarkRunsSucceeded{runIds[0]: true}, // 3
MarkRunsSucceeded{runIds[0]: time.Time{}}, // 3
InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 3
InsertRuns{runIds[1]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[1]}}}, // 3
MarkRunsSucceeded{runIds[1]: true}, // 3
MarkRunsSucceeded{runIds[1]: time.Time{}}, // 3
InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 3
}},
"MarkRunsFailed": {N: 3, Ops: []DbOperation{
InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1
InsertRuns{runIds[0]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}}, // 2
MarkRunsFailed{runIds[0]: &JobRunFailed{true, true}}, // 3
MarkRunsFailed{runIds[0]: &JobRunFailed{true, true, time.Time{}}}, // 3
InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 3
InsertRuns{runIds[1]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[1]}}}, // 3
MarkRunsFailed{runIds[1]: &JobRunFailed{true, true}}, // 3
MarkRunsFailed{runIds[1]: &JobRunFailed{true, true, time.Time{}}}, // 3
InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 3
}},
"MarkRunsRunning": {N: 3, Ops: []DbOperation{
InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1
InsertRuns{runIds[0]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}}, // 2
MarkRunsRunning{runIds[0]: true}, // 3
InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 3
MarkRunsRunning{runIds[0]: time.Time{}}, // 3
InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 3
InsertRuns{runIds[1]: &JobRunDetails{queue: testQueueName, dbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[1]}}}, // 3
MarkRunsRunning{runIds[1]: true}, // 3
InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 3
MarkRunsRunning{runIds[1]: time.Time{}}, // 3
InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 3
}},
"InsertPartitionMarker": {N: 2, Ops: []DbOperation{
InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1
Expand Down
28 changes: 15 additions & 13 deletions internal/scheduleringester/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ func (c *InstructionConverter) dbOperationsFromEventSequence(es *armadaevents.Ev
case *armadaevents.EventSequence_Event_SubmitJob:
operationsFromEvent, err = c.handleSubmitJob(event.GetSubmitJob(), eventTime, meta)
case *armadaevents.EventSequence_Event_JobRunLeased:
operationsFromEvent, err = c.handleJobRunLeased(event.GetJobRunLeased(), meta)
operationsFromEvent, err = c.handleJobRunLeased(event.GetJobRunLeased(), eventTime, meta)
case *armadaevents.EventSequence_Event_JobRunRunning:
operationsFromEvent, err = c.handleJobRunRunning(event.GetJobRunRunning())
operationsFromEvent, err = c.handleJobRunRunning(event.GetJobRunRunning(), eventTime)
case *armadaevents.EventSequence_Event_JobRunSucceeded:
operationsFromEvent, err = c.handleJobRunSucceeded(event.GetJobRunSucceeded())
operationsFromEvent, err = c.handleJobRunSucceeded(event.GetJobRunSucceeded(), eventTime)
case *armadaevents.EventSequence_Event_JobRunErrors:
operationsFromEvent, err = c.handleJobRunErrors(event.GetJobRunErrors())
operationsFromEvent, err = c.handleJobRunErrors(event.GetJobRunErrors(), eventTime)
case *armadaevents.EventSequence_Event_JobSucceeded:
operationsFromEvent, err = c.handleJobSucceeded(event.GetJobSucceeded())
case *armadaevents.EventSequence_Event_JobErrors:
Expand All @@ -98,7 +98,7 @@ func (c *InstructionConverter) dbOperationsFromEventSequence(es *armadaevents.Ev
case *armadaevents.EventSequence_Event_CancelJobSet:
operationsFromEvent, err = c.handleCancelJobSet(event.GetCancelJobSet(), meta)
case *armadaevents.EventSequence_Event_CancelledJob:
operationsFromEvent, err = c.handleCancelledJob(event.GetCancelledJob())
operationsFromEvent, err = c.handleCancelledJob(event.GetCancelledJob(), eventTime)
case *armadaevents.EventSequence_Event_JobRequeued:
operationsFromEvent, err = c.handleJobRequeued(event.GetJobRequeued())
case *armadaevents.EventSequence_Event_PartitionMarker:
Expand Down Expand Up @@ -177,7 +177,7 @@ func (c *InstructionConverter) handleSubmitJob(job *armadaevents.SubmitJob, subm
}}}, nil
}

func (c *InstructionConverter) handleJobRunLeased(jobRunLeased *armadaevents.JobRunLeased, meta eventSequenceCommon) ([]DbOperation, error) {
func (c *InstructionConverter) handleJobRunLeased(jobRunLeased *armadaevents.JobRunLeased, leaseTime time.Time, meta eventSequenceCommon) ([]DbOperation, error) {
runId := armadaevents.UuidFromProtoUuid(jobRunLeased.GetRunId())
jobId, err := armadaevents.UlidStringFromProtoUuid(jobRunLeased.GetJobId())
if err != nil {
Expand All @@ -197,6 +197,7 @@ func (c *InstructionConverter) handleJobRunLeased(jobRunLeased *armadaevents.Job
Executor: jobRunLeased.GetExecutorId(),
Node: jobRunLeased.GetNodeId(),
ScheduledAtPriority: scheduledAtPriority,
LeasedTimestamp: &leaseTime,
},
}},
UpdateJobQueuedState{jobId: &JobQueuedStateUpdate{
Expand Down Expand Up @@ -227,17 +228,17 @@ func (c *InstructionConverter) handleJobRequeued(jobRequeued *armadaevents.JobRe
}, nil
}

func (c *InstructionConverter) handleJobRunRunning(jobRunRunning *armadaevents.JobRunRunning) ([]DbOperation, error) {
func (c *InstructionConverter) handleJobRunRunning(jobRunRunning *armadaevents.JobRunRunning, runningTime time.Time) ([]DbOperation, error) {
runId := armadaevents.UuidFromProtoUuid(jobRunRunning.GetRunId())
return []DbOperation{MarkRunsRunning{runId: true}}, nil
return []DbOperation{MarkRunsRunning{runId: runningTime}}, nil
}

func (c *InstructionConverter) handleJobRunSucceeded(jobRunSucceeded *armadaevents.JobRunSucceeded) ([]DbOperation, error) {
func (c *InstructionConverter) handleJobRunSucceeded(jobRunSucceeded *armadaevents.JobRunSucceeded, successTime time.Time) ([]DbOperation, error) {
runId := armadaevents.UuidFromProtoUuid(jobRunSucceeded.GetRunId())
return []DbOperation{MarkRunsSucceeded{runId: true}}, nil
return []DbOperation{MarkRunsSucceeded{runId: successTime}}, nil
}

func (c *InstructionConverter) handleJobRunErrors(jobRunErrors *armadaevents.JobRunErrors) ([]DbOperation, error) {
func (c *InstructionConverter) handleJobRunErrors(jobRunErrors *armadaevents.JobRunErrors, failureTime time.Time) ([]DbOperation, error) {
runId := armadaevents.UuidFromProtoUuid(jobRunErrors.GetRunId())
jobId, err := armadaevents.UlidStringFromProtoUuid(jobRunErrors.JobId)
if err != nil {
Expand All @@ -264,6 +265,7 @@ func (c *InstructionConverter) handleJobRunErrors(jobRunErrors *armadaevents.Job
markRunsFailed[runId] = &JobRunFailed{
LeaseReturned: runError.GetPodLeaseReturned() != nil,
RunAttempted: runAttempted,
FailureTime: failureTime,
}
return []DbOperation{insertJobRunErrors, markRunsFailed}, nil
}
Expand Down Expand Up @@ -338,13 +340,13 @@ func (c *InstructionConverter) handleCancelJobSet(cancelJobSet *armadaevents.Can
}}, nil
}

func (c *InstructionConverter) handleCancelledJob(cancelledJob *armadaevents.CancelledJob) ([]DbOperation, error) {
func (c *InstructionConverter) handleCancelledJob(cancelledJob *armadaevents.CancelledJob, cancelTime time.Time) ([]DbOperation, error) {
jobId, err := armadaevents.UlidStringFromProtoUuid(cancelledJob.GetJobId())
if err != nil {
return nil, err
}
return []DbOperation{MarkJobsCancelled{
jobId: true,
jobId: cancelTime,
}}, nil
}

Expand Down
36 changes: 29 additions & 7 deletions internal/scheduleringester/instructions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduleringester
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -59,6 +60,7 @@ func TestConvertSequence(t *testing.T) {
Executor: f.ExecutorId,
Node: f.NodeName,
ScheduledAtPriority: &f.ScheduledAtPriority,
LeasedTimestamp: &f.BaseTime,
}}},
UpdateJobQueuedState{f.JobIdString: &JobQueuedStateUpdate{
Queued: false,
Expand All @@ -68,11 +70,11 @@ func TestConvertSequence(t *testing.T) {
},
"job run running": {
events: []*armadaevents.EventSequence_Event{f.Running},
expected: []DbOperation{MarkRunsRunning{f.RunIdUuid: true}},
expected: []DbOperation{MarkRunsRunning{f.RunIdUuid: f.BaseTime}},
},
"job run succeeded": {
events: []*armadaevents.EventSequence_Event{f.JobRunSucceeded},
expected: []DbOperation{MarkRunsSucceeded{f.RunIdUuid: true}},
expected: []DbOperation{MarkRunsSucceeded{f.RunIdUuid: f.BaseTime}},
},
"lease returned": {
events: []*armadaevents.EventSequence_Event{f.LeaseReturned},
Expand All @@ -82,7 +84,7 @@ func TestConvertSequence(t *testing.T) {
JobID: f.JobIdString,
Error: protoutil.MustMarshallAndCompress(f.LeaseReturned.GetJobRunErrors().Errors[0], compressor),
}},
MarkRunsFailed{f.RunIdUuid: &JobRunFailed{LeaseReturned: true, RunAttempted: true}},
MarkRunsFailed{f.RunIdUuid: &JobRunFailed{LeaseReturned: true, RunAttempted: true, FailureTime: f.BaseTime}},
},
},
"job failed": {
Expand All @@ -93,7 +95,7 @@ func TestConvertSequence(t *testing.T) {
JobID: f.JobIdString,
Error: protoutil.MustMarshallAndCompress(f.JobRunFailed.GetJobRunErrors().Errors[0], compressor),
}},
MarkRunsFailed{f.RunIdUuid: &JobRunFailed{LeaseReturned: false, RunAttempted: true}},
MarkRunsFailed{f.RunIdUuid: &JobRunFailed{LeaseReturned: false, RunAttempted: true, FailureTime: f.BaseTime}},
},
},
"job errors terminal": {
Expand Down Expand Up @@ -153,7 +155,7 @@ func TestConvertSequence(t *testing.T) {
"JobCancelled": {
events: []*armadaevents.EventSequence_Event{f.JobCancelled},
expected: []DbOperation{
MarkJobsCancelled{f.JobIdString: true},
MarkJobsCancelled{f.JobIdString: f.BaseTime},
},
},
"JobRequeued": {
Expand Down Expand Up @@ -185,14 +187,24 @@ func TestConvertSequence(t *testing.T) {
events: []*armadaevents.EventSequence_Event{f.JobSetCancelRequested, f.Running, f.JobSucceeded},
expected: []DbOperation{
MarkJobSetsCancelRequested{JobSetKey{queue: f.Queue, jobSet: f.JobSetName}: &JobSetCancelAction{cancelQueued: true, cancelLeased: true}},
MarkRunsRunning{f.RunIdUuid: true},
MarkRunsRunning{f.RunIdUuid: f.BaseTime},
MarkJobsSucceeded{f.JobIdString: true},
},
},
"multiple events - multiple timestamps": {
events: multipleEventsMultipleTimeStamps(),
expected: []DbOperation{
MarkJobsCancelled{f.JobIdString: f.BaseTime},
MarkRunsSucceeded{f.RunIdUuid: f.BaseTime},
MarkRunsRunning{f.RunIdUuid: f.BaseTime},
MarkJobsCancelled{f.JobIdString: f.BaseTime.Add(time.Hour)},
MarkRunsSucceeded{f.RunIdUuid: f.BaseTime.Add(time.Hour)},
},
},
"ignored events": {
events: []*armadaevents.EventSequence_Event{f.Running, f.JobPreempted, f.JobSucceeded},
expected: []DbOperation{
MarkRunsRunning{f.RunIdUuid: true},
MarkRunsRunning{f.RunIdUuid: f.BaseTime},
MarkJobsSucceeded{f.JobIdString: true},
},
},
Expand Down Expand Up @@ -306,3 +318,13 @@ func getExpectedSubmitMessageSchedulingInfo(t *testing.T) *schedulerobjects.JobS
}
return expectedSubmitSchedulingInfo
}

func multipleEventsMultipleTimeStamps() []*armadaevents.EventSequence_Event {
events := []*armadaevents.EventSequence_Event{f.JobCancelled, f.JobRunSucceeded, f.Running}
created := f.BaseTime.Add(time.Hour)
anotherCancelled, _ := f.DeepCopy(f.JobCancelled)
anotherSucceeded, _ := f.DeepCopy(f.JobRunSucceeded)
anotherCancelled.Created = &created
anotherSucceeded.Created = &created
return append(events, anotherCancelled, anotherSucceeded)
}
Loading

0 comments on commit 6c6870d

Please sign in to comment.