diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 59305cff5..d7fe75eea 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -458,58 +458,7 @@ func (s *store) GuaranteedUpdate( // GetToList implements storage.Interface.GetToList. func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error { - resourceVersion := listOpts.ResourceVersion - match := listOpts.ResourceVersionMatch - pred := listOpts.Predicate - trace := utiltrace.New("GetToList etcd3", - utiltrace.Field{"key", key}, - utiltrace.Field{"resourceVersion", resourceVersion}, - utiltrace.Field{"resourceVersionMatch", match}, - utiltrace.Field{"limit", pred.Limit}, - utiltrace.Field{"continue", pred.Continue}) - defer trace.LogIfLong(500 * time.Millisecond) - listPtr, err := meta.GetItemsPtr(listObj) - if err != nil { - return err - } - v, err := conversion.EnforcePtr(listPtr) - if err != nil || v.Kind() != reflect.Slice { - return fmt.Errorf("need ptr to slice: %v", err) - } - - newItemFunc := getNewItemFunc(listObj, v) - - key = path.Join(s.pathPrefix, key) - startTime := time.Now() - var opts []clientv3.OpOption - if len(resourceVersion) > 0 && match == metav1.ResourceVersionMatchExact { - rv, err := s.versioner.ParseResourceVersion(resourceVersion) - if err != nil { - return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) - } - opts = append(opts, clientv3.WithRev(int64(rv))) - } - - getResp, err := s.client.KV.Get(ctx, key, opts...) - metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) - if err != nil { - return err - } - if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { - return err - } - - if len(getResp.Kvs) > 0 { - data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key)) - if err != nil { - return storage.NewInternalError(err.Error()) - } - if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil { - return err - } - } - // update version with cluster level revision - return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "", nil) + return s.list(ctx, key, listOpts, listObj, false) } func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Object { @@ -610,10 +559,14 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error // List implements storage.Interface.List. func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + return s.list(ctx, key, opts, listObj, true) +} + +func (s *store) list(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error { resourceVersion := opts.ResourceVersion match := opts.ResourceVersionMatch pred := opts.Predicate - trace := utiltrace.New("List etcd3", + trace := utiltrace.New(fmt.Sprintf("List(recursive=%v) etcd3", recursive), utiltrace.Field{"key", key}, utiltrace.Field{"resourceVersion", resourceVersion}, utiltrace.Field{"resourceVersionMatch", match}, @@ -628,14 +581,13 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, if err != nil || v.Kind() != reflect.Slice { return fmt.Errorf("need ptr to slice: %v", err) } + key = path.Join(s.pathPrefix, key) - if s.pathPrefix != "" { - key = path.Join(s.pathPrefix, key) - } - // We need to make sure the key ended with "/" so that we only get children "directories". - // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three, - // while with prefix "/a/" will return only "/a/b" which is the correct answer. - if !strings.HasSuffix(key, "/") { + // For recursive lists, we need to make sure the key ended with "/" so that we only + // get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys + // with prefix "/a" will return all three, while with prefix "/a/" will return only + // "/a/b" which is the correct answer. + if recursive && !strings.HasSuffix(key, "/") { key += "/" } keyPrefix := key @@ -662,7 +614,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, var returnedRV, continueRV, withRev int64 var continueKey string switch { - case s.pagingEnabled && len(pred.Continue) > 0: + case recursive && s.pagingEnabled && len(pred.Continue) > 0: continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix) if err != nil { return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) @@ -683,7 +635,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, withRev = continueRV returnedRV = continueRV } - case s.pagingEnabled && pred.Limit > 0: + case recursive && s.pagingEnabled && pred.Limit > 0: if fromRV != nil { switch match { case metav1.ResourceVersionMatchNotOlderThan: @@ -719,7 +671,9 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, } } - options = append(options, clientv3.WithPrefix()) + if recursive { + options = append(options, clientv3.WithPrefix()) + } } if withRev != 0 { options = append(options, clientv3.WithRev(withRev)) @@ -740,7 +694,11 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, for { startTime := time.Now() getResp, err = s.client.KV.Get(ctx, key, options...) - metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) + if recursive { + metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) + } else { + metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) + } if err != nil { return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) }