From 9e28d6c6e80edf8871af9c3d1653797b5cca8b33 Mon Sep 17 00:00:00 2001 From: pigletfly Date: Fri, 25 Feb 2022 16:18:05 +0800 Subject: [PATCH] add ratelimiter flags Signed-off-by: pigletfly --- .../app/controllermanager.go | 15 +++++++- cmd/controller-manager/app/options/options.go | 12 ++++++ go.mod | 2 +- pkg/clusterdiscovery/clusterapi/clusterapi.go | 7 +++- pkg/controllers/binding/binding_controller.go | 6 +++ .../cluster_resource_binding_controller.go | 6 +++ pkg/controllers/context/context.go | 2 + .../execution/execution_controller.go | 6 +++ .../mcs/service_export_controller.go | 7 +++- .../status/cluster_status_controller.go | 7 +++- .../status/workstatus_controller.go | 14 ++++++- .../dependencies_distributor.go | 15 ++++++-- pkg/descheduler/descheduler.go | 15 ++++++-- pkg/detector/detector.go | 31 ++++++++++++++-- pkg/scheduler/scheduler.go | 7 +++- pkg/util/ratelimiter/rate_limiter.go | 37 +++++++++++++++++++ pkg/util/worker.go | 20 ++++++++-- 17 files changed, 187 insertions(+), 22 deletions(-) create mode 100644 pkg/util/ratelimiter/rate_limiter.go diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 95fcb2a24..5cb879634 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -46,6 +46,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/informermanager" "github.com/karmada-io/karmada/pkg/util/objectwatcher" "github.com/karmada-io/karmada/pkg/util/overridemanager" + "github.com/karmada-io/karmada/pkg/util/ratelimiter" "github.com/karmada-io/karmada/pkg/version" "github.com/karmada-io/karmada/pkg/version/sharedcommand" ) @@ -194,6 +195,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (enabled bool, ClusterLeaseDuration: opts.ClusterLeaseDuration, ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, + RatelimiterOptions: ctx.Opts.RatelimiterOptions, } if err := clusterStatusController.SetupWithManager(mgr); err != nil { return false, err @@ -224,6 +226,7 @@ func startBindingController(ctx controllerscontext.Context) (enabled bool, err e OverrideManager: ctx.OverrideManager, InformerManager: ctx.ControlPlaneInformerManager, ResourceInterpreter: ctx.ResourceInterpreter, + RatelimiterOptions: ctx.Opts.RatelimiterOptions, } if err := bindingController.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -237,6 +240,7 @@ func startBindingController(ctx controllerscontext.Context) (enabled bool, err e OverrideManager: ctx.OverrideManager, InformerManager: ctx.ControlPlaneInformerManager, ResourceInterpreter: ctx.ResourceInterpreter, + RatelimiterOptions: ctx.Opts.RatelimiterOptions, } if err := clusterResourceBindingController.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -253,6 +257,7 @@ func startExecutionController(ctx controllerscontext.Context) (enabled bool, err PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), InformerManager: informermanager.GetInstance(), ClusterClientSetFunc: util.NewClusterDynamicClientSet, + RatelimiterOption: ctx.Opts.RatelimiterOptions, } if err := executionController.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -273,6 +278,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er ClusterClientSetFunc: util.NewClusterDynamicClientSet, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs, + RatelimiterOptions: ctx.Opts.RatelimiterOptions, } workStatusController.RunWorkQueue() if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil { @@ -379,7 +385,12 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop } objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter) - + ratelimiterOptions := ratelimiter.Options{ + BaseDelay: opts.RateLimiterBaseDelay, + MaxDelay: opts.RateLimiterMaxDelay, + QPS: opts.RateLimiterQPS, + BucketSize: opts.RateLimiterBucketSize, + } resourceDetector := &detector.ResourceDetector{ DiscoveryClientSet: discoverClientSet, Client: mgr.GetClient(), @@ -392,6 +403,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop EventRecorder: mgr.GetEventRecorderFor("resource-detector"), ConcurrentResourceTemplateSyncs: opts.ConcurrentResourceTemplateSyncs, ConcurrentResourceBindingSyncs: opts.ConcurrentResourceBindingSyncs, + RatelimiterOptions: ratelimiterOptions, } if err := mgr.Add(resourceDetector); err != nil { klog.Fatalf("Failed to setup resource detector: %v", err) @@ -427,6 +439,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop ClusterAPIBurst: opts.ClusterAPIBurst, SkippedPropagatingNamespaces: opts.SkippedPropagatingNamespaces, ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs, + RatelimiterOptions: ratelimiterOptions, }, StopChan: stopChan, DynamicClientSet: dynamicClientSet, diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index a177dea3b..9787bb756 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -97,6 +97,14 @@ type Options struct { ConcurrentNamespaceSyncs int // ConcurrentResourceTemplateSyncs is the number of resource templates that are allowed to sync concurrently. ConcurrentResourceTemplateSyncs int + // RateLimiterBaseDelay is the base delay for ItemExponentialFailureRateLimiter. + RateLimiterBaseDelay time.Duration + // RateLimiterMaxDelay is the max delay for ItemExponentialFailureRateLimiter. + RateLimiterMaxDelay time.Duration + // RateLimiterQPS is the qps for BucketRateLimiter + RateLimiterQPS int + // RateLimiterBucketSize is the bucket size for BucketRateLimiter + RateLimiterBucketSize int } // NewOptions builds an empty options. @@ -158,5 +166,9 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string) { flags.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of Works that are allowed to sync concurrently.") flags.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 1, "The number of Namespaces that are allowed to sync concurrently.") flags.IntVar(&o.ConcurrentResourceTemplateSyncs, "concurrent-resource-template-syncs", 5, "The number of resource templates that are allowed to sync concurrently.") + flags.DurationVar(&o.RateLimiterBaseDelay, "rate-limiter-base-delay", time.Millisecond*5, "The base delay for rate limiter.") + flags.DurationVar(&o.RateLimiterMaxDelay, "rate-limiter-max-delay", time.Second*1000, "The max delay for rate limiter.") + flags.IntVar(&o.RateLimiterQPS, "rate-limiter-qps", 10, "The qps for rate limier.") + flags.IntVar(&o.RateLimiterBucketSize, "rate-limiter-bucket-size", 100, "The bucket size for rate limier.") features.FeatureGate.AddFlag(flags) } diff --git a/go.mod b/go.mod index e3add8272..5d272dbfe 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/stretchr/testify v1.7.0 github.com/vektra/mockery/v2 v2.9.4 golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b + golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac golang.org/x/tools v0.1.6-0.20210820212750-d4cc65f0b2ff gomodules.xyz/jsonpatch/v2 v2.2.0 google.golang.org/grpc v1.40.0 @@ -143,7 +144,6 @@ require ( golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20211029165221-6e7872819dc8 // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 // indirect diff --git a/pkg/clusterdiscovery/clusterapi/clusterapi.go b/pkg/clusterdiscovery/clusterapi/clusterapi.go index 36886f09d..b83dd314d 100644 --- a/pkg/clusterdiscovery/clusterapi/clusterapi.go +++ b/pkg/clusterdiscovery/clusterapi/clusterapi.go @@ -58,7 +58,12 @@ func (d *ClusterDetector) Start(ctx context.Context) error { d.stopCh = ctx.Done() d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) - d.Processor = util.NewAsyncWorker("cluster-api cluster detector", ClusterWideKeyFunc, d.Reconcile) + workerOptions := util.Options{ + Name: "cluster-api cluster detector", + KeyFunc: ClusterWideKeyFunc, + ReconcileFunc: d.Reconcile, + } + d.Processor = util.NewAsyncWorker(workerOptions) d.Processor.Run(d.ConcurrentReconciles, d.stopCh) d.discoveryCluster() diff --git a/pkg/controllers/binding/binding_controller.go b/pkg/controllers/binding/binding_controller.go index a219e8e7b..135ee5326 100644 --- a/pkg/controllers/binding/binding_controller.go +++ b/pkg/controllers/binding/binding_controller.go @@ -15,6 +15,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -28,6 +29,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/informermanager" "github.com/karmada-io/karmada/pkg/util/overridemanager" + "github.com/karmada-io/karmada/pkg/util/ratelimiter" ) // ControllerName is the controller name that will be used when reporting events. @@ -42,6 +44,7 @@ type ResourceBindingController struct { RESTMapper meta.RESTMapper OverrideManager overridemanager.OverrideManager ResourceInterpreter resourceinterpreter.ResourceInterpreter + RatelimiterOptions ratelimiter.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -169,6 +172,9 @@ func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manag Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn). Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions), + }). Complete(c) } diff --git a/pkg/controllers/binding/cluster_resource_binding_controller.go b/pkg/controllers/binding/cluster_resource_binding_controller.go index 616335e9e..f353052a9 100644 --- a/pkg/controllers/binding/cluster_resource_binding_controller.go +++ b/pkg/controllers/binding/cluster_resource_binding_controller.go @@ -15,6 +15,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -28,6 +29,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/informermanager" "github.com/karmada-io/karmada/pkg/util/overridemanager" + "github.com/karmada-io/karmada/pkg/util/ratelimiter" ) // ClusterResourceBindingControllerName is the controller name that will be used when reporting events. @@ -42,6 +44,7 @@ type ClusterResourceBindingController struct { RESTMapper meta.RESTMapper OverrideManager overridemanager.OverrideManager ResourceInterpreter resourceinterpreter.ResourceInterpreter + RatelimiterOptions ratelimiter.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -158,6 +161,9 @@ func (c *ClusterResourceBindingController) SetupWithManager(mgr controllerruntim Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn). Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions), + }). Complete(c) } diff --git a/pkg/controllers/context/context.go b/pkg/controllers/context/context.go index aff92ad61..42f9ccd03 100644 --- a/pkg/controllers/context/context.go +++ b/pkg/controllers/context/context.go @@ -11,6 +11,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/informermanager" "github.com/karmada-io/karmada/pkg/util/objectwatcher" "github.com/karmada-io/karmada/pkg/util/overridemanager" + "github.com/karmada-io/karmada/pkg/util/ratelimiter" ) // Options defines all the parameters required by our controllers. @@ -49,6 +50,7 @@ type Options struct { ClusterName string // ConcurrentWorkSyncs is the number of Works that are allowed to sync concurrently. ConcurrentWorkSyncs int + RatelimiterOptions ratelimiter.Options } // Context defines the context object for controller. diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go index b43ccf8e9..a5e3d6622 100644 --- a/pkg/controllers/execution/execution_controller.go +++ b/pkg/controllers/execution/execution_controller.go @@ -15,6 +15,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -25,6 +26,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/informermanager/keys" "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/pkg/util/objectwatcher" + "github.com/karmada-io/karmada/pkg/util/ratelimiter" ) const ( @@ -41,6 +43,7 @@ type Controller struct { PredicateFunc predicate.Predicate InformerManager informermanager.MultiClusterInformerManager ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) + RatelimiterOption ratelimiter.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -100,6 +103,9 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { For(&workv1alpha1.Work{}). WithEventFilter(predicate.GenerationChangedPredicate{}). WithEventFilter(c.PredicateFunc). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOption), + }). Complete(c) } diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index 69265c254..b31af1417 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -112,7 +112,12 @@ func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. func (c *ServiceExportController) RunWorkQueue() { - c.worker = util.NewAsyncWorker("service-export", nil, c.syncServiceExportOrEndpointSlice) + workerOptions := util.Options{ + Name: "service-export", + KeyFunc: nil, + ReconcileFunc: c.syncServiceExportOrEndpointSlice, + } + c.worker = util.NewAsyncWorker(workerOptions) c.worker.Run(c.WorkerNumber, c.StopChan) } diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go index dffced595..a3f8f6fb1 100644 --- a/pkg/controllers/status/cluster_status_controller.go +++ b/pkg/controllers/status/cluster_status_controller.go @@ -27,6 +27,7 @@ import ( "k8s.io/utils/clock" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -34,6 +35,7 @@ import ( "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/informermanager" + "github.com/karmada-io/karmada/pkg/util/ratelimiter" ) const ( @@ -82,6 +84,7 @@ type ClusterStatusController struct { ClusterLeaseControllers sync.Map ClusterCacheSyncTimeout metav1.Duration + RatelimiterOptions ratelimiter.Options } // Reconcile syncs status of the given member cluster. @@ -113,7 +116,9 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerr // SetupWithManager creates a controller and register to controller manager. func (c *ClusterStatusController) SetupWithManager(mgr controllerruntime.Manager) error { - return controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).Complete(c) + return controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{ + RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions), + }).Complete(c) } func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) { diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index 9084f9e7d..74f609d2b 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -16,6 +16,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/predicate" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" @@ -26,6 +27,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/informermanager/keys" "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/pkg/util/objectwatcher" + "github.com/karmada-io/karmada/pkg/util/ratelimiter" "github.com/karmada-io/karmada/pkg/util/restmapper" ) @@ -47,6 +49,7 @@ type WorkStatusController struct { PredicateFunc predicate.Predicate ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) ClusterCacheSyncTimeout metav1.Duration + RatelimiterOptions ratelimiter.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -114,7 +117,12 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler { // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. func (c *WorkStatusController) RunWorkQueue() { - c.worker = util.NewAsyncWorker("work-status", generateKey, c.syncWorkStatus) + workerOptions := util.Options{ + Name: "work-status", + KeyFunc: generateKey, + ReconcileFunc: c.syncWorkStatus, + } + c.worker = util.NewAsyncWorker(workerOptions) c.worker.Run(c.ConcurrentWorkStatusSyncs, c.StopChan) } @@ -443,5 +451,7 @@ func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1. // SetupWithManager creates a controller and register to controller manager. func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error { - return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).Complete(c) + return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{ + RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions), + }).Complete(c) } diff --git a/pkg/dependenciesdistributor/dependencies_distributor.go b/pkg/dependenciesdistributor/dependencies_distributor.go index 77e730297..60462266f 100644 --- a/pkg/dependenciesdistributor/dependencies_distributor.go +++ b/pkg/dependenciesdistributor/dependencies_distributor.go @@ -78,8 +78,13 @@ func (d *DependenciesDistributor) Start(ctx context.Context) error { klog.Infof("Starting dependencies distributor.") d.stopCh = ctx.Done() + bindingWorkerOptions := util.Options{ + Name: "resourceBinding reconciler", + KeyFunc: detector.ClusterWideKeyFunc, + ReconcileFunc: d.ReconcileResourceBinding, + } // setup binding reconcile worker - d.bindingReconcileWorker = util.NewAsyncWorker("resourceBinding reconciler", detector.ClusterWideKeyFunc, d.ReconcileResourceBinding) + d.bindingReconcileWorker = util.NewAsyncWorker(bindingWorkerOptions) d.bindingReconcileWorker.Run(2, d.stopCh) // watch and enqueue ResourceBinding changes. @@ -92,9 +97,13 @@ func (d *DependenciesDistributor) Start(ctx context.Context) error { bindingHandler := informermanager.NewHandlerOnEvents(nil, d.OnResourceBindingUpdate, d.OnResourceBindingDelete) d.InformerManager.ForResource(resourceBindingGVR, bindingHandler) d.resourceBindingLister = d.InformerManager.Lister(resourceBindingGVR) - + resourceWorkerOptions := util.Options{ + Name: "resource detector", + KeyFunc: detector.ClusterWideKeyFunc, + ReconcileFunc: d.Reconcile, + } d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) - d.Processor = util.NewAsyncWorker("resource detector", detector.ClusterWideKeyFunc, d.Reconcile) + d.Processor = util.NewAsyncWorker(resourceWorkerOptions) d.Processor.Run(2, d.stopCh) go d.discoverResources(30 * time.Second) diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index 64f68cff7..7ee77264e 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -70,11 +70,20 @@ func NewDescheduler(karmadaClient karmadaclientset.Interface, kubeClient kuberne unschedulableThreshold: opts.UnschedulableThreshold.Duration, deschedulingInterval: opts.DeschedulingInterval.Duration, } - desched.schedulerEstimatorWorker = util.NewAsyncWorker("scheduler-estimator", nil, desched.reconcileEstimatorConnection) + schedulerEstimatorWorkerOptions := util.Options{ + Name: "scheduler-estimator", + KeyFunc: nil, + ReconcileFunc: desched.reconcileEstimatorConnection, + } + desched.schedulerEstimatorWorker = util.NewAsyncWorker(schedulerEstimatorWorkerOptions) schedulerEstimator := estimatorclient.NewSchedulerEstimator(desched.schedulerEstimatorCache, opts.SchedulerEstimatorTimeout.Duration) estimatorclient.RegisterSchedulerEstimator(schedulerEstimator) - - desched.deschedulerWorker = util.NewAsyncWorker("descheduler", util.MetaNamespaceKeyFunc, desched.worker) + deschedulerWorkerOptions := util.Options{ + Name: "descheduler", + KeyFunc: util.MetaNamespaceKeyFunc, + ReconcileFunc: desched.worker, + } + desched.deschedulerWorker = util.NewAsyncWorker(deschedulerWorkerOptions) desched.clusterInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: desched.addCluster, diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index 67fa47316..972ee185d 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -36,6 +36,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/informermanager" "github.com/karmada-io/karmada/pkg/util/informermanager/keys" "github.com/karmada-io/karmada/pkg/util/names" + "github.com/karmada-io/karmada/pkg/util/ratelimiter" "github.com/karmada-io/karmada/pkg/util/restmapper" ) @@ -82,6 +83,7 @@ type ResourceDetector struct { // ConcurrentResourceBindingSyncs is the number of ResourceBinding that are allowed to sync concurrently. // Larger number means responsive resource template syncing but more CPU(and network) load. ConcurrentResourceBindingSyncs int + RatelimiterOptions ratelimiter.Options stopCh <-chan struct{} } @@ -93,9 +95,19 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.stopCh = ctx.Done() // setup policy reconcile worker - d.policyReconcileWorker = util.NewAsyncWorker("propagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcilePropagationPolicy) + policyWorkerOptions := util.Options{ + Name: "propagationPolicy reconciler", + KeyFunc: ClusterWideKeyFunc, + ReconcileFunc: d.ReconcilePropagationPolicy, + } + d.policyReconcileWorker = util.NewAsyncWorker(policyWorkerOptions) d.policyReconcileWorker.Run(1, d.stopCh) - d.clusterPolicyReconcileWorker = util.NewAsyncWorker("clusterPropagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcileClusterPropagationPolicy) + clusterPolicyWorkerOptions := util.Options{ + Name: "clusterPropagationPolicy reconciler", + KeyFunc: ClusterWideKeyFunc, + ReconcileFunc: d.ReconcileClusterPropagationPolicy, + } + d.clusterPolicyReconcileWorker = util.NewAsyncWorker(clusterPolicyWorkerOptions) d.clusterPolicyReconcileWorker.Run(1, d.stopCh) // watch and enqueue PropagationPolicy changes. @@ -119,7 +131,12 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR) // setup binding reconcile worker - d.bindingReconcileWorker = util.NewAsyncWorker("resourceBinding reconciler", ClusterWideKeyFunc, d.ReconcileResourceBinding) + bindingWorkerOptions := util.Options{ + Name: "resourceBinding reconciler", + KeyFunc: ClusterWideKeyFunc, + ReconcileFunc: d.ReconcileResourceBinding, + } + d.bindingReconcileWorker = util.NewAsyncWorker(bindingWorkerOptions) d.bindingReconcileWorker.Run(d.ConcurrentResourceBindingSyncs, d.stopCh) // watch and enqueue ResourceBinding changes. @@ -138,11 +155,17 @@ func (d *ResourceDetector) Start(ctx context.Context) error { Version: workv1alpha2.GroupVersion.Version, Resource: "clusterresourcebindings", } + detectorWorkerOptions := util.Options{ + Name: "resource detector", + KeyFunc: ClusterWideKeyFunc, + ReconcileFunc: d.Reconcile, + RatelimiterOptions: d.RatelimiterOptions, + } clusterBindingHandler := informermanager.NewHandlerOnEvents(d.OnClusterResourceBindingAdd, d.OnClusterResourceBindingUpdate, nil) d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler) d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete) - d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile) + d.Processor = util.NewAsyncWorker(detectorWorkerOptions) d.Processor.Run(d.ConcurrentResourceTemplateSyncs, d.stopCh) go d.discoverResources(30 * time.Second) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 94fcadbb6..34856acf3 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -115,7 +115,12 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse if opts.EnableSchedulerEstimator { sched.schedulerEstimatorCache = estimatorclient.NewSchedulerEstimatorCache() sched.schedulerEstimatorPort = opts.SchedulerEstimatorPort - sched.schedulerEstimatorWorker = util.NewAsyncWorker("scheduler-estimator", nil, sched.reconcileEstimatorConnection) + schedulerEstimatorWorkerOptions := util.Options{ + Name: "scheduler-estimator", + KeyFunc: nil, + ReconcileFunc: sched.reconcileEstimatorConnection, + } + sched.schedulerEstimatorWorker = util.NewAsyncWorker(schedulerEstimatorWorkerOptions) schedulerEstimator := estimatorclient.NewSchedulerEstimator(sched.schedulerEstimatorCache, opts.SchedulerEstimatorTimeout.Duration) estimatorclient.RegisterSchedulerEstimator(schedulerEstimator) } diff --git a/pkg/util/ratelimiter/rate_limiter.go b/pkg/util/ratelimiter/rate_limiter.go new file mode 100644 index 000000000..7a5c3f47b --- /dev/null +++ b/pkg/util/ratelimiter/rate_limiter.go @@ -0,0 +1,37 @@ +package ratelimiter + +import ( + "time" + + "golang.org/x/time/rate" + "k8s.io/client-go/util/workqueue" +) + +// Options are options for ratelimiter +type Options struct { + BaseDelay time.Duration + MaxDelay time.Duration + QPS int + BucketSize int +} + +// DefaultControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags. +func DefaultControllerRateLimiter(opts Options) workqueue.RateLimiter { + // set defaults + if opts.BaseDelay <= 0 { + opts.BaseDelay = 5 * time.Millisecond + } + if opts.MaxDelay <= 0 { + opts.MaxDelay = 1000 * time.Second + } + if opts.QPS <= 0 { + opts.QPS = 10 + } + if opts.BucketSize <= 0 { + opts.BucketSize = 100 + } + return workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(opts.BaseDelay, opts.MaxDelay), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(opts.QPS), opts.BucketSize)}, + ) +} diff --git a/pkg/util/worker.go b/pkg/util/worker.go index a5399d2cf..49d11d511 100644 --- a/pkg/util/worker.go +++ b/pkg/util/worker.go @@ -6,6 +6,8 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + + "github.com/karmada-io/karmada/pkg/util/ratelimiter" ) const ( @@ -55,12 +57,22 @@ type asyncWorker struct { queue workqueue.RateLimitingInterface } +// Options are the arguments for creating a new AsyncWorker. +type Options struct { + // Name is the queue's name that will be used to emit metrics. + // Defaults to "", which means disable metrics. + Name string + KeyFunc KeyFunc + ReconcileFunc ReconcileFunc + RatelimiterOptions ratelimiter.Options +} + // NewAsyncWorker returns a asyncWorker which can process resource periodic. -func NewAsyncWorker(name string, keyFunc KeyFunc, reconcileFunc ReconcileFunc) AsyncWorker { +func NewAsyncWorker(opt Options) AsyncWorker { return &asyncWorker{ - keyFunc: keyFunc, - reconcileFunc: reconcileFunc, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), + keyFunc: opt.KeyFunc, + reconcileFunc: opt.ReconcileFunc, + queue: workqueue.NewNamedRateLimitingQueue(ratelimiter.DefaultControllerRateLimiter(opt.RatelimiterOptions), opt.Name), } }