Allow ValueTransformer to indicate resource is stale

Allows a transformer (such as an encrypter) to force an update if a new
key is in use, thus allowing simple writes to the REST layer to
trivially migrate keys.
This commit is contained in:
Clayton Coleman 2017-02-05 22:37:10 -05:00 committed by deads2k
parent 3d8615c851
commit 74dc1360f1
6 changed files with 111 additions and 60 deletions

View File

@ -35,24 +35,28 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd/metrics"
etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
utilcache "k8s.io/apiserver/pkg/util/cache"
utiltrace "k8s.io/apiserver/pkg/util/trace"
etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
)
// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
// must be able to undo the transformation caused by the other.
type ValueTransformer interface {
// TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error.
TransformStringFromStorage(string) (string, error)
// Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
// have not changed.
TransformStringFromStorage(string) (value string, stale bool, err error)
// TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error.
TransformStringToStorage(string) (string, error)
TransformStringToStorage(string) (value string, err error)
}
type identityTransformer struct{}
func (identityTransformer) TransformStringFromStorage(s string) (string, error) { return s, nil }
func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil }
func (identityTransformer) TransformStringFromStorage(s string) (string, bool, error) {
return s, false, nil
}
func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil }
// IdentityTransformer performs no transformation on the provided values.
var IdentityTransformer ValueTransformer = identityTransformer{}
@ -148,7 +152,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
_, _, _, err = h.extractObj(response, err, out, false, false)
}
return err
}
@ -186,7 +190,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
_, _, _, err = h.extractObj(response, err, out, false, true)
}
}
return toStorageErr(err, key, 0)
@ -195,7 +199,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
// Check the preconditions match.
obj := reflect.New(v.Type()).Interface().(runtime.Object)
for {
_, node, res, err := h.bodyAndExtractObj(ctx, key, obj, false)
_, node, res, _, err := h.bodyAndExtractObj(ctx, key, obj, false)
if err != nil {
return toStorageErr(err, key, 0)
}
@ -216,7 +220,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
_, _, _, err = h.extractObj(response, err, out, false, true)
}
}
return toStorageErr(err, key, 0)
@ -262,13 +266,13 @@ func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string
glog.Errorf("Context is nil")
}
key = path.Join(h.pathPrefix, key)
_, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
_, _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
return err
}
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
// about the response, like the current etcd index and the ttl.
func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, stale bool, err error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
@ -281,13 +285,13 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r
response, err := h.etcdKeysAPI.Get(ctx, key, opts)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
if err != nil && !etcdutil.IsEtcdNotFound(err) {
return "", nil, nil, toStorageErr(err, key, 0)
return "", nil, nil, false, toStorageErr(err, key, 0)
}
body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
return body, node, response, toStorageErr(err, key, 0)
body, node, stale, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
return body, node, response, stale, toStorageErr(err, key, 0)
}
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) {
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, stale bool, err error) {
if response != nil {
if prevNode {
node = response.PrevNode
@ -299,30 +303,30 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
if ignoreNotFound {
v, err := conversion.EnforcePtr(objPtr)
if err != nil {
return "", nil, err
return "", nil, false, err
}
v.Set(reflect.Zero(v.Type()))
return "", nil, nil
return "", nil, false, nil
} else if inErr != nil {
return "", nil, inErr
return "", nil, false, inErr
}
return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
return "", nil, false, fmt.Errorf("unable to locate a value on the response: %#v", response)
}
body, err = h.transformer.TransformStringFromStorage(node.Value)
body, stale, err = h.transformer.TransformStringFromStorage(node.Value)
if err != nil {
return body, nil, storage.NewInternalError(err.Error())
return body, nil, stale, storage.NewInternalError(err.Error())
}
out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
if err != nil {
return body, nil, err
return body, nil, stale, err
}
if out != objPtr {
return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
return body, nil, stale, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
}
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
return body, node, err
return body, node, stale, err
}
// Implements storage.Interface.
@ -389,7 +393,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
} else {
body, err := h.transformer.TransformStringFromStorage(node.Value)
body, _, err := h.transformer.TransformStringFromStorage(node.Value)
if err != nil {
// omit items from lists and watches that cannot be transformed, but log the error
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err))
@ -485,7 +489,7 @@ func (h *etcdHelper) GuaranteedUpdate(
key = path.Join(h.pathPrefix, key)
for {
obj := reflect.New(v.Type()).Interface().(runtime.Object)
origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
origBody, node, res, stale, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
if err != nil {
return toStorageErr(err, key, 0)
}
@ -552,14 +556,15 @@ func (h *etcdHelper) GuaranteedUpdate(
if etcdutil.IsEtcdNodeExist(err) {
continue
}
_, _, err = h.extractObj(response, err, ptrToType, false, false)
_, _, _, err = h.extractObj(response, err, ptrToType, false, false)
return toStorageErr(err, key, 0)
}
if newBody == origBody {
// If we don't send an update, we simply return the currently existing
// version of the object.
_, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
// If we don't send an update, we simply return the currently existing
// version of the object. However, the value transformer may indicate that
// the on disk representation has changed and that we must commit an update.
if newBody == origBody && !stale {
_, _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
return err
}
@ -575,7 +580,7 @@ func (h *etcdHelper) GuaranteedUpdate(
// Try again.
continue
}
_, _, err = h.extractObj(response, err, ptrToType, false, false)
_, _, _, err = h.extractObj(response, err, ptrToType, false, false)
return toStorageErr(err, key, int64(index))
}
}

View File

@ -48,14 +48,15 @@ import (
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
type prefixTransformer struct {
prefix string
stale bool
err error
}
func (p prefixTransformer) TransformStringFromStorage(s string) (string, error) {
func (p prefixTransformer) TransformStringFromStorage(s string) (string, bool, error) {
if !strings.HasPrefix(s, p.prefix) {
return "", fmt.Errorf("value does not have expected prefix: %s", s)
return "", false, fmt.Errorf("value does not have expected prefix: %s", s)
}
return strings.TrimPrefix(s, p.prefix), p.err
return strings.TrimPrefix(s, p.prefix), p.stale, p.err
}
func (p prefixTransformer) TransformStringToStorage(s string) (string, error) {
if len(s) > 0 {
@ -449,7 +450,8 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
original := &storagetesting.TestResource{}
err := helper.GuaranteedUpdate(context.TODO(), key, original, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil
}))
if err != nil {
@ -458,7 +460,27 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
// Update an existing node with the same data
callbackCalled := false
objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: original.ResourceVersion}, Value: 1}
result := &storagetesting.TestResource{}
err = helper.GuaranteedUpdate(context.TODO(), key, result, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true
return objUpdate, nil
}))
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.")
}
if result.ResourceVersion != original.ResourceVersion {
t.Fatalf("updated the object resource version")
}
// Update an existing node with the same data but return stale
helper.transformer = prefixTransformer{prefix: "test!", stale: true}
callbackCalled = false
result = &storagetesting.TestResource{}
objUpdate = &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true
return objUpdate, nil
@ -469,6 +491,9 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.")
}
if result.ResourceVersion == original.ResourceVersion {
t.Errorf("did not update the object resource version")
}
}
func TestGuaranteedUpdateKeyNotFound(t *testing.T) {

View File

@ -319,7 +319,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
return obj, nil
}
body, err := w.valueTransformer.TransformStringFromStorage(node.Value)
body, _, err := w.valueTransformer.TransformStringFromStorage(node.Value)
if err != nil {
return nil, err
}

View File

@ -36,23 +36,25 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/apiserver/pkg/storage/etcd"
utiltrace "k8s.io/apiserver/pkg/util/trace"
)
// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
// must be able to undo the transformation caused by the other.
type ValueTransformer interface {
// TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error.
TransformFromStorage([]byte) ([]byte, error)
// TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error.
TransformToStorage([]byte) ([]byte, error)
// TransformFromStorage may transform the provided data from its underlying storage representation or return an error.
// Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
// have not changed.
TransformFromStorage([]byte) (data []byte, stale bool, err error)
// TransformToStorage may transform the provided data into the appropriate form in storage or return an error.
TransformToStorage([]byte) (data []byte, err error)
}
type identityTransformer struct{}
func (identityTransformer) TransformFromStorage(b []byte) ([]byte, error) { return b, nil }
func (identityTransformer) TransformToStorage(b []byte) ([]byte, error) { return b, nil }
func (identityTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) { return b, false, nil }
func (identityTransformer) TransformToStorage(b []byte) ([]byte, error) { return b, nil }
// IdentityTransformer performs no transformation on the provided values.
var IdentityTransformer ValueTransformer = identityTransformer{}
@ -75,10 +77,11 @@ type elemForDecode struct {
}
type objState struct {
obj runtime.Object
meta *storage.ResponseMeta
rev int64
data []byte
obj runtime.Object
meta *storage.ResponseMeta
rev int64
data []byte
stale bool
}
// New returns an etcd3 implementation of storage.Interface.
@ -131,7 +134,7 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out
}
kv := getResp.Kvs[0]
data, err := s.transformer.TransformFromStorage(kv.Value)
data, _, err := s.transformer.TransformFromStorage(kv.Value)
if err != nil {
return storage.NewInternalError(err.Error())
}
@ -208,7 +211,7 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime
}
kv := getResp.Kvs[0]
data, err := s.transformer.TransformFromStorage(kv.Value)
data, _, err := s.transformer.TransformFromStorage(kv.Value)
if err != nil {
return storage.NewInternalError(err.Error())
}
@ -292,7 +295,7 @@ func (s *store) GuaranteedUpdate(
if err != nil {
return err
}
if bytes.Equal(data, origState.data) {
if !origState.stale && bytes.Equal(data, origState.data) {
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
@ -349,7 +352,7 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
if len(getResp.Kvs) == 0 {
return nil
}
data, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
if err != nil {
return storage.NewInternalError(err.Error())
}
@ -384,7 +387,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
elems := make([]*elemForDecode, 0, len(getResp.Kvs))
for _, kv := range getResp.Kvs {
data, err := s.transformer.TransformFromStorage(kv.Value)
data, _, err := s.transformer.TransformFromStorage(kv.Value)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err))
continue
@ -434,13 +437,14 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va
return nil, err
}
} else {
data, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
if err != nil {
return nil, storage.NewInternalError(err.Error())
}
state.rev = getResp.Kvs[0].ModRevision
state.meta.ResourceVersion = uint64(state.rev)
state.data = data
state.stale = stale
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
return nil, err
}

View File

@ -52,14 +52,15 @@ func init() {
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
type prefixTransformer struct {
prefix []byte
stale bool
err error
}
func (p prefixTransformer) TransformFromStorage(b []byte) ([]byte, error) {
func (p prefixTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) {
if !bytes.HasPrefix(b, p.prefix) {
return nil, fmt.Errorf("value does not have expected prefix: %s", string(b))
return nil, false, fmt.Errorf("value does not have expected prefix: %s", string(b))
}
return bytes.TrimPrefix(b, p.prefix), p.err
return bytes.TrimPrefix(b, p.prefix), p.stale, p.err
}
func (p prefixTransformer) TransformToStorage(b []byte) ([]byte, error) {
if len(b) > 0 {
@ -316,6 +317,7 @@ func TestGuaranteedUpdate(t *testing.T) {
expectNotFoundErr bool
expectInvalidObjErr bool
expectNoUpdate bool
transformStale bool
}{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false
key: "/non-existing",
ignoreNotFound: false,
@ -344,6 +346,14 @@ func TestGuaranteedUpdate(t *testing.T) {
expectNotFoundErr: false,
expectInvalidObjErr: false,
expectNoUpdate: true,
}, { // GuaranteedUpdate with same data but stale
key: key,
ignoreNotFound: false,
precondition: nil,
expectNotFoundErr: false,
expectInvalidObjErr: false,
expectNoUpdate: false,
transformStale: true,
}, { // GuaranteedUpdate with UID match
key: key,
ignoreNotFound: false,
@ -366,6 +376,12 @@ func TestGuaranteedUpdate(t *testing.T) {
if tt.expectNoUpdate {
name = storeObj.Name
}
originalTransformer := store.transformer.(prefixTransformer)
if tt.transformStale {
transformer := originalTransformer
transformer.stale = true
store.transformer = transformer
}
version := storeObj.ResourceVersion
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
@ -378,6 +394,7 @@ func TestGuaranteedUpdate(t *testing.T) {
pod.Name = name
return &pod, nil
}))
store.transformer = originalTransformer
if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) {

View File

@ -343,7 +343,7 @@ func (wc *watchChan) sendEvent(e *event) {
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
if !e.isDeleted {
data, err := wc.watcher.transformer.TransformFromStorage(e.value)
data, _, err := wc.watcher.transformer.TransformFromStorage(e.value)
if err != nil {
return nil, nil, err
}
@ -358,7 +358,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
// we need the object only to compute whether it was filtered out
// before).
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
data, err := wc.watcher.transformer.TransformFromStorage(e.prevValue)
data, _, err := wc.watcher.transformer.TransformFromStorage(e.prevValue)
if err != nil {
return nil, nil, err
}