From 9cc1c02c64f6114b9f3c7d89d9c88d97014d9ba4 Mon Sep 17 00:00:00 2001 From: Jacob Salway Date: Wed, 28 Aug 2024 12:53:03 +1000 Subject: [PATCH] Add default batch scheduler argument (#2143) * Add default batch scheduler argument Signed-off-by: Jacob Salway * Add helm unit test Signed-off-by: Jacob Salway --------- Signed-off-by: Jacob Salway --- charts/spark-operator-chart/README.md | 1 + .../templates/controller/deployment.yaml | 3 ++- .../tests/controller/deployment_test.yaml | 11 +++++++++ charts/spark-operator-chart/values.yaml | 4 ++++ cmd/operator/controller/start.go | 17 +++++++++++--- .../controller/sparkapplication/controller.go | 23 ++++++++++++++----- 6 files changed, 49 insertions(+), 10 deletions(-) diff --git a/charts/spark-operator-chart/README.md b/charts/spark-operator-chart/README.md index d8489012..42cd579b 100644 --- a/charts/spark-operator-chart/README.md +++ b/charts/spark-operator-chart/README.md @@ -90,6 +90,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum | controller.uiIngress.enable | bool | `false` | Specifies whether to create ingress for Spark web UI. `controller.uiService.enable` must be `true` to enable ingress. | | controller.uiIngress.urlFormat | string | `""` | Ingress URL format. Required if `controller.uiIngress.enable` is true. | | controller.batchScheduler.enable | bool | `false` | Specifies whether to enable batch scheduler for spark jobs scheduling. If enabled, users can specify batch scheduler name in spark application. | +| controller.batchScheduler.default | string | `""` | Default batch scheduler to be used if not specified by the user. If specified, this value must be either "volcano" or "yunikorn". Specifying any other value will cause the controller to error on startup. | | controller.serviceAccount.create | bool | `true` | Specifies whether to create a service account for the controller. | | controller.serviceAccount.name | string | `""` | Optional name for the controller service account. | | controller.serviceAccount.annotations | object | `{}` | Extra annotations for the controller service account. | diff --git a/charts/spark-operator-chart/templates/controller/deployment.yaml b/charts/spark-operator-chart/templates/controller/deployment.yaml index 02f9c2c9..cbb3d874 100644 --- a/charts/spark-operator-chart/templates/controller/deployment.yaml +++ b/charts/spark-operator-chart/templates/controller/deployment.yaml @@ -70,8 +70,9 @@ spec: - --ingress-url-format={{ . }} {{- end }} {{- end }} - {{- with .Values.controller.batchScheduler.enable }} + {{- if .Values.controller.batchScheduler.enable }} - --enable-batch-scheduler=true + - --default-batch-scheduler={{ .Values.controller.batchScheduler.default }} {{- end }} {{- if .Values.prometheus.metrics.enable }} - --enable-metrics=true diff --git a/charts/spark-operator-chart/tests/controller/deployment_test.yaml b/charts/spark-operator-chart/tests/controller/deployment_test.yaml index e4b6983a..4fa7b156 100644 --- a/charts/spark-operator-chart/tests/controller/deployment_test.yaml +++ b/charts/spark-operator-chart/tests/controller/deployment_test.yaml @@ -160,6 +160,17 @@ tests: path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args content: --enable-batch-scheduler=true + - it: Should contain `--default-batch-scheduler` arg if `controller.batchScheduler.default` is set + set: + controller: + batchScheduler: + enable: true + default: yunikorn + asserts: + - contains: + path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args + content: --default-batch-scheduler=yunikorn + - it: Should contain `--enable-metrics` arg if `prometheus.metrics.enable` is set to `true` set: prometheus: diff --git a/charts/spark-operator-chart/values.yaml b/charts/spark-operator-chart/values.yaml index 6555ec4f..aaa0930b 100644 --- a/charts/spark-operator-chart/values.yaml +++ b/charts/spark-operator-chart/values.yaml @@ -67,6 +67,10 @@ controller: # -- Specifies whether to enable batch scheduler for spark jobs scheduling. # If enabled, users can specify batch scheduler name in spark application. enable: false + # -- Default batch scheduler to be used if not specified by the user. + # If specified, this value must be either "volcano" or "yunikorn". Specifying any other + # value will cause the controller to error on startup. + default: "" serviceAccount: # -- Specifies whether to create a service account for the controller. diff --git a/cmd/operator/controller/start.go b/cmd/operator/controller/start.go index 6f851ed7..3c0d83fb 100644 --- a/cmd/operator/controller/start.go +++ b/cmd/operator/controller/start.go @@ -20,6 +20,7 @@ import ( "crypto/tls" "flag" "os" + "slices" "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) @@ -72,7 +73,8 @@ var ( cacheSyncTimeout time.Duration // Batch scheduler - enableBatchScheduler bool + enableBatchScheduler bool + defaultBatchScheduler string // Spark web UI service and ingress enableUIService bool @@ -128,6 +130,8 @@ func NewStartCommand() *cobra.Command { command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.") command.Flags().BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, "Enable batch schedulers.") + command.Flags().StringVar(&defaultBatchScheduler, "default-batch-scheduler", "", "Default batch scheduler.") + command.Flags().BoolVar(&enableUIService, "enable-ui-service", true, "Enable Spark Web UI service.") command.Flags().StringVar(&ingressClassName, "ingress-class-name", "", "Set ingressClassName for ingress resources created.") command.Flags().StringVar(&ingressURLFormat, "ingress-url-format", "", "Ingress URL format.") @@ -207,8 +211,14 @@ func start() { var registry *scheduler.Registry if enableBatchScheduler { registry = scheduler.GetRegistry() - registry.Register(common.VolcanoSchedulerName, volcano.Factory) - registry.Register(yunikorn.SchedulerName, yunikorn.Factory) + _ = registry.Register(common.VolcanoSchedulerName, volcano.Factory) + _ = registry.Register(yunikorn.SchedulerName, yunikorn.Factory) + + schedulerNames := registry.GetRegisteredSchedulerNames() + if defaultBatchScheduler != "" && !slices.Contains(schedulerNames, defaultBatchScheduler) { + logger.Error(nil, "Failed to find default batch scheduler in registered schedulers") + os.Exit(1) + } } // Setup controller for SparkApplication. @@ -348,6 +358,7 @@ func newSparkApplicationReconcilerOptions() sparkapplication.Options { EnableUIService: enableUIService, IngressClassName: ingressClassName, IngressURLFormat: ingressURLFormat, + DefaultBatchScheduler: defaultBatchScheduler, SparkApplicationMetrics: sparkApplicationMetrics, SparkExecutorMetrics: sparkExecutorMetrics, } diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index f938e4c9..9f2917cb 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -54,10 +54,11 @@ var ( // Options defines the options of the controller. type Options struct { - Namespaces []string - EnableUIService bool - IngressClassName string - IngressURLFormat string + Namespaces []string + EnableUIService bool + IngressClassName string + IngressURLFormat string + DefaultBatchScheduler string SparkApplicationMetrics *metrics.SparkApplicationMetrics SparkExecutorMetrics *metrics.SparkExecutorMetrics @@ -1184,14 +1185,24 @@ func (r *Reconciler) resetSparkApplicationStatus(app *v1beta2.SparkApplication) } func (r *Reconciler) shouldDoBatchScheduling(app *v1beta2.SparkApplication) (bool, scheduler.Interface) { - if r.registry == nil || app.Spec.BatchScheduler == nil || *app.Spec.BatchScheduler == "" { + // If batch scheduling isn't enabled + if r.registry == nil { + return false, nil + } + + schedulerName := r.options.DefaultBatchScheduler + if app.Spec.BatchScheduler != nil && *app.Spec.BatchScheduler != "" { + schedulerName = *app.Spec.BatchScheduler + } + + // If both the default and app batch scheduler are unspecified or empty + if schedulerName == "" { return false, nil } var err error var scheduler scheduler.Interface - schedulerName := *app.Spec.BatchScheduler switch schedulerName { case common.VolcanoSchedulerName: config := &volcano.Config{