Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the cleanup support #37

Merged
merged 1 commit into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions cmd/knavigator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ import (
)

func mainInternal() error {
var kubeConfigPath, kubeCtx, taskConfigs string
var qps float64
var burst int
var (
kubeConfigPath, kubeCtx, taskConfigs string
qps float64
burst int
cleanupInfo engine.CleanupInfo
)
flag.StringVar(&kubeConfigPath, "kubeconfig", "", "kubeconfig file path")
flag.StringVar(&kubeCtx, "kubectx", "", "kube context")
flag.BoolVar(&cleanupInfo.Enabled, "cleanup", false, "delete objects")
flag.DurationVar(&cleanupInfo.Timeout, "cleanup.timeout", engine.DefaultCleanupTimeout, "time limit for cleanup")
flag.StringVar(&taskConfigs, "tasks", "", "comma-separated list of task config files and dirs")
flag.Float64Var(&qps, "kube-api-qps", 500, "Maximum QPS to use while talking with Kubernetes API")
flag.IntVar(&burst, "kube-api-burst", 500, "Maximum burst for throttle while talking with Kubernetes API")
Expand Down Expand Up @@ -68,7 +73,7 @@ func mainInternal() error {
return err
}

eng, err := engine.New(log, restConfig)
eng, err := engine.New(log, restConfig, &cleanupInfo)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/check_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestNewCheckObjTask(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objInfoMap[tc.refTaskId] = nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/check_pod_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestCheckPodParams(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objInfoMap[tc.refTaskId] = nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/delete_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestNewDeleteObjTask(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objInfoMap[tc.refTaskId] = nil
Expand Down
38 changes: 37 additions & 1 deletion pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
Expand All @@ -34,6 +35,7 @@ import (
// Engine groups the task-running, reset and object-cleanup operations.
type Engine interface {
	// RunTask takes a task configuration and reports failure as an error.
	// NOTE(review): presumably this creates and executes the task; the
	// implementation is not visible in this hunk — confirm.
	RunTask(ctx context.Context, task *config.Task) error

	// Reset resets the engine. In Eng it also triggers object cleanup when
	// cleanup is enabled.
	Reset(ctx context.Context) error

	// DeleteAllObjects deletes all objects.
	DeleteAllObjects(ctx context.Context) error
}

type Eng struct {
Expand All @@ -44,13 +46,15 @@ type Eng struct {
discoveryClient *discovery.DiscoveryClient
objTypeMap map[string]*RegisterObjParams
objInfoMap map[string]*ObjInfo
cleanup *CleanupInfo
}

func New(log logr.Logger, config *rest.Config, sim ...bool) (*Eng, error) {
func New(log logr.Logger, config *rest.Config, cleanupInfo *CleanupInfo, sim ...bool) (*Eng, error) {
eng := &Eng{
log: log,
objTypeMap: make(map[string]*RegisterObjParams),
objInfoMap: make(map[string]*ObjInfo),
cleanup: cleanupInfo,
}

if len(sim) == 0 { // len(sim) != 0 in unit tests
Expand Down Expand Up @@ -247,6 +251,38 @@ func execRunnable(ctx context.Context, log logr.Logger, r Runnable) error {
return nil
}

// Reset logs the engine reset and, when cleanup is configured and enabled,
// deletes the remaining objects.
//
// The deletion runs under a context derived from ctx that is bounded by the
// configured cleanup timeout. If cleanup is not configured or is disabled,
// Reset returns nil without deleting anything; otherwise it returns the
// result of DeleteAllObjects.
func (eng *Eng) Reset(ctx context.Context) error {
	eng.log.Info("Reset Engine")

	if info := eng.cleanup; info != nil && info.Enabled {
		eng.log.Info("Cleaning up objects")

		// Bound the whole cleanup pass so it cannot run past the timeout.
		cleanupCtx, cancel := context.WithTimeout(ctx, info.Timeout)
		defer cancel()

		return eng.DeleteAllObjects(cleanupCtx)
	}

	return nil
}

// DeleteAllObjects deletes all objects
func (eng *Eng) DeleteAllObjects(ctx context.Context) error {
deletePolicy := metav1.DeletePropagationBackground
deletions := metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
}

for _, objInfo := range eng.objInfoMap {
ns := objInfo.Namespace
for _, name := range objInfo.Names {
err := eng.dynamicClient.Resource(objInfo.GVR).Namespace(ns).Delete(ctx, name, deletions)
if err != nil {
return err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it return an error if the object has already been deleted by the test or has expired naturally?

Copy link
Collaborator Author

@yuanchen8911 yuanchen8911 May 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the following the correct way to add a DeleteObj task? The SubmitObj task's id is job.

- id: delete
  type: DeleteObj
  params:
    refTaskId: job
  refTaskId: job

After adding the above task to k8s/test-job.yaml, I ran the test (without --cleanup) and got the following error.

E0516 09:56:44.038701   53311 engine.go:200] "Task failed" err="the server could not find the requested resource" id="DeleteObj/delete"
I0516 09:56:44.038712   53311 engine.go:209] "Reset Engine"
E0516 09:56:44.038718   53311 main.go:96] the server could not find the requested resource

Copy link
Collaborator Author

@yuanchen8911 yuanchen8911 May 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I was wondering why we need to define refTaskId twice in DeleteObj.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we don't need refTaskId twice.

Copy link
Collaborator Author

@yuanchen8911 yuanchen8911 May 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the second refTaskId and still got the same error. Do we have a test with a DeleteObj task other than the unit test? Can you test the DeleteObj task by yourself?

 ./bin/knavigator --tasks resources/tests/k8s/test-job.yml
I0516 10:15:41.155408   53935 k8s_config.go:42] "Using external kubeconfig"
I0516 10:15:41.158578   53935 main.go:84] "Starting test" name="test-k8s-job"
I0516 10:15:41.158590   53935 engine.go:103] "Creating task" name="SubmitObj" id="job"
I0516 10:15:41.160156   53935 engine.go:197] "Starting task" id="SubmitObj/job"
I0516 10:15:41.182176   53935 engine.go:203] "Task completed" id="SubmitObj/job" duration="22.011666ms"
I0516 10:15:41.182188   53935 engine.go:103] "Creating task" name="DeleteObj" id="delete"
I0516 10:15:41.182224   53935 engine.go:197] "Starting task" id="DeleteObj/delete"
E0516 10:15:41.184724   53935 engine.go:200] "Task failed" err="the server could not find the requested resource" id="DeleteObj/delete"
I0516 10:15:41.184735   53935 engine.go:209] "Reset Engine"
E0516 10:15:41.184741   53935 main.go:96] the server could not find the requested resource

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, we never tested it. Let me check.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bug indeed. Fixed in #41

Copy link
Collaborator Author

@yuanchen8911 yuanchen8911 May 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested --cleanup with and without the DeleteObj task. It worked well. For the case with DeleteObj, there's no error. The deleted object should have been removed from eng.objInfoMap?

./bin/knavigator --tasks resources/tests/k8s/test-job.yml  --cleanup
I0517 10:24:41.489049   93711 k8s_config.go:42] "Using external kubeconfig"
I0517 10:24:41.494671   93711 main.go:84] "Starting test" name="test-k8s-job"
I0517 10:24:41.494690   93711 engine.go:111] "Creating task" name="RegisterObj" id="register"
I0517 10:24:41.495255   93711 engine.go:244] "Starting task" id="RegisterObj/register"
I0517 10:24:41.495266   93711 engine.go:250] "Task completed" id="RegisterObj/register" duration="1.708µs"
I0517 10:24:41.495271   93711 engine.go:111] "Creating task" name="SubmitObj" id="job"
I0517 10:24:41.495349   93711 engine.go:244] "Starting task" id="SubmitObj/job"
I0517 10:24:41.512305   93711 engine.go:250] "Task completed" id="SubmitObj/job" duration="16.949084ms"
I0517 10:24:41.512316   93711 engine.go:111] "Creating task" name="CheckPod" id="status"
I0517 10:24:41.512363   93711 engine.go:244] "Starting task" id="CheckPod/status"
I0517 10:24:41.512369   93711 check_pod_task.go:158] "Create pod informer" #pod=2 timeout="5s"
I0517 10:24:41.517275   93711 check_pod_task.go:256] "Accounted for all pods"
I0517 10:24:41.517300   93711 engine.go:250] "Task completed" id="CheckPod/status" duration="4.932042ms"
I0517 10:24:41.517317   93711 engine.go:111] "Creating task" name="DeleteObj" id="delete"
I0517 10:24:41.517385   93711 engine.go:244] "Starting task" id="DeleteObj/delete"
W0517 10:24:41.529210   93711 warnings.go:70] child pods are preserved by default when jobs are deleted; set propagationPolicy=Background to remove them or set propagationPolicy=Orphan to suppress this warning
I0517 10:24:41.529230   93711 engine.go:250] "Task completed" id="DeleteObj/delete" duration="11.837917ms"
I0517 10:24:41.529235   93711 engine.go:256] "Reset Engine"
I0517 10:24:41.529237   93711 engine.go:262] "Cleaning up objects"
I0517 10:24:41.533397   93711 engine.go:286] "Deleted all objects"

}
}
}

