diff --git a/internal/scheduler/floatingresources/floating_resource_types.go b/internal/scheduler/floatingresources/floating_resource_types.go index f4670d13925..3b023e0c891 100644 --- a/internal/scheduler/floatingresources/floating_resource_types.go +++ b/internal/scheduler/floatingresources/floating_resource_types.go @@ -9,19 +9,21 @@ import ( "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/scheduler/configuration" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) type FloatingResourceTypes struct { zeroFloatingResources schedulerobjects.ResourceList pools map[string]*floatingResourcePool + rlFactory *internaltypes.ResourceListFactory } type floatingResourcePool struct { totalResources schedulerobjects.ResourceList } -func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig) (*FloatingResourceTypes, error) { +func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) { zeroFloatingResources := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity, len(config))} for _, c := range config { if _, exists := zeroFloatingResources.Resources[c.Name]; exists { @@ -51,24 +53,21 @@ func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig) (*F return &FloatingResourceTypes{ zeroFloatingResources: zeroFloatingResources, pools: pools, + rlFactory: rlFactory, }, nil } -func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated schedulerobjects.ResourceList) (bool, string) { - pool, exists := frt.pools[poolName] - if !exists { +func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated internaltypes.ResourceList) (bool, string) { + available := frt.GetTotalAvailableForPoolInternalTypes(poolName) + if available.AllZero() { return false, fmt.Sprintf("floating resources not connfigured for pool %s", poolName) } - rl := pool.totalResources.DeepCopy() - rl.Sub(allocated) - for resourceName, quantity := range rl.Resources { - if !frt.isFloatingResource(resourceName) { - continue - } - if quantity.Cmp(resource.Quantity{}) == -1 { - return false, fmt.Sprintf("not enough floating resource %s in pool %s", resourceName, poolName) - } + + resourceName, _, _, exceeds := allocated.OfType(internaltypes.Floating).ExceedsAvailable(available) + if exceeds { + return false, fmt.Sprintf("not enough floating resource %s in pool %s", resourceName, poolName) } + return true, "" } @@ -86,10 +85,8 @@ func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) sche return pool.totalResources.DeepCopy() } -func (frt *FloatingResourceTypes) AddTotalAvailableForPool(poolName string, kubernetesResources schedulerobjects.ResourceList) schedulerobjects.ResourceList { - floatingResources := frt.GetTotalAvailableForPool(poolName) // Note GetTotalAvailableForPool returns a deep copy - floatingResources.Add(kubernetesResources) - return floatingResources +func (frt *FloatingResourceTypes) GetTotalAvailableForPoolInternalTypes(poolName string) internaltypes.ResourceList { + return frt.rlFactory.FromNodeProto(frt.GetTotalAvailableForPool(poolName).Resources) } func (frt *FloatingResourceTypes) SummaryString() string { @@ -98,8 +95,3 @@ func (frt *FloatingResourceTypes) SummaryString() string { } return strings.Join(maps.Keys(frt.zeroFloatingResources.Resources), " ") } - -func (frt *FloatingResourceTypes) 
isFloatingResource(resourceName string) bool { - _, exists := frt.zeroFloatingResources.Resources[resourceName] - return exists -} diff --git a/internal/scheduler/floatingresources/floating_resource_types_test.go b/internal/scheduler/floatingresources/floating_resource_types_test.go index 895510323b9..76ce183b624 100644 --- a/internal/scheduler/floatingresources/floating_resource_types_test.go +++ b/internal/scheduler/floatingresources/floating_resource_types_test.go @@ -7,82 +7,93 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/scheduler/configuration" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" ) func TestAllPools(t *testing.T) { - sut := makeSut(t) + sut := makeSut(t, makeRlFactory()) assert.Equal(t, []string{"cpu", "gpu"}, sut.AllPools()) } func TestGetTotalAvailableForPool(t *testing.T) { - sut := makeSut(t) + sut := makeSut(t, makeRlFactory()) zero := resource.Quantity{} assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.GetTotalAvailableForPool("cpu").Resources) assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.GetTotalAvailableForPool("gpu").Resources) assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources) } -func TestAddTotalAvailableForPool(t *testing.T) { - sut := makeSut(t) - zero := resource.Quantity{} - ten := *resource.NewQuantity(10, resource.DecimalSI) - kubernetesResources := schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": ten}} - assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.AddTotalAvailableForPool("cpu", kubernetesResources).Resources) - assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.AddTotalAvailableForPool("gpu", kubernetesResources).Resources) - assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": zero, "floating-resource-2": zero}, sut.AddTotalAvailableForPool("some-other-pool", kubernetesResources).Resources) - assert.Equal(t, map[string]resource.Quantity{"cpu": ten}, kubernetesResources.Resources) // check hasn't mutated arg +func TestGetTotalAvailableForPoolInternalTypes(t *testing.T) { + sut := makeSut(t, makeRlFactory()) + + cpuPool := sut.GetTotalAvailableForPoolInternalTypes("cpu") + assert.Equal(t, int64(200000), cpuPool.GetByNameZeroIfMissing("floating-resource-1")) + assert.Equal(t, int64(300000), cpuPool.GetByNameZeroIfMissing("floating-resource-2")) + + gpuPool := sut.GetTotalAvailableForPoolInternalTypes("gpu") + assert.Equal(t, int64(100000), gpuPool.GetByNameZeroIfMissing("floating-resource-1")) + assert.Equal(t, int64(0), gpuPool.GetByNameZeroIfMissing("floating-resource-2")) + + notFound := sut.GetTotalAvailableForPoolInternalTypes("some-invalid-value") + assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-1")) + assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-2")) } func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) { - sut := makeSut(t) + rlFactory := makeRlFactory() + sut := makeSut(t, rlFactory) 
withinLimits, errorMessage := sut.WithinLimits("cpu", - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("199")}}, + rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("199")}), ) assert.True(t, withinLimits) assert.Empty(t, errorMessage) } func TestWithinLimits_WhenAtLimit_ReturnsTrue(t *testing.T) { - sut := makeSut(t) + rlFactory := makeRlFactory() + sut := makeSut(t, rlFactory) withinLimits, errorMessage := sut.WithinLimits("cpu", - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200")}}, + rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200")}), ) assert.True(t, withinLimits) assert.Empty(t, errorMessage) } func TestWithinLimits_WhenExceedsLimit_ReturnsFalse(t *testing.T) { - sut := makeSut(t) + rlFactory := makeRlFactory() + sut := makeSut(t, rlFactory) withinLimits, errorMessage := sut.WithinLimits("cpu", - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("201")}}, + rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("201")}), ) assert.False(t, withinLimits) assert.NotEmpty(t, errorMessage) } func TestWithinLimits_IgnoresNonFloatingResources(t *testing.T) { - sut := makeSut(t) + rlFactory := makeRlFactory() + sut := makeSut(t, rlFactory) withinLimits, errorMessage := sut.WithinLimits("cpu", - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"some-other-resource": resource.MustParse("1000")}}, + rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"cpu": resource.MustParse("1000")}), ) assert.True(t, withinLimits) assert.Empty(t, errorMessage) } func TestWithinLimits_WhenResourceNotSpecifiedForAPool_ReturnsFalse(t *testing.T) { - sut := makeSut(t) + rlFactory := makeRlFactory() + sut := makeSut(t, rlFactory) withinLimits, errorMessage := sut.WithinLimits("gpu", - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-2": resource.MustParse("1")}}, + rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-2": resource.MustParse("1")}), ) assert.False(t, withinLimits) assert.NotEmpty(t, errorMessage) } func TestWithinLimits_WhenPoolDoesNotExist_ReturnsFalse(t *testing.T) { - sut := makeSut(t) + rlFactory := makeRlFactory() + sut := makeSut(t, rlFactory) withinLimits, errorMessage := sut.WithinLimits("some-other-pool", - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("1")}}, + rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("1")}), ) assert.False(t, withinLimits) assert.NotEmpty(t, errorMessage) @@ -115,8 +126,18 @@ func testConfig() []configuration.FloatingResourceConfig { } } -func makeSut(t *testing.T) *FloatingResourceTypes { - sut, err := NewFloatingResourceTypes(testConfig()) +func makeRlFactory() *internaltypes.ResourceListFactory { + rlFactory, err := internaltypes.NewResourceListFactory([]configuration.ResourceType{ + {Name: "cpu"}, + }, testConfig()) + if err != nil { + panic(err) + } + return rlFactory +} + +func makeSut(t *testing.T, rlFactory *internaltypes.ResourceListFactory) *FloatingResourceTypes { + sut, err := NewFloatingResourceTypes(testConfig(), 
rlFactory) assert.Nil(t, err) return sut } diff --git a/internal/scheduler/internaltypes/resource_fraction_list_test.go b/internal/scheduler/internaltypes/resource_fraction_list_test.go index a187fa778e1..b8224d62c77 100644 --- a/internal/scheduler/internaltypes/resource_fraction_list_test.go +++ b/internal/scheduler/internaltypes/resource_fraction_list_test.go @@ -19,7 +19,8 @@ func TestMax(t *testing.T) { factory := testFactory() assert.Equal(t, 0.0, testResourceFractionList(factory, -0.1, 0.0, 0.0).Max()) assert.Equal(t, 0.0, testResourceFractionList(factory, 0.0, 0.0, 0.0).Max()) - assert.Equal(t, 0.9, testResourceFractionList(factory, 0.1, 0.9, 0.7).Max()) + assert.Equal(t, 0.9, testResourceFractionList(factory, 0.2, 0.9, 0.1).Max()) + assert.Equal(t, 0.9, testResourceFractionList(factory, 0.9, 0.2, 0.1).Max()) } func TestMax_HandlesEmptyCorrectly(t *testing.T) { diff --git a/internal/scheduler/internaltypes/resource_list.go b/internal/scheduler/internaltypes/resource_list.go index be4b42825f2..dda39551103 100644 --- a/internal/scheduler/internaltypes/resource_list.go +++ b/internal/scheduler/internaltypes/resource_list.go @@ -2,6 +2,7 @@ package internaltypes import ( "fmt" + "math" "golang.org/x/exp/slices" k8sResource "k8s.io/apimachinery/pkg/api/resource" @@ -76,6 +77,19 @@ func (rl ResourceList) GetByNameZeroIfMissing(name string) int64 { return rl.resources[index] } +func (rl ResourceList) GetResourceByNameZeroIfMissing(name string) k8sResource.Quantity { + if rl.IsEmpty() { + return k8sResource.Quantity{} + } + + index, ok := rl.factory.nameToIndex[name] + if !ok { + return k8sResource.Quantity{} + } + + return *k8sResource.NewScaledQuantity(rl.resources[index], rl.factory.scales[index]) +} + func (rl ResourceList) GetResources() []Resource { if rl.IsEmpty() { return []Resource{} @@ -147,6 +161,15 @@ func (rl ResourceList) IsEmpty() bool { return rl.factory == nil } +func (rl ResourceList) Factory() *ResourceListFactory { + return rl.factory +} + +func (rl ResourceList) Exceeds(other ResourceList) bool { + _, _, _, exceeds := rl.ExceedsAvailable(other) + return exceeds +} + // ExceedsAvailable // - if any resource in this ResourceList is greater than the equivalent resource in param available, this function returns // - the name of the relevant resource @@ -195,6 +218,21 @@ func (rl ResourceList) OfType(t ResourceType) ResourceList { return ResourceList{factory: rl.factory, resources: result} } +func (rl ResourceList) Cap(cap ResourceList) ResourceList { + assertSameResourceListFactory(rl.factory, cap.factory) + if rl.IsEmpty() { + return ResourceList{} + } + if cap.IsEmpty() { + return rl + } + result := make([]int64, len(rl.resources)) + for i, r := range rl.resources { + result[i] = min(r, cap.resources[i]) + } + return ResourceList{factory: rl.factory, resources: result} +} + func (rl ResourceList) Add(other ResourceList) ResourceList { assertSameResourceListFactory(rl.factory, other.factory) if rl.IsEmpty() { @@ -266,17 +304,6 @@ func (rl ResourceList) Negate() ResourceList { return ResourceList{factory: rl.factory, resources: result} } -func (rl ResourceList) Scale(factor float64) ResourceList { - if rl.IsEmpty() { - return rl - } - result := make([]int64, len(rl.resources)) - for i, r := range rl.resources { - result[i] = multiplyResource(r, factor) - } - return ResourceList{resources: result, factory: rl.factory} -} - func (rl ResourceList) asQuantity(index int) *k8sResource.Quantity { if rl.factory == nil { return &k8sResource.Quantity{} @@ -299,8 +326,21 @@ func 
assertSameResourceListFactory(a, b *ResourceListFactory) { func multiplyResource(res int64, multiplier float64) int64 { if multiplier == 1.0 { - // avoid rounding error in the simple case + // Avoid rounding error in the simple case. return res } + + // Return max int64 if multiplier is infinity. + // If res is zero, we assume infinity trumps zero, and return int64 maxValue. + // This gives the right behavior when the result is used as a cap, + // as an infinity multiplier means "never apply cap". + if math.IsInf(multiplier, 0) { + if (multiplier < 0) == (res < 0) { + return math.MaxInt64 + } else { + return math.MinInt64 + } + } + return int64(float64(res) * multiplier) } diff --git a/internal/scheduler/internaltypes/resource_list_map_util.go b/internal/scheduler/internaltypes/resource_list_map_util.go new file mode 100644 index 00000000000..b359703034a --- /dev/null +++ b/internal/scheduler/internaltypes/resource_list_map_util.go @@ -0,0 +1,59 @@ +package internaltypes + +import ( + "strings" + + "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" +) + +func RlMapToString(m map[string]ResourceList) string { + results := []string{} + for k, v := range m { + results = append(results, k+"="+v.String()) + } + return strings.Join(results, " ") +} + +func RlMapSumValues(m map[string]ResourceList) ResourceList { + result := ResourceList{} + for _, v := range m { + result = result.Add(v) + } + return result +} + +func RlMapAllZero(m map[string]ResourceList) bool { + for _, v := range m { + if !v.AllZero() { + return false + } + } + return true +} + +func RlMapHasNegativeValues(m map[string]ResourceList) bool { + for _, v := range m { + if v.HasNegativeValues() { + return true + } + } + return false +} + +func RlMapFromJobSchedulerObjects(m schedulerobjects.QuantityByTAndResourceType[string], rlFactory *ResourceListFactory) map[string]ResourceList { + result := map[string]ResourceList{} + for k, v := range m { + result[k] = rlFactory.FromJobResourceListIgnoreUnknown(v.Resources) + } + return result +} + +func RlMapRemoveZeros(m map[string]ResourceList) map[string]ResourceList { + result := map[string]ResourceList{} + for k, v := range m { + if !v.AllZero() { + result[k] = v + } + } + return result +} diff --git a/internal/scheduler/internaltypes/resource_list_map_util_test.go b/internal/scheduler/internaltypes/resource_list_map_util_test.go new file mode 100644 index 00000000000..f278eeefd72 --- /dev/null +++ b/internal/scheduler/internaltypes/resource_list_map_util_test.go @@ -0,0 +1,111 @@ +package internaltypes + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/resource" + + "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" +) + +func TestRlMapSumValues(t *testing.T) { + factory := testFactory() + + assert.Equal(t, testResourceList(factory, "3", "3Ki"), RlMapSumValues(testMapAllPositive(factory))) + assert.True(t, RlMapSumValues(testMapEmpty(factory)).IsEmpty()) +} + +func TestRlMapAllZero(t *testing.T) { + factory := testFactory() + + assert.False(t, RlMapAllZero(testMapAllPositive(factory))) + assert.True(t, RlMapAllZero(testMapAllZero(factory))) + assert.False(t, RlMapAllZero(testMapOneZero(factory))) + assert.False(t, RlMapAllZero(testMapOneNegative(factory))) + assert.True(t, RlMapAllZero(testMapEmpty(factory))) +} + +func TestRlMapHasNegativeValues(t *testing.T) { + factory := testFactory() + + assert.False(t, RlMapHasNegativeValues(testMapAllPositive(factory))) + assert.False(t, 
RlMapHasNegativeValues(testMapAllZero(factory))) + assert.False(t, RlMapHasNegativeValues(testMapOneZero(factory))) + assert.True(t, RlMapHasNegativeValues(testMapOneNegative(factory))) + assert.False(t, RlMapHasNegativeValues(testMapEmpty(factory))) +} + +func TestRlMapFromJobSchedulerObjects(t *testing.T) { + factory := testFactory() + + input := make(schedulerobjects.QuantityByTAndResourceType[string]) + input.AddResourceList("priorityClass1", + schedulerobjects.ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("1Ki"), + }, + }, + ) + input.AddResourceList("priorityClass2", + schedulerobjects.ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("2"), + "memory": resource.MustParse("2Ki"), + }, + }, + ) + + expected := map[string]ResourceList{ + "priorityClass1": testResourceList(factory, "1", "1Ki"), + "priorityClass2": testResourceList(factory, "2", "2Ki"), + } + + assert.Equal(t, expected, RlMapFromJobSchedulerObjects(input, factory)) +} + +func TestRlMapRemoveZeros(t *testing.T) { + factory := testFactory() + + input := map[string]ResourceList{ + "priorityClass1": testResourceList(factory, "1", "0Ki"), + "priorityClass2": testResourceList(factory, "0", "0Ki"), + } + expected := map[string]ResourceList{ + "priorityClass1": testResourceList(factory, "1", "0Ki"), + } + assert.Equal(t, expected, RlMapRemoveZeros(input)) +} + +func testMapAllPositive(factory *ResourceListFactory) map[string]ResourceList { + return map[string]ResourceList{ + "a": testResourceList(factory, "1", "1Ki"), + "b": testResourceList(factory, "2", "2Ki"), + } +} + +func testMapAllZero(factory *ResourceListFactory) map[string]ResourceList { + return map[string]ResourceList{ + "a": testResourceList(factory, "0", "0"), + "b": testResourceList(factory, "0", "0"), + } +} + +func testMapOneNegative(factory *ResourceListFactory) map[string]ResourceList { + return map[string]ResourceList{ + "a": testResourceList(factory, "-1", "1Ki"), + "b": testResourceList(factory, "2", "2Ki"), + } +} + +func testMapOneZero(factory *ResourceListFactory) map[string]ResourceList { + return map[string]ResourceList{ + "a": testResourceList(factory, "0", "1Ki"), + "b": testResourceList(factory, "2", "2Ki"), + } +} + +func testMapEmpty(factory *ResourceListFactory) map[string]ResourceList { + return map[string]ResourceList{} +} diff --git a/internal/scheduler/internaltypes/resource_list_test.go b/internal/scheduler/internaltypes/resource_list_test.go index 216ab626670..1138b6225b2 100644 --- a/internal/scheduler/internaltypes/resource_list_test.go +++ b/internal/scheduler/internaltypes/resource_list_test.go @@ -1,6 +1,7 @@ package internaltypes import ( + "math" "testing" "github.com/stretchr/testify/assert" @@ -64,6 +65,19 @@ func TestGetByNameZeroIfMissing_HandlesEmptyCorrectly(t *testing.T) { assert.Equal(t, int64(0), empty.GetByNameZeroIfMissing("missing")) } +func TestGetResourceByNameZeroIfMissing(t *testing.T) { + factory := testFactory() + a := testResourceList(factory, "1", "1Gi") + + assert.Equal(t, *k8sResource.NewScaledQuantity(1000, k8sResource.Milli), a.GetResourceByNameZeroIfMissing("cpu")) + assert.Equal(t, k8sResource.Quantity{}, a.GetResourceByNameZeroIfMissing("missing")) +} + +func TestGetResourceByNameZeroIfMissing_HandlesEmptyCorrectly(t *testing.T) { + empty := ResourceList{} + assert.Equal(t, k8sResource.Quantity{}, empty.GetResourceByNameZeroIfMissing("missing")) +} + func TestGetResources(t *testing.T) { factory := 
testFactory() a := testResourceList(factory, "1", "1Gi") @@ -222,6 +236,38 @@ func TestOfType_HandlesEmptyCorrectly(t *testing.T) { assert.Equal(t, ResourceList{}, ResourceList{}.OfType(Floating)) } +func TestCap(t *testing.T) { + factory := testFactory() + + assert.Equal(t, testResourceList(factory, "1", "2Ki"), testResourceList(factory, "1", "2Ki").Cap(testResourceList(factory, "2", "4Ki"))) + assert.Equal(t, testResourceList(factory, "1", "1Ki"), testResourceList(factory, "1", "4Ki").Cap(testResourceList(factory, "2", "1Ki"))) +} + +func TestCap_HandlesEmptyCorrectly(t *testing.T) { + factory := testFactory() + + assert.Equal(t, testResourceList(factory, "1", "1Ki"), testResourceList(factory, "1", "1Ki").Cap(ResourceList{})) + assert.Equal(t, ResourceList{}, ResourceList{}.Cap(testResourceList(factory, "1", "1Ki"))) + assert.Equal(t, ResourceList{}, ResourceList{}.Cap(ResourceList{})) +} + +func TestExceeds(t *testing.T) { + factory := testFactory() + + assert.False(t, testResourceList(factory, "0", "0Ki").Exceeds(testResourceList(factory, "0", "0Ki"))) + assert.False(t, testResourceList(factory, "1", "1Ki").Exceeds(testResourceList(factory, "1", "1Ki"))) + assert.True(t, testResourceList(factory, "2", "1Ki").Exceeds(testResourceList(factory, "1", "1Ki"))) + assert.False(t, testResourceList(factory, "1", "1Ki").Exceeds(testResourceList(factory, "2", "1Ki"))) +} + +func TestExceeds_HandlesEmptyCorrectly(t *testing.T) { + factory := testFactory() + + assert.True(t, testResourceList(factory, "1", "1Ki").Exceeds(ResourceList{})) + assert.False(t, ResourceList{}.Exceeds(testResourceList(factory, "1", "1Ki"))) + assert.False(t, ResourceList{}.Exceeds(ResourceList{})) +} + func TestAdd(t *testing.T) { factory := testFactory() @@ -273,6 +319,34 @@ func TestMultiply(t *testing.T) { testResourceFractionList(factory, -0.25, -0.75, 1))) } +func TestMultiply_HandlesInfinityCorrectly(t *testing.T) { + factory := testFactory() + + result1 := testResourceList(factory, "100", "100Ki").Multiply(testResourceFractionList(factory, 0.75, math.Inf(1), 1)) + assert.Equal(t, int64(75000), result1.GetByNameZeroIfMissing("cpu")) + assert.Equal(t, int64(math.MaxInt64), result1.GetByNameZeroIfMissing("memory")) + + result2 := testResourceList(factory, "100", "0").Multiply(testResourceFractionList(factory, 0.75, math.Inf(1), 1)) + assert.Equal(t, int64(math.MaxInt64), result2.GetByNameZeroIfMissing("memory")) + + result3 := testResourceList(factory, "100", "-100Ki").Multiply(testResourceFractionList(factory, 0.75, math.Inf(1), 1)) + assert.Equal(t, int64(math.MinInt64), result3.GetByNameZeroIfMissing("memory")) +} + +func TestMultiply_HandlesMinusInfinityCorrectly(t *testing.T) { + factory := testFactory() + + result1 := testResourceList(factory, "100", "100Ki").Multiply(testResourceFractionList(factory, 0.75, math.Inf(-1), 1)) + assert.Equal(t, int64(75000), result1.GetByNameZeroIfMissing("cpu")) + assert.Equal(t, int64(math.MinInt64), result1.GetByNameZeroIfMissing("memory")) + + result2 := testResourceList(factory, "100", "0").Multiply(testResourceFractionList(factory, 0.75, math.Inf(-1), 1)) + assert.Equal(t, int64(math.MinInt64), result2.GetByNameZeroIfMissing("memory")) + + result3 := testResourceList(factory, "100", "-100Ki").Multiply(testResourceFractionList(factory, 0.75, math.Inf(-1), 1)) + assert.Equal(t, int64(math.MaxInt64), result3.GetByNameZeroIfMissing("memory")) +} + func TestMultiply_HandlesEmptyCorrectly(t *testing.T) { factory := testFactory() @@ -316,19 +390,6 @@ func 
TestNegate_HandlesEmptyCorrectly(t *testing.T) { assert.Equal(t, ResourceList{}, ResourceList{}.Negate()) } -func TestScale(t *testing.T) { - factory := testFactory() - assert.Equal(t, testResourceList(factory, "4", "2Ki"), testResourceList(factory, "2", "1Ki").Scale(2.0)) - assert.Equal(t, testResourceList(factory, "2", "1Ki"), testResourceList(factory, "2", "1Ki").Scale(1.0)) - assert.Equal(t, testResourceList(factory, "0", "0Ki"), testResourceList(factory, "2", "1Ki").Scale(0.0)) - assert.Equal(t, testResourceList(factory, "2", "-1Ki"), testResourceList(factory, "-2", "1Ki").Scale(-1.0)) -} - -func TestScale_HandlesEmptyCorrectly(t *testing.T) { - assert.Equal(t, ResourceList{}, ResourceList{}.Scale(0.0)) - assert.Equal(t, ResourceList{}, ResourceList{}.Scale(1.0)) -} - func testResourceList(factory *ResourceListFactory, cpu string, memory string) ResourceList { return factory.FromJobResourceListIgnoreUnknown(map[string]k8sResource.Quantity{ "cpu": k8sResource.MustParse(cpu), diff --git a/internal/scheduler/metrics/cycle_metrics_test.go b/internal/scheduler/metrics/cycle_metrics_test.go index 86c810b6fa2..4c07b001413 100644 --- a/internal/scheduler/metrics/cycle_metrics_test.go +++ b/internal/scheduler/metrics/cycle_metrics_test.go @@ -11,7 +11,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/scheduler/configuration" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/scheduling" "github.com/armadaproject/armada/internal/scheduler/scheduling/context" "github.com/armadaproject/armada/internal/scheduler/scheduling/fairness" @@ -23,7 +23,8 @@ const epsilon = 1e-6 func TestReportStateTransitions(t *testing.T) { fairnessCostProvider, err := fairness.NewDominantResourceFairness( cpu(100), - configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"cpu"}}) + configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"cpu"}}, + ) require.NoError(t, err) result := scheduling.SchedulerResult{ SchedulingContexts: []*context.SchedulingContext{ @@ -171,8 +172,8 @@ func TestDisableLeaderMetrics(t *testing.T) { assert.NotZero(t, len(collect(m))) } -func cpu(n int) schedulerobjects.ResourceList { - return schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{"cpu": resource.MustParse(fmt.Sprintf("%d", n))}, - } +func cpu(n int) internaltypes.ResourceList { + return testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown( + map[string]resource.Quantity{"cpu": resource.MustParse(fmt.Sprintf("%d", n))}, + ) } diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index 7d257de5a4e..336ca0de314 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -22,7 +22,6 @@ import ( "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/internal/scheduler/scheduling/context" ) @@ -213,7 +212,7 @@ func NewNodeDb( nodeTypes: make(map[uint64]*internaltypes.NodeType), wellKnownNodeTypes: make(map[string]*configuration.WellKnownNodeType), numNodesByNodeType: make(map[uint64]int), - totalResources: internaltypes.ResourceList{}, + totalResources: 
resourceListFactory.MakeAllZero(), db: db, // Set the initial capacity (somewhat arbitrarily) to 128 reasons. podRequirementsNotMetReasonStringCache: make(map[uint64]string, 128), @@ -305,10 +304,10 @@ func (nodeDb *NodeDb) NumNodes() int { return int(nodeDb.numNodes) } -func (nodeDb *NodeDb) TotalKubernetesResources() schedulerobjects.ResourceList { +func (nodeDb *NodeDb) TotalKubernetesResources() internaltypes.ResourceList { nodeDb.mu.Lock() defer nodeDb.mu.Unlock() - return schedulerobjects.ResourceList{Resources: nodeDb.totalResources.ToMap()} + return nodeDb.totalResources } func (nodeDb *NodeDb) Txn(write bool) *memdb.Txn { diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index fd1a53b6281..1f23597541b 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -31,9 +31,10 @@ func TestTotalResources(t *testing.T) { nodeDb, err := newNodeDbWithNodes([]*schedulerobjects.Node{}) require.NoError(t, err) - expected := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity)} - assert.True(t, expected.Equal(nodeDb.TotalKubernetesResources())) + assert.False(t, nodeDb.TotalKubernetesResources().IsEmpty()) + assert.True(t, nodeDb.TotalKubernetesResources().AllZero()) + expected := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity)} // Upserting nodes for the first time should increase the resource count. nodes := testfixtures.N32CpuNodes(2, testfixtures.TestPriorities) for _, node := range nodes { @@ -48,7 +49,9 @@ func TestTotalResources(t *testing.T) { } txn.Commit() - assert.True(t, expected.Equal(nodeDb.TotalKubernetesResources())) + assert.True(t, expected.Equal(schedulerobjects.ResourceList{ + Resources: nodeDb.TotalKubernetesResources().ToMap(), + })) // Upserting new nodes should increase the resource count. 
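
For illustration only, a minimal sketch (hypothetical factory and invented quantities) of the two-way conversion this change leans on: `FromNodeProto` to go from a `schedulerobjects` quantity map into an `internaltypes.ResourceList`, and `ToMap` to go back when comparing against legacy lists, as the updated `nodedb` test does:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/armadaproject/armada/internal/scheduler/configuration"
	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

func main() {
	// Hypothetical factory; the resource set here is illustrative only.
	rlFactory, err := internaltypes.NewResourceListFactory([]configuration.ResourceType{{Name: "cpu"}}, nil)
	if err != nil {
		panic(err)
	}

	// schedulerobjects -> internaltypes, as GetTotalAvailableForPoolInternalTypes does earlier in this diff.
	legacy := schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("64")}}
	rl := rlFactory.FromNodeProto(legacy.Resources)

	// internaltypes -> schedulerobjects, as the updated nodedb test does when comparing totals.
	roundTripped := schedulerobjects.ResourceList{Resources: rl.ToMap()}
	fmt.Println(legacy.Equal(roundTripped)) // quantities compare semantically, so this should print true
}
```
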
nodes = testfixtures.N8GpuNodes(3, testfixtures.TestPriorities) @@ -64,7 +67,9 @@ func TestTotalResources(t *testing.T) { } txn.Commit() - assert.True(t, expected.Equal(nodeDb.TotalKubernetesResources())) + assert.True(t, expected.Equal(schedulerobjects.ResourceList{ + Resources: nodeDb.TotalKubernetesResources().ToMap(), + })) } func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) { diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go index 0ed82388e46..902d3adca25 100644 --- a/internal/scheduler/schedulerapp.go +++ b/internal/scheduler/schedulerapp.go @@ -85,7 +85,7 @@ func Run(config schedulerconfig.Configuration) error { } ctx.Infof("Supported resource types: %s", resourceListFactory.SummaryString()) - floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(config.Scheduling.ExperimentalFloatingResources) + floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(config.Scheduling.ExperimentalFloatingResources, resourceListFactory) if err != nil { return err } diff --git a/internal/scheduler/scheduling/constraints/constraints.go b/internal/scheduler/scheduling/constraints/constraints.go index 1f8a5b4faf3..270dd8a2dfb 100644 --- a/internal/scheduler/scheduling/constraints/constraints.go +++ b/internal/scheduler/scheduling/constraints/constraints.go @@ -4,15 +4,23 @@ import ( "math" "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/api/resource" + armadamaps "github.com/armadaproject/armada/internal/common/maps" + "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/scheduler/configuration" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/scheduling/context" "github.com/armadaproject/armada/pkg/api" ) +// SchedulingConstraints contains scheduling constraints, e.g. per-queue resource limits. +type SchedulingConstraints interface { + CheckRoundConstraints(sctx *context.SchedulingContext) (bool, string, error) + CheckJobConstraints(sctx *context.SchedulingContext, gctx *context.GangSchedulingContext) (bool, string, error) + CapResources(queue string, resourcesByPc map[string]internaltypes.ResourceList) map[string]internaltypes.ResourceList +} + const ( // Indicates that the limit on resources scheduled per round has been exceeded. MaximumResourcesScheduledUnschedulableReason = "maximum resources scheduled" @@ -23,7 +31,7 @@ const ( // Indicates that the scheduling rate limit has been exceeded. GlobalRateLimitExceededUnschedulableReason = "global scheduling rate limit exceeded" QueueRateLimitExceededUnschedulableReason = "queue scheduling rate limit exceeded" - SchedulingPausedOnQueueUnschedulableReason = "scheduling paused on queue" + QueueCordonedUnschedulableReason = "queue cordoned" // Indicates that scheduling a gang would exceed the rate limit. GlobalRateLimitExceededByGangUnschedulableReason = "gang would exceed global scheduling rate limit" @@ -55,107 +63,45 @@ func IsTerminalUnschedulableReason(reason string) bool { // IsTerminalQueueUnschedulableReason returns true if reason indicates // it's not possible to schedule any more jobs from this queue in this round. 
func IsTerminalQueueUnschedulableReason(reason string) bool { - return reason == QueueRateLimitExceededUnschedulableReason || reason == SchedulingPausedOnQueueUnschedulableReason + return reason == QueueRateLimitExceededUnschedulableReason || reason == QueueCordonedUnschedulableReason } // SchedulingConstraints contains scheduling constraints, e.g., per-queue resource limits. -type SchedulingConstraints struct { - // Scheduling constraints by priority class. - priorityClassSchedulingConstraintsByPriorityClassName map[string]priorityClassSchedulingConstraints - // Scheduling constraints for specific queues. - // If present for a particular queue, global limits (i.e., priorityClassSchedulingConstraintsByPriorityClassName) - // do not apply for that queue. - queueSchedulingConstraintsByQueueName map[string]queueSchedulingConstraints - // Limits total resources scheduled per invocation. - maximumResourcesToSchedule map[string]resource.Quantity -} - -// queueSchedulingConstraints contains per-queue scheduling constraints. -type queueSchedulingConstraints struct { - // Scheduling constraints by priority class. - PriorityClassSchedulingConstraintsByPriorityClassName map[string]priorityClassSchedulingConstraints - // Determines whether scheduling has been paused for this queue - Cordoned bool -} - -// priorityClassSchedulingConstraints contains scheduling constraints that apply to jobs of a specific priority class. -type priorityClassSchedulingConstraints struct { - PriorityClassName string - // Limits total resources allocated to jobs of this priority class per queue. - MaximumResourcesPerQueue map[string]resource.Quantity +type schedulingConstraints struct { + // Limits total resources scheduled per scheduling round. + maximumResourcesToSchedule internaltypes.ResourceList + // Queues that are cordoned (i.e. no jobs may be scheduled on them) + cordonedQueues map[string]bool + // Resource limits by queue and priority class. E.g. "Queue A is limited to 100 cpu at priority class armada-default" + resourceLimitsPerQueuePerPriorityClass map[string]map[string]internaltypes.ResourceList } -func NewSchedulingConstraints(pool string, totalResources schedulerobjects.ResourceList, config configuration.SchedulingConfig, queues []*api.Queue) SchedulingConstraints { - priorityClassSchedulingConstraintsByPriorityClassName := make(map[string]priorityClassSchedulingConstraints, len(config.PriorityClasses)) - for name, priorityClass := range config.PriorityClasses { - maximumResourceFractionPerQueue := priorityClass.MaximumResourceFractionPerQueue - if m, ok := priorityClass.MaximumResourceFractionPerQueueByPool[pool]; ok { - // Use pool-specific config is available. 
- maximumResourceFractionPerQueue = util.MergeMaps(maximumResourceFractionPerQueue, m) - } - priorityClassSchedulingConstraintsByPriorityClassName[name] = priorityClassSchedulingConstraints{ - PriorityClassName: name, - MaximumResourcesPerQueue: absoluteFromRelativeLimits(totalResources.Resources, maximumResourceFractionPerQueue), - } - } - - queueSchedulingConstraintsByQueueName := make(map[string]queueSchedulingConstraints, len(queues)) - for _, queue := range queues { - priorityClassSchedulingConstraintsByPriorityClassNameForQueue := make(map[string]priorityClassSchedulingConstraints, len(queue.ResourceLimitsByPriorityClassName)) - for priorityClassName, priorityClassResourceLimits := range queue.ResourceLimitsByPriorityClassName { - maximumResourceFraction := priorityClassResourceLimits.MaximumResourceFraction - if m, ok := priorityClassResourceLimits.MaximumResourceFractionByPool[pool]; ok { - // Use pool-specific maximum resource fraction if available. - maximumResourceFraction = util.MergeMaps(maximumResourceFraction, m.MaximumResourceFraction) - } - priorityClassSchedulingConstraintsByPriorityClassNameForQueue[priorityClassName] = priorityClassSchedulingConstraints{ - PriorityClassName: priorityClassName, - MaximumResourcesPerQueue: absoluteFromRelativeLimits(totalResources.Resources, maximumResourceFraction), - } - } - queueSchedulingConstraintsByQueueName[queue.Name] = queueSchedulingConstraints{ - PriorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassNameForQueue, - Cordoned: queue.Cordoned, - } - } +func NewSchedulingConstraints( + pool string, + totalResources internaltypes.ResourceList, + config configuration.SchedulingConfig, + queues []*api.Queue, +) SchedulingConstraints { + cordonedQueues := armadamaps.FromSlice(queues, + func(q *api.Queue) string { return q.Name }, + func(q *api.Queue) bool { return q.Cordoned }) - maximumResourceFractionToSchedule := config.MaximumResourceFractionToSchedule - if m, ok := config.MaximumResourceFractionToScheduleByPool[pool]; ok { - // Use pool-specific config is available. - maximumResourceFractionToSchedule = m - } - return SchedulingConstraints{ - maximumResourcesToSchedule: absoluteFromRelativeLimits(totalResources.Resources, maximumResourceFractionToSchedule), - priorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassName, - queueSchedulingConstraintsByQueueName: queueSchedulingConstraintsByQueueName, + return &schedulingConstraints{ + cordonedQueues: cordonedQueues, + maximumResourcesToSchedule: calculatePerRoundLimits(totalResources, pool, config), + resourceLimitsPerQueuePerPriorityClass: calculatePerQueueLimits(totalResources, pool, config.PriorityClasses, queues), } } -func absoluteFromRelativeLimits(totalResources map[string]resource.Quantity, relativeLimits map[string]float64) map[string]resource.Quantity { - absoluteLimits := make(map[string]resource.Quantity, len(relativeLimits)) - for t, f := range relativeLimits { - absoluteLimits[t] = ScaleQuantity(totalResources[t].DeepCopy(), f) - } - return absoluteLimits -} - -// ScaleQuantity scales q in-place by a factor f. -// This functions overflows for quantities the milli value of which can't be expressed as an int64. -// E.g., 1Pi is ok, but not 10Pi. 
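
For illustration, a minimal sketch (invented names and quantities, assuming the internaltypes APIs called elsewhere in this diff: `FromJobResourceListIgnoreUnknown`, `MakeResourceFractionList`, `Multiply`, `Exceeds`, `Cap`) of how relative limits now become absolute caps in place of the deleted `ScaleQuantity`/`absoluteFromRelativeLimits` helpers:

```go
package main

import (
	"fmt"
	"math"

	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/armadaproject/armada/internal/scheduler/configuration"
	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
)

func main() {
	// Hypothetical factory and numbers, for illustration only.
	rlFactory, err := internaltypes.NewResourceListFactory([]configuration.ResourceType{
		{Name: "cpu"},
		{Name: "memory"},
	}, nil)
	if err != nil {
		panic(err)
	}

	total := rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{
		"cpu":    resource.MustParse("1000"),
		"memory": resource.MustParse("1000Gi"),
	})

	// Relative limits become absolute limits via Multiply. Resources without a
	// configured fraction default to +Inf, i.e. "no cap", matching the new
	// multiplyResource behaviour.
	limit := total.Multiply(rlFactory.MakeResourceFractionList(map[string]float64{"cpu": 0.1}, math.Inf(1)))

	allocated := rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{
		"cpu": resource.MustParse("150"),
	})

	fmt.Println(allocated.Exceeds(limit)) // true: 150 cpu is over the 10% (100 cpu) cap
	capped := allocated.Cap(limit).GetResourceByNameZeroIfMissing("cpu")
	fmt.Println(capped.String()) // capped back down to the limit
}
```
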
-func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity { - q.SetMilli(int64(math.Round(float64(q.MilliValue()) * f))) - return q -} - -func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *context.SchedulingContext) (bool, string, error) { +func (constraints *schedulingConstraints) CheckRoundConstraints(sctx *context.SchedulingContext) (bool, string, error) { // maximumResourcesToSchedule check. - if !isStrictlyLessOrEqual(sctx.ScheduledResources.Resources, constraints.maximumResourcesToSchedule) { + if sctx.ScheduledResources.Exceeds(constraints.maximumResourcesToSchedule) { return false, MaximumResourcesScheduledUnschedulableReason, nil } return true, "", nil } -func (constraints *SchedulingConstraints) CheckConstraints( +func (constraints *schedulingConstraints) CheckJobConstraints( sctx *context.SchedulingContext, gctx *context.GangSchedulingContext, ) (bool, string, error) { @@ -164,6 +110,11 @@ func (constraints *SchedulingConstraints) CheckConstraints( return false, "", errors.Errorf("no QueueSchedulingContext for queue %s", gctx.Queue) } + // Queue cordoned + if constraints.cordonedQueues[qctx.Queue] { + return false, QueueCordonedUnschedulableReason, nil + } + // Global rate limiter check. tokens := sctx.Limiter.TokensAt(sctx.Started) if tokens <= 0 { @@ -176,9 +127,6 @@ func (constraints *SchedulingConstraints) CheckConstraints( return false, GlobalRateLimitExceededByGangUnschedulableReason, nil } - if queueConstraints, ok := constraints.queueSchedulingConstraintsByQueueName[qctx.Queue]; ok && queueConstraints.Cordoned { - return false, SchedulingPausedOnQueueUnschedulableReason, nil - } // Per-queue rate limiter check. tokens = qctx.Limiter.TokensAt(sctx.Started) if tokens <= 0 { @@ -191,64 +139,83 @@ func (constraints *SchedulingConstraints) CheckConstraints( return false, QueueRateLimitExceededByGangUnschedulableReason, nil } - // queueSchedulingConstraintsByQueueName / priorityClassSchedulingConstraintsByPriorityClassName checks. 
- overallResourceLimits := constraints.resolveResourceLimitsForQueueAndPriorityClass(gctx.Queue, gctx.PriorityClassName) - if !isStrictlyLessOrEqual(qctx.AllocatedByPriorityClass[gctx.PriorityClassName].Resources, overallResourceLimits) { + // Quantity scheduled by queue and priority class + queueLimit, haslimit := constraints.resourceLimitsPerQueuePerPriorityClass[qctx.Queue][gctx.PriorityClassName] + allocatedResources := qctx.AllocatedByPriorityClass[gctx.PriorityClassName] + if haslimit && allocatedResources.Exceeds(queueLimit) { return false, UnschedulableReasonMaximumResourcesExceeded, nil } return true, "", nil } -func (constraints *SchedulingConstraints) CapResources(queue string, resourcesByPc schedulerobjects.QuantityByTAndResourceType[string]) schedulerobjects.QuantityByTAndResourceType[string] { - cappedResourcesByPc := schedulerobjects.QuantityByTAndResourceType[string]{} +func (c *schedulingConstraints) CapResources(queue string, resourcesByPc map[string]internaltypes.ResourceList) map[string]internaltypes.ResourceList { + perQueueLimit, ok := c.resourceLimitsPerQueuePerPriorityClass[queue] + if !ok { + return resourcesByPc + } + cappedResourcesByPc := make(map[string]internaltypes.ResourceList, len(resourcesByPc)) for pc, resources := range resourcesByPc { - overallResourceLimits := constraints.resolveResourceLimitsForQueueAndPriorityClass(queue, pc) - cappedResources := make(map[string]resource.Quantity, len(resources.Resources)) - for resourceName, qty := range resources.Resources { - limit, ok := overallResourceLimits[resourceName] - if ok && qty.Cmp(limit) == 1 { - cappedResources[resourceName] = limit - } else { - cappedResources[resourceName] = qty - } - } - cappedResourcesByPc[pc] = schedulerobjects.ResourceList{Resources: cappedResources} + cappedResourcesByPc[pc] = resources.Cap(perQueueLimit[pc]) } return cappedResourcesByPc } -func (constraints *SchedulingConstraints) resolveResourceLimitsForQueueAndPriorityClass(queue string, priorityClass string) map[string]resource.Quantity { - queueAndPriorityClassResourceLimits := constraints.getQueueAndPriorityClassResourceLimits(queue, priorityClass) - priorityClassResourceLimits := constraints.getPriorityClassResourceLimits(priorityClass) - return util.MergeMaps(priorityClassResourceLimits, queueAndPriorityClassResourceLimits) -} - -func (constraints *SchedulingConstraints) getQueueAndPriorityClassResourceLimits(queue string, priorityClass string) map[string]resource.Quantity { - if queueConstraint, ok := constraints.queueSchedulingConstraintsByQueueName[queue]; ok { - if priorityClassConstraint, ok := queueConstraint.PriorityClassSchedulingConstraintsByPriorityClassName[priorityClass]; ok { - return priorityClassConstraint.MaximumResourcesPerQueue - } +func calculatePerRoundLimits( + totalResources internaltypes.ResourceList, + pool string, + config configuration.SchedulingConfig, +) internaltypes.ResourceList { + if totalResources.IsEmpty() { + return totalResources } - return map[string]resource.Quantity{} -} + rlFactory := totalResources.Factory() -func (constraints *SchedulingConstraints) getPriorityClassResourceLimits(priorityClass string) map[string]resource.Quantity { - if priorityClassConstraint, ok := constraints.priorityClassSchedulingConstraintsByPriorityClassName[priorityClass]; ok { - return priorityClassConstraint.MaximumResourcesPerQueue + maximumResourceFractionToSchedule := config.MaximumResourceFractionToSchedule + if m, ok := config.MaximumResourceFractionToScheduleByPool[pool]; ok { + // Use 
pool-specific config is available. + // Should do util.MergeMaps really but don't want to change existing behaviour. + maximumResourceFractionToSchedule = m } - return map[string]resource.Quantity{} -} + return totalResources.Multiply(rlFactory.MakeResourceFractionList(maximumResourceFractionToSchedule, math.Inf(1))) +} + +func calculatePerQueueLimits( + totalResources internaltypes.ResourceList, + pool string, + priorityClasses map[string]types.PriorityClass, + queues []*api.Queue, +) map[string]map[string]internaltypes.ResourceList { + limitsPerQueuePerPc := make(map[string]map[string]internaltypes.ResourceList, len(queues)) + + if totalResources.IsEmpty() { + return limitsPerQueuePerPc + } + rlFactory := totalResources.Factory() + + for pcName, pc := range priorityClasses { + defaultFractions := util.MergeMaps( + pc.MaximumResourceFractionPerQueue, + pc.MaximumResourceFractionPerQueueByPool[pool], + ) + + for _, queue := range queues { + fractions := defaultFractions + queueConfig, ok := queue.ResourceLimitsByPriorityClassName[pcName] + if ok { + fractions = util.MergeMaps(fractions, queueConfig.MaximumResourceFraction) + queuePoolConfig, ok := queueConfig.MaximumResourceFractionByPool[pool] + if ok { + fractions = util.MergeMaps(fractions, queuePoolConfig.GetMaximumResourceFraction()) + } + } -// isStrictlyLessOrEqual returns false if -// - there is a quantity in b greater than that in a or -// - there is a non-zero quantity in b not in a -// and true otherwise. -func isStrictlyLessOrEqual(a map[string]resource.Quantity, b map[string]resource.Quantity) bool { - for t, q := range b { - if q.Cmp(a[t]) == -1 { - return false + if _, ok := limitsPerQueuePerPc[queue.Name]; !ok { + limitsPerQueuePerPc[queue.Name] = map[string]internaltypes.ResourceList{} + } + limitsPerQueuePerPc[queue.Name][pcName] = totalResources.Multiply(rlFactory.MakeResourceFractionList(fractions, math.Inf(1))) } } - return true + + return limitsPerQueuePerPc } diff --git a/internal/scheduler/scheduling/constraints/constraints_test.go b/internal/scheduler/scheduling/constraints/constraints_test.go index 42f33847bed..6d2cffcd18e 100644 --- a/internal/scheduler/scheduling/constraints/constraints_test.go +++ b/internal/scheduler/scheduling/constraints/constraints_test.go @@ -11,8 +11,9 @@ import ( "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/scheduler/configuration" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/scheduling/context" + "github.com/armadaproject/armada/internal/scheduler/testfixtures" "github.com/armadaproject/armada/pkg/api" ) @@ -27,22 +28,32 @@ type constraintTest struct { } func TestConstraints(t *testing.T) { + rlFactory, err := internaltypes.NewResourceListFactory([]configuration.ResourceType{ + {Name: "cpu"}, + {Name: "memory"}, + {Name: "a"}, + {Name: "b"}, + {Name: "c"}, + {Name: "d"}, + }, nil) + assert.Nil(t, err) + tests := map[string]*constraintTest{ "no-constraints": makeConstraintsTest( - NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), + NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), makeSchedulingConfig(), - []*api.Queue{})), + []*api.Queue{{Name: "queue-1"}}, + ), rlFactory), "empty-queue-constraints": makeConstraintsTest( - NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), + NewSchedulingConstraints("pool-1", 
makeResourceList(rlFactory, "1000", "1000Gi"), makeSchedulingConfig(), - []*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{}}})), - "within-constraints": makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), configuration.SchedulingConfig{ + []*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{}}}), rlFactory), + "within-constraints": makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{ MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1}, - MaxQueueLookback: 1000, PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.9, "memory": 0.9}}}}, - }, []*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}})), + }, []*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}}), rlFactory), "exceeds-queue-priority-class-constraint": func() *constraintTest { - t := makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{ + t := makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{ { Name: "queue-1", Cordoned: false, @@ -52,12 +63,12 @@ func TestConstraints(t *testing.T) { }, }, }, - })) + }), rlFactory) t.expectedCheckConstraintsReason = "resource limit exceeded" return t }(), "exceeds-queue-priority-class-pool-constraint": func() *constraintTest { - t := makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{ + t := makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{ { Name: "queue-1", Cordoned: false, @@ -71,48 +82,51 @@ func TestConstraints(t *testing.T) { }, }, }, - })) + }), rlFactory) t.expectedCheckConstraintsReason = "resource limit exceeded" return t }(), "exceeds-priority-class-constraint": func() *constraintTest { - t := makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), configuration.SchedulingConfig{ + t := makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{ MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1}, - MaxQueueLookback: 1000, PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.00000001, "memory": 0.9}}}}, - }, []*api.Queue{})) + }, []*api.Queue{{Name: "queue-1"}}), rlFactory) t.expectedCheckConstraintsReason = "resource limit exceeded" return t }(), - "priority-class-constraint-ignored-if-there-is-a-queue-constraint": makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), configuration.SchedulingConfig{ + 
"priority-class-constraint-ignored-if-there-is-a-queue-constraint": makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{ MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1}, - MaxQueueLookback: 1000, PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.00000001, "memory": 0.9}}}}, - }, []*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}})), + }, []*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}}), rlFactory), "one-constraint-per-level-falls-back-as-expected--within-limits": makeMultiLevelConstraintsTest( map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("19"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.39")}, "", "", + rlFactory, ), "one-constraint-per-level-falls-back-as-expected--a-exceeds-limits": makeMultiLevelConstraintsTest( map[string]resource.Quantity{"a": resource.MustParse("101"), "b": resource.MustParse("19"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.39")}, UnschedulableReasonMaximumResourcesExceeded, "", + rlFactory, ), "one-constraint-per-level-falls-back-as-expected--b-exceeds-limits": makeMultiLevelConstraintsTest( map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("21"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.39")}, UnschedulableReasonMaximumResourcesExceeded, "", + rlFactory, ), "one-constraint-per-level-falls-back-as-expected--c-exceeds-limits": makeMultiLevelConstraintsTest( map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("19"), "c": resource.MustParse("3.1"), "d": resource.MustParse("0.39")}, UnschedulableReasonMaximumResourcesExceeded, "", + rlFactory, ), "one-constraint-per-level-falls-back-as-expected--d-exceeds-limits": makeMultiLevelConstraintsTest( map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("19"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.41")}, UnschedulableReasonMaximumResourcesExceeded, "", + rlFactory, ), } for name, tc := range tests { @@ -122,7 +136,7 @@ func TestConstraints(t *testing.T) { require.Equal(t, tc.expectedCheckRoundConstraintsReason == "", ok) require.Equal(t, tc.expectedCheckRoundConstraintsReason, unscheduledReason) - ok, unscheduledReason, err = tc.constraints.CheckConstraints(tc.sctx, tc.gctx) + ok, unscheduledReason, err = tc.constraints.CheckJobConstraints(tc.sctx, tc.gctx) require.NoError(t, err) require.Equal(t, tc.expectedCheckConstraintsReason == "", ok) require.Equal(t, tc.expectedCheckConstraintsReason, unscheduledReason) @@ -131,20 +145,21 @@ func TestConstraints(t *testing.T) { } func TestCapResources(t *testing.T) { + rlFactory := testfixtures.TestResourceListFactory tests := map[string]struct { constraints SchedulingConstraints queue string - resources schedulerobjects.QuantityByTAndResourceType[string] - expectedResources schedulerobjects.QuantityByTAndResourceType[string] + resources map[string]internaltypes.ResourceList + expectedResources map[string]internaltypes.ResourceList }{ "no contraints": { - 
constraints: NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{}), + constraints: NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{{Name: "queue-1"}}), queue: "queue-1", - resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")}, - expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")}, + resources: map[string]internaltypes.ResourceList{"priority-class-1": makeResourceList(rlFactory, "1000", "1000Gi")}, + expectedResources: map[string]internaltypes.ResourceList{"priority-class-1": makeResourceList(rlFactory, "1000", "1000Gi")}, }, "unconstrained": { - constraints: NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), configuration.SchedulingConfig{ + constraints: NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{ PriorityClasses: map[string]types.PriorityClass{ "priority-class-1": { MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{ @@ -152,13 +167,13 @@ func TestCapResources(t *testing.T) { }, }, }, - }, []*api.Queue{}), + }, []*api.Queue{{Name: "queue-1"}}), queue: "queue-1", - resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1", "1Gi")}, - expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1", "1Gi")}, + resources: map[string]internaltypes.ResourceList{"priority-class-1": makeResourceList(rlFactory, "1", "1Gi")}, + expectedResources: map[string]internaltypes.ResourceList{"priority-class-1": makeResourceList(rlFactory, "1", "1Gi")}, }, "per pool cap": { - constraints: NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), configuration.SchedulingConfig{ + constraints: NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{ PriorityClasses: map[string]types.PriorityClass{ "priority-class-1": { MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{ @@ -166,13 +181,13 @@ func TestCapResources(t *testing.T) { }, }, }, - }, []*api.Queue{}), + }, []*api.Queue{{Name: "queue-1"}}), queue: "queue-1", - resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")}, - expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("100", "900Gi")}, + resources: map[string]internaltypes.ResourceList{"priority-class-1": makeResourceList(rlFactory, "1000", "1000Gi")}, + expectedResources: map[string]internaltypes.ResourceList{"priority-class-1": makeResourceList(rlFactory, "100", "900Gi")}, }, "per queue cap": { - constraints: NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), configuration.SchedulingConfig{ + constraints: NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{ PriorityClasses: map[string]types.PriorityClass{ "priority-class-1": { MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{ @@ -191,17 +206,18 @@ func TestCapResources(t *testing.T) { }, }), queue: "queue-1", - resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")}, - expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("900", "900Gi")}, + resources: 
map[string]internaltypes.ResourceList{"priority-class-1": makeResourceList(rlFactory, "1000", "1000Gi")}, + expectedResources: map[string]internaltypes.ResourceList{"priority-class-1": makeResourceList(rlFactory, "900", "900Gi")}, }, "per queue cap with multi pc": { - constraints: NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), configuration.SchedulingConfig{ + constraints: NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{ PriorityClasses: map[string]types.PriorityClass{ "priority-class-1": { MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{ "pool-1": {"cpu": 0.1, "memory": 0.9}, }, }, + "priority-class-2": {}, }, }, []*api.Queue{ { @@ -217,13 +233,13 @@ func TestCapResources(t *testing.T) { }, }), queue: "queue-1", - resources: map[string]schedulerobjects.ResourceList{ - "priority-class-1": makeResourceList("1000", "1000Gi"), - "priority-class-2": makeResourceList("2000", "2000Gi"), + resources: map[string]internaltypes.ResourceList{ + "priority-class-1": makeResourceList(rlFactory, "1000", "1000Gi"), + "priority-class-2": makeResourceList(rlFactory, "2000", "2000Gi"), }, - expectedResources: map[string]schedulerobjects.ResourceList{ - "priority-class-1": makeResourceList("100", "100Gi"), - "priority-class-2": makeResourceList("900", "900Gi"), + expectedResources: map[string]internaltypes.ResourceList{ + "priority-class-1": makeResourceList(rlFactory, "100", "100Gi"), + "priority-class-2": makeResourceList(rlFactory, "900", "900Gi"), }, }, } @@ -234,40 +250,33 @@ func TestCapResources(t *testing.T) { // Compare resources for equality. Note that we can't just do assert.Equal(tc.expectedResources, capped) // because the scale may have changed require.Equal(t, len(tc.expectedResources), len(capped), "number of priority classes differs") - for pc, rl := range tc.expectedResources { + for pc, expectedCappedRl := range tc.expectedResources { cappedRl, ok := capped[pc] require.True(t, ok, "no resource list found for priority class %s", pc) - require.Equal(t, len(rl.Resources), len(cappedRl.Resources), "number of resources differs for priority class %s", pc) - for res, qty := range rl.Resources { - cappedRes, ok := cappedRl.Resources[res] - require.True(t, ok, "resource %s doesn't exist at priority class %s", res, pc) - assert.Equal(t, 0, qty.Cmp(cappedRes), "resource %s differs at priority class %s", res, pc) - } + require.True(t, expectedCappedRl.Equal(cappedRl), "capped resources (%s) not as expected (%s)", cappedRl.String(), expectedCappedRl.String()) } }) } } -func makeMultiLevelConstraintsTest(requirements map[string]resource.Quantity, expectedCheckConstraintsReason string, expectedCheckRoundConstraintsReason string) *constraintTest { - zeroResources := schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{"a": resource.MustParse("0"), "b": resource.MustParse("0"), "c": resource.MustParse("0"), "d": resource.MustParse("0")}, - } +func makeMultiLevelConstraintsTest(requirements map[string]resource.Quantity, expectedCheckConstraintsReason string, expectedCheckRoundConstraintsReason string, rlFactory *internaltypes.ResourceListFactory) *constraintTest { + rr := rlFactory.FromJobResourceListIgnoreUnknown(requirements) return &constraintTest{ - constraints: makeMultiLevelConstraints(), + constraints: makeMultiLevelConstraints(rlFactory), sctx: &context.SchedulingContext{ Pool: "pool-1", WeightSum: 100, - ScheduledResources: zeroResources.DeepCopy(), + ScheduledResources: 
internaltypes.ResourceList{}, Limiter: rate.NewLimiter(1e9, 1e6), QueueSchedulingContexts: map[string]*context.QueueSchedulingContext{ "queue-1": { Queue: "queue-1", Weight: 1, Limiter: rate.NewLimiter(1e9, 1e6), - Allocated: zeroResources.DeepCopy(), - AllocatedByPriorityClass: schedulerobjects.QuantityByTAndResourceType[string]{"priority-class-1": schedulerobjects.ResourceList{ - Resources: requirements, - }}, + Allocated: internaltypes.ResourceList{}, + AllocatedByPriorityClass: map[string]internaltypes.ResourceList{ + "priority-class-1": rr, + }, }, }, Started: time.Now(), @@ -277,7 +286,7 @@ func makeMultiLevelConstraintsTest(requirements map[string]resource.Quantity, ex PriorityClassName: "priority-class-1", }, Queue: "queue-1", - TotalResourceRequests: schedulerobjects.ResourceList{Resources: requirements}, + TotalResourceRequests: rr, JobSchedulingContexts: []*context.JobSchedulingContext{{}}, }, queue: "queue-1", @@ -287,89 +296,64 @@ func makeMultiLevelConstraintsTest(requirements map[string]resource.Quantity, ex } } -func makeMultiLevelConstraints() SchedulingConstraints { - return NewSchedulingConstraints("pool-1", schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "a": resource.MustParse("1000"), - "b": resource.MustParse("1000"), - "c": resource.MustParse("1000"), - "d": resource.MustParse("1000"), - }, - }, configuration.SchedulingConfig{ - MaxQueueLookback: 1000, - PriorityClasses: map[string]types.PriorityClass{ - "priority-class-1": { - MaximumResourceFractionPerQueue: map[string]float64{ - "a": 0.0001, "b": 0.0002, "c": 0.0003, "d": 0.0004, - }, - MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{ - "pool-1": { - "a": 0.001, "b": 0.002, "c": 0.003, - }, - }, +func makeMultiLevelConstraints(rlFactory *internaltypes.ResourceListFactory) SchedulingConstraints { + return NewSchedulingConstraints("pool-1", + rlFactory.FromNodeProto( + map[string]resource.Quantity{ + "a": resource.MustParse("1000"), + "b": resource.MustParse("1000"), + "c": resource.MustParse("1000"), + "d": resource.MustParse("1000"), }, - }, - }, []*api.Queue{ - { - Name: "queue-1", - ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{ + ), + configuration.SchedulingConfig{ + PriorityClasses: map[string]types.PriorityClass{ "priority-class-1": { - MaximumResourceFraction: map[string]float64{"a": 0.01, "b": 0.02}, - MaximumResourceFractionByPool: map[string]*api.PriorityClassPoolResourceLimits{ + MaximumResourceFractionPerQueue: map[string]float64{ + "a": 0.0001, "b": 0.0002, "c": 0.0003, "d": 0.0004, + }, + MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{ "pool-1": { - MaximumResourceFraction: map[string]float64{"a": 0.1}, + "a": 0.001, "b": 0.002, "c": 0.003, + }, + }, + }, + }, + }, []*api.Queue{ + { + Name: "queue-1", + ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{ + "priority-class-1": { + MaximumResourceFraction: map[string]float64{"a": 0.01, "b": 0.02}, + MaximumResourceFractionByPool: map[string]*api.PriorityClassPoolResourceLimits{ + "pool-1": { + MaximumResourceFraction: map[string]float64{"a": 0.1}, + }, }, }, }, }, }, - }) -} - -func TestScaleQuantity(t *testing.T) { - tests := map[string]struct { - input resource.Quantity - f float64 - expected resource.Quantity - }{ - "one": { - input: resource.MustParse("1"), - f: 1, - expected: resource.MustParse("1"), - }, - "zero": { - input: resource.MustParse("1"), - f: 0, - expected: resource.MustParse("0"), - }, - "rounding": { - 
input: resource.MustParse("1"), - f: 0.3006, - expected: resource.MustParse("301m"), - }, - } - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - assert.True(t, tc.expected.Equal(ScaleQuantity(tc.input, tc.f)), "expected %s, but got %s", tc.expected.String(), tc.input.String()) - }) - } + ) } -func makeConstraintsTest(constraints SchedulingConstraints) *constraintTest { +func makeConstraintsTest(constraints SchedulingConstraints, rlFactory *internaltypes.ResourceListFactory) *constraintTest { return &constraintTest{ constraints: constraints, sctx: &context.SchedulingContext{ Pool: "pool-1", WeightSum: 100, - ScheduledResources: makeResourceList("1", "1Gi"), + ScheduledResources: makeResourceList(rlFactory, "1", "1Gi"), Limiter: rate.NewLimiter(1e9, 1e6), QueueSchedulingContexts: map[string]*context.QueueSchedulingContext{ "queue-1": { - Queue: "queue-1", - Weight: 1, - Limiter: rate.NewLimiter(1e9, 1e6), - Allocated: makeResourceList("30", "1Gi"), - AllocatedByPriorityClass: schedulerobjects.QuantityByTAndResourceType[string]{"priority-class-1": makeResourceList("20", "1Gi")}, + Queue: "queue-1", + Weight: 1, + Limiter: rate.NewLimiter(1e9, 1e6), + Allocated: makeResourceList(rlFactory, "30", "1Gi"), + AllocatedByPriorityClass: map[string]internaltypes.ResourceList{ + "priority-class-1": makeResourceList(rlFactory, "20", "1Gi"), + }, }, }, Started: time.Now(), @@ -379,7 +363,7 @@ func makeConstraintsTest(constraints SchedulingConstraints) *constraintTest { PriorityClassName: "priority-class-1", }, Queue: "queue-1", - TotalResourceRequests: makeResourceList("1", "1Gi"), + TotalResourceRequests: makeResourceList(rlFactory, "1", "1Gi"), JobSchedulingContexts: []*context.JobSchedulingContext{{}}, }, queue: "queue-1", @@ -389,98 +373,18 @@ func makeConstraintsTest(constraints SchedulingConstraints) *constraintTest { } } -func TestIsStrictlyLessOrEqual(t *testing.T) { - tests := map[string]struct { - a map[string]resource.Quantity - b map[string]resource.Quantity - expected bool - }{ - "both empty": { - a: make(map[string]resource.Quantity), - b: make(map[string]resource.Quantity), - expected: true, - }, - "zero and missing is equal": { - a: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("0"), - }, - b: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - }, - expected: true, - }, - "simple equal": { - a: map[string]resource.Quantity{ - "cpu": resource.MustParse("1"), - "memory": resource.MustParse("2"), - "foo": resource.MustParse("3"), - }, - b: map[string]resource.Quantity{ - "cpu": resource.MustParse("1"), - "memory": resource.MustParse("2"), - "foo": resource.MustParse("3"), - }, - expected: true, - }, - "simple true": { - a: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - }, - b: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("3"), - }, - expected: true, - }, - "simple false": { - a: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("3"), - }, - b: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - }, - expected: false, - }, - "present in a missing in b true": { - a: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - }, - b: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - }, - expected: true, - }, - "missing in a present in b true": { - a: 
map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - }, - b: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - }, - expected: true, - }, - } - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - assert.Equal(t, tc.expected, isStrictlyLessOrEqual(tc.a, tc.b)) - }) - } -} - func makeSchedulingConfig() configuration.SchedulingConfig { return configuration.SchedulingConfig{ MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1}, - MaxQueueLookback: 1000, PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {}}, } } -func makeResourceList(cpu string, memory string) schedulerobjects.ResourceList { - return schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse(cpu), "memory": resource.MustParse(memory)}} +func makeResourceList(rlFactory *internaltypes.ResourceListFactory, cpu string, memory string) internaltypes.ResourceList { + return rlFactory.FromNodeProto( + map[string]resource.Quantity{ + "cpu": resource.MustParse(cpu), + "memory": resource.MustParse(memory), + }, + ) } diff --git a/internal/scheduler/scheduling/context/gang.go b/internal/scheduler/scheduling/context/gang.go index 0e6a58a9a6b..13cfddb4bba 100644 --- a/internal/scheduler/scheduling/context/gang.go +++ b/internal/scheduler/scheduling/context/gang.go @@ -3,7 +3,7 @@ package context import ( "time" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" ) type GangSchedulingContext struct { @@ -11,18 +11,18 @@ type GangSchedulingContext struct { Queue string GangInfo JobSchedulingContexts []*JobSchedulingContext - TotalResourceRequests schedulerobjects.ResourceList + TotalResourceRequests internaltypes.ResourceList AllJobsEvicted bool RequestsFloatingResources bool } func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingContext { allJobsEvicted := true - totalResourceRequests := schedulerobjects.NewResourceList(4) + totalResourceRequests := internaltypes.ResourceList{} requestsFloatingResources := false for _, jctx := range jctxs { allJobsEvicted = allJobsEvicted && jctx.IsEvicted - totalResourceRequests.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) + totalResourceRequests = totalResourceRequests.Add(jctx.Job.AllResourceRequirements()) if jctx.Job.RequestsFloatingResources() { requestsFloatingResources = true } diff --git a/internal/scheduler/scheduling/context/gang_test.go b/internal/scheduler/scheduling/context/gang_test.go index 88812e7beea..405826ca6ff 100644 --- a/internal/scheduler/scheduling/context/gang_test.go +++ b/internal/scheduler/scheduling/context/gang_test.go @@ -6,7 +6,6 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/resource" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/internal/scheduler/testfixtures" ) @@ -18,12 +17,12 @@ func TestNewGangSchedulingContext(t *testing.T) { assert.Equal(t, testfixtures.TestDefaultPriorityClass, gctx.GangInfo.PriorityClassName) assert.True( t, - schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ + testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown( + map[string]resource.Quantity{ "cpu": resource.MustParse("2"), "memory": resource.MustParse("8Gi"), }, - }.Equal( + ).Equal( gctx.TotalResourceRequests, ), ) diff --git a/internal/scheduler/scheduling/context/queue.go 
b/internal/scheduler/scheduling/context/queue.go index cd09b23b128..b4707029f3b 100644 --- a/internal/scheduler/scheduling/context/queue.go +++ b/internal/scheduler/scheduling/context/queue.go @@ -12,6 +12,7 @@ import ( "golang.org/x/time/rate" armadaslices "github.com/armadaproject/armada/internal/common/slices" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -32,23 +33,20 @@ type QueueSchedulingContext struct { Limiter *rate.Limiter // Total resources assigned to the queue across all clusters by priority class priority. // Includes jobs scheduled during this invocation of the scheduler. - Allocated schedulerobjects.ResourceList + Allocated internaltypes.ResourceList // Total demand from this queue. This is essentially the cumulative resources of all non-terminal jobs at the // start of the scheduling cycle - Demand schedulerobjects.ResourceList + Demand internaltypes.ResourceList // Capped Demand for this queue. This differs from Demand in that it takes into account any limits that we have // placed on the queue - CappedDemand schedulerobjects.ResourceList + CappedDemand internaltypes.ResourceList // Fair share is the weight of this queue over the sum of the weights of all queues FairShare float64 // AdjustedFairShare modifies fair share such that queues that have a demand cost less than their fair share, have their fair share reallocated. AdjustedFairShare float64 // Total resources assigned to the queue across all clusters by priority class. // Includes jobs scheduled during this invocation of the scheduler. - AllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string] - // Total away resources assigned to the queue across all clusters by priority class. - // Includes away jobs scheduled during this invocation of the scheduler. - AwayAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string] + AllocatedByPriorityClass map[string]internaltypes.ResourceList // Resources assigned to this queue during this scheduling cycle. ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string] // Resources evicted from this queue during this scheduling cycle. @@ -66,7 +64,7 @@ func (qctx *QueueSchedulingContext) String() string { } // GetAllocation is necessary to implement the fairness.Queue interface. 
-func (qctx *QueueSchedulingContext) GetAllocation() schedulerobjects.ResourceList { +func (qctx *QueueSchedulingContext) GetAllocation() internaltypes.ResourceList { return qctx.Allocated } @@ -89,8 +87,10 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string { fmt.Fprintf(w, "Preempted resources:\t%s\n", qctx.EvictedResourcesByPriorityClass.AggregateByResource().CompactString()) fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriorityClass.String()) if verbosity >= 0 { - fmt.Fprintf(w, "Total allocated resources after scheduling:\t%s\n", qctx.Allocated.CompactString()) - fmt.Fprintf(w, "Total allocated resources after scheduling by priority class:\t%s\n", qctx.AllocatedByPriorityClass) + fmt.Fprintf(w, "Total allocated resources after scheduling:\t%s\n", qctx.Allocated.String()) + for pc, res := range qctx.AllocatedByPriorityClass { + fmt.Fprintf(w, "Total allocated resources after scheduling for priority class %s:\t%s\n", pc, res.String()) + } fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", len(qctx.SuccessfulJobSchedulingContexts)) fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", len(qctx.EvictedJobsById)) fmt.Fprintf(w, "Number of jobs that could not be scheduled:\t%d\n", len(qctx.UnsuccessfulJobSchedulingContexts)) @@ -170,8 +170,9 @@ func (qctx *QueueSchedulingContext) addJobSchedulingContext(jctx *JobSchedulingC // Always update ResourcesByPriority. // Since ResourcesByPriority is used to order queues by fraction of fair share. - qctx.Allocated.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) - qctx.AllocatedByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) + pcName := jctx.Job.PriorityClassName() + qctx.AllocatedByPriorityClass[pcName] = qctx.AllocatedByPriorityClass[pcName].Add(jctx.Job.AllResourceRequirements()) + qctx.Allocated = qctx.Allocated.Add(jctx.Job.AllResourceRequirements()) // Only if the job is not evicted, update ScheduledResourcesByPriority. // Since ScheduledResourcesByPriority is used to control per-round scheduling constraints. @@ -205,8 +206,10 @@ func (qctx *QueueSchedulingContext) evictJob(job *jobdb.Job) (bool, error) { qctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.PriorityClassName(), rl) qctx.EvictedJobsById[jobId] = true } - qctx.Allocated.SubV1ResourceList(rl) - qctx.AllocatedByPriorityClass.SubV1ResourceList(job.PriorityClassName(), rl) + pcName := job.PriorityClassName() + qctx.AllocatedByPriorityClass[pcName] = qctx.AllocatedByPriorityClass[pcName].Subtract(job.AllResourceRequirements()) + qctx.Allocated = qctx.Allocated.Subtract(job.AllResourceRequirements()) + return scheduledInThisRound, nil } diff --git a/internal/scheduler/scheduling/context/scheduling.go b/internal/scheduler/scheduling/context/scheduling.go index e3bb7ae9d32..0feb303ba73 100644 --- a/internal/scheduler/scheduling/context/scheduling.go +++ b/internal/scheduler/scheduling/context/scheduling.go @@ -14,6 +14,7 @@ import ( "github.com/armadaproject/armada/internal/common/armadaerrors" armadamaps "github.com/armadaproject/armada/internal/common/maps" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/internal/scheduler/scheduling/fairness" @@ -37,11 +38,11 @@ type SchedulingContext struct { // Per-queue scheduling contexts.
QueueSchedulingContexts map[string]*QueueSchedulingContext // Total resources across all clusters in this pool available at the start of the scheduling cycle. - TotalResources schedulerobjects.ResourceList + TotalResources internaltypes.ResourceList // Allocated resources across all clusters in this pool - Allocated schedulerobjects.ResourceList + Allocated internaltypes.ResourceList // Resources assigned across all queues during this scheduling cycle. - ScheduledResources schedulerobjects.ResourceList + ScheduledResources internaltypes.ResourceList ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string] // Resources evicted across all queues during this scheduling cycle. EvictedResources schedulerobjects.ResourceList @@ -67,7 +68,7 @@ func NewSchedulingContext( pool string, fairnessCostProvider fairness.FairnessCostProvider, limiter *rate.Limiter, - totalResources schedulerobjects.ResourceList, + totalResources internaltypes.ResourceList, ) *SchedulingContext { return &SchedulingContext{ Started: time.Now(), @@ -75,8 +76,8 @@ func NewSchedulingContext( FairnessCostProvider: fairnessCostProvider, Limiter: limiter, QueueSchedulingContexts: make(map[string]*QueueSchedulingContext), - TotalResources: totalResources.DeepCopy(), - ScheduledResources: schedulerobjects.NewResourceListWithDefaultSize(), + TotalResources: totalResources, + ScheduledResources: internaltypes.ResourceList{}, ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]), EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]), SchedulingKeyGenerator: schedulerobjects.NewSchedulingKeyGenerator(), @@ -90,9 +91,9 @@ func (sctx *SchedulingContext) ClearUnfeasibleSchedulingKeys() { func (sctx *SchedulingContext) AddQueueSchedulingContext( queue string, weight float64, - initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string], - demand schedulerobjects.ResourceList, - cappedDemand schedulerobjects.ResourceList, + initialAllocatedByPriorityClass map[string]internaltypes.ResourceList, + demand internaltypes.ResourceList, + cappedDemand internaltypes.ResourceList, limiter *rate.Limiter, ) error { if _, ok := sctx.QueueSchedulingContexts[queue]; ok { @@ -103,16 +104,16 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext( }) } if initialAllocatedByPriorityClass == nil { - initialAllocatedByPriorityClass = make(schedulerobjects.QuantityByTAndResourceType[string]) + initialAllocatedByPriorityClass = map[string]internaltypes.ResourceList{} } else { - initialAllocatedByPriorityClass = initialAllocatedByPriorityClass.DeepCopy() + initialAllocatedByPriorityClass = maps.Clone(initialAllocatedByPriorityClass) } - allocated := schedulerobjects.NewResourceListWithDefaultSize() + allocated := internaltypes.ResourceList{} for _, rl := range initialAllocatedByPriorityClass { - allocated.Add(rl) + allocated = allocated.Add(rl) } sctx.WeightSum += weight - sctx.Allocated.Add(allocated) + sctx.Allocated = sctx.Allocated.Add(allocated) qctx := &QueueSchedulingContext{ SchedulingContext: sctx, @@ -161,7 +162,7 @@ func (sctx *SchedulingContext) UpdateFairShares() { queueInfos := make([]*queueInfo, 0, len(sctx.QueueSchedulingContexts)) for queueName, qctx := range sctx.QueueSchedulingContexts { cappedShare := 1.0 - if !sctx.TotalResources.IsZero() { + if !sctx.TotalResources.AllZero() { cappedShare = sctx.FairnessCostProvider.UnweightedCostFromAllocation(qctx.CappedDemand) } queueInfos = append(queueInfos, &queueInfo{ 
@@ -218,8 +219,8 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string { fmt.Fprintf(w, "Finished:\t%s\n", sctx.Finished) fmt.Fprintf(w, "Duration:\t%s\n", sctx.Finished.Sub(sctx.Started)) fmt.Fprintf(w, "Termination reason:\t%s\n", sctx.TerminationReason) - fmt.Fprintf(w, "Total capacity:\t%s\n", sctx.TotalResources.CompactString()) - fmt.Fprintf(w, "Scheduled resources:\t%s\n", sctx.ScheduledResources.CompactString()) + fmt.Fprintf(w, "Total capacity:\t%s\n", sctx.TotalResources.String()) + fmt.Fprintf(w, "Scheduled resources:\t%s\n", sctx.ScheduledResources.String()) fmt.Fprintf(w, "Preempted resources:\t%s\n", sctx.EvictedResources.CompactString()) fmt.Fprintf(w, "Number of gangs scheduled:\t%d\n", sctx.NumScheduledGangs) fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", sctx.NumScheduledJobs) @@ -296,11 +297,11 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) sctx.NumEvictedJobs-- } else { - sctx.ScheduledResources.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) + sctx.ScheduledResources = sctx.ScheduledResources.Add(jctx.Job.AllResourceRequirements()) sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) sctx.NumScheduledJobs++ } - sctx.Allocated.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) + sctx.Allocated = sctx.Allocated.Add(jctx.Job.AllResourceRequirements()) } return evictedInThisRound, nil } @@ -341,7 +342,7 @@ func (sctx *SchedulingContext) EvictJob(jctx *JobSchedulingContext) (bool, error } rl := jctx.Job.ResourceRequirements().Requests if scheduledInThisRound { - sctx.ScheduledResources.SubV1ResourceList(rl) + sctx.ScheduledResources = sctx.ScheduledResources.Subtract(jctx.Job.AllResourceRequirements()) sctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), rl) sctx.NumScheduledJobs-- } else { @@ -349,7 +350,7 @@ func (sctx *SchedulingContext) EvictJob(jctx *JobSchedulingContext) (bool, error sctx.EvictedResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), rl) sctx.NumEvictedJobs++ } - sctx.Allocated.SubV1ResourceList(rl) + sctx.Allocated = sctx.Allocated.Subtract(jctx.Job.AllResourceRequirements()) return scheduledInThisRound, nil } @@ -371,14 +372,14 @@ func (sctx *SchedulingContext) SuccessfulJobSchedulingContexts() []*JobSchedulin } // AllocatedByQueueAndPriority returns map from queue name and priority to resources allocated. 
-func (sctx *SchedulingContext) AllocatedByQueueAndPriority() map[string]schedulerobjects.QuantityByTAndResourceType[string] { +func (sctx *SchedulingContext) AllocatedByQueueAndPriority() map[string]map[string]internaltypes.ResourceList { rv := make( - map[string]schedulerobjects.QuantityByTAndResourceType[string], + map[string]map[string]internaltypes.ResourceList, len(sctx.QueueSchedulingContexts), ) for queue, qctx := range sctx.QueueSchedulingContexts { - if !qctx.AllocatedByPriorityClass.IsZero() { - rv[queue] = qctx.AllocatedByPriorityClass.DeepCopy() + if !internaltypes.RlMapAllZero(qctx.AllocatedByPriorityClass) { + rv[queue] = maps.Clone(qctx.AllocatedByPriorityClass) } } return rv diff --git a/internal/scheduler/scheduling/context/scheduling_test.go b/internal/scheduler/scheduling/context/scheduling_test.go index 9c1334f8c6c..8031aa7540e 100644 --- a/internal/scheduler/scheduling/context/scheduling_test.go +++ b/internal/scheduler/scheduling/context/scheduling_test.go @@ -6,18 +6,18 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/exp/maps" "k8s.io/apimachinery/pkg/api/resource" - armadaslices "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/internal/scheduler/configuration" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/scheduling/fairness" "github.com/armadaproject/armada/internal/scheduler/testfixtures" ) func TestSchedulingContextAccounting(t *testing.T) { - totalResources := schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}} + totalResources := testfixtures.TestResourceListFactory.FromNodeProto( + map[string]resource.Quantity{"cpu": resource.MustParse("1")}, + ) fairnessCostProvider, err := fairness.NewDominantResourceFairness(totalResources, configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"cpu"}}) require.NoError(t, err) sctx := NewSchedulingContext( @@ -27,13 +27,15 @@ func TestSchedulingContextAccounting(t *testing.T) { totalResources, ) priorityFactorByQueue := map[string]float64{"A": 1, "B": 1} - allocatedByQueueAndPriorityClass := map[string]schedulerobjects.QuantityByTAndResourceType[string]{ + allocatedByQueueAndPriorityClass := map[string]map[string]internaltypes.ResourceList{ "A": { - "foo": schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + testfixtures.TestDefaultPriorityClass: testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown( + map[string]resource.Quantity{"cpu": resource.MustParse("1")}, + ), }, } for _, queue := range []string{"A", "B"} { - err := sctx.AddQueueSchedulingContext(queue, priorityFactorByQueue[queue], allocatedByQueueAndPriorityClass[queue], schedulerobjects.ResourceList{}, schedulerobjects.ResourceList{}, nil) + err := sctx.AddQueueSchedulingContext(queue, priorityFactorByQueue[queue], allocatedByQueueAndPriorityClass[queue], internaltypes.ResourceList{}, internaltypes.ResourceList{}, nil) require.NoError(t, err) } @@ -42,18 +44,15 @@ func TestSchedulingContextAccounting(t *testing.T) { gctx := NewGangSchedulingContext(jctxs) _, err = sctx.AddGangSchedulingContext(gctx) require.NoError(t, err) + for _, jctx := range jctxs { _, err := sctx.EvictJob(jctx) require.NoError(t, err) } actual := sctx.AllocatedByQueueAndPriority() - queues := 
armadaslices.Unique( - armadaslices.Concatenate(maps.Keys(actual), maps.Keys(expected)), - ) - for _, queue := range queues { - assert.True(t, expected[queue].Equal(actual[queue])) - } + assert.Equal(t, expected, actual) + _, err = sctx.AddGangSchedulingContext(gctx) require.NoError(t, err) } @@ -65,7 +64,7 @@ func TestCalculateFairShares(t *testing.T) { oneHundredCpu := cpu(100) oneThousandCpu := cpu(1000) tests := map[string]struct { - availableResources schedulerobjects.ResourceList + availableResources internaltypes.ResourceList queueCtxs map[string]*QueueSchedulingContext expectedFairShares map[string]float64 expectedAdjustedFairShares map[string]float64 @@ -183,7 +182,7 @@ func TestCalculateFairShares(t *testing.T) { ) for qName, q := range tc.queueCtxs { err = sctx.AddQueueSchedulingContext( - qName, q.Weight, schedulerobjects.QuantityByTAndResourceType[string]{}, q.Demand, q.Demand, nil) + qName, q.Weight, map[string]internaltypes.ResourceList{}, q.Demand, q.Demand, nil) require.NoError(t, err) } sctx.UpdateFairShares() @@ -201,7 +200,7 @@ func TestCalculateFairShares(t *testing.T) { func TestCalculateFairnessError(t *testing.T) { tests := map[string]struct { - availableResources schedulerobjects.ResourceList + availableResources internaltypes.ResourceList queueCtxs map[string]*QueueSchedulingContext expected float64 }{ @@ -278,8 +277,8 @@ func testSmallCpuJobSchedulingContext(queue, priorityClassName string) *JobSched } } -func cpu(n int) schedulerobjects.ResourceList { - return schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{"cpu": resource.MustParse(fmt.Sprintf("%d", n))}, - } +func cpu(n int) internaltypes.ResourceList { + return testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown( + map[string]resource.Quantity{"cpu": resource.MustParse(fmt.Sprintf("%d", n))}, + ) } diff --git a/internal/scheduler/scheduling/fairness/fairness.go b/internal/scheduler/scheduling/fairness/fairness.go index e71cdad2500..5072a3b4a94 100644 --- a/internal/scheduler/scheduling/fairness/fairness.go +++ b/internal/scheduler/scheduling/fairness/fairness.go @@ -4,11 +4,10 @@ import ( "fmt" "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/api/resource" - "github.com/armadaproject/armada/internal/common/slices" + "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/scheduler/configuration" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" ) // QueueRepository is a minimal representation of a queue repository used for computing fairness. @@ -19,31 +18,32 @@ type QueueRepository interface { // Queue is a minimal representation of a queue used for computing fairness. type Queue interface { // GetAllocation returns the current allocation of the queue. - GetAllocation() schedulerobjects.ResourceList + GetAllocation() internaltypes.ResourceList GetWeight() float64 } // FairnessCostProvider captures algorithms to compute the cost of an allocation. 
type FairnessCostProvider interface { UnweightedCostFromQueue(queue Queue) float64 - UnweightedCostFromAllocation(allocation schedulerobjects.ResourceList) float64 + UnweightedCostFromAllocation(allocation internaltypes.ResourceList) float64 WeightedCostFromQueue(queue Queue) float64 - WeightedCostFromAllocation(allocation schedulerobjects.ResourceList, weight float64) float64 -} - -type resourceToConsider struct { - Name string - Multiplier float64 + WeightedCostFromAllocation(allocation internaltypes.ResourceList, weight float64) float64 } type DominantResourceFairness struct { // Total resources across all nodes. - totalResources schedulerobjects.ResourceList - // Resources considered when computing DominantResourceFairness. - resourcesToConsider []resourceToConsider + totalResources internaltypes.ResourceList + // Weight (defined in config) for each resource. + // Typically 1.0 (we care about that resource when assigning costs), + // or 0.0 (we don't care). However other values are possible. + multipliers internaltypes.ResourceFractionList } -func NewDominantResourceFairness(totalResources schedulerobjects.ResourceList, config configuration.SchedulingConfig) (*DominantResourceFairness, error) { +func NewDominantResourceFairness(totalResources internaltypes.ResourceList, config configuration.SchedulingConfig) (*DominantResourceFairness, error) { + if totalResources.IsEmpty() { + return &DominantResourceFairness{}, nil + } + if len(config.DominantResourceFairnessResourcesToConsider) != 0 && len(config.ExperimentalDominantResourceFairnessResourcesToConsider) != 0 { return nil, errors.New("config error - only one of DominantResourceFairnessResourcesToConsider and ExperimentalDominantResourceFairnessResourcesToConsider should be set") } @@ -53,22 +53,26 @@ func NewDominantResourceFairness(totalResources schedulerobjects.ResourceList, c } } - var resourcesToConsider []resourceToConsider + var multipliers map[string]float64 if len(config.DominantResourceFairnessResourcesToConsider) > 0 { - resourcesToConsider = slices.Map(config.DominantResourceFairnessResourcesToConsider, func(n string) resourceToConsider { - return resourceToConsider{Name: n, Multiplier: 1} + multipliers = maps.FromSlice(config.DominantResourceFairnessResourcesToConsider, func(n string) string { + return n + }, func(n string) float64 { + return 1.0 }) } else if len(config.ExperimentalDominantResourceFairnessResourcesToConsider) > 0 { - resourcesToConsider = slices.Map(config.ExperimentalDominantResourceFairnessResourcesToConsider, func(r configuration.DominantResourceFairnessResource) resourceToConsider { - return resourceToConsider{Name: r.Name, Multiplier: defaultMultiplier(r.Multiplier)} + multipliers = maps.FromSlice(config.ExperimentalDominantResourceFairnessResourcesToConsider, func(r configuration.DominantResourceFairnessResource) string { + return r.Name + }, func(r configuration.DominantResourceFairnessResource) float64 { + return defaultMultiplier(r.Multiplier) }) } else { return nil, errors.New("config error - DominantResourceFairnessResourcesToConsider and ExperimentalDominantResourceFairnessResourcesToConsider are both empty") } return &DominantResourceFairness{ - totalResources: totalResources, - resourcesToConsider: resourcesToConsider, + totalResources: totalResources, + multipliers: totalResources.Factory().MakeResourceFractionList(multipliers, 0.0), }, nil } @@ -87,23 +91,10 @@ func (f *DominantResourceFairness) UnweightedCostFromQueue(queue Queue) float64 return 
f.UnweightedCostFromAllocation(queue.GetAllocation()) } -func (f *DominantResourceFairness) WeightedCostFromAllocation(allocation schedulerobjects.ResourceList, weight float64) float64 { +func (f *DominantResourceFairness) WeightedCostFromAllocation(allocation internaltypes.ResourceList, weight float64) float64 { return f.UnweightedCostFromAllocation(allocation) / weight } -func (f *DominantResourceFairness) UnweightedCostFromAllocation(allocation schedulerobjects.ResourceList) float64 { - var cost float64 - for _, t := range f.resourcesToConsider { - capacity := f.totalResources.Get(t.Name) - if capacity.Equal(resource.Quantity{}) { - // Ignore any resources with zero capacity. - continue - } - q := allocation.Get(t.Name) - tcost := q.AsApproximateFloat64() / capacity.AsApproximateFloat64() * t.Multiplier - if tcost > cost { - cost = tcost - } - } - return cost +func (f *DominantResourceFairness) UnweightedCostFromAllocation(allocation internaltypes.ResourceList) float64 { + return max(0, allocation.DivideZeroOnError(f.totalResources).Multiply(f.multipliers).Max()) } diff --git a/internal/scheduler/scheduling/fairness/fairness_test.go b/internal/scheduler/scheduling/fairness/fairness_test.go index d5992092546..16cc4b4554e 100644 --- a/internal/scheduler/scheduling/fairness/fairness_test.go +++ b/internal/scheduler/scheduling/fairness/fairness_test.go @@ -8,15 +8,16 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/scheduler/configuration" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) type MinimalQueue struct { - allocation schedulerobjects.ResourceList + allocation internaltypes.ResourceList weight float64 } -func (q MinimalQueue) GetAllocation() schedulerobjects.ResourceList { +func (q MinimalQueue) GetAllocation() internaltypes.ResourceList { return q.allocation } @@ -25,12 +26,12 @@ func (q MinimalQueue) GetWeight() float64 { } func TestNewDominantResourceFairness(t *testing.T) { + rlFactory := makeTestResourceListFactory() _, err := NewDominantResourceFairness( - schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - }, + rlFactory.FromNodeProto(map[string]resource.Quantity{ + "foo": resource.MustParse("1"), }, + ), configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{}}, ) require.Error(t, err) @@ -204,20 +205,41 @@ func TestDominantResourceFairness(t *testing.T) { expectedCost: 2, }, } + + rlFactory := makeTestResourceListFactory() + for name, tc := range tests { t.Run(name, func(t *testing.T) { - f, err := NewDominantResourceFairness(tc.totalResources, tc.config) + totalResources := rlFactory.FromNodeProto(tc.totalResources.Resources) + allocation := rlFactory.FromJobResourceListIgnoreUnknown(tc.allocation.Resources) + f, err := NewDominantResourceFairness(totalResources, tc.config) require.NoError(t, err) assert.Equal( t, tc.expectedCost, - f.WeightedCostFromAllocation(tc.allocation, tc.weight), + f.WeightedCostFromAllocation(allocation, tc.weight), ) assert.Equal( t, - f.WeightedCostFromAllocation(tc.allocation, tc.weight), - f.WeightedCostFromQueue(MinimalQueue{allocation: tc.allocation, weight: tc.weight}), + f.WeightedCostFromAllocation(allocation, tc.weight), + f.WeightedCostFromQueue(MinimalQueue{allocation: allocation, weight: tc.weight}), ) }) } } + +func makeTestResourceListFactory() *internaltypes.ResourceListFactory { + rlFactory, err := 
internaltypes.NewResourceListFactory( + []configuration.ResourceType{ + {Name: "foo"}, + {Name: "bar"}, + }, + []configuration.FloatingResourceConfig{ + {Name: "baz"}, + }, + ) + if err != nil { + panic(err) + } + return rlFactory +} diff --git a/internal/scheduler/scheduling/gang_scheduler.go b/internal/scheduler/scheduling/gang_scheduler.go index 247c83e9b77..e8e0c898f45 100644 --- a/internal/scheduler/scheduling/gang_scheduler.go +++ b/internal/scheduler/scheduling/gang_scheduler.go @@ -136,7 +136,7 @@ func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *context.Gan gangAddedToSchedulingContext = true if !gctx.AllJobsEvicted { // Only perform these checks for new jobs to avoid preempting jobs if, e.g., MinimumJobSize changes. - if ok, unschedulableReason, err = sch.constraints.CheckConstraints(sch.schedulingContext, gctx); err != nil || !ok { + if ok, unschedulableReason, err = sch.constraints.CheckJobConstraints(sch.schedulingContext, gctx); err != nil || !ok { return } } diff --git a/internal/scheduler/scheduling/gang_scheduler_test.go b/internal/scheduler/scheduling/gang_scheduler_test.go index e7f2dd17e77..d1626839282 100644 --- a/internal/scheduler/scheduling/gang_scheduler_test.go +++ b/internal/scheduler/scheduling/gang_scheduler_test.go @@ -18,6 +18,7 @@ import ( "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/floatingresources" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -25,6 +26,7 @@ import ( "github.com/armadaproject/armada/internal/scheduler/scheduling/context" "github.com/armadaproject/armada/internal/scheduler/scheduling/fairness" "github.com/armadaproject/armada/internal/scheduler/testfixtures" + "github.com/armadaproject/armada/pkg/api" ) func TestGangScheduler(t *testing.T) { @@ -605,7 +607,9 @@ func TestGangScheduler(t *testing.T) { txn.Commit() if tc.TotalResources.Resources == nil { // Default to NodeDb total. 
- tc.TotalResources = nodeDb.TotalKubernetesResources() + tc.TotalResources = schedulerobjects.ResourceList{ + Resources: nodeDb.TotalKubernetesResources().ToMap(), + } } priorityFactorByQueue := make(map[string]float64) for _, jobs := range tc.Gangs { @@ -614,8 +618,10 @@ func TestGangScheduler(t *testing.T) { } } + totalResources := testfixtures.TestResourceListFactory.FromNodeProto(tc.TotalResources.Resources) + fairnessCostProvider, err := fairness.NewDominantResourceFairness( - tc.TotalResources, + totalResources, tc.SchedulingConfig, ) require.NoError(t, err) @@ -626,15 +632,15 @@ func TestGangScheduler(t *testing.T) { rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), tc.SchedulingConfig.MaximumSchedulingBurst, ), - tc.TotalResources, + totalResources, ) for queue, priorityFactor := range priorityFactorByQueue { err := sctx.AddQueueSchedulingContext( queue, priorityFactor, nil, - schedulerobjects.NewResourceList(0), - schedulerobjects.NewResourceList(0), + internaltypes.ResourceList{}, + internaltypes.ResourceList{}, rate.NewLimiter( rate.Limit(tc.SchedulingConfig.MaximumPerQueueSchedulingRate), tc.SchedulingConfig.MaximumPerQueueSchedulingBurst, @@ -642,8 +648,16 @@ func TestGangScheduler(t *testing.T) { ) require.NoError(t, err) } - constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil) - floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(tc.SchedulingConfig.ExperimentalFloatingResources) + constraints := schedulerconstraints.NewSchedulingConstraints( + "pool", + totalResources, + tc.SchedulingConfig, + armadaslices.Map( + maps.Keys(priorityFactorByQueue), + func(qn string) *api.Queue { return &api.Queue{Name: qn} }, + ), + ) + floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(tc.SchedulingConfig.ExperimentalFloatingResources, testfixtures.TestResourceListFactory) require.NoError(t, err) sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, false) require.NoError(t, err) diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler.go b/internal/scheduler/scheduling/preempting_queue_scheduler.go index 34d9d36ae9f..dec7c2422cd 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler.go @@ -14,9 +14,9 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" armadaslices "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/internal/scheduler/floatingresources" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/scheduling/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/scheduling/context" "github.com/armadaproject/armada/internal/scheduler/scheduling/fairness" @@ -407,8 +407,11 @@ func (sch *PreemptingQueueScheduler) setEvictedGangCardinality(evictorResult *Ev func (sch *PreemptingQueueScheduler) evictionAssertions(evictorResult *EvictorResult) error { for _, qctx := range sch.schedulingContext.QueueSchedulingContexts { - if !qctx.AllocatedByPriorityClass.IsStrictlyNonNegative() { - return errors.Errorf("negative allocation for queue %s after eviction: %s", qctx.Queue, qctx.AllocatedByPriorityClass) + 
if internaltypes.RlMapHasNegativeValues(qctx.AllocatedByPriorityClass) { + return errors.Errorf("negative allocation for queue %s after eviction: %s", + qctx.Queue, + internaltypes.RlMapToString(qctx.AllocatedByPriorityClass), + ) } } evictedJobIdsByGangId := make(map[string]map[string]bool) @@ -454,17 +457,17 @@ func (qr *MinimalQueueRepository) GetQueue(name string) (fairness.Queue, bool) { func NewMinimalQueueRepositoryFromSchedulingContext(sctx *schedulercontext.SchedulingContext) *MinimalQueueRepository { queues := make(map[string]MinimalQueue, len(sctx.QueueSchedulingContexts)) for name, qctx := range sctx.QueueSchedulingContexts { - queues[name] = MinimalQueue{allocation: qctx.Allocated.DeepCopy(), weight: qctx.Weight} + queues[name] = MinimalQueue{allocation: qctx.Allocated, weight: qctx.Weight} } return &MinimalQueueRepository{queues: queues} } type MinimalQueue struct { - allocation schedulerobjects.ResourceList + allocation internaltypes.ResourceList weight float64 } -func (q MinimalQueue) GetAllocation() schedulerobjects.ResourceList { +func (q MinimalQueue) GetAllocation() internaltypes.ResourceList { return q.allocation } diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go index 1d2da7c4dcf..eb0a3bb05ad 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go @@ -29,6 +29,7 @@ import ( "github.com/armadaproject/armada/internal/scheduler/scheduling/context" "github.com/armadaproject/armada/internal/scheduler/scheduling/fairness" "github.com/armadaproject/armada/internal/scheduler/testfixtures" + "github.com/armadaproject/armada/pkg/api" ) type testQueueContextChecker struct { @@ -2048,17 +2049,25 @@ func TestPreemptingQueueScheduler(t *testing.T) { for queue, priorityFactor := range tc.PriorityFactorByQueue { weight := 1 / priorityFactor + queueDemand := testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown(demandByQueue[queue].Resources) err := sctx.AddQueueSchedulingContext( queue, weight, - allocatedByQueueAndPriorityClass[queue], - demandByQueue[queue], - demandByQueue[queue], + internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory), + queueDemand, + queueDemand, limiterByQueue[queue], ) require.NoError(t, err) } - constraints := schedulerconstraints.NewSchedulingConstraints("pool", totalResources, tc.SchedulingConfig, nil) + constraints := schedulerconstraints.NewSchedulingConstraints( + "pool", + totalResources, + tc.SchedulingConfig, + armadaslices.Map( + maps.Keys(tc.PriorityFactorByQueue), + func(qn string) *api.Queue { return &api.Queue{Name: qn} }, + )) sctx.UpdateFairShares() sch := NewPreemptingQueueScheduler( sctx, @@ -2104,7 +2113,8 @@ func TestPreemptingQueueScheduler(t *testing.T) { ) } for queue, qctx := range sctx.QueueSchedulingContexts { - assert.True(t, qctx.AllocatedByPriorityClass.Equal(allocatedByQueueAndPriorityClass[queue])) + m := internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory) + assert.Equal(t, internaltypes.RlMapRemoveZeros(m), internaltypes.RlMapRemoveZeros(qctx.AllocatedByPriorityClass)) } // Test that jobs are mapped to nodes correctly. 
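[Reviewer note] The eviction assertion and the test comparison above lean on helpers over map[string]internaltypes.ResourceList (RlMapAllZero, RlMapHasNegativeValues, RlMapToString, RlMapFromJobSchedulerObjects, RlMapRemoveZeros) that are not defined in this hunk. The Go sketch below only illustrates the semantics the call sites appear to assume — a nil map counts as all-zero, and zero-valued entries are dropped before equality checks — and is not the actual implementation from this patch.

package internaltypes

// RlMapAllZero reports whether every entry in the map is zero; a nil map counts as all-zero.
// Sketch only: behaviour inferred from how AllocatedByQueueAndPriority uses it.
func RlMapAllZero(m map[string]ResourceList) bool {
	for _, rl := range m {
		if !rl.AllZero() {
			return false
		}
	}
	return true
}

// RlMapRemoveZeros returns a copy with all-zero entries removed, so an allocation that was
// never touched and one that was added to and then subtracted back to zero compare as equal.
// Sketch only: behaviour inferred from the assert.Equal comparison in the test above.
func RlMapRemoveZeros(m map[string]ResourceList) map[string]ResourceList {
	out := make(map[string]ResourceList, len(m))
	for k, rl := range m {
		if !rl.AllZero() {
			out[k] = rl
		}
	}
	return out
}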
@@ -2404,11 +2414,19 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { ) for queue, priorityFactor := range priorityFactorByQueue { weight := 1 / priorityFactor - err := sctx.AddQueueSchedulingContext(queue, weight, make(schedulerobjects.QuantityByTAndResourceType[string]), - schedulerobjects.NewResourceList(0), schedulerobjects.NewResourceList(0), limiterByQueue[queue]) + err := sctx.AddQueueSchedulingContext(queue, weight, make(map[string]internaltypes.ResourceList), + internaltypes.ResourceList{}, internaltypes.ResourceList{}, limiterByQueue[queue]) require.NoError(b, err) } - constraints := schedulerconstraints.NewSchedulingConstraints(testfixtures.TestPool, nodeDb.TotalKubernetesResources(), tc.SchedulingConfig, nil) + constraints := schedulerconstraints.NewSchedulingConstraints( + testfixtures.TestPool, + nodeDb.TotalKubernetesResources(), + tc.SchedulingConfig, + armadaslices.Map( + maps.Keys(priorityFactorByQueue), + func(qn string) *api.Queue { return &api.Queue{Name: qn} }, + ), + ) sch := NewPreemptingQueueScheduler( sctx, constraints, @@ -2468,7 +2486,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { for queue, priorityFactor := range priorityFactorByQueue { weight := 1 / priorityFactor err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue], - schedulerobjects.NewResourceList(0), schedulerobjects.NewResourceList(0), limiterByQueue[queue]) + internaltypes.ResourceList{}, internaltypes.ResourceList{}, limiterByQueue[queue]) require.NoError(b, err) } sch := NewPreemptingQueueScheduler( diff --git a/internal/scheduler/scheduling/queue_scheduler.go b/internal/scheduler/scheduling/queue_scheduler.go index 6938922051a..96969993ede 100644 --- a/internal/scheduler/scheduling/queue_scheduler.go +++ b/internal/scheduler/scheduling/queue_scheduler.go @@ -12,6 +12,7 @@ import ( "github.com/armadaproject/armada/internal/common/armadacontext" armadamaps "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/scheduler/floatingresources" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/scheduling/constraints" @@ -142,11 +143,11 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul stats.LastGangScheduledQueuePosition = loopNumber queue, queueOK := sch.candidateGangIterator.queueRepository.GetQueue(gctx.Queue) if queueOK { - stats.LastGangScheduledResources = gctx.TotalResourceRequests.DeepCopy() - stats.LastGangScheduledQueueResources = queue.GetAllocation().DeepCopy() + stats.LastGangScheduledResources = gctx.TotalResourceRequests + stats.LastGangScheduledQueueResources = queue.GetAllocation() } else { - stats.LastGangScheduledResources = schedulerobjects.NewResourceListWithDefaultSize() - stats.LastGangScheduledQueueResources = schedulerobjects.NewResourceListWithDefaultSize() + stats.LastGangScheduledResources = internaltypes.ResourceList{} + stats.LastGangScheduledQueueResources = internaltypes.ResourceList{} } } @@ -179,8 +180,8 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul s.LastGangScheduledSampleJobId, s.LastGangScheduledQueuePosition, s.LastGangScheduledQueueCost, - s.LastGangScheduledResources.CompactString(), - s.LastGangScheduledQueueResources.CompactString(), + s.LastGangScheduledResources.String(), 
+ s.LastGangScheduledQueueResources.String(), s.Time.Seconds()) })) @@ -331,8 +332,6 @@ type CandidateGangIterator struct { // If, e.g., onlyYieldEvictedByQueue["A"] is true, // this iterator only yields gangs where all jobs are evicted for queue A. onlyYieldEvictedByQueue map[string]bool - // Reusable buffer to avoid allocations. - buffer schedulerobjects.ResourceList // Priority queue containing per-queue iterators. // Determines the order in which queues are processed. pq QueueCandidateGangIteratorPQ @@ -350,7 +349,6 @@ func NewCandidateGangIterator( queueRepository: queueRepository, fairnessCostProvider: fairnessCostProvider, onlyYieldEvictedByQueue: make(map[string]bool), - buffer: schedulerobjects.NewResourceListWithDefaultSize(), pq: QueueCandidateGangIteratorPQ{ considerPriority: considerPriority, items: make([]*QueueCandidateGangIteratorItem, 0, len(iteratorsByQueue)), @@ -485,10 +483,8 @@ func (it *CandidateGangIterator) queueCostWithGctx(gctx *schedulercontext.GangSc if !ok { return 0, errors.Errorf("unknown queue %s", gangQueue) } - it.buffer.Zero() - it.buffer.Add(queue.GetAllocation()) - it.buffer.Add(gctx.TotalResourceRequests) - return it.fairnessCostProvider.WeightedCostFromAllocation(it.buffer, queue.GetWeight()), nil + + return it.fairnessCostProvider.WeightedCostFromAllocation(queue.GetAllocation().Add(gctx.TotalResourceRequests), queue.GetWeight()), nil } // QueueCandidateGangIteratorPQ is a priority queue used by CandidateGangIterator to determine from which queue to schedule the next job. diff --git a/internal/scheduler/scheduling/queue_scheduler_test.go b/internal/scheduler/scheduling/queue_scheduler_test.go index c8e002422f0..f1ef18c9faa 100644 --- a/internal/scheduler/scheduling/queue_scheduler_test.go +++ b/internal/scheduler/scheduling/queue_scheduler_test.go @@ -16,6 +16,7 @@ import ( armadaslices "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/scheduler/configuration" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -484,7 +485,9 @@ func TestQueueScheduler(t *testing.T) { txn.Commit() if tc.TotalResources.Resources == nil { // Default to NodeDb total. 
- tc.TotalResources = nodeDb.TotalKubernetesResources() + tc.TotalResources = schedulerobjects.ResourceList{ + Resources: nodeDb.TotalKubernetesResources().ToMap(), + } } queueNameToQueue := map[string]*api.Queue{} @@ -504,8 +507,9 @@ func TestQueueScheduler(t *testing.T) { context.JobSchedulingContextsFromJobs(tc.Jobs), ) + totalResources := testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown(tc.TotalResources.Resources) fairnessCostProvider, err := fairness.NewDominantResourceFairness( - tc.TotalResources, + totalResources, tc.SchedulingConfig, ) require.NoError(t, err) @@ -516,15 +520,18 @@ func TestQueueScheduler(t *testing.T) { rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), tc.SchedulingConfig.MaximumSchedulingBurst, ), - tc.TotalResources, + totalResources, ) for _, q := range tc.Queues { weight := 1.0 / float64(q.PriorityFactor) err := sctx.AddQueueSchedulingContext( q.Name, weight, - tc.InitialAllocatedByQueueAndPriorityClass[q.Name], - schedulerobjects.NewResourceList(0), - schedulerobjects.NewResourceList(0), + internaltypes.RlMapFromJobSchedulerObjects( + tc.InitialAllocatedByQueueAndPriorityClass[q.Name], + testfixtures.TestResourceListFactory, + ), + internaltypes.ResourceList{}, + internaltypes.ResourceList{}, rate.NewLimiter( rate.Limit(tc.SchedulingConfig.MaximumPerQueueSchedulingRate), tc.SchedulingConfig.MaximumPerQueueSchedulingBurst, @@ -532,7 +539,7 @@ func TestQueueScheduler(t *testing.T) { ) require.NoError(t, err) } - constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, tc.Queues) + constraints := schedulerconstraints.NewSchedulingConstraints("pool", totalResources, tc.SchedulingConfig, tc.Queues) jobIteratorByQueue := make(map[string]JobContextIterator) for _, q := range tc.Queues { it := jobRepo.GetJobIterator(q.Name) diff --git a/internal/scheduler/scheduling/result.go b/internal/scheduler/scheduling/result.go index a8669a6d6a2..5bf209880fa 100644 --- a/internal/scheduler/scheduling/result.go +++ b/internal/scheduler/scheduling/result.go @@ -3,8 +3,8 @@ package scheduling import ( "time" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/internal/scheduler/scheduling/context" ) @@ -18,8 +18,8 @@ type QueueStats struct { LastGangScheduledSampleJobId string LastGangScheduledQueuePosition int LastGangScheduledQueueCost float64 - LastGangScheduledResources schedulerobjects.ResourceList - LastGangScheduledQueueResources schedulerobjects.ResourceList + LastGangScheduledResources internaltypes.ResourceList + LastGangScheduledQueueResources internaltypes.ResourceList Time time.Duration } diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index f4b29b18724..01ad8e37773 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -128,8 +128,8 @@ func (l *FairSchedulingAlgo) Schedule( ctx.Infof("Scheduling on pool %s with capacity %s %s", pool, - fsctx.nodeDb.TotalKubernetesResources().CompactString(), - l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name).CompactString(), + fsctx.nodeDb.TotalKubernetesResources().String(), + l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name).String(), ) start := time.Now() @@ -182,10 +182,6 @@ type FairSchedulingAlgoContext struct { 
@@ -182,10 +182,6 @@ type FairSchedulingAlgoContext struct {
 	Txn         *jobdb.Txn
 }
 
-func (l *FairSchedulingAlgo) NewFairSchedulingAlgoContext(ctx *armadacontext.Context, txn *jobdb.Txn, pool configuration.PoolConfig) (*FairSchedulingAlgoContext, error) {
-	return l.newFairSchedulingAlgoContext(ctx, txn, pool)
-}
-
 func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Context, txn *jobdb.Txn, pool configuration.PoolConfig) (*FairSchedulingAlgoContext, error) {
 	executors, err := l.executorRepository.GetExecutors(ctx)
 	if err != nil {
@@ -281,12 +277,12 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
 	}
 
 	totalResources := nodeDb.TotalKubernetesResources()
-	totalResources = l.floatingResourceTypes.AddTotalAvailableForPool(pool.Name, totalResources)
+	totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name))
 
 	schedulingContext, err := l.constructSchedulingContext(
 		pool.Name,
 		totalResources,
-		jobSchedulingInfo.demandByQueue,
+		jobSchedulingInfo.demandByQueueAndPriorityClass,
 		jobSchedulingInfo.allocatedByQueueAndPriorityClass,
 		jobSchedulingInfo.awayAllocatedByQueueAndPriorityClass,
 		queueByName)
@@ -312,9 +308,9 @@ type jobSchedulingInfo struct {
 	nodeIdByJobId                        map[string]string
 	jobIdsByGangId                       map[string]map[string]bool
 	gangIdByJobId                        map[string]string
-	demandByQueue                        map[string]schedulerobjects.QuantityByTAndResourceType[string]
-	allocatedByQueueAndPriorityClass     map[string]schedulerobjects.QuantityByTAndResourceType[string]
-	awayAllocatedByQueueAndPriorityClass map[string]schedulerobjects.QuantityByTAndResourceType[string]
+	demandByQueueAndPriorityClass        map[string]map[string]internaltypes.ResourceList
+	allocatedByQueueAndPriorityClass     map[string]map[string]internaltypes.ResourceList
+	awayAllocatedByQueueAndPriorityClass map[string]map[string]internaltypes.ResourceList
 }
 
 func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet map[string]bool,
@@ -325,9 +321,9 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m
 	nodeIdByJobId := make(map[string]string)
 	jobIdsByGangId := make(map[string]map[string]bool)
 	gangIdByJobId := make(map[string]string)
-	demandByQueue := make(map[string]schedulerobjects.QuantityByTAndResourceType[string])
-	allocatedByQueueAndPriorityClass := make(map[string]schedulerobjects.QuantityByTAndResourceType[string])
-	awayAllocatedByQueueAndPriorityClass := make(map[string]schedulerobjects.QuantityByTAndResourceType[string])
+	demandByQueueAndPriorityClass := make(map[string]map[string]internaltypes.ResourceList)
+	allocatedByQueueAndPriorityClass := make(map[string]map[string]internaltypes.ResourceList)
+	awayAllocatedByQueueAndPriorityClass := make(map[string]map[string]internaltypes.ResourceList)
 
 	for _, job := range jobs {
 		if job.InTerminalState() {
@@ -361,19 +357,15 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m
 		}
 
 		if slices.Contains(pools, currentPool) {
-			queueResources, ok := demandByQueue[job.Queue()]
+			queueResources, ok := demandByQueueAndPriorityClass[job.Queue()]
 			if !ok {
-				queueResources = schedulerobjects.QuantityByTAndResourceType[string]{}
-				demandByQueue[job.Queue()] = queueResources
+				queueResources = map[string]internaltypes.ResourceList{}
+				demandByQueueAndPriorityClass[job.Queue()] = queueResources
 			}
 			// Queued jobs should not be considered for paused queues, so demand := running
 			if !queue.Cordoned || !job.Queued() {
-				pcResources, ok := queueResources[job.PriorityClassName()]
-				if !ok {
-					pcResources = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests))
-					queueResources[job.PriorityClassName()] = pcResources
-				}
-				pcResources.AddV1ResourceList(job.PodRequirements().ResourceRequirements.Requests)
+				pcName := job.PriorityClassName()
+				queueResources[pcName] = queueResources[pcName].Add(job.AllResourceRequirements())
 			}
 		}
 
@@ -396,17 +388,17 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m
 			if pool == currentPool {
 				allocation := allocatedByQueueAndPriorityClass[queue.Name]
 				if allocation == nil {
-					allocation = make(schedulerobjects.QuantityByTAndResourceType[string])
+					allocation = make(map[string]internaltypes.ResourceList)
 					allocatedByQueueAndPriorityClass[queue.Name] = allocation
 				}
-				allocation.AddV1ResourceList(job.PriorityClassName(), job.ResourceRequirements().Requests)
+				allocation[job.PriorityClassName()] = allocation[job.PriorityClassName()].Add(job.AllResourceRequirements())
 			} else if slices.Contains(awayAllocationPools, pool) {
 				awayAllocation := awayAllocatedByQueueAndPriorityClass[queue.Name]
 				if awayAllocation == nil {
-					awayAllocation = make(schedulerobjects.QuantityByTAndResourceType[string])
+					awayAllocation = make(map[string]internaltypes.ResourceList)
 					awayAllocatedByQueueAndPriorityClass[queue.Name] = awayAllocation
 				}
-				awayAllocation.AddV1ResourceList(job.PriorityClassName(), job.ResourceRequirements().Requests)
+				awayAllocation[job.PriorityClassName()] = awayAllocation[job.PriorityClassName()].Add(job.AllResourceRequirements())
 			}
 		}
 		if _, present := jobsByPool[pool]; !present {
@@ -436,7 +428,7 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m
 		nodeIdByJobId:                        nodeIdByJobId,
 		jobIdsByGangId:                       jobIdsByGangId,
 		gangIdByJobId:                        gangIdByJobId,
-		demandByQueue:                        demandByQueue,
+		demandByQueueAndPriorityClass:        demandByQueueAndPriorityClass,
 		allocatedByQueueAndPriorityClass:     allocatedByQueueAndPriorityClass,
 		awayAllocatedByQueueAndPriorityClass: awayAllocatedByQueueAndPriorityClass,
 	}, nil
@@ -463,10 +455,10 @@ func (l *FairSchedulingAlgo) constructNodeDb(homeJobs []*jobdb.Job, awayJobs []*
 
 func (l *FairSchedulingAlgo) constructSchedulingContext(
 	pool string,
-	totalCapacity schedulerobjects.ResourceList,
-	demandByQueue map[string]schedulerobjects.QuantityByTAndResourceType[string],
-	allocationByQueueAndPriorityClass map[string]schedulerobjects.QuantityByTAndResourceType[string],
-	awayAllocationByQueueAndPriorityClass map[string]schedulerobjects.QuantityByTAndResourceType[string],
+	totalCapacity internaltypes.ResourceList,
+	demandByQueueAndPriorityClass map[string]map[string]internaltypes.ResourceList,
+	allocationByQueueAndPriorityClass map[string]map[string]internaltypes.ResourceList,
+	awayAllocationByQueueAndPriorityClass map[string]map[string]internaltypes.ResourceList,
 	queues map[string]*api.Queue,
 ) (*schedulercontext.SchedulingContext, error) {
 	fairnessCostProvider, err := fairness.NewDominantResourceFairness(totalCapacity, l.schedulingConfig)
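The bookkeeping change above replaces the old create-if-missing-then-mutate pattern (schedulerobjects maps plus AddV1ResourceList) with plain Go maps of internaltypes.ResourceList, whose Add returns a new value and whose zero value is a usable starting point, as the rewritten calls suggest. A minimal sketch of the resulting idiom under those assumptions; the function and package names are illustrative only:

package example

import (
	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
	"github.com/armadaproject/armada/internal/scheduler/jobdb"
)

// demandByPriorityClass accumulates job resource requests per priority class.
// Because internaltypes.ResourceList.Add returns a new list and the zero value
// is valid, no "create entry if missing" branch is needed.
func demandByPriorityClass(jobs []*jobdb.Job) map[string]internaltypes.ResourceList {
	demand := map[string]internaltypes.ResourceList{}
	for _, job := range jobs {
		pc := job.PriorityClassName()
		demand[pc] = demand[pc].Add(job.AllResourceRequirements())
	}
	return demand
}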
@@ -477,16 +469,16 @@ func (l *FairSchedulingAlgo) constructSchedulingContext(
 	constraints := schedulerconstraints.NewSchedulingConstraints(pool, totalCapacity, l.schedulingConfig, maps.Values(queues))
 
 	for _, queue := range queues {
-		demand, hasDemand := demandByQueue[queue.Name]
+		demand, hasDemand := demandByQueueAndPriorityClass[queue.Name]
 		if !hasDemand {
 			// To ensure fair share is computed only from active queues, i.e., queues with jobs queued or running.
 			continue
 		}
 		cappedDemand := constraints.CapResources(queue.Name, demand)
 
-		var allocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
-		if allocatedByQueueAndPriorityClass := allocationByQueueAndPriorityClass; allocatedByQueueAndPriorityClass != nil {
-			allocatedByPriorityClass = allocatedByQueueAndPriorityClass[queue.Name]
+		var allocatedByPriorityClass map[string]internaltypes.ResourceList
+		if allocationByQueueAndPriorityClass != nil {
+			allocatedByPriorityClass = allocationByQueueAndPriorityClass[queue.Name]
 		}
 		var weight float64 = 1
 		if queue.PriorityFactor > 0 {
@@ -502,7 +494,7 @@ func (l *FairSchedulingAlgo) constructSchedulingContext(
 			l.limiterByQueue[queue.Name] = queueLimiter
 		}
 
-		if err := sctx.AddQueueSchedulingContext(queue.Name, weight, allocatedByPriorityClass, demand.AggregateByResource(), cappedDemand.AggregateByResource(), queueLimiter); err != nil {
+		if err := sctx.AddQueueSchedulingContext(queue.Name, weight, allocatedByPriorityClass, internaltypes.RlMapSumValues(demand), internaltypes.RlMapSumValues(cappedDemand), queueLimiter); err != nil {
 			return nil, err
 		}
 	}
@@ -519,7 +511,7 @@ func (l *FairSchedulingAlgo) constructSchedulingContext(
 			weight = 1 / queue.PriorityFactor
 		}
 
-		if err := sctx.AddQueueSchedulingContext(schedulercontext.CalculateAwayQueueName(queue.Name), weight, allocation, schedulerobjects.NewResourceList(0), schedulerobjects.NewResourceList(0), nil); err != nil {
+		if err := sctx.AddQueueSchedulingContext(schedulercontext.CalculateAwayQueueName(queue.Name), weight, allocation, internaltypes.ResourceList{}, internaltypes.ResourceList{}, nil); err != nil {
 			return nil, err
 		}
 	}
@@ -536,7 +528,8 @@ func (l *FairSchedulingAlgo) SchedulePool(
 	pool string,
 ) (*SchedulerResult, *schedulercontext.SchedulingContext, error) {
 	totalResources := fsctx.nodeDb.TotalKubernetesResources()
-	totalResources = l.floatingResourceTypes.AddTotalAvailableForPool(pool, totalResources)
+	totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool))
+
 	constraints := schedulerconstraints.NewSchedulingConstraints(pool, totalResources, l.schedulingConfig, maps.Values(fsctx.queues))
 
 	scheduler := NewPreemptingQueueScheduler(
diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go
index aec3a3d55c2..dec3c5d406d 100644
--- a/internal/scheduler/simulator/simulator.go
+++ b/internal/scheduler/simulator/simulator.go
@@ -53,11 +53,11 @@ type accounting struct {
 	nodeDbByPool map[string]*nodedb.NodeDb
 	// Allocation by pool for each queue and priority class.
 	// Stored across invocations of the scheduler.
-	allocationByPoolAndQueueAndPriorityClass map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]
+	allocationByPoolAndQueueAndPriorityClass map[string]map[string]map[string]internaltypes.ResourceList
 	// Demand for each queue
-	demandByQueue map[string]schedulerobjects.ResourceList
+	demandByQueue map[string]internaltypes.ResourceList
 	// Total resources across all executorGroups for each pool.
-	totalResourcesByPool map[string]schedulerobjects.ResourceList
+	totalResourcesByPool map[string]internaltypes.ResourceList
 	// Mapping of job Id -> nodeId. Needed by preemptingqueuescheduler for gang preemption.
 	nodeIdByJobId map[string]string
 	// Mapping of gangId -> jobsINGang. Needed by preemptingqueuescheduler for gang preemption.
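Where a single total is needed, for example the demand and capped demand passed to AddQueueSchedulingContext above, the per-priority-class maps are collapsed with internaltypes.RlMapSumValues, which is assumed here to sum the map's values (its use in place of AggregateByResource implies as much). A small sketch of that usage; the wrapper name and package are illustrative:

package example

import "github.com/armadaproject/armada/internal/scheduler/internaltypes"

// totalDemand flattens a per-priority-class breakdown into one ResourceList,
// the shape the scheduling context now expects for demand and capped demand.
func totalDemand(demandByPriorityClass map[string]internaltypes.ResourceList) internaltypes.ResourceList {
	return internaltypes.RlMapSumValues(demandByPriorityClass)
}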
@@ -132,7 +132,7 @@ func NewSimulator(
 		return nil, errors.WithMessage(err, "Error with the .scheduling.supportedResourceTypes field in config")
 	}
 
-	floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(schedulingConfig.ExperimentalFloatingResources)
+	floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(schedulingConfig.ExperimentalFloatingResources, resourceListFactory)
 	if err != nil {
 		return nil, err
 	}
@@ -176,9 +176,9 @@ func NewSimulator(
 		accounting: accounting{
 			nodeDbByPool: make(map[string]*nodedb.NodeDb),
 			poolByNodeId: make(map[string]string),
-			allocationByPoolAndQueueAndPriorityClass: make(map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]),
-			demandByQueue:        make(map[string]schedulerobjects.ResourceList),
-			totalResourcesByPool: make(map[string]schedulerobjects.ResourceList),
+			allocationByPoolAndQueueAndPriorityClass: make(map[string]map[string]map[string]internaltypes.ResourceList),
+			demandByQueue:        make(map[string]internaltypes.ResourceList),
+			totalResourcesByPool: make(map[string]internaltypes.ResourceList),
 			nodeIdByJobId:        make(map[string]string),
 			jobIdsByGangId:       make(map[string]map[string]bool),
 			gangIdByJobId:        make(map[string]string),
@@ -338,11 +338,6 @@ func (s *Simulator) setupClusters() error {
 			s.accounting.nodeDbByPool[cluster.Pool] = nodeDb
 		}
 
-		totalResourcesForPool, ok := s.accounting.totalResourcesByPool[cluster.Pool]
-		if !ok {
-			totalResourcesForPool = schedulerobjects.ResourceList{}
-		}
-
 		for nodeTemplateIndex, nodeTemplate := range cluster.NodeTemplates {
 			labels := map[string]string{}
 			if nodeTemplate.Labels != nil {
@@ -378,9 +373,13 @@ func (s *Simulator) setupClusters() error {
 
 				s.accounting.poolByNodeId[nodeId] = cluster.Pool
 			}
 		}
-		totalResourcesForPool.Add(nodeDb.TotalKubernetesResources())
-		s.accounting.totalResourcesByPool[cluster.Pool] = totalResourcesForPool
+	}
+
+	for pool, nodeDb := range s.accounting.nodeDbByPool {
+		s.accounting.totalResourcesByPool[pool] = nodeDb.TotalKubernetesResources()
+	}
+
 	return nil
 }
 
@@ -898,10 +897,8 @@ func (s *Simulator) handleJobSucceeded(txn *jobdb.Txn, e *armadaevents.JobSuccee
 	// Subtract the allocation of this job from the queue allocation.
 	run := job.LatestRun()
 	pool := s.accounting.poolByNodeId[run.NodeId()]
-	s.accounting.allocationByPoolAndQueueAndPriorityClass[pool][job.Queue()].SubV1ResourceList(
-		job.PriorityClassName(),
-		job.ResourceRequirements().Requests,
-	)
+	allocByPc := s.accounting.allocationByPoolAndQueueAndPriorityClass[pool][job.Queue()]
+	allocByPc[job.PriorityClassName()] = allocByPc[job.PriorityClassName()].Subtract(job.AllResourceRequirements())
 	s.removeJobFromDemand(job)
 
 	// Unbind the job from the node on which it was scheduled.
@@ -1036,19 +1033,11 @@ func maxTime(a, b time.Time) time.Time {
 }
 
 func (s *Simulator) addJobToDemand(job *jobdb.Job) {
-	r, ok := s.accounting.demandByQueue[job.Queue()]
-	if !ok {
-		r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests))
-		s.accounting.demandByQueue[job.Queue()] = r
-	}
-	r.AddV1ResourceList(job.PodRequirements().ResourceRequirements.Requests)
+	s.accounting.demandByQueue[job.Queue()] = s.accounting.demandByQueue[job.Queue()].Add(job.AllResourceRequirements())
 }
 
 func (s *Simulator) removeJobFromDemand(job *jobdb.Job) {
-	r, ok := s.accounting.demandByQueue[job.Queue()]
-	if ok {
-		r.SubV1ResourceList(job.PodRequirements().ResourceRequirements.Requests)
-	}
+	s.accounting.demandByQueue[job.Queue()] = s.accounting.demandByQueue[job.Queue()].Subtract(job.AllResourceRequirements())
 }
 
 func expandRepeatingTemplates(w *WorkloadSpec) *WorkloadSpec {
diff --git a/internal/scheduler/simulator/sink/queue_stats_writer.go b/internal/scheduler/simulator/sink/queue_stats_writer.go
index 07b575e6509..495d7d016bc 100644
--- a/internal/scheduler/simulator/sink/queue_stats_writer.go
+++ b/internal/scheduler/simulator/sink/queue_stats_writer.go
@@ -90,12 +90,12 @@ func (j *QueueStatsWriter) Close(ctx *armadacontext.Context) {
 }
 
 func calculateResourceShare(sctx *context.SchedulingContext, qctx *context.QueueSchedulingContext, resource string) float64 {
-	total := sctx.Allocated.Resources[resource]
-	allocated := qctx.Allocated.Resources[resource]
+	total := sctx.Allocated.GetResourceByNameZeroIfMissing(resource)
+	allocated := qctx.Allocated.GetResourceByNameZeroIfMissing(resource)
 	return allocated.AsApproximateFloat64() / total.AsApproximateFloat64()
 }
 
 func allocatedResources(qctx *context.QueueSchedulingContext, resource string) int {
-	allocated := qctx.Allocated.Resources[resource]
+	allocated := qctx.Allocated.GetResourceByNameZeroIfMissing(resource)
 	return int(allocated.AsApproximateFloat64())
 }
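The accessor change above swaps a raw map lookup for GetResourceByNameZeroIfMissing, which, as the name indicates, yields a zero quantity when the resource is absent instead of relying on Go's zero-value map semantics. A hedged usage sketch for a concrete resource name; the cpuShare helper is illustrative and not part of this change, and the division is only meaningful when the pool total is non-zero, as in the original:

package example

import (
	"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
)

// cpuShare reports the fraction of the pool's allocated CPU held by one queue,
// mirroring calculateResourceShare above with resource fixed to "cpu". Both
// lookups tolerate a missing resource name.
func cpuShare(sctx *context.SchedulingContext, qctx *context.QueueSchedulingContext) float64 {
	total := sctx.Allocated.GetResourceByNameZeroIfMissing("cpu")
	allocated := qctx.Allocated.GetResourceByNameZeroIfMissing("cpu")
	return allocated.AsApproximateFloat64() / total.AsApproximateFloat64()
}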
diff --git a/internal/scheduler/simulator/test_utils.go b/internal/scheduler/simulator/test_utils.go
index 38e7a3d976f..d55b6437103 100644
--- a/internal/scheduler/simulator/test_utils.go
+++ b/internal/scheduler/simulator/test_utils.go
@@ -13,7 +13,6 @@ import (
 	"github.com/armadaproject/armada/internal/common/types"
 	"github.com/armadaproject/armada/internal/scheduler/configuration"
 	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
-	"github.com/armadaproject/armada/internal/scheduler/scheduling/constraints"
 	"github.com/armadaproject/armada/pkg/armadaevents"
 )
 
@@ -88,19 +87,6 @@ func GetBasicSchedulingConfig() configuration.SchedulingConfig {
 	}
 }
 
-// TotalResources returns the total resources available across all nodes in the ClusterSpec.
-func (cs *ClusterSpec) TotalResources() schedulerobjects.ResourceList {
-	total := schedulerobjects.NewResourceListWithDefaultSize()
-	for _, cluster := range cs.Clusters {
-		for _, nt := range cluster.NodeTemplates {
-			for t, q := range nt.TotalResources.Resources {
-				total.AddQuantity(t, constraints.ScaleQuantity(q, float64(nt.Number)))
-			}
-		}
-	}
-	return total
-}
-
 func NodeTemplate32Cpu(n int64) *NodeTemplate {
 	return &NodeTemplate{
 		Number: n,
diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go
index 3b869f2ef49..e18c5c35073 100644
--- a/internal/scheduler/testfixtures/testfixtures.go
+++ b/internal/scheduler/testfixtures/testfixtures.go
@@ -947,7 +947,7 @@ func MakeTestResourceListFactory() *internaltypes.ResourceListFactory {
 }
 
 func MakeTestFloatingResourceTypes(config []schedulerconfiguration.FloatingResourceConfig) *floatingresources.FloatingResourceTypes {
-	result, _ := floatingresources.NewFloatingResourceTypes(config)
+	result, _ := floatingresources.NewFloatingResourceTypes(config, TestResourceListFactory)
 	return result
 }
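For callers outside the testfixtures package, the updated fixture amounts to passing the shared factory as the new second argument of NewFloatingResourceTypes. A hedged sketch of equivalent usage; the helper name is illustrative, and the explicit error handling is a choice this sketch makes where the fixture discards the error:

package example

import (
	"github.com/armadaproject/armada/internal/scheduler/configuration"
	"github.com/armadaproject/armada/internal/scheduler/floatingresources"
	"github.com/armadaproject/armada/internal/scheduler/testfixtures"
)

// newTestFloatingResourceTypes builds FloatingResourceTypes the way the updated
// fixture does, but surfaces the construction error instead of dropping it.
func newTestFloatingResourceTypes(config []configuration.FloatingResourceConfig) (*floatingresources.FloatingResourceTypes, error) {
	return floatingresources.NewFloatingResourceTypes(config, testfixtures.TestResourceListFactory)
}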