Add ResourceVersionMatch parameter to make Resource Version semantics consistent for list

Kubernetes-commit: e214f2408b59c745c199645547948a8ad2a87ac2
This commit is contained in:
Joe Betz 2020-05-29 10:44:26 -07:00 committed by Kubernetes Publisher
parent a54a193e04
commit ee219411ed
8 changed files with 233 additions and 30 deletions

View File

@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/validation" "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@ -198,6 +199,12 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc
return return
} }
if errs := metainternalversionvalidation.ValidateListOptions(&listOptions); len(errs) > 0 {
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs)
scope.err(err, w, req)
return
}
// transform fields // transform fields
// TODO: DecodeParametersInto should do this. // TODO: DecodeParametersInto should do this.
if listOptions.FieldSelector != nil { if listOptions.FieldSelector != nil {

View File

@ -19,6 +19,8 @@ package handlers
import ( import (
"context" "context"
"fmt" "fmt"
metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
"k8s.io/apimachinery/pkg/runtime/schema"
"math/rand" "math/rand"
"net/http" "net/http"
"net/url" "net/url"
@ -198,6 +200,12 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
return return
} }
if errs := metainternalversionvalidation.ValidateListOptions(&opts); len(errs) > 0 {
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs)
scope.err(err, w, req)
return
}
// transform fields // transform fields
// TODO: DecodeParametersInto should do this. // TODO: DecodeParametersInto should do this.
if opts.FieldSelector != nil { if opts.FieldSelector != nil {

View File

@ -1078,6 +1078,10 @@ func typeToJSON(typeName string) string {
return "string" return "string"
case "v1.DeletionPropagation", "*v1.DeletionPropagation": case "v1.DeletionPropagation", "*v1.DeletionPropagation":
return "string" return "string"
case "v1.ResourceVersionMatch", "*v1.ResourceVersionMatch":
return "string"
case "v1.IncludeObjectPolicy", "*v1.IncludeObjectPolicy":
return "string"
// TODO: Fix these when go-restful supports a way to specify an array query param: // TODO: Fix these when go-restful supports a way to specify an array query param:
// https://github.com/emicklei/go-restful/issues/225 // https://github.com/emicklei/go-restful/issues/225

View File

@ -322,7 +322,7 @@ func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate,
p.Continue = options.Continue p.Continue = options.Continue
list := e.NewListFunc() list := e.NewListFunc()
qualifiedResource := e.qualifiedResourceFromContext(ctx) qualifiedResource := e.qualifiedResourceFromContext(ctx)
storageOpts := storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: p} storageOpts := storage.ListOptions{ResourceVersion: options.ResourceVersion, ResourceVersionMatch: options.ResourceVersionMatch, Predicate: p}
if name, ok := p.MatchesSingle(); ok { if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil { if key, err := e.KeyFunc(ctx, name); err == nil {
err := e.Storage.GetToList(ctx, key, storageOpts, list) err := e.Storage.GetToList(ctx, key, storageOpts, list)

View File

@ -580,7 +580,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOpt
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0 hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0" hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
if resourceVersion == "" || hasContinuation || hasLimit { if resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
// If resourceVersion is not specified, serve it from underlying // If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is // storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well. // requested, serve it from the underlying storage as well.
@ -654,7 +654,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0 hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0" hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
if resourceVersion == "" || hasContinuation || hasLimit { if resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
// If resourceVersion is not specified, serve it from underlying // If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is // storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well. // requested, serve it from the underlying storage as well.
@ -1090,7 +1090,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
Continue: options.Continue, Continue: options.Continue,
} }
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, storage.ListOptions{Predicate: pred}, list); err != nil { if err := lw.storage.List(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersionMatch: options.ResourceVersionMatch, Predicate: pred}, list); err != nil {
return nil, err return nil, err
} }
return list, nil return list, nil

View File

@ -32,6 +32,8 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@ -63,9 +65,6 @@ var _ value.Context = authenticatedDataString("")
type store struct { type store struct {
client *clientv3.Client client *clientv3.Client
// getOps contains additional options that should be passed
// to all Get() calls.
getOps []clientv3.OpOption
codec runtime.Codec codec runtime.Codec
versioner storage.Versioner versioner storage.Versioner
transformer value.Transformer transformer value.Transformer
@ -115,13 +114,12 @@ func (s *store) Versioner() storage.Versioner {
func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error { func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
key = path.Join(s.pathPrefix, key) key = path.Join(s.pathPrefix, key)
startTime := time.Now() startTime := time.Now()
callOpts := s.getOps getResp, err := s.client.KV.Get(ctx, key)
getResp, err := s.client.KV.Get(ctx, key, callOpts...)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil { if err != nil {
return err return err
} }
if err = s.ensureMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil { if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err return err
} }
@ -252,7 +250,7 @@ func (s *store) GuaranteedUpdate(
getCurrentState := func() (*objState, error) { getCurrentState := func() (*objState, error) {
startTime := time.Now() startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...) getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil { if err != nil {
return nil, err return nil, err
@ -382,10 +380,12 @@ func (s *store) GuaranteedUpdate(
// GetToList implements storage.Interface.GetToList. // GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error { func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := listOpts.ResourceVersion resourceVersion := listOpts.ResourceVersion
match := listOpts.ResourceVersionMatch
pred := listOpts.Predicate pred := listOpts.Predicate
trace := utiltrace.New("GetToList etcd3", trace := utiltrace.New("GetToList etcd3",
utiltrace.Field{"key", key}, utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion}, utiltrace.Field{"resourceVersion", resourceVersion},
utiltrace.Field{"resourceVersionMatch", match},
utiltrace.Field{"limit", pred.Limit}, utiltrace.Field{"limit", pred.Limit},
utiltrace.Field{"continue", pred.Continue}) utiltrace.Field{"continue", pred.Continue})
defer trace.LogIfLong(500 * time.Millisecond) defer trace.LogIfLong(500 * time.Millisecond)
@ -402,12 +402,21 @@ func (s *store) GetToList(ctx context.Context, key string, listOpts storage.List
key = path.Join(s.pathPrefix, key) key = path.Join(s.pathPrefix, key)
startTime := time.Now() startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...) var opts []clientv3.OpOption
if len(resourceVersion) > 0 && match == metav1.ResourceVersionMatchExact {
rv, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
opts = append(opts, clientv3.WithRev(int64(rv)))
}
getResp, err := s.client.KV.Get(ctx, key, opts...)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
if err != nil { if err != nil {
return err return err
} }
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err return err
} }
@ -515,10 +524,12 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error
// List implements storage.Interface.List. // List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
pred := opts.Predicate pred := opts.Predicate
trace := utiltrace.New("List etcd3", trace := utiltrace.New("List etcd3",
utiltrace.Field{"key", key}, utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion}, utiltrace.Field{"resourceVersion", resourceVersion},
utiltrace.Field{"resourceVersionMatch", match},
utiltrace.Field{"limit", pred.Limit}, utiltrace.Field{"limit", pred.Limit},
utiltrace.Field{"continue", pred.Continue}) utiltrace.Field{"continue", pred.Continue})
defer trace.LogIfLong(500 * time.Millisecond) defer trace.LogIfLong(500 * time.Millisecond)
@ -552,6 +563,15 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
newItemFunc := getNewItemFunc(listObj, v) newItemFunc := getNewItemFunc(listObj, v)
var fromRV *uint64
if len(resourceVersion) > 0 {
parsedRV, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
fromRV = &parsedRV
}
var returnedRV, continueRV int64 var returnedRV, continueRV int64
var continueKey string var continueKey string
switch { switch {
@ -577,20 +597,41 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
returnedRV = continueRV returnedRV = continueRV
} }
case s.pagingEnabled && pred.Limit > 0: case s.pagingEnabled && pred.Limit > 0:
if len(resourceVersion) > 0 { if fromRV != nil {
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion) switch match {
if err != nil { case metav1.ResourceVersionMatchNotOlderThan:
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) // The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
returnedRV = int64(*fromRV)
options = append(options, clientv3.WithRev(returnedRV))
case "": // legacy case
if *fromRV > 0 {
returnedRV = int64(*fromRV)
options = append(options, clientv3.WithRev(returnedRV))
} }
if fromRV > 0 { default:
options = append(options, clientv3.WithRev(int64(fromRV))) return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
} }
returnedRV = int64(fromRV)
} }
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd)) options = append(options, clientv3.WithRange(rangeEnd))
default: default:
if fromRV != nil {
switch match {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
returnedRV = int64(*fromRV)
options = append(options, clientv3.WithRev(returnedRV))
case "": // legacy case
default:
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
}
}
options = append(options, clientv3.WithPrefix()) options = append(options, clientv3.WithPrefix())
} }
@ -605,7 +646,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
if err != nil { if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
} }
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err return err
} }
hasMore = getResp.More hasMore = getResp.More
@ -822,9 +863,9 @@ func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, er
return []clientv3.OpOption{clientv3.WithLease(id)}, nil return []clientv3.OpOption{clientv3.WithLease(id)}, nil
} }
// ensureMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is // validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
// greater than the most recent actualRevision available from storage. // greater than the most recent actualRevision available from storage.
func (s *store) ensureMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error { func (s *store) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
if minimumResourceVersion == "" { if minimumResourceVersion == "" {
return nil return nil
} }

View File

@ -373,6 +373,10 @@ func TestConditionalDelete(t *testing.T) {
func TestGetToList(t *testing.T) { func TestGetToList(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
prevKey, prevStoredObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "prev"}})
prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion) currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
@ -382,6 +386,7 @@ func TestGetToList(t *testing.T) {
pred storage.SelectionPredicate pred storage.SelectionPredicate
expectedOut []*example.Pod expectedOut []*example.Pod
rv string rv string
rvMatch metav1.ResourceVersionMatch
expectRVTooLarge bool expectRVTooLarge bool
}{{ // test GetToList on existing key }{{ // test GetToList on existing key
key: key, key: key,
@ -392,11 +397,41 @@ func TestGetToList(t *testing.T) {
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*example.Pod{storedObj}, expectedOut: []*example.Pod{storedObj},
rv: "0", rv: "0",
}, { // test GetToList on existing key with minimum resource version set to 0, match=minimum
key: key,
pred: storage.Everything,
expectedOut: []*example.Pod{storedObj},
rv: "0",
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
}, { // test GetToList on existing key with minimum resource version set to current resource version }, { // test GetToList on existing key with minimum resource version set to current resource version
key: key, key: key,
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*example.Pod{storedObj}, expectedOut: []*example.Pod{storedObj},
rv: fmt.Sprintf("%d", currentRV), rv: fmt.Sprintf("%d", currentRV),
}, { // test GetToList on existing key with minimum resource version set to current resource version, match=minimum
key: key,
pred: storage.Everything,
expectedOut: []*example.Pod{storedObj},
rv: fmt.Sprintf("%d", currentRV),
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
}, { // test GetToList on existing key with minimum resource version set to previous resource version, match=minimum
key: key,
pred: storage.Everything,
expectedOut: []*example.Pod{storedObj},
rv: fmt.Sprintf("%d", prevRV),
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
}, { // test GetToList on existing key with resource version set to current resource version, match=exact
key: key,
pred: storage.Everything,
expectedOut: []*example.Pod{storedObj},
rv: fmt.Sprintf("%d", currentRV),
rvMatch: metav1.ResourceVersionMatchExact,
}, { // test GetToList on existing key with resource version set to previous resource version, match=exact
key: prevKey,
pred: storage.Everything,
expectedOut: []*example.Pod{prevStoredObj},
rv: fmt.Sprintf("%d", prevRV),
rvMatch: metav1.ResourceVersionMatchExact,
}, { // test GetToList on existing key with minimum resource version set too high }, { // test GetToList on existing key with minimum resource version set too high
key: key, key: key,
pred: storage.Everything, pred: storage.Everything,
@ -422,7 +457,7 @@ func TestGetToList(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
out := &example.PodList{} out := &example.PodList{}
err := store.GetToList(ctx, tt.key, storage.ListOptions{ResourceVersion: tt.rv, Predicate: tt.pred}, out) err := store.GetToList(ctx, tt.key, storage.ListOptions{ResourceVersion: tt.rv, ResourceVersionMatch: tt.rvMatch, Predicate: tt.pred}, out)
if tt.expectRVTooLarge { if tt.expectRVTooLarge {
if err == nil || !storage.IsTooLargeResourceVersion(err) { if err == nil || !storage.IsTooLargeResourceVersion(err) {
@ -934,6 +969,7 @@ func TestList(t *testing.T) {
name string name string
disablePaging bool disablePaging bool
rv string rv string
rvMatch metav1.ResourceVersionMatch
prefix string prefix string
pred storage.SelectionPredicate pred storage.SelectionPredicate
expectedOut []*example.Pod expectedOut []*example.Pod
@ -981,6 +1017,31 @@ func TestList(t *testing.T) {
expectedOut: []*example.Pod{preset[0].storedObj}, expectedOut: []*example.Pod{preset[0].storedObj},
rv: "0", rv: "0",
}, },
{
name: "test List on existing key with resource version set to 1, match=Exact",
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{},
rv: "1",
rvMatch: metav1.ResourceVersionMatchExact,
expectRV: "1",
},
{
name: "test List on existing key with resource version set to 1, match=NotOlderThan",
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj},
rv: "0",
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
},
{
name: "test List on existing key with resource version set to 1, match=Invalid",
prefix: "/one-level/",
pred: storage.Everything,
rv: "0",
rvMatch: "Invalid",
expectError: true,
},
{ {
name: "test List on existing key with resource version set to current resource version", name: "test List on existing key with resource version set to current resource version",
prefix: "/one-level/", prefix: "/one-level/",
@ -988,6 +1049,23 @@ func TestList(t *testing.T) {
expectedOut: []*example.Pod{preset[0].storedObj}, expectedOut: []*example.Pod{preset[0].storedObj},
rv: list.ResourceVersion, rv: list.ResourceVersion,
}, },
{
name: "test List on existing key with resource version set to current resource version, match=Exact",
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj},
rv: list.ResourceVersion,
rvMatch: metav1.ResourceVersionMatchExact,
expectRV: list.ResourceVersion,
},
{
name: "test List on existing key with resource version set to current resource version, match=NotOlderThan",
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj},
rv: list.ResourceVersion,
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
},
{ {
name: "test List on non-existing key", name: "test List on non-existing key",
prefix: "/non-existing/", prefix: "/non-existing/",
@ -1029,6 +1107,21 @@ func TestList(t *testing.T) {
rv: list.ResourceVersion, rv: list.ResourceVersion,
expectRV: list.ResourceVersion, expectRV: list.ResourceVersion,
}, },
{
name: "test List with limit at current resource version and match=Exact",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
},
expectedOut: []*example.Pod{preset[1].storedObj},
expectContinue: true,
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
rv: list.ResourceVersion,
rvMatch: metav1.ResourceVersionMatchExact,
expectRV: list.ResourceVersion,
},
{ {
name: "test List with limit at resource version 0", name: "test List with limit at resource version 0",
prefix: "/two-level/", prefix: "/two-level/",
@ -1043,6 +1136,49 @@ func TestList(t *testing.T) {
rv: "0", rv: "0",
expectRV: list.ResourceVersion, expectRV: list.ResourceVersion,
}, },
{
name: "test List with limit at resource version 0 match=NotOlderThan",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
},
expectedOut: []*example.Pod{preset[1].storedObj},
expectContinue: true,
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
rv: "0",
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
expectRV: list.ResourceVersion,
},
{
name: "test List with limit at resource version 1 and match=Exact",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
},
expectedOut: []*example.Pod{},
expectContinue: false,
rv: "1",
rvMatch: metav1.ResourceVersionMatchExact,
expectRV: "1",
},
{
name: "test List with limit at old resource version and match=Exact",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
},
expectedOut: []*example.Pod{},
expectContinue: false,
rv: "1",
rvMatch: metav1.ResourceVersionMatchExact,
expectRV: "1",
},
{ {
name: "test List with limit when paging disabled", name: "test List with limit when paging disabled",
disablePaging: true, disablePaging: true,
@ -1201,7 +1337,7 @@ func TestList(t *testing.T) {
} }
out := &example.PodList{} out := &example.PodList{}
storageOpts := storage.ListOptions{ResourceVersion: tt.rv, Predicate: tt.pred} storageOpts := storage.ListOptions{ResourceVersion: tt.rv, ResourceVersionMatch: tt.rvMatch, Predicate: tt.pred}
var err error var err error
if tt.disablePaging { if tt.disablePaging {
err = disablePagingStore.List(ctx, tt.prefix, storageOpts, out) err = disablePagingStore.List(ctx, tt.prefix, storageOpts, out)

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@ -189,17 +190,20 @@ type Interface interface {
// Get unmarshals json found at key into objPtr. On a not found error, will either // Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'. // return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
// Treats empty responses and nil response nodes exactly like a not found error. // Treats empty responses and nil response nodes exactly like a not found error.
// The returned contents may be delayed according to the semantics of GetOptions.ResourceVersion. // The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
// GetToList unmarshals json found at key and opaque it into *List api object // GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition). // (an object that satisfies the runtime.IsList definition).
// The returned contents may be delayed according to the semantics of ListOptions.ResourceVersion. // The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
// List unmarshalls jsons found at directory defined by key and opaque them // List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition). // into *List api object (an object that satisfies runtime.IsList definition).
// The returned contents may be delayed according to the semantics of ListOptions.ResourceVersion. // The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
@ -260,6 +264,9 @@ type ListOptions struct {
// ResourceVersion. The newest available data is preferred, but any data not older than this // ResourceVersion. The newest available data is preferred, but any data not older than this
// ResourceVersion may be served. // ResourceVersion may be served.
ResourceVersion string ResourceVersion string
// ResourceVersionMatch provides the rule for how the resource version constraint applies. If set
// to the default value "" the legacy resource version semantic apply.
ResourceVersionMatch metav1.ResourceVersionMatch
// Predicate provides the selection rules for the list operation. // Predicate provides the selection rules for the list operation.
Predicate SelectionPredicate Predicate SelectionPredicate
} }