/* Copyright 2024 The Kubeflow authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controller import ( "crypto/tls" "flag" "os" "slices" "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" "github.com/spf13/cobra" "github.com/spf13/viper" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/healthz" logzap "sigs.k8s.io/controller-runtime/pkg/log/zap" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" ctrlwebhook "sigs.k8s.io/controller-runtime/pkg/webhook" schedulingv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" sparkoperator "github.com/kubeflow/spark-operator" "github.com/kubeflow/spark-operator/api/v1beta2" "github.com/kubeflow/spark-operator/internal/controller/scheduledsparkapplication" "github.com/kubeflow/spark-operator/internal/controller/sparkapplication" "github.com/kubeflow/spark-operator/internal/metrics" "github.com/kubeflow/spark-operator/internal/scheduler" "github.com/kubeflow/spark-operator/internal/scheduler/kubescheduler" "github.com/kubeflow/spark-operator/internal/scheduler/volcano" "github.com/kubeflow/spark-operator/internal/scheduler/yunikorn" "github.com/kubeflow/spark-operator/pkg/common" "github.com/kubeflow/spark-operator/pkg/util" // +kubebuilder:scaffold:imports ) var ( scheme = runtime.NewScheme() logger = ctrl.Log.WithName("") ) var ( namespaces []string // Controller controllerThreads int cacheSyncTimeout time.Duration maxTrackedExecutorPerApp int //WorkQueue workqueueRateLimiterBucketQPS int workqueueRateLimiterBucketSize int workqueueRateLimiterMaxDelay time.Duration // Batch scheduler enableBatchScheduler bool kubeSchedulerNames []string defaultBatchScheduler string // Spark web UI service and ingress enableUIService bool ingressClassName string ingressURLFormat string // Leader election enableLeaderElection bool leaderElectionLockName string leaderElectionLockNamespace string leaderElectionLeaseDuration time.Duration leaderElectionRenewDeadline time.Duration leaderElectionRetryPeriod time.Duration driverPodCreationGracePeriod time.Duration // Metrics enableMetrics bool metricsBindAddress string metricsEndpoint string metricsPrefix string metricsLabels []string metricsJobStartLatencyBuckets []float64 healthProbeBindAddress string pprofBindAddress string secureMetrics bool enableHTTP2 bool development bool zapOptions = logzap.Options{} ) func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(schedulingv1alpha1.AddToScheme(scheme)) utilruntime.Must(v1beta2.AddToScheme(scheme)) // +kubebuilder:scaffold:scheme } func NewStartCommand() *cobra.Command { var command = &cobra.Command{ Use: "start", Short: "Start controller and webhook", PreRun: func(_ *cobra.Command, args []string) { development = viper.GetBool("development") }, Run: func(_ *cobra.Command, args []string) { sparkoperator.PrintVersion(false) start() }, } command.Flags().IntVar(&controllerThreads, "controller-threads", 10, "Number of worker threads used by the SparkApplication controller.") command.Flags().StringSliceVar(&namespaces, "namespaces", []string{}, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset or contains empty string.") command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.") command.Flags().IntVar(&maxTrackedExecutorPerApp, "max-tracked-executor-per-app", 1000, "The maximum number of tracked executors per SparkApplication.") command.Flags().IntVar(&workqueueRateLimiterBucketQPS, "workqueue-ratelimiter-bucket-qps", 10, "QPS of the bucket rate of the workqueue.") command.Flags().IntVar(&workqueueRateLimiterBucketSize, "workqueue-ratelimiter-bucket-size", 100, "The token bucket size of the workqueue.") command.Flags().DurationVar(&workqueueRateLimiterMaxDelay, "workqueue-ratelimiter-max-delay", rate.InfDuration, "The maximum delay of the workqueue.") command.Flags().BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, "Enable batch schedulers.") command.Flags().StringSliceVar(&kubeSchedulerNames, "kube-scheduler-names", []string{}, "The kube-scheduler names for scheduling Spark applications.") command.Flags().StringVar(&defaultBatchScheduler, "default-batch-scheduler", "", "Default batch scheduler.") command.Flags().BoolVar(&enableUIService, "enable-ui-service", true, "Enable Spark Web UI service.") command.Flags().StringVar(&ingressClassName, "ingress-class-name", "", "Set ingressClassName for ingress resources created.") command.Flags().StringVar(&ingressURLFormat, "ingress-url-format", "", "Ingress URL format.") command.Flags().BoolVar(&enableLeaderElection, "leader-election", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") command.Flags().StringVar(&leaderElectionLockName, "leader-election-lock-name", "spark-operator-lock", "Name of the ConfigMap for leader election.") command.Flags().StringVar(&leaderElectionLockNamespace, "leader-election-lock-namespace", "spark-operator", "Namespace in which to create the ConfigMap for leader election.") command.Flags().DurationVar(&leaderElectionLeaseDuration, "leader-election-lease-duration", 15*time.Second, "Leader election lease duration.") command.Flags().DurationVar(&leaderElectionRenewDeadline, "leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.") command.Flags().DurationVar(&leaderElectionRetryPeriod, "leader-election-retry-period", 4*time.Second, "Leader election retry period.") command.Flags().DurationVar(&driverPodCreationGracePeriod, "driver-pod-creation-grace-period", 10*time.Second, "Grace period after a successful spark-submit when driver pod not found errors will be retried. Useful if the driver pod can take some time to be created.") command.Flags().BoolVar(&enableMetrics, "enable-metrics", false, "Enable metrics.") command.Flags().StringVar(&metricsBindAddress, "metrics-bind-address", "0", "The address the metric endpoint binds to. "+ "Use the port :8080. If not set, it will be 0 in order to disable the metrics server") command.Flags().StringVar(&metricsEndpoint, "metrics-endpoint", "/metrics", "Metrics endpoint.") command.Flags().StringVar(&metricsPrefix, "metrics-prefix", "", "Prefix for the metrics.") command.Flags().StringSliceVar(&metricsLabels, "metrics-labels", []string{}, "Labels to be added to the metrics.") command.Flags().Float64SliceVar(&metricsJobStartLatencyBuckets, "metrics-job-start-latency-buckets", []float64{30, 60, 90, 120, 150, 180, 210, 240, 270, 300}, "Buckets for the job start latency histogram.") command.Flags().StringVar(&healthProbeBindAddress, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") command.Flags().BoolVar(&secureMetrics, "secure-metrics", false, "If set the metrics endpoint is served securely") command.Flags().BoolVar(&enableHTTP2, "enable-http2", false, "If set, HTTP/2 will be enabled for the metrics and webhook servers") command.Flags().StringVar(&pprofBindAddress, "pprof-bind-address", "0", "The address the pprof endpoint binds to. "+ "If not set, it will be 0 in order to disable the pprof server") flagSet := flag.NewFlagSet("controller", flag.ExitOnError) ctrl.RegisterFlags(flagSet) zapOptions.BindFlags(flagSet) command.Flags().AddGoFlagSet(flagSet) return command } func start() { setupLog() // Create the client rest config. Use kubeConfig if given, otherwise assume in-cluster. cfg, err := ctrl.GetConfig() if err != nil { logger.Error(err, "failed to get kube config") os.Exit(1) } // Create the manager. tlsOptions := newTLSOptions() mgr, err := ctrl.NewManager(cfg, ctrl.Options{ Scheme: scheme, Cache: newCacheOptions(), Metrics: metricsserver.Options{ BindAddress: metricsBindAddress, SecureServing: secureMetrics, TLSOpts: tlsOptions, }, WebhookServer: ctrlwebhook.NewServer(ctrlwebhook.Options{ TLSOpts: tlsOptions, }), HealthProbeBindAddress: healthProbeBindAddress, PprofBindAddress: pprofBindAddress, LeaderElection: enableLeaderElection, LeaderElectionID: leaderElectionLockName, LeaderElectionNamespace: leaderElectionLockNamespace, // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily // when the Manager ends. This requires the binary to immediately end when the // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly // speeds up voluntary leader transitions as the new leader don't have to wait // LeaseDuration time first. // // In the default scaffold provided, the program ends immediately after // the manager stops, so would be fine to enable this option. However, // if you are doing or is intended to do any operation such as perform cleanups // after the manager stops then its usage might be unsafe. // LeaderElectionReleaseOnCancel: true, }) if err != nil { logger.Error(err, "failed to create manager") os.Exit(1) } clientset, err := kubernetes.NewForConfig(cfg) if err != nil { logger.Error(err, "failed to create clientset") os.Exit(1) } if err = util.InitializeIngressCapabilities(clientset); err != nil { logger.Error(err, "failed to retrieve cluster ingress capabilities") os.Exit(1) } var registry *scheduler.Registry if enableBatchScheduler { registry = scheduler.GetRegistry() _ = registry.Register(common.VolcanoSchedulerName, volcano.Factory) _ = registry.Register(yunikorn.SchedulerName, yunikorn.Factory) // Register kube-schedulers. for _, name := range kubeSchedulerNames { _ = registry.Register(name, kubescheduler.Factory) } schedulerNames := registry.GetRegisteredSchedulerNames() if defaultBatchScheduler != "" && !slices.Contains(schedulerNames, defaultBatchScheduler) { logger.Error(nil, "Failed to find default batch scheduler in registered schedulers") os.Exit(1) } } // Setup controller for SparkApplication. if err = sparkapplication.NewReconciler( mgr, mgr.GetScheme(), mgr.GetClient(), mgr.GetEventRecorderFor("spark-application-controller"), registry, newSparkApplicationReconcilerOptions(), ).SetupWithManager(mgr, newControllerOptions()); err != nil { logger.Error(err, "Failed to create controller", "controller", "SparkApplication") os.Exit(1) } // Setup controller for ScheduledSparkApplication. if err = scheduledsparkapplication.NewReconciler( mgr.GetScheme(), mgr.GetClient(), mgr.GetEventRecorderFor("scheduled-spark-application-controller"), clock.RealClock{}, newScheduledSparkApplicationReconcilerOptions(), ).SetupWithManager(mgr, newControllerOptions()); err != nil { logger.Error(err, "Failed to create controller", "controller", "ScheduledSparkApplication") os.Exit(1) } // +kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { logger.Error(err, "Failed to set up health check") os.Exit(1) } if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { logger.Error(err, "Failed to set up ready check") os.Exit(1) } logger.Info("Starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { logger.Error(err, "Failed to start manager") os.Exit(1) } } // setupLog Configures the logging system func setupLog() { ctrl.SetLogger(logzap.New( logzap.UseFlagOptions(&zapOptions), func(o *logzap.Options) { o.Development = development }, func(o *logzap.Options) { o.ZapOpts = append(o.ZapOpts, zap.AddCaller()) }, func(o *logzap.Options) { var config zapcore.EncoderConfig if !development { config = zap.NewProductionEncoderConfig() } else { config = zap.NewDevelopmentEncoderConfig() } config.EncodeLevel = zapcore.CapitalColorLevelEncoder config.EncodeTime = zapcore.ISO8601TimeEncoder config.EncodeCaller = zapcore.ShortCallerEncoder o.Encoder = zapcore.NewConsoleEncoder(config) }), ) } func newTLSOptions() []func(c *tls.Config) { // if the enable-http2 flag is false (the default), http/2 should be disabled // due to its vulnerabilities. More specifically, disabling http/2 will // prevent from being vulnerable to the HTTP/2 Stream Cancellation and // Rapid Reset CVEs. For more information see: // - https://github.com/advisories/GHSA-qppj-fm5r-hxr3 // - https://github.com/advisories/GHSA-4374-p667-p6c8 disableHTTP2 := func(c *tls.Config) { logger.Info("disabling http/2") c.NextProtos = []string{"http/1.1"} } tlsOpts := []func(*tls.Config){} if !enableHTTP2 { tlsOpts = append(tlsOpts, disableHTTP2) } return tlsOpts } // newCacheOptions creates and returns a cache.Options instance configured with default namespaces and object caching settings. func newCacheOptions() cache.Options { defaultNamespaces := make(map[string]cache.Config) if !util.ContainsString(namespaces, cache.AllNamespaces) { for _, ns := range namespaces { defaultNamespaces[ns] = cache.Config{} } } options := cache.Options{ Scheme: scheme, DefaultNamespaces: defaultNamespaces, ByObject: map[client.Object]cache.ByObject{ &corev1.Pod{}: { Label: labels.SelectorFromSet(labels.Set{ common.LabelLaunchedBySparkOperator: "true", }), }, &corev1.ConfigMap{}: {}, &corev1.PersistentVolumeClaim{}: {}, &corev1.Service{}: {}, &v1beta2.SparkApplication{}: {}, }, } return options } // newControllerOptions creates and returns a controller.Options instance configured with the given options. func newControllerOptions() controller.Options { options := controller.Options{ MaxConcurrentReconciles: controllerThreads, CacheSyncTimeout: cacheSyncTimeout, RateLimiter: util.NewRateLimiter[ctrl.Request](workqueueRateLimiterBucketQPS, workqueueRateLimiterBucketSize, workqueueRateLimiterMaxDelay), } return options } func newSparkApplicationReconcilerOptions() sparkapplication.Options { var sparkApplicationMetrics *metrics.SparkApplicationMetrics var sparkExecutorMetrics *metrics.SparkExecutorMetrics if enableMetrics { sparkApplicationMetrics = metrics.NewSparkApplicationMetrics(metricsPrefix, metricsLabels, metricsJobStartLatencyBuckets) sparkApplicationMetrics.Register() sparkExecutorMetrics = metrics.NewSparkExecutorMetrics(metricsPrefix, metricsLabels) sparkExecutorMetrics.Register() } options := sparkapplication.Options{ Namespaces: namespaces, EnableUIService: enableUIService, IngressClassName: ingressClassName, IngressURLFormat: ingressURLFormat, DefaultBatchScheduler: defaultBatchScheduler, DriverPodCreationGracePeriod: driverPodCreationGracePeriod, SparkApplicationMetrics: sparkApplicationMetrics, SparkExecutorMetrics: sparkExecutorMetrics, MaxTrackedExecutorPerApp: maxTrackedExecutorPerApp, } if enableBatchScheduler { options.KubeSchedulerNames = kubeSchedulerNames } return options } func newScheduledSparkApplicationReconcilerOptions() scheduledsparkapplication.Options { options := scheduledsparkapplication.Options{ Namespaces: namespaces, } return options }