Queue per app map, qps params, fix some tests, add helm values and some docs

Signed-off-by: bnetzi <[email protected]>
bnetzi committed Jun 19, 2024
1 parent a714420 commit 0f3789f
Showing 12 changed files with 404 additions and 107 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
@@ -40,9 +40,10 @@ USER root
COPY --from=builder /usr/bin/spark-operator /usr/bin/
RUN apt-get update --allow-releaseinfo-change \
&& apt-get update \
&& apt-get install -y tini \
&& apt-get install -y tini htop \
&& rm -rf /var/lib/apt/lists/*

COPY entrypoint.sh /usr/bin/
COPY export-pprof.sh /usr/bin/

ENTRYPOINT ["/usr/bin/entrypoint.sh"]
2 changes: 2 additions & 0 deletions charts/spark-operator-chart/README.md
@@ -91,6 +91,8 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command documentation.
| imagePullSecrets | list | `[]` | Image pull secrets |
| ingressUrlFormat | string | `""` | Ingress URL format. Requires the UI service to be enabled by setting `uiService.enable` to true. |
| istio.enabled | bool | `false` | When using `istio`, spark jobs need to run without a sidecar to properly terminate |
| k8sBurstQps | int | `200` | Kubernetes API client burst setting |
| k8sQps | int | `100` | Kubernetes API client QPS setting |
| labelSelectorFilter | string | `""` | A comma-separated list of key=value, or key labels to filter resources during watch and list based on the specified labels. |
| leaderElection.lockName | string | `"spark-operator-lock"` | Leader election lock name. Ref: https://github.com/kubeflow/spark-operator/blob/master/docs/user-guide.md#enabling-leader-election-for-high-availability. |
| leaderElection.lockNamespace | string | `""` | Optionally store the lock in another namespace. Defaults to operator's namespace |
5 changes: 5 additions & 0 deletions charts/spark-operator-chart/templates/deployment.yaml
@@ -74,6 +74,11 @@ spec:
args:
- -v={{ .Values.logLevel }}
- -logtostderr
- -max-queue-time-without-update-in-minutes=30
- -queue-cleaner-interval-in-minutes=10
- -enable-profiling=false
- -api-qps={{ .Values.k8sQps }}
- -api-burst={{ .Values.k8sBurstQps }}
{{- if eq (len $jobNamespaces) 1 }}
- -namespace={{ index $jobNamespaces 0 }}
{{- end }}
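With the chart defaults (`k8sQps: 100`, `k8sBurstQps: 200`) these template lines would render roughly as follows — a sketch, assuming `logLevel: 2`:

```yaml
args:
  - -v=2
  - -logtostderr
  - -max-queue-time-without-update-in-minutes=30
  - -queue-cleaner-interval-in-minutes=10
  - -enable-profiling=false
  - -api-qps=100
  - -api-burst=200
```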
6 changes: 6 additions & 0 deletions charts/spark-operator-chart/values.yaml
@@ -60,6 +60,12 @@ sparkJobNamespaces:
# -- Operator concurrency, higher values might increase memory usage
controllerThreads: 10

# -- Kubernetes API client QPS setting
k8sQps: 100

# -- Kubernetes API client burst setting
k8sBurstQps: 200

# -- Operator resync interval. Note that the operator will respond to events (e.g. create, update)
# unrelated to this setting
resyncInterval: 30
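A minimal sketch of tuning the new rate-limit values at install time (release name, chart reference, and namespace are assumptions):

```sh
helm upgrade --install spark-operator spark-operator/spark-operator \
  --namespace spark-operator \
  --set k8sQps=200 \
  --set k8sBurstQps=400
```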
25 changes: 25 additions & 0 deletions docs/api-docs.md
@@ -2873,6 +2873,31 @@ string
</tr>
<tr>
<td>
<code>memoryLimit</code><br/>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>MemoryLimit is the pod memory limit, used when we want to enforce a memory limit on the pod.</p>
</td>
</tr>
<tr>
<td>
<code>memoryRequestOverride</code><br/>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>MemoryRequestOverride is an optional parameter that allows setting a pod memory request lower than the JVM memory plus overhead, which Spark uses by default.</p>
</td>
</tr>
<tr>
<td>
<code>gpu</code><br/>
<em>
<a href="#sparkoperator.k8s.io/v1beta2.GPUSpec">
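For illustration, a sketch of a driver spec using the new fields — the values are made up:

```yaml
spec:
  driver:
    cores: 1
    memory: "4g"
    memoryOverhead: "1g"
    memoryLimit: "6g"            # enforce a pod memory limit
    memoryRequestOverride: "3g"  # request less than JVM memory + overhead
```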
41 changes: 41 additions & 0 deletions export-pprof.sh
@@ -0,0 +1,41 @@
#!/bin/bash

# Generate a timestamp
timestamp=$(date +%Y-%m-%d_%H-%M-%S)

# Function to fetch goroutine profile
fetch_goroutine_profile() {
local profile_type=$1 # 'start' or 'end'
local profile_path="/tmp/goroutine-profile-debug1-$profile_type-$timestamp.prof"
echo "Starting to fetch Goroutine profile ($profile_type) at $(date)..."
curl "http://localhost:6060/debug/pprof/goroutine?debug=1" -o "$profile_path"
echo "Completed fetching Goroutine profile ($profile_type) at $(date). File saved to $profile_path"
}

echo "Starting to fetch CPU profile at $(date)..."
cpu_profile="/tmp/cpu-profile-30sec-$timestamp.prof"
curl "http://localhost:6060/debug/pprof/profile?seconds=30" -o "$cpu_profile" &
echo "CPU profile fetch initiated, running for 30 seconds..."

echo "Starting to fetch Trace profile at $(date)..."
trace_profile="/tmp/trace-profile-30sec-$timestamp.prof"
curl "http://localhost:6060/debug/pprof/trace?seconds=30" -o "$trace_profile" &
echo "Trace profile fetch initiated, running for 30 seconds..."

echo "Fetching initial Goroutine profile..."
fetch_goroutine_profile "start" &

# Wait for the backgrounded fetches (CPU, trace, and initial goroutine profiles) to complete
wait

echo "Starting to fetch final Goroutine profile after waiting for other profiles to complete..."
fetch_goroutine_profile "end"

echo "All profiling data collected"

# Print the locations of the collected profiles
echo "CPU profile output - $cpu_profile"
echo "Trace profile output - $trace_profile"
echo "Initial Goroutine profile output - /tmp/goroutine-profile-debug1-start-$timestamp.prof"
echo "Final Goroutine profile output - /tmp/goroutine-profile-debug1-end-$timestamp.prof"

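A sketch of running the script against a live operator pod — the pod name is a placeholder, and profiling must have been enabled with `-enable-profiling=true` so port 6060 is listening:

```sh
kubectl exec -it spark-operator-xxxxx -n spark-operator -- /usr/bin/export-pprof.sh
kubectl cp spark-operator/spark-operator-xxxxx:/tmp/cpu-profile-30sec-<timestamp>.prof ./cpu.prof
go tool pprof ./cpu.prof
```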
76 changes: 48 additions & 28 deletions main.go
@@ -20,6 +20,8 @@ import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"strings"
@@ -49,39 +51,48 @@ import (
"github.com/kubeflow/spark-operator/pkg/controller/sparkapplication"
"github.com/kubeflow/spark-operator/pkg/util"
"github.com/kubeflow/spark-operator/pkg/webhook"

_ "net/http/pprof"
"runtime"
)

var (
master = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeConfig = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
controllerThreads = flag.Int("controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
resyncInterval = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
namespace = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
labelSelectorFilter = flag.String("label-selector-filter", "", "A comma-separated list of key=value, or key labels to filter resources during watch and list based on the specified labels.")
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
webhookTimeout = flag.Int("webhook-timeout", 30, "Webhook Timeout in seconds before the webhook returns a timeout")
enableResourceQuotaEnforcement = flag.Bool("enable-resource-quota-enforcement", false, "Whether to enable ResourceQuota enforcement for SparkApplication resources. Requires the webhook to be enabled.")
ingressURLFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
enableUIService = flag.Bool("enable-ui-service", true, "Enable Spark service UI.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable Spark operator leader election.")
leaderElectionLockNamespace = flag.String("leader-election-lock-namespace", "spark-operator", "Namespace in which to create the ConfigMap for leader election.")
leaderElectionLockName = flag.String("leader-election-lock-name", "spark-operator-lock", "Name of the ConfigMap for leader election.")
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Leader election lease duration.")
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 4*time.Second, "Leader election retry period.")
enableBatchScheduler = flag.Bool("enable-batch-scheduler", false, fmt.Sprintf("Enable batch schedulers for pods' scheduling, the available batch schedulers are: (%s).", strings.Join(batchscheduler.GetRegisteredNames(), ",")))
enableMetrics = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressClassName = flag.String("ingress-class-name", "", "Set ingressClassName for ingress resources created.")
maxQueueTimeWithoutUpdateInMinutes = flag.Duration("max-queue-time-without-update-in-minutes", 30*time.Minute, "Sets the maximum time that queue can be without update before it is considered as deleted.")
queueCleanerIntervalInMinutes = flag.Duration("queue-cleaner-interval-in-minutes", 30*time.Minute, "Sets the interval time for the queue cleaner.")
metricsLabels util.ArrayFlags
metricsJobStartLatencyBuckets util.HistogramBuckets = util.DefaultJobStartLatencyBuckets
master = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeConfig = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
controllerThreads = flag.Int("controller-threads", 10, "Number of per-app work queue maps that will be created and used by the SparkApplication controller.")
enableProfiling = flag.Bool("enable-profiling", false, "Whether to enable the pprof profiling server.")
resyncInterval = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
namespace = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
labelSelectorFilter = flag.String("label-selector-filter", "", "A comma-separated list of key=value, or key labels to filter resources during watch and list based on the specified labels.")
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
webhookTimeout = flag.Int("webhook-timeout", 30, "Webhook Timeout in seconds before the webhook returns a timeout")
enableResourceQuotaEnforcement = flag.Bool("enable-resource-quota-enforcement", false, "Whether to enable ResourceQuota enforcement for SparkApplication resources. Requires the webhook to be enabled.")
ingressURLFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
enableUIService = flag.Bool("enable-ui-service", true, "Enable Spark service UI.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable Spark operator leader election.")
leaderElectionLockNamespace = flag.String("leader-election-lock-namespace", "spark-operator", "Namespace in which to create the ConfigMap for leader election.")
leaderElectionLockName = flag.String("leader-election-lock-name", "spark-operator-lock", "Name of the ConfigMap for leader election.")
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Leader election lease duration.")
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 4*time.Second, "Leader election retry period.")
enableBatchScheduler = flag.Bool("enable-batch-scheduler", false, fmt.Sprintf("Enable batch schedulers for pods' scheduling, the available batch schedulers are: (%s).", strings.Join(batchscheduler.GetRegisteredNames(), ",")))
enableMetrics = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressClassName = flag.String("ingress-class-name", "", "Set ingressClassName for ingress resources created.")
maxQueueTimeWithoutUpdateInMinutes = flag.Int("max-queue-time-without-update-in-minutes", 30, "Sets the maximum time, in minutes, that a queue can go without an update before it is considered deleted.")
queueCleanerIntervalInMinutes = flag.Int("queue-cleaner-interval-in-minutes", 10, "Sets the interval, in minutes, at which the queue cleaner runs.")
apiQps = flag.Float64("api-qps", 100.00, "Kubernetes API client QPS configuration.")
apiBurst = flag.Int("api-burst", 200, "Kubernetes API client burst configuration.")

metricsLabels util.ArrayFlags
metricsJobStartLatencyBuckets util.HistogramBuckets = util.DefaultJobStartLatencyBuckets
)

func main() {
flag.Var(&metricsLabels, "metrics-labels", "Labels for the metrics")
flag.Var(&metricsJobStartLatencyBuckets, "metrics-job-start-latency-buckets",
"Comma-separated boundary values (in seconds) for the job start latency histogram bucket; "+
@@ -93,6 +104,8 @@ func main() {
if err != nil {
glog.Fatal(err)
}
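// Apply client-side rate limits (set via -api-qps and -api-burst) so the operator is not throttled by client-go defaults under load.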
config.QPS = float32(*apiQps)
config.Burst = *apiBurst
kubeClient, err := clientset.NewForConfig(config)
if err != nil {
glog.Fatal(err)
Expand All @@ -104,6 +117,13 @@ func main() {
stopCh := make(chan struct{}, 1)
startCh := make(chan struct{}, 1)

go func() {
if *enableProfiling {
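// The blank net/http/pprof import registers the /debug/pprof handlers on the default mux, which this server exposes on localhost:6060.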
runtime.SetMutexProfileFraction(1) // Enable mutex profiling
log.Println(http.ListenAndServe("localhost:6060", nil))
}
}()

if *enableLeaderElection {
podName := os.Getenv("POD_NAME")
hostname, err := os.Hostname()
@@ -193,7 +213,7 @@ }
}

applicationController := sparkapplication.NewController(
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, *ingressClassName, batchSchedulerMgr, *enableUIService)
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, *ingressClassName, batchSchedulerMgr, *enableUIService, *controllerThreads)
scheduledApplicationController := scheduledsparkapplication.NewController(
crClient, kubeClient, apiExtensionsClient, crInformerFactory, clock.RealClock{})

7 changes: 7 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
@@ -483,6 +483,13 @@ type SparkPodSpec struct {
// MemoryOverhead is the amount of off-heap memory to allocate in cluster mode, in MiB unless otherwise specified.
// +optional
MemoryOverhead *string `json:"memoryOverhead,omitempty"`
// MemoryLimit is the pod memory limit, used when we want to enforce a memory limit on the pod.
// +optional
MemoryLimit *string `json:"memoryLimit,omitempty"`
// MemoryRequestOverride is an optional parameter that allows setting a pod memory request lower than
// the JVM memory plus overhead, which Spark uses by default.
// +optional
MemoryRequestOverride *string `json:"memoryRequestOverride,omitempty"`
// GPU specifies GPU requirement for the pod.
// +optional
GPU *GPUSpec `json:"gpu,omitempty"`
15 changes: 15 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go

(Generated file; diff not rendered.)
