Merge pull request #2761 from ikaven1024/pr-mem-clientset

refactor util.BuildClusterConfig
This commit is contained in:
karmada-bot 2022-11-14 09:42:57 +08:00 committed by GitHub
commit bc7b298c40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 46 additions and 340 deletions

View File

@ -1,84 +0,0 @@
package proxy
import (
"fmt"
"net/http"
"net/url"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/dynamic"
listcorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
)
func newMultiClusterStore(clusterLister clusterlisters.ClusterLister,
secretLister listcorev1.SecretLister, restMapper meta.RESTMapper) store.Store {
clientFactory := &clientFactory{
ClusterLister: clusterLister,
SecretLister: secretLister,
}
return store.NewMultiClusterCache(clientFactory.DynamicClientForCluster, restMapper)
}
type clientFactory struct {
ClusterLister clusterlisters.ClusterLister
SecretLister listcorev1.SecretLister
}
// DynamicClientForCluster creates a dynamic client for required cluster.
// TODO: reuse with karmada/pkg/util/membercluster_client.go
func (factory *clientFactory) DynamicClientForCluster(clusterName string) (dynamic.Interface, error) {
cluster, err := factory.ClusterLister.Get(clusterName)
if err != nil {
return nil, err
}
apiEndpoint := cluster.Spec.APIEndpoint
if apiEndpoint == "" {
return nil, fmt.Errorf("the api endpoint of cluster %s is empty", clusterName)
}
if cluster.Spec.SecretRef == nil {
return nil, fmt.Errorf("cluster %s does not have a secret", clusterName)
}
secret, err := factory.SecretLister.Secrets(cluster.Spec.SecretRef.Namespace).Get(cluster.Spec.SecretRef.Name)
if err != nil {
return nil, err
}
token, tokenFound := secret.Data[clusterv1alpha1.SecretTokenKey]
if !tokenFound || len(token) == 0 {
return nil, fmt.Errorf("the secret for cluster %s is missing a non-empty value for %q", clusterName, clusterv1alpha1.SecretTokenKey)
}
clusterConfig, err := clientcmd.BuildConfigFromFlags(apiEndpoint, "")
if err != nil {
return nil, err
}
clusterConfig.BearerToken = string(token)
if cluster.Spec.InsecureSkipTLSVerification {
clusterConfig.TLSClientConfig.Insecure = true
} else {
clusterConfig.CAData = secret.Data[clusterv1alpha1.SecretCADataKey]
}
if cluster.Spec.ProxyURL != "" {
proxy, err := url.Parse(cluster.Spec.ProxyURL)
if err != nil {
klog.Errorf("parse proxy error. %v", err)
return nil, err
}
clusterConfig.Proxy = http.ProxyURL(proxy)
}
return dynamic.NewForConfig(clusterConfig)
}

View File

