diff --git a/internal/common/metrics/domain.go b/internal/common/metrics/domain.go
index 86b7bd80c83..138da763a95 100644
--- a/internal/common/metrics/domain.go
+++ b/internal/common/metrics/domain.go
@@ -19,6 +19,7 @@ type QueueMetrics struct {
 	PriorityClass string
 	Resources     ResourceMetrics
 	Durations     *FloatMetrics
+	BidPrices     *FloatMetrics
 }
 
 type QueueMetricsRecorder struct {
@@ -26,6 +27,7 @@ type QueueMetricsRecorder struct {
 	PriorityClass    string
 	resourceRecorder *ResourceMetricsRecorder
 	durationRecorder *FloatMetricsRecorder
+	bidPriceRecorder *FloatMetricsRecorder
 }
 
 type JobMetricsRecorder struct {
@@ -46,6 +48,12 @@ func (r *JobMetricsRecorder) RecordResources(pool string, priorityClass string,
 	recorder.resourceRecorder.Record(resources)
 }
 
+// RecordBidPrice records a job's bid price against the recorder for the given pool and priority class.
+func (r *JobMetricsRecorder) RecordBidPrice(pool string, priorityClass string, price float64) {
+	recorder := r.getOrCreateRecorder(pool, priorityClass)
+	recorder.bidPriceRecorder.Record(price)
+}
+
 func (r *JobMetricsRecorder) Metrics() []*QueueMetrics {
 	result := make([]*QueueMetrics, 0, len(r.recordersByPoolAndPriorityClass))
 	for _, v := range r.recordersByPoolAndPriorityClass {
@@ -54,6 +62,7 @@ func (r *JobMetricsRecorder) Metrics() []*QueueMetrics {
 			PriorityClass: v.PriorityClass,
 			Resources:     v.resourceRecorder.GetMetrics(),
 			Durations:     v.durationRecorder.GetMetrics(),
+			BidPrices:     v.bidPriceRecorder.GetMetrics(),
 		})
 	}
 	return result
@@ -68,6 +77,7 @@ func (r *JobMetricsRecorder) getOrCreateRecorder(pool string, pritorityClass str
 			PriorityClass:    pritorityClass,
 			resourceRecorder: NewResourceMetricsRecorder(),
 			durationRecorder: NewDefaultJobDurationMetricsRecorder(),
+			bidPriceRecorder: NewFloatMetricsRecorder(),
 		}
 		r.recordersByPoolAndPriorityClass[recorderKey] = qmr
 	}
diff --git a/internal/common/metrics/scheduler_metrics.go b/internal/common/metrics/scheduler_metrics.go
index 80403bf412e..4b99dbc7e36 100644
--- a/internal/common/metrics/scheduler_metrics.go
+++ b/internal/common/metrics/scheduler_metrics.go
@@ -24,6 +24,48 @@ var QueueDistinctSchedulingKeysDesc = prometheus.NewDesc(
 	nil,
 )
 
+var MinQueuePriceQueuedDesc = prometheus.NewDesc(
+	MetricPrefix+"queue_price_queued_min",
+	"Minimum price of queued jobs",
+	[]string{"pool", "priorityClass", "queue"},
+	nil,
+)
+
+var MaxQueuePriceQueuedDesc = prometheus.NewDesc(
+	MetricPrefix+"queue_price_queued_max",
+	"Maximum price of queued jobs",
+	[]string{"pool", "priorityClass", "queue"},
+	nil,
+)
+
+var MedianQueuePriceQueuedDesc = prometheus.NewDesc(
+	MetricPrefix+"queue_price_queued_median",
+	"Median price of queued jobs",
+	[]string{"pool", "priorityClass", "queue"},
+	nil,
+)
+
+var MinQueuePriceRunningDesc = prometheus.NewDesc(
+	MetricPrefix+"queue_price_running_min",
+	"Minimum price of running jobs",
+	[]string{"pool", "priorityClass", "queue"},
+	nil,
+)
+
+var MaxQueuePriceRunningDesc = prometheus.NewDesc(
+	MetricPrefix+"queue_price_running_max",
+	"Maximum price of running jobs",
+	[]string{"pool", "priorityClass", "queue"},
+	nil,
+)
+
+var MedianQueuePriceRunningDesc = prometheus.NewDesc(
+	MetricPrefix+"queue_price_running_median",
+	"Median price of running jobs",
+	[]string{"pool", "priorityClass", "queue"},
+	nil,
+)
+
 var QueueResourcesDesc = prometheus.NewDesc(
 	MetricPrefix+"queue_resource_queued",
 	"Resource required by queued jobs",
@@ -252,6 +294,10 @@ func CollectQueueMetrics(queueCounts map[string]int, queueDistinctSchedulingKeyC
 					metrics = append(metrics, NewMedianQueueDuration(queueDurations.GetMedian(), m.Pool, m.PriorityClass, q))
 				}
 
+				metrics = append(metrics, NewMinQueuePriceQueuedMetric(m.BidPrices.GetMin(), m.Pool, m.PriorityClass, q))
+				metrics = append(metrics, NewMaxQueuePriceQueuedMetric(m.BidPrices.GetMax(), m.Pool, m.PriorityClass, q))
+				metrics = append(metrics, NewMedianQueuePriceQueuedMetric(m.BidPrices.GetMedian(), m.Pool, m.PriorityClass, q))
+
 				// Sort the keys so we get a predictable output order
 				resources := maps.Keys(m.Resources)
 				slices.Sort(resources)
@@ -277,6 +323,10 @@ func CollectQueueMetrics(queueCounts map[string]int, queueDistinctSchedulingKeyC
 					metrics = append(metrics, NewMedianJobRunDuration(runningJobDurations.GetMedian(), m.Pool, m.PriorityClass, q))
 				}
 
+				metrics = append(metrics, NewMinQueuePriceRunningMetric(m.BidPrices.GetMin(), m.Pool, m.PriorityClass, q))
+				metrics = append(metrics, NewMaxQueuePriceRunningMetric(m.BidPrices.GetMax(), m.Pool, m.PriorityClass, q))
+				metrics = append(metrics, NewMedianQueuePriceRunningMetric(m.BidPrices.GetMedian(), m.Pool, m.PriorityClass, q))
+
 				// Sort the keys so we get a predicatable output order
 				resources := maps.Keys(m.Resources)
 				slices.Sort(resources)
@@ -398,6 +448,36 @@ func NewQueuePriorityMetric(value float64, queue string) prometheus.Metric {
 	return prometheus.MustNewConstMetric(QueuePriorityDesc, prometheus.GaugeValue, value, queue, queue)
 }
 
+// NewMinQueuePriceQueuedMetric returns a gauge with the minimum price of queued jobs, labelled by pool, priority class and queue.
+func NewMinQueuePriceQueuedMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
+	return prometheus.MustNewConstMetric(MinQueuePriceQueuedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
+}
+
+// NewMaxQueuePriceQueuedMetric returns a gauge with the maximum price of queued jobs, labelled by pool, priority class and queue.
+func NewMaxQueuePriceQueuedMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
+	return prometheus.MustNewConstMetric(MaxQueuePriceQueuedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
+}
+
+// NewMedianQueuePriceQueuedMetric returns a gauge with the median price of queued jobs, labelled by pool, priority class and queue.
+func NewMedianQueuePriceQueuedMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
+	return prometheus.MustNewConstMetric(MedianQueuePriceQueuedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
+}
+
+// NewMinQueuePriceRunningMetric returns a gauge with the minimum price of running jobs, labelled by pool, priority class and queue.
+func NewMinQueuePriceRunningMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
+	return prometheus.MustNewConstMetric(MinQueuePriceRunningDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
+}
+
+// NewMaxQueuePriceRunningMetric returns a gauge with the maximum price of running jobs, labelled by pool, priority class and queue.
+func NewMaxQueuePriceRunningMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
+	return prometheus.MustNewConstMetric(MaxQueuePriceRunningDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
+}
+
+// NewMedianQueuePriceRunningMetric returns a gauge with the median price of running jobs, labelled by pool, priority class and queue.
+func NewMedianQueuePriceRunningMetric(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
+	return prometheus.MustNewConstMetric(MedianQueuePriceRunningDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
+}
+
 func NewQueueLabelsMetric(queue string, labels map[string]string) prometheus.Metric {
 	metricLabels := make([]string, 0, len(labels)+len(queueLabelDefaultLabels))
 	values := make([]string, 0, len(labels)+len(queueLabelDefaultLabels))
diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go
index af9d932fe64..8cc2f3f49d1 100644
--- a/internal/scheduler/metrics.go
+++ b/internal/scheduler/metrics.go
@@ -222,6 +222,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro
 		for _, pool := range pools {
 			recorder.RecordJobRuntime(pool, priorityClass, timeInState)
 			recorder.RecordResources(pool, priorityClass, jobResources)
+			recorder.RecordBidPrice(pool, priorityClass, job.BidPrice())
 		}
 	}
 
diff --git a/internal/scheduler/metrics_test.go b/internal/scheduler/metrics_test.go
index 772101890cc..c2f7d77c117 100644
--- a/internal/scheduler/metrics_test.go
+++ b/internal/scheduler/metrics_test.go
@@ -29,8 +29,8 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
 	runningJobs := make([]*jobdb.Job, 3)
 	for i := 0; i < len(queuedJobs); i++ {
 		startTime := testfixtures.BaseTime.Add(-time.Duration(100*i) * time.Second).UnixNano()
-		queuedJobs[i] = testfixtures.TestQueuedJobDbJob().WithCreated(startTime)
-		runningJobs[i] = testfixtures.TestRunningJobDbJob(startTime)
+		queuedJobs[i] = testfixtures.TestQueuedJobDbJob().WithCreated(startTime).WithBidPrice(float64(i))
+		runningJobs[i] = testfixtures.TestRunningJobDbJob(startTime).WithBidPrice(float64(i) + 100)
 	}
 
 	// Run that has been returned
@@ -64,6 +64,9 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
 				commonmetrics.NewMinQueueDuration(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
 				commonmetrics.NewMaxQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
 				commonmetrics.NewMedianQueueDuration(100, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
+				commonmetrics.NewMinQueuePriceQueuedMetric(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
+				commonmetrics.NewMaxQueuePriceQueuedMetric(2, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
+				commonmetrics.NewMedianQueuePriceQueuedMetric(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
 				commonmetrics.NewQueueResources(3, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
 				commonmetrics.NewMinQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
 				commonmetrics.NewMaxQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
@@ -92,6 +95,9 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
 				commonmetrics.NewMinQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
 				commonmetrics.NewMaxQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
 				commonmetrics.NewMedianQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
+				commonmetrics.NewMinQueuePriceQueuedMetric(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
+				commonmetrics.NewMaxQueuePriceQueuedMetric(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
+				commonmetrics.NewMedianQueuePriceQueuedMetric(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
 				commonmetrics.NewQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
 				commonmetrics.NewMinQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
 				commonmetrics.NewMaxQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
@@ -116,6 +122,9 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
 				commonmetrics.NewMinJobRunDuration(0, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
 				commonmetrics.NewMaxJobRunDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
 				commonmetrics.NewMedianJobRunDuration(100, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
+				commonmetrics.NewMinQueuePriceRunningMetric(100, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
+				commonmetrics.NewMaxQueuePriceRunningMetric(102, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
+				commonmetrics.NewMedianQueuePriceRunningMetric(101, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
 				commonmetrics.NewMinQueueAllocated(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
 				commonmetrics.NewMaxQueueAllocated(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
 				commonmetrics.NewMedianQueueAllocated(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),