/* Copyright 2022 The Karmada Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package store import ( "context" "fmt" "math" "sort" "strings" "sync" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/dynamic" "k8s.io/klog/v2" utiltrace "k8s.io/utils/trace" ) // Store is the cache for resources from multiple member clusters type Store interface { UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error HasResource(resource schema.GroupVersionResource) bool GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) Stop() Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error) Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) } // MultiClusterCache caches resource from multi member clusters type MultiClusterCache struct { lock sync.RWMutex cache map[string]*clusterCache registeredResources map[schema.GroupVersionResource]struct{} restMapper meta.RESTMapper // newClientFunc returns a dynamic client for member cluster apiserver newClientFunc func(string) (dynamic.Interface, error) } var _ Store = &MultiClusterCache{} // NewMultiClusterCache return a cache for resources from member clusters func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error), restMapper meta.RESTMapper) *MultiClusterCache { return &MultiClusterCache{ restMapper: restMapper, newClientFunc: newClientFunc, cache: map[string]*clusterCache{}, registeredResources: map[schema.GroupVersionResource]struct{}{}, } } // UpdateCache update cache for multi clusters func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error { if klog.V(3).Enabled() { start := time.Now() defer func() { klog.Infof("MultiClusterCache update cache takes %v", time.Since(start)) }() } c.lock.Lock() defer c.lock.Unlock() // remove non-exist clusters for clusterName := range c.cache { if _, exist := resourcesByCluster[clusterName]; !exist { klog.Infof("Remove cache for cluster %s", clusterName) c.cache[clusterName].stop() delete(c.cache, clusterName) } } // add/update cluster cache for clusterName, resources := range resourcesByCluster { cache, exist := c.cache[clusterName] if !exist { klog.Infof("Add cache for cluster %v", clusterName) cache = newClusterCache(clusterName, c.clientForClusterFunc(clusterName), c.restMapper) c.cache[clusterName] = cache } err := cache.updateCache(resources) if err != nil { return err } } c.registeredResources = registeredResources return nil } // Stop stops the cache for multi cluster. func (c *MultiClusterCache) Stop() { c.lock.RLock() defer c.lock.RUnlock() for _, cache := range c.cache { cache.stop() } } // HasResource return whether resource is registered. func (c *MultiClusterCache) HasResource(resource schema.GroupVersionResource) bool { c.lock.RLock() defer c.lock.RUnlock() _, ok := c.registeredResources[resource] return ok } // GetResourceFromCache returns which cluster the resource belong to. func (c *MultiClusterCache) GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) { obj, cluster, _, err := c.getResourceFromCache(ctx, gvr, namespace, name) return obj, cluster, err } // Get returns the target object func (c *MultiClusterCache) Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) { _, clusterName, cache, err := c.getResourceFromCache(ctx, gvr, request.NamespaceValue(ctx), name) if err != nil { return nil, err } obj, err := cache.Get(ctx, name, options) if err != nil { return nil, err } cloneObj := obj.DeepCopyObject() accessor, err := meta.Accessor(cloneObj) if err != nil { return nil, err } mrv := newMultiClusterResourceVersionWithCapacity(1) mrv.set(clusterName, accessor.GetResourceVersion()) accessor.SetResourceVersion(mrv.String()) addCacheSourceAnnotation(cloneObj, clusterName) return cloneObj, err } // List returns the object list // nolint:gocyclo func (c *MultiClusterCache) List(ctx context.Context, gvr schema.GroupVersionResource, o *metainternalversion.ListOptions) (runtime.Object, error) { klog.V(5).Infof("Request list %v with rv=%#v, continue=%#v, limit=%v", gvr.String(), newMultiClusterResourceVersionFromString(o.ResourceVersion), newMultiClusterContinueFromString(o.Continue), o.Limit) requestCluster, options, requestResourceVersion := prepareBeforeList(o) var resultObject runtime.Object items := make([]runtime.Object, 0, int(math.Min(float64(options.Limit), 1024))) clusters := c.getClusterNames() sort.Strings(clusters) responseResourceVersion := requestResourceVersion.clone() responseContinue := multiClusterContinue{} trace := utiltrace.New("MultiClusterCache.List", utiltrace.Field{Key: "gvr", Value: gvr}, utiltrace.Field{Key: "rv", Value: newMultiClusterResourceVersionFromString(o.ResourceVersion)}, utiltrace.Field{Key: "continue", Value: newMultiClusterContinueFromString(o.Continue)}, utiltrace.Field{Key: "limit", Value: o.Limit}, ) defer trace.LogIfLong(5 * time.Second) defer func() { klog.V(5).Infof("Response list %v with rv=%#v continue=%#v", gvr.String(), responseResourceVersion, responseContinue) }() listFunc := func(cluster string) (int, string, error) { if requestCluster != "" && requestCluster != cluster { return 0, "", nil } // clear the requestContinue, for searching other clusters at next list defer func() { requestCluster = "" options.Continue = "" }() cache := c.cacheForClusterResource(cluster, gvr) if cache == nil { klog.V(4).Infof("cluster %v does not cache resource %v", cluster, gvr.String()) return 0, "", nil } if options.Continue != "" { // specifying resource version is not allowed when using continue options.ResourceVersion = "" } else { options.ResourceVersion = requestResourceVersion.get(cluster) } defer trace.Step("list from member cluster", utiltrace.Field{Key: "cluster", Value: cluster}, utiltrace.Field{Key: "options", Value: fmt.Sprintf("%#v", options)}, ) obj, err := cache.List(ctx, options) if err != nil { return 0, "", err } list, err := meta.ListAccessor(obj) if err != nil { return 0, "", err } if resultObject == nil { resultObject = obj } cnt := 0 err = meta.EachListItem(obj, func(o runtime.Object) error { clone := o.DeepCopyObject() addCacheSourceAnnotation(clone, cluster) items = append(items, clone) cnt++ return nil }) if err != nil { return 0, "", err } responseResourceVersion.set(cluster, list.GetResourceVersion()) return cnt, list.GetContinue(), nil } if options.Limit == 0 { for _, cluster := range clusters { _, _, err := listFunc(cluster) if err != nil { return nil, err } } } else { for clusterIdx, cluster := range clusters { n, cont, err := listFunc(cluster) if err != nil { return nil, err } options.Limit -= int64(n) if options.Limit <= 0 { if cont != "" { // Current cluster has remaining items, return this cluster name and continue for next list. responseContinue.Cluster = cluster responseContinue.Continue = cont } else if (clusterIdx + 1) < len(clusters) { // Current cluster has no remaining items. But we don't know whether next cluster has. // So return the next cluster name for continue listing. responseContinue.Cluster = clusters[clusterIdx+1] } // No more items remains. Break the chuck list. break } } } if err := c.fillMissingClusterResourceVersion(ctx, responseResourceVersion, clusters, gvr); err != nil { return nil, err } responseContinue.RV = responseResourceVersion.String() if resultObject == nil { resultObject = &metav1.List{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", Kind: "List", }, ListMeta: metav1.ListMeta{}, Items: []runtime.RawExtension{}, } } err := meta.SetList(resultObject, items) if err != nil { return nil, err } accessor, err := meta.ListAccessor(resultObject) if err != nil { return nil, err } accessor.SetResourceVersion(responseResourceVersion.String()) accessor.SetContinue(responseContinue.String()) return resultObject, nil } // Watch watches the resource func (c *MultiClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) { klog.V(5).Infof("Request watch %v with rv=%v", gvr.String(), options.ResourceVersion) resourceVersion := newMultiClusterResourceVersionFromString(options.ResourceVersion) responseResourceVersion := newMultiClusterResourceVersionFromString(options.ResourceVersion) responseResourceVersionLock := sync.Mutex{} setObjectResourceVersionFunc := func(cluster string, obj runtime.Object) { accessor, err := meta.Accessor(obj) if err != nil { return } responseResourceVersionLock.Lock() defer responseResourceVersionLock.Unlock() responseResourceVersion.set(cluster, accessor.GetResourceVersion()) accessor.SetResourceVersion(responseResourceVersion.String()) } mux := newWatchMux() clusters := c.getClusterNames() for i := range clusters { cluster := clusters[i] options.ResourceVersion = resourceVersion.get(cluster) cache := c.cacheForClusterResource(cluster, gvr) if cache == nil { continue } w, err := cache.Watch(ctx, options) if err != nil { return nil, err } mux.AddSource(w, func(e watch.Event) { setObjectResourceVersionFunc(cluster, e.Object) addCacheSourceAnnotation(e.Object, cluster) }) } mux.Start() return mux, nil } func (c *MultiClusterCache) getClusterNames() []string { c.lock.RLock() defer c.lock.RUnlock() clusters := make([]string, 0, len(c.cache)) for c := range c.cache { clusters = append(clusters, c) } return clusters } func (c *MultiClusterCache) cacheForClusterResource(cluster string, gvr schema.GroupVersionResource) *resourceCache { c.lock.RLock() cc := c.cache[cluster] c.lock.RUnlock() if cc == nil { return nil } return cc.cacheForResource(gvr) } func (c *MultiClusterCache) getResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, *resourceCache, error) { c.lock.RLock() defer c.lock.RUnlock() var findObjects []runtime.Object var findCaches []*resourceCache // Set ResourceVersion=0 to get resource from cache. options := &metav1.GetOptions{ResourceVersion: "0"} for clusterName, cc := range c.cache { cache := cc.cache[gvr] if cache == nil { // This cluster doesn't cache this resource continue } obj, err := cache.Get(request.WithNamespace(ctx, namespace), name, options) if apierrors.IsNotFound(err) { continue } if err != nil { return nil, "", nil, fmt.Errorf("fail to get %v %v from %v: %v", namespace, name, clusterName, err) } findObjects = append(findObjects, obj) findCaches = append(findCaches, cache) } if len(findObjects) == 0 { return nil, "", nil, apierrors.NewNotFound(gvr.GroupResource(), name) } if len(findObjects) > 1 { clusterNames := make([]string, 0, len(findCaches)) for _, cache := range findCaches { clusterNames = append(clusterNames, cache.clusterName) } return nil, "", nil, apierrors.NewConflict(gvr.GroupResource(), name, fmt.Errorf("ambiguous objects in clusters [%v]", strings.Join(clusterNames, ", "))) } return findObjects[0], findCaches[0].clusterName, findCaches[0], nil } func (c *MultiClusterCache) clientForClusterFunc(cluster string) func() (dynamic.Interface, error) { return func() (dynamic.Interface, error) { return c.newClientFunc(cluster) } } func (c *MultiClusterCache) fillMissingClusterResourceVersion(ctx context.Context, mcv *multiClusterResourceVersion, clusters []string, gvr schema.GroupVersionResource) error { errChan := make(chan error) var lock sync.Mutex var wg sync.WaitGroup for _, cluster := range clusters { if _, ok := mcv.rvs[cluster]; ok { continue } wg.Add(1) go func(cluster string) { defer wg.Done() klog.V(5).Infof("fillMissingClusterResourceVersion gvr=%v cluster=%v", gvr, cluster) rv, err := c.getClusterResourceVersion(ctx, cluster, gvr) if err != nil { errChan <- err return } if rv == "" { return } lock.Lock() defer lock.Unlock() mcv.set(cluster, rv) }(cluster) } waitChan := make(chan struct{}, 1) go func() { wg.Wait() waitChan <- struct{}{} }() var err error select { case <-waitChan: case err = <-errChan: } return err } func (c *MultiClusterCache) getClusterResourceVersion(ctx context.Context, cluster string, gvr schema.GroupVersionResource) (string, error) { cache := c.cacheForClusterResource(cluster, gvr) if cache == nil { klog.V(4).Infof("cluster %v does not cache resource %v", cluster, gvr.String()) return "", nil } obj, err := cache.List(ctx, &metainternalversion.ListOptions{ Limit: 1, }) if err != nil { return "", err } listObj, err := meta.ListAccessor(obj) if err != nil { return "", err } return listObj.GetResourceVersion(), nil } // Inputs and outputs: // o.ResourceVersion o.Continue | cluster options.ResourceVersion options.Continue mrv // xxxx "" | "" xxxx "" decode(xxx) // "" {rv=xxx,cluster=c2,continue=} | c2 decode(xxx)[c2] "" decode(xxx) // "" {rv=xxx,cluster=c2,continue=yyy} | c2 "" yyy decode(xxx) func prepareBeforeList(o *metainternalversion.ListOptions) (cluster string, options *metainternalversion.ListOptions, mrv *multiClusterResourceVersion) { options = o.DeepCopy() if o.Continue == "" { mrv = newMultiClusterResourceVersionFromString(o.ResourceVersion) return } mc := newMultiClusterContinueFromString(o.Continue) cluster = mc.Cluster mrv = newMultiClusterResourceVersionFromString(mc.RV) if mc.Continue == "" { options.ResourceVersion = mrv.get(cluster) options.Continue = "" } else { options.ResourceVersion = "" options.Continue = mc.Continue } return }