diff --git a/pkg/controller/sparkapplication/controller.go b/pkg/controller/sparkapplication/controller.go index ce03fb97a..2aafb4b14 100644 --- a/pkg/controller/sparkapplication/controller.go +++ b/pkg/controller/sparkapplication/controller.go @@ -22,7 +22,7 @@ import ( "fmt" "math/big" "os/exec" - "strings" + "sync" "time" diff --git a/pkg/controller/sparkapplication/controller_test.go b/pkg/controller/sparkapplication/controller_test.go index 44f9003db..38bb7e4b0 100644 --- a/pkg/controller/sparkapplication/controller_test.go +++ b/pkg/controller/sparkapplication/controller_test.go @@ -67,8 +67,9 @@ func newFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Cont }, metav1.CreateOptions{}) podInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second) + controller := newSparkApplicationController(crdClient, kubeClient, informerFactory, podInformerFactory, recorder, - &util.MetricConfig{}, "", "", nil, true) + &util.MetricConfig{}, "", "", nil, true, 3) informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer() if app != nil { @@ -95,14 +96,14 @@ func TestOnAdd(t *testing.T) { Status: v1beta2.SparkApplicationStatus{}, } ctrl.onAdd(app) - - item, _ := ctrl.queue.Get() - defer ctrl.queue.Done(item) + q := ctrl.GetOrCreateRelevantQueue(app.Name).Queue + item, _ := q.Get() + defer q.Done(item) key, ok := item.(string) assert.True(t, ok) expectedKey, _ := cache.MetaNamespaceKeyFunc(app) assert.Equal(t, expectedKey, key) - ctrl.queue.Forget(item) + q.Forget(item) } func TestOnUpdate(t *testing.T) { @@ -131,13 +132,14 @@ func TestOnUpdate(t *testing.T) { ctrl.onUpdate(appTemplate, copyWithSameSpec) // Verify that the SparkApplication was enqueued but no spec update events fired. - item, _ := ctrl.queue.Get() + q := ctrl.GetOrCreateRelevantQueue(appTemplate.Name).Queue + item, _ := q.Get() key, ok := item.(string) assert.True(t, ok) expectedKey, _ := cache.MetaNamespaceKeyFunc(appTemplate) assert.Equal(t, expectedKey, key) - ctrl.queue.Forget(item) - ctrl.queue.Done(item) + q.Forget(item) + q.Done(item) assert.Equal(t, 0, len(recorder.Events)) // Case2: Spec update failed. @@ -157,13 +159,14 @@ func TestOnUpdate(t *testing.T) { ctrl.onUpdate(appTemplate, copyWithSpecUpdate) // Verify App was enqueued. - item, _ = ctrl.queue.Get() + + item, _ = q.Get() key, ok = item.(string) assert.True(t, ok) expectedKey, _ = cache.MetaNamespaceKeyFunc(appTemplate) assert.Equal(t, expectedKey, key) - ctrl.queue.Forget(item) - ctrl.queue.Done(item) + q.Forget(item) + q.Done(item) // Verify that update was succeeded. assert.Equal(t, 1, len(recorder.Events)) event = <-recorder.Events @@ -186,16 +189,17 @@ func TestOnDelete(t *testing.T) { Status: v1beta2.SparkApplicationStatus{}, } ctrl.onAdd(app) - ctrl.queue.Get() + q := ctrl.GetOrCreateRelevantQueue(app.Name).Queue + q.Get() ctrl.onDelete(app) - ctrl.queue.ShutDown() - item, _ := ctrl.queue.Get() - defer ctrl.queue.Done(item) + q.ShutDown() + item, _ := q.Get() + defer q.Done(item) assert.True(t, item == nil) event := <-recorder.Events assert.True(t, strings.Contains(event, "SparkApplicationDeleted")) - ctrl.queue.Forget(item) + q.Forget(item) } func TestHelperProcessFailure(t *testing.T) { @@ -275,7 +279,8 @@ func TestSyncSparkApplication_SubmissionFailed(t *testing.T) { } // Attempt 1 - err = ctrl.syncSparkApplication("default/foo") + var shouldDelete bool + err, shouldDelete = ctrl.syncSparkApplication("default/foo") updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{}) assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State) @@ -296,7 +301,9 @@ func TestSyncSparkApplication_SubmissionFailed(t *testing.T) { if err != nil { t.Fatal(err) } - err = ctrl.syncSparkApplication("default/foo") + + err, shouldDelete = ctrl.syncSparkApplication("default/foo") + assert.Equal(t, false, shouldDelete) // Verify that the application failed again. updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{}) @@ -315,7 +322,7 @@ func TestSyncSparkApplication_SubmissionFailed(t *testing.T) { if err != nil { t.Fatal(err) } - err = ctrl.syncSparkApplication("default/foo") + err, shouldDelete = ctrl.syncSparkApplication("default/foo") // Verify that the application failed again. updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{}) @@ -604,8 +611,9 @@ func TestSyncSparkApplication_SubmissionSuccess(t *testing.T) { cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"} return cmd } - - err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", test.app.Namespace, test.app.Name)) + var shouldDelete bool + err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", test.app.Namespace, test.app.Name)) + assert.Equal(t, false, shouldDelete) assert.Nil(t, err) updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(test.app.Namespace).Get(context.TODO(), test.app.Name, metav1.GetOptions{}) assert.Nil(t, err) @@ -1473,8 +1481,9 @@ func TestSyncSparkApplication_ExecutingState(t *testing.T) { if test.executorPod != nil { ctrl.kubeClient.CoreV1().Pods(app.Namespace).Create(context.TODO(), test.executorPod, metav1.CreateOptions{}) } - - err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name)) + var shouldDelete bool + err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name)) + assert.Equal(t, shouldDelete, false) assert.Nil(t, err) // Verify application and executor states. updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{}) @@ -1550,7 +1559,9 @@ func TestSyncSparkApplication_ApplicationExpired(t *testing.T) { if err != nil { t.Fatal(err) } - err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name)) + var shouldDelete bool + err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name)) + assert.Equal(t, shouldDelete, true) assert.Nil(t, err) _, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{}) @@ -1594,7 +1605,9 @@ func TestIngressWithSubpathAffectsSparkConfiguration(t *testing.T) { if err != nil { t.Fatal(err) } - err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name)) + var shouldDelete bool + err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name)) + assert.Equal(t, shouldDelete, false) assert.Nil(t, err) deployedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{}) if err != nil { @@ -1647,7 +1660,9 @@ func TestIngressWithClassName(t *testing.T) { if err != nil { t.Fatal(err) } - err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name)) + var shouldDelete bool + err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name)) + assert.Equal(t, shouldDelete, false) assert.Nil(t, err) _, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{}) if err != nil {