diff --git a/.golangci.yml b/.golangci.yml index 5875fccc6..0060c7432 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -86,3 +86,6 @@ issues: - path: cmd/scheduler-estimator/main.go linters: - gci + - path: operator/cmd/operator/operator.go + linters: + - gci diff --git a/operator/cmd/operator/app/operator.go b/operator/cmd/operator/app/operator.go new file mode 100644 index 000000000..cc53fd329 --- /dev/null +++ b/operator/cmd/operator/app/operator.go @@ -0,0 +1,164 @@ +package app + +import ( + "context" + "flag" + "fmt" + "net" + "os" + "strconv" + + "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/util/sets" + restclient "k8s.io/client-go/rest" + cliflag "k8s.io/component-base/cli/flag" + "k8s.io/component-base/term" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + ctrlruntimecfg "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" + "sigs.k8s.io/controller-runtime/pkg/healthz" + + "github.com/karmada-io/karmada/operator/cmd/operator/app/options" + operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1" + ctrlctx "github.com/karmada-io/karmada/operator/pkg/controller/context" + "github.com/karmada-io/karmada/operator/pkg/controller/karmada" + "github.com/karmada-io/karmada/operator/pkg/scheme" + "github.com/karmada-io/karmada/pkg/sharedcli" + "github.com/karmada-io/karmada/pkg/sharedcli/klogflag" +) + +// NewOperatorCommand creates a *cobra.Command object with default parameters +func NewOperatorCommand(ctx context.Context) *cobra.Command { + o := options.NewOptions() + cmd := &cobra.Command{ + Use: "karmada-operator", + PersistentPreRunE: func(*cobra.Command, []string) error { + // silence client-go warnings. + // karmada-operator generically watches APIs (including deprecated ones), + // and CI ensures it works properly against matching kube-apiserver versions. + restclient.SetDefaultWarningHandler(restclient.NoWarnings{}) + return nil + }, + RunE: func(cmd *cobra.Command, args []string) error { + if err := o.Validate(); err != nil { + return err + } + return Run(ctx, o) + }, + Args: func(cmd *cobra.Command, args []string) error { + for _, arg := range args { + if len(arg) > 0 { + return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args) + } + } + return nil + }, + } + + fss := cliflag.NamedFlagSets{} + + genericFlagSet := fss.FlagSet("generic") + // Add the flag(--kubeconfig) that is added by controller-runtime + // (https://github.com/kubernetes-sigs/controller-runtime/blob/v0.11.1/pkg/client/config/config.go#L39). + genericFlagSet.AddGoFlagSet(flag.CommandLine) + o.AddFlags(genericFlagSet, controllers.ControllerNames(), controllersDisabledByDefault.List()) + + // Set klog flags + logsFlagSet := fss.FlagSet("logs") + klogflag.Add(logsFlagSet) + + cmd.Flags().AddFlagSet(genericFlagSet) + cmd.Flags().AddFlagSet(logsFlagSet) + + cols, _, _ := term.TerminalSize(cmd.OutOrStdout()) + sharedcli.SetUsageAndHelpFunc(cmd, fss, cols) + return cmd +} + +// Run runs the karmada-operator. This should never exit. +func Run(ctx context.Context, o *options.Options) error { + klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK")) + + manager, err := createControllerManager(ctx, o) + if err != nil { + klog.Errorf("failed to build controller manager: %v", err) + return err + } + + if err := manager.AddHealthzCheck("ping", healthz.Ping); err != nil { + klog.Errorf("failed to add health check endpoint: %v", err) + return err + } + + controllerCtx := ctrlctx.Context{ + Controllers: o.Controllers, + Manager: manager, + } + if err := controllers.StartControllers(controllerCtx, controllersDisabledByDefault); err != nil { + klog.Errorf("failed to start controllers: %v", err) + return err + } + + // blocks until the context is done. + if err := manager.Start(ctx); err != nil { + klog.Errorf("controller manager exits unexpectedly: %v", err) + return err + } + + // never reach here + return nil +} + +var controllers = make(ctrlctx.Initializers) + +// controllersDisabledByDefault is the set of controllers which is disabled by default +var controllersDisabledByDefault = sets.NewString() + +func init() { + controllers["karmada"] = startKarmadaController +} + +func startKarmadaController(ctx ctrlctx.Context) (bool, error) { + ctrl := &karmada.Controller{ + Client: ctx.Manager.GetClient(), + EventRecorder: ctx.Manager.GetEventRecorderFor(karmada.ControllerName), + } + if err := ctrl.SetupWithManager(ctx.Manager); err != nil { + klog.ErrorS(err, "unable to setup with manager", "controller", karmada.ControllerName) + return false, err + } + return true, nil +} + +// createControllerManager creates controllerruntime.Manager from the given configuration +func createControllerManager(ctx context.Context, o *options.Options) (controllerruntime.Manager, error) { + config, err := controllerruntime.GetConfig() + if err != nil { + return nil, err + } + + opts := controllerruntime.Options{ + Logger: klog.Background(), + Scheme: scheme.Scheme, + BaseContext: func() context.Context { + return ctx + }, + SyncPeriod: &o.ResyncPeriod.Duration, + LeaderElection: o.LeaderElection.LeaderElect, + LeaderElectionID: o.LeaderElection.ResourceName, + LeaderElectionNamespace: o.LeaderElection.ResourceNamespace, + LeaseDuration: &o.LeaderElection.LeaseDuration.Duration, + RenewDeadline: &o.LeaderElection.RenewDeadline.Duration, + RetryPeriod: &o.LeaderElection.RetryPeriod.Duration, + LeaderElectionResourceLock: o.LeaderElection.ResourceLock, + HealthProbeBindAddress: net.JoinHostPort(o.BindAddress, strconv.Itoa(o.SecurePort)), + LivenessEndpointName: "/healthz", + MetricsBindAddress: o.MetricsBindAddress, + Controller: ctrlruntimecfg.ControllerConfigurationSpec{ + GroupKindConcurrency: map[string]int{ + operatorv1alpha1.SchemeGroupVersion.WithKind("Karmada").GroupKind().String(): o.ConcurrentKarmadaSyncs, + }, + }, + } + return controllerruntime.NewManager(config, opts) +} diff --git a/operator/cmd/operator/app/options/options.go b/operator/cmd/operator/app/options/options.go new file mode 100644 index 000000000..684e881c3 --- /dev/null +++ b/operator/cmd/operator/app/options/options.go @@ -0,0 +1,89 @@ +package options + +import ( + "fmt" + "strings" + "time" + + "github.com/spf13/pflag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/tools/leaderelection/resourcelock" + componentbaseconfig "k8s.io/component-base/config" + "k8s.io/component-base/config/options" +) + +// Options is the main context object for the karmada-operator. +type Options struct { + // Controllers is the list of controllers to enable or disable + // '*' means "all enabled by default controllers" + // 'foo' means "enable 'foo'" + // '-foo' means "disable 'foo'" + // first item for a particular name wins + Controllers []string + // LeaderElection defines the configuration of leader election client. + LeaderElection componentbaseconfig.LeaderElectionConfiguration + // BindAddress is the IP address on which to listen for the --secure-port port. + BindAddress string + // SecurePort is the port that the the server serves at. + // Note: We hope support https in the future once controller-runtime provides the functionality. + SecurePort int + // KubeAPIQPS is the QPS to use while talking with karmada-apiserver. + KubeAPIQPS float32 + // KubeAPIBurst is the burst to allow while talking with karmada-apiserver. + KubeAPIBurst int32 + // ResyncPeriod is the base frequency the informers are resynced. + // Defaults to 0, which means the created informer will never do resyncs. + ResyncPeriod metav1.Duration + // MetricsBindAddress is the TCP address that the controller should bind to + // for serving prometheus metrics. + // It can be set to "0" to disable the metrics serving. + // Defaults to ":8080". + MetricsBindAddress string + // ConcurrentKarmadaSyncs is the number of karmada objects that are allowed to sync concurrently. + ConcurrentKarmadaSyncs int +} + +// NewOptions creates a new Options with a default config. +func NewOptions() *Options { + o := Options{ + Controllers: []string{"*"}, + LeaderElection: componentbaseconfig.LeaderElectionConfiguration{ + LeaderElect: true, + LeaseDuration: metav1.Duration{Duration: 15 * time.Second}, + RenewDeadline: metav1.Duration{Duration: 10 * time.Second}, + RetryPeriod: metav1.Duration{Duration: 2 * time.Second}, + ResourceLock: resourcelock.LeasesResourceLock, + ResourceNamespace: "karmada-system", + ResourceName: "karmada-operator", + }, + BindAddress: "0.0.0.0", + SecurePort: 8443, + KubeAPIQPS: 50, + KubeAPIBurst: 100, + ConcurrentKarmadaSyncs: 5, + } + return &o +} + +// AddFlags adds flags to the specified FlagSet. +func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string, disabledByDefaultControllers []string) { + fs.DurationVar(&o.ResyncPeriod.Duration, "resync-period", o.ResyncPeriod.Duration, "ResyncPeriod determines the minimum frequency at which watched resources are reconciled.") + fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", o.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver.") + fs.Int32Var(&o.KubeAPIBurst, "kube-api-burst", o.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver.") + fs.StringSliceVar(&o.Controllers, "controllers", o.Controllers, fmt.Sprintf(""+ + "A list of controllers to enable. '*' enables all on-by-default controllers, 'foo' enables the controller "+ + "named 'foo', '-foo' disables the controller named 'foo'.\nAll controllers: %s\nDisabled-by-default controllers: %s", + strings.Join(allControllers, ", "), strings.Join(disabledByDefaultControllers, ", "))) + fs.IntVar(&o.ConcurrentKarmadaSyncs, "concurrent-karmada-syncs", o.ConcurrentKarmadaSyncs, "The number of karmada objects that are allowed to sync concurrently..") + options.BindLeaderElectionFlags(&o.LeaderElection, fs) +} + +// Validate is used to validate the options and config before launching the controller manager +func (o *Options) Validate() error { + var errs []error + + // do validation logic here + + return utilerrors.NewAggregate(errs) +} diff --git a/operator/cmd/operator/operator.go b/operator/cmd/operator/operator.go new file mode 100644 index 000000000..c71f767da --- /dev/null +++ b/operator/cmd/operator/operator.go @@ -0,0 +1,29 @@ +package main + +import ( + "os" + + // Note that Kubernetes registers workqueue metrics to default prometheus Registry. And the registry will be + // initialized by the package 'k8s.io/apiserver/pkg/server'. + // See https://github.com/kubernetes/kubernetes/blob/release-1.26/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go#L25-L26 + // But the controller-runtime registers workqueue metrics to its own Registry instead of default prometheus Registry. + // See https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.14/pkg/metrics/workqueue.go#L24-L26 + // However, global workqueue metrics factory will be only initialized once. + // See https://github.com/kubernetes/kubernetes/blob/release-1.26/staging/src/k8s.io/client-go/util/workqueue/metrics.go#L257-L261 + // So this package should be initialized before 'k8s.io/apiserver/pkg/server', thus the internal registry of + // controller-runtime could be set first. + _ "sigs.k8s.io/controller-runtime/pkg/metrics" + + apiserver "k8s.io/apiserver/pkg/server" + "k8s.io/component-base/cli" + _ "k8s.io/component-base/logs/json/register" // for JSON log format registration + + "github.com/karmada-io/karmada/operator/cmd/operator/app" +) + +func main() { + ctx := apiserver.SetupSignalContext() + command := app.NewOperatorCommand(ctx) + code := cli.Run(command) + os.Exit(code) +} diff --git a/operator/pkg/controller/context/context.go b/operator/pkg/controller/context/context.go new file mode 100644 index 000000000..24666f8af --- /dev/null +++ b/operator/pkg/controller/context/context.go @@ -0,0 +1,77 @@ +package context + +import ( + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" +) + +// Context defines the context object for controller +type Context struct { + // Controllers is the list of controllers to enable or disable + // '*' means "all enabled by default controllers" + // 'foo' means "enable 'foo'" + // '-foo' means "disable 'foo'" + // first item for a particular name wins + Controllers []string + + Manager controllerruntime.Manager +} + +// IsControllerEnabled checks if the context's controllers enabled or not +func (c Context) IsControllerEnabled(name string, disabledByDefaultControllers sets.String) bool { + hasStar := false + for _, ctrl := range c.Controllers { + if ctrl == name { + return true + } + if ctrl == "-"+name { + return false + } + if ctrl == "*" { + hasStar = true + } + } + // if we get here, there was no explicit choice + if !hasStar { + // nothing on by default + return false + } + + return !disabledByDefaultControllers.Has(name) +} + +// InitFunc is used to launch a particular controller. +// Any error returned will cause the controller process to `Fatal` +// The bool indicates whether the controller was enabled. +type InitFunc func(ctx Context) (enabled bool, err error) + +// Initializers is a public map of named controller groups +type Initializers map[string]InitFunc + +// ControllerNames returns all known controller names +func (i Initializers) ControllerNames() []string { + return sets.StringKeySet(i).List() +} + +// StartControllers starts a set of controllers with a specified ControllerContext +func (i Initializers) StartControllers(ctx Context, controllersDisabledByDefault sets.String) error { + for controllerName, initFn := range i { + if !ctx.IsControllerEnabled(controllerName, controllersDisabledByDefault) { + klog.Warningf("%q is disabled", controllerName) + continue + } + klog.V(1).Infof("Starting %q", controllerName) + started, err := initFn(ctx) + if err != nil { + klog.Errorf("Error starting %q", controllerName) + return err + } + if !started { + klog.Warningf("Skipping %q", controllerName) + continue + } + klog.Infof("Started %q", controllerName) + } + return nil +} diff --git a/operator/pkg/controller/karmada/karmada_controller.go b/operator/pkg/controller/karmada/karmada_controller.go new file mode 100644 index 000000000..53c656883 --- /dev/null +++ b/operator/pkg/controller/karmada/karmada_controller.go @@ -0,0 +1,98 @@ +package karmada + +import ( + "context" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1" +) + +const ( + // ControllerName is the controller name that will be used when reporting events. + ControllerName = "karmada-operator-controller" + + // ControllerFinalizerName is the name of the karmada controller finalizer + ControllerFinalizerName = "operator.karmada.io/finalizer" +) + +// Controller controls the Karmada resource. +type Controller struct { + client.Client + + EventRecorder record.EventRecorder +} + +// Reconcile performs a full reconciliation for the object referred to by the Request. +// The Controller will requeue the Request to be processed again if an error is non-nil or +// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. +func (ctrl *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { + startTime := time.Now() + klog.V(4).InfoS("Started syncing karmada", "karmada", req, "startTime", startTime) + defer func() { + klog.V(4).InfoS("Finished syncing karmada", "karmada", req, "duration", time.Since(startTime)) + }() + + karmada := &operatorv1alpha1.Karmada{} + if err := ctrl.Get(ctx, req.NamespacedName, karmada); err != nil { + // The resource may no longer exist, in which case we stop processing. + if errors.IsNotFound(err) { + klog.V(2).InfoS("Karmada has been deleted", "karmada", req) + return controllerruntime.Result{}, nil + } + return controllerruntime.Result{}, err + } + + // examine DeletionTimestamp to determine if object is under deletion + if karmada.DeletionTimestamp.IsZero() { + // The object is not being deleted, so if it does not have our finalizer, + // then lets add the finalizer and update the object. This is equivalent + // registering our finalizer. + if !controllerutil.ContainsFinalizer(karmada, ControllerFinalizerName) { + controllerutil.AddFinalizer(karmada, ControllerFinalizerName) + if err := ctrl.Update(ctx, karmada); err != nil { + return controllerruntime.Result{}, err + } + } + } else { + // The object is being deleted + if controllerutil.ContainsFinalizer(karmada, ControllerFinalizerName) { + // our finalizer is present, so lets handle any external dependency + if err := ctrl.deleteUnableGCResources(karmada); err != nil { + // if fail to delete the external dependency here, return with error + // so that it can be retried + return controllerruntime.Result{}, err + } + + // remove our finalizer from the list and update it. + controllerutil.RemoveFinalizer(karmada, ControllerFinalizerName) + if err := ctrl.Update(ctx, karmada); err != nil { + return controllerruntime.Result{}, err + } + // Stop reconciliation as the item is being deleted + return controllerruntime.Result{}, nil + } + } + + klog.V(2).InfoS("Reconciling karmada", "name", req.Name) + + // do reconcile + + return controllerruntime.Result{}, nil +} + +func (ctrl *Controller) deleteUnableGCResources(karmada *operatorv1alpha1.Karmada) error { + klog.InfoS("Deleting unable gc resources", "karmada", klog.KObj(karmada)) + return nil +} + +// SetupWithManager creates a controller and register to controller manager. +func (ctrl *Controller) SetupWithManager(mgr controllerruntime.Manager) error { + return controllerruntime.NewControllerManagedBy(mgr).For(&operatorv1alpha1.Karmada{}).Complete(ctrl) +} diff --git a/operator/pkg/scheme/scheme.go b/operator/pkg/scheme/scheme.go new file mode 100644 index 000000000..87ad1e9f8 --- /dev/null +++ b/operator/pkg/scheme/scheme.go @@ -0,0 +1,17 @@ +package scheme + +import ( + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes/scheme" + + operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1" +) + +// Scheme holds the aggregated Kubernetes's schemes and extended schemes. +var Scheme = runtime.NewScheme() + +func init() { + utilruntime.Must(scheme.AddToScheme(Scheme)) + utilruntime.Must(operatorv1alpha1.AddToScheme(Scheme)) +} diff --git a/vendor/k8s.io/component-base/config/options/leaderelectionconfig.go b/vendor/k8s.io/component-base/config/options/leaderelectionconfig.go new file mode 100644 index 000000000..5c671a1b7 --- /dev/null +++ b/vendor/k8s.io/component-base/config/options/leaderelectionconfig.go @@ -0,0 +1,53 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "github.com/spf13/pflag" + "k8s.io/component-base/config" +) + +// BindLeaderElectionFlags binds the LeaderElectionConfiguration struct fields to a flagset +func BindLeaderElectionFlags(l *config.LeaderElectionConfiguration, fs *pflag.FlagSet) { + fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+ + "Start a leader election client and gain leadership before "+ + "executing the main loop. Enable this when running replicated "+ + "components for high availability.") + fs.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, ""+ + "The duration that non-leader candidates will wait after observing a leadership "+ + "renewal until attempting to acquire leadership of a led but unrenewed leader "+ + "slot. This is effectively the maximum duration that a leader can be stopped "+ + "before it is replaced by another candidate. This is only applicable if leader "+ + "election is enabled.") + fs.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, ""+ + "The interval between attempts by the acting master to renew a leadership slot "+ + "before it stops leading. This must be less than or equal to the lease duration. "+ + "This is only applicable if leader election is enabled.") + fs.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, ""+ + "The duration the clients should wait between attempting acquisition and renewal "+ + "of a leadership. This is only applicable if leader election is enabled.") + fs.StringVar(&l.ResourceLock, "leader-elect-resource-lock", l.ResourceLock, ""+ + "The type of resource object that is used for locking during "+ + "leader election. Supported options are 'leases', 'endpointsleases' "+ + "and 'configmapsleases'.") + fs.StringVar(&l.ResourceName, "leader-elect-resource-name", l.ResourceName, ""+ + "The name of resource object that is used for locking during "+ + "leader election.") + fs.StringVar(&l.ResourceNamespace, "leader-elect-resource-namespace", l.ResourceNamespace, ""+ + "The namespace of resource object that is used for locking during "+ + "leader election.") +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 6c8d8f72e..09a243ced 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1385,6 +1385,7 @@ k8s.io/code-generator/third_party/forked/golang/reflect k8s.io/component-base/cli k8s.io/component-base/cli/flag k8s.io/component-base/config +k8s.io/component-base/config/options k8s.io/component-base/config/v1alpha1 k8s.io/component-base/featuregate k8s.io/component-base/logs