Implement progressive page sizes for rare selectors.
Kubernetes-commit: a2ad9f9e4aba6aae6657a3189bdced6dbc8ba4b5
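In short: when a paginated LIST keeps returning pages whose objects are mostly dropped by a field/label selector, kube-apiserver now doubles the page size it requests from etcd on each continuation, capped at maxLimit = 10000, so finding a rare object costs O(log n) etcd calls rather than O(n / pred.Limit). A minimal sketch of the strategy, separate from the store internals (growLimit and the loop below are illustrative, not code from this commit):

package main

import "fmt"

// maxLimit mirrors the cap introduced by this commit.
const maxLimit = 10000

// growLimit doubles the requested page size after an incomplete page,
// capped at maxLimit.
func growLimit(limit int64) int64 {
	if limit >= maxLimit {
		return limit
	}
	limit *= 2
	if limit > maxLimit {
		limit = maxLimit
	}
	return limit
}

func main() {
	// Page sizes requested over successive continuations when the client
	// asked for pred.Limit = 1:
	for l := int64(1); ; l = growLimit(l) {
		fmt.Print(l, " ") // 1 2 4 8 ... 8192 10000
		if l == maxLimit {
			break
		}
	}
	fmt.Println()
}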
parent 610f96c654
commit f4edb394b0
@@ -48,6 +48,13 @@ import (
 	utiltrace "k8s.io/utils/trace"
 )
 
+const (
+	// maxLimit is the maximum page size the apiserver will grow to when fetching
+	// objects from etcd. It applies only to pages enlarged by kube-apiserver; if
+	// the request initially specifies a larger limit, it is not changed.
+	maxLimit = 10000
+)
+
 // authenticatedDataString satisfies the value.Context interface. It uses the key to
 // authenticate the stored data. This does not defend against reuse of previously
 // encrypted values under the same key, but will prevent an attacker from using an
@@ -585,11 +592,12 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions
 	keyPrefix := key
 
 	// set the appropriate clientv3 options to filter the returned data set
-	var paging bool
+	var limitOption *clientv3.OpOption
+	var limit int64 = pred.Limit
 	options := make([]clientv3.OpOption, 0, 4)
 	if s.pagingEnabled && pred.Limit > 0 {
-		paging = true
-		options = append(options, clientv3.WithLimit(pred.Limit))
+		options = append(options, clientv3.WithLimit(limit))
+		limitOption = &options[len(options)-1]
 	}
 
 	newItemFunc := getNewItemFunc(listObj, v)
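A note on limitOption = &options[len(options)-1] above: keeping a pointer to the just-appended element lets the retry loop further down rewrite the limit in place with *limitOption = clientv3.WithLimit(limit) instead of rebuilding the options slice. A pointer into a slice is only safe while no append reallocates the backing array; here the slice is pre-allocated with capacity 4, enough for every option this code path adds. A tiny standalone illustration of the pattern, using ints in place of clientv3.OpOption:

package main

import "fmt"

func main() {
	opts := make([]int, 0, 4) // capacity fixed up front, so appends below won't reallocate
	opts = append(opts, 1)
	last := &opts[len(opts)-1] // pointer to the element we just appended

	opts = append(opts, 7) // still within capacity: `last` stays valid
	*last = 2              // rewrite the first option in place
	fmt.Println(opts)      // [2 7]
}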
@@ -714,7 +722,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions
 
 	// take items from the response until the bucket is full, filtering as we go
 	for i, kv := range getResp.Kvs {
-		if paging && int64(v.Len()) >= pred.Limit {
+		if limitOption != nil && int64(v.Len()) >= pred.Limit {
 			hasMore = true
 			break
 		}
@@ -740,13 +748,23 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions
 		}
 
 		// no more results remain or we didn't request paging
-		if !hasMore || !paging {
+		if !hasMore || limitOption == nil {
 			break
 		}
 		// we're paging but we have filled our bucket
 		if int64(v.Len()) >= pred.Limit {
 			break
 		}
+
+		if limit < maxLimit {
+			// We got an incomplete result because the field/label selector dropped
+			// objects. Double the page size to reduce the total number of calls to etcd.
+			limit *= 2
+			if limit > maxLimit {
+				limit = maxLimit
+			}
+			*limitOption = clientv3.WithLimit(limit)
+		}
 		key = string(lastKey) + "\x00"
 		if withRev == 0 {
 			withRev = returnedRV
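The unchanged continuation line key = string(lastKey) + "\x00" is what lets the growing pages chain together: appending a zero byte produces the smallest key strictly greater than lastKey in etcd's byte-wise key order, so the next Range call resumes exactly after the last key already returned. A quick standalone check of that property (illustrative, not part of the commit):

package main

import (
	"bytes"
	"fmt"
)

func main() {
	lastKey := []byte("/one-level/pod-5")
	next := string(lastKey) + "\x00"

	// next sorts strictly after lastKey...
	fmt.Println(bytes.Compare([]byte(next), lastKey)) // 1
	// ...but before any other key extending lastKey, so nothing is skipped.
	fmt.Println(bytes.Compare([]byte(next), []byte("/one-level/pod-5a"))) // -1
}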
@@ -1784,6 +1784,67 @@ func TestListContinuation(t *testing.T) {
 	recorder.resetReads()
 }
 
+func TestListPaginationRareObject(t *testing.T) {
+	etcdClient := testserver.RunEtcd(t, nil)
+	codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
+	transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
+	recorder := &clientRecorder{KV: etcdClient.KV}
+	etcdClient.KV = recorder
+	store := newStore(etcdClient, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig())
+	ctx := context.Background()
+
+	podCount := 1000
+	var pods []*example.Pod
+	for i := 0; i < podCount; i++ {
+		key := fmt.Sprintf("/one-level/pod-%d", i)
+		obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i)}}
+		storedObj := &example.Pod{}
+		err := store.Create(ctx, key, obj, storedObj, 0)
+		if err != nil {
+			t.Fatalf("Set failed: %v", err)
+		}
+		pods = append(pods, storedObj)
+	}
+
+	out := &example.PodList{}
+	options := storage.ListOptions{
+		Predicate: storage.SelectionPredicate{
+			Limit: 1,
+			Label: labels.Everything(),
+			Field: fields.OneTermEqualSelector("metadata.name", "pod-999"),
+			GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
+				pod := obj.(*example.Pod)
+				return nil, fields.Set{"metadata.name": pod.Name}, nil
+			},
+		},
+		Recursive: true,
+	}
+	if err := store.GetList(ctx, "/", options, out); err != nil {
+		t.Fatalf("Unable to get initial list: %v", err)
+	}
+	if len(out.Continue) != 0 {
+		t.Errorf("Unexpected continuation token set")
+	}
+	if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) {
+		t.Fatalf("Unexpected first page: %#v", out.Items)
+	}
+	if transformer.reads != uint64(podCount) {
+		t.Errorf("unexpected reads: %d", transformer.reads)
+	}
+	// We expect kube-apiserver to keep increasing the page size when pages
+	// come back less than full, so we should see significantly fewer than
+	// 1000 calls (which is what we would get if every call used the page
+	// size copied from pred.Limit).
+	// The expected number of calls is n+1, where n is the smallest value with
+	// pageSize + pageSize*2 + pageSize*4 + ... + pageSize*2^n >= podCount.
+	// For pageSize = 1, podCount = 1000: 1 + 2 + ... + 2^9 = 1023, so n+1 = 10.
+	if recorder.reads != 10 {
+		t.Errorf("unexpected reads: %d", recorder.reads)
+	}
+	transformer.resetReads()
+	recorder.resetReads()
+}
+
+type clientRecorder struct {
+	reads uint64
+	clientv3.KV
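The expected-call arithmetic in the test comment checks out: with pageSize = 1 the successive pages have sizes 2^0, 2^1, ..., 2^n, which sum to 2^(n+1) - 1, and the smallest n that covers 1000 pods is 9, i.e. 10 calls. A throwaway verification (not part of the commit):

package main

import "fmt"

func main() {
	podCount := 1000
	// Find the smallest n with 1 + 2 + ... + 2^n = 2^(n+1) - 1 >= podCount.
	n, sum := 0, 1
	for sum < podCount {
		n++
		sum += 1 << n
	}
	fmt.Printf("n=%d calls=%d keysFetched=%d\n", n, n+1, sum) // n=9 calls=10 keysFetched=1023
}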