Skip to content

Commit

Permalink
Add the cleanup support (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchen8911 authored May 17, 2024
2 parents 77f5fe2 + 81018bd commit 0a49d8b
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 25 deletions.
13 changes: 9 additions & 4 deletions cmd/knavigator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ import (
)

func mainInternal() error {
var kubeConfigPath, kubeCtx, taskConfigs string
var qps float64
var burst int
var (
kubeConfigPath, kubeCtx, taskConfigs string
qps float64
burst int
cleanupInfo engine.CleanupInfo
)
flag.StringVar(&kubeConfigPath, "kubeconfig", "", "kubeconfig file path")
flag.StringVar(&kubeCtx, "kubectx", "", "kube context")
flag.BoolVar(&cleanupInfo.Enabled, "cleanup", false, "delete objects")
flag.DurationVar(&cleanupInfo.Timeout, "cleanup.timeout", engine.DefaultCleanupTimeout, "time limit for cleanup")
flag.StringVar(&taskConfigs, "tasks", "", "comma-separated list of task config files and dirs")
flag.Float64Var(&qps, "kube-api-qps", 500, "Maximum QPS to use while talking with Kubernetes API")
flag.IntVar(&burst, "kube-api-burst", 500, "Maximum burst for throttle while talking with Kubernetes API")
Expand Down Expand Up @@ -68,7 +73,7 @@ func mainInternal() error {
return err
}

eng, err := engine.New(log, restConfig)
eng, err := engine.New(log, restConfig, &cleanupInfo)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/check_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestNewCheckObjTask(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objInfoMap[tc.refTaskId] = nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/check_pod_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestCheckPodParams(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objInfoMap[tc.refTaskId] = nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/delete_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestNewDeleteObjTask(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objInfoMap[tc.refTaskId] = nil
Expand Down
38 changes: 37 additions & 1 deletion pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
Expand All @@ -34,6 +35,7 @@ import (
type Engine interface {
RunTask(context.Context, *config.Task) error
Reset(context.Context) error
DeleteAllObjects(context.Context) error
}

type Eng struct {
Expand All @@ -44,13 +46,15 @@ type Eng struct {
discoveryClient *discovery.DiscoveryClient
objTypeMap map[string]*RegisterObjParams
objInfoMap map[string]*ObjInfo
cleanup *CleanupInfo
}

func New(log logr.Logger, config *rest.Config, sim ...bool) (*Eng, error) {
func New(log logr.Logger, config *rest.Config, cleanupInfo *CleanupInfo, sim ...bool) (*Eng, error) {
eng := &Eng{
log: log,
objTypeMap: make(map[string]*RegisterObjParams),
objInfoMap: make(map[string]*ObjInfo),
cleanup: cleanupInfo,
}

if len(sim) == 0 { // len(sim) != 0 in unit tests
Expand Down Expand Up @@ -247,6 +251,38 @@ func execRunnable(ctx context.Context, log logr.Logger, r Runnable) error {
return nil
}

// Reset re-initializes engine and deletes the remaining objects
func (eng *Eng) Reset(ctx context.Context) error {
eng.log.Info("Reset Engine")

if eng.cleanup == nil || !eng.cleanup.Enabled {
return nil
}

eng.log.Info("Cleaning up objects")
ctx, cancel := context.WithTimeout(ctx, eng.cleanup.Timeout)
defer cancel()

return eng.DeleteAllObjects(ctx)
}

// DeleteAllObjects deletes all objects
func (eng *Eng) DeleteAllObjects(ctx context.Context) error {
deletePolicy := metav1.DeletePropagationBackground
deletions := metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
}

for _, objInfo := range eng.objInfoMap {
ns := objInfo.Namespace
for _, name := range objInfo.Names {
err := eng.dynamicClient.Resource(objInfo.GVR).Namespace(ns).Delete(ctx, name, deletions)
if err != nil {
return err
}
}
}

eng.log.Info("Deleted all objects")
return nil
}
10 changes: 8 additions & 2 deletions pkg/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,23 @@ var (
)

type testEngine struct {
execErr error
resetErr error
execErr error
resetErr error
deleteErr error
}

func (eng *testEngine) RunTask(context.Context, *config.Task) error {
return eng.execErr
}

func (eng *testEngine) Reset(context.Context) error {
return eng.resetErr
}

func (eng *testEngine) DeleteAllObjects(context.Context) error {
return eng.deleteErr
}

func TestRunEngine(t *testing.T) {
testCases := []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/pause_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestPauseExec(t *testing.T) {
eng, err := New(testLogger, nil, false)
eng, err := New(testLogger, nil, nil, false)
require.NoError(t, err)

task, err := eng.GetTask(&config.Task{
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/register_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestNewRegisterObjTask(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)

runnable, err := eng.GetTask(&config.Task{
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/sleep_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestSleepParams(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, false)
eng, err := New(testLogger, nil, nil, false)
require.NoError(t, err)

task, err := eng.GetTask(&config.Task{
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/submit_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestNewSubmitObjTask(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
utils.SetObjectID(0)

eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)

if len(tc.refTaskID) != 0 {
Expand Down
19 changes: 10 additions & 9 deletions pkg/engine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ import (
)

const (
TaskRegisterObj = "RegisterObj"
TaskSubmitObj = "SubmitObj"
TaskUpdateObj = "UpdateObj"
TaskCheckObj = "CheckObj"
TaskDeleteObj = "DeleteObj"
TaskCheckPod = "CheckPod"
TaskUpdateNodes = "UpdateNodes"
TaskSleep = "Sleep"
TaskPause = "Pause"
TaskRegisterObj = "RegisterObj"
TaskSubmitObj = "SubmitObj"
TaskUpdateObj = "UpdateObj"
TaskCheckObj = "CheckObj"
TaskDeleteObj = "DeleteObj"
TaskCheckPod = "CheckPod"
TaskUpdateNodes = "UpdateNodes"
TaskSleep = "Sleep"
TaskPause = "Pause"
DefaultCleanupTimeout = 10 * time.Minute
)

type Runnable interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/update_nodes_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestUpdateNodesTask(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)
_, err = eng.GetTask(&config.Task{
ID: taskID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/update_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestNewUpdateObjTask(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objInfoMap[tc.refTaskId] = nil
Expand Down

0 comments on commit 0a49d8b

Please sign in to comment.