support continueToken for inconsistent list
Kubernetes-commit: 0a7286c6b21a858f7397a0835776cb5900d98e87
This commit is contained in:
parent
b080aefffc
commit
cc84cfddc9
|
@ -20,6 +20,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
|
||||
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
)
|
||||
|
||||
func interpretWatchError(err error) error {
|
||||
|
@ -30,13 +31,41 @@ func interpretWatchError(err error) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func interpretListError(err error, paging bool) error {
|
||||
const (
|
||||
expired string = "The resourceVersion for the provided list is too old."
|
||||
continueExpired string = "The provided continue parameter is too old " +
|
||||
"to display a consistent list result. You can start a new list without " +
|
||||
"the continue parameter."
|
||||
inconsistentContinue string = "The provided continue parameter is too old " +
|
||||
"to display a consistent list result. You can start a new list without " +
|
||||
"the continue parameter, or use the continue token in this response to " +
|
||||
"retrieve the remainder of the results. Continuing with the provided " +
|
||||
"token results in an inconsistent list - objects that were created, " +
|
||||
"modified, or deleted between the time the first chunk was returned " +
|
||||
"and now may show up in the list."
|
||||
)
|
||||
|
||||
func interpretListError(err error, paging bool, continueKey, keyPrefix string) error {
|
||||
switch {
|
||||
case err == etcdrpc.ErrCompacted:
|
||||
if paging {
|
||||
return errors.NewResourceExpired("The provided from parameter is too old to display a consistent list result. You must start a new list without the from.")
|
||||
return handleCompactedErrorForPaging(continueKey, keyPrefix)
|
||||
}
|
||||
return errors.NewResourceExpired("The resourceVersion for the provided list is too old.")
|
||||
return errors.NewResourceExpired(expired)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func handleCompactedErrorForPaging(continueKey, keyPrefix string) error {
|
||||
// continueToken.ResoureVersion=-1 means that the apiserver can
|
||||
// continue the list at the latest resource version. We don't use rv=0
|
||||
// for this purpose to distinguish from a bad token that has empty rv.
|
||||
newToken, err := encodeContinue(continueKey, keyPrefix, -1)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return errors.NewResourceExpired(continueExpired)
|
||||
}
|
||||
statusError := errors.NewResourceExpired(inconsistentContinue)
|
||||
statusError.ErrStatus.ListMeta.Continue = newToken
|
||||
return statusError
|
||||
}
|
||||
|
|
|
@ -508,10 +508,11 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
|||
options = append(options, clientv3.WithLimit(pred.Limit))
|
||||
}
|
||||
|
||||
var returnedRV int64
|
||||
var returnedRV, continueRV int64
|
||||
var continueKey string
|
||||
switch {
|
||||
case s.pagingEnabled && len(pred.Continue) > 0:
|
||||
continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix)
|
||||
continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix)
|
||||
if err != nil {
|
||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
|
||||
}
|
||||
|
@ -524,9 +525,13 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
|||
options = append(options, clientv3.WithRange(rangeEnd))
|
||||
key = continueKey
|
||||
|
||||
options = append(options, clientv3.WithRev(continueRV))
|
||||
returnedRV = continueRV
|
||||
|
||||
// If continueRV > 0, the LIST request needs a specific resource version.
|
||||
// continueRV==0 is invalid.
|
||||
// If continueRV < 0, the request is for the latest resource version.
|
||||
if continueRV > 0 {
|
||||
options = append(options, clientv3.WithRev(continueRV))
|
||||
returnedRV = continueRV
|
||||
}
|
||||
case s.pagingEnabled && pred.Limit > 0:
|
||||
if len(resourceVersion) > 0 {
|
||||
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
|
||||
|
@ -563,7 +568,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
|||
for {
|
||||
getResp, err := s.client.KV.Get(ctx, key, options...)
|
||||
if err != nil {
|
||||
return interpretListError(err, len(pred.Continue) > 0)
|
||||
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
|
||||
}
|
||||
hasMore = getResp.More
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
|
@ -44,6 +45,7 @@ import (
|
|||
"k8s.io/apiserver/pkg/apis/example"
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd"
|
||||
storagetests "k8s.io/apiserver/pkg/storage/tests"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
)
|
||||
|
@ -1180,6 +1182,153 @@ func TestListContinuation(t *testing.T) {
|
|||
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) {
|
||||
t.Fatalf("Unexpected third page: %#v", out.Items)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestListInconsistentContinuation(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
// /
|
||||
// - one-level/
|
||||
// | - test
|
||||
// |
|
||||
// - two-level/
|
||||
// - 1/
|
||||
// | - test
|
||||
// |
|
||||
// - 2/
|
||||
// - test
|
||||
//
|
||||
preset := []struct {
|
||||
key string
|
||||
obj *example.Pod
|
||||
storedObj *example.Pod
|
||||
}{
|
||||
{
|
||||
key: "/one-level/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||
},
|
||||
{
|
||||
key: "/two-level/1/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||
},
|
||||
{
|
||||
key: "/two-level/2/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
|
||||
},
|
||||
}
|
||||
|
||||
for i, ps := range preset {
|
||||
preset[i].storedObj = &example.Pod{}
|
||||
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
|
||||
return storage.SelectionPredicate{
|
||||
Limit: limit,
|
||||
Continue: continueValue,
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
out := &example.PodList{}
|
||||
if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil {
|
||||
t.Fatalf("Unable to get initial list: %v", err)
|
||||
}
|
||||
if len(out.Continue) == 0 {
|
||||
t.Fatalf("No continuation token set")
|
||||
}
|
||||
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) {
|
||||
t.Fatalf("Unexpected first page: %#v", out.Items)
|
||||
}
|
||||
|
||||
continueFromSecondItem := out.Continue
|
||||
|
||||
// update /two-level/2/test/bar
|
||||
oldName := preset[2].obj.Name
|
||||
newPod := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: oldName,
|
||||
Labels: map[string]string{
|
||||
"state": "new",
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil,
|
||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||
return newPod, nil, nil
|
||||
}, newPod); err != nil {
|
||||
t.Fatalf("update failed: %v", err)
|
||||
}
|
||||
|
||||
// compact to latest revision.
|
||||
versioner := etcd.APIObjectVersioner{}
|
||||
lastRVString := preset[2].storedObj.ResourceVersion
|
||||
lastRV, err := versioner.ParseResourceVersion(lastRVString)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := cluster.Client(0).KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil {
|
||||
t.Fatalf("Unable to compact, %v", err)
|
||||
}
|
||||
|
||||
// The old continue token should have expired
|
||||
err = store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out)
|
||||
if err == nil {
|
||||
t.Fatalf("unexpected no error")
|
||||
}
|
||||
if !strings.Contains(err.Error(), inconsistentContinue) {
|
||||
t.Fatalf("unexpected error message %v", err)
|
||||
}
|
||||
status, ok := err.(apierrors.APIStatus)
|
||||
if !ok {
|
||||
t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err))
|
||||
}
|
||||
inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue
|
||||
if len(inconsistentContinueFromSecondItem) == 0 {
|
||||
t.Fatalf("expect non-empty continue token")
|
||||
}
|
||||
|
||||
out = &example.PodList{}
|
||||
if err := store.List(ctx, "/", "0", pred(1, inconsistentContinueFromSecondItem), out); err != nil {
|
||||
t.Fatalf("Unable to get second page: %v", err)
|
||||
}
|
||||
if len(out.Continue) == 0 {
|
||||
t.Fatalf("No continuation token set")
|
||||
}
|
||||
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[1].storedObj) {
|
||||
t.Fatalf("Unexpected second page: %#v", out.Items)
|
||||
}
|
||||
if out.ResourceVersion != lastRVString {
|
||||
t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion)
|
||||
}
|
||||
continueFromThirdItem := out.Continue
|
||||
out = &example.PodList{}
|
||||
if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil {
|
||||
t.Fatalf("Unable to get second page: %v", err)
|
||||
}
|
||||
if len(out.Continue) != 0 {
|
||||
t.Fatalf("Unexpected continuation token set")
|
||||
}
|
||||
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) {
|
||||
t.Fatalf("Unexpected third page: %#v", out.Items)
|
||||
}
|
||||
if out.ResourceVersion != lastRVString {
|
||||
t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion)
|
||||
}
|
||||
}
|
||||
|
||||
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||
|
|
Loading…
Reference in New Issue