Merge pull request #2678 from cmicat/feature/proxy-connector-framework

Add Proxy Framework
karmada-bot 2022-10-31 15:55:24 +08:00 committed by GitHub
commit bd8d734054
24 changed files with 1406 additions and 959 deletions

View File

@@ -2,19 +2,46 @@ package app
import (
"context"
"fmt"
"net"
"net/http"
"path"
"time"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/endpoints/request"
genericapiserver "k8s.io/apiserver/pkg/server"
genericfilters "k8s.io/apiserver/pkg/server/filters"
genericoptions "k8s.io/apiserver/pkg/server/options"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"github.com/karmada-io/karmada/cmd/karmada-search/app/options"
searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi"
"github.com/karmada-io/karmada/pkg/search"
"github.com/karmada-io/karmada/pkg/search/proxy"
"github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
"github.com/karmada-io/karmada/pkg/sharedcli"
"github.com/karmada-io/karmada/pkg/sharedcli/klogflag"
"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
"github.com/karmada-io/karmada/pkg/util/lifted"
"github.com/karmada-io/karmada/pkg/version"
"github.com/karmada-io/karmada/pkg/version/sharedcommand"
)
// Option configures a framework.Registry.
type Option func(*runtime.Registry)
// NewKarmadaSearchCommand creates a *cobra.Command object with default parameters
func NewKarmadaSearchCommand(ctx context.Context) *cobra.Command {
func NewKarmadaSearchCommand(ctx context.Context, registryOptions ...Option) *cobra.Command {
opts := options.NewOptions()
cmd := &cobra.Command{
@@ -28,7 +55,7 @@ capabilities such as global search and resource proxy in a multi-cloud environment
if err := opts.Validate(); err != nil {
return err
}
if err := opts.Run(ctx); err != nil {
if err := run(ctx, opts, registryOptions...); err != nil {
return err
}
return nil
@@ -52,3 +79,144 @@ capabilities such as global search and resource proxy in a multi-cloud environment
sharedcli.SetUsageAndHelpFunc(cmd, fss, cols)
return cmd
}
// WithPlugin creates an Option based on a plugin factory.
// Please don't remove this function: it is used to register out-of-tree plugins,
// hence there are no references to it from the karmada-search code base.
func WithPlugin(factory runtime.PluginFactory) Option {
return func(registry *runtime.Registry) {
registry.Register(factory)
}
}
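For context, a minimal sketch of how an out-of-tree build would use WithPlugin. The example.com/me/myplugin package and its New factory are hypothetical; the factory is assumed to have the runtime.PluginFactory signature consumed by newPlugins below.

```go
package main

import (
	"context"
	"os"

	"github.com/karmada-io/karmada/cmd/karmada-search/app"

	// Hypothetical out-of-tree package whose New has the
	// runtime.PluginFactory signature:
	//   func New(dep runtime.PluginDependency) (framework.Plugin, error)
	"example.com/me/myplugin"
)

func main() {
	// Register the custom plugin while building the command; it is merged
	// into the in-tree registry when the proxy controller is created.
	cmd := app.NewKarmadaSearchCommand(context.Background(), app.WithPlugin(myplugin.New))
	if err := cmd.Execute(); err != nil {
		os.Exit(1)
	}
}
```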
// `run` runs karmada-search with the given options. This should never exit.
func run(ctx context.Context, o *options.Options, registryOptions ...Option) error {
klog.Infof("karmada-search version: %s", version.Get())
profileflag.ListenAndServe(o.ProfileOpts)
config, err := config(o, registryOptions...)
if err != nil {
return err
}
server, err := config.Complete().New()
if err != nil {
return err
}
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-informers", func(context genericapiserver.PostStartHookContext) error {
config.GenericConfig.SharedInformerFactory.Start(context.StopCh)
return nil
})
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-informers", func(context genericapiserver.PostStartHookContext) error {
config.ExtraConfig.KarmadaSharedInformerFactory.Start(context.StopCh)
return nil
})
if config.ExtraConfig.Controller != nil {
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error {
// start ResourceRegistry controller
config.ExtraConfig.Controller.Start(context.StopCh)
return nil
})
}
if config.ExtraConfig.ProxyController != nil {
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-proxy-controller", func(context genericapiserver.PostStartHookContext) error {
config.ExtraConfig.ProxyController.Start(context.StopCh)
return nil
})
server.GenericAPIServer.AddPreShutdownHookOrDie("stop-karmada-proxy-controller", func() error {
config.ExtraConfig.ProxyController.Stop()
return nil
})
}
return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
}
// `config` returns the config for the API server given Options
func config(o *options.Options, outOfTreeRegistryOptions ...Option) (*search.Config, error) {
// TODO have a "real" external address
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{netutils.ParseIPSloppy("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
o.RecommendedOptions.Features = &genericoptions.FeatureOptions{EnableProfiling: false}
serverConfig := genericapiserver.NewRecommendedConfig(searchscheme.Codecs)
serverConfig.LongRunningFunc = customLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"))
serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(searchscheme.Scheme))
serverConfig.OpenAPIConfig.Info.Title = "karmada-search"
if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil {
return nil, err
}
serverConfig.ClientConfig.QPS = o.KubeAPIQPS
serverConfig.ClientConfig.Burst = o.KubeAPIBurst
restMapper, err := apiutil.NewDynamicRESTMapper(serverConfig.ClientConfig)
if err != nil {
klog.Errorf("Failed to create REST mapper: %v", err)
return nil, err
}
karmadaClient := karmadaclientset.NewForConfigOrDie(serverConfig.ClientConfig)
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
var ctl *search.Controller
if !o.DisableSearch {
ctl, err = search.NewController(serverConfig.ClientConfig, factory, restMapper)
if err != nil {
return nil, err
}
}
var proxyCtl *proxy.Controller
if !o.DisableProxy {
outOfTreeRegistry := make(runtime.Registry, 0, len(outOfTreeRegistryOptions))
for _, option := range outOfTreeRegistryOptions {
option(&outOfTreeRegistry)
}
proxyCtl, err = proxy.NewController(proxy.NewControllerOption{
RestConfig: serverConfig.ClientConfig,
RestMapper: restMapper,
KubeFactory: serverConfig.SharedInformerFactory,
KarmadaFactory: factory,
MinRequestTimeout: time.Second * time.Duration(serverConfig.Config.MinRequestTimeout),
OutOfTreeRegistry: outOfTreeRegistry,
})
if err != nil {
return nil, err
}
}
config := &search.Config{
GenericConfig: serverConfig,
ExtraConfig: search.ExtraConfig{
KarmadaSharedInformerFactory: factory,
Controller: ctl,
ProxyController: proxyCtl,
},
}
return config, nil
}
func customLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) request.LongRunningRequestCheck {
return func(r *http.Request, requestInfo *request.RequestInfo) bool {
if requestInfo.APIGroup == "search.karmada.io" && requestInfo.Resource == "proxying" {
reqClone := r.Clone(context.TODO())
// requestInfo.Parts is like [proxying foo proxy api v1 nodes]
reqClone.URL.Path = "/" + path.Join(requestInfo.Parts[3:]...)
requestInfo = lifted.NewRequestInfo(reqClone)
}
return genericfilters.BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources)(r, requestInfo)
}
}
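To make the Parts slicing concrete: for a proxying request such as /apis/search.karmada.io/v1alpha1/proxying/foo/proxy/api/v1/nodes (URL shape assumed from the comment above), a self-contained sketch of the rewrite:

```go
package main

import (
	"fmt"
	"path"
)

func main() {
	// requestInfo.Parts as described in the comment above.
	parts := []string{"proxying", "foo", "proxy", "api", "v1", "nodes"}
	// Drop the leading [proxying foo proxy] segments and rebuild the path
	// the member cluster would see; request-info parsing is then re-run
	// on the rewritten request before the long-running check applies.
	rewritten := "/" + path.Join(parts[3:]...)
	fmt.Println(rewritten) // Output: /api/v1/nodes
}
```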

View File

@@ -1,43 +1,21 @@
package options
import (
"context"
"fmt"
"net"
"net/http"
"path"
"time"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
genericfilters "k8s.io/apiserver/pkg/server/filters"
genericoptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme"
searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi"
"github.com/karmada-io/karmada/pkg/search"
"github.com/karmada-io/karmada/pkg/search/proxy"
"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
"github.com/karmada-io/karmada/pkg/util/lifted"
"github.com/karmada-io/karmada/pkg/version"
)
const defaultEtcdPathPrefix = "/registry"
// Options contains everything necessary to create and run karmada-search.
// Options contains command line parameters for karmada-search.
type Options struct {
RecommendedOptions *genericoptions.RecommendedOptions
@@ -83,123 +61,3 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
func (o *Options) Complete() error {
return nil
}
// Run runs the aggregated-apiserver with options. This should never exit.
func (o *Options) Run(ctx context.Context) error {
klog.Infof("karmada-search version: %s", version.Get())
profileflag.ListenAndServe(o.ProfileOpts)
config, err := o.Config()
if err != nil {
return err
}
server, err := config.Complete().New()
if err != nil {
return err
}
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-informers", func(context genericapiserver.PostStartHookContext) error {
config.GenericConfig.SharedInformerFactory.Start(context.StopCh)
return nil
})
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-informers", func(context genericapiserver.PostStartHookContext) error {
config.ExtraConfig.KarmadaSharedInformerFactory.Start(context.StopCh)
return nil
})
if config.ExtraConfig.Controller != nil {
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error {
// start ResourceRegistry controller
config.ExtraConfig.Controller.Start(context.StopCh)
return nil
})
}
if config.ExtraConfig.ProxyController != nil {
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-proxy-controller", func(context genericapiserver.PostStartHookContext) error {
config.ExtraConfig.ProxyController.Start(context.StopCh)
return nil
})
server.GenericAPIServer.AddPreShutdownHookOrDie("stop-karmada-proxy-controller", func() error {
config.ExtraConfig.ProxyController.Stop()
return nil
})
}
return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
}
// Config returns config for the api server given Options
func (o *Options) Config() (*search.Config, error) {
// TODO have a "real" external address
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{netutils.ParseIPSloppy("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
o.RecommendedOptions.Features = &genericoptions.FeatureOptions{EnableProfiling: false}
serverConfig := genericapiserver.NewRecommendedConfig(searchscheme.Codecs)
serverConfig.LongRunningFunc = customLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"))
serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(searchscheme.Scheme))
serverConfig.OpenAPIConfig.Info.Title = "karmada-search"
if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil {
return nil, err
}
serverConfig.ClientConfig.QPS = o.KubeAPIQPS
serverConfig.ClientConfig.Burst = o.KubeAPIBurst
restMapper, err := apiutil.NewDynamicRESTMapper(serverConfig.ClientConfig)
if err != nil {
klog.Errorf("Failed to create REST mapper: %v", err)
return nil, err
}
karmadaClient := karmadaclientset.NewForConfigOrDie(serverConfig.ClientConfig)
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
var ctl *search.Controller
if !o.DisableSearch {
ctl, err = search.NewController(serverConfig.ClientConfig, factory, restMapper)
if err != nil {
return nil, err
}
}
var proxyCtl *proxy.Controller
if !o.DisableProxy {
proxyCtl, err = proxy.NewController(serverConfig.ClientConfig, restMapper, serverConfig.SharedInformerFactory, factory,
time.Second*time.Duration(serverConfig.Config.MinRequestTimeout))
if err != nil {
return nil, err
}
}
config := &search.Config{
GenericConfig: serverConfig,
ExtraConfig: search.ExtraConfig{
KarmadaSharedInformerFactory: factory,
Controller: ctl,
ProxyController: proxyCtl,
},
}
return config, nil
}
func customLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) request.LongRunningRequestCheck {
return func(r *http.Request, requestInfo *request.RequestInfo) bool {
if requestInfo.APIGroup == "search.karmada.io" && requestInfo.Resource == "proxying" {
reqClone := r.Clone(context.TODO())
// requestInfo.Parts is like [proxying foo proxy api v1 nodes]
reqClone.URL.Path = "/" + path.Join(requestInfo.Parts[3:]...)
requestInfo = lifted.NewRequestInfo(reqClone)
}
return genericfilters.BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources)(r, requestInfo)
}
}

View File

@@ -0,0 +1,84 @@
package proxy
import (
"fmt"
"net/http"
"net/url"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/dynamic"
listcorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
)
func newMultiClusterStore(clusterLister clusterlisters.ClusterLister,
secretLister listcorev1.SecretLister, restMapper meta.RESTMapper) store.Store {
clientFactory := &clientFactory{
ClusterLister: clusterLister,
SecretLister: secretLister,
}
return store.NewMultiClusterCache(clientFactory.DynamicClientForCluster, restMapper)
}
type clientFactory struct {
ClusterLister clusterlisters.ClusterLister
SecretLister listcorev1.SecretLister
}
// DynamicClientForCluster creates a dynamic client for the required cluster.
// TODO: reuse with karmada/pkg/util/membercluster_client.go
func (factory *clientFactory) DynamicClientForCluster(clusterName string) (dynamic.Interface, error) {
cluster, err := factory.ClusterLister.Get(clusterName)
if err != nil {
return nil, err
}
apiEndpoint := cluster.Spec.APIEndpoint
if apiEndpoint == "" {
return nil, fmt.Errorf("the api endpoint of cluster %s is empty", clusterName)
}
if cluster.Spec.SecretRef == nil {
return nil, fmt.Errorf("cluster %s does not have a secret", clusterName)
}
secret, err := factory.SecretLister.Secrets(cluster.Spec.SecretRef.Namespace).Get(cluster.Spec.SecretRef.Name)
if err != nil {
return nil, err
}
token, tokenFound := secret.Data[clusterv1alpha1.SecretTokenKey]
if !tokenFound || len(token) == 0 {
return nil, fmt.Errorf("the secret for cluster %s is missing a non-empty value for %q", clusterName, clusterv1alpha1.SecretTokenKey)
}
clusterConfig, err := clientcmd.BuildConfigFromFlags(apiEndpoint, "")
if err != nil {
return nil, err
}
clusterConfig.BearerToken = string(token)
if cluster.Spec.InsecureSkipTLSVerification {
clusterConfig.TLSClientConfig.Insecure = true
} else {
clusterConfig.CAData = secret.Data[clusterv1alpha1.SecretCADataKey]
}
if cluster.Spec.ProxyURL != "" {
proxy, err := url.Parse(cluster.Spec.ProxyURL)
if err != nil {
klog.Errorf("parse proxy error. %v", err)
return nil, err
}
clusterConfig.Proxy = http.ProxyURL(proxy)
}
return dynamic.NewForConfig(clusterConfig)
}

View File

@@ -0,0 +1,246 @@
package proxy
import (
"errors"
"testing"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake"
karmadainformers "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
)
func TestClientFactory_dynamicClientForCluster(t *testing.T) {
// copy from go/src/net/http/internal/testcert/testcert.go
testCA := []byte(`-----BEGIN CERTIFICATE-----
MIIDOTCCAiGgAwIBAgIQSRJrEpBGFc7tNb1fb5pKFzANBgkqhkiG9w0BAQsFADAS
MRAwDgYDVQQKEwdBY21lIENvMCAXDTcwMDEwMTAwMDAwMFoYDzIwODQwMTI5MTYw
MDAwWjASMRAwDgYDVQQKEwdBY21lIENvMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEA6Gba5tHV1dAKouAaXO3/ebDUU4rvwCUg/CNaJ2PT5xLD4N1Vcb8r
bFSW2HXKq+MPfVdwIKR/1DczEoAGf/JWQTW7EgzlXrCd3rlajEX2D73faWJekD0U
aUgz5vtrTXZ90BQL7WvRICd7FlEZ6FPOcPlumiyNmzUqtwGhO+9ad1W5BqJaRI6P
YfouNkwR6Na4TzSj5BrqUfP0FwDizKSJ0XXmh8g8G9mtwxOSN3Ru1QFc61Xyeluk
POGKBV/q6RBNklTNe0gI8usUMlYyoC7ytppNMW7X2vodAelSu25jgx2anj9fDVZu
h7AXF5+4nJS4AAt0n1lNY7nGSsdZas8PbQIDAQABo4GIMIGFMA4GA1UdDwEB/wQE
AwICpDATBgNVHSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MB0GA1Ud
DgQWBBStsdjh3/JCXXYlQryOrL4Sh7BW5TAuBgNVHREEJzAlggtleGFtcGxlLmNv
bYcEfwAAAYcQAAAAAAAAAAAAAAAAAAAAATANBgkqhkiG9w0BAQsFAAOCAQEAxWGI
5NhpF3nwwy/4yB4i/CwwSpLrWUa70NyhvprUBC50PxiXav1TeDzwzLx/o5HyNwsv
cxv3HdkLW59i/0SlJSrNnWdfZ19oTcS+6PtLoVyISgtyN6DpkKpdG1cOkW3Cy2P2
+tK/tKHRP1Y/Ra0RiDpOAmqn0gCOFGz8+lqDIor/T7MTpibL3IxqWfPrvfVRHL3B
grw/ZQTTIVjjh4JBSW3WyWgNo/ikC1lrVxzl4iPUGptxT36Cr7Zk2Bsg0XqwbOvK
5d+NTDREkSnUbie4GeutujmX3Dsx88UiV6UY/4lHJa6I5leHUNOHahRbpbWeOfs/
WkBKOclmOV2xlTVuPw==
-----END CERTIFICATE-----`)
type args struct {
clusters []runtime.Object
secrets []runtime.Object
}
type want struct {
err error
}
tests := []struct {
name string
args args
want want
}{
{
name: "cluster not found",
args: args{
clusters: nil,
secrets: nil,
},
want: want{
err: apierrors.NewNotFound(schema.GroupResource{Resource: "cluster", Group: "cluster.karmada.io"}, "test"),
},
},
{
name: "api endpoint is empty",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{},
}},
secrets: nil,
},
want: want{
err: errors.New("the api endpoint of cluster test is empty"),
},
},
{
name: "secret is empty",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
},
}},
secrets: nil,
},
want: want{
err: errors.New("cluster test does not have a secret"),
},
},
{
name: "secret not found",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
},
}},
secrets: nil,
},
want: want{
err: errors.New(`secret "test_secret" not found`),
},
},
{
name: "token not found",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
},
}},
secrets: []runtime.Object{&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test_secret",
},
Data: map[string][]byte{},
}},
},
want: want{
err: errors.New(`the secret for cluster test is missing a non-empty value for "token"`),
},
},
{
name: "success",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
},
}},
secrets: []runtime.Object{&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test_secret",
},
Data: map[string][]byte{
clusterv1alpha1.SecretTokenKey: []byte("test_token"),
clusterv1alpha1.SecretCADataKey: testCA,
},
}},
},
want: want{
err: nil,
},
},
{
name: "has proxy",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
ProxyURL: "https://localhost",
},
}},
secrets: []runtime.Object{&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test_secret",
},
Data: map[string][]byte{
clusterv1alpha1.SecretTokenKey: []byte("test_token"),
clusterv1alpha1.SecretCADataKey: testCA,
},
}},
},
want: want{
err: nil,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
kubeClient := fake.NewSimpleClientset(tt.args.secrets...)
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
karmadaClient := karmadafake.NewSimpleClientset(tt.args.clusters...)
karmadaFactory := karmadainformers.NewSharedInformerFactory(karmadaClient, 0)
factory := &clientFactory{
ClusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(),
SecretLister: kubeFactory.Core().V1().Secrets().Lister(),
}
stopCh := make(chan struct{})
defer close(stopCh)
karmadaFactory.Start(stopCh)
karmadaFactory.WaitForCacheSync(stopCh)
kubeFactory.Start(stopCh)
kubeFactory.WaitForCacheSync(stopCh)
client, err := factory.DynamicClientForCluster("test")
if !proxytest.ErrorMessageEquals(err, tt.want.err) {
t.Errorf("got error %v, want %v", err, tt.want.err)
return
}
if err != nil {
return
}
if client == nil {
t.Error("got client nil")
}
})
}
}

View File

@@ -2,9 +2,7 @@ package proxy
import (
"context"
"fmt"
"net/http"
"net/url"
"time"
"k8s.io/apimachinery/pkg/api/meta"
@@ -15,19 +13,19 @@ import (
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
listcorev1 "k8s.io/client-go/listers/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
searchlisters "github.com/karmada-io/karmada/pkg/generated/listers/search/v1alpha1"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
"github.com/karmada-io/karmada/pkg/search/proxy/framework/plugins"
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/lifted"
@@ -36,10 +34,6 @@ import (
const workKey = "key"
type connector interface {
connect(ctx context.Context, gvr schema.GroupVersionResource, proxyPath string, responder rest.Responder) (http.Handler, error)
}
// Controller syncs Cluster and GlobalResource.
type Controller struct {
restMapper meta.RESTMapper
@@ -49,36 +43,47 @@ type Controller struct {
clusterLister clusterlisters.ClusterLister
registryLister searchlisters.ResourceRegistryLister
worker util.AsyncWorker
store store.Cache
store store.Store
// proxy
karmadaProxy connector
clusterProxy connector
cacheProxy connector
proxy framework.Proxy
}
// NewControllerOption is the Option for NewController().
type NewControllerOption struct {
RestConfig *restclient.Config
RestMapper meta.RESTMapper
KubeFactory informers.SharedInformerFactory
KarmadaFactory informerfactory.SharedInformerFactory
MinRequestTimeout time.Duration
OutOfTreeRegistry pluginruntime.Registry
}
// NewController creates a controller for the proxy
func NewController(restConfig *restclient.Config, restMapper meta.RESTMapper,
kubeFactory informers.SharedInformerFactory, karmadaFactory informerfactory.SharedInformerFactory,
minRequestTimeout time.Duration) (*Controller, error) {
kp, err := newKarmadaProxy(restConfig)
func NewController(option NewControllerOption) (*Controller, error) {
secretLister := option.KubeFactory.Core().V1().Secrets().Lister()
clusterLister := option.KarmadaFactory.Cluster().V1alpha1().Clusters().Lister()
multiClusterStore := newMultiClusterStore(clusterLister, secretLister, option.RestMapper)
allPlugins, err := newPlugins(option, multiClusterStore)
if err != nil {
return nil, err
}
ctl := &Controller{
restMapper: restMapper,
negotiatedSerializer: scheme.Codecs.WithoutConversion(),
secretLister: kubeFactory.Core().V1().Secrets().Lister(),
clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(),
registryLister: karmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
}
s := store.NewMultiClusterCache(ctl.dynamicClientForCluster, restMapper)
proxy := pluginruntime.NewFramework(allPlugins)
ctl.store = s
ctl.cacheProxy = newCacheProxy(s, restMapper, minRequestTimeout)
ctl.clusterProxy = newClusterProxy(s, ctl.clusterLister, ctl.secretLister)
ctl.karmadaProxy = kp
ctl := &Controller{
restMapper: option.RestMapper,
negotiatedSerializer: scheme.Codecs.WithoutConversion(),
secretLister: secretLister,
clusterLister: clusterLister,
registryLister: option.KarmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
store: multiClusterStore,
proxy: proxy,
}
workerOptions := util.Options{
Name: "proxy-controller",
@@ -99,12 +104,38 @@ func NewController(restConfig *restclient.Config, restMapper meta.RESTMapper,
},
}
karmadaFactory.Cluster().V1alpha1().Clusters().Informer().AddEventHandler(resourceEventHandler)
karmadaFactory.Search().V1alpha1().ResourceRegistries().Informer().AddEventHandler(resourceEventHandler)
option.KarmadaFactory.Cluster().V1alpha1().Clusters().Informer().AddEventHandler(resourceEventHandler)
option.KarmadaFactory.Search().V1alpha1().ResourceRegistries().Informer().AddEventHandler(resourceEventHandler)
return ctl, nil
}
func newPlugins(option NewControllerOption, clusterStore store.Store) ([]framework.Plugin, error) {
pluginDependency := pluginruntime.PluginDependency{
RestConfig: option.RestConfig,
RestMapper: option.RestMapper,
KubeFactory: option.KubeFactory,
KarmadaFactory: option.KarmadaFactory,
MinRequestTimeout: option.MinRequestTimeout,
Store: clusterStore,
}
registry := plugins.NewInTreeRegistry()
registry.Merge(option.OutOfTreeRegistry)
allPlugins := make([]framework.Plugin, 0, len(registry))
for _, pluginFactory := range registry {
plugin, err := pluginFactory(pluginDependency)
if err != nil {
return nil, err
}
allPlugins = append(allPlugins, plugin)
}
return allPlugins, nil
}
// Start runs the proxy controller
func (ctl *Controller) Start(stopCh <-chan struct{}) {
ctl.worker.Run(1, stopCh)
@@ -160,6 +191,21 @@ func (ctl *Controller) reconcile(util.QueueKey) error {
return ctl.store.UpdateCache(resourcesByClusters)
}
type errorHTTPHandler struct {
requestInfo *request.RequestInfo
err error
negotiatedSerializer runtime.NegotiatedSerializer
}
func (handler *errorHTTPHandler) ServeHTTP(delegate http.ResponseWriter, req *http.Request) {
// Write error into delegate ResponseWriter, wrapped in metrics.InstrumentHandlerFunc, so metrics can record this error.
gv := schema.GroupVersion{
Group: handler.requestInfo.APIGroup,
Version: handler.requestInfo.APIVersion,
}
responsewriters.ErrorNegotiated(handler.err, handler.negotiatedSerializer, gv, delegate, req)
}
// Connect builds a handler that proxies and dispatches requests
func (ctl *Controller) Connect(ctx context.Context, proxyPath string, responder rest.Responder) (http.Handler, error) {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
@@ -177,106 +223,24 @@ func (ctl *Controller) Connect(ctx context.Context, proxyPath string, responder
Resource: requestInfo.Resource,
}
conn := ctl.connect(requestInfo)
h, err := conn.connect(newCtx, gvr, proxyPath, responder)
h, err := ctl.proxy.Connect(newCtx, framework.ProxyRequest{
RequestInfo: requestInfo,
GroupVersionResource: gvr,
ProxyPath: proxyPath,
Responder: responder,
HTTPReq: newReq,
})
if err != nil {
h = http.HandlerFunc(func(delegate http.ResponseWriter, req *http.Request) {
// Write error into delegate ResponseWriter, wrapped in metrics.InstrumentHandlerFunc, so metrics can record this error.
gv := schema.GroupVersion{
Group: requestInfo.APIGroup,
Version: requestInfo.APIVersion,
}
responsewriters.ErrorNegotiated(err, ctl.negotiatedSerializer, gv, delegate, req)
})
h = &errorHTTPHandler{
requestInfo: requestInfo,
err: err,
negotiatedSerializer: ctl.negotiatedSerializer,
}
}
h = metrics.InstrumentHandlerFunc(requestInfo.Verb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource,
"", "karmada-search", false, "", h.ServeHTTP)
h.ServeHTTP(rw, newReq)
}), nil
}
func (ctl *Controller) connect(requestInfo *request.RequestInfo) connector {
gvr := schema.GroupVersionResource{
Group: requestInfo.APIGroup,
Version: requestInfo.APIVersion,
Resource: requestInfo.Resource,
}
// requests will be redirected to:
// 1. karmada apiserver
// 2. cache
// 3. member clusters
// see more information from https://github.com/karmada-io/karmada/tree/master/docs/proposals/resource-aggregation-proxy#request-routing
// 1. For non-resource requests, or resources are not defined in ResourceRegistry,
// we redirect the requests to karmada apiserver.
// Usually the requests are
// - api index, e.g.: `/api`, `/apis`
// - to workloads created in the karmada control plane, such as deployments and services.
if !requestInfo.IsResourceRequest || !ctl.store.HasResource(gvr) {
return ctl.karmadaProxy
}
// 2. For reading requests, we redirect them to cache.
// Users call these requests to read resources in member clusters, such as pods and nodes.
if requestInfo.Subresource == "" && (requestInfo.Verb == "get" || requestInfo.Verb == "list" || requestInfo.Verb == "watch") {
return ctl.cacheProxy
}
// 3. The remaining requests are:
// - writing resources.
// - or subresource requests, e.g. `pods/log`
// We firstly find the resource from cache, and get the located cluster. Then redirect the request to the cluster.
return ctl.clusterProxy
}
// TODO: reuse with karmada/pkg/util/membercluster_client.go
func (ctl *Controller) dynamicClientForCluster(clusterName string) (dynamic.Interface, error) {
cluster, err := ctl.clusterLister.Get(clusterName)
if err != nil {
return nil, err
}
apiEndpoint := cluster.Spec.APIEndpoint
if apiEndpoint == "" {
return nil, fmt.Errorf("the api endpoint of cluster %s is empty", clusterName)
}
if cluster.Spec.SecretRef == nil {
return nil, fmt.Errorf("cluster %s does not have a secret", clusterName)
}
secret, err := ctl.secretLister.Secrets(cluster.Spec.SecretRef.Namespace).Get(cluster.Spec.SecretRef.Name)
if err != nil {
return nil, err
}
token, tokenFound := secret.Data[clusterv1alpha1.SecretTokenKey]
if !tokenFound || len(token) == 0 {
return nil, fmt.Errorf("the secret for cluster %s is missing a non-empty value for %q", clusterName, clusterv1alpha1.SecretTokenKey)
}
clusterConfig, err := clientcmd.BuildConfigFromFlags(apiEndpoint, "")
if err != nil {
return nil, err
}
clusterConfig.BearerToken = string(token)
if cluster.Spec.InsecureSkipTLSVerification {
clusterConfig.TLSClientConfig.Insecure = true
} else {
clusterConfig.CAData = secret.Data[clusterv1alpha1.SecretCADataKey]
}
if cluster.Spec.ProxyURL != "" {
proxy, err := url.Parse(cluster.Spec.ProxyURL)
if err != nil {
klog.Errorf("parse proxy error. %v", err)
return nil, err
}
clusterConfig.Proxy = http.ProxyURL(proxy)
}
return dynamic.NewForConfig(clusterConfig)
}

View File

@@ -2,7 +2,6 @@ package proxy
import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
@@ -13,14 +12,10 @@ import (
"time"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
@@ -31,24 +26,11 @@ import (
searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1"
karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake"
karmadainformers "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
)
var (
podGVK = corev1.SchemeGroupVersion.WithKind("Pod")
nodeGVK = corev1.SchemeGroupVersion.WithKind("Node")
podSelector = searchv1alpha1.ResourceSelector{APIVersion: podGVK.GroupVersion().String(), Kind: podGVK.Kind}
nodeSelector = searchv1alpha1.ResourceSelector{APIVersion: nodeGVK.GroupVersion().String(), Kind: nodeGVK.Kind}
restMapper = meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
)
func init() {
restMapper.Add(podGVK, meta.RESTScopeNamespace)
restMapper.Add(nodeGVK, meta.RESTScopeRoot)
}
func TestController(t *testing.T) {
restConfig := &restclient.Config{
Host: "https//localhost:6443",
@@ -59,7 +41,7 @@ func TestController(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "rr"},
Spec: searchv1alpha1.ResourceRegistrySpec{
ResourceSelectors: []searchv1alpha1.ResourceSelector{
podSelector,
proxytest.PodSelector,
},
},
}
@@ -67,13 +49,14 @@ func TestController(t *testing.T) {
kubeFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
karmadaFactory := karmadainformers.NewSharedInformerFactory(karmadafake.NewSimpleClientset(cluster1, rr), 0)
ctrl, err := NewController(
restConfig,
restMapper,
kubeFactory,
karmadaFactory,
0,
)
ctrl, err := NewController(NewControllerOption{
RestConfig: restConfig,
RestMapper: proxytest.RestMapper,
KubeFactory: kubeFactory,
KarmadaFactory: karmadaFactory,
MinRequestTimeout: 0,
})
if err != nil {
t.Error(err)
return
@@ -95,7 +78,7 @@ func TestController(t *testing.T) {
// wait for controller synced
time.Sleep(time.Second)
hasPod := ctrl.store.HasResource(podGVR)
hasPod := ctrl.store.HasResource(proxytest.PodGVR)
if !hasPod {
t.Error("has no pod resource")
return
@@ -127,7 +110,7 @@ func TestController_reconcile(t *testing.T) {
ClusterNames: []string{"cluster1"},
},
ResourceSelectors: []searchv1alpha1.ResourceSelector{
podSelector,
proxytest.PodSelector,
},
},
},
@@ -146,8 +129,8 @@ func TestController_reconcile(t *testing.T) {
ClusterNames: []string{"cluster1", "cluster2"},
},
ResourceSelectors: []searchv1alpha1.ResourceSelector{
podSelector,
nodeSelector,
proxytest.PodSelector,
proxytest.NodeSelector,
},
},
},
@@ -166,14 +149,14 @@ func TestController_reconcile(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "rr1"},
Spec: searchv1alpha1.ResourceRegistrySpec{
TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"cluster1"}},
ResourceSelectors: []searchv1alpha1.ResourceSelector{podSelector},
ResourceSelectors: []searchv1alpha1.ResourceSelector{proxytest.PodSelector},
},
},
&searchv1alpha1.ResourceRegistry{
ObjectMeta: metav1.ObjectMeta{Name: "rr2"},
Spec: searchv1alpha1.ResourceRegistrySpec{
TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"cluster2"}},
ResourceSelectors: []searchv1alpha1.ResourceSelector{nodeSelector},
ResourceSelectors: []searchv1alpha1.ResourceSelector{proxytest.NodeSelector},
},
},
},
@@ -191,14 +174,14 @@ func TestController_reconcile(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "rr1"},
Spec: searchv1alpha1.ResourceRegistrySpec{
TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"cluster1"}},
ResourceSelectors: []searchv1alpha1.ResourceSelector{podSelector, nodeSelector},
ResourceSelectors: []searchv1alpha1.ResourceSelector{proxytest.PodSelector, proxytest.NodeSelector},
},
},
&searchv1alpha1.ResourceRegistry{
ObjectMeta: metav1.ObjectMeta{Name: "rr2"},
Spec: searchv1alpha1.ResourceRegistrySpec{
TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"cluster2"}},
ResourceSelectors: []searchv1alpha1.ResourceSelector{nodeSelector},
ResourceSelectors: []searchv1alpha1.ResourceSelector{proxytest.NodeSelector},
},
},
},
@@ -219,8 +202,8 @@ func TestController_reconcile(t *testing.T) {
ClusterNames: []string{"cluster1"},
},
ResourceSelectors: []searchv1alpha1.ResourceSelector{
podSelector,
podSelector,
proxytest.PodSelector,
proxytest.PodSelector,
},
},
},
@@ -241,7 +224,7 @@ func TestController_reconcile(t *testing.T) {
ClusterNames: []string{"cluster1"},
},
ResourceSelectors: []searchv1alpha1.ResourceSelector{
podSelector,
proxytest.PodSelector,
},
},
},
@@ -252,7 +235,7 @@ func TestController_reconcile(t *testing.T) {
ClusterNames: []string{"cluster1"},
},
ResourceSelectors: []searchv1alpha1.ResourceSelector{
podSelector,
proxytest.PodSelector,
},
},
},
@@ -288,10 +271,10 @@ func TestController_reconcile(t *testing.T) {
karmadaFactory := karmadainformers.NewSharedInformerFactory(karmadaClientset, 0)
ctl := &Controller{
restMapper: restMapper,
restMapper: proxytest.RestMapper,
clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(),
registryLister: karmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
store: &cacheFuncs{
store: &proxytest.MockStore{
UpdateCacheFunc: func(m map[string]map[schema.GroupVersionResource]struct{}) error {
for clusterName, resources := range m {
resourceNames := make([]string, 0, len(resources))
@@ -324,148 +307,165 @@ func TestController_reconcile(t *testing.T) {
}
}
func TestController_Connect(t *testing.T) {
var karmadaProxying, clusterProxying, cacheProxying bool
ctl := &Controller{
karmadaProxy: connectFunc(func(context.Context, schema.GroupVersionResource, string, rest.Responder) (http.Handler, error) {
return http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
karmadaProxying = true
}), nil
}),
cacheProxy: connectFunc(func(context.Context, schema.GroupVersionResource, string, rest.Responder) (http.Handler, error) {
return http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
cacheProxying = true
}), nil
}),
clusterProxy: connectFunc(func(context.Context, schema.GroupVersionResource, string, rest.Responder) (http.Handler, error) {
return http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
clusterProxying = true
}), nil
}),
store: &cacheFuncs{
HasResourceFunc: func(gvr schema.GroupVersionResource) bool { return gvr == podGVR },
},
type mockPlugin struct {
TheOrder int
IsSupportRequest bool
Called bool
}
var _ framework.Plugin = (*mockPlugin)(nil)
func (r *mockPlugin) Order() int {
return r.TheOrder
}
func (r *mockPlugin) SupportRequest(request framework.ProxyRequest) bool {
return r.IsSupportRequest
}
func (r *mockPlugin) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) {
return http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
r.Called = true
}), nil
}
func convertPluginSlice(in []*mockPlugin) []framework.Plugin {
out := make([]framework.Plugin, 0, len(in))
for _, plugin := range in {
out = append(out, plugin)
}
type args struct {
path string
}
type want struct {
karmadaProxying, clusterProxying, cacheProxying bool
return out
}
func TestController_Connect(t *testing.T) {
store := &proxytest.MockStore{
HasResourceFunc: func(gvr schema.GroupVersionResource) bool { return gvr == proxytest.PodGVR },
}
tests := []struct {
name string
args args
want want
name string
plugins []*mockPlugin
wantErr bool
wantCalled []bool
}{
{
name: "get api from karmada",
args: args{
path: "/api",
},
want: want{
karmadaProxying: true,
name: "call first",
plugins: []*mockPlugin{
{
TheOrder: 0,
IsSupportRequest: true,
},
{
TheOrder: 1,
IsSupportRequest: true,
},
},
wantErr: false,
wantCalled: []bool{true, false},
},
{
name: "get event api karmada",
args: args{
path: "/apis/events.k8s.io/v1",
},
want: want{
karmadaProxying: true,
name: "call second",
plugins: []*mockPlugin{
{
TheOrder: 0,
IsSupportRequest: false,
},
{
TheOrder: 1,
IsSupportRequest: true,
},
},
wantErr: false,
wantCalled: []bool{false, true},
},
{
name: "list nodes from karmada",
args: args{
path: "/api/v1/nodes",
},
want: want{
karmadaProxying: true,
},
},
{
name: "get node from karmada",
args: args{
path: "/api/v1/nodes",
},
want: want{
karmadaProxying: true,
},
},
{
name: "list pod from cache",
args: args{
path: "/api/v1/pods",
},
want: want{
cacheProxying: true,
},
},
{
name: "list pod from cache with namespace",
args: args{
path: "/api/v1/namespaces/default/pods",
},
want: want{
cacheProxying: true,
},
},
{
name: "get pod from cache",
args: args{
path: "/api/v1/namespaces/default/pods/foo",
},
want: want{
cacheProxying: true,
},
},
{
name: "get pod log from cluster",
args: args{
path: "/api/v1/namespaces/default/pods/foo/log",
},
want: want{
clusterProxying: true,
name: "call fail",
plugins: []*mockPlugin{
{
TheOrder: 0,
IsSupportRequest: false,
},
{
TheOrder: 1,
IsSupportRequest: false,
},
},
wantErr: true,
wantCalled: []bool{false, false},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
karmadaProxying, clusterProxying, cacheProxying = false, false, false
conn, err := ctl.Connect(context.TODO(), tt.args.path, nil)
if err != nil {
t.Error(err)
return
ctl := &Controller{
proxy: pluginruntime.NewFramework(convertPluginSlice(tt.plugins)),
negotiatedSerializer: scheme.Codecs.WithoutConversion(),
store: store,
}
req, err := http.NewRequest("GET", "/prefix"+tt.args.path, nil)
if err != nil {
t.Error(err)
return
}
conn.ServeHTTP(httptest.NewRecorder(), req)
if karmadaProxying != tt.want.karmadaProxying {
t.Errorf("karmadaProxying get = %v, want = %v", karmadaProxying, tt.want.karmadaProxying)
conn, err := ctl.Connect(context.TODO(), "/api/v1/pods", nil)
if err != nil {
t.Fatal(err)
}
if cacheProxying != tt.want.cacheProxying {
t.Errorf("cacheProxying get = %v, want = %v", cacheProxying, tt.want.cacheProxying)
req, err := http.NewRequest("GET", "/prefix/api/v1/pods", nil)
if err != nil {
t.Fatal(err)
}
if clusterProxying != tt.want.clusterProxying {
t.Errorf("clusterProxying get = %v, want = %v", clusterProxying, tt.want.clusterProxying)
recorder := httptest.NewRecorder()
conn.ServeHTTP(recorder, req)
response := recorder.Result()
fmt.Printf("response: %v", response)
if (response.StatusCode != 200) != tt.wantErr {
t.Errorf("http request returned status code = %v, want error = %v",
response.StatusCode, tt.wantErr)
}
if len(tt.plugins) != len(tt.wantCalled) {
panic("len(tt.plugins) != len(tt.wantCalled), please fix test cases")
}
for i, n := 0, len(tt.plugins); i < n; i++ {
if tt.plugins[i].Called != tt.wantCalled[i] {
t.Errorf("plugin[%v].Called = %v, want = %v", i, tt.plugins[i].Called, tt.wantCalled[i])
}
}
})
}
}
type failPlugin struct{}
var _ framework.Plugin = (*failPlugin)(nil)
func (r *failPlugin) Order() int {
return 0
}
func (r *failPlugin) SupportRequest(request framework.ProxyRequest) bool {
return true
}
func (r *failPlugin) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) {
return nil, fmt.Errorf("test")
}
func TestController_Connect_Error(t *testing.T) {
store := &proxytest.MockStore{
HasResourceFunc: func(gvr schema.GroupVersionResource) bool {
return gvr == proxytest.PodGVR
},
}
plugins := []framework.Plugin{&failPlugin{}}
ctl := &Controller{
karmadaProxy: connectFunc(func(context.Context, schema.GroupVersionResource, string, rest.Responder) (http.Handler, error) {
return nil, fmt.Errorf("test")
}),
proxy: pluginruntime.NewFramework(plugins),
store: store,
negotiatedSerializer: scheme.Codecs.WithoutConversion(),
}
@@ -491,287 +491,8 @@ func TestController_Connect_Error(t *testing.T) {
}
}
func TestController_dynamicClientForCluster(t *testing.T) {
// copy from go/src/net/http/internal/testcert/testcert.go
testCA := []byte(`-----BEGIN CERTIFICATE-----
MIIDOTCCAiGgAwIBAgIQSRJrEpBGFc7tNb1fb5pKFzANBgkqhkiG9w0BAQsFADAS
MRAwDgYDVQQKEwdBY21lIENvMCAXDTcwMDEwMTAwMDAwMFoYDzIwODQwMTI5MTYw
MDAwWjASMRAwDgYDVQQKEwdBY21lIENvMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEA6Gba5tHV1dAKouAaXO3/ebDUU4rvwCUg/CNaJ2PT5xLD4N1Vcb8r
bFSW2HXKq+MPfVdwIKR/1DczEoAGf/JWQTW7EgzlXrCd3rlajEX2D73faWJekD0U
aUgz5vtrTXZ90BQL7WvRICd7FlEZ6FPOcPlumiyNmzUqtwGhO+9ad1W5BqJaRI6P
YfouNkwR6Na4TzSj5BrqUfP0FwDizKSJ0XXmh8g8G9mtwxOSN3Ru1QFc61Xyeluk
POGKBV/q6RBNklTNe0gI8usUMlYyoC7ytppNMW7X2vodAelSu25jgx2anj9fDVZu
h7AXF5+4nJS4AAt0n1lNY7nGSsdZas8PbQIDAQABo4GIMIGFMA4GA1UdDwEB/wQE
AwICpDATBgNVHSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MB0GA1Ud
DgQWBBStsdjh3/JCXXYlQryOrL4Sh7BW5TAuBgNVHREEJzAlggtleGFtcGxlLmNv
bYcEfwAAAYcQAAAAAAAAAAAAAAAAAAAAATANBgkqhkiG9w0BAQsFAAOCAQEAxWGI
5NhpF3nwwy/4yB4i/CwwSpLrWUa70NyhvprUBC50PxiXav1TeDzwzLx/o5HyNwsv
cxv3HdkLW59i/0SlJSrNnWdfZ19oTcS+6PtLoVyISgtyN6DpkKpdG1cOkW3Cy2P2
+tK/tKHRP1Y/Ra0RiDpOAmqn0gCOFGz8+lqDIor/T7MTpibL3IxqWfPrvfVRHL3B
grw/ZQTTIVjjh4JBSW3WyWgNo/ikC1lrVxzl4iPUGptxT36Cr7Zk2Bsg0XqwbOvK
5d+NTDREkSnUbie4GeutujmX3Dsx88UiV6UY/4lHJa6I5leHUNOHahRbpbWeOfs/
WkBKOclmOV2xlTVuPw==
-----END CERTIFICATE-----`)
type args struct {
clusters []runtime.Object
secrets []runtime.Object
}
type want struct {
err error
}
tests := []struct {
name string
args args
want want
}{
{
name: "cluster not found",
args: args{
clusters: nil,
secrets: nil,
},
want: want{
err: apierrors.NewNotFound(schema.GroupResource{Resource: "cluster", Group: "cluster.karmada.io"}, "test"),
},
},
{
name: "api endpoint is empty",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{},
}},
secrets: nil,
},
want: want{
err: errors.New("the api endpoint of cluster test is empty"),
},
},
{
name: "secret is empty",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
},
}},
secrets: nil,
},
want: want{
err: errors.New("cluster test does not have a secret"),
},
},
{
name: "secret not found",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
},
}},
secrets: nil,
},
want: want{
err: errors.New(`secret "test_secret" not found`),
},
},
{
name: "token not found",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
},
}},
secrets: []runtime.Object{&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test_secret",
},
Data: map[string][]byte{},
}},
},
want: want{
err: errors.New(`the secret for cluster test is missing a non-empty value for "token"`),
},
},
{
name: "success",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
},
}},
secrets: []runtime.Object{&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test_secret",
},
Data: map[string][]byte{
clusterv1alpha1.SecretTokenKey: []byte("test_token"),
clusterv1alpha1.SecretCADataKey: testCA,
},
}},
},
want: want{
err: nil,
},
},
{
name: "has proxy",
args: args{
clusters: []runtime.Object{&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://localhost",
SecretRef: &clusterv1alpha1.LocalSecretReference{
Namespace: "default",
Name: "test_secret",
},
ProxyURL: "https://localhost",
},
}},
secrets: []runtime.Object{&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test_secret",
},
Data: map[string][]byte{
clusterv1alpha1.SecretTokenKey: []byte("test_token"),
clusterv1alpha1.SecretCADataKey: testCA,
},
}},
},
want: want{
err: nil,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
kubeClient := fake.NewSimpleClientset(tt.args.secrets...)
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
karmadaClient := karmadafake.NewSimpleClientset(tt.args.clusters...)
karmadaFactory := karmadainformers.NewSharedInformerFactory(karmadaClient, 0)
ctrl := &Controller{
clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(),
secretLister: kubeFactory.Core().V1().Secrets().Lister(),
}
stopCh := make(chan struct{})
defer close(stopCh)
karmadaFactory.Start(stopCh)
karmadaFactory.WaitForCacheSync(stopCh)
kubeFactory.Start(stopCh)
kubeFactory.WaitForCacheSync(stopCh)
client, err := ctrl.dynamicClientForCluster("test")
if !errorEquals(err, tt.want.err) {
t.Errorf("got error %v, want %v", err, tt.want.err)
return
}
if err != nil {
return
}
if client == nil {
t.Error("got client nil")
}
})
}
}
func errorEquals(a, b error) bool {
if a == b {
return true
}
if a == nil || b == nil {
return false
}
return a.Error() == b.Error()
}
type connectFunc func(ctx context.Context, gvr schema.GroupVersionResource, proxyPath string, responder rest.Responder) (http.Handler, error)
func (c connectFunc) connect(ctx context.Context, gvr schema.GroupVersionResource, proxyPath string, responder rest.Responder) (http.Handler, error) {
return c(ctx, gvr, proxyPath, responder)
}
func newCluster(name string) *clusterv1alpha1.Cluster {
c := &clusterv1alpha1.Cluster{}
c.Name = name
return c
}
type cacheFuncs struct {
UpdateCacheFunc func(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error
HasResourceFunc func(resource schema.GroupVersionResource) bool
GetResourceFromCacheFunc func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error)
StopFunc func()
}
var _ store.Cache = &cacheFuncs{}
func (c *cacheFuncs) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error {
if c.UpdateCacheFunc == nil {
panic("implement me")
}
return c.UpdateCacheFunc(resourcesByCluster)
}
func (c *cacheFuncs) HasResource(resource schema.GroupVersionResource) bool {
if c.HasResourceFunc == nil {
panic("implement me")
}
return c.HasResourceFunc(resource)
}
func (c *cacheFuncs) GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) {
if c.GetResourceFromCacheFunc == nil {
panic("implement me")
}
return c.GetResourceFromCacheFunc(ctx, gvr, namespace, name)
}
func (c *cacheFuncs) Stop() {
if c.StopFunc != nil {
c.StopFunc()
}
}

View File

@@ -0,0 +1,68 @@
package framework
import (
"context"
"net/http"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
)
// Proxy connects to a backend
type Proxy interface {
// Connect returns an http.Handler used to connect to the backend.
Connect(ctx context.Context, request ProxyRequest) (http.Handler, error)
}
// Plugin for the proxy plugin framework
type Plugin interface {
Proxy
/*
* Order should return a constant int value. Smaller int value means higher priority.
*
* # Why does order matter?
*
* The proxy plugin framework uses a variant of "Chain of Responsibility Pattern".
* Only one plugin is selected. A smaller order value means the plugin gets the chance to
* handle the request first. Only if the preceding plugin decides not to process the request
* does the next plugin get the chance to process it.
*
* # Pattern Language Explanation
*
* "Chain of Responsibility Pattern" From "Design Patterns - Elements of Reusable Object-Oriented Software"
*
* Avoid coupling the sender of a request to its receiver by giving more than one object a chance
* to handle the request. Chain the receiving objects and pass the request along the chain until
* an object handles it.
*
* "Pipes and Filters Pattern" From "Pattern-Oriented Software Architecture Vol.1"
*
* The Pipes and Filters architectural pattern provides a structure for systems that process
* a stream of data. Each processing step is encapsulated in a filter component. Data is passed
* through pipes between adjacent filters. Recombining filters allows you to build families of
* related systems.
*
* Note the difference between "Chain of Responsibility Pattern" and "Pipes and Filters Pattern".
* Some other plugin frameworks in Kubernetes use the "Pipes and Filters Pattern";
* they run multiple filters together, so order may not be as important as in the
* "Chain of Responsibility Pattern".
*/
Order() int
// SupportRequest returns true if this plugin supports the request, false otherwise.
// If this method returns false, the request skips this plugin.
SupportRequest(request ProxyRequest) bool
}
// ProxyRequest holds the parameters for Proxy.Connect()
type ProxyRequest struct {
RequestInfo *request.RequestInfo
GroupVersionResource schema.GroupVersionResource
ProxyPath string
Responder rest.Responder
HTTPReq *http.Request
}
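To illustrate the contract, a minimal hypothetical plugin against these interfaces (not part of this commit; order 0 and the fixed response are illustrative only):

```go
package example

import (
	"context"
	"net/http"

	"github.com/karmada-io/karmada/pkg/search/proxy/framework"
)

// noopPlugin claims every resource request and answers 200 OK.
type noopPlugin struct{}

var _ framework.Plugin = (*noopPlugin)(nil)

// Order 0 runs before the in-tree plugins, which leave gaps
// between their order values for exactly this purpose.
func (p *noopPlugin) Order() int { return 0 }

// SupportRequest claims resource requests only; everything else
// falls through to the next plugin in the chain.
func (p *noopPlugin) SupportRequest(request framework.ProxyRequest) bool {
	return request.RequestInfo.IsResourceRequest
}

// Connect returns the handler that serves the claimed request.
func (p *noopPlugin) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) {
	return http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
		rw.WriteHeader(http.StatusOK)
	}), nil
}
```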

View File

@@ -1,4 +1,4 @@
package proxy
package cache
import (
"context"
@@ -13,42 +13,66 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/kubernetes/scheme"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
)
// cacheProxy caches resource from member clusters, and handle the read request(get/list/watch) for resources.
type cacheProxy struct {
store store.RESTReader
const (
// We keep a big gap between in-tree plugins, to allow users to insert custom plugins between them.
order = 1000
)
// Cache caches resources from member clusters and handles read requests (get/list/watch) for them.
// Read requests are redirected to the cache.
// Users issue these requests to read resources in member clusters, such as pods and nodes.
type Cache struct {
store store.Store
restMapper meta.RESTMapper
minRequestTimeout time.Duration
}
func newCacheProxy(store store.RESTReader, restMapper meta.RESTMapper, minRequestTimeout time.Duration) *cacheProxy {
return &cacheProxy{
store: store,
restMapper: restMapper,
minRequestTimeout: minRequestTimeout,
}
var _ framework.Plugin = (*Cache)(nil)
// New creates an instance of Cache
func New(dep pluginruntime.PluginDependency) (framework.Plugin, error) {
return &Cache{
store: dep.Store,
restMapper: dep.RestMapper,
minRequestTimeout: dep.MinRequestTimeout,
}, nil
}
func (c *cacheProxy) connect(ctx context.Context, _ schema.GroupVersionResource, _ string, _ rest.Responder) (http.Handler, error) {
requestInfo, _ := request.RequestInfoFrom(ctx)
gvr := schema.GroupVersionResource{
Group: requestInfo.APIGroup,
Version: requestInfo.APIVersion,
Resource: requestInfo.Resource,
}
// Order implements Plugin
func (c *Cache) Order() int {
return order
}
// SupportRequest implements Plugin
func (c *Cache) SupportRequest(request framework.ProxyRequest) bool {
requestInfo := request.RequestInfo
return requestInfo.IsResourceRequest &&
c.store.HasResource(request.GroupVersionResource) &&
requestInfo.Subresource == "" &&
(requestInfo.Verb == "get" ||
requestInfo.Verb == "list" ||
requestInfo.Verb == "watch")
}
// Connect implements Plugin
func (c *Cache) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) {
requestInfo := request.RequestInfo
r := &rester{
store: c.store,
gvr: gvr,
tableConvertor: rest.NewDefaultTableConvertor(gvr.GroupResource()),
gvr: request.GroupVersionResource,
tableConvertor: rest.NewDefaultTableConvertor(request.GroupVersionResource.GroupResource()),
}
gvk, err := c.restMapper.KindFor(gvr)
gvk, err := c.restMapper.KindFor(request.GroupVersionResource)
if err != nil {
return nil, err
}
@@ -59,7 +83,7 @@ func (c *cacheProxy) connect(ctx context.Context, _ schema.GroupVersionResource,
scope := &handlers.RequestScope{
Kind: gvk,
Resource: gvr,
Resource: request.GroupVersionResource,
Namer: &handlers.ContextBasedNaming{
Namer: meta.NewAccessor(),
ClusterScoped: mapping.Scope.Name() == meta.RESTScopeNameRoot,
@@ -81,7 +105,7 @@ func (c *cacheProxy) connect(ctx context.Context, _ schema.GroupVersionResource,
}
type rester struct {
store store.RESTReader
store store.Store
gvr schema.GroupVersionResource
tableConvertor rest.TableConvertor
}
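A sketch of what SupportRequest above accepts and rejects, written as a test in this package; it assumes the proxytest helpers (MockStore, PodGVR) behave as used in the other tests in this commit:

```go
package cache

import (
	"testing"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apiserver/pkg/endpoints/request"

	"github.com/karmada-io/karmada/pkg/search/proxy/framework"
	proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
)

func TestCacheSupportRequestSketch(t *testing.T) {
	c := &Cache{
		// Only pods are reported as cached.
		store: &proxytest.MockStore{
			HasResourceFunc: func(gvr schema.GroupVersionResource) bool { return gvr == proxytest.PodGVR },
		},
	}
	newReq := func(verb, subresource string) framework.ProxyRequest {
		return framework.ProxyRequest{
			RequestInfo: &request.RequestInfo{
				IsResourceRequest: true,
				Verb:              verb,
				Resource:          "pods",
				Subresource:       subresource,
			},
			GroupVersionResource: proxytest.PodGVR,
		}
	}
	if !c.SupportRequest(newReq("get", "")) {
		t.Error("plain reads of a cached resource should be served from the cache")
	}
	if c.SupportRequest(newReq("create", "")) {
		t.Error("writes must fall through to the next plugin")
	}
	if c.SupportRequest(newReq("get", "log")) {
		t.Error("subresource requests must fall through to the next plugin")
	}
}
```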

View File

@@ -1,4 +1,4 @@
package proxy
package cache
import (
"context"
@ -10,7 +10,6 @@ import (
"time"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@ -21,18 +20,11 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
"github.com/karmada-io/karmada/pkg/util/lifted"
)
var (
podGVR = corev1.SchemeGroupVersion.WithResource("pods")
nodeGVR = corev1.SchemeGroupVersion.WithResource("nodes")
secretGVR = corev1.SchemeGroupVersion.WithResource("secret")
clusterGVR = clusterv1alpha1.SchemeGroupVersion.WithResource("cluster")
)
func TestCacheProxy_connect(t *testing.T) {
type args struct {
url string
@ -46,8 +38,8 @@ func TestCacheProxy_connect(t *testing.T) {
}
var actual want
p := &cacheProxy{
store: &restReaderFuncs{
p := &Cache{
store: &proxytest.MockStore{
GetFunc: func(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) {
actual = want{}
actual.namespace = request.NamespaceValue(ctx)
@ -76,7 +68,7 @@ func TestCacheProxy_connect(t *testing.T) {
return w, nil
},
},
restMapper: restMapper,
restMapper: proxytest.RestMapper,
}
tests := []struct {
name string
@ -90,7 +82,7 @@ func TestCacheProxy_connect(t *testing.T) {
},
want: want{
name: "foo",
gvr: nodeGVR,
gvr: proxytest.NodeGVR,
getOptions: &metav1.GetOptions{},
},
},
@ -102,7 +94,7 @@ func TestCacheProxy_connect(t *testing.T) {
want: want{
namespace: "default",
name: "foo",
gvr: podGVR,
gvr: proxytest.PodGVR,
getOptions: &metav1.GetOptions{},
},
},
@ -114,7 +106,7 @@ func TestCacheProxy_connect(t *testing.T) {
want: want{
namespace: "default",
name: "foo",
gvr: podGVR,
gvr: proxytest.PodGVR,
getOptions: &metav1.GetOptions{ResourceVersion: "1000"},
},
},
@ -124,7 +116,7 @@ func TestCacheProxy_connect(t *testing.T) {
url: "/api/v1/nodes",
},
want: want{
gvr: nodeGVR,
gvr: proxytest.NodeGVR,
listOptions: &metainternalversion.ListOptions{},
},
},
@ -135,7 +127,7 @@ func TestCacheProxy_connect(t *testing.T) {
},
want: want{
namespace: "default",
gvr: podGVR,
gvr: proxytest.PodGVR,
listOptions: &metainternalversion.ListOptions{},
},
},
@ -146,7 +138,7 @@ func TestCacheProxy_connect(t *testing.T) {
},
want: want{
namespace: "default",
gvr: podGVR,
gvr: proxytest.PodGVR,
listOptions: &metainternalversion.ListOptions{
LabelSelector: asLabelSelector("app=foo"),
FieldSelector: fields.OneTermEqualSelector("metadata.name", "bar"),
@ -161,7 +153,7 @@ func TestCacheProxy_connect(t *testing.T) {
url: "/api/v1/nodes?watch=true",
},
want: want{
gvr: nodeGVR,
gvr: proxytest.NodeGVR,
listOptions: &metainternalversion.ListOptions{
LabelSelector: labels.NewSelector(),
FieldSelector: fields.Everything(),
@ -176,7 +168,7 @@ func TestCacheProxy_connect(t *testing.T) {
},
want: want{
namespace: "default",
gvr: podGVR,
gvr: proxytest.PodGVR,
listOptions: &metainternalversion.ListOptions{
LabelSelector: labels.NewSelector(),
FieldSelector: fields.Everything(),
@ -191,7 +183,7 @@ func TestCacheProxy_connect(t *testing.T) {
},
want: want{
namespace: "default",
gvr: podGVR,
gvr: proxytest.PodGVR,
listOptions: &metainternalversion.ListOptions{
LabelSelector: asLabelSelector("app=foo"),
FieldSelector: fields.OneTermEqualSelector("metadata.name", "bar"),
@ -219,7 +211,19 @@ func TestCacheProxy_connect(t *testing.T) {
req = req.WithContext(request.WithNamespace(req.Context(), requestInfo.Namespace))
}
h, err := p.connect(req.Context(), podGVR, "", nil)
gvr := schema.GroupVersionResource{
Group: requestInfo.APIGroup,
Version: requestInfo.APIVersion,
Resource: requestInfo.Resource,
}
h, err := p.Connect(req.Context(), framework.ProxyRequest{
RequestInfo: requestInfo,
GroupVersionResource: gvr,
ProxyPath: "",
Responder: nil,
HTTPReq: req,
})
if err != nil {
t.Error(err)
return
@ -244,35 +248,6 @@ func TestCacheProxy_connect(t *testing.T) {
}
}
type restReaderFuncs struct {
GetFunc func(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error)
ListFunc func(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error)
WatchFunc func(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error)
}
var _ store.RESTReader = &restReaderFuncs{}
func (r *restReaderFuncs) Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) {
if r.GetFunc == nil {
panic("implement me")
}
return r.GetFunc(ctx, gvr, name, options)
}
func (r *restReaderFuncs) List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error) {
if r.GetFunc == nil {
panic("implement me")
}
return r.ListFunc(ctx, gvr, options)
}
func (r *restReaderFuncs) Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) {
if r.GetFunc == nil {
panic("implement me")
}
return r.WatchFunc(ctx, gvr, options)
}
type emptyResponseWriter struct{}
var _ http.ResponseWriter = &emptyResponseWriter{}

View File

@ -1,58 +1,76 @@
package proxy
package cluster
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"path"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
listcorev1 "k8s.io/client-go/listers/core/v1"
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
"github.com/karmada-io/karmada/pkg/util/proxy"
)
// clusterProxy proxy to member clusters
type clusterProxy struct {
store store.Cache
const (
// We keep a big gap between in-tree plugins, to allow users to insert custom plugins between them.
order = 2000
)
// Cluster proxies the remaining requests to member clusters:
// - requests that write resources;
// - subresource requests, e.g. `pods/log`.
// We first look the resource up in the cache to find the cluster it resides in, then redirect the request to that cluster.
type Cluster struct {
store store.Store
clusterLister clusterlisters.ClusterLister
secretLister listcorev1.SecretLister
}
func newClusterProxy(store store.Cache, clusterLister clusterlisters.ClusterLister, secretLister listcorev1.SecretLister) *clusterProxy {
return &clusterProxy{
store: store,
var _ framework.Plugin = (*Cluster)(nil)
// New creates an instance of Cluster
func New(dep pluginruntime.PluginDependency) (framework.Plugin, error) {
secretLister := dep.KubeFactory.Core().V1().Secrets().Lister()
clusterLister := dep.KarmadaFactory.Cluster().V1alpha1().Clusters().Lister()
return &Cluster{
store: dep.Store,
clusterLister: clusterLister,
secretLister: secretLister,
}
}, nil
}
func (c *clusterProxy) connect(ctx context.Context, gvr schema.GroupVersionResource, proxyPath string, responder rest.Responder) (http.Handler, error) {
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
return nil, fmt.Errorf("missing requestInfo")
}
// Order implements Plugin
func (c *Cluster) Order() int {
return order
}
// SupportRequest implements Plugin
func (c *Cluster) SupportRequest(request framework.ProxyRequest) bool {
return request.RequestInfo.IsResourceRequest && c.store.HasResource(request.GroupVersionResource)
}
// Connect implements Plugin
func (c *Cluster) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) {
requestInfo := request.RequestInfo
if requestInfo.Verb == "create" {
return nil, apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb)
return nil, apierrors.NewMethodNotSupported(request.GroupVersionResource.GroupResource(), requestInfo.Verb)
}
_, clusterName, err := c.store.GetResourceFromCache(ctx, gvr, requestInfo.Namespace, requestInfo.Name)
_, clusterName, err := c.store.GetResourceFromCache(ctx, request.GroupVersionResource, requestInfo.Namespace, requestInfo.Name)
if err != nil {
return nil, err
}
h, err := c.connectCluster(ctx, clusterName, proxyPath, responder)
h, err := proxy.ConnectCluster(ctx, c.clusterLister, c.secretLister, clusterName, request.ProxyPath, request.Responder)
if err != nil {
return nil, err
}
@ -65,33 +83,13 @@ func (c *clusterProxy) connect(ctx context.Context, gvr schema.GroupVersionResou
// So before update, we shall recover these fields.
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if err = modifyRequest(req, clusterName); err != nil {
responder.Error(err)
request.Responder.Error(err)
return
}
h.ServeHTTP(rw, req)
}), nil
}
func (c *clusterProxy) connectCluster(ctx context.Context, clusterName string, proxyPath string, responder rest.Responder) (http.Handler, error) {
cluster, err := c.clusterLister.Get(clusterName)
if err != nil {
return nil, err
}
location, transport, err := proxy.Location(cluster.Name, cluster.Spec.APIEndpoint, cluster.Spec.ProxyURL)
if err != nil {
return nil, err
}
location.Path = path.Join(location.Path, proxyPath)
secretGetter := func(context.Context, string) (*corev1.Secret, error) {
if cluster.Spec.ImpersonatorSecretRef == nil {
return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name)
}
return c.secretLister.Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(cluster.Spec.ImpersonatorSecretRef.Name)
}
return proxy.ConnectCluster(ctx, cluster.Name, location, transport, responder, secretGetter)
}
func modifyRequest(req *http.Request, cluster string) error {
if req.ContentLength == 0 {
return nil

View File

@ -1,4 +1,4 @@
package proxy
package cluster
import (
"bytes"
@ -30,7 +30,9 @@ import (
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake"
karmadainformers "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
)
func TestModifyRequest(t *testing.T) {
@ -167,13 +169,13 @@ func Test_clusterProxy_connect(t *testing.T) {
reqCtx := request.WithUser(context.TODO(), &user.DefaultInfo{})
type fields struct {
store store.Cache
store store.Store
secrets []runtime.Object
clusters []runtime.Object
}
type args struct {
ctx context.Context
request *http.Request
requestInfo *request.RequestInfo
request *http.Request
}
type want struct {
err error
@ -185,37 +187,27 @@ func Test_clusterProxy_connect(t *testing.T) {
args args
want want
}{
{
name: "missing requestInfo",
fields: fields{},
args: args{
ctx: context.TODO(),
},
want: want{
err: errors.New("missing requestInfo"),
},
},
{
name: "create not supported",
fields: fields{},
args: args{
ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "create"}),
requestInfo: &request.RequestInfo{Verb: "create"},
},
want: want{
err: apierrors.NewMethodNotSupported(podGVR.GroupResource(), "create"),
err: apierrors.NewMethodNotSupported(proxytest.PodGVR.GroupResource(), "create"),
},
},
{
name: "get cache error",
fields: fields{
store: &cacheFuncs{
store: &proxytest.MockStore{
GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) {
return nil, "", errors.New("test error")
},
},
},
args: args{
ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}),
requestInfo: &request.RequestInfo{Verb: "get"},
},
want: want{
err: errors.New("test error"),
@ -224,23 +216,23 @@ func Test_clusterProxy_connect(t *testing.T) {
{
name: "cluster not found",
fields: fields{
store: &cacheFuncs{
store: &proxytest.MockStore{
GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) {
return nil, "cluster1", nil
},
},
},
args: args{
ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}),
requestInfo: &request.RequestInfo{Verb: "get"},
},
want: want{
err: apierrors.NewNotFound(clusterGVR.GroupResource(), "cluster1"),
err: apierrors.NewNotFound(proxytest.ClusterGVR.GroupResource(), "cluster1"),
},
},
{
name: "API endpoint of cluster cluster1 should not be empty",
fields: fields{
store: &cacheFuncs{
store: &proxytest.MockStore{
GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) {
return nil, "cluster1", nil
},
@ -251,7 +243,7 @@ func Test_clusterProxy_connect(t *testing.T) {
}},
},
args: args{
ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}),
requestInfo: &request.RequestInfo{Verb: "get"},
},
want: want{
err: errors.New("API endpoint of cluster cluster1 should not be empty"),
@ -260,7 +252,7 @@ func Test_clusterProxy_connect(t *testing.T) {
{
name: "impersonatorSecretRef is nil",
fields: fields{
store: &cacheFuncs{
store: &proxytest.MockStore{
GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) {
return nil, "cluster1", nil
},
@ -273,7 +265,7 @@ func Test_clusterProxy_connect(t *testing.T) {
}},
},
args: args{
ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}),
requestInfo: &request.RequestInfo{Verb: "get"},
},
want: want{
err: errors.New("the impersonatorSecretRef of cluster cluster1 is nil"),
@ -282,7 +274,7 @@ func Test_clusterProxy_connect(t *testing.T) {
{
name: "secret not found",
fields: fields{
store: &cacheFuncs{
store: &proxytest.MockStore{
GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) {
return nil, "cluster1", nil
},
@ -299,16 +291,16 @@ func Test_clusterProxy_connect(t *testing.T) {
}},
},
args: args{
ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}),
requestInfo: &request.RequestInfo{Verb: "get"},
},
want: want{
err: apierrors.NewNotFound(secretGVR.GroupResource(), "secret"),
err: apierrors.NewNotFound(proxytest.SecretGVR.GroupResource(), "secret"),
},
},
{
name: "response ok",
fields: fields{
store: &cacheFuncs{
store: &proxytest.MockStore{
GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) {
return nil, "cluster1", nil
},
@ -334,8 +326,8 @@ func Test_clusterProxy_connect(t *testing.T) {
}},
},
args: args{
ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}),
request: makeRequest(reqCtx, "GET", "/test", nil),
requestInfo: &request.RequestInfo{Verb: "get"},
request: makeRequest(reqCtx, "GET", "/test", nil),
},
want: want{
err: nil,
@ -345,7 +337,7 @@ func Test_clusterProxy_connect(t *testing.T) {
{
name: "update error",
fields: fields{
store: &cacheFuncs{
store: &proxytest.MockStore{
GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) {
return nil, "cluster1", nil
},
@ -371,7 +363,7 @@ func Test_clusterProxy_connect(t *testing.T) {
}},
},
args: args{
ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "update"}),
requestInfo: &request.RequestInfo{Verb: "update"},
request: (&http.Request{
Method: "PUT",
URL: &url.URL{Scheme: "https", Host: "localhost", Path: "/test"},
@ -393,7 +385,7 @@ func Test_clusterProxy_connect(t *testing.T) {
kubeFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.fields.secrets...), 0)
karmadaFactory := karmadainformers.NewSharedInformerFactory(karmadafake.NewSimpleClientset(tt.fields.clusters...), 0)
c := &clusterProxy{
c := &Cluster{
store: tt.fields.store,
clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(),
secretLister: kubeFactory.Core().V1().Secrets().Lister(),
@ -406,9 +398,15 @@ func Test_clusterProxy_connect(t *testing.T) {
response := httptest.NewRecorder()
h, err := c.connect(tt.args.ctx, podGVR, "/proxy", newTestResponder(response))
if !errorEquals(err, tt.want.err) {
t.Errorf("connect() error = %v, want %v", err, tt.want.err)
h, err := c.Connect(context.TODO(), framework.ProxyRequest{
RequestInfo: tt.args.requestInfo,
GroupVersionResource: proxytest.PodGVR,
ProxyPath: "/proxy",
Responder: proxytest.NewResponder(response),
HTTPReq: tt.args.request,
})
if !proxytest.ErrorMessageEquals(err, tt.want.err) {
t.Errorf("Connect() error = %v, want %v", err, tt.want.err)
return
}
if err != nil {

View File

@ -0,0 +1,80 @@
package karmada
import (
"context"
"net/http"
"net/url"
"path"
restclient "k8s.io/client-go/rest"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
"github.com/karmada-io/karmada/pkg/util/proxy"
)
const (
// We keep a big gap between in-tree plugins, to allow users to insert custom plugins between them.
order = 3000
)
// Karmada proxies requests to the karmada control plane.
// For non-resource requests, or resources not defined in a ResourceRegistry,
// we redirect the requests to the karmada apiserver.
// Usually these requests are:
// - API index requests, e.g. `/api`, `/apis`
// - requests for workloads created in the karmada control plane, such as deployments and services.
type Karmada struct {
proxyLocation *url.URL
proxyTransport http.RoundTripper
}
var _ framework.Plugin = (*Karmada)(nil)
// New creates an instance of Karmada
func New(dep pluginruntime.PluginDependency) (framework.Plugin, error) {
location, err := url.Parse(dep.RestConfig.Host)
if err != nil {
return nil, err
}
transport, err := restclient.TransportFor(dep.RestConfig)
if err != nil {
return nil, err
}
return &Karmada{
proxyLocation: location,
proxyTransport: transport,
}, nil
}
// Order implements Plugin
func (p *Karmada) Order() int {
return order
}
// SupportRequest implements Plugin
func (p *Karmada) SupportRequest(request framework.ProxyRequest) bool {
// This plugin is ordered last, so it effectively acts as a fallback.
// Return true to indicate that every request is supported.
return true
}
// Connect implements Plugin
func (p *Karmada) Connect(_ context.Context, request framework.ProxyRequest) (http.Handler, error) {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
location, transport := p.resourceLocation()
location.Path = path.Join(location.Path, request.ProxyPath)
location.RawQuery = req.URL.RawQuery
handler := proxy.NewThrottledUpgradeAwareProxyHandler(
location, transport, true, false, request.Responder)
handler.ServeHTTP(rw, req)
}), nil
}
func (p *Karmada) resourceLocation() (*url.URL, http.RoundTripper) {
location := *p.proxyLocation
return &location, p.proxyTransport
}
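
Connect above copies the base location before joining the proxy path, so concurrent requests never mutate the shared URL. A stdlib-only illustration of that composition (the host below is a placeholder):

package main

import (
	"fmt"
	"net/url"
	"path"
)

func main() {
	base, err := url.Parse("https://karmada-apiserver.example.com:5443")
	if err != nil {
		panic(err)
	}
	location := *base // shallow copy; the shared base URL is never mutated
	location.Path = path.Join(location.Path, "/apis/apps/v1/deployments")
	location.RawQuery = "limit=500" // carried over from the incoming request
	fmt.Println(location.String())
	// https://karmada-apiserver.example.com:5443/apis/apps/v1/deployments?limit=500
}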

View File

@ -1,15 +1,17 @@
package proxy
package karmada
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"k8s.io/apimachinery/pkg/runtime"
restclient "k8s.io/client-go/rest"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
)
func Test_karmadaProxy(t *testing.T) {
@ -65,14 +67,17 @@ func Test_karmadaProxy(t *testing.T) {
},
Timeout: time.Second * 1,
}
p, err := newKarmadaProxy(restConfig)
p, err := New(pluginruntime.PluginDependency{RestConfig: restConfig})
if err != nil {
t.Error(err)
return
}
response := httptest.NewRecorder()
h, err := p.connect(context.TODO(), podGVR, tt.args.path, newTestResponder(response))
h, err := p.Connect(context.TODO(), framework.ProxyRequest{
ProxyPath: tt.args.path,
Responder: proxytest.NewResponder(response),
})
if err != nil {
t.Error(err)
return
@ -101,28 +106,3 @@ func Test_karmadaProxy(t *testing.T) {
})
}
}
type testResponder struct {
resp *httptest.ResponseRecorder
}
func newTestResponder(response *httptest.ResponseRecorder) *testResponder {
return &testResponder{
resp: response,
}
}
func (f *testResponder) Object(statusCode int, obj runtime.Object) {
f.resp.Code = statusCode
if obj != nil {
err := json.NewEncoder(f.resp).Encode(obj)
if err != nil {
f.Error(err)
}
}
}
func (f *testResponder) Error(err error) {
_, _ = f.resp.WriteString(err.Error())
}

View File

@ -0,0 +1,20 @@
package plugins
import (
cacheplugin "github.com/karmada-io/karmada/pkg/search/proxy/framework/plugins/cache"
clusterplugin "github.com/karmada-io/karmada/pkg/search/proxy/framework/plugins/cluster"
karmadaplugin "github.com/karmada-io/karmada/pkg/search/proxy/framework/plugins/karmada"
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
)
// For detailed information on the in-tree plugins' execution order, see:
// https://github.com/karmada-io/karmada/tree/master/docs/proposals/resource-aggregation-proxy#request-routing
// NewInTreeRegistry builds the registry with all the in-tree plugins.
func NewInTreeRegistry() pluginruntime.Registry {
return pluginruntime.Registry{
cacheplugin.New,
clusterplugin.New,
karmadaplugin.New,
}
}
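
Out-of-tree plugins extend this registry rather than modify it. A hedged sketch, assuming the import paths and interfaces introduced in this PR; auditPlugin and newAuditPlugin are hypothetical names, not karmada code:

package main

import (
	"context"
	"net/http"

	"github.com/karmada-io/karmada/pkg/search/proxy/framework"
	"github.com/karmada-io/karmada/pkg/search/proxy/framework/plugins"
	pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
)

// auditPlugin is a hypothetical out-of-tree plugin. It claims no requests
// (SupportRequest is always false) and only demonstrates the contract.
type auditPlugin struct{}

func (p *auditPlugin) Order() int { return 1500 } // in the gap between cache (1000) and cluster (2000)

func (p *auditPlugin) SupportRequest(request framework.ProxyRequest) bool { return false }

func (p *auditPlugin) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) {
	return nil, nil
}

func newAuditPlugin(dep pluginruntime.PluginDependency) (framework.Plugin, error) {
	return &auditPlugin{}, nil
}

func main() {
	registry := plugins.NewInTreeRegistry()
	registry.Register(newAuditPlugin) // appended to the factory list; runtime order still comes from Order()
}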

View File

@ -0,0 +1,51 @@
package runtime
import (
"context"
"fmt"
"net/http"
"sort"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
)
// frameworkImpl selects the appropriate plugin to handle `Connect()`
type frameworkImpl struct {
plugins []framework.Plugin
}
// frameworkImpl is actually a Proxy
var _ framework.Proxy = (*frameworkImpl)(nil)
// NewFramework creates an instance of framework.Proxy with plugins sorted by Order().
func NewFramework(plugins []framework.Plugin) framework.Proxy {
sort.Slice(plugins, func(i, j int) bool {
return plugins[i].Order() < plugins[j].Order()
})
return &frameworkImpl{
plugins: plugins,
}
}
// Connect implements Proxy
func (c *frameworkImpl) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) {
plugin, err := c.selectPlugin(request)
if err != nil {
return nil, err
}
return plugin.Connect(ctx, request)
}
// selectPlugin returns the first Plugin whose SupportRequest reports true, querying plugins in order.
func (c *frameworkImpl) selectPlugin(request framework.ProxyRequest) (framework.Plugin, error) {
for _, plugin := range c.plugins {
if plugin.SupportRequest(request) {
return plugin, nil
}
}
return nil, fmt.Errorf("no plugin found for request: %v %v",
request.RequestInfo.Verb, request.RequestInfo.Path)
}
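
The selection rule is therefore: sort ascending by Order() once at construction, then the first SupportRequest match wins. A self-contained sketch with local stand-in types:

package main

import (
	"fmt"
	"sort"
)

type fakePlugin struct {
	name    string
	order   int
	support func(verb string) bool
}

// selectByOrder sorts ascending by order, then returns the first plugin
// that supports the verb -- the same rule frameworkImpl applies.
func selectByOrder(plugins []fakePlugin, verb string) (string, bool) {
	sort.Slice(plugins, func(i, j int) bool { return plugins[i].order < plugins[j].order })
	for _, p := range plugins {
		if p.support(verb) {
			return p.name, true
		}
	}
	return "", false
}

func main() {
	plugins := []fakePlugin{
		{name: "karmada", order: 3000, support: func(string) bool { return true }}, // fallback
		{name: "cache", order: 1000, support: func(v string) bool { return v == "get" }},
	}
	fmt.Println(selectByOrder(plugins, "get"))    // cache true
	fmt.Println(selectByOrder(plugins, "create")) // karmada true
}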

View File

@ -0,0 +1,44 @@
package runtime
import (
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/informers"
"k8s.io/client-go/rest"
"github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
)
// PluginDependency holds the dependencies for plugins. It is passed to PluginFactory when initializing a Plugin.
type PluginDependency struct {
RestConfig *rest.Config
RestMapper meta.RESTMapper
KubeFactory informers.SharedInformerFactory
KarmadaFactory externalversions.SharedInformerFactory
MinRequestTimeout time.Duration
Store store.Store
}
// PluginFactory is the function to create a plugin.
type PluginFactory func(dep PluginDependency) (framework.Plugin, error)
// Registry is a collection of all available plugins. The framework uses a
// registry to enable and initialize configured plugins.
// All plugins must be in the registry before initializing the framework.
type Registry []PluginFactory
// Register adds a new plugin to the registry.
func (r *Registry) Register(factory PluginFactory) {
*r = append(*r, factory)
}
// Merge merges the provided registry to the current one.
func (r *Registry) Merge(in Registry) {
*r = append(*r, in...)
}

View File

@ -0,0 +1,28 @@
package runtime
import (
"testing"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
)
func emptyPluginFactory(dep PluginDependency) (framework.Plugin, error) {
return nil, nil
}
func TestRegistry_Register(t *testing.T) {
// test nil slice
var nilSlice Registry
t.Logf("nilSlice: %v, len: %v, cap: %v\n", nilSlice, len(nilSlice), cap(nilSlice))
nilSlice.Register(emptyPluginFactory)
// no panic
t.Logf("nilSlice: %v, len: %v, cap: %v\n", nilSlice, len(nilSlice), cap(nilSlice))
if len(nilSlice) != 1 {
t.Fatalf("slice len = %v, expected = 1", len(nilSlice))
}
}

View File

@ -1,53 +0,0 @@
package proxy
import (
"context"
"net/http"
"net/url"
"path"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/rest"
restclient "k8s.io/client-go/rest"
"github.com/karmada-io/karmada/pkg/util/proxy"
)
// karmadaProxy is proxy for karmada control panel
type karmadaProxy struct {
proxyLocation *url.URL
proxyTransport http.RoundTripper
}
func newKarmadaProxy(restConfig *restclient.Config) (*karmadaProxy, error) {
location, err := url.Parse(restConfig.Host)
if err != nil {
return nil, err
}
transport, err := restclient.TransportFor(restConfig)
if err != nil {
return nil, err
}
return &karmadaProxy{
proxyLocation: location,
proxyTransport: transport,
}, nil
}
// connect to Karmada-ApiServer directly
func (p *karmadaProxy) connect(_ context.Context, _ schema.GroupVersionResource, proxyPath string, responder rest.Responder) (http.Handler, error) {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
location, transport := p.resourceLocation()
location.Path = path.Join(location.Path, proxyPath)
location.RawQuery = req.URL.RawQuery
handler := proxy.NewThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder)
handler.ServeHTTP(rw, req)
}), nil
}
func (p *karmadaProxy) resourceLocation() (*url.URL, http.RoundTripper) {
location := *p.proxyLocation
return &location, p.proxyTransport
}

View File

@ -21,16 +21,13 @@ import (
"k8s.io/klog/v2"
)
// Cache is an interface for the cache.
type Cache interface {
// Store is the cache for resources from multiple member clusters
type Store interface {
UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error
HasResource(resource schema.GroupVersionResource) bool
GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error)
Stop()
}
// RESTReader supports get/list/watch rest.
type RESTReader interface {
Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error)
List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error)
Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error)
@ -46,8 +43,7 @@ type MultiClusterCache struct {
newClientFunc func(string) (dynamic.Interface, error)
}
var _ Cache = &MultiClusterCache{}
var _ RESTReader = &MultiClusterCache{}
var _ Store = &MultiClusterCache{}
// NewMultiClusterCache returns a cache for resources from member clusters
func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error), restMapper meta.RESTMapper) *MultiClusterCache {

View File

@ -0,0 +1,32 @@
package testing
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1"
)
// variables for tests
var (
PodGVK = corev1.SchemeGroupVersion.WithKind("Pod")
NodeGVK = corev1.SchemeGroupVersion.WithKind("Node")
PodGVR = corev1.SchemeGroupVersion.WithResource("pods")
NodeGVR = corev1.SchemeGroupVersion.WithResource("nodes")
SecretGVR = corev1.SchemeGroupVersion.WithResource("secret")
ClusterGVR = clusterv1alpha1.SchemeGroupVersion.WithResource("cluster")
PodSelector = searchv1alpha1.ResourceSelector{APIVersion: PodGVK.GroupVersion().String(), Kind: PodGVK.Kind}
NodeSelector = searchv1alpha1.ResourceSelector{APIVersion: NodeGVK.GroupVersion().String(), Kind: NodeGVK.Kind}
RestMapper *meta.DefaultRESTMapper
)
func init() {
RestMapper = meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
RestMapper.Add(PodGVK, meta.RESTScopeNamespace)
RestMapper.Add(NodeGVK, meta.RESTScopeRoot)
}
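
These fixtures give tests a deterministic mapper: pods are namespace-scoped, nodes cluster-scoped. A standalone check of what such a mapper resolves, mirroring the init above with apimachinery calls only:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	rm := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
	rm.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace)

	gvk, err := rm.KindFor(corev1.SchemeGroupVersion.WithResource("pods"))
	fmt.Println(gvk, err) // /v1, Kind=Pod <nil>

	mapping, _ := rm.RESTMapping(gvk.GroupKind(), gvk.Version)
	fmt.Println(mapping.Scope.Name()) // namespace
}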

View File

@ -0,0 +1,17 @@
package testing
// ErrorMessageEquals compares whether two errors carry the same message:
// 1. nil error == nil error
// 2. nil error != non-nil error
// 3. Otherwise, compare the strings returned by error.Error()
func ErrorMessageEquals(a, b error) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
return a.Error() == b.Error()
}
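
Tests compare messages because an error constructed in a test case and the error produced by the code under test are distinct values, so == (and errors.Is) would report false even when the failures match. For illustration, with a local copy of the helper:

package main

import (
	"errors"
	"fmt"
)

// errorMessageEquals is a local copy of the helper above, for illustration.
func errorMessageEquals(a, b error) bool {
	if a == nil && b == nil {
		return true
	}
	if a == nil || b == nil {
		return false
	}
	return a.Error() == b.Error()
}

func main() {
	fmt.Println(errorMessageEquals(nil, nil))                               // true
	fmt.Println(errorMessageEquals(errors.New("boom"), errors.New("boom"))) // true: same text, distinct values
	fmt.Println(errorMessageEquals(errors.New("boom"), nil))                // false
}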

View File

@ -0,0 +1,37 @@
package testing
import (
"encoding/json"
"net/http/httptest"
"k8s.io/apimachinery/pkg/runtime"
)
// MockResponder is a mock for the rest.Responder interface (k8s.io/apiserver/pkg/registry/rest/rest.go:292)
type MockResponder struct {
resp *httptest.ResponseRecorder
}
// NewResponder creates an instance of MockResponder
func NewResponder(response *httptest.ResponseRecorder) *MockResponder {
return &MockResponder{
resp: response,
}
}
// Object implements Responder interface
func (f *MockResponder) Object(statusCode int, obj runtime.Object) {
f.resp.Code = statusCode
if obj != nil {
err := json.NewEncoder(f.resp).Encode(obj)
if err != nil {
f.Error(err)
}
}
}
// Error implements Responder interface
func (f *MockResponder) Error(err error) {
_, _ = f.resp.WriteString(err.Error())
}

View File

@ -0,0 +1,84 @@
package testing
import (
"context"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
)
// MockStore is a mock for the store.Store interface
type MockStore struct {
UpdateCacheFunc func(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error
HasResourceFunc func(resource schema.GroupVersionResource) bool
GetResourceFromCacheFunc func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error)
StopFunc func()
GetFunc func(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error)
ListFunc func(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error)
WatchFunc func(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error)
}
var _ store.Store = &MockStore{}
// UpdateCache implements store.Store interface
func (c *MockStore) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error {
if c.UpdateCacheFunc == nil {
panic("implement me")
}
return c.UpdateCacheFunc(resourcesByCluster)
}
// HasResource implements store.Store interface
func (c *MockStore) HasResource(resource schema.GroupVersionResource) bool {
if c.HasResourceFunc == nil {
panic("implement me")
}
return c.HasResourceFunc(resource)
}
// GetResourceFromCache implements store.Store interface
func (c *MockStore) GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) {
if c.GetResourceFromCacheFunc == nil {
panic("implement me")
}
return c.GetResourceFromCacheFunc(ctx, gvr, namespace, name)
}
// Stop implements store.Store interface
func (c *MockStore) Stop() {
if c.StopFunc != nil {
c.StopFunc()
}
}
// Get implements store.Store interface
func (c *MockStore) Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) {
if c.GetFunc == nil {
panic("implement me")
}
return c.GetFunc(ctx, gvr, name, options)
}
// List implements store.Store interface
func (c *MockStore) List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error) {
if c.ListFunc == nil {
panic("implement me")
}
return c.ListFunc(ctx, gvr, options)
}
// Watch implements store.Store interface
func (c *MockStore) Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) {
if c.WatchFunc == nil {
panic("implement me")
}
return c.WatchFunc(ctx, gvr, options)
}
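
In tests, only the methods a case exercises are stubbed; any other call panics with "implement me", which surfaces unexpected store usage immediately. A hedged sketch of that pattern, assuming the package path added in this PR (newFailingStore is a hypothetical helper):

package example

import (
	"context"
	"errors"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"

	proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
)

// newFailingStore stubs only Get; calling List or Watch on the returned
// store panics, flagging code paths the test did not expect to hit.
func newFailingStore() *proxytest.MockStore {
	return &proxytest.MockStore{
		GetFunc: func(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) {
			return nil, errors.New("test error")
		},
	}
}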

View File

@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"net/url"
"path"
authenticationv1 "k8s.io/api/authentication/v1"
corev1 "k8s.io/api/core/v1"
@ -16,18 +17,38 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
listcorev1 "k8s.io/client-go/listers/core/v1"
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
)
// NewThrottledUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval. Responder is required for returning
// errors to the caller.
func NewThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
return proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
}
// TODO: consider sharing logic with pkg/registry/cluster/storage/proxy.go:53
// ConnectCluster returns a handler that proxies requests to the given member cluster.
func ConnectCluster(ctx context.Context, clusterName string, location *url.URL, transport http.RoundTripper, responder rest.Responder,
func ConnectCluster(ctx context.Context,
clusterLister clusterlisters.ClusterLister, secretLister listcorev1.SecretLister,
clusterName string, proxyPath string, responder rest.Responder) (http.Handler, error) {
cluster, err := clusterLister.Get(clusterName)
if err != nil {
return nil, err
}
location, transport, err := Location(cluster.Name, cluster.Spec.APIEndpoint, cluster.Spec.ProxyURL)
if err != nil {
return nil, err
}
location.Path = path.Join(location.Path, proxyPath)
secretGetter := func(context.Context, string) (*corev1.Secret, error) {
if cluster.Spec.ImpersonatorSecretRef == nil {
return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name)
}
return secretLister.Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(cluster.Spec.ImpersonatorSecretRef.Name)
}
return connectCluster(ctx, cluster.Name, location, transport, responder, secretGetter)
}
func connectCluster(ctx context.Context, clusterName string, location *url.URL, transport http.RoundTripper, responder rest.Responder,
impersonateSecretGetter func(context.Context, string) (*corev1.Secret, error)) (http.Handler, error) {
secret, err := impersonateSecretGetter(ctx, clusterName)
if err != nil {
@ -42,6 +63,12 @@ func ConnectCluster(ctx context.Context, clusterName string, location *url.URL,
return newProxyHandler(location, transport, impersonateToken, responder)
}
// NewThrottledUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval. Responder is required for returning
// errors to the caller.
func NewThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
return proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
}
// Location returns a URL to which one can send traffic for the specified cluster.
func Location(clusterName string, apiEndpoint string, proxyURL string) (*url.URL, http.RoundTripper, error) {
location, err := constructLocation(clusterName, apiEndpoint)