diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 22f7b20b7..7c6c675a0 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -35,24 +35,28 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd/metrics" + etcdutil "k8s.io/apiserver/pkg/storage/etcd/util" utilcache "k8s.io/apiserver/pkg/util/cache" utiltrace "k8s.io/apiserver/pkg/util/trace" - etcdutil "k8s.io/apiserver/pkg/storage/etcd/util" ) // ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods // must be able to undo the transformation caused by the other. type ValueTransformer interface { // TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error. - TransformStringFromStorage(string) (string, error) + // Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object + // have not changed. + TransformStringFromStorage(string) (value string, stale bool, err error) // TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error. - TransformStringToStorage(string) (string, error) + TransformStringToStorage(string) (value string, err error) } type identityTransformer struct{} -func (identityTransformer) TransformStringFromStorage(s string) (string, error) { return s, nil } -func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil } +func (identityTransformer) TransformStringFromStorage(s string) (string, bool, error) { + return s, false, nil +} +func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil } // IdentityTransformer performs no transformation on the provided values. var IdentityTransformer ValueTransformer = identityTransformer{} @@ -148,7 +152,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob if _, err := conversion.EnforcePtr(out); err != nil { panic("unable to convert output object to pointer") } - _, _, err = h.extractObj(response, err, out, false, false) + _, _, _, err = h.extractObj(response, err, out, false, false) } return err } @@ -186,7 +190,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, if !etcdutil.IsEtcdNotFound(err) { // if the object that existed prior to the delete is returned by etcd, update the out object. if err != nil || response.PrevNode != nil { - _, _, err = h.extractObj(response, err, out, false, true) + _, _, _, err = h.extractObj(response, err, out, false, true) } } return toStorageErr(err, key, 0) @@ -195,7 +199,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, // Check the preconditions match. obj := reflect.New(v.Type()).Interface().(runtime.Object) for { - _, node, res, err := h.bodyAndExtractObj(ctx, key, obj, false) + _, node, res, _, err := h.bodyAndExtractObj(ctx, key, obj, false) if err != nil { return toStorageErr(err, key, 0) } @@ -216,7 +220,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, if !etcdutil.IsEtcdNotFound(err) { // if the object that existed prior to the delete is returned by etcd, update the out object. if err != nil || response.PrevNode != nil { - _, _, err = h.extractObj(response, err, out, false, true) + _, _, _, err = h.extractObj(response, err, out, false, true) } } return toStorageErr(err, key, 0) @@ -262,13 +266,13 @@ func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string glog.Errorf("Context is nil") } key = path.Join(h.pathPrefix, key) - _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound) + _, _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound) return err } // bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information // about the response, like the current etcd index and the ttl. -func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) { +func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, stale bool, err error) { if ctx == nil { glog.Errorf("Context is nil") } @@ -281,13 +285,13 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r response, err := h.etcdKeysAPI.Get(ctx, key, opts) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) if err != nil && !etcdutil.IsEtcdNotFound(err) { - return "", nil, nil, toStorageErr(err, key, 0) + return "", nil, nil, false, toStorageErr(err, key, 0) } - body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false) - return body, node, response, toStorageErr(err, key, 0) + body, node, stale, err = h.extractObj(response, err, objPtr, ignoreNotFound, false) + return body, node, response, stale, toStorageErr(err, key, 0) } -func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) { +func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, stale bool, err error) { if response != nil { if prevNode { node = response.PrevNode @@ -299,30 +303,30 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run if ignoreNotFound { v, err := conversion.EnforcePtr(objPtr) if err != nil { - return "", nil, err + return "", nil, false, err } v.Set(reflect.Zero(v.Type())) - return "", nil, nil + return "", nil, false, nil } else if inErr != nil { - return "", nil, inErr + return "", nil, false, inErr } - return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response) + return "", nil, false, fmt.Errorf("unable to locate a value on the response: %#v", response) } - body, err = h.transformer.TransformStringFromStorage(node.Value) + body, stale, err = h.transformer.TransformStringFromStorage(node.Value) if err != nil { - return body, nil, storage.NewInternalError(err.Error()) + return body, nil, stale, storage.NewInternalError(err.Error()) } out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr) if err != nil { - return body, nil, err + return body, nil, stale, err } if out != objPtr { - return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr)) + return body, nil, stale, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr)) } // being unable to set the version does not prevent the object from being extracted _ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex) - return body, node, err + return body, node, stale, err } // Implements storage.Interface. @@ -389,7 +393,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } } else { - body, err := h.transformer.TransformStringFromStorage(node.Value) + body, _, err := h.transformer.TransformStringFromStorage(node.Value) if err != nil { // omit items from lists and watches that cannot be transformed, but log the error utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err)) @@ -485,7 +489,7 @@ func (h *etcdHelper) GuaranteedUpdate( key = path.Join(h.pathPrefix, key) for { obj := reflect.New(v.Type()).Interface().(runtime.Object) - origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound) + origBody, node, res, stale, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound) if err != nil { return toStorageErr(err, key, 0) } @@ -552,14 +556,15 @@ func (h *etcdHelper) GuaranteedUpdate( if etcdutil.IsEtcdNodeExist(err) { continue } - _, _, err = h.extractObj(response, err, ptrToType, false, false) + _, _, _, err = h.extractObj(response, err, ptrToType, false, false) return toStorageErr(err, key, 0) } - if newBody == origBody { - // If we don't send an update, we simply return the currently existing - // version of the object. - _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false) + // If we don't send an update, we simply return the currently existing + // version of the object. However, the value transformer may indicate that + // the on disk representation has changed and that we must commit an update. + if newBody == origBody && !stale { + _, _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false) return err } @@ -575,7 +580,7 @@ func (h *etcdHelper) GuaranteedUpdate( // Try again. continue } - _, _, err = h.extractObj(response, err, ptrToType, false, false) + _, _, _, err = h.extractObj(response, err, ptrToType, false, false) return toStorageErr(err, key, int64(index)) } } diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index b0e3d19f0..5845e0a2c 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -48,14 +48,15 @@ import ( // prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. type prefixTransformer struct { prefix string + stale bool err error } -func (p prefixTransformer) TransformStringFromStorage(s string) (string, error) { +func (p prefixTransformer) TransformStringFromStorage(s string) (string, bool, error) { if !strings.HasPrefix(s, p.prefix) { - return "", fmt.Errorf("value does not have expected prefix: %s", s) + return "", false, fmt.Errorf("value does not have expected prefix: %s", s) } - return strings.TrimPrefix(s, p.prefix), p.err + return strings.TrimPrefix(s, p.prefix), p.stale, p.err } func (p prefixTransformer) TransformStringToStorage(s string) (string, error) { if len(s) > 0 { @@ -449,7 +450,8 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + original := &storagetesting.TestResource{} + err := helper.GuaranteedUpdate(context.TODO(), key, original, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { return obj, nil })) if err != nil { @@ -458,7 +460,27 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { // Update an existing node with the same data callbackCalled := false - objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} + objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: original.ResourceVersion}, Value: 1} + result := &storagetesting.TestResource{} + err = helper.GuaranteedUpdate(context.TODO(), key, result, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + callbackCalled = true + return objUpdate, nil + })) + if err != nil { + t.Fatalf("Unexpected error %#v", err) + } + if !callbackCalled { + t.Errorf("tryUpdate callback should have been called.") + } + if result.ResourceVersion != original.ResourceVersion { + t.Fatalf("updated the object resource version") + } + + // Update an existing node with the same data but return stale + helper.transformer = prefixTransformer{prefix: "test!", stale: true} + callbackCalled = false + result = &storagetesting.TestResource{} + objUpdate = &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { callbackCalled = true return objUpdate, nil @@ -469,6 +491,9 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { if !callbackCalled { t.Errorf("tryUpdate callback should have been called.") } + if result.ResourceVersion == original.ResourceVersion { + t.Errorf("did not update the object resource version") + } } func TestGuaranteedUpdateKeyNotFound(t *testing.T) { diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index 821a3705e..1cd368bd8 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -319,7 +319,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { return obj, nil } - body, err := w.valueTransformer.TransformStringFromStorage(node.Value) + body, _, err := w.valueTransformer.TransformStringFromStorage(node.Value) if err != nil { return nil, err } diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 7318f3557..19fc4b6dc 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -36,23 +36,25 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" - utiltrace "k8s.io/apiserver/pkg/util/trace" "k8s.io/apiserver/pkg/storage/etcd" + utiltrace "k8s.io/apiserver/pkg/util/trace" ) // ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods // must be able to undo the transformation caused by the other. type ValueTransformer interface { - // TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error. - TransformFromStorage([]byte) ([]byte, error) - // TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error. - TransformToStorage([]byte) ([]byte, error) + // TransformFromStorage may transform the provided data from its underlying storage representation or return an error. + // Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object + // have not changed. + TransformFromStorage([]byte) (data []byte, stale bool, err error) + // TransformToStorage may transform the provided data into the appropriate form in storage or return an error. + TransformToStorage([]byte) (data []byte, err error) } type identityTransformer struct{} -func (identityTransformer) TransformFromStorage(b []byte) ([]byte, error) { return b, nil } -func (identityTransformer) TransformToStorage(b []byte) ([]byte, error) { return b, nil } +func (identityTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) { return b, false, nil } +func (identityTransformer) TransformToStorage(b []byte) ([]byte, error) { return b, nil } // IdentityTransformer performs no transformation on the provided values. var IdentityTransformer ValueTransformer = identityTransformer{} @@ -75,10 +77,11 @@ type elemForDecode struct { } type objState struct { - obj runtime.Object - meta *storage.ResponseMeta - rev int64 - data []byte + obj runtime.Object + meta *storage.ResponseMeta + rev int64 + data []byte + stale bool } // New returns an etcd3 implementation of storage.Interface. @@ -131,7 +134,7 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out } kv := getResp.Kvs[0] - data, err := s.transformer.TransformFromStorage(kv.Value) + data, _, err := s.transformer.TransformFromStorage(kv.Value) if err != nil { return storage.NewInternalError(err.Error()) } @@ -208,7 +211,7 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime } kv := getResp.Kvs[0] - data, err := s.transformer.TransformFromStorage(kv.Value) + data, _, err := s.transformer.TransformFromStorage(kv.Value) if err != nil { return storage.NewInternalError(err.Error()) } @@ -292,7 +295,7 @@ func (s *store) GuaranteedUpdate( if err != nil { return err } - if bytes.Equal(data, origState.data) { + if !origState.stale && bytes.Equal(data, origState.data) { return decode(s.codec, s.versioner, origState.data, out, origState.rev) } @@ -349,7 +352,7 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin if len(getResp.Kvs) == 0 { return nil } - data, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value) + data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value) if err != nil { return storage.NewInternalError(err.Error()) } @@ -384,7 +387,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor elems := make([]*elemForDecode, 0, len(getResp.Kvs)) for _, kv := range getResp.Kvs { - data, err := s.transformer.TransformFromStorage(kv.Value) + data, _, err := s.transformer.TransformFromStorage(kv.Value) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err)) continue @@ -434,13 +437,14 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va return nil, err } } else { - data, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value) + data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value) if err != nil { return nil, storage.NewInternalError(err.Error()) } state.rev = getResp.Kvs[0].ModRevision state.meta.ResourceVersion = uint64(state.rev) state.data = data + state.stale = stale if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil { return nil, err } diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index 9c88d9ab6..de58ca58c 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -52,14 +52,15 @@ func init() { // prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. type prefixTransformer struct { prefix []byte + stale bool err error } -func (p prefixTransformer) TransformFromStorage(b []byte) ([]byte, error) { +func (p prefixTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) { if !bytes.HasPrefix(b, p.prefix) { - return nil, fmt.Errorf("value does not have expected prefix: %s", string(b)) + return nil, false, fmt.Errorf("value does not have expected prefix: %s", string(b)) } - return bytes.TrimPrefix(b, p.prefix), p.err + return bytes.TrimPrefix(b, p.prefix), p.stale, p.err } func (p prefixTransformer) TransformToStorage(b []byte) ([]byte, error) { if len(b) > 0 { @@ -316,6 +317,7 @@ func TestGuaranteedUpdate(t *testing.T) { expectNotFoundErr bool expectInvalidObjErr bool expectNoUpdate bool + transformStale bool }{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false key: "/non-existing", ignoreNotFound: false, @@ -344,6 +346,14 @@ func TestGuaranteedUpdate(t *testing.T) { expectNotFoundErr: false, expectInvalidObjErr: false, expectNoUpdate: true, + }, { // GuaranteedUpdate with same data but stale + key: key, + ignoreNotFound: false, + precondition: nil, + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: false, + transformStale: true, }, { // GuaranteedUpdate with UID match key: key, ignoreNotFound: false, @@ -366,6 +376,12 @@ func TestGuaranteedUpdate(t *testing.T) { if tt.expectNoUpdate { name = storeObj.Name } + originalTransformer := store.transformer.(prefixTransformer) + if tt.transformStale { + transformer := originalTransformer + transformer.stale = true + store.transformer = transformer + } version := storeObj.ResourceVersion err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { @@ -378,6 +394,7 @@ func TestGuaranteedUpdate(t *testing.T) { pod.Name = name return &pod, nil })) + store.transformer = originalTransformer if tt.expectNotFoundErr { if err == nil || !storage.IsNotFound(err) { diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 05c6a8bd1..fca49d9cc 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -343,7 +343,7 @@ func (wc *watchChan) sendEvent(e *event) { func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) { if !e.isDeleted { - data, err := wc.watcher.transformer.TransformFromStorage(e.value) + data, _, err := wc.watcher.transformer.TransformFromStorage(e.value) if err != nil { return nil, nil, err } @@ -358,7 +358,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim // we need the object only to compute whether it was filtered out // before). if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) { - data, err := wc.watcher.transformer.TransformFromStorage(e.prevValue) + data, _, err := wc.watcher.transformer.TransformFromStorage(e.prevValue) if err != nil { return nil, nil, err }