Merge pull request #1917 from XiShanYongYe-Chang/search-with-labels

Support to search resources with labels
This commit is contained in:
karmada-bot 2022-06-01 15:50:51 +08:00 committed by GitHub
commit e80336a403
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 133 additions and 81 deletions

View File

@ -16,6 +16,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
netutils "k8s.io/utils/net"
searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme"
searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1"
generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi"
"github.com/karmada-io/karmada/pkg/search"
@ -38,7 +39,7 @@ func NewOptions() *Options {
o := &Options{
RecommendedOptions: genericoptions.NewRecommendedOptions(
defaultEtcdPathPrefix,
search.Codecs.LegacyCodec(searchv1alpha1.SchemeGroupVersion)),
searchscheme.Codecs.LegacyCodec(searchv1alpha1.SchemeGroupVersion)),
}
o.RecommendedOptions.Etcd.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(searchv1alpha1.SchemeGroupVersion,
schema.GroupKind{Group: searchv1alpha1.GroupName})
@ -101,8 +102,8 @@ func (o *Options) Config() (*search.Config, error) {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
serverConfig := genericapiserver.NewRecommendedConfig(search.Codecs)
serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(search.Scheme))
serverConfig := genericapiserver.NewRecommendedConfig(searchscheme.Codecs)
serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(searchscheme.Scheme))
serverConfig.OpenAPIConfig.Info.Title = "karmada-search"
if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil {
return nil, err

View File

@ -0,0 +1,42 @@
package scheme
import (
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
searchinstall "github.com/karmada-io/karmada/pkg/apis/search/install"
)
var (
// Scheme defines methods for serializing and deserializing API objects.
Scheme = runtime.NewScheme()
// Codecs provides methods for retrieving codecs and serializers for specific
// versions and content types.
Codecs = serializer.NewCodecFactory(Scheme)
// ParameterCodec handles versioning of objects that are converted to query parameters.
ParameterCodec = runtime.NewParameterCodec(Scheme)
)
func init() {
searchinstall.Install(Scheme)
utilruntime.Must(internalversion.AddToScheme(Scheme))
// we need to add the options to empty v1
// TODO fix the server code to avoid this
metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})
// TODO: keep the generic API server from wanting this
unversioned := schema.GroupVersion{Group: "", Version: "v1"}
Scheme.AddUnversionedTypes(unversioned,
&metav1.Status{},
&metav1.APIVersions{},
&metav1.APIGroupList{},
&metav1.APIGroup{},
&metav1.APIResourceList{},
)
}

View File

@ -5,6 +5,8 @@ import (
"fmt"
"net/http"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
@ -13,6 +15,9 @@ import (
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/klog/v2"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme"
)
type errorResponse struct {
@ -36,6 +41,26 @@ func (r *SearchREST) newCacheHandler(info *genericrequest.RequestInfo, responder
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
enc := json.NewEncoder(rw)
opts := metainternalversion.ListOptions{}
if err := searchscheme.ParameterCodec.DecodeParameters(req.URL.Query(), metav1.SchemeGroupVersion, &opts); err != nil {
rw.WriteHeader(http.StatusBadRequest)
klog.Errorf("Failed to decode parameters from req.URL.Query(): %v", err)
_ = enc.Encode(errorResponse{Error: err.Error()})
return
}
if errs := metainternalversionvalidation.ValidateListOptions(&opts); len(errs) > 0 {
rw.WriteHeader(http.StatusBadRequest)
klog.Errorf("Invalid decoded ListOptions: %v.", errs)
_ = enc.Encode(errorResponse{Error: errs.ToAggregate().Error()})
return
}
label := labels.Everything()
if opts.LabelSelector != nil {
label = opts.LabelSelector
}
clusters, err := r.clusterLister.List(labels.Everything())
if err != nil {
rw.WriteHeader(http.StatusInternalServerError)
@ -43,7 +68,28 @@ func (r *SearchREST) newCacheHandler(info *genericrequest.RequestInfo, responder
return
}
// TODO: process opts.Limit to prevent the client from being unable to process the response
// due to the large size of the response body.
objItems := r.getObjectItemsFromClusters(clusters, resourceGVR, info.Namespace, info.Name, label)
rr := reqResponse{
TypeMeta: metav1.TypeMeta{
APIVersion: resourceGVR.GroupVersion().String(),
Kind: "List", // TODO: obtains the kind type of the actual resource list.
},
Items: objItems,
}
_ = enc.Encode(rr)
}), nil
}
func (r *SearchREST) getObjectItemsFromClusters(
clusters []*clusterv1alpha1.Cluster,
objGVR schema.GroupVersionResource,
namespace, name string,
label labels.Selector) []runtime.Object {
items := make([]runtime.Object, 0)
// TODO: encapsulating the Interface for Obtaining resource from the Multi-Cluster Cache.
for _, cluster := range clusters {
singleClusterManger := r.multiClusterInformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterManger == nil {
@ -51,44 +97,38 @@ func (r *SearchREST) newCacheHandler(info *genericrequest.RequestInfo, responder
continue
}
switch {
case len(info.Namespace) > 0 && len(info.Name) > 0:
resourceObject, err := singleClusterManger.Lister(resourceGVR).ByNamespace(info.Namespace).Get(info.Name)
var err error
objLister := singleClusterManger.Lister(objGVR)
if len(name) > 0 {
var resourceObject runtime.Object
if len(namespace) > 0 {
resourceObject, err = objLister.ByNamespace(namespace).Get(name)
} else {
resourceObject, err = objLister.Get(name)
}
if err != nil {
klog.Errorf("Failed to get %s resource(%s/%s) from cluster(%s)'s informer cache.",
resourceGVR, info.Namespace, info.Name, cluster.Name)
klog.Errorf("Failed to get %s resource(%s/%s) from cluster(%s)'s informer cache: %v",
objGVR, namespace, name, cluster.Name, err)
continue
}
items = append(items, addAnnotationWithClusterName([]runtime.Object{resourceObject}, cluster.Name)...)
case len(info.Namespace) > 0:
resourceObjects, err := singleClusterManger.Lister(resourceGVR).ByNamespace(info.Namespace).List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list %s resource under namespace(%s) from cluster(%s)'s informer cache.",
resourceGVR, info.Namespace, cluster.Name)
} else {
var resourceObjects []runtime.Object
if len(namespace) > 0 {
resourceObjects, err = objLister.ByNamespace(namespace).List(label)
} else {
resourceObjects, err = objLister.List(label)
}
items = append(items, addAnnotationWithClusterName(resourceObjects, cluster.Name)...)
case len(info.Name) > 0:
resourceObject, err := singleClusterManger.Lister(resourceGVR).Get(info.Name)
if err != nil {
klog.Errorf("Failed to get %s resource(%s) from cluster(%s)'s informer cache.",
resourceGVR, info.Name, cluster.Name)
}
items = append(items, addAnnotationWithClusterName([]runtime.Object{resourceObject}, cluster.Name)...)
default:
resourceObjects, err := singleClusterManger.Lister(resourceGVR).List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list %s resource from cluster(%s)'s informer cache.",
resourceGVR, cluster.Name)
klog.Errorf("Failed to list %s resource from cluster(%s)'s informer cache: %v",
objGVR, cluster.Name, err)
continue
}
items = append(items, addAnnotationWithClusterName(resourceObjects, cluster.Name)...)
}
}
rr := reqResponse{}
rr.APIVersion = fmt.Sprintf("%s/%s", info.APIGroup, info.APIVersion)
rr.Kind = "List"
rr.Items = items
_ = enc.Encode(rr)
}), nil
return items
}
func addAnnotationWithClusterName(resourceObjects []runtime.Object, clusterName string) []runtime.Object {
@ -100,6 +140,7 @@ func addAnnotationWithClusterName(resourceObjects []runtime.Object, clusterName
if annotations == nil {
annotations = make(map[string]string)
}
// TODO: move this annotation key `cluster.karmada.io/name` to the Cluster API.
annotations["cluster.karmada.io/name"] = clusterName
resource.SetAnnotations(annotations)

View File

@ -1,50 +1,18 @@
package search
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/klog/v2"
searchapis "github.com/karmada-io/karmada/pkg/apis/search"
searchinstall "github.com/karmada-io/karmada/pkg/apis/search/install"
searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme"
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
searchstorage "github.com/karmada-io/karmada/pkg/registry/search/storage"
"github.com/karmada-io/karmada/pkg/util/informermanager"
)
var (
// Scheme defines methods for serializing and deserializing API objects.
Scheme = runtime.NewScheme()
// Codecs provides methods for retrieving codecs and serializers for specific
// versions and content types.
Codecs = serializer.NewCodecFactory(Scheme)
// ParameterCodec handles versioning of objects that are converted to query parameters.
ParameterCodec = runtime.NewParameterCodec(Scheme)
)
func init() {
searchinstall.Install(Scheme)
// we need to add the options to empty v1
// TODO fix the server code to avoid this
metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})
// TODO: keep the generic API server from wanting this
unversioned := schema.GroupVersion{Group: "", Version: "v1"}
Scheme.AddUnversionedTypes(unversioned,
&metav1.Status{},
&metav1.APIVersions{},
&metav1.APIGroupList{},
&metav1.APIGroup{},
&metav1.APIResourceList{},
)
}
// ExtraConfig holds custom apiserver config
type ExtraConfig struct {
MultiClusterInformerManager informermanager.MultiClusterInformerManager
@ -102,9 +70,9 @@ func (c completedConfig) New() (*APIServer, error) {
GenericAPIServer: genericServer,
}
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(searchapis.GroupName, Scheme, ParameterCodec, Codecs)
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(searchapis.GroupName, searchscheme.Scheme, searchscheme.ParameterCodec, searchscheme.Codecs)
resourceRegistryStorage, err := searchstorage.NewResourceRegistryStorage(Scheme, c.GenericConfig.RESTOptionsGetter)
resourceRegistryStorage, err := searchstorage.NewResourceRegistryStorage(searchscheme.Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
klog.Errorf("unable to create REST storage for a resource due to %v, will die", err)
return nil, err