Merge pull request #1446 from RainbowMango/pr_cleanup_ratelimiter

chore: cleanup ratelimiter flags
This commit is contained in:
karmada-bot 2022-03-09 02:34:20 +08:00 committed by GitHub
commit 8d460a1bd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 25 additions and 22 deletions

View File

@ -198,7 +198,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (enabled bool,
ClusterLeaseDuration: opts.ClusterLeaseDuration, ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction, ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
RatelimiterOptions: ctx.Opts.RatelimiterOptions, RateLimiterOptions: ctx.Opts.RateLimiterOptions,
} }
if err := clusterStatusController.SetupWithManager(mgr); err != nil { if err := clusterStatusController.SetupWithManager(mgr); err != nil {
return false, err return false, err
@ -229,7 +229,7 @@ func startBindingController(ctx controllerscontext.Context) (enabled bool, err e
OverrideManager: ctx.OverrideManager, OverrideManager: ctx.OverrideManager,
InformerManager: ctx.ControlPlaneInformerManager, InformerManager: ctx.ControlPlaneInformerManager,
ResourceInterpreter: ctx.ResourceInterpreter, ResourceInterpreter: ctx.ResourceInterpreter,
RatelimiterOptions: ctx.Opts.RatelimiterOptions, RateLimiterOptions: ctx.Opts.RateLimiterOptions,
} }
if err := bindingController.SetupWithManager(ctx.Mgr); err != nil { if err := bindingController.SetupWithManager(ctx.Mgr); err != nil {
return false, err return false, err
@ -243,7 +243,7 @@ func startBindingController(ctx controllerscontext.Context) (enabled bool, err e
OverrideManager: ctx.OverrideManager, OverrideManager: ctx.OverrideManager,
InformerManager: ctx.ControlPlaneInformerManager, InformerManager: ctx.ControlPlaneInformerManager,
ResourceInterpreter: ctx.ResourceInterpreter, ResourceInterpreter: ctx.ResourceInterpreter,
RatelimiterOptions: ctx.Opts.RatelimiterOptions, RateLimiterOptions: ctx.Opts.RateLimiterOptions,
} }
if err := clusterResourceBindingController.SetupWithManager(ctx.Mgr); err != nil { if err := clusterResourceBindingController.SetupWithManager(ctx.Mgr); err != nil {
return false, err return false, err
@ -260,7 +260,7 @@ func startExecutionController(ctx controllerscontext.Context) (enabled bool, err
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
InformerManager: informermanager.GetInstance(), InformerManager: informermanager.GetInstance(),
ClusterClientSetFunc: util.NewClusterDynamicClientSet, ClusterClientSetFunc: util.NewClusterDynamicClientSet,
RatelimiterOption: ctx.Opts.RatelimiterOptions, RatelimiterOption: ctx.Opts.RateLimiterOptions,
} }
if err := executionController.SetupWithManager(ctx.Mgr); err != nil { if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
return false, err return false, err
@ -281,7 +281,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er
ClusterClientSetFunc: util.NewClusterDynamicClientSet, ClusterClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs, ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
RatelimiterOptions: ctx.Opts.RatelimiterOptions, RateLimiterOptions: ctx.Opts.RateLimiterOptions,
} }
workStatusController.RunWorkQueue() workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil { if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
@ -410,7 +410,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
} }
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter) objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
ratelimiterOptions := ratelimiter.Options{ rateLimiterOptions := ratelimiter.Options{
BaseDelay: opts.RateLimiterBaseDelay, BaseDelay: opts.RateLimiterBaseDelay,
MaxDelay: opts.RateLimiterMaxDelay, MaxDelay: opts.RateLimiterMaxDelay,
QPS: opts.RateLimiterQPS, QPS: opts.RateLimiterQPS,
@ -428,7 +428,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
EventRecorder: mgr.GetEventRecorderFor("resource-detector"), EventRecorder: mgr.GetEventRecorderFor("resource-detector"),
ConcurrentResourceTemplateSyncs: opts.ConcurrentResourceTemplateSyncs, ConcurrentResourceTemplateSyncs: opts.ConcurrentResourceTemplateSyncs,
ConcurrentResourceBindingSyncs: opts.ConcurrentResourceBindingSyncs, ConcurrentResourceBindingSyncs: opts.ConcurrentResourceBindingSyncs,
RatelimiterOptions: ratelimiterOptions, RateLimiterOptions: rateLimiterOptions,
} }
if err := mgr.Add(resourceDetector); err != nil { if err := mgr.Add(resourceDetector); err != nil {
klog.Fatalf("Failed to setup resource detector: %v", err) klog.Fatalf("Failed to setup resource detector: %v", err)
@ -464,7 +464,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ClusterAPIBurst: opts.ClusterAPIBurst, ClusterAPIBurst: opts.ClusterAPIBurst,
SkippedPropagatingNamespaces: opts.SkippedPropagatingNamespaces, SkippedPropagatingNamespaces: opts.SkippedPropagatingNamespaces,
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs, ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
RatelimiterOptions: ratelimiterOptions, RateLimiterOptions: rateLimiterOptions,
}, },
StopChan: stopChan, StopChan: stopChan,
DynamicClientSet: dynamicClientSet, DynamicClientSet: dynamicClientSet,

View File

@ -49,7 +49,7 @@ type ResourceBindingController struct {
RESTMapper meta.RESTMapper RESTMapper meta.RESTMapper
OverrideManager overridemanager.OverrideManager OverrideManager overridemanager.OverrideManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter ResourceInterpreter resourceinterpreter.ResourceInterpreter
RatelimiterOptions ratelimiter.Options RateLimiterOptions ratelimiter.Options
} }
// Reconcile performs a full reconciliation for the object referred to by the Request. // Reconcile performs a full reconciliation for the object referred to by the Request.
@ -221,7 +221,7 @@ func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manag
Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
WithOptions(controller.Options{ WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions), RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RateLimiterOptions),
}). }).
Complete(c) Complete(c)
} }

View File

@ -44,7 +44,7 @@ type ClusterResourceBindingController struct {
RESTMapper meta.RESTMapper RESTMapper meta.RESTMapper
OverrideManager overridemanager.OverrideManager OverrideManager overridemanager.OverrideManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter ResourceInterpreter resourceinterpreter.ResourceInterpreter
RatelimiterOptions ratelimiter.Options RateLimiterOptions ratelimiter.Options
} }
// Reconcile performs a full reconciliation for the object referred to by the Request. // Reconcile performs a full reconciliation for the object referred to by the Request.
@ -162,7 +162,7 @@ func (c *ClusterResourceBindingController) SetupWithManager(mgr controllerruntim
Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
WithOptions(controller.Options{ WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions), RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RateLimiterOptions),
}). }).
Complete(c) Complete(c)
} }

View File

@ -50,7 +50,7 @@ type Options struct {
ClusterName string ClusterName string
// ConcurrentWorkSyncs is the number of Works that are allowed to sync concurrently. // ConcurrentWorkSyncs is the number of Works that are allowed to sync concurrently.
ConcurrentWorkSyncs int ConcurrentWorkSyncs int
RatelimiterOptions ratelimiter.Options RateLimiterOptions ratelimiter.Options
} }
// Context defines the context object for controller. // Context defines the context object for controller.

View File

@ -84,7 +84,7 @@ type ClusterStatusController struct {
ClusterLeaseControllers sync.Map ClusterLeaseControllers sync.Map
ClusterCacheSyncTimeout metav1.Duration ClusterCacheSyncTimeout metav1.Duration
RatelimiterOptions ratelimiter.Options RateLimiterOptions ratelimiter.Options
} }
// Reconcile syncs status of the given member cluster. // Reconcile syncs status of the given member cluster.
@ -117,7 +117,7 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerr
// SetupWithManager creates a controller and register to controller manager. // SetupWithManager creates a controller and register to controller manager.
func (c *ClusterStatusController) SetupWithManager(mgr controllerruntime.Manager) error { func (c *ClusterStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{ return controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions), RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RateLimiterOptions),
}).Complete(c) }).Complete(c)
} }

View File

@ -49,7 +49,7 @@ type WorkStatusController struct {
PredicateFunc predicate.Predicate PredicateFunc predicate.Predicate
ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterCacheSyncTimeout metav1.Duration ClusterCacheSyncTimeout metav1.Duration
RatelimiterOptions ratelimiter.Options RateLimiterOptions ratelimiter.Options
} }
// Reconcile performs a full reconciliation for the object referred to by the Request. // Reconcile performs a full reconciliation for the object referred to by the Request.
@ -452,6 +452,6 @@ func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1.
// SetupWithManager creates a controller and register to controller manager. // SetupWithManager creates a controller and register to controller manager.
func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error { func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{ return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions), RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RateLimiterOptions),
}).Complete(c) }).Complete(c)
} }

View File

@ -77,7 +77,10 @@ type ResourceDetector struct {
// ConcurrentResourceBindingSyncs is the number of ResourceBinding that are allowed to sync concurrently. // ConcurrentResourceBindingSyncs is the number of ResourceBinding that are allowed to sync concurrently.
// Larger number means responsive resource template syncing but more CPU(and network) load. // Larger number means responsive resource template syncing but more CPU(and network) load.
ConcurrentResourceBindingSyncs int ConcurrentResourceBindingSyncs int
RatelimiterOptions ratelimiter.Options
// RateLimiterOptions is the configuration for rate limiter which may significantly influence the performance of
// the controller.
RateLimiterOptions ratelimiter.Options
stopCh <-chan struct{} stopCh <-chan struct{}
} }
@ -128,7 +131,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
Name: "resource detector", Name: "resource detector",
KeyFunc: ClusterWideKeyFunc, KeyFunc: ClusterWideKeyFunc,
ReconcileFunc: d.Reconcile, ReconcileFunc: d.Reconcile,
RatelimiterOptions: d.RatelimiterOptions, RateLimiterOptions: d.RateLimiterOptions,
} }
d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete) d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete)

View File

@ -7,7 +7,7 @@ import (
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
) )
// Options are options for ratelimiter // Options are options for rate limiter.
type Options struct { type Options struct {
BaseDelay time.Duration BaseDelay time.Duration
MaxDelay time.Duration MaxDelay time.Duration

View File

@ -64,7 +64,7 @@ type Options struct {
Name string Name string
KeyFunc KeyFunc KeyFunc KeyFunc
ReconcileFunc ReconcileFunc ReconcileFunc ReconcileFunc
RatelimiterOptions ratelimiter.Options RateLimiterOptions ratelimiter.Options
} }
// NewAsyncWorker returns a asyncWorker which can process resource periodic. // NewAsyncWorker returns a asyncWorker which can process resource periodic.
@ -72,7 +72,7 @@ func NewAsyncWorker(opt Options) AsyncWorker {
return &asyncWorker{ return &asyncWorker{
keyFunc: opt.KeyFunc, keyFunc: opt.KeyFunc,
reconcileFunc: opt.ReconcileFunc, reconcileFunc: opt.ReconcileFunc,
queue: workqueue.NewNamedRateLimitingQueue(ratelimiter.DefaultControllerRateLimiter(opt.RatelimiterOptions), opt.Name), queue: workqueue.NewNamedRateLimitingQueue(ratelimiter.DefaultControllerRateLimiter(opt.RateLimiterOptions), opt.Name),
} }
} }