diff --git a/.github/workflows/airflow-operator-release-to-pypi.yml b/.github/workflows/airflow-operator-release-to-pypi.yml index 5a0e27210d3..4b1f7a8d17f 100644 --- a/.github/workflows/airflow-operator-release-to-pypi.yml +++ b/.github/workflows/airflow-operator-release-to-pypi.yml @@ -20,8 +20,8 @@ jobs: - run: go run github.com/magefile/mage@v1.14.0 -v airflowOperator - uses: ./.github/workflows/python-tests with: - python-version: '3.8' - tox-env: 'py38' + python-version: '3.10' + tox-env: 'py310' path: third_party/airflow github-token: ${{secrets.GITHUB_TOKEN}} - name: Publish package to PyPI diff --git a/.github/workflows/airflow-operator.yml b/.github/workflows/airflow-operator.yml index 50747f98dc7..03e717b029e 100644 --- a/.github/workflows/airflow-operator.yml +++ b/.github/workflows/airflow-operator.yml @@ -40,13 +40,14 @@ jobs: runs-on: ubuntu-22.04 strategy: matrix: - python: [ '3.8', '3.9', '3.10' ] + python: [ '3.10', '3.11', '3.12' ] include: - - tox-env: 'py38' - - tox-env: 'py39' - python: '3.9' - tox-env: 'py310' python: '3.10' + - tox-env: 'py311' + python: '3.11' + - tox-env: 'py312' + python: '3.12' steps: - uses: actions/checkout@v4 - name: Setup Go diff --git a/.github/workflows/python-client-release-to-pypi.yml b/.github/workflows/python-client-release-to-pypi.yml index 33866ffd5c9..7c4877b2f6c 100644 --- a/.github/workflows/python-client-release-to-pypi.yml +++ b/.github/workflows/python-client-release-to-pypi.yml @@ -19,8 +19,8 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: ./.github/workflows/python-tests with: - python-version: '3.8' - tox-env: 'py38' + python-version: '3.9' + tox-env: 'py39' path: 'client/python' github-token: ${{secrets.GITHUB_TOKEN}} - name: Publish package to PyPI diff --git a/.github/workflows/python-client.yml b/.github/workflows/python-client.yml index 8541394a811..a9f5cfd9155 100644 --- a/.github/workflows/python-client.yml +++ b/.github/workflows/python-client.yml @@ -34,13 +34,16 @@ jobs: runs-on: ubuntu-22.04 strategy: matrix: - python: [ '3.8', '3.9', '3.10' ] + python: [ '3.9', '3.10', '3.11', '3.12' ] include: - - tox-env: 'py38' - tox-env: 'py39' python: '3.9' - tox-env: 'py310' python: '3.10' + - tox-env: 'py311' + python: '3.11' + - tox-env: 'py312' + python: '3.12' steps: - uses: actions/checkout@v4 - name: Setup Go diff --git a/build/airflow-operator/Dockerfile b/build/airflow-operator/Dockerfile index 87a2e81a5cb..4fbe0f49bab 100644 --- a/build/airflow-operator/Dockerfile +++ b/build/airflow-operator/Dockerfile @@ -1,5 +1,5 @@ ARG PLATFORM=x86_64 -ARG BASE_IMAGE=python:3.10.14-bookworm +ARG BASE_IMAGE=python:3.10-bookworm FROM --platform=$PLATFORM ${BASE_IMAGE} RUN mkdir /proto diff --git a/build/python-client/Dockerfile b/build/python-client/Dockerfile index 10aa957944b..3122bc8c982 100644 --- a/build/python-client/Dockerfile +++ b/build/python-client/Dockerfile @@ -1,6 +1,5 @@ ARG PLATFORM=x86_64 -ARG BASE_IMAGE=python:3.8.18-bookworm - +ARG BASE_IMAGE=python:3.9-bookworm FROM --platform=$PLATFORM ${BASE_IMAGE} RUN mkdir /proto diff --git a/client/python/pyproject.toml b/client/python/pyproject.toml index c9f06f55ba7..2d3b0783062 100644 --- a/client/python/pyproject.toml +++ b/client/python/pyproject.toml @@ -1,18 +1,18 @@ [project] name = "armada_client" -version = "0.4.8" +version = "0.4.10" description = "Armada gRPC API python client" readme = "README.md" -requires-python = ">=3.7" +requires-python = ">=3.9" dependencies = ["grpcio==1.66.1", "grpcio-tools==1.66.1", "mypy-protobuf>=3.2.0", 
"protobuf>=5.26.1,<6.0dev" ] license = { text = "Apache Software License" } authors = [{ name = "G-Research Open Source Software", email = "armada@armadaproject.io" }] [project.optional-dependencies] -format = ["black==23.7.0", "flake8==7.0.0", "pylint==2.17.5"] +format = ["black>=23.7.0", "flake8>=7.0.0", "pylint>=2.17.5"] # note(JayF): sphinx-jekyll-builder was broken by sphinx-markdown-builder 0.6 -- so pin to 0.5.5 docs = ["sphinx==7.1.2", "sphinx-jekyll-builder==0.3.0", "sphinx-toolbox==3.2.0b1", "sphinx-markdown-builder==0.5.5"] -test = ["pytest==7.3.1", "coverage>=6.5.0", "pytest-asyncio==0.21.1"] +test = ["pytest==7.3.1", "coverage==6.5.0", "pytest-asyncio==0.21.1"] [build-system] requires = ["setuptools"] diff --git a/config/lookoutv2/config.yaml b/config/lookoutv2/config.yaml index f423cc5eeed..3a35d84592d 100644 --- a/config/lookoutv2/config.yaml +++ b/config/lookoutv2/config.yaml @@ -32,3 +32,7 @@ uiConfig: template: "kubectl --context {{ runs[runs.length - 1].cluster }} -n {{ namespace }} logs armada-{{ jobId }}-0" - name: Exec template: "kubectl --context {{ runs[runs.length - 1].cluster }} -n {{ namespace }} exec -it armada-{{ jobId }}-0 /bin/sh" + descriptionMd: Execute a command on the job's container. + alertMessageMd: | + This will only work if the container is still running. + alertLevel: info diff --git a/internal/executor/util/ingress_service_config.go b/internal/executor/util/ingress_service_config.go deleted file mode 100644 index 2f72f7147d6..00000000000 --- a/internal/executor/util/ingress_service_config.go +++ /dev/null @@ -1,86 +0,0 @@ -package util - -import ( - "golang.org/x/exp/maps" - "golang.org/x/exp/slices" - - "github.com/armadaproject/armada/pkg/api" -) - -type IngressServiceType int - -const ( - Ingress IngressServiceType = iota - NodePort - Headless -) - -func (st IngressServiceType) String() string { - return []string{"Ingress", "NodePort", "Headless"}[st] -} - -type IngressServiceConfig struct { - Type IngressServiceType - Ports []uint32 - Annotations map[string]string - TlsEnabled bool - CertName string - UseClusterIp bool -} - -func deepCopy(config *IngressServiceConfig) *IngressServiceConfig { - return &IngressServiceConfig{ - Type: config.Type, - Ports: slices.Clone(config.Ports), - Annotations: maps.Clone(config.Annotations), - TlsEnabled: config.TlsEnabled, - CertName: config.CertName, - UseClusterIp: config.UseClusterIp, - } -} - -func CombineIngressService(ingresses []*api.IngressConfig, services []*api.ServiceConfig) []*IngressServiceConfig { - result := []*IngressServiceConfig{} - - for _, ing := range ingresses { - result = append( - result, - &IngressServiceConfig{ - Type: Ingress, - Ports: slices.Clone(ing.Ports), - Annotations: maps.Clone(ing.Annotations), - TlsEnabled: ing.TlsEnabled, - CertName: ing.CertName, - UseClusterIp: ing.UseClusterIP, - }, - ) - } - - for _, svc := range services { - svcType := NodePort - useClusterIP := true - if svc.Type == api.ServiceType_Headless { - svcType = Headless - useClusterIP = false - } - result = append( - result, - &IngressServiceConfig{ - Type: svcType, - Ports: slices.Clone(svc.Ports), - UseClusterIp: useClusterIP, - }, - ) - } - - return result -} - -func useClusterIP(configs []*IngressServiceConfig) bool { - for _, config := range configs { - if config.UseClusterIp { - return true - } - } - return false -} diff --git a/internal/executor/util/ingress_util.go b/internal/executor/util/ingress_util.go deleted file mode 100644 index aef695d96e6..00000000000 --- 
a/internal/executor/util/ingress_util.go +++ /dev/null @@ -1,117 +0,0 @@ -package util - -import ( - "fmt" - "strings" - - v1 "k8s.io/api/core/v1" - networking "k8s.io/api/networking/v1" - - "github.com/armadaproject/armada/internal/common/util" - "github.com/armadaproject/armada/internal/executor/configuration" - "github.com/armadaproject/armada/pkg/api" -) - -func GenerateIngresses(job *api.Job, pod *v1.Pod, ingressConfig *configuration.IngressConfiguration) ([]*v1.Service, []*networking.Ingress) { - services := []*v1.Service{} - ingresses := []*networking.Ingress{} - ingressToGen := CombineIngressService(job.Ingress, job.Services) - groupedIngressConfigs := groupIngressConfig(ingressToGen) - for svcType, configs := range groupedIngressConfigs { - if len(GetServicePorts(configs, &pod.Spec)) > 0 { - service := CreateService(job, pod, GetServicePorts(configs, &pod.Spec), svcType, useClusterIP(configs)) - services = append(services, service) - - if svcType == Ingress { - for index, config := range configs { - if len(GetServicePorts([]*IngressServiceConfig{config}, &pod.Spec)) <= 0 { - continue - } - // TODO: This results in an invalid name (one starting with "-") if pod.Name is the empty string; - // we should return an error if that's the case. - ingressName := fmt.Sprintf("%s-%s-%d", pod.Name, strings.ToLower(svcType.String()), index) - ingress := CreateIngress(ingressName, job, pod, service, ingressConfig, config) - ingresses = append(ingresses, ingress) - } - } - } - } - - return services, ingresses -} - -func groupIngressConfig(configs []*IngressServiceConfig) map[IngressServiceType][]*IngressServiceConfig { - result := gatherIngressConfig(configs) - - for ingressType, grp := range result { - result[ingressType] = mergeOnAnnotations(grp) - } - - return result -} - -// gatherIngressConfig takes a list of ingress configs and groups them by IngressServiceType -func gatherIngressConfig(configs []*IngressServiceConfig) map[IngressServiceType][]*IngressServiceConfig { - result := make(map[IngressServiceType][]*IngressServiceConfig, 10) - - for _, config := range configs { - result[config.Type] = append(result[config.Type], deepCopy(config)) - } - - return result -} - -func mergeOnAnnotations(configs []*IngressServiceConfig) []*IngressServiceConfig { - result := make([]*IngressServiceConfig, 0, len(configs)) - - for _, config := range configs { - matchFound := false - - for _, existingConfig := range result { - if util.Equal(config.Annotations, existingConfig.Annotations) { - existingConfig.Ports = append(existingConfig.Ports, config.Ports...) 
- matchFound = true - } - } - if !matchFound { - result = append(result, deepCopy(config)) - } - } - - return result -} - -func GetServicePorts(svcConfigs []*IngressServiceConfig, podSpec *v1.PodSpec) []v1.ServicePort { - var servicePorts []v1.ServicePort - - for _, container := range podSpec.Containers { - ports := container.Ports - for _, svcConfig := range svcConfigs { - for _, port := range ports { - // Don't expose host via service, this will already be handled by kubernetes - if port.HostPort > 0 { - continue - } - if contains(svcConfig, uint32(port.ContainerPort)) { - servicePort := v1.ServicePort{ - Name: fmt.Sprintf("%s-%d", container.Name, port.ContainerPort), - Port: port.ContainerPort, - Protocol: port.Protocol, - } - servicePorts = append(servicePorts, servicePort) - } - } - } - } - - return servicePorts -} - -func contains(portConfig *IngressServiceConfig, port uint32) bool { - for _, p := range portConfig.Ports { - if p == port { - return true - } - } - return false -} diff --git a/internal/executor/util/ingress_util_test.go b/internal/executor/util/ingress_util_test.go deleted file mode 100644 index 3b3ee63bd33..00000000000 --- a/internal/executor/util/ingress_util_test.go +++ /dev/null @@ -1,464 +0,0 @@ -package util - -import ( - "testing" - - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - - "github.com/armadaproject/armada/pkg/api" -) - -func TestDeepCopy(t *testing.T) { - input := &IngressServiceConfig{ - Type: NodePort, - Ports: []uint32{1, 2, 3}, - Annotations: map[string]string{ - "a": "value", - "b": "value2", - }, - } - result := deepCopy(input) - assert.Equal(t, input, result) - - result.Annotations["c"] = "value3" - assert.NotEqual(t, input, result) - - result = deepCopy(input) - result.Ports = append(result.Ports, 4) - assert.NotEqual(t, input, result) -} - -func TestGetServicePorts(t *testing.T) { - config := &IngressServiceConfig{ - Ports: []uint32{1, 2, 3}, - } - podSpec := &v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "a", - Ports: []v1.ContainerPort{ - { - ContainerPort: 1, - Protocol: v1.ProtocolTCP, - }, - { - ContainerPort: 2, - Protocol: v1.ProtocolUDP, - }, - }, - }, - }, - } - expected := []v1.ServicePort{ - { - Name: "a-1", - Protocol: v1.ProtocolTCP, - Port: 1, - }, - { - Name: "a-2", - Protocol: v1.ProtocolUDP, - Port: 2, - }, - } - - assert.Equal(t, GetServicePorts([]*IngressServiceConfig{config}, podSpec), expected) -} - -func TestGetServicePorts_MultipleContainer(t *testing.T) { - config := &IngressServiceConfig{ - Ports: []uint32{1, 2, 3}, - } - podSpec := &v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "a", - Ports: []v1.ContainerPort{ - { - ContainerPort: 1, - Protocol: v1.ProtocolTCP, - }, - }, - }, - { - Name: "b", - Ports: []v1.ContainerPort{ - { - ContainerPort: 2, - Protocol: v1.ProtocolUDP, - }, - }, - }, - }, - } - expected := []v1.ServicePort{ - { - Name: "a-1", - Protocol: v1.ProtocolTCP, - Port: 1, - }, - { - Name: "b-2", - Protocol: v1.ProtocolUDP, - Port: 2, - }, - } - - assert.Equal(t, GetServicePorts([]*IngressServiceConfig{config}, podSpec), expected) -} - -func TestGetServicePorts_MultipleIngressConfigs(t *testing.T) { - config1 := &IngressServiceConfig{ - Ports: []uint32{1}, - } - config2 := &IngressServiceConfig{ - Ports: []uint32{2}, - } - config3 := &IngressServiceConfig{ - Ports: []uint32{3}, - } - podSpec := &v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "a", - Ports: []v1.ContainerPort{ - { - ContainerPort: 1, - Protocol: v1.ProtocolTCP, - }, - { - ContainerPort: 2, - Protocol: 
v1.ProtocolUDP, - }, - }, - }, - }, - } - expected := []v1.ServicePort{ - { - Name: "a-1", - Protocol: v1.ProtocolTCP, - Port: 1, - }, - { - Name: "a-2", - Protocol: v1.ProtocolUDP, - Port: 2, - }, - } - servicePorts := GetServicePorts([]*IngressServiceConfig{config1, config2, config3}, podSpec) - assert.Equal(t, servicePorts, expected) -} - -func TestGetServicePorts_HostPortSkipped(t *testing.T) { - config := &IngressServiceConfig{ - Ports: []uint32{1, 2, 3}, - } - podSpec := &v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "a", - Ports: []v1.ContainerPort{ - { - ContainerPort: 1, - HostPort: 100, - Protocol: v1.ProtocolTCP, - }, - { - ContainerPort: 2, - Protocol: v1.ProtocolUDP, - }, - }, - }, - }, - } - expected := []v1.ServicePort{ - { - Name: "a-2", - Protocol: v1.ProtocolUDP, - Port: 2, - }, - } - - assert.Equal(t, GetServicePorts([]*IngressServiceConfig{config}, podSpec), expected) -} - -func TestGroupIngressConfig_IngressTypeNodePort_AlwaysGrouped(t *testing.T) { - expected := map[IngressServiceType][]*IngressServiceConfig{ - NodePort: { - { - Type: NodePort, - Ports: []uint32{1, 2, 3}, - }, - }, - } - input1 := &IngressServiceConfig{ - Type: NodePort, - Ports: []uint32{1, 2}, - } - input2 := &IngressServiceConfig{ - Type: NodePort, - Ports: []uint32{3}, - } - groupedConfig := groupIngressConfig([]*IngressServiceConfig{input1, input2}) - assert.Equal(t, groupedConfig, expected) - - // Non ingress type will never have annotations anymore - assert.Equal(t, groupIngressConfig([]*IngressServiceConfig{input1, input2}), expected) -} - -func TestGroupIngressConfig_IngressType_NoAnnotations(t *testing.T) { - expected := map[IngressServiceType][]*IngressServiceConfig{ - Ingress: { - { - Type: Ingress, - Ports: []uint32{1, 2, 3}, - }, - }, - } - input1 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{1, 2}, - } - input2 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{3}, - } - groupedConfig := groupIngressConfig([]*IngressServiceConfig{input1, input2}) - assert.Equal(t, groupedConfig, expected) -} - -func TestGroupIngressConfig_IngressType_SameAnnotations(t *testing.T) { - expected := map[IngressServiceType][]*IngressServiceConfig{ - Ingress: { - { - Type: Ingress, - Ports: []uint32{1, 2, 3}, - Annotations: map[string]string{ - "test": "value", - }, - }, - }, - } - input1 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{1, 2}, - Annotations: map[string]string{ - "test": "value", - }, - } - input2 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{3}, - Annotations: map[string]string{ - "test": "value", - }, - } - assert.Equal(t, groupIngressConfig([]*IngressServiceConfig{input1, input2}), expected) -} - -func TestGroupIngressConfig_IngressType_DifferentAnnotations(t *testing.T) { - expected := map[IngressServiceType][]*IngressServiceConfig{ - Ingress: { - { - Type: Ingress, - Ports: []uint32{1, 2}, - Annotations: map[string]string{ - "test": "value", - }, - }, - { - Type: Ingress, - Ports: []uint32{3}, - Annotations: map[string]string{ - "test": "value2", - }, - }, - }, - } - input1 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{1, 2}, - Annotations: map[string]string{ - "test": "value", - }, - } - input2 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{3}, - Annotations: map[string]string{ - "test": "value2", - }, - } - groupedConfig := groupIngressConfig([]*IngressServiceConfig{input1, input2}) - assert.Equal(t, groupedConfig, expected) -} - -func TestGroupIngressConfig_MixedIngressType(t *testing.T) { - 
expected := map[IngressServiceType][]*IngressServiceConfig{ - Ingress: { - { - Type: Ingress, - Ports: []uint32{1, 2}, - Annotations: map[string]string{ - "test": "value", - }, - }, - { - Type: Ingress, - Ports: []uint32{3}, - Annotations: map[string]string{ - "test": "value2", - }, - }, - }, - NodePort: { - { - Type: NodePort, - Ports: []uint32{4, 5}, - }, - }, - } - input1 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{1, 2}, - Annotations: map[string]string{ - "test": "value", - }, - } - input2 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{3}, - Annotations: map[string]string{ - "test": "value2", - }, - } - input3 := &IngressServiceConfig{ - Type: NodePort, - Ports: []uint32{4, 5}, - } - groupedConfig := groupIngressConfig([]*IngressServiceConfig{input1, input2, input3}) - assert.Equal(t, groupedConfig, expected) -} - -func TestGroupIngressConfig_IngressType_Headless(t *testing.T) { - expected := map[IngressServiceType][]*IngressServiceConfig{ - Headless: { - { - Type: Headless, - Ports: []uint32{1}, - }, - }, - } - input := &IngressServiceConfig{ - Type: Headless, - Ports: []uint32{1}, - } - groupedConfig := groupIngressConfig([]*IngressServiceConfig{input}) - assert.Equal(t, groupedConfig, expected) -} - -func TestGatherIngressConfigs(t *testing.T) { - inputConfigs := []*IngressServiceConfig{ - { - Type: Ingress, - Ports: []uint32{1}, - }, - { - Type: Ingress, - Ports: []uint32{2}, - }, - { - Type: Headless, - Ports: []uint32{1}, - }, - { - Type: NodePort, - Ports: []uint32{1}, - }, - { - Type: Headless, - Ports: []uint32{2}, - }, - } - - expected := map[IngressServiceType][]*IngressServiceConfig{ - Ingress: { - { - Type: Ingress, - Ports: []uint32{1}, - }, - { - Type: Ingress, - Ports: []uint32{2}, - }, - }, - NodePort: { - { - Type: NodePort, - Ports: []uint32{1}, - }, - }, - Headless: { - { - Type: Headless, - Ports: []uint32{1}, - }, - { - Type: Headless, - Ports: []uint32{2}, - }, - }, - } - - assert.Equal(t, gatherIngressConfig(inputConfigs), expected) -} - -func TestCombineIngressService(t *testing.T) { - ingress := []*api.IngressConfig{ - { - Ports: []uint32{1, 2, 3}, - Annotations: map[string]string{ - "Hello": "World", - }, - TlsEnabled: true, - UseClusterIP: false, - }, - } - - services := []*api.ServiceConfig{ - { - Type: api.ServiceType_Headless, - Ports: []uint32{4}, - }, - { - Type: api.ServiceType_NodePort, - Ports: []uint32{5}, - }, - } - - expected := []*IngressServiceConfig{ - { - Type: Ingress, - Ports: []uint32{1, 2, 3}, - Annotations: map[string]string{ - "Hello": "World", - }, - TlsEnabled: true, - UseClusterIp: false, - }, - { - Type: Headless, - Ports: []uint32{4}, - UseClusterIp: false, - }, - { - Type: NodePort, - Ports: []uint32{5}, - UseClusterIp: true, - }, - } - - assert.Equal(t, expected, CombineIngressService(ingress, services)) -} diff --git a/internal/executor/util/kubernetes_object.go b/internal/executor/util/kubernetes_object.go index 1b66502e1ab..ccfcf7de929 100644 --- a/internal/executor/util/kubernetes_object.go +++ b/internal/executor/util/kubernetes_object.go @@ -3,7 +3,6 @@ package util import ( "fmt" "strconv" - "strings" v1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" @@ -18,140 +17,6 @@ import ( "github.com/armadaproject/armada/pkg/executorapi" ) -func CreateService( - job *api.Job, - pod *v1.Pod, - ports []v1.ServicePort, - ingSvcType IngressServiceType, - useClusterIP bool, -) *v1.Service { - serviceType := v1.ServiceTypeClusterIP - if ingSvcType == NodePort { - serviceType = 
v1.ServiceTypeNodePort - } - - clusterIP := "" - if !useClusterIP { - clusterIP = "None" - } - - serviceSpec := v1.ServiceSpec{ - Type: serviceType, - Selector: map[string]string{ - domain.JobId: pod.Labels[domain.JobId], - domain.Queue: pod.Labels[domain.Queue], - domain.PodNumber: pod.Labels[domain.PodNumber], - }, - Ports: ports, - ClusterIP: clusterIP, - } - labels := util.MergeMaps(job.Labels, map[string]string{ - domain.JobId: pod.Labels[domain.JobId], - domain.Queue: pod.Labels[domain.Queue], - domain.PodNumber: pod.Labels[domain.PodNumber], - }) - annotation := util.MergeMaps(job.Annotations, map[string]string{ - domain.JobSetId: job.JobSetId, - domain.Owner: job.Owner, - }) - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s", pod.Name, strings.ToLower(ingSvcType.String())), - Labels: labels, - Annotations: annotation, - Namespace: job.Namespace, - }, - Spec: serviceSpec, - } - return service -} - -func CreateIngress( - name string, - job *api.Job, - pod *v1.Pod, - service *v1.Service, - executorIngressConfig *configuration.IngressConfiguration, - jobConfig *IngressServiceConfig, -) *networking.Ingress { - labels := util.MergeMaps(job.Labels, map[string]string{ - domain.JobId: pod.Labels[domain.JobId], - domain.Queue: pod.Labels[domain.Queue], - domain.PodNumber: pod.Labels[domain.PodNumber], - }) - annotations := util.MergeMaps(job.Annotations, executorIngressConfig.Annotations) - annotations = util.MergeMaps(annotations, jobConfig.Annotations) - annotations = util.MergeMaps(annotations, map[string]string{ - domain.JobSetId: job.JobSetId, - domain.Owner: job.Owner, - }) - - rules := make([]networking.IngressRule, 0, len(service.Spec.Ports)) - tlsHosts := make([]string, 0, len(service.Spec.Ports)) - - // Rest of the hosts are generated off port information - for _, servicePort := range service.Spec.Ports { - if !contains(jobConfig, uint32(servicePort.Port)) { - continue - } - host := fmt.Sprintf("%s-%s.%s.%s", servicePort.Name, pod.Name, pod.Namespace, executorIngressConfig.HostnameSuffix) - tlsHosts = append(tlsHosts, host) - - // Workaround to get constant's address - pathType := networking.PathTypePrefix - path := networking.IngressRule{ - Host: host, - IngressRuleValue: networking.IngressRuleValue{ - HTTP: &networking.HTTPIngressRuleValue{ - Paths: []networking.HTTPIngressPath{ - { - Path: "/", - PathType: &pathType, - Backend: networking.IngressBackend{ - Service: &networking.IngressServiceBackend{ - Name: service.Name, - Port: networking.ServiceBackendPort{ - Number: servicePort.Port, - }, - }, - }, - }, - }, - }, - }, - } - rules = append(rules, path) - } - - tls := make([]networking.IngressTLS, 0, 1) - - if jobConfig.TlsEnabled { - certName := jobConfig.CertName - if certName == "" { - certName = fmt.Sprintf("%s-%s", job.Namespace, executorIngressConfig.CertNameSuffix) - } - - tls = append(tls, networking.IngressTLS{ - Hosts: tlsHosts, - SecretName: certName, - }) - } - - ingress := &networking.Ingress{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: labels, - Annotations: annotations, - Namespace: job.Namespace, - }, - Spec: networking.IngressSpec{ - Rules: rules, - TLS: tls, - }, - } - return ingress -} - func CreateOwnerReference(pod *v1.Pod) metav1.OwnerReference { return metav1.OwnerReference{ APIVersion: "v1", diff --git a/internal/executor/util/kubernetes_objects_test.go b/internal/executor/util/kubernetes_objects_test.go index 9ee9bca2f66..639aa843aab 100644 --- a/internal/executor/util/kubernetes_objects_test.go +++ 
b/internal/executor/util/kubernetes_objects_test.go @@ -9,7 +9,6 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" - networking "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/armadaproject/armada/internal/common" @@ -140,346 +139,6 @@ func makePodSpec() *v1.PodSpec { return &spec } -func makeTestJob() *api.Job { - return &api.Job{ - Id: "Id", - JobSetId: "JobSetId", - Queue: "QueueTest", - Owner: "UserTest", - Namespace: "testNamespace", - PodSpecs: []*v1.PodSpec{makePodSpec()}, - } -} - -func makeTestService() *v1.Service { - return &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: "testService"}, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Name: "testPort", - Port: 8080, - }, - }, - }, - } -} - -func TestCreateIngress_Basic(t *testing.T) { - // Boilerplate, should be the same in TlsEnabled - job := makeTestJob() - service := makeTestService() - pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: "testNamespace"}} - ingressConfig := &configuration.IngressConfiguration{ - HostnameSuffix: "testSuffix", - } - - // TLS disabled jobconfig - jobConfig := &IngressServiceConfig{ - Ports: []uint32{8080}, - } - - result := CreateIngress("testIngress", job, pod, service, ingressConfig, jobConfig) - - pathType := networking.PathTypePrefix - expectedIngressSpec := networking.IngressSpec{ - TLS: []networking.IngressTLS{}, - Rules: []networking.IngressRule{ - { - Host: "testPort-testPod.testNamespace.testSuffix", - IngressRuleValue: networking.IngressRuleValue{ - HTTP: &networking.HTTPIngressRuleValue{ - Paths: []networking.HTTPIngressPath{ - { - Path: "/", - PathType: &pathType, - Backend: networking.IngressBackend{ - Service: &networking.IngressServiceBackend{ - Name: "testService", - Port: networking.ServiceBackendPort{ - Number: 8080, - }, - }, - }, - }, - }, - }, - }, - }, - }, - } - - assert.Equal(t, result.Spec, expectedIngressSpec) -} - -func TestCreateIngress_TLS(t *testing.T) { - // Boilerplate setup - job := makeTestJob() - service := makeTestService() - pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: "testNamespace"}} - ingressConfig := &configuration.IngressConfiguration{ - HostnameSuffix: "testSuffix", - CertNameSuffix: "ingress-tls-certificate", - } - - // TLS enabled in this test - jobConfig := &IngressServiceConfig{ - TlsEnabled: true, - Ports: []uint32{8080}, - } - - result := CreateIngress("testIngress", job, pod, service, ingressConfig, jobConfig) - - pathType := networking.PathTypePrefix - expectedIngressSpec := networking.IngressSpec{ - TLS: []networking.IngressTLS{ - { - Hosts: []string{ - "testPort-testPod.testNamespace.testSuffix", - }, - SecretName: "testNamespace-ingress-tls-certificate", - }, - }, - Rules: []networking.IngressRule{ - { - Host: "testPort-testPod.testNamespace.testSuffix", - IngressRuleValue: networking.IngressRuleValue{ - HTTP: &networking.HTTPIngressRuleValue{ - Paths: []networking.HTTPIngressPath{ - { - Path: "/", - PathType: &pathType, - Backend: networking.IngressBackend{ - Service: &networking.IngressServiceBackend{ - Name: "testService", - Port: networking.ServiceBackendPort{ - Number: 8080, - }, - }, - }, - }, - }, - }, - }, - }, - }, - } - - assert.Equal(t, result.Spec, expectedIngressSpec) -} - -func TestCreateService_Ingress_Headless(t *testing.T) { - job := makeTestJob() - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - Labels: map[string]string{ - 
"armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - }, - } - ports := []v1.ServicePort{ - { - Port: 123, - }, - } - ingressType := Ingress - createdService := CreateService(job, pod, ports, ingressType, false) - - expected := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod-ingress", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Annotations: map[string]string{ - "armada_jobset_id": "JobSetId", - "armada_owner": "UserTest", - }, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Port: 123, - }, - }, - Selector: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Type: "ClusterIP", - ClusterIP: "None", - }, - } - assert.Equal(t, createdService, expected) -} - -func TestCreateService_Ingress_ClusterIP(t *testing.T) { - job := makeTestJob() - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - }, - } - ports := []v1.ServicePort{ - { - Port: 123, - }, - } - ingressType := Ingress - createdService := CreateService(job, pod, ports, ingressType, true) - - expected := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod-ingress", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Annotations: map[string]string{ - "armada_jobset_id": "JobSetId", - "armada_owner": "UserTest", - }, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Port: 123, - }, - }, - Selector: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Type: "ClusterIP", - }, - } - assert.Equal(t, createdService, expected) -} - -func TestCreateService_NodePort(t *testing.T) { - job := makeTestJob() - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - }, - } - ports := []v1.ServicePort{ - { - Port: 123, - NodePort: 456, - }, - } - ingressType := NodePort - createdService := CreateService(job, pod, ports, ingressType, true) - - expected := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod-nodeport", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Annotations: map[string]string{ - "armada_jobset_id": "JobSetId", - "armada_owner": "UserTest", - }, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Port: 123, - NodePort: 456, - }, - }, - Selector: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Type: "NodePort", - }, - } - assert.Equal(t, createdService, expected) -} - -func TestCreateService_Headless(t *testing.T) { - job := makeTestJob() - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - }, - } - ports := []v1.ServicePort{ - { - Port: 123, - }, - } - 
ingressType := Headless - createdService := CreateService(job, pod, ports, ingressType, false) - - expected := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod-headless", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Annotations: map[string]string{ - "armada_jobset_id": "JobSetId", - "armada_owner": "UserTest", - }, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Port: 123, - }, - }, - Selector: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Type: "ClusterIP", - ClusterIP: "None", - }, - } - assert.Equal(t, createdService, expected) -} - func TestCreatePodFromExecutorApiJob(t *testing.T) { runId := uuid.NewString() jobId := util.NewULID() diff --git a/internal/lookout/ui/package.json b/internal/lookout/ui/package.json index 052baa34c44..a0cd1b4347b 100644 --- a/internal/lookout/ui/package.json +++ b/internal/lookout/ui/package.json @@ -38,6 +38,8 @@ "date-fns-tz": "^1.3.7", "js-yaml": "^4.0.0", "lodash": "^4.17.21", + "markdown-to-jsx": "^7.7.3", + "mui-markdown": "^1.2.5", "notistack": "^3.0.1", "oidc-client-ts": "^2.3.0", "prism-react-renderer": "^2.4.1", diff --git a/internal/lookout/ui/src/components/lookoutV2/sidebar/SidebarTabJobCommands.tsx b/internal/lookout/ui/src/components/lookoutV2/sidebar/SidebarTabJobCommands.tsx index 938175564e2..8409370aee2 100644 --- a/internal/lookout/ui/src/components/lookoutV2/sidebar/SidebarTabJobCommands.tsx +++ b/internal/lookout/ui/src/components/lookoutV2/sidebar/SidebarTabJobCommands.tsx @@ -1,6 +1,7 @@ import { OpenInNew } from "@mui/icons-material" -import { Link, Stack } from "@mui/material" +import { Alert, AlertColor, Link, Stack } from "@mui/material" import { template, templateSettings } from "lodash" +import { MuiMarkdown } from "mui-markdown" import { Fragment } from "react/jsx-runtime" import validator from "validator" @@ -11,6 +12,8 @@ import { SPACING } from "../../../styling/spacing" import { CommandSpec } from "../../../utils" import { CodeBlock } from "../../CodeBlock" +const KNOWN_ALERT_COLORS: AlertColor[] = ["success", "info", "warning", "error"] + export interface SidebarTabJobCommandsProps { job: Job commandSpecs: CommandSpec[] @@ -35,27 +38,43 @@ export const SidebarTabJobCommands = ({ job, commandSpecs }: SidebarTabJobComman return ( <> {commandSpecs.map((commandSpec) => { - const { name } = commandSpec + const { name, descriptionMd, alertLevel, alertMessageMd } = commandSpec const commandText = getCommandText(job, commandSpec) + + const alertSeverity: AlertColor = + alertLevel && (KNOWN_ALERT_COLORS as string[]).includes(alertLevel) ? (alertLevel as AlertColor) : "info" + return ( {name} - {validator.isURL(commandText) ? ( - - -
{commandText}
- -
- - ) : ( - + {descriptionMd && ( +
+ {descriptionMd} +
+ )} + {alertMessageMd && ( + + {alertMessageMd} + )} +
+ {validator.isURL(commandText) ? ( + + +
{commandText}
+ +
+ + ) : ( + + )} +
) })} diff --git a/internal/lookout/ui/src/services/lookoutV2/useGetUiConfig.ts b/internal/lookout/ui/src/services/lookoutV2/useGetUiConfig.ts index 683c41b3062..4010d3159a1 100644 --- a/internal/lookout/ui/src/services/lookoutV2/useGetUiConfig.ts +++ b/internal/lookout/ui/src/services/lookoutV2/useGetUiConfig.ts @@ -55,10 +55,27 @@ export const useGetUiConfig = (enabled = true) => { } if (json.CommandSpecs) { - config.commandSpecs = json.CommandSpecs.map(({ Name, Template }: { Name: string; Template: string }) => ({ - name: Name, - template: Template, - })) + config.commandSpecs = json.CommandSpecs.map( + ({ + Name, + Template, + DescriptionMd, + AlertMessageMd, + AlertLevel, + }: { + Name: string + Template: string + DescriptionMd: string + AlertMessageMd: string + AlertLevel: string + }) => ({ + name: Name, + template: Template, + descriptionMd: DescriptionMd, + alertMessageMd: AlertMessageMd, + alertLevel: AlertLevel, + }), + ) } if (json.Backend) config.backend = json.Backend diff --git a/internal/lookout/ui/src/utils.tsx b/internal/lookout/ui/src/utils.tsx index 3c4caf410d7..a458e946f89 100644 --- a/internal/lookout/ui/src/utils.tsx +++ b/internal/lookout/ui/src/utils.tsx @@ -9,9 +9,13 @@ export interface OidcConfig { clientId: string scope: string } + export interface CommandSpec { name: string template: string + descriptionMd?: string + alertMessageMd?: string + alertLevel?: string } export interface UIConfig { @@ -75,9 +79,27 @@ export async function getUIConfig(): Promise { scope: json.Oidc.Scope, } if (json.CommandSpecs) { - config.commandSpecs = json.CommandSpecs.map((c: { Name: string; Template: string }) => { - return { name: c.Name, template: c.Template } - }) + config.commandSpecs = json.CommandSpecs.map( + ({ + Name, + Template, + DescriptionMd, + AlertMessageMd, + AlertLevel, + }: { + Name: string + Template: string + DescriptionMd: string + AlertMessageMd: string + AlertLevel: string + }) => ({ + name: Name, + template: Template, + descriptionMd: DescriptionMd, + alertMessageMd: AlertMessageMd, + alertLevel: AlertLevel, + }), + ) } } if (json.Backend) config.backend = json.Backend diff --git a/internal/lookout/ui/yarn.lock b/internal/lookout/ui/yarn.lock index 462a894822e..7a1cf05c44a 100644 --- a/internal/lookout/ui/yarn.lock +++ b/internal/lookout/ui/yarn.lock @@ -3261,6 +3261,11 @@ magic-string@^0.30.17: dependencies: "@jridgewell/sourcemap-codec" "^1.5.0" +markdown-to-jsx@^7.7.3: + version "7.7.3" + resolved "https://registry.yarnpkg.com/markdown-to-jsx/-/markdown-to-jsx-7.7.3.tgz#c75927252592696e9e8b2a9557628749d8ab023e" + integrity sha512-o35IhJDFP6Fv60zPy+hbvZSQMmgvSGdK5j8NRZ7FeZMY+Bgqw+dSg7SC1ZEzC26++CiOUCqkbq96/c3j/FfTEQ== + math-intrinsics@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/math-intrinsics/-/math-intrinsics-1.1.0.tgz#a0dd74be81e2aa5c2f27e65ce283605ee4e2b7f9" @@ -3325,6 +3330,13 @@ ms@^2.1.1, ms@^2.1.3: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== +mui-markdown@^1.2.5: + version "1.2.5" + resolved "https://registry.yarnpkg.com/mui-markdown/-/mui-markdown-1.2.5.tgz#48e8a800c6707f84b77f56f3e553eb00754f15ff" + integrity sha512-zgLSXxYgHmUkUZ6mp2aM8C1vcoAsCyQLyvvaiSf8AAutNnAXhA8tlBiiGg8hOvX77VQs1A1dssbOyT/W2ytonA== + optionalDependencies: + prism-react-renderer "^2.0.3" + nanoid@^3.3.7: version "3.3.8" resolved 
"https://registry.yarnpkg.com/nanoid/-/nanoid-3.3.8.tgz#b1be3030bee36aaff18bacb375e5cce521684baf" @@ -3579,7 +3591,7 @@ pretty-format@^29.0.0, pretty-format@^29.7.0: ansi-styles "^5.0.0" react-is "^18.0.0" -prism-react-renderer@^2.4.1: +prism-react-renderer@^2.0.3, prism-react-renderer@^2.4.1: version "2.4.1" resolved "https://registry.yarnpkg.com/prism-react-renderer/-/prism-react-renderer-2.4.1.tgz#ac63b7f78e56c8f2b5e76e823a976d5ede77e35f" integrity sha512-ey8Ls/+Di31eqzUxC46h8MksNuGx/n0AAC8uKpwFau4RPDYLuE3EXTp8N8G2vX2N7UC/+IXeNUnlWBGGcAG+Ig== diff --git a/internal/lookoutv2/configuration/types.go b/internal/lookoutv2/configuration/types.go index fa4d399f166..eb3ebfb77d1 100644 --- a/internal/lookoutv2/configuration/types.go +++ b/internal/lookoutv2/configuration/types.go @@ -36,9 +36,31 @@ type PrunerConfig struct { Postgres configuration.PostgresConfig } +// Alert level enum values correspond to the severity levels of the MUI Alert +// component: https://mui.com/material-ui/react-alert/#severity +type AlertLevel string + +const ( + AlertLevelSuccess AlertLevel = "success" + AlertLevelInfo AlertLevel = "info" + AlertLevelWarning AlertLevel = "warning" + AlertLevelError AlertLevel = "error" +) + +// CommandSpec details a command to be displayed on a job's "Commands" sidebar +// tab in the Lookout UI type CommandSpec struct { - Name string + // Name is the title of the command + Name string + // Tempate is the template string for the command Template string + // DescriptionMd is an optional description for the command in Markdown + DescriptionMd string + // AlertMessageMd is an optional message for the command, to be displayed as + // an alert, written in Markdown + AlertMessageMd string + // AlertLevel is the severity level of the alert + AlertLevel AlertLevel } type UIConfig struct { diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index b24bb7b1ff9..90be011edc5 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -177,7 +177,7 @@ type FairSchedulingAlgoContext struct { Txn *jobdb.Txn } -func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Context, txn *jobdb.Txn, pool configuration.PoolConfig) (*FairSchedulingAlgoContext, error) { +func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Context, txn *jobdb.Txn, currentPool configuration.PoolConfig) (*FairSchedulingAlgoContext, error) { executors, err := l.executorRepository.GetExecutors(ctx) if err != nil { return nil, err @@ -194,12 +194,12 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con awayAllocationPools := []string{} for _, otherPool := range l.schedulingConfig.Pools { - if slices.Contains(otherPool.AwayPools, pool.Name) { + if slices.Contains(otherPool.AwayPools, currentPool.Name) { awayAllocationPools = append(awayAllocationPools, otherPool.Name) } } - allPools := []string{pool.Name} - allPools = append(allPools, pool.AwayPools...) + allPools := []string{currentPool.Name} + allPools = append(allPools, currentPool.AwayPools...) allPools = append(allPools, awayAllocationPools...) 
jobSchedulingInfo, err := calculateJobSchedulingInfo(ctx, @@ -208,7 +208,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con func(_ *schedulerobjects.Executor) bool { return true }), queueByName, txn.GetAll(), - pool.Name, + currentPool.Name, awayAllocationPools, allPools) if err != nil { @@ -238,35 +238,41 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con ctx.Error(errMes) }) - homeJobs := jobSchedulingInfo.jobsByPool[pool.Name] - awayJobs := []*jobdb.Job{} + currentPoolJobs := jobSchedulingInfo.jobsByPool[currentPool.Name] + otherPoolsJobs := []*jobdb.Job{} - for _, otherPool := range l.schedulingConfig.Pools { - if pool.Name == otherPool.Name { + for _, pool := range l.schedulingConfig.Pools { + if currentPool.Name == pool.Name { continue } - if slices.Contains(otherPool.AwayPools, pool.Name) { - homeJobs = append(homeJobs, jobSchedulingInfo.jobsByPool[otherPool.Name]...) + if slices.Contains(pool.AwayPools, currentPool.Name) { + // Jobs from away pools need to be considered in the current scheduling round, so they should be added here + // This is so the jobs are available for eviction if a home job needs to take their place + currentPoolJobs = append(currentPoolJobs, jobSchedulingInfo.jobsByPool[pool.Name]...) + } else { + // Jobs that don't belong to the current pool are from other pools we aren't currently considering + // Add them here so their resources can be made unallocatable in the nodeDb, preventing us from scheduling over them + // The cases where this is needed (i.e. a node has jobs from multiple pools) are: + // - The node's pool was changed, but it still has jobs running from the pool it was previously in + // - A node running both home jobs and cross-pool away jobs. In this case, when scheduling the cross-pool away jobs + // we must not schedule over resources used by the home jobs + otherPoolsJobs = append(otherPoolsJobs, jobSchedulingInfo.jobsByPool[pool.Name]...) } } - for _, awayPool := range pool.AwayPools { - awayJobs = append(awayJobs, jobSchedulingInfo.jobsByPool[awayPool]...)
- } - - nodePools := append(pool.AwayPools, pool.Name) + nodePools := append(currentPool.AwayPools, currentPool.Name) - nodeDb, err := l.constructNodeDb(homeJobs, awayJobs, + nodeDb, err := l.constructNodeDb(currentPoolJobs, otherPoolsJobs, armadaslices.Filter(nodes, func(node *internaltypes.Node) bool { return slices.Contains(nodePools, node.GetPool()) })) if err != nil { return nil, err } totalResources := nodeDb.TotalKubernetesResources() - totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name)) + totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(currentPool.Name)) schedulingContext, err := l.constructSchedulingContext( - pool.Name, + currentPool.Name, totalResources, jobSchedulingInfo.demandByQueueAndPriorityClass, jobSchedulingInfo.allocatedByQueueAndPriorityClass, @@ -278,7 +284,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con return &FairSchedulingAlgoContext{ queues: queueByName, - pool: pool.Name, + pool: currentPool.Name, nodeDb: nodeDb, schedulingContext: schedulingContext, nodeIdByJobId: jobSchedulingInfo.nodeIdByJobId, @@ -331,17 +337,6 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m pools = []string{pool} } - matches := false - for _, pool := range pools { - if slices.Contains(allPools, pool) { - matches = true - break - } - } - if !matches { - continue - } - if slices.Contains(pools, currentPool) { queueResources, ok := demandByQueueAndPriorityClass[job.Queue()] if !ok { @@ -369,6 +364,21 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m } pool := job.LatestRun().Pool() + if _, present := jobsByPool[pool]; !present { + jobsByPool[pool] = []*jobdb.Job{} + } + jobsByPool[pool] = append(jobsByPool[pool], job) + + matches := false + for _, pool := range pools { + if slices.Contains(allPools, pool) { + matches = true + break + } + } + if !matches { + continue + } if _, isActive := activeExecutorsSet[executorId]; isActive { if pool == currentPool { @@ -387,10 +397,7 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m awayAllocation[job.PriorityClassName()] = awayAllocation[job.PriorityClassName()].Add(job.AllResourceRequirements()) } } - if _, present := jobsByPool[pool]; !present { - jobsByPool[pool] = []*jobdb.Job{} - } - jobsByPool[pool] = append(jobsByPool[pool], job) + jobsByExecutorId[executorId] = append(jobsByExecutorId[executorId], job) nodeIdByJobId[job.Id()] = nodeId gangInfo, err := schedulercontext.GangInfoFromLegacySchedulerJob(job) @@ -420,7 +427,7 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m }, nil } -func (l *FairSchedulingAlgo) constructNodeDb(homeJobs []*jobdb.Job, awayJobs []*jobdb.Job, nodes []*internaltypes.Node) (*nodedb.NodeDb, error) { +func (l *FairSchedulingAlgo) constructNodeDb(currentPoolJobs []*jobdb.Job, otherPoolsJobs []*jobdb.Job, nodes []*internaltypes.Node) (*nodedb.NodeDb, error) { nodeDb, err := nodedb.NewNodeDb( l.schedulingConfig.PriorityClasses, l.schedulingConfig.IndexedResources, @@ -432,7 +439,7 @@ func (l *FairSchedulingAlgo) constructNodeDb(homeJobs []*jobdb.Job, awayJobs []* if err != nil { return nil, err } - if err := l.populateNodeDb(nodeDb, homeJobs, awayJobs, nodes); err != nil { + if err := l.populateNodeDb(nodeDb, currentPoolJobs, otherPoolsJobs, nodes); err != nil { return nil, err } @@ -590,7 +597,7 @@ func (l *FairSchedulingAlgo) SchedulePool( } // populateNodeDb adds all the 
nodes and jobs associated with a particular pool to the nodeDb. -func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, homeJobs []*jobdb.Job, awayJobs []*jobdb.Job, nodes []*internaltypes.Node) error { +func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, currentPoolJobs []*jobdb.Job, otherPoolsJobs []*jobdb.Job, nodes []*internaltypes.Node) error { txn := nodeDb.Txn(true) defer txn.Abort() nodesById := armadaslices.GroupByFuncUnique( @@ -598,7 +605,7 @@ func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, homeJobs []*j func(node *internaltypes.Node) string { return node.GetId() }, ) jobsByNodeId := make(map[string][]*jobdb.Job, len(nodes)) - for _, job := range homeJobs { + for _, job := range currentPoolJobs { if job.InTerminalState() || !job.HasRuns() { continue } @@ -612,20 +619,17 @@ func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, homeJobs []*j } jobsByNodeId[nodeId] = append(jobsByNodeId[nodeId], job) } - for _, job := range awayJobs { + for _, job := range otherPoolsJobs { if job.InTerminalState() || !job.HasRuns() { continue } nodeId := job.LatestRun().NodeId() node, ok := nodesById[nodeId] if !ok { - log.Errorf( - "job %s assigned to node %s on executor %s, but no such node found", - job.Id(), nodeId, job.LatestRun().Executor(), - ) + // Job is allocated to a node which isn't part of this pool, ignore it continue } - + // Mark resource used by jobs of other pools as unallocatable so we don't double schedule this resource markResourceUnallocatable(node.AllocatableByPriority, job.KubernetesResourceRequirements()) } diff --git a/third_party/airflow/pyproject.toml b/third_party/airflow/pyproject.toml index fd162626326..cc1f14c4a63 100644 --- a/third_party/airflow/pyproject.toml +++ b/third_party/airflow/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "armada_airflow" -version = "1.0.12" +version = "1.0.13" description = "Armada Airflow Operator" readme='README.md' authors = [{name = "Armada-GROSS", email = "armada@armadaproject.io"}] @@ -17,15 +17,15 @@ dependencies=[ 'kubernetes_asyncio>=24.2.3', 'opentelemetry-exporter-otlp>=1.28.1' # We want to force dependency upgrade for transitive Airflow dependency ] -requires-python=">=3.8" +requires-python=">=3.10" classifiers=[ 'Programming Language :: Python :: 3', 'Operating System :: OS Independent', ] [project.optional-dependencies] -format = ["black>=24.0.0", "flake8==7.0.0", "pylint==2.17.5"] -test = ["pytest==7.3.1", "coverage==7.3.2", "pytest-asyncio==0.21.1", +format = ["black>=24.0.0", "flake8>=7.0.0", "pylint>=2.17.5"] +test = ["pytest==7.3.1", "coverage==6.5.0", "pytest-asyncio==0.21.1", "pytest-mock>=3.14.0"] # note(JayF): sphinx-jekyll-builder was broken by sphinx-markdown-builder 0.6 -- so pin to 0.5.5 docs = ["sphinx==7.1.2", "sphinx-jekyll-builder==0.3.0", "sphinx-toolbox==3.2.0b1", "sphinx-markdown-builder==0.5.5"] @@ -41,7 +41,7 @@ include = ["armada_airflow*"] [tool.black] line-length = 88 -target-version = ['py38', 'py39', 'py310'] +target-version = ['py310', 'py311', 'py312'] include = ''' /( armada