Migrate GetList to Kubernetes client
Kubernetes-commit: a16a364324c218b703d033edf89187aa60d9dd87
This commit is contained in:
		
							parent
							
								
									de27d754e6
								
							
						
					
					
						commit
						eb7f648085
					
				|  | @ -629,7 +629,7 @@ func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts sto | |||
| 
 | ||||
| // GetList implements storage.Interface.
 | ||||
| func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { | ||||
| 	preparedKey, err := s.prepareKey(key) | ||||
| 	keyPrefix, err := s.prepareKey(key) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | @ -654,27 +654,13 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption | |||
| 	// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
 | ||||
| 	// with prefix "/a" will return all three, while with prefix "/a/" will return only
 | ||||
| 	// "/a/b" which is the correct answer.
 | ||||
| 	if opts.Recursive && !strings.HasSuffix(preparedKey, "/") { | ||||
| 		preparedKey += "/" | ||||
| 	if opts.Recursive && !strings.HasSuffix(keyPrefix, "/") { | ||||
| 		keyPrefix += "/" | ||||
| 	} | ||||
| 	keyPrefix := preparedKey | ||||
| 
 | ||||
| 	// set the appropriate clientv3 options to filter the returned data set
 | ||||
| 	var limitOption *clientv3.OpOption | ||||
| 	limit := opts.Predicate.Limit | ||||
| 	var paging bool | ||||
| 	options := make([]clientv3.OpOption, 0, 4) | ||||
| 	if opts.Predicate.Limit > 0 { | ||||
| 		paging = true | ||||
| 		options = append(options, clientv3.WithLimit(limit)) | ||||
| 		limitOption = &options[len(options)-1] | ||||
| 	} | ||||
| 
 | ||||
| 	if opts.Recursive { | ||||
| 		rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) | ||||
| 		options = append(options, clientv3.WithRange(rangeEnd)) | ||||
| 	} | ||||
| 
 | ||||
| 	paging := opts.Predicate.Limit > 0 | ||||
| 	newItemFunc := getNewItemFunc(listObj, v) | ||||
| 
 | ||||
| 	var continueRV, withRev int64 | ||||
|  | @ -684,20 +670,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption | |||
| 		if err != nil { | ||||
| 			return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) | ||||
| 		} | ||||
| 		preparedKey = continueKey | ||||
| 	} | ||||
| 	if withRev, err = s.resolveGetListRev(continueKey, continueRV, opts); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	if withRev != 0 { | ||||
| 		options = append(options, clientv3.WithRev(withRev)) | ||||
| 	} | ||||
| 
 | ||||
| 	// loop until we have filled the requested limit from etcd or there are no more results
 | ||||
| 	var lastKey []byte | ||||
| 	var hasMore bool | ||||
| 	var getResp *clientv3.GetResponse | ||||
| 	var getResp kubernetes.ListResponse | ||||
| 	var numFetched int | ||||
| 	var numEvald int | ||||
| 	// Because these metrics are for understanding the costs of handling LIST requests,
 | ||||
|  | @ -714,24 +695,27 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption | |||
| 
 | ||||
| 	for { | ||||
| 		startTime := time.Now() | ||||
| 		getResp, err = s.client.KV.Get(ctx, preparedKey, options...) | ||||
| 		getResp, err = s.getList(ctx, keyPrefix, opts.Recursive, kubernetes.ListOptions{ | ||||
| 			Revision: withRev, | ||||
| 			Limit:    limit, | ||||
| 			Continue: continueKey, | ||||
| 		}) | ||||
| 		metrics.RecordEtcdRequest(metricsOp, s.groupResourceString, err, startTime) | ||||
| 		if err != nil { | ||||
| 			return interpretListError(err, len(opts.Predicate.Continue) > 0, continueKey, keyPrefix) | ||||
| 		} | ||||
| 		numFetched += len(getResp.Kvs) | ||||
| 		if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil { | ||||
| 		if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Revision)); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		hasMore = getResp.More | ||||
| 		hasMore = int64(len(getResp.Kvs)) < getResp.Count | ||||
| 
 | ||||
| 		if len(getResp.Kvs) == 0 && getResp.More { | ||||
| 		if len(getResp.Kvs) == 0 && hasMore { | ||||
| 			return fmt.Errorf("no results were found, but etcd indicated there were more values remaining") | ||||
| 		} | ||||
| 		// indicate to the client which resource version was returned, and use the same resource version for subsequent requests.
 | ||||
| 		if withRev == 0 { | ||||
| 			withRev = getResp.Header.Revision | ||||
| 			options = append(options, clientv3.WithRev(withRev)) | ||||
| 			withRev = getResp.Revision | ||||
| 		} | ||||
| 
 | ||||
| 		// avoid small allocations for the result slice, since this can be called in many
 | ||||
|  | @ -779,6 +763,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption | |||
| 			// free kv early. Long lists can take O(seconds) to decode.
 | ||||
| 			getResp.Kvs[i] = nil | ||||
| 		} | ||||
| 		continueKey = string(lastKey) + "\x00" | ||||
| 
 | ||||
| 		// no more results remain or we didn't request paging
 | ||||
| 		if !hasMore || !paging { | ||||
|  | @ -796,9 +781,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption | |||
| 			if limit > maxLimit { | ||||
| 				limit = maxLimit | ||||
| 			} | ||||
| 			*limitOption = clientv3.WithLimit(limit) | ||||
| 		} | ||||
| 		preparedKey = string(lastKey) + "\x00" | ||||
| 	} | ||||
| 
 | ||||
| 	if v.IsNil() { | ||||
|  | @ -813,6 +796,26 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption | |||
| 	return s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount) | ||||
| } | ||||
| 
 | ||||
| func (s *store) getList(ctx context.Context, keyPrefix string, recursive bool, options kubernetes.ListOptions) (kubernetes.ListResponse, error) { | ||||
| 	if recursive { | ||||
| 		return s.client.Kubernetes.List(ctx, keyPrefix, options) | ||||
| 	} | ||||
| 	getResp, err := s.client.Kubernetes.Get(ctx, keyPrefix, kubernetes.GetOptions{ | ||||
| 		Revision: options.Revision, | ||||
| 	}) | ||||
| 	var resp kubernetes.ListResponse | ||||
| 	if getResp.KV != nil { | ||||
| 		resp.Kvs = []*mvccpb.KeyValue{getResp.KV} | ||||
| 		resp.Count = 1 | ||||
| 		resp.Revision = getResp.Revision | ||||
| 	} else { | ||||
| 		resp.Kvs = []*mvccpb.KeyValue{} | ||||
| 		resp.Count = 0 | ||||
| 		resp.Revision = getResp.Revision | ||||
| 	} | ||||
| 	return resp, err | ||||
| } | ||||
| 
 | ||||
| // growSlice takes a slice value and grows its capacity up
 | ||||
| // to the maximum of the passed sizes or maxCapacity, whichever
 | ||||
| // is smaller. Above maxCapacity decisions about allocation are left
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue