Extract list response struct to manage all the response fields
Kubernetes-commit: 78a6402755905f14d72ee47fdb1e169dbdaa6853
This commit is contained in:
		
							parent
							
								
									2cad252f0c
								
							
						
					
					
						commit
						62c90b1dff
					
				|  | @ -825,20 +825,25 @@ func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool { | |||
| 	return noLabelSelector && noFieldSelector && hasLimit | ||||
| } | ||||
| 
 | ||||
| func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) { | ||||
| func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) (listResp, string, error) { | ||||
| 	if !recursive { | ||||
| 		obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key) | ||||
| 		if err != nil { | ||||
| 			return nil, 0, "", err | ||||
| 			return listResp{}, "", err | ||||
| 		} | ||||
| 		if exists { | ||||
| 			return []interface{}{obj}, readResourceVersion, "", nil | ||||
| 			return listResp{Items: []interface{}{obj}, ResourceVersion: readResourceVersion}, "", nil | ||||
| 		} | ||||
| 		return nil, readResourceVersion, "", nil | ||||
| 		return listResp{ResourceVersion: readResourceVersion}, "", nil | ||||
| 	} | ||||
| 	return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx)) | ||||
| } | ||||
| 
 | ||||
| type listResp struct { | ||||
| 	Items           []interface{} | ||||
| 	ResourceVersion uint64 | ||||
| } | ||||
| 
 | ||||
| // GetList implements storage.Interface
 | ||||
| func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { | ||||
| 	recursive := opts.Recursive | ||||
|  | @ -914,7 +919,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio | |||
| 		return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) | ||||
| 	} | ||||
| 
 | ||||
| 	objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive) | ||||
| 	resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive) | ||||
| 	success := "true" | ||||
| 	fallback := "false" | ||||
| 	if err != nil { | ||||
|  | @ -933,7 +938,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio | |||
| 	if consistentRead { | ||||
| 		metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1) | ||||
| 	} | ||||
| 	span.AddEvent("Listed items from cache", attribute.Int("count", len(objs))) | ||||
| 	span.AddEvent("Listed items from cache", attribute.Int("count", len(resp.Items))) | ||||
| 	// store pointer of eligible objects,
 | ||||
| 	// Why not directly put object in the items of listObj?
 | ||||
| 	//   the elements in ListObject are Struct type, making slice will bring excessive memory consumption.
 | ||||
|  | @ -942,7 +947,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio | |||
| 	var lastSelectedObjectKey string | ||||
| 	var hasMoreListItems bool | ||||
| 	limit := computeListLimit(opts) | ||||
| 	for i, obj := range objs { | ||||
| 	for i, obj := range resp.Items { | ||||
| 		elem, ok := obj.(*storeElement) | ||||
| 		if !ok { | ||||
| 			return fmt.Errorf("non *storeElement returned from storage: %v", obj) | ||||
|  | @ -952,7 +957,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio | |||
| 			lastSelectedObjectKey = elem.Key | ||||
| 		} | ||||
| 		if limit > 0 && int64(len(selectedObjects)) >= limit { | ||||
| 			hasMoreListItems = i < len(objs)-1 | ||||
| 			hasMoreListItems = i < len(resp.Items)-1 | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
|  | @ -969,16 +974,16 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio | |||
| 	} | ||||
| 	span.AddEvent("Filtered items", attribute.Int("count", listVal.Len())) | ||||
| 	if c.versioner != nil { | ||||
| 		continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts) | ||||
| 		continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(resp.ResourceVersion), int64(len(resp.Items)), hasMoreListItems, opts) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil { | ||||
| 		if err = c.versioner.UpdateList(listObj, resp.ResourceVersion, continueValue, remainingItemCount); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(objs), listVal.Len()) | ||||
| 	metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(resp.Items), listVal.Len()) | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -3079,7 +3079,7 @@ func TestListIndexer(t *testing.T) { | |||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			pred := storagetesting.CreatePodPredicate(tt.fieldSelector, true, tt.indexFields) | ||||
| 			_, _, usedIndex, err := cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive) | ||||
| 			_, usedIndex, err := cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive) | ||||
| 			if err != nil { | ||||
| 				t.Errorf("Unexpected error: %v", err) | ||||
| 			} | ||||
|  |  | |||
|  | @ -451,7 +451,7 @@ func (s sortableStoreElements) Swap(i, j int) { | |||
| 
 | ||||
| // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
 | ||||
| // with their ResourceVersion and the name of the index, if any, that was used.
 | ||||
| func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) { | ||||
| func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (resp listResp, index string, err error) { | ||||
| 	requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) | ||||
| 	if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) { | ||||
| 		w.waitingUntilFresh.Add() | ||||
|  | @ -463,32 +463,34 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion | |||
| 
 | ||||
| 	defer w.RUnlock() | ||||
| 	if err != nil { | ||||
| 		return result, rv, index, err | ||||
| 		return listResp{}, "", err | ||||
| 	} | ||||
| 	var prefixFilteredAndOrdered bool | ||||
| 	result, rv, index, prefixFilteredAndOrdered, err = func() ([]interface{}, uint64, string, bool, error) { | ||||
| 		// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
 | ||||
| 		// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
 | ||||
| 		// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
 | ||||
| 		// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
 | ||||
| 		for _, matchValue := range matchValues { | ||||
| 			if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { | ||||
| 				return result, w.resourceVersion, matchValue.IndexName, false, nil | ||||
| 			} | ||||
| 		} | ||||
| 		if store, ok := w.store.(orderedLister); ok { | ||||
| 			result, _ := store.ListPrefix(key, "", 0) | ||||
| 			return result, w.resourceVersion, "", true, nil | ||||
| 		} | ||||
| 		return w.store.List(), w.resourceVersion, "", false, nil | ||||
| 	}() | ||||
| 	if !prefixFilteredAndOrdered { | ||||
| 		result, err = filterPrefixAndOrder(key, result) | ||||
| 		if err != nil { | ||||
| 			return nil, 0, "", err | ||||
| 	// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
 | ||||
| 	// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
 | ||||
| 	// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
 | ||||
| 	// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
 | ||||
| 	for _, matchValue := range matchValues { | ||||
| 		if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { | ||||
| 			result, err = filterPrefixAndOrder(key, result) | ||||
| 			return listResp{ | ||||
| 				Items:           result, | ||||
| 				ResourceVersion: w.resourceVersion, | ||||
| 			}, matchValue.IndexName, err | ||||
| 		} | ||||
| 	} | ||||
| 	return result, w.resourceVersion, index, nil | ||||
| 	if store, ok := w.store.(orderedLister); ok { | ||||
| 		result, _ := store.ListPrefix(key, "", 0) | ||||
| 		return listResp{ | ||||
| 			Items:           result, | ||||
| 			ResourceVersion: w.resourceVersion, | ||||
| 		}, "", nil | ||||
| 	} | ||||
| 	result := w.store.List() | ||||
| 	result, err = filterPrefixAndOrder(key, result) | ||||
| 	return listResp{ | ||||
| 		Items:           result, | ||||
| 		ResourceVersion: w.resourceVersion, | ||||
| 	}, "", err | ||||
| } | ||||
| 
 | ||||
| func filterPrefixAndOrder(prefix string, items []interface{}) ([]interface{}, error) { | ||||
|  |  | |||
|  | @ -462,15 +462,15 @@ func TestWaitUntilFreshAndList(t *testing.T) { | |||
| 	}() | ||||
| 
 | ||||
| 	// list by empty MatchValues.
 | ||||
| 	list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil) | ||||
| 	resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	if resourceVersion != 5 { | ||||
| 		t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) | ||||
| 	if resp.ResourceVersion != 5 { | ||||
| 		t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) | ||||
| 	} | ||||
| 	if len(list) != 3 { | ||||
| 		t.Errorf("unexpected list returned: %#v", list) | ||||
| 	if len(resp.Items) != 3 { | ||||
| 		t.Errorf("unexpected list returned: %#v", resp) | ||||
| 	} | ||||
| 	if indexUsed != "" { | ||||
| 		t.Errorf("Used index %q but expected none to be used", indexUsed) | ||||
|  | @ -481,15 +481,15 @@ func TestWaitUntilFreshAndList(t *testing.T) { | |||
| 		{IndexName: "l:label", Value: "value1"}, | ||||
| 		{IndexName: "f:spec.nodeName", Value: "node2"}, | ||||
| 	} | ||||
| 	list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) | ||||
| 	resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	if resourceVersion != 5 { | ||||
| 		t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) | ||||
| 	if resp.ResourceVersion != 5 { | ||||
| 		t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) | ||||
| 	} | ||||
| 	if len(list) != 2 { | ||||
| 		t.Errorf("unexpected list returned: %#v", list) | ||||
| 	if len(resp.Items) != 2 { | ||||
| 		t.Errorf("unexpected list returned: %#v", resp) | ||||
| 	} | ||||
| 	if indexUsed != "l:label" { | ||||
| 		t.Errorf("Used index %q but expected %q", indexUsed, "l:label") | ||||
|  | @ -500,15 +500,15 @@ func TestWaitUntilFreshAndList(t *testing.T) { | |||
| 		{IndexName: "l:not-exist-label", Value: "whatever"}, | ||||
| 		{IndexName: "f:spec.nodeName", Value: "node2"}, | ||||
| 	} | ||||
| 	list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) | ||||
| 	resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	if resourceVersion != 5 { | ||||
| 		t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) | ||||
| 	if resp.ResourceVersion != 5 { | ||||
| 		t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) | ||||
| 	} | ||||
| 	if len(list) != 1 { | ||||
| 		t.Errorf("unexpected list returned: %#v", list) | ||||
| 	if len(resp.Items) != 1 { | ||||
| 		t.Errorf("unexpected list returned: %#v", resp) | ||||
| 	} | ||||
| 	if indexUsed != "f:spec.nodeName" { | ||||
| 		t.Errorf("Used index %q but expected %q", indexUsed, "f:spec.nodeName") | ||||
|  | @ -518,15 +518,15 @@ func TestWaitUntilFreshAndList(t *testing.T) { | |||
| 	matchValues = []storage.MatchValue{ | ||||
| 		{IndexName: "l:not-exist-label", Value: "whatever"}, | ||||
| 	} | ||||
| 	list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) | ||||
| 	resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	if resourceVersion != 5 { | ||||
| 		t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) | ||||
| 	if resp.ResourceVersion != 5 { | ||||
| 		t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) | ||||
| 	} | ||||
| 	if len(list) != 3 { | ||||
| 		t.Errorf("unexpected list returned: %#v", list) | ||||
| 	if len(resp.Items) != 3 { | ||||
| 		t.Errorf("unexpected list returned: %#v", resp) | ||||
| 	} | ||||
| 	if indexUsed != "" { | ||||
| 		t.Errorf("Used index %q but expected none to be used", indexUsed) | ||||
|  | @ -546,15 +546,15 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) { | |||
| 	}() | ||||
| 
 | ||||
| 	// list from future revision. Requires watch cache to request bookmark to get it.
 | ||||
| 	list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil) | ||||
| 	resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	if resourceVersion != 3 { | ||||
| 		t.Errorf("unexpected resourceVersion: %v, expected: 6", resourceVersion) | ||||
| 	if resp.ResourceVersion != 3 { | ||||
| 		t.Errorf("unexpected resourceVersion: %v, expected: 6", resp.ResourceVersion) | ||||
| 	} | ||||
| 	if len(list) != 1 { | ||||
| 		t.Errorf("unexpected list returned: %#v", list) | ||||
| 	if len(resp.Items) != 1 { | ||||
| 		t.Errorf("unexpected list returned: %#v", resp) | ||||
| 	} | ||||
| 	if indexUsed != "" { | ||||
| 		t.Errorf("Used index %q but expected none to be used", indexUsed) | ||||
|  | @ -626,7 +626,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) { | |||
| 				store.Add(makeTestPod("bar", 4)) | ||||
| 			}() | ||||
| 
 | ||||
| 			_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil) | ||||
| 			_, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil) | ||||
| 			if !errors.IsTimeout(err) { | ||||
| 				t.Errorf("expected timeout error but got: %v", err) | ||||
| 			} | ||||
|  | @ -655,12 +655,12 @@ func TestReflectorForWatchCache(t *testing.T) { | |||
| 	defer store.Stop() | ||||
| 
 | ||||
| 	{ | ||||
| 		_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil) | ||||
| 		resp, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected error: %v", err) | ||||
| 		} | ||||
| 		if version != 0 { | ||||
| 			t.Errorf("unexpected resource version: %d", version) | ||||
| 		if resp.ResourceVersion != 0 { | ||||
| 			t.Errorf("unexpected resource version: %d", resp.ResourceVersion) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
|  | @ -678,12 +678,12 @@ func TestReflectorForWatchCache(t *testing.T) { | |||
| 	r.ListAndWatch(wait.NeverStop) | ||||
| 
 | ||||
| 	{ | ||||
| 		_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil) | ||||
| 		resp, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected error: %v", err) | ||||
| 		} | ||||
| 		if version != 10 { | ||||
| 			t.Errorf("unexpected resource version: %d", version) | ||||
| 		if resp.ResourceVersion != 10 { | ||||
| 			t.Errorf("unexpected resource version: %d", resp.ResourceVersion) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue