Allow values to be wrapped prior to serialization in etcd2

This adds a new value transformer to the etcd2 store that can transform
the value from etcd on read and write. This will allow the store to
implement encryption at rest or otherwise transform the value prior to
persistence.
This commit is contained in:
Clayton Coleman 2017-01-29 22:50:55 -05:00 committed by deads2k
parent 5598b36661
commit 679a2b25d7
6 changed files with 183 additions and 38 deletions

View File

@ -30,6 +30,8 @@ func InterpretListError(err error, qualifiedResource schema.GroupResource) error
return errors.NewNotFound(qualifiedResource, "") return errors.NewNotFound(qualifiedResource, "")
case storage.IsUnreachable(err): case storage.IsUnreachable(err):
return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level
case storage.IsInternalError(err):
return errors.NewInternalError(err)
default: default:
return err return err
} }
@ -43,6 +45,8 @@ func InterpretGetError(err error, qualifiedResource schema.GroupResource, name s
return errors.NewNotFound(qualifiedResource, name) return errors.NewNotFound(qualifiedResource, name)
case storage.IsUnreachable(err): case storage.IsUnreachable(err):
return errors.NewServerTimeout(qualifiedResource, "get", 2) // TODO: make configurable or handled at a higher level return errors.NewServerTimeout(qualifiedResource, "get", 2) // TODO: make configurable or handled at a higher level
case storage.IsInternalError(err):
return errors.NewInternalError(err)
default: default:
return err return err
} }
@ -56,6 +60,8 @@ func InterpretCreateError(err error, qualifiedResource schema.GroupResource, nam
return errors.NewAlreadyExists(qualifiedResource, name) return errors.NewAlreadyExists(qualifiedResource, name)
case storage.IsUnreachable(err): case storage.IsUnreachable(err):
return errors.NewServerTimeout(qualifiedResource, "create", 2) // TODO: make configurable or handled at a higher level return errors.NewServerTimeout(qualifiedResource, "create", 2) // TODO: make configurable or handled at a higher level
case storage.IsInternalError(err):
return errors.NewInternalError(err)
default: default:
return err return err
} }
@ -102,6 +108,8 @@ func InterpretWatchError(err error, resource schema.GroupResource, name string)
case storage.IsInvalidError(err): case storage.IsInvalidError(err):
invalidError, _ := err.(storage.InvalidError) invalidError, _ := err.(storage.InvalidError)
return errors.NewInvalid(schema.GroupKind{Group: resource.Group, Kind: resource.Resource}, name, invalidError.Errs) return errors.NewInvalid(schema.GroupKind{Group: resource.Group, Kind: resource.Resource}, name, invalidError.Errs)
case storage.IsInternalError(err):
return errors.NewInternalError(err)
default: default:
return err return err
} }

View File

