Allow initialization of resources
Add support for creating resources that are not immediately visible to naive clients, but must first be initialized by one or more privileged cluster agents. These controllers can mark the object as initialized, allowing others to see them. Permission to override initialization defaults or modify an initializing object is limited per resource to a virtual subresource "RESOURCE/initialize" via RBAC. Initialization is currently alpha. Kubernetes-commit: 331eea67d8000e5c4b37e2234a90903c15881c2f
This commit is contained in:
parent
89caee803d
commit
5fa08b8c5e
|
|
@ -407,6 +407,7 @@ type SimpleRESTStorage struct {
|
||||||
fakeWatch *watch.FakeWatcher
|
fakeWatch *watch.FakeWatcher
|
||||||
requestedLabelSelector labels.Selector
|
requestedLabelSelector labels.Selector
|
||||||
requestedFieldSelector fields.Selector
|
requestedFieldSelector fields.Selector
|
||||||
|
requestedUninitialized bool
|
||||||
requestedResourceVersion string
|
requestedResourceVersion string
|
||||||
requestedResourceNamespace string
|
requestedResourceNamespace string
|
||||||
|
|
||||||
|
|
@ -449,6 +450,7 @@ func (storage *SimpleRESTStorage) List(ctx request.Context, options *metainterna
|
||||||
if options != nil && options.FieldSelector != nil {
|
if options != nil && options.FieldSelector != nil {
|
||||||
storage.requestedFieldSelector = options.FieldSelector
|
storage.requestedFieldSelector = options.FieldSelector
|
||||||
}
|
}
|
||||||
|
storage.requestedUninitialized = options.IncludeUninitialized
|
||||||
return result, storage.errors["list"]
|
return result, storage.errors["list"]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -522,7 +524,7 @@ func (storage *SimpleRESTStorage) NewList() runtime.Object {
|
||||||
return &genericapitesting.SimpleList{}
|
return &genericapitesting.SimpleList{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (storage *SimpleRESTStorage) Create(ctx request.Context, obj runtime.Object) (runtime.Object, error) {
|
func (storage *SimpleRESTStorage) Create(ctx request.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) {
|
||||||
storage.checkContext(ctx)
|
storage.checkContext(ctx)
|
||||||
storage.created = obj.(*genericapitesting.Simple)
|
storage.created = obj.(*genericapitesting.Simple)
|
||||||
if err := storage.errors["create"]; err != nil {
|
if err := storage.errors["create"]; err != nil {
|
||||||
|
|
@ -717,7 +719,7 @@ type NamedCreaterRESTStorage struct {
|
||||||
createdName string
|
createdName string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (storage *NamedCreaterRESTStorage) Create(ctx request.Context, name string, obj runtime.Object) (runtime.Object, error) {
|
func (storage *NamedCreaterRESTStorage) Create(ctx request.Context, name string, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) {
|
||||||
storage.checkContext(ctx)
|
storage.checkContext(ctx)
|
||||||
storage.created = obj.(*genericapitesting.Simple)
|
storage.created = obj.(*genericapitesting.Simple)
|
||||||
storage.createdName = name
|
storage.createdName = name
|
||||||
|
|
@ -1470,6 +1472,52 @@ func TestGet(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetUninitialized(t *testing.T) {
|
||||||
|
storage := map[string]rest.Storage{}
|
||||||
|
simpleStorage := SimpleRESTStorage{
|
||||||
|
list: []genericapitesting.Simple{
|
||||||
|
{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Initializers: &metav1.Initializers{
|
||||||
|
Pending: []metav1.Initializer{{Name: "test"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Other: "foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
selfLinker := &setTestSelfLinker{
|
||||||
|
t: t,
|
||||||
|
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id",
|
||||||
|
alternativeSet: sets.NewString("/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple"),
|
||||||
|
name: "id",
|
||||||
|
namespace: "default",
|
||||||
|
}
|
||||||
|
storage["simple"] = &simpleStorage
|
||||||
|
handler := handleLinker(storage, selfLinker)
|
||||||
|
server := httptest.NewServer(handler)
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple?includeUninitialized=true")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected response: %#v", resp)
|
||||||
|
}
|
||||||
|
var itemOut genericapitesting.SimpleList
|
||||||
|
body, err := extractBody(resp, &itemOut)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(itemOut.Items) != 1 || itemOut.Items[0].Other != "foo" {
|
||||||
|
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body))
|
||||||
|
}
|
||||||
|
if !simpleStorage.requestedUninitialized {
|
||||||
|
t.Errorf("Didn't set correct flag")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetPretty(t *testing.T) {
|
func TestGetPretty(t *testing.T) {
|
||||||
storage := map[string]rest.Storage{}
|
storage := map[string]rest.Storage{}
|
||||||
simpleStorage := SimpleRESTStorage{
|
simpleStorage := SimpleRESTStorage{
|
||||||
|
|
|
||||||
|
|
@ -449,13 +449,12 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: replace with content type negotiation?
|
||||||
|
includeUninitialized := req.URL.Query().Get("includeUninitialized") == "1"
|
||||||
|
|
||||||
trace.Step("About to store object in database")
|
trace.Step("About to store object in database")
|
||||||
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
||||||
out, err := r.Create(ctx, name, obj)
|
return r.Create(ctx, name, obj, includeUninitialized)
|
||||||
if status, ok := out.(*metav1.Status); ok && err == nil && status.Code == 0 {
|
|
||||||
status.Code = http.StatusCreated
|
|
||||||
}
|
|
||||||
return out, err
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
scope.err(err, w, req)
|
scope.err(err, w, req)
|
||||||
|
|
@ -474,7 +473,19 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object
|
||||||
}
|
}
|
||||||
trace.Step("Self-link added")
|
trace.Step("Self-link added")
|
||||||
|
|
||||||
transformResponseObject(ctx, scope, req, w, http.StatusCreated, result)
|
// If the object is partially initialized, always indicate it via StatusAccepted
|
||||||
|
code := http.StatusCreated
|
||||||
|
if accessor, err := meta.Accessor(result); err == nil {
|
||||||
|
if accessor.GetInitializers() != nil {
|
||||||
|
code = http.StatusAccepted
|
||||||
|
}
|
||||||
|
}
|
||||||
|
status, ok := result.(*metav1.Status)
|
||||||
|
if ok && err == nil && status.Code == 0 {
|
||||||
|
status.Code = int32(code)
|
||||||
|
}
|
||||||
|
|
||||||
|
transformResponseObject(ctx, scope, req, w, code, result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -492,8 +503,8 @@ type namedCreaterAdapter struct {
|
||||||
rest.Creater
|
rest.Creater
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *namedCreaterAdapter) Create(ctx request.Context, name string, obj runtime.Object) (runtime.Object, error) {
|
func (c *namedCreaterAdapter) Create(ctx request.Context, name string, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) {
|
||||||
return c.Creater.Create(ctx, obj)
|
return c.Creater.Create(ctx, obj, includeUninitialized)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PatchResource returns a function that will handle a resource patch
|
// PatchResource returns a function that will handle a resource patch
|
||||||
|
|
|
||||||
|
|
@ -259,6 +259,7 @@ func (e *Store) ListPredicate(ctx genericapirequest.Context, p storage.Selection
|
||||||
// By default we should serve the request from etcd.
|
// By default we should serve the request from etcd.
|
||||||
options = &metainternalversion.ListOptions{ResourceVersion: ""}
|
options = &metainternalversion.ListOptions{ResourceVersion: ""}
|
||||||
}
|
}
|
||||||
|
p.IncludeUninitialized = options.IncludeUninitialized
|
||||||
list := e.NewListFunc()
|
list := e.NewListFunc()
|
||||||
if name, ok := p.MatchesSingle(); ok {
|
if name, ok := p.MatchesSingle(); ok {
|
||||||
if key, err := e.KeyFunc(ctx, name); err == nil {
|
if key, err := e.KeyFunc(ctx, name); err == nil {
|
||||||
|
|
@ -273,7 +274,7 @@ func (e *Store) ListPredicate(ctx genericapirequest.Context, p storage.Selection
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create inserts a new item according to the unique key from the object.
|
// Create inserts a new item according to the unique key from the object.
|
||||||
func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) {
|
func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) {
|
||||||
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
|
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -319,15 +320,91 @@ func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object) (runti
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !includeUninitialized {
|
||||||
|
return e.WaitForInitialized(ctx, out)
|
||||||
|
}
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *Store) WaitForInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) {
|
||||||
|
// return early if we don't have initializers, or if they've completed already
|
||||||
|
accessor, err := meta.Accessor(obj)
|
||||||
|
if err != nil {
|
||||||
|
return obj, nil
|
||||||
|
}
|
||||||
|
initializers := accessor.GetInitializers()
|
||||||
|
if initializers == nil {
|
||||||
|
return obj, nil
|
||||||
|
}
|
||||||
|
if result := initializers.Result; result != nil {
|
||||||
|
return nil, kubeerr.FromObject(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
key, err := e.KeyFunc(ctx, accessor.GetName())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
w, err := e.Storage.Watch(ctx, key, accessor.GetResourceVersion(), storage.SelectionPredicate{
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: fields.Everything(),
|
||||||
|
|
||||||
|
IncludeUninitialized: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer w.Stop()
|
||||||
|
|
||||||
|
latest := obj
|
||||||
|
ch := w.ResultChan()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
// TODO: should we just expose the partially initialized object?
|
||||||
|
return nil, kubeerr.NewServerTimeout(e.QualifiedResource, "create", 0)
|
||||||
|
}
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Deleted:
|
||||||
|
if latest = event.Object; latest != nil {
|
||||||
|
if accessor, err := meta.Accessor(latest); err == nil {
|
||||||
|
if initializers := accessor.GetInitializers(); initializers != nil && initializers.Result != nil {
|
||||||
|
// initialization failed, but we missed the modification event
|
||||||
|
return nil, kubeerr.FromObject(initializers.Result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, kubeerr.NewInternalError(fmt.Errorf("object deleted while waiting for creation"))
|
||||||
|
case watch.Error:
|
||||||
|
if status, ok := event.Object.(*metav1.Status); ok {
|
||||||
|
return nil, &kubeerr.StatusError{ErrStatus: *status}
|
||||||
|
}
|
||||||
|
return nil, kubeerr.NewInternalError(fmt.Errorf("unexpected object in watch stream, can't complete initialization %T", event.Object))
|
||||||
|
case watch.Modified:
|
||||||
|
latest = event.Object
|
||||||
|
accessor, err = meta.Accessor(latest)
|
||||||
|
if err != nil {
|
||||||
|
return nil, kubeerr.NewInternalError(fmt.Errorf("object no longer has access to metadata %T: %v", latest, err))
|
||||||
|
}
|
||||||
|
initializers := accessor.GetInitializers()
|
||||||
|
if initializers == nil {
|
||||||
|
// completed initialization
|
||||||
|
return latest, nil
|
||||||
|
}
|
||||||
|
if result := initializers.Result; result != nil {
|
||||||
|
// initialization failed
|
||||||
|
return nil, kubeerr.FromObject(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// shouldDeleteDuringUpdate checks if a Update is removing all the object's
|
// shouldDeleteDuringUpdate checks if a Update is removing all the object's
|
||||||
// finalizers. If so, it further checks if the object's
|
// finalizers. If so, it further checks if the object's
|
||||||
// DeletionGracePeriodSeconds is 0. If so, it returns true.
|
// DeletionGracePeriodSeconds is 0. If so, it returns true. If garbage collection
|
||||||
//
|
// is disabled it always returns false.
|
||||||
// If the store does not have garbage collection enabled,
|
|
||||||
// shouldDeleteDuringUpdate will always return false.
|
|
||||||
func (e *Store) shouldDeleteDuringUpdate(ctx genericapirequest.Context, key string, obj, existing runtime.Object) bool {
|
func (e *Store) shouldDeleteDuringUpdate(ctx genericapirequest.Context, key string, obj, existing runtime.Object) bool {
|
||||||
if !e.EnableGarbageCollection {
|
if !e.EnableGarbageCollection {
|
||||||
return false
|
return false
|
||||||
|
|
@ -345,9 +422,23 @@ func (e *Store) shouldDeleteDuringUpdate(ctx genericapirequest.Context, key stri
|
||||||
return len(newMeta.GetFinalizers()) == 0 && oldMeta.GetDeletionGracePeriodSeconds() != nil && *oldMeta.GetDeletionGracePeriodSeconds() == 0
|
return len(newMeta.GetFinalizers()) == 0 && oldMeta.GetDeletionGracePeriodSeconds() != nil && *oldMeta.GetDeletionGracePeriodSeconds() == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteForEmptyFinalizers handles deleting an object once its finalizer list
|
// shouldDeleteForFailedInitialization returns true if the provided object is initializing and has
|
||||||
// becomes empty due to an update.
|
// a failure recorded.
|
||||||
func (e *Store) deleteForEmptyFinalizers(ctx genericapirequest.Context, name, key string, obj runtime.Object, preconditions *storage.Preconditions) (runtime.Object, bool, error) {
|
func (e *Store) shouldDeleteForFailedInitialization(ctx genericapirequest.Context, obj runtime.Object) bool {
|
||||||
|
m, err := meta.Accessor(obj)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if initializers := m.GetInitializers(); initializers != nil && initializers.Result != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteWithoutFinalizers handles deleting an object ignoring its finalizer list.
|
||||||
|
// Used for objects that are either been finalized or have never initialized.
|
||||||
|
func (e *Store) deleteWithoutFinalizers(ctx genericapirequest.Context, name, key string, obj runtime.Object, preconditions *storage.Preconditions) (runtime.Object, bool, error) {
|
||||||
out := e.NewFunc()
|
out := e.NewFunc()
|
||||||
glog.V(6).Infof("going to delete %s from registry, triggered by update", name)
|
glog.V(6).Infof("going to delete %s from registry, triggered by update", name)
|
||||||
if err := e.Storage.Delete(ctx, key, out, preconditions); err != nil {
|
if err := e.Storage.Delete(ctx, key, out, preconditions); err != nil {
|
||||||
|
|
@ -477,7 +568,7 @@ func (e *Store) Update(ctx genericapirequest.Context, name string, objInfo rest.
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// delete the object
|
// delete the object
|
||||||
if err == errEmptiedFinalizers {
|
if err == errEmptiedFinalizers {
|
||||||
return e.deleteForEmptyFinalizers(ctx, name, key, deleteObj, storagePreconditions)
|
return e.deleteWithoutFinalizers(ctx, name, key, deleteObj, storagePreconditions)
|
||||||
}
|
}
|
||||||
if creating {
|
if creating {
|
||||||
err = storeerr.InterpretCreateError(err, e.QualifiedResource, name)
|
err = storeerr.InterpretCreateError(err, e.QualifiedResource, name)
|
||||||
|
|
@ -487,6 +578,11 @@ func (e *Store) Update(ctx genericapirequest.Context, name string, objInfo rest.
|
||||||
}
|
}
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if e.shouldDeleteForFailedInitialization(ctx, out) {
|
||||||
|
return e.deleteWithoutFinalizers(ctx, name, key, out, storagePreconditions)
|
||||||
|
}
|
||||||
|
|
||||||
if creating {
|
if creating {
|
||||||
if e.AfterCreate != nil {
|
if e.AfterCreate != nil {
|
||||||
if err := e.AfterCreate(out); err != nil {
|
if err := e.AfterCreate(out); err != nil {
|
||||||
|
|
@ -1025,11 +1121,14 @@ func (e *Store) Watch(ctx genericapirequest.Context, options *metainternalversio
|
||||||
if options != nil && options.FieldSelector != nil {
|
if options != nil && options.FieldSelector != nil {
|
||||||
field = options.FieldSelector
|
field = options.FieldSelector
|
||||||
}
|
}
|
||||||
|
predicate := e.PredicateFunc(label, field)
|
||||||
|
|
||||||
resourceVersion := ""
|
resourceVersion := ""
|
||||||
if options != nil {
|
if options != nil {
|
||||||
resourceVersion = options.ResourceVersion
|
resourceVersion = options.ResourceVersion
|
||||||
|
predicate.IncludeUninitialized = options.IncludeUninitialized
|
||||||
}
|
}
|
||||||
return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
|
return e.WatchPredicate(ctx, predicate, resourceVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchPredicate starts a watch for the items that m matches.
|
// WatchPredicate starts a watch for the items that m matches.
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/selection"
|
"k8s.io/apimachinery/pkg/selection"
|
||||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
|
@ -117,9 +118,9 @@ func NewTestGenericStoreRegistry(t *testing.T) (factory.DestroyFunc, *Store) {
|
||||||
return newTestGenericStoreRegistry(t, scheme, false)
|
return newTestGenericStoreRegistry(t, scheme, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPodAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
|
func getPodAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
pod := obj.(*example.Pod)
|
pod := obj.(*example.Pod)
|
||||||
return labels.Set{"name": pod.ObjectMeta.Name}, nil, nil
|
return labels.Set{"name": pod.ObjectMeta.Name}, nil, pod.Initializers != nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// matchPodName returns selection predicate that matches any pod with name in the set.
|
// matchPodName returns selection predicate that matches any pod with name in the set.
|
||||||
|
|
@ -142,8 +143,8 @@ func matchEverything() storage.SelectionPredicate {
|
||||||
return storage.SelectionPredicate{
|
return storage.SelectionPredicate{
|
||||||
Label: labels.Everything(),
|
Label: labels.Everything(),
|
||||||
Field: fields.Everything(),
|
Field: fields.Everything(),
|
||||||
GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) {
|
GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) {
|
||||||
return nil, nil, nil
|
return nil, nil, false, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -238,7 +239,7 @@ func TestStoreListResourceVersion(t *testing.T) {
|
||||||
destroyFunc, registry := newTestGenericStoreRegistry(t, scheme, true)
|
destroyFunc, registry := newTestGenericStoreRegistry(t, scheme, true)
|
||||||
defer destroyFunc()
|
defer destroyFunc()
|
||||||
|
|
||||||
obj, err := registry.Create(ctx, fooPod)
|
obj, err := registry.Create(ctx, fooPod, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
@ -268,7 +269,7 @@ func TestStoreListResourceVersion(t *testing.T) {
|
||||||
t.Fatalf("expected waiting, but get %#v", l)
|
t.Fatalf("expected waiting, but get %#v", l)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := registry.Create(ctx, barPod); err != nil {
|
if _, err := registry.Create(ctx, barPod, false); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -305,7 +306,7 @@ func TestStoreCreate(t *testing.T) {
|
||||||
registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy}
|
registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy}
|
||||||
|
|
||||||
// create the object
|
// create the object
|
||||||
objA, err := registry.Create(testContext, podA)
|
objA, err := registry.Create(testContext, podA, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -322,7 +323,7 @@ func TestStoreCreate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// now try to create the second pod
|
// now try to create the second pod
|
||||||
_, err = registry.Create(testContext, podB)
|
_, err = registry.Create(testContext, podB, false)
|
||||||
if !errors.IsAlreadyExists(err) {
|
if !errors.IsAlreadyExists(err) {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -341,7 +342,7 @@ func TestStoreCreate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// try to create before graceful deletion period is over
|
// try to create before graceful deletion period is over
|
||||||
_, err = registry.Create(testContext, podA)
|
_, err = registry.Create(testContext, podA, false)
|
||||||
if err == nil || !errors.IsAlreadyExists(err) {
|
if err == nil || !errors.IsAlreadyExists(err) {
|
||||||
t.Fatalf("Expected 'already exists' error from storage, but got %v", err)
|
t.Fatalf("Expected 'already exists' error from storage, but got %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -353,6 +354,208 @@ func TestStoreCreate(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isPendingInitialization(obj metav1.Object) bool {
|
||||||
|
return obj.GetInitializers() != nil && obj.GetInitializers().Result == nil && len(obj.GetInitializers().Pending) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func hasInitializers(obj metav1.Object, expected ...string) bool {
|
||||||
|
if !isPendingInitialization(obj) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if len(expected) != len(obj.GetInitializers().Pending) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for i, init := range obj.GetInitializers().Pending {
|
||||||
|
if init.Name != expected[i] {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func isFailedInitialization(obj metav1.Object) bool {
|
||||||
|
return obj.GetInitializers() != nil && obj.GetInitializers().Result != nil && obj.GetInitializers().Result.Status == metav1.StatusFailure
|
||||||
|
}
|
||||||
|
|
||||||
|
func isInitialized(obj metav1.Object) bool {
|
||||||
|
return obj.GetInitializers() == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStoreCreateInitialized(t *testing.T) {
|
||||||
|
podA := &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "foo", Namespace: "test",
|
||||||
|
Initializers: &metav1.Initializers{
|
||||||
|
Pending: []metav1.Initializer{{Name: "Test"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: example.PodSpec{NodeName: "machine"},
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
|
||||||
|
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
|
||||||
|
ch := make(chan struct{})
|
||||||
|
chObserver := make(chan struct{})
|
||||||
|
|
||||||
|
// simulate a background initializer that initializes the object
|
||||||
|
early := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
defer close(ch)
|
||||||
|
w, err := registry.Watch(ctx, &metainternalversion.ListOptions{
|
||||||
|
IncludeUninitialized: true,
|
||||||
|
Watch: true,
|
||||||
|
FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer w.Stop()
|
||||||
|
event := <-w.ResultChan()
|
||||||
|
pod := event.Object.(*example.Pod)
|
||||||
|
if event.Type != watch.Added || !hasInitializers(pod, "Test") {
|
||||||
|
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-early:
|
||||||
|
t.Fatalf("CreateInitialized should not have returned")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
pod.Initializers = nil
|
||||||
|
updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod, scheme))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
pod = updated.(*example.Pod)
|
||||||
|
if !isInitialized(pod) {
|
||||||
|
t.Fatalf("unexpected update: %#v", pod.Initializers)
|
||||||
|
}
|
||||||
|
|
||||||
|
event = <-w.ResultChan()
|
||||||
|
if event.Type != watch.Modified || !isInitialized(event.Object.(*example.Pod)) {
|
||||||
|
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// create a background worker that should only observe the final creation
|
||||||
|
go func() {
|
||||||
|
defer close(chObserver)
|
||||||
|
w, err := registry.Watch(ctx, &metainternalversion.ListOptions{
|
||||||
|
IncludeUninitialized: false,
|
||||||
|
Watch: true,
|
||||||
|
FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer w.Stop()
|
||||||
|
|
||||||
|
event := <-w.ResultChan()
|
||||||
|
pod := event.Object.(*example.Pod)
|
||||||
|
if event.Type != watch.Added || !isInitialized(pod) {
|
||||||
|
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// create the object
|
||||||
|
objA, err := registry.Create(ctx, podA, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// signal that we're now waiting, then wait for both observers to see
|
||||||
|
// the result of the create.
|
||||||
|
early <- struct{}{}
|
||||||
|
<-ch
|
||||||
|
<-chObserver
|
||||||
|
|
||||||
|
// get the object
|
||||||
|
checkobj, err := registry.Get(ctx, podA.Name, &metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify objects are equal
|
||||||
|
if e, a := objA, checkobj; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("Expected %#v, got %#v", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStoreCreateInitializedFailed(t *testing.T) {
|
||||||
|
podA := &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "foo", Namespace: "test",
|
||||||
|
Initializers: &metav1.Initializers{
|
||||||
|
Pending: []metav1.Initializer{{Name: "Test"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: example.PodSpec{NodeName: "machine"},
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
|
||||||
|
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
|
||||||
|
ch := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
w, err := registry.Watch(ctx, &metainternalversion.ListOptions{
|
||||||
|
IncludeUninitialized: true,
|
||||||
|
Watch: true,
|
||||||
|
FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
event := <-w.ResultChan()
|
||||||
|
pod := event.Object.(*example.Pod)
|
||||||
|
if event.Type != watch.Added || !hasInitializers(pod, "Test") {
|
||||||
|
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||||
|
}
|
||||||
|
pod.Initializers.Pending = nil
|
||||||
|
pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure, Code: 403, Reason: metav1.StatusReasonForbidden, Message: "induced failure"}
|
||||||
|
updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod, scheme))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
pod = updated.(*example.Pod)
|
||||||
|
if !isFailedInitialization(pod) {
|
||||||
|
t.Fatalf("unexpected update: %#v", pod.Initializers)
|
||||||
|
}
|
||||||
|
|
||||||
|
event = <-w.ResultChan()
|
||||||
|
if event.Type != watch.Modified || !isFailedInitialization(event.Object.(*example.Pod)) {
|
||||||
|
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||||
|
}
|
||||||
|
|
||||||
|
event = <-w.ResultChan()
|
||||||
|
if event.Type != watch.Deleted || !isFailedInitialization(event.Object.(*example.Pod)) {
|
||||||
|
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||||
|
}
|
||||||
|
w.Stop()
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// create the object
|
||||||
|
_, err := registry.Create(ctx, podA, false)
|
||||||
|
if !errors.IsForbidden(err) {
|
||||||
|
t.Fatalf("unexpected error: %#v", err.(errors.APIStatus).Status())
|
||||||
|
}
|
||||||
|
if err.(errors.APIStatus).Status().Message != "induced failure" {
|
||||||
|
t.Fatalf("unexpected error: %#v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-ch
|
||||||
|
|
||||||
|
// get the object
|
||||||
|
_, err = registry.Get(ctx, podA.Name, &metav1.GetOptions{})
|
||||||
|
if !errors.IsNotFound(err) {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func updateAndVerify(t *testing.T, ctx genericapirequest.Context, registry *Store, pod *example.Pod) bool {
|
func updateAndVerify(t *testing.T, ctx genericapirequest.Context, registry *Store, pod *example.Pod) bool {
|
||||||
obj, _, err := registry.Update(ctx, pod.Name, rest.DefaultUpdatedObjectInfo(pod, scheme))
|
obj, _, err := registry.Update(ctx, pod.Name, rest.DefaultUpdatedObjectInfo(pod, scheme))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -440,7 +643,7 @@ func TestNoOpUpdates(t *testing.T) {
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var createResult runtime.Object
|
var createResult runtime.Object
|
||||||
if createResult, err = registry.Create(genericapirequest.NewDefaultContext(), newPod()); err != nil {
|
if createResult, err = registry.Create(genericapirequest.NewDefaultContext(), newPod(), false); err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -615,7 +818,7 @@ func TestStoreDelete(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// create pod
|
// create pod
|
||||||
_, err = registry.Create(testContext, podA)
|
_, err = registry.Create(testContext, podA, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -687,7 +890,7 @@ func TestGracefulStoreHandleFinalizers(t *testing.T) {
|
||||||
registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy}
|
registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy}
|
||||||
defer destroyFunc()
|
defer destroyFunc()
|
||||||
// create pod
|
// create pod
|
||||||
_, err := registry.Create(testContext, podWithFinalizer)
|
_, err := registry.Create(testContext, podWithFinalizer, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -735,6 +938,43 @@ func TestGracefulStoreHandleFinalizers(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFailedInitializationStoreUpdate(t *testing.T) {
|
||||||
|
initialGeneration := int64(1)
|
||||||
|
podInitializing := &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", Initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "Test"}}}, Generation: initialGeneration},
|
||||||
|
Spec: example.PodSpec{NodeName: "machine"},
|
||||||
|
}
|
||||||
|
|
||||||
|
testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
|
||||||
|
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||||
|
registry.EnableGarbageCollection = true
|
||||||
|
defaultDeleteStrategy := testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true}
|
||||||
|
registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy}
|
||||||
|
defer destroyFunc()
|
||||||
|
|
||||||
|
// create pod, view initializing
|
||||||
|
obj, err := registry.Create(testContext, podInitializing, true)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
|
||||||
|
// update the pod with initialization failure, the pod should be deleted
|
||||||
|
pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure}
|
||||||
|
result, _, err := registry.Update(testContext, podInitializing.Name, rest.DefaultUpdatedObjectInfo(pod, scheme))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
_, err = registry.Get(testContext, podInitializing.Name, &metav1.GetOptions{})
|
||||||
|
if err == nil || !errors.IsNotFound(err) {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
pod = result.(*example.Pod)
|
||||||
|
if pod.Initializers == nil || pod.Initializers.Result == nil || pod.Initializers.Result.Status != metav1.StatusFailure {
|
||||||
|
t.Fatalf("Pod returned from update was not correct: %#v", pod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestNonGracefulStoreHandleFinalizers(t *testing.T) {
|
func TestNonGracefulStoreHandleFinalizers(t *testing.T) {
|
||||||
initialGeneration := int64(1)
|
initialGeneration := int64(1)
|
||||||
podWithFinalizer := &example.Pod{
|
podWithFinalizer := &example.Pod{
|
||||||
|
|
@ -747,7 +987,7 @@ func TestNonGracefulStoreHandleFinalizers(t *testing.T) {
|
||||||
registry.EnableGarbageCollection = true
|
registry.EnableGarbageCollection = true
|
||||||
defer destroyFunc()
|
defer destroyFunc()
|
||||||
// create pod
|
// create pod
|
||||||
_, err := registry.Create(testContext, podWithFinalizer)
|
_, err := registry.Create(testContext, podWithFinalizer, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1048,7 +1288,7 @@ func TestStoreDeleteWithOrphanDependents(t *testing.T) {
|
||||||
for _, tc := range testcases {
|
for _, tc := range testcases {
|
||||||
registry.DeleteStrategy = tc.strategy
|
registry.DeleteStrategy = tc.strategy
|
||||||
// create pod
|
// create pod
|
||||||
_, err := registry.Create(testContext, tc.pod)
|
_, err := registry.Create(testContext, tc.pod, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1267,7 +1507,7 @@ func TestStoreDeletionPropagation(t *testing.T) {
|
||||||
i++
|
i++
|
||||||
pod := createPod(i, tc.existingFinalizers)
|
pod := createPod(i, tc.existingFinalizers)
|
||||||
// create pod
|
// create pod
|
||||||
_, err := registry.Create(testContext, pod)
|
_, err := registry.Create(testContext, pod, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1311,10 +1551,10 @@ func TestStoreDeleteCollection(t *testing.T) {
|
||||||
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||||
defer destroyFunc()
|
defer destroyFunc()
|
||||||
|
|
||||||
if _, err := registry.Create(testContext, podA); err != nil {
|
if _, err := registry.Create(testContext, podA, false); err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
if _, err := registry.Create(testContext, podB); err != nil {
|
if _, err := registry.Create(testContext, podB, false); err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1347,10 +1587,10 @@ func TestStoreDeleteCollectionNotFound(t *testing.T) {
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
// Setup
|
// Setup
|
||||||
if _, err := registry.Create(testContext, podA); err != nil {
|
if _, err := registry.Create(testContext, podA, false); err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
if _, err := registry.Create(testContext, podB); err != nil {
|
if _, err := registry.Create(testContext, podB, false); err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1386,7 +1626,7 @@ func TestStoreDeleteCollectionWithWatch(t *testing.T) {
|
||||||
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||||
defer destroyFunc()
|
defer destroyFunc()
|
||||||
|
|
||||||
objCreated, err := registry.Create(testContext, podA)
|
objCreated, err := registry.Create(testContext, podA, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1455,7 +1695,7 @@ func TestStoreWatch(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("%v: unexpected error: %v", name, err)
|
t.Errorf("%v: unexpected error: %v", name, err)
|
||||||
} else {
|
} else {
|
||||||
obj, err := registry.Create(testContext, podA)
|
obj, err := registry.Create(testContext, podA, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
got, open := <-wi.ResultChan()
|
got, open := <-wi.ResultChan()
|
||||||
if !open {
|
if !open {
|
||||||
|
|
@ -1530,12 +1770,12 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
|
||||||
return storage.SelectionPredicate{
|
return storage.SelectionPredicate{
|
||||||
Label: label,
|
Label: label,
|
||||||
Field: field,
|
Field: field,
|
||||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
pod, ok := obj.(*example.Pod)
|
pod, ok := obj.(*example.Pod)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil, fmt.Errorf("not a pod")
|
return nil, nil, false, fmt.Errorf("not a pod")
|
||||||
}
|
}
|
||||||
return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(&pod.ObjectMeta, true), nil
|
return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(&pod.ObjectMeta, true), pod.Initializers != nil, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -174,8 +174,9 @@ type Creater interface {
|
||||||
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
|
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
|
||||||
New() runtime.Object
|
New() runtime.Object
|
||||||
|
|
||||||
// Create creates a new version of a resource.
|
// Create creates a new version of a resource. If includeUninitialized is set, the object may be returned
|
||||||
Create(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error)
|
// without completing initialization.
|
||||||
|
Create(ctx genericapirequest.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NamedCreater is an object that can create an instance of a RESTful object using a name parameter.
|
// NamedCreater is an object that can create an instance of a RESTful object using a name parameter.
|
||||||
|
|
@ -186,8 +187,9 @@ type NamedCreater interface {
|
||||||
|
|
||||||
// Create creates a new version of a resource. It expects a name parameter from the path.
|
// Create creates a new version of a resource. It expects a name parameter from the path.
|
||||||
// This is needed for create operations on subresources which include the name of the parent
|
// This is needed for create operations on subresources which include the name of the parent
|
||||||
// resource in the path.
|
// resource in the path. If includeUninitialized is set, the object may be returned without
|
||||||
Create(ctx genericapirequest.Context, name string, obj runtime.Object) (runtime.Object, error)
|
// completing initialization.
|
||||||
|
Create(ctx genericapirequest.Context, name string, obj runtime.Object, includeUninitialized bool) (runtime.Object, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdatedObjectInfo provides information about an updated object to an Updater.
|
// UpdatedObjectInfo provides information about an updated object to an Updater.
|
||||||
|
|
|
||||||
|
|
@ -251,7 +251,7 @@ func (t *Tester) testCreateAlreadyExisting(obj runtime.Object, createFn CreateFu
|
||||||
}
|
}
|
||||||
defer t.delete(ctx, foo)
|
defer t.delete(ctx, foo)
|
||||||
|
|
||||||
_, err := t.storage.(rest.Creater).Create(ctx, foo)
|
_, err := t.storage.(rest.Creater).Create(ctx, foo, false)
|
||||||
if !errors.IsAlreadyExists(err) {
|
if !errors.IsAlreadyExists(err) {
|
||||||
t.Errorf("expected already exists err, got %v", err)
|
t.Errorf("expected already exists err, got %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -263,7 +263,7 @@ func (t *Tester) testCreateEquals(obj runtime.Object, getFn GetFunc) {
|
||||||
foo := copyOrDie(obj, t.scheme)
|
foo := copyOrDie(obj, t.scheme)
|
||||||
t.setObjectMeta(foo, t.namer(2))
|
t.setObjectMeta(foo, t.namer(2))
|
||||||
|
|
||||||
created, err := t.storage.(rest.Creater).Create(ctx, foo)
|
created, err := t.storage.(rest.Creater).Create(ctx, foo, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -291,7 +291,7 @@ func (t *Tester) testCreateDiscardsObjectNamespace(valid runtime.Object) {
|
||||||
objectMeta.SetNamespace("not-default")
|
objectMeta.SetNamespace("not-default")
|
||||||
|
|
||||||
// Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted
|
// Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted
|
||||||
created, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme))
|
created, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -307,7 +307,7 @@ func (t *Tester) testCreateGeneratesName(valid runtime.Object) {
|
||||||
objectMeta.SetName("")
|
objectMeta.SetName("")
|
||||||
objectMeta.SetGenerateName("test-")
|
objectMeta.SetGenerateName("test-")
|
||||||
|
|
||||||
created, err := t.storage.(rest.Creater).Create(t.TestContext(), valid)
|
created, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -322,7 +322,7 @@ func (t *Tester) testCreateHasMetadata(valid runtime.Object) {
|
||||||
objectMeta.SetName(t.namer(1))
|
objectMeta.SetName(t.namer(1))
|
||||||
objectMeta.SetNamespace(t.TestNamespace())
|
objectMeta.SetNamespace(t.TestNamespace())
|
||||||
|
|
||||||
obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid)
|
obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -340,7 +340,7 @@ func (t *Tester) testCreateIgnoresContextNamespace(valid runtime.Object) {
|
||||||
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "not-default2")
|
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "not-default2")
|
||||||
|
|
||||||
// Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted
|
// Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted
|
||||||
created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme))
|
created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -359,7 +359,7 @@ func (t *Tester) testCreateIgnoresMismatchedNamespace(valid runtime.Object) {
|
||||||
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "not-default2")
|
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "not-default2")
|
||||||
|
|
||||||
// Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted
|
// Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted
|
||||||
created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme))
|
created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -377,7 +377,7 @@ func (t *Tester) testCreateValidatesNames(valid runtime.Object) {
|
||||||
objCopyMeta.SetName(invalidName)
|
objCopyMeta.SetName(invalidName)
|
||||||
|
|
||||||
ctx := t.TestContext()
|
ctx := t.TestContext()
|
||||||
_, err := t.storage.(rest.Creater).Create(ctx, objCopy)
|
_, err := t.storage.(rest.Creater).Create(ctx, objCopy, false)
|
||||||
if !errors.IsInvalid(err) {
|
if !errors.IsInvalid(err) {
|
||||||
t.Errorf("%s: Expected to get an invalid resource error, got '%v'", invalidName, err)
|
t.Errorf("%s: Expected to get an invalid resource error, got '%v'", invalidName, err)
|
||||||
}
|
}
|
||||||
|
|
@ -389,7 +389,7 @@ func (t *Tester) testCreateValidatesNames(valid runtime.Object) {
|
||||||
objCopyMeta.SetName(objCopyMeta.GetName() + invalidSuffix)
|
objCopyMeta.SetName(objCopyMeta.GetName() + invalidSuffix)
|
||||||
|
|
||||||
ctx := t.TestContext()
|
ctx := t.TestContext()
|
||||||
_, err := t.storage.(rest.Creater).Create(ctx, objCopy)
|
_, err := t.storage.(rest.Creater).Create(ctx, objCopy, false)
|
||||||
if !errors.IsInvalid(err) {
|
if !errors.IsInvalid(err) {
|
||||||
t.Errorf("%s: Expected to get an invalid resource error, got '%v'", invalidSuffix, err)
|
t.Errorf("%s: Expected to get an invalid resource error, got '%v'", invalidSuffix, err)
|
||||||
}
|
}
|
||||||
|
|
@ -399,7 +399,7 @@ func (t *Tester) testCreateValidatesNames(valid runtime.Object) {
|
||||||
func (t *Tester) testCreateInvokesValidation(invalid ...runtime.Object) {
|
func (t *Tester) testCreateInvokesValidation(invalid ...runtime.Object) {
|
||||||
for i, obj := range invalid {
|
for i, obj := range invalid {
|
||||||
ctx := t.TestContext()
|
ctx := t.TestContext()
|
||||||
_, err := t.storage.(rest.Creater).Create(ctx, obj)
|
_, err := t.storage.(rest.Creater).Create(ctx, obj, false)
|
||||||
if !errors.IsInvalid(err) {
|
if !errors.IsInvalid(err) {
|
||||||
t.Errorf("%d: Expected to get an invalid resource error, got %v", i, err)
|
t.Errorf("%d: Expected to get an invalid resource error, got %v", i, err)
|
||||||
}
|
}
|
||||||
|
|
@ -410,7 +410,7 @@ func (t *Tester) testCreateRejectsMismatchedNamespace(valid runtime.Object) {
|
||||||
objectMeta := t.getObjectMetaOrFail(valid)
|
objectMeta := t.getObjectMetaOrFail(valid)
|
||||||
objectMeta.SetNamespace("not-default")
|
objectMeta.SetNamespace("not-default")
|
||||||
|
|
||||||
_, err := t.storage.(rest.Creater).Create(t.TestContext(), valid)
|
_, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("Expected an error, but we didn't get one")
|
t.Errorf("Expected an error, but we didn't get one")
|
||||||
} else if !strings.Contains(err.Error(), "does not match the namespace sent on the request") {
|
} else if !strings.Contains(err.Error(), "does not match the namespace sent on the request") {
|
||||||
|
|
@ -424,7 +424,7 @@ func (t *Tester) testCreateResetsUserData(valid runtime.Object) {
|
||||||
objectMeta.SetUID("bad-uid")
|
objectMeta.SetUID("bad-uid")
|
||||||
objectMeta.SetCreationTimestamp(now)
|
objectMeta.SetCreationTimestamp(now)
|
||||||
|
|
||||||
obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid)
|
obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -442,7 +442,7 @@ func (t *Tester) testCreateIgnoreClusterName(valid runtime.Object) {
|
||||||
objectMeta.SetName(t.namer(3))
|
objectMeta.SetName(t.namer(3))
|
||||||
objectMeta.SetClusterName("clustername-to-ignore")
|
objectMeta.SetClusterName("clustername-to-ignore")
|
||||||
|
|
||||||
obj, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme))
|
obj, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1071,14 +1071,14 @@ func (t *Tester) testGetDifferentNamespace(obj runtime.Object) {
|
||||||
|
|
||||||
ctx1 := genericapirequest.WithNamespace(genericapirequest.NewContext(), "bar3")
|
ctx1 := genericapirequest.WithNamespace(genericapirequest.NewContext(), "bar3")
|
||||||
objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx1))
|
objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx1))
|
||||||
_, err := t.storage.(rest.Creater).Create(ctx1, obj)
|
_, err := t.storage.(rest.Creater).Create(ctx1, obj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx2 := genericapirequest.WithNamespace(genericapirequest.NewContext(), "bar4")
|
ctx2 := genericapirequest.WithNamespace(genericapirequest.NewContext(), "bar4")
|
||||||
objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx2))
|
objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx2))
|
||||||
_, err = t.storage.(rest.Creater).Create(ctx2, obj)
|
_, err = t.storage.(rest.Creater).Create(ctx2, obj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1112,7 +1112,7 @@ func (t *Tester) testGetFound(obj runtime.Object) {
|
||||||
ctx := t.TestContext()
|
ctx := t.TestContext()
|
||||||
t.setObjectMeta(obj, t.namer(1))
|
t.setObjectMeta(obj, t.namer(1))
|
||||||
|
|
||||||
existing, err := t.storage.(rest.Creater).Create(ctx, obj)
|
existing, err := t.storage.(rest.Creater).Create(ctx, obj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1135,7 +1135,7 @@ func (t *Tester) testGetMimatchedNamespace(obj runtime.Object) {
|
||||||
objMeta := t.getObjectMetaOrFail(obj)
|
objMeta := t.getObjectMetaOrFail(obj)
|
||||||
objMeta.SetName(t.namer(4))
|
objMeta.SetName(t.namer(4))
|
||||||
objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx1))
|
objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx1))
|
||||||
_, err := t.storage.(rest.Creater).Create(ctx1, obj)
|
_, err := t.storage.(rest.Creater).Create(ctx1, obj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -1154,7 +1154,7 @@ func (t *Tester) testGetMimatchedNamespace(obj runtime.Object) {
|
||||||
func (t *Tester) testGetNotFound(obj runtime.Object) {
|
func (t *Tester) testGetNotFound(obj runtime.Object) {
|
||||||
ctx := t.TestContext()
|
ctx := t.TestContext()
|
||||||
t.setObjectMeta(obj, t.namer(2))
|
t.setObjectMeta(obj, t.namer(2))
|
||||||
_, err := t.storage.(rest.Creater).Create(ctx, obj)
|
_, err := t.storage.(rest.Creater).Create(ctx, obj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -41,7 +41,7 @@ type AdmissionOptions struct {
|
||||||
func NewAdmissionOptions() *AdmissionOptions {
|
func NewAdmissionOptions() *AdmissionOptions {
|
||||||
options := &AdmissionOptions{
|
options := &AdmissionOptions{
|
||||||
Plugins: &admission.Plugins{},
|
Plugins: &admission.Plugins{},
|
||||||
PluginNames: []string{},
|
PluginNames: []string{"Initializers"},
|
||||||
}
|
}
|
||||||
server.RegisterAllAdmissionPlugins(options.Plugins)
|
server.RegisterAllAdmissionPlugins(options.Plugins)
|
||||||
return options
|
return options
|
||||||
|
|
|
||||||
|
|
@ -62,8 +62,8 @@ type CacherConfig struct {
|
||||||
// KeyFunc is used to get a key in the underlying storage for a given object.
|
// KeyFunc is used to get a key in the underlying storage for a given object.
|
||||||
KeyFunc func(runtime.Object) (string, error)
|
KeyFunc func(runtime.Object) (string, error)
|
||||||
|
|
||||||
// GetAttrsFunc is used to get object labels and fields.
|
// GetAttrsFunc is used to get object labels, fields, and the uninitialized bool
|
||||||
GetAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
|
GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error)
|
||||||
|
|
||||||
// TriggerPublisherFunc is used for optimizing amount of watchers that
|
// TriggerPublisherFunc is used for optimizing amount of watchers that
|
||||||
// needs to process an incoming event.
|
// needs to process an incoming event.
|
||||||
|
|
@ -131,7 +131,7 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type watchFilterFunc func(string, labels.Set, fields.Set) bool
|
type watchFilterFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool
|
||||||
|
|
||||||
// Cacher is responsible for serving WATCH and LIST requests for a given
|
// Cacher is responsible for serving WATCH and LIST requests for a given
|
||||||
// resource from its internal cache and updating its cache in the background
|
// resource from its internal cache and updating its cache in the background
|
||||||
|
|
@ -658,11 +658,11 @@ func filterFunction(key string, p SelectionPredicate) func(string, runtime.Objec
|
||||||
}
|
}
|
||||||
|
|
||||||
func watchFilterFunction(key string, p SelectionPredicate) watchFilterFunc {
|
func watchFilterFunction(key string, p SelectionPredicate) watchFilterFunc {
|
||||||
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
|
filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool {
|
||||||
if !hasPathPrefix(objKey, key) {
|
if !hasPathPrefix(objKey, key) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return p.MatchesLabelsAndFields(label, field)
|
return p.MatchesObjectAttributes(label, field, uninitialized)
|
||||||
}
|
}
|
||||||
return filterFunc
|
return filterFunc
|
||||||
}
|
}
|
||||||
|
|
@ -840,10 +840,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
|
||||||
|
|
||||||
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
||||||
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
||||||
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
|
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.ObjUninitialized)
|
||||||
oldObjPasses := false
|
oldObjPasses := false
|
||||||
if event.PrevObject != nil {
|
if event.PrevObject != nil {
|
||||||
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
|
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObjUninitialized)
|
||||||
}
|
}
|
||||||
if !curObjPasses && !oldObjPasses {
|
if !curObjPasses && !oldObjPasses {
|
||||||
// Watcher is not interested in that object.
|
// Watcher is not interested in that object.
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,7 @@ import (
|
||||||
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
||||||
var lock sync.RWMutex
|
var lock sync.RWMutex
|
||||||
count := 0
|
count := 0
|
||||||
filter := func(string, labels.Set, fields.Set) bool { return true }
|
filter := func(string, labels.Set, fields.Set, bool) bool { return true }
|
||||||
forget := func(bool) {
|
forget := func(bool) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
|
|
@ -61,7 +61,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCacheWatcherHandlesFiltering(t *testing.T) {
|
func TestCacheWatcherHandlesFiltering(t *testing.T) {
|
||||||
filter := func(_ string, _ labels.Set, field fields.Set) bool {
|
filter := func(_ string, _ labels.Set, field fields.Set, _ bool) bool {
|
||||||
return field["spec.nodeName"] == "host"
|
return field["spec.nodeName"] == "host"
|
||||||
}
|
}
|
||||||
forget := func(bool) {}
|
forget := func(bool) {}
|
||||||
|
|
|
||||||
|
|
@ -249,9 +249,9 @@ func TestListFiltered(t *testing.T) {
|
||||||
p := storage.SelectionPredicate{
|
p := storage.SelectionPredicate{
|
||||||
Label: labels.Everything(),
|
Label: labels.Everything(),
|
||||||
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "bar"}),
|
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "bar"}),
|
||||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
pod := obj.(*example.Pod)
|
pod := obj.(*example.Pod)
|
||||||
return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, nil
|
return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
var got example.PodList
|
var got example.PodList
|
||||||
|
|
|
||||||
|
|
@ -285,9 +285,9 @@ func TestGetToList(t *testing.T) {
|
||||||
pred: storage.SelectionPredicate{
|
pred: storage.SelectionPredicate{
|
||||||
Label: labels.Everything(),
|
Label: labels.Everything(),
|
||||||
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
|
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
|
||||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
pod := obj.(*example.Pod)
|
pod := obj.(*example.Pod)
|
||||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedOut: nil,
|
expectedOut: nil,
|
||||||
|
|
@ -644,9 +644,9 @@ func TestList(t *testing.T) {
|
||||||
pred: storage.SelectionPredicate{
|
pred: storage.SelectionPredicate{
|
||||||
Label: labels.Everything(),
|
Label: labels.Everything(),
|
||||||
Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name),
|
Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name),
|
||||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
pod := obj.(*example.Pod)
|
pod := obj.(*example.Pod)
|
||||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedOut: nil,
|
expectedOut: nil,
|
||||||
|
|
|
||||||
|
|
@ -118,7 +118,7 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
|
||||||
resultChan: make(chan watch.Event, outgoingBufSize),
|
resultChan: make(chan watch.Event, outgoingBufSize),
|
||||||
errChan: make(chan error, 1),
|
errChan: make(chan error, 1),
|
||||||
}
|
}
|
||||||
if pred.Label.Empty() && pred.Field.Empty() {
|
if pred.Empty() {
|
||||||
// The filter doesn't filter out any object.
|
// The filter doesn't filter out any object.
|
||||||
wc.internalFilter = nil
|
wc.internalFilter = nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -73,9 +73,9 @@ func testWatch(t *testing.T, recursive bool) {
|
||||||
pred: storage.SelectionPredicate{
|
pred: storage.SelectionPredicate{
|
||||||
Label: labels.Everything(),
|
Label: labels.Everything(),
|
||||||
Field: fields.ParseSelectorOrDie("metadata.name=bar"),
|
Field: fields.ParseSelectorOrDie("metadata.name=bar"),
|
||||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
pod := obj.(*example.Pod)
|
pod := obj.(*example.Pod)
|
||||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}, { // update
|
}, { // update
|
||||||
|
|
@ -88,9 +88,9 @@ func testWatch(t *testing.T, recursive bool) {
|
||||||
pred: storage.SelectionPredicate{
|
pred: storage.SelectionPredicate{
|
||||||
Label: labels.Everything(),
|
Label: labels.Everything(),
|
||||||
Field: fields.ParseSelectorOrDie("metadata.name!=bar"),
|
Field: fields.ParseSelectorOrDie("metadata.name!=bar"),
|
||||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
pod := obj.(*example.Pod)
|
pod := obj.(*example.Pod)
|
||||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}}
|
}}
|
||||||
|
|
|
||||||
|
|
@ -72,6 +72,8 @@ type FilterFunc func(obj runtime.Object) bool
|
||||||
var Everything = SelectionPredicate{
|
var Everything = SelectionPredicate{
|
||||||
Label: labels.Everything(),
|
Label: labels.Everything(),
|
||||||
Field: fields.Everything(),
|
Field: fields.Everything(),
|
||||||
|
// TODO: split this into a new top level constant?
|
||||||
|
IncludeUninitialized: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
|
// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
|
||||||
|
|
|
||||||
|
|
@ -22,29 +22,33 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AttrFunc returns label and field sets for List or Watch to match.
|
// AttrFunc returns label and field sets and the uninitialized flag for List or Watch to match.
|
||||||
// In any failure to parse given object, it returns error.
|
// In any failure to parse given object, it returns error.
|
||||||
type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, error)
|
type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, bool, error)
|
||||||
|
|
||||||
// SelectionPredicate is used to represent the way to select objects from api storage.
|
// SelectionPredicate is used to represent the way to select objects from api storage.
|
||||||
type SelectionPredicate struct {
|
type SelectionPredicate struct {
|
||||||
Label labels.Selector
|
Label labels.Selector
|
||||||
Field fields.Selector
|
Field fields.Selector
|
||||||
GetAttrs AttrFunc
|
IncludeUninitialized bool
|
||||||
IndexFields []string
|
GetAttrs AttrFunc
|
||||||
|
IndexFields []string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Matches returns true if the given object's labels and fields (as
|
// Matches returns true if the given object's labels and fields (as
|
||||||
// returned by s.GetAttrs) match s.Label and s.Field. An error is
|
// returned by s.GetAttrs) match s.Label and s.Field. An error is
|
||||||
// returned if s.GetAttrs fails.
|
// returned if s.GetAttrs fails.
|
||||||
func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
|
func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
|
||||||
if s.Label.Empty() && s.Field.Empty() {
|
if s.Empty() {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
labels, fields, err := s.GetAttrs(obj)
|
labels, fields, uninitialized, err := s.GetAttrs(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
if !s.IncludeUninitialized && uninitialized {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
matched := s.Label.Matches(labels)
|
matched := s.Label.Matches(labels)
|
||||||
if matched && s.Field != nil {
|
if matched && s.Field != nil {
|
||||||
matched = (matched && s.Field.Matches(fields))
|
matched = (matched && s.Field.Matches(fields))
|
||||||
|
|
@ -52,9 +56,12 @@ func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
|
||||||
return matched, nil
|
return matched, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// MatchesLabelsAndFields returns true if the given labels and fields
|
// MatchesObjectAttributes returns true if the given labels and fields
|
||||||
// match s.Label and s.Field.
|
// match s.Label and s.Field.
|
||||||
func (s *SelectionPredicate) MatchesLabelsAndFields(l labels.Set, f fields.Set) bool {
|
func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set, uninitialized bool) bool {
|
||||||
|
if !s.IncludeUninitialized && uninitialized {
|
||||||
|
return false
|
||||||
|
}
|
||||||
if s.Label.Empty() && s.Field.Empty() {
|
if s.Label.Empty() && s.Field.Empty() {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
@ -93,10 +100,11 @@ func (s *SelectionPredicate) RemoveMatchesSingleRequirements() (SelectionPredica
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SelectionPredicate{
|
return SelectionPredicate{
|
||||||
Label: s.Label,
|
Label: s.Label,
|
||||||
Field: fieldsSelector,
|
Field: fieldsSelector,
|
||||||
GetAttrs: s.GetAttrs,
|
IncludeUninitialized: s.IncludeUninitialized,
|
||||||
IndexFields: s.IndexFields,
|
GetAttrs: s.GetAttrs,
|
||||||
|
IndexFields: s.IndexFields,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -113,3 +121,8 @@ func (s *SelectionPredicate) MatcherIndex() []MatchValue {
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Empty returns true if the predicate performs no filtering.
|
||||||
|
func (s *SelectionPredicate) Empty() bool {
|
||||||
|
return s.Label.Empty() && s.Field.Empty() && s.IncludeUninitialized
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,7 @@ func TestSelectionPredicate(t *testing.T) {
|
||||||
labelSelector, fieldSelector string
|
labelSelector, fieldSelector string
|
||||||
labels labels.Set
|
labels labels.Set
|
||||||
fields fields.Set
|
fields fields.Set
|
||||||
|
uninitialized bool
|
||||||
err error
|
err error
|
||||||
shouldMatch bool
|
shouldMatch bool
|
||||||
matchSingleKey string
|
matchSingleKey string
|
||||||
|
|
@ -74,6 +75,14 @@ func TestSelectionPredicate(t *testing.T) {
|
||||||
shouldMatch: true,
|
shouldMatch: true,
|
||||||
matchSingleKey: "12345",
|
matchSingleKey: "12345",
|
||||||
},
|
},
|
||||||
|
"E": {
|
||||||
|
fieldSelector: "metadata.name=12345",
|
||||||
|
labels: labels.Set{},
|
||||||
|
fields: fields.Set{"metadata.name": "12345"},
|
||||||
|
uninitialized: true,
|
||||||
|
shouldMatch: false,
|
||||||
|
matchSingleKey: "12345",
|
||||||
|
},
|
||||||
"error": {
|
"error": {
|
||||||
labelSelector: "name=foo",
|
labelSelector: "name=foo",
|
||||||
fieldSelector: "uid=12345",
|
fieldSelector: "uid=12345",
|
||||||
|
|
@ -94,8 +103,8 @@ func TestSelectionPredicate(t *testing.T) {
|
||||||
sp := &SelectionPredicate{
|
sp := &SelectionPredicate{
|
||||||
Label: parsedLabel,
|
Label: parsedLabel,
|
||||||
Field: parsedField,
|
Field: parsedField,
|
||||||
GetAttrs: func(runtime.Object) (label labels.Set, field fields.Set, err error) {
|
GetAttrs: func(runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) {
|
||||||
return item.labels, item.fields, item.err
|
return item.labels, item.fields, item.uninitialized, item.err
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
got, err := sp.Matches(&Ignored{})
|
got, err := sp.Matches(&Ignored{})
|
||||||
|
|
|
||||||
|
|
@ -61,12 +61,12 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAttrs returns labels and fields of a given object for filtering purposes.
|
// GetAttrs returns labels and fields of a given object for filtering purposes.
|
||||||
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
|
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
pod, ok := obj.(*example.Pod)
|
pod, ok := obj.(*example.Pod)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil, fmt.Errorf("not a pod")
|
return nil, nil, false, fmt.Errorf("not a pod")
|
||||||
}
|
}
|
||||||
return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), nil
|
return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), pod.Initializers != nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PodToSelectableFields returns a field set that represents the object
|
// PodToSelectableFields returns a field set that represents the object
|
||||||
|
|
@ -469,12 +469,12 @@ func TestFiltering(t *testing.T) {
|
||||||
pred := storage.SelectionPredicate{
|
pred := storage.SelectionPredicate{
|
||||||
Label: labels.SelectorFromSet(labels.Set{"filter": "foo"}),
|
Label: labels.SelectorFromSet(labels.Set{"filter": "foo"}),
|
||||||
Field: fields.Everything(),
|
Field: fields.Everything(),
|
||||||
GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) {
|
GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) {
|
||||||
metadata, err := meta.Accessor(obj)
|
metadata, err := meta.Accessor(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
return labels.Set(metadata.GetLabels()), nil, nil
|
return labels.Set(metadata.GetLabels()), nil, metadata.GetInitializers() != nil, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, pred)
|
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, pred)
|
||||||
|
|
|
||||||
|
|
@ -47,15 +47,17 @@ const (
|
||||||
// the previous value of the object to enable proper filtering in the
|
// the previous value of the object to enable proper filtering in the
|
||||||
// upper layers.
|
// upper layers.
|
||||||
type watchCacheEvent struct {
|
type watchCacheEvent struct {
|
||||||
Type watch.EventType
|
Type watch.EventType
|
||||||
Object runtime.Object
|
Object runtime.Object
|
||||||
ObjLabels labels.Set
|
ObjLabels labels.Set
|
||||||
ObjFields fields.Set
|
ObjFields fields.Set
|
||||||
PrevObject runtime.Object
|
ObjUninitialized bool
|
||||||
PrevObjLabels labels.Set
|
PrevObject runtime.Object
|
||||||
PrevObjFields fields.Set
|
PrevObjLabels labels.Set
|
||||||
Key string
|
PrevObjFields fields.Set
|
||||||
ResourceVersion uint64
|
PrevObjUninitialized bool
|
||||||
|
Key string
|
||||||
|
ResourceVersion uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Computing a key of an object is generally non-trivial (it performs
|
// Computing a key of an object is generally non-trivial (it performs
|
||||||
|
|
@ -102,7 +104,7 @@ type watchCache struct {
|
||||||
keyFunc func(runtime.Object) (string, error)
|
keyFunc func(runtime.Object) (string, error)
|
||||||
|
|
||||||
// getAttrsFunc is used to get labels and fields of an object.
|
// getAttrsFunc is used to get labels and fields of an object.
|
||||||
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
|
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)
|
||||||
|
|
||||||
// cache is used a cyclic buffer - its first element (with the smallest
|
// cache is used a cyclic buffer - its first element (with the smallest
|
||||||
// resourceVersion) is defined by startIndex, its last element is defined
|
// resourceVersion) is defined by startIndex, its last element is defined
|
||||||
|
|
@ -136,7 +138,7 @@ type watchCache struct {
|
||||||
func newWatchCache(
|
func newWatchCache(
|
||||||
capacity int,
|
capacity int,
|
||||||
keyFunc func(runtime.Object) (string, error),
|
keyFunc func(runtime.Object) (string, error),
|
||||||
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)) *watchCache {
|
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)) *watchCache {
|
||||||
wc := &watchCache{
|
wc := &watchCache{
|
||||||
capacity: capacity,
|
capacity: capacity,
|
||||||
keyFunc: keyFunc,
|
keyFunc: keyFunc,
|
||||||
|
|
@ -229,30 +231,33 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
objLabels, objFields, err := w.getAttrsFunc(event.Object)
|
objLabels, objFields, objUninitialized, err := w.getAttrsFunc(event.Object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
var prevObject runtime.Object
|
var prevObject runtime.Object
|
||||||
var prevObjLabels labels.Set
|
var prevObjLabels labels.Set
|
||||||
var prevObjFields fields.Set
|
var prevObjFields fields.Set
|
||||||
|
var prevObjUninitialized bool
|
||||||
if exists {
|
if exists {
|
||||||
prevObject = previous.(*storeElement).Object
|
prevObject = previous.(*storeElement).Object
|
||||||
prevObjLabels, prevObjFields, err = w.getAttrsFunc(prevObject)
|
prevObjLabels, prevObjFields, prevObjUninitialized, err = w.getAttrsFunc(prevObject)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
watchCacheEvent := &watchCacheEvent{
|
watchCacheEvent := &watchCacheEvent{
|
||||||
Type: event.Type,
|
Type: event.Type,
|
||||||
Object: event.Object,
|
Object: event.Object,
|
||||||
ObjLabels: objLabels,
|
ObjLabels: objLabels,
|
||||||
ObjFields: objFields,
|
ObjFields: objFields,
|
||||||
PrevObject: prevObject,
|
ObjUninitialized: objUninitialized,
|
||||||
PrevObjLabels: prevObjLabels,
|
PrevObject: prevObject,
|
||||||
PrevObjFields: prevObjFields,
|
PrevObjLabels: prevObjLabels,
|
||||||
Key: key,
|
PrevObjFields: prevObjFields,
|
||||||
ResourceVersion: resourceVersion,
|
PrevObjUninitialized: prevObjUninitialized,
|
||||||
|
Key: key,
|
||||||
|
ResourceVersion: resourceVersion,
|
||||||
}
|
}
|
||||||
if w.onEvent != nil {
|
if w.onEvent != nil {
|
||||||
w.onEvent(watchCacheEvent)
|
w.onEvent(watchCacheEvent)
|
||||||
|
|
@ -425,17 +430,18 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("not a storeElement: %v", elem)
|
return nil, fmt.Errorf("not a storeElement: %v", elem)
|
||||||
}
|
}
|
||||||
objLabels, objFields, err := w.getAttrsFunc(elem.Object)
|
objLabels, objFields, objUninitialized, err := w.getAttrsFunc(elem.Object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
result[i] = &watchCacheEvent{
|
result[i] = &watchCacheEvent{
|
||||||
Type: watch.Added,
|
Type: watch.Added,
|
||||||
Object: elem.Object,
|
Object: elem.Object,
|
||||||
ObjLabels: objLabels,
|
ObjLabels: objLabels,
|
||||||
ObjFields: objFields,
|
ObjFields: objFields,
|
||||||
Key: elem.Key,
|
ObjUninitialized: objUninitialized,
|
||||||
ResourceVersion: w.resourceVersion,
|
Key: elem.Key,
|
||||||
|
ResourceVersion: w.resourceVersion,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
|
|
|
||||||
|
|
@ -50,8 +50,8 @@ func newTestWatchCache(capacity int) *watchCache {
|
||||||
keyFunc := func(obj runtime.Object) (string, error) {
|
keyFunc := func(obj runtime.Object) (string, error) {
|
||||||
return NamespaceKeyFunc("prefix", obj)
|
return NamespaceKeyFunc("prefix", obj)
|
||||||
}
|
}
|
||||||
getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
return nil, nil, nil
|
return nil, nil, false, nil
|
||||||
}
|
}
|
||||||
wc := newWatchCache(capacity, keyFunc, getAttrsFunc)
|
wc := newWatchCache(capacity, keyFunc, getAttrsFunc)
|
||||||
wc.clock = clock.NewFakeClock(time.Now())
|
wc.clock = clock.NewFakeClock(time.Now())
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue