diff --git a/pkg/engine/check_object_task.go b/pkg/engine/check_object_task.go index b2d4069..597b6f7 100644 --- a/pkg/engine/check_object_task.go +++ b/pkg/engine/check_object_task.go @@ -83,7 +83,7 @@ func (task *CheckObjTask) Exec(ctx context.Context) error { // TODO: add TweakListOptionsFunc for the CR factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(task.client, 0, info.Namespace, nil) - informer := factory.ForResource(info.GVR).Informer() + informer := factory.ForResource(info.GVR[task.Index]).Informer() done := make(chan struct{}) defer close(done) @@ -91,12 +91,12 @@ func (task *CheckObjTask) Exec(ctx context.Context) error { _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { resource := obj.(*unstructured.Unstructured) - log.V(4).Infof("Informer added %s %s", info.GVR.Resource, resource.GetName()) + log.V(4).Infof("Informer added %s %s", info.GVR[task.Index].Resource, resource.GetName()) task.checkStateAsync(ctx, resource.GetName(), info, nameMap, done) }, UpdateFunc: func(_, obj interface{}) { resource := obj.(*unstructured.Unstructured) - log.V(4).Infof("Informer updated %s %s", info.GVR.Resource, resource.GetName()) + log.V(4).Infof("Informer updated %s %s", info.GVR[task.Index].Resource, resource.GetName()) task.checkStateAsync(ctx, resource.GetName(), info, nameMap, done) }, }) @@ -112,10 +112,10 @@ func (task *CheckObjTask) Exec(ctx context.Context) error { log.V(4).Infof("Wait for completion with informers") select { case <-ctx.Done(): - log.Errorf("Validation failed for %s %v, err: %v", info.GVR.Resource, nameMap.Keys(), err) + log.Errorf("Validation failed for %s %v, err: %v", info.GVR[task.Index].Resource, nameMap.Keys(), err) err = ctx.Err() case <-done: - log.Infof("Validation passed for %s", info.GVR.Resource) + log.Infof("Validation passed for %s", info.GVR[task.Index].Resource) err = nil } } @@ -132,10 +132,10 @@ func (task *CheckObjTask) checkStates(ctx context.Context, info *ObjInfo, nameMa } if invalid := nameMap.Keys(); len(invalid) != 0 { - return fmt.Errorf("%s: failed to validate %s %v", task.ID(), info.GVR.Resource, nameMap.Keys()) + return fmt.Errorf("%s: failed to validate %s %v", task.ID(), info.GVR[task.Index].Resource, nameMap.Keys()) } - log.Infof("Validation passed for %s", info.GVR.Resource) + log.Infof("Validation passed for %s", info.GVR[task.Index].Resource) return nil } @@ -152,12 +152,13 @@ func (task *CheckObjTask) checkStateAsync(ctx context.Context, name string, info // checkState validates state conformance and removes object name from the map if succeeded func (task *CheckObjTask) checkState(ctx context.Context, name string, info *ObjInfo, nameMap *utils.SyncMap) error { - cr, err := task.client.Resource(info.GVR).Namespace(info.Namespace).Get(ctx, name, metav1.GetOptions{}) + gvr := info.GVR[task.Index] + cr, err := task.client.Resource(gvr).Namespace(info.Namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("%s: failed to get %s %s: %v", task.ID(), info.GVR.Resource, name, err) + return fmt.Errorf("%s: failed to get %s %s: %v", task.ID(), gvr.Resource, name, err) } if !utils.IsSubset(cr.Object, task.State) { - return fmt.Errorf("%s: state mismatch in %s %s", task.ID(), info.GVR.Resource, name) + return fmt.Errorf("%s: state mismatch in %s %s", task.ID(), gvr.Resource, name) } nameMap.Delete(name) diff --git a/pkg/engine/delete_object_task.go b/pkg/engine/delete_object_task.go index 38f0df8..e7a2bc0 100644 --- a/pkg/engine/delete_object_task.go +++ b/pkg/engine/delete_object_task.go @@ -86,16 +86,18 @@ func (task *DeleteObjTask) Exec(ctx context.Context) error { return err } - log.V(4).Infof("Deleting objects %s %v", info.GVR.String(), info.Names) + prop := v1.DeletePropagationBackground + opt := v1.DeleteOptions{ + PropagationPolicy: &prop, + } for _, name := range info.Names { - prop := v1.DeletePropagationBackground - opt := v1.DeleteOptions{ - PropagationPolicy: &prop, - } - err = task.client.Resource(info.GVR).Namespace(info.Namespace).Delete(ctx, name, opt) - if err != nil { - return err + for i := range info.GVR { + log.V(4).Infof("Deleting objects %s %v", info.GVR[i].String(), info.Names) + err = task.client.Resource(info.GVR[i]).Namespace(info.Namespace).Delete(ctx, name, opt) + if err != nil { + return err + } } } return nil diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 746c82e..07d4894 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -296,9 +296,11 @@ func (eng *Eng) DeleteAllObjects(ctx context.Context) { for _, objInfo := range eng.objInfoMap { ns := objInfo.Namespace for _, name := range objInfo.Names { - err := eng.dynamicClient.Resource(objInfo.GVR).Namespace(ns).Delete(ctx, name, deletions) - if err != nil { - log.Infof("Warning: cannot delete object %s: %v", name, err) + for i := range objInfo.GVR { + err := eng.dynamicClient.Resource(objInfo.GVR[i]).Namespace(ns).Delete(ctx, name, deletions) + if err != nil { + log.Infof("Warning: cannot delete object %s: %v", name, err) + } } } } diff --git a/pkg/engine/register_object_task.go b/pkg/engine/register_object_task.go index 30ba885..eb22e3b 100644 --- a/pkg/engine/register_object_task.go +++ b/pkg/engine/register_object_task.go @@ -21,16 +21,21 @@ import ( "context" "fmt" "os" + "regexp" "strings" "text/template" "gopkg.in/yaml.v3" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" + log "k8s.io/klog/v2" "github.com/NVIDIA/knavigator/pkg/config" ) +var reDelim *regexp.Regexp + type RegisterObjTask struct { BaseTask RegisterObjParams @@ -38,7 +43,11 @@ type RegisterObjTask struct { client *discovery.DiscoveryClient accessor ObjInfoAccessor - gvk schema.GroupVersionKind + gvk []schema.GroupVersionKind +} + +func init() { + reDelim = regexp.MustCompile(`(?m)^---$`) } // newRegisterObjTask initializes and returns RegisterObjTask @@ -83,35 +92,44 @@ func (task *RegisterObjTask) validate(params map[string]interface{}) error { } tplStr := string(tplData) - var ver, kind string - scanner := bufio.NewScanner(strings.NewReader(tplStr)) - for scanner.Scan() { - line := scanner.Text() - if strings.HasPrefix(line, "apiVersion:") { - ver = strings.TrimSpace(line[11:]) + task.gvk = []schema.GroupVersionKind{} + task.objTpl = []*template.Template{} + + blocks := reDelim.Split(tplStr, -1) + for _, block := range blocks { + var ver, kind string + scanner := bufio.NewScanner(strings.NewReader(block)) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "apiVersion:") { + ver = strings.TrimSpace(line[11:]) + } + if strings.HasPrefix(line, "kind:") { + kind = strings.TrimSpace(line[5:]) + } + if len(ver) != 0 && len(kind) != 0 { + break + } } - if strings.HasPrefix(line, "kind:") { - kind = strings.TrimSpace(line[5:]) + if err := scanner.Err(); err != nil { + return fmt.Errorf("%s: failed to process template %s: %v", task.ID(), task.Template, err) } - if len(ver) != 0 && len(kind) != 0 { - break + if len(ver) == 0 { + return fmt.Errorf("%s: failed to fetch 'apiVersion' from template %s", task.ID(), task.Template) + } + if len(kind) == 0 { + return fmt.Errorf("%s: failed to fetch 'kind' from template %s", task.ID(), task.Template) } - } - if err := scanner.Err(); err != nil { - fmt.Println("Error reading string:", err) - } - if len(ver) == 0 { - return fmt.Errorf("%s: failed to fetch 'apiVersion' from template %s: %d", task.ID(), task.Template, len(ver)) - } - if len(kind) == 0 { - return fmt.Errorf("%s: failed to fetch 'kind' from template %s: %d", task.ID(), task.Template, len(kind)) - } - task.gvk = schema.FromAPIVersionAndKind(ver, kind) + gvk := schema.FromAPIVersionAndKind(ver, kind) + log.Infof("Register %s", gvk.String()) + task.gvk = append(task.gvk, gvk) - task.objTpl, err = template.New("object").Parse(tplStr) - if err != nil { - return fmt.Errorf("%s: failed to parse template %s: %v", task.ID(), task.Template, err) + objTpl, err := template.New(gvk.String()).Parse(block) + if err != nil { + return fmt.Errorf("%s: failed to parse template %s: %v", task.ID(), task.Template, err) + } + task.objTpl = append(task.objTpl, objTpl) } if len(task.PodNameFormat) != 0 { @@ -136,36 +154,39 @@ func (task *RegisterObjTask) validate(params map[string]interface{}) error { // Exec implements Runnable interface func (task *RegisterObjTask) Exec(ctx context.Context) error { - switch task.gvk.String() { - case "batch/v1, Kind=Job": - task.gvr = schema.GroupVersionResource{ - Group: task.gvk.Group, - Version: task.gvk.Version, - Resource: "jobs", - } - default: - if err := task.getGVR(); err != nil { - return err - } - } - - return task.accessor.SetObjType(task.taskID, &task.RegisterObjParams) -} - -func (task *RegisterObjTask) getGVR() error { apiResourceList, err := task.client.ServerPreferredResources() if err != nil { return fmt.Errorf("%s: failed to retrieve API resources: %v", task.ID(), err) } + task.gvr = make([]schema.GroupVersionResource, 0, len(task.gvk)) + + for _, gvk := range task.gvk { + switch gvk.String() { + case "batch/v1, Kind=Job": + task.gvr = append(task.gvr, schema.GroupVersionResource{ + Group: gvk.Group, + Version: gvk.Version, + Resource: "jobs", + }) + default: + if err := task.getGVR(apiResourceList, gvk); err != nil { + return err + } + } + } + return task.accessor.SetObjType(task.taskID, &task.RegisterObjParams) +} + +func (task *RegisterObjTask) getGVR(apiResourceList []*v1.APIResourceList, gvk schema.GroupVersionKind) error { for _, list := range apiResourceList { for _, r := range list.APIResources { - if r.Group == task.gvk.Group && r.Kind == task.gvk.Kind { - task.gvr = schema.GroupVersionResource{Group: r.Group, Version: r.Version, Resource: r.Name} + if r.Group == gvk.Group && r.Kind == gvk.Kind { + task.gvr = append(task.gvr, schema.GroupVersionResource{Group: r.Group, Version: r.Version, Resource: r.Name}) return nil } } } - return fmt.Errorf("%s: failed to find resource for %s", task.ID(), task.gvk.String()) + return fmt.Errorf("%s: failed to find resource for %s", task.ID(), gvk.String()) } diff --git a/pkg/engine/register_object_task_test.go b/pkg/engine/register_object_task_test.go index 2b7335f..414ff82 100644 --- a/pkg/engine/register_object_task_test.go +++ b/pkg/engine/register_object_task_test.go @@ -117,10 +117,12 @@ func TestNewRegisterObjTask(t *testing.T) { PodCount: "2", }, client: testDiscoveryClient, - gvk: schema.GroupVersionKind{ - Group: "example.com", - Version: "v1", - Kind: "MyObject", + gvk: []schema.GroupVersionKind{ + { + Group: "example.com", + Version: "v1", + Kind: "MyObject", + }, }, }, }, diff --git a/pkg/engine/submit_object_task.go b/pkg/engine/submit_object_task.go index ee5cf88..8745c7f 100644 --- a/pkg/engine/submit_object_task.go +++ b/pkg/engine/submit_object_task.go @@ -124,57 +124,61 @@ func (task *SubmitObjTask) Exec(ctx context.Context) error { return err } - for _, obj := range objs { - crd := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": obj.APIVersion, - "kind": obj.Kind, - "metadata": obj.Metadata, - "spec": obj.Spec, - }, - } + for _, arr := range objs { + for i, obj := range arr { + crd := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": obj.APIVersion, + "kind": obj.Kind, + "metadata": obj.Metadata, + "spec": obj.Spec, + }, + } - if task.CanExist { - _, err := task.client.Resource(regObjParams.gvr).Namespace(obj.Metadata.Namespace).Get(ctx, obj.Metadata.Name, metav1.GetOptions{}) - if err == nil { - log.V(4).Infof("Object %s/%s already exist", obj.Kind, obj.Metadata.Name) - return nil + if task.CanExist { + _, err := task.client.Resource(regObjParams.gvr[i]).Namespace(obj.Metadata.Namespace).Get(ctx, obj.Metadata.Name, metav1.GetOptions{}) + if err == nil { + log.V(4).Infof("Object %s/%s already exist", obj.Kind, obj.Metadata.Name) + return nil + } } - } - if _, err := task.client.Resource(regObjParams.gvr).Namespace(obj.Metadata.Namespace).Create(ctx, crd, metav1.CreateOptions{}); err != nil { - return err + if _, err := task.client.Resource(regObjParams.gvr[i]).Namespace(obj.Metadata.Namespace).Create(ctx, crd, metav1.CreateOptions{}); err != nil { + return err + } } } return task.accessor.SetObjInfo(task.taskID, - NewObjInfo(names, objs[0].Metadata.Namespace, regObjParams.gvr, podCount, podRegexp...)) + NewObjInfo(names, objs[0][0].Metadata.Namespace, regObjParams.gvr, podCount, podRegexp...)) } -func (task *SubmitObjTask) getGenericObjects(regObjParams *RegisterObjParams) ([]GenericObject, []string, int, []string, error) { +func (task *SubmitObjTask) getGenericObjects(regObjParams *RegisterObjParams) ([][]*GenericObject, []string, int, []string, error) { names, err := utils.GenerateNames(regObjParams.NameFormat, task.Count, task.Params) if err != nil { return nil, nil, 0, nil, fmt.Errorf("%s: failed to generate object names: %v", task.ID(), err) } - objs := make([]GenericObject, task.Count) + objs := make([][]*GenericObject, task.Count) podRegexp := []string{} for i := 0; i < task.Count; i++ { if len(names[i]) != 0 { task.Params["_NAME_"] = names[i] } - data, err := utils.ExecTemplate(regObjParams.objTpl, task.Params) - if err != nil { - return nil, nil, 0, nil, err - } - - if err = yaml.Unmarshal(data, &objs[i]); err != nil { - return nil, nil, 0, nil, err + objs[i] = make([]*GenericObject, len(regObjParams.objTpl)) + for j, objTpl := range regObjParams.objTpl { + data, err := utils.ExecTemplate(objTpl, task.Params) + if err != nil { + return nil, nil, 0, nil, err + } + if err = yaml.Unmarshal(data, &objs[i][j]); err != nil { + return nil, nil, 0, nil, err + } } if regObjParams.podNameTpl != nil { - data, err = utils.ExecTemplate(regObjParams.podNameTpl, task.Params) + data, err := utils.ExecTemplate(regObjParams.podNameTpl, task.Params) if err != nil { return nil, nil, 0, nil, err } diff --git a/pkg/engine/submit_object_task_test.go b/pkg/engine/submit_object_task_test.go index 10a99d8..93de9cc 100644 --- a/pkg/engine/submit_object_task_test.go +++ b/pkg/engine/submit_object_task_test.go @@ -89,7 +89,7 @@ func TestNewSubmitObjTask(t *testing.T) { refTaskID string err string task *SubmitObjTask - objs []GenericObject + objs [][]*GenericObject names []string podCount int podRegexp []string @@ -166,17 +166,19 @@ func TestNewSubmitObjTask(t *testing.T) { }, client: testDynamicClient, }, - objs: []GenericObject{ + objs: [][]*GenericObject{ { - TypeMeta: TypeMeta{ - APIVersion: "example.com/v1", - Kind: "MyObject", - }, - Metadata: objectMeta{ - Name: "job1", - Namespace: "default", + { + TypeMeta: TypeMeta{ + APIVersion: "example.com/v1", + Kind: "MyObject", + }, + Metadata: objectMeta{ + Name: "job1", + Namespace: "default", + }, + Spec: spec, }, - Spec: spec, }, }, names: []string{"job1"}, @@ -209,28 +211,32 @@ func TestNewSubmitObjTask(t *testing.T) { }, client: testDynamicClient, }, - objs: []GenericObject{ + objs: [][]*GenericObject{ { - TypeMeta: TypeMeta{ - APIVersion: "example.com/v1", - Kind: "MyObject", - }, - Metadata: objectMeta{ - Name: "job1", - Namespace: "default", + { + TypeMeta: TypeMeta{ + APIVersion: "example.com/v1", + Kind: "MyObject", + }, + Metadata: objectMeta{ + Name: "job1", + Namespace: "default", + }, + Spec: spec, }, - Spec: spec, }, { - TypeMeta: TypeMeta{ - APIVersion: "example.com/v1", - Kind: "MyObject", - }, - Metadata: objectMeta{ - Name: "job2", - Namespace: "default", + { + TypeMeta: TypeMeta{ + APIVersion: "example.com/v1", + Kind: "MyObject", + }, + Metadata: objectMeta{ + Name: "job2", + Namespace: "default", + }, + Spec: spec, }, - Spec: spec, }, }, names: []string{"job1", "job2"}, @@ -266,28 +272,32 @@ func TestNewSubmitObjTask(t *testing.T) { }, client: testDynamicClient, }, - objs: []GenericObject{ + objs: [][]*GenericObject{ { - TypeMeta: TypeMeta{ - APIVersion: "example.com/v1", - Kind: "MyObject", - }, - Metadata: objectMeta{ - Name: "job1", - Namespace: "default", + { + TypeMeta: TypeMeta{ + APIVersion: "example.com/v1", + Kind: "MyObject", + }, + Metadata: objectMeta{ + Name: "job1", + Namespace: "default", + }, + Spec: spec, }, - Spec: spec, }, { - TypeMeta: TypeMeta{ - APIVersion: "example.com/v1", - Kind: "MyObject", - }, - Metadata: objectMeta{ - Name: "job2", - Namespace: "default", + { + TypeMeta: TypeMeta{ + APIVersion: "example.com/v1", + Kind: "MyObject", + }, + Metadata: objectMeta{ + Name: "job2", + Namespace: "default", + }, + Spec: spec, }, - Spec: spec, }, }, names: []string{"job1", "job2"}, @@ -326,7 +336,8 @@ func TestNewSubmitObjTask(t *testing.T) { require.Equal(t, tc.task, task) - tc.regObjParams.objTpl, err = template.ParseFiles(tc.regObjParams.Template) + tc.regObjParams.objTpl = make([]*template.Template, 1) + tc.regObjParams.objTpl[0], err = template.ParseFiles(tc.regObjParams.Template) require.NoError(t, err) if len(tc.regObjParams.PodNameFormat) != 0 { diff --git a/pkg/engine/types.go b/pkg/engine/types.go index 63bfec4..fa5dbcb 100644 --- a/pkg/engine/types.go +++ b/pkg/engine/types.go @@ -62,9 +62,13 @@ func (t *BaseTask) ID() string { } type StateParams struct { - RefTaskID string `yaml:"refTaskId"` - State map[string]interface{} `yaml:"state"` - Timeout time.Duration `yaml:"timeout"` + // RefTaskID is the ID for the task from which the object was submitted + RefTaskID string `yaml:"refTaskId"` + // Index refers to the position of the object within the template file, + // or it defaults to zero if the template file contains only a single object. + Index int `yaml:"index,omitempty"` + State map[string]interface{} `yaml:"state"` + Timeout time.Duration `yaml:"timeout"` } type TypeMeta struct { @@ -92,8 +96,8 @@ type RegisterObjParams struct { PodCount string `yaml:"podCount,omitempty"` // derived - gvr schema.GroupVersionResource - objTpl *template.Template + gvr []schema.GroupVersionResource + objTpl []*template.Template podNameTpl *template.Template podCountTpl *template.Template } @@ -102,13 +106,13 @@ type RegisterObjParams struct { type ObjInfo struct { Names []string Namespace string - GVR schema.GroupVersionResource + GVR []schema.GroupVersionResource PodCount int PodRegexp []string } // NewObjInfo creates new ObjInfo -func NewObjInfo(names []string, ns string, gvr schema.GroupVersionResource, podCount int, podRegexp ...string) *ObjInfo { +func NewObjInfo(names []string, ns string, gvr []schema.GroupVersionResource, podCount int, podRegexp ...string) *ObjInfo { return &ObjInfo{ Names: names, Namespace: ns, diff --git a/pkg/engine/update_object_task.go b/pkg/engine/update_object_task.go index 8d1b949..489e0a4 100644 --- a/pkg/engine/update_object_task.go +++ b/pkg/engine/update_object_task.go @@ -68,17 +68,18 @@ func (task *UpdateObjTask) Exec(ctx context.Context) error { return fmt.Errorf("%s: failed to generate patch: %v", task.ID(), err) } + gvr := info.GVR[task.Index] for _, name := range info.Names { if patch.Root != nil { - _, err = task.client.Resource(info.GVR).Namespace(info.Namespace).Patch(ctx, name, types.MergePatchType, patch.Root, metav1.PatchOptions{}) + _, err = task.client.Resource(gvr).Namespace(info.Namespace).Patch(ctx, name, types.MergePatchType, patch.Root, metav1.PatchOptions{}) if err != nil { - return fmt.Errorf("%s: failed to patch %s %s: %v", task.ID(), info.GVR.Resource, name, err) + return fmt.Errorf("%s: failed to patch %s %s: %v", task.ID(), gvr.Resource, name, err) } } if patch.Status != nil { - _, err = task.client.Resource(info.GVR).Namespace(info.Namespace).Patch(ctx, name, types.MergePatchType, patch.Root, metav1.PatchOptions{}, "status") + _, err = task.client.Resource(gvr).Namespace(info.Namespace).Patch(ctx, name, types.MergePatchType, patch.Root, metav1.PatchOptions{}, "status") if err != nil { - return fmt.Errorf("%s: failed to patch status %s %s: %v", task.ID(), info.GVR.Resource, name, err) + return fmt.Errorf("%s: failed to patch status %s %s: %v", task.ID(), gvr.Resource, name, err) } } }