From 957be9179631a3c709e4195132ff59a6a96cc02c Mon Sep 17 00:00:00 2001 From: pigletfly Date: Tue, 22 Feb 2022 10:29:28 +0800 Subject: [PATCH 1/2] Fix controller reconcile concurrent Signed-off-by: pigletfly --- cmd/agent/app/agent.go | 32 ++++++---- cmd/agent/app/options/options.go | 29 +++++++++ .../app/controllermanager.go | 59 ++++++++++++------- cmd/controller-manager/app/options/options.go | 29 +++++++++ pkg/clusterdiscovery/clusterapi/clusterapi.go | 3 +- pkg/controllers/context/context.go | 4 ++ .../status/workstatus_controller.go | 27 ++++----- .../unifiedauth/unified_auth_controller.go | 3 +- pkg/detector/detector.go | 10 +++- 9 files changed, 147 insertions(+), 49 deletions(-) diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 68d8572b9..99f7e0ac8 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -15,9 +15,11 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" "github.com/karmada-io/karmada/cmd/agent/app/options" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context" "github.com/karmada-io/karmada/pkg/controllers/execution" "github.com/karmada-io/karmada/pkg/controllers/mcs" @@ -126,6 +128,12 @@ func run(ctx context.Context, karmadaConfig karmadactl.KarmadaConfig, opts *opti LeaderElectionID: fmt.Sprintf("karmada-agent-%s", opts.ClusterName), LeaderElectionNamespace: opts.LeaderElection.ResourceNamespace, LeaderElectionResourceLock: opts.LeaderElection.ResourceLock, + Controller: v1alpha1.ControllerConfigurationSpec{ + GroupKindConcurrency: map[string]int{ + workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String(): opts.ConcurrentWorkSyncs, + clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): 
opts.ConcurrentClusterSyncs, + }, + }, }) if err != nil { return fmt.Errorf("failed to build controller manager: %w", err) @@ -165,6 +173,8 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, ClusterAPIQPS: opts.ClusterAPIQPS, ClusterAPIBurst: opts.ClusterAPIBurst, + ConcurrentWorkReconciles: opts.ConcurrentWorkSyncs, + ConcurrentServiceExportReconciles: opts.ConcurrentServiceExportSyncs, }, StopChan: stopChan, } @@ -222,16 +232,16 @@ func startExecutionController(ctx controllerscontext.Context) (bool, error) { func startWorkStatusController(ctx controllerscontext.Context) (bool, error) { workStatusController := &status.WorkStatusController{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), - RESTMapper: ctx.Mgr.GetRESTMapper(), - InformerManager: informermanager.GetInstance(), - StopChan: ctx.StopChan, - WorkerNumber: 1, - ObjectWatcher: ctx.ObjectWatcher, - PredicateFunc: helper.NewExecutionPredicateOnAgent(), - ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout, + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + InformerManager: informermanager.GetInstance(), + StopChan: ctx.StopChan, + ObjectWatcher: ctx.ObjectWatcher, + PredicateFunc: helper.NewExecutionPredicateOnAgent(), + ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent, + ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout, + ConcurrentWorkReconciles: ctx.Opts.ConcurrentWorkReconciles, } workStatusController.RunWorkQueue() if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil { @@ -247,7 +257,7 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error) RESTMapper: ctx.Mgr.GetRESTMapper(), InformerManager: informermanager.GetInstance(), 
StopChan: ctx.StopChan, - WorkerNumber: 1, + WorkerNumber: ctx.Opts.ConcurrentServiceExportReconciles, PredicateFunc: helper.NewPredicateForServiceExportControllerOnAgent(ctx.Opts.ClusterName), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout, diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index 092f0f321..f446def0b 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -55,6 +55,27 @@ type Options struct { ClusterAPIEndpoint string // ProxyServerAddress holds the proxy server address that is used to proxy to the cluster. ProxyServerAddress string + // concurrentClusterSyncs is the number of cluster objects that are + // allowed to sync concurrently. + ConcurrentClusterSyncs int + // concurrentClusterAPISyncs is the number of clusterapi controller workers. + ConcurrentClusterAPISyncs int + // ConcurrentClusterResourceBindingSyncs is the number of clusterresourcebinding objects that are + // allowed to sync concurrently. + ConcurrentClusterResourceBindingSyncs int + // ConcurrentWorkSyncs is the number of work objects that are + // allowed to sync concurrently. + ConcurrentWorkSyncs int + // ConcurrentResourceBindingSyncs is the number of resourcebinding objects that are + // allowed to sync concurrently. + ConcurrentResourceBindingSyncs int + // ConcurrentNamespaceSyncs is the number of name objects that are + // allowed to sync concurrently. + ConcurrentNamespaceSyncs int + // ConcurrentDetectorSyncs is the number of detector workers. + ConcurrentDetectorSyncs int + // ConcurrentServiceExportSyncs is the number of resource workers. + ConcurrentServiceExportSyncs int } // NewOptions builds an default scheduler options. 
@@ -97,4 +118,12 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string) { fs.StringVar(&o.ClusterAPIEndpoint, "cluster-api-endpoint", o.ClusterAPIEndpoint, "APIEndpoint of the cluster.") fs.StringVar(&o.ProxyServerAddress, "proxy-server-address", o.ProxyServerAddress, "Address of the proxy server that is used to proxy to the cluster.") fs.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.") + fs.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. (default 5)") + fs.IntVar(&o.ConcurrentClusterAPISyncs, "concurrent-clusterapi-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. (default 5)") + fs.IntVar(&o.ConcurrentClusterResourceBindingSyncs, "concurrent-clusterresourcebinding-syncs", 5, "The number of clusterresourcebinding objects that are allowed to sync concurrently. (default 5)") + fs.IntVar(&o.ConcurrentResourceBindingSyncs, "concurrent-resourcebinding-syncs", 5, "The number of resourcebinding objects that are allowed to sync concurrently. (default 5)") + fs.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of work objects that are allowed to sync concurrently. (default 5)") + fs.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 5, "The number of namespace objects that are allowed to sync concurrently. (default 5)") + fs.IntVar(&o.ConcurrentDetectorSyncs, "concurrent-resource-template-syncs", 5, "The number of resource template workers that are allowed to sync concurrently. (default 5)") + fs.IntVar(&o.ConcurrentServiceExportSyncs, "concurrent-serviceexport-syncs", 5, "The number of serviceexport workers that are allowed to sync concurrently. 
(default 5)") } diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 97d2dadeb..add78a54b 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -8,18 +8,22 @@ import ( "strconv" "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" kubeclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/karmada-io/karmada/cmd/controller-manager/app/options" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/clusterdiscovery/clusterapi" "github.com/karmada-io/karmada/pkg/controllers/binding" "github.com/karmada-io/karmada/pkg/controllers/cluster" @@ -88,6 +92,15 @@ func Run(ctx context.Context, opts *options.Options) error { HealthProbeBindAddress: net.JoinHostPort(opts.BindAddress, strconv.Itoa(opts.SecurePort)), LivenessEndpointName: "/healthz", MetricsBindAddress: opts.MetricsBindAddress, + Controller: v1alpha1.ControllerConfigurationSpec{ + GroupKindConcurrency: map[string]int{ + workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String(): opts.ConcurrentWorkSyncs, + workv1alpha1.SchemeGroupVersion.WithKind("ResourceBinding").GroupKind().String(): opts.ConcurrentResourceBindingSyncs, + workv1alpha1.SchemeGroupVersion.WithKind("ClusterResourceBinding").GroupKind().String(): opts.ConcurrentClusterResourceBindingSyncs, + clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): opts.ConcurrentClusterSyncs, + schema.GroupVersionKind{Group: "", 
Version: "v1", Kind: "Namespace"}.GroupKind().String(): opts.ConcurrentNamespaceSyncs, + }, + }, }) if err != nil { klog.Errorf("failed to build controller manager: %v", err) @@ -247,19 +260,20 @@ func startExecutionController(ctx controllerscontext.Context) (enabled bool, err func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, err error) { opts := ctx.Opts workStatusController := &status.WorkStatusController{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), - RESTMapper: ctx.Mgr.GetRESTMapper(), - InformerManager: informermanager.GetInstance(), - StopChan: ctx.StopChan, - WorkerNumber: 1, - ObjectWatcher: ctx.ObjectWatcher, - PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), - ClusterClientSetFunc: util.NewClusterDynamicClientSet, - ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + InformerManager: informermanager.GetInstance(), + StopChan: ctx.StopChan, + ObjectWatcher: ctx.ObjectWatcher, + PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), + ClusterClientSetFunc: util.NewClusterDynamicClientSet, + ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, + ConcurrentWorkReconciles: opts.ConcurrentWorkReconciles, } workStatusController.RunWorkQueue() if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil { + klog.Fatalf("Failed to setup work status controller: %v", err) return false, err } return true, nil @@ -289,7 +303,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool, RESTMapper: ctx.Mgr.GetRESTMapper(), InformerManager: informermanager.GetInstance(), StopChan: ctx.StopChan, - WorkerNumber: 1, + WorkerNumber: ctx.Opts.ConcurrentServiceExportReconciles, PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr), ClusterDynamicClientSetFunc: 
util.NewClusterDynamicClientSet, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, @@ -364,15 +378,17 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter) resourceDetector := &detector.ResourceDetector{ - DiscoveryClientSet: discoverClientSet, - Client: mgr.GetClient(), - InformerManager: controlPlaneInformerManager, - RESTMapper: mgr.GetRESTMapper(), - DynamicClient: dynamicClientSet, - SkippedResourceConfig: skippedResourceConfig, - SkippedPropagatingNamespaces: skippedPropagatingNamespaces, - ResourceInterpreter: resourceInterpreter, - EventRecorder: mgr.GetEventRecorderFor("resource-detector"), + DiscoveryClientSet: discoverClientSet, + Client: mgr.GetClient(), + InformerManager: controlPlaneInformerManager, + RESTMapper: mgr.GetRESTMapper(), + DynamicClient: dynamicClientSet, + SkippedResourceConfig: skippedResourceConfig, + SkippedPropagatingNamespaces: skippedPropagatingNamespaces, + ResourceInterpreter: resourceInterpreter, + EventRecorder: mgr.GetEventRecorderFor("resource-detector"), + ConcurrentResourceTemplateSyncs: opts.ConcurrentDetectorSyncs, + ConcurrentResourceBindingSyncs: opts.ConcurrentResourceBindingSyncs, } if err := mgr.Add(resourceDetector); err != nil { klog.Fatalf("Failed to setup resource detector: %v", err) @@ -394,6 +410,8 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop ClusterAPIQPS: opts.ClusterAPIQPS, ClusterAPIBurst: opts.ClusterAPIBurst, SkippedPropagatingNamespaces: opts.SkippedPropagatingNamespaces, + ConcurrentWorkReconciles: opts.ConcurrentWorkSyncs, + ConcurrentServiceExportReconciles: opts.ConcurrentServiceExportSyncs, }, StopChan: stopChan, DynamicClientSet: dynamicClientSet, @@ -438,6 +456,7 @@ func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options ClusterAPIConfig: clusterAPIRestConfig, 
ClusterAPIClient: clusterAPIClient, InformerManager: informermanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0, stopChan), + ConcurrentReconciles: opts.ConcurrentClusterAPISyncs, } if err := mgr.Add(clusterAPIClusterDetector); err != nil { klog.Fatalf("Failed to setup cluster-api cluster detector: %v", err) diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index bdb4482c1..ac0227631 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -79,6 +79,27 @@ type Options struct { // It can be set to "0" to disable the metrics serving. // Defaults to ":8080". MetricsBindAddress string + // concurrentClusterSyncs is the number of cluster objects that are + // allowed to sync concurrently. + ConcurrentClusterSyncs int + // concurrentClusterAPISyncs is the number of clusterapi controller workers. + ConcurrentClusterAPISyncs int + // ConcurrentClusterResourceBindingSyncs is the number of clusterresourcebinding objects that are + // allowed to sync concurrently. + ConcurrentClusterResourceBindingSyncs int + // ConcurrentWorkSyncs is the number of work objects that are + // allowed to sync concurrently. + ConcurrentWorkSyncs int + // ConcurrentResourceBindingSyncs is the number of resourcebinding objects that are + // allowed to sync concurrently. + ConcurrentResourceBindingSyncs int + // ConcurrentNamespaceSyncs is the number of name objects that are + // allowed to sync concurrently. + ConcurrentNamespaceSyncs int + // ConcurrentDetectorSyncs is the number of detector workers. + ConcurrentDetectorSyncs int + // ConcurrentServiceExportSyncs is the number of resource workers. + ConcurrentServiceExportSyncs int } // NewOptions builds an empty options. 
@@ -134,4 +155,12 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string) { flags.DurationVar(&o.ClusterCacheSyncTimeout.Duration, "cluster-cache-sync-timeout", util.CacheSyncTimeout, "Timeout period waiting for cluster cache to sync.") flags.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.") flags.StringVar(&o.MetricsBindAddress, "metrics-bind-address", ":8080", "The TCP address that the controller should bind to for serving prometheus metrics(e.g. 127.0.0.1:8088, :8088)") + flags.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. (default 5)") + flags.IntVar(&o.ConcurrentClusterAPISyncs, "concurrent-clusterapi-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. (default 5)") + flags.IntVar(&o.ConcurrentClusterResourceBindingSyncs, "concurrent-clusterresourcebinding-syncs", 5, "The number of clusterresourcebinding objects that are allowed to sync concurrently. (default 5)") + flags.IntVar(&o.ConcurrentResourceBindingSyncs, "concurrent-resourcebinding-syncs", 5, "The number of resourcebinding objects that are allowed to sync concurrently. (default 5)") + flags.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of work objects that are allowed to sync concurrently. (default 5)") + flags.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 5, "The number of namespace objects that are allowed to sync concurrently. (default 5)") + flags.IntVar(&o.ConcurrentDetectorSyncs, "concurrent-resource-template-syncs", 5, "The number of resource template workers that are allowed to sync concurrently. (default 5)") + flags.IntVar(&o.ConcurrentServiceExportSyncs, "concurrent-serviceexport-syncs", 5, "The number of serviceexport workers that are allowed to sync concurrently. 
(default 5)") } diff --git a/pkg/clusterdiscovery/clusterapi/clusterapi.go b/pkg/clusterdiscovery/clusterapi/clusterapi.go index 2c97dc262..36886f09d 100644 --- a/pkg/clusterdiscovery/clusterapi/clusterapi.go +++ b/pkg/clusterdiscovery/clusterapi/clusterapi.go @@ -47,6 +47,7 @@ type ClusterDetector struct { InformerManager informermanager.SingleClusterInformerManager EventHandler cache.ResourceEventHandler Processor util.AsyncWorker + ConcurrentReconciles int stopCh <-chan struct{} } @@ -58,7 +59,7 @@ func (d *ClusterDetector) Start(ctx context.Context) error { d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) d.Processor = util.NewAsyncWorker("cluster-api cluster detector", ClusterWideKeyFunc, d.Reconcile) - d.Processor.Run(1, d.stopCh) + d.Processor.Run(d.ConcurrentReconciles, d.stopCh) d.discoveryCluster() <-d.stopCh diff --git a/pkg/controllers/context/context.go b/pkg/controllers/context/context.go index 56e077623..6b15bca12 100644 --- a/pkg/controllers/context/context.go +++ b/pkg/controllers/context/context.go @@ -47,6 +47,10 @@ type Options struct { SkippedPropagatingNamespaces []string // ClusterName is the name of cluster. ClusterName string + // ConcurrentWorkReconciles is the number of workstatus workers. + ConcurrentWorkReconciles int + // ConcurrentServiceExportReconciles is the number of resource workers. + ConcurrentServiceExportReconciles int } // Context defines the context object for controller. diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index 7851c8a84..b16088ac2 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -36,19 +36,18 @@ const WorkStatusControllerName = "work-status-controller" // WorkStatusController is to sync status of Work. type WorkStatusController struct { - client.Client // used to operate Work resources. 
- EventRecorder record.EventRecorder - RESTMapper meta.RESTMapper - InformerManager informermanager.MultiClusterInformerManager - eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster. - StopChan <-chan struct{} - WorkerNumber int // WorkerNumber is the number of worker goroutines - worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue. - ObjectWatcher objectwatcher.ObjectWatcher - PredicateFunc predicate.Predicate - ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) - - ClusterCacheSyncTimeout metav1.Duration + client.Client // used to operate Work resources. + EventRecorder record.EventRecorder + RESTMapper meta.RESTMapper + InformerManager informermanager.MultiClusterInformerManager + eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster. + StopChan <-chan struct{} + worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue. + ObjectWatcher objectwatcher.ObjectWatcher + PredicateFunc predicate.Predicate + ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) + ConcurrentWorkReconciles int + ClusterCacheSyncTimeout metav1.Duration } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -117,7 +116,7 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler { // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. func (c *WorkStatusController) RunWorkQueue() { c.worker = util.NewAsyncWorker("work-status", generateKey, c.syncWorkStatus) - c.worker.Run(c.WorkerNumber, c.StopChan) + c.worker.Run(c.ConcurrentWorkReconciles, c.StopChan) } // generateKey generates a key from obj, the key contains cluster, GVK, namespace and name. 
diff --git a/pkg/controllers/unifiedauth/unified_auth_controller.go b/pkg/controllers/unifiedauth/unified_auth_controller.go index 0618e34fd..25ad16eea 100644 --- a/pkg/controllers/unifiedauth/unified_auth_controller.go +++ b/pkg/controllers/unifiedauth/unified_auth_controller.go @@ -244,7 +244,8 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { return utilerrors.NewAggregate([]error{ controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(clusterPredicateFunc). Watches(&source.Kind{Type: &rbacv1.ClusterRole{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleMapFunc())). - Watches(&source.Kind{Type: &rbacv1.ClusterRoleBinding{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleBindingMapFunc())).Complete(c), + Watches(&source.Kind{Type: &rbacv1.ClusterRoleBinding{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleBindingMapFunc())). + Complete(c), mgr.Add(c), }) } diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index ba3b0e130..631818c5b 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -76,6 +76,12 @@ type ResourceDetector struct { waitingObjects map[keys.ClusterWideKey]struct{} // waitingLock is the lock for waitingObjects operation. waitingLock sync.RWMutex + // ConcurrentResourceTemplateSyncs is the number of resource templates that are allowed to sync concurrently. + // Larger number means responsive resource template syncing but more CPU(and network) load. + ConcurrentResourceTemplateSyncs int + // ConcurrentResourceBindingSyncs is the number of ResourceBinding that are allowed to sync concurrently. + // Larger number means responsive resource template syncing but more CPU(and network) load. 
+ ConcurrentResourceBindingSyncs int stopCh <-chan struct{} } @@ -114,7 +120,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error { // setup binding reconcile worker d.bindingReconcileWorker = util.NewAsyncWorker("resourceBinding reconciler", ClusterWideKeyFunc, d.ReconcileResourceBinding) - d.bindingReconcileWorker.Run(1, d.stopCh) + d.bindingReconcileWorker.Run(d.ConcurrentResourceBindingSyncs, d.stopCh) // watch and enqueue ResourceBinding changes. resourceBindingGVR := schema.GroupVersionResource{ @@ -137,7 +143,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete) d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile) - d.Processor.Run(1, d.stopCh) + d.Processor.Run(d.ConcurrentResourceTemplateSyncs, d.stopCh) go d.discoverResources(30 * time.Second) <-d.stopCh From 3f0aa799481c07ab91f6e2c25c69b0909382ba74 Mon Sep 17 00:00:00 2001 From: RainbowMango Date: Thu, 24 Feb 2022 17:45:24 +0800 Subject: [PATCH 2/2] Fix comments from myself. 
Signed-off-by: RainbowMango --- cmd/agent/app/agent.go | 25 +++++++------- cmd/agent/app/options/options.go | 25 ++------------ .../app/controllermanager.go | 34 +++++++++---------- cmd/controller-manager/app/options/options.go | 26 ++++++-------- pkg/controllers/context/context.go | 6 ++-- .../status/workstatus_controller.go | 27 ++++++++------- 6 files changed, 57 insertions(+), 86 deletions(-) diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 99f7e0ac8..6aaa6bc0f 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -173,8 +173,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, ClusterAPIQPS: opts.ClusterAPIQPS, ClusterAPIBurst: opts.ClusterAPIBurst, - ConcurrentWorkReconciles: opts.ConcurrentWorkSyncs, - ConcurrentServiceExportReconciles: opts.ConcurrentServiceExportSyncs, + ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs, }, StopChan: stopChan, } @@ -232,16 +231,16 @@ func startExecutionController(ctx controllerscontext.Context) (bool, error) { func startWorkStatusController(ctx controllerscontext.Context) (bool, error) { workStatusController := &status.WorkStatusController{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), - RESTMapper: ctx.Mgr.GetRESTMapper(), - InformerManager: informermanager.GetInstance(), - StopChan: ctx.StopChan, - ObjectWatcher: ctx.ObjectWatcher, - PredicateFunc: helper.NewExecutionPredicateOnAgent(), - ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout, - ConcurrentWorkReconciles: ctx.Opts.ConcurrentWorkReconciles, + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + InformerManager: informermanager.GetInstance(), + StopChan: ctx.StopChan, + ObjectWatcher: ctx.ObjectWatcher, + PredicateFunc: 
helper.NewExecutionPredicateOnAgent(), + ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent, + ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout, + ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs, } workStatusController.RunWorkQueue() if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil { @@ -257,7 +256,7 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error) RESTMapper: ctx.Mgr.GetRESTMapper(), InformerManager: informermanager.GetInstance(), StopChan: ctx.StopChan, - WorkerNumber: ctx.Opts.ConcurrentServiceExportReconciles, + WorkerNumber: 3, PredicateFunc: helper.NewPredicateForServiceExportControllerOnAgent(ctx.Opts.ClusterName), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout, diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index f446def0b..7a93cef62 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -58,24 +58,9 @@ type Options struct { // concurrentClusterSyncs is the number of cluster objects that are // allowed to sync concurrently. ConcurrentClusterSyncs int - // concurrentClusterAPISyncs is the number of clusterapi controller workers. - ConcurrentClusterAPISyncs int - // ConcurrentClusterResourceBindingSyncs is the number of clusterresourcebinding objects that are - // allowed to sync concurrently. - ConcurrentClusterResourceBindingSyncs int // ConcurrentWorkSyncs is the number of work objects that are // allowed to sync concurrently. ConcurrentWorkSyncs int - // ConcurrentResourceBindingSyncs is the number of resourcebinding objects that are - // allowed to sync concurrently. - ConcurrentResourceBindingSyncs int - // ConcurrentNamespaceSyncs is the number of name objects that are - // allowed to sync concurrently. - ConcurrentNamespaceSyncs int - // ConcurrentDetectorSyncs is the number of detector workers. 
- ConcurrentDetectorSyncs int - // ConcurrentServiceExportSyncs is the number of resource workers. - ConcurrentServiceExportSyncs int } // NewOptions builds an default scheduler options. @@ -118,12 +103,6 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string) { fs.StringVar(&o.ClusterAPIEndpoint, "cluster-api-endpoint", o.ClusterAPIEndpoint, "APIEndpoint of the cluster.") fs.StringVar(&o.ProxyServerAddress, "proxy-server-address", o.ProxyServerAddress, "Address of the proxy server that is used to proxy to the cluster.") fs.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.") - fs.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. (default 5)") - fs.IntVar(&o.ConcurrentClusterAPISyncs, "concurrent-clusterapi-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. (default 5)") - fs.IntVar(&o.ConcurrentClusterResourceBindingSyncs, "concurrent-clusterresourcebinding-syncs", 5, "The number of clusterresourcebinding objects that are allowed to sync concurrently. (default 5)") - fs.IntVar(&o.ConcurrentResourceBindingSyncs, "concurrent-resourcebinding-syncs", 5, "The number of resourcebinding objects that are allowed to sync concurrently. (default 5)") - fs.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of work objects that are allowed to sync concurrently. (default 5)") - fs.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 5, "The number of namespace objects that are allowed to sync concurrently. (default 5)") - fs.IntVar(&o.ConcurrentDetectorSyncs, "concurrent-resource-template-syncs", 5, "The number of resource template workers that are allowed to sync concurrently. (default 5)") - fs.IntVar(&o.ConcurrentServiceExportSyncs, "concurrent-serviceexport-syncs", 5, "The number of serviceexport workers that are allowed to sync concurrently. 
(default 5)") + fs.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of Clusters that are allowed to sync concurrently.") + fs.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of Works that are allowed to sync concurrently.") } diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index add78a54b..02a56e5a7 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -24,6 +24,7 @@ import ( "github.com/karmada-io/karmada/cmd/controller-manager/app/options" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/clusterdiscovery/clusterapi" "github.com/karmada-io/karmada/pkg/controllers/binding" "github.com/karmada-io/karmada/pkg/controllers/cluster" @@ -95,8 +96,8 @@ func Run(ctx context.Context, opts *options.Options) error { Controller: v1alpha1.ControllerConfigurationSpec{ GroupKindConcurrency: map[string]int{ workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String(): opts.ConcurrentWorkSyncs, - workv1alpha1.SchemeGroupVersion.WithKind("ResourceBinding").GroupKind().String(): opts.ConcurrentResourceBindingSyncs, - workv1alpha1.SchemeGroupVersion.WithKind("ClusterResourceBinding").GroupKind().String(): opts.ConcurrentClusterResourceBindingSyncs, + workv1alpha2.SchemeGroupVersion.WithKind("ResourceBinding").GroupKind().String(): opts.ConcurrentResourceBindingSyncs, + workv1alpha2.SchemeGroupVersion.WithKind("ClusterResourceBinding").GroupKind().String(): opts.ConcurrentClusterResourceBindingSyncs, clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): opts.ConcurrentClusterSyncs, schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}.GroupKind().String(): 
opts.ConcurrentNamespaceSyncs, }, @@ -260,16 +261,16 @@ func startExecutionController(ctx controllerscontext.Context) (enabled bool, err func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, err error) { opts := ctx.Opts workStatusController := &status.WorkStatusController{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), - RESTMapper: ctx.Mgr.GetRESTMapper(), - InformerManager: informermanager.GetInstance(), - StopChan: ctx.StopChan, - ObjectWatcher: ctx.ObjectWatcher, - PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), - ClusterClientSetFunc: util.NewClusterDynamicClientSet, - ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, - ConcurrentWorkReconciles: opts.ConcurrentWorkReconciles, + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + InformerManager: informermanager.GetInstance(), + StopChan: ctx.StopChan, + ObjectWatcher: ctx.ObjectWatcher, + PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), + ClusterClientSetFunc: util.NewClusterDynamicClientSet, + ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, + ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs, } workStatusController.RunWorkQueue() if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil { @@ -303,7 +304,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool, RESTMapper: ctx.Mgr.GetRESTMapper(), InformerManager: informermanager.GetInstance(), StopChan: ctx.StopChan, - WorkerNumber: ctx.Opts.ConcurrentServiceExportReconciles, + WorkerNumber: 3, PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, @@ -387,7 +388,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop SkippedPropagatingNamespaces: 
skippedPropagatingNamespaces, ResourceInterpreter: resourceInterpreter, EventRecorder: mgr.GetEventRecorderFor("resource-detector"), - ConcurrentResourceTemplateSyncs: opts.ConcurrentDetectorSyncs, + ConcurrentResourceTemplateSyncs: opts.ConcurrentResourceTemplateSyncs, ConcurrentResourceBindingSyncs: opts.ConcurrentResourceBindingSyncs, } if err := mgr.Add(resourceDetector); err != nil { @@ -410,8 +411,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop ClusterAPIQPS: opts.ClusterAPIQPS, ClusterAPIBurst: opts.ClusterAPIBurst, SkippedPropagatingNamespaces: opts.SkippedPropagatingNamespaces, - ConcurrentWorkReconciles: opts.ConcurrentWorkSyncs, - ConcurrentServiceExportReconciles: opts.ConcurrentServiceExportSyncs, + ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs, }, StopChan: stopChan, DynamicClientSet: dynamicClientSet, @@ -456,7 +456,7 @@ func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options ClusterAPIConfig: clusterAPIRestConfig, ClusterAPIClient: clusterAPIClient, InformerManager: informermanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0, stopChan), - ConcurrentReconciles: opts.ConcurrentClusterAPISyncs, + ConcurrentReconciles: 3, } if err := mgr.Add(clusterAPIClusterDetector); err != nil { klog.Fatalf("Failed to setup cluster-api cluster detector: %v", err) diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index ac0227631..64d5eba49 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -82,24 +82,20 @@ type Options struct { // concurrentClusterSyncs is the number of cluster objects that are // allowed to sync concurrently. ConcurrentClusterSyncs int - // concurrentClusterAPISyncs is the number of clusterapi controller workers. 
- ConcurrentClusterAPISyncs int // ConcurrentClusterResourceBindingSyncs is the number of clusterresourcebinding objects that are // allowed to sync concurrently. ConcurrentClusterResourceBindingSyncs int - // ConcurrentWorkSyncs is the number of work objects that are + // ConcurrentWorkSyncs is the number of Work objects that are // allowed to sync concurrently. ConcurrentWorkSyncs int // ConcurrentResourceBindingSyncs is the number of resourcebinding objects that are // allowed to sync concurrently. ConcurrentResourceBindingSyncs int - // ConcurrentNamespaceSyncs is the number of name objects that are + // ConcurrentNamespaceSyncs is the number of Namespace objects that are // allowed to sync concurrently. ConcurrentNamespaceSyncs int - // ConcurrentDetectorSyncs is the number of detector workers. - ConcurrentDetectorSyncs int - // ConcurrentServiceExportSyncs is the number of resource workers. - ConcurrentServiceExportSyncs int + // ConcurrentResourceTemplateSyncs is the number of resource templates that are allowed to sync concurrently. + ConcurrentResourceTemplateSyncs int } // NewOptions builds an empty options. @@ -155,12 +151,10 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string) { flags.DurationVar(&o.ClusterCacheSyncTimeout.Duration, "cluster-cache-sync-timeout", util.CacheSyncTimeout, "Timeout period waiting for cluster cache to sync.") flags.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.") flags.StringVar(&o.MetricsBindAddress, "metrics-bind-address", ":8080", "The TCP address that the controller should bind to for serving prometheus metrics(e.g. 127.0.0.1:8088, :8088)") - flags.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. (default 5)") - flags.IntVar(&o.ConcurrentClusterAPISyncs, "concurrent-clusterapi-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. 
(default 5)") - flags.IntVar(&o.ConcurrentClusterResourceBindingSyncs, "concurrent-clusterresourcebinding-syncs", 5, "The number of clusterresourcebinding objects that are allowed to sync concurrently. (default 5)") - flags.IntVar(&o.ConcurrentResourceBindingSyncs, "concurrent-resourcebinding-syncs", 5, "The number of resourcebinding objects that are allowed to sync concurrently. (default 5)") - flags.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of work objects that are allowed to sync concurrently. (default 5)") - flags.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 5, "The number of namespace objects that are allowed to sync concurrently. (default 5)") - flags.IntVar(&o.ConcurrentDetectorSyncs, "concurrent-resource-template-syncs", 5, "The number of resource template workers that are allowed to sync concurrently. (default 5)") - flags.IntVar(&o.ConcurrentServiceExportSyncs, "concurrent-serviceexport-syncs", 5, "The number of serviceexport workers that are allowed to sync concurrently. 
(default 5)") + flags.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of Clusters that are allowed to sync concurrently.") + flags.IntVar(&o.ConcurrentClusterResourceBindingSyncs, "concurrent-clusterresourcebinding-syncs", 5, "The number of ClusterResourceBindings that are allowed to sync concurrently.") + flags.IntVar(&o.ConcurrentResourceBindingSyncs, "concurrent-resourcebinding-syncs", 5, "The number of ResourceBindings that are allowed to sync concurrently.") + flags.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of Works that are allowed to sync concurrently.") + flags.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 1, "The number of Namespaces that are allowed to sync concurrently.") + flags.IntVar(&o.ConcurrentResourceTemplateSyncs, "concurrent-resource-template-syncs", 5, "The number of resource templates that are allowed to sync concurrently.") } diff --git a/pkg/controllers/context/context.go b/pkg/controllers/context/context.go index 6b15bca12..aff92ad61 100644 --- a/pkg/controllers/context/context.go +++ b/pkg/controllers/context/context.go @@ -47,10 +47,8 @@ type Options struct { SkippedPropagatingNamespaces []string // ClusterName is the name of cluster. ClusterName string - // ConcurrentWorkReconciles is the number of workstatus workers. - ConcurrentWorkReconciles int - // ConcurrentServiceExportReconciles is the number of resource workers. - ConcurrentServiceExportReconciles int + // ConcurrentWorkSyncs is the number of Works that are allowed to sync concurrently. + ConcurrentWorkSyncs int } // Context defines the context object for controller. 
diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index b16088ac2..383f85a13 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -36,18 +36,19 @@ const WorkStatusControllerName = "work-status-controller" // WorkStatusController is to sync status of Work. type WorkStatusController struct { - client.Client // used to operate Work resources. - EventRecorder record.EventRecorder - RESTMapper meta.RESTMapper - InformerManager informermanager.MultiClusterInformerManager - eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster. - StopChan <-chan struct{} - worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue. - ObjectWatcher objectwatcher.ObjectWatcher - PredicateFunc predicate.Predicate - ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) - ConcurrentWorkReconciles int - ClusterCacheSyncTimeout metav1.Duration + client.Client // used to operate Work resources. + EventRecorder record.EventRecorder + RESTMapper meta.RESTMapper + InformerManager informermanager.MultiClusterInformerManager + eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster. + StopChan <-chan struct{} + worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue. + // ConcurrentWorkStatusSyncs is the number of Work statuses that are allowed to sync concurrently. + ConcurrentWorkStatusSyncs int + ObjectWatcher objectwatcher.ObjectWatcher + PredicateFunc predicate.Predicate + ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) + ClusterCacheSyncTimeout metav1.Duration } // Reconcile performs a full reconciliation for the object referred to by the Request. 
@@ -116,7 +117,7 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler { // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. func (c *WorkStatusController) RunWorkQueue() { c.worker = util.NewAsyncWorker("work-status", generateKey, c.syncWorkStatus) - c.worker.Run(c.ConcurrentWorkReconciles, c.StopChan) + c.worker.Run(c.ConcurrentWorkStatusSyncs, c.StopChan) } // generateKey generates a key from obj, the key contains cluster, GVK, namespace and name.