From dd1b6e29673c82ee47f769e341c31e2eb0ac5354 Mon Sep 17 00:00:00 2001 From: yingjinhui Date: Wed, 17 Aug 2022 18:25:56 +0800 Subject: [PATCH] add proxy skeleton Signed-off-by: yingjinhui --- cmd/karmada-search/app/options/options.go | 50 +++++- pkg/apis/search/register.go | 1 + pkg/apis/search/searchregistry_types.go | 7 + .../search/v1alpha1/searchregistry_types.go | 7 + .../v1alpha1/zz_generated.conversion.go | 28 ++++ .../search/v1alpha1/zz_generated.deepcopy.go | 25 +++ .../search/v1alpha1/zz_generated.register.go | 1 + pkg/apis/search/zz_generated.deepcopy.go | 25 +++ pkg/generated/openapi/zz_generated.openapi.go | 28 ++++ pkg/registry/search/storage/proxy.go | 72 +++++++++ pkg/search/apiserver.go | 23 ++- pkg/search/proxy/cache_proxy.go | 22 +++ pkg/search/proxy/cluster_proxy.go | 42 +++++ pkg/search/proxy/controller.go | 150 ++++++++++++++++++ pkg/search/proxy/karmada_proxy.go | 51 ++++++ pkg/search/proxy/store/cluster_cache.go | 5 + pkg/search/proxy/store/multi_cluster_cache.go | 28 ++++ pkg/search/proxy/store/resource_cache.go | 5 + pkg/util/proxy/proxy.go | 130 +++++++++++++++ 19 files changed, 691 insertions(+), 9 deletions(-) create mode 100644 pkg/registry/search/storage/proxy.go create mode 100644 pkg/search/proxy/cache_proxy.go create mode 100644 pkg/search/proxy/cluster_proxy.go create mode 100644 pkg/search/proxy/controller.go create mode 100644 pkg/search/proxy/karmada_proxy.go create mode 100644 pkg/search/proxy/store/cluster_cache.go create mode 100644 pkg/search/proxy/store/multi_cluster_cache.go create mode 100644 pkg/search/proxy/store/resource_cache.go create mode 100644 pkg/util/proxy/proxy.go diff --git a/cmd/karmada-search/app/options/options.go b/cmd/karmada-search/app/options/options.go index 1f4465b96..b5da3c075 100644 --- a/cmd/karmada-search/app/options/options.go +++ b/cmd/karmada-search/app/options/options.go @@ -4,13 +4,18 @@ import ( "context" "fmt" "net" + "net/http" + "path" "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/openapi" + "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" + genericfilters "k8s.io/apiserver/pkg/server/filters" genericoptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" @@ -18,9 +23,14 @@ import ( searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme" searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1" + karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" + informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi" "github.com/karmada-io/karmada/pkg/search" + "github.com/karmada-io/karmada/pkg/search/proxy" "github.com/karmada-io/karmada/pkg/sharedcli/profileflag" + "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/lifted" "github.com/karmada-io/karmada/pkg/version" ) @@ -89,12 +99,23 @@ func (o *Options) Run(ctx context.Context) error { return nil }) + server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-informers", func(context genericapiserver.PostStartHookContext) error { + config.KarmadaSharedInformerFactory.Start(context.StopCh) + return nil + }) + server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error { // start ResourceRegistry controller config.Controller.Start(context.StopCh) return nil }) + server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-proxy-controller", func(context genericapiserver.PostStartHookContext) error { + // start ResourceRegistry controller + config.ProxyController.Start(context.StopCh) + return nil + }) + return server.GenericAPIServer.PrepareRun().Run(ctx.Done()) } @@ -108,6 +129,9 @@ func (o *Options) Config() (*search.Config, error) { o.RecommendedOptions.Features = &genericoptions.FeatureOptions{EnableProfiling: false} serverConfig := genericapiserver.NewRecommendedConfig(searchscheme.Codecs) + serverConfig.LongRunningFunc = customLongRunningRequestCheck( + sets.NewString("watch", "proxy"), + sets.NewString("attach", "exec", "proxy", "log", "portforward")) serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(searchscheme.Scheme)) serverConfig.OpenAPIConfig.Info.Title = "karmada-search" if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil { @@ -121,9 +145,31 @@ func (o *Options) Config() (*search.Config, error) { return nil, err } + karmadaClient := karmadaclientset.NewForConfigOrDie(serverConfig.ClientConfig) + factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0) + + proxyCtl, err := proxy.NewController(serverConfig.ClientConfig, genericmanager.GetInstance(), serverConfig.SharedInformerFactory, factory) + if err != nil { + return nil, err + } + config := &search.Config{ - GenericConfig: serverConfig, - Controller: ctl, + GenericConfig: serverConfig, + Controller: ctl, + ProxyController: proxyCtl, + KarmadaSharedInformerFactory: factory, } return config, nil } + +func customLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) request.LongRunningRequestCheck { + return func(r *http.Request, requestInfo *request.RequestInfo) bool { + if requestInfo.APIGroup == "search.karmada.io" && requestInfo.Resource == "proxying" { + reqClone := r.Clone(context.TODO()) + // requestInfo.Parts is like [proxying foo proxy api v1 nodes] + reqClone.URL.Path = "/" + path.Join(requestInfo.Parts[3:]...) + requestInfo = lifted.NewRequestInfo(reqClone) + } + return genericfilters.BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources)(r, requestInfo) + } +} diff --git a/pkg/apis/search/register.go b/pkg/apis/search/register.go index aab33702e..57998cae7 100644 --- a/pkg/apis/search/register.go +++ b/pkg/apis/search/register.go @@ -34,6 +34,7 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ResourceRegistry{}, &ResourceRegistryList{}, &Search{}, + &Proxying{}, ) return nil } diff --git a/pkg/apis/search/searchregistry_types.go b/pkg/apis/search/searchregistry_types.go index a3f85e11c..32612076d 100644 --- a/pkg/apis/search/searchregistry_types.go +++ b/pkg/apis/search/searchregistry_types.go @@ -105,3 +105,10 @@ type ResourceRegistryList struct { type Search struct { metav1.TypeMeta } + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// Proxying define a flag for resource proxying that do not have actual resources. +type Proxying struct { + metav1.TypeMeta +} diff --git a/pkg/apis/search/v1alpha1/searchregistry_types.go b/pkg/apis/search/v1alpha1/searchregistry_types.go index 1b953d748..7ebe12588 100644 --- a/pkg/apis/search/v1alpha1/searchregistry_types.go +++ b/pkg/apis/search/v1alpha1/searchregistry_types.go @@ -118,3 +118,10 @@ type ResourceRegistryList struct { type Search struct { metav1.TypeMeta `json:",inline"` } + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// Proxying define a flag for resource proxying that do not have actual resources. +type Proxying struct { + metav1.TypeMeta `json:",inline"` +} diff --git a/pkg/apis/search/v1alpha1/zz_generated.conversion.go b/pkg/apis/search/v1alpha1/zz_generated.conversion.go index 08bbd8dfe..43de6b522 100644 --- a/pkg/apis/search/v1alpha1/zz_generated.conversion.go +++ b/pkg/apis/search/v1alpha1/zz_generated.conversion.go @@ -41,6 +41,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*Proxying)(nil), (*search.Proxying)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_Proxying_To_search_Proxying(a.(*Proxying), b.(*search.Proxying), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*search.Proxying)(nil), (*Proxying)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_search_Proxying_To_v1alpha1_Proxying(a.(*search.Proxying), b.(*Proxying), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*ResourceRegistry)(nil), (*search.ResourceRegistry)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1alpha1_ResourceRegistry_To_search_ResourceRegistry(a.(*ResourceRegistry), b.(*search.ResourceRegistry), scope) }); err != nil { @@ -146,6 +156,24 @@ func Convert_search_OpenSearchConfig_To_v1alpha1_OpenSearchConfig(in *search.Ope return autoConvert_search_OpenSearchConfig_To_v1alpha1_OpenSearchConfig(in, out, s) } +func autoConvert_v1alpha1_Proxying_To_search_Proxying(in *Proxying, out *search.Proxying, s conversion.Scope) error { + return nil +} + +// Convert_v1alpha1_Proxying_To_search_Proxying is an autogenerated conversion function. +func Convert_v1alpha1_Proxying_To_search_Proxying(in *Proxying, out *search.Proxying, s conversion.Scope) error { + return autoConvert_v1alpha1_Proxying_To_search_Proxying(in, out, s) +} + +func autoConvert_search_Proxying_To_v1alpha1_Proxying(in *search.Proxying, out *Proxying, s conversion.Scope) error { + return nil +} + +// Convert_search_Proxying_To_v1alpha1_Proxying is an autogenerated conversion function. +func Convert_search_Proxying_To_v1alpha1_Proxying(in *search.Proxying, out *Proxying, s conversion.Scope) error { + return autoConvert_search_Proxying_To_v1alpha1_Proxying(in, out, s) +} + func autoConvert_v1alpha1_ResourceRegistry_To_search_ResourceRegistry(in *ResourceRegistry, out *search.ResourceRegistry, s conversion.Scope) error { out.ObjectMeta = in.ObjectMeta if err := Convert_v1alpha1_ResourceRegistrySpec_To_search_ResourceRegistrySpec(&in.Spec, &out.Spec, s); err != nil { diff --git a/pkg/apis/search/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/search/v1alpha1/zz_generated.deepcopy.go index 5fbffc2d6..5d172231b 100644 --- a/pkg/apis/search/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/search/v1alpha1/zz_generated.deepcopy.go @@ -53,6 +53,31 @@ func (in *OpenSearchConfig) DeepCopy() *OpenSearchConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Proxying) DeepCopyInto(out *Proxying) { + *out = *in + out.TypeMeta = in.TypeMeta + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Proxying. +func (in *Proxying) DeepCopy() *Proxying { + if in == nil { + return nil + } + out := new(Proxying) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Proxying) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResourceRegistry) DeepCopyInto(out *ResourceRegistry) { *out = *in diff --git a/pkg/apis/search/v1alpha1/zz_generated.register.go b/pkg/apis/search/v1alpha1/zz_generated.register.go index 70eb13117..42edd5d60 100644 --- a/pkg/apis/search/v1alpha1/zz_generated.register.go +++ b/pkg/apis/search/v1alpha1/zz_generated.register.go @@ -42,6 +42,7 @@ func init() { // Adds the list of known types to Scheme. func addKnownTypes(scheme *runtime.Scheme) error { scheme.AddKnownTypes(SchemeGroupVersion, + &Proxying{}, &ResourceRegistry{}, &ResourceRegistryList{}, &Search{}, diff --git a/pkg/apis/search/zz_generated.deepcopy.go b/pkg/apis/search/zz_generated.deepcopy.go index ebb7f7a87..e996dbca8 100644 --- a/pkg/apis/search/zz_generated.deepcopy.go +++ b/pkg/apis/search/zz_generated.deepcopy.go @@ -53,6 +53,31 @@ func (in *OpenSearchConfig) DeepCopy() *OpenSearchConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Proxying) DeepCopyInto(out *Proxying) { + *out = *in + out.TypeMeta = in.TypeMeta + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Proxying. +func (in *Proxying) DeepCopy() *Proxying { + if in == nil { + return nil + } + out := new(Proxying) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Proxying) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResourceRegistry) DeepCopyInto(out *ResourceRegistry) { *out = *in diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index 67496c412..82c31b524 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -71,6 +71,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1.StaticClusterWeight": schema_pkg_apis_policy_v1alpha1_StaticClusterWeight(ref), "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.BackendStoreConfig": schema_pkg_apis_search_v1alpha1_BackendStoreConfig(ref), "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.OpenSearchConfig": schema_pkg_apis_search_v1alpha1_OpenSearchConfig(ref), + "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.Proxying": schema_pkg_apis_search_v1alpha1_Proxying(ref), "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.ResourceRegistry": schema_pkg_apis_search_v1alpha1_ResourceRegistry(ref), "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.ResourceRegistryList": schema_pkg_apis_search_v1alpha1_ResourceRegistryList(ref), "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.ResourceRegistrySpec": schema_pkg_apis_search_v1alpha1_ResourceRegistrySpec(ref), @@ -3063,6 +3064,33 @@ func schema_pkg_apis_search_v1alpha1_OpenSearchConfig(ref common.ReferenceCallba } } +func schema_pkg_apis_search_v1alpha1_Proxying(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "Proxying define a flag for resource proxying that do not have actual resources.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + } +} + func schema_pkg_apis_search_v1alpha1_ResourceRegistry(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ diff --git a/pkg/registry/search/storage/proxy.go b/pkg/registry/search/storage/proxy.go new file mode 100644 index 000000000..866726a2f --- /dev/null +++ b/pkg/registry/search/storage/proxy.go @@ -0,0 +1,72 @@ +package storage + +import ( + "context" + "fmt" + "net/http" + "path" + + "k8s.io/apimachinery/pkg/runtime" + genericrequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/klog/v2" + + searchapis "github.com/karmada-io/karmada/pkg/apis/search" + "github.com/karmada-io/karmada/pkg/search/proxy" +) + +var proxyMethods = []string{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"} + +// ProxyingREST implements a RESTStorage for proxying resource. +type ProxyingREST struct { + ctl *proxy.Controller +} + +var _ rest.Scoper = &ProxyingREST{} +var _ rest.Storage = &ProxyingREST{} +var _ rest.Connecter = &ProxyingREST{} + +// NewProxyingREST returns a RESTStorage object that will work against search. +func NewProxyingREST(ctl *proxy.Controller) *ProxyingREST { + return &ProxyingREST{ + ctl: ctl, + } +} + +// New return empty Proxy object. +func (r *ProxyingREST) New() runtime.Object { + return &searchapis.Proxying{} +} + +// NamespaceScoped returns false because Proxy is not namespaced. +func (r *ProxyingREST) NamespaceScoped() bool { + return false +} + +// ConnectMethods returns the list of HTTP methods handled by Connect. +func (r *ProxyingREST) ConnectMethods() []string { + return proxyMethods +} + +// NewConnectOptions returns an empty options object that will be used to pass options to the Connect method. +func (r *ProxyingREST) NewConnectOptions() (runtime.Object, bool, string) { + return nil, true, "" +} + +// Connect returns a handler for proxy. +func (r *ProxyingREST) Connect(ctx context.Context, _ string, _ runtime.Object, responder rest.Responder) (http.Handler, error) { + info, ok := genericrequest.RequestInfoFrom(ctx) + if !ok { + return nil, fmt.Errorf("no RequestInfo found in the context") + } + + if len(info.Parts) < 2 { + return nil, fmt.Errorf("invalid requestInfo parts: %v", info.Parts) + } + + // For example, the whole request URL is /apis/search.karmada.io/v1alpha1/proxying/foo/proxy/api/v1/nodes + // info.Parts is [proxying foo proxy api v1 nodes], so proxyPath is /api/v1/nodes + proxyPath := "/" + path.Join(info.Parts[3:]...) + klog.V(4).Infof("ProxyingREST connect %v", proxyPath) + return r.ctl.Connect(ctx, proxyPath, responder) +} diff --git a/pkg/search/apiserver.go b/pkg/search/apiserver.go index 9b2a6facb..5e31f1812 100644 --- a/pkg/search/apiserver.go +++ b/pkg/search/apiserver.go @@ -8,23 +8,28 @@ import ( searchapis "github.com/karmada-io/karmada/pkg/apis/search" searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme" + informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" searchstorage "github.com/karmada-io/karmada/pkg/registry/search/storage" + "github.com/karmada-io/karmada/pkg/search/proxy" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" ) // ExtraConfig holds custom apiserver config type ExtraConfig struct { - MultiClusterInformerManager genericmanager.MultiClusterInformerManager - ClusterLister clusterlister.ClusterLister - + MultiClusterInformerManager genericmanager.MultiClusterInformerManager + ClusterLister clusterlister.ClusterLister + KarmadaSharedInformerFactory informerfactory.SharedInformerFactory + ProxyController *proxy.Controller // Add custom config if necessary. } // Config defines the config for the APIServer. type Config struct { - GenericConfig *genericapiserver.RecommendedConfig - Controller *Controller + GenericConfig *genericapiserver.RecommendedConfig + Controller *Controller + ProxyController *proxy.Controller + KarmadaSharedInformerFactory informerfactory.SharedInformerFactory } // APIServer contains state for karmada-search. @@ -47,8 +52,10 @@ func (cfg *Config) Complete() CompletedConfig { c := completedConfig{ cfg.GenericConfig.Complete(), &ExtraConfig{ - MultiClusterInformerManager: cfg.Controller.InformerManager, - ClusterLister: cfg.Controller.clusterLister, + MultiClusterInformerManager: cfg.Controller.InformerManager, + ClusterLister: cfg.Controller.clusterLister, + KarmadaSharedInformerFactory: cfg.KarmadaSharedInformerFactory, + ProxyController: cfg.ProxyController, }, } @@ -78,11 +85,13 @@ func (c completedConfig) New() (*APIServer, error) { return nil, err } searchREST := searchstorage.NewSearchREST(c.ExtraConfig.MultiClusterInformerManager, c.ExtraConfig.ClusterLister) + proxyingREST := searchstorage.NewProxyingREST(c.ExtraConfig.ProxyController) v1alpha1search := map[string]rest.Storage{} v1alpha1search["resourceregistries"] = resourceRegistryStorage.ResourceRegistry v1alpha1search["resourceregistries/status"] = resourceRegistryStorage.Status v1alpha1search["search"] = searchREST + v1alpha1search["proxying"] = proxyingREST apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = v1alpha1search if err = server.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil { diff --git a/pkg/search/proxy/cache_proxy.go b/pkg/search/proxy/cache_proxy.go new file mode 100644 index 000000000..f0d8df5c6 --- /dev/null +++ b/pkg/search/proxy/cache_proxy.go @@ -0,0 +1,22 @@ +package proxy + +import ( + "context" + "net/http" + + "github.com/karmada-io/karmada/pkg/search/proxy/store" +) + +type cacheProxy struct { + store *store.MultiClusterCache +} + +func (p *cacheProxy) connect(ctx context.Context) http.Handler { + return nil +} + +func newCacheProxy(store *store.MultiClusterCache) *cacheProxy { + return &cacheProxy{ + store: store, + } +} diff --git a/pkg/search/proxy/cluster_proxy.go b/pkg/search/proxy/cluster_proxy.go new file mode 100644 index 000000000..fb5359a5c --- /dev/null +++ b/pkg/search/proxy/cluster_proxy.go @@ -0,0 +1,42 @@ +package proxy + +import ( + "context" + "fmt" + "net/http" + "path" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apiserver/pkg/registry/rest" + listcorev1 "k8s.io/client-go/listers/core/v1" + + "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + "github.com/karmada-io/karmada/pkg/util/proxy" +) + +// clusterProxy proxy to member clusters +type clusterProxy struct { + secretLister listcorev1.SecretLister +} + +func newClusterProxy(secretLister listcorev1.SecretLister) *clusterProxy { + return &clusterProxy{ + secretLister: secretLister, + } +} + +func (c *clusterProxy) connect(ctx context.Context, cluster *v1alpha1.Cluster, proxyPath string, responder rest.Responder) (http.Handler, error) { + location, transport, err := proxy.Location(cluster.Name, cluster.Spec.APIEndpoint, cluster.Spec.ProxyURL) + if err != nil { + return nil, err + } + location.Path = path.Join(location.Path, proxyPath) + + secretGetter := func(context.Context, string) (*corev1.Secret, error) { + if cluster.Spec.ImpersonatorSecretRef == nil { + return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name) + } + return c.secretLister.Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(cluster.Spec.ImpersonatorSecretRef.Name) + } + return proxy.ConnectCluster(ctx, cluster.Name, location, transport, responder, secretGetter) +} diff --git a/pkg/search/proxy/controller.go b/pkg/search/proxy/controller.go new file mode 100644 index 000000000..beb423ad7 --- /dev/null +++ b/pkg/search/proxy/controller.go @@ -0,0 +1,150 @@ +package proxy + +import ( + "context" + "net/http" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/client-go/informers" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + + informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" + clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" + searchlisters "github.com/karmada-io/karmada/pkg/generated/listers/search/v1alpha1" + "github.com/karmada-io/karmada/pkg/search/proxy/store" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/lifted" +) + +const workKey = "key" + +// Controller syncs Cluster and GlobalResource. +type Controller struct { + clusterLister clusterlisters.ClusterLister + registryLister searchlisters.ResourceRegistryLister + worker util.AsyncWorker + informerManager genericmanager.MultiClusterInformerManager + store *store.MultiClusterCache + + // proxy + karmadaProxy *karmadaProxy + clusterProxy *clusterProxy + cacheProxy *cacheProxy +} + +// NewController create a controller for proxy +func NewController(restConfig *restclient.Config, informerManager genericmanager.MultiClusterInformerManager, factory informers.SharedInformerFactory, karmadaFactory informerfactory.SharedInformerFactory) (*Controller, error) { + kp, err := newKarmadaProxy(restConfig) + if err != nil { + return nil, err + } + + s := &store.MultiClusterCache{} + + ctl := &Controller{ + clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(), + registryLister: karmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(), + informerManager: informerManager, + store: s, + karmadaProxy: kp, + clusterProxy: newClusterProxy(factory.Core().V1().Secrets().Lister()), + cacheProxy: newCacheProxy(s), + } + + workerOptions := util.Options{ + Name: "proxy-controller", + KeyFunc: nil, + ReconcileFunc: ctl.reconcile, + } + ctl.worker = util.NewAsyncWorker(workerOptions) + + resourceEventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(interface{}) { + ctl.worker.Add(workKey) + }, + UpdateFunc: func(_, _ interface{}) { + ctl.worker.Add(workKey) + }, + DeleteFunc: func(interface{}) { + ctl.worker.Add(workKey) + }, + } + + karmadaFactory.Cluster().V1alpha1().Clusters().Informer().AddEventHandler(resourceEventHandler) + karmadaFactory.Search().V1alpha1().ResourceRegistries().Informer().AddEventHandler(resourceEventHandler) + + return ctl, nil +} + +// Start run the proxy controller +func (ctl *Controller) Start(stopCh <-chan struct{}) { + ctl.worker.Run(1, stopCh) +} + +// reconcile cache +func (ctl *Controller) reconcile(util.QueueKey) error { + return nil +} + +// Connect proxy and dispatch handlers +func (ctl *Controller) Connect(ctx context.Context, proxyPath string, responder rest.Responder) (http.Handler, error) { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + newReq := req.Clone(req.Context()) + newReq.URL.Path = proxyPath + requestInfo := lifted.NewRequestInfo(newReq) + + newCtx := request.WithRequestInfo(ctx, requestInfo) + newCtx = request.WithNamespace(newCtx, requestInfo.Namespace) + newReq = newReq.WithContext(newCtx) + + h, err := ctl.connect(newCtx, requestInfo, proxyPath, responder) + if err != nil { + responder.Error(err) + return + } + h.ServeHTTP(rw, newReq) + }), nil +} + +func (ctl *Controller) connect(ctx context.Context, requestInfo *request.RequestInfo, path string, responder rest.Responder) (http.Handler, error) { + gvr := schema.GroupVersionResource{ + Group: requestInfo.APIGroup, + Version: requestInfo.APIVersion, + Resource: requestInfo.Resource, + } + + // requests will be redirected to: + // 1. karmada apiserver + // 2. cache + // 3. member clusters + // see more information from https://github.com/karmada-io/karmada/tree/master/docs/proposals/resource-aggregation-proxy#request-routing + + // 1. For non-resource requests, or resources are not defined in ResourceRegistry, + // we redirect the requests to karmada apiserver. + // Usually the request are + // - api index, e.g.: `/api`, `/apis` + // - to workload created in karmada controller panel, such as deployments and services. + if !requestInfo.IsResourceRequest || !ctl.store.HasResource(gvr) { + return ctl.karmadaProxy.connect(path, responder) + } + + // 2. For reading requests, we redirect them to cache. + // Users call these requests to read resources in member clusters, such as pods and nodes. + if requestInfo.Subresource == "" && (requestInfo.Verb == "get" || requestInfo.Verb == "list" || requestInfo.Verb == "watch") { + return ctl.cacheProxy.connect(ctx), nil + } + + // 3. The remaining requests are: + // - writing resources. + // - or subresource requests, e.g. `pods/log` + // We firstly find the resource from cache, and get the located cluster. Then redirect the request to the cluster. + cluster, err := ctl.store.GetClusterForResource(ctx, gvr) + if err != nil { + return nil, err + } + return ctl.clusterProxy.connect(ctx, cluster, path, responder) +} diff --git a/pkg/search/proxy/karmada_proxy.go b/pkg/search/proxy/karmada_proxy.go new file mode 100644 index 000000000..36c9a1c73 --- /dev/null +++ b/pkg/search/proxy/karmada_proxy.go @@ -0,0 +1,51 @@ +package proxy + +import ( + "net/http" + "net/url" + "path" + + "k8s.io/apiserver/pkg/registry/rest" + restclient "k8s.io/client-go/rest" + + "github.com/karmada-io/karmada/pkg/util/proxy" +) + +// karmadaProxy is proxy for karmada control panel +type karmadaProxy struct { + proxyLocation *url.URL + proxyTransport http.RoundTripper +} + +func newKarmadaProxy(restConfig *restclient.Config) (*karmadaProxy, error) { + location, err := url.Parse(restConfig.Host) + if err != nil { + return nil, err + } + transport, err := restclient.TransportFor(restConfig) + if err != nil { + return nil, err + } + + return &karmadaProxy{ + proxyLocation: location, + proxyTransport: transport, + }, nil +} + +// connect to Karmada-ApiServer directly +func (p *karmadaProxy) connect(proxyPath string, responder rest.Responder) (http.Handler, error) { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + location, transport := p.resourceLocation() + location.Path = path.Join(location.Path, proxyPath) + location.RawQuery = req.URL.RawQuery + + handler := proxy.NewThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder) + handler.ServeHTTP(rw, req) + }), nil +} + +func (p *karmadaProxy) resourceLocation() (*url.URL, http.RoundTripper) { + location := *p.proxyLocation + return &location, p.proxyTransport +} diff --git a/pkg/search/proxy/store/cluster_cache.go b/pkg/search/proxy/store/cluster_cache.go new file mode 100644 index 000000000..86f206473 --- /dev/null +++ b/pkg/search/proxy/store/cluster_cache.go @@ -0,0 +1,5 @@ +package store + +// clusterCache caches resources for single member cluster +type clusterCache struct { +} diff --git a/pkg/search/proxy/store/multi_cluster_cache.go b/pkg/search/proxy/store/multi_cluster_cache.go new file mode 100644 index 000000000..b96518976 --- /dev/null +++ b/pkg/search/proxy/store/multi_cluster_cache.go @@ -0,0 +1,28 @@ +package store + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime/schema" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" +) + +// temporarily suppress lint: xxx is unused +// TODO: remove it after these structs are used +var _ = clusterCache{} +var _ = resourceCache{} + +// MultiClusterCache caches resource from multi member clusters +type MultiClusterCache struct { +} + +// HasResource return whether resource is cached. +func (c *MultiClusterCache) HasResource(_ schema.GroupVersionResource) bool { + return false +} + +// GetClusterForResource returns which cluster the resource belong to. +func (c *MultiClusterCache) GetClusterForResource(ctx context.Context, gvr schema.GroupVersionResource) (*clusterv1alpha1.Cluster, error) { + return nil, nil +} diff --git a/pkg/search/proxy/store/resource_cache.go b/pkg/search/proxy/store/resource_cache.go new file mode 100644 index 000000000..589239e8b --- /dev/null +++ b/pkg/search/proxy/store/resource_cache.go @@ -0,0 +1,5 @@ +package store + +// resourceCache cache one kind resource for single member cluster +type resourceCache struct { +} diff --git a/pkg/util/proxy/proxy.go b/pkg/util/proxy/proxy.go new file mode 100644 index 000000000..50d90a050 --- /dev/null +++ b/pkg/util/proxy/proxy.go @@ -0,0 +1,130 @@ +package proxy + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "net/http" + "net/url" + + authenticationv1 "k8s.io/api/authentication/v1" + corev1 "k8s.io/api/core/v1" + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/proxy" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + + clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" +) + +// NewThrottledUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval. Responder is required for returning +// errors to the caller. +func NewThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler { + return proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder)) +} + +// ConnectCluster returns a handler for proxy cluster. +func ConnectCluster(ctx context.Context, clusterName string, location *url.URL, transport http.RoundTripper, responder rest.Responder, + impersonateSecretGetter func(context.Context, string) (*corev1.Secret, error)) (http.Handler, error) { + secret, err := impersonateSecretGetter(ctx, clusterName) + if err != nil { + return nil, err + } + + impersonateToken, err := getImpersonateToken(clusterName, secret) + if err != nil { + return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", clusterName, err) + } + + return newProxyHandler(location, transport, impersonateToken, responder) +} + +// Location returns a URL to which one can send traffic for the specified cluster. +func Location(clusterName string, apiEndpoint string, proxyURL string) (*url.URL, http.RoundTripper, error) { + location, err := constructLocation(clusterName, apiEndpoint) + if err != nil { + return nil, nil, err + } + + transport, err := createProxyTransport(proxyURL) + if err != nil { + return nil, nil, err + } + + return location, transport, nil +} + +func constructLocation(clusterName string, apiEndpoint string) (*url.URL, error) { + if apiEndpoint == "" { + return nil, fmt.Errorf("API endpoint of cluster %s should not be empty", clusterName) + } + + uri, err := url.Parse(apiEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to parse api endpoint %s: %v", apiEndpoint, err) + } + return uri, nil +} + +func createProxyTransport(proxyURL string) (*http.Transport, error) { + var proxyDialerFn utilnet.DialFunc + proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} // #nosec + trans := utilnet.SetTransportDefaults(&http.Transport{ + DialContext: proxyDialerFn, + TLSClientConfig: proxyTLSClientConfig, + }) + + if proxyURL != "" { + u, err := url.Parse(proxyURL) + if err != nil { + return nil, fmt.Errorf("failed to parse url of proxy url %s: %v", proxyURL, err) + } + trans.Proxy = http.ProxyURL(u) + } + return trans, nil +} + +func getImpersonateToken(clusterName string, secret *corev1.Secret) (string, error) { + token, found := secret.Data[clusterapis.SecretTokenKey] + if !found { + return "", fmt.Errorf("the impresonate token of cluster %s is empty", clusterName) + } + return string(token), nil +} + +func newProxyHandler(location *url.URL, transport http.RoundTripper, impersonateToken string, responder rest.Responder) (http.Handler, error) { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + requester, exist := request.UserFrom(req.Context()) + if !exist { + responsewriters.InternalError(rw, req, errors.New("no user found for request")) + return + } + req.Header.Set(authenticationv1.ImpersonateUserHeader, requester.GetName()) + for _, group := range requester.GetGroups() { + if !skipGroup(group) { + req.Header.Add(authenticationv1.ImpersonateGroupHeader, group) + } + } + + req.Header.Set("Authorization", fmt.Sprintf("bearer %s", impersonateToken)) + + // Retain RawQuery in location because upgrading the request will use it. + // See https://github.com/karmada-io/karmada/issues/1618#issuecomment-1103793290 for more info. + location.RawQuery = req.URL.RawQuery + + handler := NewThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder) + handler.ServeHTTP(rw, req) + }), nil +} + +func skipGroup(group string) bool { + switch group { + case user.AllAuthenticated, user.AllUnauthenticated: + return true + default: + return false + } +}