diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index e10bacfd1..87eea83a4 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -103,9 +103,238 @@ func Run(ctx context.Context, opts *options.Options) error { return nil } +// ControllerContext defines the context object for controller +type ControllerContext struct { + Mgr controllerruntime.Manager + ObjectWatcher objectwatcher.ObjectWatcher + Opts *options.Options + StopChan <-chan struct{} + DynamicClientSet dynamic.Interface + OverrideManager overridemanager.OverrideManager + ControlPlaneInformerManager informermanager.SingleClusterInformerManager +} + +// InitFunc is used to launch a particular controller. +// Any error returned will cause the controller process to `Fatal` +// The bool indicates whether the controller was enabled. +type InitFunc func(ctx ControllerContext) (enabled bool, err error) + +// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func) +func NewControllerInitializers() map[string]InitFunc { + controllers := map[string]InitFunc{} + controllers["cluster"] = startClusterController + controllers["clusterStatus"] = startClusterStatusController + controllers["hpa"] = startHpaController + controllers["binding"] = startBindingController + controllers["execution"] = startExecutionController + controllers["workStatus"] = startWorkStatusController + controllers["namespace"] = startNamespaceController + controllers["serviceExport"] = startServiceExportController + controllers["endpointSlice"] = startEndpointSliceController + controllers["serviceImport"] = startServiceImportController + return controllers +} + +func startClusterController(ctx ControllerContext) (enabled bool, err error) { + mgr := ctx.Mgr + opts := ctx.Opts + clusterController := &cluster.Controller{ + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName), + ClusterMonitorPeriod: opts.ClusterMonitorPeriod.Duration, + ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod.Duration, + ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod.Duration, + } + if err := clusterController.SetupWithManager(mgr); err != nil { + klog.Fatalf("Failed to setup cluster controller: %v", err) + return false, err + } + return true, nil +} + +func startClusterStatusController(ctx ControllerContext) (enabled bool, err error) { + mgr := ctx.Mgr + opts := ctx.Opts + stopChan := ctx.StopChan + clusterPredicateFunc := predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + obj := createEvent.Object.(*clusterv1alpha1.Cluster) + return obj.Spec.SyncMode == clusterv1alpha1.Push + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + obj := updateEvent.ObjectNew.(*clusterv1alpha1.Cluster) + return obj.Spec.SyncMode == clusterv1alpha1.Push + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + obj := deleteEvent.Object.(*clusterv1alpha1.Cluster) + return obj.Spec.SyncMode == clusterv1alpha1.Push + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, + } + clusterStatusController := &status.ClusterStatusController{ + Client: mgr.GetClient(), + KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()), + EventRecorder: mgr.GetEventRecorderFor(status.ControllerName), + PredicateFunc: clusterPredicateFunc, + InformerManager: informermanager.GetInstance(), + StopChan: stopChan, + ClusterClientSetFunc: util.NewClusterClientSet, + ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, + ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst}, + ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency, + ClusterLeaseDuration: opts.ClusterLeaseDuration, + ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction, + } + if err := clusterStatusController.SetupWithManager(mgr); err != nil { + klog.Fatalf("Failed to setup cluster status controller: %v", err) + return false, err + } + return true, nil +} + +func startHpaController(ctx ControllerContext) (enabled bool, err error) { + hpaController := &hpa.HorizontalPodAutoscalerController{ + Client: ctx.Mgr.GetClient(), + DynamicClient: ctx.DynamicClientSet, + EventRecorder: ctx.Mgr.GetEventRecorderFor(hpa.ControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + InformerManager: ctx.ControlPlaneInformerManager, + } + if err := hpaController.SetupWithManager(ctx.Mgr); err != nil { + klog.Fatalf("Failed to setup hpa controller: %v", err) + return false, err + } + return true, nil +} + +func startBindingController(ctx ControllerContext) (enabled bool, err error) { + bindingController := &binding.ResourceBindingController{ + Client: ctx.Mgr.GetClient(), + DynamicClient: ctx.DynamicClientSet, + EventRecorder: ctx.Mgr.GetEventRecorderFor(binding.ControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + OverrideManager: ctx.OverrideManager, + InformerManager: ctx.ControlPlaneInformerManager, + } + if err := bindingController.SetupWithManager(ctx.Mgr); err != nil { + klog.Fatalf("Failed to setup binding controller: %v", err) + return false, err + } + + clusterResourceBindingController := &binding.ClusterResourceBindingController{ + Client: ctx.Mgr.GetClient(), + DynamicClient: ctx.DynamicClientSet, + EventRecorder: ctx.Mgr.GetEventRecorderFor(binding.ClusterResourceBindingControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + OverrideManager: ctx.OverrideManager, + InformerManager: ctx.ControlPlaneInformerManager, + } + if err := clusterResourceBindingController.SetupWithManager(ctx.Mgr); err != nil { + klog.Fatalf("Failed to setup cluster resource binding controller: %v", err) + return false, err + } + return true, nil +} + +func startExecutionController(ctx ControllerContext) (enabled bool, err error) { + executionController := &execution.Controller{ + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + ObjectWatcher: ctx.ObjectWatcher, + PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), + InformerManager: informermanager.GetInstance(), + } + if err := executionController.SetupWithManager(ctx.Mgr); err != nil { + klog.Fatalf("Failed to setup execution controller: %v", err) + return false, err + } + return true, nil +} + +func startWorkStatusController(ctx ControllerContext) (enabled bool, err error) { + workStatusController := &status.WorkStatusController{ + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + InformerManager: informermanager.GetInstance(), + StopChan: ctx.StopChan, + WorkerNumber: 1, + ObjectWatcher: ctx.ObjectWatcher, + PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), + ClusterClientSetFunc: util.NewClusterDynamicClientSet, + } + workStatusController.RunWorkQueue() + if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil { + klog.Fatalf("Failed to setup work status controller: %v", err) + return false, err + } + return true, nil +} + +func startNamespaceController(ctx ControllerContext) (enabled bool, err error) { + skippedPropagatingNamespaces := map[string]struct{}{} + for _, ns := range ctx.Opts.SkippedPropagatingNamespaces { + skippedPropagatingNamespaces[ns] = struct{}{} + } + namespaceSyncController := &namespace.Controller{ + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(namespace.ControllerName), + SkippedPropagatingNamespaces: skippedPropagatingNamespaces, + } + if err := namespaceSyncController.SetupWithManager(ctx.Mgr); err != nil { + klog.Fatalf("Failed to setup namespace sync controller: %v", err) + return false, err + } + return true, nil +} + +func startServiceExportController(ctx ControllerContext) (enabled bool, err error) { + serviceExportController := &mcs.ServiceExportController{ + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceExportControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + InformerManager: informermanager.GetInstance(), + StopChan: ctx.StopChan, + WorkerNumber: 1, + PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr), + ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, + } + serviceExportController.RunWorkQueue() + if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil { + klog.Fatalf("Failed to setup ServiceExport controller: %v", err) + return false, err + } + return true, nil +} + +func startEndpointSliceController(ctx ControllerContext) (enabled bool, err error) { + endpointSliceController := &mcs.EndpointSliceController{ + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.EndpointSliceControllerName), + } + if err := endpointSliceController.SetupWithManager(ctx.Mgr); err != nil { + klog.Fatalf("Failed to setup EndpointSlice controller: %v", err) + return false, err + } + return true, nil +} + +func startServiceImportController(ctx ControllerContext) (enabled bool, err error) { + serviceImportController := &mcs.ServiceImportController{ + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceImportControllerName), + } + if err := serviceImportController.SetupWithManager(ctx.Mgr); err != nil { + klog.Fatalf("Failed to setup ServiceImport controller: %v", err) + return false, err + } + return true, nil +} + // setupControllers initialize controllers and setup one by one. -// Note: ignore cyclomatic complexity check(by gocyclo) because it will not effect readability. -//nolint:gocyclo func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) { restConfig := mgr.GetConfig() dynamicClientSet := dynamic.NewForConfigOrDie(restConfig) @@ -147,175 +376,18 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop } setupClusterAPIClusterDetector(mgr, opts, stopChan) - - if opts.IsControllerEnabled("cluster") { - clusterController := &cluster.Controller{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName), - ClusterMonitorPeriod: opts.ClusterMonitorPeriod.Duration, - ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod.Duration, - ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod.Duration, - } - if err := clusterController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup cluster controller: %v", err) - } + controllerContext := ControllerContext{ + Mgr: mgr, + ObjectWatcher: objectWatcher, + Opts: opts, + StopChan: stopChan, + DynamicClientSet: dynamicClientSet, + OverrideManager: overrideManager, + ControlPlaneInformerManager: controlPlaneInformerManager, } - if opts.IsControllerEnabled("clusterStatus") { - clusterPredicateFunc := predicate.Funcs{ - CreateFunc: func(createEvent event.CreateEvent) bool { - obj := createEvent.Object.(*clusterv1alpha1.Cluster) - return obj.Spec.SyncMode == clusterv1alpha1.Push - }, - UpdateFunc: func(updateEvent event.UpdateEvent) bool { - obj := updateEvent.ObjectNew.(*clusterv1alpha1.Cluster) - return obj.Spec.SyncMode == clusterv1alpha1.Push - }, - DeleteFunc: func(deleteEvent event.DeleteEvent) bool { - obj := deleteEvent.Object.(*clusterv1alpha1.Cluster) - return obj.Spec.SyncMode == clusterv1alpha1.Push - }, - GenericFunc: func(genericEvent event.GenericEvent) bool { - return false - }, - } - - clusterStatusController := &status.ClusterStatusController{ - Client: mgr.GetClient(), - KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()), - EventRecorder: mgr.GetEventRecorderFor(status.ControllerName), - PredicateFunc: clusterPredicateFunc, - InformerManager: informermanager.GetInstance(), - StopChan: stopChan, - ClusterClientSetFunc: util.NewClusterClientSet, - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, - ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst}, - ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency, - ClusterLeaseDuration: opts.ClusterLeaseDuration, - ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction, - } - if err := clusterStatusController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup cluster status controller: %v", err) - } - } - - if opts.IsControllerEnabled("hpa") { - hpaController := &hpa.HorizontalPodAutoscalerController{ - Client: mgr.GetClient(), - DynamicClient: dynamicClientSet, - EventRecorder: mgr.GetEventRecorderFor(hpa.ControllerName), - RESTMapper: mgr.GetRESTMapper(), - InformerManager: controlPlaneInformerManager, - } - if err := hpaController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup hpa controller: %v", err) - } - } - - if opts.IsControllerEnabled("binding") { - bindingController := &binding.ResourceBindingController{ - Client: mgr.GetClient(), - DynamicClient: dynamicClientSet, - EventRecorder: mgr.GetEventRecorderFor(binding.ControllerName), - RESTMapper: mgr.GetRESTMapper(), - OverrideManager: overrideManager, - InformerManager: controlPlaneInformerManager, - } - if err := bindingController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup binding controller: %v", err) - } - - clusterResourceBindingController := &binding.ClusterResourceBindingController{ - Client: mgr.GetClient(), - DynamicClient: dynamicClientSet, - EventRecorder: mgr.GetEventRecorderFor(binding.ClusterResourceBindingControllerName), - RESTMapper: mgr.GetRESTMapper(), - OverrideManager: overrideManager, - InformerManager: controlPlaneInformerManager, - } - if err := clusterResourceBindingController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup cluster resource binding controller: %v", err) - } - } - - if opts.IsControllerEnabled("execution") { - executionController := &execution.Controller{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(execution.ControllerName), - RESTMapper: mgr.GetRESTMapper(), - ObjectWatcher: objectWatcher, - PredicateFunc: helper.NewExecutionPredicate(mgr), - InformerManager: informermanager.GetInstance(), - } - if err := executionController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup execution controller: %v", err) - } - } - - if opts.IsControllerEnabled("workStatus") { - workStatusController := &status.WorkStatusController{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(status.WorkStatusControllerName), - RESTMapper: mgr.GetRESTMapper(), - InformerManager: informermanager.GetInstance(), - StopChan: stopChan, - WorkerNumber: 1, - ObjectWatcher: objectWatcher, - PredicateFunc: helper.NewExecutionPredicate(mgr), - ClusterClientSetFunc: util.NewClusterDynamicClientSet, - } - workStatusController.RunWorkQueue() - if err := workStatusController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup work status controller: %v", err) - } - } - - if opts.IsControllerEnabled("namespace") { - namespaceSyncController := &namespace.Controller{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(namespace.ControllerName), - SkippedPropagatingNamespaces: skippedPropagatingNamespaces, - } - if err := namespaceSyncController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup namespace sync controller: %v", err) - } - } - - if opts.IsControllerEnabled("serviceExport") { - serviceExportController := &mcs.ServiceExportController{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName), - RESTMapper: mgr.GetRESTMapper(), - InformerManager: informermanager.GetInstance(), - StopChan: stopChan, - WorkerNumber: 1, - PredicateFunc: helper.NewPredicateForServiceExportController(mgr), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, - } - serviceExportController.RunWorkQueue() - if err := serviceExportController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup ServiceExport controller: %v", err) - } - } - - if opts.IsControllerEnabled("endpointSlice") { - endpointSliceController := &mcs.EndpointSliceController{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(mcs.EndpointSliceControllerName), - } - if err := endpointSliceController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup EndpointSlice controller: %v", err) - } - } - - if opts.IsControllerEnabled("serviceImport") { - serviceImportController := &mcs.ServiceImportController{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceImportControllerName), - } - if err := serviceImportController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup ServiceImport controller: %v", err) - } + if err := StartControllers(controllerContext, NewControllerInitializers()); err != nil { + klog.Fatalf("error starting controllers: %v", err) } // Ensure the InformerManager stops when the stop channel closes @@ -325,6 +397,29 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop }() } +// StartControllers starts a set of controllers with a specified ControllerContext +func StartControllers(ctx ControllerContext, controllers map[string]InitFunc) error { + for controllerName, initFn := range controllers { + if !ctx.IsControllerEnabled(controllerName) { + klog.Warningf("%q is disabled", controllerName) + continue + } + klog.V(1).Infof("Starting %q", controllerName) + started, err := initFn(ctx) + if err != nil { + klog.Errorf("Error starting %q", controllerName) + return err + } + if !started { + klog.Warningf("Skipping %q", controllerName) + continue + } + klog.Infof("Started %q", controllerName) + } + + return nil +} + // setupClusterAPIClusterDetector initialize Cluster detector with the cluster-api management cluster. func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) { if len(opts.ClusterAPIKubeconfig) == 0 { @@ -357,3 +452,20 @@ func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options klog.Infof("Success to setup cluster-api cluster detector") } + +// IsControllerEnabled check if a specified controller enabled or not. +func (c ControllerContext) IsControllerEnabled(name string) bool { + hasStar := false + for _, ctrl := range c.Opts.Controllers { + if ctrl == name { + return true + } + if ctrl == "-"+name { + return false + } + if ctrl == "*" { + hasStar = true + } + } + return hasStar +}