diff --git a/pkg/storage/errors/storage.go b/pkg/storage/errors/storage.go index 84f8d8577..fd3b35ed0 100644 --- a/pkg/storage/errors/storage.go +++ b/pkg/storage/errors/storage.go @@ -30,6 +30,8 @@ func InterpretListError(err error, qualifiedResource schema.GroupResource) error return errors.NewNotFound(qualifiedResource, "") case storage.IsUnreachable(err): return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level + case storage.IsInternalError(err): + return errors.NewInternalError(err) default: return err } @@ -43,6 +45,8 @@ func InterpretGetError(err error, qualifiedResource schema.GroupResource, name s return errors.NewNotFound(qualifiedResource, name) case storage.IsUnreachable(err): return errors.NewServerTimeout(qualifiedResource, "get", 2) // TODO: make configurable or handled at a higher level + case storage.IsInternalError(err): + return errors.NewInternalError(err) default: return err } @@ -56,6 +60,8 @@ func InterpretCreateError(err error, qualifiedResource schema.GroupResource, nam return errors.NewAlreadyExists(qualifiedResource, name) case storage.IsUnreachable(err): return errors.NewServerTimeout(qualifiedResource, "create", 2) // TODO: make configurable or handled at a higher level + case storage.IsInternalError(err): + return errors.NewInternalError(err) default: return err } @@ -102,6 +108,8 @@ func InterpretWatchError(err error, resource schema.GroupResource, name string) case storage.IsInvalidError(err): invalidError, _ := err.(storage.InvalidError) return errors.NewInvalid(schema.GroupKind{Group: resource.Group, Kind: resource.Resource}, name, invalidError.Errs) + case storage.IsInternalError(err): + return errors.NewInternalError(err) default: return err } diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 13a54b38b..22f7b20b7 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -31,6 +31,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd/metrics" @@ -39,15 +40,33 @@ import ( etcdutil "k8s.io/apiserver/pkg/storage/etcd/util" ) +// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods +// must be able to undo the transformation caused by the other. +type ValueTransformer interface { + // TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error. + TransformStringFromStorage(string) (string, error) + // TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error. + TransformStringToStorage(string) (string, error) +} + +type identityTransformer struct{} + +func (identityTransformer) TransformStringFromStorage(s string) (string, error) { return s, nil } +func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil } + +// IdentityTransformer performs no transformation on the provided values. +var IdentityTransformer ValueTransformer = identityTransformer{} + // Creates a new storage interface from the client // TODO: deprecate in favor of storage.Config abstraction over time -func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier) storage.Interface { +func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier, transformer ValueTransformer) storage.Interface { return &etcdHelper{ etcdMembersAPI: etcd.NewMembersAPI(client), etcdKeysAPI: etcd.NewKeysAPI(client), codec: codec, versioner: APIObjectVersioner{}, copier: copier, + transformer: transformer, pathPrefix: path.Join("/", prefix), quorum: quorum, cache: utilcache.NewCache(cacheSize), @@ -60,6 +79,7 @@ type etcdHelper struct { etcdKeysAPI etcd.KeysAPI codec runtime.Codec copier runtime.ObjectCopier + transformer ValueTransformer // Note that versioner is required for etcdHelper to work correctly. // The public constructors (NewStorage & NewEtcdStorage) are setting it // correctly, so be careful when manipulating with it manually. @@ -112,7 +132,13 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob TTL: time.Duration(ttl) * time.Second, PrevExist: etcd.PrevNoExist, } - response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts) + + newBody, err := h.transformer.TransformStringToStorage(string(data)) + if err != nil { + return storage.NewInternalError(err.Error()) + } + + response, err := h.etcdKeysAPI.Set(ctx, key, newBody, &opts) trace.Step("Object created") metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) if err != nil { @@ -186,9 +212,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, startTime := time.Now() response, err := h.etcdKeysAPI.Delete(ctx, key, &opt) metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) - if etcdutil.IsEtcdTestFailed(err) { - glog.Infof("deletion of %s failed because of a conflict, going to retry", key) - } else { + if !etcdutil.IsEtcdTestFailed(err) { if !etcdutil.IsEtcdNotFound(err) { // if the object that existed prior to the delete is returned by etcd, update the out object. if err != nil || response.PrevNode != nil { @@ -197,6 +221,8 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, } return toStorageErr(err, key, 0) } + + glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key) } } @@ -210,7 +236,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri return nil, err } key = path.Join(h.pathPrefix, key) - w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h) + w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) return w, nil } @@ -225,7 +251,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion return nil, err } key = path.Join(h.pathPrefix, key) - w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h) + w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) return w, nil } @@ -282,7 +308,11 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run } return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response) } - body = node.Value + + body, err = h.transformer.TransformStringFromStorage(node.Value) + if err != nil { + return body, nil, storage.NewInternalError(err.Error()) + } out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr) if err != nil { return body, nil, err @@ -359,7 +389,14 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } } else { - obj, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) + body, err := h.transformer.TransformStringFromStorage(node.Value) + if err != nil { + // omit items from lists and watches that cannot be transformed, but log the error + utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err)) + continue + } + + obj, _, err := h.codec.Decode([]byte(body), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) if err != nil { return err } @@ -493,10 +530,15 @@ func (h *etcdHelper) GuaranteedUpdate( return errors.New("resourceVersion cannot be set on objects store in etcd") } - data, err := runtime.Encode(h.codec, ret) + newBodyData, err := runtime.Encode(h.codec, ret) if err != nil { return err } + newBody := string(newBodyData) + data, err := h.transformer.TransformStringToStorage(newBody) + if err != nil { + return storage.NewInternalError(err.Error()) + } // First time this key has been used, try creating new value. if index == 0 { @@ -505,7 +547,7 @@ func (h *etcdHelper) GuaranteedUpdate( TTL: time.Duration(ttl) * time.Second, PrevExist: etcd.PrevNoExist, } - response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts) + response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts) metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime) if etcdutil.IsEtcdNodeExist(err) { continue @@ -514,7 +556,7 @@ func (h *etcdHelper) GuaranteedUpdate( return toStorageErr(err, key, 0) } - if string(data) == origBody { + if newBody == origBody { // If we don't send an update, we simply return the currently existing // version of the object. _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false) @@ -527,7 +569,7 @@ func (h *etcdHelper) GuaranteedUpdate( PrevIndex: index, TTL: time.Duration(ttl) * time.Second, } - response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts) + response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime) if etcdutil.IsEtcdTestFailed(err) { // Try again. diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 1d7f52535..b0e3d19f0 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -17,8 +17,10 @@ limitations under the License. package etcd import ( + "fmt" "path" "reflect" + "strings" "sync" "testing" "time" @@ -33,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" @@ -42,6 +45,33 @@ import ( storagetests "k8s.io/apiserver/pkg/storage/tests" ) +// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. +type prefixTransformer struct { + prefix string + err error +} + +func (p prefixTransformer) TransformStringFromStorage(s string) (string, error) { + if !strings.HasPrefix(s, p.prefix) { + return "", fmt.Errorf("value does not have expected prefix: %s", s) + } + return strings.TrimPrefix(s, p.prefix), p.err +} +func (p prefixTransformer) TransformStringToStorage(s string) (string, error) { + if len(s) > 0 { + return p.prefix + s, p.err + } + return s, p.err +} + +func defaultPrefix(s string) string { + return "test!" + s +} + +func defaultPrefixValue(value []byte) string { + return defaultPrefix(string(value)) +} + func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) { scheme := runtime.NewScheme() scheme.Log(t) @@ -66,7 +96,7 @@ func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) { } func newEtcdHelper(client etcd.Client, scheme *runtime.Scheme, codec runtime.Codec, prefix string) etcdHelper { - return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme).(*etcdHelper) + return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme, prefixTransformer{prefix: "test!"}).(*etcdHelper) } // Returns an encoded version of example.Pod with the given name. @@ -135,6 +165,61 @@ func TestList(t *testing.T) { } } +func TestTransformationFailure(t *testing.T) { + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + server := etcdtesting.NewEtcdTestClientServer(t) + defer server.Terminate(t) + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) + + pods := []example.Pod{ + { + ObjectMeta: metav1.ObjectMeta{Name: "bar"}, + Spec: storagetests.DeepEqualSafePodSpec(), + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "baz"}, + Spec: storagetests.DeepEqualSafePodSpec(), + }, + } + createPodList(t, helper, &example.PodList{Items: pods[:1]}) + + // create a second resource with an invalid prefix + oldTransformer := helper.transformer + helper.transformer = prefixTransformer{prefix: "otherprefix!"} + createPodList(t, helper, &example.PodList{Items: pods[1:]}) + helper.transformer = oldTransformer + + // only the first item is returned, and no error + var got example.PodList + if err := helper.List(context.TODO(), "/", "", storage.Everything, &got); err != nil { + t.Errorf("Unexpected error %v", err) + } + if e, a := pods[:1], got.Items; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a)) + } + + // Get should fail + if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + // GuaranteedUpdate should return an error + if err := helper.GuaranteedUpdate(context.TODO(), "/baz", &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { + return input, nil, nil + }, &pods[1]); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + + // Delete succeeds but reports an error because we cannot access the body + if err := helper.Delete(context.TODO(), "/baz", &example.Pod{}, nil); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + + if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsNotFound(err) { + t.Errorf("Unexpected error: %v", err) + } +} + func TestListFiltered(t *testing.T) { scheme, codecs := testScheme(t) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) @@ -540,7 +625,7 @@ func TestDeleteWithRetry(t *testing.T) { // party has updated the object. fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) { data, _ := runtime.Encode(codec, obj) - return &etcd.Response{Node: &etcd.Node{Value: string(data), ModifiedIndex: 99}}, nil + return &etcd.Response{Node: &etcd.Node{Value: defaultPrefixValue(data), ModifiedIndex: 99}}, nil } expectedRetries := 3 helper := newEtcdHelper(server.Client, scheme, codec, prefix) diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index a1fb30527..821a3705e 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -71,8 +71,9 @@ type etcdWatcher struct { // Note that versioner is required for etcdWatcher to work correctly. // There is no public constructor of it, so be careful when manipulating // with it manually. - versioner storage.Versioner - transform TransformFunc + versioner storage.Versioner + transform TransformFunc + valueTransformer ValueTransformer list bool // If we're doing a recursive watch, should be true. quorum bool // If we enable quorum, shoule be true @@ -107,15 +108,18 @@ const watchWaitDuration = 100 * time.Millisecond func newEtcdWatcher( list bool, quorum bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, + valueTransformer ValueTransformer, cache etcdCache) *etcdWatcher { w := &etcdWatcher{ - encoding: encoding, - versioner: versioner, - transform: transform, - list: list, - quorum: quorum, - include: include, - filter: filter, + encoding: encoding, + versioner: versioner, + transform: transform, + valueTransformer: valueTransformer, + + list: list, + quorum: quorum, + include: include, + filter: filter, // Buffer this channel, so that the etcd client is not forced // to context switch with every object it gets, and so that a // long time spent decoding an object won't block the *next* @@ -309,12 +313,18 @@ func (w *etcdWatcher) translate() { } } +// decodeObject extracts an object from the provided etcd node or returns an error. func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found { return obj, nil } - obj, err := runtime.Decode(w.encoding, []byte(node.Value)) + body, err := w.valueTransformer.TransformStringFromStorage(node.Value) + if err != nil { + return nil, err + } + + obj, err := runtime.Decode(w.encoding, []byte(body)) if err != nil { return nil, err } diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index b296c1494..d2fe49acf 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -133,7 +133,7 @@ func TestWatchInterpretations(t *testing.T) { } for name, item := range table { for _, action := range item.actions { - w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) emitCalled := false w.emit = func(event watch.Event) { emitCalled = true @@ -150,10 +150,10 @@ func TestWatchInterpretations(t *testing.T) { var n, pn *etcd.Node if item.nodeValue != "" { - n = &etcd.Node{Value: item.nodeValue} + n = &etcd.Node{Value: defaultPrefix(item.nodeValue)} } if item.prevNodeValue != "" { - pn = &etcd.Node{Value: item.prevNodeValue} + pn = &etcd.Node{Value: defaultPrefix(item.prevNodeValue)} } w.sendResult(&etcd.Response{ @@ -173,7 +173,7 @@ func TestWatchInterpretations(t *testing.T) { func TestWatchInterpretation_ResponseNotSet(t *testing.T) { _, codecs := testScheme(t) codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) - w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -189,7 +189,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) { codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -205,20 +205,20 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } w.sendResult(&etcd.Response{ Action: action, Node: &etcd.Node{ - Value: "foobar", + Value: defaultPrefix("foobar"), }, }) w.sendResult(&etcd.Response{ Action: action, PrevNode: &etcd.Node{ - Value: "foobar", + Value: defaultPrefix("foobar"), }, }) w.Stop() @@ -231,7 +231,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { filter := func(obj runtime.Object) bool { return obj.(*example.Pod).Name != "bar" } - w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) eventChan := make(chan watch.Event, 1) w.emit = func(e watch.Event) { @@ -259,7 +259,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { ModifiedIndex: 2, }, PrevNode: &etcd.Node{ - Value: string(fooBytes), + Value: defaultPrefixValue(fooBytes), ModifiedIndex: 1, }, }, @@ -268,11 +268,11 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { response: &etcd.Response{ Action: EtcdSet, Node: &etcd.Node{ - Value: string(barBytes), + Value: defaultPrefixValue(barBytes), ModifiedIndex: 2, }, PrevNode: &etcd.Node{ - Value: string(fooBytes), + Value: defaultPrefixValue(fooBytes), ModifiedIndex: 1, }, }, diff --git a/pkg/storage/storagebackend/factory/etcd2.go b/pkg/storage/storagebackend/factory/etcd2.go index 78377c493..a96897ca1 100644 --- a/pkg/storage/storagebackend/factory/etcd2.go +++ b/pkg/storage/storagebackend/factory/etcd2.go @@ -39,7 +39,7 @@ func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e if err != nil { return nil, nil, err } - s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier) + s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier, etcd.IdentityTransformer) return s, tr.CloseIdleConnections, nil }