From 02861c5ca0b2a8fe60fc4a5041b6bb7597b913be Mon Sep 17 00:00:00 2001 From: RainbowMango Date: Tue, 19 Nov 2024 11:21:52 +0800 Subject: [PATCH] AsyncWorker adopt typed rate limiter Signed-off-by: RainbowMango --- pkg/metricsadapter/controller.go | 4 ++-- pkg/scheduler/scheduler.go | 4 ++-- pkg/scheduler/scheduler_test.go | 2 +- pkg/search/controller.go | 4 ++-- .../coredns/detector.go | 4 ++-- .../ratelimiterflag/ratelimiterflag.go | 22 ------------------- pkg/util/worker.go | 4 ++-- 7 files changed, 11 insertions(+), 33 deletions(-) diff --git a/pkg/metricsadapter/controller.go b/pkg/metricsadapter/controller.go index 13c9c54ea..b9bf77893 100755 --- a/pkg/metricsadapter/controller.go +++ b/pkg/metricsadapter/controller.go @@ -56,7 +56,7 @@ type MetricsController struct { InformerManager genericmanager.MultiClusterInformerManager TypedInformerManager typedmanager.MultiClusterInformerManager MultiClusterDiscovery multiclient.MultiClusterDiscoveryInterface - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[any] restConfig *rest.Config } @@ -70,7 +70,7 @@ func NewMetricsController(stopCh <-chan struct{}, restConfig *rest.Config, facto InformerManager: genericmanager.GetInstance(), TypedInformerManager: newInstance(stopCh), restConfig: restConfig, - queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{ + queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ Name: "metrics-adapter", }), } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 6364c5221..daaf7135b 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -96,7 +96,7 @@ type Scheduler struct { // ResourceBinding/ClusterResourceBinding rescheduling. clusterReconcileWorker util.AsyncWorker // TODO: implement a priority scheduling queue - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[any] Algorithm core.ScheduleAlgorithm schedulerCache schedulercache.Cache @@ -239,7 +239,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse for _, opt := range opts { opt(&options) } - queue := workqueue.NewRateLimitingQueueWithConfig(ratelimiterflag.LegacyControllerRateLimiter(options.RateLimiterOptions), workqueue.RateLimitingQueueConfig{Name: "scheduler-queue"}) + queue := workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](options.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{Name: "scheduler-queue"}) registry := frameworkplugins.NewInTreeRegistry() if err := registry.Merge(options.outOfTreeRegistry); err != nil { return nil, err diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index e1145ae37..59ca28f59 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -1084,7 +1084,7 @@ func TestWorkerAndScheduleNext(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + queue := workqueue.NewTypedRateLimitingQueue[any](workqueue.DefaultTypedControllerRateLimiter[any]()) bindingLister := &fakeBindingLister{binding: resourceBinding} clusterBindingLister := &fakeClusterBindingLister{binding: clusterResourceBinding} diff --git a/pkg/search/controller.go b/pkg/search/controller.go index 608ee9d36..2bfa1b1ad 100644 --- a/pkg/search/controller.go +++ b/pkg/search/controller.go @@ -68,7 +68,7 @@ type Controller struct { restMapper meta.RESTMapper informerFactory informerfactory.SharedInformerFactory clusterLister clusterlister.ClusterLister - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[any] clusterRegistry sync.Map @@ -78,7 +78,7 @@ type Controller struct { // NewController returns a new ResourceRegistry controller func NewController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, restMapper meta.RESTMapper) (*Controller, error) { clusterLister := factory.Cluster().V1alpha1().Clusters().Lister() - queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + queue := workqueue.NewTypedRateLimitingQueue[any](workqueue.DefaultTypedControllerRateLimiter[any]()) c := &Controller{ restConfig: restConfig, diff --git a/pkg/servicenameresolutiondetector/coredns/detector.go b/pkg/servicenameresolutiondetector/coredns/detector.go index 07f44cb04..2e1c19855 100644 --- a/pkg/servicenameresolutiondetector/coredns/detector.go +++ b/pkg/servicenameresolutiondetector/coredns/detector.go @@ -83,7 +83,7 @@ type Detector struct { nodeName string clusterName string - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[any] eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder } @@ -122,7 +122,7 @@ func NewCorednsDetector(memberClusterClient kubernetes.Interface, karmadaClient cacheSynced: []cache.InformerSynced{nodeInformer.Informer().HasSynced}, eventBroadcaster: broadcaster, eventRecorder: recorder, - queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: name}), + queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: name}), lec: leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: baselec.LeaseDuration.Duration, diff --git a/pkg/sharedcli/ratelimiterflag/ratelimiterflag.go b/pkg/sharedcli/ratelimiterflag/ratelimiterflag.go index 64c8f96a0..ecc689f4b 100644 --- a/pkg/sharedcli/ratelimiterflag/ratelimiterflag.go +++ b/pkg/sharedcli/ratelimiterflag/ratelimiterflag.go @@ -68,25 +68,3 @@ func DefaultControllerRateLimiter[T comparable](opts Options) workqueue.TypedRat &workqueue.TypedBucketRateLimiter[T]{Limiter: rate.NewLimiter(rate.Limit(opts.RateLimiterQPS), opts.RateLimiterBucketSize)}, ) } - -// LegacyControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags. -// TODO(@RainbowMango): This function will only used by asyncWorker and will be removed after bump Kubernetes dependency to v1.31. -func LegacyControllerRateLimiter(opts Options) workqueue.RateLimiter { - // set defaults - if opts.RateLimiterBaseDelay <= 0 { - opts.RateLimiterBaseDelay = 5 * time.Millisecond - } - if opts.RateLimiterMaxDelay <= 0 { - opts.RateLimiterMaxDelay = 1000 * time.Second - } - if opts.RateLimiterQPS <= 0 { - opts.RateLimiterQPS = 10 - } - if opts.RateLimiterBucketSize <= 0 { - opts.RateLimiterBucketSize = 100 - } - return workqueue.NewMaxOfRateLimiter( - workqueue.NewItemExponentialFailureRateLimiter(opts.RateLimiterBaseDelay, opts.RateLimiterMaxDelay), - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(opts.RateLimiterQPS), opts.RateLimiterBucketSize)}, - ) -} diff --git a/pkg/util/worker.go b/pkg/util/worker.go index d1b934180..36f7206ad 100644 --- a/pkg/util/worker.go +++ b/pkg/util/worker.go @@ -65,7 +65,7 @@ type asyncWorker struct { // reconcileFunc is the function that process keys from the queue. reconcileFunc ReconcileFunc // queue allowing parallel processing of resources. - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[any] } // Options are the arguments for creating a new AsyncWorker. @@ -83,7 +83,7 @@ func NewAsyncWorker(opt Options) AsyncWorker { return &asyncWorker{ keyFunc: opt.KeyFunc, reconcileFunc: opt.ReconcileFunc, - queue: workqueue.NewRateLimitingQueueWithConfig(ratelimiterflag.LegacyControllerRateLimiter(opt.RateLimiterOptions), workqueue.RateLimitingQueueConfig{ + queue: workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](opt.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{ Name: opt.Name, }), }