Skip to content

Commit

Permalink
Per Queue Pricing Metrics (armadaproject#4072)
Browse files Browse the repository at this point in the history
* per-queue pricing metrics

* lint
  • Loading branch information
d80tb7 authored Dec 3, 2024
1 parent 457db05 commit 7a76f24
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 2 deletions.
9 changes: 9 additions & 0 deletions internal/common/metrics/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ type QueueMetrics struct {
PriorityClass string
Resources ResourceMetrics
Durations *FloatMetrics
BidPrices *FloatMetrics
}

// QueueMetricsRecorder bundles the per-metric recorders kept for a single
// (pool, priority class) combination.
type QueueMetricsRecorder struct {
// Pool is the pool this recorder belongs to.
Pool string
// PriorityClass is the priority class this recorder belongs to.
PriorityClass string
// resourceRecorder receives the resources passed to RecordResources.
resourceRecorder *ResourceMetricsRecorder
// durationRecorder is built with NewDefaultJobDurationMetricsRecorder;
// presumably fed by RecordJobRuntime (not visible here — confirm).
durationRecorder *FloatMetricsRecorder
// bidPriceRecorder receives the prices passed to RecordBidPrice.
bidPriceRecorder *FloatMetricsRecorder
}

type JobMetricsRecorder struct {
Expand All @@ -46,6 +48,11 @@ func (r *JobMetricsRecorder) RecordResources(pool string, priorityClass string,
recorder.resourceRecorder.Record(resources)
}

// RecordBidPrice adds price to the bid price recorder associated with the
// given pool and priority class. The per-(pool, priority class) recorder is
// looked up through getOrCreateRecorder.
func (r *JobMetricsRecorder) RecordBidPrice(pool string, priorityClass string, price float64) {
	r.getOrCreateRecorder(pool, priorityClass).bidPriceRecorder.Record(price)
}

func (r *JobMetricsRecorder) Metrics() []*QueueMetrics {
result := make([]*QueueMetrics, 0, len(r.recordersByPoolAndPriorityClass))
for _, v := range r.recordersByPoolAndPriorityClass {
Expand All @@ -54,6 +61,7 @@ func (r *JobMetricsRecorder) Metrics() []*QueueMetrics {
PriorityClass: v.PriorityClass,
Resources: v.resourceRecorder.GetMetrics(),
Durations: v.durationRecorder.GetMetrics(),
BidPrices: v.bidPriceRecorder.GetMetrics(),
})
}
return result
Expand All @@ -68,6 +76,7 @@ func (r *JobMetricsRecorder) getOrCreateRecorder(pool string, pritorityClass str
PriorityClass: pritorityClass,
resourceRecorder: NewResourceMetricsRecorder(),
durationRecorder: NewDefaultJobDurationMetricsRecorder(),
bidPriceRecorder: NewFloatMetricsRecorder(),
}
r.recordersByPoolAndPriorityClass[recorderKey] = qmr
}
Expand Down
74 changes: 74 additions & 0 deletions internal/common/metrics/scheduler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,48 @@ var QueueDistinctSchedulingKeysDesc = prometheus.NewDesc(
nil,
)

// Descriptors for the per-queue price metrics. Each one carries the variable
// labels "pool", "priorityClass" and "queue", in that order.
var (
	// MinQueuePriceQueuedDesc describes the minimum price of queued jobs.
	MinQueuePriceQueuedDesc = prometheus.NewDesc(
		MetricPrefix+"queue_price_queued_min",
		"Minimum price of queued jobs",
		[]string{"pool", "priorityClass", "queue"},
		nil,
	)

	// MaxQueuePriceQueuedDesc describes the maximum price of queued jobs.
	MaxQueuePriceQueuedDesc = prometheus.NewDesc(
		MetricPrefix+"queue_price_queued_max",
		"Maximum price of queued jobs",
		[]string{"pool", "priorityClass", "queue"},
		nil,
	)

	// MedianQueuePriceQueuedDesc describes the median price of queued jobs.
	MedianQueuePriceQueuedDesc = prometheus.NewDesc(
		MetricPrefix+"queue_price_queued_median",
		"Median price of queued jobs",
		[]string{"pool", "priorityClass", "queue"},
		nil,
	)

	// MinQueuePriceRunningDesc describes the minimum price of running jobs.
	MinQueuePriceRunningDesc = prometheus.NewDesc(
		MetricPrefix+"queue_price_running_min",
		"Minimum price of running jobs",
		[]string{"pool", "priorityClass", "queue"},
		nil,
	)

	// MaxQueuePriceRunningDesc describes the maximum price of running jobs.
	MaxQueuePriceRunningDesc = prometheus.NewDesc(
		MetricPrefix+"queue_price_running_max",
		"Maximum price of running jobs",
		[]string{"pool", "priorityClass", "queue"},
		nil,
	)

	// MedianQueuePriceRunningDesc describes the median price of running jobs.
	MedianQueuePriceRunningDesc = prometheus.NewDesc(
		MetricPrefix+"queue_price_running_median",
		"Median price of running jobs",
		[]string{"pool", "priorityClass", "queue"},
		nil,
	)
)

var QueueResourcesDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_queued",
"Resource required by queued jobs",
Expand Down Expand Up @@ -252,6 +294,10 @@ func CollectQueueMetrics(queueCounts map[string]int, queueDistinctSchedulingKeyC
metrics = append(metrics, NewMedianQueueDuration(queueDurations.GetMedian(), m.Pool, m.PriorityClass, q))
}

metrics = append(metrics, NewMinQueuePriceQueuedMetric(m.BidPrices.GetMin(), m.Pool, m.PriorityClass, q))
metrics = append(metrics, NewMaxQueuePriceQueuedMetric(m.BidPrices.GetMax(), m.Pool, m.PriorityClass, q))
metrics = append(metrics, NewMedianQueuePriceQueuedMetric(m.BidPrices.GetMedian(), m.Pool, m.PriorityClass, q))

// Sort the keys so we get a predictable output order
resources := maps.Keys(m.Resources)
slices.Sort(resources)
Expand All @@ -277,6 +323,10 @@ func CollectQueueMetrics(queueCounts map[string]int, queueDistinctSchedulingKeyC
metrics = append(metrics, NewMedianJobRunDuration(runningJobDurations.GetMedian(), m.Pool, m.PriorityClass, q))
}

metrics = append(metrics, NewMinQueuePriceRunningMetric(m.BidPrices.GetMin(), m.Pool, m.PriorityClass, q))
metrics = append(metrics, NewMaxQueuePriceRunningMetric(m.BidPrices.GetMax(), m.Pool, m.PriorityClass, q))
metrics = append(metrics, NewMedianQueuePriceRunningMetric(m.BidPrices.GetMedian(), m.Pool, m.PriorityClass, q))

// Sort the keys so we get a predictable output order
resources := maps.Keys(m.Resources)
slices.Sort(resources)
Expand Down Expand Up @@ -398,6 +448,30 @@ func NewQueuePriorityMetric(value float64, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueuePriorityDesc, prometheus.GaugeValue, value, queue, queue)
}

