Merge pull request #2715 from ikaven1024/pr-cleanup-proxy
clean up proxy repeating code
This commit is contained in:
commit
f68da6d64e
|
@ -2,31 +2,23 @@ package storage
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
authenticationv1 "k8s.io/api/authentication/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/proxy"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
||||
"github.com/karmada-io/karmada/pkg/util/proxy"
|
||||
)
|
||||
|
||||
// ProxyREST implements the proxy subresource for a Cluster.
|
||||
type ProxyREST struct {
|
||||
Store *genericregistry.Store
|
||||
Redirector rest.Redirector
|
||||
|
||||
kubeClient kubernetes.Interface
|
||||
kubeClient kubernetes.Interface
|
||||
clusterGetter func(ctx context.Context, name string) (*clusterapis.Cluster, error)
|
||||
}
|
||||
|
||||
// Implement Connecter
|
||||
|
@ -56,78 +48,13 @@ func (r *ProxyREST) Connect(ctx context.Context, id string, options runtime.Obje
|
|||
return nil, fmt.Errorf("invalid options object: %#v", options)
|
||||
}
|
||||
|
||||
location, transport, err := r.Redirector.ResourceLocation(ctx, id)
|
||||
cluster, err := r.clusterGetter(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
location.Path = proxyOpts.Path
|
||||
|
||||
impersonateToken, err := r.getImpersonateToken(ctx, id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", id, err)
|
||||
secretGetter := func(ctx context.Context, namespace string, name string) (*corev1.Secret, error) {
|
||||
return r.kubeClient.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
}
|
||||
|
||||
return newProxyHandler(location, transport, impersonateToken, responder)
|
||||
}
|
||||
|
||||
func (r *ProxyREST) getImpersonateToken(ctx context.Context, clusterName string) (string, error) {
|
||||
cluster, err := getCluster(ctx, r.Store, clusterName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if cluster.Spec.ImpersonatorSecretRef == nil {
|
||||
return "", fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", clusterName)
|
||||
}
|
||||
|
||||
secret, err := r.kubeClient.CoreV1().Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(context.TODO(),
|
||||
cluster.Spec.ImpersonatorSecretRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
token, found := secret.Data[clusterapis.SecretTokenKey]
|
||||
if !found {
|
||||
return "", fmt.Errorf("the impresonate token of cluster %s is empty", clusterName)
|
||||
}
|
||||
return string(token), nil
|
||||
}
|
||||
|
||||
func newProxyHandler(location *url.URL, transport http.RoundTripper, impersonateToken string, responder rest.Responder) (http.Handler, error) {
|
||||
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
requester, exist := request.UserFrom(req.Context())
|
||||
if !exist {
|
||||
responsewriters.InternalError(rw, req, errors.New("no user found for request"))
|
||||
return
|
||||
}
|
||||
req.Header.Set(authenticationv1.ImpersonateUserHeader, requester.GetName())
|
||||
for _, group := range requester.GetGroups() {
|
||||
if !skipGroup(group) {
|
||||
req.Header.Add(authenticationv1.ImpersonateGroupHeader, group)
|
||||
}
|
||||
}
|
||||
|
||||
req.Header.Set("Authorization", fmt.Sprintf("bearer %s", impersonateToken))
|
||||
|
||||
// Retain RawQuery in location because upgrading the request will use it.
|
||||
// See https://github.com/karmada-io/karmada/issues/1618#issuecomment-1103793290 for more info.
|
||||
location.RawQuery = req.URL.RawQuery
|
||||
|
||||
handler := newThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder)
|
||||
handler.ServeHTTP(rw, req)
|
||||
}), nil
|
||||
}
|
||||
|
||||
func skipGroup(group string) bool {
|
||||
switch group {
|
||||
case user.AllAuthenticated, user.AllUnauthenticated:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
|
||||
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
|
||||
return handler
|
||||
return proxy.ConnectCluster(ctx, cluster, proxyOpts.Path, secretGetter, responder)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,148 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
|
||||
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
||||
utiltest "github.com/karmada-io/karmada/pkg/util/testing"
|
||||
)
|
||||
|
||||
func TestProxyREST_Connect(t *testing.T) {
|
||||
s := httptest.NewTLSServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
if req.URL.Path == "/proxy" {
|
||||
_, _ = io.WriteString(rw, "ok")
|
||||
} else {
|
||||
_, _ = io.WriteString(rw, "bad request: "+req.URL.Path)
|
||||
}
|
||||
}))
|
||||
defer s.Close()
|
||||
|
||||
type fields struct {
|
||||
kubeClient kubernetes.Interface
|
||||
clusterGetter func(ctx context.Context, name string) (*clusterapis.Cluster, error)
|
||||
}
|
||||
type args struct {
|
||||
id string
|
||||
options runtime.Object
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "options is invalid",
|
||||
fields: fields{
|
||||
kubeClient: fake.NewSimpleClientset(&corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
|
||||
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
|
||||
}),
|
||||
clusterGetter: func(_ context.Context, name string) (*clusterapis.Cluster, error) {
|
||||
return &clusterapis.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: name},
|
||||
Spec: clusterapis.ClusterSpec{
|
||||
APIEndpoint: s.URL,
|
||||
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
id: "cluster",
|
||||
options: &corev1.Pod{},
|
||||
},
|
||||
wantErr: true,
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "cluster not found",
|
||||
fields: fields{
|
||||
kubeClient: fake.NewSimpleClientset(&corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
|
||||
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
|
||||
}),
|
||||
clusterGetter: func(_ context.Context, name string) (*clusterapis.Cluster, error) {
|
||||
return nil, apierrors.NewNotFound(clusterapis.Resource("clusters"), name)
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
id: "cluster",
|
||||
options: &clusterapis.ClusterProxyOptions{Path: "/proxy"},
|
||||
},
|
||||
wantErr: true,
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "proxy success",
|
||||
fields: fields{
|
||||
kubeClient: fake.NewSimpleClientset(&corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
|
||||
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
|
||||
}),
|
||||
clusterGetter: func(_ context.Context, name string) (*clusterapis.Cluster, error) {
|
||||
return &clusterapis.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: name},
|
||||
Spec: clusterapis.ClusterSpec{
|
||||
APIEndpoint: s.URL,
|
||||
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
id: "cluster",
|
||||
options: &clusterapis.ClusterProxyOptions{Path: "/proxy"},
|
||||
},
|
||||
wantErr: false,
|
||||
want: "ok",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
req, err := http.NewRequestWithContext(request.WithUser(request.NewContext(), &user.DefaultInfo{}), "GET", "http://127.0.0.1/xxx", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp := httptest.NewRecorder()
|
||||
|
||||
r := &ProxyREST{
|
||||
kubeClient: tt.fields.kubeClient,
|
||||
clusterGetter: tt.fields.clusterGetter,
|
||||
}
|
||||
h, err := r.Connect(req.Context(), tt.args.id, tt.args.options, utiltest.NewResponder(resp))
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("Connect() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
h.ServeHTTP(resp, req)
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if got := string(body); got != tt.want {
|
||||
t.Errorf("Connect() got = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -2,14 +2,12 @@ package storage
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
|
@ -21,6 +19,7 @@ import (
|
|||
printersinternal "github.com/karmada-io/karmada/pkg/printers/internalversion"
|
||||
printerstorage "github.com/karmada-io/karmada/pkg/printers/storage"
|
||||
clusterregistry "github.com/karmada-io/karmada/pkg/registry/cluster"
|
||||
"github.com/karmada-io/karmada/pkg/util/proxy"
|
||||
)
|
||||
|
||||
// ClusterStorage includes storage for Cluster and for all the subresources.
|
||||
|
@ -62,9 +61,8 @@ func NewStorage(scheme *runtime.Scheme, kubeClient kubernetes.Interface, optsGet
|
|||
Cluster: clusterRest,
|
||||
Status: &StatusREST{&statusStore},
|
||||
Proxy: &ProxyREST{
|
||||
Store: store,
|
||||
Redirector: clusterRest,
|
||||
kubeClient: kubeClient,
|
||||
kubeClient: kubeClient,
|
||||
clusterGetter: clusterRest.getCluster,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
@ -79,31 +77,16 @@ var _ = rest.Redirector(&REST{})
|
|||
|
||||
// ResourceLocation returns a URL to which one can send traffic for the specified cluster.
|
||||
func (r *REST) ResourceLocation(ctx context.Context, name string) (*url.URL, http.RoundTripper, error) {
|
||||
cluster, err := getCluster(ctx, r, name)
|
||||
cluster, err := r.getCluster(ctx, name)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
location, err := constructLocation(cluster)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
transport, err := createProxyTransport(cluster)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return location, transport, nil
|
||||
return proxy.Location(cluster)
|
||||
}
|
||||
|
||||
// ResourceGetter is an interface for retrieving resources by ResourceLocation.
|
||||
type ResourceGetter interface {
|
||||
Get(context.Context, string, *metav1.GetOptions) (runtime.Object, error)
|
||||
}
|
||||
|
||||
func getCluster(ctx context.Context, getter ResourceGetter, name string) (*clusterapis.Cluster, error) {
|
||||
obj, err := getter.Get(ctx, name, &metav1.GetOptions{})
|
||||
func (r *REST) getCluster(ctx context.Context, name string) (*clusterapis.Cluster, error) {
|
||||
obj, err := r.Get(ctx, name, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -114,34 +97,9 @@ func getCluster(ctx context.Context, getter ResourceGetter, name string) (*clust
|
|||
return cluster, nil
|
||||
}
|
||||
|
||||
func constructLocation(cluster *clusterapis.Cluster) (*url.URL, error) {
|
||||
if cluster.Spec.APIEndpoint == "" {
|
||||
return nil, fmt.Errorf("API endpoint of cluster %s should not be empty", cluster.Name)
|
||||
}
|
||||
|
||||
uri, err := url.Parse(cluster.Spec.APIEndpoint)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse api endpoint %s: %v", cluster.Spec.APIEndpoint, err)
|
||||
}
|
||||
return uri, nil
|
||||
}
|
||||
|
||||
func createProxyTransport(cluster *clusterapis.Cluster) (*http.Transport, error) {
|
||||
var proxyDialerFn utilnet.DialFunc
|
||||
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} // #nosec
|
||||
trans := utilnet.SetTransportDefaults(&http.Transport{
|
||||
DialContext: proxyDialerFn,
|
||||
TLSClientConfig: proxyTLSClientConfig,
|
||||
})
|
||||
|
||||
if cluster.Spec.ProxyURL != "" {
|
||||
proxy, err := url.Parse(cluster.Spec.ProxyURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse url of proxy url %s: %v", cluster.Spec.ProxyURL, err)
|
||||
}
|
||||
trans.Proxy = http.ProxyURL(proxy)
|
||||
}
|
||||
return trans, nil
|
||||
// ResourceGetter is an interface for retrieving resources by ResourceLocation.
|
||||
type ResourceGetter interface {
|
||||
Get(context.Context, string, *metav1.GetOptions) (runtime.Object, error)
|
||||
}
|
||||
|
||||
// StatusREST implements the REST endpoint for changing the status of a cluster.
|
||||
|
@ -162,7 +120,7 @@ func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOp
|
|||
// Update alters the status subset of an object.
|
||||
func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
|
||||
// We are explicitly setting forceAllowCreate to false in the call to the underlying storage because
|
||||
// subresources should never allow create on update.
|
||||
// subresources should never allow creating on update.
|
||||
return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,10 +7,13 @@ import (
|
|||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
listcorev1 "k8s.io/client-go/listers/core/v1"
|
||||
|
||||
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
|
||||
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
|
||||
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
|
||||
|
@ -70,7 +73,22 @@ func (c *Cluster) Connect(ctx context.Context, request framework.ProxyRequest) (
|
|||
return nil, err
|
||||
}
|
||||
|
||||
h, err := proxy.ConnectCluster(ctx, c.clusterLister, c.secretLister, clusterName, request.ProxyPath, request.Responder)
|
||||
cls, err := c.clusterLister.Get(clusterName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cluster := &clusterapis.Cluster{}
|
||||
err = clusterv1alpha1.Convert_v1alpha1_Cluster_To_cluster_Cluster(cls, cluster, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
secretGetter := func(ctx context.Context, namespace, name string) (*corev1.Secret, error) {
|
||||
return c.secretLister.Secrets(namespace).Get(name)
|
||||
}
|
||||
|
||||
h, err := proxy.ConnectCluster(ctx, cluster, request.ProxyPath, secretGetter, request.Responder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import (
|
|||
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
|
||||
"github.com/karmada-io/karmada/pkg/search/proxy/store"
|
||||
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
|
||||
utiltest "github.com/karmada-io/karmada/pkg/util/testing"
|
||||
)
|
||||
|
||||
func TestModifyRequest(t *testing.T) {
|
||||
|
@ -402,7 +403,7 @@ func Test_clusterProxy_connect(t *testing.T) {
|
|||
RequestInfo: tt.args.requestInfo,
|
||||
GroupVersionResource: proxytest.PodGVR,
|
||||
ProxyPath: "/proxy",
|
||||
Responder: proxytest.NewResponder(response),
|
||||
Responder: utiltest.NewResponder(response),
|
||||
HTTPReq: tt.args.request,
|
||||
})
|
||||
if !proxytest.ErrorMessageEquals(err, tt.want.err) {
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
|
||||
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
|
||||
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
|
||||
utiltest "github.com/karmada-io/karmada/pkg/util/testing"
|
||||
)
|
||||
|
||||
func Test_karmadaProxy(t *testing.T) {
|
||||
|
@ -76,7 +76,7 @@ func Test_karmadaProxy(t *testing.T) {
|
|||
response := httptest.NewRecorder()
|
||||
h, err := p.Connect(context.TODO(), framework.ProxyRequest{
|
||||
ProxyPath: tt.args.path,
|
||||
Responder: proxytest.NewResponder(response),
|
||||
Responder: utiltest.NewResponder(response),
|
||||
})
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
|
|
@ -17,47 +17,32 @@ import (
|
|||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
listcorev1 "k8s.io/client-go/listers/core/v1"
|
||||
|
||||
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
||||
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
|
||||
)
|
||||
|
||||
// todo: consider share logic with pkg/registry/cluster/storage/proxy.go:53
|
||||
|
||||
// ConnectCluster returns a handler for proxy cluster.
|
||||
func ConnectCluster(ctx context.Context,
|
||||
clusterLister clusterlisters.ClusterLister, secretLister listcorev1.SecretLister,
|
||||
clusterName string, proxyPath string, responder rest.Responder) (http.Handler, error) {
|
||||
cluster, err := clusterLister.Get(clusterName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
location, transport, err := Location(cluster.Name, cluster.Spec.APIEndpoint, cluster.Spec.ProxyURL)
|
||||
func ConnectCluster(ctx context.Context, cluster *clusterapis.Cluster, proxyPath string,
|
||||
secretGetter func(context.Context, string, string) (*corev1.Secret, error), responder rest.Responder) (http.Handler, error) {
|
||||
location, transport, err := Location(cluster)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
location.Path = path.Join(location.Path, proxyPath)
|
||||
|
||||
secretGetter := func(context.Context, string) (*corev1.Secret, error) {
|
||||
if cluster.Spec.ImpersonatorSecretRef == nil {
|
||||
return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name)
|
||||
}
|
||||
return secretLister.Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(cluster.Spec.ImpersonatorSecretRef.Name)
|
||||
if cluster.Spec.ImpersonatorSecretRef == nil {
|
||||
return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name)
|
||||
}
|
||||
return connectCluster(ctx, cluster.Name, location, transport, responder, secretGetter)
|
||||
}
|
||||
|
||||
func connectCluster(ctx context.Context, clusterName string, location *url.URL, transport http.RoundTripper, responder rest.Responder,
|
||||
impersonateSecretGetter func(context.Context, string) (*corev1.Secret, error)) (http.Handler, error) {
|
||||
secret, err := impersonateSecretGetter(ctx, clusterName)
|
||||
secret, err := secretGetter(ctx, cluster.Spec.ImpersonatorSecretRef.Namespace, cluster.Spec.ImpersonatorSecretRef.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
impersonateToken, err := getImpersonateToken(clusterName, secret)
|
||||
impersonateToken, err := getImpersonateToken(cluster.Name, secret)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", clusterName, err)
|
||||
return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", cluster.Name, err)
|
||||
}
|
||||
|
||||
return newProxyHandler(location, transport, impersonateToken, responder)
|
||||
|
@ -70,13 +55,13 @@ func NewThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.Roun
|
|||
}
|
||||
|
||||
// Location returns a URL to which one can send traffic for the specified cluster.
|
||||
func Location(clusterName string, apiEndpoint string, proxyURL string) (*url.URL, http.RoundTripper, error) {
|
||||
location, err := constructLocation(clusterName, apiEndpoint)
|
||||
func Location(cluster *clusterapis.Cluster) (*url.URL, http.RoundTripper, error) {
|
||||
location, err := constructLocation(cluster)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
transport, err := createProxyTransport(proxyURL)
|
||||
transport, err := createProxyTransport(cluster)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -84,9 +69,10 @@ func Location(clusterName string, apiEndpoint string, proxyURL string) (*url.URL
|
|||
return location, transport, nil
|
||||
}
|
||||
|
||||
func constructLocation(clusterName string, apiEndpoint string) (*url.URL, error) {
|
||||
func constructLocation(cluster *clusterapis.Cluster) (*url.URL, error) {
|
||||
apiEndpoint := cluster.Spec.APIEndpoint
|
||||
if apiEndpoint == "" {
|
||||
return nil, fmt.Errorf("API endpoint of cluster %s should not be empty", clusterName)
|
||||
return nil, fmt.Errorf("API endpoint of cluster %s should not be empty", cluster.GetName())
|
||||
}
|
||||
|
||||
uri, err := url.Parse(apiEndpoint)
|
||||
|
@ -96,7 +82,7 @@ func constructLocation(clusterName string, apiEndpoint string) (*url.URL, error)
|
|||
return uri, nil
|
||||
}
|
||||
|
||||
func createProxyTransport(proxyURL string) (*http.Transport, error) {
|
||||
func createProxyTransport(cluster *clusterapis.Cluster) (*http.Transport, error) {
|
||||
var proxyDialerFn utilnet.DialFunc
|
||||
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} // #nosec
|
||||
trans := utilnet.SetTransportDefaults(&http.Transport{
|
||||
|
@ -104,7 +90,7 @@ func createProxyTransport(proxyURL string) (*http.Transport, error) {
|
|||
TLSClientConfig: proxyTLSClientConfig,
|
||||
})
|
||||
|
||||
if proxyURL != "" {
|
||||
if proxyURL := cluster.Spec.ProxyURL; proxyURL != "" {
|
||||
u, err := url.Parse(proxyURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse url of proxy url %s: %v", proxyURL, err)
|
||||
|
|
|
@ -0,0 +1,222 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
|
||||
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
||||
utiltest "github.com/karmada-io/karmada/pkg/util/testing"
|
||||
)
|
||||
|
||||
func TestConnectCluster(t *testing.T) {
|
||||
const (
|
||||
testToken = "token"
|
||||
testGroup = "group"
|
||||
testUser = "user"
|
||||
)
|
||||
|
||||
s := httptest.NewTLSServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
if req.URL.Path != "/proxy" ||
|
||||
req.Header.Get("Authorization") != "bearer "+testToken ||
|
||||
req.Header.Get("Impersonate-Group") != testGroup ||
|
||||
req.Header.Get("Impersonate-User") != testUser {
|
||||
t.Errorf("bad request: %v, %v", req.URL.Path, req.Header)
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(rw, "ok")
|
||||
}))
|
||||
defer s.Close()
|
||||
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
cluster *clusterapis.Cluster
|
||||
secretGetter func(context.Context, string, string) (*corev1.Secret, error)
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "apiEndpoint is empty",
|
||||
args: args{
|
||||
cluster: &clusterapis.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||
Spec: clusterapis.ClusterSpec{},
|
||||
},
|
||||
secretGetter: nil,
|
||||
},
|
||||
wantErr: true,
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "apiEndpoint is invalid",
|
||||
args: args{
|
||||
cluster: &clusterapis.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||
Spec: clusterapis.ClusterSpec{APIEndpoint: "h :/ invalid"},
|
||||
},
|
||||
secretGetter: nil,
|
||||
},
|
||||
wantErr: true,
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "ProxyURL is invalid",
|
||||
args: args{
|
||||
cluster: &clusterapis.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||
Spec: clusterapis.ClusterSpec{
|
||||
APIEndpoint: s.URL,
|
||||
ProxyURL: "h :/ invalid",
|
||||
},
|
||||
},
|
||||
secretGetter: nil,
|
||||
},
|
||||
wantErr: true,
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "ImpersonatorSecretRef is nil",
|
||||
args: args{
|
||||
cluster: &clusterapis.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||
Spec: clusterapis.ClusterSpec{
|
||||
APIEndpoint: s.URL,
|
||||
ProxyURL: "http://proxy",
|
||||
},
|
||||
},
|
||||
secretGetter: nil,
|
||||
},
|
||||
wantErr: true,
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "secret not found",
|
||||
args: args{
|
||||
cluster: &clusterapis.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||
Spec: clusterapis.ClusterSpec{
|
||||
APIEndpoint: s.URL,
|
||||
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||
},
|
||||
},
|
||||
secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) {
|
||||
return nil, apierrors.NewNotFound(corev1.Resource("secrets"), name)
|
||||
},
|
||||
},
|
||||
wantErr: true,
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "SecretTokenKey not found",
|
||||
args: args{
|
||||
cluster: &clusterapis.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||
Spec: clusterapis.ClusterSpec{
|
||||
APIEndpoint: s.URL,
|
||||
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||
},
|
||||
},
|
||||
secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) {
|
||||
return &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name},
|
||||
Data: map[string][]byte{},
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
wantErr: true,
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "no user found for request",
|
||||
args: args{
|
||||
ctx: context.TODO(),
|
||||
cluster: &clusterapis.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||
Spec: clusterapis.ClusterSpec{
|
||||
APIEndpoint: s.URL,
|
||||
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||
},
|
||||
},
|
||||
secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) {
|
||||
return &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name},
|
||||
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte(testToken)},
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
wantErr: false,
|
||||
want: "Internal Server Error: \"\": no user found for request\n",
|
||||
},
|
||||
{
|
||||
name: "proxy success",
|
||||
args: args{
|
||||
ctx: request.WithUser(request.NewContext(), &user.DefaultInfo{Name: testUser, Groups: []string{testGroup, user.AllAuthenticated, user.AllUnauthenticated}}),
|
||||
cluster: &clusterapis.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||
Spec: clusterapis.ClusterSpec{
|
||||
APIEndpoint: s.URL,
|
||||
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||
},
|
||||
},
|
||||
secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) {
|
||||
return &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name},
|
||||
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte(testToken)},
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
wantErr: false,
|
||||
want: "ok",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := tt.args.ctx
|
||||
if ctx == nil {
|
||||
ctx = context.TODO()
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", "http://127.0.0.1/xxx", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
|
||||
h, err := ConnectCluster(context.TODO(), tt.args.cluster, "proxy", tt.args.secretGetter, utiltest.NewResponder(resp))
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("ConnectCluster() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
h.ServeHTTP(resp, req)
|
||||
if t.Failed() {
|
||||
return
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if got := string(body); got != tt.want {
|
||||
t.Errorf("Connect() got = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue