apimachinery: mechanical removal of ObjectCopier plumbing
Kubernetes-commit: 509df603b18d356777176953e5d160b6f3d0bba9
This commit is contained in:
parent
dff7812868
commit
3cfc602704
|
|
@ -283,7 +283,6 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
|
||||||
|
|
||||||
Creater: scheme,
|
Creater: scheme,
|
||||||
Convertor: scheme,
|
Convertor: scheme,
|
||||||
Copier: scheme,
|
|
||||||
Defaulter: scheme,
|
Defaulter: scheme,
|
||||||
Typer: scheme,
|
Typer: scheme,
|
||||||
Linker: selfLinker,
|
Linker: selfLinker,
|
||||||
|
|
@ -3258,7 +3257,6 @@ func TestParentResourceIsRequired(t *testing.T) {
|
||||||
Root: "/" + prefix,
|
Root: "/" + prefix,
|
||||||
Creater: scheme,
|
Creater: scheme,
|
||||||
Convertor: scheme,
|
Convertor: scheme,
|
||||||
Copier: scheme,
|
|
||||||
Defaulter: scheme,
|
Defaulter: scheme,
|
||||||
Typer: scheme,
|
Typer: scheme,
|
||||||
Linker: selfLinker,
|
Linker: selfLinker,
|
||||||
|
|
@ -3290,7 +3288,6 @@ func TestParentResourceIsRequired(t *testing.T) {
|
||||||
Root: "/" + prefix,
|
Root: "/" + prefix,
|
||||||
Creater: scheme,
|
Creater: scheme,
|
||||||
Convertor: scheme,
|
Convertor: scheme,
|
||||||
Copier: scheme,
|
|
||||||
Defaulter: scheme,
|
Defaulter: scheme,
|
||||||
Typer: scheme,
|
Typer: scheme,
|
||||||
Linker: selfLinker,
|
Linker: selfLinker,
|
||||||
|
|
@ -3902,7 +3899,6 @@ func TestXGSubresource(t *testing.T) {
|
||||||
|
|
||||||
Creater: scheme,
|
Creater: scheme,
|
||||||
Convertor: scheme,
|
Convertor: scheme,
|
||||||
Copier: scheme,
|
|
||||||
Defaulter: scheme,
|
Defaulter: scheme,
|
||||||
Typer: scheme,
|
Typer: scheme,
|
||||||
Linker: selfLinker,
|
Linker: selfLinker,
|
||||||
|
|
|
||||||
|
|
@ -66,7 +66,6 @@ type APIGroupVersion struct {
|
||||||
Typer runtime.ObjectTyper
|
Typer runtime.ObjectTyper
|
||||||
Creater runtime.ObjectCreater
|
Creater runtime.ObjectCreater
|
||||||
Convertor runtime.ObjectConvertor
|
Convertor runtime.ObjectConvertor
|
||||||
Copier runtime.ObjectCopier
|
|
||||||
Defaulter runtime.ObjectDefaulter
|
Defaulter runtime.ObjectDefaulter
|
||||||
Linker runtime.SelfLinker
|
Linker runtime.SelfLinker
|
||||||
UnsafeConvertor runtime.ObjectConvertor
|
UnsafeConvertor runtime.ObjectConvertor
|
||||||
|
|
|
||||||
|
|
@ -63,7 +63,6 @@ type RequestScope struct {
|
||||||
Creater runtime.ObjectCreater
|
Creater runtime.ObjectCreater
|
||||||
Convertor runtime.ObjectConvertor
|
Convertor runtime.ObjectConvertor
|
||||||
Defaulter runtime.ObjectDefaulter
|
Defaulter runtime.ObjectDefaulter
|
||||||
Copier runtime.ObjectCopier
|
|
||||||
Typer runtime.ObjectTyper
|
Typer runtime.ObjectTyper
|
||||||
UnsafeConvertor runtime.ObjectConvertor
|
UnsafeConvertor runtime.ObjectConvertor
|
||||||
|
|
||||||
|
|
@ -594,7 +593,7 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := patchResource(ctx, updateAdmit, timeout, versionedObj, r, name, patchType, patchJS,
|
result, err := patchResource(ctx, updateAdmit, timeout, versionedObj, r, name, patchType, patchJS,
|
||||||
scope.Namer, scope.Copier, scope.Creater, scope.Defaulter, scope.UnsafeConvertor, scope.Kind, scope.Resource, codec)
|
scope.Namer, scope.Creater, scope.Defaulter, scope.UnsafeConvertor, scope.Kind, scope.Resource, codec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
scope.err(err, w, req)
|
scope.err(err, w, req)
|
||||||
return
|
return
|
||||||
|
|
@ -627,7 +626,6 @@ func patchResource(
|
||||||
patchType types.PatchType,
|
patchType types.PatchType,
|
||||||
patchJS []byte,
|
patchJS []byte,
|
||||||
namer ScopeNamer,
|
namer ScopeNamer,
|
||||||
copier runtime.ObjectCopier,
|
|
||||||
creater runtime.ObjectCreater,
|
creater runtime.ObjectCreater,
|
||||||
defaulter runtime.ObjectDefaulter,
|
defaulter runtime.ObjectDefaulter,
|
||||||
unsafeConvertor runtime.ObjectConvertor,
|
unsafeConvertor runtime.ObjectConvertor,
|
||||||
|
|
@ -837,7 +835,7 @@ func patchResource(
|
||||||
return patchedObject, admit(patchedObject, currentObject)
|
return patchedObject, admit(patchedObject, currentObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
updatedObjectInfo := rest.DefaultUpdatedObjectInfo(nil, copier, applyPatch, applyAdmission)
|
updatedObjectInfo := rest.DefaultUpdatedObjectInfo(nil, applyPatch, applyAdmission)
|
||||||
|
|
||||||
return finishRequest(timeout, func() (runtime.Object, error) {
|
return finishRequest(timeout, func() (runtime.Object, error) {
|
||||||
updateObject, _, updateErr := patcher.Update(ctx, name, updatedObjectInfo)
|
updateObject, _, updateErr := patcher.Update(ctx, name, updatedObjectInfo)
|
||||||
|
|
@ -914,7 +912,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
|
||||||
trace.Step("About to store object in database")
|
trace.Step("About to store object in database")
|
||||||
wasCreated := false
|
wasCreated := false
|
||||||
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
||||||
obj, created, err := r.Update(ctx, name, rest.DefaultUpdatedObjectInfo(obj, scope.Copier, transformers...))
|
obj, created, err := r.Update(ctx, name, rest.DefaultUpdatedObjectInfo(obj, transformers...))
|
||||||
wasCreated = created
|
wasCreated = created
|
||||||
return obj, err
|
return obj, err
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -212,7 +212,6 @@ func (tc *patchTestCase) Run(t *testing.T) {
|
||||||
ctx = request.WithNamespace(ctx, namespace)
|
ctx = request.WithNamespace(ctx, namespace)
|
||||||
|
|
||||||
namer := &testNamer{namespace, name}
|
namer := &testNamer{namespace, name}
|
||||||
copier := runtime.ObjectCopier(scheme)
|
|
||||||
creater := runtime.ObjectCreater(scheme)
|
creater := runtime.ObjectCreater(scheme)
|
||||||
defaulter := runtime.ObjectDefaulter(scheme)
|
defaulter := runtime.ObjectDefaulter(scheme)
|
||||||
convertor := runtime.UnsafeObjectConvertor(scheme)
|
convertor := runtime.UnsafeObjectConvertor(scheme)
|
||||||
|
|
@ -266,7 +265,7 @@ func (tc *patchTestCase) Run(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
resultObj, err := patchResource(ctx, admit, 1*time.Second, versionedObj, testPatcher, name, patchType, patch, namer, copier, creater, defaulter, convertor, kind, resource, codec)
|
resultObj, err := patchResource(ctx, admit, 1*time.Second, versionedObj, testPatcher, name, patchType, patch, namer, creater, defaulter, convertor, kind, resource, codec)
|
||||||
if len(tc.expectedError) != 0 {
|
if len(tc.expectedError) != 0 {
|
||||||
if err == nil || err.Error() != tc.expectedError {
|
if err == nil || err.Error() != tc.expectedError {
|
||||||
t.Errorf("%s: expected error %v, but got %v", tc.name, tc.expectedError, err)
|
t.Errorf("%s: expected error %v, but got %v", tc.name, tc.expectedError, err)
|
||||||
|
|
|
||||||
|
|
@ -532,7 +532,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||||
ParameterCodec: a.group.ParameterCodec,
|
ParameterCodec: a.group.ParameterCodec,
|
||||||
Creater: a.group.Creater,
|
Creater: a.group.Creater,
|
||||||
Convertor: a.group.Convertor,
|
Convertor: a.group.Convertor,
|
||||||
Copier: a.group.Copier,
|
|
||||||
Defaulter: a.group.Defaulter,
|
Defaulter: a.group.Defaulter,
|
||||||
Typer: a.group.Typer,
|
Typer: a.group.Typer,
|
||||||
UnsafeConvertor: a.group.UnsafeConvertor,
|
UnsafeConvertor: a.group.UnsafeConvertor,
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,6 @@ import (
|
||||||
// Creates a cacher based given storageConfig.
|
// Creates a cacher based given storageConfig.
|
||||||
func StorageWithCacher(capacity int) generic.StorageDecorator {
|
func StorageWithCacher(capacity int) generic.StorageDecorator {
|
||||||
return func(
|
return func(
|
||||||
copier runtime.ObjectCopier,
|
|
||||||
storageConfig *storagebackend.Config,
|
storageConfig *storagebackend.Config,
|
||||||
objectType runtime.Object,
|
objectType runtime.Object,
|
||||||
resourcePrefix string,
|
resourcePrefix string,
|
||||||
|
|
@ -52,7 +51,6 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
|
||||||
CacheCapacity: capacity,
|
CacheCapacity: capacity,
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Versioner: etcdstorage.APIObjectVersioner{},
|
Versioner: etcdstorage.APIObjectVersioner{},
|
||||||
Copier: copier,
|
|
||||||
Type: objectType,
|
Type: objectType,
|
||||||
ResourcePrefix: resourcePrefix,
|
ResourcePrefix: resourcePrefix,
|
||||||
KeyFunc: keyFunc,
|
KeyFunc: keyFunc,
|
||||||
|
|
|
||||||
|
|
@ -76,9 +76,6 @@ type GenericStore interface {
|
||||||
//
|
//
|
||||||
// TODO: make the default exposed methods exactly match a generic RESTStorage
|
// TODO: make the default exposed methods exactly match a generic RESTStorage
|
||||||
type Store struct {
|
type Store struct {
|
||||||
// Copier is used to make some storage caching decorators work
|
|
||||||
Copier runtime.ObjectCopier
|
|
||||||
|
|
||||||
// NewFunc returns a new instance of the type this registry returns for a
|
// NewFunc returns a new instance of the type this registry returns for a
|
||||||
// GET of a single object, e.g.:
|
// GET of a single object, e.g.:
|
||||||
//
|
//
|
||||||
|
|
@ -1331,7 +1328,6 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
|
||||||
|
|
||||||
if e.Storage == nil {
|
if e.Storage == nil {
|
||||||
e.Storage, e.DestroyFunc = opts.Decorator(
|
e.Storage, e.DestroyFunc = opts.Decorator(
|
||||||
e.Copier,
|
|
||||||
opts.StorageConfig,
|
opts.StorageConfig,
|
||||||
e.NewFunc(),
|
e.NewFunc(),
|
||||||
prefix,
|
prefix,
|
||||||
|
|
|
||||||
|
|
@ -440,7 +440,7 @@ func TestStoreCreateInitialized(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pod.Initializers = nil
|
pod.Initializers = nil
|
||||||
updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod, scheme))
|
updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
@ -533,7 +533,7 @@ func TestStoreCreateInitializedFailed(t *testing.T) {
|
||||||
}
|
}
|
||||||
pod.Initializers.Pending = nil
|
pod.Initializers.Pending = nil
|
||||||
pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure, Code: 403, Reason: metav1.StatusReasonForbidden, Message: "induced failure"}
|
pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure, Code: 403, Reason: metav1.StatusReasonForbidden, Message: "induced failure"}
|
||||||
updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod, scheme))
|
updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
@ -574,7 +574,7 @@ func TestStoreCreateInitializedFailed(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateAndVerify(t *testing.T, ctx genericapirequest.Context, registry *Store, pod *example.Pod) bool {
|
func updateAndVerify(t *testing.T, ctx genericapirequest.Context, registry *Store, pod *example.Pod) bool {
|
||||||
obj, _, err := registry.Update(ctx, pod.Name, rest.DefaultUpdatedObjectInfo(pod, scheme))
|
obj, _, err := registry.Update(ctx, pod.Name, rest.DefaultUpdatedObjectInfo(pod))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
return false
|
return false
|
||||||
|
|
@ -610,7 +610,7 @@ func TestStoreUpdate(t *testing.T) {
|
||||||
defer destroyFunc()
|
defer destroyFunc()
|
||||||
|
|
||||||
// Test1 try to update a non-existing node
|
// Test1 try to update a non-existing node
|
||||||
_, _, err := registry.Update(testContext, podA.Name, rest.DefaultUpdatedObjectInfo(podA, scheme))
|
_, _, err := registry.Update(testContext, podA.Name, rest.DefaultUpdatedObjectInfo(podA))
|
||||||
if !errors.IsNotFound(err) {
|
if !errors.IsNotFound(err) {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -623,7 +623,7 @@ func TestStoreUpdate(t *testing.T) {
|
||||||
registry.UpdateStrategy.(*testRESTStrategy).allowCreateOnUpdate = false
|
registry.UpdateStrategy.(*testRESTStrategy).allowCreateOnUpdate = false
|
||||||
|
|
||||||
// Test3 outofDate
|
// Test3 outofDate
|
||||||
_, _, err = registry.Update(testContext, podAWithResourceVersion.Name, rest.DefaultUpdatedObjectInfo(podAWithResourceVersion, scheme))
|
_, _, err = registry.Update(testContext, podAWithResourceVersion.Name, rest.DefaultUpdatedObjectInfo(podAWithResourceVersion))
|
||||||
if !errors.IsConflict(err) {
|
if !errors.IsConflict(err) {
|
||||||
t.Errorf("Unexpected error updating podAWithResourceVersion: %v", err)
|
t.Errorf("Unexpected error updating podAWithResourceVersion: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -671,7 +671,7 @@ func TestNoOpUpdates(t *testing.T) {
|
||||||
|
|
||||||
var updateResult runtime.Object
|
var updateResult runtime.Object
|
||||||
p := newPod()
|
p := newPod()
|
||||||
if updateResult, _, err = registry.Update(genericapirequest.NewDefaultContext(), p.Name, rest.DefaultUpdatedObjectInfo(p, scheme)); err != nil {
|
if updateResult, _, err = registry.Update(genericapirequest.NewDefaultContext(), p.Name, rest.DefaultUpdatedObjectInfo(p)); err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -972,7 +972,7 @@ func TestGracefulStoreHandleFinalizers(t *testing.T) {
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", Finalizers: []string{"foo.com/x"}, ResourceVersion: podWithFinalizer.ObjectMeta.ResourceVersion},
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", Finalizers: []string{"foo.com/x"}, ResourceVersion: podWithFinalizer.ObjectMeta.ResourceVersion},
|
||||||
Spec: example.PodSpec{NodeName: "machine"},
|
Spec: example.PodSpec{NodeName: "machine"},
|
||||||
}
|
}
|
||||||
_, _, err = registry.Update(testContext, updatedPodWithFinalizer.ObjectMeta.Name, rest.DefaultUpdatedObjectInfo(updatedPodWithFinalizer, scheme))
|
_, _, err = registry.Update(testContext, updatedPodWithFinalizer.ObjectMeta.Name, rest.DefaultUpdatedObjectInfo(updatedPodWithFinalizer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -987,7 +987,7 @@ func TestGracefulStoreHandleFinalizers(t *testing.T) {
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: podWithFinalizer.ObjectMeta.ResourceVersion},
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: podWithFinalizer.ObjectMeta.ResourceVersion},
|
||||||
Spec: example.PodSpec{NodeName: "anothermachine"},
|
Spec: example.PodSpec{NodeName: "anothermachine"},
|
||||||
}
|
}
|
||||||
_, _, err = registry.Update(testContext, podWithFinalizer.ObjectMeta.Name, rest.DefaultUpdatedObjectInfo(podWithNoFinalizer, scheme))
|
_, _, err = registry.Update(testContext, podWithFinalizer.ObjectMeta.Name, rest.DefaultUpdatedObjectInfo(podWithNoFinalizer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1024,7 +1024,7 @@ func TestFailedInitializationStoreUpdate(t *testing.T) {
|
||||||
|
|
||||||
// update the pod with initialization failure, the pod should be deleted
|
// update the pod with initialization failure, the pod should be deleted
|
||||||
pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure}
|
pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure}
|
||||||
result, _, err := registry.Update(testContext, podInitializing.Name, rest.DefaultUpdatedObjectInfo(pod, scheme))
|
result, _, err := registry.Update(testContext, podInitializing.Name, rest.DefaultUpdatedObjectInfo(pod))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1092,7 +1092,7 @@ func TestNonGracefulStoreHandleFinalizers(t *testing.T) {
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", Finalizers: []string{"foo.com/x"}, ResourceVersion: podWithFinalizer.ObjectMeta.ResourceVersion},
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", Finalizers: []string{"foo.com/x"}, ResourceVersion: podWithFinalizer.ObjectMeta.ResourceVersion},
|
||||||
Spec: example.PodSpec{NodeName: "machine"},
|
Spec: example.PodSpec{NodeName: "machine"},
|
||||||
}
|
}
|
||||||
_, _, err = registry.Update(testContext, updatedPodWithFinalizer.ObjectMeta.Name, rest.DefaultUpdatedObjectInfo(updatedPodWithFinalizer, scheme))
|
_, _, err = registry.Update(testContext, updatedPodWithFinalizer.ObjectMeta.Name, rest.DefaultUpdatedObjectInfo(updatedPodWithFinalizer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1111,7 +1111,7 @@ func TestNonGracefulStoreHandleFinalizers(t *testing.T) {
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: podWithFinalizer.ObjectMeta.ResourceVersion},
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: podWithFinalizer.ObjectMeta.ResourceVersion},
|
||||||
Spec: example.PodSpec{NodeName: "anothermachine"},
|
Spec: example.PodSpec{NodeName: "anothermachine"},
|
||||||
}
|
}
|
||||||
_, _, err = registry.Update(testContext, podWithFinalizer.ObjectMeta.Name, rest.DefaultUpdatedObjectInfo(podWithNoFinalizer, scheme))
|
_, _, err = registry.Update(testContext, podWithFinalizer.ObjectMeta.Name, rest.DefaultUpdatedObjectInfo(podWithNoFinalizer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1814,7 +1814,6 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
|
||||||
CacheCapacity: 10,
|
CacheCapacity: 10,
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Versioner: etcdstorage.APIObjectVersioner{},
|
Versioner: etcdstorage.APIObjectVersioner{},
|
||||||
Copier: scheme,
|
|
||||||
Type: &example.Pod{},
|
Type: &example.Pod{},
|
||||||
ResourcePrefix: podPrefix,
|
ResourcePrefix: podPrefix,
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
|
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
|
||||||
|
|
@ -1832,7 +1831,6 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
|
||||||
}
|
}
|
||||||
|
|
||||||
return destroyFunc, &Store{
|
return destroyFunc, &Store{
|
||||||
Copier: scheme,
|
|
||||||
NewFunc: func() runtime.Object { return &example.Pod{} },
|
NewFunc: func() runtime.Object { return &example.Pod{} },
|
||||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||||
DefaultQualifiedResource: example.Resource("pods"),
|
DefaultQualifiedResource: example.Resource("pods"),
|
||||||
|
|
@ -1924,7 +1922,7 @@ func TestQualifiedResource(t *testing.T) {
|
||||||
defer destroyFunc()
|
defer destroyFunc()
|
||||||
|
|
||||||
// update a non-exist object
|
// update a non-exist object
|
||||||
_, _, err := registry.Update(testContext, podA.Name, rest.DefaultUpdatedObjectInfo(podA, scheme))
|
_, _, err := registry.Update(testContext, podA.Name, rest.DefaultUpdatedObjectInfo(podA))
|
||||||
if !errors.IsNotFound(err) {
|
if !errors.IsNotFound(err) {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,6 @@ import (
|
||||||
// StorageDecorator is a function signature for producing a storage.Interface
|
// StorageDecorator is a function signature for producing a storage.Interface
|
||||||
// and an associated DestroyFunc from given parameters.
|
// and an associated DestroyFunc from given parameters.
|
||||||
type StorageDecorator func(
|
type StorageDecorator func(
|
||||||
copier runtime.ObjectCopier,
|
|
||||||
config *storagebackend.Config,
|
config *storagebackend.Config,
|
||||||
objectType runtime.Object,
|
objectType runtime.Object,
|
||||||
resourcePrefix string,
|
resourcePrefix string,
|
||||||
|
|
@ -39,7 +38,6 @@ type StorageDecorator func(
|
||||||
// UndecoratedStorage returns the given a new storage from the given config
|
// UndecoratedStorage returns the given a new storage from the given config
|
||||||
// without any decoration.
|
// without any decoration.
|
||||||
func UndecoratedStorage(
|
func UndecoratedStorage(
|
||||||
copier runtime.ObjectCopier,
|
|
||||||
config *storagebackend.Config,
|
config *storagebackend.Config,
|
||||||
objectType runtime.Object,
|
objectType runtime.Object,
|
||||||
resourcePrefix string,
|
resourcePrefix string,
|
||||||
|
|
|
||||||
|
|
@ -464,7 +464,7 @@ func (t *Tester) testUpdateEquals(obj runtime.Object, createFn CreateFunc, getFn
|
||||||
}
|
}
|
||||||
toUpdate = updateFn(toUpdate)
|
toUpdate = updateFn(toUpdate)
|
||||||
toUpdateMeta := t.getObjectMetaOrFail(toUpdate)
|
toUpdateMeta := t.getObjectMetaOrFail(toUpdate)
|
||||||
updated, created, err := t.storage.(rest.Updater).Update(ctx, toUpdateMeta.GetName(), rest.DefaultUpdatedObjectInfo(toUpdate, t.scheme))
|
updated, created, err := t.storage.(rest.Updater).Update(ctx, toUpdateMeta.GetName(), rest.DefaultUpdatedObjectInfo(toUpdate))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -504,7 +504,7 @@ func (t *Tester) testUpdateFailsOnVersionTooOld(obj runtime.Object, createFn Cre
|
||||||
olderMeta := t.getObjectMetaOrFail(older)
|
olderMeta := t.getObjectMetaOrFail(older)
|
||||||
olderMeta.SetResourceVersion("1")
|
olderMeta.SetResourceVersion("1")
|
||||||
|
|
||||||
_, _, err = t.storage.(rest.Updater).Update(t.TestContext(), olderMeta.GetName(), rest.DefaultUpdatedObjectInfo(older, t.scheme))
|
_, _, err = t.storage.(rest.Updater).Update(t.TestContext(), olderMeta.GetName(), rest.DefaultUpdatedObjectInfo(older))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("Expected an error, but we didn't get one")
|
t.Errorf("Expected an error, but we didn't get one")
|
||||||
} else if !errors.IsConflict(err) {
|
} else if !errors.IsConflict(err) {
|
||||||
|
|
@ -524,7 +524,7 @@ func (t *Tester) testUpdateInvokesValidation(obj runtime.Object, createFn Create
|
||||||
for _, update := range invalidUpdateFn {
|
for _, update := range invalidUpdateFn {
|
||||||
toUpdate := update(foo.DeepCopyObject())
|
toUpdate := update(foo.DeepCopyObject())
|
||||||
toUpdateMeta := t.getObjectMetaOrFail(toUpdate)
|
toUpdateMeta := t.getObjectMetaOrFail(toUpdate)
|
||||||
got, created, err := t.storage.(rest.Updater).Update(t.TestContext(), toUpdateMeta.GetName(), rest.DefaultUpdatedObjectInfo(toUpdate, t.scheme))
|
got, created, err := t.storage.(rest.Updater).Update(t.TestContext(), toUpdateMeta.GetName(), rest.DefaultUpdatedObjectInfo(toUpdate))
|
||||||
if got != nil || created {
|
if got != nil || created {
|
||||||
t.Errorf("expected nil object and no creation for object: %v", toUpdate)
|
t.Errorf("expected nil object and no creation for object: %v", toUpdate)
|
||||||
}
|
}
|
||||||
|
|
@ -545,7 +545,7 @@ func (t *Tester) testUpdateWithWrongUID(obj runtime.Object, createFn CreateFunc,
|
||||||
}
|
}
|
||||||
objectMeta.SetUID(types.UID("UID1111"))
|
objectMeta.SetUID(types.UID("UID1111"))
|
||||||
|
|
||||||
obj, created, err := t.storage.(rest.Updater).Update(ctx, objectMeta.GetName(), rest.DefaultUpdatedObjectInfo(foo, t.scheme))
|
obj, created, err := t.storage.(rest.Updater).Update(ctx, objectMeta.GetName(), rest.DefaultUpdatedObjectInfo(foo))
|
||||||
if created || obj != nil {
|
if created || obj != nil {
|
||||||
t.Errorf("expected nil object and no creation for object: %v", foo)
|
t.Errorf("expected nil object and no creation for object: %v", foo)
|
||||||
}
|
}
|
||||||
|
|
@ -589,7 +589,7 @@ func (t *Tester) testUpdateRetrievesOldObject(obj runtime.Object, createFn Creat
|
||||||
return updatedObject, nil
|
return updatedObject, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
updatedObj, created, err := t.storage.(rest.Updater).Update(ctx, objectMeta.GetName(), rest.DefaultUpdatedObjectInfo(storedFooWithUpdates, t.scheme, noopTransform))
|
updatedObj, created, err := t.storage.(rest.Updater).Update(ctx, objectMeta.GetName(), rest.DefaultUpdatedObjectInfo(storedFooWithUpdates, noopTransform))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
return
|
return
|
||||||
|
|
@ -624,7 +624,7 @@ func (t *Tester) testUpdatePropagatesUpdatedObjectError(obj runtime.Object, crea
|
||||||
return nil, propagateErr
|
return nil, propagateErr
|
||||||
}
|
}
|
||||||
|
|
||||||
_, _, err := t.storage.(rest.Updater).Update(ctx, name, rest.DefaultUpdatedObjectInfo(foo, t.scheme, noopTransform))
|
_, _, err := t.storage.(rest.Updater).Update(ctx, name, rest.DefaultUpdatedObjectInfo(foo, noopTransform))
|
||||||
if err != propagateErr {
|
if err != propagateErr {
|
||||||
t.Errorf("expected propagated error, got %#v", err)
|
t.Errorf("expected propagated error, got %#v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -650,7 +650,7 @@ func (t *Tester) testUpdateIgnoreGenerationUpdates(obj runtime.Object, createFn
|
||||||
olderMeta := t.getObjectMetaOrFail(older)
|
olderMeta := t.getObjectMetaOrFail(older)
|
||||||
olderMeta.SetGeneration(2)
|
olderMeta.SetGeneration(2)
|
||||||
|
|
||||||
_, _, err = t.storage.(rest.Updater).Update(t.TestContext(), olderMeta.GetName(), rest.DefaultUpdatedObjectInfo(older, t.scheme))
|
_, _, err = t.storage.(rest.Updater).Update(t.TestContext(), olderMeta.GetName(), rest.DefaultUpdatedObjectInfo(older))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -666,7 +666,7 @@ func (t *Tester) testUpdateIgnoreGenerationUpdates(obj runtime.Object, createFn
|
||||||
|
|
||||||
func (t *Tester) testUpdateOnNotFound(obj runtime.Object) {
|
func (t *Tester) testUpdateOnNotFound(obj runtime.Object) {
|
||||||
t.setObjectMeta(obj, t.namer(0))
|
t.setObjectMeta(obj, t.namer(0))
|
||||||
_, created, err := t.storage.(rest.Updater).Update(t.TestContext(), t.namer(0), rest.DefaultUpdatedObjectInfo(obj, t.scheme))
|
_, created, err := t.storage.(rest.Updater).Update(t.TestContext(), t.namer(0), rest.DefaultUpdatedObjectInfo(obj))
|
||||||
if t.createOnUpdate {
|
if t.createOnUpdate {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("creation allowed on updated, but got an error: %v", err)
|
t.Errorf("creation allowed on updated, but got an error: %v", err)
|
||||||
|
|
@ -701,7 +701,7 @@ func (t *Tester) testUpdateRejectsMismatchedNamespace(obj runtime.Object, create
|
||||||
objectMeta.SetName(t.namer(1))
|
objectMeta.SetName(t.namer(1))
|
||||||
objectMeta.SetNamespace("not-default")
|
objectMeta.SetNamespace("not-default")
|
||||||
|
|
||||||
obj, updated, err := t.storage.(rest.Updater).Update(t.TestContext(), "foo1", rest.DefaultUpdatedObjectInfo(storedFoo, t.scheme))
|
obj, updated, err := t.storage.(rest.Updater).Update(t.TestContext(), "foo1", rest.DefaultUpdatedObjectInfo(storedFoo))
|
||||||
if obj != nil || updated {
|
if obj != nil || updated {
|
||||||
t.Errorf("expected nil object and not updated")
|
t.Errorf("expected nil object and not updated")
|
||||||
}
|
}
|
||||||
|
|
@ -732,7 +732,7 @@ func (t *Tester) testUpdateIgnoreClusterName(obj runtime.Object, createFn Create
|
||||||
olderMeta := t.getObjectMetaOrFail(older)
|
olderMeta := t.getObjectMetaOrFail(older)
|
||||||
olderMeta.SetClusterName("clustername-to-ignore")
|
olderMeta.SetClusterName("clustername-to-ignore")
|
||||||
|
|
||||||
_, _, err = t.storage.(rest.Updater).Update(t.TestContext(), olderMeta.GetName(), rest.DefaultUpdatedObjectInfo(older, t.scheme))
|
_, _, err = t.storage.(rest.Updater).Update(t.TestContext(), olderMeta.GetName(), rest.DefaultUpdatedObjectInfo(older))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -136,19 +136,14 @@ type defaultUpdatedObjectInfo struct {
|
||||||
// obj is the updated object
|
// obj is the updated object
|
||||||
obj runtime.Object
|
obj runtime.Object
|
||||||
|
|
||||||
// copier makes a copy of the object before returning it.
|
|
||||||
// this allows repeated calls to UpdatedObject() to return
|
|
||||||
// pristine data, even if the returned value is mutated.
|
|
||||||
copier runtime.ObjectCopier
|
|
||||||
|
|
||||||
// transformers is an optional list of transforming functions that modify or
|
// transformers is an optional list of transforming functions that modify or
|
||||||
// replace obj using information from the context, old object, or other sources.
|
// replace obj using information from the context, old object, or other sources.
|
||||||
transformers []TransformFunc
|
transformers []TransformFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultUpdatedObjectInfo returns an UpdatedObjectInfo impl based on the specified object.
|
// DefaultUpdatedObjectInfo returns an UpdatedObjectInfo impl based on the specified object.
|
||||||
func DefaultUpdatedObjectInfo(obj runtime.Object, copier runtime.ObjectCopier, transformers ...TransformFunc) UpdatedObjectInfo {
|
func DefaultUpdatedObjectInfo(obj runtime.Object, transformers ...TransformFunc) UpdatedObjectInfo {
|
||||||
return &defaultUpdatedObjectInfo{obj, copier, transformers}
|
return &defaultUpdatedObjectInfo{obj, transformers}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Preconditions satisfies the UpdatedObjectInfo interface.
|
// Preconditions satisfies the UpdatedObjectInfo interface.
|
||||||
|
|
|
||||||
|
|
@ -416,7 +416,6 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
|
||||||
Creater: apiGroupInfo.Scheme,
|
Creater: apiGroupInfo.Scheme,
|
||||||
Convertor: apiGroupInfo.Scheme,
|
Convertor: apiGroupInfo.Scheme,
|
||||||
UnsafeConvertor: runtime.UnsafeObjectConvertor(apiGroupInfo.Scheme),
|
UnsafeConvertor: runtime.UnsafeObjectConvertor(apiGroupInfo.Scheme),
|
||||||
Copier: apiGroupInfo.Scheme,
|
|
||||||
Defaulter: apiGroupInfo.Scheme,
|
Defaulter: apiGroupInfo.Scheme,
|
||||||
Typer: apiGroupInfo.Scheme,
|
Typer: apiGroupInfo.Scheme,
|
||||||
SubresourceGroupVersionKind: apiGroupInfo.SubresourceGroupVersionKind,
|
SubresourceGroupVersionKind: apiGroupInfo.SubresourceGroupVersionKind,
|
||||||
|
|
|
||||||
|
|
@ -37,9 +37,9 @@ type RecommendedOptions struct {
|
||||||
CoreAPI *CoreAPIOptions
|
CoreAPI *CoreAPIOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRecommendedOptions(prefix string, copier runtime.ObjectCopier, codec runtime.Codec) *RecommendedOptions {
|
func NewRecommendedOptions(prefix string, codec runtime.Codec) *RecommendedOptions {
|
||||||
return &RecommendedOptions{
|
return &RecommendedOptions{
|
||||||
Etcd: NewEtcdOptions(storagebackend.NewDefaultConfig(prefix, copier, codec)),
|
Etcd: NewEtcdOptions(storagebackend.NewDefaultConfig(prefix, codec)),
|
||||||
SecureServing: NewSecureServingOptions(),
|
SecureServing: NewSecureServingOptions(),
|
||||||
Authentication: NewDelegatingAuthenticationOptions(),
|
Authentication: NewDelegatingAuthenticationOptions(),
|
||||||
Authorization: NewDelegatingAuthorizationOptions(),
|
Authorization: NewDelegatingAuthorizationOptions(),
|
||||||
|
|
|
||||||
|
|
@ -145,7 +145,6 @@ func TestUpdateEtcdOverrides(t *testing.T) {
|
||||||
defaultConfig := storagebackend.Config{
|
defaultConfig := storagebackend.Config{
|
||||||
Prefix: "/registry",
|
Prefix: "/registry",
|
||||||
ServerList: defaultEtcdLocation,
|
ServerList: defaultEtcdLocation,
|
||||||
Copier: scheme,
|
|
||||||
}
|
}
|
||||||
storageFactory := NewDefaultStorageFactory(defaultConfig, "", codecs, NewDefaultResourceEncodingConfig(registry), NewResourceConfig(), nil)
|
storageFactory := NewDefaultStorageFactory(defaultConfig, "", codecs, NewDefaultResourceEncodingConfig(registry), NewResourceConfig(), nil)
|
||||||
storageFactory.SetEtcdLocation(test.resource, test.servers)
|
storageFactory.SetEtcdLocation(test.resource, test.servers)
|
||||||
|
|
|
||||||
|
|
@ -54,8 +54,6 @@ type CacherConfig struct {
|
||||||
// An underlying storage.Versioner.
|
// An underlying storage.Versioner.
|
||||||
Versioner Versioner
|
Versioner Versioner
|
||||||
|
|
||||||
Copier runtime.ObjectCopier
|
|
||||||
|
|
||||||
// The Cache will be caching objects of a given Type and assumes that they
|
// The Cache will be caching objects of a given Type and assumes that they
|
||||||
// are all stored under ResourcePrefix directory in the underlying database.
|
// are all stored under ResourcePrefix directory in the underlying database.
|
||||||
Type interface{}
|
Type interface{}
|
||||||
|
|
@ -161,8 +159,6 @@ type Cacher struct {
|
||||||
// Underlying storage.Interface.
|
// Underlying storage.Interface.
|
||||||
storage Interface
|
storage Interface
|
||||||
|
|
||||||
copier runtime.ObjectCopier
|
|
||||||
|
|
||||||
// Expected type of objects in the underlying cache.
|
// Expected type of objects in the underlying cache.
|
||||||
objectType reflect.Type
|
objectType reflect.Type
|
||||||
|
|
||||||
|
|
@ -212,7 +208,6 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
|
||||||
cacher := &Cacher{
|
cacher := &Cacher{
|
||||||
ready: newReady(),
|
ready: newReady(),
|
||||||
storage: config.Storage,
|
storage: config.Storage,
|
||||||
copier: config.Copier,
|
|
||||||
objectType: reflect.TypeOf(config.Type),
|
objectType: reflect.TypeOf(config.Type),
|
||||||
watchCache: watchCache,
|
watchCache: watchCache,
|
||||||
reflector: cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0),
|
reflector: cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0),
|
||||||
|
|
@ -343,7 +338,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
|
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
|
||||||
watcher := newCacheWatcher(c.copier, watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget)
|
watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget)
|
||||||
|
|
||||||
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
|
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
|
||||||
c.watcherIdx++
|
c.watcherIdx++
|
||||||
|
|
@ -778,7 +773,6 @@ func (c *errWatcher) Stop() {
|
||||||
// cacherWatch implements watch.Interface
|
// cacherWatch implements watch.Interface
|
||||||
type cacheWatcher struct {
|
type cacheWatcher struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
copier runtime.ObjectCopier
|
|
||||||
input chan *watchCacheEvent
|
input chan *watchCacheEvent
|
||||||
result chan watch.Event
|
result chan watch.Event
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
|
|
@ -787,9 +781,8 @@ type cacheWatcher struct {
|
||||||
forget func(bool)
|
forget func(bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCacheWatcher(copier runtime.ObjectCopier, resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher {
|
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher {
|
||||||
watcher := &cacheWatcher{
|
watcher := &cacheWatcher{
|
||||||
copier: copier,
|
|
||||||
input: make(chan *watchCacheEvent, chanSize),
|
input: make(chan *watchCacheEvent, chanSize),
|
||||||
result: make(chan watch.Event, chanSize),
|
result: make(chan watch.Event, chanSize),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,6 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/util/diff"
|
"k8s.io/apimachinery/pkg/util/diff"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/client-go/kubernetes/scheme"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
|
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
|
||||||
|
|
@ -49,7 +48,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
||||||
}
|
}
|
||||||
// set the size of the buffer of w.result to 0, so that the writes to
|
// set the size of the buffer of w.result to 0, so that the writes to
|
||||||
// w.result is blocked.
|
// w.result is blocked.
|
||||||
w := newCacheWatcher(scheme.Scheme, 0, 0, initEvents, filter, forget)
|
w := newCacheWatcher(0, 0, initEvents, filter, forget)
|
||||||
w.Stop()
|
w.Stop()
|
||||||
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
||||||
lock.RLock()
|
lock.RLock()
|
||||||
|
|
@ -158,7 +157,7 @@ TestCase:
|
||||||
for j := range testCase.events {
|
for j := range testCase.events {
|
||||||
testCase.events[j].ResourceVersion = uint64(j) + 1
|
testCase.events[j].ResourceVersion = uint64(j) + 1
|
||||||
}
|
}
|
||||||
w := newCacheWatcher(scheme.Scheme, 0, 0, testCase.events, filter, forget)
|
w := newCacheWatcher(0, 0, testCase.events, filter, forget)
|
||||||
ch := w.ResultChan()
|
ch := w.ResultChan()
|
||||||
for j, event := range testCase.expected {
|
for j, event := range testCase.expected {
|
||||||
e := <-ch
|
e := <-ch
|
||||||
|
|
|
||||||
|
|
@ -62,13 +62,12 @@ var IdentityTransformer ValueTransformer = identityTransformer{}
|
||||||
|
|
||||||
// Creates a new storage interface from the client
|
// Creates a new storage interface from the client
|
||||||
// TODO: deprecate in favor of storage.Config abstraction over time
|
// TODO: deprecate in favor of storage.Config abstraction over time
|
||||||
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier, transformer ValueTransformer) storage.Interface {
|
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, transformer ValueTransformer) storage.Interface {
|
||||||
return &etcdHelper{
|
return &etcdHelper{
|
||||||
etcdMembersAPI: etcd.NewMembersAPI(client),
|
etcdMembersAPI: etcd.NewMembersAPI(client),
|
||||||
etcdKeysAPI: etcd.NewKeysAPI(client),
|
etcdKeysAPI: etcd.NewKeysAPI(client),
|
||||||
codec: codec,
|
codec: codec,
|
||||||
versioner: APIObjectVersioner{},
|
versioner: APIObjectVersioner{},
|
||||||
copier: copier,
|
|
||||||
transformer: transformer,
|
transformer: transformer,
|
||||||
pathPrefix: path.Join("/", prefix),
|
pathPrefix: path.Join("/", prefix),
|
||||||
quorum: quorum,
|
quorum: quorum,
|
||||||
|
|
@ -81,7 +80,6 @@ type etcdHelper struct {
|
||||||
etcdMembersAPI etcd.MembersAPI
|
etcdMembersAPI etcd.MembersAPI
|
||||||
etcdKeysAPI etcd.KeysAPI
|
etcdKeysAPI etcd.KeysAPI
|
||||||
codec runtime.Codec
|
codec runtime.Codec
|
||||||
copier runtime.ObjectCopier
|
|
||||||
transformer ValueTransformer
|
transformer ValueTransformer
|
||||||
// Note that versioner is required for etcdHelper to work correctly.
|
// Note that versioner is required for etcdHelper to work correctly.
|
||||||
// The public constructors (NewStorage & NewEtcdStorage) are setting it
|
// The public constructors (NewStorage & NewEtcdStorage) are setting it
|
||||||
|
|
|
||||||
|
|
@ -95,8 +95,8 @@ func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) {
|
||||||
return scheme, codecs
|
return scheme, codecs
|
||||||
}
|
}
|
||||||
|
|
||||||
func newEtcdHelper(client etcd.Client, scheme *runtime.Scheme, codec runtime.Codec, prefix string) etcdHelper {
|
func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper {
|
||||||
return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme, prefixTransformer{prefix: "test!"}).(*etcdHelper)
|
return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, prefixTransformer{prefix: "test!"}).(*etcdHelper)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns an encoded version of example.Pod with the given name.
|
// Returns an encoded version of example.Pod with the given name.
|
||||||
|
|
@ -128,11 +128,11 @@ func createPodList(t *testing.T, helper etcdHelper, list *example.PodList) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestList(t *testing.T) {
|
func TestList(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
list := example.PodList{
|
list := example.PodList{
|
||||||
Items: []example.Pod{
|
Items: []example.Pod{
|
||||||
|
|
@ -166,11 +166,11 @@ func TestList(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTransformationFailure(t *testing.T) {
|
func TestTransformationFailure(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
pods := []example.Pod{
|
pods := []example.Pod{
|
||||||
{
|
{
|
||||||
|
|
@ -221,11 +221,11 @@ func TestTransformationFailure(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestListFiltered(t *testing.T) {
|
func TestListFiltered(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
list := example.PodList{
|
list := example.PodList{
|
||||||
Items: []example.Pod{
|
Items: []example.Pod{
|
||||||
|
|
@ -267,14 +267,14 @@ func TestListFiltered(t *testing.T) {
|
||||||
|
|
||||||
// TestListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query
|
// TestListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query
|
||||||
func TestListAcrossDirectories(t *testing.T) {
|
func TestListAcrossDirectories(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
|
|
||||||
roothelper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
roothelper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
helper1 := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()+"/dir1")
|
helper1 := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()+"/dir1")
|
||||||
helper2 := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()+"/dir2")
|
helper2 := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()+"/dir2")
|
||||||
|
|
||||||
list := example.PodList{
|
list := example.PodList{
|
||||||
Items: []example.Pod{
|
Items: []example.Pod{
|
||||||
|
|
@ -314,12 +314,12 @@ func TestListAcrossDirectories(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGet(t *testing.T) {
|
func TestGet(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
expect := example.Pod{
|
expect := example.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
|
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
|
||||||
Spec: storagetests.DeepEqualSafePodSpec(),
|
Spec: storagetests.DeepEqualSafePodSpec(),
|
||||||
|
|
@ -338,12 +338,12 @@ func TestGet(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetNotFoundErr(t *testing.T) {
|
func TestGetNotFoundErr(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: "v1"})
|
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: "v1"})
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
boguskey := "/some/boguskey"
|
boguskey := "/some/boguskey"
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
var got example.Pod
|
var got example.Pod
|
||||||
err := helper.Get(context.TODO(), boguskey, "", &got, false)
|
err := helper.Get(context.TODO(), boguskey, "", &got, false)
|
||||||
|
|
@ -353,12 +353,12 @@ func TestGetNotFoundErr(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCreate(t *testing.T) {
|
func TestCreate(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
returnedObj := &example.Pod{}
|
returnedObj := &example.Pod{}
|
||||||
err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 5)
|
err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 5)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -382,12 +382,12 @@ func TestCreate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCreateNilOutParam(t *testing.T) {
|
func TestCreateNilOutParam(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
err := helper.Create(context.TODO(), "/some/key", obj, nil, 5)
|
err := helper.Create(context.TODO(), "/some/key", obj, nil, 5)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
|
|
@ -395,12 +395,12 @@ func TestCreateNilOutParam(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdate(t *testing.T) {
|
func TestGuaranteedUpdate(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
|
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
|
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
||||||
|
|
@ -441,12 +441,12 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdateNoChange(t *testing.T) {
|
func TestGuaranteedUpdateNoChange(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
|
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
|
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
original := &storagetesting.TestResource{}
|
original := &storagetesting.TestResource{}
|
||||||
|
|
@ -496,12 +496,12 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
|
func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
|
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
// Create a new node.
|
// Create a new node.
|
||||||
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
|
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
|
|
@ -524,12 +524,12 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
|
func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
|
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
const concurrency = 10
|
const concurrency = 10
|
||||||
var wgDone sync.WaitGroup
|
var wgDone sync.WaitGroup
|
||||||
|
|
@ -574,12 +574,12 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdateUIDMismatch(t *testing.T) {
|
func TestGuaranteedUpdateUIDMismatch(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
prefix := path.Join("/", etcdtest.PathPrefix())
|
prefix := path.Join("/", etcdtest.PathPrefix())
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, prefix)
|
helper := newEtcdHelper(server.Client, codec, prefix)
|
||||||
|
|
||||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
|
||||||
podPtr := &example.Pod{}
|
podPtr := &example.Pod{}
|
||||||
|
|
@ -596,12 +596,12 @@ func TestGuaranteedUpdateUIDMismatch(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeleteUIDMismatch(t *testing.T) {
|
func TestDeleteUIDMismatch(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
prefix := path.Join("/", etcdtest.PathPrefix())
|
prefix := path.Join("/", etcdtest.PathPrefix())
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, prefix)
|
helper := newEtcdHelper(server.Client, codec, prefix)
|
||||||
|
|
||||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
|
||||||
podPtr := &example.Pod{}
|
podPtr := &example.Pod{}
|
||||||
|
|
@ -638,7 +638,7 @@ func (f *fakeDeleteKeysAPI) Get(ctx context.Context, key string, opts *etcd.GetO
|
||||||
// deletion yet. Etcd will fail the deletion and report the conflict. etcdHelper
|
// deletion yet. Etcd will fail the deletion and report the conflict. etcdHelper
|
||||||
// should retry until there is no conflict.
|
// should retry until there is no conflict.
|
||||||
func TestDeleteWithRetry(t *testing.T) {
|
func TestDeleteWithRetry(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
|
|
@ -652,7 +652,7 @@ func TestDeleteWithRetry(t *testing.T) {
|
||||||
return &etcd.Response{Node: &etcd.Node{Value: defaultPrefixValue(data), ModifiedIndex: 99}}, nil
|
return &etcd.Response{Node: &etcd.Node{Value: defaultPrefixValue(data), ModifiedIndex: 99}}, nil
|
||||||
}
|
}
|
||||||
expectedRetries := 3
|
expectedRetries := 3
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, prefix)
|
helper := newEtcdHelper(server.Client, codec, prefix)
|
||||||
fake := &fakeDeleteKeysAPI{KeysAPI: helper.etcdKeysAPI, fakeGetCap: expectedRetries, fakeGetFunc: fakeGet}
|
fake := &fakeDeleteKeysAPI{KeysAPI: helper.etcdKeysAPI, fakeGetCap: expectedRetries, fakeGetFunc: fakeGet}
|
||||||
helper.etcdKeysAPI = fake
|
helper.etcdKeysAPI = fake
|
||||||
|
|
||||||
|
|
@ -676,7 +676,7 @@ func TestDeleteWithRetry(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPrefix(t *testing.T) {
|
func TestPrefix(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
|
|
@ -687,7 +687,7 @@ func TestPrefix(t *testing.T) {
|
||||||
"/registry": "/registry",
|
"/registry": "/registry",
|
||||||
}
|
}
|
||||||
for configuredPrefix, effectivePrefix := range testcases {
|
for configuredPrefix, effectivePrefix := range testcases {
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, configuredPrefix)
|
helper := newEtcdHelper(server.Client, codec, configuredPrefix)
|
||||||
if helper.pathPrefix != effectivePrefix {
|
if helper.pathPrefix != effectivePrefix {
|
||||||
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, helper.pathPrefix)
|
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, helper.pathPrefix)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -295,12 +295,12 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
func TestWatch(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -339,13 +339,13 @@ func TestWatch(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchEtcdState(t *testing.T) {
|
func TestWatchEtcdState(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
key := "/somekey/foo"
|
key := "/somekey/foo"
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
|
|
||||||
h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
|
@ -391,7 +391,7 @@ func TestWatchEtcdState(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchFromZeroIndex(t *testing.T) {
|
func TestWatchFromZeroIndex(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
|
|
||||||
|
|
@ -399,7 +399,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
|
|
||||||
h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
// set before the watch and verify events
|
// set before the watch and verify events
|
||||||
err := h.Create(context.TODO(), key, pod, pod, 0)
|
err := h.Create(context.TODO(), key, pod, pod, 0)
|
||||||
|
|
@ -465,12 +465,12 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchListFromZeroIndex(t *testing.T) {
|
func TestWatchListFromZeroIndex(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
prefix := "/some/key"
|
prefix := "/some/key"
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
h := newEtcdHelper(server.Client, scheme, codec, prefix)
|
h := newEtcdHelper(server.Client, codec, prefix)
|
||||||
|
|
||||||
watching, err := h.WatchList(context.TODO(), "/", "0", storage.Everything)
|
watching, err := h.WatchList(context.TODO(), "/", "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -496,13 +496,13 @@ func TestWatchListFromZeroIndex(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchListIgnoresRootKey(t *testing.T) {
|
func TestWatchListIgnoresRootKey(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
h := newEtcdHelper(server.Client, scheme, codec, key)
|
h := newEtcdHelper(server.Client, codec, key)
|
||||||
|
|
||||||
watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything)
|
watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -528,12 +528,12 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchPurposefulShutdown(t *testing.T) {
|
func TestWatchPurposefulShutdown(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
// Test purposeful shutdown
|
// Test purposeful shutdown
|
||||||
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
||||||
|
|
|
||||||
|
|
@ -321,7 +321,6 @@ func NewUnsecuredEtcd3TestClientServer(t *testing.T, scheme *runtime.Scheme) (*E
|
||||||
Prefix: etcdtest.PathPrefix(),
|
Prefix: etcdtest.PathPrefix(),
|
||||||
ServerList: server.V3Client.Endpoints(),
|
ServerList: server.V3Client.Endpoints(),
|
||||||
DeserializationCacheSize: etcdtest.DeserializationCacheSize,
|
DeserializationCacheSize: etcdtest.DeserializationCacheSize,
|
||||||
Copier: scheme,
|
|
||||||
Paging: true,
|
Paging: true,
|
||||||
}
|
}
|
||||||
return server, config
|
return server, config
|
||||||
|
|
|
||||||
|
|
@ -55,8 +55,7 @@ type Config struct {
|
||||||
// We will drop the cache once using protobuf.
|
// We will drop the cache once using protobuf.
|
||||||
DeserializationCacheSize int
|
DeserializationCacheSize int
|
||||||
|
|
||||||
Codec runtime.Codec
|
Codec runtime.Codec
|
||||||
Copier runtime.ObjectCopier
|
|
||||||
// Transformer allows the value to be transformed prior to persisting into etcd.
|
// Transformer allows the value to be transformed prior to persisting into etcd.
|
||||||
Transformer value.Transformer
|
Transformer value.Transformer
|
||||||
|
|
||||||
|
|
@ -65,13 +64,12 @@ type Config struct {
|
||||||
CompactionInterval time.Duration
|
CompactionInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDefaultConfig(prefix string, copier runtime.ObjectCopier, codec runtime.Codec) *Config {
|
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
|
||||||
return &Config{
|
return &Config{
|
||||||
Prefix: prefix,
|
Prefix: prefix,
|
||||||
// Default cache size to 0 - if unset, its size will be set based on target
|
// Default cache size to 0 - if unset, its size will be set based on target
|
||||||
// memory usage.
|
// memory usage.
|
||||||
DeserializationCacheSize: 0,
|
DeserializationCacheSize: 0,
|
||||||
Copier: copier,
|
|
||||||
Codec: codec,
|
Codec: codec,
|
||||||
CompactionInterval: DefaultCompactInterval,
|
CompactionInterval: DefaultCompactInterval,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier, etcd.IdentityTransformer)
|
s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, etcd.IdentityTransformer)
|
||||||
return s, tr.CloseIdleConnections, nil
|
return s, tr.CloseIdleConnections, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,6 @@ func TestTLSConnection(t *testing.T) {
|
||||||
KeyFile: keyFile,
|
KeyFile: keyFile,
|
||||||
CAFile: caFile,
|
CAFile: caFile,
|
||||||
Codec: codec,
|
Codec: codec,
|
||||||
Copier: scheme,
|
|
||||||
}
|
}
|
||||||
storage, destroyFunc, err := newETCD3Storage(cfg)
|
storage, destroyFunc, err := newETCD3Storage(cfg)
|
||||||
defer destroyFunc()
|
defer destroyFunc()
|
||||||
|
|
|
||||||
|
|
@ -103,7 +103,6 @@ func newTestCacher(s storage.Interface, cap int) *storage.Cacher {
|
||||||
CacheCapacity: cap,
|
CacheCapacity: cap,
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Versioner: etcdstorage.APIObjectVersioner{},
|
Versioner: etcdstorage.APIObjectVersioner{},
|
||||||
Copier: scheme,
|
|
||||||
Type: &example.Pod{},
|
Type: &example.Pod{},
|
||||||
ResourcePrefix: prefix,
|
ResourcePrefix: prefix,
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue