Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qps merger #5

Merged
merged 25 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c0c3149
multi-queue
ben-tzionlmb Mar 17, 2024
15e783d
first version of the queue per spark app
Mar 28, 2024
e0dd347
Add MemoryLimit as option that will override spark pods limits (by us…
ben-tzionlmb Mar 31, 2024
1bdd22f
first version of the queue per spark app
Mar 31, 2024
d10c855
Merge branch 'add-memory-limits' of gitlab.mobileye.com:rem-cloud/rem…
ben-tzionlmb Mar 31, 2024
9cdff4a
added logs
Apr 2, 2024
e35a37f
added logs
Apr 2, 2024
67de33b
fixed names
Apr 4, 2024
b8b0563
prevent concurrent access to the appsQueues map
Apr 14, 2024
9150a5b
use RWmutex when accessing the appQueues
Apr 17, 2024
814cf34
Revert spark-operator image to general image
ben-tzionlmb Apr 17, 2024
41faea7
Make multiple maps for queue per app
ben-tzionlmb May 16, 2024
7f23db3
remove unneeded mutex lock
May 19, 2024
b874a40
Merge branch 'map-mutex' of gitlab.mobileye.com:rem-cloud/rem-applica…
ben-tzionlmb May 19, 2024
b04b838
Add logs
ben-tzionlmb May 20, 2024
b01496a
Initiate each Map of queues
ben-tzionlmb May 20, 2024
d05951c
Update list-map-mutex-prof-2 spark-operator image
ben-tzionlmb May 22, 2024
04f90d5
Add qps and burst as a parameter
ben-tzionlmb May 23, 2024
9c72627
Merge branch 'master' of github.com:bnetzi/spark-operator into qps-me…
ben-tzionlmb Jun 5, 2024
ed7831e
Merge branch 'master' of github.com:kubeflow/spark-operator into qps-…
ben-tzionlmb Jun 5, 2024
48ec26e
export-pprof.sh fixes
ben-tzionlmb Jun 5, 2024
1e391bf
Remove
ben-tzionlmb Jun 5, 2024
63efa24
Merge branch 'qps-merger' of gitlab.mobileye.com:rem-cloud/rem-applic…
ben-tzionlmb Jun 5, 2024
91807d3
Fix tests
ben-tzionlmb Jun 5, 2024
d737610
Merge branch 'qps-merger' of gitlab.mobileye.com:rem-cloud/rem-applic…
ben-tzionlmb Jun 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"fmt"
"math/big"
"os/exec"
"strings"

"sync"
"time"

Expand Down
67 changes: 41 additions & 26 deletions pkg/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ func newFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Cont
}, metav1.CreateOptions{})

podInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second)

controller := newSparkApplicationController(crdClient, kubeClient, informerFactory, podInformerFactory, recorder,
&util.MetricConfig{}, "", "", nil, true)
&util.MetricConfig{}, "", "", nil, true, 3)

informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
if app != nil {
Expand All @@ -95,14 +96,14 @@ func TestOnAdd(t *testing.T) {
Status: v1beta2.SparkApplicationStatus{},
}
ctrl.onAdd(app)

item, _ := ctrl.queue.Get()
defer ctrl.queue.Done(item)
q := ctrl.GetOrCreateRelevantQueue(app.Name).Queue
item, _ := q.Get()
defer q.Done(item)
key, ok := item.(string)
assert.True(t, ok)
expectedKey, _ := cache.MetaNamespaceKeyFunc(app)
assert.Equal(t, expectedKey, key)
ctrl.queue.Forget(item)
q.Forget(item)
}

func TestOnUpdate(t *testing.T) {
Expand Down Expand Up @@ -131,13 +132,14 @@ func TestOnUpdate(t *testing.T) {
ctrl.onUpdate(appTemplate, copyWithSameSpec)

// Verify that the SparkApplication was enqueued but no spec update events fired.
item, _ := ctrl.queue.Get()
q := ctrl.GetOrCreateRelevantQueue(appTemplate.Name).Queue
item, _ := q.Get()
key, ok := item.(string)
assert.True(t, ok)
expectedKey, _ := cache.MetaNamespaceKeyFunc(appTemplate)
assert.Equal(t, expectedKey, key)
ctrl.queue.Forget(item)
ctrl.queue.Done(item)
q.Forget(item)
q.Done(item)
assert.Equal(t, 0, len(recorder.Events))

// Case2: Spec update failed.
Expand All @@ -157,13 +159,14 @@ func TestOnUpdate(t *testing.T) {
ctrl.onUpdate(appTemplate, copyWithSpecUpdate)

// Verify App was enqueued.
item, _ = ctrl.queue.Get()

item, _ = q.Get()
key, ok = item.(string)
assert.True(t, ok)
expectedKey, _ = cache.MetaNamespaceKeyFunc(appTemplate)
assert.Equal(t, expectedKey, key)
ctrl.queue.Forget(item)
ctrl.queue.Done(item)
q.Forget(item)
q.Done(item)
// Verify that the update succeeded.
assert.Equal(t, 1, len(recorder.Events))
event = <-recorder.Events
Expand All @@ -186,16 +189,17 @@ func TestOnDelete(t *testing.T) {
Status: v1beta2.SparkApplicationStatus{},
}
ctrl.onAdd(app)
ctrl.queue.Get()
q := ctrl.GetOrCreateRelevantQueue(app.Name).Queue
q.Get()

ctrl.onDelete(app)
ctrl.queue.ShutDown()
item, _ := ctrl.queue.Get()
defer ctrl.queue.Done(item)
q.ShutDown()
item, _ := q.Get()
defer q.Done(item)
assert.True(t, item == nil)
event := <-recorder.Events
assert.True(t, strings.Contains(event, "SparkApplicationDeleted"))
ctrl.queue.Forget(item)
q.Forget(item)
}

func TestHelperProcessFailure(t *testing.T) {
Expand Down Expand Up @@ -275,7 +279,8 @@ func TestSyncSparkApplication_SubmissionFailed(t *testing.T) {
}

// Attempt 1
err = ctrl.syncSparkApplication("default/foo")
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication("default/foo")
updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})

assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
Expand All @@ -296,7 +301,9 @@ func TestSyncSparkApplication_SubmissionFailed(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication("default/foo")

err, shouldDelete = ctrl.syncSparkApplication("default/foo")
assert.Equal(t, false, shouldDelete)

// Verify that the application failed again.
updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
Expand All @@ -315,7 +322,7 @@ func TestSyncSparkApplication_SubmissionFailed(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication("default/foo")
err, shouldDelete = ctrl.syncSparkApplication("default/foo")

// Verify that the application failed again.
updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -604,8 +611,9 @@ func TestSyncSparkApplication_SubmissionSuccess(t *testing.T) {
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
return cmd
}

err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", test.app.Namespace, test.app.Name))
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", test.app.Namespace, test.app.Name))
assert.Equal(t, false, shouldDelete)
assert.Nil(t, err)
updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(test.app.Namespace).Get(context.TODO(), test.app.Name, metav1.GetOptions{})
assert.Nil(t, err)
Expand Down Expand Up @@ -1473,8 +1481,9 @@ func TestSyncSparkApplication_ExecutingState(t *testing.T) {
if test.executorPod != nil {
ctrl.kubeClient.CoreV1().Pods(app.Namespace).Create(context.TODO(), test.executorPod, metav1.CreateOptions{})
}

err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Equal(t, shouldDelete, false)
assert.Nil(t, err)
// Verify application and executor states.
updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -1550,7 +1559,9 @@ func TestSyncSparkApplication_ApplicationExpired(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Equal(t, shouldDelete, true)
assert.Nil(t, err)

_, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -1594,7 +1605,9 @@ func TestIngressWithSubpathAffectsSparkConfiguration(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Equal(t, shouldDelete, false)
assert.Nil(t, err)
deployedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -1647,7 +1660,9 @@ func TestIngressWithClassName(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
var shouldDelete bool
err, shouldDelete = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Equal(t, shouldDelete, false)
assert.Nil(t, err)
_, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
if err != nil {
Expand Down
Loading