Skip to content

Commit

Permalink
add support for multiple objects in template files
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Shmulevich <[email protected]>
  • Loading branch information
dmitsh committed Aug 17, 2024
1 parent 320d1ec commit 576cb44
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 154 deletions.
21 changes: 11 additions & 10 deletions pkg/engine/check_object_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,20 @@ func (task *CheckObjTask) Exec(ctx context.Context) error {

// TODO: add TweakListOptionsFunc for the CR
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(task.client, 0, info.Namespace, nil)
informer := factory.ForResource(info.GVR).Informer()
informer := factory.ForResource(info.GVR[task.Index]).Informer()

done := make(chan struct{})
defer close(done)

_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
resource := obj.(*unstructured.Unstructured)
log.V(4).Infof("Informer added %s %s", info.GVR.Resource, resource.GetName())
log.V(4).Infof("Informer added %s %s", info.GVR[task.Index].Resource, resource.GetName())
task.checkStateAsync(ctx, resource.GetName(), info, nameMap, done)
},
UpdateFunc: func(_, obj interface{}) {
resource := obj.(*unstructured.Unstructured)
log.V(4).Infof("Informer updated %s %s", info.GVR.Resource, resource.GetName())
log.V(4).Infof("Informer updated %s %s", info.GVR[task.Index].Resource, resource.GetName())
task.checkStateAsync(ctx, resource.GetName(), info, nameMap, done)
},
})
Expand All @@ -112,10 +112,10 @@ func (task *CheckObjTask) Exec(ctx context.Context) error {
log.V(4).Infof("Wait for completion with informers")
select {
case <-ctx.Done():
log.Errorf("Validation failed for %s %v, err: %v", info.GVR.Resource, nameMap.Keys(), err)
log.Errorf("Validation failed for %s %v, err: %v", info.GVR[task.Index].Resource, nameMap.Keys(), err)
err = ctx.Err()
case <-done:
log.Infof("Validation passed for %s", info.GVR.Resource)
log.Infof("Validation passed for %s", info.GVR[task.Index].Resource)
err = nil
}
}
Expand All @@ -132,10 +132,10 @@ func (task *CheckObjTask) checkStates(ctx context.Context, info *ObjInfo, nameMa
}

if invalid := nameMap.Keys(); len(invalid) != 0 {
return fmt.Errorf("%s: failed to validate %s %v", task.ID(), info.GVR.Resource, nameMap.Keys())
return fmt.Errorf("%s: failed to validate %s %v", task.ID(), info.GVR[task.Index].Resource, nameMap.Keys())
}

log.Infof("Validation passed for %s", info.GVR.Resource)
log.Infof("Validation passed for %s", info.GVR[task.Index].Resource)
return nil
}

