Skip to content

Commit

Permalink
Better terminal states reconcilation (#3404)
Browse files Browse the repository at this point in the history
* better terminal states reconcilation

Signed-off-by: Mohamed Abdelfatah <[email protected]>

* return early

Signed-off-by: Mohamed Abdelfatah <[email protected]>

---------

Signed-off-by: Mohamed Abdelfatah <[email protected]>
Co-authored-by: Albin Severinson <[email protected]>
  • Loading branch information
Mo-Fatah and severinson authored Feb 16, 2024
1 parent 4f655e3 commit 23ede39
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions internal/scheduler/jobdb/reconciliation.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,22 +183,18 @@ func (jobDb *JobDb) reconcileRunDifferences(jobRun *JobRun, jobRepoRun *database
rst.Running = true
}
if jobRepoRun.Preempted && !jobRun.Preempted() {
jobRun = jobRun.WithoutTerminal()
jobRun = jobRun.WithPreempted(true).WithRunning(false).WithPreemptedTime(jobRepoRun.PreemptedTimestamp)
rst.Preempted = true
}
if jobRepoRun.Cancelled && !jobRun.Cancelled() {
jobRun = jobRun.WithoutTerminal()
jobRun = jobRun.WithCancelled(true).WithRunning(false).WithTerminatedTime(jobRepoRun.TerminatedTimestamp)
rst.Cancelled = true
}
if jobRepoRun.Failed && !jobRun.Failed() {
jobRun = jobRun.WithoutTerminal()
jobRun = jobRun.WithFailed(true).WithRunning(false).WithTerminatedTime(jobRepoRun.TerminatedTimestamp)
rst.Failed = true
}
if jobRepoRun.Succeeded && !jobRun.Succeeded() {
jobRun = jobRun.WithoutTerminal()
jobRun = jobRun.WithSucceeded(true).WithRunning(false).WithTerminatedTime(jobRepoRun.TerminatedTimestamp)
rst.Succeeded = true
}
Expand All @@ -210,9 +206,32 @@ func (jobDb *JobDb) reconcileRunDifferences(jobRun *JobRun, jobRepoRun *database
jobRun = jobRun.WithAttempted(true)
}
}
jobRun = jobDb.enforceTerminalStateExclusivity(jobRun, &rst)
return
}

// enforceTerminalStateExclusivity ensures that a job run has a single terminal state regardless of what the database reports.
// terminal states are: preempted, cancelled, failed, and succeeded.
func (jobDb *JobDb) enforceTerminalStateExclusivity(jobRun *JobRun, rst *RunStateTransitions) *JobRun {
if jobRun.Succeeded() {
rst.Preempted, rst.Cancelled, rst.Failed, rst.Succeeded = false, false, false, true
return jobRun.WithoutTerminal().WithSucceeded(true)
}
if jobRun.Failed() {
rst.Preempted, rst.Cancelled, rst.Succeeded, rst.Failed = false, false, false, true
return jobRun.WithoutTerminal().WithFailed(true)
}
if jobRun.Cancelled() {
rst.Preempted, rst.Failed, rst.Succeeded, rst.Cancelled = false, false, false, true
return jobRun.WithoutTerminal().WithCancelled(true)
}
if jobRun.Preempted() {
rst.Cancelled, rst.Failed, rst.Succeeded, rst.Preempted = false, false, false, true
return jobRun.WithoutTerminal().WithPreempted(true)
}
return jobRun
}

// schedulerJobFromDatabaseJob creates a new scheduler job from a database job.
func (jobDb *JobDb) schedulerJobFromDatabaseJob(dbJob *database.Job) (*Job, error) {
schedulingInfo := &schedulerobjects.JobSchedulingInfo{}
Expand Down

0 comments on commit 23ede39

Please sign in to comment.