Merge pull request #1093 from jameszhangyukun/enhance-controller

Enhancement --controllers flag of karmada-controller-manager
This commit is contained in:
karmada-bot 2021-12-16 20:05:22 +08:00 committed by GitHub
commit c72470e5ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 281 additions and 169 deletions

View File

@ -103,9 +103,238 @@ func Run(ctx context.Context, opts *options.Options) error {
return nil return nil
} }
// ControllerContext defines the context object for controller
type ControllerContext struct {
Mgr controllerruntime.Manager
ObjectWatcher objectwatcher.ObjectWatcher
Opts *options.Options
StopChan <-chan struct{}
DynamicClientSet dynamic.Interface
OverrideManager overridemanager.OverrideManager
ControlPlaneInformerManager informermanager.SingleClusterInformerManager
}
// InitFunc is used to launch a particular controller.
// Any error returned will cause the controller process to `Fatal`
// The bool indicates whether the controller was enabled.
type InitFunc func(ctx ControllerContext) (enabled bool, err error)
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
func NewControllerInitializers() map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["cluster"] = startClusterController
controllers["clusterStatus"] = startClusterStatusController
controllers["hpa"] = startHpaController
controllers["binding"] = startBindingController
controllers["execution"] = startExecutionController
controllers["workStatus"] = startWorkStatusController
controllers["namespace"] = startNamespaceController
controllers["serviceExport"] = startServiceExportController
controllers["endpointSlice"] = startEndpointSliceController
controllers["serviceImport"] = startServiceImportController
return controllers
}
func startClusterController(ctx ControllerContext) (enabled bool, err error) {
mgr := ctx.Mgr
opts := ctx.Opts
clusterController := &cluster.Controller{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName),
ClusterMonitorPeriod: opts.ClusterMonitorPeriod.Duration,
ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod.Duration,
ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod.Duration,
}
if err := clusterController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup cluster controller: %v", err)
return false, err
}
return true, nil
}
func startClusterStatusController(ctx ControllerContext) (enabled bool, err error) {
mgr := ctx.Mgr
opts := ctx.Opts
stopChan := ctx.StopChan
clusterPredicateFunc := predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
obj := createEvent.Object.(*clusterv1alpha1.Cluster)
return obj.Spec.SyncMode == clusterv1alpha1.Push
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
obj := updateEvent.ObjectNew.(*clusterv1alpha1.Cluster)
return obj.Spec.SyncMode == clusterv1alpha1.Push
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
obj := deleteEvent.Object.(*clusterv1alpha1.Cluster)
return obj.Spec.SyncMode == clusterv1alpha1.Push
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
}
clusterStatusController := &status.ClusterStatusController{
Client: mgr.GetClient(),
KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()),
EventRecorder: mgr.GetEventRecorderFor(status.ControllerName),
PredicateFunc: clusterPredicateFunc,
InformerManager: informermanager.GetInstance(),
StopChan: stopChan,
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst},
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
}
if err := clusterStatusController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup cluster status controller: %v", err)
return false, err
}
return true, nil
}
func startHpaController(ctx ControllerContext) (enabled bool, err error) {
hpaController := &hpa.HorizontalPodAutoscalerController{
Client: ctx.Mgr.GetClient(),
DynamicClient: ctx.DynamicClientSet,
EventRecorder: ctx.Mgr.GetEventRecorderFor(hpa.ControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: ctx.ControlPlaneInformerManager,
}
if err := hpaController.SetupWithManager(ctx.Mgr); err != nil {
klog.Fatalf("Failed to setup hpa controller: %v", err)
return false, err
}
return true, nil
}
func startBindingController(ctx ControllerContext) (enabled bool, err error) {
bindingController := &binding.ResourceBindingController{
Client: ctx.Mgr.GetClient(),
DynamicClient: ctx.DynamicClientSet,
EventRecorder: ctx.Mgr.GetEventRecorderFor(binding.ControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
OverrideManager: ctx.OverrideManager,
InformerManager: ctx.ControlPlaneInformerManager,
}
if err := bindingController.SetupWithManager(ctx.Mgr); err != nil {
klog.Fatalf("Failed to setup binding controller: %v", err)
return false, err
}
clusterResourceBindingController := &binding.ClusterResourceBindingController{
Client: ctx.Mgr.GetClient(),
DynamicClient: ctx.DynamicClientSet,
EventRecorder: ctx.Mgr.GetEventRecorderFor(binding.ClusterResourceBindingControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
OverrideManager: ctx.OverrideManager,
InformerManager: ctx.ControlPlaneInformerManager,
}
if err := clusterResourceBindingController.SetupWithManager(ctx.Mgr); err != nil {
klog.Fatalf("Failed to setup cluster resource binding controller: %v", err)
return false, err
}
return true, nil
}
func startExecutionController(ctx ControllerContext) (enabled bool, err error) {
executionController := &execution.Controller{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
InformerManager: informermanager.GetInstance(),
}
if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
klog.Fatalf("Failed to setup execution controller: %v", err)
return false, err
}
return true, nil
}
func startWorkStatusController(ctx ControllerContext) (enabled bool, err error) {
workStatusController := &status.WorkStatusController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: ctx.StopChan,
WorkerNumber: 1,
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
ClusterClientSetFunc: util.NewClusterDynamicClientSet,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
klog.Fatalf("Failed to setup work status controller: %v", err)
return false, err
}
return true, nil
}
func startNamespaceController(ctx ControllerContext) (enabled bool, err error) {
skippedPropagatingNamespaces := map[string]struct{}{}
for _, ns := range ctx.Opts.SkippedPropagatingNamespaces {
skippedPropagatingNamespaces[ns] = struct{}{}
}
namespaceSyncController := &namespace.Controller{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(namespace.ControllerName),
SkippedPropagatingNamespaces: skippedPropagatingNamespaces,
}
if err := namespaceSyncController.SetupWithManager(ctx.Mgr); err != nil {
klog.Fatalf("Failed to setup namespace sync controller: %v", err)
return false, err
}
return true, nil
}
func startServiceExportController(ctx ControllerContext) (enabled bool, err error) {
serviceExportController := &mcs.ServiceExportController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: ctx.StopChan,
WorkerNumber: 1,
PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
}
serviceExportController.RunWorkQueue()
if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil {
klog.Fatalf("Failed to setup ServiceExport controller: %v", err)
return false, err
}
return true, nil
}
func startEndpointSliceController(ctx ControllerContext) (enabled bool, err error) {
endpointSliceController := &mcs.EndpointSliceController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.EndpointSliceControllerName),
}
if err := endpointSliceController.SetupWithManager(ctx.Mgr); err != nil {
klog.Fatalf("Failed to setup EndpointSlice controller: %v", err)
return false, err
}
return true, nil
}
func startServiceImportController(ctx ControllerContext) (enabled bool, err error) {
serviceImportController := &mcs.ServiceImportController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceImportControllerName),
}
if err := serviceImportController.SetupWithManager(ctx.Mgr); err != nil {
klog.Fatalf("Failed to setup ServiceImport controller: %v", err)
return false, err
}
return true, nil
}
// setupControllers initialize controllers and setup one by one. // setupControllers initialize controllers and setup one by one.
// Note: ignore cyclomatic complexity check(by gocyclo) because it will not effect readability.
//nolint:gocyclo
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) { func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
restConfig := mgr.GetConfig() restConfig := mgr.GetConfig()
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig) dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
@ -147,175 +376,18 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
} }
setupClusterAPIClusterDetector(mgr, opts, stopChan) setupClusterAPIClusterDetector(mgr, opts, stopChan)
controllerContext := ControllerContext{
if opts.IsControllerEnabled("cluster") { Mgr: mgr,
clusterController := &cluster.Controller{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName),
ClusterMonitorPeriod: opts.ClusterMonitorPeriod.Duration,
ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod.Duration,
ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod.Duration,
}
if err := clusterController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup cluster controller: %v", err)
}
}
if opts.IsControllerEnabled("clusterStatus") {
clusterPredicateFunc := predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
obj := createEvent.Object.(*clusterv1alpha1.Cluster)
return obj.Spec.SyncMode == clusterv1alpha1.Push
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
obj := updateEvent.ObjectNew.(*clusterv1alpha1.Cluster)
return obj.Spec.SyncMode == clusterv1alpha1.Push
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
obj := deleteEvent.Object.(*clusterv1alpha1.Cluster)
return obj.Spec.SyncMode == clusterv1alpha1.Push
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
}
clusterStatusController := &status.ClusterStatusController{
Client: mgr.GetClient(),
KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()),
EventRecorder: mgr.GetEventRecorderFor(status.ControllerName),
PredicateFunc: clusterPredicateFunc,
InformerManager: informermanager.GetInstance(),
StopChan: stopChan,
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst},
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
}
if err := clusterStatusController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup cluster status controller: %v", err)
}
}
if opts.IsControllerEnabled("hpa") {
hpaController := &hpa.HorizontalPodAutoscalerController{
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
EventRecorder: mgr.GetEventRecorderFor(hpa.ControllerName),
RESTMapper: mgr.GetRESTMapper(),
InformerManager: controlPlaneInformerManager,
}
if err := hpaController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup hpa controller: %v", err)
}
}
if opts.IsControllerEnabled("binding") {
bindingController := &binding.ResourceBindingController{
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
EventRecorder: mgr.GetEventRecorderFor(binding.ControllerName),
RESTMapper: mgr.GetRESTMapper(),
OverrideManager: overrideManager,
InformerManager: controlPlaneInformerManager,
}
if err := bindingController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup binding controller: %v", err)
}
clusterResourceBindingController := &binding.ClusterResourceBindingController{
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
EventRecorder: mgr.GetEventRecorderFor(binding.ClusterResourceBindingControllerName),
RESTMapper: mgr.GetRESTMapper(),
OverrideManager: overrideManager,
InformerManager: controlPlaneInformerManager,
}
if err := clusterResourceBindingController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup cluster resource binding controller: %v", err)
}
}
if opts.IsControllerEnabled("execution") {
executionController := &execution.Controller{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(execution.ControllerName),
RESTMapper: mgr.GetRESTMapper(),
ObjectWatcher: objectWatcher, ObjectWatcher: objectWatcher,
PredicateFunc: helper.NewExecutionPredicate(mgr), Opts: opts,
InformerManager: informermanager.GetInstance(),
}
if err := executionController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup execution controller: %v", err)
}
}
if opts.IsControllerEnabled("workStatus") {
workStatusController := &status.WorkStatusController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(status.WorkStatusControllerName),
RESTMapper: mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: stopChan, StopChan: stopChan,
WorkerNumber: 1, DynamicClientSet: dynamicClientSet,
ObjectWatcher: objectWatcher, OverrideManager: overrideManager,
PredicateFunc: helper.NewExecutionPredicate(mgr), ControlPlaneInformerManager: controlPlaneInformerManager,
ClusterClientSetFunc: util.NewClusterDynamicClientSet,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup work status controller: %v", err)
}
} }
if opts.IsControllerEnabled("namespace") { if err := StartControllers(controllerContext, NewControllerInitializers()); err != nil {
namespaceSyncController := &namespace.Controller{ klog.Fatalf("error starting controllers: %v", err)
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(namespace.ControllerName),
SkippedPropagatingNamespaces: skippedPropagatingNamespaces,
}
if err := namespaceSyncController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup namespace sync controller: %v", err)
}
}
if opts.IsControllerEnabled("serviceExport") {
serviceExportController := &mcs.ServiceExportController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
RESTMapper: mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: stopChan,
WorkerNumber: 1,
PredicateFunc: helper.NewPredicateForServiceExportController(mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
}
serviceExportController.RunWorkQueue()
if err := serviceExportController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup ServiceExport controller: %v", err)
}
}
if opts.IsControllerEnabled("endpointSlice") {
endpointSliceController := &mcs.EndpointSliceController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(mcs.EndpointSliceControllerName),
}
if err := endpointSliceController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup EndpointSlice controller: %v", err)
}
}
if opts.IsControllerEnabled("serviceImport") {
serviceImportController := &mcs.ServiceImportController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceImportControllerName),
}
if err := serviceImportController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup ServiceImport controller: %v", err)
}
} }
// Ensure the InformerManager stops when the stop channel closes // Ensure the InformerManager stops when the stop channel closes
@ -325,6 +397,29 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
}() }()
} }
// StartControllers starts a set of controllers with a specified ControllerContext
func StartControllers(ctx ControllerContext, controllers map[string]InitFunc) error {
for controllerName, initFn := range controllers {
if !ctx.IsControllerEnabled(controllerName) {
klog.Warningf("%q is disabled", controllerName)
continue
}
klog.V(1).Infof("Starting %q", controllerName)
started, err := initFn(ctx)
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
}
if !started {
klog.Warningf("Skipping %q", controllerName)
continue
}
klog.Infof("Started %q", controllerName)
}
return nil
}
// setupClusterAPIClusterDetector initialize Cluster detector with the cluster-api management cluster. // setupClusterAPIClusterDetector initialize Cluster detector with the cluster-api management cluster.
func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) { func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
if len(opts.ClusterAPIKubeconfig) == 0 { if len(opts.ClusterAPIKubeconfig) == 0 {
@ -357,3 +452,20 @@ func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options
klog.Infof("Success to setup cluster-api cluster detector") klog.Infof("Success to setup cluster-api cluster detector")
} }
// IsControllerEnabled check if a specified controller enabled or not.
func (c ControllerContext) IsControllerEnabled(name string) bool {
hasStar := false
for _, ctrl := range c.Opts.Controllers {
if ctrl == name {
return true
}
if ctrl == "-"+name {
return false
}
if ctrl == "*" {
hasStar = true
}
}
return hasStar
}