Expand All @@ -152,12 +152,13 @@ func (task *CheckObjTask) checkStateAsync(ctx context.Context, name string, info

// checkState validates state conformance and removes object name from the map if succeeded
func (task *CheckObjTask) checkState(ctx context.Context, name string, info *ObjInfo, nameMap *utils.SyncMap) error {
cr, err := task.client.Resource(info.GVR).Namespace(info.Namespace).Get(ctx, name, metav1.GetOptions{})
gvr := info.GVR[task.Index]
cr, err := task.client.Resource(gvr).Namespace(info.Namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("%s: failed to get %s %s: %v", task.ID(), info.GVR.Resource, name, err)
return fmt.Errorf("%s: failed to get %s %s: %v", task.ID(), gvr.Resource, name, err)
}
if !utils.IsSubset(cr.Object, task.State) {
return fmt.Errorf("%s: state mismatch in %s %s", task.ID(), info.GVR.Resource, name)
return fmt.Errorf("%s: state mismatch in %s %s", task.ID(), gvr.Resource, name)
}

nameMap.Delete(name)
Expand Down
18 changes: 10 additions & 8 deletions pkg/engine/delete_object_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,18 @@ func (task *DeleteObjTask) Exec(ctx context.Context) error {
return err
}

log.V(4).Infof("Deleting objects %s %v", info.GVR.String(), info.Names)
prop := v1.DeletePropagationBackground
opt := v1.DeleteOptions{
PropagationPolicy: &prop,
}

for _, name := range info.Names {
prop := v1.DeletePropagationBackground
opt := v1.DeleteOptions{
PropagationPolicy: &prop,
}
err = task.client.Resource(info.GVR).Namespace(info.Namespace).Delete(ctx, name, opt)
if err != nil {
return err
for i := range info.GVR {
log.V(4).Infof("Deleting objects %s %v", info.GVR[i].String(), info.Names)
err = task.client.Resource(info.GVR[i]).Namespace(info.Namespace).Delete(ctx, name, opt)
if err != nil {
return err
}
}
}
return nil
Expand Down
8 changes: 5 additions & 3 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,11 @@ func (eng *Eng) DeleteAllObjects(ctx context.Context) {
for _, objInfo := range eng.objInfoMap {
ns := objInfo.Namespace
for _, name := range objInfo.Names {
err := eng.dynamicClient.Resource(objInfo.GVR).Namespace(ns).Delete(ctx, name, deletions)
if err != nil {
log.Infof("Warning: cannot delete object %s: %v", name, err)
for i := range objInfo.GVR {
err := eng.dynamicClient.Resource(objInfo.GVR[i]).Namespace(ns).Delete(ctx, name, deletions)
if err != nil {
log.Infof("Warning: cannot delete object %s: %v", name, err)
}
}
}
}
Expand Down
111 changes: 66 additions & 45 deletions pkg/engine/register_object_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,33 @@ import (
"context"
"fmt"
"os"
"regexp"
"strings"
"text/template"

"gopkg.in/yaml.v3"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
log "k8s.io/klog/v2"

"github.com/NVIDIA/knavigator/pkg/config"
)

var reDelim *regexp.Regexp

type RegisterObjTask struct {
BaseTask
RegisterObjParams

client *discovery.DiscoveryClient
accessor ObjInfoAccessor

gvk schema.GroupVersionKind
gvk []schema.GroupVersionKind
}

func init() {
reDelim = regexp.MustCompile(`(?m)^---$`)
}

// newRegisterObjTask initializes and returns RegisterObjTask
Expand Down Expand Up @@ -83,35 +92,44 @@ func (task *RegisterObjTask) validate(params map[string]interface{}) error {
}

tplStr := string(tplData)
var ver, kind string
scanner := bufio.NewScanner(strings.NewReader(tplStr))
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "apiVersion:") {
ver = strings.TrimSpace(line[11:])
task.gvk = []schema.GroupVersionKind{}
task.objTpl = []*template.Template{}

blocks := reDelim.Split(tplStr, -1)
for _, block := range blocks {
var ver, kind string
scanner := bufio.NewScanner(strings.NewReader(block))
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "apiVersion:") {
ver = strings.TrimSpace(line[11:])
}
if strings.HasPrefix(line, "kind:") {
kind = strings.TrimSpace(line[5:])
}
if len(ver) != 0 && len(kind) != 0 {
break
}
}
if strings.HasPrefix(line, "kind:") {
kind = strings.TrimSpace(line[5:])
if err := scanner.Err(); err != nil {
return fmt.Errorf("%s: failed to process template %s: %v", task.ID(), task.Template, err)
}
if len(ver) != 0 && len(kind) != 0 {
break
if len(ver) == 0 {
return fmt.Errorf("%s: failed to fetch 'apiVersion' from template %s", task.ID(), task.Template)
}
if len(kind) == 0 {
return fmt.Errorf("%s: failed to fetch 'kind' from template %s", task.ID(), task.Template)
}
}
if err := scanner.Err(); err != nil {
fmt.Println("Error reading string:", err)
}
if len(ver) == 0 {
return fmt.Errorf("%s: failed to fetch 'apiVersion' from template %s: %d", task.ID(), task.Template, len(ver))
}
if len(kind) == 0 {
return fmt.Errorf("%s: failed to fetch 'kind' from template %s: %d", task.ID(), task.Template, len(kind))
}

task.gvk = schema.FromAPIVersionAndKind(ver, kind)
gvk := schema.FromAPIVersionAndKind(ver, kind)
log.Infof("Register %s", gvk.String())
task.gvk = append(task.gvk, gvk)

task.objTpl, err = template.New("object").Parse(tplStr)
if err != nil {
return fmt.Errorf("%s: failed to parse template %s: %v", task.ID(), task.Template, err)
objTpl, err := template.New(gvk.String()).Parse(block)
if err != nil {
return fmt.Errorf("%s: failed to parse template %s: %v", task.ID(), task.Template, err)
}
task.objTpl = append(task.objTpl, objTpl)
}

if len(task.PodNameFormat) != 0 {
Expand All @@ -136,36 +154,39 @@ func (task *RegisterObjTask) validate(params map[string]interface{}) error {

// Exec implements Runnable interface
func (task *RegisterObjTask) Exec(ctx context.Context) error {
switch task.gvk.String() {
case "batch/v1, Kind=Job":
task.gvr = schema.GroupVersionResource{
Group: task.gvk.Group,
Version: task.gvk.Version,
Resource: "jobs",
}
default:
if err := task.getGVR(); err != nil {
return err
}
}

return task.accessor.SetObjType(task.taskID, &task.RegisterObjParams)
}

func (task *RegisterObjTask) getGVR() error {
apiResourceList, err := task.client.ServerPreferredResources()
if err != nil {
return fmt.Errorf("%s: failed to retrieve API resources: %v", task.ID(), err)
}

task.gvr = make([]schema.GroupVersionResource, 0, len(task.gvk))

for _, gvk := range task.gvk {
switch gvk.String() {
case "batch/v1, Kind=Job":
task.gvr = append(task.gvr, schema.GroupVersionResource{
Group: gvk.Group,
Version: gvk.Version,
Resource: "jobs",
})
default:
if err := task.getGVR(apiResourceList, gvk); err != nil {
return err
}
}
}
return task.accessor.SetObjType(task.taskID, &task.RegisterObjParams)
}

func (task *RegisterObjTask) getGVR(apiResourceList []*v1.APIResourceList, gvk schema.GroupVersionKind) error {
for _, list := range apiResourceList {
for _, r := range list.APIResources {
if r.Group == task.gvk.Group && r.Kind == task.gvk.Kind {
task.gvr = schema.GroupVersionResource{Group: r.Group, Version: r.Version, Resource: r.Name}
if r.Group == gvk.Group && r.Kind == gvk.Kind {
task.gvr = append(task.gvr, schema.GroupVersionResource{Group: r.Group, Version: r.Version, Resource: r.Name})
return nil
}
}
}

return fmt.Errorf("%s: failed to find resource for %s", task.ID(), task.gvk.String())
return fmt.Errorf("%s: failed to find resource for %s", task.ID(), gvk.String())
}
10 changes: 6 additions & 4 deletions pkg/engine/register_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,12 @@ func TestNewRegisterObjTask(t *testing.T) {
PodCount: "2",
},
client: testDiscoveryClient,
gvk: schema.GroupVersionKind{
Group: "example.com",
Version: "v1",
Kind: "MyObject",
gvk: []schema.GroupVersionKind{
{
Group: "example.com",
Version: "v1",
Kind: "MyObject",
},
},
},
},
Expand Down
60 changes: 32 additions & 28 deletions pkg/engine/submit_object_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,57 +124,61 @@ func (task *SubmitObjTask) Exec(ctx context.Context) error {
return err
}

for _, obj := range objs {
crd := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": obj.APIVersion,
"kind": obj.Kind,
"metadata": obj.Metadata,
"spec": obj.Spec,
},
}
for _, arr := range objs {
for i, obj := range arr {
crd := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": obj.APIVersion,
"kind": obj.Kind,
"metadata": obj.Metadata,
"spec": obj.Spec,
},
}

if task.CanExist {
_, err := task.client.Resource(regObjParams.gvr).Namespace(obj.Metadata.Namespace).Get(ctx, obj.Metadata.Name, metav1.GetOptions{})
if err == nil {
log.V(4).Infof("Object %s/%s already exist", obj.Kind, obj.Metadata.Name)
return nil
if task.CanExist {
_, err := task.client.Resource(regObjParams.gvr[i]).Namespace(obj.Metadata.Namespace).Get(ctx, obj.Metadata.Name, metav1.GetOptions{})
if err == nil {
log.V(4).Infof("Object %s/%s already exist", obj.Kind, obj.Metadata.Name)
return nil
}
}
}

if _, err := task.client.Resource(regObjParams.gvr).Namespace(obj.Metadata.Namespace).Create(ctx, crd, metav1.CreateOptions{}); err != nil {
return err
if _, err := task.client.Resource(regObjParams.gvr[i]).Namespace(obj.Metadata.Namespace).Create(ctx, crd, metav1.CreateOptions{}); err != nil {
return err
}
}
}

return task.accessor.SetObjInfo(task.taskID,
NewObjInfo(names, objs[0].Metadata.Namespace, regObjParams.gvr, podCount, podRegexp...))
NewObjInfo(names, objs[0][0].Metadata.Namespace, regObjParams.gvr, podCount, podRegexp...))
}

func (task *SubmitObjTask) getGenericObjects(regObjParams *RegisterObjParams) ([]GenericObject, []string, int, []string, error) {
func (task *SubmitObjTask) getGenericObjects(regObjParams *RegisterObjParams) ([][]*GenericObject, []string, int, []string, error) {
names, err := utils.GenerateNames(regObjParams.NameFormat, task.Count, task.Params)
if err != nil {
return nil, nil, 0, nil, fmt.Errorf("%s: failed to generate object names: %v", task.ID(), err)
}

objs := make([]GenericObject, task.Count)
objs := make([][]*GenericObject, task.Count)
podRegexp := []string{}

for i := 0; i < task.Count; i++ {
if len(names[i]) != 0 {
task.Params["_NAME_"] = names[i]
}
data, err := utils.ExecTemplate(regObjParams.objTpl, task.Params)
if err != nil {
return nil, nil, 0, nil, err
}

if err = yaml.Unmarshal(data, &objs[i]); err != nil {
return nil, nil, 0, nil, err
objs[i] = make([]*GenericObject, len(regObjParams.objTpl))
for j, objTpl := range regObjParams.objTpl {
data, err := utils.ExecTemplate(objTpl, task.Params)
if err != nil {
return nil, nil, 0, nil, err
}
if err = yaml.Unmarshal(data, &objs[i][j]); err != nil {
return nil, nil, 0, nil, err
}
}

if regObjParams.podNameTpl != nil {
data, err = utils.ExecTemplate(regObjParams.podNameTpl, task.Params)
data, err := utils.ExecTemplate(regObjParams.podNameTpl, task.Params)
if err != nil {
return nil, nil, 0, nil, err
}
Expand Down
Loading

0 comments on commit 576cb44

Please sign in to comment.