unify http ratelimiter for control plane client

Signed-off-by: zach593 <zach_li@outlook.com>
This commit is contained in:
zach593 2025-02-28 22:33:26 +08:00
parent 3b6c0e0fa2
commit 696e52ef7e
9 changed files with 18 additions and 11 deletions

View File

@ -28,6 +28,7 @@ import (
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@ -140,8 +141,7 @@ func run(ctx context.Context, opts *options.Options) error {
if err != nil {
return fmt.Errorf("error building kubeconfig of karmada control plane: %w", err)
}
controlPlaneRestConfig.QPS, controlPlaneRestConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
controlPlaneRestConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
clusterConfig, err := controllerruntime.GetConfig()
if err != nil {
return fmt.Errorf("error building kubeconfig of member cluster: %w", err)

View File

@ -36,6 +36,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilversion "k8s.io/apiserver/pkg/util/version"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
@ -120,7 +121,7 @@ func (o *Options) Run(ctx context.Context) error {
}
restConfig := config.GenericConfig.ClientConfig
restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(o.KubeAPIQPS, o.KubeAPIBurst)
secretLister := config.GenericConfig.SharedInformerFactory.Core().V1().Secrets().Lister()
config.GenericConfig.EffectiveVersion = utilversion.NewEffectiveVersion("1.0")

View File

@ -29,6 +29,7 @@ import (
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@ -144,7 +145,7 @@ func Run(ctx context.Context, opts *options.Options) error {
if err != nil {
panic(err)
}
controlPlaneRestConfig.QPS, controlPlaneRestConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
controlPlaneRestConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
controllerManager, err := controllerruntime.NewManager(controlPlaneRestConfig, controllerruntime.Options{
Logger: klog.Background(),
Scheme: gclient.NewSchema(),

View File

@ -31,6 +31,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@ -134,7 +135,7 @@ func run(opts *options.Options, stopChan <-chan struct{}) error {
if err != nil {
return fmt.Errorf("error building kubeconfig: %s", err.Error())
}
restConfig.QPS, restConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
kubeClient := kubernetes.NewForConfigOrDie(restConfig)

View File

@ -33,6 +33,7 @@ import (
genericoptions "k8s.io/apiserver/pkg/server/options"
utilversion "k8s.io/apiserver/pkg/util/version"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@ -179,8 +180,7 @@ func config(o *options.Options, outOfTreeRegistryOptions ...Option) (*search.Con
return nil, err
}
serverConfig.ClientConfig.QPS = o.KubeAPIQPS
serverConfig.ClientConfig.Burst = o.KubeAPIBurst
serverConfig.ClientConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(o.KubeAPIQPS, o.KubeAPIBurst)
serverConfig.Config.EffectiveVersion = utilversion.NewEffectiveVersion("1.0")
httpClient, err := rest.HTTPClientFor(serverConfig.ClientConfig)

View File

@ -28,6 +28,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
@ -128,7 +129,7 @@ func (o *Options) Config(stopCh <-chan struct{}) (*metricsadapter.MetricsServer,
klog.Errorf("Unable to build restConfig: %v", err)
return nil, err
}
restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(o.KubeAPIQPS, o.KubeAPIBurst)
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)

View File

@ -29,6 +29,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@ -121,7 +122,7 @@ func run(ctx context.Context, opts *options.Options) error {
if err != nil {
return fmt.Errorf("error building kubeconfig: %s", err.Error())
}
restConfig.QPS, restConfig.Burst = opts.ClusterAPIQPS, opts.ClusterAPIBurst
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
dynamicClient := dynamic.NewForConfigOrDie(restConfig)

View File

@ -32,6 +32,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@ -146,7 +147,7 @@ func run(opts *options.Options, stopChan <-chan struct{}, registryOptions ...Opt
if err != nil {
return fmt.Errorf("error building kubeconfig: %s", err.Error())
}
restConfig.QPS, restConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)

View File

@ -24,6 +24,7 @@ import (
"net/http"
"github.com/spf13/cobra"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@ -116,7 +117,7 @@ func Run(ctx context.Context, opts *options.Options) error {
if err != nil {
panic(err)
}
config.QPS, config.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
hookManager, err := controllerruntime.NewManager(config, controllerruntime.Options{
Logger: klog.Background(),