From 473ba07d631d385d9b02a934d28ec7a134ab8a45 Mon Sep 17 00:00:00 2001
From: JamesMurkin
Date: Wed, 29 Jan 2025 15:45:47 +0000
Subject: [PATCH] Fix stale cycle metrics by resetting each round

Signed-off-by: JamesMurkin
---
 internal/scheduler/metrics/cycle_metrics.go   | 280 +++++++++---------
 .../scheduler/metrics/cycle_metrics_test.go   |  99 ++++---
 2 files changed, 193 insertions(+), 186 deletions(-)
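Note for reviewers (not part of the commit): the diff below stops resetting long-lived per-queue gauges in place and instead rebuilds a perCycleMetrics snapshot every round, publishing it through an atomic.Pointer so that series for queues or pools absent from the latest cycle disappear rather than being served stale. What follows is a minimal, self-contained sketch of that publish-by-swap pattern under stated assumptions; the names used here (metricsPublisher, perCycle, example_considered_jobs, reportCycle) are hypothetical stand-ins, not identifiers from this patch.

// Illustrative sketch only: publish per-cycle Prometheus metrics by swapping
// an atomic pointer to a freshly built set of gauges each cycle.
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// perCycle holds gauges that are only meaningful for the most recent cycle.
type perCycle struct {
	consideredJobs *prometheus.GaugeVec
}

func newPerCycle() *perCycle {
	return &perCycle{
		consideredJobs: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{Name: "example_considered_jobs", Help: "Jobs considered in the latest cycle"},
			[]string{"pool", "queue"},
		),
	}
}

type metricsPublisher struct {
	latest atomic.Pointer[perCycle]
}

// reportCycle builds a fresh snapshot and atomically replaces the previous one,
// so label combinations that were not set this cycle simply no longer exist.
func (m *metricsPublisher) reportCycle() {
	current := newPerCycle()
	current.consideredJobs.WithLabelValues("pool1", "queue1").Set(3)
	m.latest.Store(current)
}

func main() {
	m := &metricsPublisher{}
	m.latest.Store(newPerCycle())
	m.reportCycle()

	// A Collect/Describe implementation reads whatever snapshot is current at scrape time.
	ch := make(chan prometheus.Metric, 16)
	m.latest.Load().consideredJobs.Collect(ch)
	close(ch)
	fmt.Println("series in latest snapshot:", len(ch))
}

In the real diff the same idea appears as cycleMetrics.latestCycleMetrics.Store(newPerCycleMetrics()) in resetLeaderMetrics and as the currentCycle snapshot stored at the end of ReportSchedulerResult.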
diff --git a/internal/scheduler/metrics/cycle_metrics.go b/internal/scheduler/metrics/cycle_metrics.go
index 6f0cf8c54f6..6a299818faa 100644
--- a/internal/scheduler/metrics/cycle_metrics.go
+++ b/internal/scheduler/metrics/cycle_metrics.go
@@ -1,6 +1,7 @@
 package metrics
 
 import (
+	"sync/atomic"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -15,51 +16,28 @@ var (
 	poolQueueAndResourceLabels = []string{poolLabel, queueLabel, resourceLabel}
 )
 
-type cycleMetrics struct {
-	leaderMetricsEnabled bool
-
-	scheduledJobs *prometheus.CounterVec
-	premptedJobs *prometheus.CounterVec
-	consideredJobs *prometheus.GaugeVec
-	fairShare *prometheus.GaugeVec
-	adjustedFairShare *prometheus.GaugeVec
-	actualShare *prometheus.GaugeVec
-	fairnessError *prometheus.GaugeVec
-	demand *prometheus.GaugeVec
-	cappedDemand *prometheus.GaugeVec
-	queueWeight *prometheus.GaugeVec
-	rawQueueWeight *prometheus.GaugeVec
-	scheduleCycleTime prometheus.Histogram
-	reconciliationCycleTime prometheus.Histogram
-	gangsConsidered *prometheus.GaugeVec
-	gangsScheduled *prometheus.GaugeVec
-	firstGangQueuePosition *prometheus.GaugeVec
-	lastGangQueuePosition *prometheus.GaugeVec
-	perQueueCycleTime *prometheus.GaugeVec
-	loopNumber *prometheus.GaugeVec
-	evictedJobs *prometheus.GaugeVec
-	evictedResources *prometheus.GaugeVec
-	spotPrice *prometheus.GaugeVec
-	allResettableMetrics []resettableMetric
+type perCycleMetrics struct {
+	consideredJobs *prometheus.GaugeVec
+	fairShare *prometheus.GaugeVec
+	adjustedFairShare *prometheus.GaugeVec
+	actualShare *prometheus.GaugeVec
+	fairnessError *prometheus.GaugeVec
+	demand *prometheus.GaugeVec
+	cappedDemand *prometheus.GaugeVec
+	queueWeight *prometheus.GaugeVec
+	rawQueueWeight *prometheus.GaugeVec
+	gangsConsidered *prometheus.GaugeVec
+	gangsScheduled *prometheus.GaugeVec
+	firstGangQueuePosition *prometheus.GaugeVec
+	lastGangQueuePosition *prometheus.GaugeVec
+	perQueueCycleTime *prometheus.GaugeVec
+	loopNumber *prometheus.GaugeVec
+	evictedJobs *prometheus.GaugeVec
+	evictedResources *prometheus.GaugeVec
+	spotPrice *prometheus.GaugeVec
 }
 
-func newCycleMetrics() *cycleMetrics {
-	scheduledJobs := prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Name: prefix + "scheduled_jobs",
-			Help: "Number of events scheduled",
-		},
-		queueAndPriorityClassLabels,
-	)
-
-	premptedJobs := prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Name: prefix + "preempted_jobs",
-			Help: "Number of jobs preempted",
-		},
-		queueAndPriorityClassLabels,
-	)
-
+func newPerCycleMetrics() *perCycleMetrics {
 	consideredJobs := prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: prefix + "considered_jobs",
@@ -132,22 +110,6 @@ func newCycleMetrics() *cycleMetrics {
 		[]string{poolLabel},
 	)
 
-	scheduleCycleTime := prometheus.NewHistogram(
-		prometheus.HistogramOpts{
-			Name: prefix + "schedule_cycle_times",
-			Help: "Cycle time when in a scheduling round.",
-			Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
-		},
-	)
-
-	reconciliationCycleTime := prometheus.NewHistogram(
-		prometheus.HistogramOpts{
-			Name: prefix + "reconciliation_cycle_times",
-			Help: "Cycle time when in a scheduling round.",
-			Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
-		},
-	)
-
 	gangsConsidered := prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: prefix + "gangs_considered",
@@ -220,10 +182,7 @@ func newCycleMetrics() *cycleMetrics {
 		poolLabels,
 	)
 
-	return &cycleMetrics{
-		leaderMetricsEnabled: true,
-		scheduledJobs: scheduledJobs,
-		premptedJobs: premptedJobs,
+	return &perCycleMetrics{
 		consideredJobs: consideredJobs,
 		fairShare: fairShare,
 		adjustedFairShare: adjustedFairShare,
@@ -233,7 +192,6 @@ func newCycleMetrics() *cycleMetrics {
 		queueWeight: queueWeight,
 		rawQueueWeight: rawQueueWeight,
 		fairnessError: fairnessError,
-		scheduleCycleTime: scheduleCycleTime,
 		gangsConsidered: gangsConsidered,
 		gangsScheduled: gangsScheduled,
 		firstGangQueuePosition: firstGangQueuePosition,
@@ -243,30 +201,62 @@ func newCycleMetrics() *cycleMetrics {
 		evictedJobs: evictedJobs,
 		evictedResources: evictedResources,
 		spotPrice: spotPrice,
-		allResettableMetrics: []resettableMetric{
-			scheduledJobs,
-			premptedJobs,
-			consideredJobs,
-			fairShare,
-			adjustedFairShare,
-			actualShare,
-			demand,
-			cappedDemand,
-			fairnessError,
-			gangsConsidered,
-			gangsScheduled,
-			firstGangQueuePosition,
-			lastGangQueuePosition,
-			perQueueCycleTime,
-			loopNumber,
-			evictedJobs,
-			evictedResources,
-			spotPrice,
-			queueWeight,
-			rawQueueWeight,
+	}
+}
+
+type cycleMetrics struct {
+	leaderMetricsEnabled bool
+
+	scheduledJobs *prometheus.CounterVec
+	premptedJobs *prometheus.CounterVec
+	scheduleCycleTime prometheus.Histogram
+	reconciliationCycleTime prometheus.Histogram
+	latestCycleMetrics atomic.Pointer[perCycleMetrics]
+}
+
+func newCycleMetrics() *cycleMetrics {
+	scheduledJobs := prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: prefix + "scheduled_jobs",
+			Help: "Number of events scheduled",
 		},
+		queueAndPriorityClassLabels,
+	)
+
+	premptedJobs := prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: prefix + "preempted_jobs",
+			Help: "Number of jobs preempted",
+		},
+		queueAndPriorityClassLabels,
+	)
+
+	scheduleCycleTime := prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Name: prefix + "schedule_cycle_times",
+			Help: "Cycle time when in a scheduling round.",
+			Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
+		},
+	)
+
+	reconciliationCycleTime := prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Name: prefix + "reconciliation_cycle_times",
+			Help: "Cycle time when in a scheduling round.",
+			Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
+		},
+	)
+
+	cycleMetrics := &cycleMetrics{
+		leaderMetricsEnabled: true,
+		scheduledJobs: scheduledJobs,
+		premptedJobs: premptedJobs,
+		scheduleCycleTime: scheduleCycleTime,
 		reconciliationCycleTime: reconciliationCycleTime,
+		latestCycleMetrics: atomic.Pointer[perCycleMetrics]{},
 	}
+	cycleMetrics.latestCycleMetrics.Store(newPerCycleMetrics())
+	return cycleMetrics
 }
 
 func (m *cycleMetrics) enableLeaderMetrics() {
@@ -279,9 +269,9 @@ func (m *cycleMetrics) disableLeaderMetrics() {
 }
 
 func (m *cycleMetrics) resetLeaderMetrics() {
-	for _, metric := range m.allResettableMetrics {
-		metric.Reset()
-	}
+	m.premptedJobs.Reset()
+	m.scheduledJobs.Reset()
+	m.latestCycleMetrics.Store(newPerCycleMetrics())
 }
 
 func (m *cycleMetrics) ReportScheduleCycleTime(cycleTime time.Duration) {
@@ -294,6 +284,7 @@ func (m *cycleMetrics) ReportReconcileCycleTime(cycleTime time.Duration) {
 
 func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult) {
 	// Metrics that depend on pool
+	currentCycle := newPerCycleMetrics()
 	for _, schedContext := range result.SchedulingContexts {
 		pool := schedContext.Pool
 		for queue, queueContext := range schedContext.QueueSchedulingContexts {
@@ -302,17 +293,17 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult)
 			demand := schedContext.FairnessCostProvider.UnweightedCostFromAllocation(queueContext.Demand)
 			cappedDemand := schedContext.FairnessCostProvider.UnweightedCostFromAllocation(queueContext.CappedDemand)
 
-			m.consideredJobs.WithLabelValues(pool, queue).Set(jobsConsidered)
-			m.fairShare.WithLabelValues(pool, queue).Set(queueContext.FairShare)
-			m.adjustedFairShare.WithLabelValues(pool, queue).Set(queueContext.AdjustedFairShare)
-			m.actualShare.WithLabelValues(pool, queue).Set(actualShare)
-			m.demand.WithLabelValues(pool, queue).Set(demand)
-			m.cappedDemand.WithLabelValues(pool, queue).Set(cappedDemand)
-			m.queueWeight.WithLabelValues(pool, queue).Set(queueContext.Weight)
-			m.rawQueueWeight.WithLabelValues(pool, queue).Set(queueContext.RawWeight)
+			currentCycle.consideredJobs.WithLabelValues(pool, queue).Set(jobsConsidered)
+			currentCycle.fairShare.WithLabelValues(pool, queue).Set(queueContext.FairShare)
+			currentCycle.adjustedFairShare.WithLabelValues(pool, queue).Set(queueContext.AdjustedFairShare)
+			currentCycle.actualShare.WithLabelValues(pool, queue).Set(actualShare)
+			currentCycle.demand.WithLabelValues(pool, queue).Set(demand)
+			currentCycle.cappedDemand.WithLabelValues(pool, queue).Set(cappedDemand)
+			currentCycle.queueWeight.WithLabelValues(pool, queue).Set(queueContext.Weight)
+			currentCycle.rawQueueWeight.WithLabelValues(pool, queue).Set(queueContext.RawWeight)
 		}
-		m.fairnessError.WithLabelValues(pool).Set(schedContext.FairnessError())
-		m.spotPrice.WithLabelValues(pool).Set(schedContext.SpotPrice)
+		currentCycle.fairnessError.WithLabelValues(pool).Set(schedContext.FairnessError())
+		currentCycle.spotPrice.WithLabelValues(pool).Set(schedContext.SpotPrice)
 	}
 
 	for _, jobCtx := range result.ScheduledJobs {
@@ -325,48 +316,51 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult)
 
 	for pool, schedulingStats := range result.PerPoolSchedulingStats {
 		for queue, s := range schedulingStats.StatsPerQueue {
-			m.gangsConsidered.WithLabelValues(pool, queue).Set(float64(s.GangsConsidered))
-			m.gangsScheduled.WithLabelValues(pool, queue).Set(float64(s.GangsScheduled))
-			m.firstGangQueuePosition.WithLabelValues(pool, queue).Set(float64(s.FirstGangConsideredQueuePosition))
-			m.lastGangQueuePosition.WithLabelValues(pool, queue).Set(float64(s.LastGangScheduledQueuePosition))
-			m.perQueueCycleTime.WithLabelValues(pool, queue).Set(float64(s.Time.Milliseconds()))
+			currentCycle.gangsConsidered.WithLabelValues(pool, queue).Set(float64(s.GangsConsidered))
+			currentCycle.gangsScheduled.WithLabelValues(pool, queue).Set(float64(s.GangsScheduled))
+			currentCycle.firstGangQueuePosition.WithLabelValues(pool, queue).Set(float64(s.FirstGangConsideredQueuePosition))
+			currentCycle.lastGangQueuePosition.WithLabelValues(pool, queue).Set(float64(s.LastGangScheduledQueuePosition))
+			currentCycle.perQueueCycleTime.WithLabelValues(pool, queue).Set(float64(s.Time.Milliseconds()))
 		}
 
-		m.loopNumber.WithLabelValues(pool).Set(float64(schedulingStats.LoopNumber))
+		currentCycle.loopNumber.WithLabelValues(pool).Set(float64(schedulingStats.LoopNumber))
 
 		for queue, s := range schedulingStats.EvictorResult.GetStatsPerQueue() {
-			m.evictedJobs.WithLabelValues(pool, queue).Set(float64(s.EvictedJobCount))
+			currentCycle.evictedJobs.WithLabelValues(pool, queue).Set(float64(s.EvictedJobCount))
 
 			for _, r := range s.EvictedResources.GetResources() {
-				m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.RawValue))
+				currentCycle.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.RawValue))
 			}
 		}
 	}
+	m.latestCycleMetrics.Store(currentCycle)
 }
 
 func (m *cycleMetrics) describe(ch chan<- *prometheus.Desc) {
 	if m.leaderMetricsEnabled {
 		m.scheduledJobs.Describe(ch)
 		m.premptedJobs.Describe(ch)
-		m.consideredJobs.Describe(ch)
-		m.fairShare.Describe(ch)
-		m.adjustedFairShare.Describe(ch)
-		m.actualShare.Describe(ch)
-		m.fairnessError.Describe(ch)
-		m.demand.Describe(ch)
-		m.cappedDemand.Describe(ch)
-		m.queueWeight.Describe(ch)
-		m.rawQueueWeight.Describe(ch)
 		m.scheduleCycleTime.Describe(ch)
-		m.gangsConsidered.Describe(ch)
-		m.gangsScheduled.Describe(ch)
-		m.firstGangQueuePosition.Describe(ch)
-		m.lastGangQueuePosition.Describe(ch)
-		m.perQueueCycleTime.Describe(ch)
-		m.loopNumber.Describe(ch)
-		m.evictedJobs.Describe(ch)
-		m.evictedResources.Describe(ch)
-		m.spotPrice.Describe(ch)
+
+		currentCycle := m.latestCycleMetrics.Load()
+		currentCycle.consideredJobs.Describe(ch)
+		currentCycle.fairShare.Describe(ch)
+		currentCycle.adjustedFairShare.Describe(ch)
+		currentCycle.actualShare.Describe(ch)
+		currentCycle.fairnessError.Describe(ch)
+		currentCycle.demand.Describe(ch)
+		currentCycle.cappedDemand.Describe(ch)
+		currentCycle.queueWeight.Describe(ch)
+		currentCycle.rawQueueWeight.Describe(ch)
+		currentCycle.gangsConsidered.Describe(ch)
+		currentCycle.gangsScheduled.Describe(ch)
+		currentCycle.firstGangQueuePosition.Describe(ch)
+		currentCycle.lastGangQueuePosition.Describe(ch)
+		currentCycle.perQueueCycleTime.Describe(ch)
+		currentCycle.loopNumber.Describe(ch)
+		currentCycle.evictedJobs.Describe(ch)
+		currentCycle.evictedResources.Describe(ch)
+		currentCycle.spotPrice.Describe(ch)
 	}
 
 	m.reconciliationCycleTime.Describe(ch)
@@ -376,25 +370,27 @@ func (m *cycleMetrics) collect(ch chan<- prometheus.Metric) {
 	if m.leaderMetricsEnabled {
 		m.scheduledJobs.Collect(ch)
 		m.premptedJobs.Collect(ch)
-		m.consideredJobs.Collect(ch)
-		m.fairShare.Collect(ch)
-		m.adjustedFairShare.Collect(ch)
-		m.actualShare.Collect(ch)
-		m.fairnessError.Collect(ch)
-		m.demand.Collect(ch)
-		m.cappedDemand.Collect(ch)
-		m.rawQueueWeight.Collect(ch)
-		m.queueWeight.Collect(ch)
 		m.scheduleCycleTime.Collect(ch)
-		m.gangsConsidered.Collect(ch)
-		m.gangsScheduled.Collect(ch)
-		m.firstGangQueuePosition.Collect(ch)
-		m.lastGangQueuePosition.Collect(ch)
-		m.perQueueCycleTime.Collect(ch)
-		m.loopNumber.Collect(ch)
-		m.evictedJobs.Collect(ch)
-		m.evictedResources.Collect(ch)
-		m.spotPrice.Collect(ch)
+
+		currentCycle := m.latestCycleMetrics.Load()
+		currentCycle.consideredJobs.Collect(ch)
+		currentCycle.fairShare.Collect(ch)
+		currentCycle.adjustedFairShare.Collect(ch)
+		currentCycle.actualShare.Collect(ch)
+		currentCycle.fairnessError.Collect(ch)
+		currentCycle.demand.Collect(ch)
+		currentCycle.cappedDemand.Collect(ch)
+		currentCycle.rawQueueWeight.Collect(ch)
+		currentCycle.queueWeight.Collect(ch)
+		currentCycle.gangsConsidered.Collect(ch)
+		currentCycle.gangsScheduled.Collect(ch)
+		currentCycle.firstGangQueuePosition.Collect(ch)
+		currentCycle.lastGangQueuePosition.Collect(ch)
+		currentCycle.perQueueCycleTime.Collect(ch)
+		currentCycle.loopNumber.Collect(ch)
+		currentCycle.evictedJobs.Collect(ch)
+		currentCycle.evictedResources.Collect(ch)
+		currentCycle.spotPrice.Collect(ch)
 	}
 
 	m.reconciliationCycleTime.Collect(ch)
diff --git a/internal/scheduler/metrics/cycle_metrics_test.go b/internal/scheduler/metrics/cycle_metrics_test.go
index 4c07b001413..4fbebfadfa7 100644
--- a/internal/scheduler/metrics/cycle_metrics_test.go
+++ b/internal/scheduler/metrics/cycle_metrics_test.go
@@ -61,31 +61,27 @@ func TestReportStateTransitions(t *testing.T) {
 
 	poolQueue := []string{"pool1", "queue1"}
 
-	consideredJobs := testutil.ToFloat64(m.consideredJobs.WithLabelValues(poolQueue...))
+	consideredJobs := testutil.ToFloat64(m.latestCycleMetrics.Load().consideredJobs.WithLabelValues(poolQueue...))
 	assert.Equal(t, 3.0, consideredJobs, "consideredJobs")
 
-	allocated := testutil.ToFloat64(m.actualShare.WithLabelValues(poolQueue...))
+	allocated := testutil.ToFloat64(m.latestCycleMetrics.Load().actualShare.WithLabelValues(poolQueue...))
 	assert.InDelta(t, 0.1, allocated, epsilon, "allocated")
 
-	demand := testutil.ToFloat64(m.demand.WithLabelValues(poolQueue...))
+	demand := testutil.ToFloat64(m.latestCycleMetrics.Load().demand.WithLabelValues(poolQueue...))
 	assert.InDelta(t, 0.2, demand, epsilon, "demand")
 
-	cappedDemand := testutil.ToFloat64(m.cappedDemand.WithLabelValues(poolQueue...))
+	cappedDemand := testutil.ToFloat64(m.latestCycleMetrics.Load().cappedDemand.WithLabelValues(poolQueue...))
 	assert.InDelta(t, 0.15, cappedDemand, epsilon, "cappedDemand")
 
-	adjustedFairShare := testutil.ToFloat64(m.adjustedFairShare.WithLabelValues(poolQueue...))
+	adjustedFairShare := testutil.ToFloat64(m.latestCycleMetrics.Load().adjustedFairShare.WithLabelValues(poolQueue...))
 	assert.InDelta(t, 0.15, adjustedFairShare, epsilon, "adjustedFairShare")
 
-	fairnessError := testutil.ToFloat64(m.fairnessError.WithLabelValues("pool1"))
+	fairnessError := testutil.ToFloat64(m.latestCycleMetrics.Load().fairnessError.WithLabelValues("pool1"))
 	assert.InDelta(t, 0.05, fairnessError, epsilon, "fairnessError")
 }
 
-func TestResetLeaderMetrics(t *testing.T) {
+func TestResetLeaderMetrics_Counters(t *testing.T) {
 	m := newCycleMetrics()
-
-	poolLabelValues := []string{"pool1"}
-	poolQueueLabelValues := []string{"pool1", "queue1"}
-	poolQueueResouceLabelValues := []string{"pool1", "queue1", "cpu"}
 	queuePriorityClassLabelValues := []string{"pool1", "priorityClass1"}
 
 	testResetCounter := func(vec *prometheus.CounterVec, labelValues []string) {
@@ -96,32 +92,47 @@ func TestResetLeaderMetrics(t *testing.T) {
 		counterVal = testutil.ToFloat64(vec.WithLabelValues(labelValues...))
 		assert.Equal(t, 0.0, counterVal)
 	}
-	testResetGauge := func(vec *prometheus.GaugeVec, labelValues []string) {
+
+	testResetCounter(m.scheduledJobs, queuePriorityClassLabelValues)
+	testResetCounter(m.premptedJobs, queuePriorityClassLabelValues)
+}
+
+func TestResetLeaderMetrics_ResetsLatestCycleMetrics(t *testing.T) {
+	m := newCycleMetrics()
+	poolLabelValues := []string{"pool1"}
+	poolQueueLabelValues := []string{"pool1", "queue1"}
+	poolQueueResourceLabelValues := []string{"pool1", "queue1", "cpu"}
+
+	testResetGauge := func(getVec func(metrics *cycleMetrics) *prometheus.GaugeVec, labelValues []string) {
+		vec := getVec(m)
 		vec.WithLabelValues(labelValues...).Inc()
 		counterVal := testutil.ToFloat64(vec.WithLabelValues(labelValues...))
 		assert.Equal(t, 1.0, counterVal)
 		m.resetLeaderMetrics()
+		vec = getVec(m)
 		counterVal = testutil.ToFloat64(vec.WithLabelValues(labelValues...))
 		assert.Equal(t, 0.0, counterVal)
 	}
 
-	testResetCounter(m.scheduledJobs, queuePriorityClassLabelValues)
-	testResetCounter(m.premptedJobs, queuePriorityClassLabelValues)
-	testResetGauge(m.consideredJobs, poolQueueLabelValues)
-	testResetGauge(m.fairShare, poolQueueLabelValues)
-	testResetGauge(m.adjustedFairShare, poolQueueLabelValues)
-	testResetGauge(m.actualShare, poolQueueLabelValues)
-	testResetGauge(m.fairnessError, []string{"pool1"})
-	testResetGauge(m.demand, poolQueueLabelValues)
-	testResetGauge(m.cappedDemand, poolQueueLabelValues)
-	testResetGauge(m.gangsConsidered, poolQueueLabelValues)
-	testResetGauge(m.gangsScheduled, poolQueueLabelValues)
-	testResetGauge(m.firstGangQueuePosition, poolQueueLabelValues)
-	testResetGauge(m.lastGangQueuePosition, poolQueueLabelValues)
-	testResetGauge(m.perQueueCycleTime, poolQueueLabelValues)
-	testResetGauge(m.loopNumber, poolLabelValues)
-	testResetGauge(m.evictedJobs, poolQueueLabelValues)
-	testResetGauge(m.evictedResources, poolQueueResouceLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().consideredJobs }, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().fairShare }, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().adjustedFairShare }, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().actualShare }, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().fairnessError }, []string{"pool1"})
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().demand }, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().cappedDemand }, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().gangsConsidered }, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().gangsScheduled }, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec {
+		return m.latestCycleMetrics.Load().firstGangQueuePosition
+	}, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec {
+		return m.latestCycleMetrics.Load().lastGangQueuePosition
+	}, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().perQueueCycleTime }, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().loopNumber }, poolLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().evictedJobs }, poolQueueLabelValues)
+	testResetGauge(func(metrics *cycleMetrics) *prometheus.GaugeVec { return m.latestCycleMetrics.Load().evictedResources }, poolQueueResourceLabelValues)
 }
 
 func TestDisableLeaderMetrics(t *testing.T) {
@@ -133,23 +144,23 @@ func TestDisableLeaderMetrics(t *testing.T) {
 	collect := func(m *cycleMetrics) []prometheus.Metric {
 		m.scheduledJobs.WithLabelValues(queuePriorityClassLabelValues...).Inc()
 		m.premptedJobs.WithLabelValues(queuePriorityClassLabelValues...).Inc()
-		m.consideredJobs.WithLabelValues(poolQueueLabelValues...).Inc()
-		m.fairShare.WithLabelValues(poolQueueLabelValues...).Inc()
-		m.adjustedFairShare.WithLabelValues(poolQueueLabelValues...).Inc()
-		m.actualShare.WithLabelValues(poolQueueLabelValues...).Inc()
-		m.fairnessError.WithLabelValues("pool1").Inc()
-		m.demand.WithLabelValues(poolQueueLabelValues...).Inc()
-		m.cappedDemand.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.latestCycleMetrics.Load().consideredJobs.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.latestCycleMetrics.Load().fairShare.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.latestCycleMetrics.Load().adjustedFairShare.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.latestCycleMetrics.Load().actualShare.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.latestCycleMetrics.Load().fairnessError.WithLabelValues("pool1").Inc()
+		m.latestCycleMetrics.Load().demand.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.latestCycleMetrics.Load().cappedDemand.WithLabelValues(poolQueueLabelValues...).Inc()
 		m.scheduleCycleTime.Observe(float64(1000))
 		m.reconciliationCycleTime.Observe(float64(1000))
-		m.gangsConsidered.WithLabelValues("pool1", "queue1").Inc()
-		m.gangsScheduled.WithLabelValues("pool1", "queue1").Inc()
-		m.firstGangQueuePosition.WithLabelValues("pool1", "queue1").Inc()
-		m.lastGangQueuePosition.WithLabelValues("pool1", "queue1").Inc()
-		m.perQueueCycleTime.WithLabelValues("pool1", "queue1").Inc()
-		m.loopNumber.WithLabelValues("pool1").Inc()
-		m.evictedJobs.WithLabelValues("pool1", "queue1").Inc()
-		m.evictedResources.WithLabelValues("pool1", "queue1", "cpu").Inc()
+		m.latestCycleMetrics.Load().gangsConsidered.WithLabelValues("pool1", "queue1").Inc()
+		m.latestCycleMetrics.Load().gangsScheduled.WithLabelValues("pool1", "queue1").Inc()
+		m.latestCycleMetrics.Load().firstGangQueuePosition.WithLabelValues("pool1", "queue1").Inc()
+		m.latestCycleMetrics.Load().lastGangQueuePosition.WithLabelValues("pool1", "queue1").Inc()
+		m.latestCycleMetrics.Load().perQueueCycleTime.WithLabelValues("pool1", "queue1").Inc()
+		m.latestCycleMetrics.Load().loopNumber.WithLabelValues("pool1").Inc()
+		m.latestCycleMetrics.Load().evictedJobs.WithLabelValues("pool1", "queue1").Inc()
+		m.latestCycleMetrics.Load().evictedResources.WithLabelValues("pool1", "queue1", "cpu").Inc()
 		ch := make(chan prometheus.Metric, 1000)
 		m.collect(ch)