eng.log.Info("Deleted all objects")
return nil
}
10 changes: 8 additions & 2 deletions pkg/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,23 @@ var (
)

type testEngine struct {
execErr error
resetErr error
execErr error
resetErr error
deleteErr error
}

// RunTask ignores both arguments and returns the execErr value stored on the
// test engine.
func (eng *testEngine) RunTask(_ context.Context, _ *config.Task) error {
	return eng.execErr
}

// Reset ignores its context and returns the resetErr value stored on the
// test engine.
func (eng *testEngine) Reset(_ context.Context) error {
	return eng.resetErr
}

// DeleteAllObjects ignores its context and returns the deleteErr value stored
// on the test engine.
func (eng *testEngine) DeleteAllObjects(_ context.Context) error {
	return eng.deleteErr
}

func TestRunEngine(t *testing.T) {
testCases := []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/pause_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestPauseExec(t *testing.T) {
eng, err := New(testLogger, nil, false)
eng, err := New(testLogger, nil, nil, false)
require.NoError(t, err)

task, err := eng.GetTask(&config.Task{
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/register_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestNewRegisterObjTask(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)

runnable, err := eng.GetTask(&config.Task{
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/sleep_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestSleepParams(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, false)
eng, err := New(testLogger, nil, nil, false)
require.NoError(t, err)

task, err := eng.GetTask(&config.Task{
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/submit_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestNewSubmitObjTask(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
utils.SetObjectID(0)

eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)

if len(tc.refTaskID) != 0 {
Expand Down
19 changes: 10 additions & 9 deletions pkg/engine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ import (
)

const (
TaskRegisterObj = "RegisterObj"
TaskSubmitObj = "SubmitObj"
TaskUpdateObj = "UpdateObj"
TaskCheckObj = "CheckObj"
TaskDeleteObj = "DeleteObj"
TaskCheckPod = "CheckPod"
TaskUpdateNodes = "UpdateNodes"
TaskSleep = "Sleep"
TaskPause = "Pause"
TaskRegisterObj = "RegisterObj"
TaskSubmitObj = "SubmitObj"
TaskUpdateObj = "UpdateObj"
TaskCheckObj = "CheckObj"
TaskDeleteObj = "DeleteObj"
TaskCheckPod = "CheckPod"
TaskUpdateNodes = "UpdateNodes"
TaskSleep = "Sleep"
TaskPause = "Pause"
DefaultCleanupTimeout = 10 * time.Minute
)

type Runnable interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/update_nodes_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestUpdateNodesTask(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)
_, err = eng.GetTask(&config.Task{
ID: taskID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/update_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestNewUpdateObjTask(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
eng, err := New(testLogger, nil, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objInfoMap[tc.refTaskId] = nil
Expand Down
Loading