diff --git a/cmd/karmada-search/app/options/options.go b/cmd/karmada-search/app/options/options.go index 76750aa5d..883f06471 100644 --- a/cmd/karmada-search/app/options/options.go +++ b/cmd/karmada-search/app/options/options.go @@ -14,7 +14,6 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" genericoptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/kubernetes" netutils "k8s.io/utils/net" searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1" @@ -76,11 +75,7 @@ func (o *Options) Run(ctx context.Context) error { return err } - restConfig := config.GenericConfig.ClientConfig - restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst - kubeClientSet := kubernetes.NewForConfigOrDie(restConfig) - - server, err := config.Complete().New(kubeClientSet) + server, err := config.Complete().New() if err != nil { return err } @@ -92,11 +87,7 @@ func (o *Options) Run(ctx context.Context) error { server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error { // start ResourceRegistry controller - ctl, err := search.NewController(restConfig, search.CachedResourceHandler()) - if err != nil { - return err - } - ctl.Start(context.StopCh) + config.Controller.Start(context.StopCh) return nil }) @@ -117,9 +108,16 @@ func (o *Options) Config() (*search.Config, error) { return nil, err } + serverConfig.ClientConfig.QPS = o.KubeAPIQPS + serverConfig.ClientConfig.Burst = o.KubeAPIBurst + ctl, err := search.NewController(serverConfig.ClientConfig, search.CachedResourceHandler()) + if err != nil { + return nil, err + } + config := &search.Config{ GenericConfig: serverConfig, - ExtraConfig: search.ExtraConfig{}, + Controller: ctl, } return config, nil } diff --git a/pkg/registry/search/storage/cache.go b/pkg/registry/search/storage/cache.go new file mode 100644 index 000000000..c5c5738c2 --- /dev/null +++ b/pkg/registry/search/storage/cache.go @@ -0,0 +1,107 @@ +package storage + +import ( + "encoding/json" + "fmt" + "net/http" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + genericrequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/klog/v2" +) + +type errorResponse struct { + Error string `json:"error"` +} + +type reqResponse struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + + Items []runtime.Object `json:"items"` +} + +func (r *SearchREST) newCacheHandler(info *genericrequest.RequestInfo, responder rest.Responder) (http.Handler, error) { + resourceGVR := schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + } + + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + enc := json.NewEncoder(rw) + + clusters, err := r.clusterLister.List(labels.Everything()) + if err != nil { + rw.WriteHeader(http.StatusInternalServerError) + _ = enc.Encode(errorResponse{Error: fmt.Sprintf("Failed to list clusters: %v", err)}) + return + } + + items := make([]runtime.Object, 0) + for _, cluster := range clusters { + singleClusterManger := r.multiClusterInformerManager.GetSingleClusterManager(cluster.Name) + if singleClusterManger == nil { + klog.Warningf("SingleClusterInformerManager for cluster(%s) is nil.", cluster.Name) + continue + } + + switch { + case len(info.Namespace) > 0 && len(info.Name) > 0: + resourceObject, err := singleClusterManger.Lister(resourceGVR).ByNamespace(info.Namespace).Get(info.Name) + if err != nil { + klog.Errorf("Failed to get %s resource(%s/%s) from cluster(%s)'s informer cache.", + resourceGVR, info.Namespace, info.Name, cluster.Name) + } + items = append(items, addAnnotationWithClusterName([]runtime.Object{resourceObject}, cluster.Name)...) + case len(info.Namespace) > 0: + resourceObjects, err := singleClusterManger.Lister(resourceGVR).ByNamespace(info.Namespace).List(labels.Everything()) + if err != nil { + klog.Errorf("Failed to list %s resource under namespace(%s) from cluster(%s)'s informer cache.", + resourceGVR, info.Namespace, cluster.Name) + } + items = append(items, addAnnotationWithClusterName(resourceObjects, cluster.Name)...) + case len(info.Name) > 0: + resourceObject, err := singleClusterManger.Lister(resourceGVR).Get(info.Name) + if err != nil { + klog.Errorf("Failed to get %s resource(%s) from cluster(%s)'s informer cache.", + resourceGVR, info.Name, cluster.Name) + } + items = append(items, addAnnotationWithClusterName([]runtime.Object{resourceObject}, cluster.Name)...) + default: + resourceObjects, err := singleClusterManger.Lister(resourceGVR).List(labels.Everything()) + if err != nil { + klog.Errorf("Failed to list %s resource from cluster(%s)'s informer cache.", + resourceGVR, cluster.Name) + } + items = append(items, addAnnotationWithClusterName(resourceObjects, cluster.Name)...) + } + } + + rr := reqResponse{} + rr.APIVersion = fmt.Sprintf("%s/%s", info.APIGroup, info.APIVersion) + rr.Kind = "List" + rr.Items = items + _ = enc.Encode(rr) + }), nil +} + +func addAnnotationWithClusterName(resourceObjects []runtime.Object, clusterName string) []runtime.Object { + resources := make([]runtime.Object, 0) + for index := range resourceObjects { + resource := resourceObjects[index].(*unstructured.Unstructured) + + annotations := resource.GetAnnotations() + annotations["cluster.karmada.io/name"] = clusterName + + resource.SetAnnotations(annotations) + resources = append(resources, resource) + } + + return resources +} diff --git a/pkg/registry/search/storage/opensearch.go b/pkg/registry/search/storage/opensearch.go new file mode 100644 index 000000000..e178093b5 --- /dev/null +++ b/pkg/registry/search/storage/opensearch.go @@ -0,0 +1,14 @@ +package storage + +import ( + "net/http" + + genericrequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" +) + +func (r *SearchREST) newOpenSearchHandler(info *genericrequest.RequestInfo, responder rest.Responder) (http.Handler, error) { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + // Construct a handler and send the request to the ES. + }), nil +} diff --git a/pkg/registry/search/storage/requestinfo.go b/pkg/registry/search/storage/requestinfo.go new file mode 100644 index 000000000..99699be5e --- /dev/null +++ b/pkg/registry/search/storage/requestinfo.go @@ -0,0 +1,76 @@ +package storage + +import ( + "fmt" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + genericrequest "k8s.io/apiserver/pkg/endpoints/request" +) + +var ( + apiPrefixes = sets.NewString("api", "apis") + groupLessAPIPrefixes = sets.NewString("api") +) + +func parseK8sNativeResourceInfo(reqParts []string) (*genericrequest.RequestInfo, error) { + requestInfo := &genericrequest.RequestInfo{ + IsResourceRequest: false, + Path: strings.Join(reqParts, "/"), + } + + if len(reqParts) < 3 { + // return a non-resource request + return requestInfo, nil + } + + if !apiPrefixes.Has(reqParts[0]) { + // return a non-resource request + return requestInfo, nil + } + + requestInfo.APIPrefix = reqParts[0] + currentParts := reqParts[1:] + + if !groupLessAPIPrefixes.Has(requestInfo.APIPrefix) { + if len(currentParts) < 3 { + // return a non-resource request + return requestInfo, nil + } + + requestInfo.APIGroup = currentParts[0] + currentParts = currentParts[1:] + } + + requestInfo.IsResourceRequest = true + requestInfo.APIVersion = currentParts[0] + currentParts = currentParts[1:] + + // URL forms: /namespaces/{namespace}/{kind}/*, where parts are adjusted to be relative to kind + if currentParts[0] == "namespaces" { + if len(currentParts) > 1 { + requestInfo.Namespace = currentParts[1] + if len(currentParts) > 2 { + currentParts = currentParts[2:] + } + } + } else { + requestInfo.Namespace = metav1.NamespaceNone + } + + switch { + case len(currentParts) >= 3: + return nil, fmt.Errorf("invalid request parts(%s) for k8s native request URL", currentParts) + case len(currentParts) >= 2: + requestInfo.Name = currentParts[1] + fallthrough + case len(currentParts) >= 1: + requestInfo.Resource = currentParts[0] + } + + if requestInfo.Resource == "namespace" { + requestInfo.Namespace = "" + } + return requestInfo, nil +} diff --git a/pkg/registry/search/storage/search.go b/pkg/registry/search/storage/search.go index 26b26a699..6f57c3c29 100644 --- a/pkg/registry/search/storage/search.go +++ b/pkg/registry/search/storage/search.go @@ -2,19 +2,24 @@ package storage import ( "context" + "fmt" "net/http" + "strings" "k8s.io/apimachinery/pkg/runtime" + genericrequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" searchapis "github.com/karmada-io/karmada/pkg/apis/search" + clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" + "github.com/karmada-io/karmada/pkg/util/informermanager" ) // SearchREST implements a RESTStorage for search resource. type SearchREST struct { - kubeClient kubernetes.Interface + multiClusterInformerManager informermanager.MultiClusterInformerManager + clusterLister clusterlister.ClusterLister // add needed parameters here } @@ -24,9 +29,12 @@ var _ rest.Storage = &SearchREST{} var _ rest.Connecter = &SearchREST{} // NewSearchREST returns a RESTStorage object that will work against search. -func NewSearchREST(client kubernetes.Interface) *SearchREST { +func NewSearchREST( + multiClusterInformerManager informermanager.MultiClusterInformerManager, + clusterLister clusterlister.ClusterLister) *SearchREST { return &SearchREST{ - kubeClient: client, + multiClusterInformerManager: multiClusterInformerManager, + clusterLister: clusterLister, } } @@ -50,12 +58,37 @@ func (r *SearchREST) NewConnectOptions() (runtime.Object, bool, string) { return nil, true, "" } -// Connect returns a handler for the ES search. +// Connect returns a handler for search. func (r *SearchREST) Connect(ctx context.Context, id string, _ runtime.Object, responder rest.Responder) (http.Handler, error) { - klog.Infof("Prepare for construct handler to connect ES.") + info, ok := genericrequest.RequestInfoFrom(ctx) + if !ok { + return nil, fmt.Errorf("no RequestInfo found in the context") + } - return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + if len(info.Parts) < 3 { + return nil, fmt.Errorf("invalid requestInfo parts: %v", info.Parts) + } - // Construct a handler and send the request to the ES. - }), nil + // reqParts are slices split by the k8s-native URL that include + // APIPrefix, APIGroup, APIVersion, Namespace and Resource. + // For example, the whole request URL is /apis/search.karmada.io/v1alpha1/search/cache/api/v1/nodes + // info.Parts is [search cache api v1 nodes], so reParts is [api v1 nodes] + reqParts := info.Parts[2:] + nativeResourceInfo, err := parseK8sNativeResourceInfo(reqParts) + if err != nil { + klog.Errorf("Failed to parse k8s native RequestInfo, err: %v", err) + return nil, err + } + if !nativeResourceInfo.IsResourceRequest { + return nil, fmt.Errorf("k8s native Request URL(%s) is not a resource request", strings.Join(reqParts, "/")) + } + + switch id { + case "cache": + return r.newCacheHandler(nativeResourceInfo, responder) + case "opensearch": + return r.newOpenSearchHandler(nativeResourceInfo, responder) + default: + return nil, fmt.Errorf("connect with unrecognized search category %s", id) + } } diff --git a/pkg/search/apiserver.go b/pkg/search/apiserver.go index 3496f2479..0827471f7 100644 --- a/pkg/search/apiserver.go +++ b/pkg/search/apiserver.go @@ -8,12 +8,13 @@ import ( "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" - "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" searchapis "github.com/karmada-io/karmada/pkg/apis/search" searchinstall "github.com/karmada-io/karmada/pkg/apis/search/install" + clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" searchstorage "github.com/karmada-io/karmada/pkg/registry/search/storage" + "github.com/karmada-io/karmada/pkg/util/informermanager" ) var ( @@ -46,13 +47,16 @@ func init() { // ExtraConfig holds custom apiserver config type ExtraConfig struct { + MultiClusterInformerManager informermanager.MultiClusterInformerManager + ClusterLister clusterlister.ClusterLister + // Add custom config if necessary. } // Config defines the config for the APIServer. type Config struct { GenericConfig *genericapiserver.RecommendedConfig - ExtraConfig ExtraConfig + Controller *Controller } // APIServer contains state for karmada-search. @@ -74,7 +78,10 @@ type CompletedConfig struct { func (cfg *Config) Complete() CompletedConfig { c := completedConfig{ cfg.GenericConfig.Complete(), - &cfg.ExtraConfig, + &ExtraConfig{ + MultiClusterInformerManager: cfg.Controller.InformerManager, + ClusterLister: cfg.Controller.clusterLister, + }, } c.GenericConfig.Version = &version.Info{ @@ -85,7 +92,7 @@ func (cfg *Config) Complete() CompletedConfig { return CompletedConfig{&c} } -func (c completedConfig) New(kubeClient kubernetes.Interface) (*APIServer, error) { +func (c completedConfig) New() (*APIServer, error) { genericServer, err := c.GenericConfig.New("karmada-search", genericapiserver.NewEmptyDelegate()) if err != nil { return nil, err @@ -102,7 +109,7 @@ func (c completedConfig) New(kubeClient kubernetes.Interface) (*APIServer, error klog.Errorf("unable to create REST storage for a resource due to %v, will die", err) return nil, err } - searchREST := searchstorage.NewSearchREST(kubeClient) + searchREST := searchstorage.NewSearchREST(c.ExtraConfig.MultiClusterInformerManager, c.ExtraConfig.ClusterLister) v1alpha1search := map[string]rest.Storage{} v1alpha1search["resourceregistries"] = resourceRegistryStorage.ResourceRegistry