// NewMinQueuePriceQueuedMetric returns a gauge for MinQueuePriceQueuedDesc
// holding value, labelled with the given pool, priority class and queue.
//
// The string parameters are declared in the order pool, priorityClass, queue:
// this is the order in which callers supply them positionally and the order
// of the descriptor's labels, so each argument lands on its own label.
// MustNewConstMetric panics if the metric cannot be constructed.
func NewMinQueuePriceQueuedMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
	return prometheus.MustNewConstMetric(MinQueuePriceQueuedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
}

// NewMaxQueuePriceQueuedMetric returns a gauge for MaxQueuePriceQueuedDesc
// holding value, labelled with the given pool, priority class and queue.
//
// The string parameters are declared in the order pool, priorityClass, queue:
// this is the order in which callers supply them positionally and the order
// of the descriptor's labels, so each argument lands on its own label.
// MustNewConstMetric panics if the metric cannot be constructed.
func NewMaxQueuePriceQueuedMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
	return prometheus.MustNewConstMetric(MaxQueuePriceQueuedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
}

// NewMedianQueuePriceQueuedMetric returns a gauge for
// MedianQueuePriceQueuedDesc holding value, labelled with the given pool,
// priority class and queue.
//
// The string parameters are declared in the order pool, priorityClass, queue:
// this is the order in which callers supply them positionally and the order
// of the descriptor's labels, so each argument lands on its own label.
// MustNewConstMetric panics if the metric cannot be constructed.
func NewMedianQueuePriceQueuedMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
	return prometheus.MustNewConstMetric(MedianQueuePriceQueuedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
}

// NewMinQueuePriceRunningMetric returns a gauge for MinQueuePriceRunningDesc
// holding value, labelled with the given pool, priority class and queue.
//
// The string parameters are declared in the order pool, priorityClass, queue:
// this is the order in which callers supply them positionally and the order
// of the descriptor's labels, so each argument lands on its own label.
// MustNewConstMetric panics if the metric cannot be constructed.
func NewMinQueuePriceRunningMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
	return prometheus.MustNewConstMetric(MinQueuePriceRunningDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
}

// NewMaxQueuePriceRunningMetric returns a gauge for MaxQueuePriceRunningDesc
// holding value, labelled with the given pool, priority class and queue.
//
// The string parameters are declared in the order pool, priorityClass, queue:
// this is the order in which callers supply them positionally and the order
// of the descriptor's labels, so each argument lands on its own label.
// MustNewConstMetric panics if the metric cannot be constructed.
func NewMaxQueuePriceRunningMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
	return prometheus.MustNewConstMetric(MaxQueuePriceRunningDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
}

// NewMedianQueuePriceRunningMetric returns a gauge for
// MedianQueuePriceRunningDesc holding value, labelled with the given pool,
// priority class and queue.
//
// The string parameters are declared in the order pool, priorityClass, queue:
// this is the order in which callers supply them positionally and the order
// of the descriptor's labels, so each argument lands on its own label.
// MustNewConstMetric panics if the metric cannot be constructed.
func NewMedianQueuePriceRunningMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
	return prometheus.MustNewConstMetric(MedianQueuePriceRunningDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
}

func NewQueueLabelsMetric(queue string, labels map[string]string) prometheus.Metric {
metricLabels := make([]string, 0, len(labels)+len(queueLabelDefaultLabels))
values := make([]string, 0, len(labels)+len(queueLabelDefaultLabels))
Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro
for _, pool := range pools {
recorder.RecordJobRuntime(pool, priorityClass, timeInState)
recorder.RecordResources(pool, priorityClass, jobResources)
recorder.RecordBidPrice(pool, priorityClass, job.BidPrice())
}
}

Expand Down
13 changes: 11 additions & 2 deletions internal/scheduler/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
runningJobs := make([]*jobdb.Job, 3)
for i := 0; i < len(queuedJobs); i++ {
startTime := testfixtures.BaseTime.Add(-time.Duration(100*i) * time.Second).UnixNano()
queuedJobs[i] = testfixtures.TestQueuedJobDbJob().WithCreated(startTime)
runningJobs[i] = testfixtures.TestRunningJobDbJob(startTime)
queuedJobs[i] = testfixtures.TestQueuedJobDbJob().WithCreated(startTime).WithBidPrice(float64(i))
runningJobs[i] = testfixtures.TestRunningJobDbJob(startTime).WithBidPrice(float64(i) + 100)
}

// Run that has been returned
Expand Down Expand Up @@ -64,6 +64,9 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
commonmetrics.NewMinQueueDuration(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMaxQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMedianQueueDuration(100, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMinQueuePriceQueuedMetric(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMaxQueuePriceQueuedMetric(2, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMedianQueuePriceQueuedMetric(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewQueueResources(3, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
commonmetrics.NewMinQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
commonmetrics.NewMaxQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
Expand Down Expand Up @@ -92,6 +95,9 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
commonmetrics.NewMinQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMaxQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMedianQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMinQueuePriceQueuedMetric(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMaxQueuePriceQueuedMetric(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMedianQueuePriceQueuedMetric(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
commonmetrics.NewMinQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
commonmetrics.NewMaxQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
Expand All @@ -116,6 +122,9 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
commonmetrics.NewMinJobRunDuration(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMaxJobRunDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMedianJobRunDuration(100, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMinQueuePriceRunningMetric(100, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMaxQueuePriceRunningMetric(102, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMedianQueuePriceRunningMetric(101, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMinQueueAllocated(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
commonmetrics.NewMaxQueueAllocated(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
commonmetrics.NewMedianQueueAllocated(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
Expand Down

0 comments on commit 7a76f24

Please sign in to comment.