add cache source annotation for objects returned by proxy

Signed-off-by: yingjinhui <yingjinhui@didiglobal.com>
This commit is contained in:
yingjinhui 2022-09-01 14:04:46 +08:00
parent 3a5f8cd001
commit a9fcfa2ecd
6 changed files with 428 additions and 11 deletions

View File

@ -1,13 +1,17 @@
package proxy
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"path"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
@ -42,7 +46,25 @@ func (c *clusterProxy) connect(ctx context.Context, requestInfo *request.Request
if err != nil {
return nil, err
}
return c.connectCluster(ctx, clusterName, proxyPath, responder)
h, err := c.connectCluster(ctx, clusterName, proxyPath, responder)
if err != nil {
return nil, err
}
if requestInfo.Verb != "update" {
return h, nil
}
// Objects get by client via proxy are edited some fields, different from objets in member clusters.
// So before update, we shall recover these fields.
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if err = modifyRequest(req, clusterName); err != nil {
responder.Error(err)
return
}
h.ServeHTTP(rw, req)
}), nil
}
func (c *clusterProxy) connectCluster(ctx context.Context, clusterName string, proxyPath string, responder rest.Responder) (http.Handler, error) {
@ -64,3 +86,40 @@ func (c *clusterProxy) connectCluster(ctx context.Context, clusterName string, p
}
return proxy.ConnectCluster(ctx, cluster.Name, location, transport, responder, secretGetter)
}
func modifyRequest(req *http.Request, cluster string) error {
if req.ContentLength == 0 {
return nil
}
body := bytes.NewBuffer(make([]byte, 0, req.ContentLength))
_, err := io.Copy(body, req.Body)
if err != nil {
return err
}
_ = req.Body.Close()
defer func() {
req.Body = ioutil.NopCloser(body)
req.ContentLength = int64(body.Len())
}()
obj := &unstructured.Unstructured{}
_, _, err = unstructured.UnstructuredJSONScheme.Decode(body.Bytes(), nil, obj)
if err != nil {
// ignore error
return nil
}
changed := false
changed = store.RemoveCacheSourceAnnotation(obj) || changed
changed = store.RecoverClusterResourceVersion(obj, cluster) || changed
if changed {
// write changed object into body
body.Reset()
return unstructured.UnstructuredJSONScheme.Encode(obj, body)
}
return nil
}

View File

@ -0,0 +1,144 @@
package proxy
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"reflect"
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
)
func TestModifyRequest(t *testing.T) {
newObjectFunc := func(annotations map[string]string, resourceVersion string) *unstructured.Unstructured {
obj := &unstructured.Unstructured{}
obj.SetAPIVersion("v1")
obj.SetKind("Pod")
obj.SetAnnotations(annotations)
obj.SetResourceVersion(resourceVersion)
return obj
}
type args struct {
body interface{}
cluster string
}
type want struct {
body interface{}
}
tests := []struct {
name string
args args
want want
}{
{
name: "Empty body",
args: args{
body: nil,
},
want: want{
body: nil,
},
},
{
name: "Body with nil annotations",
args: args{
body: newObjectFunc(nil, ""),
},
want: want{
body: newObjectFunc(nil, ""),
},
},
{
name: "Body with empty annotations",
args: args{
body: newObjectFunc(map[string]string{}, ""),
},
want: want{
body: newObjectFunc(map[string]string{}, ""),
},
},
{
name: "Body with cache source annotation",
args: args{
body: newObjectFunc(map[string]string{clusterv1alpha1.CacheSourceAnnotationKey: "bar"}, ""),
},
want: want{
body: newObjectFunc(map[string]string{}, ""),
},
},
{
name: "Body with single cluster resource version",
args: args{
body: newObjectFunc(nil, "1234"),
cluster: "cluster1",
},
want: want{
body: newObjectFunc(nil, "1234"),
},
},
{
name: "Body with multi cluster resource version",
args: args{
body: newObjectFunc(nil, store.BuildMultiClusterResourceVersion(map[string]string{"cluster1": "1234", "cluster2": "5678"})),
cluster: "cluster1",
},
want: want{
body: newObjectFunc(nil, "1234"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var body io.Reader
if tt.args.body != nil {
buf := bytes.NewBuffer(nil)
err := json.NewEncoder(buf).Encode(tt.args.body)
if err != nil {
t.Error(err)
return
}
body = buf
}
req, _ := http.NewRequest("PUT", "/api/v1/namespaces/default/pods/foo", body)
err := modifyRequest(req, tt.args.cluster)
if err != nil {
t.Error(err)
return
}
var get runtime.Object
if req.ContentLength != 0 {
data, err := ioutil.ReadAll(req.Body)
if err != nil {
t.Error(err)
return
}
if int64(len(data)) != req.ContentLength {
t.Errorf("expect contentLength %v, but got %v", len(data), req.ContentLength)
return
}
get, _, err = unstructured.UnstructuredJSONScheme.Decode(data, nil, nil)
if err != nil {
t.Error(err)
return
}
}
if !reflect.DeepEqual(tt.want.body, get) {
t.Errorf("get body diff: %v", cmp.Diff(tt.want.body, get))
}
})
}
}

View File

@ -155,6 +155,9 @@ func (c *MultiClusterCache) Get(ctx context.Context, gvr schema.GroupVersionReso
mrv := newMultiClusterResourceVersionWithCapacity(1)
mrv.set(clusterName, accessor.GetResourceVersion())
accessor.SetResourceVersion(mrv.String())
addCacheSourceAnnotation(cloneObj, clusterName)
return cloneObj, err
}
@ -205,6 +208,12 @@ func (c *MultiClusterCache) List(ctx context.Context, gvr schema.GroupVersionRes
if err != nil {
return 0, "", err
}
for i := range extractList {
// Add annotation will modify the object in cache, but seems not bad. So we don't deep copy it here.
addCacheSourceAnnotation(extractList[i], cluster)
}
items = append(items, extractList...)
responseResourceVersion.set(cluster, list.GetResourceVersion())
return len(extractList), list.GetContinue(), nil
@ -303,6 +312,7 @@ func (c *MultiClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionRe
mux.AddSource(w, func(e watch.Event) {
// We can safely modify data because it is deepCopied in cacheWatcher.convertToWatchEvent
setObjectResourceVersionFunc(cluster, e.Object)
addCacheSourceAnnotation(e.Object, cluster)
})
}
mux.Start()

View File

@ -298,7 +298,7 @@ func TestMultiClusterCache_Get(t *testing.T) {
options: &metav1.GetOptions{},
},
want: want{
object: newUnstructuredObject(podGVK, "pod11", withDefaultNamespace(), withResourceVersion(buildMultiClusterRV("cluster1", "1000"))),
object: newUnstructuredObject(podGVK, "pod11", withDefaultNamespace(), withResourceVersion(buildMultiClusterRV("cluster1", "1000")), withCacheSourceAnnotation("cluster1")),
errAssert: noError,
},
},
@ -311,7 +311,7 @@ func TestMultiClusterCache_Get(t *testing.T) {
options: &metav1.GetOptions{},
},
want: want{
object: newUnstructuredObject(podGVK, "pod21", withDefaultNamespace(), withResourceVersion(buildMultiClusterRV("cluster2", "2000"))),
object: newUnstructuredObject(podGVK, "pod21", withDefaultNamespace(), withResourceVersion(buildMultiClusterRV("cluster2", "2000")), withCacheSourceAnnotation("cluster2")),
errAssert: noError,
},
},
@ -350,7 +350,7 @@ func TestMultiClusterCache_Get(t *testing.T) {
options: &metav1.GetOptions{},
},
want: want{
object: newUnstructuredObject(nodeGVK, "node11", withResourceVersion(buildMultiClusterRV("cluster1", "1000"))),
object: newUnstructuredObject(nodeGVK, "node11", withResourceVersion(buildMultiClusterRV("cluster1", "1000")), withCacheSourceAnnotation("cluster1")),
errAssert: noError,
},
},
@ -494,6 +494,59 @@ func TestMultiClusterCache_List(t *testing.T) {
}
}
func TestMultiClusterCache_List_CachSourceAnnotation(t *testing.T) {
cluster1 := newCluster("cluster1")
cluster2 := newCluster("cluster2")
cluster1Client := fakedynamic.NewSimpleDynamicClient(scheme,
newUnstructuredObject(podGVK, "pod11"),
newUnstructuredObject(podGVK, "pod12"),
)
cluster2Client := fakedynamic.NewSimpleDynamicClient(scheme,
newUnstructuredObject(podGVK, "pod21"),
newUnstructuredObject(podGVK, "pod22"),
)
newClientFunc := func(cluster string) (dynamic.Interface, error) {
switch cluster {
case cluster1.Name:
return cluster1Client, nil
case cluster2.Name:
return cluster2Client, nil
}
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]struct{}{
cluster1.Name: resourceSet(podGVR),
cluster2.Name: resourceSet(podGVR),
})
if err != nil {
t.Error(err)
return
}
list, err := cache.List(context.TODO(), podGVR, &metainternalversion.ListOptions{})
if err != nil {
t.Errorf("List error: %v", err)
return
}
items, err := meta.ExtractList(list)
if err != nil {
t.Errorf("ExtractList error: %v", err)
return
}
expect := []runtime.Object{
newUnstructuredObject(podGVK, "pod11", withCacheSourceAnnotation("cluster1")),
newUnstructuredObject(podGVK, "pod12", withCacheSourceAnnotation("cluster1")),
newUnstructuredObject(podGVK, "pod21", withCacheSourceAnnotation("cluster2")),
newUnstructuredObject(podGVK, "pod22", withCacheSourceAnnotation("cluster2")),
}
if !reflect.DeepEqual(items, expect) {
t.Errorf("list items diff: %v", cmp.Diff(expect, items))
}
}
func newCluster(name string) *clusterv1alpha1.Cluster {
o := &clusterv1alpha1.Cluster{}
o.Name = name
@ -522,6 +575,12 @@ func withResourceVersion(rv string) func(*unstructured.Unstructured) {
}
}
func withCacheSourceAnnotation(cluster string) func(*unstructured.Unstructured) {
return func(obj *unstructured.Unstructured) {
addCacheSourceAnnotation(obj, cluster)
}
}
func withLabel(label, value string) func(*unstructured.Unstructured) {
return func(obj *unstructured.Unstructured) {
err := unstructured.SetNestedField(obj.Object, value, "metadata", "labels", label)

View File

@ -5,7 +5,11 @@ import (
"encoding/json"
"sync"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
)
type multiClusterResourceVersion struct {
@ -170,3 +174,75 @@ func (w *watchMux) startWatchSource(source watch.Interface, decorator func(watch
}
}
}
func addCacheSourceAnnotation(obj runtime.Object, clusterName string) {
accessor, err := meta.Accessor(obj)
if err != nil {
// Object has no meta, do nothing
return
}
annotations := accessor.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[clusterv1alpha1.CacheSourceAnnotationKey] = clusterName
accessor.SetAnnotations(annotations)
}
// RemoveCacheSourceAnnotation delete CacheSourceAnnotationKey annotation in object. If obj is updated, return true.
func RemoveCacheSourceAnnotation(obj runtime.Object) bool {
accessor, err := meta.Accessor(obj)
if err != nil {
// Object has no meta, do nothing
return false
}
annotations := accessor.GetAnnotations()
_, exist := annotations[clusterv1alpha1.CacheSourceAnnotationKey]
if exist {
delete(annotations, clusterv1alpha1.CacheSourceAnnotationKey)
accessor.SetAnnotations(annotations)
return true
}
return false
}
// RecoverClusterResourceVersion convert global resource version to single cluster resource version. If obj is updated, return true.
func RecoverClusterResourceVersion(obj runtime.Object, cluster string) bool {
accessor, err := meta.Accessor(obj)
if err != nil {
// Object has no meta, do nothing
return false
}
rv := accessor.GetResourceVersion()
if rv == "" || rv == "0" {
return false
}
decoded, err := base64.RawURLEncoding.DecodeString(rv)
if err != nil {
// it's not global rv, do nothing
return false
}
m := make(map[string]string)
err = json.Unmarshal(decoded, &m)
if err != nil {
// it's not global rv, do nothing
return false
}
crv := m[cluster]
accessor.SetResourceVersion(crv)
return true
}
// BuildMultiClusterResourceVersion build multi cluster resource version.
func BuildMultiClusterResourceVersion(clusterResourceMap map[string]string) string {
m := newMultiClusterResourceVersionWithCapacity(len(clusterResourceMap))
for cluster, rv := range clusterResourceMap {
m.set(cluster, rv)
}
return m.String()
}

View File

@ -18,6 +18,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
@ -345,9 +346,9 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere
ginkgo.Describe("karmada proxy testing", ginkgo.Ordered, func() {
var (
m1Client, m2Client, proxyClient kubernetes.Interface
m1Dynamic, m2Dynamic, proxyDynamic dynamic.Interface
nodeGVR = corev1.SchemeGroupVersion.WithResource("nodes")
m1Client, m2Client, proxyClient kubernetes.Interface
m1Dynamic, m2Dynamic dynamic.Interface
nodeGVR = corev1.SchemeGroupVersion.WithResource("nodes")
)
ginkgo.BeforeAll(func() {
@ -364,7 +365,6 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere
proxyConfig := *restConfig
proxyConfig.Host += "/apis/search.karmada.io/v1alpha1/proxying/karmada/proxy"
proxyClient = kubernetes.NewForConfigOrDie(&proxyConfig)
proxyDynamic = dynamic.NewForConfigOrDie(&proxyConfig)
})
ginkgo.Describe("resourceRegistry testings", func() {
@ -517,6 +517,12 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere
ginkgo.Context("caching nodes", func() {
var (
rr *searchv1alpha1.ResourceRegistry
deleteAnnotationAfterTest = func(c kubernetes.Interface, name string, anno string) {
data := []byte(`{"metadata": {"annotations": {"` + anno + `":null}}}`)
_, err := c.CoreV1().Nodes().Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
klog.Warningf("Clean node %v's annotation %v failed: %v", name, anno, err)
}
)
ginkgo.BeforeAll(func() {
@ -543,6 +549,14 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere
framework.RemoveResourceRegistry(karmadaClient, rr.Name)
})
ginkgo.It("could get node", func() {
testObject := framework.GetAnyResourceOrFail(m1Dynamic.Resource(nodeGVR))
get, err := proxyClient.CoreV1().Nodes().Get(context.TODO(), testObject.GetName(), metav1.GetOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(get.Annotations[clusterv1alpha1.CacheSourceAnnotationKey]).Should(gomega.Equal(member1))
})
ginkgo.It("could list nodes", func() {
fromM1 := framework.GetResourceNames(m1Dynamic.Resource(nodeGVR))
ginkgo.By("list nodes from member1: " + strings.Join(fromM1.List(), ","))
@ -550,10 +564,32 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere
ginkgo.By("list nodes from member2: " + strings.Join(fromM2.List(), ","))
fromMembers := sets.NewString().Union(fromM1).Union(fromM2)
var proxyList *corev1.NodeList
gomega.Eventually(func(g gomega.Gomega) {
fromProxy := framework.GetResourceNames(proxyDynamic.Resource(nodeGVR))
var err error
proxyList, err = proxyClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
fromProxy := sets.NewString()
for _, item := range proxyList.Items {
fromProxy.Insert(item.Name)
}
g.Expect(fromProxy).Should(gomega.Equal(fromMembers))
}, time.Second*10).Should(gomega.Succeed())
// assert cache source annotation
groupM1, groupM2 := sets.NewString(), sets.NewString()
for _, item := range proxyList.Items {
cluster := item.Annotations[clusterv1alpha1.CacheSourceAnnotationKey]
switch cluster {
case member1:
groupM1.Insert(item.Name)
case member2:
groupM2.Insert(item.Name)
}
}
gomega.Expect(groupM1).Should(gomega.Equal(fromM1))
gomega.Expect(groupM2).Should(gomega.Equal(fromM2))
})
ginkgo.It("could chunk list nodes", func() {
@ -598,15 +634,22 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere
data := []byte(`{"metadata": {"annotations": {"` + anno + `": "true"}}}`)
_, err = m1Client.CoreV1().Nodes().Patch(context.TODO(), testNode.GetName(), types.StrategicMergePatchType, data, metav1.PatchOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
defer func() {
deleteAnnotationAfterTest(m1Client, testNode.GetName(), anno)
}()
var get *corev1.Node
gomega.Eventually(func() bool {
var ok bool
event := <-watcher.ResultChan()
o, ok := event.Object.(*corev1.Node)
get, ok = event.Object.(*corev1.Node)
if !ok {
return false
}
return o.UID == testNode.GetUID() && metav1.HasAnnotation(o.ObjectMeta, anno)
return get.UID == testNode.GetUID() && metav1.HasAnnotation(get.ObjectMeta, anno)
}, time.Second*10, 0).Should(gomega.BeTrue())
gomega.Expect(get.Annotations[clusterv1alpha1.CacheSourceAnnotationKey]).Should(gomega.Equal(member1))
})
ginkgo.It("could path nodes", func() {
@ -616,11 +659,37 @@ var _ = ginkgo.Describe("[karmada-search] karmada search testing", ginkgo.Ordere
data := []byte(`{"metadata": {"annotations": {"` + anno + `": "true"}}}`)
_, err := proxyClient.CoreV1().Nodes().Patch(context.TODO(), testObject.GetName(), types.StrategicMergePatchType, data, metav1.PatchOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
defer func() {
deleteAnnotationAfterTest(m1Client, testObject.GetName(), anno)
}()
testPod, err := m1Client.CoreV1().Nodes().Get(context.TODO(), testObject.GetName(), metav1.GetOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(testPod.Annotations).Should(gomega.HaveKey(anno))
})
ginkgo.It("could update node", func() {
testObject := framework.GetAnyResourceOrFail(m2Dynamic.Resource(nodeGVR))
anno := "proxy-ann-" + rand.String(RandomStrLength)
ginkgo.By("update node " + testObject.GetName())
gomega.Eventually(func(g gomega.Gomega) {
node, err := proxyClient.CoreV1().Nodes().Get(context.TODO(), testObject.GetName(), metav1.GetOptions{})
g.Expect(err).ShouldNot(gomega.HaveOccurred())
metav1.SetMetaDataAnnotation(&node.ObjectMeta, anno, "true")
_, err = proxyClient.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
g.Expect(err).ShouldNot(gomega.HaveOccurred())
}, time.Second*10, 0).Should(gomega.Succeed())
defer func() {
deleteAnnotationAfterTest(m2Client, testObject.GetName(), anno)
}()
node, err := m2Client.CoreV1().Nodes().Get(context.TODO(), testObject.GetName(), metav1.GetOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(metav1.HasAnnotation(node.ObjectMeta, anno)).Should(gomega.BeTrue())
gomega.Expect(metav1.HasAnnotation(node.ObjectMeta, clusterv1alpha1.CacheSourceAnnotationKey)).Should(gomega.BeFalse())
})
})
})
})