Skip to content

Commit

Permalink
add spot price
Browse files Browse the repository at this point in the history
  • Loading branch information
d80tb7 committed Dec 1, 2024
1 parent 1c66a28 commit 5d996fb
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 0 deletions.
12 changes: 12 additions & 0 deletions internal/scheduler/metrics/cycle_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type cycleMetrics struct {
loopNumber *prometheus.GaugeVec
evictedJobs *prometheus.GaugeVec
evictedResources *prometheus.GaugeVec
spotPrice *prometheus.GaugeVec
allResettableMetrics []resettableMetric
}

Expand Down Expand Up @@ -193,6 +194,14 @@ func newCycleMetrics() *cycleMetrics {
poolQueueAndResourceLabels,
)

spotPrice := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "spot_price",
Help: "spop price for given pool",
},
poolLabels,
)

return &cycleMetrics{
leaderMetricsEnabled: true,
scheduledJobs: scheduledJobs,
Expand All @@ -213,6 +222,7 @@ func newCycleMetrics() *cycleMetrics {
loopNumber: loopNumber,
evictedJobs: evictedJobs,
evictedResources: evictedResources,
spotPrice: spotPrice,
allResettableMetrics: []resettableMetric{
scheduledJobs,
premptedJobs,
Expand All @@ -231,6 +241,7 @@ func newCycleMetrics() *cycleMetrics {
loopNumber,
evictedJobs,
evictedResources,
spotPrice,
},
reconciliationCycleTime: reconciliationCycleTime,
}
Expand Down Expand Up @@ -277,6 +288,7 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult)
m.cappedDemand.WithLabelValues(pool, queue).Set(cappedDemand)
}
m.fairnessError.WithLabelValues(pool).Set(schedContext.FairnessError())
m.spotPrice.WithLabelValues(pool).Set(schedContext.SpotPrice)
}

for _, jobCtx := range result.ScheduledJobs {
Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/scheduling/context/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type SchedulingContext struct {
// Used to immediately reject new jobs with identical reqirements.
// Maps to the JobSchedulingContext of a previous job attempted to schedule with the same key.
UnfeasibleSchedulingKeys map[schedulerobjects.SchedulingKey]*JobSchedulingContext
SpotPrice float64
}

func NewSchedulingContext(
Expand Down
45 changes: 45 additions & 0 deletions internal/scheduler/scheduling/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scheduling

import (
"context"
"math"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -573,6 +574,13 @@ func (l *FairSchedulingAlgo) SchedulePool(
WithQueued(false).
WithNewRun(node.GetExecutor(), node.GetId(), node.GetName(), pool, priority)
}

if marketDriven {
fractionAllocated := fsctx.schedulingContext.FairnessCostProvider.UnweightedCostFromAllocation(fsctx.schedulingContext.Allocated)
price := l.calculateSpotPrice(maps.Keys(fsctx.nodeIdByJobId), result.ScheduledJobs, result.PreemptedJobs, fractionAllocated, fsctx.Txn)
fsctx.schedulingContext.SpotPrice = price
}

return result, fsctx.schedulingContext, nil
}

Expand Down Expand Up @@ -701,3 +709,40 @@ func (l *FairSchedulingAlgo) filterLaggingExecutors(
}
return activeExecutors
}

func (l *FairSchedulingAlgo) calculateSpotPrice(initialRunningJobIds []string, scheduledJobs, preemptedJobs []*schedulercontext.JobSchedulingContext, fractionAllocated float64, txn *jobdb.Txn) float64 {

// If we've allocated less that 95% of available resources then we don't charge.
// TODO: make this configurable
if fractionAllocated < 0.95 {
return 0.0
}

allRunningJobIds := make(map[string]bool, len(initialRunningJobIds))
for _, jobId := range initialRunningJobIds {
allRunningJobIds[jobId] = true
}

for _, scheduledJob := range scheduledJobs {
allRunningJobIds[scheduledJob.JobId] = true
}

for _, preemptedJob := range preemptedJobs {
delete(allRunningJobIds, preemptedJob.JobId)
}

// Find the minimum bid price among running jobs
var minPrice = math.MaxFloat64
for jobId := range allRunningJobIds {
job := txn.GetById(jobId)
if job != nil && job.BidPrice() < minPrice {
minPrice = job.BidPrice()
}
}

// Return the lowest bid price, or 0 if no valid price was found
if minPrice == math.MaxFloat64 {
return 0.0
}
return minPrice
}

0 comments on commit 5d996fb

Please sign in to comment.