Merge pull request #1891 from XiShanYongYe-Chang/karmada-search

Support karmada-search component search interface
This commit is contained in:
karmada-bot 2022-05-27 10:46:45 +08:00 committed by GitHub
commit bad8959132
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 261 additions and 26 deletions

View File

@ -14,7 +14,6 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
netutils "k8s.io/utils/net"
searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1"
@ -76,11 +75,7 @@ func (o *Options) Run(ctx context.Context) error {
return err
}
restConfig := config.GenericConfig.ClientConfig
restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst
kubeClientSet := kubernetes.NewForConfigOrDie(restConfig)
server, err := config.Complete().New(kubeClientSet)
server, err := config.Complete().New()
if err != nil {
return err
}
@ -92,11 +87,7 @@ func (o *Options) Run(ctx context.Context) error {
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error {
// start ResourceRegistry controller
ctl, err := search.NewController(restConfig, search.CachedResourceHandler())
if err != nil {
return err
}
ctl.Start(context.StopCh)
config.Controller.Start(context.StopCh)
return nil
})
@ -117,9 +108,16 @@ func (o *Options) Config() (*search.Config, error) {
return nil, err
}
serverConfig.ClientConfig.QPS = o.KubeAPIQPS
serverConfig.ClientConfig.Burst = o.KubeAPIBurst
ctl, err := search.NewController(serverConfig.ClientConfig, search.CachedResourceHandler())
if err != nil {
return nil, err
}
config := &search.Config{
GenericConfig: serverConfig,
ExtraConfig: search.ExtraConfig{},
Controller: ctl,
}
return config, nil
}

View File

@ -0,0 +1,107 @@
package storage
import (
"encoding/json"
"fmt"
"net/http"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/klog/v2"
)
type errorResponse struct {
Error string `json:"error"`
}
type reqResponse struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []runtime.Object `json:"items"`
}
func (r *SearchREST) newCacheHandler(info *genericrequest.RequestInfo, responder rest.Responder) (http.Handler, error) {
resourceGVR := schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
}
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
enc := json.NewEncoder(rw)
clusters, err := r.clusterLister.List(labels.Everything())
if err != nil {
rw.WriteHeader(http.StatusInternalServerError)
_ = enc.Encode(errorResponse{Error: fmt.Sprintf("Failed to list clusters: %v", err)})
return
}
items := make([]runtime.Object, 0)
for _, cluster := range clusters {
singleClusterManger := r.multiClusterInformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterManger == nil {
klog.Warningf("SingleClusterInformerManager for cluster(%s) is nil.", cluster.Name)
continue
}
switch {
case len(info.Namespace) > 0 && len(info.Name) > 0:
resourceObject, err := singleClusterManger.Lister(resourceGVR).ByNamespace(info.Namespace).Get(info.Name)
if err != nil {
klog.Errorf("Failed to get %s resource(%s/%s) from cluster(%s)'s informer cache.",
resourceGVR, info.Namespace, info.Name, cluster.Name)
}
items = append(items, addAnnotationWithClusterName([]runtime.Object{resourceObject}, cluster.Name)...)
case len(info.Namespace) > 0:
resourceObjects, err := singleClusterManger.Lister(resourceGVR).ByNamespace(info.Namespace).List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list %s resource under namespace(%s) from cluster(%s)'s informer cache.",
resourceGVR, info.Namespace, cluster.Name)
}
items = append(items, addAnnotationWithClusterName(resourceObjects, cluster.Name)...)
case len(info.Name) > 0:
resourceObject, err := singleClusterManger.Lister(resourceGVR).Get(info.Name)
if err != nil {
klog.Errorf("Failed to get %s resource(%s) from cluster(%s)'s informer cache.",
resourceGVR, info.Name, cluster.Name)
}
items = append(items, addAnnotationWithClusterName([]runtime.Object{resourceObject}, cluster.Name)...)
default:
resourceObjects, err := singleClusterManger.Lister(resourceGVR).List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list %s resource from cluster(%s)'s informer cache.",
resourceGVR, cluster.Name)
}
items = append(items, addAnnotationWithClusterName(resourceObjects, cluster.Name)...)
}
}
rr := reqResponse{}
rr.APIVersion = fmt.Sprintf("%s/%s", info.APIGroup, info.APIVersion)
rr.Kind = "List"
rr.Items = items
_ = enc.Encode(rr)
}), nil
}
func addAnnotationWithClusterName(resourceObjects []runtime.Object, clusterName string) []runtime.Object {
resources := make([]runtime.Object, 0)
for index := range resourceObjects {
resource := resourceObjects[index].(*unstructured.Unstructured)
annotations := resource.GetAnnotations()
annotations["cluster.karmada.io/name"] = clusterName
resource.SetAnnotations(annotations)
resources = append(resources, resource)
}
return resources
}

