diff --git a/pkg/registry/generic/registry/store.go b/pkg/registry/generic/registry/store.go index 722a31227..d4748bc2e 100644 --- a/pkg/registry/generic/registry/store.go +++ b/pkg/registry/generic/registry/store.go @@ -54,6 +54,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ) // FinishFunc is a function returned by Begin hooks to complete an operation. @@ -1129,94 +1130,112 @@ func (e *Store) updateForGracefulDeletionAndFinalizers(ctx context.Context, name // Delete removes the item from storage. // options can be mutated by rest.BeforeDelete due to a graceful deletion strategy. func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { - key, err := e.KeyFunc(ctx, name) - if err != nil { - return nil, false, err - } - obj := e.NewFunc() - qualifiedResource := e.qualifiedResourceFromContext(ctx) - if err = e.Storage.Get(ctx, key, storage.GetOptions{}, obj); err != nil { - return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name) - } + for { + key, err := e.KeyFunc(ctx, name) + if err != nil { + return nil, false, err + } - // support older consumers of delete by treating "nil" as delete immediately - if options == nil { - options = metav1.NewDeleteOptions(0) - } - var preconditions storage.Preconditions - if options.Preconditions != nil { - preconditions.UID = options.Preconditions.UID - preconditions.ResourceVersion = options.Preconditions.ResourceVersion - } - graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options) - if err != nil { - return nil, false, err - } - // this means finalizers cannot be updated via DeleteOptions if a deletion is already pending - if pendingGraceful { - out, err := e.finalizeDelete(ctx, obj, false, options) - return out, false, err - } - // check if obj has pending finalizers - accessor, err := meta.Accessor(obj) - if err != nil { - return nil, false, apierrors.NewInternalError(err) - } - pendingFinalizers := len(accessor.GetFinalizers()) != 0 - var ignoreNotFound bool - var deleteImmediately bool = true - var lastExisting, out runtime.Object + obj := e.NewFunc() + qualifiedResource := e.qualifiedResourceFromContext(ctx) + if err = e.Storage.Get(ctx, key, storage.GetOptions{}, obj); err != nil { + return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name) + } - // Handle combinations of graceful deletion and finalization by issuing - // the correct updates. - shouldUpdateFinalizers, _ := deletionFinalizersForGarbageCollection(ctx, e, accessor, options) - // TODO: remove the check, because we support no-op updates now. - if graceful || pendingFinalizers || shouldUpdateFinalizers { - err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletionAndFinalizers(ctx, name, key, options, preconditions, deleteValidation, obj) - // Update the preconditions.ResourceVersion if set since we updated the object. - if err == nil && deleteImmediately && preconditions.ResourceVersion != nil { - accessor, err = meta.Accessor(out) - if err != nil { - return out, false, apierrors.NewInternalError(err) + // support older consumers of delete by treating "nil" as delete immediately + if options == nil { + options = metav1.NewDeleteOptions(0) + } + var preconditions storage.Preconditions + if options.Preconditions != nil { + preconditions.UID = options.Preconditions.UID + preconditions.ResourceVersion = options.Preconditions.ResourceVersion + } + graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options) + if err != nil { + return nil, false, err + } + // this means finalizers cannot be updated via DeleteOptions if a deletion is already pending + if pendingGraceful { + out, err := e.finalizeDelete(ctx, obj, false, options) + return out, false, err + } + // check if obj has pending finalizers + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, false, apierrors.NewInternalError(err) + } + pendingFinalizers := len(accessor.GetFinalizers()) != 0 + var ignoreNotFound bool + var deleteImmediately bool = true + var lastExisting, out runtime.Object + + // Handle combinations of graceful deletion and finalization by issuing + // the correct updates. + shouldUpdateFinalizers, _ := deletionFinalizersForGarbageCollection(ctx, e, accessor, options) + // TODO: remove the check, because we support no-op updates now. + if graceful || pendingFinalizers || shouldUpdateFinalizers { + err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletionAndFinalizers(ctx, name, key, options, preconditions, deleteValidation, obj) + // Update the preconditions.ResourceVersion if set since we updated the object. + if err == nil && deleteImmediately && preconditions.ResourceVersion != nil { + accessor, err = meta.Accessor(out) + if err != nil { + return out, false, apierrors.NewInternalError(err) + } + resourceVersion := accessor.GetResourceVersion() + preconditions.ResourceVersion = &resourceVersion } - resourceVersion := accessor.GetResourceVersion() - preconditions.ResourceVersion = &resourceVersion } - } - // !deleteImmediately covers all cases where err != nil. We keep both to be future-proof. - if !deleteImmediately || err != nil { - return out, false, err - } - - // Going further in this function is not useful when we are - // performing a dry-run request. Worse, it will actually - // override "out" with the version of the object in database - // that doesn't have the finalizer and deletiontimestamp set - // (because the update above was dry-run too). If we already - // have that version available, let's just return it now, - // otherwise, we can call dry-run delete that will get us the - // latest version of the object. - if dryrun.IsDryRun(options.DryRun) && out != nil { - return out, true, nil - } - - // delete immediately, or no graceful deletion supported - klog.V(6).InfoS("Going to delete object from registry", "object", klog.KRef(genericapirequest.NamespaceValue(ctx), name)) - out = e.NewFunc() - if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun), nil, storage.DeleteOptions{}); err != nil { - // Please refer to the place where we set ignoreNotFound for the reason - // why we ignore the NotFound error . - if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil { - // The lastExisting object may not be the last state of the object - // before its deletion, but it's the best approximation. - out, err := e.finalizeDelete(ctx, lastExisting, true, options) - return out, true, err + // !deleteImmediately covers all cases where err != nil. We keep both to be future-proof. + if !deleteImmediately || err != nil { + return out, false, err } - return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name) + + // Going further in this function is not useful when we are + // performing a dry-run request. Worse, it will actually + // override "out" with the version of the object in database + // that doesn't have the finalizer and deletiontimestamp set + // (because the update above was dry-run too). If we already + // have that version available, let's just return it now, + // otherwise, we can call dry-run delete that will get us the + // latest version of the object. + if dryrun.IsDryRun(options.DryRun) && out != nil { + return out, true, nil + } + + retryOnRVConflict := false + if preconditions.ResourceVersion == nil { + // We have no RV precondition, and could be racing with addition or deletion of finalizers. + // Add an internal RV precondition based on the internal object we looked up, + // and retry internally if we get a conflict error on that field as a result. + retryOnRVConflict = true + preconditions.ResourceVersion = ptr.To(accessor.GetResourceVersion()) + } + + // delete immediately, or no graceful deletion supported + klog.V(6).InfoS("Going to delete object from registry", "object", klog.KRef(genericapirequest.NamespaceValue(ctx), name)) + out = e.NewFunc() + if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun), nil, storage.DeleteOptions{}); err != nil { + // Please refer to the place where we set ignoreNotFound for the reason + // why we ignore the NotFound error . + if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil { + // The lastExisting object may not be the last state of the object + // before its deletion, but it's the best approximation. + out, err := e.finalizeDelete(ctx, lastExisting, true, options) + return out, true, err + } + + if retryOnRVConflict && storage.IsPreconditionErrorForField(err, storage.PreconditionResourceVersion) { + // retry from the top if the delete failed due to a resource version precondition we added internally + continue + } + + return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name) + } + out, err = e.finalizeDelete(ctx, out, true, options) + return out, true, err } - out, err = e.finalizeDelete(ctx, out, true, options) - return out, true, err } // DeleteReturnsDeletedObject implements the rest.MayReturnFullObjectDeleter interface diff --git a/pkg/registry/generic/registry/store_test.go b/pkg/registry/generic/registry/store_test.go index cc50afede..fddfcd12a 100644 --- a/pkg/registry/generic/registry/store_test.go +++ b/pkg/registry/generic/registry/store_test.go @@ -1635,6 +1635,95 @@ func TestNonGracefulStoreHandleFinalizers(t *testing.T) { } } +func TestFinalizerRace(t *testing.T) { + var ( + originalStorage storage.Interface + wrappedStorage *deleteInterceptingStorage + ) + wrapStorage := func(s storage.Interface) storage.Interface { + originalStorage = s + wrappedStorage = &deleteInterceptingStorage{Interface: originalStorage} + return wrappedStorage + } + testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") + destroyFunc, registry := newTestGenericStoreRegistryWithOptions(t, scheme, testRegistryOptions{hasCacheEnabled: false, wrapStorage: wrapStorage}) + defer destroyFunc() + + registry.EnableGarbageCollection = true + registry.ReturnDeletedObject = true + + // create pod + pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Spec: example.PodSpec{NodeName: "machine"}} + _, err := registry.Create(testContext, pod, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // intercept delete requests to mutate the object just before delete requests are processed + beforeDeleteCalled := 0 + wrappedStorage.beforeDelete = func(ctx context.Context, key string) { + switch beforeDeleteCalled { + case 0: + // simulate concurrent update that didn't modify finalizers. + // should trigger an internal delete re-attempt. + err := originalStorage.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { + input.(*example.Pod).Labels = map[string]string{"test": "true"} + return input, nil, nil + }, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + case 1: + // simulate concurrent update that added a finalizer. + // should not trigger an internal re-attempt because there's now a finalizer. + err := originalStorage.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { + input.(*example.Pod).Finalizers = []string{"example.com/finalizer"} + return input, nil, nil + }, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + default: + // unexpected + t.Fatalf("unexpected 3rd call to beforeDelete") + } + beforeDeleteCalled++ + } + + result, wasDeleted, err := registry.Delete(testContext, pod.Name, rest.ValidateAllObjectFunc, nil) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if wasDeleted { + t.Errorf("unexpected, pod %s should not have been deleted immediately", pod.Name) + } + if beforeDeleteCalled != 2 { + t.Errorf("expected beforeDelete called 2 times, got %d", beforeDeleteCalled) + } + if result.(*example.Pod).DeletionTimestamp == nil { + t.Errorf("expected deletionTimestamp, got nil") + } + if len(result.(*example.Pod).Finalizers) != 1 { + t.Errorf("expected finalizer, got none") + } + if result.(*example.Pod).Labels["test"] != "true" { + t.Errorf("expected test=true label") + } +} + +type deleteInterceptingStorage struct { + storage.Interface + + beforeDelete func(ctx context.Context, key string) +} + +func (t *deleteInterceptingStorage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error { + if t.beforeDelete != nil { + t.beforeDelete(ctx, key) + } + return t.Interface.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject, opts) +} + func TestStoreDeleteWithOrphanDependents(t *testing.T) { initialGeneration := int64(1) podWithOrphanFinalizer := func(name string) *example.Pod { @@ -2427,6 +2516,16 @@ func TestStoreWatch(t *testing.T) { } func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheEnabled bool) (factory.DestroyFunc, *Store) { + return newTestGenericStoreRegistryWithOptions(t, scheme, testRegistryOptions{hasCacheEnabled: hasCacheEnabled}) +} + +type testRegistryOptions struct { + hasCacheEnabled bool + + wrapStorage func(storage.Interface) storage.Interface +} + +func newTestGenericStoreRegistryWithOptions(t *testing.T, scheme *runtime.Scheme, opts testRegistryOptions) (factory.DestroyFunc, *Store) { podPrefix := "/pods" server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) strategy := &testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true} @@ -2439,11 +2538,15 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE if err != nil { t.Fatalf("Error creating storage: %v", err) } + if opts.wrapStorage != nil { + // allow shimming storage to intercept calls + s = opts.wrapStorage(s) + } destroyFunc := func() { dFunc() server.Terminate(t) } - if hasCacheEnabled { + if opts.hasCacheEnabled { config := cacherstorage.Config{ Storage: s, Versioner: storage.APIObjectVersioner{}, diff --git a/pkg/storage/errors.go b/pkg/storage/errors.go index 5bae365a8..e53e19e3f 100644 --- a/pkg/storage/errors.go +++ b/pkg/storage/errors.go @@ -98,6 +98,34 @@ func NewInvalidObjError(key, msg string) *StorageError { } } +// NewPreconditionError returns a storage error with an underlying cause of a precondition failure on the given field. +func NewPreconditionError(key, field, preconditionValue, objectValue string) *StorageError { + return &StorageError{ + Code: ErrCodeInvalidObj, + Key: key, + err: &preconditionError{field: field, preconditionValue: preconditionValue, objectValue: objectValue}, + } +} + +type preconditionError struct { + field string + preconditionValue string + objectValue string +} + +func (p *preconditionError) Error() string { + return fmt.Sprintf( + "Precondition failed: %s in precondition: %v, %s in object meta: %v", + p.field, p.preconditionValue, + p.field, p.objectValue) +} + +// IsPreconditionErrorForField returns true if the given error is a precondition error on the given field. +func IsPreconditionErrorForField(err error, field string) bool { + var p *preconditionError + return errors.As(err, &p) && p != nil && p.field == field +} + // NewCorruptObjError returns a new StorageError, it represents a corrupt object: // a) object data retrieved from the storage failed to transform with the given err. // b) the given object failed to decode with the given err diff --git a/pkg/storage/errors_test.go b/pkg/storage/errors_test.go new file mode 100644 index 000000000..a47228fc6 --- /dev/null +++ b/pkg/storage/errors_test.go @@ -0,0 +1,88 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "fmt" + "testing" +) + +func TestIsPreconditionErrorForField(t *testing.T) { + type args struct { + err error + field string + } + tests := []struct { + name string + args args + wantInvalidObjError bool + wantPreconditionErrorForField bool + }{ + { + name: "nil", + args: args{err: nil, field: "otherfield"}, + wantInvalidObjError: false, + wantPreconditionErrorForField: false, + }, + { + name: "non-storage error", + args: args{err: fmt.Errorf("test"), field: "otherfield"}, + wantInvalidObjError: false, + wantPreconditionErrorForField: false, + }, + { + name: "non-precondition storage error", + args: args{err: &StorageError{}, field: "otherfield"}, + wantInvalidObjError: false, + wantPreconditionErrorForField: false, + }, + { + name: "non-precondition storage error", + args: args{err: &StorageError{err: fmt.Errorf("test")}, field: "otherfield"}, + wantInvalidObjError: false, + wantPreconditionErrorForField: false, + }, + { + name: "invalid obj error", + args: args{err: NewInvalidObjError("mykey", "myerr"), field: "otherfield"}, + wantInvalidObjError: true, + wantPreconditionErrorForField: false, + }, + { + name: "precondition storage error other field", + args: args{err: NewPreconditionError("mykey", "myfield", "preconditionValue", "objectValue"), field: "otherfield"}, + wantInvalidObjError: true, + wantPreconditionErrorForField: false, + }, + { + name: "precondition storage error field", + args: args{err: NewPreconditionError("mykey", "myfield", "preconditionValue", "objectValue"), field: "myfield"}, + wantInvalidObjError: true, + wantPreconditionErrorForField: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsInvalidObj(tt.args.err); got != tt.wantInvalidObjError { + t.Errorf("IsInvalidObj() = %v, want %v", got, tt.wantInvalidObjError) + } + if got := IsPreconditionErrorForField(tt.args.err, tt.args.field); got != tt.wantPreconditionErrorForField { + t.Errorf("IsPreconditionErrorForField() = %v, want %v", got, tt.wantPreconditionErrorForField) + } + }) + } +} diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index a07dda562..960d32285 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -147,22 +147,19 @@ func (p *Preconditions) Check(key string, obj runtime.Object) error { err)) } if p.UID != nil && *p.UID != objMeta.GetUID() { - err := fmt.Sprintf( - "Precondition failed: UID in precondition: %v, UID in object meta: %v", - *p.UID, - objMeta.GetUID()) - return NewInvalidObjError(key, err) + return NewPreconditionError(key, PreconditionUID, string(*p.UID), string(objMeta.GetUID())) } if p.ResourceVersion != nil && *p.ResourceVersion != objMeta.GetResourceVersion() { - err := fmt.Sprintf( - "Precondition failed: ResourceVersion in precondition: %v, ResourceVersion in object meta: %v", - *p.ResourceVersion, - objMeta.GetResourceVersion()) - return NewInvalidObjError(key, err) + return NewPreconditionError(key, PreconditionResourceVersion, *p.ResourceVersion, objMeta.GetResourceVersion()) } return nil } +const ( + PreconditionUID = "UID" + PreconditionResourceVersion = "ResourceVersion" +) + // Interface offers a common interface for object marshaling/unmarshaling operations and // hides all the storage-related operations behind it. type Interface interface {