From cb4b1130712cee09eb69d4affc1233a6aeadffbc Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Wed, 28 Feb 2024 11:32:11 +0000 Subject: [PATCH] Comments and cleanup --- .../common/optimisation/descent/descent.go | 2 ++ .../common/optimisation/nesterov/nesterov.go | 2 ++ internal/common/optimisation/optimisation.go | 3 ++ internal/common/slices/slices.go | 5 ++- .../failureestimator/failureestimator.go | 36 ++++++++++--------- 5 files changed, 30 insertions(+), 18 deletions(-) diff --git a/internal/common/optimisation/descent/descent.go b/internal/common/optimisation/descent/descent.go index 758f882ce0b..c57051084ac 100644 --- a/internal/common/optimisation/descent/descent.go +++ b/internal/common/optimisation/descent/descent.go @@ -9,6 +9,8 @@ import ( "github.com/armadaproject/armada/internal/common/armadaerrors" ) +// Gradient descent optimiser; see the following link for details: +// https://fluxml.ai/Flux.jl/stable/training/optimisers/ type Descent struct { eta float64 } diff --git a/internal/common/optimisation/nesterov/nesterov.go b/internal/common/optimisation/nesterov/nesterov.go index 8a632c4af44..050f91f5e7f 100644 --- a/internal/common/optimisation/nesterov/nesterov.go +++ b/internal/common/optimisation/nesterov/nesterov.go @@ -11,6 +11,8 @@ import ( "github.com/armadaproject/armada/internal/common/linalg" ) +// Nesterov accelerated gradient descent optimiser; see the following link for details: +// https://fluxml.ai/Flux.jl/stable/training/optimisers/ type Nesterov struct { eta float64 rho float64 diff --git a/internal/common/optimisation/optimisation.go b/internal/common/optimisation/optimisation.go index 10cc76a7bb1..d9b789e0029 100644 --- a/internal/common/optimisation/optimisation.go +++ b/internal/common/optimisation/optimisation.go @@ -2,7 +2,10 @@ package optimisation import "gonum.org/v1/gonum/mat" +// Optimiser represents a first-order optimisation algorithm. type Optimiser interface { + // Update the parameters p using gradient g and store the result in out. Update(out, p *mat.VecDense, g mat.Vector) *mat.VecDense + // Extend the internal state of the optimiser to accommodate at least n parameters. Extend(n int) } diff --git a/internal/common/slices/slices.go b/internal/common/slices/slices.go index 0187c557cd8..9063bf85053 100644 --- a/internal/common/slices/slices.go +++ b/internal/common/slices/slices.go @@ -181,11 +181,13 @@ func AnyFunc[S ~[]T, T any](s S, predicate func(val T) bool) bool { return false } +// Zeros returns a slice T[] of length n with all elements equal to zero. func Zeros[T any](n int) []T { return make([]T, n) } -func Fill[T interfaces.Number](v T, n int) []T { +// Fill returns a slice T[] of length n with all elements equal to v. +func Fill[T any](v T, n int) []T { rv := make([]T, n) for i := range rv { rv[i] = v @@ -193,6 +195,7 @@ func Fill[T interfaces.Number](v T, n int) []T { return rv } +// Ones returns a slice T[] of length n with all elements equal to 1. func Ones[T interfaces.Number](n int) []T { return Fill[T](1, n) } diff --git a/internal/scheduler/failureestimator/failureestimator.go b/internal/scheduler/failureestimator/failureestimator.go index 7424b27c103..c131ded8da8 100644 --- a/internal/scheduler/failureestimator/failureestimator.go +++ b/internal/scheduler/failureestimator/failureestimator.go @@ -59,8 +59,8 @@ type FailureEstimator struct { parameterIndexByNode map[string]int parameterIndexByQueue map[string]int - // Model updates that have not been applied yet. - pendingUpdates []Update + // Samples that have not been processed yet. + samples []Sample // Optimisation settings. numInnerIterations int @@ -79,7 +79,7 @@ type FailureEstimator struct { mu sync.Mutex } -type Update struct { +type Sample struct { i int j int c bool @@ -139,6 +139,8 @@ func (fe *FailureEstimator) IsDisabled() bool { return fe.disabled } +// Push adds a sample to the internal buffer of the failure estimator. +// Samples added via Push are processed on the next call to Update. func (fe *FailureEstimator) Push(node, queue string, success bool) { fe.mu.Lock() defer fe.mu.Unlock() @@ -154,7 +156,7 @@ func (fe *FailureEstimator) Push(node, queue string, success bool) { fe.parameterIndexByQueue[queue] = j } fe.extendParameters(armadamath.Max(i, j) + 1) - fe.pendingUpdates = append(fe.pendingUpdates, Update{ + fe.samples = append(fe.samples, Sample{ i: i, j: j, c: success, @@ -174,31 +176,31 @@ func (fe *FailureEstimator) extendParameters(n int) { fe.gradient = linalg.ExtendVecDense(fe.gradient, n) } -// Update success estimates based on pendingUpdates. +// Update success probability estimates based on pushed samples. func (fe *FailureEstimator) Update() { fe.mu.Lock() defer fe.mu.Unlock() - if len(fe.pendingUpdates) == 0 { + if len(fe.samples) == 0 { // Nothing to do. return } - // Inner loop to compute intermediateParameters from pendingUpdates. - // Passing over pendingUpdates multiple times in random order helps improve convergence. + // Inner loop to compute intermediateParameters from samples. + // Passing over samples multiple times in random order helps improve convergence. fe.intermediateParameters.CopyVec(fe.parameters) for k := 0; k < fe.numInnerIterations; k++ { // Compute gradient with respect to updates. fe.gradient.Zero() - slices.Shuffle(fe.pendingUpdates) - for _, update := range fe.pendingUpdates { + slices.Shuffle(fe.samples) + for _, sample := range fe.samples { gi, gj := fe.negLogLikelihoodGradient( - fe.intermediateParameters.AtVec(update.i), - fe.intermediateParameters.AtVec(update.j), - update.c, + fe.intermediateParameters.AtVec(sample.i), + fe.intermediateParameters.AtVec(sample.j), + sample.c, ) - fe.gradient.SetVec(update.i, fe.gradient.AtVec(update.i)+gi) - fe.gradient.SetVec(update.j, fe.gradient.AtVec(update.j)+gj) + fe.gradient.SetVec(sample.i, fe.gradient.AtVec(sample.i)+gi) + fe.gradient.SetVec(sample.j, fe.gradient.AtVec(sample.j)+gj) } // Update intermediateParameters using this gradient. @@ -216,8 +218,8 @@ func (fe *FailureEstimator) Update() { fe.parameters = fe.outerOptimiser.Update(fe.parameters, fe.parameters, fe.gradient) applyBoundsVec(fe.parameters) - // Empty the pending updates. - fe.pendingUpdates = fe.pendingUpdates[0:0] + // Empty the buffer. + fe.samples = fe.samples[0:0] } func applyBoundsVec(vec *mat.VecDense) {