Skip to content

Commit

Permalink
Comments and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
severinson committed Feb 28, 2024
1 parent 046a892 commit cb4b113
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 18 deletions.
2 changes: 2 additions & 0 deletions internal/common/optimisation/descent/descent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/armadaproject/armada/internal/common/armadaerrors"
)

// Descent is a gradient descent optimiser; see the following link for details:
// https://fluxml.ai/Flux.jl/stable/training/optimisers/
type Descent struct {
	// eta is the step size (learning rate) applied to the gradient.
	eta float64
}
Expand Down
2 changes: 2 additions & 0 deletions internal/common/optimisation/nesterov/nesterov.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/armadaproject/armada/internal/common/linalg"
)

// Nesterov accelerated gradient descent optimiser; see the following link for details:
// https://fluxml.ai/Flux.jl/stable/training/optimisers/
type Nesterov struct {
eta float64
rho float64
Expand Down
3 changes: 3 additions & 0 deletions internal/common/optimisation/optimisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package optimisation

import "gonum.org/v1/gonum/mat"

// Optimiser represents a first-order optimisation algorithm,
// i.e., one driven only by gradient information.
type Optimiser interface {
	// Update the parameters p using gradient g and store the result in out.
	Update(out, p *mat.VecDense, g mat.Vector) *mat.VecDense
	// Extend the internal state of the optimiser to accommodate at least n parameters.
	Extend(n int)
}
5 changes: 4 additions & 1 deletion internal/common/slices/slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,18 +181,21 @@ func AnyFunc[S ~[]T, T any](s S, predicate func(val T) bool) bool {
return false
}

// Zeros returns a slice []T of length n with all elements set to the zero value of T.
func Zeros[T any](n int) []T {
	return make([]T, n)
}

// Fill returns a slice []T of length n with all elements equal to v.
func Fill[T any](v T, n int) []T {
	rv := make([]T, n)
	for i := range rv {
		rv[i] = v
	}
	return rv
}

// Ones returns a slice []T of length n with all elements equal to 1.
func Ones[T interfaces.Number](n int) []T {
	return Fill[T](1, n)
}
36 changes: 19 additions & 17 deletions internal/scheduler/failureestimator/failureestimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ type FailureEstimator struct {
parameterIndexByNode map[string]int
parameterIndexByQueue map[string]int

// Model updates that have not been applied yet.
pendingUpdates []Update
// Samples that have not been processed yet.
samples []Sample

// Optimisation settings.
numInnerIterations int
Expand All @@ -79,7 +79,7 @@ type FailureEstimator struct {
mu sync.Mutex
}

type Update struct {
type Sample struct {
i int
j int
c bool
Expand Down Expand Up @@ -139,6 +139,8 @@ func (fe *FailureEstimator) IsDisabled() bool {
return fe.disabled
}

// Push adds a sample to the internal buffer of the failure estimator.
// Samples added via Push are processed on the next call to Update.
func (fe *FailureEstimator) Push(node, queue string, success bool) {
fe.mu.Lock()
defer fe.mu.Unlock()
Expand All @@ -154,7 +156,7 @@ func (fe *FailureEstimator) Push(node, queue string, success bool) {
fe.parameterIndexByQueue[queue] = j
}
fe.extendParameters(armadamath.Max(i, j) + 1)
fe.pendingUpdates = append(fe.pendingUpdates, Update{
fe.samples = append(fe.samples, Sample{
i: i,
j: j,
c: success,
Expand All @@ -174,31 +176,31 @@ func (fe *FailureEstimator) extendParameters(n int) {
fe.gradient = linalg.ExtendVecDense(fe.gradient, n)
}

// Update success estimates based on pendingUpdates.
// Update success probability estimates based on pushed samples.
func (fe *FailureEstimator) Update() {
fe.mu.Lock()
defer fe.mu.Unlock()
if len(fe.pendingUpdates) == 0 {
if len(fe.samples) == 0 {
// Nothing to do.
return
}

// Inner loop to compute intermediateParameters from pendingUpdates.
// Passing over pendingUpdates multiple times in random order helps improve convergence.
// Inner loop to compute intermediateParameters from samples.
// Passing over samples multiple times in random order helps improve convergence.
fe.intermediateParameters.CopyVec(fe.parameters)
for k := 0; k < fe.numInnerIterations; k++ {

// Compute gradient with respect to updates.
fe.gradient.Zero()
slices.Shuffle(fe.pendingUpdates)
for _, update := range fe.pendingUpdates {
slices.Shuffle(fe.samples)
for _, sample := range fe.samples {
gi, gj := fe.negLogLikelihoodGradient(
fe.intermediateParameters.AtVec(update.i),
fe.intermediateParameters.AtVec(update.j),
update.c,
fe.intermediateParameters.AtVec(sample.i),
fe.intermediateParameters.AtVec(sample.j),
sample.c,
)
fe.gradient.SetVec(update.i, fe.gradient.AtVec(update.i)+gi)
fe.gradient.SetVec(update.j, fe.gradient.AtVec(update.j)+gj)
fe.gradient.SetVec(sample.i, fe.gradient.AtVec(sample.i)+gi)
fe.gradient.SetVec(sample.j, fe.gradient.AtVec(sample.j)+gj)
}

// Update intermediateParameters using this gradient.
Expand All @@ -216,8 +218,8 @@ func (fe *FailureEstimator) Update() {
fe.parameters = fe.outerOptimiser.Update(fe.parameters, fe.parameters, fe.gradient)
applyBoundsVec(fe.parameters)

// Empty the pending updates.
fe.pendingUpdates = fe.pendingUpdates[0:0]
// Empty the buffer.
fe.samples = fe.samples[0:0]
}

func applyBoundsVec(vec *mat.VecDense) {
Expand Down

0 comments on commit cb4b113

Please sign in to comment.