From 38b5eabb4a3b156ec486a2eba2dcda8738e1f8da Mon Sep 17 00:00:00 2001 From: yingjinhui Date: Mon, 31 Oct 2022 19:01:50 +0800 Subject: [PATCH] clean up proxy repeating code Signed-off-by: yingjinhui --- pkg/registry/cluster/storage/proxy.go | 89 +------ pkg/registry/cluster/storage/proxy_test.go | 148 ++++++++++++ pkg/registry/cluster/storage/storage.go | 64 +---- .../framework/plugins/cluster/cluster.go | 20 +- .../framework/plugins/cluster/cluster_test.go | 3 +- .../framework/plugins/karmada/karmada_test.go | 4 +- pkg/util/proxy/proxy.go | 48 ++-- pkg/util/proxy/proxy_test.go | 222 ++++++++++++++++++ .../proxy => util}/testing/mock_responder.go | 0 9 files changed, 429 insertions(+), 169 deletions(-) create mode 100644 pkg/registry/cluster/storage/proxy_test.go create mode 100644 pkg/util/proxy/proxy_test.go rename pkg/{search/proxy => util}/testing/mock_responder.go (100%) diff --git a/pkg/registry/cluster/storage/proxy.go b/pkg/registry/cluster/storage/proxy.go index ea0bdb5ba..b79189203 100644 --- a/pkg/registry/cluster/storage/proxy.go +++ b/pkg/registry/cluster/storage/proxy.go @@ -2,31 +2,23 @@ package storage import ( "context" - "errors" "fmt" "net/http" - "net/url" - authenticationv1 "k8s.io/api/authentication/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/proxy" - "k8s.io/apiserver/pkg/authentication/user" - "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" - "k8s.io/apiserver/pkg/endpoints/request" - genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/kubernetes" clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" + "github.com/karmada-io/karmada/pkg/util/proxy" ) // ProxyREST implements the proxy subresource for a Cluster. type ProxyREST struct { - Store *genericregistry.Store - Redirector rest.Redirector - - kubeClient kubernetes.Interface + kubeClient kubernetes.Interface + clusterGetter func(ctx context.Context, name string) (*clusterapis.Cluster, error) } // Implement Connecter @@ -56,78 +48,13 @@ func (r *ProxyREST) Connect(ctx context.Context, id string, options runtime.Obje return nil, fmt.Errorf("invalid options object: %#v", options) } - location, transport, err := r.Redirector.ResourceLocation(ctx, id) + cluster, err := r.clusterGetter(ctx, id) if err != nil { return nil, err } - location.Path = proxyOpts.Path - impersonateToken, err := r.getImpersonateToken(ctx, id) - if err != nil { - return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", id, err) + secretGetter := func(ctx context.Context, namespace string, name string) (*corev1.Secret, error) { + return r.kubeClient.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{}) } - - return newProxyHandler(location, transport, impersonateToken, responder) -} - -func (r *ProxyREST) getImpersonateToken(ctx context.Context, clusterName string) (string, error) { - cluster, err := getCluster(ctx, r.Store, clusterName) - if err != nil { - return "", err - } - - if cluster.Spec.ImpersonatorSecretRef == nil { - return "", fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", clusterName) - } - - secret, err := r.kubeClient.CoreV1().Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(context.TODO(), - cluster.Spec.ImpersonatorSecretRef.Name, metav1.GetOptions{}) - if err != nil { - return "", err - } - - token, found := secret.Data[clusterapis.SecretTokenKey] - if !found { - return "", fmt.Errorf("the impresonate token of cluster %s is empty", clusterName) - } - return string(token), nil -} - -func newProxyHandler(location *url.URL, transport http.RoundTripper, impersonateToken string, responder rest.Responder) (http.Handler, error) { - return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - requester, exist := request.UserFrom(req.Context()) - if !exist { - responsewriters.InternalError(rw, req, errors.New("no user found for request")) - return - } - req.Header.Set(authenticationv1.ImpersonateUserHeader, requester.GetName()) - for _, group := range requester.GetGroups() { - if !skipGroup(group) { - req.Header.Add(authenticationv1.ImpersonateGroupHeader, group) - } - } - - req.Header.Set("Authorization", fmt.Sprintf("bearer %s", impersonateToken)) - - // Retain RawQuery in location because upgrading the request will use it. - // See https://github.com/karmada-io/karmada/issues/1618#issuecomment-1103793290 for more info. - location.RawQuery = req.URL.RawQuery - - handler := newThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder) - handler.ServeHTTP(rw, req) - }), nil -} - -func skipGroup(group string) bool { - switch group { - case user.AllAuthenticated, user.AllUnauthenticated: - return true - default: - return false - } -} - -func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler { - handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder)) - return handler + return proxy.ConnectCluster(ctx, cluster, proxyOpts.Path, secretGetter, responder) } diff --git a/pkg/registry/cluster/storage/proxy_test.go b/pkg/registry/cluster/storage/proxy_test.go new file mode 100644 index 000000000..6cc72b299 --- /dev/null +++ b/pkg/registry/cluster/storage/proxy_test.go @@ -0,0 +1,148 @@ +package storage + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "testing" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + + clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" + utiltest "github.com/karmada-io/karmada/pkg/util/testing" +) + +func TestProxyREST_Connect(t *testing.T) { + s := httptest.NewTLSServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + if req.URL.Path == "/proxy" { + _, _ = io.WriteString(rw, "ok") + } else { + _, _ = io.WriteString(rw, "bad request: "+req.URL.Path) + } + })) + defer s.Close() + + type fields struct { + kubeClient kubernetes.Interface + clusterGetter func(ctx context.Context, name string) (*clusterapis.Cluster, error) + } + type args struct { + id string + options runtime.Object + } + tests := []struct { + name string + fields fields + args args + want string + wantErr bool + }{ + { + name: "options is invalid", + fields: fields{ + kubeClient: fake.NewSimpleClientset(&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"}, + Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")}, + }), + clusterGetter: func(_ context.Context, name string) (*clusterapis.Cluster, error) { + return &clusterapis.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: clusterapis.ClusterSpec{ + APIEndpoint: s.URL, + ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"}, + }, + }, nil + }, + }, + args: args{ + id: "cluster", + options: &corev1.Pod{}, + }, + wantErr: true, + want: "", + }, + { + name: "cluster not found", + fields: fields{ + kubeClient: fake.NewSimpleClientset(&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"}, + Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")}, + }), + clusterGetter: func(_ context.Context, name string) (*clusterapis.Cluster, error) { + return nil, apierrors.NewNotFound(clusterapis.Resource("clusters"), name) + }, + }, + args: args{ + id: "cluster", + options: &clusterapis.ClusterProxyOptions{Path: "/proxy"}, + }, + wantErr: true, + want: "", + }, + { + name: "proxy success", + fields: fields{ + kubeClient: fake.NewSimpleClientset(&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"}, + Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")}, + }), + clusterGetter: func(_ context.Context, name string) (*clusterapis.Cluster, error) { + return &clusterapis.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: clusterapis.ClusterSpec{ + APIEndpoint: s.URL, + ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"}, + }, + }, nil + }, + }, + args: args{ + id: "cluster", + options: &clusterapis.ClusterProxyOptions{Path: "/proxy"}, + }, + wantErr: false, + want: "ok", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req, err := http.NewRequestWithContext(request.WithUser(request.NewContext(), &user.DefaultInfo{}), "GET", "http://127.0.0.1/xxx", nil) + if err != nil { + t.Fatal(err) + } + resp := httptest.NewRecorder() + + r := &ProxyREST{ + kubeClient: tt.fields.kubeClient, + clusterGetter: tt.fields.clusterGetter, + } + h, err := r.Connect(req.Context(), tt.args.id, tt.args.options, utiltest.NewResponder(resp)) + if (err != nil) != tt.wantErr { + t.Errorf("Connect() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if err != nil { + return + } + + h.ServeHTTP(resp, req) + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Error(err) + return + } + if got := string(body); got != tt.want { + t.Errorf("Connect() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/registry/cluster/storage/storage.go b/pkg/registry/cluster/storage/storage.go index f3ab53caf..898158287 100644 --- a/pkg/registry/cluster/storage/storage.go +++ b/pkg/registry/cluster/storage/storage.go @@ -2,14 +2,12 @@ package storage import ( "context" - "crypto/tls" "fmt" "net/http" "net/url" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/registry/rest" @@ -21,6 +19,7 @@ import ( printersinternal "github.com/karmada-io/karmada/pkg/printers/internalversion" printerstorage "github.com/karmada-io/karmada/pkg/printers/storage" clusterregistry "github.com/karmada-io/karmada/pkg/registry/cluster" + "github.com/karmada-io/karmada/pkg/util/proxy" ) // ClusterStorage includes storage for Cluster and for all the subresources. @@ -62,9 +61,8 @@ func NewStorage(scheme *runtime.Scheme, kubeClient kubernetes.Interface, optsGet Cluster: clusterRest, Status: &StatusREST{&statusStore}, Proxy: &ProxyREST{ - Store: store, - Redirector: clusterRest, - kubeClient: kubeClient, + kubeClient: kubeClient, + clusterGetter: clusterRest.getCluster, }, }, nil } @@ -79,31 +77,16 @@ var _ = rest.Redirector(&REST{}) // ResourceLocation returns a URL to which one can send traffic for the specified cluster. func (r *REST) ResourceLocation(ctx context.Context, name string) (*url.URL, http.RoundTripper, error) { - cluster, err := getCluster(ctx, r, name) + cluster, err := r.getCluster(ctx, name) if err != nil { return nil, nil, err } - location, err := constructLocation(cluster) - if err != nil { - return nil, nil, err - } - - transport, err := createProxyTransport(cluster) - if err != nil { - return nil, nil, err - } - - return location, transport, nil + return proxy.Location(cluster) } -// ResourceGetter is an interface for retrieving resources by ResourceLocation. -type ResourceGetter interface { - Get(context.Context, string, *metav1.GetOptions) (runtime.Object, error) -} - -func getCluster(ctx context.Context, getter ResourceGetter, name string) (*clusterapis.Cluster, error) { - obj, err := getter.Get(ctx, name, &metav1.GetOptions{}) +func (r *REST) getCluster(ctx context.Context, name string) (*clusterapis.Cluster, error) { + obj, err := r.Get(ctx, name, &metav1.GetOptions{}) if err != nil { return nil, err } @@ -114,34 +97,9 @@ func getCluster(ctx context.Context, getter ResourceGetter, name string) (*clust return cluster, nil } -func constructLocation(cluster *clusterapis.Cluster) (*url.URL, error) { - if cluster.Spec.APIEndpoint == "" { - return nil, fmt.Errorf("API endpoint of cluster %s should not be empty", cluster.Name) - } - - uri, err := url.Parse(cluster.Spec.APIEndpoint) - if err != nil { - return nil, fmt.Errorf("failed to parse api endpoint %s: %v", cluster.Spec.APIEndpoint, err) - } - return uri, nil -} - -func createProxyTransport(cluster *clusterapis.Cluster) (*http.Transport, error) { - var proxyDialerFn utilnet.DialFunc - proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} // #nosec - trans := utilnet.SetTransportDefaults(&http.Transport{ - DialContext: proxyDialerFn, - TLSClientConfig: proxyTLSClientConfig, - }) - - if cluster.Spec.ProxyURL != "" { - proxy, err := url.Parse(cluster.Spec.ProxyURL) - if err != nil { - return nil, fmt.Errorf("failed to parse url of proxy url %s: %v", cluster.Spec.ProxyURL, err) - } - trans.Proxy = http.ProxyURL(proxy) - } - return trans, nil +// ResourceGetter is an interface for retrieving resources by ResourceLocation. +type ResourceGetter interface { + Get(context.Context, string, *metav1.GetOptions) (runtime.Object, error) } // StatusREST implements the REST endpoint for changing the status of a cluster. @@ -162,7 +120,7 @@ func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOp // Update alters the status subset of an object. func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { // We are explicitly setting forceAllowCreate to false in the call to the underlying storage because - // subresources should never allow create on update. + // subresources should never allow creating on update. return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options) } diff --git a/pkg/search/proxy/framework/plugins/cluster/cluster.go b/pkg/search/proxy/framework/plugins/cluster/cluster.go index 105feb2db..76e914d87 100644 --- a/pkg/search/proxy/framework/plugins/cluster/cluster.go +++ b/pkg/search/proxy/framework/plugins/cluster/cluster.go @@ -7,10 +7,13 @@ import ( "io/ioutil" "net/http" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" listcorev1 "k8s.io/client-go/listers/core/v1" + clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" "github.com/karmada-io/karmada/pkg/search/proxy/framework" pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime" @@ -70,7 +73,22 @@ func (c *Cluster) Connect(ctx context.Context, request framework.ProxyRequest) ( return nil, err } - h, err := proxy.ConnectCluster(ctx, c.clusterLister, c.secretLister, clusterName, request.ProxyPath, request.Responder) + cls, err := c.clusterLister.Get(clusterName) + if err != nil { + return nil, err + } + + cluster := &clusterapis.Cluster{} + err = clusterv1alpha1.Convert_v1alpha1_Cluster_To_cluster_Cluster(cls, cluster, nil) + if err != nil { + return nil, err + } + + secretGetter := func(ctx context.Context, namespace, name string) (*corev1.Secret, error) { + return c.secretLister.Secrets(namespace).Get(name) + } + + h, err := proxy.ConnectCluster(ctx, cluster, request.ProxyPath, secretGetter, request.Responder) if err != nil { return nil, err } diff --git a/pkg/search/proxy/framework/plugins/cluster/cluster_test.go b/pkg/search/proxy/framework/plugins/cluster/cluster_test.go index d6cea5a89..d728fda9b 100644 --- a/pkg/search/proxy/framework/plugins/cluster/cluster_test.go +++ b/pkg/search/proxy/framework/plugins/cluster/cluster_test.go @@ -33,6 +33,7 @@ import ( "github.com/karmada-io/karmada/pkg/search/proxy/framework" "github.com/karmada-io/karmada/pkg/search/proxy/store" proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing" + utiltest "github.com/karmada-io/karmada/pkg/util/testing" ) func TestModifyRequest(t *testing.T) { @@ -402,7 +403,7 @@ func Test_clusterProxy_connect(t *testing.T) { RequestInfo: tt.args.requestInfo, GroupVersionResource: proxytest.PodGVR, ProxyPath: "/proxy", - Responder: proxytest.NewResponder(response), + Responder: utiltest.NewResponder(response), HTTPReq: tt.args.request, }) if !proxytest.ErrorMessageEquals(err, tt.want.err) { diff --git a/pkg/search/proxy/framework/plugins/karmada/karmada_test.go b/pkg/search/proxy/framework/plugins/karmada/karmada_test.go index 1c60b0818..d5f72bc23 100644 --- a/pkg/search/proxy/framework/plugins/karmada/karmada_test.go +++ b/pkg/search/proxy/framework/plugins/karmada/karmada_test.go @@ -11,7 +11,7 @@ import ( "github.com/karmada-io/karmada/pkg/search/proxy/framework" pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime" - proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing" + utiltest "github.com/karmada-io/karmada/pkg/util/testing" ) func Test_karmadaProxy(t *testing.T) { @@ -76,7 +76,7 @@ func Test_karmadaProxy(t *testing.T) { response := httptest.NewRecorder() h, err := p.Connect(context.TODO(), framework.ProxyRequest{ ProxyPath: tt.args.path, - Responder: proxytest.NewResponder(response), + Responder: utiltest.NewResponder(response), }) if err != nil { t.Error(err) diff --git a/pkg/util/proxy/proxy.go b/pkg/util/proxy/proxy.go index 10094e2a3..d547016ee 100644 --- a/pkg/util/proxy/proxy.go +++ b/pkg/util/proxy/proxy.go @@ -17,47 +17,32 @@ import ( "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - listcorev1 "k8s.io/client-go/listers/core/v1" clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" - clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" ) -// todo: consider share logic with pkg/registry/cluster/storage/proxy.go:53 - // ConnectCluster returns a handler for proxy cluster. -func ConnectCluster(ctx context.Context, - clusterLister clusterlisters.ClusterLister, secretLister listcorev1.SecretLister, - clusterName string, proxyPath string, responder rest.Responder) (http.Handler, error) { - cluster, err := clusterLister.Get(clusterName) - if err != nil { - return nil, err - } - location, transport, err := Location(cluster.Name, cluster.Spec.APIEndpoint, cluster.Spec.ProxyURL) +func ConnectCluster(ctx context.Context, cluster *clusterapis.Cluster, proxyPath string, + secretGetter func(context.Context, string, string) (*corev1.Secret, error), responder rest.Responder) (http.Handler, error) { + location, transport, err := Location(cluster) if err != nil { return nil, err } + location.Path = path.Join(location.Path, proxyPath) - secretGetter := func(context.Context, string) (*corev1.Secret, error) { - if cluster.Spec.ImpersonatorSecretRef == nil { - return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name) - } - return secretLister.Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(cluster.Spec.ImpersonatorSecretRef.Name) + if cluster.Spec.ImpersonatorSecretRef == nil { + return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name) } - return connectCluster(ctx, cluster.Name, location, transport, responder, secretGetter) -} -func connectCluster(ctx context.Context, clusterName string, location *url.URL, transport http.RoundTripper, responder rest.Responder, - impersonateSecretGetter func(context.Context, string) (*corev1.Secret, error)) (http.Handler, error) { - secret, err := impersonateSecretGetter(ctx, clusterName) + secret, err := secretGetter(ctx, cluster.Spec.ImpersonatorSecretRef.Namespace, cluster.Spec.ImpersonatorSecretRef.Name) if err != nil { return nil, err } - impersonateToken, err := getImpersonateToken(clusterName, secret) + impersonateToken, err := getImpersonateToken(cluster.Name, secret) if err != nil { - return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", clusterName, err) + return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", cluster.Name, err) } return newProxyHandler(location, transport, impersonateToken, responder) @@ -70,13 +55,13 @@ func NewThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.Roun } // Location returns a URL to which one can send traffic for the specified cluster. -func Location(clusterName string, apiEndpoint string, proxyURL string) (*url.URL, http.RoundTripper, error) { - location, err := constructLocation(clusterName, apiEndpoint) +func Location(cluster *clusterapis.Cluster) (*url.URL, http.RoundTripper, error) { + location, err := constructLocation(cluster) if err != nil { return nil, nil, err } - transport, err := createProxyTransport(proxyURL) + transport, err := createProxyTransport(cluster) if err != nil { return nil, nil, err } @@ -84,9 +69,10 @@ func Location(clusterName string, apiEndpoint string, proxyURL string) (*url.URL return location, transport, nil } -func constructLocation(clusterName string, apiEndpoint string) (*url.URL, error) { +func constructLocation(cluster *clusterapis.Cluster) (*url.URL, error) { + apiEndpoint := cluster.Spec.APIEndpoint if apiEndpoint == "" { - return nil, fmt.Errorf("API endpoint of cluster %s should not be empty", clusterName) + return nil, fmt.Errorf("API endpoint of cluster %s should not be empty", cluster.GetName()) } uri, err := url.Parse(apiEndpoint) @@ -96,7 +82,7 @@ func constructLocation(clusterName string, apiEndpoint string) (*url.URL, error) return uri, nil } -func createProxyTransport(proxyURL string) (*http.Transport, error) { +func createProxyTransport(cluster *clusterapis.Cluster) (*http.Transport, error) { var proxyDialerFn utilnet.DialFunc proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} // #nosec trans := utilnet.SetTransportDefaults(&http.Transport{ @@ -104,7 +90,7 @@ func createProxyTransport(proxyURL string) (*http.Transport, error) { TLSClientConfig: proxyTLSClientConfig, }) - if proxyURL != "" { + if proxyURL := cluster.Spec.ProxyURL; proxyURL != "" { u, err := url.Parse(proxyURL) if err != nil { return nil, fmt.Errorf("failed to parse url of proxy url %s: %v", proxyURL, err) diff --git a/pkg/util/proxy/proxy_test.go b/pkg/util/proxy/proxy_test.go new file mode 100644 index 000000000..8e514a322 --- /dev/null +++ b/pkg/util/proxy/proxy_test.go @@ -0,0 +1,222 @@ +package proxy + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/request" + + clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" + utiltest "github.com/karmada-io/karmada/pkg/util/testing" +) + +func TestConnectCluster(t *testing.T) { + const ( + testToken = "token" + testGroup = "group" + testUser = "user" + ) + + s := httptest.NewTLSServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + if req.URL.Path != "/proxy" || + req.Header.Get("Authorization") != "bearer "+testToken || + req.Header.Get("Impersonate-Group") != testGroup || + req.Header.Get("Impersonate-User") != testUser { + t.Errorf("bad request: %v, %v", req.URL.Path, req.Header) + return + } + fmt.Fprintf(rw, "ok") + })) + defer s.Close() + + type args struct { + ctx context.Context + cluster *clusterapis.Cluster + secretGetter func(context.Context, string, string) (*corev1.Secret, error) + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "apiEndpoint is empty", + args: args{ + cluster: &clusterapis.Cluster{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"}, + Spec: clusterapis.ClusterSpec{}, + }, + secretGetter: nil, + }, + wantErr: true, + want: "", + }, + { + name: "apiEndpoint is invalid", + args: args{ + cluster: &clusterapis.Cluster{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"}, + Spec: clusterapis.ClusterSpec{APIEndpoint: "h :/ invalid"}, + }, + secretGetter: nil, + }, + wantErr: true, + want: "", + }, + { + name: "ProxyURL is invalid", + args: args{ + cluster: &clusterapis.Cluster{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"}, + Spec: clusterapis.ClusterSpec{ + APIEndpoint: s.URL, + ProxyURL: "h :/ invalid", + }, + }, + secretGetter: nil, + }, + wantErr: true, + want: "", + }, + { + name: "ImpersonatorSecretRef is nil", + args: args{ + cluster: &clusterapis.Cluster{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"}, + Spec: clusterapis.ClusterSpec{ + APIEndpoint: s.URL, + ProxyURL: "http://proxy", + }, + }, + secretGetter: nil, + }, + wantErr: true, + want: "", + }, + { + name: "secret not found", + args: args{ + cluster: &clusterapis.Cluster{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"}, + Spec: clusterapis.ClusterSpec{ + APIEndpoint: s.URL, + ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"}, + }, + }, + secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) { + return nil, apierrors.NewNotFound(corev1.Resource("secrets"), name) + }, + }, + wantErr: true, + want: "", + }, + { + name: "SecretTokenKey not found", + args: args{ + cluster: &clusterapis.Cluster{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"}, + Spec: clusterapis.ClusterSpec{ + APIEndpoint: s.URL, + ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"}, + }, + }, + secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) { + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name}, + Data: map[string][]byte{}, + }, nil + }, + }, + wantErr: true, + want: "", + }, + { + name: "no user found for request", + args: args{ + ctx: context.TODO(), + cluster: &clusterapis.Cluster{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"}, + Spec: clusterapis.ClusterSpec{ + APIEndpoint: s.URL, + ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"}, + }, + }, + secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) { + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name}, + Data: map[string][]byte{clusterapis.SecretTokenKey: []byte(testToken)}, + }, nil + }, + }, + wantErr: false, + want: "Internal Server Error: \"\": no user found for request\n", + }, + { + name: "proxy success", + args: args{ + ctx: request.WithUser(request.NewContext(), &user.DefaultInfo{Name: testUser, Groups: []string{testGroup, user.AllAuthenticated, user.AllUnauthenticated}}), + cluster: &clusterapis.Cluster{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"}, + Spec: clusterapis.ClusterSpec{ + APIEndpoint: s.URL, + ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"}, + }, + }, + secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) { + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name}, + Data: map[string][]byte{clusterapis.SecretTokenKey: []byte(testToken)}, + }, nil + }, + }, + wantErr: false, + want: "ok", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := tt.args.ctx + if ctx == nil { + ctx = context.TODO() + } + req, err := http.NewRequestWithContext(ctx, "GET", "http://127.0.0.1/xxx", nil) + if err != nil { + t.Fatal(err) + } + + resp := httptest.NewRecorder() + + h, err := ConnectCluster(context.TODO(), tt.args.cluster, "proxy", tt.args.secretGetter, utiltest.NewResponder(resp)) + if (err != nil) != tt.wantErr { + t.Errorf("ConnectCluster() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err != nil { + return + } + + h.ServeHTTP(resp, req) + if t.Failed() { + return + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Error(err) + return + } + if got := string(body); got != tt.want { + t.Errorf("Connect() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/search/proxy/testing/mock_responder.go b/pkg/util/testing/mock_responder.go similarity index 100% rename from pkg/search/proxy/testing/mock_responder.go rename to pkg/util/testing/mock_responder.go