clean up proxy repeating code
Signed-off-by: yingjinhui <yingjinhui@didiglobal.com>
This commit is contained in:
parent
dc8e5d20fe
commit
38b5eabb4a
|
@ -2,31 +2,23 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
|
||||||
|
|
||||||
authenticationv1 "k8s.io/api/authentication/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/proxy"
|
|
||||||
"k8s.io/apiserver/pkg/authentication/user"
|
|
||||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
|
||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
|
||||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
|
||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
|
|
||||||
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
||||||
|
"github.com/karmada-io/karmada/pkg/util/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ProxyREST implements the proxy subresource for a Cluster.
|
// ProxyREST implements the proxy subresource for a Cluster.
|
||||||
type ProxyREST struct {
|
type ProxyREST struct {
|
||||||
Store *genericregistry.Store
|
|
||||||
Redirector rest.Redirector
|
|
||||||
|
|
||||||
kubeClient kubernetes.Interface
|
kubeClient kubernetes.Interface
|
||||||
|
clusterGetter func(ctx context.Context, name string) (*clusterapis.Cluster, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implement Connecter
|
// Implement Connecter
|
||||||
|
@ -56,78 +48,13 @@ func (r *ProxyREST) Connect(ctx context.Context, id string, options runtime.Obje
|
||||||
return nil, fmt.Errorf("invalid options object: %#v", options)
|
return nil, fmt.Errorf("invalid options object: %#v", options)
|
||||||
}
|
}
|
||||||
|
|
||||||
location, transport, err := r.Redirector.ResourceLocation(ctx, id)
|
cluster, err := r.clusterGetter(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
location.Path = proxyOpts.Path
|
|
||||||
|
|
||||||
impersonateToken, err := r.getImpersonateToken(ctx, id)
|
secretGetter := func(ctx context.Context, namespace string, name string) (*corev1.Secret, error) {
|
||||||
if err != nil {
|
return r.kubeClient.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||||
return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", id, err)
|
|
||||||
}
|
}
|
||||||
|
return proxy.ConnectCluster(ctx, cluster, proxyOpts.Path, secretGetter, responder)
|
||||||
return newProxyHandler(location, transport, impersonateToken, responder)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ProxyREST) getImpersonateToken(ctx context.Context, clusterName string) (string, error) {
|
|
||||||
cluster, err := getCluster(ctx, r.Store, clusterName)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
if cluster.Spec.ImpersonatorSecretRef == nil {
|
|
||||||
return "", fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", clusterName)
|
|
||||||
}
|
|
||||||
|
|
||||||
secret, err := r.kubeClient.CoreV1().Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(context.TODO(),
|
|
||||||
cluster.Spec.ImpersonatorSecretRef.Name, metav1.GetOptions{})
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
token, found := secret.Data[clusterapis.SecretTokenKey]
|
|
||||||
if !found {
|
|
||||||
return "", fmt.Errorf("the impresonate token of cluster %s is empty", clusterName)
|
|
||||||
}
|
|
||||||
return string(token), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func newProxyHandler(location *url.URL, transport http.RoundTripper, impersonateToken string, responder rest.Responder) (http.Handler, error) {
|
|
||||||
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
|
||||||
requester, exist := request.UserFrom(req.Context())
|
|
||||||
if !exist {
|
|
||||||
responsewriters.InternalError(rw, req, errors.New("no user found for request"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
req.Header.Set(authenticationv1.ImpersonateUserHeader, requester.GetName())
|
|
||||||
for _, group := range requester.GetGroups() {
|
|
||||||
if !skipGroup(group) {
|
|
||||||
req.Header.Add(authenticationv1.ImpersonateGroupHeader, group)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
req.Header.Set("Authorization", fmt.Sprintf("bearer %s", impersonateToken))
|
|
||||||
|
|
||||||
// Retain RawQuery in location because upgrading the request will use it.
|
|
||||||
// See https://github.com/karmada-io/karmada/issues/1618#issuecomment-1103793290 for more info.
|
|
||||||
location.RawQuery = req.URL.RawQuery
|
|
||||||
|
|
||||||
handler := newThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder)
|
|
||||||
handler.ServeHTTP(rw, req)
|
|
||||||
}), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func skipGroup(group string) bool {
|
|
||||||
switch group {
|
|
||||||
case user.AllAuthenticated, user.AllUnauthenticated:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
|
|
||||||
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
|
|
||||||
return handler
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,148 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
|
||||||
|
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
||||||
|
utiltest "github.com/karmada-io/karmada/pkg/util/testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestProxyREST_Connect(t *testing.T) {
|
||||||
|
s := httptest.NewTLSServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
if req.URL.Path == "/proxy" {
|
||||||
|
_, _ = io.WriteString(rw, "ok")
|
||||||
|
} else {
|
||||||
|
_, _ = io.WriteString(rw, "bad request: "+req.URL.Path)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
type fields struct {
|
||||||
|
kubeClient kubernetes.Interface
|
||||||
|
clusterGetter func(ctx context.Context, name string) (*clusterapis.Cluster, error)
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
id string
|
||||||
|
options runtime.Object
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
want string
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "options is invalid",
|
||||||
|
fields: fields{
|
||||||
|
kubeClient: fake.NewSimpleClientset(&corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
|
||||||
|
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
|
||||||
|
}),
|
||||||
|
clusterGetter: func(_ context.Context, name string) (*clusterapis.Cluster, error) {
|
||||||
|
return &clusterapis.Cluster{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: name},
|
||||||
|
Spec: clusterapis.ClusterSpec{
|
||||||
|
APIEndpoint: s.URL,
|
||||||
|
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
id: "cluster",
|
||||||
|
options: &corev1.Pod{},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
want: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cluster not found",
|
||||||
|
fields: fields{
|
||||||
|
kubeClient: fake.NewSimpleClientset(&corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
|
||||||
|
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
|
||||||
|
}),
|
||||||
|
clusterGetter: func(_ context.Context, name string) (*clusterapis.Cluster, error) {
|
||||||
|
return nil, apierrors.NewNotFound(clusterapis.Resource("clusters"), name)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
id: "cluster",
|
||||||
|
options: &clusterapis.ClusterProxyOptions{Path: "/proxy"},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
want: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "proxy success",
|
||||||
|
fields: fields{
|
||||||
|
kubeClient: fake.NewSimpleClientset(&corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
|
||||||
|
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
|
||||||
|
}),
|
||||||
|
clusterGetter: func(_ context.Context, name string) (*clusterapis.Cluster, error) {
|
||||||
|
return &clusterapis.Cluster{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: name},
|
||||||
|
Spec: clusterapis.ClusterSpec{
|
||||||
|
APIEndpoint: s.URL,
|
||||||
|
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
id: "cluster",
|
||||||
|
options: &clusterapis.ClusterProxyOptions{Path: "/proxy"},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
want: "ok",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
req, err := http.NewRequestWithContext(request.WithUser(request.NewContext(), &user.DefaultInfo{}), "GET", "http://127.0.0.1/xxx", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
|
||||||
|
r := &ProxyREST{
|
||||||
|
kubeClient: tt.fields.kubeClient,
|
||||||
|
clusterGetter: tt.fields.clusterGetter,
|
||||||
|
}
|
||||||
|
h, err := r.Connect(req.Context(), tt.args.id, tt.args.options, utiltest.NewResponder(resp))
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("Connect() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.ServeHTTP(resp, req)
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if got := string(body); got != tt.want {
|
||||||
|
t.Errorf("Connect() got = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,14 +2,12 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
|
||||||
"k8s.io/apiserver/pkg/registry/generic"
|
"k8s.io/apiserver/pkg/registry/generic"
|
||||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
|
@ -21,6 +19,7 @@ import (
|
||||||
printersinternal "github.com/karmada-io/karmada/pkg/printers/internalversion"
|
printersinternal "github.com/karmada-io/karmada/pkg/printers/internalversion"
|
||||||
printerstorage "github.com/karmada-io/karmada/pkg/printers/storage"
|
printerstorage "github.com/karmada-io/karmada/pkg/printers/storage"
|
||||||
clusterregistry "github.com/karmada-io/karmada/pkg/registry/cluster"
|
clusterregistry "github.com/karmada-io/karmada/pkg/registry/cluster"
|
||||||
|
"github.com/karmada-io/karmada/pkg/util/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ClusterStorage includes storage for Cluster and for all the subresources.
|
// ClusterStorage includes storage for Cluster and for all the subresources.
|
||||||
|
@ -62,9 +61,8 @@ func NewStorage(scheme *runtime.Scheme, kubeClient kubernetes.Interface, optsGet
|
||||||
Cluster: clusterRest,
|
Cluster: clusterRest,
|
||||||
Status: &StatusREST{&statusStore},
|
Status: &StatusREST{&statusStore},
|
||||||
Proxy: &ProxyREST{
|
Proxy: &ProxyREST{
|
||||||
Store: store,
|
|
||||||
Redirector: clusterRest,
|
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
|
clusterGetter: clusterRest.getCluster,
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -79,31 +77,16 @@ var _ = rest.Redirector(&REST{})
|
||||||
|
|
||||||
// ResourceLocation returns a URL to which one can send traffic for the specified cluster.
|
// ResourceLocation returns a URL to which one can send traffic for the specified cluster.
|
||||||
func (r *REST) ResourceLocation(ctx context.Context, name string) (*url.URL, http.RoundTripper, error) {
|
func (r *REST) ResourceLocation(ctx context.Context, name string) (*url.URL, http.RoundTripper, error) {
|
||||||
cluster, err := getCluster(ctx, r, name)
|
cluster, err := r.getCluster(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
location, err := constructLocation(cluster)
|
return proxy.Location(cluster)
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
transport, err := createProxyTransport(cluster)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return location, transport, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResourceGetter is an interface for retrieving resources by ResourceLocation.
|
func (r *REST) getCluster(ctx context.Context, name string) (*clusterapis.Cluster, error) {
|
||||||
type ResourceGetter interface {
|
obj, err := r.Get(ctx, name, &metav1.GetOptions{})
|
||||||
Get(context.Context, string, *metav1.GetOptions) (runtime.Object, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func getCluster(ctx context.Context, getter ResourceGetter, name string) (*clusterapis.Cluster, error) {
|
|
||||||
obj, err := getter.Get(ctx, name, &metav1.GetOptions{})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -114,34 +97,9 @@ func getCluster(ctx context.Context, getter ResourceGetter, name string) (*clust
|
||||||
return cluster, nil
|
return cluster, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func constructLocation(cluster *clusterapis.Cluster) (*url.URL, error) {
|
// ResourceGetter is an interface for retrieving resources by ResourceLocation.
|
||||||
if cluster.Spec.APIEndpoint == "" {
|
type ResourceGetter interface {
|
||||||
return nil, fmt.Errorf("API endpoint of cluster %s should not be empty", cluster.Name)
|
Get(context.Context, string, *metav1.GetOptions) (runtime.Object, error)
|
||||||
}
|
|
||||||
|
|
||||||
uri, err := url.Parse(cluster.Spec.APIEndpoint)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse api endpoint %s: %v", cluster.Spec.APIEndpoint, err)
|
|
||||||
}
|
|
||||||
return uri, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func createProxyTransport(cluster *clusterapis.Cluster) (*http.Transport, error) {
|
|
||||||
var proxyDialerFn utilnet.DialFunc
|
|
||||||
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} // #nosec
|
|
||||||
trans := utilnet.SetTransportDefaults(&http.Transport{
|
|
||||||
DialContext: proxyDialerFn,
|
|
||||||
TLSClientConfig: proxyTLSClientConfig,
|
|
||||||
})
|
|
||||||
|
|
||||||
if cluster.Spec.ProxyURL != "" {
|
|
||||||
proxy, err := url.Parse(cluster.Spec.ProxyURL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse url of proxy url %s: %v", cluster.Spec.ProxyURL, err)
|
|
||||||
}
|
|
||||||
trans.Proxy = http.ProxyURL(proxy)
|
|
||||||
}
|
|
||||||
return trans, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// StatusREST implements the REST endpoint for changing the status of a cluster.
|
// StatusREST implements the REST endpoint for changing the status of a cluster.
|
||||||
|
@ -162,7 +120,7 @@ func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOp
|
||||||
// Update alters the status subset of an object.
|
// Update alters the status subset of an object.
|
||||||
func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
|
func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
|
||||||
// We are explicitly setting forceAllowCreate to false in the call to the underlying storage because
|
// We are explicitly setting forceAllowCreate to false in the call to the underlying storage because
|
||||||
// subresources should never allow create on update.
|
// subresources should never allow creating on update.
|
||||||
return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
|
return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,10 +7,13 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
listcorev1 "k8s.io/client-go/listers/core/v1"
|
listcorev1 "k8s.io/client-go/listers/core/v1"
|
||||||
|
|
||||||
|
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
||||||
|
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||||
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
|
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
|
||||||
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
|
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
|
||||||
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
|
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
|
||||||
|
@ -70,7 +73,22 @@ func (c *Cluster) Connect(ctx context.Context, request framework.ProxyRequest) (
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
h, err := proxy.ConnectCluster(ctx, c.clusterLister, c.secretLister, clusterName, request.ProxyPath, request.Responder)
|
cls, err := c.clusterLister.Get(clusterName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cluster := &clusterapis.Cluster{}
|
||||||
|
err = clusterv1alpha1.Convert_v1alpha1_Cluster_To_cluster_Cluster(cls, cluster, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
secretGetter := func(ctx context.Context, namespace, name string) (*corev1.Secret, error) {
|
||||||
|
return c.secretLister.Secrets(namespace).Get(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
h, err := proxy.ConnectCluster(ctx, cluster, request.ProxyPath, secretGetter, request.Responder)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import (
|
||||||
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
|
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
|
||||||
"github.com/karmada-io/karmada/pkg/search/proxy/store"
|
"github.com/karmada-io/karmada/pkg/search/proxy/store"
|
||||||
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
|
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
|
||||||
|
utiltest "github.com/karmada-io/karmada/pkg/util/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestModifyRequest(t *testing.T) {
|
func TestModifyRequest(t *testing.T) {
|
||||||
|
@ -402,7 +403,7 @@ func Test_clusterProxy_connect(t *testing.T) {
|
||||||
RequestInfo: tt.args.requestInfo,
|
RequestInfo: tt.args.requestInfo,
|
||||||
GroupVersionResource: proxytest.PodGVR,
|
GroupVersionResource: proxytest.PodGVR,
|
||||||
ProxyPath: "/proxy",
|
ProxyPath: "/proxy",
|
||||||
Responder: proxytest.NewResponder(response),
|
Responder: utiltest.NewResponder(response),
|
||||||
HTTPReq: tt.args.request,
|
HTTPReq: tt.args.request,
|
||||||
})
|
})
|
||||||
if !proxytest.ErrorMessageEquals(err, tt.want.err) {
|
if !proxytest.ErrorMessageEquals(err, tt.want.err) {
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
|
|
||||||
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
|
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
|
||||||
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
|
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
|
||||||
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
|
utiltest "github.com/karmada-io/karmada/pkg/util/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_karmadaProxy(t *testing.T) {
|
func Test_karmadaProxy(t *testing.T) {
|
||||||
|
@ -76,7 +76,7 @@ func Test_karmadaProxy(t *testing.T) {
|
||||||
response := httptest.NewRecorder()
|
response := httptest.NewRecorder()
|
||||||
h, err := p.Connect(context.TODO(), framework.ProxyRequest{
|
h, err := p.Connect(context.TODO(), framework.ProxyRequest{
|
||||||
ProxyPath: tt.args.path,
|
ProxyPath: tt.args.path,
|
||||||
Responder: proxytest.NewResponder(response),
|
Responder: utiltest.NewResponder(response),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
|
|
@ -17,47 +17,32 @@ import (
|
||||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
listcorev1 "k8s.io/client-go/listers/core/v1"
|
|
||||||
|
|
||||||
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
||||||
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// todo: consider share logic with pkg/registry/cluster/storage/proxy.go:53
|
|
||||||
|
|
||||||
// ConnectCluster returns a handler for proxy cluster.
|
// ConnectCluster returns a handler for proxy cluster.
|
||||||
func ConnectCluster(ctx context.Context,
|
func ConnectCluster(ctx context.Context, cluster *clusterapis.Cluster, proxyPath string,
|
||||||
clusterLister clusterlisters.ClusterLister, secretLister listcorev1.SecretLister,
|
secretGetter func(context.Context, string, string) (*corev1.Secret, error), responder rest.Responder) (http.Handler, error) {
|
||||||
clusterName string, proxyPath string, responder rest.Responder) (http.Handler, error) {
|
location, transport, err := Location(cluster)
|
||||||
cluster, err := clusterLister.Get(clusterName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
location, transport, err := Location(cluster.Name, cluster.Spec.APIEndpoint, cluster.Spec.ProxyURL)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
location.Path = path.Join(location.Path, proxyPath)
|
location.Path = path.Join(location.Path, proxyPath)
|
||||||
|
|
||||||
secretGetter := func(context.Context, string) (*corev1.Secret, error) {
|
|
||||||
if cluster.Spec.ImpersonatorSecretRef == nil {
|
if cluster.Spec.ImpersonatorSecretRef == nil {
|
||||||
return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name)
|
return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name)
|
||||||
}
|
}
|
||||||
return secretLister.Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(cluster.Spec.ImpersonatorSecretRef.Name)
|
|
||||||
}
|
|
||||||
return connectCluster(ctx, cluster.Name, location, transport, responder, secretGetter)
|
|
||||||
}
|
|
||||||
|
|
||||||
func connectCluster(ctx context.Context, clusterName string, location *url.URL, transport http.RoundTripper, responder rest.Responder,
|
secret, err := secretGetter(ctx, cluster.Spec.ImpersonatorSecretRef.Namespace, cluster.Spec.ImpersonatorSecretRef.Name)
|
||||||
impersonateSecretGetter func(context.Context, string) (*corev1.Secret, error)) (http.Handler, error) {
|
|
||||||
secret, err := impersonateSecretGetter(ctx, clusterName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
impersonateToken, err := getImpersonateToken(clusterName, secret)
|
impersonateToken, err := getImpersonateToken(cluster.Name, secret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", clusterName, err)
|
return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", cluster.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return newProxyHandler(location, transport, impersonateToken, responder)
|
return newProxyHandler(location, transport, impersonateToken, responder)
|
||||||
|
@ -70,13 +55,13 @@ func NewThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.Roun
|
||||||
}
|
}
|
||||||
|
|
||||||
// Location returns a URL to which one can send traffic for the specified cluster.
|
// Location returns a URL to which one can send traffic for the specified cluster.
|
||||||
func Location(clusterName string, apiEndpoint string, proxyURL string) (*url.URL, http.RoundTripper, error) {
|
func Location(cluster *clusterapis.Cluster) (*url.URL, http.RoundTripper, error) {
|
||||||
location, err := constructLocation(clusterName, apiEndpoint)
|
location, err := constructLocation(cluster)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
transport, err := createProxyTransport(proxyURL)
|
transport, err := createProxyTransport(cluster)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -84,9 +69,10 @@ func Location(clusterName string, apiEndpoint string, proxyURL string) (*url.URL
|
||||||
return location, transport, nil
|
return location, transport, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func constructLocation(clusterName string, apiEndpoint string) (*url.URL, error) {
|
func constructLocation(cluster *clusterapis.Cluster) (*url.URL, error) {
|
||||||
|
apiEndpoint := cluster.Spec.APIEndpoint
|
||||||
if apiEndpoint == "" {
|
if apiEndpoint == "" {
|
||||||
return nil, fmt.Errorf("API endpoint of cluster %s should not be empty", clusterName)
|
return nil, fmt.Errorf("API endpoint of cluster %s should not be empty", cluster.GetName())
|
||||||
}
|
}
|
||||||
|
|
||||||
uri, err := url.Parse(apiEndpoint)
|
uri, err := url.Parse(apiEndpoint)
|
||||||
|
@ -96,7 +82,7 @@ func constructLocation(clusterName string, apiEndpoint string) (*url.URL, error)
|
||||||
return uri, nil
|
return uri, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createProxyTransport(proxyURL string) (*http.Transport, error) {
|
func createProxyTransport(cluster *clusterapis.Cluster) (*http.Transport, error) {
|
||||||
var proxyDialerFn utilnet.DialFunc
|
var proxyDialerFn utilnet.DialFunc
|
||||||
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} // #nosec
|
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} // #nosec
|
||||||
trans := utilnet.SetTransportDefaults(&http.Transport{
|
trans := utilnet.SetTransportDefaults(&http.Transport{
|
||||||
|
@ -104,7 +90,7 @@ func createProxyTransport(proxyURL string) (*http.Transport, error) {
|
||||||
TLSClientConfig: proxyTLSClientConfig,
|
TLSClientConfig: proxyTLSClientConfig,
|
||||||
})
|
})
|
||||||
|
|
||||||
if proxyURL != "" {
|
if proxyURL := cluster.Spec.ProxyURL; proxyURL != "" {
|
||||||
u, err := url.Parse(proxyURL)
|
u, err := url.Parse(proxyURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to parse url of proxy url %s: %v", proxyURL, err)
|
return nil, fmt.Errorf("failed to parse url of proxy url %s: %v", proxyURL, err)
|
||||||
|
|
|
@ -0,0 +1,222 @@
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
|
||||||
|
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
||||||
|
utiltest "github.com/karmada-io/karmada/pkg/util/testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestConnectCluster(t *testing.T) {
|
||||||
|
const (
|
||||||
|
testToken = "token"
|
||||||
|
testGroup = "group"
|
||||||
|
testUser = "user"
|
||||||
|
)
|
||||||
|
|
||||||
|
s := httptest.NewTLSServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
if req.URL.Path != "/proxy" ||
|
||||||
|
req.Header.Get("Authorization") != "bearer "+testToken ||
|
||||||
|
req.Header.Get("Impersonate-Group") != testGroup ||
|
||||||
|
req.Header.Get("Impersonate-User") != testUser {
|
||||||
|
t.Errorf("bad request: %v, %v", req.URL.Path, req.Header)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fmt.Fprintf(rw, "ok")
|
||||||
|
}))
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
type args struct {
|
||||||
|
ctx context.Context
|
||||||
|
cluster *clusterapis.Cluster
|
||||||
|
secretGetter func(context.Context, string, string) (*corev1.Secret, error)
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
want string
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "apiEndpoint is empty",
|
||||||
|
args: args{
|
||||||
|
cluster: &clusterapis.Cluster{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||||
|
Spec: clusterapis.ClusterSpec{},
|
||||||
|
},
|
||||||
|
secretGetter: nil,
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
want: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "apiEndpoint is invalid",
|
||||||
|
args: args{
|
||||||
|
cluster: &clusterapis.Cluster{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||||
|
Spec: clusterapis.ClusterSpec{APIEndpoint: "h :/ invalid"},
|
||||||
|
},
|
||||||
|
secretGetter: nil,
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
want: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ProxyURL is invalid",
|
||||||
|
args: args{
|
||||||
|
cluster: &clusterapis.Cluster{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||||
|
Spec: clusterapis.ClusterSpec{
|
||||||
|
APIEndpoint: s.URL,
|
||||||
|
ProxyURL: "h :/ invalid",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
secretGetter: nil,
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
want: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ImpersonatorSecretRef is nil",
|
||||||
|
args: args{
|
||||||
|
cluster: &clusterapis.Cluster{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||||
|
Spec: clusterapis.ClusterSpec{
|
||||||
|
APIEndpoint: s.URL,
|
||||||
|
ProxyURL: "http://proxy",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
secretGetter: nil,
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
want: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "secret not found",
|
||||||
|
args: args{
|
||||||
|
cluster: &clusterapis.Cluster{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||||
|
Spec: clusterapis.ClusterSpec{
|
||||||
|
APIEndpoint: s.URL,
|
||||||
|
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) {
|
||||||
|
return nil, apierrors.NewNotFound(corev1.Resource("secrets"), name)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
want: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "SecretTokenKey not found",
|
||||||
|
args: args{
|
||||||
|
cluster: &clusterapis.Cluster{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||||
|
Spec: clusterapis.ClusterSpec{
|
||||||
|
APIEndpoint: s.URL,
|
||||||
|
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) {
|
||||||
|
return &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name},
|
||||||
|
Data: map[string][]byte{},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
want: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no user found for request",
|
||||||
|
args: args{
|
||||||
|
ctx: context.TODO(),
|
||||||
|
cluster: &clusterapis.Cluster{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||||
|
Spec: clusterapis.ClusterSpec{
|
||||||
|
APIEndpoint: s.URL,
|
||||||
|
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) {
|
||||||
|
return &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name},
|
||||||
|
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte(testToken)},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
want: "Internal Server Error: \"\": no user found for request\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "proxy success",
|
||||||
|
args: args{
|
||||||
|
ctx: request.WithUser(request.NewContext(), &user.DefaultInfo{Name: testUser, Groups: []string{testGroup, user.AllAuthenticated, user.AllUnauthenticated}}),
|
||||||
|
cluster: &clusterapis.Cluster{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
|
||||||
|
Spec: clusterapis.ClusterSpec{
|
||||||
|
APIEndpoint: s.URL,
|
||||||
|
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
secretGetter: func(_ context.Context, ns string, name string) (*corev1.Secret, error) {
|
||||||
|
return &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name},
|
||||||
|
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte(testToken)},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
want: "ok",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ctx := tt.args.ctx
|
||||||
|
if ctx == nil {
|
||||||
|
ctx = context.TODO()
|
||||||
|
}
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "GET", "http://127.0.0.1/xxx", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
|
||||||
|
h, err := ConnectCluster(context.TODO(), tt.args.cluster, "proxy", tt.args.secretGetter, utiltest.NewResponder(resp))
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("ConnectCluster() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.ServeHTTP(resp, req)
|
||||||
|
if t.Failed() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if got := string(body); got != tt.want {
|
||||||
|
t.Errorf("Connect() got = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue