refactor: testing idea to wrap coscheduling
This is the "skeleton" of a new idea to wrap coscheduling, adding in the logic for fluence only where it is needed, likely in the PodGroup (in the new fluence/core/core that wraps the same in coscheduling). This is just a skeleton because we are deploying the sidecar with the wrapped scheduling and absolutely no logic ported over to AskFlux. I think I have a sense of where to put this, but wanted to save this vanilla/skeleton state in case we need to go back to it. Note that it did not work to have fluence inherit the functions from coscheduler, so I opted for a strategy of adding it as a helper field, and then just using it when necessary. Signed-off-by: vsoch <[email protected]>
Showing 15 changed files with 556 additions and 568 deletions.
@@ -58,10 +58,8 @@ type PodGroupReconciler struct {

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the PodGroup object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
// Note that we currently don't do deletion based on owner references, but that
// would be ideal (I could not get it to work)
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
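For context on the owner-reference comment above: with controller-runtime, deletion via owner references would normally be wired up roughly as below. This is a hedged sketch; setGroupOwner is a hypothetical helper, and the commit notes this approach could not be made to work here, so the reconciler deletes groups explicitly instead.

    import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    )

    // setGroupOwner (hypothetical helper, not part of this commit) marks owner
    // as the managing controller of the PodGroup, so Kubernetes garbage
    // collection would remove the group when the owner is deleted.
    func setGroupOwner(owner, pg metav1.Object, scheme *runtime.Scheme) error {
        return controllerutil.SetControllerReference(owner, pg, scheme)
    }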
@@ -82,6 +80,7 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
        log.Error(err, fmt.Sprintf("Unable to retrieve pod group %s", req.NamespacedName))
        return ctrl.Result{}, err
    }
    log.Info("REFERENCES", "Reconciler", pg.ObjectMeta.OwnerReferences)

    // Grab all statuses (and groups of them) we are interested in
    schedulingOrPending := (pg.Status.Phase == schedv1alpha1.PodGroupScheduling || pg.Status.Phase == schedv1alpha1.PodGroupPending)
@@ -175,35 +174,32 @@ func (r *PodGroupReconciler) updateStatus(
    pods []v1.Pod,
) (ctrl.Result, error) {

    log := log.FromContext(ctx)
    patch := client.MergeFrom(pg.DeepCopy())
    log.Info("PodGroup", "Phase", pg.Status.Phase)

    switch pg.Status.Phase {
    case "":
        pg.Status.Phase = schedv1alpha1.PodGroupPending
        result, err := r.updateOwnerReferences(ctx, pg, pods)
        if result.Requeue || err != nil {
            return result, err
        }

    case schedv1alpha1.PodGroupPending:
        if len(pods) >= int(pg.Spec.MinMember) {
            log.Info("PodGroup", "Phase", "Scheduling")
            pg.Status.Phase = schedv1alpha1.PodGroupScheduling
            result, err := r.updateOwnerReferences(ctx, pg, pods)
            if result.Requeue || err != nil {
                return result, err
            }
        }
    default:

        // Get updated counts of running, succeeded, and failed pods
        running, succeeded, failed := getCurrentPodStats(pods)

        // If for some reason we weren't pending and now have fewer than min required, flip back to pending
        if len(pods) < int(pg.Spec.MinMember) {
            log.Info("PodGroup", "Phase", "Length of pods less than min member, pending")
            pg.Status.Phase = schedv1alpha1.PodGroupPending
            break
        }

        // Get updated counts of running, succeeded, and failed pods
        running, succeeded, failed := getCurrentPodStats(pods)
        log.Info("PodGroup", "Running", running, "Succeeded", succeeded, "Failed", failed)

        // A pod with succeeded + running STILL less than the minimum required is scheduling
        if succeeded+running < pg.Spec.MinMember {
            pg.Status.Phase = schedv1alpha1.PodGroupScheduling
@@ -232,16 +228,18 @@ func (r *PodGroupReconciler) updateStatus(
    }

    // Apply the patch to update, or delete if finished
    // TODO would be better if owner references took here, so delete on owner deletion
    // TODO deletion is not currently handled for Deployment, ReplicaSet, StatefulSet
    // as they are expected to persist. You can delete / lose and bring up again
    var err error
    if pg.Status.Phase == schedv1alpha1.PodGroupFinished || pg.Status.Phase == schedv1alpha1.PodGroupFailed {
        log.Info("PodGroup", "Status", "Finished", "Owners", pg.OwnerReferences)

        // Delete the group if it is finished or failed
        err = r.Delete(ctx, pg)
    } else {
        r.Status().Update(ctx, pg)
        err = r.Patch(ctx, pg, patch)
        // Update but don't requeue
        // _, err := r.updateOwnerReferences(ctx, pg, pods)
        return ctrl.Result{}, err
    }
    r.Status().Update(ctx, pg)
    err = r.Patch(ctx, pg, patch)
    return ctrl.Result{Requeue: true}, err
}
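The updateStatus changes lean on controller-runtime's snapshot-then-patch pattern (client.MergeFrom over a deep copy). A minimal sketch of the general shape, independent of this commit:

    // Snapshot the object before mutating it, then send only the delta.
    patch := client.MergeFrom(pg.DeepCopy())
    pg.Status.Phase = schedv1alpha1.PodGroupPending // example mutation
    // Patch computes a merge patch between the snapshot and the mutated object.
    if err := r.Patch(ctx, pg, patch); err != nil {
        return ctrl.Result{}, err
    }

Status fields normally flow through the status subresource client (r.Status().Update or r.Status().Patch), which is presumably why the hunk above pairs an r.Status().Update with a Patch of the main object.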
@@ -366,21 +364,25 @@ func (r *PodGroupReconciler) updateOwnerReferences(
    return result, nil
}

// Collect owner references for pod group
// Collect current owner references for the pod group.
// We want to ensure we add unique ones across the pod
owners := []metav1.OwnerReference{}
var refs []string
for _, ownerRef := range pod.OwnerReferences {
    refs = append(refs, fmt.Sprintf("%s/%s", pod.Namespace, ownerRef.Name))
    owners = append(owners, ownerRef)
}

patch := client.MergeFrom(pg.DeepCopy())
if len(refs) != 0 {
    sort.Strings(refs)
    pg.Status.OccupiedBy = strings.Join(refs, ",")
}
// If we have owners, collapse into a list
if len(owners) > 0 {
    pg.ObjectMeta.OwnerReferences = owners
}

// Apply the patch to update the size
r.Status().Update(ctx, pg)
err := r.Patch(ctx, pg, patch)
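The comment in the hunk above mentions keeping owner references unique, though the loop as shown appends every reference from the pod. A sketch of what deduplication by UID could look like (an assumption about the intent, not code from this commit):

    // Deduplicate owner references by UID before assigning them to the group.
    // types here is k8s.io/apimachinery/pkg/types.
    seen := map[types.UID]bool{}
    owners := []metav1.OwnerReference{}
    for _, ownerRef := range pod.OwnerReferences {
        if seen[ownerRef.UID] {
            continue
        }
        seen[ownerRef.UID] = true
        owners = append(owners, ownerRef)
    }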