@ -31,6 +31,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd/metrics" "k8s.io/apiserver/pkg/storage/etcd/metrics"
@ -39,15 +40,33 @@ import (
etcdutil "k8s.io/apiserver/pkg/storage/etcd/util" etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
) )
// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
// must be able to undo the transformation caused by the other.
type ValueTransformer interface {
// TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error.
TransformStringFromStorage(string) (string, error)
// TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error.
TransformStringToStorage(string) (string, error)
}
type identityTransformer struct{}
func (identityTransformer) TransformStringFromStorage(s string) (string, error) { return s, nil }
func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil }
// IdentityTransformer performs no transformation on the provided values.
var IdentityTransformer ValueTransformer = identityTransformer{}
// Creates a new storage interface from the client // Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time // TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier) storage.Interface { func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier, transformer ValueTransformer) storage.Interface {
return &etcdHelper{ return &etcdHelper{
etcdMembersAPI: etcd.NewMembersAPI(client), etcdMembersAPI: etcd.NewMembersAPI(client),
etcdKeysAPI: etcd.NewKeysAPI(client), etcdKeysAPI: etcd.NewKeysAPI(client),
codec: codec, codec: codec,
versioner: APIObjectVersioner{}, versioner: APIObjectVersioner{},
copier: copier, copier: copier,
transformer: transformer,
pathPrefix: path.Join("/", prefix), pathPrefix: path.Join("/", prefix),
quorum: quorum, quorum: quorum,
cache: utilcache.NewCache(cacheSize), cache: utilcache.NewCache(cacheSize),
@ -60,6 +79,7 @@ type etcdHelper struct {
etcdKeysAPI etcd.KeysAPI etcdKeysAPI etcd.KeysAPI
codec runtime.Codec codec runtime.Codec
copier runtime.ObjectCopier copier runtime.ObjectCopier
transformer ValueTransformer
// Note that versioner is required for etcdHelper to work correctly. // Note that versioner is required for etcdHelper to work correctly.
// The public constructors (NewStorage & NewEtcdStorage) are setting it // The public constructors (NewStorage & NewEtcdStorage) are setting it
// correctly, so be careful when manipulating with it manually. // correctly, so be careful when manipulating with it manually.
@ -112,7 +132,13 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
TTL: time.Duration(ttl) * time.Second, TTL: time.Duration(ttl) * time.Second,
PrevExist: etcd.PrevNoExist, PrevExist: etcd.PrevNoExist,
} }
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
newBody, err := h.transformer.TransformStringToStorage(string(data))
if err != nil {
return storage.NewInternalError(err.Error())
}
response, err := h.etcdKeysAPI.Set(ctx, key, newBody, &opts)
trace.Step("Object created") trace.Step("Object created")
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil { if err != nil {
@ -186,9 +212,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
startTime := time.Now() startTime := time.Now()
response, err := h.etcdKeysAPI.Delete(ctx, key, &opt) response, err := h.etcdKeysAPI.Delete(ctx, key, &opt)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if etcdutil.IsEtcdTestFailed(err) { if !etcdutil.IsEtcdTestFailed(err) {
glog.Infof("deletion of %s failed because of a conflict, going to retry", key)
} else {
if !etcdutil.IsEtcdNotFound(err) { if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update the out object. // if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response.PrevNode != nil { if err != nil || response.PrevNode != nil {
@ -197,6 +221,8 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
} }
return toStorageErr(err, key, 0) return toStorageErr(err, key, 0)
} }
glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
} }
} }
@ -210,7 +236,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
return nil, err return nil, err
} }
key = path.Join(h.pathPrefix, key) key = path.Join(h.pathPrefix, key)
w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h) w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil return w, nil
} }
@ -225,7 +251,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
return nil, err return nil, err
} }
key = path.Join(h.pathPrefix, key) key = path.Join(h.pathPrefix, key)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h) w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil return w, nil
} }
@ -282,7 +308,11 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
} }
return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response) return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
} }
body = node.Value
body, err = h.transformer.TransformStringFromStorage(node.Value)
if err != nil {
return body, nil, storage.NewInternalError(err.Error())
}
out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr) out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
if err != nil { if err != nil {
return body, nil, err return body, nil, err
@ -359,7 +389,14 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} }
} else { } else {
obj, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) body, err := h.transformer.TransformStringFromStorage(node.Value)
if err != nil {
// omit items from lists and watches that cannot be transformed, but log the error
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err))
continue
}
obj, _, err := h.codec.Decode([]byte(body), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
if err != nil { if err != nil {
return err return err
} }
@ -493,10 +530,15 @@ func (h *etcdHelper) GuaranteedUpdate(
return errors.New("resourceVersion cannot be set on objects store in etcd") return errors.New("resourceVersion cannot be set on objects store in etcd")
} }
data, err := runtime.Encode(h.codec, ret) newBodyData, err := runtime.Encode(h.codec, ret)
if err != nil { if err != nil {
return err return err
} }
newBody := string(newBodyData)
data, err := h.transformer.TransformStringToStorage(newBody)
if err != nil {
return storage.NewInternalError(err.Error())
}
// First time this key has been used, try creating new value. // First time this key has been used, try creating new value.
if index == 0 { if index == 0 {
@ -505,7 +547,7 @@ func (h *etcdHelper) GuaranteedUpdate(
TTL: time.Duration(ttl) * time.Second, TTL: time.Duration(ttl) * time.Second,
PrevExist: etcd.PrevNoExist, PrevExist: etcd.PrevNoExist,
} }
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts) response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime) metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
if etcdutil.IsEtcdNodeExist(err) { if etcdutil.IsEtcdNodeExist(err) {
continue continue
@ -514,7 +556,7 @@ func (h *etcdHelper) GuaranteedUpdate(
return toStorageErr(err, key, 0) return toStorageErr(err, key, 0)
} }
if string(data) == origBody { if newBody == origBody {
// If we don't send an update, we simply return the currently existing // If we don't send an update, we simply return the currently existing
// version of the object. // version of the object.
_, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false) _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
@ -527,7 +569,7 @@ func (h *etcdHelper) GuaranteedUpdate(
PrevIndex: index, PrevIndex: index,
TTL: time.Duration(ttl) * time.Second, TTL: time.Duration(ttl) * time.Second,
} }
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts) response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
if etcdutil.IsEtcdTestFailed(err) { if etcdutil.IsEtcdTestFailed(err) {
// Try again. // Try again.

View File

@ -17,8 +17,10 @@ limitations under the License.
package etcd package etcd
import ( import (
"fmt"
"path" "path"
"reflect" "reflect"
"strings"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -33,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1" examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
@ -42,6 +45,33 @@ import (
storagetests "k8s.io/apiserver/pkg/storage/tests" storagetests "k8s.io/apiserver/pkg/storage/tests"
) )
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
type prefixTransformer struct {
prefix string
err error
}
func (p prefixTransformer) TransformStringFromStorage(s string) (string, error) {
if !strings.HasPrefix(s, p.prefix) {
return "", fmt.Errorf("value does not have expected prefix: %s", s)
}
return strings.TrimPrefix(s, p.prefix), p.err
}
func (p prefixTransformer) TransformStringToStorage(s string) (string, error) {
if len(s) > 0 {
return p.prefix + s, p.err
}
return s, p.err
}
func defaultPrefix(s string) string {
return "test!" + s
}
func defaultPrefixValue(value []byte) string {
return defaultPrefix(string(value))
}
func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) { func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) {
scheme := runtime.NewScheme() scheme := runtime.NewScheme()
scheme.Log(t) scheme.Log(t)
@ -66,7 +96,7 @@ func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) {
} }
func newEtcdHelper(client etcd.Client, scheme *runtime.Scheme, codec runtime.Codec, prefix string) etcdHelper { func newEtcdHelper(client etcd.Client, scheme *runtime.Scheme, codec runtime.Codec, prefix string) etcdHelper {
return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme).(*etcdHelper) return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme, prefixTransformer{prefix: "test!"}).(*etcdHelper)
} }
// Returns an encoded version of example.Pod with the given name. // Returns an encoded version of example.Pod with the given name.
@ -135,6 +165,61 @@ func TestList(t *testing.T) {
} }
} }
func TestTransformationFailure(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
pods := []example.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
}
createPodList(t, helper, &example.PodList{Items: pods[:1]})
// create a second resource with an invalid prefix
oldTransformer := helper.transformer
helper.transformer = prefixTransformer{prefix: "otherprefix!"}
createPodList(t, helper, &example.PodList{Items: pods[1:]})
helper.transformer = oldTransformer
// only the first item is returned, and no error
var got example.PodList
if err := helper.List(context.TODO(), "/", "", storage.Everything, &got); err != nil {
t.Errorf("Unexpected error %v", err)
}
if e, a := pods[:1], got.Items; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a))
}
// Get should fail
if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
// GuaranteedUpdate should return an error
if err := helper.GuaranteedUpdate(context.TODO(), "/baz", &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
return input, nil, nil
}, &pods[1]); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
// Delete succeeds but reports an error because we cannot access the body
if err := helper.Delete(context.TODO(), "/baz", &example.Pod{}, nil); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsNotFound(err) {
t.Errorf("Unexpected error: %v", err)
}
}
func TestListFiltered(t *testing.T) { func TestListFiltered(t *testing.T) {
scheme, codecs := testScheme(t) scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
@ -540,7 +625,7 @@ func TestDeleteWithRetry(t *testing.T) {
// party has updated the object. // party has updated the object.
fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) { fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
data, _ := runtime.Encode(codec, obj) data, _ := runtime.Encode(codec, obj)
return &etcd.Response{Node: &etcd.Node{Value: string(data), ModifiedIndex: 99}}, nil return &etcd.Response{Node: &etcd.Node{Value: defaultPrefixValue(data), ModifiedIndex: 99}}, nil
} }
expectedRetries := 3 expectedRetries := 3
helper := newEtcdHelper(server.Client, scheme, codec, prefix) helper := newEtcdHelper(server.Client, scheme, codec, prefix)

View File

@ -73,6 +73,7 @@ type etcdWatcher struct {
// with it manually. // with it manually.
versioner storage.Versioner versioner storage.Versioner
transform TransformFunc transform TransformFunc
valueTransformer ValueTransformer
list bool // If we're doing a recursive watch, should be true. list bool // If we're doing a recursive watch, should be true.
quorum bool // If we enable quorum, shoule be true quorum bool // If we enable quorum, shoule be true
@ -107,11 +108,14 @@ const watchWaitDuration = 100 * time.Millisecond
func newEtcdWatcher( func newEtcdWatcher(
list bool, quorum bool, include includeFunc, filter storage.FilterFunc, list bool, quorum bool, include includeFunc, filter storage.FilterFunc,
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
valueTransformer ValueTransformer,
cache etcdCache) *etcdWatcher { cache etcdCache) *etcdWatcher {
w := &etcdWatcher{ w := &etcdWatcher{
encoding: encoding, encoding: encoding,
versioner: versioner, versioner: versioner,
transform: transform, transform: transform,
valueTransformer: valueTransformer,
list: list, list: list,
quorum: quorum, quorum: quorum,
include: include, include: include,
@ -309,12 +313,18 @@ func (w *etcdWatcher) translate() {
} }
} }
// decodeObject extracts an object from the provided etcd node or returns an error.
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found { if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found {
return obj, nil return obj, nil
} }
obj, err := runtime.Decode(w.encoding, []byte(node.Value)) body, err := w.valueTransformer.TransformStringFromStorage(node.Value)
if err != nil {
return nil, err
}
obj, err := runtime.Decode(w.encoding, []byte(body))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -133,7 +133,7 @@ func TestWatchInterpretations(t *testing.T) {
} }
for name, item := range table { for name, item := range table {
for _, action := range item.actions { for _, action := range item.actions {
w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
emitCalled := false emitCalled := false
w.emit = func(event watch.Event) { w.emit = func(event watch.Event) {
emitCalled = true emitCalled = true
@ -150,10 +150,10 @@ func TestWatchInterpretations(t *testing.T) {
var n, pn *etcd.Node var n, pn *etcd.Node
if item.nodeValue != "" { if item.nodeValue != "" {
n = &etcd.Node{Value: item.nodeValue} n = &etcd.Node{Value: defaultPrefix(item.nodeValue)}
} }
if item.prevNodeValue != "" { if item.prevNodeValue != "" {
pn = &etcd.Node{Value: item.prevNodeValue} pn = &etcd.Node{Value: defaultPrefix(item.prevNodeValue)}
} }
w.sendResult(&etcd.Response{ w.sendResult(&etcd.Response{
@ -173,7 +173,7 @@ func TestWatchInterpretations(t *testing.T) {
func TestWatchInterpretation_ResponseNotSet(t *testing.T) { func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
_, codecs := testScheme(t) _, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w.emit = func(e watch.Event) { w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e) t.Errorf("Unexpected emit: %v", e)
} }
@ -189,7 +189,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
actions := []string{"create", "set", "compareAndSwap", "delete"} actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions { for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w.emit = func(e watch.Event) { w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e) t.Errorf("Unexpected emit: %v", e)
} }
@ -205,20 +205,20 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
actions := []string{"create", "set", "compareAndSwap", "delete"} actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions { for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w.emit = func(e watch.Event) { w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e) t.Errorf("Unexpected emit: %v", e)
} }
w.sendResult(&etcd.Response{ w.sendResult(&etcd.Response{
Action: action, Action: action,
Node: &etcd.Node{ Node: &etcd.Node{
Value: "foobar", Value: defaultPrefix("foobar"),
}, },
}) })
w.sendResult(&etcd.Response{ w.sendResult(&etcd.Response{
Action: action, Action: action,
PrevNode: &etcd.Node{ PrevNode: &etcd.Node{
Value: "foobar", Value: defaultPrefix("foobar"),
}, },
}) })
w.Stop() w.Stop()
@ -231,7 +231,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
filter := func(obj runtime.Object) bool { filter := func(obj runtime.Object) bool {
return obj.(*example.Pod).Name != "bar" return obj.(*example.Pod).Name != "bar"
} }
w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
eventChan := make(chan watch.Event, 1) eventChan := make(chan watch.Event, 1)
w.emit = func(e watch.Event) { w.emit = func(e watch.Event) {
@ -259,7 +259,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
ModifiedIndex: 2, ModifiedIndex: 2,
}, },
PrevNode: &etcd.Node{ PrevNode: &etcd.Node{
Value: string(fooBytes), Value: defaultPrefixValue(fooBytes),
ModifiedIndex: 1, ModifiedIndex: 1,
}, },
}, },
@ -268,11 +268,11 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
response: &etcd.Response{ response: &etcd.Response{
Action: EtcdSet, Action: EtcdSet,
Node: &etcd.Node{ Node: &etcd.Node{
Value: string(barBytes), Value: defaultPrefixValue(barBytes),
ModifiedIndex: 2, ModifiedIndex: 2,
}, },
PrevNode: &etcd.Node{ PrevNode: &etcd.Node{
Value: string(fooBytes), Value: defaultPrefixValue(fooBytes),
ModifiedIndex: 1, ModifiedIndex: 1,
}, },
}, },

View File

@ -39,7 +39,7 @@ func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier) s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier, etcd.IdentityTransformer)
return s, tr.CloseIdleConnections, nil return s, tr.CloseIdleConnections, nil
} }