Use same 'minimum resource version' semantics both when watch cache is enabled and disabled

Kubernetes-commit: 1406f7adf20a9ceaf5b8270ec9a61dd9e450ae8a
Joe Betz 2018-12-04 16:55:59 -08:00 committed by Kubernetes Publisher
parent fc290d8208
commit 1b75e5416d
6 changed files with 192 additions and 35 deletions

View File

@ -40,6 +40,10 @@ const (
// before terminating request and returning Timeout error with retry
// after suggestion.
blockTimeout = 3 * time.Second
// resourceVersionTooHighRetrySeconds is the number of seconds before an operation should be retried by the client
// after receiving a 'too high resource version' error.
resourceVersionTooHighRetrySeconds = 1
)
// watchCacheEvent is a single "watch event" that is sent to users of
@ -345,8 +349,8 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utilt
}
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= blockTimeout {
// Timeout with retry after 1 second.
return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1)
// Request that the client retry after 'resourceVersionTooHighRetrySeconds' seconds.
return storage.NewTooLargeResourceVersionError(resourceVersion, w.resourceVersion, resourceVersionTooHighRetrySeconds)
}
w.cond.Wait()
}
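
For context, here is a self-contained, runnable sketch of the wait-until-fresh pattern the hunk above relies on: a reader blocks on a condition variable until the cache's resource version catches up with the requested minimum, and a helper goroutine broadcasts at the deadline so the wait cannot hang forever. The versionedCache type and its method names are illustrative only, not part of the apiserver code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// versionedCache is a toy stand-in for the watch cache: it tracks the newest
// resource version it has seen and lets readers block until it is at least as
// fresh as a requested minimum version, or a deadline passes.
type versionedCache struct {
	mu              sync.RWMutex
	cond            *sync.Cond
	resourceVersion uint64
}

func newVersionedCache() *versionedCache {
	c := &versionedCache{}
	c.cond = sync.NewCond(c.mu.RLocker())
	return c
}

// advance simulates the cache observing a newer event from storage.
func (c *versionedCache) advance(rv uint64) {
	c.mu.Lock()
	c.resourceVersion = rv
	c.mu.Unlock()
	c.cond.Broadcast()
}

// waitUntilFresh mirrors the shape of waitUntilFreshAndBlock: wait on the
// condition variable until the cache catches up, and once blockTimeout has
// elapsed give up with an error that asks the caller to retry shortly.
func (c *versionedCache) waitUntilFresh(minRV uint64, blockTimeout time.Duration) error {
	start := time.Now()
	// Wake every waiter at the deadline so the loop below can observe the timeout.
	go func() {
		<-time.After(blockTimeout)
		c.cond.Broadcast()
	}()

	c.mu.RLock()
	defer c.mu.RUnlock()
	for c.resourceVersion < minRV {
		if time.Since(start) >= blockTimeout {
			return fmt.Errorf("too large resource version: %d, current: %d", minRV, c.resourceVersion)
		}
		c.cond.Wait()
	}
	return nil
}

func main() {
	c := newVersionedCache()
	go func() { time.Sleep(50 * time.Millisecond); c.advance(10) }()
	fmt.Println(c.waitUntilFresh(10, time.Second))          // nil: cache caught up in time
	fmt.Println(c.waitUntilFresh(11, 200*time.Millisecond)) // error: cache never reached RV 11
}
```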

View File

@ -380,8 +380,11 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
}()
_, _, err := store.WaitUntilFreshAndList(5, nil)
if err == nil {
t.Fatalf("unexpected lack of timeout error")
if !errors.IsTimeout(err) {
t.Errorf("expected timeout error but got: %v", err)
}
if !storage.IsTooLargeResourceVersion(err) {
t.Errorf("expected 'Too large resource version' cause in error but got: %v", err)
}
}

View File

@ -19,6 +19,8 @@ package storage
import (
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
)
@ -168,3 +170,31 @@ func NewInternalError(reason string) InternalError {
func NewInternalErrorf(format string, a ...interface{}) InternalError {
return InternalError{fmt.Sprintf(format, a...)}
}
var tooLargeResourceVersionCauseMsg = "Too large resource version"
// NewTooLargeResourceVersionError returns a timeout error with the given retrySeconds for a request whose
// minimum resource version is larger than the largest resource version currently available for the requested resource.
func NewTooLargeResourceVersionError(minimumResourceVersion, currentRevision uint64, retrySeconds int) error {
err := errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %d, current: %d", minimumResourceVersion, currentRevision), retrySeconds)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Message: tooLargeResourceVersionCauseMsg}}
return err
}
// IsTooLargeResourceVersion returns true if the error is a TooLargeResourceVersion error.
func IsTooLargeResourceVersion(err error) bool {
if !errors.IsTimeout(err) {
return false
}
switch t := err.(type) {
case errors.APIStatus:
if d := t.Status().Details; d != nil {
for _, cause := range d.Causes {
if cause.Message == tooLargeResourceVersionCauseMsg {
return true
}
}
}
}
return false
}
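
On the client side of the new error, the exported helper pairs naturally with the retry-after hint that NewTimeoutError already embeds in the status details. The sketch below shows one plausible way a caller might react; doRead is a hypothetical stand-in for any storage read issued with a minimum resource version, and the use of apierrors.SuggestsClientDelay assumes the standard apimachinery helper of that name.

```go
package main

import (
	"fmt"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apiserver/pkg/storage"
)

// doRead is a hypothetical stand-in for a Get/List issued with a minimum
// resource version; here it simply fabricates the error introduced above.
func doRead() error {
	return storage.NewTooLargeResourceVersionError(100, 42, 1)
}

func main() {
	err := doRead()
	if storage.IsTooLargeResourceVersion(err) {
		// NewTimeoutError records RetryAfterSeconds in the status details, so the
		// suggested backoff should be recoverable from the error itself.
		if seconds, ok := apierrors.SuggestsClientDelay(err); ok {
			fmt.Printf("requested resource version not yet available, retrying in %ds\n", seconds)
			time.Sleep(time.Duration(seconds) * time.Second)
			// ...re-issue the read here...
		}
	}
}
```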

View File

@ -29,7 +29,6 @@ import (
"time"
"github.com/coreos/etcd/clientv3"
"k8s.io/klog"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@ -41,6 +40,7 @@ import (
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
utiltrace "k8s.io/utils/trace"
)
@ -119,6 +119,9 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out
if err != nil {
return err
}
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
if len(getResp.Kvs) == 0 {
if ignoreNotFound {
@ -398,6 +401,9 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
if err != nil {
return err
}
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
if len(getResp.Kvs) > 0 {
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
@ -559,17 +565,6 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
options = append(options, clientv3.WithRange(rangeEnd))
default:
if len(resourceVersion) > 0 {
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(int64(fromRV)))
}
returnedRV = int64(fromRV)
}
options = append(options, clientv3.WithPrefix())
}
@ -584,6 +579,9 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
}
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
hasMore = getResp.More
if len(getResp.Kvs) == 0 && getResp.More {
@ -798,6 +796,24 @@ func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, er
return []clientv3.OpOption{clientv3.WithLease(id)}, nil
}
// ensureMinimumResourceVersion returns a 'too large resource version' error when the provided minimumResourceVersion is
// greater than the most recent actualRevision available from storage.
func (s *store) ensureMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
if minimumResourceVersion == "" {
return nil
}
minimumRV, err := s.versioner.ParseResourceVersion(minimumResourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
// Enforce the storage.Interface guarantee that the resource version of the returned data
// "will be at least 'resourceVersion'".
if minimumRV > actualRevision {
return storage.NewTooLargeResourceVersionError(minimumRV, actualRevision, 0)
}
return nil
}
// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
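
Since Get, GetToList, and List now all funnel through the same check, the contract is easy to state: an empty or zero resource version never blocks a read, while a resource version newer than the revision the read was served at yields the retryable error. The standalone sketch below illustrates that contract; fakeParseResourceVersion approximates the integer parsing the real storage.Versioner performs and is for illustration only.

```go
package main

import (
	"fmt"
	"strconv"
)

// fakeParseResourceVersion approximates the storage.Versioner parsing used by
// ensureMinimumResourceVersion: "" and "0" both mean "no minimum" (illustrative
// assumption, not the real implementation).
func fakeParseResourceVersion(rv string) (uint64, error) {
	if rv == "" || rv == "0" {
		return 0, nil
	}
	return strconv.ParseUint(rv, 10, 64)
}

// checkMinimumRV mirrors the shape of ensureMinimumResourceVersion: a read is
// rejected only when the caller asked for data at least as new as a revision
// that storage has not reached yet.
func checkMinimumRV(minimumResourceVersion string, actualRevision uint64) error {
	if minimumResourceVersion == "" {
		return nil
	}
	minimumRV, err := fakeParseResourceVersion(minimumResourceVersion)
	if err != nil {
		return fmt.Errorf("invalid resource version: %v", err)
	}
	if minimumRV > actualRevision {
		return fmt.Errorf("too large resource version: %d, current: %d", minimumRV, actualRevision)
	}
	return nil
}

func main() {
	// Pretend the read was served at etcd revision 42.
	for _, rv := range []string{"", "0", "41", "42", "43"} {
		fmt.Printf("minimum %q against revision 42: %v\n", rv, checkMinimumRV(rv, 42))
	}
}
```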

View File

@ -183,44 +183,87 @@ func TestCreateWithKeyExist(t *testing.T) {
func TestGet(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
// create an object to test
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
// create an additional object so the latest resource version for pods is higher than the foo object's resource version
lastUpdatedObj := &example.Pod{}
if err := store.Create(ctx, "bar", &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, lastUpdatedObj, 0); err != nil {
t.Fatalf("Set failed: %v", err)
}
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
lastUpdatedCurrentRV, _ := strconv.Atoi(lastUpdatedObj.ResourceVersion)
tests := []struct {
name string
key string
ignoreNotFound bool
expectNotFoundErr bool
expectRVTooLarge bool
expectedOut *example.Pod
rv string
}{{ // test get on existing item
name: "get existing",
key: key,
ignoreNotFound: false,
expectNotFoundErr: false,
expectedOut: storedObj,
}, { // test get on existing item with minimum resource version set to 0
name: "resource version 0",
key: key,
expectedOut: storedObj,
rv: "0",
}, { // test get on existing item with minimum resource version set to current resource version of the object
name: "current object resource version",
key: key,
expectedOut: storedObj,
rv: fmt.Sprintf("%d", currentRV),
}, { // test get on existing item with minimum resource version set to latest pod resource version
name: "latest resource version",
key: key,
expectedOut: storedObj,
rv: fmt.Sprintf("%d", lastUpdatedCurrentRV),
}, { // test get on existing item with minimum resource version set too high
name: "too high resource version",
key: key,
expectRVTooLarge: true,
rv: fmt.Sprintf("%d", lastUpdatedCurrentRV+1),
}, { // test get on non-existing item with ignoreNotFound=false
name: "get non-existing",
key: "/non-existing",
ignoreNotFound: false,
expectNotFoundErr: true,
}, { // test get on non-existing item with ignoreNotFound=true
name: "get non-existing, ignore not found",
key: "/non-existing",
ignoreNotFound: true,
expectNotFoundErr: false,
expectedOut: &example.Pod{},
}}
for i, tt := range tests {
out := &example.Pod{}
err := store.Get(ctx, tt.key, "", out, tt.ignoreNotFound)
if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) {
t.Errorf("#%d: expecting not found error, but get: %s", i, err)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out := &example.Pod{}
err := store.Get(ctx, tt.key, tt.rv, out, tt.ignoreNotFound)
if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) {
t.Errorf("expecting not found error, but get: %v", err)
}
return
}
continue
}
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if !reflect.DeepEqual(tt.expectedOut, out) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedOut, out)
}
if tt.expectRVTooLarge {
if err == nil || !storage.IsTooLargeResourceVersion(err) {
t.Errorf("expecting resource version too high error, but get: %v", err)
}
return
}
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if !reflect.DeepEqual(tt.expectedOut, out) {
t.Errorf("pod want=%#v, get=%#v", tt.expectedOut, out)
}
})
}
}
@ -301,14 +344,34 @@ func TestGetToList(t *testing.T) {
defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
tests := []struct {
key string
pred storage.SelectionPredicate
expectedOut []*example.Pod
key string
pred storage.SelectionPredicate
expectedOut []*example.Pod
rv string
expectRVTooLarge bool
}{{ // test GetToList on existing key
key: key,
pred: storage.Everything,
expectedOut: []*example.Pod{storedObj},
}, { // test GetToList on existing key with minimum resource version set to 0
key: key,
pred: storage.Everything,
expectedOut: []*example.Pod{storedObj},
rv: "0",
}, { // test GetToList on existing key with minimum resource version set to current resource version
key: key,
pred: storage.Everything,
expectedOut: []*example.Pod{storedObj},
rv: fmt.Sprintf("%d", currentRV),
}, { // test GetToList on existing key with minimum resource version set too high
key: key,
pred: storage.Everything,
expectedOut: []*example.Pod{storedObj},
rv: fmt.Sprintf("%d", currentRV+1),
expectRVTooLarge: true,
}, { // test GetToList on non-existing key
key: "/non-existing",
pred: storage.Everything,
@ -328,7 +391,15 @@ func TestGetToList(t *testing.T) {
for i, tt := range tests {
out := &example.PodList{}
err := store.GetToList(ctx, tt.key, "", tt.pred, out)
err := store.GetToList(ctx, tt.key, tt.rv, tt.pred, out)
if tt.expectRVTooLarge {
if err == nil || !storage.IsTooLargeResourceVersion(err) {
t.Errorf("#%d: expecting resource version too high error, but get: %s", i, err)
}
continue
}
if err != nil {
t.Fatalf("GetToList failed: %v", err)
}
@ -838,6 +909,7 @@ func TestList(t *testing.T) {
expectContinue bool
expectedRemainingItemCount *int64
expectError bool
expectRVTooLarge bool
}{
{
name: "rejects invalid resource version",
@ -858,12 +930,32 @@ func TestList(t *testing.T) {
rv: "1",
expectError: true,
},
{
name: "rejects resource version set too high",
prefix: "/",
rv: fmt.Sprintf("%d", continueRV+1),
expectRVTooLarge: true,
},
{
name: "test List on existing key",
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj},
},
{
name: "test List on existing key with minimum resource version set to 0",
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj},
rv: "0",
},
{
name: "test List on existing key with minimum resource version set to current resource version",
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj},
rv: list.ResourceVersion,
},
{
name: "test List on non-existing key",
prefix: "/non-existing/",
@ -1054,6 +1146,13 @@ func TestList(t *testing.T) {
} else {
err = store.List(ctx, tt.prefix, tt.rv, tt.pred, out)
}
if tt.expectRVTooLarge {
if err == nil || !storage.IsTooLargeResourceVersion(err) {
t.Errorf("(%s): expecting resource version too high error, but get: %s", tt.name, err)
}
continue
}
if (err != nil) != tt.expectError {
t.Errorf("(%s): List failed: %v", tt.name, err)
}

View File

@ -323,7 +323,9 @@ func TestList(t *testing.T) {
}
}
func TestInfiniteList(t *testing.T) {
// TestTooLargeResourceVersionList ensures that a list request for a resource version higher than any version available
// in the watch cache completes (does not wait indefinitely) and results in a 'too large resource version' error.
func TestTooLargeResourceVersionList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, v, err := newTestCacher(etcdStorage, 10)
@ -347,6 +349,9 @@ func TestInfiniteList(t *testing.T) {
if !errors.IsTimeout(err) {
t.Errorf("Unexpected error: %v", err)
}
if !storage.IsTooLargeResourceVersion(err) {
t.Errorf("expected 'Too large resource version' cause in error but got: %v", err)
}
}
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {