From 23ede39ba71c27af3ab85c966995ea23f7bf8843 Mon Sep 17 00:00:00 2001 From: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com> Date: Fri, 16 Feb 2024 18:42:43 +0200 Subject: [PATCH] Better terminal states reconciliation (#3404) * better terminal states reconciliation Signed-off-by: Mohamed Abdelfatah * return early Signed-off-by: Mohamed Abdelfatah --------- Signed-off-by: Mohamed Abdelfatah Co-authored-by: Albin Severinson --- internal/scheduler/jobdb/reconciliation.go | 27 ++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/internal/scheduler/jobdb/reconciliation.go b/internal/scheduler/jobdb/reconciliation.go index bacbb135518..69ff5b38bec 100644 --- a/internal/scheduler/jobdb/reconciliation.go +++ b/internal/scheduler/jobdb/reconciliation.go @@ -183,22 +183,18 @@ func (jobDb *JobDb) reconcileRunDifferences(jobRun *JobRun, jobRepoRun *database rst.Running = true } if jobRepoRun.Preempted && !jobRun.Preempted() { - jobRun = jobRun.WithoutTerminal() jobRun = jobRun.WithPreempted(true).WithRunning(false).WithPreemptedTime(jobRepoRun.PreemptedTimestamp) rst.Preempted = true } if jobRepoRun.Cancelled && !jobRun.Cancelled() { - jobRun = jobRun.WithoutTerminal() jobRun = jobRun.WithCancelled(true).WithRunning(false).WithTerminatedTime(jobRepoRun.TerminatedTimestamp) rst.Cancelled = true } if jobRepoRun.Failed && !jobRun.Failed() { - jobRun = jobRun.WithoutTerminal() jobRun = jobRun.WithFailed(true).WithRunning(false).WithTerminatedTime(jobRepoRun.TerminatedTimestamp) rst.Failed = true } if jobRepoRun.Succeeded && !jobRun.Succeeded() { - jobRun = jobRun.WithoutTerminal() jobRun = jobRun.WithSucceeded(true).WithRunning(false).WithTerminatedTime(jobRepoRun.TerminatedTimestamp) rst.Succeeded = true } @@ -210,9 +206,32 @@ func (jobDb *JobDb) reconcileRunDifferences(jobRun *JobRun, jobRepoRun *database jobRun = jobRun.WithAttempted(true) } } + jobRun = jobDb.enforceTerminalStateExclusivity(jobRun, &rst) return } +// 
enforceTerminalStateExclusivity ensures that a job run has a single terminal state regardless of what the database reports. +// Terminal states are: preempted, cancelled, failed, and succeeded. When several are set on jobRun, the first match in the order succeeded, failed, cancelled, preempted wins: among rst's four terminal flags only the winner's is left true (it is set to true even if it was false on entry; NOTE(review): confirm callers expect that), and the returned run is rebuilt via WithoutTerminal() followed by the winning setter. A run with no terminal state is returned as-is and rst is left untouched. +func (jobDb *JobDb) enforceTerminalStateExclusivity(jobRun *JobRun, rst *RunStateTransitions) *JobRun { + if jobRun.Succeeded() { + rst.Preempted, rst.Cancelled, rst.Failed, rst.Succeeded = false, false, false, true + return jobRun.WithoutTerminal().WithSucceeded(true) + } + if jobRun.Failed() { + rst.Preempted, rst.Cancelled, rst.Succeeded, rst.Failed = false, false, false, true + return jobRun.WithoutTerminal().WithFailed(true) + } + if jobRun.Cancelled() { + rst.Preempted, rst.Failed, rst.Succeeded, rst.Cancelled = false, false, false, true + return jobRun.WithoutTerminal().WithCancelled(true) + } + if jobRun.Preempted() { + rst.Cancelled, rst.Failed, rst.Succeeded, rst.Preempted = false, false, false, true + return jobRun.WithoutTerminal().WithPreempted(true) + } + return jobRun +} + // schedulerJobFromDatabaseJob creates a new scheduler job from a database job. func (jobDb *JobDb) schedulerJobFromDatabaseJob(dbJob *database.Job) (*Job, error) { schedulingInfo := &schedulerobjects.JobSchedulingInfo{}