// spark-operator/cmd/operator/controller/start.go
/*
Copyright 2024 The Kubeflow authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"crypto/tls"
"flag"
"os"
"slices"
"time"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
logzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
ctrlwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
schedulingv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
sparkoperator "github.com/kubeflow/spark-operator"
"github.com/kubeflow/spark-operator/api/v1beta2"
"github.com/kubeflow/spark-operator/internal/controller/scheduledsparkapplication"
"github.com/kubeflow/spark-operator/internal/controller/sparkapplication"
"github.com/kubeflow/spark-operator/internal/metrics"
"github.com/kubeflow/spark-operator/internal/scheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/kubescheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/volcano"
"github.com/kubeflow/spark-operator/internal/scheduler/yunikorn"
"github.com/kubeflow/spark-operator/pkg/common"
"github.com/kubeflow/spark-operator/pkg/util"
// +kubebuilder:scaffold:imports
)
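// scheme holds all API types the manager's clients and caches know about;
// logger is the package-level structured logger.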
var (
scheme = runtime.NewScheme()
logger = ctrl.Log.WithName("")
)
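// Command-line flags, bound to the start command in NewStartCommand.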
var (
namespaces []string
// Controller
controllerThreads int
cacheSyncTimeout time.Duration
maxTrackedExecutorPerApp int
// Work queue
workqueueRateLimiterBucketQPS int
workqueueRateLimiterBucketSize int
workqueueRateLimiterMaxDelay time.Duration
// Batch scheduler
enableBatchScheduler bool
kubeSchedulerNames []string
defaultBatchScheduler string
// Spark web UI service and ingress
enableUIService bool
ingressClassName string
ingressURLFormat string
// Leader election
enableLeaderElection bool
leaderElectionLockName string
leaderElectionLockNamespace string
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
driverPodCreationGracePeriod time.Duration
// Metrics
enableMetrics bool
metricsBindAddress string
metricsEndpoint string
metricsPrefix string
metricsLabels []string
metricsJobStartLatencyBuckets []float64
healthProbeBindAddress string
pprofBindAddress string
secureMetrics bool
enableHTTP2 bool
development bool
zapOptions = logzap.Options{}
)
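// init registers the client-go built-in types, the scheduler-plugins
// scheduling API, and the Spark operator v1beta2 API with the shared scheme.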
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(schedulingv1alpha1.AddToScheme(scheme))
utilruntime.Must(v1beta2.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
}
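// NewStartCommand creates the "start" subcommand, which launches the
// controller manager with all controller, scheduler, leader election, and
// metrics flags bound.
//
// A minimal usage sketch (rootCmd is a hypothetical cobra root command, not
// defined in this file):
//
//	rootCmd.AddCommand(NewStartCommand())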
func NewStartCommand() *cobra.Command {
var command = &cobra.Command{
Use: "start",
Short: "Start controller and webhook",
PreRun: func(_ *cobra.Command, args []string) {
development = viper.GetBool("development")
},
Run: func(_ *cobra.Command, args []string) {
sparkoperator.PrintVersion(false)
start()
},
}
command.Flags().IntVar(&controllerThreads, "controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
command.Flags().StringSliceVar(&namespaces, "namespaces", []string{}, "The Kubernetes namespaces to manage. If unset, or if the list contains an empty string, custom resource objects of the managed CRD types are managed across the whole cluster.")
command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.")
command.Flags().IntVar(&maxTrackedExecutorPerApp, "max-tracked-executor-per-app", 1000, "The maximum number of tracked executors per SparkApplication.")
command.Flags().IntVar(&workqueueRateLimiterBucketQPS, "workqueue-ratelimiter-bucket-qps", 10, "Refill rate (QPS) of the workqueue rate limiter's token bucket.")
command.Flags().IntVar(&workqueueRateLimiterBucketSize, "workqueue-ratelimiter-bucket-size", 100, "Size of the workqueue rate limiter's token bucket.")
command.Flags().DurationVar(&workqueueRateLimiterMaxDelay, "workqueue-ratelimiter-max-delay", rate.InfDuration, "Maximum requeue delay imposed by the workqueue rate limiter.")
command.Flags().BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, "Enable batch schedulers.")
command.Flags().StringSliceVar(&kubeSchedulerNames, "kube-scheduler-names", []string{}, "The kube-scheduler names for scheduling Spark applications.")
command.Flags().StringVar(&defaultBatchScheduler, "default-batch-scheduler", "", "Default batch scheduler.")
command.Flags().BoolVar(&enableUIService, "enable-ui-service", true, "Enable Spark Web UI service.")
command.Flags().StringVar(&ingressClassName, "ingress-class-name", "", "Set ingressClassName for ingress resources created.")
command.Flags().StringVar(&ingressURLFormat, "ingress-url-format", "", "Ingress URL format.")
command.Flags().BoolVar(&enableLeaderElection, "leader-election", false, "Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
command.Flags().StringVar(&leaderElectionLockName, "leader-election-lock-name", "spark-operator-lock", "Name of the ConfigMap for leader election.")
command.Flags().StringVar(&leaderElectionLockNamespace, "leader-election-lock-namespace", "spark-operator", "Namespace in which to create the ConfigMap for leader election.")
command.Flags().DurationVar(&leaderElectionLeaseDuration, "leader-election-lease-duration", 15*time.Second, "Leader election lease duration.")
command.Flags().DurationVar(&leaderElectionRenewDeadline, "leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.")
command.Flags().DurationVar(&leaderElectionRetryPeriod, "leader-election-retry-period", 4*time.Second, "Leader election retry period.")
command.Flags().DurationVar(&driverPodCreationGracePeriod, "driver-pod-creation-grace-period", 10*time.Second, "Grace period after a successful spark-submit when driver pod not found errors will be retried. Useful if the driver pod can take some time to be created.")
command.Flags().BoolVar(&enableMetrics, "enable-metrics", false, "Enable metrics.")
command.Flags().StringVar(&metricsBindAddress, "metrics-bind-address", "0", "The address the metrics endpoint binds to, e.g. ':8080'. Defaults to '0', which disables the metrics server.")
command.Flags().StringVar(&metricsEndpoint, "metrics-endpoint", "/metrics", "Metrics endpoint.")
command.Flags().StringVar(&metricsPrefix, "metrics-prefix", "", "Prefix for the metrics.")
command.Flags().StringSliceVar(&metricsLabels, "metrics-labels", []string{}, "Labels to be added to the metrics.")
command.Flags().Float64SliceVar(&metricsJobStartLatencyBuckets, "metrics-job-start-latency-buckets", []float64{30, 60, 90, 120, 150, 180, 210, 240, 270, 300}, "Buckets for the job start latency histogram.")
command.Flags().StringVar(&healthProbeBindAddress, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
command.Flags().BoolVar(&secureMetrics, "secure-metrics", false, "If set, the metrics endpoint is served securely over HTTPS.")
command.Flags().BoolVar(&enableHTTP2, "enable-http2", false, "If set, HTTP/2 will be enabled for the metrics and webhook servers.")
command.Flags().StringVar(&pprofBindAddress, "pprof-bind-address", "0", "The address the pprof endpoint binds to. Defaults to '0', which disables the pprof server.")
flagSet := flag.NewFlagSet("controller", flag.ExitOnError)
ctrl.RegisterFlags(flagSet)
zapOptions.BindFlags(flagSet)
command.Flags().AddGoFlagSet(flagSet)
return command
}
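// start builds the controller-runtime manager, optionally registers batch
// schedulers, wires up the SparkApplication and ScheduledSparkApplication
// reconcilers and health checks, and runs until a termination signal is
// received. Any setup failure is logged and terminates the process.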
func start() {
setupLog()
// Create the client rest config. Use kubeConfig if given, otherwise assume in-cluster.
cfg, err := ctrl.GetConfig()
if err != nil {
logger.Error(err, "failed to get kube config")
os.Exit(1)
}
// Create the manager.
tlsOptions := newTLSOptions()
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme,
Cache: newCacheOptions(),
Metrics: metricsserver.Options{
BindAddress: metricsBindAddress,
SecureServing: secureMetrics,
TLSOpts: tlsOptions,
},
WebhookServer: ctrlwebhook.NewServer(ctrlwebhook.Options{
TLSOpts: tlsOptions,
}),
HealthProbeBindAddress: healthProbeBindAddress,
PprofBindAddress: pprofBindAddress,
LeaderElection: enableLeaderElection,
LeaderElectionID: leaderElectionLockName,
LeaderElectionNamespace: leaderElectionLockNamespace,
// LeaderElectionReleaseOnCancel defines whether the leader should step down
// voluntarily when the Manager ends. This requires the binary to exit
// immediately when the Manager stops; otherwise this setting is unsafe.
// Enabling it significantly speeds up voluntary leader transitions, as the
// new leader does not have to wait the LeaseDuration first.
//
// In the default scaffold the program exits immediately after the manager
// stops, so it would be fine to enable this option. However, if you perform,
// or intend to perform, any operations after the manager stops, such as
// cleanups, enabling it might be unsafe.
// LeaderElectionReleaseOnCancel: true,
})
if err != nil {
logger.Error(err, "failed to create manager")
os.Exit(1)
}
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
logger.Error(err, "failed to create clientset")
os.Exit(1)
}
if err = util.InitializeIngressCapabilities(clientset); err != nil {
logger.Error(err, "failed to retrieve cluster ingress capabilities")
os.Exit(1)
}
var registry *scheduler.Registry
if enableBatchScheduler {
registry = scheduler.GetRegistry()
_ = registry.Register(common.VolcanoSchedulerName, volcano.Factory)
_ = registry.Register(yunikorn.SchedulerName, yunikorn.Factory)
// Register kube-schedulers.
for _, name := range kubeSchedulerNames {
_ = registry.Register(name, kubescheduler.Factory)
}
schedulerNames := registry.GetRegisteredSchedulerNames()
if defaultBatchScheduler != "" && !slices.Contains(schedulerNames, defaultBatchScheduler) {
logger.Error(nil, "Failed to find default batch scheduler in registered schedulers")
os.Exit(1)
}
}
// Setup controller for SparkApplication.
if err = sparkapplication.NewReconciler(
mgr,
mgr.GetScheme(),
mgr.GetClient(),
mgr.GetEventRecorderFor("spark-application-controller"),
registry,
newSparkApplicationReconcilerOptions(),
).SetupWithManager(mgr, newControllerOptions()); err != nil {
logger.Error(err, "Failed to create controller", "controller", "SparkApplication")
os.Exit(1)
}
// Setup controller for ScheduledSparkApplication.
if err = scheduledsparkapplication.NewReconciler(
mgr.GetScheme(),
mgr.GetClient(),
mgr.GetEventRecorderFor("scheduled-spark-application-controller"),
clock.RealClock{},
newScheduledSparkApplicationReconcilerOptions(),
).SetupWithManager(mgr, newControllerOptions()); err != nil {
logger.Error(err, "Failed to create controller", "controller", "ScheduledSparkApplication")
os.Exit(1)
}
// +kubebuilder:scaffold:builder
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
logger.Error(err, "Failed to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
logger.Error(err, "Failed to set up ready check")
os.Exit(1)
}
logger.Info("Starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
logger.Error(err, "Failed to start manager")
os.Exit(1)
}
}
// setupLog configures the logging system.
func setupLog() {
ctrl.SetLogger(logzap.New(
logzap.UseFlagOptions(&zapOptions),
func(o *logzap.Options) {
o.Development = development
}, func(o *logzap.Options) {
o.ZapOpts = append(o.ZapOpts, zap.AddCaller())
}, func(o *logzap.Options) {
var config zapcore.EncoderConfig
if !development {
config = zap.NewProductionEncoderConfig()
} else {
config = zap.NewDevelopmentEncoderConfig()
}
config.EncodeLevel = zapcore.CapitalColorLevelEncoder
config.EncodeTime = zapcore.ISO8601TimeEncoder
config.EncodeCaller = zapcore.ShortCallerEncoder
o.Encoder = zapcore.NewConsoleEncoder(config)
}),
)
}
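// newTLSOptions returns the TLS configuration functions shared by the
// metrics and webhook servers.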
func newTLSOptions() []func(c *tls.Config) {
// If the enable-http2 flag is false (the default), http/2 should be disabled
// due to its vulnerabilities. More specifically, disabling http/2 prevents
// the servers from being vulnerable to the HTTP/2 Stream Cancellation and
// Rapid Reset CVEs. For more information see:
// - https://github.com/advisories/GHSA-qppj-fm5r-hxr3
// - https://github.com/advisories/GHSA-4374-p667-p6c8
disableHTTP2 := func(c *tls.Config) {
logger.Info("disabling http/2")
c.NextProtos = []string{"http/1.1"}
}
tlsOpts := []func(*tls.Config){}
if !enableHTTP2 {
tlsOpts = append(tlsOpts, disableHTTP2)
}
return tlsOpts
}
// newCacheOptions creates and returns a cache.Options instance configured with default namespaces and object caching settings.
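// Unless the namespaces list contains the all-namespaces sentinel
// (cache.AllNamespaces, the empty string), the informer cache is restricted
// to the listed namespaces; otherwise it watches the whole cluster.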
func newCacheOptions() cache.Options {
defaultNamespaces := make(map[string]cache.Config)
if !util.ContainsString(namespaces, cache.AllNamespaces) {
for _, ns := range namespaces {
defaultNamespaces[ns] = cache.Config{}
}
}
options := cache.Options{
Scheme: scheme,
DefaultNamespaces: defaultNamespaces,
ByObject: map[client.Object]cache.ByObject{
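// Cache only pods launched by the Spark operator, selected by label, so the
// informers do not track unrelated pods.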
&corev1.Pod{}: {
Label: labels.SelectorFromSet(labels.Set{
common.LabelLaunchedBySparkOperator: "true",
}),
},
&corev1.ConfigMap{}: {},
&corev1.PersistentVolumeClaim{}: {},
&corev1.Service{}: {},
&v1beta2.SparkApplication{}: {},
},
}
return options
}
// newControllerOptions creates and returns a controller.Options instance configured from the controller and workqueue flags.
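// The rate limiter is built from the workqueue flags: a token bucket with
// the configured refill QPS and size, capped at the configured maximum
// requeue delay; see util.NewRateLimiter.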
func newControllerOptions() controller.Options {
options := controller.Options{
MaxConcurrentReconciles: controllerThreads,
CacheSyncTimeout: cacheSyncTimeout,
RateLimiter: util.NewRateLimiter[ctrl.Request](workqueueRateLimiterBucketQPS, workqueueRateLimiterBucketSize, workqueueRateLimiterMaxDelay),
}
return options
}
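// newSparkApplicationReconcilerOptions assembles the SparkApplication
// reconciler options, registering the application and executor metrics
// collectors when metrics are enabled.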
func newSparkApplicationReconcilerOptions() sparkapplication.Options {
var sparkApplicationMetrics *metrics.SparkApplicationMetrics
var sparkExecutorMetrics *metrics.SparkExecutorMetrics
if enableMetrics {
sparkApplicationMetrics = metrics.NewSparkApplicationMetrics(metricsPrefix, metricsLabels, metricsJobStartLatencyBuckets)
sparkApplicationMetrics.Register()
sparkExecutorMetrics = metrics.NewSparkExecutorMetrics(metricsPrefix, metricsLabels)
sparkExecutorMetrics.Register()
}
options := sparkapplication.Options{
Namespaces: namespaces,
EnableUIService: enableUIService,
IngressClassName: ingressClassName,
IngressURLFormat: ingressURLFormat,
DefaultBatchScheduler: defaultBatchScheduler,
DriverPodCreationGracePeriod: driverPodCreationGracePeriod,
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
MaxTrackedExecutorPerApp: maxTrackedExecutorPerApp,
}
if enableBatchScheduler {
options.KubeSchedulerNames = kubeSchedulerNames
}
return options
}
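// newScheduledSparkApplicationReconcilerOptions assembles the
// ScheduledSparkApplication reconciler options.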
func newScheduledSparkApplicationReconcilerOptions() scheduledsparkapplication.Options {
options := scheduledsparkapplication.Options{
Namespaces: namespaces,
}
return options
}