diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go
index 68d8572b9..6aaa6bc0f 100644
--- a/cmd/agent/app/agent.go
+++ b/cmd/agent/app/agent.go
@@ -15,9 +15,11 @@ import (
     "k8s.io/client-go/tools/clientcmd"
     "k8s.io/klog/v2"
     controllerruntime "sigs.k8s.io/controller-runtime"
+    "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
 
     "github.com/karmada-io/karmada/cmd/agent/app/options"
     clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+    workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
     controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
     "github.com/karmada-io/karmada/pkg/controllers/execution"
     "github.com/karmada-io/karmada/pkg/controllers/mcs"
@@ -126,6 +128,12 @@ func run(ctx context.Context, karmadaConfig karmadactl.KarmadaConfig, opts *opti
         LeaderElectionID:           fmt.Sprintf("karmada-agent-%s", opts.ClusterName),
         LeaderElectionNamespace:    opts.LeaderElection.ResourceNamespace,
         LeaderElectionResourceLock: opts.LeaderElection.ResourceLock,
+        Controller: v1alpha1.ControllerConfigurationSpec{
+            GroupKindConcurrency: map[string]int{
+                workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String():       opts.ConcurrentWorkSyncs,
+                clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): opts.ConcurrentClusterSyncs,
+            },
+        },
     })
     if err != nil {
         return fmt.Errorf("failed to build controller manager: %w", err)
@@ -165,6 +173,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
             ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
             ClusterAPIQPS:           opts.ClusterAPIQPS,
             ClusterAPIBurst:         opts.ClusterAPIBurst,
+            ConcurrentWorkSyncs:     opts.ConcurrentWorkSyncs,
         },
         StopChan: stopChan,
     }
@@ -222,16 +231,16 @@ func startExecutionController(ctx controllerscontext.Context) (bool, error) {
 
 func startWorkStatusController(ctx controllerscontext.Context) (bool, error) {
     workStatusController := &status.WorkStatusController{
-        Client:                  ctx.Mgr.GetClient(),
-        EventRecorder:           ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
-        RESTMapper:              ctx.Mgr.GetRESTMapper(),
-        InformerManager:         informermanager.GetInstance(),
-        StopChan:                ctx.StopChan,
-        WorkerNumber:            1,
-        ObjectWatcher:           ctx.ObjectWatcher,
-        PredicateFunc:           helper.NewExecutionPredicateOnAgent(),
-        ClusterClientSetFunc:    util.NewClusterDynamicClientSetForAgent,
-        ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
+        Client:                    ctx.Mgr.GetClient(),
+        EventRecorder:             ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
+        RESTMapper:                ctx.Mgr.GetRESTMapper(),
+        InformerManager:           informermanager.GetInstance(),
+        StopChan:                  ctx.StopChan,
+        ObjectWatcher:             ctx.ObjectWatcher,
+        PredicateFunc:             helper.NewExecutionPredicateOnAgent(),
+        ClusterClientSetFunc:      util.NewClusterDynamicClientSetForAgent,
+        ClusterCacheSyncTimeout:   ctx.Opts.ClusterCacheSyncTimeout,
+        ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs,
     }
     workStatusController.RunWorkQueue()
     if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
@@ -247,7 +256,7 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error)
         RESTMapper:                  ctx.Mgr.GetRESTMapper(),
         InformerManager:             informermanager.GetInstance(),
         StopChan:                    ctx.StopChan,
-        WorkerNumber:                1,
+        WorkerNumber:                3,
         PredicateFunc:               helper.NewPredicateForServiceExportControllerOnAgent(ctx.Opts.ClusterName),
         ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
         ClusterCacheSyncTimeout:     ctx.Opts.ClusterCacheSyncTimeout,
diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go
index 092f0f321..7a93cef62 100644
--- a/cmd/agent/app/options/options.go
+++ b/cmd/agent/app/options/options.go
@@ -55,6 +55,12 @@ type Options struct {
     ClusterAPIEndpoint string
     // ProxyServerAddress holds the proxy server address that is used to proxy to the cluster.
     ProxyServerAddress string
+    // ConcurrentClusterSyncs is the number of cluster objects that are
+    // allowed to sync concurrently.
+    ConcurrentClusterSyncs int
+    // ConcurrentWorkSyncs is the number of work objects that are
+    // allowed to sync concurrently.
+    ConcurrentWorkSyncs int
 }
 
 // NewOptions builds an default scheduler options.
@@ -97,4 +103,6 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string) {
     fs.StringVar(&o.ClusterAPIEndpoint, "cluster-api-endpoint", o.ClusterAPIEndpoint, "APIEndpoint of the cluster.")
     fs.StringVar(&o.ProxyServerAddress, "proxy-server-address", o.ProxyServerAddress, "Address of the proxy server that is used to proxy to the cluster.")
     fs.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.")
+    fs.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of Clusters that are allowed to sync concurrently.")
+    fs.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of Works that are allowed to sync concurrently.")
 }
diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go
index 97d2dadeb..02a56e5a7 100644
--- a/cmd/controller-manager/app/controllermanager.go
+++ b/cmd/controller-manager/app/controllermanager.go
@@ -8,18 +8,23 @@ import (
     "strconv"
 
     "github.com/spf13/cobra"
 
+    "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/client-go/discovery"
     "k8s.io/client-go/dynamic"
     kubeclientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/clientcmd"
     "k8s.io/klog/v2"
+    controllerruntime "sigs.k8s.io/controller-runtime"
+    "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
     "sigs.k8s.io/controller-runtime/pkg/event"
     "sigs.k8s.io/controller-runtime/pkg/healthz"
     "sigs.k8s.io/controller-runtime/pkg/predicate"
 
     "github.com/karmada-io/karmada/cmd/controller-manager/app/options"
     clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+    workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
+    workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
     "github.com/karmada-io/karmada/pkg/clusterdiscovery/clusterapi"
     "github.com/karmada-io/karmada/pkg/controllers/binding"
     "github.com/karmada-io/karmada/pkg/controllers/cluster"
@@ -88,6 +93,15 @@ func Run(ctx context.Context, opts *options.Options) error {
         HealthProbeBindAddress:     net.JoinHostPort(opts.BindAddress, strconv.Itoa(opts.SecurePort)),
         LivenessEndpointName:       "/healthz",
         MetricsBindAddress:         opts.MetricsBindAddress,
+        Controller: v1alpha1.ControllerConfigurationSpec{
+            GroupKindConcurrency: map[string]int{
+                workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String():                     opts.ConcurrentWorkSyncs,
+                workv1alpha2.SchemeGroupVersion.WithKind("ResourceBinding").GroupKind().String():          opts.ConcurrentResourceBindingSyncs,
+                workv1alpha2.SchemeGroupVersion.WithKind("ClusterResourceBinding").GroupKind().String():   opts.ConcurrentClusterResourceBindingSyncs,
+                clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String():               opts.ConcurrentClusterSyncs,
+                schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}.GroupKind().String(): opts.ConcurrentNamespaceSyncs,
+            },
+        },
     })
     if err != nil {
         klog.Errorf("failed to build controller manager: %v", err)
@@ -247,19 +261,20 @@ func startExecutionController(ctx controllerscontext.Context) (enabled bool, err
 func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
     opts := ctx.Opts
     workStatusController := &status.WorkStatusController{
-        Client:                  ctx.Mgr.GetClient(),
-        EventRecorder:           ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
-        RESTMapper:              ctx.Mgr.GetRESTMapper(),
-        InformerManager:         informermanager.GetInstance(),
-        StopChan:                ctx.StopChan,
-        WorkerNumber:            1,
-        ObjectWatcher:           ctx.ObjectWatcher,
-        PredicateFunc:           helper.NewExecutionPredicate(ctx.Mgr),
-        ClusterClientSetFunc:    util.NewClusterDynamicClientSet,
-        ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
+        Client:                    ctx.Mgr.GetClient(),
+        EventRecorder:             ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
+        RESTMapper:                ctx.Mgr.GetRESTMapper(),
+        InformerManager:           informermanager.GetInstance(),
+        StopChan:                  ctx.StopChan,
+        ObjectWatcher:             ctx.ObjectWatcher,
+        PredicateFunc:             helper.NewExecutionPredicate(ctx.Mgr),
+        ClusterClientSetFunc:      util.NewClusterDynamicClientSet,
+        ClusterCacheSyncTimeout:   opts.ClusterCacheSyncTimeout,
+        ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
     }
     workStatusController.RunWorkQueue()
     if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
+        klog.Errorf("Failed to setup work status controller: %v", err)
         return false, err
     }
     return true, nil
@@ -289,7 +304,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
         RESTMapper:                  ctx.Mgr.GetRESTMapper(),
         InformerManager:             informermanager.GetInstance(),
         StopChan:                    ctx.StopChan,
-        WorkerNumber:                1,
+        WorkerNumber:                3,
         PredicateFunc:               helper.NewPredicateForServiceExportController(ctx.Mgr),
         ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
         ClusterCacheSyncTimeout:     opts.ClusterCacheSyncTimeout,
@@ -364,15 +379,17 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
     objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
 
     resourceDetector := &detector.ResourceDetector{
-        DiscoveryClientSet:           discoverClientSet,
-        Client:                       mgr.GetClient(),
-        InformerManager:              controlPlaneInformerManager,
-        RESTMapper:                   mgr.GetRESTMapper(),
-        DynamicClient:                dynamicClientSet,
-        SkippedResourceConfig:        skippedResourceConfig,
-        SkippedPropagatingNamespaces: skippedPropagatingNamespaces,
-        ResourceInterpreter:          resourceInterpreter,
-        EventRecorder:                mgr.GetEventRecorderFor("resource-detector"),
+        DiscoveryClientSet:              discoverClientSet,
+        Client:                          mgr.GetClient(),
+        InformerManager:                 controlPlaneInformerManager,
+        RESTMapper:                      mgr.GetRESTMapper(),
+        DynamicClient:                   dynamicClientSet,
+        SkippedResourceConfig:           skippedResourceConfig,
+        SkippedPropagatingNamespaces:    skippedPropagatingNamespaces,
+        ResourceInterpreter:             resourceInterpreter,
+        EventRecorder:                   mgr.GetEventRecorderFor("resource-detector"),
+        ConcurrentResourceTemplateSyncs: opts.ConcurrentResourceTemplateSyncs,
+        ConcurrentResourceBindingSyncs:  opts.ConcurrentResourceBindingSyncs,
     }
     if err := mgr.Add(resourceDetector); err != nil {
         klog.Fatalf("Failed to setup resource detector: %v", err)
@@ -394,6 +411,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
             ClusterAPIQPS:                opts.ClusterAPIQPS,
             ClusterAPIBurst:              opts.ClusterAPIBurst,
             SkippedPropagatingNamespaces: opts.SkippedPropagatingNamespaces,
+            ConcurrentWorkSyncs:          opts.ConcurrentWorkSyncs,
         },
         StopChan:                    stopChan,
         DynamicClientSet:            dynamicClientSet,
@@ -438,6 +456,7 @@ func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options
         ClusterAPIConfig:     clusterAPIRestConfig,
         ClusterAPIClient:     clusterAPIClient,
         InformerManager:      informermanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0, stopChan),
+        ConcurrentReconciles: 3,
     }
     if err := mgr.Add(clusterAPIClusterDetector); err != nil {
         klog.Fatalf("Failed to setup cluster-api cluster detector: %v", err)
diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go
index bdb4482c1..64d5eba49 100644
--- a/cmd/controller-manager/app/options/options.go
+++ b/cmd/controller-manager/app/options/options.go
@@ -79,6 +79,23 @@ type Options struct {
     // It can be set to "0" to disable the metrics serving.
     // Defaults to ":8080".
     MetricsBindAddress string
+    // ConcurrentClusterSyncs is the number of cluster objects that are
+    // allowed to sync concurrently.
+    ConcurrentClusterSyncs int
+    // ConcurrentClusterResourceBindingSyncs is the number of clusterresourcebinding objects that are
+    // allowed to sync concurrently.
+    ConcurrentClusterResourceBindingSyncs int
+    // ConcurrentWorkSyncs is the number of Work objects that are
+    // allowed to sync concurrently.
+    ConcurrentWorkSyncs int
+    // ConcurrentResourceBindingSyncs is the number of resourcebinding objects that are
+    // allowed to sync concurrently.
+    ConcurrentResourceBindingSyncs int
+    // ConcurrentNamespaceSyncs is the number of Namespace objects that are
+    // allowed to sync concurrently.
+    ConcurrentNamespaceSyncs int
+    // ConcurrentResourceTemplateSyncs is the number of resource templates that are allowed to sync concurrently.
+    ConcurrentResourceTemplateSyncs int
 }
 
 // NewOptions builds an empty options.
@@ -134,4 +151,10 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string) {
     flags.DurationVar(&o.ClusterCacheSyncTimeout.Duration, "cluster-cache-sync-timeout", util.CacheSyncTimeout, "Timeout period waiting for cluster cache to sync.")
     flags.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.")
     flags.StringVar(&o.MetricsBindAddress, "metrics-bind-address", ":8080", "The TCP address that the controller should bind to for serving prometheus metrics(e.g. 127.0.0.1:8088, :8088)")
+    flags.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of Clusters that are allowed to sync concurrently.")
+    flags.IntVar(&o.ConcurrentClusterResourceBindingSyncs, "concurrent-clusterresourcebinding-syncs", 5, "The number of ClusterResourceBindings that are allowed to sync concurrently.")
+    flags.IntVar(&o.ConcurrentResourceBindingSyncs, "concurrent-resourcebinding-syncs", 5, "The number of ResourceBindings that are allowed to sync concurrently.")
+    flags.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of Works that are allowed to sync concurrently.")
+    flags.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 1, "The number of Namespaces that are allowed to sync concurrently.")
+    flags.IntVar(&o.ConcurrentResourceTemplateSyncs, "concurrent-resource-template-syncs", 5, "The number of resource templates that are allowed to sync concurrently.")
 }
