Skip to content

Commit

Permalink
Node and queue auto-quarantining (#3451)
Browse files Browse the repository at this point in the history
* Correctly pass through priority to schedule pods at in nodeDb

* Pass through tolerations after eviction

* Restore temporary workaround

* mage proto

* mage proto

* Remove unused proto import

* Add node and queue quarantining

* Comments

* Comments

* Lint
  • Loading branch information
severinson authored Mar 14, 2024
1 parent 6d1e917 commit be3b40c
Show file tree
Hide file tree
Showing 10 changed files with 457 additions and 80 deletions.
8 changes: 7 additions & 1 deletion config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,15 @@ scheduling:
maxUnacknowledgedJobsPerExecutor: 2500
alwaysAttemptScheduling: false
executorUpdateFrequency: "1m"
failureEstimatorConfig:
failureProbabilityEstimation:
# Optimised default parameters.
numInnerIterations: 10
innerOptimiserStepSize: 0.05
outerOptimiserStepSize: 0.05
outerOptimiserNesterovAcceleration: 0.2
nodeQuarantining:
failureProbabilityQuarantineThreshold: 0.95
failureProbabilityEstimateTimeout: "10m"
queueQuarantining:
quarantineFactorMultiplier: 0.5 # At most halve the scheduling rate of misbehaving queues.
failureProbabilityEstimateTimeout: "10m"
26 changes: 22 additions & 4 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,12 @@ type SchedulingConfig struct {
AlwaysAttemptScheduling bool
// The frequency at which the scheduler updates the cluster state.
ExecutorUpdateFrequency time.Duration
// Controls node and queue failure probability estimation.
FailureEstimatorConfig FailureEstimatorConfig
// Controls node and queue success probability estimation.
FailureProbabilityEstimation FailureEstimatorConfig
// Controls node quarantining, i.e., removing from consideration for scheduling misbehaving nodes.
NodeQuarantining NodeQuarantinerConfig
// Controls queue quarantining, i.e., rate-limiting scheduling from misbehaving queues.
QueueQuarantining QueueQuarantinerConfig
}

const (
Expand Down Expand Up @@ -310,8 +314,8 @@ type WellKnownNodeType struct {
Taints []v1.Taint
}

// FailureEstimatorConfig contains config controlling node and queue success probability estimation.
// See the internal/scheduler/failureestimator package for details.
// FailureEstimatorConfig controls node and queue success probability estimation.
// See internal/scheduler/failureestimator.go for details.
type FailureEstimatorConfig struct {
Disabled bool
NumInnerIterations int `validate:"gt=0"`
Expand All @@ -320,6 +324,20 @@ type FailureEstimatorConfig struct {
OuterOptimiserNesterovAcceleration float64 `validate:"gte=0"`
}

// NodeQuarantinerConfig controls how nodes are quarantined, i.e., removed from consideration when scheduling new jobs.
// See internal/scheduler/quarantine/node_quarantiner.go for details.
type NodeQuarantinerConfig struct {
	// Threshold on the estimated failure probability at or above which a node is quarantined.
	// Must be in [0, 1].
	FailureProbabilityQuarantineThreshold float64 `validate:"gte=0,lte=1"`
	// NOTE(review): presumably estimates older than this no longer cause quarantining —
	// confirm against node_quarantiner.go.
	FailureProbabilityEstimateTimeout time.Duration `validate:"gte=0"`
}

// QueueQuarantinerConfig controls how scheduling from misbehaving queues is rate-limited.
// See internal/scheduler/quarantine/queue_quarantiner.go for details.
type QueueQuarantinerConfig struct {
	// Multiplier in [0, 1] bounding how much a misbehaving queue's scheduling rate is reduced;
	// e.g., 0.5 means the rate is at most halved (see config/scheduler/config.yaml).
	QuarantineFactorMultiplier float64 `validate:"gte=0,lte=1"`
	// NOTE(review): presumably estimates older than this no longer affect rate-limiting —
	// confirm against queue_quarantiner.go.
	FailureProbabilityEstimateTimeout time.Duration `validate:"gte=0"`
}

// TODO: we can probably just typedef this to map[string]string
type PostgresConfig struct {
Connection map[string]string
Expand Down
133 changes: 94 additions & 39 deletions internal/scheduler/failureestimator/failureestimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math"
"sync"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -55,15 +56,12 @@ type FailureEstimator struct {
gradient *mat.VecDense

// Maps node (queue) names to the corresponding index of parameters.
// E.g., if parameterIndexByNode["myNode"] = 10, then parameters[10] is the estimated success probability of myNode.
parameterIndexByNode map[string]int
parameterIndexByQueue map[string]int

// Maps node names to the cluster they belong to.
clusterByNode map[string]string
// E.g., if nodeByName["myNode"].parameterIndex = 10, then parameters[10] is the estimated success probability of myNode.
nodeByName map[string]node
queueByName map[string]queue

// Samples that have not been processed yet.
samples []Sample
samples []sample

// Optimisation settings.
numInnerIterations int
Expand All @@ -82,7 +80,18 @@ type FailureEstimator struct {
mu sync.Mutex
}

type Sample struct {
// node holds the per-node state tracked by the failure estimator.
type node struct {
	// Index into fe.parameters at which this node's estimated success probability is stored.
	parameterIndex int
	// Name of the cluster this node belongs to, as reported via Push.
	cluster string
	// Timestamp of the most recent success or failure sample observed for this node.
	timeOfMostRecentSample time.Time
}

// queue holds the per-queue state tracked by the failure estimator.
type queue struct {
	// Index into fe.parameters at which this queue's estimated success probability is stored.
	parameterIndex int
	// Timestamp of the most recent success or failure sample observed for this queue.
	timeOfMostRecentSample time.Time
}

type sample struct {
i int
j int
c bool
Expand All @@ -106,10 +115,8 @@ func New(
intermediateParameters: mat.NewVecDense(32, armadaslices.Zeros[float64](32)),
gradient: mat.NewVecDense(32, armadaslices.Zeros[float64](32)),

parameterIndexByNode: make(map[string]int, 16),
parameterIndexByQueue: make(map[string]int, 16),

clusterByNode: make(map[string]string),
nodeByName: make(map[string]node, 16),
queueByName: make(map[string]queue, 16),

numInnerIterations: numInnerIterations,
innerOptimiser: innerOptimiser,
Expand Down Expand Up @@ -146,25 +153,34 @@ func (fe *FailureEstimator) IsDisabled() bool {

// Push adds a sample to the internal buffer of the failure estimator.
// Samples added via Push are processed on the next call to Update.
func (fe *FailureEstimator) Push(node, queue, cluster string, success bool) {
// The timestamp t should be the time at which the success or failure happened.
func (fe *FailureEstimator) Push(nodeName, queueName, clusterName string, success bool, t time.Time) {
fe.mu.Lock()
defer fe.mu.Unlock()

fe.clusterByNode[node] = cluster
i, ok := fe.parameterIndexByNode[node]
node, ok := fe.nodeByName[nodeName]
if !ok {
i = len(fe.parameterIndexByNode) + len(fe.parameterIndexByQueue)
fe.parameterIndexByNode[node] = i
node.parameterIndex = len(fe.nodeByName) + len(fe.queueByName)
}
node.cluster = clusterName
if node.timeOfMostRecentSample.Compare(t) == -1 {
node.timeOfMostRecentSample = t
}
j, ok := fe.parameterIndexByQueue[queue]
fe.nodeByName[nodeName] = node

queue, ok := fe.queueByName[queueName]
if !ok {
j = len(fe.parameterIndexByNode) + len(fe.parameterIndexByQueue)
fe.parameterIndexByQueue[queue] = j
queue.parameterIndex = len(fe.nodeByName) + len(fe.queueByName)
}
fe.extendParameters(armadamath.Max(i, j) + 1)
fe.samples = append(fe.samples, Sample{
i: i,
j: j,
if queue.timeOfMostRecentSample.Compare(t) == -1 {
queue.timeOfMostRecentSample = t
}
fe.queueByName[queueName] = queue

fe.extendParameters(armadamath.Max(node.parameterIndex, queue.parameterIndex) + 1)
fe.samples = append(fe.samples, sample{
i: node.parameterIndex,
j: queue.parameterIndex,
c: success,
})
}
Expand Down Expand Up @@ -259,6 +275,54 @@ func (fe *FailureEstimator) negLogLikelihoodGradient(nodeSuccessProbability, que
}
}

// FailureProbabilityFromNodeName returns the failure probability estimate of the named node
// and the timestamp of the most recent success or failure observed for this node.
// The most recent sample may not be reflected in the estimate if Update has not been called since the last call to Push.
// If there is no estimate for nodeName, the final return value is false.
func (fe *FailureEstimator) FailureProbabilityFromNodeName(nodeName string) (float64, time.Time, bool) {
	// Lock to avoid racing with Push/Update, which mutate nodeByName and parameters
	// while holding fe.mu (cf. ApplyNodes, which also locks before reading).
	fe.mu.Lock()
	defer fe.mu.Unlock()
	node, ok := fe.nodeByName[nodeName]
	if !ok {
		return 0, time.Time{}, false
	}
	// fe.parameters stores success probabilities; convert to a failure probability.
	return 1 - fe.parameters.AtVec(node.parameterIndex), node.timeOfMostRecentSample, true
}

// FailureProbabilityFromQueueName returns the failure probability estimate of the named queue
// and the timestamp of the most recent success or failure observed for this queue.
// The most recent sample may not be reflected in the estimate if Update has not been called since the last call to Push.
// If there is no estimate for queueName, the final return value is false.
func (fe *FailureEstimator) FailureProbabilityFromQueueName(queueName string) (float64, time.Time, bool) {
	// Lock to avoid racing with Push/Update, which mutate queueByName and parameters
	// while holding fe.mu (cf. ApplyQueues, which also locks before reading).
	fe.mu.Lock()
	defer fe.mu.Unlock()
	// Bug fix: was looking up fe.nodeByName[queueName], which returns the wrong entry
	// (or none) for queues; queues live in fe.queueByName.
	queue, ok := fe.queueByName[queueName]
	if !ok {
		return 0, time.Time{}, false
	}
	// fe.parameters stores success probabilities; convert to a failure probability.
	return 1 - fe.parameters.AtVec(queue.parameterIndex), queue.timeOfMostRecentSample, true
}

// ApplyNodes calls f once for every node known to the estimator, passing the node name,
// its cluster, the current failure probability estimate, and the time of the most recent sample.
// The estimator is locked for the duration of the iteration.
func (fe *FailureEstimator) ApplyNodes(f func(nodeName, cluster string, failureProbability float64, timeOfLastUpdate time.Time)) {
	fe.mu.Lock()
	defer fe.mu.Unlock()
	for name, n := range fe.nodeByName {
		// Round the reported failure probability to the nearest multiple of 0.01,
		// since the estimate is unlikely to be accurate to within less than that.
		p := math.Round((1-fe.parameters.AtVec(n.parameterIndex))*100) / 100
		f(name, n.cluster, p, n.timeOfMostRecentSample)
	}
}

// ApplyQueues calls f once for every queue known to the estimator, passing the queue name,
// the current failure probability estimate, and the time of the most recent sample.
// The estimator is locked for the duration of the iteration.
func (fe *FailureEstimator) ApplyQueues(f func(queueName string, failureProbability float64, timeOfLastUpdate time.Time)) {
	fe.mu.Lock()
	defer fe.mu.Unlock()
	for name, q := range fe.queueByName {
		// Round the reported failure probability to the nearest multiple of 0.01,
		// since the estimate is unlikely to be accurate to within less than that.
		p := math.Round((1-fe.parameters.AtVec(q.parameterIndex))*100) / 100
		f(name, p, q.timeOfMostRecentSample)
	}
}

func (fe *FailureEstimator) Describe(ch chan<- *prometheus.Desc) {
if fe.IsDisabled() {
return
Expand All @@ -271,19 +335,10 @@ func (fe *FailureEstimator) Collect(ch chan<- prometheus.Metric) {
if fe.IsDisabled() {
return
}
fe.mu.Lock()
defer fe.mu.Unlock()

// Report failure probability rounded to nearest multiple of 0.01.
// (As it's unlikely the estimate is accurate to within less than this.)
for k, i := range fe.parameterIndexByNode {
failureProbability := 1 - fe.parameters.AtVec(i)
failureProbability = math.Round(failureProbability*100) / 100
ch <- prometheus.MustNewConstMetric(fe.failureProbabilityByNodeDesc, prometheus.GaugeValue, failureProbability, k, fe.clusterByNode[k])
}
for k, j := range fe.parameterIndexByQueue {
failureProbability := 1 - fe.parameters.AtVec(j)
failureProbability = math.Round(failureProbability*100) / 100
ch <- prometheus.MustNewConstMetric(fe.failureProbabilityByQueueDesc, prometheus.GaugeValue, failureProbability, k)
}
fe.ApplyNodes(func(nodeName, cluster string, failureProbability float64, timeOfLastUpdate time.Time) {
ch <- prometheus.MustNewConstMetric(fe.failureProbabilityByNodeDesc, prometheus.GaugeValue, failureProbability, nodeName, cluster)
})
fe.ApplyQueues(func(queueName string, failureProbability float64, timeOfLastUpdate time.Time) {
ch <- prometheus.MustNewConstMetric(fe.failureProbabilityByQueueDesc, prometheus.GaugeValue, failureProbability, queueName)
})
}
33 changes: 19 additions & 14 deletions internal/scheduler/failureestimator/failureestimator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package failureestimator
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -20,25 +21,29 @@ func TestUpdate(t *testing.T) {
require.NoError(t, err)

// Test initialisation.
fe.Push("node", "queue", "cluster", false)
nodeParameterIndex, ok := fe.parameterIndexByNode["node"]
now := time.Now()
fe.Push("node", "queue", "cluster", false, now)
node, ok := fe.nodeByName["node"]
require.True(t, ok)
queueParameterIndex, ok := fe.parameterIndexByQueue["queue"]
queue, ok := fe.queueByName["queue"]
require.True(t, ok)
require.Equal(t, 0, nodeParameterIndex)
require.Equal(t, 1, queueParameterIndex)
require.Equal(t, 0, node.parameterIndex)
require.Equal(t, 1, queue.parameterIndex)
require.Equal(t, 0.5, fe.parameters.AtVec(0))
require.Equal(t, 0.5, fe.parameters.AtVec(1))
require.Equal(t, now, node.timeOfMostRecentSample)
require.Equal(t, now, queue.timeOfMostRecentSample)

for i := 0; i < 100; i++ {
fe.Push(fmt.Sprintf("node-%d", i), "queue-0", "cluster", false)
now := time.Now()
fe.Push(fmt.Sprintf("node-%d", i), "queue-0", "cluster", false, now)
}
nodeParameterIndex, ok = fe.parameterIndexByNode["node-99"]
node, ok = fe.nodeByName["node-99"]
require.True(t, ok)
queueParameterIndex, ok = fe.parameterIndexByQueue["queue-0"]
queue, ok = fe.queueByName["queue-0"]
require.True(t, ok)
require.Equal(t, 2+100, nodeParameterIndex)
require.Equal(t, 3, queueParameterIndex)
require.Equal(t, 2+100, node.parameterIndex)
require.Equal(t, 3, queue.parameterIndex)
require.Equal(t, 0.5, fe.parameters.AtVec(102))
require.Equal(t, 0.5, fe.parameters.AtVec(3))

Expand All @@ -51,15 +56,15 @@ func TestUpdate(t *testing.T) {
assert.Less(t, nodeSuccessProbability, 0.5-eps)
assert.Less(t, queueSuccessProbability, 0.5-eps)

// Test that the estimates move in the expected direction on success.
fe.Push("node", "queue", "cluster", true)
// Test that the estimates move in the expected direction after observing successes and failures.
fe.Push("node", "queue", "cluster", true, now)
fe.Update()
assert.Greater(t, fe.parameters.AtVec(0), nodeSuccessProbability)
assert.Greater(t, fe.parameters.AtVec(1), queueSuccessProbability)

for i := 0; i < 1000; i++ {
for i := 0; i < 10; i++ {
fe.Push("node", "queue", "cluster", false)
fe.Push("node", "queue", "cluster", false, now)
}
fe.Update()
}
Expand All @@ -70,7 +75,7 @@ func TestUpdate(t *testing.T) {

for i := 0; i < 1000; i++ {
for i := 0; i < 10; i++ {
fe.Push("node", "queue", "cluster", true)
fe.Push("node", "queue", "cluster", true, now)
}
fe.Update()
}
Expand Down
Loading

0 comments on commit be3b40c

Please sign in to comment.