Merge pull request #132244 from hakuna-matatah/1.33-regression-w-test-validation
1.33 regression - Consistent paginated lists serve from cache Kubernetes-commit: 74210dd399c14582754e933de83a9e44b1d69c69
This commit is contained in:
commit
3b43a1a7f8
2
go.mod
2
go.mod
|
@ -48,7 +48,7 @@ require (
|
||||||
gopkg.in/evanphx/json-patch.v4 v4.12.0
|
gopkg.in/evanphx/json-patch.v4 v4.12.0
|
||||||
gopkg.in/go-jose/go-jose.v2 v2.6.3
|
gopkg.in/go-jose/go-jose.v2 v2.6.3
|
||||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||||
k8s.io/api v0.0.0-20250612195650-7efafe3627c8
|
k8s.io/api v0.0.0-20250616192729-f4a3fcd2245e
|
||||||
k8s.io/apimachinery v0.0.0-20250612195403-e0270fe44c97
|
k8s.io/apimachinery v0.0.0-20250612195403-e0270fe44c97
|
||||||
k8s.io/client-go v0.0.0-20250613134311-62b0aba8c9b6
|
k8s.io/client-go v0.0.0-20250613134311-62b0aba8c9b6
|
||||||
k8s.io/component-base v0.0.0-20250613180050-de927569f3fd
|
k8s.io/component-base v0.0.0-20250613180050-de927569f3fd
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -293,8 +293,8 @@ gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYs
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
k8s.io/api v0.0.0-20250612195650-7efafe3627c8 h1:K1AnJQBQTKLy2C/up2YSFuuQ+OBucYGcDCBO2cafjlQ=
|
k8s.io/api v0.0.0-20250616192729-f4a3fcd2245e h1:YVWWCL8/51Adbyo/pps3kAK1Asi7W8RSFVU9JbJJAkU=
|
||||||
k8s.io/api v0.0.0-20250612195650-7efafe3627c8/go.mod h1:+9QbMyXTXctHAXg3fdhJbuZgyzhYgprCn43M5NqoJzw=
|
k8s.io/api v0.0.0-20250616192729-f4a3fcd2245e/go.mod h1:+9QbMyXTXctHAXg3fdhJbuZgyzhYgprCn43M5NqoJzw=
|
||||||
k8s.io/apimachinery v0.0.0-20250612195403-e0270fe44c97 h1:h2og30eGCCk1GOEZK6+LNhhlydDcWY3wJaWDIs05xR8=
|
k8s.io/apimachinery v0.0.0-20250612195403-e0270fe44c97 h1:h2og30eGCCk1GOEZK6+LNhhlydDcWY3wJaWDIs05xR8=
|
||||||
k8s.io/apimachinery v0.0.0-20250612195403-e0270fe44c97/go.mod h1:EZ7eIfFAwky7ktmG4Pu9XWxBxFG++4dxPDOM0GL3abw=
|
k8s.io/apimachinery v0.0.0-20250612195403-e0270fe44c97/go.mod h1:EZ7eIfFAwky7ktmG4Pu9XWxBxFG++4dxPDOM0GL3abw=
|
||||||
k8s.io/client-go v0.0.0-20250613134311-62b0aba8c9b6 h1:3JROzxB0hC3Re9ERr+S0TtE0NpTg4vEqo+B+KZWREjQ=
|
k8s.io/client-go v0.0.0-20250613134311-62b0aba8c9b6 h1:3JROzxB0hC3Re9ERr+S0TtE0NpTg4vEqo+B+KZWREjQ=
|
||||||
|
|
|
@ -663,6 +663,7 @@ func TestMatchExactResourceVersionFallback(t *testing.T) {
|
||||||
}
|
}
|
||||||
for _, tc := range tcs {
|
for _, tc := range tcs {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true)
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
expectStoreRequests := 0
|
expectStoreRequests := 0
|
||||||
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||||
|
@ -760,6 +761,125 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetListNonRecursiveCacheWithConsistentListFromCache(t *testing.T) {
|
||||||
|
// Set feature gates once at the beginning since we only care about ConsistentListFromCache=true and ListFromCacheSnapshot=false
|
||||||
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
|
||||||
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, false)
|
||||||
|
forceRequestWatchProgressSupport(t)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
consistentListFromCache bool
|
||||||
|
expectGetListCallCount int
|
||||||
|
expectGetCurrentRV bool
|
||||||
|
injectRVError bool
|
||||||
|
expectedError error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "ConsistentListFromCache enabled - served from cache",
|
||||||
|
consistentListFromCache: true,
|
||||||
|
expectGetListCallCount: 1,
|
||||||
|
expectGetCurrentRV: true,
|
||||||
|
injectRVError: false,
|
||||||
|
expectedError: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
var getListCount, getCurrentRVCount int
|
||||||
|
backingStorage := &dummyStorage{}
|
||||||
|
|
||||||
|
backingStorage.getListFn = func(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||||
|
getListCount++
|
||||||
|
if tc.injectRVError {
|
||||||
|
return errDummy
|
||||||
|
}
|
||||||
|
podList := listObj.(*example.PodList)
|
||||||
|
podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
backingStorage.getRVFn = func(ctx context.Context) (uint64, error) {
|
||||||
|
getCurrentRVCount++
|
||||||
|
rv := uint64(100)
|
||||||
|
err := error(nil)
|
||||||
|
if tc.injectRVError {
|
||||||
|
err = errDummy
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return rv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cacher, v, err := newTestCacher(backingStorage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
|
defer cacher.Stop()
|
||||||
|
|
||||||
|
// Wait for cacher to be ready before injecting errors
|
||||||
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready: %v", err)
|
||||||
|
}
|
||||||
|
delegator := NewCacheDelegator(cacher, backingStorage)
|
||||||
|
defer delegator.Stop()
|
||||||
|
|
||||||
|
// Setup test object
|
||||||
|
key := "pods/ns"
|
||||||
|
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}
|
||||||
|
if err := v.UpdateObject(input, 100); err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put object into the store
|
||||||
|
if err := cacher.watchCache.Add(input); err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pred := storage.SelectionPredicate{
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: fields.Everything(),
|
||||||
|
Limit: 500,
|
||||||
|
}
|
||||||
|
result := &example.PodList{}
|
||||||
|
|
||||||
|
// Make the list call with empty RV - delegator will get current RV and use it
|
||||||
|
err = delegator.GetList(context.TODO(), key, storage.ListOptions{
|
||||||
|
ResourceVersion: "",
|
||||||
|
Predicate: pred,
|
||||||
|
Recursive: true,
|
||||||
|
}, result)
|
||||||
|
|
||||||
|
// Verify error matches expectation
|
||||||
|
if !errors.Is(err, tc.expectedError) {
|
||||||
|
t.Errorf("Expected error %v, got: %v", tc.expectedError, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the correct storage method was called
|
||||||
|
if getListCount != tc.expectGetListCallCount {
|
||||||
|
t.Errorf("Expected GetList to be called %d times, but it was called %d times", tc.expectGetListCallCount, getListCount)
|
||||||
|
}
|
||||||
|
if tc.expectGetCurrentRV && getCurrentRVCount == 0 {
|
||||||
|
t.Error("Expected GetCurrentResourceVersion to be called, but it wasn't")
|
||||||
|
}
|
||||||
|
if !tc.expectGetCurrentRV && getCurrentRVCount > 0 {
|
||||||
|
t.Errorf("Expected GetCurrentResourceVersion not to be called, but it was called %d times", getCurrentRVCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
// For successful cache reads, verify the resource version
|
||||||
|
if err == nil {
|
||||||
|
resultRV, err := cacher.versioner.ParseResourceVersion(result.ResourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to parse result resource version: %v", err)
|
||||||
|
}
|
||||||
|
expectedRV := uint64(100)
|
||||||
|
if resultRV != expectedRV {
|
||||||
|
t.Errorf("Expected RV %d but got %d", expectedRV, resultRV)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
func TestGetCacheBypass(t *testing.T) {
|
func TestGetCacheBypass(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _, err := newTestCacher(backingStorage)
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
|
|
|
@ -207,6 +207,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
|
||||||
return c.storage.GetList(ctx, key, opts, listObj)
|
return c.storage.GetList(ctx, key, opts, listObj)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fallbackOpts := opts
|
||||||
if result.ConsistentRead {
|
if result.ConsistentRead {
|
||||||
listRV, err = c.storage.GetCurrentResourceVersion(ctx)
|
listRV, err = c.storage.GetCurrentResourceVersion(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -214,20 +215,28 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
|
||||||
}
|
}
|
||||||
// Setting resource version for consistent read in cache based on current ResourceVersion in etcd.
|
// Setting resource version for consistent read in cache based on current ResourceVersion in etcd.
|
||||||
opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10)
|
opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10)
|
||||||
|
// If continue is not set, we need to set the resource version match to ResourceVersionMatchNotOlderThan to serve latest from cache
|
||||||
|
if opts.Predicate.Continue == "" {
|
||||||
|
opts.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
|
||||||
|
}
|
||||||
}
|
}
|
||||||
err = c.cacher.GetList(ctx, key, opts, listObj)
|
err = c.cacher.GetList(ctx, key, opts, listObj)
|
||||||
success := "true"
|
success := "true"
|
||||||
fallback := "false"
|
fallback := "false"
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.IsResourceExpired(err) {
|
// ResourceExpired error occurs when attempting to list from cache with a specific resourceVersion
|
||||||
return c.storage.GetList(ctx, key, opts, listObj)
|
// that is no longer available in the cache. With ListFromCacheSnapshot feature (1.34+), we can
|
||||||
|
// serve exact resourceVersion requests from cache if available, falling back to storage only when
|
||||||
|
// the requested version is expired.
|
||||||
|
if errors.IsResourceExpired(err) && utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) {
|
||||||
|
return c.storage.GetList(ctx, key, fallbackOpts, listObj)
|
||||||
}
|
}
|
||||||
if result.ConsistentRead {
|
if result.ConsistentRead {
|
||||||
|
// IsTooLargeResourceVersion occurs when the requested RV is higher than cache's current RV
|
||||||
|
// and cache hasn't caught up within the timeout period. Fall back to etcd.
|
||||||
if storage.IsTooLargeResourceVersion(err) {
|
if storage.IsTooLargeResourceVersion(err) {
|
||||||
fallback = "true"
|
fallback = "true"
|
||||||
// Reset resourceVersion during fallback from consistent read.
|
err = c.storage.GetList(ctx, key, fallbackOpts, listObj)
|
||||||
opts.ResourceVersion = ""
|
|
||||||
err = c.storage.GetList(ctx, key, opts, listObj)
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
success = "false"
|
success = "false"
|
||||||
|
|
Loading…
Reference in New Issue