Deprecate and remove use of alpha metadata.initializers field, remove IncludeUninitialized options
Kubernetes-commit: 17aa60686ebe5fd04b4fe6f442dc36a8d70b6730
This commit is contained in:
parent
123cf8011f
commit
80029a760c
|
@ -335,7 +335,6 @@ type SimpleRESTStorage struct {
|
|||
fakeWatch *watch.FakeWatcher
|
||||
requestedLabelSelector labels.Selector
|
||||
requestedFieldSelector fields.Selector
|
||||
requestedUninitialized bool
|
||||
requestedResourceVersion string
|
||||
requestedResourceNamespace string
|
||||
|
||||
|
@ -390,7 +389,6 @@ func (storage *SimpleRESTStorage) List(ctx context.Context, options *metainterna
|
|||
if options != nil && options.FieldSelector != nil {
|
||||
storage.requestedFieldSelector = options.FieldSelector
|
||||
}
|
||||
storage.requestedUninitialized = options.IncludeUninitialized
|
||||
return result, storage.errors["list"]
|
||||
}
|
||||
|
||||
|
@ -1689,52 +1687,6 @@ func TestGetCompression(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestGetUninitialized(t *testing.T) {
|
||||
storage := map[string]rest.Storage{}
|
||||
simpleStorage := SimpleRESTStorage{
|
||||
list: []genericapitesting.Simple{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Initializers: &metav1.Initializers{
|
||||
Pending: []metav1.Initializer{{Name: "test"}},
|
||||
},
|
||||
},
|
||||
Other: "foo",
|
||||
},
|
||||
},
|
||||
}
|
||||
selfLinker := &setTestSelfLinker{
|
||||
t: t,
|
||||
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id",
|
||||
alternativeSet: sets.NewString("/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple"),
|
||||
name: "id",
|
||||
namespace: "default",
|
||||
}
|
||||
storage["simple"] = &simpleStorage
|
||||
handler := handleLinker(storage, selfLinker)
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple?includeUninitialized=true")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected response: %#v", resp)
|
||||
}
|
||||
var itemOut genericapitesting.SimpleList
|
||||
body, err := extractBody(resp, &itemOut)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if len(itemOut.Items) != 1 || itemOut.Items[0].Other != "foo" {
|
||||
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body))
|
||||
}
|
||||
if !simpleStorage.requestedUninitialized {
|
||||
t.Errorf("Didn't set correct flag")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPretty(t *testing.T) {
|
||||
storage := map[string]rest.Storage{}
|
||||
simpleStorage := SimpleRESTStorage{
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
|
||||
|
@ -150,13 +149,7 @@ func createHandler(r rest.NamedCreater, scope RequestScope, admit admission.Inte
|
|||
}
|
||||
trace.Step("Object stored in database")
|
||||
|
||||
// If the object is partially initialized, always indicate it via StatusAccepted
|
||||
code := http.StatusCreated
|
||||
if accessor, err := meta.Accessor(result); err == nil {
|
||||
if accessor.GetInitializers() != nil {
|
||||
code = http.StatusAccepted
|
||||
}
|
||||
}
|
||||
status, ok := result.(*metav1.Status)
|
||||
if ok && err == nil && status.Code == 0 {
|
||||
status.Code = int32(code)
|
||||
|
|
|
@ -65,13 +65,6 @@ const (
|
|||
// Enables compression of REST responses (GET and LIST only)
|
||||
APIResponseCompression utilfeature.Feature = "APIResponseCompression"
|
||||
|
||||
// owner: @smarterclayton
|
||||
// alpha: v1.7
|
||||
//
|
||||
// Allow asynchronous coordination of object creation.
|
||||
// Auto-enabled by the Initializers admission plugin.
|
||||
Initializers utilfeature.Feature = "Initializers"
|
||||
|
||||
// owner: @smarterclayton
|
||||
// alpha: v1.8
|
||||
// beta: v1.9
|
||||
|
@ -103,7 +96,6 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
|
|||
AdvancedAuditing: {Default: true, PreRelease: utilfeature.GA},
|
||||
DynamicAuditing: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
Initializers: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
APIListChunking: {Default: true, PreRelease: utilfeature.Beta},
|
||||
DryRun: {Default: true, PreRelease: utilfeature.Beta},
|
||||
}
|
||||
|
|
|
@ -307,7 +307,6 @@ func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate,
|
|||
// By default we should serve the request from etcd.
|
||||
options = &metainternalversion.ListOptions{ResourceVersion: ""}
|
||||
}
|
||||
p.IncludeUninitialized = options.IncludeUninitialized
|
||||
p.Limit = options.Limit
|
||||
p.Continue = options.Continue
|
||||
list := e.NewListFunc()
|
||||
|
@ -380,92 +379,9 @@ func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
if !options.IncludeUninitialized {
|
||||
return e.WaitForInitialized(ctx, out)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// WaitForInitialized holds until the object is initialized, or returns an error if the default limit expires.
|
||||
// This method is exposed publicly for consumers of generic rest tooling.
|
||||
func (e *Store) WaitForInitialized(ctx context.Context, obj runtime.Object) (runtime.Object, error) {
|
||||
// return early if we don't have initializers, or if they've completed already
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return obj, nil
|
||||
}
|
||||
initializers := accessor.GetInitializers()
|
||||
if initializers == nil {
|
||||
return obj, nil
|
||||
}
|
||||
if result := initializers.Result; result != nil {
|
||||
return nil, kubeerr.FromObject(result)
|
||||
}
|
||||
|
||||
key, err := e.KeyFunc(ctx, accessor.GetName())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
qualifiedResource := e.qualifiedResourceFromContext(ctx)
|
||||
w, err := e.Storage.Watch(ctx, key, accessor.GetResourceVersion(), storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
|
||||
IncludeUninitialized: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer w.Stop()
|
||||
|
||||
latest := obj
|
||||
ch := w.ResultChan()
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-ch:
|
||||
if !ok {
|
||||
msg := fmt.Sprintf("server has timed out waiting for the initialization of %s %s",
|
||||
qualifiedResource.String(), accessor.GetName())
|
||||
return nil, kubeerr.NewTimeoutError(msg, 0)
|
||||
}
|
||||
switch event.Type {
|
||||
case watch.Deleted:
|
||||
if latest = event.Object; latest != nil {
|
||||
if accessor, err := meta.Accessor(latest); err == nil {
|
||||
if initializers := accessor.GetInitializers(); initializers != nil && initializers.Result != nil {
|
||||
// initialization failed, but we missed the modification event
|
||||
return nil, kubeerr.FromObject(initializers.Result)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, kubeerr.NewInternalError(fmt.Errorf("object deleted while waiting for creation"))
|
||||
case watch.Error:
|
||||
if status, ok := event.Object.(*metav1.Status); ok {
|
||||
return nil, &kubeerr.StatusError{ErrStatus: *status}
|
||||
}
|
||||
return nil, kubeerr.NewInternalError(fmt.Errorf("unexpected object in watch stream, can't complete initialization %T", event.Object))
|
||||
case watch.Modified:
|
||||
latest = event.Object
|
||||
accessor, err = meta.Accessor(latest)
|
||||
if err != nil {
|
||||
return nil, kubeerr.NewInternalError(fmt.Errorf("object no longer has access to metadata %T: %v", latest, err))
|
||||
}
|
||||
initializers := accessor.GetInitializers()
|
||||
if initializers == nil {
|
||||
// completed initialization
|
||||
return latest, nil
|
||||
}
|
||||
if result := initializers.Result; result != nil {
|
||||
// initialization failed
|
||||
return nil, kubeerr.FromObject(result)
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// shouldDeleteDuringUpdate checks if a Update is removing all the object's
|
||||
// finalizers. If so, it further checks if the object's
|
||||
// DeletionGracePeriodSeconds is 0.
|
||||
|
@ -483,20 +399,6 @@ func (e *Store) shouldDeleteDuringUpdate(ctx context.Context, key string, obj, e
|
|||
return len(newMeta.GetFinalizers()) == 0 && oldMeta.GetDeletionGracePeriodSeconds() != nil && *oldMeta.GetDeletionGracePeriodSeconds() == 0
|
||||
}
|
||||
|
||||
// shouldDeleteForFailedInitialization returns true if the provided object is initializing and has
|
||||
// a failure recorded.
|
||||
func (e *Store) shouldDeleteForFailedInitialization(ctx context.Context, obj runtime.Object) bool {
|
||||
m, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return false
|
||||
}
|
||||
if initializers := m.GetInitializers(); initializers != nil && initializers.Result != nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// deleteWithoutFinalizers handles deleting an object ignoring its finalizer list.
|
||||
// Used for objects that are either been finalized or have never initialized.
|
||||
func (e *Store) deleteWithoutFinalizers(ctx context.Context, name, key string, obj runtime.Object, preconditions *storage.Preconditions, dryRun bool) (runtime.Object, bool, error) {
|
||||
|
@ -652,10 +554,6 @@ func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObj
|
|||
return nil, false, err
|
||||
}
|
||||
|
||||
if e.shouldDeleteForFailedInitialization(ctx, out) {
|
||||
return e.deleteWithoutFinalizers(ctx, name, key, out, storagePreconditions, dryrun.IsDryRun(options.DryRun))
|
||||
}
|
||||
|
||||
if creating {
|
||||
if e.AfterCreate != nil {
|
||||
if err := e.AfterCreate(out); err != nil {
|
||||
|
@ -1055,11 +953,6 @@ func (e *Store) DeleteCollection(ctx context.Context, options *metav1.DeleteOpti
|
|||
listOptions = listOptions.DeepCopy()
|
||||
}
|
||||
|
||||
// DeleteCollection must remain backwards compatible with old clients that expect it to
|
||||
// remove all resources, initialized or not, within the type. It is also consistent with
|
||||
// Delete which does not require IncludeUninitialized
|
||||
listOptions.IncludeUninitialized = true
|
||||
|
||||
listObj, err := e.List(ctx, listOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1174,7 +1067,6 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti
|
|||
resourceVersion := ""
|
||||
if options != nil {
|
||||
resourceVersion = options.ResourceVersion
|
||||
predicate.IncludeUninitialized = options.IncludeUninitialized
|
||||
}
|
||||
return e.WatchPredicate(ctx, predicate, resourceVersion)
|
||||
}
|
||||
|
|
|
@ -42,11 +42,9 @@ import (
|
|||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
|
@ -56,8 +54,6 @@ import (
|
|||
"k8s.io/apiserver/pkg/storage/names"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
|
||||
)
|
||||
|
||||
var scheme = runtime.NewScheme()
|
||||
|
@ -127,9 +123,9 @@ func NewTestGenericStoreRegistry(t *testing.T) (factory.DestroyFunc, *Store) {
|
|||
return newTestGenericStoreRegistry(t, scheme, false)
|
||||
}
|
||||
|
||||
func getPodAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
func getPodAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return labels.Set{"name": pod.ObjectMeta.Name}, nil, pod.Initializers != nil, nil
|
||||
return labels.Set{"name": pod.ObjectMeta.Name}, nil, nil
|
||||
}
|
||||
|
||||
// matchPodName returns selection predicate that matches any pod with name in the set.
|
||||
|
@ -152,8 +148,8 @@ func matchEverything() storage.SelectionPredicate {
|
|||
return storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) {
|
||||
return nil, nil, false, nil
|
||||
GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) {
|
||||
return nil, nil, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -379,33 +375,6 @@ func TestStoreCreate(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func isPendingInitialization(obj metav1.Object) bool {
|
||||
return obj.GetInitializers() != nil && obj.GetInitializers().Result == nil && len(obj.GetInitializers().Pending) > 0
|
||||
}
|
||||
|
||||
func hasInitializers(obj metav1.Object, expected ...string) bool {
|
||||
if !isPendingInitialization(obj) {
|
||||
return false
|
||||
}
|
||||
if len(expected) != len(obj.GetInitializers().Pending) {
|
||||
return false
|
||||
}
|
||||
for i, init := range obj.GetInitializers().Pending {
|
||||
if init.Name != expected[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func isFailedInitialization(obj metav1.Object) bool {
|
||||
return obj.GetInitializers() != nil && obj.GetInitializers().Result != nil && obj.GetInitializers().Result.Status == metav1.StatusFailure
|
||||
}
|
||||
|
||||
func isInitialized(obj metav1.Object) bool {
|
||||
return obj.GetInitializers() == nil
|
||||
}
|
||||
|
||||
func isQualifiedResource(err error, kind, group string) bool {
|
||||
if err.(errors.APIStatus).Status().Details.Kind != kind || err.(errors.APIStatus).Status().Details.Group != group {
|
||||
return false
|
||||
|
@ -413,185 +382,6 @@ func isQualifiedResource(err error, kind, group string) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func TestStoreCreateInitialized(t *testing.T) {
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.Initializers, true)()
|
||||
|
||||
podA := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "foo", Namespace: "test",
|
||||
Initializers: &metav1.Initializers{
|
||||
Pending: []metav1.Initializer{{Name: validInitializerName}},
|
||||
},
|
||||
},
|
||||
Spec: example.PodSpec{NodeName: "machine"},
|
||||
}
|
||||
|
||||
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
|
||||
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||
defer destroyFunc()
|
||||
|
||||
ch := make(chan struct{})
|
||||
chObserver := make(chan struct{})
|
||||
|
||||
// simulate a background initializer that initializes the object
|
||||
early := make(chan struct{}, 1)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
w, err := registry.Watch(ctx, &metainternalversion.ListOptions{
|
||||
IncludeUninitialized: true,
|
||||
Watch: true,
|
||||
FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w.Stop()
|
||||
event := <-w.ResultChan()
|
||||
pod := event.Object.(*example.Pod)
|
||||
if event.Type != watch.Added || !hasInitializers(pod, validInitializerName) {
|
||||
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-early:
|
||||
t.Fatalf("CreateInitialized should not have returned")
|
||||
default:
|
||||
}
|
||||
|
||||
pod.Initializers = nil
|
||||
updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
pod = updated.(*example.Pod)
|
||||
if !isInitialized(pod) {
|
||||
t.Fatalf("unexpected update: %#v", pod.Initializers)
|
||||
}
|
||||
|
||||
event = <-w.ResultChan()
|
||||
if event.Type != watch.Modified || !isInitialized(event.Object.(*example.Pod)) {
|
||||
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||
}
|
||||
}()
|
||||
|
||||
// create a background worker that should only observe the final creation
|
||||
go func() {
|
||||
defer close(chObserver)
|
||||
w, err := registry.Watch(ctx, &metainternalversion.ListOptions{
|
||||
IncludeUninitialized: false,
|
||||
Watch: true,
|
||||
FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w.Stop()
|
||||
|
||||
event := <-w.ResultChan()
|
||||
pod := event.Object.(*example.Pod)
|
||||
if event.Type != watch.Added || !isInitialized(pod) {
|
||||
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||
}
|
||||
}()
|
||||
|
||||
// create the object
|
||||
objA, err := registry.Create(ctx, podA, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// signal that we're now waiting, then wait for both observers to see
|
||||
// the result of the create.
|
||||
early <- struct{}{}
|
||||
<-ch
|
||||
<-chObserver
|
||||
|
||||
// get the object
|
||||
checkobj, err := registry.Get(ctx, podA.Name, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// verify objects are equal
|
||||
if e, a := objA, checkobj; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreCreateInitializedFailed(t *testing.T) {
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.Initializers, true)()
|
||||
|
||||
podA := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "foo", Namespace: "test",
|
||||
Initializers: &metav1.Initializers{
|
||||
Pending: []metav1.Initializer{{Name: validInitializerName}},
|
||||
},
|
||||
},
|
||||
Spec: example.PodSpec{NodeName: "machine"},
|
||||
}
|
||||
|
||||
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
|
||||
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||
defer destroyFunc()
|
||||
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
w, err := registry.Watch(ctx, &metainternalversion.ListOptions{
|
||||
IncludeUninitialized: true,
|
||||
Watch: true,
|
||||
FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
event := <-w.ResultChan()
|
||||
pod := event.Object.(*example.Pod)
|
||||
if event.Type != watch.Added || !hasInitializers(pod, validInitializerName) {
|
||||
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||
}
|
||||
pod.Initializers.Pending = nil
|
||||
pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure, Code: 403, Reason: metav1.StatusReasonForbidden, Message: "induced failure"}
|
||||
updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
pod = updated.(*example.Pod)
|
||||
if !isFailedInitialization(pod) {
|
||||
t.Fatalf("unexpected update: %#v", pod.Initializers)
|
||||
}
|
||||
|
||||
event = <-w.ResultChan()
|
||||
if event.Type != watch.Modified || !isFailedInitialization(event.Object.(*example.Pod)) {
|
||||
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||
}
|
||||
|
||||
event = <-w.ResultChan()
|
||||
if event.Type != watch.Deleted || !isFailedInitialization(event.Object.(*example.Pod)) {
|
||||
t.Fatalf("unexpected event: %s %#v", event.Type, event.Object)
|
||||
}
|
||||
w.Stop()
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
// create the object
|
||||
_, err := registry.Create(ctx, podA, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
|
||||
if !errors.IsForbidden(err) {
|
||||
t.Fatalf("unexpected error: %#v", err.(errors.APIStatus).Status())
|
||||
}
|
||||
if err.(errors.APIStatus).Status().Message != "induced failure" {
|
||||
t.Fatalf("unexpected error: %#v", err)
|
||||
}
|
||||
|
||||
<-ch
|
||||
|
||||
// get the object
|
||||
_, err = registry.Get(ctx, podA.Name, &metav1.GetOptions{})
|
||||
if !errors.IsNotFound(err) {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func updateAndVerify(t *testing.T, ctx context.Context, registry *Store, pod *example.Pod) bool {
|
||||
obj, _, err := registry.Update(ctx, pod.Name, rest.DefaultUpdatedObjectInfo(pod), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
|
@ -897,44 +687,6 @@ func TestStoreDelete(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStoreDeleteUninitialized(t *testing.T) {
|
||||
podA := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", Initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: validInitializerName}}}},
|
||||
Spec: example.PodSpec{NodeName: "machine"},
|
||||
}
|
||||
|
||||
testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
|
||||
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||
defer destroyFunc()
|
||||
|
||||
// test failure condition
|
||||
_, _, err := registry.Delete(testContext, podA.Name, nil)
|
||||
if !errors.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// create pod
|
||||
_, err = registry.Create(testContext, podA, rest.ValidateAllObjectFunc, &metav1.CreateOptions{IncludeUninitialized: true})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// delete object
|
||||
_, wasDeleted, err := registry.Delete(testContext, podA.Name, nil)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if !wasDeleted {
|
||||
t.Errorf("unexpected, pod %s should have been deleted immediately", podA.Name)
|
||||
}
|
||||
|
||||
// try to get a item which should be deleted
|
||||
_, err = registry.Get(testContext, podA.Name, &metav1.GetOptions{})
|
||||
if !errors.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGracefulStoreCanDeleteIfExistingGracePeriodZero tests recovery from
|
||||
// race condition where the graceful delete is unable to complete
|
||||
// in prior operation, but the pod remains with deletion timestamp
|
||||
|
@ -1040,45 +792,6 @@ func TestGracefulStoreHandleFinalizers(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFailedInitializationStoreUpdate(t *testing.T) {
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.Initializers, true)()
|
||||
|
||||
initialGeneration := int64(1)
|
||||
podInitializing := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", Initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: validInitializerName}}}, Generation: initialGeneration},
|
||||
Spec: example.PodSpec{NodeName: "machine"},
|
||||
}
|
||||
|
||||
testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
|
||||
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||
registry.EnableGarbageCollection = true
|
||||
defaultDeleteStrategy := testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true}
|
||||
registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy}
|
||||
defer destroyFunc()
|
||||
|
||||
// create pod, view initializing
|
||||
obj, err := registry.Create(testContext, podInitializing, rest.ValidateAllObjectFunc, &metav1.CreateOptions{IncludeUninitialized: true})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
pod := obj.(*example.Pod)
|
||||
|
||||
// update the pod with initialization failure, the pod should be deleted
|
||||
pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure}
|
||||
result, _, err := registry.Update(testContext, podInitializing.Name, rest.DefaultUpdatedObjectInfo(pod), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
_, err = registry.Get(testContext, podInitializing.Name, &metav1.GetOptions{})
|
||||
if err == nil || !errors.IsNotFound(err) {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
pod = result.(*example.Pod)
|
||||
if pod.Initializers == nil || pod.Initializers.Result == nil || pod.Initializers.Result.Status != metav1.StatusFailure {
|
||||
t.Fatalf("Pod returned from update was not correct: %#v", pod)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNonGracefulStoreHandleFinalizers(t *testing.T) {
|
||||
initialGeneration := int64(1)
|
||||
podWithFinalizer := &example.Pod{
|
||||
|
@ -1656,14 +1369,6 @@ func TestStoreDeletionPropagation(t *testing.T) {
|
|||
func TestStoreDeleteCollection(t *testing.T) {
|
||||
podA := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
podB := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
|
||||
podC := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "baz",
|
||||
Initializers: &metav1.Initializers{
|
||||
Pending: []metav1.Initializer{{Name: validInitializerName}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
|
||||
destroyFunc, registry := NewTestGenericStoreRegistry(t)
|
||||
|
@ -1675,9 +1380,6 @@ func TestStoreDeleteCollection(t *testing.T) {
|
|||
if _, err := registry.Create(testContext, podB, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if _, err := registry.Create(testContext, podC, rest.ValidateAllObjectFunc, &metav1.CreateOptions{IncludeUninitialized: true}); err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Delete all pods.
|
||||
deleted, err := registry.DeleteCollection(testContext, nil, &metainternalversion.ListOptions{})
|
||||
|
@ -1685,7 +1387,7 @@ func TestStoreDeleteCollection(t *testing.T) {
|
|||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
deletedPods := deleted.(*example.PodList)
|
||||
if len(deletedPods.Items) != 3 {
|
||||
if len(deletedPods.Items) != 2 {
|
||||
t.Errorf("Unexpected number of pods deleted: %d, expected: 3", len(deletedPods.Items))
|
||||
}
|
||||
|
||||
|
@ -1695,9 +1397,6 @@ func TestStoreDeleteCollection(t *testing.T) {
|
|||
if _, err := registry.Get(testContext, podB.Name, &metav1.GetOptions{}); !errors.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if _, err := registry.Get(testContext, podC.Name, &metav1.GetOptions{}); !errors.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreDeleteCollectionNotFound(t *testing.T) {
|
||||
|
@ -1892,12 +1591,12 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
|
|||
return storage.SelectionPredicate{
|
||||
Label: label,
|
||||
Field: field,
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod, ok := obj.(*example.Pod)
|
||||
if !ok {
|
||||
return nil, nil, false, fmt.Errorf("not a pod")
|
||||
return nil, nil, fmt.Errorf("not a pod")
|
||||
}
|
||||
return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(&pod.ObjectMeta, true), pod.Initializers != nil, nil
|
||||
return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(&pod.ObjectMeta, true), nil
|
||||
},
|
||||
}
|
||||
},
|
||||
|
|
|
@ -28,9 +28,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage/names"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
)
|
||||
|
||||
// RESTCreateStrategy defines the minimum validation, accepted input, and
|
||||
|
@ -92,11 +90,7 @@ func BeforeCreate(strategy RESTCreateStrategy, ctx context.Context, obj runtime.
|
|||
objectMeta.SetName(strategy.GenerateName(objectMeta.GetGenerateName()))
|
||||
}
|
||||
|
||||
// Ensure Initializers are not set unless the feature is enabled
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.Initializers) {
|
||||
objectMeta.SetInitializers(nil)
|
||||
}
|
||||
|
||||
// ClusterName is ignored and should not be saved
|
||||
if len(objectMeta.GetClusterName()) > 0 {
|
||||
objectMeta.SetClusterName("")
|
||||
|
|
|
@ -176,8 +176,7 @@ type Creater interface {
|
|||
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
|
||||
New() runtime.Object
|
||||
|
||||
// Create creates a new version of a resource. If includeUninitialized is set, the object may be returned
|
||||
// without completing initialization.
|
||||
// Create creates a new version of a resource.
|
||||
Create(ctx context.Context, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error)
|
||||
}
|
||||
|
||||
|
@ -189,8 +188,7 @@ type NamedCreater interface {
|
|||
|
||||
// Create creates a new version of a resource. It expects a name parameter from the path.
|
||||
// This is needed for create operations on subresources which include the name of the parent
|
||||
// resource in the path. If includeUninitialized is set, the object may be returned without
|
||||
// completing initialization.
|
||||
// resource in the path.
|
||||
Create(ctx context.Context, name string, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error)
|
||||
}
|
||||
|
||||
|
|
|
@ -28,8 +28,6 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
)
|
||||
|
||||
// RESTUpdateStrategy defines the minimum validation, accepted input, and
|
||||
|
@ -104,12 +102,8 @@ func BeforeUpdate(strategy RESTUpdateStrategy, ctx context.Context, obj, old run
|
|||
}
|
||||
objectMeta.SetGeneration(oldMeta.GetGeneration())
|
||||
|
||||
// Ensure Initializers are not set unless the feature is enabled
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.Initializers) {
|
||||
oldMeta.SetInitializers(nil)
|
||||
objectMeta.SetInitializers(nil)
|
||||
}
|
||||
|
||||
strategy.PrepareForUpdate(ctx, obj, old)
|
||||
|
||||
// ClusterName is ignored and should not be saved
|
||||
|
|
|
@ -62,8 +62,8 @@ type Config struct {
|
|||
// KeyFunc is used to get a key in the underlying storage for a given object.
|
||||
KeyFunc func(runtime.Object) (string, error)
|
||||
|
||||
// GetAttrsFunc is used to get object labels, fields, and the uninitialized bool
|
||||
GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error)
|
||||
// GetAttrsFunc is used to get object labels, fields
|
||||
GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, err error)
|
||||
|
||||
// TriggerPublisherFunc is used for optimizing amount of watchers that
|
||||
// needs to process an incoming event.
|
||||
|
@ -131,7 +131,7 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
|
|||
}
|
||||
}
|
||||
|
||||
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool
|
||||
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool
|
||||
|
||||
// Cacher is responsible for serving WATCH and LIST requests for a given
|
||||
// resource from its internal cache and updating its cache in the background
|
||||
|
@ -458,7 +458,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
|
|||
if !ok {
|
||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||
}
|
||||
if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
|
||||
if filter(elem.Key, elem.Labels, elem.Fields) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||
}
|
||||
}
|
||||
|
@ -532,7 +532,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||
if !ok {
|
||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||
}
|
||||
if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
|
||||
if filter(elem.Key, elem.Labels, elem.Fields) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||
}
|
||||
}
|
||||
|
@ -693,11 +693,11 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
|
|||
}
|
||||
|
||||
func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
|
||||
filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool {
|
||||
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
|
||||
if !hasPathPrefix(objKey, key) {
|
||||
return false
|
||||
}
|
||||
return p.MatchesObjectAttributes(label, field, uninitialized)
|
||||
return p.MatchesObjectAttributes(label, field)
|
||||
}
|
||||
return filterFunc
|
||||
}
|
||||
|
@ -871,10 +871,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
|
|||
|
||||
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
||||
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
||||
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.ObjUninitialized)
|
||||
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
|
||||
oldObjPasses := false
|
||||
if event.PrevObject != nil {
|
||||
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObjUninitialized)
|
||||
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
|
||||
}
|
||||
if !curObjPasses && !oldObjPasses {
|
||||
// Watcher is not interested in that object.
|
||||
|
|
|
@ -46,7 +46,7 @@ import (
|
|||
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
||||
var lock sync.RWMutex
|
||||
count := 0
|
||||
filter := func(string, labels.Set, fields.Set, bool) bool { return true }
|
||||
filter := func(string, labels.Set, fields.Set) bool { return true }
|
||||
forget := func(bool) {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
@ -70,7 +70,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCacheWatcherHandlesFiltering(t *testing.T) {
|
||||
filter := func(_ string, _ labels.Set, field fields.Set, _ bool) bool {
|
||||
filter := func(_ string, _ labels.Set, field fields.Set) bool {
|
||||
return field["spec.nodeName"] == "host"
|
||||
}
|
||||
forget := func(bool) {}
|
||||
|
@ -240,7 +240,7 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) {
|
|||
Type: &example.Pod{},
|
||||
ResourcePrefix: prefix,
|
||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
||||
GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { return nil, nil, true, nil },
|
||||
GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) { return nil, nil, nil },
|
||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
||||
}
|
||||
|
|
|
@ -46,30 +46,27 @@ const (
|
|||
// the previous value of the object to enable proper filtering in the
|
||||
// upper layers.
|
||||
type watchCacheEvent struct {
|
||||
Type watch.EventType
|
||||
Object runtime.Object
|
||||
ObjLabels labels.Set
|
||||
ObjFields fields.Set
|
||||
ObjUninitialized bool
|
||||
PrevObject runtime.Object
|
||||
PrevObjLabels labels.Set
|
||||
PrevObjFields fields.Set
|
||||
PrevObjUninitialized bool
|
||||
Key string
|
||||
ResourceVersion uint64
|
||||
Type watch.EventType
|
||||
Object runtime.Object
|
||||
ObjLabels labels.Set
|
||||
ObjFields fields.Set
|
||||
PrevObject runtime.Object
|
||||
PrevObjLabels labels.Set
|
||||
PrevObjFields fields.Set
|
||||
Key string
|
||||
ResourceVersion uint64
|
||||
}
|
||||
|
||||
// Computing a key of an object is generally non-trivial (it performs
|
||||
// e.g. validation underneath). Similarly computing object fields and
|
||||
// labels. To avoid computing them multiple times (to serve the event
|
||||
// in different List/Watch requests), in the underlying store we are
|
||||
// keeping structs (key, object, labels, fields, uninitialized).
|
||||
// keeping structs (key, object, labels, fields).
|
||||
type storeElement struct {
|
||||
Key string
|
||||
Object runtime.Object
|
||||
Labels labels.Set
|
||||
Fields fields.Set
|
||||
Uninitialized bool
|
||||
Key string
|
||||
Object runtime.Object
|
||||
Labels labels.Set
|
||||
Fields fields.Set
|
||||
}
|
||||
|
||||
func storeElementKey(obj interface{}) (string, error) {
|
||||
|
@ -107,7 +104,7 @@ type watchCache struct {
|
|||
keyFunc func(runtime.Object) (string, error)
|
||||
|
||||
// getAttrsFunc is used to get labels and fields of an object.
|
||||
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)
|
||||
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
|
||||
|
||||
// cache is used a cyclic buffer - its first element (with the smallest
|
||||
// resourceVersion) is defined by startIndex, its last element is defined
|
||||
|
@ -147,7 +144,7 @@ type watchCache struct {
|
|||
func newWatchCache(
|
||||
capacity int,
|
||||
keyFunc func(runtime.Object) (string, error),
|
||||
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error),
|
||||
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
|
||||
versioner storage.Versioner) *watchCache {
|
||||
wc := &watchCache{
|
||||
capacity: capacity,
|
||||
|
@ -220,19 +217,18 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
|||
return fmt.Errorf("couldn't compute key: %v", err)
|
||||
}
|
||||
elem := &storeElement{Key: key, Object: event.Object}
|
||||
elem.Labels, elem.Fields, elem.Uninitialized, err = w.getAttrsFunc(event.Object)
|
||||
elem.Labels, elem.Fields, err = w.getAttrsFunc(event.Object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
watchCacheEvent := &watchCacheEvent{
|
||||
Type: event.Type,
|
||||
Object: elem.Object,
|
||||
ObjLabels: elem.Labels,
|
||||
ObjFields: elem.Fields,
|
||||
ObjUninitialized: elem.Uninitialized,
|
||||
Key: key,
|
||||
ResourceVersion: resourceVersion,
|
||||
Type: event.Type,
|
||||
Object: elem.Object,
|
||||
ObjLabels: elem.Labels,
|
||||
ObjFields: elem.Fields,
|
||||
Key: key,
|
||||
ResourceVersion: resourceVersion,
|
||||
}
|
||||
|
||||
// TODO: We should consider moving this lock below after the watchCacheEvent
|
||||
|
@ -250,7 +246,6 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
|||
watchCacheEvent.PrevObject = previousElem.Object
|
||||
watchCacheEvent.PrevObjLabels = previousElem.Labels
|
||||
watchCacheEvent.PrevObjFields = previousElem.Fields
|
||||
watchCacheEvent.PrevObjUninitialized = previousElem.Uninitialized
|
||||
}
|
||||
|
||||
if w.onEvent != nil {
|
||||
|
@ -373,16 +368,15 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("couldn't compute key: %v", err)
|
||||
}
|
||||
objLabels, objFields, objUninitialized, err := w.getAttrsFunc(object)
|
||||
objLabels, objFields, err := w.getAttrsFunc(object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
toReplace = append(toReplace, &storeElement{
|
||||
Key: key,
|
||||
Object: object,
|
||||
Labels: objLabels,
|
||||
Fields: objFields,
|
||||
Uninitialized: objUninitialized,
|
||||
Key: key,
|
||||
Object: object,
|
||||
Labels: objLabels,
|
||||
Fields: objFields,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -451,18 +445,17 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
|
|||
if !ok {
|
||||
return nil, fmt.Errorf("not a storeElement: %v", elem)
|
||||
}
|
||||
objLabels, objFields, objUninitialized, err := w.getAttrsFunc(elem.Object)
|
||||
objLabels, objFields, err := w.getAttrsFunc(elem.Object)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result[i] = &watchCacheEvent{
|
||||
Type: watch.Added,
|
||||
Object: elem.Object,
|
||||
ObjLabels: objLabels,
|
||||
ObjFields: objFields,
|
||||
ObjUninitialized: objUninitialized,
|
||||
Key: elem.Key,
|
||||
ResourceVersion: w.resourceVersion,
|
||||
Type: watch.Added,
|
||||
Object: elem.Object,
|
||||
ObjLabels: objLabels,
|
||||
ObjFields: objFields,
|
||||
Key: elem.Key,
|
||||
ResourceVersion: w.resourceVersion,
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
|
|
|
@ -56,11 +56,10 @@ func makeTestPod(name string, resourceVersion uint64) *v1.Pod {
|
|||
|
||||
func makeTestStoreElement(pod *v1.Pod) *storeElement {
|
||||
return &storeElement{
|
||||
Key: "prefix/ns/" + pod.Name,
|
||||
Object: pod,
|
||||
Labels: labels.Set(pod.Labels),
|
||||
Fields: fields.Set{"spec.nodeName": pod.Spec.NodeName},
|
||||
Uninitialized: false,
|
||||
Key: "prefix/ns/" + pod.Name,
|
||||
Object: pod,
|
||||
Labels: labels.Set(pod.Labels),
|
||||
Fields: fields.Set{"spec.nodeName": pod.Spec.NodeName},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,12 +68,12 @@ func newTestWatchCache(capacity int) *watchCache {
|
|||
keyFunc := func(obj runtime.Object) (string, error) {
|
||||
return storage.NamespaceKeyFunc("prefix", obj)
|
||||
}
|
||||
getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
return nil, nil, false, fmt.Errorf("not a pod")
|
||||
return nil, nil, fmt.Errorf("not a pod")
|
||||
}
|
||||
return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, false, nil
|
||||
return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil
|
||||
}
|
||||
versioner := etcd.APIObjectVersioner{}
|
||||
wc := newWatchCache(capacity, keyFunc, getAttrsFunc, versioner)
|
||||
|
|
|
@ -314,9 +314,9 @@ func TestGetToList(t *testing.T) {
|
|||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
},
|
||||
expectedOut: nil,
|
||||
|
@ -819,9 +819,9 @@ func TestList(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
|
@ -1124,9 +1124,9 @@ func TestListContinuation(t *testing.T) {
|
|||
Continue: continueValue,
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -1233,9 +1233,9 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||
Continue: continueValue,
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,9 +72,9 @@ func testWatch(t *testing.T, recursive bool) {
|
|||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.ParseSelectorOrDie("metadata.name=bar"),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
},
|
||||
}, { // update
|
||||
|
@ -87,9 +87,9 @@ func testWatch(t *testing.T, recursive bool) {
|
|||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.ParseSelectorOrDie("metadata.name!=bar"),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
},
|
||||
}}
|
||||
|
|
|
@ -86,8 +86,6 @@ type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
|
|||
var Everything = SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
// TODO: split this into a new top level constant?
|
||||
IncludeUninitialized: true,
|
||||
}
|
||||
|
||||
// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
|
||||
|
|
|
@ -25,59 +25,58 @@ import (
|
|||
|
||||
// AttrFunc returns label and field sets and the uninitialized flag for List or Watch to match.
|
||||
// In any failure to parse given object, it returns error.
|
||||
type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, bool, error)
|
||||
type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, error)
|
||||
|
||||
// FieldMutationFunc allows the mutation of the field selection fields. It is mutating to
|
||||
// avoid the extra allocation on this common path
|
||||
type FieldMutationFunc func(obj runtime.Object, fieldSet fields.Set) error
|
||||
|
||||
func DefaultClusterScopedAttr(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
func DefaultClusterScopedAttr(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return nil, nil, false, err
|
||||
return nil, nil, err
|
||||
}
|
||||
fieldSet := fields.Set{
|
||||
"metadata.name": metadata.GetName(),
|
||||
}
|
||||
|
||||
return labels.Set(metadata.GetLabels()), fieldSet, metadata.GetInitializers() != nil, nil
|
||||
return labels.Set(metadata.GetLabels()), fieldSet, nil
|
||||
}
|
||||
|
||||
func DefaultNamespaceScopedAttr(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
func DefaultNamespaceScopedAttr(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return nil, nil, false, err
|
||||
return nil, nil, err
|
||||
}
|
||||
fieldSet := fields.Set{
|
||||
"metadata.name": metadata.GetName(),
|
||||
"metadata.namespace": metadata.GetNamespace(),
|
||||
}
|
||||
|
||||
return labels.Set(metadata.GetLabels()), fieldSet, metadata.GetInitializers() != nil, nil
|
||||
return labels.Set(metadata.GetLabels()), fieldSet, nil
|
||||
}
|
||||
|
||||
func (f AttrFunc) WithFieldMutation(fieldMutator FieldMutationFunc) AttrFunc {
|
||||
return func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
labelSet, fieldSet, initialized, err := f(obj)
|
||||
return func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
labelSet, fieldSet, err := f(obj)
|
||||
if err != nil {
|
||||
return nil, nil, false, err
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := fieldMutator(obj, fieldSet); err != nil {
|
||||
return nil, nil, false, err
|
||||
return nil, nil, err
|
||||
}
|
||||
return labelSet, fieldSet, initialized, nil
|
||||
return labelSet, fieldSet, nil
|
||||
}
|
||||
}
|
||||
|
||||
// SelectionPredicate is used to represent the way to select objects from api storage.
|
||||
type SelectionPredicate struct {
|
||||
Label labels.Selector
|
||||
Field fields.Selector
|
||||
IncludeUninitialized bool
|
||||
GetAttrs AttrFunc
|
||||
IndexFields []string
|
||||
Limit int64
|
||||
Continue string
|
||||
Label labels.Selector
|
||||
Field fields.Selector
|
||||
GetAttrs AttrFunc
|
||||
IndexFields []string
|
||||
Limit int64
|
||||
Continue string
|
||||
}
|
||||
|
||||
// Matches returns true if the given object's labels and fields (as
|
||||
|
@ -87,13 +86,10 @@ func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
|
|||
if s.Empty() {
|
||||
return true, nil
|
||||
}
|
||||
labels, fields, uninitialized, err := s.GetAttrs(obj)
|
||||
labels, fields, err := s.GetAttrs(obj)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !s.IncludeUninitialized && uninitialized {
|
||||
return false, nil
|
||||
}
|
||||
matched := s.Label.Matches(labels)
|
||||
if matched && s.Field != nil {
|
||||
matched = matched && s.Field.Matches(fields)
|
||||
|
@ -103,10 +99,7 @@ func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
|
|||
|
||||
// MatchesObjectAttributes returns true if the given labels and fields
|
||||
// match s.Label and s.Field.
|
||||
func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set, uninitialized bool) bool {
|
||||
if !s.IncludeUninitialized && uninitialized {
|
||||
return false
|
||||
}
|
||||
func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set) bool {
|
||||
if s.Label.Empty() && s.Field.Empty() {
|
||||
return true
|
||||
}
|
||||
|
@ -146,5 +139,5 @@ func (s *SelectionPredicate) MatcherIndex() []MatchValue {
|
|||
|
||||
// Empty returns true if the predicate performs no filtering.
|
||||
func (s *SelectionPredicate) Empty() bool {
|
||||
return s.Label.Empty() && s.Field.Empty() && s.IncludeUninitialized
|
||||
return s.Label.Empty() && s.Field.Empty()
|
||||
}
|
||||
|
|
|
@ -48,7 +48,6 @@ func TestSelectionPredicate(t *testing.T) {
|
|||
labelSelector, fieldSelector string
|
||||
labels labels.Set
|
||||
fields fields.Set
|
||||
uninitialized bool
|
||||
err error
|
||||
shouldMatch bool
|
||||
matchSingleKey string
|
||||
|
@ -81,14 +80,6 @@ func TestSelectionPredicate(t *testing.T) {
|
|||
shouldMatch: true,
|
||||
matchSingleKey: "12345",
|
||||
},
|
||||
"E": {
|
||||
fieldSelector: "metadata.name=12345",
|
||||
labels: labels.Set{},
|
||||
fields: fields.Set{"metadata.name": "12345"},
|
||||
uninitialized: true,
|
||||
shouldMatch: false,
|
||||
matchSingleKey: "12345",
|
||||
},
|
||||
"error": {
|
||||
labelSelector: "name=foo",
|
||||
fieldSelector: "uid=12345",
|
||||
|
@ -109,8 +100,8 @@ func TestSelectionPredicate(t *testing.T) {
|
|||
sp := &SelectionPredicate{
|
||||
Label: parsedLabel,
|
||||
Field: parsedField,
|
||||
GetAttrs: func(runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) {
|
||||
return item.labels, item.fields, item.uninitialized, item.err
|
||||
GetAttrs: func(runtime.Object) (label labels.Set, field fields.Set, err error) {
|
||||
return item.labels, item.fields, item.err
|
||||
},
|
||||
}
|
||||
got, err := sp.Matches(&Ignored{})
|
||||
|
|
|
@ -61,12 +61,12 @@ func init() {
|
|||
}
|
||||
|
||||
// GetAttrs returns labels and fields of a given object for filtering purposes.
|
||||
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod, ok := obj.(*example.Pod)
|
||||
if !ok {
|
||||
return nil, nil, false, fmt.Errorf("not a pod")
|
||||
return nil, nil, fmt.Errorf("not a pod")
|
||||
}
|
||||
return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), pod.Initializers != nil, nil
|
||||
return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), nil
|
||||
}
|
||||
|
||||
// PodToSelectableFields returns a field set that represents the object
|
||||
|
@ -194,9 +194,9 @@ func TestGetToList(t *testing.T) {
|
|||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
},
|
||||
expectedOut: nil,
|
||||
|
@ -520,12 +520,12 @@ func TestFiltering(t *testing.T) {
|
|||
pred := storage.SelectionPredicate{
|
||||
Label: labels.SelectorFromSet(labels.Set{"filter": "foo"}),
|
||||
Field: fields.Everything(),
|
||||
GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) {
|
||||
GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) {
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
return labels.Set(metadata.GetLabels()), nil, metadata.GetInitializers() != nil, nil
|
||||
return labels.Set(metadata.GetLabels()), nil, nil
|
||||
},
|
||||
}
|
||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, pred)
|
||||
|
|
Loading…
Reference in New Issue