/*
Copyright 2020 The Karmada Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package app

import (
	"context"
	"flag"
	"time"

	"github.com/spf13/cobra"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/informers"
	kubeclientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	cliflag "k8s.io/component-base/cli/flag"
	"k8s.io/component-base/term"
	"k8s.io/klog/v2"
	resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
	"k8s.io/metrics/pkg/client/custom_metrics"
	"k8s.io/metrics/pkg/client/external_metrics"
	controllerruntime "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/config"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/healthz"
	crtlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	"github.com/karmada-io/karmada/cmd/controller-manager/app/options"
	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
	"github.com/karmada-io/karmada/pkg/clusterdiscovery/clusterapi"
	"github.com/karmada-io/karmada/pkg/controllers/applicationfailover"
	"github.com/karmada-io/karmada/pkg/controllers/binding"
	"github.com/karmada-io/karmada/pkg/controllers/certificate/approver"
	"github.com/karmada-io/karmada/pkg/controllers/cluster"
	controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
	"github.com/karmada-io/karmada/pkg/controllers/cronfederatedhpa"
	"github.com/karmada-io/karmada/pkg/controllers/deploymentreplicassyncer"
	"github.com/karmada-io/karmada/pkg/controllers/execution"
	"github.com/karmada-io/karmada/pkg/controllers/federatedhpa"
	metricsclient "github.com/karmada-io/karmada/pkg/controllers/federatedhpa/metrics"
	"github.com/karmada-io/karmada/pkg/controllers/federatedresourcequota"
	"github.com/karmada-io/karmada/pkg/controllers/gracefuleviction"
	"github.com/karmada-io/karmada/pkg/controllers/hpascaletargetmarker"
	"github.com/karmada-io/karmada/pkg/controllers/mcs"
	"github.com/karmada-io/karmada/pkg/controllers/multiclusterservice"
	"github.com/karmada-io/karmada/pkg/controllers/namespace"
	"github.com/karmada-io/karmada/pkg/controllers/remediation"
	"github.com/karmada-io/karmada/pkg/controllers/status"
	"github.com/karmada-io/karmada/pkg/controllers/unifiedauth"
	"github.com/karmada-io/karmada/pkg/controllers/workloadrebalancer"
	"github.com/karmada-io/karmada/pkg/dependenciesdistributor"
	"github.com/karmada-io/karmada/pkg/detector"
	"github.com/karmada-io/karmada/pkg/features"
	"github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient"
	"github.com/karmada-io/karmada/pkg/metrics"
	"github.com/karmada-io/karmada/pkg/resourceinterpreter"
	"github.com/karmada-io/karmada/pkg/sharedcli"
	"github.com/karmada-io/karmada/pkg/sharedcli/klogflag"
	"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
	"github.com/karmada-io/karmada/pkg/util"
	"github.com/karmada-io/karmada/pkg/util/fedinformer"
	"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
	"github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager"
	"github.com/karmada-io/karmada/pkg/util/gclient"
	"github.com/karmada-io/karmada/pkg/util/helper"
	"github.com/karmada-io/karmada/pkg/util/objectwatcher"
	"github.com/karmada-io/karmada/pkg/util/overridemanager"
	"github.com/karmada-io/karmada/pkg/util/restmapper"
	"github.com/karmada-io/karmada/pkg/version"
	"github.com/karmada-io/karmada/pkg/version/sharedcommand"
)

// NewControllerManagerCommand creates a *cobra.Command object with default parameters
func NewControllerManagerCommand(ctx context.Context) *cobra.Command {
	opts := options.NewOptions()

	cmd := &cobra.Command{
		Use: "karmada-controller-manager",
		Long: `The karmada-controller-manager runs various controllers.
The controllers watch Karmada objects and then talk to the underlying clusters' API servers
to create regular Kubernetes resources.`,
		RunE: func(_ *cobra.Command, _ []string) error {
			// validate options
			if errs := opts.Validate(); len(errs) != 0 {
				return errs.ToAggregate()
			}

			return Run(ctx, opts)
		},
	}

	fss := cliflag.NamedFlagSets{}

	genericFlagSet := fss.FlagSet("generic")
	// Add the flag(--kubeconfig) that is added by controller-runtime
	// (https://github.com/kubernetes-sigs/controller-runtime/blob/v0.11.1/pkg/client/config/config.go#L39),
	// and update the flag usage.
	genericFlagSet.AddGoFlagSet(flag.CommandLine)
	genericFlagSet.Lookup("kubeconfig").Usage = "Path to karmada control plane kubeconfig file."
	opts.AddFlags(genericFlagSet, controllers.ControllerNames(), sets.List(controllersDisabledByDefault))

	// Set klog flags
	logsFlagSet := fss.FlagSet("logs")
	klogflag.Add(logsFlagSet)

	cmd.AddCommand(sharedcommand.NewCmdVersion("karmada-controller-manager"))
	cmd.Flags().AddFlagSet(genericFlagSet)
	cmd.Flags().AddFlagSet(logsFlagSet)

	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
	sharedcli.SetUsageAndHelpFunc(cmd, fss, cols)
	return cmd
}

// Run runs the controller-manager with options. This should never exit.
func Run(ctx context.Context, opts *options.Options) error {
	klog.Infof("karmada-controller-manager version: %s", version.Get())

	profileflag.ListenAndServe(opts.ProfileOpts)

	controlPlaneRestConfig, err := controllerruntime.GetConfig()
	if err != nil {
		panic(err)
	}
	controlPlaneRestConfig.QPS, controlPlaneRestConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
	controllerManager, err := controllerruntime.NewManager(controlPlaneRestConfig, controllerruntime.Options{
		Logger: klog.Background(),
		Scheme: gclient.NewSchema(),
		Cache: cache.Options{SyncPeriod: &opts.ResyncPeriod.Duration},
		LeaderElection: opts.LeaderElection.LeaderElect,
		LeaderElectionID: opts.LeaderElection.ResourceName,
		LeaderElectionNamespace: opts.LeaderElection.ResourceNamespace,
		LeaseDuration: &opts.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: &opts.LeaderElection.RenewDeadline.Duration,
		RetryPeriod: &opts.LeaderElection.RetryPeriod.Duration,
		LeaderElectionResourceLock: opts.LeaderElection.ResourceLock,
		HealthProbeBindAddress: opts.HealthProbeBindAddress,
		LivenessEndpointName: "/healthz",
		Metrics: metricsserver.Options{BindAddress: opts.MetricsBindAddress},
		MapperProvider: restmapper.MapperProvider,
		BaseContext: func() context.Context {
			return ctx
		},
		Controller: config.Controller{
			GroupKindConcurrency: map[string]int{
				workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String(): opts.ConcurrentWorkSyncs,
				workv1alpha2.SchemeGroupVersion.WithKind("ResourceBinding").GroupKind().String(): opts.ConcurrentResourceBindingSyncs,
				workv1alpha2.SchemeGroupVersion.WithKind("ClusterResourceBinding").GroupKind().String(): opts.ConcurrentClusterResourceBindingSyncs,
				clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): opts.ConcurrentClusterSyncs,
				schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}.GroupKind().String(): opts.ConcurrentNamespaceSyncs,
			},
			CacheSyncTimeout: opts.ClusterCacheSyncTimeout.Duration,
		},
		NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
			opts.DefaultTransform = fedinformer.StripUnusedFields
			return cache.New(config, opts)
		},
	})
	if err != nil {
		klog.Errorf("Failed to build controller manager: %v", err)
		return err
	}

	if err := controllerManager.AddHealthzCheck("ping", healthz.Ping); err != nil {
		klog.Errorf("Failed to add health check endpoint: %v", err)
		return err
	}

	crtlmetrics.Registry.MustRegister(metrics.ClusterCollectors()...)
	crtlmetrics.Registry.MustRegister(metrics.ResourceCollectors()...)
	crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)

	if err := helper.IndexWork(ctx, controllerManager); err != nil {
		klog.Fatalf("Failed to index Work: %v", err)
	}

	setupControllers(controllerManager, opts, ctx.Done())

	// blocks until the context is done.
	if err := controllerManager.Start(ctx); err != nil {
		klog.Errorf("controller manager exits unexpectedly: %v", err)
		return err
	}

	// never reach here
	return nil
}

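// controllers holds the initialization function of each known controller, keyed by controller name.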
var controllers = make(controllerscontext.Initializers)

// controllersDisabledByDefault is the set of controllers which are disabled by default
var controllersDisabledByDefault = sets.New("hpaScaleTargetMarker", "deploymentReplicasSyncer")

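// init registers all known controllers into the controllers map.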
func init() {
	controllers["cluster"] = startClusterController
	controllers["clusterStatus"] = startClusterStatusController
	controllers["binding"] = startBindingController
	controllers["bindingStatus"] = startBindingStatusController
	controllers["execution"] = startExecutionController
	controllers["workStatus"] = startWorkStatusController
	controllers["namespace"] = startNamespaceController
	controllers["serviceExport"] = startServiceExportController
	controllers["endpointSlice"] = startEndpointSliceController
	controllers["serviceImport"] = startServiceImportController
	controllers["unifiedAuth"] = startUnifiedAuthController
	controllers["federatedResourceQuotaSync"] = startFederatedResourceQuotaSyncController
	controllers["federatedResourceQuotaStatus"] = startFederatedResourceQuotaStatusController
	controllers["gracefulEviction"] = startGracefulEvictionController
	controllers["applicationFailover"] = startApplicationFailoverController
	controllers["federatedHorizontalPodAutoscaler"] = startFederatedHorizontalPodAutoscalerController
	controllers["cronFederatedHorizontalPodAutoscaler"] = startCronFederatedHorizontalPodAutoscalerController
	controllers["hpaScaleTargetMarker"] = startHPAScaleTargetMarkerController
	controllers["deploymentReplicasSyncer"] = startDeploymentReplicasSyncerController
	controllers["multiclusterservice"] = startMCSController
	controllers["endpointsliceCollect"] = startEndpointSliceCollectController
	controllers["endpointsliceDispatch"] = startEndpointSliceDispatchController
	controllers["remedy"] = startRemedyController
	controllers["workloadRebalancer"] = startWorkloadRebalancerController
	controllers["agentcsrapproving"] = startAgentCSRApprovingController
}

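// startClusterController starts the cluster controller and, if the taint manager is enabled, the NoExecute taint manager.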
func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
	mgr := ctx.Mgr
	opts := ctx.Opts

	clusterController := &cluster.Controller{
		Client: mgr.GetClient(),
		EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName),
		ClusterMonitorPeriod: opts.ClusterMonitorPeriod.Duration,
		ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod.Duration,
		ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod.Duration,
		FailoverEvictionTimeout: opts.FailoverEvictionTimeout.Duration,
		EnableTaintManager: ctx.Opts.EnableTaintManager,
		ClusterTaintEvictionRetryFrequency: 10 * time.Second,
		ExecutionSpaceRetryFrequency: 10 * time.Second,
	}
	if err := clusterController.SetupWithManager(mgr); err != nil {
		return false, err
	}

	if ctx.Opts.EnableTaintManager {
		if err := cluster.IndexField(mgr); err != nil {
			return false, err
		}
		taintManager := &cluster.NoExecuteTaintManager{
			Client: mgr.GetClient(),
			EventRecorder: mgr.GetEventRecorderFor(cluster.TaintManagerName),
			ClusterTaintEvictionRetryFrequency: 10 * time.Second,
			ConcurrentReconciles: 3,
		}
		if err := taintManager.SetupWithManager(mgr); err != nil {
			return false, err
		}
	}

	return true, nil
}

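// startClusterStatusController starts the cluster status controller, which only watches push-mode clusters that reference a secret.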
func startClusterStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
	mgr := ctx.Mgr
	opts := ctx.Opts
	stopChan := ctx.StopChan
	clusterPredicateFunc := predicate.Funcs{
		CreateFunc: func(createEvent event.CreateEvent) bool {
			obj := createEvent.Object.(*clusterv1alpha1.Cluster)

			if obj.Spec.SecretRef == nil {
				return false
			}

			return obj.Spec.SyncMode == clusterv1alpha1.Push
		},
		UpdateFunc: func(updateEvent event.UpdateEvent) bool {
			obj := updateEvent.ObjectNew.(*clusterv1alpha1.Cluster)

			if obj.Spec.SecretRef == nil {
				return false
			}

			return obj.Spec.SyncMode == clusterv1alpha1.Push
		},
		DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
			obj := deleteEvent.Object.(*clusterv1alpha1.Cluster)

			if obj.Spec.SecretRef == nil {
				return false
			}

			return obj.Spec.SyncMode == clusterv1alpha1.Push
		},
		GenericFunc: func(event.GenericEvent) bool {
			return false
		},
	}
	clusterStatusController := &status.ClusterStatusController{
		Client: mgr.GetClient(),
		KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()),
		EventRecorder: mgr.GetEventRecorderFor(status.ControllerName),
		PredicateFunc: clusterPredicateFunc,
		TypedInformerManager: typedmanager.GetInstance(),
		GenericInformerManager: genericmanager.GetInstance(),
		StopChan: stopChan,
		ClusterClientSetFunc: util.NewClusterClientSet,
		ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
		ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst},
		ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
		ClusterLeaseDuration: opts.ClusterLeaseDuration,
		ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
		ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
		ClusterFailureThreshold: opts.ClusterFailureThreshold,
		ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
		RateLimiterOptions: ctx.Opts.RateLimiterOptions,
		EnableClusterResourceModeling: ctx.Opts.EnableClusterResourceModeling,
	}
	if err := clusterStatusController.SetupWithManager(mgr); err != nil {
		return false, err
	}
	return true, nil
}

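// startBindingController starts the ResourceBinding and ClusterResourceBinding controllers.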
func startBindingController(ctx controllerscontext.Context) (enabled bool, err error) {
	bindingController := &binding.ResourceBindingController{
		Client: ctx.Mgr.GetClient(),
		DynamicClient: ctx.DynamicClientSet,
		EventRecorder: ctx.Mgr.GetEventRecorderFor(binding.ControllerName),
		RESTMapper: ctx.Mgr.GetRESTMapper(),
		OverrideManager: ctx.OverrideManager,
		InformerManager: ctx.ControlPlaneInformerManager,
		ResourceInterpreter: ctx.ResourceInterpreter,
		RateLimiterOptions: ctx.Opts.RateLimiterOptions,
	}
	if err := bindingController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}

	clusterResourceBindingController := &binding.ClusterResourceBindingController{
		Client: ctx.Mgr.GetClient(),
		DynamicClient: ctx.DynamicClientSet,
		EventRecorder: ctx.Mgr.GetEventRecorderFor(binding.ClusterResourceBindingControllerName),
		RESTMapper: ctx.Mgr.GetRESTMapper(),
		OverrideManager: ctx.OverrideManager,
		InformerManager: ctx.ControlPlaneInformerManager,
		ResourceInterpreter: ctx.ResourceInterpreter,
		RateLimiterOptions: ctx.Opts.RateLimiterOptions,
	}
	if err := clusterResourceBindingController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

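// startBindingStatusController starts the ResourceBinding and ClusterResourceBinding status controllers.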
func startBindingStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
	rbStatusController := &status.RBStatusController{
		Client: ctx.Mgr.GetClient(),
		DynamicClient: ctx.DynamicClientSet,
		InformerManager: ctx.ControlPlaneInformerManager,
		ResourceInterpreter: ctx.ResourceInterpreter,
		EventRecorder: ctx.Mgr.GetEventRecorderFor(status.RBStatusControllerName),
		RESTMapper: ctx.Mgr.GetRESTMapper(),
		RateLimiterOptions: ctx.Opts.RateLimiterOptions,
	}
	if err := rbStatusController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}

	crbStatusController := &status.CRBStatusController{
		Client: ctx.Mgr.GetClient(),
		DynamicClient: ctx.DynamicClientSet,
		InformerManager: ctx.ControlPlaneInformerManager,
		ResourceInterpreter: ctx.ResourceInterpreter,
		EventRecorder: ctx.Mgr.GetEventRecorderFor(status.CRBStatusControllerName),
		RESTMapper: ctx.Mgr.GetRESTMapper(),
		RateLimiterOptions: ctx.Opts.RateLimiterOptions,
	}
	if err := crbStatusController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}

	return true, nil
}

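// startExecutionController starts the execution controller, which syncs Work objects to member clusters.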
func startExecutionController(ctx controllerscontext.Context) (enabled bool, err error) {
	executionController := &execution.Controller{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
		RESTMapper: ctx.Mgr.GetRESTMapper(),
		ObjectWatcher: ctx.ObjectWatcher,
		PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
		InformerManager: genericmanager.GetInstance(),
		RatelimiterOptions: ctx.Opts.RateLimiterOptions,
	}
	if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

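// startWorkStatusController starts the work status controller.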
func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
	opts := ctx.Opts
	workStatusController := &status.WorkStatusController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
		RESTMapper: ctx.Mgr.GetRESTMapper(),
		InformerManager: genericmanager.GetInstance(),
		StopChan: ctx.StopChan,
		ObjectWatcher: ctx.ObjectWatcher,
		PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
		ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
		ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
		ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
		RateLimiterOptions: ctx.Opts.RateLimiterOptions,
		ResourceInterpreter: ctx.ResourceInterpreter,
	}
	workStatusController.RunWorkQueue()
	if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
		klog.Fatalf("Failed to setup work status controller: %v", err)
		return false, err
	}
	return true, nil
}

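// startNamespaceController starts the namespace sync controller.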
func startNamespaceController(ctx controllerscontext.Context) (enabled bool, err error) {
	namespaceSyncController := &namespace.Controller{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(namespace.ControllerName),
		SkippedPropagatingNamespaces: ctx.Opts.SkippedPropagatingNamespaces,
		OverrideManager: ctx.OverrideManager,
	}
	if err := namespaceSyncController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

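// startServiceExportController starts the ServiceExport controller.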
func startServiceExportController(ctx controllerscontext.Context) (enabled bool, err error) {
	opts := ctx.Opts
	serviceExportController := &mcs.ServiceExportController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
		RESTMapper: ctx.Mgr.GetRESTMapper(),
		InformerManager: genericmanager.GetInstance(),
		StopChan: ctx.StopChan,
		WorkerNumber: 3,
		PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
		ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
		ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
	}
	serviceExportController.RunWorkQueue()
	if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

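// startEndpointSliceCollectController starts the EndpointSlice collect controller; it is a no-op unless the MultiClusterService feature gate is enabled.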
func startEndpointSliceCollectController(ctx controllerscontext.Context) (enabled bool, err error) {
	if !features.FeatureGate.Enabled(features.MultiClusterService) {
		return false, nil
	}
	opts := ctx.Opts
	endpointSliceCollectController := &multiclusterservice.EndpointSliceCollectController{
		Client: ctx.Mgr.GetClient(),
		RESTMapper: ctx.Mgr.GetRESTMapper(),
		InformerManager: genericmanager.GetInstance(),
		StopChan: ctx.StopChan,
		WorkerNumber: 3,
		PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr),
		ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
		ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
	}
	endpointSliceCollectController.RunWorkQueue()
	if err := endpointSliceCollectController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

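// startEndpointSliceDispatchController starts the EndpointSlice dispatch controller; it is a no-op unless the MultiClusterService feature gate is enabled.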
func startEndpointSliceDispatchController(ctx controllerscontext.Context) (enabled bool, err error) {
	if !features.FeatureGate.Enabled(features.MultiClusterService) {
		return false, nil
	}
	endpointSliceSyncController := &multiclusterservice.EndpointsliceDispatchController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(multiclusterservice.EndpointsliceDispatchControllerName),
		RESTMapper: ctx.Mgr.GetRESTMapper(),
		InformerManager: genericmanager.GetInstance(),
	}
	if err := endpointSliceSyncController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

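// startEndpointSliceController starts the EndpointSlice controller.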
func startEndpointSliceController(ctx controllerscontext.Context) (enabled bool, err error) {
	endpointSliceController := &mcs.EndpointSliceController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.EndpointSliceControllerName),
	}
	if err := endpointSliceController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

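// startServiceImportController starts the ServiceImport controller.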
func startServiceImportController(ctx controllerscontext.Context) (enabled bool, err error) {
	serviceImportController := &mcs.ServiceImportController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceImportControllerName),
	}
	if err := serviceImportController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

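// startUnifiedAuthController starts the unified-auth controller.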
func startUnifiedAuthController(ctx controllerscontext.Context) (enabled bool, err error) {
	unifiedAuthController := &unifiedauth.Controller{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(unifiedauth.ControllerName),
	}
	if err := unifiedAuthController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

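// startFederatedResourceQuotaSyncController starts the FederatedResourceQuota sync controller.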
func startFederatedResourceQuotaSyncController(ctx controllerscontext.Context) (enabled bool, err error) {
	controller := federatedresourcequota.SyncController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.SyncControllerName),
	}
	if err = controller.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

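// startFederatedResourceQuotaStatusController starts the FederatedResourceQuota status controller.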
func startFederatedResourceQuotaStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
	controller := federatedresourcequota.StatusController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.StatusControllerName),
	}
	if err = controller.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

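// startGracefulEvictionController starts the graceful eviction controllers for ResourceBinding and ClusterResourceBinding.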
func startGracefulEvictionController(ctx controllerscontext.Context) (enabled bool, err error) {
	rbGracefulEvictionController := &gracefuleviction.RBGracefulEvictionController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(gracefuleviction.RBGracefulEvictionControllerName),
		RateLimiterOptions: ctx.Opts.RateLimiterOptions,
		GracefulEvictionTimeout: ctx.Opts.GracefulEvictionTimeout.Duration,
	}
	if err := rbGracefulEvictionController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}

	crbGracefulEvictionController := &gracefuleviction.CRBGracefulEvictionController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(gracefuleviction.CRBGracefulEvictionControllerName),
		RateLimiterOptions: ctx.Opts.RateLimiterOptions,
		GracefulEvictionTimeout: ctx.Opts.GracefulEvictionTimeout.Duration,
	}
	if err := crbGracefulEvictionController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}

	return true, nil
}

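// startApplicationFailoverController starts the application failover controllers for ResourceBinding and ClusterResourceBinding.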
func startApplicationFailoverController(ctx controllerscontext.Context) (enabled bool, err error) {
	rbApplicationFailoverController := applicationfailover.RBApplicationFailoverController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(applicationfailover.RBApplicationFailoverControllerName),
		ResourceInterpreter: ctx.ResourceInterpreter,
	}
	if err = rbApplicationFailoverController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}

	crbApplicationFailoverController := applicationfailover.CRBApplicationFailoverController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(applicationfailover.CRBApplicationFailoverControllerName),
		ResourceInterpreter: ctx.ResourceInterpreter,
	}
	if err = crbApplicationFailoverController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

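// startFederatedHorizontalPodAutoscalerController starts the FederatedHPA controller along with its metrics client and replica calculator.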
func startFederatedHorizontalPodAutoscalerController(ctx controllerscontext.Context) (enabled bool, err error) {
	apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(ctx.KubeClientSet.Discovery())
	go custom_metrics.PeriodicallyInvalidate(
		apiVersionsGetter,
		ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerSyncPeriod.Duration,
		ctx.StopChan)
	metricsClient := metricsclient.NewRESTMetricsClient(
		resourceclient.NewForConfigOrDie(ctx.Mgr.GetConfig()),
		custom_metrics.NewForConfig(ctx.Mgr.GetConfig(), ctx.Mgr.GetRESTMapper(), apiVersionsGetter),
		external_metrics.NewForConfigOrDie(ctx.Mgr.GetConfig()),
	)
	replicaCalculator := federatedhpa.NewReplicaCalculator(metricsClient,
		ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerTolerance,
		ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
		ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerInitialReadinessDelay.Duration)
	federatedHPAController := federatedhpa.FHPAController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedhpa.ControllerName),
		RESTMapper: ctx.Mgr.GetRESTMapper(),
		DownscaleStabilisationWindow: ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
		HorizontalPodAutoscalerSyncPeriod: ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerSyncPeriod.Duration,
		ReplicaCalc: replicaCalculator,
		ClusterScaleClientSetFunc: util.NewClusterScaleClientSet,
		TypedInformerManager: typedmanager.GetInstance(),
		RateLimiterOptions: ctx.Opts.RateLimiterOptions,
		ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
	}
	if err = federatedHPAController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

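// startCronFederatedHorizontalPodAutoscalerController starts the CronFederatedHPA controller.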
func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext.Context) (enabled bool, err error) {
	cronFHPAController := cronfederatedhpa.CronFHPAController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(cronfederatedhpa.ControllerName),
		RateLimiterOptions: ctx.Opts.RateLimiterOptions,
	}
	if err = cronFHPAController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

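// startHPAScaleTargetMarkerController starts the HPA scale-target marker controller.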
func startHPAScaleTargetMarkerController(ctx controllerscontext.Context) (enabled bool, err error) {
	hpaScaleTargetMarker := hpascaletargetmarker.HpaScaleTargetMarker{
		DynamicClient: ctx.DynamicClientSet,
		RESTMapper: ctx.Mgr.GetRESTMapper(),
	}
	err = hpaScaleTargetMarker.SetupWithManager(ctx.Mgr)
	if err != nil {
		return false, err
	}

	return true, nil
}

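// startDeploymentReplicasSyncerController starts the deployment replicas syncer controller.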
func startDeploymentReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) {
	deploymentReplicasSyncer := deploymentreplicassyncer.DeploymentReplicasSyncer{
		Client: ctx.Mgr.GetClient(),
	}
	err = deploymentReplicasSyncer.SetupWithManager(ctx.Mgr)
	if err != nil {
		return false, err
	}

	return true, nil
}

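// startMCSController starts the MultiClusterService controller; it is a no-op unless the MultiClusterService feature gate is enabled.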
func startMCSController(ctx controllerscontext.Context) (enabled bool, err error) {
	if !features.FeatureGate.Enabled(features.MultiClusterService) {
		return false, nil
	}
	mcsController := &multiclusterservice.MCSController{
		Client: ctx.Mgr.GetClient(),
		EventRecorder: ctx.Mgr.GetEventRecorderFor(multiclusterservice.ControllerName),
		RateLimiterOptions: ctx.Opts.RateLimiterOptions,
	}
	if err = mcsController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

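// startRemedyController starts the Remedy controller.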
func startRemedyController(ctx controllerscontext.Context) (enabled bool, err error) {
	c := &remediation.RemedyController{
		Client: ctx.Mgr.GetClient(),
		RateLimitOptions: ctx.Opts.RateLimiterOptions,
	}
	if err = c.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

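// startWorkloadRebalancerController starts the WorkloadRebalancer controller.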
func startWorkloadRebalancerController(ctx controllerscontext.Context) (enabled bool, err error) {
	workloadRebalancer := workloadrebalancer.RebalancerController{
		Client: ctx.Mgr.GetClient(),
	}
	err = workloadRebalancer.SetupWithManager(ctx.Mgr)
	if err != nil {
		return false, err
	}

	return true, nil
}

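// startAgentCSRApprovingController starts the agent CSR approving controller.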
func startAgentCSRApprovingController(ctx controllerscontext.Context) (enabled bool, err error) {
	agentCSRApprover := approver.AgentCSRApprovingController{Client: ctx.KubeClientSet}
	err = agentCSRApprover.SetupWithManager(ctx.Mgr)
	if err != nil {
		return false, err
	}
	return true, nil
}

// setupControllers initializes all controllers and sets them up one by one.
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
	restConfig := mgr.GetConfig()
	dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
	discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
	kubeClientSet := kubeclientset.NewForConfigOrDie(restConfig)

	overrideManager := overridemanager.New(mgr.GetClient(), mgr.GetEventRecorderFor(overridemanager.OverrideManagerName))
	skippedResourceConfig := util.NewSkippedResourceConfig()
	if err := skippedResourceConfig.Parse(opts.SkippedPropagatingAPIs); err != nil {
		// The program will never reach here because the parameters have already been validated.
		return
	}

	controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, opts.ResyncPeriod.Duration, stopChan)
	// We need a service lister to build a resource interpreter with `ClusterIPServiceResolver`,
	// which allows connection to the customized interpreter webhook without a cluster DNS service.
	sharedFactory := informers.NewSharedInformerFactory(kubeClientSet, opts.ResyncPeriod.Duration)
	serviceLister := sharedFactory.Core().V1().Services().Lister()
	sharedFactory.Start(stopChan)
	sharedFactory.WaitForCacheSync(stopChan)

	resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
	if err := mgr.Add(resourceInterpreter); err != nil {
		klog.Fatalf("Failed to setup custom resource interpreter: %v", err)
	}

	objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)

	resourceDetector := &detector.ResourceDetector{
		DiscoveryClientSet: discoverClientSet,
		Client: mgr.GetClient(),
		InformerManager: controlPlaneInformerManager,
		RESTMapper: mgr.GetRESTMapper(),
		DynamicClient: dynamicClientSet,
		SkippedResourceConfig: skippedResourceConfig,
		SkippedPropagatingNamespaces: opts.SkippedNamespacesRegexps(),
		ResourceInterpreter: resourceInterpreter,
		EventRecorder: mgr.GetEventRecorderFor("resource-detector"),
		ConcurrentPropagationPolicySyncs: opts.ConcurrentPropagationPolicySyncs,
		ConcurrentClusterPropagationPolicySyncs: opts.ConcurrentClusterPropagationPolicySyncs,
		ConcurrentResourceTemplateSyncs: opts.ConcurrentResourceTemplateSyncs,
		RateLimiterOptions: opts.RateLimiterOpts,
	}

	if err := mgr.Add(resourceDetector); err != nil {
		klog.Fatalf("Failed to setup resource detector: %v", err)
	}
	if features.FeatureGate.Enabled(features.PropagateDeps) {
		dependenciesDistributor := &dependenciesdistributor.DependenciesDistributor{
			Client: mgr.GetClient(),
			DynamicClient: dynamicClientSet,
			InformerManager: controlPlaneInformerManager,
			ResourceInterpreter: resourceInterpreter,
			RESTMapper: mgr.GetRESTMapper(),
			EventRecorder: mgr.GetEventRecorderFor("dependencies-distributor"),
			RateLimiterOptions: opts.RateLimiterOpts,
			ConcurrentDependentResourceSyncs: opts.ConcurrentDependentResourceSyncs,
		}
		if err := dependenciesDistributor.SetupWithManager(mgr); err != nil {
			klog.Fatalf("Failed to setup dependencies distributor: %v", err)
		}
	}
	setupClusterAPIClusterDetector(mgr, opts, stopChan)
	controllerContext := controllerscontext.Context{
		Mgr: mgr,
		ObjectWatcher: objectWatcher,
		Opts: controllerscontext.Options{
			Controllers: opts.Controllers,
			ClusterMonitorPeriod: opts.ClusterMonitorPeriod,
			ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod,
			ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod,
			ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
			FailoverEvictionTimeout: opts.FailoverEvictionTimeout,
			ClusterLeaseDuration: opts.ClusterLeaseDuration,
			ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
			ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
			ClusterFailureThreshold: opts.ClusterFailureThreshold,
			ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
			ClusterAPIQPS: opts.ClusterAPIQPS,
			ClusterAPIBurst: opts.ClusterAPIBurst,
			SkippedPropagatingNamespaces: opts.SkippedNamespacesRegexps(),
			ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
			EnableTaintManager: opts.EnableTaintManager,
			RateLimiterOptions: opts.RateLimiterOpts,
			GracefulEvictionTimeout: opts.GracefulEvictionTimeout,
			EnableClusterResourceModeling: opts.EnableClusterResourceModeling,
			HPAControllerConfiguration: opts.HPAControllerConfiguration,
		},
		StopChan: stopChan,
		DynamicClientSet: dynamicClientSet,
		KubeClientSet: kubeClientSet,
		OverrideManager: overrideManager,
		ControlPlaneInformerManager: controlPlaneInformerManager,
		ResourceInterpreter: resourceInterpreter,
	}

	if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
		klog.Fatalf("error starting controllers: %v", err)
	}

	// Ensure the InformerManager stops when the stop channel closes
	go func() {
		<-stopChan
		genericmanager.StopInstance()
	}()
}

// setupClusterAPIClusterDetector initializes the cluster detector for the cluster-api management cluster.
func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
	if len(opts.ClusterAPIKubeconfig) == 0 {
		return
	}

	klog.Infof("Begin to setup cluster-api cluster detector")

	clusterAPIRestConfig, err := apiclient.RestConfig(opts.ClusterAPIContext, opts.ClusterAPIKubeconfig)
	if err != nil {
		klog.Fatalf("Failed to get cluster-api management cluster rest config. context: %s, kubeconfig: %s, err: %v", opts.ClusterAPIContext, opts.ClusterAPIKubeconfig, err)
	}

	clusterAPIClient, err := gclient.NewForConfig(clusterAPIRestConfig)
	if err != nil {
		klog.Fatalf("Failed to get config from clusterAPIRestConfig: %v", err)
	}

	clusterAPIClusterDetector := &clusterapi.ClusterDetector{
		ControllerPlaneConfig: mgr.GetConfig(),
		ClusterAPIConfig: clusterAPIRestConfig,
		ClusterAPIClient: clusterAPIClient,
		InformerManager: genericmanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0, stopChan),
		ConcurrentReconciles: 3,
	}
	if err := mgr.Add(clusterAPIClusterDetector); err != nil {
		klog.Fatalf("Failed to setup cluster-api cluster detector: %v", err)
	}

	klog.Infof("Success to setup cluster-api cluster detector")
}