Commit
Merge branch 'master' into raaj-patel/create_job_error
richscott committed Oct 31, 2023
2 parents 764cc94 + f442b47 commit 11adf35
Showing 7 changed files with 140 additions and 97 deletions.
3 changes: 3 additions & 0 deletions .goreleaser.yml
@@ -379,6 +379,9 @@ dockers:
- lookoutv2
- lookoutingesterv2
extra_files:
+ - internal/lookout/ui
+ - pkg/api/api.swagger.json
+ - pkg/api/binoculars/api.swagger.json
- config/lookoutv2/config.yaml
- config/lookoutingesterv2/config.yaml
dockerfile: ./build_goreleaser/lookoutv2/Dockerfile
2 changes: 1 addition & 1 deletion build/lookoutv2/Dockerfile
@@ -5,7 +5,7 @@ RUN addgroup -S -g 2000 armada && adduser -S -u 1000 armada -G armada
USER armada

COPY ./lookoutv2 /app/
-
+ COPY ./internal/lookout/ui/build/ /app/internal/lookout/ui/build
COPY ./config/ /app/config/lookoutv2

WORKDIR /app
17 changes: 17 additions & 0 deletions build_goreleaser/lookoutv2/Dockerfile
@@ -1,10 +1,27 @@
ARG NODE_BUILD_IMAGE=node:16.14-buster
ARG OPENAPI_BUILD_IMAGE=openapitools/openapi-generator-cli:v5.4.0
ARG BASE_IMAGE=alpine:3.18.3

FROM ${OPENAPI_BUILD_IMAGE} AS OPENAPI

COPY internal/lookout/ui /project/internal/lookout/ui
COPY pkg/api/*.swagger.json /project/pkg/api/
COPY pkg/api/binoculars/*.swagger.json /project/pkg/api/binoculars/
RUN ./project/internal/lookout/ui/openapi.sh

FROM ${NODE_BUILD_IMAGE} AS NODE
COPY --from=OPENAPI /project/internal/lookout/ui /ui/
WORKDIR /ui
RUN yarn install --immutable
RUN yarn run build

FROM ${BASE_IMAGE}
RUN addgroup -S -g 2000 armada && adduser -S -u 1000 armada -G armada
LABEL org.opencontainers.image.title=lookoutv2
LABEL org.opencontainers.image.description="Lookout V2"
LABEL org.opencontainers.image.url=https://hub.docker.com/r/gresearchdev/lookoutv2
USER armada
COPY --from=NODE /ui/build/ /app/internal/lookout/ui/build
COPY lookoutv2 /app/
COPY config/lookoutv2/config.yaml /app/config/lookoutv2/config.yaml
COPY lookoutingesterv2 /app/
4 changes: 2 additions & 2 deletions internal/scheduler/pool_assigner_test.go
@@ -33,14 +33,14 @@ func TestPoolAssigner_AssignPool(t *testing.T) {
"matches pool": {
executorTimout: executorTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(testfixtures.BaseTime)},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(testfixtures.BaseTime)},
job: cpuJob,
expectedPool: "cpu",
},
"doesn't match pool": {
executorTimout: executorTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(testfixtures.BaseTime)},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(testfixtures.BaseTime)},
job: gpuJob,
expectedPool: "",
},
9 changes: 9 additions & 0 deletions internal/scheduler/submitcheck.go
@@ -200,6 +200,15 @@ func (srv *SubmitChecker) getSchedulingResult(jctxs []*schedulercontext.JobSched
return schedulingResult{isSchedulable: true, reason: ""}
}

+ // Skip submit checks if this batch contains less than the min cardinality jobs.
+ // Reason:
+ // - We need to support submitting gang jobs across batches and allow for gang jobs to queue until min cardinality is satisfied.
+ // - We cannot verify if min cardinality jobs are schedulable unless we are given at least that many in a single batch.
+ // - A side effect of this is that users can submit jobs in gangs that skip this check and are never schedulable, which will be handled via queue-ttl.
+ if len(jctxs) < jctxs[0].GangMinCardinality {
+ return schedulingResult{isSchedulable: true, reason: ""}
+ }

// Make a shallow copy to avoid holding the lock and
// preventing updating NodeDbs while checking if jobs can be scheduled
srv.mu.Lock()
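The guard above is the heart of this change. Below is a minimal, self-contained Go sketch of the same skip rule; the jobCtx type and shouldSkipSubmitCheck helper are illustrative stand-ins for the scheduler's actual types, not code from this commit:

package main

import "fmt"

// jobCtx is an illustrative stand-in for the scheduler's per-job scheduling context.
type jobCtx struct {
	JobId              string
	GangMinCardinality int
}

// shouldSkipSubmitCheck mirrors the guard added to getSchedulingResult: if the
// batch holds fewer jobs than the gang's minimum cardinality, schedulability
// cannot be verified yet, so the batch is reported schedulable and waved through.
func shouldSkipSubmitCheck(jctxs []jobCtx) bool {
	return len(jctxs) > 0 && len(jctxs) < jctxs[0].GangMinCardinality
}

func main() {
	// Two jobs from a gang with minimum cardinality three: the submit check is skipped.
	batch := []jobCtx{
		{JobId: "job-a", GangMinCardinality: 3},
		{JobId: "job-b", GangMinCardinality: 3},
	}
	fmt.Println(shouldSkipSubmitCheck(batch)) // prints: true
}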
124 changes: 30 additions & 94 deletions internal/scheduler/submitcheck_test.go
@@ -1,21 +1,16 @@
package scheduler

import (
"fmt"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/clock"

"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
schedulermocks "github.com/armadaproject/armada/internal/scheduler/mocks"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
@@ -38,35 +33,35 @@ func TestSubmitChecker_CheckJobDbJobs(t *testing.T) {
"one job schedules": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(baseTime)},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(baseTime)},
job: testfixtures.Test1Cpu4GiJob("queue", testfixtures.PriorityClass1),
expectPass: true,
},
"no jobs schedule due to resources": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(baseTime)},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(baseTime)},
job: testfixtures.Test32Cpu256GiJob("queue", testfixtures.PriorityClass1),
expectPass: false,
},
"no jobs schedule due to selector": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(baseTime)},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(baseTime)},
job: testfixtures.WithNodeSelectorJob(map[string]string{"foo": "bar"}, testfixtures.Test1Cpu4GiJob("queue", testfixtures.PriorityClass1)),
expectPass: false,
},
"no jobs schedule due to executor timeout": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(expiredTime)},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(expiredTime)},
job: testfixtures.Test1Cpu4GiJob("queue", testfixtures.PriorityClass1),
expectPass: false,
},
"multiple executors, 1 expired": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(expiredTime), testExecutor(baseTime)},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(expiredTime), testfixtures.TestExecutor(baseTime)},
job: testfixtures.Test1Cpu4GiJob("queue", testfixtures.PriorityClass1),
expectPass: true,
},
@@ -108,66 +103,73 @@ func TestSubmitChecker_TestCheckApiJobs(t *testing.T) {
"one job schedules": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(testfixtures.BaseTime)},
- jobs: []*api.Job{test1CoreCpuJob()},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(testfixtures.BaseTime)},
+ jobs: []*api.Job{testfixtures.Test1CoreCpuApiJob()},
expectPass: true,
},
"multiple jobs schedule": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(testfixtures.BaseTime)},
- jobs: []*api.Job{test1CoreCpuJob(), test1CoreCpuJob()},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(testfixtures.BaseTime)},
+ jobs: []*api.Job{testfixtures.Test1CoreCpuApiJob(), testfixtures.Test1CoreCpuApiJob()},
expectPass: true,
},
"first job schedules, second doesn't": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(testfixtures.BaseTime)},
- jobs: []*api.Job{test1CoreCpuJob(), test100CoreCpuJob()},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(testfixtures.BaseTime)},
+ jobs: []*api.Job{testfixtures.Test1CoreCpuApiJob(), testfixtures.Test100CoreCpuApiJob()},
expectPass: false,
},
"no jobs schedule due to resources": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(testfixtures.BaseTime)},
- jobs: []*api.Job{test100CoreCpuJob()},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(testfixtures.BaseTime)},
+ jobs: []*api.Job{testfixtures.Test100CoreCpuApiJob()},
expectPass: false,
},
"no jobs schedule due to selector": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(testfixtures.BaseTime)},
- jobs: []*api.Job{test1CoreCpuJobWithNodeSelector(map[string]string{"foo": "bar"})},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(testfixtures.BaseTime)},
+ jobs: []*api.Job{testfixtures.Test1CoreCpuApiJobWithNodeSelector(map[string]string{"foo": "bar"})},
expectPass: false,
},
"no jobs schedule due to executor timeout": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(expiredTime)},
- jobs: []*api.Job{test1CoreCpuJob()},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(expiredTime)},
+ jobs: []*api.Job{testfixtures.Test1CoreCpuApiJob()},
expectPass: false,
},
"multiple executors, 1 expired": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(expiredTime), testExecutor(testfixtures.BaseTime)},
- jobs: []*api.Job{test1CoreCpuJob()},
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(expiredTime), testfixtures.TestExecutor(testfixtures.BaseTime)},
+ jobs: []*api.Job{testfixtures.Test1CoreCpuApiJob()},
expectPass: true,
},
"gang job all jobs fit": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(testfixtures.BaseTime)},
- jobs: testNJobGang(5),
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(testfixtures.BaseTime)},
+ jobs: testfixtures.TestNApiJobGang(5),
expectPass: true,
},
"gang job all jobs don't fit": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
- executors: []*schedulerobjects.Executor{testExecutor(testfixtures.BaseTime)},
- jobs: testNJobGang(100),
+ executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(testfixtures.BaseTime)},
+ jobs: testfixtures.TestNApiJobGang(100),
expectPass: false,
},
"Less than min cardinality gang jobs in a batch skips submit check": {
executorTimout: defaultTimeout,
config: testfixtures.TestSchedulingConfig(),
executors: []*schedulerobjects.Executor{testfixtures.TestExecutor(testfixtures.BaseTime)},
jobs: testfixtures.TestNApiJobGangLessThanMinCardinality(5),
expectPass: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
@@ -190,69 +192,3 @@ func TestSubmitChecker_TestCheckApiJobs(t *testing.T) {
})
}
}

- // TODO: Move to testfixtures_test.go/delete in favour of existing fixture.
- func test1CoreCpuJob() *api.Job {
- return &api.Job{
- Id: util.NewULID(),
- Queue: uuid.NewString(),
- PodSpec: &v1.PodSpec{
- Containers: []v1.Container{
- {
- Resources: v1.ResourceRequirements{
- Limits: map[v1.ResourceName]resource.Quantity{
- "cpu": resource.MustParse("1"),
- },
- Requests: map[v1.ResourceName]resource.Quantity{
- "cpu": resource.MustParse("1"),
- },
- },
- },
- },
- },
- }
- }
-
- // TODO: Move to testfixtures_test.go.
- func testNJobGang(n int) []*api.Job {
- gangId := uuid.NewString()
- gang := make([]*api.Job, n)
- for i := 0; i < n; i++ {
- job := test1CoreCpuJob()
- job.Annotations = map[string]string{
- configuration.GangIdAnnotation: gangId,
- configuration.GangCardinalityAnnotation: fmt.Sprintf("%d", n),
- configuration.GangMinimumCardinalityAnnotation: fmt.Sprintf("%d", n),
- }
- gang[i] = job
- }
- return gang
- }
-
- // TODO: Move to testfixtures_test.go.
- func test100CoreCpuJob() *api.Job {
- job := test1CoreCpuJob()
- hundredCores := map[v1.ResourceName]resource.Quantity{
- "cpu": resource.MustParse("100"),
- }
- job.PodSpec.Containers[0].Resources.Limits = hundredCores
- job.PodSpec.Containers[0].Resources.Requests = hundredCores
- return job
- }
-
- // TODO: Move to testfixtures_test.go.
- func test1CoreCpuJobWithNodeSelector(selector map[string]string) *api.Job {
- job := test1CoreCpuJob()
- job.PodSpec.NodeSelector = selector
- return job
- }
-
- // TODO: Move to testfixtures_test.go.
- func testExecutor(lastUpdateTime time.Time) *schedulerobjects.Executor {
- return &schedulerobjects.Executor{
- Id: uuid.NewString(),
- Pool: "cpu",
- LastUpdateTime: lastUpdateTime,
- Nodes: testfixtures.TestCluster(),
- }
- }
78 changes: 78 additions & 0 deletions internal/scheduler/testfixtures/testfixtures.go
@@ -7,6 +7,8 @@ import (
"sync/atomic"
"time"

"github.com/armadaproject/armada/pkg/api"

"github.com/google/uuid"
"github.com/oklog/ulid"
"golang.org/x/exp/maps"
@@ -796,3 +798,79 @@ func TestRunningJobDbJob(startTime int64) *jobdb.Job {
WithQueued(false).
WithUpdatedRun(jobdb.MinimalRun(uuid.New(), startTime))
}

+ func Test1CoreCpuApiJob() *api.Job {
+ return &api.Job{
+ Id: util.NewULID(),
+ Queue: uuid.NewString(),
+ PodSpec: &v1.PodSpec{
+ Containers: []v1.Container{
+ {
+ Resources: v1.ResourceRequirements{
+ Limits: map[v1.ResourceName]resource.Quantity{
+ "cpu": resource.MustParse("1"),
+ },
+ Requests: map[v1.ResourceName]resource.Quantity{
+ "cpu": resource.MustParse("1"),
+ },
+ },
+ },
+ },
+ },
+ }
+ }
+
+ func TestNApiJobGang(n int) []*api.Job {
+ gangId := uuid.NewString()
+ gang := make([]*api.Job, n)
+ for i := 0; i < n; i++ {
+ job := Test1CoreCpuApiJob()
+ job.Annotations = map[string]string{
+ configuration.GangIdAnnotation: gangId,
+ configuration.GangCardinalityAnnotation: fmt.Sprintf("%d", n),
+ configuration.GangMinimumCardinalityAnnotation: fmt.Sprintf("%d", n),
+ }
+ gang[i] = job
+ }
+ return gang
+ }
+
+ func TestNApiJobGangLessThanMinCardinality(n int) []*api.Job {
+ gangId := uuid.NewString()
+ gang := make([]*api.Job, n)
+ for i := 0; i < n; i++ {
+ job := Test1CoreCpuApiJob()
+ job.Annotations = map[string]string{
+ configuration.GangIdAnnotation: gangId,
+ configuration.GangCardinalityAnnotation: fmt.Sprintf("%d", n+2),
+ configuration.GangMinimumCardinalityAnnotation: fmt.Sprintf("%d", n+1),
+ }
+ gang[i] = job
+ }
+ return gang
+ }
+
+ func Test100CoreCpuApiJob() *api.Job {
+ job := Test1CoreCpuApiJob()
+ hundredCores := map[v1.ResourceName]resource.Quantity{
+ "cpu": resource.MustParse("100"),
+ }
+ job.PodSpec.Containers[0].Resources.Limits = hundredCores
+ job.PodSpec.Containers[0].Resources.Requests = hundredCores
+ return job
+ }
+
+ func Test1CoreCpuApiJobWithNodeSelector(selector map[string]string) *api.Job {
+ job := Test1CoreCpuApiJob()
+ job.PodSpec.NodeSelector = selector
+ return job
+ }
+
+ func TestExecutor(lastUpdateTime time.Time) *schedulerobjects.Executor {
+ return &schedulerobjects.Executor{
+ Id: uuid.NewString(),
+ Pool: "cpu",
+ LastUpdateTime: lastUpdateTime,
+ Nodes: TestCluster(),
+ }
+ }
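
As a usage sketch, the new fixture could be exercised like this. TestNApiJobGangLessThanMinCardinality and the annotation constants are real per this diff, but the test function below is illustrative only and assumes it runs inside the armada module (the internal packages cannot be imported from outside it):

package scheduler

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/armadaproject/armada/internal/armada/configuration"
	"github.com/armadaproject/armada/internal/scheduler/testfixtures"
)

// Illustrative only: checks the fixture's invariant that every job in the gang
// advertises a minimum cardinality (n+1) larger than the batch size (n), which
// is exactly what lets it exercise the skip path in SubmitChecker.
func TestGangFixtureIsSmallerThanMinCardinality(t *testing.T) {
	n := 5
	gang := testfixtures.TestNApiJobGangLessThanMinCardinality(n)
	assert.Len(t, gang, n)
	for _, job := range gang {
		assert.Equal(t, fmt.Sprintf("%d", n+1), job.Annotations[configuration.GangMinimumCardinalityAnnotation])
	}
}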
