diff --git a/pkg/storage/etcd3/errors.go b/pkg/storage/etcd3/errors.go index d71c9917d..7dd91d949 100644 --- a/pkg/storage/etcd3/errors.go +++ b/pkg/storage/etcd3/errors.go @@ -17,7 +17,11 @@ limitations under the License. package etcd3 import ( + goerrors "errors" + "net/http" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/storage" etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" @@ -29,6 +33,19 @@ func interpretWatchError(err error) error { case err == etcdrpc.ErrCompacted: return errors.NewResourceExpired("The resourceVersion for the provided watch is too old.") } + + var corruptobjDeletedErr *corruptObjectDeletedError + if goerrors.As(err, &corruptobjDeletedErr) { + return &errors.StatusError{ + ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusInternalServerError, + Reason: metav1.StatusReasonStoreReadError, + Message: corruptobjDeletedErr.Error(), + }, + } + } + return err } diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index f4de001e9..1484ad51d 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -194,6 +194,28 @@ func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetes } } +type corruptedTransformer struct { + value.Transformer +} + +func (f *corruptedTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, stale bool, err error) { + return nil, true, &corruptObjectError{err: fmt.Errorf("bits flipped"), errType: untransformable} +} + +type storeWithCorruptedTransformer struct { + *store +} + +func (s *storeWithCorruptedTransformer) CorruptTransformer() func() { + ct := &corruptedTransformer{Transformer: s.transformer} + s.transformer = ct + s.watcher.transformer = ct + return func() { + s.transformer = ct.Transformer + s.watcher.transformer = ct.Transformer + } +} + func TestGuaranteedUpdate(t *testing.T) { ctx, store, client := testSetup(t) storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(client.Client, store.codec)) diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 536f2e1c0..abb51b07f 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -686,18 +686,40 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) { data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key)) if err != nil { - return nil, nil, err + return nil, nil, wc.watcher.transformIfCorruptObjectError(e, err) } // Note that this sends the *old* object with the etcd revision for the time at // which it gets deleted. oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev) if err != nil { - return nil, nil, err + return nil, nil, wc.watcher.transformIfCorruptObjectError(e, err) } } return curObj, oldObj, nil } +type corruptObjectDeletedError struct { + err error +} + +func (e *corruptObjectDeletedError) Error() string { + return fmt.Sprintf("saw a DELETED event, but object data is corrupt - %v", e.err) +} +func (e *corruptObjectDeletedError) Unwrap() error { return e.err } + +func (w *watcher) transformIfCorruptObjectError(e *event, err error) error { + var corruptObjErr *corruptObjectError + if !e.isDeleted || !errors.As(err, &corruptObjErr) { + return err + } + + // if we are here it means we received a DELETED event but the object + // associated with it is corrupt because we failed to transform or + // decode the data associated with the object. + // wrap the original error so we can send a proper watch Error event. + return &corruptObjectDeletedError{err: corruptObjErr} +} + func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (_ runtime.Object, err error) { obj, err := runtime.Decode(codec, []byte(data)) if err != nil { diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 236fa9aca..536b4026f 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -112,6 +112,12 @@ func TestProgressNotify(t *testing.T) { storagetesting.RunOptionalTestProgressNotify(ctx, t, store) } +func TestWatchWithUnsafeDelete(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AllowUnsafeMalformedObjectDeletion, true) + ctx, store, _ := testSetup(t) + storagetesting.RunTestWatchWithUnsafeDelete(ctx, t, &storeWithCorruptedTransformer{store}) +} + // TestWatchDispatchBookmarkEvents makes sure that // setting allowWatchBookmarks query param against // etcd implementation doesn't have any effect. diff --git a/pkg/storage/testing/store_tests.go b/pkg/storage/testing/store_tests.go index 427bcbab5..c70d774c0 100644 --- a/pkg/storage/testing/store_tests.go +++ b/pkg/storage/testing/store_tests.go @@ -2147,6 +2147,11 @@ type InterfaceWithPrefixTransformer interface { UpdatePrefixTransformer(PrefixTransformerModifier) func() } +type InterfaceWithCorruptTransformer interface { + storage.Interface + CorruptTransformer() func() +} + func RunTestListResourceVersionMatch(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) { nextPod := func(index uint32) (string, *example.Pod) { obj := &example.Pod{ diff --git a/pkg/storage/testing/watcher_tests.go b/pkg/storage/testing/watcher_tests.go index e29d7094f..e8363f0fb 100644 --- a/pkg/storage/testing/watcher_tests.go +++ b/pkg/storage/testing/watcher_tests.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "strings" "sync" "testing" "time" @@ -407,6 +408,61 @@ func RunTestWatchError(ctx context.Context, t *testing.T, store InterfaceWithPre testCheckEventType(t, w, watch.Error) } +func RunTestWatchWithUnsafeDelete(ctx context.Context, t *testing.T, store InterfaceWithCorruptTransformer) { + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}} + key := computePodKey(obj) + + out := &example.Pod{} + if err := store.Create(ctx, key, obj, out, 0); err != nil { + t.Fatalf("failed to create object in the store: %v", err) + } + + // Compute the initial resource version from which we can start watching later. + list := &example.PodList{} + storageOpts := storage.ListOptions{ + ResourceVersion: "0", + Predicate: storage.Everything, + Recursive: true, + } + if err := store.GetList(ctx, "/pods", storageOpts, list); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // Now trigger watch error by injecting failing transformer. + revertTransformer := store.CorruptTransformer() + defer revertTransformer() + + w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: list.ResourceVersion, Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + + // normal deletetion should fail + if err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err == nil { + t.Fatalf("Expected normal Delete to fail") + } + if err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{IgnoreStoreReadError: true}); err != nil { + t.Fatalf("Expected unsafe Delete to succeed, but got: %v", err) + } + + testCheckResultFunc(t, w, func(got watch.Event) { + if want, got := watch.Error, got.Type; want != got { + t.Errorf("Expected event type: %q, but got: %q", want, got) + } + switch v := got.Object.(type) { + case *metav1.Status: + if want, got := metav1.StatusReasonStoreReadError, v.Reason; want != got { + t.Errorf("Expected reason: %q, but got: %q", want, got) + } + if want := "saw a DELETED event, but object data is corrupt"; !strings.Contains(v.Message, want) { + t.Errorf("Expected Message to contain: %q, but got: %q", want, v.Message) + } + default: + t.Errorf("expected an metav1 Status object, but got: %v", got.Object) + } + }) +} + func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage.Interface) { canceledCtx, cancel := context.WithCancel(ctx) cancel()