diff --git a/pkg/clusterdiscovery/clusterapi/clusterapi.go b/pkg/clusterdiscovery/clusterapi/clusterapi.go
index 2c97dc262..36886f09d 100644
--- a/pkg/clusterdiscovery/clusterapi/clusterapi.go
+++ b/pkg/clusterdiscovery/clusterapi/clusterapi.go
@@ -47,6 +47,7 @@ type ClusterDetector struct {
     InformerManager  informermanager.SingleClusterInformerManager
     EventHandler     cache.ResourceEventHandler
     Processor        util.AsyncWorker
+    ConcurrentReconciles int
 
     stopCh <-chan struct{}
 }
@@ -58,7 +59,7 @@ func (d *ClusterDetector) Start(ctx context.Context) error {
 
     d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
     d.Processor = util.NewAsyncWorker("cluster-api cluster detector", ClusterWideKeyFunc, d.Reconcile)
-    d.Processor.Run(1, d.stopCh)
+    d.Processor.Run(d.ConcurrentReconciles, d.stopCh)
     d.discoveryCluster()
 
     <-d.stopCh
diff --git a/pkg/controllers/context/context.go b/pkg/controllers/context/context.go
index 56e077623..aff92ad61 100644
--- a/pkg/controllers/context/context.go
+++ b/pkg/controllers/context/context.go
@@ -47,6 +47,8 @@ type Options struct {
     SkippedPropagatingNamespaces []string
     // ClusterName is the name of cluster.
     ClusterName string
+    // ConcurrentWorkSyncs is the number of Works that are allowed to sync concurrently.
+    ConcurrentWorkSyncs int
 }
 
 // Context defines the context object for controller.
diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go
index 0d60faf58..9084f9e7d 100644
--- a/pkg/controllers/status/workstatus_controller.go
+++ b/pkg/controllers/status/workstatus_controller.go
@@ -34,19 +34,19 @@ const WorkStatusControllerName = "work-status-controller"
 
 // WorkStatusController is to sync status of Work.
 type WorkStatusController struct {
-    client.Client        // used to operate Work resources.
-    EventRecorder        record.EventRecorder
-    RESTMapper           meta.RESTMapper
-    InformerManager      informermanager.MultiClusterInformerManager
-    eventHandler         cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
-    StopChan             <-chan struct{}
-    WorkerNumber         int              // WorkerNumber is the number of worker goroutines
-    worker               util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
-    ObjectWatcher        objectwatcher.ObjectWatcher
-    PredicateFunc        predicate.Predicate
-    ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
-
-    ClusterCacheSyncTimeout metav1.Duration
+    client.Client   // used to operate Work resources.
+    EventRecorder   record.EventRecorder
+    RESTMapper      meta.RESTMapper
+    InformerManager informermanager.MultiClusterInformerManager
+    eventHandler    cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
+    StopChan        <-chan struct{}
+    worker          util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
+    // ConcurrentWorkStatusSyncs is the number of Work status that are allowed to sync concurrently.
+    ConcurrentWorkStatusSyncs int
+    ObjectWatcher             objectwatcher.ObjectWatcher
+    PredicateFunc             predicate.Predicate
+    ClusterClientSetFunc      func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
+    ClusterCacheSyncTimeout   metav1.Duration
 }
 
 // Reconcile performs a full reconciliation for the object referred to by the Request.
@@ -115,7 +115,7 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
 
 // RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
 func (c *WorkStatusController) RunWorkQueue() {
     c.worker = util.NewAsyncWorker("work-status", generateKey, c.syncWorkStatus)
-    c.worker.Run(c.WorkerNumber, c.StopChan)
+    c.worker.Run(c.ConcurrentWorkStatusSyncs, c.StopChan)
 }
 
 // generateKey generates a key from obj, the key contains cluster, GVK, namespace and name.
diff --git a/pkg/controllers/unifiedauth/unified_auth_controller.go b/pkg/controllers/unifiedauth/unified_auth_controller.go
index 0618e34fd..25ad16eea 100644
--- a/pkg/controllers/unifiedauth/unified_auth_controller.go
+++ b/pkg/controllers/unifiedauth/unified_auth_controller.go
@@ -244,7 +244,8 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
     return utilerrors.NewAggregate([]error{
         controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(clusterPredicateFunc).
             Watches(&source.Kind{Type: &rbacv1.ClusterRole{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleMapFunc())).
-            Watches(&source.Kind{Type: &rbacv1.ClusterRoleBinding{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleBindingMapFunc())).Complete(c),
+            Watches(&source.Kind{Type: &rbacv1.ClusterRoleBinding{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleBindingMapFunc())).
+            Complete(c),
         mgr.Add(c),
     })
 }
diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go
index ba3b0e130..631818c5b 100644
--- a/pkg/detector/detector.go
+++ b/pkg/detector/detector.go
@@ -76,6 +76,12 @@ type ResourceDetector struct {
     waitingObjects map[keys.ClusterWideKey]struct{}
     // waitingLock is the lock for waitingObjects operation.
     waitingLock sync.RWMutex
+    // ConcurrentResourceTemplateSyncs is the number of resource templates that are allowed to sync concurrently.
+    // Larger number means responsive resource template syncing but more CPU(and network) load.
+    ConcurrentResourceTemplateSyncs int
+    // ConcurrentResourceBindingSyncs is the number of ResourceBindings that are allowed to sync concurrently.
+    // Larger number means responsive resource binding syncing but more CPU(and network) load.
+    ConcurrentResourceBindingSyncs int
 
     stopCh <-chan struct{}
 }
@@ -114,7 +120,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
 
     // setup binding reconcile worker
     d.bindingReconcileWorker = util.NewAsyncWorker("resourceBinding reconciler", ClusterWideKeyFunc, d.ReconcileResourceBinding)
-    d.bindingReconcileWorker.Run(1, d.stopCh)
+    d.bindingReconcileWorker.Run(d.ConcurrentResourceBindingSyncs, d.stopCh)
 
     // watch and enqueue ResourceBinding changes.
     resourceBindingGVR := schema.GroupVersionResource{
@@ -137,7 +143,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
 
     d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete)
     d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile)
-    d.Processor.Run(1, d.stopCh)
+    d.Processor.Run(d.ConcurrentResourceTemplateSyncs, d.stopCh)
     go d.discoverResources(30 * time.Second)
 
     <-d.stopCh