// karmada/pkg/search/proxy/store/multi_cluster_cache.go
/*
Copyright 2022 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"context"
"fmt"
"math"
"sort"
"strings"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
// Store is the cache for resources from multiple member clusters
type Store interface {
	// UpdateCache reconciles the per-cluster caches against the desired
	// resources for each cluster and records the registered resource set.
	UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error
	// HasResource reports whether the given resource has been registered.
	HasResource(resource schema.GroupVersionResource) bool
	// GetResourceFromCache returns the object and the name of the member
	// cluster that owns it.
	GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error)
	// Stop stops all underlying cluster caches.
	Stop()
	// Get returns a single object looked up across all member clusters.
	Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error)
	// List returns an aggregated list spanning all member clusters.
	List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error)
	// Watch returns a merged watch over all member clusters caching gvr.
	Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error)
	// ReadinessCheck checks if the storage is ready for accepting requests.
	ReadinessCheck() error
}
// MultiClusterCache caches resource from multi member clusters
type MultiClusterCache struct {
	// lock guards cache and registeredResources.
	lock sync.RWMutex
	// cache maps member cluster name to its clusterCache.
	cache map[string]*clusterCache
	// registeredResources is the set of GVRs this store serves.
	registeredResources map[schema.GroupVersionResource]struct{}
	// restMapper is handed to each clusterCache when it is created.
	restMapper meta.RESTMapper
	// newClientFunc returns a dynamic client for member cluster apiserver
	newClientFunc func(string) (dynamic.Interface, error)
}

// Compile-time check that MultiClusterCache implements Store.
var _ Store = &MultiClusterCache{}
// NewMultiClusterCache return a cache for resources from member clusters
func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error), restMapper meta.RESTMapper) *MultiClusterCache {
	mcc := &MultiClusterCache{
		cache:               map[string]*clusterCache{},
		registeredResources: map[schema.GroupVersionResource]struct{}{},
		restMapper:          restMapper,
		newClientFunc:       newClientFunc,
	}
	return mcc
}
// ReadinessCheck checks if the storage is ready for accepting requests.
// It returns nil when every cluster cache is ready, otherwise an error
// naming the clusters that are not.
func (c *MultiClusterCache) ReadinessCheck() error {
	c.lock.RLock()
	defer c.lock.RUnlock()
	var failedChecks []string
	for cluster, cc := range c.cache {
		if cc.readinessCheck() != nil {
			failedChecks = append(failedChecks, cluster)
		}
	}
	if len(failedChecks) == 0 {
		// Readiness is probed frequently; keep the success message at a high
		// verbosity level so it does not flood the logs on every probe.
		klog.V(4).Infof("MultiClusterCache is ready for all registered clusters")
		return nil
	}
	// Map iteration order is random; sort so the log line and the returned
	// error message are deterministic across calls.
	sort.Strings(failedChecks)
	klog.V(4).Infof("ClusterCache is not ready for clusters: %v", failedChecks)
	return fmt.Errorf("ClusterCache is not ready for clusters: %v", failedChecks)
}
// UpdateCache update cache for multi clusters
func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error {
	if klog.V(3).Enabled() {
		begin := time.Now()
		defer func() {
			klog.Infof("MultiClusterCache update cache takes %v", time.Since(begin))
		}()
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Drop caches for clusters that are no longer desired.
	for clusterName := range c.cache {
		if _, stillWanted := resourcesByCluster[clusterName]; stillWanted {
			continue
		}
		klog.Infof("Remove cache for cluster %s", clusterName)
		c.cache[clusterName].stop()
		delete(c.cache, clusterName)
	}

	// Create missing cluster caches and push the desired resource set into each.
	for clusterName, resources := range resourcesByCluster {
		cc, ok := c.cache[clusterName]
		if !ok {
			klog.Infof("Add cache for cluster %v", clusterName)
			cc = newClusterCache(clusterName, c.clientForClusterFunc(clusterName), c.restMapper)
			c.cache[clusterName] = cc
		}
		if err := cc.updateCache(resources); err != nil {
			return err
		}
	}

	c.registeredResources = registeredResources
	return nil
}
// Stop stops the cache for multi cluster.
func (c *MultiClusterCache) Stop() {
	c.lock.RLock()
	defer c.lock.RUnlock()
	// Stop every cluster cache; entries are left in the map.
	for _, cc := range c.cache {
		cc.stop()
	}
}
// HasResource return whether resource is registered.
func (c *MultiClusterCache) HasResource(resource schema.GroupVersionResource) bool {
	c.lock.RLock()
	defer c.lock.RUnlock()
	_, registered := c.registeredResources[resource]
	return registered
}
// GetResourceFromCache returns which cluster the resource belong to.
func (c *MultiClusterCache) GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) {
	// The per-cluster resourceCache returned by the lookup is not needed here.
	obj, clusterName, _, err := c.getResourceFromCache(ctx, gvr, namespace, name)
	if err != nil {
		return nil, "", err
	}
	return obj, clusterName, nil
}
// Get returns the target object from whichever member cluster caches it.
// The returned object is a deep copy annotated with its source cluster, and
// its resource version is rewritten into the multi-cluster encoded form.
func (c *MultiClusterCache) Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) {
	// Locate the single cluster holding the object. The object returned by
	// the lookup is discarded; we re-read below honoring the caller's options.
	_, clusterName, cache, err := c.getResourceFromCache(ctx, gvr, request.NamespaceValue(ctx), name)
	if err != nil {
		return nil, err
	}
	obj, err := cache.Get(ctx, name, options)
	if err != nil {
		return nil, err
	}
	// Deep copy before mutating: the cached object is shared.
	cloneObj := obj.DeepCopyObject()
	accessor, err := meta.Accessor(cloneObj)
	if err != nil {
		return nil, err
	}
	// Encode the single-cluster resource version into the multi-cluster form
	// so follow-up requests can be routed back to the owning cluster.
	mrv := newMultiClusterResourceVersionWithCapacity(1)
	mrv.set(clusterName, accessor.GetResourceVersion())
	accessor.SetResourceVersion(mrv.String())
	addCacheSourceAnnotation(cloneObj, clusterName)
	// err is provably nil at this point; return an explicit nil (the original
	// returned the stale err variable).
	return cloneObj, nil
}
// List returns the object list
//
// Listing spans every member cluster: items are concatenated in sorted
// cluster-name order, the aggregate resource version encodes one RV per
// cluster, and the continue token records where to resume (cluster plus the
// cluster's native continue) when a Limit is hit part-way through.
// nolint:gocyclo
func (c *MultiClusterCache) List(ctx context.Context, gvr schema.GroupVersionResource, o *metainternalversion.ListOptions) (runtime.Object, error) {
	klog.V(5).Infof("Request list %v with rv=%#v, continue=%#v, limit=%v", gvr.String(),
		newMultiClusterResourceVersionFromString(o.ResourceVersion), newMultiClusterContinueFromString(o.Continue), o.Limit)
	// Decode the incoming multi-cluster continue/RV: requestCluster is the
	// cluster to resume from ("" means start at the first), options carries
	// the per-cluster list options, requestResourceVersion the decoded RV map.
	requestCluster, options, requestResourceVersion := prepareBeforeList(o)
	var resultObject runtime.Object
	// Pre-size for up to Limit items, capped at 1024 to bound the allocation.
	items := make([]runtime.Object, 0, int(math.Min(float64(options.Limit), 1024)))
	// Sort cluster names so pagination order is stable across requests.
	clusters := c.getClusterNames()
	sort.Strings(clusters)
	responseResourceVersion := requestResourceVersion.clone()
	responseContinue := multiClusterContinue{}
	trace := utiltrace.New("MultiClusterCache.List",
		utiltrace.Field{Key: "gvr", Value: gvr},
		utiltrace.Field{Key: "rv", Value: newMultiClusterResourceVersionFromString(o.ResourceVersion)},
		utiltrace.Field{Key: "continue", Value: newMultiClusterContinueFromString(o.Continue)},
		utiltrace.Field{Key: "limit", Value: o.Limit},
	)
	defer trace.LogIfLong(5 * time.Second)
	defer func() {
		klog.V(5).Infof("Response list %v with rv=%#v continue=%#v", gvr.String(), responseResourceVersion, responseContinue)
	}()
	// listFunc lists gvr from one cluster and appends annotated deep copies of
	// its items. It returns the item count and the cluster's native continue.
	listFunc := func(cluster string) (int, string, error) {
		// When resuming from a continue token, skip clusters that precede the
		// recorded one.
		if requestCluster != "" && requestCluster != cluster {
			return 0, "", nil
		}
		// clear the requestContinue, for searching other clusters at next list
		defer func() {
			requestCluster = ""
			options.Continue = ""
		}()
		cache := c.cacheForClusterResource(cluster, gvr)
		if cache == nil {
			klog.V(4).Infof("cluster %v does not cache resource %v", cluster, gvr.String())
			return 0, "", nil
		}
		if options.Continue != "" {
			// specifying resource version is not allowed when using continue
			options.ResourceVersion = ""
		} else {
			options.ResourceVersion = requestResourceVersion.get(cluster)
		}
		defer trace.Step("list from member cluster",
			utiltrace.Field{Key: "cluster", Value: cluster},
			utiltrace.Field{Key: "options", Value: fmt.Sprintf("%#v", options)},
		)
		obj, err := cache.List(ctx, options)
		if err != nil {
			return 0, "", err
		}
		list, err := meta.ListAccessor(obj)
		if err != nil {
			return 0, "", err
		}
		// Keep the first non-nil list object as the template for the final
		// result; its concrete type carries the right list kind.
		if resultObject == nil {
			resultObject = obj
		}
		cnt := 0
		err = meta.EachListItem(obj, func(o runtime.Object) error {
			// Deep copy before annotating: cached objects are shared.
			clone := o.DeepCopyObject()
			addCacheSourceAnnotation(clone, cluster)
			items = append(items, clone)
			cnt++
			return nil
		})
		if err != nil {
			return 0, "", err
		}
		responseResourceVersion.set(cluster, list.GetResourceVersion())
		return cnt, list.GetContinue(), nil
	}
	if options.Limit == 0 {
		// No limit: list every cluster in full.
		for _, cluster := range clusters {
			_, _, err := listFunc(cluster)
			if err != nil {
				return nil, err
			}
		}
	} else {
		// Chunked list: stop once the limit is consumed and build a continue
		// token that points at the resume position.
		for clusterIdx, cluster := range clusters {
			n, cont, err := listFunc(cluster)
			if err != nil {
				return nil, err
			}
			options.Limit -= int64(n)
			if options.Limit <= 0 {
				if cont != "" {
					// Current cluster has remaining items, return this cluster name and continue for next list.
					responseContinue.Cluster = cluster
					responseContinue.Continue = cont
				} else if (clusterIdx + 1) < len(clusters) {
					// Current cluster has no remaining items. But we don't know whether next cluster has.
					// So return the next cluster name for continue listing.
					responseContinue.Cluster = clusters[clusterIdx+1]
				}
				// No more items remains. Break the chuck list.
				break
			}
		}
	}
	// Clusters not visited (limit hit first, or resource not cached) still
	// need an RV in the response so the continue token round-trips; fetch the
	// missing ones in parallel.
	if err := c.fillMissingClusterResourceVersion(ctx, responseResourceVersion, clusters, gvr); err != nil {
		return nil, err
	}
	responseContinue.RV = responseResourceVersion.String()
	if resultObject == nil {
		// No cluster produced a list; fall back to an empty v1 List.
		resultObject = &metav1.List{
			TypeMeta: metav1.TypeMeta{
				APIVersion: "v1",
				Kind:       "List",
			},
			ListMeta: metav1.ListMeta{},
			Items:    []runtime.RawExtension{},
		}
	}
	err := meta.SetList(resultObject, items)
	if err != nil {
		return nil, err
	}
	accessor, err := meta.ListAccessor(resultObject)
	if err != nil {
		return nil, err
	}
	accessor.SetResourceVersion(responseResourceVersion.String())
	accessor.SetContinue(responseContinue.String())
	return resultObject, nil
}
// Watch watches the resource across every member cluster that caches it and
// merges the per-cluster event streams into a single watch.Interface. Each
// delivered object gets a source-cluster annotation and its resource version
// rewritten into the multi-cluster encoded form.
func (c *MultiClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) {
	klog.V(5).Infof("Request watch %v with rv=%v", gvr.String(), options.ResourceVersion)
	resourceVersion := newMultiClusterResourceVersionFromString(options.ResourceVersion)
	// responseResourceVersion accumulates the latest RV seen per cluster; it
	// is updated concurrently by the per-source decorators, hence the lock.
	responseResourceVersion := newMultiClusterResourceVersionFromString(options.ResourceVersion)
	responseResourceVersionLock := sync.Mutex{}
	setObjectResourceVersionFunc := func(cluster string, obj runtime.Object) {
		accessor, err := meta.Accessor(obj)
		if err != nil {
			// Objects without standard metadata are passed through untouched.
			return
		}
		responseResourceVersionLock.Lock()
		defer responseResourceVersionLock.Unlock()
		responseResourceVersion.set(cluster, accessor.GetResourceVersion())
		accessor.SetResourceVersion(responseResourceVersion.String())
	}
	mux := newWatchMux()
	clusters := c.getClusterNames()
	for i := range clusters {
		cluster := clusters[i]
		// Each cluster watch starts from that cluster's decoded RV.
		options.ResourceVersion = resourceVersion.get(cluster)
		cache := c.cacheForClusterResource(cluster, gvr)
		if cache == nil {
			continue
		}
		w, err := cache.Watch(ctx, options)
		if err != nil {
			// Stop the mux so watches already added for earlier clusters are
			// released instead of leaking (the original returned without
			// stopping them).
			mux.Stop()
			return nil, err
		}
		mux.AddSource(w, func(e watch.Event) {
			setObjectResourceVersionFunc(cluster, e.Object)
			addCacheSourceAnnotation(e.Object, cluster)
		})
	}
	mux.Start()
	return mux, nil
}
// getClusterNames returns the names of all clusters currently cached. The
// result is in random (map-iteration) order; callers that need a stable
// order sort it themselves.
func (c *MultiClusterCache) getClusterNames() []string {
	c.lock.RLock()
	defer c.lock.RUnlock()
	clusters := make([]string, 0, len(c.cache))
	// Named loop variable: the original used `c`, shadowing the receiver.
	for name := range c.cache {
		clusters = append(clusters, name)
	}
	return clusters
}
// cacheForClusterResource returns the resource cache of the given member
// cluster for gvr, or nil when the cluster is unknown or does not cache it.
func (c *MultiClusterCache) cacheForClusterResource(cluster string, gvr schema.GroupVersionResource) *resourceCache {
	c.lock.RLock()
	member := c.cache[cluster]
	c.lock.RUnlock()
	if member == nil {
		return nil
	}
	return member.cacheForResource(gvr)
}
// getResourceFromCache looks the named object up in every cluster cache and
// returns the object, the owning cluster's name, and that cluster's resource
// cache. It returns a NotFound error when no cluster has the object and a
// Conflict error when more than one cluster claims it.
func (c *MultiClusterCache) getResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, *resourceCache, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	var findObjects []runtime.Object
	var findCaches []*resourceCache
	// Set ResourceVersion=0 to get resource from cache.
	options := &metav1.GetOptions{ResourceVersion: "0"}
	for clusterName, cc := range c.cache {
		cache := cc.cache[gvr]
		if cache == nil {
			// This cluster doesn't cache this resource
			continue
		}
		obj, err := cache.Get(request.WithNamespace(ctx, namespace), name, options)
		if apierrors.IsNotFound(err) {
			continue
		}
		if err != nil {
			return nil, "", nil, fmt.Errorf("fail to get %v %v from %v: %v", namespace, name, clusterName, err)
		}
		findObjects = append(findObjects, obj)
		findCaches = append(findCaches, cache)
	}
	if len(findObjects) == 0 {
		return nil, "", nil, apierrors.NewNotFound(gvr.GroupResource(), name)
	}
	if len(findObjects) > 1 {
		clusterNames := make([]string, 0, len(findCaches))
		for _, cache := range findCaches {
			clusterNames = append(clusterNames, cache.clusterName)
		}
		// Sort so the conflict message is deterministic regardless of map
		// iteration order.
		sort.Strings(clusterNames)
		return nil, "", nil, apierrors.NewConflict(gvr.GroupResource(), name, fmt.Errorf("ambiguous objects in clusters [%v]", strings.Join(clusterNames, ", ")))
	}
	return findObjects[0], findCaches[0].clusterName, findCaches[0], nil
}
// clientForClusterFunc returns a factory that lazily builds a dynamic client
// for the named member cluster; the cluster name is captured in the closure.
func (c *MultiClusterCache) clientForClusterFunc(cluster string) func() (dynamic.Interface, error) {
	return func() (dynamic.Interface, error) {
		return c.newClientFunc(cluster)
	}
}
// fillMissingClusterResourceVersion queries, in parallel, the current resource
// version of gvr for every cluster in clusters that is not yet recorded in
// mcv, and stores each via mcv.set. The first error reported by any cluster
// is returned; clusters that do not cache gvr are silently skipped.
//
// The function always waits for every goroutine to finish before returning,
// which guarantees that (a) no goroutine is left blocked on the error
// channel, and (b) mcv is never mutated concurrently after return.
func (c *MultiClusterCache) fillMissingClusterResourceVersion(ctx context.Context, mcv *multiClusterResourceVersion, clusters []string, gvr schema.GroupVersionResource) error {
	var (
		lock sync.Mutex
		wg   sync.WaitGroup
	)
	// Buffered so every failing goroutine can deposit its error without
	// blocking, even though only the first error is returned. An unbuffered
	// channel here would leak any goroutine that errors after the first.
	errChan := make(chan error, len(clusters))
	for _, cluster := range clusters {
		if _, ok := mcv.rvs[cluster]; ok {
			continue
		}
		wg.Add(1)
		go func(cluster string) {
			defer wg.Done()
			klog.V(5).Infof("fillMissingClusterResourceVersion gvr=%v cluster=%v", gvr, cluster)
			rv, err := c.getClusterResourceVersion(ctx, cluster, gvr)
			if err != nil {
				errChan <- err
				return
			}
			if rv == "" {
				// Cluster does not cache this resource; nothing to record.
				return
			}
			lock.Lock()
			defer lock.Unlock()
			mcv.set(cluster, rv)
		}(cluster)
	}
	wg.Wait()
	select {
	case err := <-errChan:
		return err
	default:
		return nil
	}
}
// getClusterResourceVersion returns the current resource version of gvr in
// the given member cluster, or "" when the cluster does not cache it.
func (c *MultiClusterCache) getClusterResourceVersion(ctx context.Context, cluster string, gvr schema.GroupVersionResource) (string, error) {
	rc := c.cacheForClusterResource(cluster, gvr)
	if rc == nil {
		klog.V(4).Infof("cluster %v does not cache resource %v", cluster, gvr.String())
		return "", nil
	}
	// A single-item list is the cheapest way to learn the list's RV.
	listOptions := &metainternalversion.ListOptions{Limit: 1}
	obj, err := rc.List(ctx, listOptions)
	if err != nil {
		return "", err
	}
	accessor, err := meta.ListAccessor(obj)
	if err != nil {
		return "", err
	}
	return accessor.GetResourceVersion(), nil
}
// prepareBeforeList decodes the multi-cluster continue token and resource
// version carried in o and derives the options to use for the next
// member-cluster list call.
//
// Inputs and outputs:
// o.ResourceVersion o.Continue | cluster options.ResourceVersion options.Continue mrv
// xxxx "" | "" xxxx "" decode(xxx)
// "" {rv=xxx,cluster=c2,continue=} | c2 decode(xxx)[c2] "" decode(xxx)
// "" {rv=xxx,cluster=c2,continue=yyy} | c2 "" yyy decode(xxx)
func prepareBeforeList(o *metainternalversion.ListOptions) (string, *metainternalversion.ListOptions, *multiClusterResourceVersion) {
	opts := o.DeepCopy()
	if o.Continue == "" {
		// No continue token: start from the first cluster with the RV as-is.
		return "", opts, newMultiClusterResourceVersionFromString(o.ResourceVersion)
	}
	mc := newMultiClusterContinueFromString(o.Continue)
	mrv := newMultiClusterResourceVersionFromString(mc.RV)
	switch {
	case mc.Continue != "":
		// Resume mid-cluster: RV must be empty when a native continue is set.
		opts.ResourceVersion = ""
		opts.Continue = mc.Continue
	default:
		// Start listing mc.Cluster from its recorded resource version.
		opts.ResourceVersion = mrv.get(mc.Cluster)
		opts.Continue = ""
	}
	return mc.Cluster, opts, mrv
}