From 1b75e5416d2da34a257bd3356e2757f548bf6e16 Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Tue, 4 Dec 2018 16:55:59 -0800 Subject: [PATCH] Use same 'minimum resource version' semantics both when watch cache is enabled and disabled Kubernetes-commit: 1406f7adf20a9ceaf5b8270ec9a61dd9e450ae8a --- pkg/storage/cacher/watch_cache.go | 8 +- pkg/storage/cacher/watch_cache_test.go | 7 +- pkg/storage/errors.go | 30 ++++++ pkg/storage/etcd3/store.go | 40 +++++--- pkg/storage/etcd3/store_test.go | 135 +++++++++++++++++++++---- pkg/storage/tests/cacher_test.go | 7 +- 6 files changed, 192 insertions(+), 35 deletions(-) diff --git a/pkg/storage/cacher/watch_cache.go b/pkg/storage/cacher/watch_cache.go index 709e352f5..65c6b31f8 100644 --- a/pkg/storage/cacher/watch_cache.go +++ b/pkg/storage/cacher/watch_cache.go @@ -40,6 +40,10 @@ const ( // before terminating request and returning Timeout error with retry // after suggestion. blockTimeout = 3 * time.Second + + // resourceVersionTooHighRetrySeconds is the seconds before a operation should be retried by the client + // after receiving a 'too high resource version' error. + resourceVersionTooHighRetrySeconds = 1 ) // watchCacheEvent is a single "watch event" that is send to users of @@ -345,8 +349,8 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utilt } for w.resourceVersion < resourceVersion { if w.clock.Since(startTime) >= blockTimeout { - // Timeout with retry after 1 second. - return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1) + // Request that the client retry after 'resourceVersionTooHighRetrySeconds' seconds. + return storage.NewTooLargeResourceVersionError(resourceVersion, w.resourceVersion, resourceVersionTooHighRetrySeconds) } w.cond.Wait() } diff --git a/pkg/storage/cacher/watch_cache_test.go b/pkg/storage/cacher/watch_cache_test.go index 9883def73..860fba0b8 100644 --- a/pkg/storage/cacher/watch_cache_test.go +++ b/pkg/storage/cacher/watch_cache_test.go @@ -380,8 +380,11 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) { }() _, _, err := store.WaitUntilFreshAndList(5, nil) - if err == nil { - t.Fatalf("unexpected lack of timeout error") + if !errors.IsTimeout(err) { + t.Errorf("expected timeout error but got: %v", err) + } + if !storage.IsTooLargeResourceVersion(err) { + t.Errorf("expected 'Too large resource version' cause in error but got: %v", err) } } diff --git a/pkg/storage/errors.go b/pkg/storage/errors.go index f73d47aed..3acee4598 100644 --- a/pkg/storage/errors.go +++ b/pkg/storage/errors.go @@ -19,6 +19,8 @@ package storage import ( "fmt" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/validation/field" ) @@ -168,3 +170,31 @@ func NewInternalError(reason string) InternalError { func NewInternalErrorf(format string, a ...interface{}) InternalError { return InternalError{fmt.Sprintf(format, a...)} } + +var tooLargeResourceVersionCauseMsg = "Too large resource version" + +// NewTooLargeResourceVersionError returns a timeout error with the given retrySeconds for a request for +// a minimum resource version that is larger than the largest currently available resource version for a requested resource. +func NewTooLargeResourceVersionError(minimumResourceVersion, currentRevision uint64, retrySeconds int) error { + err := errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %d, current: %d", minimumResourceVersion, currentRevision), retrySeconds) + err.ErrStatus.Details.Causes = []metav1.StatusCause{{Message: tooLargeResourceVersionCauseMsg}} + return err +} + +// IsTooLargeResourceVersion returns true if the error is a TooLargeResourceVersion error. +func IsTooLargeResourceVersion(err error) bool { + if !errors.IsTimeout(err) { + return false + } + switch t := err.(type) { + case errors.APIStatus: + if d := t.Status().Details; d != nil { + for _, cause := range d.Causes { + if cause.Message == tooLargeResourceVersionCauseMsg { + return true + } + } + } + } + return false +} diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index f065872b5..7a99c2a5c 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -29,7 +29,6 @@ import ( "time" "github.com/coreos/etcd/clientv3" - "k8s.io/klog" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -41,6 +40,7 @@ import ( "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/value" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/klog" utiltrace "k8s.io/utils/trace" ) @@ -119,6 +119,9 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out if err != nil { return err } + if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { + return err + } if len(getResp.Kvs) == 0 { if ignoreNotFound { @@ -398,6 +401,9 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin if err != nil { return err } + if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { + return err + } if len(getResp.Kvs) > 0 { data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key)) @@ -559,17 +565,6 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor options = append(options, clientv3.WithRange(rangeEnd)) default: - if len(resourceVersion) > 0 { - fromRV, err := s.versioner.ParseResourceVersion(resourceVersion) - if err != nil { - return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) - } - if fromRV > 0 { - options = append(options, clientv3.WithRev(int64(fromRV))) - } - returnedRV = int64(fromRV) - } - options = append(options, clientv3.WithPrefix()) } @@ -584,6 +579,9 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor if err != nil { return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) } + if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { + return err + } hasMore = getResp.More if len(getResp.Kvs) == 0 && getResp.More { @@ -798,6 +796,24 @@ func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, er return []clientv3.OpOption{clientv3.WithLease(id)}, nil } +// ensureMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is +// greater than the most recent actualRevision available from storage. +func (s *store) ensureMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error { + if minimumResourceVersion == "" { + return nil + } + minimumRV, err := s.versioner.ParseResourceVersion(minimumResourceVersion) + if err != nil { + return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) + } + // Enforce the storage.Interface guarantee that the resource version of the returned data + // "will be at least 'resourceVersion'". + if minimumRV > actualRevision { + return storage.NewTooLargeResourceVersionError(minimumRV, actualRevision, 0) + } + return nil +} + // decode decodes value of bytes into object. It will also set the object resource version to rev. // On success, objPtr would be set to the object. func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error { diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index 85aa3b3be..d6e5cfe0d 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -183,44 +183,87 @@ func TestCreateWithKeyExist(t *testing.T) { func TestGet(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) + // create an object to test key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + // create an additional object to increment the resource version for pods above the resource version of the foo object + lastUpdatedObj := &example.Pod{} + if err := store.Create(ctx, "bar", &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, lastUpdatedObj, 0); err != nil { + t.Fatalf("Set failed: %v", err) + } + + currentRV, _ := strconv.Atoi(storedObj.ResourceVersion) + lastUpdatedCurrentRV, _ := strconv.Atoi(lastUpdatedObj.ResourceVersion) tests := []struct { + name string key string ignoreNotFound bool expectNotFoundErr bool + expectRVTooLarge bool expectedOut *example.Pod + rv string }{{ // test get on existing item + name: "get existing", key: key, ignoreNotFound: false, expectNotFoundErr: false, expectedOut: storedObj, + }, { // test get on existing item with minimum resource version set to 0 + name: "resource version 0", + key: key, + expectedOut: storedObj, + rv: "0", + }, { // test get on existing item with minimum resource version set to current resource version of the object + name: "current object resource version", + key: key, + expectedOut: storedObj, + rv: fmt.Sprintf("%d", currentRV), + }, { // test get on existing item with minimum resource version set to latest pod resource version + name: "latest resource version", + key: key, + expectedOut: storedObj, + rv: fmt.Sprintf("%d", lastUpdatedCurrentRV), + }, { // test get on existing item with minimum resource version set too high + name: "too high resource version", + key: key, + expectRVTooLarge: true, + rv: fmt.Sprintf("%d", lastUpdatedCurrentRV+1), }, { // test get on non-existing item with ignoreNotFound=false + name: "get non-existing", key: "/non-existing", ignoreNotFound: false, expectNotFoundErr: true, }, { // test get on non-existing item with ignoreNotFound=true + name: "get non-existing, ignore not found", key: "/non-existing", ignoreNotFound: true, expectNotFoundErr: false, expectedOut: &example.Pod{}, }} - for i, tt := range tests { - out := &example.Pod{} - err := store.Get(ctx, tt.key, "", out, tt.ignoreNotFound) - if tt.expectNotFoundErr { - if err == nil || !storage.IsNotFound(err) { - t.Errorf("#%d: expecting not found error, but get: %s", i, err) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out := &example.Pod{} + err := store.Get(ctx, tt.key, tt.rv, out, tt.ignoreNotFound) + if tt.expectNotFoundErr { + if err == nil || !storage.IsNotFound(err) { + t.Errorf("expecting not found error, but get: %v", err) + } + return } - continue - } - if err != nil { - t.Fatalf("Get failed: %v", err) - } - if !reflect.DeepEqual(tt.expectedOut, out) { - t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedOut, out) - } + if tt.expectRVTooLarge { + if err == nil || !storage.IsTooLargeResourceVersion(err) { + t.Errorf("expecting resource version too high error, but get: %v", err) + } + return + } + if err != nil { + t.Fatalf("Get failed: %v", err) + } + if !reflect.DeepEqual(tt.expectedOut, out) { + t.Errorf("pod want=%#v, get=%#v", tt.expectedOut, out) + } + }) } } @@ -301,14 +344,34 @@ func TestGetToList(t *testing.T) { defer cluster.Terminate(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + currentRV, _ := strconv.Atoi(storedObj.ResourceVersion) + tests := []struct { - key string - pred storage.SelectionPredicate - expectedOut []*example.Pod + key string + pred storage.SelectionPredicate + expectedOut []*example.Pod + rv string + expectRVTooLarge bool }{{ // test GetToList on existing key key: key, pred: storage.Everything, expectedOut: []*example.Pod{storedObj}, + }, { // test GetToList on existing key with minimum resource version set to 0 + key: key, + pred: storage.Everything, + expectedOut: []*example.Pod{storedObj}, + rv: "0", + }, { // test GetToList on existing key with minimum resource version set to current resource version + key: key, + pred: storage.Everything, + expectedOut: []*example.Pod{storedObj}, + rv: fmt.Sprintf("%d", currentRV), + }, { // test GetToList on existing key with minimum resource version set too high + key: key, + pred: storage.Everything, + expectedOut: []*example.Pod{storedObj}, + rv: fmt.Sprintf("%d", currentRV+1), + expectRVTooLarge: true, }, { // test GetToList on non-existing key key: "/non-existing", pred: storage.Everything, @@ -328,7 +391,15 @@ func TestGetToList(t *testing.T) { for i, tt := range tests { out := &example.PodList{} - err := store.GetToList(ctx, tt.key, "", tt.pred, out) + err := store.GetToList(ctx, tt.key, tt.rv, tt.pred, out) + + if tt.expectRVTooLarge { + if err == nil || !storage.IsTooLargeResourceVersion(err) { + t.Errorf("#%d: expecting resource version too high error, but get: %s", i, err) + } + continue + } + if err != nil { t.Fatalf("GetToList failed: %v", err) } @@ -838,6 +909,7 @@ func TestList(t *testing.T) { expectContinue bool expectedRemainingItemCount *int64 expectError bool + expectRVTooLarge bool }{ { name: "rejects invalid resource version", @@ -858,12 +930,32 @@ func TestList(t *testing.T) { rv: "1", expectError: true, }, + { + name: "rejects resource version set too high", + prefix: "/", + rv: fmt.Sprintf("%d", continueRV+1), + expectRVTooLarge: true, + }, { name: "test List on existing key", prefix: "/one-level/", pred: storage.Everything, expectedOut: []*example.Pod{preset[0].storedObj}, }, + { + name: "test List on existing key with minimum resource version set to 0", + prefix: "/one-level/", + pred: storage.Everything, + expectedOut: []*example.Pod{preset[0].storedObj}, + rv: "0", + }, + { + name: "test List on existing key with minimum resource version set to current resource version", + prefix: "/one-level/", + pred: storage.Everything, + expectedOut: []*example.Pod{preset[0].storedObj}, + rv: list.ResourceVersion, + }, { name: "test List on non-existing key", prefix: "/non-existing/", @@ -1054,6 +1146,13 @@ func TestList(t *testing.T) { } else { err = store.List(ctx, tt.prefix, tt.rv, tt.pred, out) } + if tt.expectRVTooLarge { + if err == nil || !storage.IsTooLargeResourceVersion(err) { + t.Errorf("(%s): expecting resource version too high error, but get: %s", tt.name, err) + } + continue + } + if (err != nil) != tt.expectError { t.Errorf("(%s): List failed: %v", tt.name, err) } diff --git a/pkg/storage/tests/cacher_test.go b/pkg/storage/tests/cacher_test.go index 80eeea09e..acf7a5218 100644 --- a/pkg/storage/tests/cacher_test.go +++ b/pkg/storage/tests/cacher_test.go @@ -323,7 +323,9 @@ func TestList(t *testing.T) { } } -func TestInfiniteList(t *testing.T) { +// TestTooLargeResourceVersionList ensures that a list request for a resource version higher than available +// in the watch cache completes (does not wait indefinitely) and results in a ResourceVersionTooLarge error. +func TestTooLargeResourceVersionList(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) cacher, v, err := newTestCacher(etcdStorage, 10) @@ -347,6 +349,9 @@ func TestInfiniteList(t *testing.T) { if !errors.IsTimeout(err) { t.Errorf("Unexpected error: %v", err) } + if !storage.IsTooLargeResourceVersion(err) { + t.Errorf("expected 'Too large resource version' cause in error but got: %v", err) + } } func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {