diff --git a/cmd/knavigator/main.go b/cmd/knavigator/main.go index 3617455..7ebaf09 100644 --- a/cmd/knavigator/main.go +++ b/cmd/knavigator/main.go @@ -31,11 +31,16 @@ import ( ) func mainInternal() error { - var kubeConfigPath, kubeCtx, taskConfigs string - var qps float64 - var burst int + var ( + kubeConfigPath, kubeCtx, taskConfigs string + qps float64 + burst int + cleanupInfo engine.CleanupInfo + ) flag.StringVar(&kubeConfigPath, "kubeconfig", "", "kubeconfig file path") flag.StringVar(&kubeCtx, "kubectx", "", "kube context") + flag.BoolVar(&cleanupInfo.Enabled, "cleanup", false, "delete objects") + flag.DurationVar(&cleanupInfo.Timeout, "cleanup.timeout", engine.DefaultCleanupTimeout, "time limit for cleanup") flag.StringVar(&taskConfigs, "tasks", "", "comma-separated list of task config files and dirs") flag.Float64Var(&qps, "kube-api-qps", 500, "Maximum QPS to use while talking with Kubernetes API") flag.IntVar(&burst, "kube-api-burst", 500, "Maximum burst for throttle while talking with Kubernetes API") @@ -68,7 +73,7 @@ func mainInternal() error { return err } - eng, err := engine.New(log, restConfig) + eng, err := engine.New(log, restConfig, &cleanupInfo) if err != nil { return err } diff --git a/pkg/engine/check_object_task_test.go b/pkg/engine/check_object_task_test.go index d742121..aea5121 100644 --- a/pkg/engine/check_object_task_test.go +++ b/pkg/engine/check_object_task_test.go @@ -87,7 +87,7 @@ func TestNewCheckObjTask(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - eng, err := New(testLogger, nil, tc.simClients) + eng, err := New(testLogger, nil, nil, tc.simClients) require.NoError(t, err) if len(tc.refTaskId) != 0 { eng.objInfoMap[tc.refTaskId] = nil diff --git a/pkg/engine/check_pod_task_test.go b/pkg/engine/check_pod_task_test.go index b6aef2a..cb7dd20 100644 --- a/pkg/engine/check_pod_task_test.go +++ b/pkg/engine/check_pod_task_test.go @@ -102,7 +102,7 @@ func TestCheckPodParams(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - eng, err := New(testLogger, nil, tc.simClients) + eng, err := New(testLogger, nil, nil, tc.simClients) require.NoError(t, err) if len(tc.refTaskId) != 0 { eng.objInfoMap[tc.refTaskId] = nil diff --git a/pkg/engine/delete_object_task_test.go b/pkg/engine/delete_object_task_test.go index acdc6d9..1339b3d 100644 --- a/pkg/engine/delete_object_task_test.go +++ b/pkg/engine/delete_object_task_test.go @@ -76,7 +76,7 @@ func TestNewDeleteObjTask(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - eng, err := New(testLogger, nil, tc.simClients) + eng, err := New(testLogger, nil, nil, tc.simClients) require.NoError(t, err) if len(tc.refTaskId) != 0 { eng.objInfoMap[tc.refTaskId] = nil diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 1ffd39d..4c46907 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" @@ -34,6 +35,7 @@ import ( type Engine interface { RunTask(context.Context, *config.Task) error Reset(context.Context) error + DeleteAllObjects(context.Context) error } type Eng struct { @@ -44,13 +46,15 @@ type Eng struct { discoveryClient *discovery.DiscoveryClient objTypeMap map[string]*RegisterObjParams objInfoMap map[string]*ObjInfo + cleanup *CleanupInfo } -func New(log logr.Logger, config *rest.Config, sim ...bool) (*Eng, error) { +func New(log logr.Logger, config *rest.Config, cleanupInfo *CleanupInfo, sim ...bool) (*Eng, error) { eng := &Eng{ log: log, objTypeMap: make(map[string]*RegisterObjParams), objInfoMap: make(map[string]*ObjInfo), + cleanup: cleanupInfo, } if len(sim) == 0 { // len(sim) != 0 in unit tests @@ -247,6 +251,38 @@ func execRunnable(ctx context.Context, log logr.Logger, r Runnable) error { return nil } +// Reset re-initializes engine and deletes the remaining objects func (eng *Eng) Reset(ctx context.Context) error { + eng.log.Info("Reset Engine") + + if eng.cleanup == nil || !eng.cleanup.Enabled { + return nil + } + + eng.log.Info("Cleaning up objects") + ctx, cancel := context.WithTimeout(ctx, eng.cleanup.Timeout) + defer cancel() + + return eng.DeleteAllObjects(ctx) +} + +// DeleteAllObjects deletes all objects +func (eng *Eng) DeleteAllObjects(ctx context.Context) error { + deletePolicy := metav1.DeletePropagationBackground + deletions := metav1.DeleteOptions{ + PropagationPolicy: &deletePolicy, + } + + for _, objInfo := range eng.objInfoMap { + ns := objInfo.Namespace + for _, name := range objInfo.Names { + err := eng.dynamicClient.Resource(objInfo.GVR).Namespace(ns).Delete(ctx, name, deletions) + if err != nil { + return err + } + } + } + + eng.log.Info("Deleted all objects") return nil } diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go index ec2e720..0ddca66 100644 --- a/pkg/engine/engine_test.go +++ b/pkg/engine/engine_test.go @@ -40,17 +40,23 @@ var ( ) type testEngine struct { - execErr error - resetErr error + execErr error + resetErr error + deleteErr error } func (eng *testEngine) RunTask(context.Context, *config.Task) error { return eng.execErr } + func (eng *testEngine) Reset(context.Context) error { return eng.resetErr } +func (eng *testEngine) DeleteAllObjects(context.Context) error { + return eng.deleteErr +} + func TestRunEngine(t *testing.T) { testCases := []struct { name string diff --git a/pkg/engine/pause_task_test.go b/pkg/engine/pause_task_test.go index 3c4e739..279f997 100644 --- a/pkg/engine/pause_task_test.go +++ b/pkg/engine/pause_task_test.go @@ -27,7 +27,7 @@ import ( ) func TestPauseExec(t *testing.T) { - eng, err := New(testLogger, nil, false) + eng, err := New(testLogger, nil, nil, false) require.NoError(t, err) task, err := eng.GetTask(&config.Task{ diff --git a/pkg/engine/register_object_task_test.go b/pkg/engine/register_object_task_test.go index 122797f..cbc980b 100644 --- a/pkg/engine/register_object_task_test.go +++ b/pkg/engine/register_object_task_test.go @@ -137,7 +137,7 @@ func TestNewRegisterObjTask(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - eng, err := New(testLogger, nil, tc.simClients) + eng, err := New(testLogger, nil, nil, tc.simClients) require.NoError(t, err) runnable, err := eng.GetTask(&config.Task{ diff --git a/pkg/engine/sleep_task_test.go b/pkg/engine/sleep_task_test.go index 43916d1..8e42a5d 100644 --- a/pkg/engine/sleep_task_test.go +++ b/pkg/engine/sleep_task_test.go @@ -66,7 +66,7 @@ func TestSleepParams(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - eng, err := New(testLogger, nil, false) + eng, err := New(testLogger, nil, nil, false) require.NoError(t, err) task, err := eng.GetTask(&config.Task{ diff --git a/pkg/engine/submit_object_task_test.go b/pkg/engine/submit_object_task_test.go index 7eee44e..73b310a 100644 --- a/pkg/engine/submit_object_task_test.go +++ b/pkg/engine/submit_object_task_test.go @@ -244,7 +244,7 @@ func TestNewSubmitObjTask(t *testing.T) { t.Run(tc.name, func(t *testing.T) { utils.SetObjectID(0) - eng, err := New(testLogger, nil, tc.simClients) + eng, err := New(testLogger, nil, nil, tc.simClients) require.NoError(t, err) if len(tc.refTaskID) != 0 { diff --git a/pkg/engine/types.go b/pkg/engine/types.go index 30bd056..9734f6f 100644 --- a/pkg/engine/types.go +++ b/pkg/engine/types.go @@ -27,15 +27,16 @@ import ( ) const ( - TaskRegisterObj = "RegisterObj" - TaskSubmitObj = "SubmitObj" - TaskUpdateObj = "UpdateObj" - TaskCheckObj = "CheckObj" - TaskDeleteObj = "DeleteObj" - TaskCheckPod = "CheckPod" - TaskUpdateNodes = "UpdateNodes" - TaskSleep = "Sleep" - TaskPause = "Pause" + TaskRegisterObj = "RegisterObj" + TaskSubmitObj = "SubmitObj" + TaskUpdateObj = "UpdateObj" + TaskCheckObj = "CheckObj" + TaskDeleteObj = "DeleteObj" + TaskCheckPod = "CheckPod" + TaskUpdateNodes = "UpdateNodes" + TaskSleep = "Sleep" + TaskPause = "Pause" + DefaultCleanupTimeout = 10 * time.Minute ) type Runnable interface { diff --git a/pkg/engine/update_nodes_task_test.go b/pkg/engine/update_nodes_task_test.go index 444d7eb..132694d 100644 --- a/pkg/engine/update_nodes_task_test.go +++ b/pkg/engine/update_nodes_task_test.go @@ -86,7 +86,7 @@ func TestUpdateNodesTask(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - eng, err := New(testLogger, nil, tc.simClients) + eng, err := New(testLogger, nil, nil, tc.simClients) require.NoError(t, err) _, err = eng.GetTask(&config.Task{ ID: taskID, diff --git a/pkg/engine/update_object_task_test.go b/pkg/engine/update_object_task_test.go index fcac2da..3ef8dfc 100644 --- a/pkg/engine/update_object_task_test.go +++ b/pkg/engine/update_object_task_test.go @@ -87,7 +87,7 @@ func TestNewUpdateObjTask(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - eng, err := New(testLogger, nil, tc.simClients) + eng, err := New(testLogger, nil, nil, tc.simClients) require.NoError(t, err) if len(tc.refTaskId) != 0 { eng.objInfoMap[tc.refTaskId] = nil