Merge pull request #2366 from ikaven1024/feature-proxy

add proxy skeleton
Commit dd7d70ae6f by karmada-bot, committed via GitHub on 2022-08-18 08:46:45 +08:00
19 changed files with 691 additions and 9 deletions

@ -4,13 +4,18 @@ import (
"context"
"fmt"
"net"
"net/http"
"path"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
genericfilters "k8s.io/apiserver/pkg/server/filters"
genericoptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
@ -18,9 +23,14 @@ import (
searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme"
searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi"
"github.com/karmada-io/karmada/pkg/search"
"github.com/karmada-io/karmada/pkg/search/proxy"
"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/lifted"
"github.com/karmada-io/karmada/pkg/version"
)
@ -89,12 +99,23 @@ func (o *Options) Run(ctx context.Context) error {
return nil
})
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-informers", func(context genericapiserver.PostStartHookContext) error {
config.KarmadaSharedInformerFactory.Start(context.StopCh)
return nil
})
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error {
// start ResourceRegistry controller
config.Controller.Start(context.StopCh)
return nil
})
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-proxy-controller", func(context genericapiserver.PostStartHookContext) error {
// start proxy controller
config.ProxyController.Start(context.StopCh)
return nil
})
return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
}
@ -108,6 +129,9 @@ func (o *Options) Config() (*search.Config, error) {
o.RecommendedOptions.Features = &genericoptions.FeatureOptions{EnableProfiling: false}
serverConfig := genericapiserver.NewRecommendedConfig(searchscheme.Codecs)
serverConfig.LongRunningFunc = customLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"))
serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(searchscheme.Scheme))
serverConfig.OpenAPIConfig.Info.Title = "karmada-search"
if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil {
@ -121,9 +145,31 @@ func (o *Options) Config() (*search.Config, error) {
return nil, err
}
karmadaClient := karmadaclientset.NewForConfigOrDie(serverConfig.ClientConfig)
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
proxyCtl, err := proxy.NewController(serverConfig.ClientConfig, genericmanager.GetInstance(), serverConfig.SharedInformerFactory, factory)
if err != nil {
return nil, err
}
config := &search.Config{
GenericConfig: serverConfig,
Controller: ctl,
ProxyController: proxyCtl,
KarmadaSharedInformerFactory: factory,
}
return config, nil
}
func customLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) request.LongRunningRequestCheck {
return func(r *http.Request, requestInfo *request.RequestInfo) bool {
if requestInfo.APIGroup == "search.karmada.io" && requestInfo.Resource == "proxying" {
reqClone := r.Clone(context.TODO())
// requestInfo.Parts is like [proxying foo proxy api v1 nodes]
reqClone.URL.Path = "/" + path.Join(requestInfo.Parts[3:]...)
requestInfo = lifted.NewRequestInfo(reqClone)
}
return genericfilters.BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources)(r, requestInfo)
}
}
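
As an aside, a minimal runnable sketch of the path rewrite performed above, using the example parts from the comment (the request URL and parts are hypothetical):

package main

import (
	"fmt"
	"path"
)

func main() {
	// For a request like /apis/search.karmada.io/v1alpha1/proxying/foo/proxy/api/v1/nodes,
	// requestInfo.Parts is [proxying foo proxy api v1 nodes]; dropping the first three
	// parts recovers the path that is re-parsed for the long-running request check.
	parts := []string{"proxying", "foo", "proxy", "api", "v1", "nodes"}
	fmt.Println("/" + path.Join(parts[3:]...)) // prints /api/v1/nodes
}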

@ -34,6 +34,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&ResourceRegistry{},
&ResourceRegistryList{},
&Search{},
&Proxying{},
)
return nil
}

@ -105,3 +105,10 @@ type ResourceRegistryList struct {
type Search struct {
metav1.TypeMeta
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// Proxying defines a flag for resource proxying that does not have an actual resource.
type Proxying struct {
metav1.TypeMeta
}

@ -118,3 +118,10 @@ type ResourceRegistryList struct {
type Search struct {
metav1.TypeMeta `json:",inline"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// Proxying defines a flag for resource proxying that does not have an actual resource.
type Proxying struct {
metav1.TypeMeta `json:",inline"`
}

@ -41,6 +41,16 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*Proxying)(nil), (*search.Proxying)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_Proxying_To_search_Proxying(a.(*Proxying), b.(*search.Proxying), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*search.Proxying)(nil), (*Proxying)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_search_Proxying_To_v1alpha1_Proxying(a.(*search.Proxying), b.(*Proxying), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*ResourceRegistry)(nil), (*search.ResourceRegistry)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_ResourceRegistry_To_search_ResourceRegistry(a.(*ResourceRegistry), b.(*search.ResourceRegistry), scope)
}); err != nil {
@ -146,6 +156,24 @@ func Convert_search_OpenSearchConfig_To_v1alpha1_OpenSearchConfig(in *search.Ope
return autoConvert_search_OpenSearchConfig_To_v1alpha1_OpenSearchConfig(in, out, s)
}
func autoConvert_v1alpha1_Proxying_To_search_Proxying(in *Proxying, out *search.Proxying, s conversion.Scope) error {
return nil
}
// Convert_v1alpha1_Proxying_To_search_Proxying is an autogenerated conversion function.
func Convert_v1alpha1_Proxying_To_search_Proxying(in *Proxying, out *search.Proxying, s conversion.Scope) error {
return autoConvert_v1alpha1_Proxying_To_search_Proxying(in, out, s)
}
func autoConvert_search_Proxying_To_v1alpha1_Proxying(in *search.Proxying, out *Proxying, s conversion.Scope) error {
return nil
}
// Convert_search_Proxying_To_v1alpha1_Proxying is an autogenerated conversion function.
func Convert_search_Proxying_To_v1alpha1_Proxying(in *search.Proxying, out *Proxying, s conversion.Scope) error {
return autoConvert_search_Proxying_To_v1alpha1_Proxying(in, out, s)
}
func autoConvert_v1alpha1_ResourceRegistry_To_search_ResourceRegistry(in *ResourceRegistry, out *search.ResourceRegistry, s conversion.Scope) error {
out.ObjectMeta = in.ObjectMeta
if err := Convert_v1alpha1_ResourceRegistrySpec_To_search_ResourceRegistrySpec(&in.Spec, &out.Spec, s); err != nil {

@ -53,6 +53,31 @@ func (in *OpenSearchConfig) DeepCopy() *OpenSearchConfig {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Proxying) DeepCopyInto(out *Proxying) {
*out = *in
out.TypeMeta = in.TypeMeta
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Proxying.
func (in *Proxying) DeepCopy() *Proxying {
if in == nil {
return nil
}
out := new(Proxying)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *Proxying) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ResourceRegistry) DeepCopyInto(out *ResourceRegistry) {
*out = *in

@ -42,6 +42,7 @@ func init() {
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Proxying{},
&ResourceRegistry{},
&ResourceRegistryList{},
&Search{},

@ -53,6 +53,31 @@ func (in *OpenSearchConfig) DeepCopy() *OpenSearchConfig {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Proxying) DeepCopyInto(out *Proxying) {
*out = *in
out.TypeMeta = in.TypeMeta
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Proxying.
func (in *Proxying) DeepCopy() *Proxying {
if in == nil {
return nil
}
out := new(Proxying)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *Proxying) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ResourceRegistry) DeepCopyInto(out *ResourceRegistry) {
*out = *in

@ -71,6 +71,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
"github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1.StaticClusterWeight": schema_pkg_apis_policy_v1alpha1_StaticClusterWeight(ref),
"github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.BackendStoreConfig": schema_pkg_apis_search_v1alpha1_BackendStoreConfig(ref),
"github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.OpenSearchConfig": schema_pkg_apis_search_v1alpha1_OpenSearchConfig(ref),
"github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.Proxying": schema_pkg_apis_search_v1alpha1_Proxying(ref),
"github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.ResourceRegistry": schema_pkg_apis_search_v1alpha1_ResourceRegistry(ref),
"github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.ResourceRegistryList": schema_pkg_apis_search_v1alpha1_ResourceRegistryList(ref),
"github.com/karmada-io/karmada/pkg/apis/search/v1alpha1.ResourceRegistrySpec": schema_pkg_apis_search_v1alpha1_ResourceRegistrySpec(ref),
@ -3063,6 +3064,33 @@ func schema_pkg_apis_search_v1alpha1_OpenSearchConfig(ref common.ReferenceCallba
}
}
func schema_pkg_apis_search_v1alpha1_Proxying(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
SchemaProps: spec.SchemaProps{
Description: "Proxying define a flag for resource proxying that do not have actual resources.",
Type: []string{"object"},
Properties: map[string]spec.Schema{
"kind": {
SchemaProps: spec.SchemaProps{
Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds",
Type: []string{"string"},
Format: "",
},
},
"apiVersion": {
SchemaProps: spec.SchemaProps{
Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources",
Type: []string{"string"},
Format: "",
},
},
},
},
},
}
}
func schema_pkg_apis_search_v1alpha1_ResourceRegistry(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{

@ -0,0 +1,72 @@
package storage
import (
"context"
"fmt"
"net/http"
"path"
"k8s.io/apimachinery/pkg/runtime"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/klog/v2"
searchapis "github.com/karmada-io/karmada/pkg/apis/search"
"github.com/karmada-io/karmada/pkg/search/proxy"
)
var proxyMethods = []string{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}
// ProxyingREST implements a RESTStorage for proxying resource.
type ProxyingREST struct {
ctl *proxy.Controller
}
var _ rest.Scoper = &ProxyingREST{}
var _ rest.Storage = &ProxyingREST{}
var _ rest.Connecter = &ProxyingREST{}
// NewProxyingREST returns a RESTStorage object that will work against proxying.
func NewProxyingREST(ctl *proxy.Controller) *ProxyingREST {
return &ProxyingREST{
ctl: ctl,
}
}
// New returns an empty Proxying object.
func (r *ProxyingREST) New() runtime.Object {
return &searchapis.Proxying{}
}
// NamespaceScoped returns false because Proxying is not namespaced.
func (r *ProxyingREST) NamespaceScoped() bool {
return false
}
// ConnectMethods returns the list of HTTP methods handled by Connect.
func (r *ProxyingREST) ConnectMethods() []string {
return proxyMethods
}
// NewConnectOptions returns an empty options object that will be used to pass options to the Connect method.
func (r *ProxyingREST) NewConnectOptions() (runtime.Object, bool, string) {
return nil, true, ""
}
// Connect returns a handler for proxy.
func (r *ProxyingREST) Connect(ctx context.Context, _ string, _ runtime.Object, responder rest.Responder) (http.Handler, error) {
info, ok := genericrequest.RequestInfoFrom(ctx)
if !ok {
return nil, fmt.Errorf("no RequestInfo found in the context")
}
if len(info.Parts) < 2 {
return nil, fmt.Errorf("invalid requestInfo parts: %v", info.Parts)
}
// For example, the whole request URL is /apis/search.karmada.io/v1alpha1/proxying/foo/proxy/api/v1/nodes
// info.Parts is [proxying foo proxy api v1 nodes], so proxyPath is /api/v1/nodes
proxyPath := "/" + path.Join(info.Parts[3:]...)
klog.V(4).Infof("ProxyingREST connect %v", proxyPath)
return r.ctl.Connect(ctx, proxyPath, responder)
}
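
A hedged usage sketch: how a client-go caller could address the new proxying endpoint once the proxy is wired up. The kubeconfig path and the `foo` name segment are placeholders, not values defined by this change.

package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholder kubeconfig pointing at the karmada apiserver.
	config, err := clientcmd.BuildConfigFromFlags("", "/etc/karmada/karmada-apiserver.config")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// Equivalent to:
	//   GET /apis/search.karmada.io/v1alpha1/proxying/foo/proxy/api/v1/nodes
	// which ProxyingREST.Connect rewrites to the proxyPath /api/v1/nodes.
	raw, err := client.Discovery().RESTClient().Get().
		AbsPath("/apis/search.karmada.io/v1alpha1/proxying/foo/proxy/api/v1/nodes").
		DoRaw(context.TODO())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
}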

@ -8,8 +8,10 @@ import (
searchapis "github.com/karmada-io/karmada/pkg/apis/search"
searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
searchstorage "github.com/karmada-io/karmada/pkg/registry/search/storage"
"github.com/karmada-io/karmada/pkg/search/proxy"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
)
@ -17,7 +19,8 @@ import (
type ExtraConfig struct {
MultiClusterInformerManager genericmanager.MultiClusterInformerManager
ClusterLister clusterlister.ClusterLister
KarmadaSharedInformerFactory informerfactory.SharedInformerFactory
ProxyController *proxy.Controller
// Add custom config if necessary.
}
@ -25,6 +28,8 @@ type ExtraConfig struct {
type Config struct {
GenericConfig *genericapiserver.RecommendedConfig
Controller *Controller
ProxyController *proxy.Controller
KarmadaSharedInformerFactory informerfactory.SharedInformerFactory
}
// APIServer contains state for karmada-search.
@ -49,6 +54,8 @@ func (cfg *Config) Complete() CompletedConfig {
&ExtraConfig{
MultiClusterInformerManager: cfg.Controller.InformerManager,
ClusterLister: cfg.Controller.clusterLister,
KarmadaSharedInformerFactory: cfg.KarmadaSharedInformerFactory,
ProxyController: cfg.ProxyController,
},
}
@ -78,11 +85,13 @@ func (c completedConfig) New() (*APIServer, error) {
return nil, err
}
searchREST := searchstorage.NewSearchREST(c.ExtraConfig.MultiClusterInformerManager, c.ExtraConfig.ClusterLister)
proxyingREST := searchstorage.NewProxyingREST(c.ExtraConfig.ProxyController)
v1alpha1search := map[string]rest.Storage{}
v1alpha1search["resourceregistries"] = resourceRegistryStorage.ResourceRegistry
v1alpha1search["resourceregistries/status"] = resourceRegistryStorage.Status
v1alpha1search["search"] = searchREST
v1alpha1search["proxying"] = proxyingREST
apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = v1alpha1search
if err = server.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {

@ -0,0 +1,22 @@
package proxy
import (
"context"
"net/http"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
)
type cacheProxy struct {
store *store.MultiClusterCache
}
// connect is a placeholder in this skeleton; serving reads from the multi-cluster cache is not implemented yet.
func (p *cacheProxy) connect(ctx context.Context) http.Handler {
return nil
}
func newCacheProxy(store *store.MultiClusterCache) *cacheProxy {
return &cacheProxy{
store: store,
}
}

@ -0,0 +1,42 @@
package proxy
import (
"context"
"fmt"
"net/http"
"path"
corev1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/registry/rest"
listcorev1 "k8s.io/client-go/listers/core/v1"
"github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util/proxy"
)
// clusterProxy proxies requests to member clusters
type clusterProxy struct {
secretLister listcorev1.SecretLister
}
func newClusterProxy(secretLister listcorev1.SecretLister) *clusterProxy {
return &clusterProxy{
secretLister: secretLister,
}
}
func (c *clusterProxy) connect(ctx context.Context, cluster *v1alpha1.Cluster, proxyPath string, responder rest.Responder) (http.Handler, error) {
location, transport, err := proxy.Location(cluster.Name, cluster.Spec.APIEndpoint, cluster.Spec.ProxyURL)
if err != nil {
return nil, err
}
location.Path = path.Join(location.Path, proxyPath)
secretGetter := func(context.Context, string) (*corev1.Secret, error) {
if cluster.Spec.ImpersonatorSecretRef == nil {
return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name)
}
return c.secretLister.Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(cluster.Spec.ImpersonatorSecretRef.Name)
}
return proxy.ConnectCluster(ctx, cluster.Name, location, transport, responder, secretGetter)
}

@ -0,0 +1,150 @@
package proxy
import (
"context"
"net/http"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/informers"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
searchlisters "github.com/karmada-io/karmada/pkg/generated/listers/search/v1alpha1"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/lifted"
)
const workKey = "key"
// Controller syncs Cluster and GlobalResource.
type Controller struct {
clusterLister clusterlisters.ClusterLister
registryLister searchlisters.ResourceRegistryLister
worker util.AsyncWorker
informerManager genericmanager.MultiClusterInformerManager
store *store.MultiClusterCache
// proxy
karmadaProxy *karmadaProxy
clusterProxy *clusterProxy
cacheProxy *cacheProxy
}
// NewController creates a controller for proxy
func NewController(restConfig *restclient.Config, informerManager genericmanager.MultiClusterInformerManager, factory informers.SharedInformerFactory, karmadaFactory informerfactory.SharedInformerFactory) (*Controller, error) {
kp, err := newKarmadaProxy(restConfig)
if err != nil {
return nil, err
}
s := &store.MultiClusterCache{}
ctl := &Controller{
clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(),
registryLister: karmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
informerManager: informerManager,
store: s,
karmadaProxy: kp,
clusterProxy: newClusterProxy(factory.Core().V1().Secrets().Lister()),
cacheProxy: newCacheProxy(s),
}
workerOptions := util.Options{
Name: "proxy-controller",
KeyFunc: nil,
ReconcileFunc: ctl.reconcile,
}
ctl.worker = util.NewAsyncWorker(workerOptions)
resourceEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(interface{}) {
ctl.worker.Add(workKey)
},
UpdateFunc: func(_, _ interface{}) {
ctl.worker.Add(workKey)
},
DeleteFunc: func(interface{}) {
ctl.worker.Add(workKey)
},
}
karmadaFactory.Cluster().V1alpha1().Clusters().Informer().AddEventHandler(resourceEventHandler)
karmadaFactory.Search().V1alpha1().ResourceRegistries().Informer().AddEventHandler(resourceEventHandler)
return ctl, nil
}
// Start runs the proxy controller
func (ctl *Controller) Start(stopCh <-chan struct{}) {
ctl.worker.Run(1, stopCh)
}
// reconcile cache
func (ctl *Controller) reconcile(util.QueueKey) error {
return nil
}
// Connect proxies the request and dispatches it to the matching handler
func (ctl *Controller) Connect(ctx context.Context, proxyPath string, responder rest.Responder) (http.Handler, error) {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
newReq := req.Clone(req.Context())
newReq.URL.Path = proxyPath
requestInfo := lifted.NewRequestInfo(newReq)
newCtx := request.WithRequestInfo(ctx, requestInfo)
newCtx = request.WithNamespace(newCtx, requestInfo.Namespace)
newReq = newReq.WithContext(newCtx)
h, err := ctl.connect(newCtx, requestInfo, proxyPath, responder)
if err != nil {
responder.Error(err)
return
}
h.ServeHTTP(rw, newReq)
}), nil
}
func (ctl *Controller) connect(ctx context.Context, requestInfo *request.RequestInfo, path string, responder rest.Responder) (http.Handler, error) {
gvr := schema.GroupVersionResource{
Group: requestInfo.APIGroup,
Version: requestInfo.APIVersion,
Resource: requestInfo.Resource,
}
// requests will be redirected to:
// 1. karmada apiserver
// 2. cache
// 3. member clusters
// see more information from https://github.com/karmada-io/karmada/tree/master/docs/proposals/resource-aggregation-proxy#request-routing
// 1. For non-resource requests, or resources not defined in any ResourceRegistry,
// we redirect the requests to the karmada apiserver.
// Usually these requests are:
// - api index, e.g.: `/api`, `/apis`
// - for workloads created in the karmada control plane, such as deployments and services.
if !requestInfo.IsResourceRequest || !ctl.store.HasResource(gvr) {
return ctl.karmadaProxy.connect(path, responder)
}
// 2. For reading requests, we redirect them to the cache.
// These requests read resources in member clusters, such as pods and nodes.
if requestInfo.Subresource == "" && (requestInfo.Verb == "get" || requestInfo.Verb == "list" || requestInfo.Verb == "watch") {
return ctl.cacheProxy.connect(ctx), nil
}
// 3. The remaining requests are:
// - writes to resources,
// - or subresource requests, e.g. `pods/log`.
// We first look up the resource in the cache to find the cluster it is located in, then redirect the request to that cluster.
cluster, err := ctl.store.GetClusterForResource(ctx, gvr)
if err != nil {
return nil, err
}
return ctl.clusterProxy.connect(ctx, cluster, path, responder)
}
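
To make the dispatch above concrete, a small sketch that mirrors the three branches of connect, with the store.HasResource lookup stubbed out as a boolean (illustrative only):

package main

import (
	"fmt"

	"k8s.io/apiserver/pkg/endpoints/request"
)

// route mirrors Controller.connect: non-resource or uncached requests go to the
// karmada apiserver, plain reads go to the cache, and everything else goes to the
// member cluster owning the resource. `cached` stands in for store.HasResource(gvr).
func route(info *request.RequestInfo, cached bool) string {
	if !info.IsResourceRequest || !cached {
		return "karmada apiserver"
	}
	if info.Subresource == "" && (info.Verb == "get" || info.Verb == "list" || info.Verb == "watch") {
		return "cache"
	}
	return "member cluster"
}

func main() {
	fmt.Println(route(&request.RequestInfo{IsResourceRequest: false, Path: "/apis"}, false))                                  // karmada apiserver
	fmt.Println(route(&request.RequestInfo{IsResourceRequest: true, Resource: "nodes", Verb: "list"}, true))                   // cache
	fmt.Println(route(&request.RequestInfo{IsResourceRequest: true, Resource: "pods", Subresource: "log", Verb: "get"}, true)) // member cluster
}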

@ -0,0 +1,51 @@
package proxy
import (
"net/http"
"net/url"
"path"
"k8s.io/apiserver/pkg/registry/rest"
restclient "k8s.io/client-go/rest"
"github.com/karmada-io/karmada/pkg/util/proxy"
)
// karmadaProxy proxies requests to the karmada control plane
type karmadaProxy struct {
proxyLocation *url.URL
proxyTransport http.RoundTripper
}
func newKarmadaProxy(restConfig *restclient.Config) (*karmadaProxy, error) {
location, err := url.Parse(restConfig.Host)
if err != nil {
return nil, err
}
transport, err := restclient.TransportFor(restConfig)
if err != nil {
return nil, err
}
return &karmadaProxy{
proxyLocation: location,
proxyTransport: transport,
}, nil
}
// connect proxies the request directly to the karmada-apiserver
func (p *karmadaProxy) connect(proxyPath string, responder rest.Responder) (http.Handler, error) {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
location, transport := p.resourceLocation()
location.Path = path.Join(location.Path, proxyPath)
location.RawQuery = req.URL.RawQuery
handler := proxy.NewThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder)
handler.ServeHTTP(rw, req)
}), nil
}
func (p *karmadaProxy) resourceLocation() (*url.URL, http.RoundTripper) {
location := *p.proxyLocation
return &location, p.proxyTransport
}

@ -0,0 +1,5 @@
package store
// clusterCache caches resources for a single member cluster
type clusterCache struct {
}

@ -0,0 +1,28 @@
package store
import (
"context"
"k8s.io/apimachinery/pkg/runtime/schema"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
)
// temporarily suppress lint: xxx is unused
// TODO: remove it after these structs are used
var _ = clusterCache{}
var _ = resourceCache{}
// MultiClusterCache caches resources from multiple member clusters
type MultiClusterCache struct {
}
// HasResource returns whether the resource is cached.
func (c *MultiClusterCache) HasResource(_ schema.GroupVersionResource) bool {
return false
}
// GetClusterForResource returns which cluster the resource belongs to.
func (c *MultiClusterCache) GetClusterForResource(ctx context.Context, gvr schema.GroupVersionResource) (*clusterv1alpha1.Cluster, error) {
return nil, nil
}

@ -0,0 +1,5 @@
package store
// resourceCache caches one kind of resource for a single member cluster
type resourceCache struct {
}

pkg/util/proxy/proxy.go (new file, 130 lines)

@ -0,0 +1,130 @@
package proxy
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"net/url"
authenticationv1 "k8s.io/api/authentication/v1"
corev1 "k8s.io/api/core/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
)
// NewThrottledUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval. Responder is required for returning
// errors to the caller.
func NewThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
return proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
}
// ConnectCluster returns a handler for proxying to the given cluster.
func ConnectCluster(ctx context.Context, clusterName string, location *url.URL, transport http.RoundTripper, responder rest.Responder,
impersonateSecretGetter func(context.Context, string) (*corev1.Secret, error)) (http.Handler, error) {
secret, err := impersonateSecretGetter(ctx, clusterName)
if err != nil {
return nil, err
}
impersonateToken, err := getImpersonateToken(clusterName, secret)
if err != nil {
return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", clusterName, err)
}
return newProxyHandler(location, transport, impersonateToken, responder)
}
// Location returns a URL to which one can send traffic for the specified cluster.
func Location(clusterName string, apiEndpoint string, proxyURL string) (*url.URL, http.RoundTripper, error) {
location, err := constructLocation(clusterName, apiEndpoint)
if err != nil {
return nil, nil, err
}
transport, err := createProxyTransport(proxyURL)
if err != nil {
return nil, nil, err
}
return location, transport, nil
}
func constructLocation(clusterName string, apiEndpoint string) (*url.URL, error) {
if apiEndpoint == "" {
return nil, fmt.Errorf("API endpoint of cluster %s should not be empty", clusterName)
}
uri, err := url.Parse(apiEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to parse api endpoint %s: %v", apiEndpoint, err)
}
return uri, nil
}
func createProxyTransport(proxyURL string) (*http.Transport, error) {
var proxyDialerFn utilnet.DialFunc
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} // #nosec
trans := utilnet.SetTransportDefaults(&http.Transport{
DialContext: proxyDialerFn,
TLSClientConfig: proxyTLSClientConfig,
})
if proxyURL != "" {
u, err := url.Parse(proxyURL)
if err != nil {
return nil, fmt.Errorf("failed to parse url of proxy url %s: %v", proxyURL, err)
}
trans.Proxy = http.ProxyURL(u)
}
return trans, nil
}
func getImpersonateToken(clusterName string, secret *corev1.Secret) (string, error) {
token, found := secret.Data[clusterapis.SecretTokenKey]
if !found {
return "", fmt.Errorf("the impresonate token of cluster %s is empty", clusterName)
}
return string(token), nil
}
func newProxyHandler(location *url.URL, transport http.RoundTripper, impersonateToken string, responder rest.Responder) (http.Handler, error) {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
requester, exist := request.UserFrom(req.Context())
if !exist {
responsewriters.InternalError(rw, req, errors.New("no user found for request"))
return
}
req.Header.Set(authenticationv1.ImpersonateUserHeader, requester.GetName())
for _, group := range requester.GetGroups() {
if !skipGroup(group) {
req.Header.Add(authenticationv1.ImpersonateGroupHeader, group)
}
}
req.Header.Set("Authorization", fmt.Sprintf("bearer %s", impersonateToken))
// Retain RawQuery in location because upgrading the request will use it.
// See https://github.com/karmada-io/karmada/issues/1618#issuecomment-1103793290 for more info.
location.RawQuery = req.URL.RawQuery
handler := NewThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder)
handler.ServeHTTP(rw, req)
}), nil
}
func skipGroup(group string) bool {
switch group {
case user.AllAuthenticated, user.AllUnauthenticated:
return true
default:
return false
}
}
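
For reference, a hedged sketch of the impersonation headers newProxyHandler sets before forwarding; the user name, group, and token are placeholders.

package main

import (
	"fmt"
	"net/http"

	authenticationv1 "k8s.io/api/authentication/v1"
	"k8s.io/apiserver/pkg/authentication/user"
)

func main() {
	// Placeholder requester, standing in for the user info taken from the request context.
	requester := &user.DefaultInfo{
		Name:   "alice",
		Groups: []string{"dev-team", user.AllAuthenticated},
	}

	req, _ := http.NewRequest(http.MethodGet, "https://member-cluster/api/v1/nodes", nil)
	req.Header.Set(authenticationv1.ImpersonateUserHeader, requester.GetName())
	for _, group := range requester.GetGroups() {
		// system:authenticated / system:unauthenticated are dropped, as in skipGroup.
		if group != user.AllAuthenticated && group != user.AllUnauthenticated {
			req.Header.Add(authenticationv1.ImpersonateGroupHeader, group)
		}
	}
	req.Header.Set("Authorization", "bearer <impersonation-token>")

	fmt.Println(req.Header.Get(authenticationv1.ImpersonateUserHeader))     // alice
	fmt.Println(req.Header.Values(authenticationv1.ImpersonateGroupHeader)) // [dev-team]
}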