
feat(scheduler): add partial scheduling based on min replicas #6221

Open · wants to merge 12 commits into base: v2
74 changes: 52 additions & 22 deletions scheduler/pkg/scheduler/scheduler.go
@@ -43,7 +43,7 @@ func DefaultSchedulerConfig(store store.ModelStore) SchedulerConfig {
return SchedulerConfig{
serverFilters: []filters.ServerFilter{filters.ServerReplicaFilter{}, filters.SharingServerFilter{}, filters.DeletedServerFilter{}, filters.ServerRequirementFilter{}},
replicaFilters: []filters.ReplicaFilter{filters.AvailableMemoryReplicaFilter{}, filters.ExplainerFilter{}, filters.ReplicaDrainingFilter{}},
serverSorts: []sorters.ServerSorter{},
serverSorts: []sorters.ServerSorter{sorters.ModelAlreadyLoadedOnServerSorter{}},
replicaSorts: []sorters.ReplicaSorter{sorters.ReplicaIndexSorter{}, sorters.AvailableMemorySorter{}, sorters.ModelAlreadyLoadedSorter{}},
}
}
@@ -160,13 +160,51 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
return errors.New(msg)
}

desiredReplicas := latestModel.DesiredReplicas()
minReplicas := latestModel.GetDeploymentSpec().GetMinReplicas()

s.sortServers(latestModel, filteredServers)
logger.
WithField("candidate_servers", filteredServers).
WithField("desired_replicas", latestModel.DesiredReplicas()).
WithField("desired_replicas", desiredReplicas).
Debug("Identified candidate servers for model")

// The main logic for finding a server for the model is as follows:
// 1. If a server has enough replicas, schedule the model onto it.
// 2. If no server has enough replicas, try to schedule with min replicas. In this case the model should end up
// loaded on all of the server's suitable replicas (assuming min replicas is less than the number of replicas on the server).
// We also mark the model as failed to schedule, so that if the infrastructure changes in the future we can try to reschedule it.
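// Example: desired=3, min=2 and a server has only 2 suitable replicas -> load the model on both replicas,
// but still record the schedule as failed so that a later capacity increase triggers a reschedule up to the desired count.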

// For each server, filter and sort its replicas and attempt to schedule the model if there are enough suitable replicas
ok := s.findAndUpdateToServers(filteredServers, latestModel, desiredReplicas, desiredReplicas)
// Try to schedule with min replicas if there are not enough replicas for the desired count
if !ok {
if minReplicas > 0 {
if minOk := s.findAndUpdateToServers(filteredServers, latestModel, desiredReplicas, int(minReplicas)); minOk {
logger.Debug("Managed to scheduled model with min replicas")
}
}
}

if !ok {
msg := "Failed to schedule model as no matching server had enough suitable replicas"
logger.Debug(msg)
// we do not want to reset the server if the model has live replicas or loading replicas.
// in the case of loading replicas, we need to make sure that we can unload them later:
// for example, if a model is just marked as loading on a particular server replica and
// then gets a delete request (before it is marked as loaded or available), we need to make
// sure that we can unload it from the server
s.store.FailedScheduling(latestModel, msg, !latestModel.HasLiveReplicas() && !latestModel.IsLoadingOrLoadedOnServer())
return errors.New(msg)
}

//TODO Cleanup previous version if needed?
return nil
}

func (s *SimpleScheduler) findAndUpdateToServers(filteredServers []*store.ServerSnapshot, latestModel *store.ModelVersion, desiredReplicas, minReplicas int) bool {
modelName := latestModel.GetMeta().GetName()
logger := s.logger.WithField("func", "findAndUpdateToServers").WithField("model", modelName)
ok := false
for _, candidateServer := range filteredServers {
logger.WithField("server", candidateServer.Name).Debug("Checking compatibility with candidate server")
@@ -176,23 +214,29 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
// without the store being reflected and hence sorting on stale values
s.muSortAndUpdate.Lock()
candidateReplicas = s.filterReplicas(latestModel, candidateServer)
if len(candidateReplicas.ChosenReplicas) < latestModel.DesiredReplicas() {
numServerReplicas := len(candidateReplicas.ChosenReplicas)
if numServerReplicas < minReplicas {
logger.
WithField("server", candidateServer.Name).
WithField("available_replicas", len(candidateReplicas.ChosenReplicas)).
WithField("desired_replicas", latestModel.DesiredReplicas()).
WithField("available_replicas", numServerReplicas).
WithField("desired_replicas", desiredReplicas).
WithField("min_replicas", minReplicas).
Debug("Skipping server due to insufficient suitable replicas")

s.muSortAndUpdate.Unlock()
continue
}

s.sortReplicas(candidateReplicas)
err = s.store.UpdateLoadedModels(
numReplicas := minReplicas
if minReplicas != desiredReplicas {
numReplicas = min(numServerReplicas, desiredReplicas) // the server offers at least min replicas, so use as many as it has, capped at the desired count
}
err := s.store.UpdateLoadedModels(
modelName,
latestModel.GetVersion(),
candidateServer.Name,
candidateReplicas.ChosenReplicas[0:latestModel.DesiredReplicas()],
candidateReplicas.ChosenReplicas[0:numReplicas],
)
s.muSortAndUpdate.Unlock()

@@ -204,21 +248,7 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
break
}
}

if !ok {
msg := "Failed to schedule model as no matching server had enough suitable replicas"
logger.Debug(msg)
// we do not want to reset the server if it has live replicas or loading replicas
// in the case of loading replicas, we need to make sure that we can unload them later.
// for example in the case that a model is just marked as loading on a particular server replica
// then it gets a delete request (before it is marked as loaded or available) we need to make sure
// that we can unload it from the server
s.store.FailedScheduling(latestModel, msg, !latestModel.HasLiveReplicas() && !latestModel.IsLoadingOrLoadedOnServer())
return errors.New(msg)
}

//TODO Cleanup previous version if needed?
return nil
return ok
}

func showServerSlice(servers []*store.ServerSnapshot) string {
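Taken together, the change turns server selection into a two-pass attempt: first try to satisfy the full desired replica count, then fall back to the configured minimum while still recording the schedule as failed so it can be retried later. A minimal, self-contained sketch of that decision for a single candidate server (illustrative only; the real code works on store.ServerSnapshot / store.ModelVersion through findAndUpdateToServers):

package main

import "fmt"

// pickReplicas mirrors the placement decision for one candidate server:
// serverReplicas is the number of suitable replicas the server can offer,
// desired and minReplicas come from the model's deployment spec. It returns
// how many replicas to load the model on and whether the server is usable.
func pickReplicas(serverReplicas, desired, minReplicas int) (int, bool) {
	if serverReplicas >= desired {
		// Pass 1: the server can satisfy the full desired count.
		return desired, true
	}
	if minReplicas > 0 && serverReplicas >= minReplicas {
		// Pass 2: partial scheduling - use as many replicas as the server
		// offers, capped at the desired count.
		return min(serverReplicas, desired), true
	}
	return 0, false
}

func main() {
	fmt.Println(pickReplicas(2, 3, 2)) // 2 true - partially scheduled on 2 replicas
	fmt.Println(pickReplicas(1, 3, 2)) // 0 false - below the configured minimum
}

Note that a successful pass-2 placement still calls FailedScheduling, which is what the "NotEnoughReplicas - schedule min replicas" test below asserts via scheduled: false.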
86 changes: 66 additions & 20 deletions scheduler/pkg/scheduler/scheduler_test.go
@@ -126,8 +126,8 @@ func TestScheduler(t *testing.T) {
logger := log.New()
g := NewGomegaWithT(t)

newTestModel := func(name string, requiredMemory uint64, requirements []string, server *string, replicas uint32, loadedModels []int, deleted bool, scheduledServer string, drainedModels []int) *store.ModelSnapshot {
config := &pb.Model{ModelSpec: &pb.ModelSpec{MemoryBytes: &requiredMemory, Requirements: requirements, Server: server}, DeploymentSpec: &pb.DeploymentSpec{Replicas: replicas}}
newTestModel := func(name string, requiredMemory uint64, requirements []string, server *string, replicas, minReplicas uint32, loadedModels []int, deleted bool, scheduledServer string, drainedModels []int) *store.ModelSnapshot {
config := &pb.Model{ModelSpec: &pb.ModelSpec{MemoryBytes: &requiredMemory, Requirements: requirements, Server: server}, DeploymentSpec: &pb.DeploymentSpec{Replicas: replicas, MinReplicas: minReplicas}}
rmap := make(map[int]store.ReplicaStatus)
for _, ridx := range loadedModels {
rmap[ridx] = store.ReplicaStatus{State: store.Loaded}
@@ -162,7 +162,7 @@ func TestScheduler(t *testing.T) {
tests := []test{
{
name: "SmokeTest",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{}, false, "", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{}, false, "", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -177,7 +177,7 @@
},
{
name: "ReplicasTwo",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, []int{}, false, "", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{}, false, "", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -201,7 +201,7 @@
},
{
name: "NotEnoughReplicas",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, []int{}, false, "", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{}, false, "", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -221,9 +221,33 @@
},
scheduled: false,
},
{
name: "NotEnoughReplicas - schedule min replicas",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 3, 2, []int{}, false, "", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
Replicas: map[int]*store.ServerReplica{0: gsr(0, 200, []string{"sklearn"}, "server1", true, false)},
Shared: true,
ExpectedReplicas: -1,
},
{
Name: "server2",
Replicas: map[int]*store.ServerReplica{
0: gsr(0, 200, []string{"sklearn"}, "server2", true, false), // expect schedule here
1: gsr(1, 200, []string{"sklearn"}, "server2", true, false), // expect schedule here
},
Shared: true,
ExpectedReplicas: -1,
},
},
scheduled: false, // note here that we still mark the model as scheduleFailed
scheduledServer: "server2",
scheduledReplicas: []int{0, 1},
},
{
name: "MemoryOneServer",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{}, false, "", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{}, false, "", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -246,7 +270,7 @@
},
{
name: "ModelsLoaded",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, []int{1}, false, "", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{1}, false, "", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -270,7 +294,7 @@
},
{
name: "ModelUnLoaded",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, []int{1}, true, "server2", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{1}, true, "server2", nil),
servers: []*store.ServerSnapshot{
{
Name: "server2",
@@ -288,7 +312,7 @@
},
{
name: "DeletedServer",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{}, false, "", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{}, false, "", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -312,7 +336,7 @@
},
{
name: "Reschedule",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{0}, false, "server1", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{0}, false, "server1", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -336,7 +360,7 @@
},
{
name: "DeletedServerFail",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{1}, false, "", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{1}, false, "", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -349,7 +373,7 @@
},
{
name: "Available memory sorting",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{1}, false, "", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{1}, false, "", nil),
servers: []*store.ServerSnapshot{
{
Name: "server2",
@@ -367,7 +391,7 @@
},
{
name: "Available memory sorting with multiple replicas",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, []int{1}, false, "", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{1}, false, "", nil),
servers: []*store.ServerSnapshot{
{
Name: "server2",
@@ -386,7 +410,7 @@
},
{
name: "Scale up",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 3, []int{1, 2}, false, "server1", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 3, 0, []int{1, 2}, false, "server1", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -406,7 +430,7 @@
},
{
name: "Scale down",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{1, 2}, false, "server1", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{1, 2}, false, "server1", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -424,9 +448,29 @@
scheduledServer: "server1",
scheduledReplicas: []int{1},
},
{
name: "Scale up - not enough replicas use max of the server",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 5, 3, []int{1, 2}, false, "server1", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
Replicas: map[int]*store.ServerReplica{
0: gsr(0, 100, []string{"sklearn"}, "server1", true, false), // expect schedule here
1: gsr(1, 100, []string{"sklearn"}, "server1", true, false), // expect schedule here - no-op, model already loaded
2: gsr(2, 100, []string{"sklearn"}, "server1", true, false), // expect schedule here - no-op, model already loaded
3: gsr(3, 100, []string{"sklearn"}, "server1", true, false), // expect schedule here
},
Shared: true,
ExpectedReplicas: -1,
},
},
scheduled: false,
scheduledServer: "server1",
scheduledReplicas: []int{0, 1, 2, 3}, // used all replicas
},
{
name: "Scale up - no capacity on loaded replica servers, should still go there",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 3, []int{1, 2}, false, "server1", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 3, 0, []int{1, 2}, false, "server1", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -446,7 +490,7 @@
},
{
name: "Scale down - no capacity on loaded replica servers, should still go there",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{1, 2}, false, "server1", nil),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{1, 2}, false, "server1", nil),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -466,7 +510,7 @@
},
{
name: "Drain",
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, []int{1}, false, "server1", []int{2}),
model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{1}, false, "server1", []int{2}),
servers: []*store.ServerSnapshot{
{
Name: "server1",
@@ -502,12 +546,14 @@ func TestScheduler(t *testing.T) {
err := scheduler.Schedule(test.model.Name)
if test.scheduled {
g.Expect(err).To(BeNil())
} else {
g.Expect(err).ToNot(BeNil())
}
if test.scheduledServer != "" {
g.Expect(test.scheduledServer).To(Equal(mockStore.scheduledServer))
sort.Ints(test.scheduledReplicas)
sort.Ints(mockStore.scheduledReplicas)
g.Expect(test.scheduledReplicas).To(Equal(mockStore.scheduledReplicas))
} else {
g.Expect(err).ToNot(BeNil())
}
})
}
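For context, the field these tests exercise is the new MinReplicas knob on the model's deployment spec. A rough construction sketch using only field names visible in the test helper above (values are illustrative and the snippet assumes the scheduler's pb proto package):

mem := uint64(100)
model := &pb.Model{
	ModelSpec: &pb.ModelSpec{
		MemoryBytes:  &mem,
		Requirements: []string{"sklearn"},
	},
	DeploymentSpec: &pb.DeploymentSpec{
		Replicas:    3, // desired replica count
		MinReplicas: 2, // allow partial scheduling down to 2 replicas
	},
}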
12 changes: 12 additions & 0 deletions scheduler/pkg/scheduler/sorters/loaded.go
@@ -20,3 +20,15 @@ func (m ModelAlreadyLoadedSorter) IsLess(i *CandidateReplica, j *CandidateReplic
jIsLoading := j.Model.IsLoadingOrLoaded(j.Server.Name, j.Replica.GetReplicaIdx())
return iIsLoading && !jIsLoading
}

// This sorter favours servers that already have the model loaded on them. This is useful to minimise ping-pong of models between servers,
// which can be expensive in terms of model loading time.
type ModelAlreadyLoadedOnServerSorter struct{}

func (m ModelAlreadyLoadedOnServerSorter) Name() string {
return "ModelAlreadyLoadedOnServerSorter"
}

func (m ModelAlreadyLoadedOnServerSorter) IsLess(i *CandidateServer, j *CandidateServer) bool {
return i.Model.Server() == i.Server.Name
}
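A rough usage sketch for a server-level sorter like this one; the candidateServers slice and the sort call site are assumptions, since the scheduler's sortServers plumbing is not part of this diff:

// Hypothetical call site: stable-sort candidate servers so that the server the
// model is already scheduled on is considered first by the scheduler.
sorter := sorters.ModelAlreadyLoadedOnServerSorter{}
sort.SliceStable(candidateServers, func(a, b int) bool {
	return sorter.IsLess(candidateServers[a], candidateServers[b])
})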