From e1c79501077e5c4fac2fc11235f438d7f36fb010 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Tue, 14 Nov 2023 15:34:39 +0000 Subject: [PATCH] Move JobDb methods to Txn (#3097) * Move JobDb methods to Txn * Cleanup * Update interface definition * Fix test * Typo --- internal/scheduler/jobdb/job.go | 30 ++-- internal/scheduler/jobdb/job_test.go | 10 +- internal/scheduler/jobdb/jobdb.go | 185 ++++++++++----------- internal/scheduler/jobdb/jobdb_test.go | 84 +++++----- internal/scheduler/metrics.go | 4 +- internal/scheduler/metrics_test.go | 4 +- internal/scheduler/scheduler.go | 24 +-- internal/scheduler/scheduler_test.go | 22 +-- internal/scheduler/scheduling_algo.go | 27 ++- internal/scheduler/scheduling_algo_test.go | 16 +- internal/scheduler/simulator/simulator.go | 18 +- 11 files changed, 209 insertions(+), 215 deletions(-) diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go index 4bb8e5304f4..53b6b75d209 100644 --- a/internal/scheduler/jobdb/job.go +++ b/internal/scheduler/jobdb/job.go @@ -21,7 +21,7 @@ type Job struct { queue string // Jobset the job belongs to. // We store this as it's needed for sending job event messages. - jobset string + jobSet string // Per-queue priority of this job. priority uint32 // Requested per queue priority of this job. @@ -43,8 +43,8 @@ type Job struct { priorityClass types.PriorityClass // True if the user has requested this job be cancelled cancelRequested bool - // True if the user has requested this job's jobset be cancelled - cancelByJobsetRequested bool + // True if the user has requested this job's jobSet be cancelled + cancelByJobSetRequested bool // True if the scheduler has cancelled the job cancelled bool // True if the scheduler has failed the job @@ -98,7 +98,7 @@ func (job *Job) Equal(other *Job) bool { if job.queue != other.queue { return false } - if job.jobset != other.jobset { + if job.jobSet != other.jobSet { return false } if job.priority != other.priority { @@ -132,7 +132,7 @@ func (job *Job) Equal(other *Job) bool { if job.cancelRequested != other.cancelRequested { return false } - if job.cancelByJobsetRequested != other.cancelByJobsetRequested { + if job.cancelByJobSetRequested != other.cancelByJobSetRequested { return false } if job.cancelled != other.cancelled { @@ -167,15 +167,15 @@ func (job *Job) GetId() string { return job.id } -// Jobset returns the jobset the job belongs to. +// Jobset returns the jobSet the job belongs to. func (job *Job) Jobset() string { - return job.jobset + return job.jobSet } -// GetJobSet returns the jobset the job belongs to. +// GetJobSet returns the jobSet the job belongs to. // This is needed for compatibility with legacyJob func (job *Job) GetJobSet() string { - return job.jobset + return job.jobSet } // Queue returns the queue this job belongs to. @@ -327,9 +327,9 @@ func (job *Job) CancelRequested() bool { return job.cancelRequested } -// CancelByJobsetRequested returns true if the user has requested this job's jobset be cancelled. +// CancelByJobsetRequested returns true if the user has requested this job's jobSet be cancelled. func (job *Job) CancelByJobsetRequested() bool { - return job.cancelByJobsetRequested + return job.cancelByJobSetRequested } // WithCancelRequested returns a copy of the job with the cancelRequested status updated. @@ -339,10 +339,10 @@ func (job *Job) WithCancelRequested(cancelRequested bool) *Job { return j } -// WithCancelByJobsetRequested returns a copy of the job with the cancelByJobsetRequested status updated. +// WithCancelByJobsetRequested returns a copy of the job with the cancelByJobSetRequested status updated. func (job *Job) WithCancelByJobsetRequested(cancelByJobsetRequested bool) *Job { j := copyJob(*job) - j.cancelByJobsetRequested = cancelByJobsetRequested + j.cancelByJobSetRequested = cancelByJobsetRequested return j } @@ -488,10 +488,10 @@ func (job *Job) HasQueueTtlSet() bool { return job.GetQueueTtlSeconds() > 0 } -// WithJobset returns a copy of the job with the jobset updated. +// WithJobset returns a copy of the job with the jobSet updated. func (job *Job) WithJobset(jobset string) *Job { j := copyJob(*job) - j.jobset = jobset + j.jobSet = jobset return j } diff --git a/internal/scheduler/jobdb/job_test.go b/internal/scheduler/jobdb/job_test.go index 3ef0b2dd145..e56e3153bc9 100644 --- a/internal/scheduler/jobdb/job_test.go +++ b/internal/scheduler/jobdb/job_test.go @@ -36,7 +36,7 @@ var jobDb = NewJobDb( var baseJob = jobDb.NewJob( "test-job", - "test-jobset", + "test-jobSet", "test-queue", 2, schedulingInfo, @@ -285,7 +285,7 @@ func TestJob_TestRunsById(t *testing.T) { func TestJob_TestWithJobset(t *testing.T) { newJob := baseJob.WithJobset("fish") - assert.Equal(t, "test-jobset", baseJob.Jobset()) + assert.Equal(t, "test-jobSet", baseJob.Jobset()) assert.Equal(t, "fish", newJob.Jobset()) } @@ -302,9 +302,9 @@ func TestJob_TestWithCreated(t *testing.T) { } func TestJob_DeepCopy(t *testing.T) { - original := jobDb.NewJob("test-job", "test-jobset", "test-queue", 2, schedulingInfo, true, 0, false, false, false, 3) + original := jobDb.NewJob("test-job", "test-jobSet", "test-queue", 2, schedulingInfo, true, 0, false, false, false, 3) original = original.WithUpdatedRun(baseJobRun.DeepCopy()) - expected := jobDb.NewJob("test-job", "test-jobset", "test-queue", 2, schedulingInfo, true, 0, false, false, false, 3) + expected := jobDb.NewJob("test-job", "test-jobSet", "test-queue", 2, schedulingInfo, true, 0, false, false, false, 3) expected = expected.WithUpdatedRun(baseJobRun.DeepCopy()) result := original.DeepCopy() @@ -356,7 +356,7 @@ func TestJobSchedulingInfoFieldsInitialised(t *testing.T) { assert.Nil(t, infoWithNilFields.GetPodRequirements().NodeSelector) assert.Nil(t, infoWithNilFields.GetPodRequirements().Annotations) - job := jobDb.NewJob("test-job", "test-jobset", "test-queue", 2, infoWithNilFieldsCopy, true, 0, false, false, false, 3) + job := jobDb.NewJob("test-job", "test-jobSet", "test-queue", 2, infoWithNilFieldsCopy, true, 0, false, false, false, 3) assert.NotNil(t, job.GetNodeSelector()) assert.NotNil(t, job.GetAnnotations()) diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index 442f2759d41..02f85fdc66f 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -65,14 +65,14 @@ func NewJobDbWithSchedulingKeyGenerator( // The new job is not automatically inserted into the jobDb; call jobDb.Upsert to upsert it. func (jobDb *JobDb) NewJob( jobId string, - jobset string, + jobSet string, queue string, priority uint32, schedulingInfo *schedulerobjects.JobSchedulingInfo, queued bool, queuedVersion int32, cancelRequested bool, - cancelByJobsetRequested bool, + cancelByJobSetRequested bool, cancelled bool, created int64, ) *Job { @@ -87,7 +87,7 @@ func (jobDb *JobDb) NewJob( job := &Job{ id: jobId, queue: queue, - jobset: jobset, + jobSet: jobSet, priority: priority, queued: queued, queuedVersion: queuedVersion, @@ -97,7 +97,7 @@ func (jobDb *JobDb) NewJob( jobSchedulingInfo: schedulingInfo, priorityClass: priorityClass, cancelRequested: cancelRequested, - cancelByJobsetRequested: cancelByJobsetRequested, + cancelByJobSetRequested: cancelByJobSetRequested, cancelled: cancelled, runsById: map[uuid.UUID]*JobRun{}, } @@ -105,10 +105,85 @@ func (jobDb *JobDb) NewJob( return job } +// ReadTxn returns a read-only transaction. +// Multiple read-only transactions can access the db concurrently +func (jobDb *JobDb) ReadTxn() *Txn { + jobDb.copyMutex.Lock() + defer jobDb.copyMutex.Unlock() + return &Txn{ + readOnly: true, + jobsById: jobDb.jobsById, + jobsByRunId: jobDb.jobsByRunId, + jobsByQueue: jobDb.jobsByQueue, + queuedJobsByTtl: jobDb.queuedJobsByTtl, + active: true, + jobDb: jobDb, + } +} + +// WriteTxn returns a writeable transaction. +// Only a single write transaction may access the db at any given time so note that this function will block until +// any outstanding write transactions have been committed or aborted +func (jobDb *JobDb) WriteTxn() *Txn { + jobDb.writerMutex.Lock() + jobDb.copyMutex.Lock() + defer jobDb.copyMutex.Unlock() + return &Txn{ + readOnly: false, + jobsById: jobDb.jobsById, + jobsByRunId: jobDb.jobsByRunId, + jobsByQueue: maps.Clone(jobDb.jobsByQueue), + queuedJobsByTtl: jobDb.queuedJobsByTtl, + active: true, + jobDb: jobDb, + } +} + +// Txn is a JobDb Transaction. Transactions provide a consistent view of the database, allowing readers to +// perform multiple actions without the database changing from underneath them. +// Write transactions also allow callers to perform write operations that will not be visible to other users +// until the transaction is committed. +type Txn struct { + readOnly bool + // Map from job ids to jobs. + jobsById *immutable.Map[string, *Job] + // Map from run ids to jobs. + // Note that a job may have multiple runs, i.e., the mapping is many-to-one. + jobsByRunId *immutable.Map[uuid.UUID, string] + // Queued jobs for each queue. Stored in the order in which they should be scheduled. + jobsByQueue map[string]immutable.SortedSet[*Job] + // Queued jobs for each queue ordered by remaining time-to-live. + // TODO: The ordering is wrong. Since we call time.Now() in the compare function. + queuedJobsByTtl *immutable.SortedSet[*Job] + jobDb *JobDb + active bool +} + +func (txn *Txn) Commit() { + if txn.readOnly || !txn.active { + return + } + txn.jobDb.copyMutex.Lock() + defer txn.jobDb.copyMutex.Unlock() + defer txn.jobDb.writerMutex.Unlock() + txn.jobDb.jobsById = txn.jobsById + txn.jobDb.jobsByRunId = txn.jobsByRunId + txn.jobDb.jobsByQueue = txn.jobsByQueue + txn.jobDb.queuedJobsByTtl = txn.queuedJobsByTtl + txn.active = false +} + +func (txn *Txn) Abort() { + if txn.readOnly || !txn.active { + return + } + txn.active = false + txn.jobDb.writerMutex.Unlock() +} + // Upsert will insert the given jobs if they don't already exist or update them if they do. -// TODO: This doesn't need to be a function on jobDb. Only need txn. -func (jobDb *JobDb) Upsert(txn *Txn, jobs []*Job) error { - if err := jobDb.checkWritableTransaction(txn); err != nil { +func (txn *Txn) Upsert(jobs []*Job) error { + if err := txn.checkWritableTransaction(); err != nil { return err } @@ -199,20 +274,20 @@ func (jobDb *JobDb) Upsert(txn *Txn, jobs []*Job) error { // GetById returns the job with the given Id or nil if no such job exists // The Job returned by this function *must not* be subsequently modified -func (jobDb *JobDb) GetById(txn *Txn, id string) *Job { +func (txn *Txn) GetById(id string) *Job { j, _ := txn.jobsById.Get(id) return j } // GetByRunId returns the job with the given run id or nil if no such job exists // The Job returned by this function *must not* be subsequently modified -func (jobDb *JobDb) GetByRunId(txn *Txn, runId uuid.UUID) *Job { +func (txn *Txn) GetByRunId(runId uuid.UUID) *Job { jobId, _ := txn.jobsByRunId.Get(runId) - return jobDb.GetById(txn, jobId) + return txn.GetById(jobId) } // HasQueuedJobs returns true if the queue has any jobs in the running state or false otherwise -func (jobDb *JobDb) HasQueuedJobs(txn *Txn, queue string) bool { +func (txn *Txn) HasQueuedJobs(queue string) bool { queuedJobs, ok := txn.jobsByQueue[queue] if !ok { return false @@ -221,7 +296,7 @@ func (jobDb *JobDb) HasQueuedJobs(txn *Txn, queue string) bool { } // QueuedJobs returns true if the queue has any jobs in the running state or false otherwise -func (jobDb *JobDb) QueuedJobs(txn *Txn, queue string) *immutable.SortedSetIterator[*Job] { +func (txn *Txn) QueuedJobs(queue string) *immutable.SortedSetIterator[*Job] { jobQueue, ok := txn.jobsByQueue[queue] if ok { return jobQueue.Iterator() @@ -231,13 +306,13 @@ func (jobDb *JobDb) QueuedJobs(txn *Txn, queue string) *immutable.SortedSetItera } // QueuedJobsByTtl returns an iterator for jobs ordered by queue ttl time - the closest to expiry first -func (jobDb *JobDb) QueuedJobsByTtl(txn *Txn) *immutable.SortedSetIterator[*Job] { +func (txn *Txn) QueuedJobsByTtl() *immutable.SortedSetIterator[*Job] { return txn.queuedJobsByTtl.Iterator() } // GetAll returns all jobs in the database. // The Jobs returned by this function *must not* be subsequently modified -func (jobDb *JobDb) GetAll(txn *Txn) []*Job { +func (txn *Txn) GetAll() []*Job { allJobs := make([]*Job, 0, txn.jobsById.Len()) iter := txn.jobsById.Iterator() for !iter.Done() { @@ -249,8 +324,8 @@ func (jobDb *JobDb) GetAll(txn *Txn) []*Job { // BatchDelete deletes the jobs with the given ids from the database. // Any ids not in the database are ignored. -func (jobDb *JobDb) BatchDelete(txn *Txn, ids []string) error { - if err := jobDb.checkWritableTransaction(txn); err != nil { +func (txn *Txn) BatchDelete(ids []string) error { + if err := txn.checkWritableTransaction(); err != nil { return err } for _, id := range ids { @@ -276,7 +351,7 @@ func (jobDb *JobDb) BatchDelete(txn *Txn, ids []string) error { return nil } -func (jobDb *JobDb) checkWritableTransaction(txn *Txn) error { +func (txn *Txn) checkWritableTransaction() error { if txn.readOnly { return errors.New("Cannot write using a read only transaction") } @@ -285,79 +360,3 @@ func (jobDb *JobDb) checkWritableTransaction(txn *Txn) error { } return nil } - -// ReadTxn returns a read-only transaction. -// Multiple read-only transactions can access the db concurrently -func (jobDb *JobDb) ReadTxn() *Txn { - jobDb.copyMutex.Lock() - defer jobDb.copyMutex.Unlock() - return &Txn{ - readOnly: true, - jobsById: jobDb.jobsById, - jobsByRunId: jobDb.jobsByRunId, - jobsByQueue: jobDb.jobsByQueue, - queuedJobsByTtl: jobDb.queuedJobsByTtl, - active: true, - jobDb: jobDb, - } -} - -// WriteTxn returns a writeable transaction. -// Only a single write transaction may access the db at any given time so note that this function will block until -// any outstanding write transactions have been committed or aborted -func (jobDb *JobDb) WriteTxn() *Txn { - jobDb.writerMutex.Lock() - jobDb.copyMutex.Lock() - defer jobDb.copyMutex.Unlock() - return &Txn{ - readOnly: false, - jobsById: jobDb.jobsById, - jobsByRunId: jobDb.jobsByRunId, - jobsByQueue: maps.Clone(jobDb.jobsByQueue), - queuedJobsByTtl: jobDb.queuedJobsByTtl, - active: true, - jobDb: jobDb, - } -} - -// Txn is a JobDb Transaction. Transactions provide a consistent view of the database, allowing readers to -// perform multiple actions without the database changing from underneath them. -// Write transactions also allow callers to perform write operations that will not be visible to other users -// until the transaction is committed. -type Txn struct { - readOnly bool - // Map from job ids to jobs. - jobsById *immutable.Map[string, *Job] - // Map from run ids to jobs. - // Note that a job may have multiple runs, i.e., the mapping is many-to-one. - jobsByRunId *immutable.Map[uuid.UUID, string] - // Queued jobs for each queue. Stored in the order in which they should be scheduled. - jobsByQueue map[string]immutable.SortedSet[*Job] - // Queued jobs for each queue ordered by remaining time-to-live. - // TODO: The ordering is wrong. Since we call time.Now() in the compare function. - queuedJobsByTtl *immutable.SortedSet[*Job] - jobDb *JobDb - active bool -} - -func (txn *Txn) Commit() { - if txn.readOnly || !txn.active { - return - } - txn.jobDb.copyMutex.Lock() - defer txn.jobDb.copyMutex.Unlock() - defer txn.jobDb.writerMutex.Unlock() - txn.jobDb.jobsById = txn.jobsById - txn.jobDb.jobsByRunId = txn.jobsByRunId - txn.jobDb.jobsByQueue = txn.jobsByQueue - txn.jobDb.queuedJobsByTtl = txn.queuedJobsByTtl - txn.active = false -} - -func (txn *Txn) Abort() { - if txn.readOnly || !txn.active { - return - } - txn.active = false - txn.jobDb.writerMutex.Unlock() -} diff --git a/internal/scheduler/jobdb/jobdb_test.go b/internal/scheduler/jobdb/jobdb_test.go index b885c179866..c3888f08786 100644 --- a/internal/scheduler/jobdb/jobdb_test.go +++ b/internal/scheduler/jobdb/jobdb_test.go @@ -32,22 +32,22 @@ func TestJobDb_TestUpsert(t *testing.T) { txn := jobDb.WriteTxn() // Insert Job - err := jobDb.Upsert(txn, []*Job{job1, job2}) + err := txn.Upsert([]*Job{job1, job2}) require.NoError(t, err) - retrieved := jobDb.GetById(txn, job1.Id()) + retrieved := txn.GetById(job1.Id()) assert.Equal(t, job1, retrieved) - retrieved = jobDb.GetById(txn, job2.Id()) + retrieved = txn.GetById(job2.Id()) assert.Equal(t, job2, retrieved) // Updated Job job1Updated := job1.WithQueued(true) - err = jobDb.Upsert(txn, []*Job{job1Updated}) + err = txn.Upsert([]*Job{job1Updated}) require.NoError(t, err) - retrieved = jobDb.GetById(txn, job1.Id()) + retrieved = txn.GetById(job1.Id()) assert.Equal(t, job1Updated, retrieved) // Can't insert with read only transaction - err = jobDb.Upsert(jobDb.ReadTxn(), []*Job{job1}) + err = (jobDb.ReadTxn()).Upsert([]*Job{job1}) require.Error(t, err) } @@ -57,11 +57,11 @@ func TestJobDb_TestGetById(t *testing.T) { job2 := newJob() txn := jobDb.WriteTxn() - err := jobDb.Upsert(txn, []*Job{job1, job2}) + err := txn.Upsert([]*Job{job1, job2}) require.NoError(t, err) - assert.Equal(t, job1, jobDb.GetById(txn, job1.Id())) - assert.Equal(t, job2, jobDb.GetById(txn, job2.Id())) - assert.Nil(t, jobDb.GetById(txn, util.NewULID())) + assert.Equal(t, job1, txn.GetById(job1.Id())) + assert.Equal(t, job2, txn.GetById(job2.Id())) + assert.Nil(t, txn.GetById(util.NewULID())) } func TestJobDb_TestGetByRunId(t *testing.T) { @@ -70,15 +70,15 @@ func TestJobDb_TestGetByRunId(t *testing.T) { job2 := newJob().WithNewRun("executor", "nodeId", "nodeName") txn := jobDb.WriteTxn() - err := jobDb.Upsert(txn, []*Job{job1, job2}) + err := txn.Upsert([]*Job{job1, job2}) require.NoError(t, err) - assert.Equal(t, job1, jobDb.GetByRunId(txn, job1.LatestRun().id)) - assert.Equal(t, job2, jobDb.GetByRunId(txn, job2.LatestRun().id)) - assert.Nil(t, jobDb.GetByRunId(txn, uuid.New())) + assert.Equal(t, job1, txn.GetByRunId(job1.LatestRun().id)) + assert.Equal(t, job2, txn.GetByRunId(job2.LatestRun().id)) + assert.Nil(t, txn.GetByRunId(uuid.New())) - err = jobDb.BatchDelete(txn, []string{job1.Id()}) + err = txn.BatchDelete([]string{job1.Id()}) require.NoError(t, err) - assert.Nil(t, jobDb.GetByRunId(txn, job1.LatestRun().id)) + assert.Nil(t, txn.GetByRunId(job1.LatestRun().id)) } func TestJobDb_TestHasQueuedJobs(t *testing.T) { @@ -87,15 +87,15 @@ func TestJobDb_TestHasQueuedJobs(t *testing.T) { job2 := newJob().WithNewRun("executor", "nodeId", "nodeName") txn := jobDb.WriteTxn() - err := jobDb.Upsert(txn, []*Job{job1, job2}) + err := txn.Upsert([]*Job{job1, job2}) require.NoError(t, err) - assert.False(t, jobDb.HasQueuedJobs(txn, job1.queue)) - assert.False(t, jobDb.HasQueuedJobs(txn, "non-existent-queue")) + assert.False(t, txn.HasQueuedJobs(job1.queue)) + assert.False(t, txn.HasQueuedJobs("non-existent-queue")) - err = jobDb.Upsert(txn, []*Job{job1.WithQueued(true)}) + err = txn.Upsert([]*Job{job1.WithQueued(true)}) require.NoError(t, err) - assert.True(t, jobDb.HasQueuedJobs(txn, job1.queue)) - assert.False(t, jobDb.HasQueuedJobs(txn, "non-existent-queue")) + assert.True(t, txn.HasQueuedJobs(job1.queue)) + assert.False(t, txn.HasQueuedJobs("non-existent-queue")) } func TestJobDb_TestQueuedJobs(t *testing.T) { @@ -110,11 +110,11 @@ func TestJobDb_TestQueuedJobs(t *testing.T) { rand.Shuffle(len(shuffledJobs), func(i, j int) { shuffledJobs[i], shuffledJobs[j] = shuffledJobs[j], jobs[i] }) txn := jobDb.WriteTxn() - err := jobDb.Upsert(txn, jobs) + err := txn.Upsert(jobs) require.NoError(t, err) collect := func() []*Job { retrieved := make([]*Job, 0) - iter := jobDb.QueuedJobs(txn, jobs[0].GetQueue()) + iter := txn.QueuedJobs(jobs[0].GetQueue()) for !iter.Done() { j, _ := iter.Next() retrieved = append(retrieved, j) @@ -125,29 +125,29 @@ func TestJobDb_TestQueuedJobs(t *testing.T) { assert.Equal(t, jobs, collect()) // remove some jobs - err = jobDb.BatchDelete(txn, []string{jobs[1].id, jobs[3].id, jobs[5].id}) + err = txn.BatchDelete([]string{jobs[1].id, jobs[3].id, jobs[5].id}) require.NoError(t, err) assert.Equal(t, []*Job{jobs[0], jobs[2], jobs[4], jobs[6], jobs[7], jobs[8], jobs[9]}, collect()) // dequeue some jobs - err = jobDb.Upsert(txn, []*Job{jobs[7].WithQueued(false), jobs[4].WithQueued(false)}) + err = txn.Upsert([]*Job{jobs[7].WithQueued(false), jobs[4].WithQueued(false)}) require.NoError(t, err) assert.Equal(t, []*Job{jobs[0], jobs[2], jobs[6], jobs[8], jobs[9]}, collect()) // change the priority of a job to put it to the front of the queue updatedJob := jobs[8].WithPriority(0) - err = jobDb.Upsert(txn, []*Job{updatedJob}) + err = txn.Upsert([]*Job{updatedJob}) require.NoError(t, err) assert.Equal(t, []*Job{updatedJob, jobs[0], jobs[2], jobs[6], jobs[9]}, collect()) // new job job10 := newJob().WithPriority(90).WithQueued(true) - err = jobDb.Upsert(txn, []*Job{job10}) + err = txn.Upsert([]*Job{job10}) require.NoError(t, err) assert.Equal(t, []*Job{updatedJob, job10, jobs[0], jobs[2], jobs[6], jobs[9]}, collect()) // clear all jobs - err = jobDb.BatchDelete(txn, []string{updatedJob.id, job10.id, jobs[0].id, jobs[2].id, jobs[6].id, jobs[9].id}) + err = txn.BatchDelete([]string{updatedJob.id, job10.id, jobs[0].id, jobs[2].id, jobs[6].id, jobs[9].id}) require.NoError(t, err) assert.Equal(t, []*Job{}, collect()) } @@ -157,11 +157,11 @@ func TestJobDb_TestGetAll(t *testing.T) { job1 := newJob().WithNewRun("executor", "nodeId", "nodeName") job2 := newJob().WithNewRun("executor", "nodeId", "nodeName") txn := jobDb.WriteTxn() - assert.Equal(t, []*Job{}, jobDb.GetAll(txn)) + assert.Equal(t, []*Job{}, txn.GetAll()) - err := jobDb.Upsert(txn, []*Job{job1, job2}) + err := txn.Upsert([]*Job{job1, job2}) require.NoError(t, err) - actual := jobDb.GetAll(txn) + actual := txn.GetAll() expected := []*Job{job1, job2} slices.SortFunc(expected, func(a, b *Job) bool { return a.id > b.id @@ -178,17 +178,17 @@ func TestJobDb_TestTransactions(t *testing.T) { txn1 := jobDb.WriteTxn() txn2 := jobDb.ReadTxn() - err := jobDb.Upsert(txn1, []*Job{job}) + err := txn1.Upsert([]*Job{job}) require.NoError(t, err) - assert.NotNil(t, jobDb.GetById(txn1, job.id)) - assert.Nil(t, jobDb.GetById(txn2, job.id)) + assert.NotNil(t, txn1.GetById(job.id)) + assert.Nil(t, txn2.GetById(job.id)) txn1.Commit() txn3 := jobDb.ReadTxn() - assert.NotNil(t, jobDb.GetById(txn3, job.id)) + assert.NotNil(t, txn3.GetById(job.id)) - assert.Error(t, jobDb.Upsert(txn1, []*Job{job})) // should be error as you can't insert after committing + assert.Error(t, txn1.Upsert([]*Job{job})) // should be error as you can't insert after committing } func TestJobDb_TestBatchDelete(t *testing.T) { @@ -198,15 +198,15 @@ func TestJobDb_TestBatchDelete(t *testing.T) { txn := jobDb.WriteTxn() // Insert Job - err := jobDb.Upsert(txn, []*Job{job1, job2}) + err := txn.Upsert([]*Job{job1, job2}) require.NoError(t, err) - err = jobDb.BatchDelete(txn, []string{job2.Id()}) + err = txn.BatchDelete([]string{job2.Id()}) require.NoError(t, err) - assert.NotNil(t, jobDb.GetById(txn, job1.Id())) - assert.Nil(t, jobDb.GetById(txn, job2.Id())) + assert.NotNil(t, txn.GetById(job1.Id())) + assert.Nil(t, txn.GetById(job2.Id())) // Can't delete with read only transaction - err = jobDb.BatchDelete(jobDb.ReadTxn(), []string{job1.Id()}) + err = (jobDb.ReadTxn()).BatchDelete([]string{job1.Id()}) require.Error(t, err) } diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index 168295ff91f..3af04064098 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -149,7 +149,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro } currentTime := c.clock.Now() - for _, job := range c.jobDb.GetAll(c.jobDb.ReadTxn()) { + for _, job := range c.jobDb.ReadTxn().GetAll() { // Don't calculate metrics for dead jobs if job.InTerminalState() { continue @@ -254,7 +254,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p } for runId, jobRunState := range node.StateByJobRunId { - job := c.jobDb.GetByRunId(txn, uuid.MustParse(runId)) + job := txn.GetByRunId(uuid.MustParse(runId)) if job != nil { phase := schedulerobjects.JobRunState_name[int32(jobRunState)] key := queuePhaseMetricKey{ diff --git a/internal/scheduler/metrics_test.go b/internal/scheduler/metrics_test.go index 33f4326be4f..6e27c31c802 100644 --- a/internal/scheduler/metrics_test.go +++ b/internal/scheduler/metrics_test.go @@ -92,7 +92,7 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { // set up job db with initial jobs jobDb := testfixtures.NewJobDb() txn := jobDb.WriteTxn() - err := jobDb.Upsert(txn, tc.initialJobs) + err := txn.Upsert(tc.initialJobs) require.NoError(t, err) txn.Commit() @@ -242,7 +242,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { // set up job db with initial jobs jobDb := testfixtures.NewJobDb() txn := jobDb.WriteTxn() - err := jobDb.Upsert(txn, tc.jobDbJobs) + err := txn.Upsert(tc.jobDbJobs) require.NoError(t, err) txn.Commit() diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 9549b2180c5..2c534e99d25 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -211,11 +211,11 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke } // If we've been asked to generate messages for all jobs, do so. - // Otherwise generate messages only for jobs updated this cycle. + // Otherwise, generate messages only for jobs updated this cycle. txn := s.jobDb.WriteTxn() defer txn.Abort() if updateAll { - updatedJobs = s.jobDb.GetAll(txn) + updatedJobs = txn.GetAll() } // Generate any events that came out of synchronising the db state. @@ -241,7 +241,7 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke // Schedule jobs. if shouldSchedule { var result *SchedulerResult - result, err = s.schedulingAlgo.Schedule(ctx, txn, s.jobDb) + result, err = s.schedulingAlgo.Schedule(ctx, txn) if err != nil { return } @@ -293,7 +293,7 @@ func (s *Scheduler) syncState(ctx *armadacontext.Context) ([]*jobdb.Job, error) } // Try and retrieve the job from the jobDb. If it doesn't exist then create it. - job := s.jobDb.GetById(txn, dbJob.JobID) + job := txn.GetById(dbJob.JobID) if job == nil { job, err = s.schedulerJobFromDatabaseJob(&dbJob) if err != nil { @@ -316,7 +316,7 @@ func (s *Scheduler) syncState(ctx *armadacontext.Context) ([]*jobdb.Job, error) // Retrieve the job, look first in the list of updates, then in the jobDb. job, present := jobsToUpdateById[jobId] if !present { - job = s.jobDb.GetById(txn, jobId) + job = txn.GetById(jobId) // If the job is nil or terminal at this point then it cannot be active. // In this case we can ignore the run. @@ -338,10 +338,10 @@ func (s *Scheduler) syncState(ctx *armadacontext.Context) ([]*jobdb.Job, error) } jobsToUpdate := maps.Values(jobsToUpdateById) - if err := s.jobDb.Upsert(txn, jobsToUpdate); err != nil { + if err := txn.Upsert(jobsToUpdate); err != nil { return nil, err } - if err := s.jobDb.BatchDelete(txn, jobsToDelete); err != nil { + if err := txn.BatchDelete(jobsToDelete); err != nil { return nil, err } txn.Commit() @@ -716,7 +716,7 @@ func (s *Scheduler) generateUpdateMessagesFromJob(job *jobdb.Job, jobRunErrors m } if origJob != job { - err := s.jobDb.Upsert(txn, []*jobdb.Job{job}) + err := txn.Upsert([]*jobdb.Job{job}) if err != nil { return nil, err } @@ -765,7 +765,7 @@ func (s *Scheduler) expireJobsIfNecessary(ctx *armadacontext.Context, txn *jobdb events := make([]*armadaevents.EventSequence, 0) // TODO: this is inefficient. We should create a iterator of the jobs running on the affected executors - jobs := s.jobDb.GetAll(txn) + jobs := txn.GetAll() for _, job := range jobs { @@ -817,7 +817,7 @@ func (s *Scheduler) expireJobsIfNecessary(ctx *armadacontext.Context, txn *jobdb events = append(events, es) } } - if err := s.jobDb.Upsert(txn, jobsToUpdate); err != nil { + if err := txn.Upsert(jobsToUpdate); err != nil { return nil, err } return events, nil @@ -827,7 +827,7 @@ func (s *Scheduler) expireJobsIfNecessary(ctx *armadacontext.Context, txn *jobdb func (s *Scheduler) cancelQueuedJobsIfExpired(txn *jobdb.Txn) ([]*armadaevents.EventSequence, error) { jobsToCancel := make([]*jobdb.Job, 0) events := make([]*armadaevents.EventSequence, 0) - it := s.jobDb.QueuedJobsByTtl(txn) + it := txn.QueuedJobsByTtl() // `it` is ordered such that the jobs with the least ttl remaining come first, hence we exit early if we find a job that is not expired. for job, _ := it.Next(); job != nil && job.HasQueueTtlExpired(); job, _ = it.Next() { @@ -861,7 +861,7 @@ func (s *Scheduler) cancelQueuedJobsIfExpired(txn *jobdb.Txn) ([]*armadaevents.E events = append(events, cancel) } - if err := s.jobDb.Upsert(txn, jobsToCancel); err != nil { + if err := txn.Upsert(jobsToCancel); err != nil { return nil, err } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index a4c7244f4eb..8a652563c17 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -637,7 +637,7 @@ func TestScheduler_TestCycle(t *testing.T) { // insert initial jobs txn := sched.jobDb.WriteTxn() - err = sched.jobDb.Upsert(txn, tc.initialJobs) + err = txn.Upsert(tc.initialJobs) require.NoError(t, err) txn.Commit() @@ -681,7 +681,7 @@ func TestScheduler_TestCycle(t *testing.T) { } // assert that the job db is in the state we expect - jobs := sched.jobDb.GetAll(sched.jobDb.ReadTxn()) + jobs := sched.jobDb.ReadTxn().GetAll() remainingLeased := stringSet(tc.expectedLeased) remainingQueued := stringSet(tc.expectedQueued) remainingTerminal := stringSet(tc.expectedTerminal) @@ -1017,7 +1017,7 @@ func TestScheduler_TestSyncState(t *testing.T) { // insert initial jobs txn := sched.jobDb.WriteTxn() - err = sched.jobDb.Upsert(txn, tc.initialJobs) + err = txn.Upsert(tc.initialJobs) require.NoError(t, err) txn.Commit() @@ -1025,7 +1025,7 @@ func TestScheduler_TestSyncState(t *testing.T) { require.NoError(t, err) assert.Equal(t, tc.expectedUpdatedJobs, updatedJobs) - allDbJobs := sched.jobDb.GetAll(sched.jobDb.ReadTxn()) + allDbJobs := sched.jobDb.ReadTxn().GetAll() expectedIds := stringSet(tc.expectedJobDbIds) require.Equal(t, len(tc.expectedJobDbIds), len(allDbJobs)) @@ -1125,7 +1125,7 @@ type testSchedulingAlgo struct { shouldError bool } -func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn, jobDb *jobdb.JobDb) (*SchedulerResult, error) { +func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn) (*SchedulerResult, error) { t.numberOfScheduleCalls++ if t.shouldError { return nil, errors.New("error scheduling jobs") @@ -1134,7 +1134,7 @@ func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn scheduledJobs := make([]*jobdb.Job, 0, len(t.jobsToSchedule)) failedJobs := make([]*jobdb.Job, 0, len(t.jobsToFail)) for _, id := range t.jobsToPreempt { - job := jobDb.GetById(txn, id) + job := txn.GetById(id) if job == nil { return nil, errors.Errorf("was asked to preempt job %s but job does not exist", id) } @@ -1150,7 +1150,7 @@ func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn preemptedJobs = append(preemptedJobs, job) } for _, id := range t.jobsToSchedule { - job := jobDb.GetById(txn, id) + job := txn.GetById(id) if job == nil { return nil, errors.Errorf("was asked to lease %s but job does not exist", id) } @@ -1161,7 +1161,7 @@ func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn scheduledJobs = append(scheduledJobs, job) } for _, id := range t.jobsToFail { - job := jobDb.GetById(txn, id) + job := txn.GetById(id) if job == nil { return nil, errors.Errorf("was asked to lease %s but job does not exist", id) } @@ -1171,13 +1171,13 @@ func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn job = job.WithQueued(false).WithFailed(true) failedJobs = append(failedJobs, job) } - if err := jobDb.Upsert(txn, preemptedJobs); err != nil { + if err := txn.Upsert(preemptedJobs); err != nil { return nil, err } - if err := jobDb.Upsert(txn, scheduledJobs); err != nil { + if err := txn.Upsert(scheduledJobs); err != nil { return nil, err } - if err := jobDb.Upsert(txn, failedJobs); err != nil { + if err := txn.Upsert(failedJobs); err != nil { return nil, err } return NewSchedulerResultForTest(preemptedJobs, scheduledJobs, failedJobs, nil), nil diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 08ab1878664..48c06e9c56c 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -34,7 +34,7 @@ import ( type SchedulingAlgo interface { // Schedule should assign jobs to nodes. // Any jobs that are scheduled should be marked as such in the JobDb using the transaction provided. - Schedule(ctx *armadacontext.Context, txn *jobdb.Txn, jobDb *jobdb.JobDb) (*SchedulerResult, error) + Schedule(ctx *armadacontext.Context, txn *jobdb.Txn) (*SchedulerResult, error) } // FairSchedulingAlgo is a SchedulingAlgo based on PreemptingQueueScheduler. @@ -90,7 +90,6 @@ func NewFairSchedulingAlgo( func (l *FairSchedulingAlgo) Schedule( ctx *armadacontext.Context, txn *jobdb.Txn, - jobDb *jobdb.JobDb, ) (*SchedulerResult, error) { var cancel context.CancelFunc if l.maxSchedulingDuration != 0 { @@ -109,7 +108,7 @@ func (l *FairSchedulingAlgo) Schedule( return overallSchedulerResult, nil } - fsctx, err := l.newFairSchedulingAlgoContext(ctx, txn, jobDb) + fsctx, err := l.newFairSchedulingAlgoContext(ctx, txn) if err != nil { return nil, err } @@ -173,13 +172,13 @@ func (l *FairSchedulingAlgo) Schedule( preemptedJobs := PreemptedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) scheduledJobs := ScheduledJobsFromSchedulerResult[*jobdb.Job](schedulerResult) failedJobs := FailedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) - if err := jobDb.Upsert(txn, preemptedJobs); err != nil { + if err := txn.Upsert(preemptedJobs); err != nil { return nil, err } - if err := jobDb.Upsert(txn, scheduledJobs); err != nil { + if err := txn.Upsert(scheduledJobs); err != nil { return nil, err } - if err := jobDb.Upsert(txn, failedJobs); err != nil { + if err := txn.Upsert(failedJobs); err != nil { return nil, err } @@ -241,10 +240,9 @@ type fairSchedulingAlgoContext struct { allocationByPoolAndQueueAndPriorityClass map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string] executors []*schedulerobjects.Executor txn *jobdb.Txn - jobDb *jobdb.JobDb } -func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Context, txn *jobdb.Txn, jobDb *jobdb.JobDb) (*fairSchedulingAlgoContext, error) { +func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Context, txn *jobdb.Txn) (*fairSchedulingAlgoContext, error) { executors, err := l.executorRepository.GetExecutors(ctx) if err != nil { return nil, err @@ -274,7 +272,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con nodeIdByJobId := make(map[string]string) jobIdsByGangId := make(map[string]map[string]bool) gangIdByJobId := make(map[string]string) - for _, job := range jobDb.GetAll(txn) { + for _, job := range txn.GetAll() { isActiveByQueueName[job.Queue()] = true if job.Queued() { continue @@ -328,7 +326,6 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con gangIdByJobId: gangIdByJobId, allocationByPoolAndQueueAndPriorityClass: totalAllocationByPoolAndQueue, executors: executors, - jobDb: jobDb, txn: txn, }, nil } @@ -426,7 +423,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( l.schedulingConfig.Preemption.NodeEvictionProbability, l.schedulingConfig.Preemption.NodeOversubscriptionEvictionProbability, l.schedulingConfig.Preemption.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(fsctx.jobDb, fsctx.txn), + NewSchedulerJobRepositoryAdapter(fsctx.txn), nodeDb, fsctx.nodeIdByJobId, fsctx.jobIdsByGangId, @@ -477,13 +474,11 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( // // TODO: Pass JobDb into the scheduler instead of using this shim to convert to a JobRepo. type SchedulerJobRepositoryAdapter struct { - db *jobdb.JobDb txn *jobdb.Txn } -func NewSchedulerJobRepositoryAdapter(db *jobdb.JobDb, txn *jobdb.Txn) *SchedulerJobRepositoryAdapter { +func NewSchedulerJobRepositoryAdapter(txn *jobdb.Txn) *SchedulerJobRepositoryAdapter { return &SchedulerJobRepositoryAdapter{ - db: db, txn: txn, } } @@ -492,7 +487,7 @@ func NewSchedulerJobRepositoryAdapter(db *jobdb.JobDb, txn *jobdb.Txn) *Schedule // to new scheduler. func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) ([]string, error) { rv := make([]string, 0) - it := repo.db.QueuedJobs(repo.txn, queue) + it := repo.txn.QueuedJobs(queue) for v, _ := it.Next(); v != nil; v, _ = it.Next() { rv = append(rv, v.Id()) } @@ -504,7 +499,7 @@ func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) ([]strin func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) ([]interfaces.LegacySchedulerJob, error) { rv := make([]interfaces.LegacySchedulerJob, 0, len(ids)) for _, id := range ids { - if job := repo.db.GetById(repo.txn, id); job != nil { + if job := repo.txn.GetById(id); job != nil { rv = append(rv, job) } } diff --git a/internal/scheduler/scheduling_algo_test.go b/internal/scheduler/scheduling_algo_test.go index 43e50e0c14a..023572bf3ff 100644 --- a/internal/scheduler/scheduling_algo_test.go +++ b/internal/scheduler/scheduling_algo_test.go @@ -417,11 +417,11 @@ func TestSchedule(t *testing.T) { // Setup jobDb. jobDb := testfixtures.NewJobDb() txn := jobDb.WriteTxn() - err = jobDb.Upsert(txn, jobsToUpsert) + err = txn.Upsert(jobsToUpsert) require.NoError(t, err) // Run a scheduling round. - schedulerResult, err := sch.Schedule(ctx, txn, jobDb) + schedulerResult, err := sch.Schedule(ctx, txn) require.NoError(t, err) // Check that the expected preemptions took place. @@ -468,14 +468,14 @@ func TestSchedule(t *testing.T) { // Check that preempted jobs are marked as such consistently. for _, job := range preemptedJobs { - dbJob := jobDb.GetById(txn, job.Id()) + dbJob := txn.GetById(job.Id()) assert.True(t, dbJob.Failed()) assert.False(t, dbJob.Queued()) } // Check that scheduled jobs are marked as such consistently. for _, job := range scheduledJobs { - dbJob := jobDb.GetById(txn, job.Id()) + dbJob := txn.GetById(job.Id()) assert.False(t, dbJob.Failed()) assert.False(t, dbJob.Queued()) dbRun := dbJob.LatestRun() @@ -486,7 +486,7 @@ func TestSchedule(t *testing.T) { // Check that failed jobs are marked as such consistently. for _, job := range failedJobs { - dbJob := jobDb.GetById(txn, job.Id()) + dbJob := txn.GetById(job.Id()) assert.True(t, dbJob.Failed()) assert.False(t, dbJob.Queued()) } @@ -494,15 +494,15 @@ func TestSchedule(t *testing.T) { // Check that jobDb was updated correctly. // TODO: Check that there are no unexpected jobs in the jobDb. for _, job := range preemptedJobs { - dbJob := jobDb.GetById(txn, job.Id()) + dbJob := txn.GetById(job.Id()) assert.True(t, job.Equal(dbJob), "expected %v but got %v", job, dbJob) } for _, job := range scheduledJobs { - dbJob := jobDb.GetById(txn, job.Id()) + dbJob := txn.GetById(job.Id()) assert.True(t, job.Equal(dbJob), "expected %v but got %v", job, dbJob) } for _, job := range failedJobs { - dbJob := jobDb.GetById(txn, job.Id()) + dbJob := txn.GetById(job.Id()) assert.True(t, job.Equal(dbJob), "expected %v but got %v", job, dbJob) } }) diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index 60a9adece1f..75b10a6afdf 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -453,7 +453,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { s.schedulingConfig.Preemption.NodeEvictionProbability, s.schedulingConfig.Preemption.NodeOversubscriptionEvictionProbability, s.schedulingConfig.Preemption.ProtectedFractionOfFairShare, - scheduler.NewSchedulerJobRepositoryAdapter(s.jobDb, txn), + scheduler.NewSchedulerJobRepositoryAdapter(txn), nodeDb, // TODO: Necessary to support partial eviction. nil, @@ -521,13 +521,13 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { } failedJobs[i] = job.WithQueued(false).WithFailed(true) } - if err := s.jobDb.Upsert(txn, preemptedJobs); err != nil { + if err := txn.Upsert(preemptedJobs); err != nil { return err } - if err := s.jobDb.Upsert(txn, scheduledJobs); err != nil { + if err := txn.Upsert(scheduledJobs); err != nil { return err } - if err := s.jobDb.Upsert(txn, failedJobs); err != nil { + if err := txn.Upsert(failedJobs); err != nil { return err } @@ -636,7 +636,7 @@ func (s *Simulator) handleSubmitJob(txn *jobdb.Txn, e *armadaevents.SubmitJob, t false, s.logicalJobCreatedTimestamp.Add(1), ) - if err := s.jobDb.Upsert(txn, []*jobdb.Job{job}); err != nil { + if err := txn.Upsert([]*jobdb.Job{job}); err != nil { return false, err } return true, nil @@ -644,7 +644,7 @@ func (s *Simulator) handleSubmitJob(txn *jobdb.Txn, e *armadaevents.SubmitJob, t func (s *Simulator) handleJobRunLeased(txn *jobdb.Txn, e *armadaevents.JobRunLeased) (bool, error) { jobId := armadaevents.UlidFromProtoUuid(e.JobId).String() - job := s.jobDb.GetById(txn, jobId) + job := txn.GetById(jobId) jobTemplate := s.jobTemplateByJobId[jobId] if jobTemplate == nil { return false, errors.Errorf("no jobTemplate associated with job %s", jobId) @@ -685,12 +685,12 @@ func generateRandomShiftedExponentialDuration(r *rand.Rand, rv ShiftedExponentia func (s *Simulator) handleJobSucceeded(txn *jobdb.Txn, e *armadaevents.JobSucceeded) (bool, error) { jobId := armadaevents.UlidFromProtoUuid(e.JobId).String() - job := s.jobDb.GetById(txn, jobId) + job := txn.GetById(jobId) if job == nil || job.InTerminalState() { // Job already terminated; nothing more to do. return false, nil } - if err := s.jobDb.BatchDelete(txn, []string{jobId}); err != nil { + if err := txn.BatchDelete([]string{jobId}); err != nil { return false, err } @@ -780,7 +780,7 @@ func (s *Simulator) unbindRunningJob(job *jobdb.Job) error { func (s *Simulator) handleJobRunPreempted(txn *jobdb.Txn, e *armadaevents.JobRunPreempted) (bool, error) { jobId := armadaevents.UlidFromProtoUuid(e.PreemptedJobId).String() - job := s.jobDb.GetById(txn, jobId) + job := txn.GetById(jobId) // Submit a retry for this job. jobTemplate := s.jobTemplateByJobId[job.GetId()]