Move GuaranteedUpdate test to generic test package
Kubernetes-commit: 012676acc3dd2b235bd601e4105294d2715f155b
This commit is contained in:
parent
4202b94cb2
commit
1960fa837f
|
@ -17,7 +17,6 @@ limitations under the License.
|
|||
package etcd3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
@ -63,67 +62,35 @@ func init() {
|
|||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, os.Stderr))
|
||||
}
|
||||
|
||||
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
|
||||
type prefixTransformer struct {
|
||||
prefix []byte
|
||||
stale bool
|
||||
err error
|
||||
reads uint64
|
||||
}
|
||||
|
||||
func (p *prefixTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||
atomic.AddUint64(&p.reads, 1)
|
||||
if dataCtx == nil {
|
||||
panic("no context provided")
|
||||
}
|
||||
if !bytes.HasPrefix(data, p.prefix) {
|
||||
return nil, false, fmt.Errorf("value does not have expected prefix %q: %s,", p.prefix, string(data))
|
||||
}
|
||||
return bytes.TrimPrefix(data, p.prefix), p.stale, p.err
|
||||
}
|
||||
func (p *prefixTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||
if dataCtx == nil {
|
||||
panic("no context provided")
|
||||
}
|
||||
if len(data) > 0 {
|
||||
return append(append([]byte{}, p.prefix...), data...), p.err
|
||||
}
|
||||
return data, p.err
|
||||
}
|
||||
|
||||
func (p *prefixTransformer) resetReads() {
|
||||
p.reads = 0
|
||||
}
|
||||
|
||||
func newPod() runtime.Object {
|
||||
return &example.Pod{}
|
||||
}
|
||||
|
||||
func TestCreate(t *testing.T) {
|
||||
ctx, store, etcdClient := testSetup(t)
|
||||
storagetesting.RunTestCreate(ctx, t, store, func(ctx context.Context, t *testing.T, key string) {
|
||||
checkStorageInvariants(ctx, t, etcdClient, store.codec, key)
|
||||
})
|
||||
storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(etcdClient, store.codec))
|
||||
}
|
||||
|
||||
func checkStorageInvariants(ctx context.Context, t *testing.T, etcdClient *clientv3.Client, codec runtime.Codec, key string) {
|
||||
getResp, err := etcdClient.KV.Get(ctx, key)
|
||||
if err != nil {
|
||||
t.Fatalf("etcdClient.KV.Get failed: %v", err)
|
||||
}
|
||||
if len(getResp.Kvs) == 0 {
|
||||
t.Fatalf("expecting non empty result on key: %s", key)
|
||||
}
|
||||
decoded, err := runtime.Decode(codec, getResp.Kvs[0].Value[len(defaultTestPrefix):])
|
||||
if err != nil {
|
||||
t.Fatalf("expecting successful decode of object from %v\n%v", err, string(getResp.Kvs[0].Value))
|
||||
}
|
||||
obj := decoded.(*example.Pod)
|
||||
if obj.ResourceVersion != "" {
|
||||
t.Errorf("stored object should have empty resource version")
|
||||
}
|
||||
if obj.SelfLink != "" {
|
||||
t.Errorf("stored output should have empty selfLink")
|
||||
func checkStorageInvariants(etcdClient *clientv3.Client, codec runtime.Codec) storagetesting.KeyValidation {
|
||||
return func(ctx context.Context, t *testing.T, key string) {
|
||||
getResp, err := etcdClient.KV.Get(ctx, key)
|
||||
if err != nil {
|
||||
t.Fatalf("etcdClient.KV.Get failed: %v", err)
|
||||
}
|
||||
if len(getResp.Kvs) == 0 {
|
||||
t.Fatalf("expecting non empty result on key: %s", key)
|
||||
}
|
||||
decoded, err := runtime.Decode(codec, getResp.Kvs[0].Value[len(defaultTestPrefix):])
|
||||
if err != nil {
|
||||
t.Fatalf("expecting successful decode of object from %v\n%v", err, string(getResp.Kvs[0].Value))
|
||||
}
|
||||
obj := decoded.(*example.Pod)
|
||||
if obj.ResourceVersion != "" {
|
||||
t.Errorf("stored object should have empty resource version")
|
||||
}
|
||||
if obj.SelfLink != "" {
|
||||
t.Errorf("stored output should have empty selfLink")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -207,157 +174,22 @@ func TestGetListNonRecursive(t *testing.T) {
|
|||
storagetesting.RunTestGetListNonRecursive(ctx, t, store)
|
||||
}
|
||||
|
||||
type storeWithPrefixTransformer struct {
|
||||
*store
|
||||
}
|
||||
|
||||
func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetesting.PrefixTransformerModifier) func() {
|
||||
originalTransformer := s.transformer.(*storagetesting.PrefixTransformer)
|
||||
transformer := *originalTransformer
|
||||
s.transformer = modifier(&transformer)
|
||||
return func() {
|
||||
s.transformer = originalTransformer
|
||||
}
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdate(t *testing.T) {
|
||||
ctx, store, etcdClient := testSetup(t)
|
||||
key := "/testkey"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
key string
|
||||
ignoreNotFound bool
|
||||
precondition *storage.Preconditions
|
||||
expectNotFoundErr bool
|
||||
expectInvalidObjErr bool
|
||||
expectNoUpdate bool
|
||||
transformStale bool
|
||||
hasSelfLink bool
|
||||
}{{
|
||||
name: "non-existing key, ignoreNotFound=false",
|
||||
key: "/non-existing",
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: true,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
}, {
|
||||
name: "non-existing key, ignoreNotFound=true",
|
||||
key: "/non-existing",
|
||||
ignoreNotFound: true,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
}, {
|
||||
name: "existing key",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
}, {
|
||||
name: "same data",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
}, {
|
||||
name: "same data, a selfLink",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
hasSelfLink: true,
|
||||
}, {
|
||||
name: "same data, stale",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
transformStale: true,
|
||||
}, {
|
||||
name: "UID match",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: storage.NewUIDPreconditions("A"),
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
}, {
|
||||
name: "UID mismatch",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: storage.NewUIDPreconditions("B"),
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: true,
|
||||
expectNoUpdate: true,
|
||||
}}
|
||||
|
||||
for i, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
key, storeObj := storagetesting.TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||
|
||||
out := &example.Pod{}
|
||||
name := fmt.Sprintf("foo-%d", i)
|
||||
if tt.expectNoUpdate {
|
||||
name = storeObj.Name
|
||||
}
|
||||
originalTransformer := store.transformer.(*prefixTransformer)
|
||||
if tt.transformStale {
|
||||
transformer := *originalTransformer
|
||||
transformer.stale = true
|
||||
store.transformer = &transformer
|
||||
}
|
||||
version := storeObj.ResourceVersion
|
||||
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
|
||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||
if tt.expectNotFoundErr && tt.ignoreNotFound {
|
||||
if pod := obj.(*example.Pod); pod.Name != "" {
|
||||
t.Errorf("%s: expecting zero value, but get=%#v", tt.name, pod)
|
||||
}
|
||||
}
|
||||
pod := *storeObj
|
||||
if tt.hasSelfLink {
|
||||
pod.SelfLink = "testlink"
|
||||
}
|
||||
pod.Name = name
|
||||
return &pod, nil
|
||||
}), nil)
|
||||
store.transformer = originalTransformer
|
||||
|
||||
if tt.expectNotFoundErr {
|
||||
if err == nil || !storage.IsNotFound(err) {
|
||||
t.Errorf("%s: expecting not found error, but get: %v", tt.name, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if tt.expectInvalidObjErr {
|
||||
if err == nil || !storage.IsInvalidObj(err) {
|
||||
t.Errorf("%s: expecting invalid UID error, but get: %s", tt.name, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("%s: GuaranteedUpdate failed: %v", tt.name, err)
|
||||
}
|
||||
if out.ObjectMeta.Name != name {
|
||||
t.Errorf("%s: pod name want=%s, get=%s", tt.name, name, out.ObjectMeta.Name)
|
||||
}
|
||||
if out.SelfLink != "" {
|
||||
t.Errorf("%s: selfLink should not be set", tt.name)
|
||||
}
|
||||
|
||||
// verify that kv pair is not empty after set and that the underlying data matches expectations
|
||||
checkStorageInvariants(ctx, t, etcdClient, store.codec, key)
|
||||
|
||||
switch tt.expectNoUpdate {
|
||||
case true:
|
||||
if version != out.ResourceVersion {
|
||||
t.Errorf("%s: expect no version change, before=%s, after=%s", tt.name, version, out.ResourceVersion)
|
||||
}
|
||||
case false:
|
||||
if version == out.ResourceVersion {
|
||||
t.Errorf("%s: expect version change, but get the same version=%s", tt.name, version)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(etcdClient, store.codec))
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
||||
|
@ -413,7 +245,7 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
|
|||
t.Errorf("guaranteed update should have short-circuited write, got %#v", out)
|
||||
}
|
||||
|
||||
store.transformer = &prefixTransformer{prefix: []byte(defaultTestPrefix), stale: true}
|
||||
store.transformer = storagetesting.NewPrefixTransformer([]byte(defaultTestPrefix), true)
|
||||
|
||||
// this update should write to etcd because the transformer reported stale
|
||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil,
|
||||
|
@ -468,7 +300,7 @@ func TestTransformationFailure(t *testing.T) {
|
|||
|
||||
// create a second resource with an invalid prefix
|
||||
oldTransformer := store.transformer
|
||||
store.transformer = &prefixTransformer{prefix: []byte("otherprefix!")}
|
||||
store.transformer = storagetesting.NewPrefixTransformer([]byte("otherprefix!"), false)
|
||||
for i, ps := range preset[1:] {
|
||||
preset[1:][i].storedObj = &example.Pod{}
|
||||
err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0)
|
||||
|
@ -526,7 +358,7 @@ func TestListWithoutPaging(t *testing.T) {
|
|||
|
||||
func TestListContinuation(t *testing.T) {
|
||||
ctx, store, etcdClient := testSetup(t)
|
||||
transformer := store.transformer.(*prefixTransformer)
|
||||
transformer := store.transformer.(*storagetesting.PrefixTransformer)
|
||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||
etcdClient.KV = recorder
|
||||
|
||||
|
@ -595,13 +427,13 @@ func TestListContinuation(t *testing.T) {
|
|||
t.Fatalf("No continuation token set")
|
||||
}
|
||||
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
|
||||
if transformer.reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||
if reads := transformer.GetReads(); reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", reads)
|
||||
}
|
||||
if recorder.reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||
}
|
||||
transformer.resetReads()
|
||||
transformer.ResetReads()
|
||||
recorder.resetReads()
|
||||
|
||||
continueFromSecondItem := out.Continue
|
||||
|
@ -622,13 +454,13 @@ func TestListContinuation(t *testing.T) {
|
|||
key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/")
|
||||
t.Logf("continue token was %d %s %v", rv, key, err)
|
||||
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items)
|
||||
if transformer.reads != 2 {
|
||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||
if reads := transformer.GetReads(); reads != 2 {
|
||||
t.Errorf("unexpected reads: %d", reads)
|
||||
}
|
||||
if recorder.reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||
}
|
||||
transformer.resetReads()
|
||||
transformer.ResetReads()
|
||||
recorder.resetReads()
|
||||
|
||||
// limit, should get two more pages
|
||||
|
@ -645,13 +477,13 @@ func TestListContinuation(t *testing.T) {
|
|||
t.Fatalf("No continuation token set")
|
||||
}
|
||||
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
||||
if transformer.reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||
if reads := transformer.GetReads(); reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", reads)
|
||||
}
|
||||
if recorder.reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||
}
|
||||
transformer.resetReads()
|
||||
transformer.ResetReads()
|
||||
recorder.resetReads()
|
||||
|
||||
continueFromThirdItem := out.Continue
|
||||
|
@ -669,19 +501,17 @@ func TestListContinuation(t *testing.T) {
|
|||
t.Fatalf("Unexpected continuation token set")
|
||||
}
|
||||
storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
||||
if transformer.reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||
if reads := transformer.GetReads(); reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", reads)
|
||||
}
|
||||
if recorder.reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||
}
|
||||
transformer.resetReads()
|
||||
recorder.resetReads()
|
||||
}
|
||||
|
||||
func TestListPaginationRareObject(t *testing.T) {
|
||||
ctx, store, etcdClient := testSetup(t)
|
||||
transformer := store.transformer.(*prefixTransformer)
|
||||
transformer := store.transformer.(*storagetesting.PrefixTransformer)
|
||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||
etcdClient.KV = recorder
|
||||
|
||||
|
@ -720,8 +550,8 @@ func TestListPaginationRareObject(t *testing.T) {
|
|||
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) {
|
||||
t.Fatalf("Unexpected first page: %#v", out.Items)
|
||||
}
|
||||
if transformer.reads != uint64(podCount) {
|
||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||
if reads := transformer.GetReads(); reads != uint64(podCount) {
|
||||
t.Errorf("unexpected reads: %d", reads)
|
||||
}
|
||||
// We expect that kube-apiserver will be increasing page sizes
|
||||
// if not full pages are received, so we should see significantly less
|
||||
|
@ -733,8 +563,6 @@ func TestListPaginationRareObject(t *testing.T) {
|
|||
if recorder.reads != 10 {
|
||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||
}
|
||||
transformer.resetReads()
|
||||
recorder.resetReads()
|
||||
}
|
||||
|
||||
type clientRecorder struct {
|
||||
|
@ -753,7 +581,7 @@ func (r *clientRecorder) resetReads() {
|
|||
|
||||
func TestListContinuationWithFilter(t *testing.T) {
|
||||
ctx, store, etcdClient := testSetup(t)
|
||||
transformer := store.transformer.(*prefixTransformer)
|
||||
transformer := store.transformer.(*storagetesting.PrefixTransformer)
|
||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||
etcdClient.KV = recorder
|
||||
|
||||
|
@ -817,13 +645,13 @@ func TestListContinuationWithFilter(t *testing.T) {
|
|||
t.Errorf("No continuation token set")
|
||||
}
|
||||
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items)
|
||||
if transformer.reads != 3 {
|
||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||
if reads := transformer.GetReads(); reads != 3 {
|
||||
t.Errorf("unexpected reads: %d", reads)
|
||||
}
|
||||
if recorder.reads != 2 {
|
||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||
}
|
||||
transformer.resetReads()
|
||||
transformer.ResetReads()
|
||||
recorder.resetReads()
|
||||
|
||||
// the rest of the test does not make sense if the previous call failed
|
||||
|
@ -849,14 +677,12 @@ func TestListContinuationWithFilter(t *testing.T) {
|
|||
t.Errorf("Unexpected continuation token set")
|
||||
}
|
||||
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items)
|
||||
if transformer.reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||
if reads := transformer.GetReads(); reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", reads)
|
||||
}
|
||||
if recorder.reads != 1 {
|
||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||
}
|
||||
transformer.resetReads()
|
||||
recorder.resetReads()
|
||||
}
|
||||
|
||||
func TestListInconsistentContinuation(t *testing.T) {
|
||||
|
@ -1026,8 +852,8 @@ func newTestLeaseManagerConfig() LeaseManagerConfig {
|
|||
return cfg
|
||||
}
|
||||
|
||||
func newTestTransformer() *prefixTransformer {
|
||||
return &prefixTransformer{prefix: []byte(defaultTestPrefix)}
|
||||
func newTestTransformer() value.Transformer {
|
||||
return storagetesting.NewPrefixTransformer([]byte(defaultTestPrefix), false)
|
||||
}
|
||||
|
||||
type setupOptions struct {
|
||||
|
|
|
@ -35,6 +35,7 @@ import (
|
|||
"k8s.io/apiserver/pkg/apis/example"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
utilpointer "k8s.io/utils/pointer"
|
||||
|
@ -1149,6 +1150,169 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage
|
|||
}
|
||||
}
|
||||
|
||||
type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer
|
||||
|
||||
type InterfaceWithPrefixTransformer interface {
|
||||
storage.Interface
|
||||
|
||||
UpdatePrefixTransformer(PrefixTransformerModifier) func()
|
||||
}
|
||||
|
||||
func RunTestGuaranteedUpdate(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer, validation KeyValidation) {
|
||||
key := "/testkey"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
key string
|
||||
ignoreNotFound bool
|
||||
precondition *storage.Preconditions
|
||||
expectNotFoundErr bool
|
||||
expectInvalidObjErr bool
|
||||
expectNoUpdate bool
|
||||
transformStale bool
|
||||
hasSelfLink bool
|
||||
}{{
|
||||
name: "non-existing key, ignoreNotFound=false",
|
||||
key: "/non-existing",
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: true,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
}, {
|
||||
name: "non-existing key, ignoreNotFound=true",
|
||||
key: "/non-existing",
|
||||
ignoreNotFound: true,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
}, {
|
||||
name: "existing key",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
}, {
|
||||
name: "same data",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
}, {
|
||||
name: "same data, a selfLink",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
hasSelfLink: true,
|
||||
}, {
|
||||
name: "same data, stale",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
transformStale: true,
|
||||
}, {
|
||||
name: "UID match",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: storage.NewUIDPreconditions("A"),
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
}, {
|
||||
name: "UID mismatch",
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: storage.NewUIDPreconditions("B"),
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: true,
|
||||
expectNoUpdate: true,
|
||||
}}
|
||||
|
||||
for i, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
key, storeObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||
|
||||
out := &example.Pod{}
|
||||
name := fmt.Sprintf("foo-%d", i)
|
||||
if tt.expectNoUpdate {
|
||||
name = storeObj.Name
|
||||
}
|
||||
|
||||
if tt.transformStale {
|
||||
revertTransformer := store.UpdatePrefixTransformer(
|
||||
func(transformer *PrefixTransformer) value.Transformer {
|
||||
transformer.stale = true
|
||||
return transformer
|
||||
})
|
||||
defer revertTransformer()
|
||||
}
|
||||
|
||||
version := storeObj.ResourceVersion
|
||||
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
|
||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||
if tt.expectNotFoundErr && tt.ignoreNotFound {
|
||||
if pod := obj.(*example.Pod); pod.Name != "" {
|
||||
t.Errorf("%s: expecting zero value, but get=%#v", tt.name, pod)
|
||||
}
|
||||
}
|
||||
pod := *storeObj
|
||||
if tt.hasSelfLink {
|
||||
pod.SelfLink = "testlink"
|
||||
}
|
||||
pod.Name = name
|
||||
return &pod, nil
|
||||
}), nil)
|
||||
|
||||
if tt.expectNotFoundErr {
|
||||
if err == nil || !storage.IsNotFound(err) {
|
||||
t.Errorf("%s: expecting not found error, but get: %v", tt.name, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if tt.expectInvalidObjErr {
|
||||
if err == nil || !storage.IsInvalidObj(err) {
|
||||
t.Errorf("%s: expecting invalid UID error, but get: %s", tt.name, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("%s: GuaranteedUpdate failed: %v", tt.name, err)
|
||||
}
|
||||
if out.ObjectMeta.Name != name {
|
||||
t.Errorf("%s: pod name want=%s, get=%s", tt.name, name, out.ObjectMeta.Name)
|
||||
}
|
||||
if out.SelfLink != "" {
|
||||
t.Errorf("%s: selfLink should not be set", tt.name)
|
||||
}
|
||||
|
||||
// verify that kv pair is not empty after set and that the underlying data matches expectations
|
||||
validation(ctx, t, key)
|
||||
|
||||
switch tt.expectNoUpdate {
|
||||
case true:
|
||||
if version != out.ResourceVersion {
|
||||
t.Errorf("%s: expect no version change, before=%s, after=%s", tt.name, version, out.ResourceVersion)
|
||||
}
|
||||
case false:
|
||||
if version == out.ResourceVersion {
|
||||
t.Errorf("%s: expect version change, but get the same version=%s", tt.name, version)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func RunTestGuaranteedUpdateWithTTL(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
|
||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
|
|
|
@ -17,10 +17,12 @@ limitations under the License.
|
|||
package testing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"reflect"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -32,6 +34,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
)
|
||||
|
||||
// CreateObj will create a single object using the storage interface.
|
||||
|
@ -193,3 +196,46 @@ func ResourceVersionNotOlderThan(sentinel string) func(string) error {
|
|||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// PrefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
|
||||
type PrefixTransformer struct {
|
||||
prefix []byte
|
||||
stale bool
|
||||
err error
|
||||
reads uint64
|
||||
}
|
||||
|
||||
func NewPrefixTransformer(prefix []byte, stale bool) *PrefixTransformer {
|
||||
return &PrefixTransformer{
|
||||
prefix: prefix,
|
||||
stale: stale,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PrefixTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||
atomic.AddUint64(&p.reads, 1)
|
||||
if dataCtx == nil {
|
||||
panic("no context provided")
|
||||
}
|
||||
if !bytes.HasPrefix(data, p.prefix) {
|
||||
return nil, false, fmt.Errorf("value does not have expected prefix %q: %s,", p.prefix, string(data))
|
||||
}
|
||||
return bytes.TrimPrefix(data, p.prefix), p.stale, p.err
|
||||
}
|
||||
func (p *PrefixTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||
if dataCtx == nil {
|
||||
panic("no context provided")
|
||||
}
|
||||
if len(data) > 0 {
|
||||
return append(append([]byte{}, p.prefix...), data...), p.err
|
||||
}
|
||||
return data, p.err
|
||||
}
|
||||
|
||||
func (p *PrefixTransformer) GetReads() uint64 {
|
||||
return atomic.LoadUint64(&p.reads)
|
||||
}
|
||||
|
||||
func (p *PrefixTransformer) ResetReads() {
|
||||
atomic.StoreUint64(&p.reads, 0)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue