Skip to content

Commit

Permalink
Qps merger (#5)
Browse files Browse the repository at this point in the history
Fix tests
  • Loading branch information
bnetzi authored Jun 5, 2024
1 parent 5ee95e6 commit 926f185
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"fmt"
"math/big"
"os/exec"
"strings"

"sync"
"time"

Expand Down
67 changes: 41 additions & 26 deletions pkg/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ func newFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Cont
}, metav1.CreateOptions{})

podInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second)

controller := newSparkApplicationController(crdClient, kubeClient, informerFactory, podInformerFactory, recorder,
&util.MetricConfig{}, "", "", nil, true)
&util.MetricConfig{}, "", "", nil, true, 3)

informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
if app != nil {
Expand All @@ -95,14 +96,14 @@ func TestOnAdd(t *testing.T) {
Status: v1beta2.SparkApplicationStatus{},
}
ctrl.onAdd(app)

item, _ := ctrl.queue.Get()
defer ctrl.queue.Done(item)
q := ctrl.GetOrCreateRelevantQueue(app.Name).Queue
item, _ := q.Get()
defer q.Done(item)
key, ok := item.(string)
assert.True(t, ok)
expectedKey, _ := cache.MetaNamespaceKeyFunc(app)
assert.Equal(t, expectedKey, key)
ctrl.queue.Forget(item)
q.Forget(item)
}

func TestOnUpdate(t *testing.T) {
Expand Down Expand Up @@ -131,13 +132,14 @@ func TestOnUpdate(t *testing.T) {
ctrl.onUpdate(appTemplate, copyWithSameSpec)

// Verify that the SparkApplication was enqueued but no spec update events fired.
item, _ := ctrl.queue.Get()
q := ctrl.GetOrCreateRelevantQueue(appTemplate.Name).Queue
item, _ := q.Get()
key, ok := item.(string)
assert.True(t, ok)
expectedKey, _ := cache.MetaNamespaceKeyFunc(appTemplate)
assert.Equal(t, expectedKey, key)
ctrl.queue.Forget(item)
ctrl.queue.Done(item)
q.Forget(item)
q.Done(item)
assert.Equal(t, 0, len(recorder.Events))

// Case2: Spec update failed.
Expand All @@ -157,13 +159,14 @@ func TestOnUpdate(t *testing.T) {
ctrl.onUpdate(appTemplate, copyWithSpecUpdate)

// Verify App was enqueued.
item, _ = ctrl.queue.Get()

item, _ = q.Get()
key, ok = item.(string)
assert.True(t, ok)
expectedKey, _ = cache.MetaNamespaceKeyFunc(appTemplate)
assert.Equal(t, expectedKey, key)
ctrl.queue.Forget(item)
ctrl.queue.Done(item)
q.Forget(item)
q.Done(item)
// Verify that the update succeeded.
assert.Equal(t, 1, len(recorder.Events))
event = <-recorder.Events
Expand All @@ -186,16 +189,17 @@ func TestOnDelete(t *testing.T) {
Status: v1beta2.SparkApplicationStatus{},
}
ctrl.onAdd(app)
ctrl.queue.Get()
q := ctrl.GetOrCreateRelevantQueue(app.Name).Queue
q.Get()

ctrl.onDelete(app)
ctrl.queue.ShutDown()
item, _ := ctrl.queue.Get()
defer ctrl.queue.Done(item)
q.ShutDown()
item, _ := q.Get()
defer q.Done(item)
assert.True(t, item == nil)
event := <-recorder.Events
assert.True(t, strings.Contains(event, "SparkApplicationDeleted"))
ctrl.queue.Forget(item)
q.Forget(item)
}

func TestHelperProcessFailure(t *testing.T) {
Expand Down Expand Up @@ -275,7 +279,8 @@ func TestSyncSparkApplication_SubmissionFailed(t *testing.T) {
}

// Attempt 1
err = ctrl.syncSparkApplication("default/foo")
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication("default/foo")
updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})

assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
Expand All @@ -296,7 +301,9 @@ func TestSyncSparkApplication_SubmissionFailed(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication("default/foo")

err, shouldDelete = ctrl.syncSparkApplication("default/foo")
assert.Equal(t, false, shouldDelete)

// Verify that the application failed again.
updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
Expand All @@ -315,7 +322,7 @@ func TestSyncSparkApplication_SubmissionFailed(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication("default/foo")
err, shouldDelete = ctrl.syncSparkApplication("default/foo")

// Verify that the application failed again.
updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -604,8 +611,9 @@ func TestSyncSparkApplication_SubmissionSuccess(t *testing.T) {
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
return cmd
}

err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", test.app.Namespace, test.app.Name))
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", test.app.Namespace, test.app.Name))
assert.Equal(t, false, shouldDelete)
assert.Nil(t, err)
updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(test.app.Namespace).Get(context.TODO(), test.app.Name, metav1.GetOptions{})
assert.Nil(t, err)
Expand Down Expand Up @@ -1473,8 +1481,9 @@ func TestSyncSparkApplication_ExecutingState(t *testing.T) {
if test.executorPod != nil {
ctrl.kubeClient.CoreV1().Pods(app.Namespace).Create(context.TODO(), test.executorPod, metav1.CreateOptions{})
}

err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Equal(t, shouldDelete, false)
assert.Nil(t, err)
// Verify application and executor states.
updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -1550,7 +1559,9 @@ func TestSyncSparkApplication_ApplicationExpired(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Equal(t, shouldDelete, true)
assert.Nil(t, err)

_, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -1594,7 +1605,9 @@ func TestIngressWithSubpathAffectsSparkConfiguration(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Equal(t, shouldDelete, false)
assert.Nil(t, err)
deployedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -1647,7 +1660,9 @@ func TestIngressWithClassName(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Equal(t, shouldDelete, false)
assert.Nil(t, err)
_, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
if err != nil {
Expand Down

0 comments on commit 926f185

Please sign in to comment.