Skip to content

Commit

Permalink
implemented registration of CRDs
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Shmulevich <[email protected]>
  • Loading branch information
dmitsh committed May 16, 2024
1 parent 8116fa9 commit cf11120
Show file tree
Hide file tree
Showing 23 changed files with 810 additions and 374 deletions.
8 changes: 4 additions & 4 deletions pkg/engine/check_object_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type CheckObjTask struct {
}

// newCheckObjTask initializes and returns CheckObjTask
func newCheckObjTask(log logr.Logger, client *dynamic.DynamicClient, getter ObjGetter, cfg *config.Task) (*CheckObjTask, error) {
func newCheckObjTask(log logr.Logger, client *dynamic.DynamicClient, accessor ObjInfoAccessor, cfg *config.Task) (*CheckObjTask, error) {
if client == nil {
return nil, fmt.Errorf("%s/%s: DynamicClient is not set", cfg.Type, cfg.ID)
}
Expand All @@ -49,8 +49,8 @@ func newCheckObjTask(log logr.Logger, client *dynamic.DynamicClient, getter ObjG
taskType: cfg.Type,
taskID: cfg.ID,
},
client: client,
getter: getter,
client: client,
accessor: accessor,
},
}

Expand All @@ -63,7 +63,7 @@ func newCheckObjTask(log logr.Logger, client *dynamic.DynamicClient, getter ObjG

// Exec implements Runnable interface
func (task *CheckObjTask) Exec(ctx context.Context) error {
info, err := task.getter.GetObjInfo(task.RefTaskID)
info, err := task.accessor.GetObjInfo(task.RefTaskID)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/engine/check_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestNewCheckObjTask(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objMap[tc.refTaskId] = nil
eng.objInfoMap[tc.refTaskId] = nil
}

task, err := eng.GetTask(&config.Task{
Expand All @@ -102,7 +102,7 @@ func TestNewCheckObjTask(t *testing.T) {
require.EqualError(t, err, tc.err)
require.Nil(t, tc.task)
} else {
tc.task.getter = eng
tc.task.accessor = eng
require.NoError(t, err)
require.NotNil(t, tc.task)
require.Equal(t, tc.task, task)
Expand Down
119 changes: 80 additions & 39 deletions pkg/engine/check_pod_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"context"
"fmt"
"regexp"
"time"

"github.com/go-logr/logr"
Expand All @@ -41,8 +42,8 @@ type CheckPodTask struct {
BaseTask
checkPodTaskParams

client *kubernetes.Clientset
getter ObjGetter
client *kubernetes.Clientset
accessor ObjInfoAccessor
}

type checkPodTaskParams struct {
Expand All @@ -53,7 +54,7 @@ type checkPodTaskParams struct {
}

// newCheckPodTask initializes and returns CheckPodTask
func newCheckPodTask(log logr.Logger, client *kubernetes.Clientset, getter ObjGetter, cfg *config.Task) (*CheckPodTask, error) {
func newCheckPodTask(log logr.Logger, client *kubernetes.Clientset, accessor ObjInfoAccessor, cfg *config.Task) (*CheckPodTask, error) {
if client == nil {
return nil, fmt.Errorf("%s/%s: Kubernetes client is not set", cfg.Type, cfg.ID)
}
Expand All @@ -64,8 +65,8 @@ func newCheckPodTask(log logr.Logger, client *kubernetes.Clientset, getter ObjGe
taskType: cfg.Type,
taskID: cfg.ID,
},
client: client,
getter: getter,
client: client,
accessor: accessor,
}

if err := task.validate(cfg.Params); err != nil {
Expand Down Expand Up @@ -98,13 +99,13 @@ func (task *CheckPodTask) validate(params map[string]interface{}) error {

// Exec implements Runnable interface
func (task *CheckPodTask) Exec(ctx context.Context) error {
info, err := task.getter.GetObjInfo(task.RefTaskID)
info, err := task.accessor.GetObjInfo(task.RefTaskID)
if err != nil {
return err
}

if len(info.Pods) == 0 {
return nil
if len(info.PodRegexp) == 0 {
return fmt.Errorf("%s: no pods to check", task.ID())
}

if task.Timeout == 0 {
Expand All @@ -114,37 +115,57 @@ func (task *CheckPodTask) Exec(ctx context.Context) error {
}

func (task *CheckPodTask) checkPods(ctx context.Context, info *ObjInfo) error {
for _, name := range info.Pods {
pod, err := task.client.CoreV1().Pods(info.Namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("%s: failed to get pod '%s': %v", task.ID(), name, err)
}
list, err := task.client.CoreV1().Pods(info.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("%s: failed to list pods: %v", task.ID(), err)
}

status := string(pod.Status.Phase)
if status != task.Status {
return fmt.Errorf("%s: pod %s, status %s, expected %s", task.ID(), name, status, task.Status)
}
re, err := exp2regexp(info.PodRegexp)
if err != nil {
return fmt.Errorf("%s: %v", task.ID(), err)
}

if err := task.verifyLabels(ctx, pod); err != nil {
return err
var count int
for i := range list.Items {
pod := &list.Items[i]
for _, r := range re {
if r.MatchString(pod.Name) {
task.log.V(4).Info("Matched pod", "name", pod.Name)
count++

status := string(pod.Status.Phase)
if status != task.Status {
return fmt.Errorf("%s: pod %s, status %s, expected %s", task.ID(), pod.Name, status, task.Status)
}

if err := task.verifyLabels(ctx, pod); err != nil {
return err
}
}
}
}

if count != info.PodCount {
return fmt.Errorf("%s: verified %d pods, expected %d", task.ID(), count, info.PodCount)
}

return nil
}

// watchPods watches statuses of given pods and compares them with the expected status.
// The function runs until all statuses are equal to the expected one, or until the timeout, whichever comes first.
func (task *CheckPodTask) watchPods(ctx context.Context, info *ObjInfo) error {
task.log.Info("Create pod informer", "#pods", len(info.Pods), "timeout", task.Timeout.String())
task.log.Info("Create pod informer", "#pod", info.PodCount, "timeout", task.Timeout.String())

re, err := exp2regexp(info.PodRegexp)
if err != nil {
return fmt.Errorf("%s: %v", task.ID(), err)
}

ctx, cancel := context.WithTimeout(ctx, task.Timeout)
defer cancel()

podMap := utils.NewSyncMap()
for _, pod := range info.Pods {
podMap.Set(pod, true)
}

errs := make(chan error)

Expand All @@ -153,12 +174,12 @@ func (task *CheckPodTask) watchPods(ctx context.Context, info *ObjInfo) error {

informer := factory.Core().V1().Pods().Informer()

_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
task.verifyPod(ctx, podMap, obj, errs)
task.verifyPod(ctx, re, podMap, info.PodCount, obj, errs)
},
UpdateFunc: func(_, obj interface{}) {
task.verifyPod(ctx, podMap, obj, errs)
task.verifyPod(ctx, re, podMap, info.PodCount, obj, errs)
},
})
if err != nil {
Expand All @@ -173,10 +194,10 @@ func (task *CheckPodTask) watchPods(ctx context.Context, info *ObjInfo) error {
return
}
for i := range list.Items {
if podMap.Size() == 0 {
if podMap.Size() == info.PodCount {
break
}
task.verifyPod(ctx, podMap, &list.Items[i], errs)
task.verifyPod(ctx, re, podMap, info.PodCount, &list.Items[i], errs)
}
}()

Expand Down Expand Up @@ -209,24 +230,44 @@ func (task *CheckPodTask) verifyLabels(ctx context.Context, pod *v1.Pod) error {
return nil
}

func (task *CheckPodTask) verifyPod(ctx context.Context, podMap *utils.SyncMap, obj interface{}, errs chan error) {
func (task *CheckPodTask) verifyPod(ctx context.Context, re []*regexp.Regexp, podMap *utils.SyncMap, count int, obj interface{}, errs chan error) {
pod, ok := obj.(*v1.Pod)
if !ok {
errs <- fmt.Errorf("%s: unexpected object type %T, expected *v1.Pod", task.ID(), obj)
return
}

if _, ok := podMap.Get(pod.Name); ok {
status := string(pod.Status.Phase)
task.log.V(4).Info("Informer event", "pod", pod.Name, "status", status)
if err := task.verifyLabels(ctx, pod); err != nil {
errs <- err
return
for _, r := range re {
if r.MatchString(pod.Name) {
task.log.V(4).Info("Matched pod", "name", pod.Name)
if _, ok := podMap.Get(pod.Name); ok {
return
}
status := string(pod.Status.Phase)
task.log.V(4).Info("Informer event", "pod", pod.Name, "status", status)
if status != task.Status {
return
}
if err := task.verifyLabels(ctx, pod); err != nil {
errs <- err
return
}
if sz := podMap.Set(pod.Name, true); sz == count {
task.log.Info("Accounted for all pods")
errs <- nil
return
}
}
if sz := podMap.Delete(pod.Name); sz == 0 {
task.log.Info("Accounted for all pods")
errs <- nil
return
}
}

func exp2regexp(expr []string) ([]*regexp.Regexp, error) {
re := make([]*regexp.Regexp, len(expr))
for i, r := range expr {
var err error
if re[i], err = regexp.Compile(r); err != nil {
return nil, fmt.Errorf("failed to compile regexp '%s': %v", r, err)
}
}
return re, nil
}
4 changes: 2 additions & 2 deletions pkg/engine/check_pod_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestCheckPodParams(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objMap[tc.refTaskId] = nil
eng.objInfoMap[tc.refTaskId] = nil
}
task, err := eng.GetTask(&config.Task{
ID: taskID,
Expand All @@ -116,7 +116,7 @@ func TestCheckPodParams(t *testing.T) {
require.EqualError(t, err, tc.err)
require.Nil(t, tc.task)
} else {
tc.task.getter = eng
tc.task.accessor = eng
require.NoError(t, err)
require.NotNil(t, tc.task)
require.Equal(t, tc.task, task)
Expand Down
4 changes: 2 additions & 2 deletions pkg/engine/delete_object_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ type DeleteObjTask struct {
deleteObjTaskParams

client *dynamic.DynamicClient
getter ObjGetter
getter ObjInfoAccessor
}

type deleteObjTaskParams struct {
RefTaskID string `yaml:"refTaskId"`
}

// newDeleteObjTask initializes and returns DeleteObjTask
func newDeleteObjTask(log logr.Logger, client *dynamic.DynamicClient, getter ObjGetter, cfg *config.Task) (*DeleteObjTask, error) {
func newDeleteObjTask(log logr.Logger, client *dynamic.DynamicClient, getter ObjInfoAccessor, cfg *config.Task) (*DeleteObjTask, error) {
if client == nil {
return nil, fmt.Errorf("%s/%s: DynamicClient is not set", cfg.Type, cfg.ID)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/delete_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestNewDeleteObjTask(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objMap[tc.refTaskId] = nil
eng.objInfoMap[tc.refTaskId] = nil
}

task, err := eng.GetTask(&config.Task{
Expand Down
Loading

0 comments on commit cf11120

Please sign in to comment.