Commit 473ba07

Fix stale cycle metrics by resetting each round
Signed-off-by: JamesMurkin <[email protected]>
JamesMurkin committed Jan 29, 2025
1 parent fcee91c commit 473ba07
Showing 2 changed files with 193 additions and 186 deletions.
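
In summary (from the diff below): the gauges that only describe a single scheduling round move out of cycleMetrics into a new perCycleMetrics struct. Each call to ReportSchedulerResult now builds a fresh perCycleMetrics, populates it, and publishes it through a sync/atomic Pointer, so series for queues or pools that drop out of a round disappear instead of lingering with stale values. Here is a minimal sketch of that pattern, cut down to a single gauge; the type and field names mirror the diff, but the reportCycle helper, its perQueueCounts argument, and the hard-coded pool label are invented for illustration.

package metrics

import (
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// perCycleMetrics holds the gauges that only describe the most recent
// scheduling round; a fresh instance is created every cycle.
type perCycleMetrics struct {
	consideredJobs *prometheus.GaugeVec
}

func newPerCycleMetrics() *perCycleMetrics {
	return &perCycleMetrics{
		consideredJobs: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "considered_jobs",
				Help: "Number of jobs considered in the latest cycle.",
			},
			[]string{"pool", "queue"},
		),
	}
}

// cycleMetrics publishes the per-cycle gauges through an atomic pointer.
type cycleMetrics struct {
	latestCycleMetrics atomic.Pointer[perCycleMetrics]
}

func newCycleMetrics() *cycleMetrics {
	m := &cycleMetrics{}
	m.latestCycleMetrics.Store(newPerCycleMetrics()) // never leave the pointer nil
	return m
}

// reportCycle builds a complete snapshot for this round, then swaps it in.
// Any series from the previous round that is not re-set simply vanishes.
func (m *cycleMetrics) reportCycle(perQueueCounts map[string]float64) {
	cycle := newPerCycleMetrics()
	for queue, n := range perQueueCounts {
		cycle.consideredJobs.WithLabelValues("example-pool", queue).Set(n)
	}
	m.latestCycleMetrics.Store(cycle)
}

// collect exposes whatever the latest completed round published.
func (m *cycleMetrics) collect(ch chan<- prometheus.Metric) {
	m.latestCycleMetrics.Load().consideredJobs.Collect(ch)
}

The real change keeps the long-lived counters (scheduledJobs, premptedJobs) and the cycle-time histograms on cycleMetrics itself, since those genuinely accumulate across rounds.
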
280 changes: 138 additions & 142 deletions internal/scheduler/metrics/cycle_metrics.go
@@ -1,6 +1,7 @@
package metrics

import (
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
@@ -15,51 +16,28 @@ var (
poolQueueAndResourceLabels = []string{poolLabel, queueLabel, resourceLabel}
)

type cycleMetrics struct {
leaderMetricsEnabled bool

scheduledJobs *prometheus.CounterVec
premptedJobs *prometheus.CounterVec
consideredJobs *prometheus.GaugeVec
fairShare *prometheus.GaugeVec
adjustedFairShare *prometheus.GaugeVec
actualShare *prometheus.GaugeVec
fairnessError *prometheus.GaugeVec
demand *prometheus.GaugeVec
cappedDemand *prometheus.GaugeVec
queueWeight *prometheus.GaugeVec
rawQueueWeight *prometheus.GaugeVec
scheduleCycleTime prometheus.Histogram
reconciliationCycleTime prometheus.Histogram
gangsConsidered *prometheus.GaugeVec
gangsScheduled *prometheus.GaugeVec
firstGangQueuePosition *prometheus.GaugeVec
lastGangQueuePosition *prometheus.GaugeVec
perQueueCycleTime *prometheus.GaugeVec
loopNumber *prometheus.GaugeVec
evictedJobs *prometheus.GaugeVec
evictedResources *prometheus.GaugeVec
spotPrice *prometheus.GaugeVec
allResettableMetrics []resettableMetric
type perCycleMetrics struct {
consideredJobs *prometheus.GaugeVec
fairShare *prometheus.GaugeVec
adjustedFairShare *prometheus.GaugeVec
actualShare *prometheus.GaugeVec
fairnessError *prometheus.GaugeVec
demand *prometheus.GaugeVec
cappedDemand *prometheus.GaugeVec
queueWeight *prometheus.GaugeVec
rawQueueWeight *prometheus.GaugeVec
gangsConsidered *prometheus.GaugeVec
gangsScheduled *prometheus.GaugeVec
firstGangQueuePosition *prometheus.GaugeVec
lastGangQueuePosition *prometheus.GaugeVec
perQueueCycleTime *prometheus.GaugeVec
loopNumber *prometheus.GaugeVec
evictedJobs *prometheus.GaugeVec
evictedResources *prometheus.GaugeVec
spotPrice *prometheus.GaugeVec
}

func newCycleMetrics() *cycleMetrics {
scheduledJobs := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "scheduled_jobs",
Help: "Number of events scheduled",
},
queueAndPriorityClassLabels,
)

premptedJobs := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "preempted_jobs",
Help: "Number of jobs preempted",
},
queueAndPriorityClassLabels,
)

func newPerCycleMetrics() *perCycleMetrics {
consideredJobs := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "considered_jobs",
@@ -132,22 +110,6 @@ func newCycleMetrics() *cycleMetrics {
[]string{poolLabel},
)

scheduleCycleTime := prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: prefix + "schedule_cycle_times",
Help: "Cycle time when in a scheduling round.",
Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
},
)

reconciliationCycleTime := prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: prefix + "reconciliation_cycle_times",
Help: "Cycle time when in a scheduling round.",
Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
},
)

gangsConsidered := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "gangs_considered",
@@ -220,10 +182,7 @@ func newCycleMetrics() *cycleMetrics {
poolLabels,
)

return &cycleMetrics{
leaderMetricsEnabled: true,
scheduledJobs: scheduledJobs,
premptedJobs: premptedJobs,
return &perCycleMetrics{
consideredJobs: consideredJobs,
fairShare: fairShare,
adjustedFairShare: adjustedFairShare,
@@ -233,7 +192,6 @@ func newCycleMetrics() *cycleMetrics {
queueWeight: queueWeight,
rawQueueWeight: rawQueueWeight,
fairnessError: fairnessError,
scheduleCycleTime: scheduleCycleTime,
gangsConsidered: gangsConsidered,
gangsScheduled: gangsScheduled,
firstGangQueuePosition: firstGangQueuePosition,
@@ -243,30 +201,62 @@ func newCycleMetrics() *cycleMetrics {
evictedJobs: evictedJobs,
evictedResources: evictedResources,
spotPrice: spotPrice,
allResettableMetrics: []resettableMetric{
scheduledJobs,
premptedJobs,
consideredJobs,
fairShare,
adjustedFairShare,
actualShare,
demand,
cappedDemand,
fairnessError,
gangsConsidered,
gangsScheduled,
firstGangQueuePosition,
lastGangQueuePosition,
perQueueCycleTime,
loopNumber,
evictedJobs,
evictedResources,
spotPrice,
queueWeight,
rawQueueWeight,
}
}

type cycleMetrics struct {
leaderMetricsEnabled bool

scheduledJobs *prometheus.CounterVec
premptedJobs *prometheus.CounterVec
scheduleCycleTime prometheus.Histogram
reconciliationCycleTime prometheus.Histogram
latestCycleMetrics atomic.Pointer[perCycleMetrics]
}

func newCycleMetrics() *cycleMetrics {
scheduledJobs := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "scheduled_jobs",
Help: "Number of events scheduled",
},
queueAndPriorityClassLabels,
)

premptedJobs := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "preempted_jobs",
Help: "Number of jobs preempted",
},
queueAndPriorityClassLabels,
)

scheduleCycleTime := prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: prefix + "schedule_cycle_times",
Help: "Cycle time when in a scheduling round.",
Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
},
)

reconciliationCycleTime := prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: prefix + "reconciliation_cycle_times",
Help: "Cycle time when in a scheduling round.",
Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
},
)

cycleMetrics := &cycleMetrics{
leaderMetricsEnabled: true,
scheduledJobs: scheduledJobs,
premptedJobs: premptedJobs,
scheduleCycleTime: scheduleCycleTime,
reconciliationCycleTime: reconciliationCycleTime,
latestCycleMetrics: atomic.Pointer[perCycleMetrics]{},
}
cycleMetrics.latestCycleMetrics.Store(newPerCycleMetrics())
return cycleMetrics
}

func (m *cycleMetrics) enableLeaderMetrics() {
@@ -279,9 +269,9 @@ func (m *cycleMetrics) disableLeaderMetrics() {
}

func (m *cycleMetrics) resetLeaderMetrics() {
for _, metric := range m.allResettableMetrics {
metric.Reset()
}
m.premptedJobs.Reset()
m.scheduledJobs.Reset()
m.latestCycleMetrics.Store(newPerCycleMetrics())
}

func (m *cycleMetrics) ReportScheduleCycleTime(cycleTime time.Duration) {
@@ -294,6 +284,7 @@ func (m *cycleMetrics) ReportReconcileCycleTime(cycleTime time.Duration) {

func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult) {
// Metrics that depend on pool
currentCycle := newPerCycleMetrics()
for _, schedContext := range result.SchedulingContexts {
pool := schedContext.Pool
for queue, queueContext := range schedContext.QueueSchedulingContexts {
@@ -302,17 +293,17 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult)
demand := schedContext.FairnessCostProvider.UnweightedCostFromAllocation(queueContext.Demand)
cappedDemand := schedContext.FairnessCostProvider.UnweightedCostFromAllocation(queueContext.CappedDemand)

m.consideredJobs.WithLabelValues(pool, queue).Set(jobsConsidered)
m.fairShare.WithLabelValues(pool, queue).Set(queueContext.FairShare)
m.adjustedFairShare.WithLabelValues(pool, queue).Set(queueContext.AdjustedFairShare)
m.actualShare.WithLabelValues(pool, queue).Set(actualShare)
m.demand.WithLabelValues(pool, queue).Set(demand)
m.cappedDemand.WithLabelValues(pool, queue).Set(cappedDemand)
m.queueWeight.WithLabelValues(pool, queue).Set(queueContext.Weight)
m.rawQueueWeight.WithLabelValues(pool, queue).Set(queueContext.RawWeight)
currentCycle.consideredJobs.WithLabelValues(pool, queue).Set(jobsConsidered)
currentCycle.fairShare.WithLabelValues(pool, queue).Set(queueContext.FairShare)
currentCycle.adjustedFairShare.WithLabelValues(pool, queue).Set(queueContext.AdjustedFairShare)
currentCycle.actualShare.WithLabelValues(pool, queue).Set(actualShare)
currentCycle.demand.WithLabelValues(pool, queue).Set(demand)
currentCycle.cappedDemand.WithLabelValues(pool, queue).Set(cappedDemand)
currentCycle.queueWeight.WithLabelValues(pool, queue).Set(queueContext.Weight)
currentCycle.rawQueueWeight.WithLabelValues(pool, queue).Set(queueContext.RawWeight)
}
m.fairnessError.WithLabelValues(pool).Set(schedContext.FairnessError())
m.spotPrice.WithLabelValues(pool).Set(schedContext.SpotPrice)
currentCycle.fairnessError.WithLabelValues(pool).Set(schedContext.FairnessError())
currentCycle.spotPrice.WithLabelValues(pool).Set(schedContext.SpotPrice)
}

for _, jobCtx := range result.ScheduledJobs {
@@ -325,48 +316,51 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult)

for pool, schedulingStats := range result.PerPoolSchedulingStats {
for queue, s := range schedulingStats.StatsPerQueue {
m.gangsConsidered.WithLabelValues(pool, queue).Set(float64(s.GangsConsidered))
m.gangsScheduled.WithLabelValues(pool, queue).Set(float64(s.GangsScheduled))
m.firstGangQueuePosition.WithLabelValues(pool, queue).Set(float64(s.FirstGangConsideredQueuePosition))
m.lastGangQueuePosition.WithLabelValues(pool, queue).Set(float64(s.LastGangScheduledQueuePosition))
m.perQueueCycleTime.WithLabelValues(pool, queue).Set(float64(s.Time.Milliseconds()))
currentCycle.gangsConsidered.WithLabelValues(pool, queue).Set(float64(s.GangsConsidered))
currentCycle.gangsScheduled.WithLabelValues(pool, queue).Set(float64(s.GangsScheduled))
currentCycle.firstGangQueuePosition.WithLabelValues(pool, queue).Set(float64(s.FirstGangConsideredQueuePosition))
currentCycle.lastGangQueuePosition.WithLabelValues(pool, queue).Set(float64(s.LastGangScheduledQueuePosition))
currentCycle.perQueueCycleTime.WithLabelValues(pool, queue).Set(float64(s.Time.Milliseconds()))
}

m.loopNumber.WithLabelValues(pool).Set(float64(schedulingStats.LoopNumber))
currentCycle.loopNumber.WithLabelValues(pool).Set(float64(schedulingStats.LoopNumber))

for queue, s := range schedulingStats.EvictorResult.GetStatsPerQueue() {
m.evictedJobs.WithLabelValues(pool, queue).Set(float64(s.EvictedJobCount))
currentCycle.evictedJobs.WithLabelValues(pool, queue).Set(float64(s.EvictedJobCount))

for _, r := range s.EvictedResources.GetResources() {
m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.RawValue))
currentCycle.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.RawValue))
}
}
}
m.latestCycleMetrics.Store(currentCycle)
}

func (m *cycleMetrics) describe(ch chan<- *prometheus.Desc) {
if m.leaderMetricsEnabled {
m.scheduledJobs.Describe(ch)
m.premptedJobs.Describe(ch)
m.consideredJobs.Describe(ch)
m.fairShare.Describe(ch)
m.adjustedFairShare.Describe(ch)
m.actualShare.Describe(ch)
m.fairnessError.Describe(ch)
m.demand.Describe(ch)
m.cappedDemand.Describe(ch)
m.queueWeight.Describe(ch)
m.rawQueueWeight.Describe(ch)
m.scheduleCycleTime.Describe(ch)
m.gangsConsidered.Describe(ch)
m.gangsScheduled.Describe(ch)
m.firstGangQueuePosition.Describe(ch)
m.lastGangQueuePosition.Describe(ch)
m.perQueueCycleTime.Describe(ch)
m.loopNumber.Describe(ch)
m.evictedJobs.Describe(ch)
m.evictedResources.Describe(ch)
m.spotPrice.Describe(ch)

currentCycle := m.latestCycleMetrics.Load()
currentCycle.consideredJobs.Describe(ch)
currentCycle.fairShare.Describe(ch)
currentCycle.adjustedFairShare.Describe(ch)
currentCycle.actualShare.Describe(ch)
currentCycle.fairnessError.Describe(ch)
currentCycle.demand.Describe(ch)
currentCycle.cappedDemand.Describe(ch)
currentCycle.queueWeight.Describe(ch)
currentCycle.rawQueueWeight.Describe(ch)
currentCycle.gangsConsidered.Describe(ch)
currentCycle.gangsScheduled.Describe(ch)
currentCycle.firstGangQueuePosition.Describe(ch)
currentCycle.lastGangQueuePosition.Describe(ch)
currentCycle.perQueueCycleTime.Describe(ch)
currentCycle.loopNumber.Describe(ch)
currentCycle.evictedJobs.Describe(ch)
currentCycle.evictedResources.Describe(ch)
currentCycle.spotPrice.Describe(ch)
}

m.reconciliationCycleTime.Describe(ch)
@@ -376,25 +370,27 @@ func (m *cycleMetrics) collect(ch chan<- prometheus.Metric) {
if m.leaderMetricsEnabled {
m.scheduledJobs.Collect(ch)
m.premptedJobs.Collect(ch)
m.consideredJobs.Collect(ch)
m.fairShare.Collect(ch)
m.adjustedFairShare.Collect(ch)
m.actualShare.Collect(ch)
m.fairnessError.Collect(ch)
m.demand.Collect(ch)
m.cappedDemand.Collect(ch)
m.rawQueueWeight.Collect(ch)
m.queueWeight.Collect(ch)
m.scheduleCycleTime.Collect(ch)
m.gangsConsidered.Collect(ch)
m.gangsScheduled.Collect(ch)
m.firstGangQueuePosition.Collect(ch)
m.lastGangQueuePosition.Collect(ch)
m.perQueueCycleTime.Collect(ch)
m.loopNumber.Collect(ch)
m.evictedJobs.Collect(ch)
m.evictedResources.Collect(ch)
m.spotPrice.Collect(ch)

currentCycle := m.latestCycleMetrics.Load()
currentCycle.consideredJobs.Collect(ch)
currentCycle.fairShare.Collect(ch)
currentCycle.adjustedFairShare.Collect(ch)
currentCycle.actualShare.Collect(ch)
currentCycle.fairnessError.Collect(ch)
currentCycle.demand.Collect(ch)
currentCycle.cappedDemand.Collect(ch)
currentCycle.rawQueueWeight.Collect(ch)
currentCycle.queueWeight.Collect(ch)
currentCycle.gangsConsidered.Collect(ch)
currentCycle.gangsScheduled.Collect(ch)
currentCycle.firstGangQueuePosition.Collect(ch)
currentCycle.lastGangQueuePosition.Collect(ch)
currentCycle.perQueueCycleTime.Collect(ch)
currentCycle.loopNumber.Collect(ch)
currentCycle.evictedJobs.Collect(ch)
currentCycle.evictedResources.Collect(ch)
currentCycle.spotPrice.Collect(ch)
}

m.reconciliationCycleTime.Collect(ch)
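
A note on the design choice recorded in this commit: calling Reset() on a live GaugeVec and then repopulating it leaves a window where a concurrent scrape can observe an empty or partially filled vector, and, judging from the diff, the old resetLeaderMetrics path cleared everything via an allResettableMetrics list rather than on every round. Building the next round's snapshot off to the side and atomically swapping the pointer means a scrape always sees one complete round, old or new, and any series not re-set stops being exported. Below is a self-contained sketch of the same idea as a standalone prometheus.Collector; the snapshotCollector type, the Publish helper, and the demo metric name are invented for illustration, not taken from the Armada code.

package metrics

import (
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// snapshotCollector replaces its gauges wholesale each round instead of
// resetting them in place.
type snapshotCollector struct {
	latest atomic.Pointer[prometheus.GaugeVec]
}

func newGaugeSnapshot() *prometheus.GaugeVec {
	return prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "demo_per_cycle_value",
			Help: "A value that is only meaningful for the latest cycle.",
		},
		[]string{"queue"},
	)
}

func newSnapshotCollector() *snapshotCollector {
	c := &snapshotCollector{}
	c.latest.Store(newGaugeSnapshot()) // start with an empty, non-nil snapshot
	return c
}

// Publish populates a fresh vector off to the side, then swaps it in.
// A concurrent scrape sees either the old round or the new one, never a
// half-written mixture, and queues absent from this round vanish at once.
func (c *snapshotCollector) Publish(values map[string]float64) {
	next := newGaugeSnapshot()
	for queue, v := range values {
		next.WithLabelValues(queue).Set(v)
	}
	c.latest.Store(next)
}

func (c *snapshotCollector) Describe(ch chan<- *prometheus.Desc) {
	c.latest.Load().Describe(ch)
}

func (c *snapshotCollector) Collect(ch chan<- prometheus.Metric) {
	c.latest.Load().Collect(ch)
}

Registration is the standard prometheus.MustRegister(newSnapshotCollector()); because every replacement vector is created with the same name, help text, and label set, its descriptors stay stable across swaps, so the registry and scrapers see a consistent metric family.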