diff --git a/cmd/karmada-search/app/options/options.go b/cmd/karmada-search/app/options/options.go index 7606cbf8a..bef5d18b6 100644 --- a/cmd/karmada-search/app/options/options.go +++ b/cmd/karmada-search/app/options/options.go @@ -16,6 +16,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" netutils "k8s.io/utils/net" + searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme" searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1" generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi" "github.com/karmada-io/karmada/pkg/search" @@ -38,7 +39,7 @@ func NewOptions() *Options { o := &Options{ RecommendedOptions: genericoptions.NewRecommendedOptions( defaultEtcdPathPrefix, - search.Codecs.LegacyCodec(searchv1alpha1.SchemeGroupVersion)), + searchscheme.Codecs.LegacyCodec(searchv1alpha1.SchemeGroupVersion)), } o.RecommendedOptions.Etcd.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(searchv1alpha1.SchemeGroupVersion, schema.GroupKind{Group: searchv1alpha1.GroupName}) @@ -101,8 +102,8 @@ func (o *Options) Config() (*search.Config, error) { return nil, fmt.Errorf("error creating self-signed certificates: %v", err) } - serverConfig := genericapiserver.NewRecommendedConfig(search.Codecs) - serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(search.Scheme)) + serverConfig := genericapiserver.NewRecommendedConfig(searchscheme.Codecs) + serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(searchscheme.Scheme)) serverConfig.OpenAPIConfig.Info.Title = "karmada-search" if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil { return nil, err diff --git a/pkg/apis/search/scheme/register.go b/pkg/apis/search/scheme/register.go new file mode 100644 index 000000000..75e7542c1 --- /dev/null +++ b/pkg/apis/search/scheme/register.go @@ -0,0 +1,42 @@ +package scheme + +import ( + "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + + searchinstall "github.com/karmada-io/karmada/pkg/apis/search/install" +) + +var ( + // Scheme defines methods for serializing and deserializing API objects. + Scheme = runtime.NewScheme() + // Codecs provides methods for retrieving codecs and serializers for specific + // versions and content types. + Codecs = serializer.NewCodecFactory(Scheme) + // ParameterCodec handles versioning of objects that are converted to query parameters. + ParameterCodec = runtime.NewParameterCodec(Scheme) +) + +func init() { + searchinstall.Install(Scheme) + + utilruntime.Must(internalversion.AddToScheme(Scheme)) + + // we need to add the options to empty v1 + // TODO fix the server code to avoid this + metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"}) + + // TODO: keep the generic API server from wanting this + unversioned := schema.GroupVersion{Group: "", Version: "v1"} + Scheme.AddUnversionedTypes(unversioned, + &metav1.Status{}, + &metav1.APIVersions{}, + &metav1.APIGroupList{}, + &metav1.APIGroup{}, + &metav1.APIResourceList{}, + ) +} diff --git a/pkg/registry/search/storage/cache.go b/pkg/registry/search/storage/cache.go index 71d46917d..3f480415d 100644 --- a/pkg/registry/search/storage/cache.go +++ b/pkg/registry/search/storage/cache.go @@ -5,6 +5,8 @@ import ( "fmt" "net/http" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -13,6 +15,9 @@ import ( genericrequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/klog/v2" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme" ) type errorResponse struct { @@ -36,6 +41,26 @@ func (r *SearchREST) newCacheHandler(info *genericrequest.RequestInfo, responder return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { enc := json.NewEncoder(rw) + opts := metainternalversion.ListOptions{} + if err := searchscheme.ParameterCodec.DecodeParameters(req.URL.Query(), metav1.SchemeGroupVersion, &opts); err != nil { + rw.WriteHeader(http.StatusBadRequest) + klog.Errorf("Failed to decode parameters from req.URL.Query(): %v", err) + _ = enc.Encode(errorResponse{Error: err.Error()}) + return + } + + if errs := metainternalversionvalidation.ValidateListOptions(&opts); len(errs) > 0 { + rw.WriteHeader(http.StatusBadRequest) + klog.Errorf("Invalid decoded ListOptions: %v.", errs) + _ = enc.Encode(errorResponse{Error: errs.ToAggregate().Error()}) + return + } + + label := labels.Everything() + if opts.LabelSelector != nil { + label = opts.LabelSelector + } + clusters, err := r.clusterLister.List(labels.Everything()) if err != nil { rw.WriteHeader(http.StatusInternalServerError) @@ -43,54 +68,69 @@ func (r *SearchREST) newCacheHandler(info *genericrequest.RequestInfo, responder return } - items := make([]runtime.Object, 0) - for _, cluster := range clusters { - singleClusterManger := r.multiClusterInformerManager.GetSingleClusterManager(cluster.Name) - if singleClusterManger == nil { - klog.Warningf("SingleClusterInformerManager for cluster(%s) is nil.", cluster.Name) - continue - } - - switch { - case len(info.Namespace) > 0 && len(info.Name) > 0: - resourceObject, err := singleClusterManger.Lister(resourceGVR).ByNamespace(info.Namespace).Get(info.Name) - if err != nil { - klog.Errorf("Failed to get %s resource(%s/%s) from cluster(%s)'s informer cache.", - resourceGVR, info.Namespace, info.Name, cluster.Name) - } - items = append(items, addAnnotationWithClusterName([]runtime.Object{resourceObject}, cluster.Name)...) - case len(info.Namespace) > 0: - resourceObjects, err := singleClusterManger.Lister(resourceGVR).ByNamespace(info.Namespace).List(labels.Everything()) - if err != nil { - klog.Errorf("Failed to list %s resource under namespace(%s) from cluster(%s)'s informer cache.", - resourceGVR, info.Namespace, cluster.Name) - } - items = append(items, addAnnotationWithClusterName(resourceObjects, cluster.Name)...) - case len(info.Name) > 0: - resourceObject, err := singleClusterManger.Lister(resourceGVR).Get(info.Name) - if err != nil { - klog.Errorf("Failed to get %s resource(%s) from cluster(%s)'s informer cache.", - resourceGVR, info.Name, cluster.Name) - } - items = append(items, addAnnotationWithClusterName([]runtime.Object{resourceObject}, cluster.Name)...) - default: - resourceObjects, err := singleClusterManger.Lister(resourceGVR).List(labels.Everything()) - if err != nil { - klog.Errorf("Failed to list %s resource from cluster(%s)'s informer cache.", - resourceGVR, cluster.Name) - } - items = append(items, addAnnotationWithClusterName(resourceObjects, cluster.Name)...) - } + // TODO: process opts.Limit to prevent the client from being unable to process the response + // due to the large size of the response body. + objItems := r.getObjectItemsFromClusters(clusters, resourceGVR, info.Namespace, info.Name, label) + rr := reqResponse{ + TypeMeta: metav1.TypeMeta{ + APIVersion: resourceGVR.GroupVersion().String(), + Kind: "List", // TODO: obtains the kind type of the actual resource list. + }, + Items: objItems, } - - rr := reqResponse{} - rr.APIVersion = fmt.Sprintf("%s/%s", info.APIGroup, info.APIVersion) - rr.Kind = "List" - rr.Items = items _ = enc.Encode(rr) }), nil } +func (r *SearchREST) getObjectItemsFromClusters( + clusters []*clusterv1alpha1.Cluster, + objGVR schema.GroupVersionResource, + namespace, name string, + label labels.Selector) []runtime.Object { + items := make([]runtime.Object, 0) + + // TODO: encapsulating the Interface for Obtaining resource from the Multi-Cluster Cache. + for _, cluster := range clusters { + singleClusterManger := r.multiClusterInformerManager.GetSingleClusterManager(cluster.Name) + if singleClusterManger == nil { + klog.Warningf("SingleClusterInformerManager for cluster(%s) is nil.", cluster.Name) + continue + } + + var err error + objLister := singleClusterManger.Lister(objGVR) + if len(name) > 0 { + var resourceObject runtime.Object + if len(namespace) > 0 { + resourceObject, err = objLister.ByNamespace(namespace).Get(name) + } else { + resourceObject, err = objLister.Get(name) + } + if err != nil { + klog.Errorf("Failed to get %s resource(%s/%s) from cluster(%s)'s informer cache: %v", + objGVR, namespace, name, cluster.Name, err) + continue + } + items = append(items, addAnnotationWithClusterName([]runtime.Object{resourceObject}, cluster.Name)...) + } else { + var resourceObjects []runtime.Object + if len(namespace) > 0 { + resourceObjects, err = objLister.ByNamespace(namespace).List(label) + } else { + resourceObjects, err = objLister.List(label) + } + if err != nil { + klog.Errorf("Failed to list %s resource from cluster(%s)'s informer cache: %v", + objGVR, cluster.Name, err) + continue + } + items = append(items, addAnnotationWithClusterName(resourceObjects, cluster.Name)...) + } + } + + return items +} + func addAnnotationWithClusterName(resourceObjects []runtime.Object, clusterName string) []runtime.Object { resources := make([]runtime.Object, 0) for index := range resourceObjects { @@ -100,6 +140,7 @@ func addAnnotationWithClusterName(resourceObjects []runtime.Object, clusterName if annotations == nil { annotations = make(map[string]string) } + // TODO: move this annotation key `cluster.karmada.io/name` to the Cluster API. annotations["cluster.karmada.io/name"] = clusterName resource.SetAnnotations(annotations) diff --git a/pkg/search/apiserver.go b/pkg/search/apiserver.go index 0827471f7..6a15b8857 100644 --- a/pkg/search/apiserver.go +++ b/pkg/search/apiserver.go @@ -1,50 +1,18 @@ package search import ( - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/klog/v2" searchapis "github.com/karmada-io/karmada/pkg/apis/search" - searchinstall "github.com/karmada-io/karmada/pkg/apis/search/install" + searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme" clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" searchstorage "github.com/karmada-io/karmada/pkg/registry/search/storage" "github.com/karmada-io/karmada/pkg/util/informermanager" ) -var ( - // Scheme defines methods for serializing and deserializing API objects. - Scheme = runtime.NewScheme() - // Codecs provides methods for retrieving codecs and serializers for specific - // versions and content types. - Codecs = serializer.NewCodecFactory(Scheme) - // ParameterCodec handles versioning of objects that are converted to query parameters. - ParameterCodec = runtime.NewParameterCodec(Scheme) -) - -func init() { - searchinstall.Install(Scheme) - - // we need to add the options to empty v1 - // TODO fix the server code to avoid this - metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"}) - - // TODO: keep the generic API server from wanting this - unversioned := schema.GroupVersion{Group: "", Version: "v1"} - Scheme.AddUnversionedTypes(unversioned, - &metav1.Status{}, - &metav1.APIVersions{}, - &metav1.APIGroupList{}, - &metav1.APIGroup{}, - &metav1.APIResourceList{}, - ) -} - // ExtraConfig holds custom apiserver config type ExtraConfig struct { MultiClusterInformerManager informermanager.MultiClusterInformerManager @@ -102,9 +70,9 @@ func (c completedConfig) New() (*APIServer, error) { GenericAPIServer: genericServer, } - apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(searchapis.GroupName, Scheme, ParameterCodec, Codecs) + apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(searchapis.GroupName, searchscheme.Scheme, searchscheme.ParameterCodec, searchscheme.Codecs) - resourceRegistryStorage, err := searchstorage.NewResourceRegistryStorage(Scheme, c.GenericConfig.RESTOptionsGetter) + resourceRegistryStorage, err := searchstorage.NewResourceRegistryStorage(searchscheme.Scheme, c.GenericConfig.RESTOptionsGetter) if err != nil { klog.Errorf("unable to create REST storage for a resource due to %v, will die", err) return nil, err