diff --git a/pkg/search/proxy/cluster_proxy.go b/pkg/search/proxy/cluster_proxy.go index a1d968819..65febd471 100644 --- a/pkg/search/proxy/cluster_proxy.go +++ b/pkg/search/proxy/cluster_proxy.go @@ -1,13 +1,17 @@ package proxy import ( + "bytes" "context" "fmt" + "io" + "io/ioutil" "net/http" "path" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" @@ -42,7 +46,25 @@ func (c *clusterProxy) connect(ctx context.Context, requestInfo *request.Request if err != nil { return nil, err } - return c.connectCluster(ctx, clusterName, proxyPath, responder) + + h, err := c.connectCluster(ctx, clusterName, proxyPath, responder) + if err != nil { + return nil, err + } + + if requestInfo.Verb != "update" { + return h, nil + } + + // Objects get by client via proxy are edited some fields, different from objets in member clusters. + // So before update, we shall recover these fields. + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + if err = modifyRequest(req, clusterName); err != nil { + responder.Error(err) + return + } + h.ServeHTTP(rw, req) + }), nil } func (c *clusterProxy) connectCluster(ctx context.Context, clusterName string, proxyPath string, responder rest.Responder) (http.Handler, error) { @@ -64,3 +86,40 @@ func (c *clusterProxy) connectCluster(ctx context.Context, clusterName string, p } return proxy.ConnectCluster(ctx, cluster.Name, location, transport, responder, secretGetter) } + +func modifyRequest(req *http.Request, cluster string) error { + if req.ContentLength == 0 { + return nil + } + + body := bytes.NewBuffer(make([]byte, 0, req.ContentLength)) + _, err := io.Copy(body, req.Body) + if err != nil { + return err + } + _ = req.Body.Close() + + defer func() { + req.Body = ioutil.NopCloser(body) + req.ContentLength = int64(body.Len()) + }() + + obj := &unstructured.Unstructured{} + _, _, err = unstructured.UnstructuredJSONScheme.Decode(body.Bytes(), nil, obj) + if err != nil { + // ignore error + return nil + } + + changed := false + changed = store.RemoveCacheSourceAnnotation(obj) || changed + changed = store.RecoverClusterResourceVersion(obj, cluster) || changed + + if changed { + // write changed object into body + body.Reset() + return unstructured.UnstructuredJSONScheme.Encode(obj, body) + } + + return nil +} diff --git a/pkg/search/proxy/cluster_proxy_test.go b/pkg/search/proxy/cluster_proxy_test.go new file mode 100644 index 000000000..f10af4c44 --- /dev/null +++ b/pkg/search/proxy/cluster_proxy_test.go @@ -0,0 +1,144 @@ +package proxy + +import ( + "bytes" + "encoding/json" + "io" + "io/ioutil" + "net/http" + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + "github.com/karmada-io/karmada/pkg/search/proxy/store" +) + +func TestModifyRequest(t *testing.T) { + newObjectFunc := func(annotations map[string]string, resourceVersion string) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetAPIVersion("v1") + obj.SetKind("Pod") + obj.SetAnnotations(annotations) + obj.SetResourceVersion(resourceVersion) + return obj + } + + type args struct { + body interface{} + cluster string + } + type want struct { + body interface{} + } + + tests := []struct { + name string + args args + want want + }{ + { + name: "Empty body", + args: args{ + body: nil, + }, + want: want{ + body: nil, + }, + }, + { + name: "Body with nil annotations", + args: args{ + body: newObjectFunc(nil, ""), + }, + want: want{ + body: newObjectFunc(nil, ""), + }, + }, + { + name: "Body with empty annotations", + args: args{ + body: newObjectFunc(map[string]string{}, ""), + }, + want: want{ + body: newObjectFunc(map[string]string{}, ""), + }, + }, + { + name: "Body with cache source annotation", + args: args{ + body: newObjectFunc(map[string]string{clusterv1alpha1.CacheSourceAnnotationKey: "bar"}, ""), + }, + want: want{ + body: newObjectFunc(map[string]string{}, ""), + }, + }, + { + name: "Body with single cluster resource version", + args: args{ + body: newObjectFunc(nil, "1234"), + cluster: "cluster1", + }, + want: want{ + body: newObjectFunc(nil, "1234"), + }, + }, + { + name: "Body with multi cluster resource version", + args: args{ + body: newObjectFunc(nil, store.BuildMultiClusterResourceVersion(map[string]string{"cluster1": "1234", "cluster2": "5678"})), + cluster: "cluster1", + }, + want: want{ + body: newObjectFunc(nil, "1234"), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var body io.Reader + if tt.args.body != nil { + buf := bytes.NewBuffer(nil) + err := json.NewEncoder(buf).Encode(tt.args.body) + if err != nil { + t.Error(err) + return + } + body = buf + } + req, _ := http.NewRequest("PUT", "/api/v1/namespaces/default/pods/foo", body) + err := modifyRequest(req, tt.args.cluster) + if err != nil { + t.Error(err) + return + } + + var get runtime.Object + if req.ContentLength != 0 { + data, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Error(err) + return + } + + if int64(len(data)) != req.ContentLength { + t.Errorf("expect contentLength %v, but got %v", len(data), req.ContentLength) + return + } + + get, _, err = unstructured.UnstructuredJSONScheme.Decode(data, nil, nil) + if err != nil { + t.Error(err) + return + } + } + + if !reflect.DeepEqual(tt.want.body, get) { + t.Errorf("get body diff: %v", cmp.Diff(tt.want.body, get)) + } + }) + } +} diff --git a/pkg/search/proxy/store/multi_cluster_cache.go b/pkg/search/proxy/store/multi_cluster_cache.go index 3291adabb..dfc02ffd9 100644 --- a/pkg/search/proxy/store/multi_cluster_cache.go +++ b/pkg/search/proxy/store/multi_cluster_cache.go @@ -155,6 +155,9 @@ func (c *MultiClusterCache) Get(ctx context.Context, gvr schema.GroupVersionReso mrv := newMultiClusterResourceVersionWithCapacity(1) mrv.set(clusterName, accessor.GetResourceVersion()) accessor.SetResourceVersion(mrv.String()) + + addCacheSourceAnnotation(cloneObj, clusterName) + return cloneObj, err } @@ -205,6 +208,12 @@ func (c *MultiClusterCache) List(ctx context.Context, gvr schema.GroupVersionRes if err != nil { return 0, "", err } + + for i := range extractList { + // Add annotation will modify the object in cache, but seems not bad. So we don't deep copy it here. + addCacheSourceAnnotation(extractList[i], cluster) + } + items = append(items, extractList...) responseResourceVersion.set(cluster, list.GetResourceVersion()) return len(extractList), list.GetContinue(), nil @@ -303,6 +312,7 @@ func (c *MultiClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionRe mux.AddSource(w, func(e watch.Event) { // We can safely modify data because it is deepCopied in cacheWatcher.convertToWatchEvent setObjectResourceVersionFunc(cluster, e.Object) + addCacheSourceAnnotation(e.Object, cluster) }) } mux.Start() diff --git a/pkg/search/proxy/store/multi_cluster_cache_test.go b/pkg/search/proxy/store/multi_cluster_cache_test.go index ca7ce02b2..40c711530 100644 --- a/pkg/search/proxy/store/multi_cluster_cache_test.go +++ b/pkg/search/proxy/store/multi_cluster_cache_test.go @@ -298,7 +298,7 @@ func TestMultiClusterCache_Get(t *testing.T) { options: &metav1.GetOptions{}, }, want: want{ - object: newUnstructuredObject(podGVK, "pod11", withDefaultNamespace(), withResourceVersion(buildMultiClusterRV("cluster1", "1000"))), + object: newUnstructuredObject(podGVK, "pod11", withDefaultNamespace(), withResourceVersion(buildMultiClusterRV("cluster1", "1000")), withCacheSourceAnnotation("cluster1")), errAssert: noError, }, }, @@ -311,7 +311,7 @@ func TestMultiClusterCache_Get(t *testing.T) { options: &metav1.GetOptions{}, }, want: want{ - object: newUnstructuredObject(podGVK, "pod21", withDefaultNamespace(), withResourceVersion(buildMultiClusterRV("cluster2", "2000"))), + object: newUnstructuredObject(podGVK, "pod21", withDefaultNamespace(), withResourceVersion(buildMultiClusterRV("cluster2", "2000")), withCacheSourceAnnotation("cluster2")), errAssert: noError, }, }, @@ -350,7 +350,7 @@ func TestMultiClusterCache_Get(t *testing.T) { options: &metav1.GetOptions{}, }, want: want{ - object: newUnstructuredObject(nodeGVK, "node11", withResourceVersion(buildMultiClusterRV("cluster1", "1000"))), + object: newUnstructuredObject(nodeGVK, "node11", withResourceVersion(buildMultiClusterRV("cluster1", "1000")), withCacheSourceAnnotation("cluster1")), errAssert: noError, }, }, @@ -494,6 +494,59 @@ func TestMultiClusterCache_List(t *testing.T) { } } +func TestMultiClusterCache_List_CachSourceAnnotation(t *testing.T) { + cluster1 := newCluster("cluster1") + cluster2 := newCluster("cluster2") + cluster1Client := fakedynamic.NewSimpleDynamicClient(scheme, + newUnstructuredObject(podGVK, "pod11"), + newUnstructuredObject(podGVK, "pod12"), + ) + cluster2Client := fakedynamic.NewSimpleDynamicClient(scheme, + newUnstructuredObject(podGVK, "pod21"), + newUnstructuredObject(podGVK, "pod22"), + ) + + newClientFunc := func(cluster string) (dynamic.Interface, error) { + switch cluster { + case cluster1.Name: + return cluster1Client, nil + case cluster2.Name: + return cluster2Client, nil + } + return fakedynamic.NewSimpleDynamicClient(scheme), nil + } + cache := NewMultiClusterCache(newClientFunc, restMapper) + err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]struct{}{ + cluster1.Name: resourceSet(podGVR), + cluster2.Name: resourceSet(podGVR), + }) + if err != nil { + t.Error(err) + return + } + + list, err := cache.List(context.TODO(), podGVR, &metainternalversion.ListOptions{}) + if err != nil { + t.Errorf("List error: %v", err) + return + } + items, err := meta.ExtractList(list) + if err != nil { + t.Errorf("ExtractList error: %v", err) + return + } + + expect := []runtime.Object{ + newUnstructuredObject(podGVK, "pod11", withCacheSourceAnnotation("cluster1")), + newUnstructuredObject(podGVK, "pod12", withCacheSourceAnnotation("cluster1")), + newUnstructuredObject(podGVK, "pod21", withCacheSourceAnnotation("cluster2")), + newUnstructuredObject(podGVK, "pod22", withCacheSourceAnnotation("cluster2")), + } + if !reflect.DeepEqual(items, expect) { + t.Errorf("list items diff: %v", cmp.Diff(expect, items)) + } +} + func newCluster(name string) *clusterv1alpha1.Cluster { o := &clusterv1alpha1.Cluster{} o.Name = name @@ -522,6 +575,12 @@ func withResourceVersion(rv string) func(*unstructured.Unstructured) { } } +func withCacheSourceAnnotation(cluster string) func(*unstructured.Unstructured) { + return func(obj *unstructured.Unstructured) { + addCacheSourceAnnotation(obj, cluster) + } +} + func withLabel(label, value string) func(*unstructured.Unstructured) { return func(obj *unstructured.Unstructured) { err := unstructured.SetNestedField(obj.Object, value, "metadata", "labels", label) diff --git a/pkg/search/proxy/store/util.go b/pkg/search/proxy/store/util.go index 39d1cc7e5..2aeaa9e17 100644 --- a/pkg/search/proxy/store/util.go +++ b/pkg/search/proxy/store/util.go @@ -5,7 +5,11 @@ import ( "encoding/json" "sync" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" ) type multiClusterResourceVersion struct { @@ -170,3 +174,75 @@ func (w *watchMux) startWatchSource(source watch.Interface, decorator func(watch } } } + +func addCacheSourceAnnotation(obj runtime.Object, clusterName string) { + accessor, err := meta.Accessor(obj) + if err != nil { + // Object has no meta, do nothing + return + } + annotations := accessor.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + annotations[clusterv1alpha1.CacheSourceAnnotationKey] = clusterName + accessor.SetAnnotations(annotations) +} + +// RemoveCacheSourceAnnotation delete CacheSourceAnnotationKey annotation in object. If obj is updated, return true. +func RemoveCacheSourceAnnotation(obj runtime.Object) bool { + accessor, err := meta.Accessor(obj) + if err != nil { + // Object has no meta, do nothing + return false + } + + annotations := accessor.GetAnnotations() + _, exist := annotations[clusterv1alpha1.CacheSourceAnnotationKey] + if exist { + delete(annotations, clusterv1alpha1.CacheSourceAnnotationKey) + accessor.SetAnnotations(annotations) + return true + } + return false +} + +// RecoverClusterResourceVersion convert global resource version to single cluster resource version. If obj is updated, return true. +func RecoverClusterResourceVersion(obj runtime.Object, cluster string) bool { + accessor, err := meta.Accessor(obj) + if err != nil { + // Object has no meta, do nothing + return false + } + + rv := accessor.GetResourceVersion() + if rv == "" || rv == "0" { + return false + } + + decoded, err := base64.RawURLEncoding.DecodeString(rv) + if err != nil { + // it's not global rv, do nothing + return false + } + + m := make(map[string]string) + err = json.Unmarshal(decoded, &m) + if err != nil { + // it's not global rv, do nothing + return false + } + + crv := m[cluster] + accessor.SetResourceVersion(crv) + return true +} + +// BuildMultiClusterResourceVersion build multi cluster resource version. +func BuildMultiClusterResourceVersion(clusterResourceMap map[string]string) string { + m := newMultiClusterResourceVersionWithCapacity(len(clusterResourceMap)) + for cluster, rv := range clusterResourceMap { + m.set(cluster, rv) + } + return m.String() +} diff --git a/test/e2e/search_test.go b/test/e2e/search_test.go index 7480d4e75..d3080b6d0 100644 --- a/test/e2e/search_test.go +++ b/test/e2e/search_test.go @@ -18,6 +18,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" @@ -345,9 +346,9 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere ginkgo.Describe("karmada proxy testing", ginkgo.Ordered, func() { var ( - m1Client, m2Client, proxyClient kubernetes.Interface - m1Dynamic, m2Dynamic, proxyDynamic dynamic.Interface - nodeGVR = corev1.SchemeGroupVersion.WithResource("nodes") + m1Client, m2Client, proxyClient kubernetes.Interface + m1Dynamic, m2Dynamic dynamic.Interface + nodeGVR = corev1.SchemeGroupVersion.WithResource("nodes") ) ginkgo.BeforeAll(func() { @@ -364,7 +365,6 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere proxyConfig := *restConfig proxyConfig.Host += "/apis/search.karmada.io/v1alpha1/proxying/karmada/proxy" proxyClient = kubernetes.NewForConfigOrDie(&proxyConfig) - proxyDynamic = dynamic.NewForConfigOrDie(&proxyConfig) }) ginkgo.Describe("resourceRegistry testings", func() { @@ -517,6 +517,12 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere ginkgo.Context("caching nodes", func() { var ( rr *searchv1alpha1.ResourceRegistry + + deleteAnnotationAfterTest = func(c kubernetes.Interface, name string, anno string) { + data := []byte(`{"metadata": {"annotations": {"` + anno + `":null}}}`) + _, err := c.CoreV1().Nodes().Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) + klog.Warningf("Clean node %v's annotation %v failed: %v", name, anno, err) + } ) ginkgo.BeforeAll(func() { @@ -543,6 +549,14 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere framework.RemoveResourceRegistry(karmadaClient, rr.Name) }) + ginkgo.It("could get node", func() { + testObject := framework.GetAnyResourceOrFail(m1Dynamic.Resource(nodeGVR)) + + get, err := proxyClient.CoreV1().Nodes().Get(context.TODO(), testObject.GetName(), metav1.GetOptions{}) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(get.Annotations[clusterv1alpha1.CacheSourceAnnotationKey]).Should(gomega.Equal(member1)) + }) + ginkgo.It("could list nodes", func() { fromM1 := framework.GetResourceNames(m1Dynamic.Resource(nodeGVR)) ginkgo.By("list nodes from member1: " + strings.Join(fromM1.List(), ",")) @@ -550,10 +564,32 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere ginkgo.By("list nodes from member2: " + strings.Join(fromM2.List(), ",")) fromMembers := sets.NewString().Union(fromM1).Union(fromM2) + var proxyList *corev1.NodeList gomega.Eventually(func(g gomega.Gomega) { - fromProxy := framework.GetResourceNames(proxyDynamic.Resource(nodeGVR)) + var err error + proxyList, err = proxyClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + + fromProxy := sets.NewString() + for _, item := range proxyList.Items { + fromProxy.Insert(item.Name) + } g.Expect(fromProxy).Should(gomega.Equal(fromMembers)) }, time.Second*10).Should(gomega.Succeed()) + + // assert cache source annotation + groupM1, groupM2 := sets.NewString(), sets.NewString() + for _, item := range proxyList.Items { + cluster := item.Annotations[clusterv1alpha1.CacheSourceAnnotationKey] + switch cluster { + case member1: + groupM1.Insert(item.Name) + case member2: + groupM2.Insert(item.Name) + } + } + gomega.Expect(groupM1).Should(gomega.Equal(fromM1)) + gomega.Expect(groupM2).Should(gomega.Equal(fromM2)) }) ginkgo.It("could chunk list nodes", func() { @@ -598,15 +634,22 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere data := []byte(`{"metadata": {"annotations": {"` + anno + `": "true"}}}`) _, err = m1Client.CoreV1().Nodes().Patch(context.TODO(), testNode.GetName(), types.StrategicMergePatchType, data, metav1.PatchOptions{}) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + defer func() { + deleteAnnotationAfterTest(m1Client, testNode.GetName(), anno) + }() + var get *corev1.Node gomega.Eventually(func() bool { + var ok bool event := <-watcher.ResultChan() - o, ok := event.Object.(*corev1.Node) + get, ok = event.Object.(*corev1.Node) if !ok { return false } - return o.UID == testNode.GetUID() && metav1.HasAnnotation(o.ObjectMeta, anno) + return get.UID == testNode.GetUID() && metav1.HasAnnotation(get.ObjectMeta, anno) }, time.Second*10, 0).Should(gomega.BeTrue()) + + gomega.Expect(get.Annotations[clusterv1alpha1.CacheSourceAnnotationKey]).Should(gomega.Equal(member1)) }) ginkgo.It("could path nodes", func() { @@ -616,11 +659,37 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere data := []byte(`{"metadata": {"annotations": {"` + anno + `": "true"}}}`) _, err := proxyClient.CoreV1().Nodes().Patch(context.TODO(), testObject.GetName(), types.StrategicMergePatchType, data, metav1.PatchOptions{}) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + defer func() { + deleteAnnotationAfterTest(m1Client, testObject.GetName(), anno) + }() testPod, err := m1Client.CoreV1().Nodes().Get(context.TODO(), testObject.GetName(), metav1.GetOptions{}) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) gomega.Expect(testPod.Annotations).Should(gomega.HaveKey(anno)) }) + + ginkgo.It("could update node", func() { + testObject := framework.GetAnyResourceOrFail(m2Dynamic.Resource(nodeGVR)) + anno := "proxy-ann-" + rand.String(RandomStrLength) + + ginkgo.By("update node " + testObject.GetName()) + gomega.Eventually(func(g gomega.Gomega) { + node, err := proxyClient.CoreV1().Nodes().Get(context.TODO(), testObject.GetName(), metav1.GetOptions{}) + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + + metav1.SetMetaDataAnnotation(&node.ObjectMeta, anno, "true") + _, err = proxyClient.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}) + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + }, time.Second*10, 0).Should(gomega.Succeed()) + defer func() { + deleteAnnotationAfterTest(m2Client, testObject.GetName(), anno) + }() + + node, err := m2Client.CoreV1().Nodes().Get(context.TODO(), testObject.GetName(), metav1.GetOptions{}) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(metav1.HasAnnotation(node.ObjectMeta, anno)).Should(gomega.BeTrue()) + gomega.Expect(metav1.HasAnnotation(node.ObjectMeta, clusterv1alpha1.CacheSourceAnnotationKey)).Should(gomega.BeFalse()) + }) }) }) })