@ -1,246 +0,0 @@
package proxy
import (
"errors"
"testing"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake"
karmadainformers "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
)
func TestClientFactory_dynamicClientForCluster(t *testing.T) {
// copy from go/src/net/http/internal/testcert/testcert.go
testCA := []byte(`-----BEGIN CERTIFICATE-----
MIIDOTCCAiGgAwIBAgIQSRJrEpBGFc7tNb1fb5pKFzANBgkqhkiG9w0BAQsFADAS
MRAwDgYDVQQKEwdBY21lIENvMCAXDTcwMDEwMTAwMDAwMFoYDzIwODQwMTI5MTYw
MDAwWjASMRAwDgYDVQQKEwdBY21lIENvMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEA6Gba5tHV1dAKouAaXO3/ebDUU4rvwCUg/CNaJ2PT5xLD4N1Vcb8r
bFSW2HXKq+MPfVdwIKR/1DczEoAGf/JWQTW7EgzlXrCd3rlajEX2D73faWJekD0U
aUgz5vtrTXZ90BQL7WvRICd7FlEZ6FPOcPlumiyNmzUqtwGhO+9ad1W5BqJaRI6P
YfouNkwR6Na4TzSj5BrqUfP0FwDizKSJ0XXmh8g8G9mtwxOSN3Ru1QFc61Xyeluk
POGKBV/q6RBNklTNe0gI8usUMlYyoC7ytppNMW7X2vodAelSu25jgx2anj9fDVZu
h7AXF5+4nJS4AAt0n1lNY7nGSsdZas8PbQIDAQABo4GIMIGFMA4GA1UdDwEB/wQE
AwICpDATBgNVHSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MB0GA1Ud
DgQWBBStsdjh3/JCXXYlQryOrL4Sh7BW5TAuBgNVHREEJzAlggtleGFtcGxlLmNv
bYcEfwAAAYcQAAAAAAAAAAAAAAAAAAAAATANBgkqhkiG9w0BAQsFAAOCAQEAxWGI
5NhpF3nwwy/4yB4i/CwwSpLrWUa70NyhvprUBC50PxiXav1TeDzwzLx/o5HyNwsv
cxv3HdkLW59i/0SlJSrNnWdfZ19oTcS+6PtLoVyISgtyN6DpkKpdG1cOkW3Cy2P2
+tK/tKHRP1Y/Ra0RiDpOAmqn0gCOFGz8+lqDIor/T7MTpibL3IxqWfPrvfVRHL3B
grw/ZQTTIVjjh4JBSW3WyWgNo/ikC1lrVxzl4iPUGptxT36Cr7Zk2Bsg0XqwbOvK
5d+NTDREkSnUbie4GeutujmX3Dsx88UiV6UY/4lHJa6I5leHUNOHahRbpbWeOfs/
WkBKOclmOV2xlTVuPw==
-----END CERTIFICATE-----`)
type args struct {
clusters []runtime.Object
secrets []runtime.Object
}
type want struct {
err error
}
tests := []struct {
name string
args args
want want
}{
{
name: "cluster not found",
args: args{
clusters: nil,
secrets: nil,
},
want: want{
err: apierrors.NewNotFound(schema.GroupResource{Resource: "cluster", Group: "cluster.karmada.io"}, "test"),
},
},
{
name: "api endpoint is empty",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{},
}},
secrets: nil,
},
want: want{
err: errors.New("the api endpoint of cluster test is empty"),
},
},
{
name: "secret is empty",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
},
}},
secrets: nil,
},
want: want{
err: errors.New("cluster test does not have a secret"),
},
},
{
name: "secret not found",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
},
}},
secrets: nil,
},
want: want{
err: errors.New(`secret "test_secret" not found`),
},
},
{
name: "token not found",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
},
}},
secrets: []runtime.Object{&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test_secret",
},
Data: map[string][]byte{},
}},
},
want: want{
err: errors.New(`the secret for cluster test is missing a non-empty value for "token"`),
},
},
{
name: "success",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
},
}},
secrets: []runtime.Object{&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test_secret",
},
Data: map[string][]byte{
clusterv1alpha1.SecretTokenKey: []byte("test_token"),
clusterv1alpha1.SecretCADataKey: testCA,
},
}},
},
want: want{
err: nil,
},
},
{
name: "has proxy",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
ProxyURL: "https://localhost",
},
}},
secrets: []runtime.Object{&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test_secret",
},
Data: map[string][]byte{
clusterv1alpha1.SecretTokenKey: []byte("test_token"),
clusterv1alpha1.SecretCADataKey: testCA,
},
}},
},
want: want{
err: nil,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
kubeClient := fake.NewSimpleClientset(tt.args.secrets...)
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
karmadaClient := karmadafake.NewSimpleClientset(tt.args.clusters...)
karmadaFactory := karmadainformers.NewSharedInformerFactory(karmadaClient, 0)
factory := &clientFactory{
ClusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(),
SecretLister: kubeFactory.Core().V1().Secrets().Lister(),
}
stopCh := make(chan struct{})
defer close(stopCh)
karmadaFactory.Start(stopCh)
karmadaFactory.WaitForCacheSync(stopCh)
kubeFactory.Start(stopCh)
kubeFactory.WaitForCacheSync(stopCh)
client, err := factory.DynamicClientForCluster("test")
if !proxytest.ErrorMessageEquals(err, tt.want.err) {
t.Errorf("got error %v, want %v", err, tt.want.err)
return
}
if err != nil {
return
}
if client == nil {
t.Error("got client nil")
}
})
}
}

View File

@ -5,6 +5,7 @@ import (
"net/http"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@ -13,6 +14,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
listcorev1 "k8s.io/client-go/listers/core/v1"
@ -20,6 +22,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
searchlisters "github.com/karmada-io/karmada/pkg/generated/listers/search/v1alpha1"
@ -66,7 +69,8 @@ func NewController(option NewControllerOption) (*Controller, error) {
secretLister := option.KubeFactory.Core().V1().Secrets().Lister()
clusterLister := option.KarmadaFactory.Cluster().V1alpha1().Clusters().Lister()
multiClusterStore := newMultiClusterStore(clusterLister, secretLister, option.RestMapper)
clientFactory := dynamicClientForClusterFunc(clusterLister, secretLister)
multiClusterStore := store.NewMultiClusterCache(clientFactory, option.RestMapper)
allPlugins, err := newPlugins(option, multiClusterStore)
if err != nil {
@ -244,3 +248,21 @@ func (ctl *Controller) Connect(ctx context.Context, proxyPath string, responder
h.ServeHTTP(rw, newReq)
}), nil
}
func dynamicClientForClusterFunc(clusterLister clusterlisters.ClusterLister,
secretLister listcorev1.SecretLister) func(string) (dynamic.Interface, error) {
clusterGetter := func(cluster string) (*clusterv1alpha1.Cluster, error) {
return clusterLister.Get(cluster)
}
secretGetter := func(namespace string, name string) (*corev1.Secret, error) {
return secretLister.Secrets(namespace).Get(name)
}
return func(clusterName string) (dynamic.Interface, error) {
clusterConfig, err := util.BuildClusterConfig(clusterName, clusterGetter, secretGetter)
if err != nil {
return nil, err
}
return dynamic.NewForConfig(clusterConfig)
}
}

View File

@ -47,7 +47,7 @@ type ClientOption struct {
// NewClusterClientSet returns a ClusterClient for the given member cluster.
func NewClusterClientSet(clusterName string, client client.Client, clientOption *ClientOption) (*ClusterClient, error) {
clusterConfig, err := buildClusterConfig(clusterName, client)
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client))
if err != nil {
return nil, err
}
@ -85,7 +85,7 @@ func NewClusterClientSetForAgent(clusterName string, client client.Client, clien
// NewClusterDynamicClientSet returns a dynamic client for the given member cluster.
func NewClusterDynamicClientSet(clusterName string, client client.Client) (*DynamicClusterClient, error) {
clusterConfig, err := buildClusterConfig(clusterName, client)
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client))
if err != nil {
return nil, err
}
@ -111,8 +111,11 @@ func NewClusterDynamicClientSetForAgent(clusterName string, client client.Client
return &clusterClientSet, nil
}
func buildClusterConfig(clusterName string, client client.Client) (*rest.Config, error) {
cluster, err := GetCluster(client, clusterName)
// BuildClusterConfig return rest config for member cluster.
func BuildClusterConfig(clusterName string,
clusterGetter func(string) (*clusterv1alpha1.Cluster, error),
secretGetter func(string, string) (*corev1.Secret, error)) (*rest.Config, error) {
cluster, err := clusterGetter(clusterName)
if err != nil {
return nil, err
}
@ -125,11 +128,8 @@ func buildClusterConfig(clusterName string, client client.Client) (*rest.Config,
return nil, fmt.Errorf("cluster %s does not have a secret", clusterName)
}
secretNamespace := cluster.Spec.SecretRef.Namespace
secretName := cluster.Spec.SecretRef.Name
secret := &corev1.Secret{}
if err := client.Get(context.TODO(), types.NamespacedName{Namespace: secretNamespace, Name: secretName}, secret); err != nil {
secret, err := secretGetter(cluster.Spec.SecretRef.Namespace, cluster.Spec.SecretRef.Name)
if err != nil {
return nil, err
}
@ -162,3 +162,17 @@ func buildClusterConfig(clusterName string, client client.Client) (*rest.Config,
return clusterConfig, nil
}
func clusterGetter(client client.Client) func(string) (*clusterv1alpha1.Cluster, error) {
return func(cluster string) (*clusterv1alpha1.Cluster, error) {
return GetCluster(client, cluster)
}
}
func secretGetter(client client.Client) func(string, string) (*corev1.Secret, error) {
return func(namespace string, name string) (*corev1.Secret, error) {
secret := &corev1.Secret{}
err := client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, secret)
return secret, err
}
}