use singleton rate limiter for dynamically created member cluster clients

Signed-off-by: zach593 <zach_li@outlook.com>
This commit is contained in:
zach593 2025-03-09 17:26:59 +08:00
parent c31c8e4a58
commit 2b7ad8e673
20 changed files with 302 additions and 102 deletions

View File

@ -263,7 +263,10 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
return fmt.Errorf("failed to setup custom resource interpreter: %w", err)
}
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter)
rateLimiterGetter := util.GetClusterRateLimiterGetter().SetDefaultLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, clusterClientOption, resourceInterpreter)
controllerContext := controllerscontext.Context{
Mgr: mgr,
ObjectWatcher: objectWatcher,
@ -276,8 +279,6 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
ClusterFailureThreshold: opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: opts.RateLimiterOpts,
EnableClusterResourceModeling: opts.EnableClusterResourceModeling,
@ -287,6 +288,7 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
},
Context: ctx,
ResourceInterpreter: resourceInterpreter,
ClusterClientOption: clusterClientOption,
}
if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
@ -312,7 +314,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error)
GenericInformerManager: genericmanager.GetInstance(),
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterClientOption: &util.ClientOption{QPS: ctx.Opts.ClusterAPIQPS, Burst: ctx.Opts.ClusterAPIBurst},
ClusterClientOption: ctx.ClusterClientOption,
ClusterStatusUpdateFrequency: ctx.Opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: ctx.Opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: ctx.Opts.ClusterLeaseRenewIntervalFraction,

View File

@ -330,7 +330,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (enabled bool,
GenericInformerManager: genericmanager.GetInstance(),
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst},
ClusterClientOption: ctx.ClusterClientOption,
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
@ -434,6 +434,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
@ -472,6 +473,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}
@ -498,6 +500,7 @@ func startEndpointSliceCollectController(ctx controllerscontext.Context) (enable
WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}
@ -781,8 +784,9 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
if err := mgr.Add(resourceInterpreter); err != nil {
klog.Fatalf("Failed to setup custom resource interpreter: %v", err)
}
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
rateLimiterGetter := util.GetClusterRateLimiterGetter().SetDefaultLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, clusterClientOption, resourceInterpreter)
resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
@ -835,8 +839,6 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
ClusterFailureThreshold: opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
SkippedPropagatingNamespaces: opts.SkippedNamespacesRegexps(),
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
EnableTaintManager: opts.EnableTaintManager,
@ -851,6 +853,7 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
OverrideManager: overrideManager,
ControlPlaneInformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
ClusterClientOption: clusterClientOption,
}
if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {

View File

@ -136,7 +136,8 @@ func (o *Options) Config(ctx context.Context) (*metricsadapter.MetricsServer, er
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
metricsController := metricsadapter.NewMetricsController(ctx, restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst})
limiterGetter := util.GetClusterRateLimiterGetter().SetDefaultLimits(o.ClusterAPIQPS, o.ClusterAPIBurst)
metricsController := metricsadapter.NewMetricsController(ctx, restConfig, factory, kubeFactory, &util.ClientOption{RateLimiterGetter: limiterGetter.GetRateLimiter})
metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions)
metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))

View File

