From 034ed25bbe02bfa25fb30072ee3178d14fc06598 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Mon, 1 Apr 2019 14:44:19 -0700 Subject: [PATCH] Add RemainingItemCount to ListMeta Kubernetes-commit: 58c18309a84f9e0fe05b92c202616e4ecf8062f7 --- pkg/storage/cacher/cacher.go | 4 ++-- pkg/storage/cacher/cacher_whitebox_test.go | 3 ++- pkg/storage/etcd/api_object_versioner.go | 3 ++- pkg/storage/etcd3/store.go | 15 +++++++++---- pkg/storage/etcd3/store_test.go | 25 +++++++++++++--------- pkg/storage/interfaces.go | 10 +++++---- 6 files changed, 38 insertions(+), 22 deletions(-) diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index 4bc8d3239..51c3d36ca 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -604,7 +604,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri } } if c.versioner != nil { - if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil { + if err := c.versioner.UpdateList(listObj, readResourceVersion, "", 0); err != nil { return err } } @@ -679,7 +679,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p } trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len())) if c.versioner != nil { - if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil { + if err := c.versioner.UpdateList(listObj, readResourceVersion, "", 0); err != nil { return err } } diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 9f9016dd8..a737b9ccb 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -217,13 +217,14 @@ type testVersioner struct{} func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error { return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10)) } -func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error { +func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string, count int64) error { listAccessor, err := meta.ListAccessor(obj) if err != nil || listAccessor == nil { return err } listAccessor.SetResourceVersion(strconv.FormatUint(resourceVersion, 10)) listAccessor.SetContinue(continueValue) + listAccessor.SetRemainingItemCount(count) return nil } func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error { diff --git a/pkg/storage/etcd/api_object_versioner.go b/pkg/storage/etcd/api_object_versioner.go index 2fed9f486..dc2b35da0 100644 --- a/pkg/storage/etcd/api_object_versioner.go +++ b/pkg/storage/etcd/api_object_versioner.go @@ -44,7 +44,7 @@ func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uin } // UpdateList implements Versioner -func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string) error { +func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string, count int64) error { listAccessor, err := meta.ListAccessor(obj) if err != nil || listAccessor == nil { return err @@ -55,6 +55,7 @@ func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint6 } listAccessor.SetResourceVersion(versionString) listAccessor.SetContinue(nextKey) + listAccessor.SetRemainingItemCount(count) return nil } diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 34e913617..1216b1f1e 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -413,7 +413,7 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin } } // update version with cluster level revision - return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "") + return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "", 0) } func (s *store) Count(key string) (int64, error) { @@ -576,9 +576,10 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor // loop until we have filled the requested limit from etcd or there are no more results var lastKey []byte var hasMore bool + var getResp *clientv3.GetResponse for { startTime := time.Now() - getResp, err := s.client.KV.Get(ctx, key, options...) + getResp, err = s.client.KV.Get(ctx, key, options...) metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) if err != nil { return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) @@ -639,11 +640,17 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor if err != nil { return err } - return s.versioner.UpdateList(listObj, uint64(returnedRV), next) + remainingItemCount := getResp.Count - pred.Limit + // getResp.Count counts in objects that do not match the pred. + // Instead of returning inaccurate count, return 0. + if !pred.Empty() { + remainingItemCount = 0 + } + return s.versioner.UpdateList(listObj, uint64(returnedRV), next, remainingItemCount) } // no continuation - return s.versioner.UpdateList(listObj, uint64(returnedRV), "") + return s.versioner.UpdateList(listObj, uint64(returnedRV), "", 0) } // growSlice takes a slice value and grows its capacity up diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index 8f4ae504a..4d4c1e158 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -825,14 +825,15 @@ func TestList(t *testing.T) { } tests := []struct { - name string - disablePaging bool - rv string - prefix string - pred storage.SelectionPredicate - expectedOut []*example.Pod - expectContinue bool - expectError bool + name string + disablePaging bool + rv string + prefix string + pred storage.SelectionPredicate + expectedOut []*example.Pod + expectContinue bool + expectedRemainingItemCount int64 + expectError bool }{ { name: "rejects invalid resource version", @@ -882,8 +883,9 @@ func TestList(t *testing.T) { Field: fields.Everything(), Limit: 1, }, - expectedOut: []*example.Pod{preset[1].storedObj}, - expectContinue: true, + expectedOut: []*example.Pod{preset[1].storedObj}, + expectContinue: true, + expectedRemainingItemCount: 1, }, { name: "test List with limit when paging disabled", @@ -1061,6 +1063,9 @@ func TestList(t *testing.T) { t.Errorf("(%s): length of list want=%d, got=%d", tt.name, len(tt.expectedOut), len(out.Items)) continue } + if e, a := tt.expectedRemainingItemCount, out.ListMeta.RemainingItemCount; e != a { + t.Errorf("(%s): remainingItemCount want=%d, got=%d", tt.name, e, a) + } for j, wantPod := range tt.expectedOut { getPod := &out.Items[j] if !reflect.DeepEqual(wantPod, getPod) { diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index 108f2acb4..f2a1f1053 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -40,10 +40,12 @@ type Versioner interface { // from database. UpdateObject(obj runtime.Object, resourceVersion uint64) error // UpdateList sets the resource version into an API list object. Returns an error if the object - // cannot be updated correctly. May return nil if the requested object does not need metadata - // from database. continueValue is optional and indicates that more results are available if - // the client passes that value to the server in a subsequent call. - UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error + // cannot be updated correctly. May return nil if the requested object does not need metadata from + // database. continueValue is optional and indicates that more results are available if the client + // passes that value to the server in a subsequent call. remainingItemCount indicates the number + // of remaining objects if the list is partial. The remainingItemCount field is omitted during + // serialization if it is set to 0. + UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string, remainingItemCount int64) error // PrepareObjectForStorage should set SelfLink and ResourceVersion to the empty value. Should // return an error if the specified object cannot be updated. PrepareObjectForStorage(obj runtime.Object) error