Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC of Market Based Pricing #4070

Merged
merged 22 commits into from
Dec 2, 2024
4 changes: 2 additions & 2 deletions .run/Executor.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<env name="ARMADA_EXECUTORAPICONNECTION_FORCENOTLS" value="true" />
<env name="ARMADA_HTTPPORT" value="8085" />
<env name="HOME" value="$USER_HOME$/" />
<env name="KUBECONFIG" value="$PROJECT_DIR$/.kube/internal/config" />
<env name="KUBECONFIG" value="$PROJECT_DIR$/.kube/external/config" />
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intentional. The run config for the executor is incorrect as internal will attempt to connect via the kind network, which won't work from Goland. external will attempt to connect on loclahost which is correct.

</envs>
<pass_parent_env value="false" />
<kind value="FILE" />
Expand All @@ -17,4 +17,4 @@
<filePath value="$PROJECT_DIR$/cmd/executor/main.go" />
<method v="2" />
</configuration>
</component>
</component>
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,4 @@ For more information about contributing to Armada see [CONTRIBUTING.md](https://
## Discussion

If you are interested in discussing Armada you can find us on [![slack](https://img.shields.io/badge/slack-armada-brightgreen.svg?logo=slack)](https://cloud-native.slack.com/?redir=%2Farchives%2FC03T9CBCEMC)

3 changes: 3 additions & 0 deletions e2e/setup/kind.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ apiVersion: kind.x-k8s.io/v1alpha4
name: armada-test
featureGates:
"KubeletInUserNamespace": true
networking:
apiServerAddress: 0.0.0.0
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intentional. WIthout this the kubeconfig produced by kind will list the address as 0.0.0.0 but a cert that is valid for 127.0.0.1. Settting the apiServerAddress explicitly ensures that kubeconfig alligns with cert.

nodes:
- role: worker
image: kindest/node:v1.26.15
Expand All @@ -28,3 +30,4 @@ nodes:
- containerPort: 6443 # control plane
hostPort: 6443 # exposes control plane on localhost:6443
protocol: TCP

15 changes: 11 additions & 4 deletions internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j
podSpecs = k8sPodSpecs
}

var priceInfo *api.ExperimentalPriceInfo
if e.ExperimentalPriceInfo != nil {
priceInfo = &api.ExperimentalPriceInfo{
BidPrice: e.ExperimentalPriceInfo.BidPrice,
}
}

return &api.Job{
Id: e.JobId,
ClientId: e.DeduplicationId,
Expand All @@ -170,10 +177,10 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j
Labels: e.ObjectMeta.Labels,
Annotations: e.ObjectMeta.Annotations,

K8SIngress: k8sIngresses,
K8SService: k8sServices,

Priority: float64(e.Priority),
K8SIngress: k8sIngresses,
K8SService: k8sServices,
ExperimentalPriceInfo: priceInfo,
Priority: float64(e.Priority),

PodSpec: podSpec,
PodSpecs: podSpecs,
Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ type PoolConfig struct {
Name string `validate:"required"`
AwayPools []string
ProtectedFractionOfFairShare *float64
MarketDriven bool
}

func (sc *SchedulingConfig) GetProtectedFractionOfFairShare(poolName string) float64 {
Expand Down
2 changes: 2 additions & 0 deletions internal/scheduler/database/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (r *PostgresJobRepository) FetchInitialJobs(ctx *armadacontext.Context) ([]
JobSet: row.JobSet,
Queue: row.Queue,
Priority: row.Priority,
BidPrice: row.BidPrice,
Submitted: row.Submitted,
Validated: row.Validated,
Queued: row.Queued,
Expand Down Expand Up @@ -228,6 +229,7 @@ func (r *PostgresJobRepository) FetchJobUpdates(ctx *armadacontext.Context, jobS
JobSet: row.JobSet,
Queue: row.Queue,
Priority: row.Priority,
BidPrice: row.BidPrice,
Submitted: row.Submitted,
Validated: row.Validated,
Queued: row.Queued,
Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/database/migrations/018_add_price.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE jobs ADD COLUMN bid_price double precision NOT NULL DEFAULT 0;
1 change: 1 addition & 0 deletions internal/scheduler/database/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 18 additions & 8 deletions internal/scheduler/database/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/scheduler/database/query/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ SELECT * FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2;
SELECT job_id FROM jobs;

-- name: SelectInitialJobs :many
SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 AND cancelled = 'false' AND succeeded = 'false' and failed = 'false' ORDER BY serial LIMIT $2;
SELECT job_id, job_set, queue, priority, bid_price, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 AND cancelled = 'false' AND succeeded = 'false' and failed = 'false' ORDER BY serial LIMIT $2;

-- name: SelectUpdatedJobs :many
SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2;
SELECT job_id, job_set, queue, priority, bid_price, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2;

-- name: UpdateJobPriorityByJobSet :exec
UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3 and cancelled = false and succeeded = false and failed = false;
Expand Down
68 changes: 66 additions & 2 deletions internal/scheduler/jobdb/comparison.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package jobdb

type (
JobPriorityComparer struct{}
JobIdHasher struct{}
JobPriorityComparer struct{}
MarketJobPriorityComparer struct{}
JobIdHasher struct{}
)

func (JobIdHasher) Hash(j *Job) uint32 {
Expand All @@ -21,6 +22,10 @@ func (JobPriorityComparer) Compare(job, other *Job) int {
return SchedulingOrderCompare(job, other)
}

func (MarketJobPriorityComparer) Compare(job, other *Job) int {
return MarketSchedulingOrderCompare(job, other)
}

// SchedulingOrderCompare defines the order in which jobs in a particular queue should be scheduled,
func (job *Job) SchedulingOrderCompare(other *Job) int {
// We need this cast for now to expose this method via an interface.
Expand Down Expand Up @@ -93,3 +98,62 @@ func SchedulingOrderCompare(job, other *Job) int {
}
panic("We should never get here. Since we check for job id equality at the top of this function.")
}

func MarketSchedulingOrderCompare(job, other *Job) int {
// Jobs with equal id are always considered equal.
// This ensures at most one job with a particular id can exist in the jobDb.
if job.id == other.id {
return 0
}

// Next we sort on bidPrice
if job.bidPrice > other.bidPrice {
return -1
} else if job.bidPrice < other.bidPrice {
return 1
}

// PriorityClassPriority indicates urgency.
// Hence, jobs of higher priorityClassPriority come first.
if job.priorityClass.Priority > other.priorityClass.Priority {
return -1
} else if job.priorityClass.Priority < other.priorityClass.Priority {
return 1
}

// Jobs higher in queue-priority come first.
if job.priority < other.priority {
return -1
} else if job.priority > other.priority {
return 1
}

// If both jobs are active, order by time since the job was scheduled.
// This ensures jobs that have been running for longer are rescheduled first,
// which reduces wasted compute time when preempting.
jobIsActive := job.activeRun != nil && !job.activeRun.InTerminalState()
otherIsActive := other.activeRun != nil && !other.activeRun.InTerminalState()
if jobIsActive && otherIsActive {
if job.activeRunTimestamp < other.activeRunTimestamp {
return -1
} else if job.activeRunTimestamp > other.activeRunTimestamp {
return 1
}
}

// Jobs that have been queuing for longer are scheduled first.
if job.submittedTime < other.submittedTime {
return -1
} else if job.submittedTime > other.submittedTime {
return 1
}

// Tie-break by jobId, which must be unique.
// This ensures there is a total order between jobs, i.e., no jobs are equal from an ordering point of view.
if job.id < other.id {
return -1
} else if job.id > other.id {
return 1
}
panic("We should never get here. Since we check for job id equality at the top of this function.")
}
17 changes: 17 additions & 0 deletions internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Job struct {
jobSet string
// Per-queue priority of this job.
priority uint32
// BidPrice the user is willing to pay to have this job scheduled
bidPrice float64
// Requested per queue priority of this job.
// This is used when syncing the postgres database with the scheduler-internal database.
requestedPriority uint32
Expand Down Expand Up @@ -300,6 +302,9 @@ func (job *Job) Equal(other *Job) bool {
if job.priority != other.priority {
return false
}
if job.bidPrice != other.bidPrice {
return false
}
if job.requestedPriority != other.requestedPriority {
return false
}
Expand Down Expand Up @@ -378,6 +383,11 @@ func (job *Job) Priority() uint32 {
return job.priority
}

// BidPrice returns the bidPrice of the job.
func (job *Job) BidPrice() float64 {
return job.bidPrice
}

// PriorityClass returns the priority class of the job.
func (job *Job) PriorityClass() types.PriorityClass {
return job.priorityClass
Expand Down Expand Up @@ -413,6 +423,13 @@ func (job *Job) WithPriority(priority uint32) *Job {
return j
}

// WithBidPrice returns a copy of the job with the bidPrice updated.
func (job *Job) WithBidPrice(price float64) *Job {
j := copyJob(*job)
d80tb7 marked this conversation as resolved.
Show resolved Hide resolved
j.bidPrice = price
return j
}

// WithPools returns a copy of the job with the pools updated.
func (job *Job) WithPools(pools []string) *Job {
j := copyJob(*job)
Expand Down
Loading