View File

@ -0,0 +1,14 @@
package storage
import (
"net/http"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
)
func (r *SearchREST) newOpenSearchHandler(info *genericrequest.RequestInfo, responder rest.Responder) (http.Handler, error) {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// Construct a handler and send the request to the ES.
}), nil
}

View File

@ -0,0 +1,76 @@
package storage
import (
"fmt"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
)
var (
apiPrefixes = sets.NewString("api", "apis")
groupLessAPIPrefixes = sets.NewString("api")
)
func parseK8sNativeResourceInfo(reqParts []string) (*genericrequest.RequestInfo, error) {
requestInfo := &genericrequest.RequestInfo{
IsResourceRequest: false,
Path: strings.Join(reqParts, "/"),
}
if len(reqParts) < 3 {
// return a non-resource request
return requestInfo, nil
}
if !apiPrefixes.Has(reqParts[0]) {
// return a non-resource request
return requestInfo, nil
}
requestInfo.APIPrefix = reqParts[0]
currentParts := reqParts[1:]
if !groupLessAPIPrefixes.Has(requestInfo.APIPrefix) {
if len(currentParts) < 3 {
// return a non-resource request
return requestInfo, nil
}
requestInfo.APIGroup = currentParts[0]
currentParts = currentParts[1:]
}
requestInfo.IsResourceRequest = true
requestInfo.APIVersion = currentParts[0]
currentParts = currentParts[1:]
// URL forms: /namespaces/{namespace}/{kind}/*, where parts are adjusted to be relative to kind
if currentParts[0] == "namespaces" {
if len(currentParts) > 1 {
requestInfo.Namespace = currentParts[1]
if len(currentParts) > 2 {
currentParts = currentParts[2:]
}
}
} else {
requestInfo.Namespace = metav1.NamespaceNone
}
switch {
case len(currentParts) >= 3:
return nil, fmt.Errorf("invalid request parts(%s) for k8s native request URL", currentParts)
case len(currentParts) >= 2:
requestInfo.Name = currentParts[1]
fallthrough
case len(currentParts) >= 1:
requestInfo.Resource = currentParts[0]
}
if requestInfo.Resource == "namespace" {
requestInfo.Namespace = ""
}
return requestInfo, nil
}

View File

@ -2,19 +2,24 @@ package storage
import (
"context"
"fmt"
"net/http"
"strings"
"k8s.io/apimachinery/pkg/runtime"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
searchapis "github.com/karmada-io/karmada/pkg/apis/search"
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util/informermanager"
)
// SearchREST implements a RESTStorage for search resource.
type SearchREST struct {
kubeClient kubernetes.Interface
multiClusterInformerManager informermanager.MultiClusterInformerManager
clusterLister clusterlister.ClusterLister
// add needed parameters here
}
@ -24,9 +29,12 @@ var _ rest.Storage = &SearchREST{}
var _ rest.Connecter = &SearchREST{}
// NewSearchREST returns a RESTStorage object that will work against search.
func NewSearchREST(client kubernetes.Interface) *SearchREST {
func NewSearchREST(
multiClusterInformerManager informermanager.MultiClusterInformerManager,
clusterLister clusterlister.ClusterLister) *SearchREST {
return &SearchREST{
kubeClient: client,
multiClusterInformerManager: multiClusterInformerManager,
clusterLister: clusterLister,
}
}
@ -50,12 +58,37 @@ func (r *SearchREST) NewConnectOptions() (runtime.Object, bool, string) {
return nil, true, ""
}
// Connect returns a handler for the ES search.
// Connect returns a handler for search.
func (r *SearchREST) Connect(ctx context.Context, id string, _ runtime.Object, responder rest.Responder) (http.Handler, error) {
klog.Infof("Prepare for construct handler to connect ES.")
info, ok := genericrequest.RequestInfoFrom(ctx)
if !ok {
return nil, fmt.Errorf("no RequestInfo found in the context")
}
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if len(info.Parts) < 3 {
return nil, fmt.Errorf("invalid requestInfo parts: %v", info.Parts)
}
// Construct a handler and send the request to the ES.
}), nil
// reqParts are slices split by the k8s-native URL that include
// APIPrefix, APIGroup, APIVersion, Namespace and Resource.
// For example, the whole request URL is /apis/search.karmada.io/v1alpha1/search/cache/api/v1/nodes
// info.Parts is [search cache api v1 nodes], so reParts is [api v1 nodes]
reqParts := info.Parts[2:]
nativeResourceInfo, err := parseK8sNativeResourceInfo(reqParts)
if err != nil {
klog.Errorf("Failed to parse k8s native RequestInfo, err: %v", err)
return nil, err
}
if !nativeResourceInfo.IsResourceRequest {
return nil, fmt.Errorf("k8s native Request URL(%s) is not a resource request", strings.Join(reqParts, "/"))
}
switch id {
case "cache":
return r.newCacheHandler(nativeResourceInfo, responder)
case "opensearch":
return r.newOpenSearchHandler(nativeResourceInfo, responder)
default:
return nil, fmt.Errorf("connect with unrecognized search category %s", id)
}
}

View File

@ -8,12 +8,13 @@ import (
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
searchapis "github.com/karmada-io/karmada/pkg/apis/search"
searchinstall "github.com/karmada-io/karmada/pkg/apis/search/install"
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
searchstorage "github.com/karmada-io/karmada/pkg/registry/search/storage"
"github.com/karmada-io/karmada/pkg/util/informermanager"
)
var (
@ -46,13 +47,16 @@ func init() {
// ExtraConfig holds custom apiserver config
type ExtraConfig struct {
MultiClusterInformerManager informermanager.MultiClusterInformerManager
ClusterLister clusterlister.ClusterLister
// Add custom config if necessary.
}
// Config defines the config for the APIServer.
type Config struct {
GenericConfig *genericapiserver.RecommendedConfig
ExtraConfig ExtraConfig
Controller *Controller
}
// APIServer contains state for karmada-search.
@ -74,7 +78,10 @@ type CompletedConfig struct {
func (cfg *Config) Complete() CompletedConfig {
c := completedConfig{
cfg.GenericConfig.Complete(),
&cfg.ExtraConfig,
&ExtraConfig{
MultiClusterInformerManager: cfg.Controller.InformerManager,
ClusterLister: cfg.Controller.clusterLister,
},
}
c.GenericConfig.Version = &version.Info{
@ -85,7 +92,7 @@ func (cfg *Config) Complete() CompletedConfig {
return CompletedConfig{&c}
}
func (c completedConfig) New(kubeClient kubernetes.Interface) (*APIServer, error) {
func (c completedConfig) New() (*APIServer, error) {
genericServer, err := c.GenericConfig.New("karmada-search", genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
@ -102,7 +109,7 @@ func (c completedConfig) New(kubeClient kubernetes.Interface) (*APIServer, error
klog.Errorf("unable to create REST storage for a resource due to %v, will die", err)
return nil, err
}
searchREST := searchstorage.NewSearchREST(kubeClient)
searchREST := searchstorage.NewSearchREST(c.ExtraConfig.MultiClusterInformerManager, c.ExtraConfig.ClusterLister)
v1alpha1search := map[string]rest.Storage{}
v1alpha1search["resourceregistries"] = resourceRegistryStorage.ResourceRegistry