Skip to content

Commit

Permalink
Propagate additional annotations to pods (#3379)
Browse files Browse the repository at this point in the history
  • Loading branch information
zuqq authored Feb 7, 2024
1 parent a24fd16 commit 9b85c5b
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 419 deletions.
7 changes: 4 additions & 3 deletions internal/executor/util/kubernetes_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func TestCreatePodFromExecutorApiJob(t *testing.T) {
Job: &armadaevents.SubmitJob{
ObjectMeta: &armadaevents.ObjectMeta{
Labels: map[string]string{},
Annotations: map[string]string{},
Annotations: map[string]string{"runtime_gang_cardinality": "3"},
Namespace: "test-namespace",
},
JobId: jobId,
Expand All @@ -520,8 +520,9 @@ func TestCreatePodFromExecutorApiJob(t *testing.T) {
domain.PodCount: "1",
},
Annotations: map[string]string{
domain.JobSetId: "job-set",
domain.Owner: "user",
domain.JobSetId: "job-set",
domain.Owner: "user",
"runtime_gang_cardinality": "3",
},
},
Spec: v1.PodSpec{
Expand Down
16 changes: 16 additions & 0 deletions internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
return err
}
addTolerations(submitMsg, PodRequirementsOverlay.Tolerations)
addAnnotations(submitMsg, PodRequirementsOverlay.Annotations)
}

var groups []string
Expand Down Expand Up @@ -239,6 +240,21 @@ func addTolerations(job *armadaevents.SubmitJob, tolerations []v1.Toleration) {
}
}

func addAnnotations(job *armadaevents.SubmitJob, annotations map[string]string) {
if job == nil || len(annotations) == 0 {
return
}
if job.ObjectMeta == nil {
job.ObjectMeta = &armadaevents.ObjectMeta{}
}
if job.ObjectMeta.Annotations == nil {
job.ObjectMeta.Annotations = make(map[string]string, len(annotations))
}
for k, v := range annotations {
job.ObjectMeta.Annotations[k] = v
}
}

// ReportEvents publishes all eventSequences to Pulsar. The eventSequences are compacted for more efficient publishing.
func (srv *ExecutorApi) ReportEvents(grpcCtx context.Context, list *executorapi.EventList) (*types.Empty, error) {
ctx := armadacontext.FromGrpcCtx(grpcCtx)
Expand Down
30 changes: 18 additions & 12 deletions internal/scheduler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {

submit, compressedSubmit := submitMsg(
t,
nil,
&v1.PodSpec{
NodeSelector: map[string]string{nodeIdName: "node-id"},
},
Expand All @@ -97,7 +98,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
SubmitMessage: compressedSubmit,
}

submitWithoutNodeSelector, compressedSubmitNoNodeSelector := submitMsg(t, &v1.PodSpec{})
submitWithoutNodeSelector, compressedSubmitNoNodeSelector := submitMsg(t, nil, nil)
leaseWithoutNode := &database.JobRunLease{
RunID: uuid.New(),
Queue: "test-queue",
Expand All @@ -114,7 +115,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
Effect: v1.TaintEffectNoSchedule,
},
}
leaseWithTolerations := &database.JobRunLease{
leaseWithOverlay := &database.JobRunLease{
RunID: uuid.New(),
Queue: "test-queue",
JobSet: "test-jobset",
Expand All @@ -127,18 +128,22 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
PodRequirementsOverlay: protoutil.MustMarshall(
&schedulerobjects.PodRequirements{
Tolerations: tolerations,
Annotations: map[string]string{"runtime_gang_cardinality": "3"},
Priority: 1000,
},
),
}
submitWithTolerations, _ := submitMsg(
submitWithOverlay, _ := submitMsg(
t,
&armadaevents.ObjectMeta{
Annotations: map[string]string{"runtime_gang_cardinality": "3"},
},
&v1.PodSpec{
NodeSelector: map[string]string{nodeIdName: "node-id"},
Tolerations: tolerations,
},
)
submitWithTolerations.JobId = submit.JobId
submitWithOverlay.JobId = submit.JobId

tests := map[string]struct {
request *executorapi.LeaseRequest
Expand Down Expand Up @@ -195,17 +200,17 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
},
"run with PodRequirementsOverlay": {
request: defaultRequest,
leases: []*database.JobRunLease{leaseWithTolerations},
leases: []*database.JobRunLease{leaseWithOverlay},
expectedExecutor: defaultExpectedExecutor,
expectedMsgs: []*executorapi.LeaseStreamMessage{
{
Event: &executorapi.LeaseStreamMessage_Lease{Lease: &executorapi.JobRunLease{
JobRunId: armadaevents.ProtoUuidFromUuid(leaseWithTolerations.RunID),
Queue: leaseWithTolerations.Queue,
Jobset: leaseWithTolerations.JobSet,
User: leaseWithTolerations.UserID,
JobRunId: armadaevents.ProtoUuidFromUuid(leaseWithOverlay.RunID),
Queue: leaseWithOverlay.Queue,
Jobset: leaseWithOverlay.JobSet,
User: leaseWithOverlay.UserID,
Groups: groups,
Job: submitWithTolerations,
Job: submitWithOverlay,
}},
},
{
Expand Down Expand Up @@ -406,9 +411,10 @@ func TestExecutorApi_Publish(t *testing.T) {
}
}

func submitMsg(t *testing.T, podSpec *v1.PodSpec) (*armadaevents.SubmitJob, []byte) {
func submitMsg(t *testing.T, objectMeta *armadaevents.ObjectMeta, podSpec *v1.PodSpec) (*armadaevents.SubmitJob, []byte) {
submitMsg := &armadaevents.SubmitJob{
JobId: armadaevents.ProtoUuidFromUuid(uuid.New()),
JobId: armadaevents.ProtoUuidFromUuid(uuid.New()),
ObjectMeta: objectMeta,
MainObject: &armadaevents.KubernetesMainObject{
Object: &armadaevents.KubernetesMainObject_PodSpec{
PodSpec: &armadaevents.PodSpecWithAvoidList{
Expand Down
6 changes: 1 addition & 5 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,10 +542,6 @@ func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventS
if err != nil {
return nil, err
}
additionalAnnotations, found := additionalAnnotationsByJobId[job.Id()]
if !found {
additionalAnnotations = make(map[string]string)
}
run := job.LatestRun()
if run == nil {
return nil, errors.Errorf("attempting to generate lease eventSequences for job %s with no associated runs", job.Id())
Expand All @@ -569,9 +565,9 @@ func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventS
UpdateSequenceNumber: job.QueuedVersion(),
HasScheduledAtPriority: hasScheduledAtPriority,
ScheduledAtPriority: scheduledAtPriority,
AdditionalAnnotations: additionalAnnotations,
PodRequirementsOverlay: &schedulerobjects.PodRequirements{
Tolerations: jctx.AdditionalTolerations,
Annotations: additionalAnnotationsByJobId[job.Id()],
Priority: scheduledAtPriority,
},
},
Expand Down
3 changes: 0 additions & 3 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1843,7 +1843,6 @@ func TestCycleConsistency(t *testing.T) {
UpdateSequenceNumber: 1,
HasScheduledAtPriority: true,
ScheduledAtPriority: 10,
AdditionalAnnotations: make(map[string]string),
PodRequirementsOverlay: &schedulerobjects.PodRequirements{Priority: 10},
},
},
Expand Down Expand Up @@ -1928,7 +1927,6 @@ func TestCycleConsistency(t *testing.T) {
UpdateSequenceNumber: 1,
HasScheduledAtPriority: true,
ScheduledAtPriority: 10,
AdditionalAnnotations: make(map[string]string),
PodRequirementsOverlay: &schedulerobjects.PodRequirements{Priority: 10},
},
},
Expand Down Expand Up @@ -1992,7 +1990,6 @@ func TestCycleConsistency(t *testing.T) {
UpdateSequenceNumber: 1,
HasScheduledAtPriority: true,
ScheduledAtPriority: 10,
AdditionalAnnotations: make(map[string]string),
PodRequirementsOverlay: &schedulerobjects.PodRequirements{Priority: 10},
},
},
Expand Down
Loading

0 comments on commit 9b85c5b

Please sign in to comment.