Merge pull request #625 from RainbowMango/pr_move_qps_opts
Move QPS and Burst to cluster status controller
This commit is contained in:
commit
396fba7335
|
@ -100,6 +100,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
|
|||
StopChan: stopChan,
|
||||
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
|
||||
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
||||
ClusterClientOption: &util.ClientOption{QPS: 40, Burst: 60},
|
||||
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
|
||||
ClusterLeaseDuration: opts.ClusterLeaseDuration,
|
||||
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
|
||||
|
|
|
@ -168,6 +168,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
|
|||
StopChan: stopChan,
|
||||
ClusterClientSetFunc: util.NewClusterClientSet,
|
||||
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
|
||||
ClusterClientOption: &util.ClientOption{QPS: 40, Burst: 60},
|
||||
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
|
||||
ClusterLeaseDuration: opts.ClusterLeaseDuration,
|
||||
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
|
||||
|
|
|
@ -60,8 +60,10 @@ type ClusterStatusController struct {
|
|||
PredicateFunc predicate.Predicate
|
||||
InformerManager informermanager.MultiClusterInformerManager
|
||||
StopChan <-chan struct{}
|
||||
ClusterClientSetFunc func(c *v1alpha1.Cluster, client client.Client) (*util.ClusterClient, error)
|
||||
ClusterClientSetFunc func(*v1alpha1.Cluster, client.Client, *util.ClientOption) (*util.ClusterClient, error)
|
||||
ClusterDynamicClientSetFunc func(c *v1alpha1.Cluster, client client.Client) (*util.DynamicClusterClient, error)
|
||||
// ClusterClientOption holds the attributes that should be injected to a Kubernetes client.
|
||||
ClusterClientOption *util.ClientOption
|
||||
|
||||
// ClusterStatusUpdateFrequency is the frequency that controller computes and report cluster status.
|
||||
ClusterStatusUpdateFrequency metav1.Duration
|
||||
|
@ -113,7 +115,7 @@ func (c *ClusterStatusController) SetupWithManager(mgr controllerruntime.Manager
|
|||
|
||||
func (c *ClusterStatusController) syncClusterStatus(cluster *v1alpha1.Cluster) (controllerruntime.Result, error) {
|
||||
// create a ClusterClient for the given member cluster
|
||||
clusterClient, err := c.ClusterClientSetFunc(cluster, c.Client)
|
||||
clusterClient, err := c.ClusterClientSetFunc(cluster, c.Client, c.ClusterClientOption)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create a ClusterClient for the given member cluster: %v, err is : %v", cluster.Name, err)
|
||||
return controllerruntime.Result{Requeue: true}, err
|
||||
|
|
|
@ -20,12 +20,8 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// kubeAPIQPS is the maximum QPS to the master from this client
|
||||
kubeAPIQPS = 20.0
|
||||
// kubeAPIBurst is the maximum burst for throttle
|
||||
kubeAPIBurst = 30
|
||||
tokenKey = "token"
|
||||
cADataKey = "caBundle"
|
||||
tokenKey = "token"
|
||||
cADataKey = "caBundle"
|
||||
)
|
||||
|
||||
// ClusterClient stands for a cluster Clientset for the given member cluster
|
||||
|
@ -40,29 +36,53 @@ type DynamicClusterClient struct {
|
|||
ClusterName string
|
||||
}
|
||||
|
||||
// Config holds the common attributes that can be passed to a Kubernetes client on
|
||||
// initialization.
|
||||
|
||||
// ClientOption holds the attributes that should be injected to a Kubernetes client.
|
||||
type ClientOption struct {
|
||||
// QPS indicates the maximum QPS to the master from this client.
|
||||
// If it's zero, the created RESTClient will use DefaultQPS: 5
|
||||
QPS float32
|
||||
|
||||
// Burst indicates the maximum burst for throttle.
|
||||
// If it's zero, the created RESTClient will use DefaultBurst: 10.
|
||||
Burst int
|
||||
}
|
||||
|
||||
// NewClusterClientSet returns a ClusterClient for the given member cluster.
|
||||
func NewClusterClientSet(c *v1alpha1.Cluster, client client.Client) (*ClusterClient, error) {
|
||||
func NewClusterClientSet(c *v1alpha1.Cluster, client client.Client, clientOption *ClientOption) (*ClusterClient, error) {
|
||||
clusterConfig, err := buildClusterConfig(c, client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var clusterClientSet = ClusterClient{ClusterName: c.Name}
|
||||
|
||||
if clusterConfig != nil {
|
||||
if clientOption != nil {
|
||||
clusterConfig.QPS = clientOption.QPS
|
||||
clusterConfig.Burst = clientOption.Burst
|
||||
}
|
||||
clusterClientSet.KubeClient = kubeclientset.NewForConfigOrDie(clusterConfig)
|
||||
}
|
||||
return &clusterClientSet, nil
|
||||
}
|
||||
|
||||
// NewClusterClientSetForAgent returns a ClusterClient for the given member cluster which will be used in karmada agent.
|
||||
func NewClusterClientSetForAgent(c *v1alpha1.Cluster, client client.Client) (*ClusterClient, error) {
|
||||
func NewClusterClientSetForAgent(c *v1alpha1.Cluster, client client.Client, clientOption *ClientOption) (*ClusterClient, error) {
|
||||
clusterConfig, err := controllerruntime.GetConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building kubeconfig of member cluster: %s", err.Error())
|
||||
}
|
||||
|
||||
var clusterClientSet = ClusterClient{ClusterName: c.Name}
|
||||
|
||||
if clusterConfig != nil {
|
||||
if clientOption != nil {
|
||||
clusterConfig.QPS = clientOption.QPS
|
||||
clusterConfig.Burst = clientOption.Burst
|
||||
}
|
||||
clusterClientSet.KubeClient = kubeclientset.NewForConfigOrDie(clusterConfig)
|
||||
}
|
||||
return &clusterClientSet, nil
|
||||
|
@ -125,8 +145,6 @@ func buildClusterConfig(cluster *v1alpha1.Cluster, client client.Client) (*rest.
|
|||
}
|
||||
|
||||
clusterConfig.BearerToken = string(token)
|
||||
clusterConfig.QPS = kubeAPIQPS
|
||||
clusterConfig.Burst = kubeAPIBurst
|
||||
|
||||
if cluster.Spec.InsecureSkipTLSVerification {
|
||||
clusterConfig.TLSClientConfig.Insecure = true
|
||||
|
|
|
@ -192,7 +192,7 @@ var _ = ginkgo.Describe("[namespace auto-provision] namespace auto-provision tes
|
|||
ginkgo.By(fmt.Sprintf("waiting namespace(%s) present on cluster: %s", namespaceName, clusterName), func() {
|
||||
clusterJoined, err := karmadaClient.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
|
||||
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||
clusterClient, err := util.NewClusterClientSet(clusterJoined, controlPlaneClient)
|
||||
clusterClient, err := util.NewClusterClientSet(clusterJoined, controlPlaneClient, nil)
|
||||
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||
err = wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
|
||||
_, err = clusterClient.KubeClient.CoreV1().Namespaces().Get(context.TODO(), namespaceName, metav1.GetOptions{})
|
||||
|
|
|
@ -290,7 +290,7 @@ func deleteCluster(clusterName, kubeConfigPath string) error {
|
|||
|
||||
func newClusterClientSet(c *clusterv1alpha1.Cluster) (*util.ClusterClient, *util.DynamicClusterClient, error) {
|
||||
if c.Spec.SyncMode == clusterv1alpha1.Push {
|
||||
clusterClient, err := util.NewClusterClientSet(c, controlPlaneClient)
|
||||
clusterClient, err := util.NewClusterClientSet(c, controlPlaneClient, nil)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue