/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package etcd3

import (
	"bytes"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
	"strconv"
	"sync"
	"testing"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/integration"
	"github.com/coreos/pkg/capnslog"
	"golang.org/x/net/context"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	apitesting "k8s.io/apimachinery/pkg/api/testing"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/apimachinery/pkg/util/diff"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/apis/example"
	examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
	"k8s.io/apiserver/pkg/storage"
	storagetests "k8s.io/apiserver/pkg/storage/tests"
	"k8s.io/apiserver/pkg/storage/value"
)

// scheme is the runtime scheme shared by every test in this file; init
// registers the types the tests encode and decode into it.
var scheme = runtime.NewScheme()

// codecs is the codec factory built on scheme; the tests derive their storage
// codecs from it.
var codecs = serializer.NewCodecFactory(scheme)

// defaultTestPrefix is the byte prefix the tests hand to prefixTransformer and
// strip again when inspecting raw values read back from etcd.
const defaultTestPrefix = "test!"

// init registers the meta/v1 group version and the example and example/v1 API
// packages with scheme, and sets the capnslog global level to CRITICAL
// (presumably to quiet etcd logging while the tests run).
func init() {
	metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
	example.AddToScheme(scheme)
	examplev1.AddToScheme(scheme)
	capnslog.SetGlobalLogLevel(capnslog.CRITICAL)
}

// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
type prefixTransformer struct { prefix []byte stale bool err error } func (p prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) { if ctx == nil { panic("no context provided") } if !bytes.HasPrefix(b, p.prefix) { return nil, false, fmt.Errorf("value does not have expected prefix: %s", string(b)) } return bytes.TrimPrefix(b, p.prefix), p.stale, p.err } func (p prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) { if ctx == nil { panic("no context provided") } if len(b) > 0 { return append(append([]byte{}, p.prefix...), b...), p.err } return b, p.err } func TestCreate(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) etcdClient := cluster.RandClient() key := "/testkey" out := &example.Pod{} obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}} // verify that kv pair is empty before set getResp, err := etcdClient.KV.Get(ctx, key) if err != nil { t.Fatalf("etcdClient.KV.Get failed: %v", err) } if len(getResp.Kvs) != 0 { t.Fatalf("expecting empty result on key: %s", key) } err = store.Create(ctx, key, obj, out, 0) if err != nil { t.Fatalf("Set failed: %v", err) } // basic tests of the output if obj.ObjectMeta.Name != out.ObjectMeta.Name { t.Errorf("pod name want=%s, get=%s", obj.ObjectMeta.Name, out.ObjectMeta.Name) } if out.ResourceVersion == "" { t.Errorf("output should have non-empty resource version") } if out.SelfLink != "" { t.Errorf("output should have empty self link") } checkStorageInvariants(ctx, t, etcdClient, store, key) } func checkStorageInvariants(ctx context.Context, t *testing.T, etcdClient *clientv3.Client, store *store, key string) { getResp, err := etcdClient.KV.Get(ctx, key) if err != nil { t.Fatalf("etcdClient.KV.Get failed: %v", err) } if len(getResp.Kvs) == 0 { t.Fatalf("expecting non empty result on key: %s", key) } decoded, err := runtime.Decode(store.codec, getResp.Kvs[0].Value[len(defaultTestPrefix):]) if err != 
nil { t.Fatalf("expecting successful decode of object from %v\n%v", err, string(getResp.Kvs[0].Value)) } obj := decoded.(*example.Pod) if obj.ResourceVersion != "" { t.Errorf("stored object should have empty resource version") } if obj.SelfLink != "" { t.Errorf("stored output should have empty self link") } } func TestCreateWithTTL(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} key := "/somekey" out := &example.Pod{} if err := store.Create(ctx, key, input, out, 1); err != nil { t.Fatalf("Create failed: %v", err) } w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything) if err != nil { t.Fatalf("Watch failed: %v", err) } testCheckEventType(t, watch.Deleted, w) } func TestCreateWithKeyExist(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} key, _ := testPropogateStore(ctx, t, store, obj) out := &example.Pod{} err := store.Create(ctx, key, obj, out, 0) if err == nil || !storage.IsNodeExist(err) { t.Errorf("expecting key exists error, but get: %s", err) } } func TestGet(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) tests := []struct { key string ignoreNotFound bool expectNotFoundErr bool expectedOut *example.Pod }{{ // test get on existing item key: key, ignoreNotFound: false, expectNotFoundErr: false, expectedOut: storedObj, }, { // test get on non-existing item with ignoreNotFound=false key: "/non-existing", ignoreNotFound: false, expectNotFoundErr: true, }, { // test get on non-existing item with ignoreNotFound=true key: "/non-existing", ignoreNotFound: true, expectNotFoundErr: false, expectedOut: &example.Pod{}, }} for i, tt := range tests { out := &example.Pod{} err := store.Get(ctx, tt.key, "", out, tt.ignoreNotFound) 
if tt.expectNotFoundErr { if err == nil || !storage.IsNotFound(err) { t.Errorf("#%d: expecting not found error, but get: %s", i, err) } continue } if err != nil { t.Fatalf("Get failed: %v", err) } if !reflect.DeepEqual(tt.expectedOut, out) { t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedOut, out) } } } func TestUnconditionalDelete(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) tests := []struct { key string expectedObj *example.Pod expectNotFoundErr bool }{{ // test unconditional delete on existing key key: key, expectedObj: storedObj, expectNotFoundErr: false, }, { // test unconditional delete on non-existing key key: "/non-existing", expectedObj: nil, expectNotFoundErr: true, }} for i, tt := range tests { out := &example.Pod{} // reset err := store.Delete(ctx, tt.key, out, nil) if tt.expectNotFoundErr { if err == nil || !storage.IsNotFound(err) { t.Errorf("#%d: expecting not found error, but get: %s", i, err) } continue } if err != nil { t.Fatalf("Delete failed: %v", err) } if !reflect.DeepEqual(tt.expectedObj, out) { t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedObj, out) } } } func TestConditionalDelete(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) tests := []struct { precondition *storage.Preconditions expectInvalidObjErr bool }{{ // test conditional delete with UID match precondition: storage.NewUIDPreconditions("A"), expectInvalidObjErr: false, }, { // test conditional delete with UID mismatch precondition: storage.NewUIDPreconditions("B"), expectInvalidObjErr: true, }} for i, tt := range tests { out := &example.Pod{} err := store.Delete(ctx, key, out, tt.precondition) if tt.expectInvalidObjErr { if err == nil || !storage.IsInvalidObj(err) { 
t.Errorf("#%d: expecting invalid UID error, but get: %s", i, err) } continue } if err != nil { t.Fatalf("Delete failed: %v", err) } if !reflect.DeepEqual(storedObj, out) { t.Errorf("#%d: pod want=%#v, get=%#v", i, storedObj, out) } key, storedObj = testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) } } func TestGetToList(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) tests := []struct { key string pred storage.SelectionPredicate expectedOut []*example.Pod }{{ // test GetToList on existing key key: key, pred: storage.Everything, expectedOut: []*example.Pod{storedObj}, }, { // test GetToList on non-existing key key: "/non-existing", pred: storage.Everything, expectedOut: nil, }, { // test GetToList with matching pod name key: "/non-existing", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name), GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, }, expectedOut: nil, }} for i, tt := range tests { out := &example.PodList{} err := store.GetToList(ctx, tt.key, "", tt.pred, out) if err != nil { t.Fatalf("GetToList failed: %v", err) } if len(out.Items) != len(tt.expectedOut) { t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) continue } for j, wantPod := range tt.expectedOut { getPod := &out.Items[j] if !reflect.DeepEqual(wantPod, getPod) { t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) } } } } func TestGuaranteedUpdate(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) etcdClient := cluster.RandClient() key := "/testkey" tests := []struct { key string ignoreNotFound bool precondition 
*storage.Preconditions expectNotFoundErr bool expectInvalidObjErr bool expectNoUpdate bool transformStale bool hasSelfLink bool }{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false key: "/non-existing", ignoreNotFound: false, precondition: nil, expectNotFoundErr: true, expectInvalidObjErr: false, expectNoUpdate: false, }, { // GuaranteedUpdate on non-existing key with ignoreNotFound=true key: "/non-existing", ignoreNotFound: true, precondition: nil, expectNotFoundErr: false, expectInvalidObjErr: false, expectNoUpdate: false, }, { // GuaranteedUpdate on existing key key: key, ignoreNotFound: false, precondition: nil, expectNotFoundErr: false, expectInvalidObjErr: false, expectNoUpdate: false, }, { // GuaranteedUpdate with same data key: key, ignoreNotFound: false, precondition: nil, expectNotFoundErr: false, expectInvalidObjErr: false, expectNoUpdate: true, }, { // GuaranteedUpdate with same data AND a self link key: key, ignoreNotFound: false, precondition: nil, expectNotFoundErr: false, expectInvalidObjErr: false, expectNoUpdate: true, hasSelfLink: true, }, { // GuaranteedUpdate with same data but stale key: key, ignoreNotFound: false, precondition: nil, expectNotFoundErr: false, expectInvalidObjErr: false, expectNoUpdate: false, transformStale: true, }, { // GuaranteedUpdate with UID match key: key, ignoreNotFound: false, precondition: storage.NewUIDPreconditions("A"), expectNotFoundErr: false, expectInvalidObjErr: false, expectNoUpdate: true, }, { // GuaranteedUpdate with UID mismatch key: key, ignoreNotFound: false, precondition: storage.NewUIDPreconditions("B"), expectNotFoundErr: false, expectInvalidObjErr: true, expectNoUpdate: true, }} for i, tt := range tests { key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) out := &example.Pod{} name := fmt.Sprintf("foo-%d", i) if tt.expectNoUpdate { name = storeObj.Name } originalTransformer := store.transformer.(prefixTransformer) if 
tt.transformStale { transformer := originalTransformer transformer.stale = true store.transformer = transformer } version := storeObj.ResourceVersion err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { if tt.expectNotFoundErr && tt.ignoreNotFound { if pod := obj.(*example.Pod); pod.Name != "" { t.Errorf("#%d: expecting zero value, but get=%#v", i, pod) } } pod := *storeObj if tt.hasSelfLink { pod.SelfLink = "testlink" } pod.Name = name return &pod, nil })) store.transformer = originalTransformer if tt.expectNotFoundErr { if err == nil || !storage.IsNotFound(err) { t.Errorf("#%d: expecting not found error, but get: %v", i, err) } continue } if tt.expectInvalidObjErr { if err == nil || !storage.IsInvalidObj(err) { t.Errorf("#%d: expecting invalid UID error, but get: %s", i, err) } continue } if err != nil { t.Fatalf("GuaranteedUpdate failed: %v", err) } if out.ObjectMeta.Name != name { t.Errorf("#%d: pod name want=%s, get=%s", i, name, out.ObjectMeta.Name) } if out.SelfLink != "" { t.Errorf("#%d: selflink should not be set", i) } // verify that kv pair is not empty after set and that the underlying data matches expectations checkStorageInvariants(ctx, t, etcdClient, store, key) switch tt.expectNoUpdate { case true: if version != out.ResourceVersion { t.Errorf("#%d: expect no version change, before=%s, after=%s", i, version, out.ResourceVersion) } case false: if version == out.ResourceVersion { t.Errorf("#%d: expect version change, but get the same version=%s", i, version) } } } } func TestGuaranteedUpdateWithTTL(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} key := "/somekey" out := &example.Pod{} err := store.GuaranteedUpdate(ctx, key, out, true, nil, func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { ttl := uint64(1) return input, &ttl, 
nil }) if err != nil { t.Fatalf("Create failed: %v", err) } w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything) if err != nil { t.Fatalf("Watch failed: %v", err) } testCheckEventType(t, watch.Deleted, w) } func TestGuaranteedUpdateChecksStoredData(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} key := "/somekey" // serialize input into etcd with data that would be normalized by a write - in this case, leading // and trailing whitespace codec := codecs.LegacyCodec(examplev1.SchemeGroupVersion) data, err := runtime.Encode(codec, input) if err != nil { t.Fatal(err) } resp, err := store.client.Put(ctx, key, "test! "+string(data)+" ") if err != nil { t.Fatal(err) } // this update should write the canonical value to etcd because the new serialization differs // from the stored serialization input.ResourceVersion = strconv.FormatInt(resp.Header.Revision, 10) out := &example.Pod{} err = store.GuaranteedUpdate(ctx, key, out, true, nil, func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { return input, nil, nil }, input) if err != nil { t.Fatalf("Update failed: %v", err) } if out.ResourceVersion == strconv.FormatInt(resp.Header.Revision, 10) { t.Errorf("guaranteed update should have updated the serialized data, got %#v", out) } } func TestGuaranteedUpdateWithConflict(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) key, _ := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) errChan := make(chan error, 1) var firstToFinish sync.WaitGroup var secondToEnter sync.WaitGroup firstToFinish.Add(1) secondToEnter.Add(1) go func() { err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { pod := obj.(*example.Pod) pod.Name = "foo-1" secondToEnter.Wait() return pod, nil })) 
firstToFinish.Done() errChan <- err }() updateCount := 0 err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { if updateCount == 0 { secondToEnter.Done() firstToFinish.Wait() } updateCount++ pod := obj.(*example.Pod) pod.Name = "foo-2" return pod, nil })) if err != nil { t.Fatalf("Second GuaranteedUpdate error %#v", err) } if err := <-errChan; err != nil { t.Fatalf("First GuaranteedUpdate error %#v", err) } if updateCount != 2 { t.Errorf("Should have conflict and called update func twice") } } func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) // First, update without a suggestion so originalPod is outdated updatedPod := &example.Pod{} err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { pod := obj.(*example.Pod) pod.Name = "foo-2" return pod, nil }), ) if err != nil { t.Fatalf("unexpected error: %v", err) } // Second, update using the outdated originalPod as the suggestion. Return a conflict error when // passed originalPod, and make sure that SimpleUpdate is called a second time after a live lookup // with the value of updatedPod. 
sawConflict := false updatedPod2 := &example.Pod{} err = store.GuaranteedUpdate(ctx, key, updatedPod2, false, nil, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { pod := obj.(*example.Pod) if pod.Name != "foo-2" { if sawConflict { t.Fatalf("unexpected second conflict") } sawConflict = true // simulated stale object - return a conflict return nil, apierrors.NewConflict(example.SchemeGroupVersion.WithResource("pods").GroupResource(), "name", errors.New("foo")) } pod.Name = "foo-3" return pod, nil }), originalPod, ) if err != nil { t.Fatalf("unexpected error: %v", err) } if updatedPod2.Name != "foo-3" { t.Errorf("unexpected pod name: %q", updatedPod2.Name) } } func TestTransformationFailure(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) store := newStore(cluster.RandClient(), false, false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() preset := []struct { key string obj *example.Pod storedObj *example.Pod }{{ key: "/one-level/test", obj: &example.Pod{ ObjectMeta: metav1.ObjectMeta{Name: "bar"}, Spec: storagetests.DeepEqualSafePodSpec(), }, }, { key: "/two-level/1/test", obj: &example.Pod{ ObjectMeta: metav1.ObjectMeta{Name: "baz"}, Spec: storagetests.DeepEqualSafePodSpec(), }, }} for i, ps := range preset[:1] { preset[i].storedObj = &example.Pod{} err := store.Create(ctx, ps.key, ps.obj, preset[:1][i].storedObj, 0) if err != nil { t.Fatalf("Set failed: %v", err) } } // create a second resource with an invalid prefix oldTransformer := store.transformer store.transformer = prefixTransformer{prefix: []byte("otherprefix!")} for i, ps := range preset[1:] { preset[1:][i].storedObj = &example.Pod{} err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0) if err != nil { t.Fatalf("Set failed: %v", err) } } store.transformer = oldTransformer // only the 
first item is returned, and no error var got example.PodList if err := store.List(ctx, "/", "", storage.Everything, &got); err != nil { t.Errorf("Unexpected error %v", err) } if e, a := []example.Pod{*preset[0].storedObj}, got.Items; !reflect.DeepEqual(e, a) { t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a)) } // Get should fail if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) { t.Errorf("Unexpected error: %v", err) } // GuaranteedUpdate without suggestion should return an error if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { return input, nil, nil }); !storage.IsInternalError(err) { t.Errorf("Unexpected error: %v", err) } // GuaranteedUpdate with suggestion should return an error if we don't change the object if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { return input, nil, nil }, preset[1].obj); err == nil { t.Errorf("Unexpected error: %v", err) } // Delete succeeds but reports an error because we cannot access the body if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil); !storage.IsInternalError(err) { t.Errorf("Unexpected error: %v", err) } if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsNotFound(err) { t.Errorf("Unexpected error: %v", err) } } func TestList(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) disablePagingStore := newStore(cluster.RandClient(), false, false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := 
context.Background() // Setup storage with the following structure: // / // - one-level/ // | - test // | // - two-level/ // | - 1/ // | | - test // | | // | - 2/ // | - test // | // - z-level/ // - 3/ // | - test // | // - 3/ // - test-2 preset := []struct { key string obj *example.Pod storedObj *example.Pod }{ { key: "/one-level/test", obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, }, { key: "/two-level/1/test", obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, }, { key: "/two-level/2/test", obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, }, { key: "/z-level/3/test", obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "fourth"}}, }, { key: "/z-level/3/test-2", obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, }, } for i, ps := range preset { preset[i].storedObj = &example.Pod{} err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) if err != nil { t.Fatalf("Set failed: %v", err) } } list := &example.PodList{} store.List(ctx, "/two-level", "0", storage.Everything, list) continueRV, _ := strconv.Atoi(list.ResourceVersion) secondContinuation, err := encodeContinue("/two-level/2", "/two-level/", int64(continueRV)) if err != nil { t.Fatal(err) } getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil } tests := []struct { name string disablePaging bool rv string prefix string pred storage.SelectionPredicate expectedOut []*example.Pod expectContinue bool expectError bool }{ { name: "rejects invalid resource version", prefix: "/", pred: storage.Everything, rv: "abc", expectError: true, }, { name: "rejects resource version and continue token", prefix: "/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, Continue: secondContinuation, }, rv: "1", expectError: true, }, { name: "test List on existing key", prefix: "/one-level/", 
pred: storage.Everything, expectedOut: []*example.Pod{preset[0].storedObj}, }, { name: "test List on non-existing key", prefix: "/non-existing/", pred: storage.Everything, expectedOut: nil, }, { name: "test List with pod name matching", prefix: "/one-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.ParseSelectorOrDie("metadata.name!=foo"), }, expectedOut: nil, }, { name: "test List with limit", prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, }, expectedOut: []*example.Pod{preset[1].storedObj}, expectContinue: true, }, { name: "test List with limit when paging disabled", disablePaging: true, prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, }, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, expectContinue: false, }, { name: "test List with pregenerated continue token", prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, Continue: secondContinuation, }, expectedOut: []*example.Pod{preset[2].storedObj}, }, { name: "ignores resource version 0 for List with pregenerated continue token", prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, Continue: secondContinuation, }, rv: "0", expectedOut: []*example.Pod{preset[2].storedObj}, }, { name: "test List with multiple levels of directories and expect flattened result", prefix: "/two-level/", pred: storage.Everything, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, }, { name: "test List with filter returning only one item, ensure only a single page returned", prefix: "/", pred: storage.SelectionPredicate{ Field: fields.OneTermEqualSelector("metadata.name", "fourth"), Label: labels.Everything(), Limit: 1, }, expectedOut: []*example.Pod{preset[3].storedObj}, 
expectContinue: true, }, { name: "test List with filter returning only one item, covers the entire list", prefix: "/", pred: storage.SelectionPredicate{ Field: fields.OneTermEqualSelector("metadata.name", "fourth"), Label: labels.Everything(), Limit: 2, }, expectedOut: []*example.Pod{preset[3].storedObj}, expectContinue: false, }, { name: "test List with filter returning only one item, covers the entire list, with resource version 0", prefix: "/", pred: storage.SelectionPredicate{ Field: fields.OneTermEqualSelector("metadata.name", "fourth"), Label: labels.Everything(), Limit: 2, }, rv: "0", expectedOut: []*example.Pod{preset[3].storedObj}, expectContinue: false, }, { name: "test List with filter returning two items, more pages possible", prefix: "/", pred: storage.SelectionPredicate{ Field: fields.OneTermEqualSelector("metadata.name", "foo"), Label: labels.Everything(), Limit: 2, }, expectContinue: true, expectedOut: []*example.Pod{preset[0].storedObj, preset[1].storedObj}, }, { name: "filter returns two items split across multiple pages", prefix: "/", pred: storage.SelectionPredicate{ Field: fields.OneTermEqualSelector("metadata.name", "bar"), Label: labels.Everything(), Limit: 2, }, expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj}, }, { name: "filter returns one item for last page, ends on last item, not full", prefix: "/", pred: storage.SelectionPredicate{ Field: fields.OneTermEqualSelector("metadata.name", "bar"), Label: labels.Everything(), Limit: 2, Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3"), }, expectedOut: []*example.Pod{preset[4].storedObj}, }, { name: "filter returns one item for last page, starts on last item, full", prefix: "/", pred: storage.SelectionPredicate{ Field: fields.OneTermEqualSelector("metadata.name", "bar"), Label: labels.Everything(), Limit: 1, Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"), }, expectedOut: 
[]*example.Pod{preset[4].storedObj}, }, { name: "filter returns one item for last page, starts on last item, partial page", prefix: "/", pred: storage.SelectionPredicate{ Field: fields.OneTermEqualSelector("metadata.name", "bar"), Label: labels.Everything(), Limit: 2, Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"), }, expectedOut: []*example.Pod{preset[4].storedObj}, }, { name: "filter returns two items, page size equal to total list size", prefix: "/", pred: storage.SelectionPredicate{ Field: fields.OneTermEqualSelector("metadata.name", "bar"), Label: labels.Everything(), Limit: 5, }, expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj}, }, { name: "filter returns one item, page size equal to total list size", prefix: "/", pred: storage.SelectionPredicate{ Field: fields.OneTermEqualSelector("metadata.name", "fourth"), Label: labels.Everything(), Limit: 5, }, expectedOut: []*example.Pod{preset[3].storedObj}, }, } for _, tt := range tests { if tt.pred.GetAttrs == nil { tt.pred.GetAttrs = getAttrs } out := &example.PodList{} var err error if tt.disablePaging { err = disablePagingStore.List(ctx, tt.prefix, tt.rv, tt.pred, out) } else { err = store.List(ctx, tt.prefix, tt.rv, tt.pred, out) } if (err != nil) != tt.expectError { t.Errorf("(%s): List failed: %v", tt.name, err) } if err != nil { continue } if (len(out.Continue) > 0) != tt.expectContinue { t.Errorf("(%s): unexpected continue token: %q", tt.name, out.Continue) } if len(tt.expectedOut) != len(out.Items) { t.Errorf("(%s): length of list want=%d, got=%d", tt.name, len(tt.expectedOut), len(out.Items)) continue } for j, wantPod := range tt.expectedOut { getPod := &out.Items[j] if !reflect.DeepEqual(wantPod, getPod) { t.Errorf("(%s): pod want=%#v, got=%#v", tt.name, wantPod, getPod) } } } } func TestListContinuation(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, 
&integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: // / // - one-level/ // | - test // | // - two-level/ // - 1/ // | - test // | // - 2/ // - test // preset := []struct { key string obj *example.Pod storedObj *example.Pod }{ { key: "/one-level/test", obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, }, { key: "/two-level/1/test", obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, }, { key: "/two-level/2/test", obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, }, } for i, ps := range preset { preset[i].storedObj = &example.Pod{} err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) if err != nil { t.Fatalf("Set failed: %v", err) } } // test continuations out := &example.PodList{} pred := func(limit int64, continueValue string) storage.SelectionPredicate { return storage.SelectionPredicate{ Limit: limit, Continue: continueValue, Label: labels.Everything(), Field: fields.Everything(), GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, } } if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil { t.Fatalf("Unable to get initial list: %v", err) } if len(out.Continue) == 0 { t.Fatalf("No continuation token set") } if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) { t.Fatalf("Unexpected first page: %#v", out.Items) } continueFromSecondItem := out.Continue // no limit, should get two items out = &example.PodList{} if err := store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out); err != nil { t.Fatalf("Unable to get second page: %v", err) } if len(out.Continue) != 0 { t.Fatalf("Unexpected continuation token set") } if 
!reflect.DeepEqual(out.Items, []example.Pod{*preset[1].storedObj, *preset[2].storedObj}) {
		key, rv, err := decodeContinue(continueFromSecondItem, "/")
		t.Logf("continue token was %d %s %v", rv, key, err)
		t.Fatalf("Unexpected second page: %#v", out.Items)
	}

	// limit, should get two more pages
	out = &example.PodList{}
	if err := store.List(ctx, "/", "0", pred(1, continueFromSecondItem), out); err != nil {
		t.Fatalf("Unable to get second page: %v", err)
	}
	if len(out.Continue) == 0 {
		t.Fatalf("No continuation token set")
	}
	if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[1].storedObj) {
		t.Fatalf("Unexpected second page: %#v", out.Items)
	}
	continueFromThirdItem := out.Continue

	out = &example.PodList{}
	if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil {
		t.Fatalf("Unable to get second page: %v", err)
	}
	if len(out.Continue) != 0 {
		t.Fatalf("Unexpected continuation token set")
	}
	if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) {
		t.Fatalf("Unexpected third page: %#v", out.Items)
	}
}

// testSetup starts a single-member etcd integration cluster and returns a
// background context, a store backed by a random client of that cluster, and
// the cluster itself. The store is built with the example/v1 test codec, an
// empty path prefix, and a prefixTransformer configured with defaultTestPrefix.
// testSetup does not terminate the cluster; the caller must do so.
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
	codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
	ctx := context.Background()
	return ctx, store, cluster
}

// testPropogateStore populates the store with obj under the fixed key
// "/testkey" and returns that key together with the object as written by
// Create. Whatever is currently stored at the key is deleted first; a
// not-found result from that delete is not treated as an error. Any other
// cleanup failure, or a failed Create, aborts the test.
func testPropogateStore(ctx context.Context, t *testing.T, store *store, obj *example.Pod) (string, *example.Pod) {
	// Setup store with a key and grab the output for returning.
	key := "/testkey"
	err := store.unconditionalDelete(ctx, key, &example.Pod{})
	if err != nil && !storage.IsNotFound(err) {
		// Fatalf rather than Fatal: the message carries a %v directive that
		// Fatal would print literally instead of formatting err into it.
		t.Fatalf("Cleanup failed: %v", err)
	}
	setOutput := &example.Pod{}
	if err := store.Create(ctx, key, obj, setOutput, 0); err != nil {
		t.Fatalf("Set failed: %v", err)
	}
	return key, setOutput
}

// TestPrefix checks that newStore turns each configured path prefix into the
// expected effective store.pathPrefix. The cases cover a prefix without a
// leading slash, one with doubled and trailing slashes, and one that is
// already in its final form.
func TestPrefix(t *testing.T) {
	codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer cluster.Terminate(t)
	transformer := prefixTransformer{prefix: []byte(defaultTestPrefix)}
	// Maps configured prefix -> expected effective prefix.
	testcases := map[string]string{
		"custom/prefix":     "/custom/prefix",
		"/custom//prefix//": "/custom/prefix",
		"/registry":         "/registry",
	}
	for configuredPrefix, effectivePrefix := range testcases {
		store := newStore(cluster.RandClient(), false, true, codec, configuredPrefix, transformer)
		if store.pathPrefix != effectivePrefix {
			t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
		}
	}
}

// encodeContinueOrDie builds a continue token string from the given API
// version, resource version and start key: it JSON-marshals a continueToken
// and encodes the bytes with unpadded URL-safe base64. It panics if
// marshalling fails, so it is only suitable for test fixtures.
func encodeContinueOrDie(apiVersion string, resourceVersion int64, nextKey string) string {
	out, err := json.Marshal(&continueToken{APIVersion: apiVersion, ResourceVersion: resourceVersion, StartKey: nextKey})
	if err != nil {
		panic(err)
	}
	return base64.RawURLEncoding.EncodeToString(out)
}

// Test_decodeContinue is a table-driven test of decodeContinue. Each case
// feeds an encoded continue token and a key prefix and checks the returned
// start key and resource version, or that an error is reported. The error
// cases cover empty and unrecognized API versions and start keys containing
// "." or ".." path segments.
func Test_decodeContinue(t *testing.T) {
	type args struct {
		continueValue string
		keyPrefix     string
	}
	tests := []struct {
		name        string
		args        args
		wantFromKey string
		wantRv      int64
		wantErr     bool
	}{
		{name: "valid", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"},
		{name: "root path", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "/"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/"},

		{name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
		{name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true},

		{name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "../key"), keyPrefix: "/test/"}, wantErr: true},
		{name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./key"), keyPrefix: "/test/"}, wantErr: true},
		{name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true},
		{name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			gotFromKey, gotRv, err := decodeContinue(tt.args.continueValue, tt.args.keyPrefix)
			if (err != nil) != tt.wantErr {
				t.Errorf("decodeContinue() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if gotFromKey != tt.wantFromKey {
				t.Errorf("decodeContinue() gotFromKey = %v, want %v", gotFromKey, tt.wantFromKey)
			}
			if gotRv != tt.wantRv {
				t.Errorf("decodeContinue() gotRv = %v, want %v", gotRv, tt.wantRv)
			}
		})
	}
}

// Test_growSlice is a table-driven test of growSlice. Each case calls
// growSlice on a []example.Pod value with a maximum capacity and a list of
// candidate sizes, then compares the slice's resulting capacity with the
// expected one. Cases that set initialCapacity start from a slice
// pre-allocated with that capacity instead of the value given in args.v.
func Test_growSlice(t *testing.T) {
	type args struct {
		t               reflect.Type
		initialCapacity int
		v               reflect.Value
		maxCapacity     int
		sizes           []int
	}
	tests := []struct {
		name string
		args args
		cap  int
	}{
		{
			name: "empty",
			args: args{v: reflect.ValueOf([]example.Pod{})},
			cap:  0,
		},
		{
			name: "no sizes",
			args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10},
			cap:  10,
		},
		{
			name: "above maxCapacity",
			args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{1, 12}},
			cap:  10,
		},
		{
			name: "takes max",
			args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{8, 4}},
			cap:  8,
		},
		{
			name: "with existing capacity above max",
			args: args{initialCapacity: 12, maxCapacity: 10, sizes: []int{8, 4}},
			cap:  12,
		},
		{
			name: "with existing capacity below max",
			args: args{initialCapacity: 5, maxCapacity: 10, sizes: []int{8, 4}},
			cap:  8,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if tt.args.initialCapacity > 0 {
				tt.args.v = reflect.ValueOf(make([]example.Pod, 0, tt.args.initialCapacity))
			}
			// reflection requires that the value be addressable in order to call set,
			// so we must ensure the value we created is available on the heap (not a problem
			// for normal usage)
			if !tt.args.v.CanAddr() {
				x := reflect.New(tt.args.v.Type())
				x.Elem().Set(tt.args.v)
				tt.args.v = x.Elem()
			}
			growSlice(tt.args.v, tt.args.maxCapacity, tt.args.sizes...)
			if tt.cap != tt.args.v.Cap() {
				t.Errorf("Unexpected capacity: got=%d want=%d", tt.args.v.Cap(), tt.cap)
			}
		})
	}
}