@ -31,6 +31,7 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/federatedhpa/config"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
@ -68,10 +69,6 @@ type Options struct {
ClusterFailureThreshold metav1.Duration
// ClusterCacheSyncTimeout is the timeout period waiting for cluster cache to sync.
ClusterCacheSyncTimeout metav1.Duration
// ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver.
ClusterAPIQPS float32
// ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver.
ClusterAPIBurst int
// SkippedPropagatingNamespaces is a list of namespace regular expressions, matching namespaces will be skipped propagating.
SkippedPropagatingNamespaces []*regexp.Regexp
// ClusterName is the name of cluster.
@ -113,6 +110,7 @@ type Context struct {
OverrideManager overridemanager.OverrideManager
ControlPlaneInformerManager genericmanager.SingleClusterInformerManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter
ClusterClientOption *util.ClientOption
}
// IsControllerEnabled check if a specified controller enabled or not.

View File

@ -229,7 +229,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr
informerManager.ForCluster(cluster.Name, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
informerManager.Start(cluster.Name)
informerManager.WaitForCacheSync(cluster.Name)
clusterClientSetFunc := func(string, client.Client) (*util.DynamicClusterClient, error) {
clusterClientSetFunc := func(string, client.Client, *util.ClientOption) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
ClusterName: clusterName,
DynamicClientSet: dynamicClientSet,
@ -241,7 +241,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr
InformerManager: informerManager,
EventRecorder: recorder,
RESTMapper: restMapper,
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, resourceInterpreter),
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, nil, resourceInterpreter),
}
}

View File

@ -71,7 +71,8 @@ type ServiceExportController struct {
InformerManager genericmanager.MultiClusterInformerManager
WorkerNumber int // WorkerNumber is the number of worker goroutines
PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys.
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
ClusterClientOption *util.ClientOption
ClusterCacheSyncTimeout metav1.Duration
// eventHandlers holds the handlers which used to handle events reported from member clusters.
@ -244,7 +245,7 @@ func (c *ServiceExportController) buildResourceInformers(cluster *clusterv1alpha
func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err

View File

@ -62,7 +62,8 @@ type EndpointSliceCollectController struct {
InformerManager genericmanager.MultiClusterInformerManager
WorkerNumber int // WorkerNumber is the number of worker goroutines
PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys.
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
ClusterClientOption *util.ClientOption
// eventHandlers holds the handlers which used to handle events reported from member clusters.
// Each handler takes the cluster name as key and takes the handler function as the value, e.g.
// "member1": instance of ResourceEventHandler
@ -182,7 +183,7 @@ func (c *EndpointSliceCollectController) buildResourceInformers(clusterName stri
func (c *EndpointSliceCollectController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err

View File

@ -86,8 +86,8 @@ type ClusterStatusController struct {
PredicateFunc predicate.Predicate
TypedInformerManager typedmanager.MultiClusterInformerManager
GenericInformerManager genericmanager.MultiClusterInformerManager
ClusterClientSetFunc func(string, client.Client, *util.ClientOption) (*util.ClusterClient, error)
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterClientSetFunc util.NewClusterClientSetFunc
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
// ClusterClientOption holds the attributes that should be injected to a Kubernetes client.
ClusterClientOption *util.ClientOption
@ -342,7 +342,7 @@ func (c *ClusterStatusController) initializeGenericInformerManagerForCluster(clu
return
}
dynamicClient, err := c.ClusterDynamicClientSetFunc(clusterClient.ClusterName, c.Client)
dynamicClient, err := c.ClusterDynamicClientSetFunc(clusterClient.ClusterName, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterClient.ClusterName)
return

View File

@ -113,11 +113,8 @@ func TestClusterStatusController_Reconcile(t *testing.T) {
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
}
if tt.cluster != nil {
@ -900,11 +897,8 @@ func TestClusterStatusController_updateStatusIfNeeded(t *testing.T) {
).WithStatusSubresource(cluster).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
}
err := c.updateStatusIfNeeded(context.Background(), cluster, currentClusterStatus)
@ -964,11 +958,8 @@ func TestClusterStatusController_updateStatusIfNeeded(t *testing.T) {
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
}
err := c.updateStatusIfNeeded(context.Background(), cluster, currentClusterStatus)
@ -976,20 +967,17 @@ func TestClusterStatusController_updateStatusIfNeeded(t *testing.T) {
})
}
func NewClusterDynamicClientSetForAgentWithError(_ string, _ client.Client) (*util.DynamicClusterClient, error) {
func NewClusterDynamicClientSetForAgentWithError(_ string, _ client.Client, _ *util.ClientOption) (*util.DynamicClusterClient, error) {
return nil, fmt.Errorf("err")
}
func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *testing.T) {
t.Run("failed to create dynamicClient", func(*testing.T) {
c := &ClusterStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: NewClusterDynamicClientSetForAgentWithError,
}
@ -1002,13 +990,10 @@ func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *t
t.Run("suc to create dynamicClient", func(*testing.T) {
c := &ClusterStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
}
@ -1022,13 +1007,10 @@ func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *t
func TestClusterStatusController_initLeaseController(_ *testing.T) {
c := &ClusterStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: NewClusterDynamicClientSetForAgentWithError,
}

View File

@ -71,7 +71,8 @@ type WorkStatusController struct {
ConcurrentWorkStatusSyncs int
ObjectWatcher objectwatcher.ObjectWatcher
PredicateFunc predicate.Predicate
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
ClusterClientOption *util.ClientOption
ClusterCacheSyncTimeout metav1.Duration
RateLimiterOptions ratelimiterflag.Options
ResourceInterpreter resourceinterpreter.ResourceInterpreter
@ -534,7 +535,7 @@ func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1.
// the cache in informer manager should be updated.
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return nil, err

View File

@ -735,7 +735,7 @@ func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets
if len(dynamicClientSets) > 0 {
c.ResourceInterpreter = FakeResourceInterpreter{DefaultInterpreter: native.NewDefaultInterpreter()}
c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, c.RESTMapper, util.NewClusterDynamicClientSetForAgent, c.ResourceInterpreter)
c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, c.RESTMapper, util.NewClusterDynamicClientSetForAgent, nil, c.ResourceInterpreter)
// Generate InformerManager
clusterName := cluster.Name

View File

@ -58,6 +58,7 @@ type MetricsController struct {
MultiClusterDiscovery multiclient.MultiClusterDiscoveryInterface
queue workqueue.TypedRateLimitingInterface[any]
restConfig *rest.Config
clusterClientOption *util.ClientOption
}
// NewMetricsController creates a new metrics controller
@ -73,6 +74,7 @@ func NewMetricsController(ctx context.Context, restConfig *rest.Config, factory
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
Name: "metrics-adapter",
}),
clusterClientOption: clusterClientOption,
}
controller.addEventHandler()
@ -235,11 +237,11 @@ func (m *MetricsController) handleClusters() bool {
if !m.TypedInformerManager.IsManagerExist(clusterName) {
klog.Info("Try to build informer manager for cluster ", clusterName)
controlPlaneClient := gclient.NewForConfigOrDie(m.restConfig)
clusterClient, err := util.NewClusterClientSet(clusterName, controlPlaneClient, nil)
clusterClient, err := util.NewClusterClientSet(clusterName, controlPlaneClient, m.clusterClientOption)
if err != nil {
return false
}
clusterDynamicClient, err := util.NewClusterDynamicClientSet(clusterName, controlPlaneClient)
clusterDynamicClient, err := util.NewClusterDynamicClientSet(clusterName, controlPlaneClient, m.clusterClientOption)
if err != nil {
return false
}

View File

@ -74,8 +74,9 @@ func (m *MultiClusterDiscovery) Set(clusterName string) error {
if err != nil {
return err
}
clusterConfig.QPS = m.clusterClientOption.QPS
clusterConfig.Burst = m.clusterClientOption.Burst
if m.clusterClientOption != nil && m.clusterClientOption.RateLimiterGetter != nil {
clusterConfig.RateLimiter = m.clusterClientOption.RateLimiterGetter(clusterName)
}
m.Lock()
defer m.Unlock()
m.clients[clusterName] = discovery.NewDiscoveryClientForConfigOrDie(clusterConfig)

View File

@ -352,7 +352,8 @@ func (c *Controller) getRegistryBackendHandler(cluster string, matchedRegistries
}
var clusterDynamicClientBuilder = func(cluster string, controlPlaneClient client.Client) (*util.DynamicClusterClient, error) {
return util.NewClusterDynamicClientSet(cluster, controlPlaneClient)
// TODO: Add "--cluster-api-qps" and "--cluster-api-burst" flags to karmada-search and pass them via clientOption instead of passing a "nil" here
return util.NewClusterDynamicClientSet(cluster, controlPlaneClient, nil)
}
// doCacheCluster processes the resourceRegistry object

View File

@ -28,6 +28,7 @@ import (
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/scale"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -60,13 +61,9 @@ type ClusterScaleClient struct {
// ClientOption holds the attributes that should be injected to a Kubernetes client.
type ClientOption struct {
// QPS indicates the maximum QPS to the master from this client.
// If it's zero, the created RESTClient will use DefaultQPS: 5
QPS float32
// Burst indicates the maximum burst for throttle.
// If it's zero, the created RESTClient will use DefaultBurst: 10.
Burst int
// RateLimiter is used to limit the QPS to the master from this client.
// We use this instead of QPS/Burst to avoid multiple client initializations causing QPS/Burst to lose its effect.
RateLimiterGetter func(key string) flowcontrol.RateLimiter
}
// NewClusterScaleClientSet returns a ClusterScaleClient for the given member cluster.
@ -102,6 +99,9 @@ func NewClusterScaleClientSet(clusterName string, client client.Client) (*Cluste
return &clusterScaleClientSet, nil
}
// NewClusterClientSetFunc is a function that returns a ClusterClient for the given member cluster.
type NewClusterClientSetFunc = func(clusterName string, client client.Client, clientOption *ClientOption) (*ClusterClient, error)
// NewClusterClientSet returns a ClusterClient for the given member cluster.
func NewClusterClientSet(clusterName string, client client.Client, clientOption *ClientOption) (*ClusterClient, error) {
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client))
@ -113,8 +113,9 @@ func NewClusterClientSet(clusterName string, client client.Client, clientOption
if clusterConfig != nil {
if clientOption != nil {
clusterConfig.QPS = clientOption.QPS
clusterConfig.Burst = clientOption.Burst
if clientOption.RateLimiterGetter != nil {
clusterConfig.RateLimiter = clientOption.RateLimiterGetter(clusterName)
}
}
clusterClientSet.KubeClient = kubeclientset.NewForConfigOrDie(clusterConfig)
}
@ -132,16 +133,20 @@ func NewClusterClientSetForAgent(clusterName string, _ client.Client, clientOpti
if clusterConfig != nil {
if clientOption != nil {
clusterConfig.QPS = clientOption.QPS
clusterConfig.Burst = clientOption.Burst
if clientOption.RateLimiterGetter != nil {
clusterConfig.RateLimiter = clientOption.RateLimiterGetter(clusterName)
}
}
clusterClientSet.KubeClient = kubeclientset.NewForConfigOrDie(clusterConfig)
}
return &clusterClientSet, nil
}
// NewClusterDynamicClientSetFunc is a function that returns a dynamic client for the given member cluster.
type NewClusterDynamicClientSetFunc = func(clusterName string, client client.Client, clientOption *ClientOption) (*DynamicClusterClient, error)
// NewClusterDynamicClientSet returns a dynamic client for the given member cluster.
func NewClusterDynamicClientSet(clusterName string, client client.Client) (*DynamicClusterClient, error) {
func NewClusterDynamicClientSet(clusterName string, client client.Client, clientOption *ClientOption) (*DynamicClusterClient, error) {
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client))
if err != nil {
return nil, err
@ -149,13 +154,18 @@ func NewClusterDynamicClientSet(clusterName string, client client.Client) (*Dyna
var clusterClientSet = DynamicClusterClient{ClusterName: clusterName}
if clusterConfig != nil {
if clientOption != nil {
if clientOption.RateLimiterGetter != nil {
clusterConfig.RateLimiter = clientOption.RateLimiterGetter(clusterName)
}
}
clusterClientSet.DynamicClientSet = dynamic.NewForConfigOrDie(clusterConfig)
}
return &clusterClientSet, nil
}
// NewClusterDynamicClientSetForAgent returns a dynamic client for the given member cluster which will be used in karmada agent.
func NewClusterDynamicClientSetForAgent(clusterName string, _ client.Client) (*DynamicClusterClient, error) {
func NewClusterDynamicClientSetForAgent(clusterName string, _ client.Client, clientOption *ClientOption) (*DynamicClusterClient, error) {
clusterConfig, err := controllerruntime.GetConfig()
if err != nil {
return nil, fmt.Errorf("error building kubeconfig of member cluster: %s", err.Error())
@ -163,6 +173,11 @@ func NewClusterDynamicClientSetForAgent(clusterName string, _ client.Client) (*D
var clusterClientSet = DynamicClusterClient{ClusterName: clusterName}
if clusterConfig != nil {
if clientOption != nil {
if clientOption.RateLimiterGetter != nil {
clusterConfig.RateLimiter = clientOption.RateLimiterGetter(clusterName)
}
}
clusterClientSet.DynamicClientSet = dynamic.NewForConfigOrDie(clusterConfig)
}
return &clusterClientSet, nil

View File

@ -195,7 +195,7 @@ func TestNewClusterClientSetForAgent(t *testing.T) {
args: args{
clusterName: "test-agent",
client: fakeclient.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
clientOption: &ClientOption{QPS: 100, Burst: 200},
clientOption: &ClientOption{},
},
wantErr: false,
},
@ -231,8 +231,9 @@ func TestNewClusterClientSetForAgent(t *testing.T) {
func TestNewClusterDynamicClientSetForAgent(t *testing.T) {
type args struct {
clusterName string
client client.Client
clusterName string
client client.Client
clusterClientOption *ClientOption
}
tests := []struct {
name string
@ -263,7 +264,7 @@ func TestNewClusterDynamicClientSetForAgent(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewClusterDynamicClientSetForAgent(tt.args.clusterName, tt.args.client)
got, err := NewClusterDynamicClientSetForAgent(tt.args.clusterName, tt.args.client, tt.args.clusterClientOption)
if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, got)
@ -368,7 +369,7 @@ func TestNewClusterClientSet(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"},
Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token"), clusterv1alpha1.SecretCADataKey: testCA},
}).Build(),
clientOption: &ClientOption{QPS: 100, Burst: 200},
clientOption: &ClientOption{},
},
wantErr: false,
},
@ -389,7 +390,7 @@ func TestNewClusterClientSet(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"},
Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token")},
}).Build(),
clientOption: &ClientOption{QPS: 100, Burst: 200},
clientOption: &ClientOption{},
},
wantErr: false,
},
@ -409,7 +410,7 @@ func TestNewClusterClientSet(t *testing.T) {
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"},
Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token"), clusterv1alpha1.SecretCADataKey: testCA}}).Build(),
clientOption: &ClientOption{QPS: 100, Burst: 200},
clientOption: &ClientOption{},
},
wantErr: true,
},
@ -430,7 +431,7 @@ func TestNewClusterClientSet(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"},
Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token"), clusterv1alpha1.SecretCADataKey: testCA},
}).Build(),
clientOption: &ClientOption{QPS: 100, Burst: 200},
clientOption: &ClientOption{},
},
wantErr: false,
},
@ -499,8 +500,9 @@ func TestNewClusterClientSet_ClientWorks(t *testing.T) {
func TestNewClusterDynamicClientSet(t *testing.T) {
type args struct {
clusterName string
client client.Client
clusterName string
client client.Client
clusterClientOption *ClientOption
}
tests := []struct {
name string
@ -647,7 +649,7 @@ func TestNewClusterDynamicClientSet(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewClusterDynamicClientSet(tt.args.clusterName, tt.args.client)
got, err := NewClusterDynamicClientSet(tt.args.clusterName, tt.args.client, tt.args.clusterClientOption)
if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, got)
@ -691,7 +693,7 @@ func TestNewClusterDynamicClientSet_ClientWorks(t *testing.T) {
Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token"), clusterv1alpha1.SecretCADataKey: testCA},
}).Build()
clusterClient, err := NewClusterDynamicClientSet(clusterName, hostClient)
clusterClient, err := NewClusterDynamicClientSet(clusterName, hostClient, nil)
assert.NoError(t, err)
assert.NotNil(t, clusterClient)

View File

@ -0,0 +1,77 @@
/*
Copyright 2025 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"sync"
"k8s.io/client-go/util/flowcontrol"
)
var defaultRateLimiterGetter = &ClusterRateLimiterGetter{}
// GetClusterRateLimiterGetter returns a ClusterRateLimiterGetter.
func GetClusterRateLimiterGetter() *ClusterRateLimiterGetter {
return defaultRateLimiterGetter
}
// ClusterRateLimiterGetter is responsible for retrieving rate limiters for member clusters.
// It dynamically creates and manages a singleton rate limiter for each member cluster,
// using the preset default QPS and burst values to ensure consistent rate limiting across all clusters.
type ClusterRateLimiterGetter struct {
limiters map[string]flowcontrol.RateLimiter
mu sync.Mutex
qps float32
burst int
}
// SetDefaultLimits sets the default qps and burst for new rate limiters.
// Do not call this method after calling GetRateLimiter, otherwise the call will not take effect.
// If qps and limit are not set when using Getter,
// it will use 40 and 60 by default (the default qps and burst of '--cluster-api-qps' and '--cluster-api-burst' in karmada-controller-manager).
func (r *ClusterRateLimiterGetter) SetDefaultLimits(qps float32, burst int) *ClusterRateLimiterGetter {
r.mu.Lock()
defer r.mu.Unlock()
if len(r.limiters) == 0 {
r.qps = qps
r.burst = burst
}
return r
}
// GetRateLimiter gets rate limiter by key.
func (r *ClusterRateLimiterGetter) GetRateLimiter(key string) flowcontrol.RateLimiter {
r.mu.Lock()
defer r.mu.Unlock()
if r.limiters == nil {
r.limiters = make(map[string]flowcontrol.RateLimiter)
}
limiter, ok := r.limiters[key]
if !ok {
qps := r.qps
if qps <= 0 {
qps = 40
}
burst := r.burst
if burst <= 0 {
burst = 60
}
limiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
r.limiters[key] = limiter
}
return limiter
}

View File

@ -0,0 +1,114 @@
/*
Copyright 2025 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/util/flowcontrol"
)
func TestGetRateLimiterGetter(t *testing.T) {
tests := []struct {
name string
want *ClusterRateLimiterGetter
}{
{
name: "get the default singleton",
want: defaultRateLimiterGetter,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, GetClusterRateLimiterGetter(), "GetClusterRateLimiterGetter()")
})
}
}
func TestRateLimiterGetter_GetRateLimiter(t *testing.T) {
tests := []struct {
name string
getter *ClusterRateLimiterGetter
want flowcontrol.RateLimiter
}{
{
name: "if qps/burst not set, use default value",
getter: &ClusterRateLimiterGetter{},
want: flowcontrol.NewTokenBucketRateLimiter(40, 60),
},
{
name: "SetDefaultLimits() should able to work",
getter: func() *ClusterRateLimiterGetter {
return (&ClusterRateLimiterGetter{}).SetDefaultLimits(100, 200)
}(),
want: flowcontrol.NewTokenBucketRateLimiter(100, 200),
},
{
name: "if qps/burst invalid, use default value",
getter: func() *ClusterRateLimiterGetter {
return (&ClusterRateLimiterGetter{}).SetDefaultLimits(-1, -1)
}(),
want: flowcontrol.NewTokenBucketRateLimiter(40, 60),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, tt.getter.GetRateLimiter("a"), "GetRateLimiter()")
})
}
}
func TestRateLimiterGetter_GetSameRateLimiter(t *testing.T) {
type args struct {
key1 string
key2 string
}
tests := []struct {
name string
args args
wantSameLimiter bool
}{
{
name: "for a single cluster, the same limiter instance is obtained each time",
args: args{
key1: "a",
key2: "a",
},
wantSameLimiter: true,
},
{
name: "the rate limiter is independent for each cluster",
args: args{
key1: "a",
key2: "b",
},
wantSameLimiter: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
getter := &ClusterRateLimiterGetter{}
rl1 := getter.GetRateLimiter(tt.args.key1)
rl2 := getter.GetRateLimiter(tt.args.key2)
gotSameLimiter := rl1 == rl2
assert.Equalf(t, tt.wantSameLimiter, gotSameLimiter,
"key1: %v, key2: %v, wantSameLimiter: %v, rl1: %p, rl2: %p",
tt.args.key1, tt.args.key2, tt.wantSameLimiter, rl1, rl2)
})
}
}

View File

@ -59,33 +59,32 @@ type ObjectWatcher interface {
NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error)
}
// ClientSetFunc is used to generate client set of member cluster
type ClientSetFunc func(c string, client client.Client) (*util.DynamicClusterClient, error)
type objectWatcherImpl struct {
Lock sync.RWMutex
RESTMapper meta.RESTMapper
KubeClientSet client.Client
VersionRecord map[string]map[string]string
ClusterClientSetFunc ClientSetFunc
ClusterClientSetFunc util.NewClusterDynamicClientSetFunc
ClusterClientOption *util.ClientOption
resourceInterpreter resourceinterpreter.ResourceInterpreter
InformerManager genericmanager.MultiClusterInformerManager
}
// NewObjectWatcher returns an instance of ObjectWatcher
func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, clusterClientSetFunc ClientSetFunc, interpreter resourceinterpreter.ResourceInterpreter) ObjectWatcher {
func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, clusterClientSetFunc util.NewClusterDynamicClientSetFunc, clusterClientOption *util.ClientOption, interpreter resourceinterpreter.ResourceInterpreter) ObjectWatcher {
return &objectWatcherImpl{
KubeClientSet: kubeClientSet,
VersionRecord: make(map[string]map[string]string),
RESTMapper: restMapper,
ClusterClientSetFunc: clusterClientSetFunc,
ClusterClientOption: clusterClientOption,
resourceInterpreter: interpreter,
InformerManager: genericmanager.GetInstance(),
}
}
func (o *objectWatcherImpl) Create(ctx context.Context, clusterName string, desireObj *unstructured.Unstructured) error {
dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet)
dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet, o.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s, err: %v.", clusterName, err)
return err
@ -158,7 +157,7 @@ func (o *objectWatcherImpl) Update(ctx context.Context, clusterName string, desi
desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, workv1alpha2.ResourceConflictResolutionAnnotation)
}
dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet)
dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet, o.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s, err: %v.", clusterName, err)
return OperationResultNone, err
@ -216,7 +215,7 @@ func (o *objectWatcherImpl) Delete(ctx context.Context, clusterName string, desi
return nil
}
dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet)
dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet, o.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s, err: %v.", clusterName, err)
return err

View File

@ -199,7 +199,7 @@ func newClusterClientSet(controlPlaneClient client.Client, c *clusterv1alpha1.Cl
if err != nil {
return nil, nil, err
}
clusterDynamicClient, err := util.NewClusterDynamicClientSet(c.Name, controlPlaneClient)
clusterDynamicClient, err := util.NewClusterDynamicClientSet(c.Name, controlPlaneClient, nil)
if err != nil {
return nil, nil, err
}