Validate requests sent to etcd in TestList "test List with limit" scenario

This adds a regression test to detect fallback to etcd as discovered in https://github.com/kubernetes/kubernetes/issues/132132.

Kubernetes-commit: 4cb6d3d77617f141fefd4994910380c095dac1ad
This commit is contained in:
Marek Siarkowicz 2025-06-12 11:33:02 +02:00 committed by Kubernetes Publisher
parent d76c79a4e9
commit 9b695a5efa
5 changed files with 123 additions and 38 deletions

View File

@ -186,7 +186,7 @@ func TestLists(t *testing.T) {
t.Parallel()
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true)
storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, server.V3Client.Kubernetes.(*storagetesting.KubernetesRecorder))
})
t.Run("ConsistentList", func(t *testing.T) {

View File

@ -24,7 +24,6 @@ import (
"os"
"reflect"
"strings"
"sync/atomic"
"testing"
"github.com/go-logr/logr"
@ -249,7 +248,7 @@ func TestTransformationFailure(t *testing.T) {
func TestList(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestList(ctx, t, store, increaseRV(client.Client), false)
storagetesting.RunTestList(ctx, t, store, increaseRV(client.Client), false, client.Kubernetes.(*storagetesting.KubernetesRecorder))
}
func TestConsistentList(t *testing.T) {
@ -257,7 +256,7 @@ func TestConsistentList(t *testing.T) {
storagetesting.RunTestConsistentList(ctx, t, store, increaseRV(client.Client), false, true, false)
}
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *storagetesting.KVRecorder) storagetesting.CallsValidation {
return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) {
if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects {
t.Errorf("unexpected reads: %d, expected: %d", reads, estimatedProcessedObjects)
@ -288,23 +287,23 @@ func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer,
}
func TestListContinuation(t *testing.T) {
ctx, store, client := testSetup(t, withRecorder())
ctx, store, client := testSetup(t)
validation := checkStorageCallsInvariants(
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*storagetesting.KVRecorder))
storagetesting.RunTestListContinuation(ctx, t, store, validation)
}
func TestListPaginationRareObject(t *testing.T) {
ctx, store, client := testSetup(t, withRecorder())
ctx, store, client := testSetup(t)
validation := checkStorageCallsInvariants(
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*storagetesting.KVRecorder))
storagetesting.RunTestListPaginationRareObject(ctx, t, store, validation)
}
func TestListContinuationWithFilter(t *testing.T) {
ctx, store, client := testSetup(t, withRecorder())
ctx, store, client := testSetup(t)
validation := checkStorageCallsInvariants(
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*storagetesting.KVRecorder))
storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation)
}
@ -521,20 +520,6 @@ func newTestTransformer() value.Transformer {
return storagetesting.NewPrefixTransformer([]byte(defaultTestPrefix), false)
}
type clientRecorder struct {
reads uint64
clientv3.KV
}
func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
atomic.AddUint64(&r.reads, 1)
return r.KV.Get(ctx, key, opts...)
}
func (r *clientRecorder) GetReadsAndReset() uint64 {
return atomic.SwapUint64(&r.reads, 0)
}
type setupOptions struct {
client func(testing.TB) *kubernetes.Client
codec runtime.Codec
@ -545,8 +530,6 @@ type setupOptions struct {
groupResource schema.GroupResource
transformer value.Transformer
leaseConfig LeaseManagerConfig
recorderEnabled bool
}
type setupOption func(*setupOptions)
@ -571,12 +554,6 @@ func withLeaseConfig(leaseConfig LeaseManagerConfig) setupOption {
}
}
func withRecorder() setupOption {
return func(options *setupOptions) {
options.recorderEnabled = true
}
}
func withDefaults(options *setupOptions) {
options.client = func(t testing.TB) *kubernetes.Client {
return testserver.RunEtcd(t, nil)
@ -600,9 +577,6 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *kub
opt(&setupOpts)
}
client := setupOpts.client(t)
if setupOpts.recorderEnabled {
client.KV = &clientRecorder{KV: client.KV}
}
versioner := storage.APIObjectVersioner{}
store := newStore(
client,

View File

@ -32,6 +32,7 @@ import (
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
)
// getAvailablePort returns a TCP port that is available for binding.
@ -129,5 +130,7 @@ func RunEtcd(t testing.TB, cfg *embed.Config) *kubernetes.Client {
if err != nil {
t.Fatal(err)
}
client.KV = storagetesting.NewKVRecorder(client.KV)
client.Kubernetes = storagetesting.NewKubernetesRecorder(client.Kubernetes)
return client
}

View File

@ -0,0 +1,84 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"context"
"sync"
"sync/atomic"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
)
type KVRecorder struct {
clientv3.KV
reads uint64
}
func NewKVRecorder(kv clientv3.KV) *KVRecorder {
return &KVRecorder{KV: kv}
}
func (r *KVRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
atomic.AddUint64(&r.reads, 1)
return r.KV.Get(ctx, key, opts...)
}
func (r *KVRecorder) GetReadsAndReset() uint64 {
return atomic.SwapUint64(&r.reads, 0)
}
type KubernetesRecorder struct {
kubernetes.Interface
mux sync.Mutex
listsPerKey map[string][]RecordedList
}
func NewKubernetesRecorder(client kubernetes.Interface) *KubernetesRecorder {
return &KubernetesRecorder{
listsPerKey: make(map[string][]RecordedList),
Interface: client,
}
}
func (r *KubernetesRecorder) List(ctx context.Context, key string, opts kubernetes.ListOptions) (kubernetes.ListResponse, error) {
recorderKey, ok := ctx.Value(recorderContextKey).(string)
if ok {
r.mux.Lock()
r.listsPerKey[recorderKey] = append(r.listsPerKey[recorderKey], RecordedList{Key: key, ListOptions: opts})
r.mux.Unlock()
}
return r.Interface.List(ctx, key, opts)
}
func (r *KubernetesRecorder) ListRequestForKey(key string) []RecordedList {
r.mux.Lock()
defer r.mux.Unlock()
return r.listsPerKey[key]
}
type RecordedList struct {
Key string
kubernetes.ListOptions
}
var recorderContextKey recorderKeyType
type recorderKeyType struct{}

View File

@ -32,6 +32,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/v3/kubernetes"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@ -39,8 +40,10 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilpointer "k8s.io/utils/pointer"
)
@ -624,7 +627,7 @@ func RunTestPreconditionalDeleteWithOnlySuggestionPass(ctx context.Context, t *t
expectNoDiff(t, "incorrect pod:", updatedPod, out)
}
func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, ignoreWatchCacheTests bool) {
func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, watchCacheEnabled bool, recorder *KubernetesRecorder) {
initialRV, createdPods, updatedPod, err := seedMultiLevelData(ctx, store)
if err != nil {
t.Fatal(err)
@ -670,6 +673,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, inc
expectRVTooLarge bool
expectRV string
expectRVFunc func(string) error
expectEtcdRequest func() []RecordedList
}{
{
name: "rejects invalid resource version",
@ -813,6 +817,17 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, inc
expectContinue: true,
expectContinueExact: encodeContinueOrDie(createdPods[1].Name+"\x00", int64(mustAtoi(currentRV))),
expectedRemainingItemCount: utilpointer.Int64(1),
expectEtcdRequest: func() []RecordedList {
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
return nil
}
return []RecordedList{
{
Key: "/registry/pods/second/",
ListOptions: kubernetes.ListOptions{Revision: 0, Limit: 1},
},
}
},
},
{
name: "test List with limit at current resource version",
@ -1574,7 +1589,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, inc
// doesn't automatically preclude some scenarios from happening.
t.Parallel()
if ignoreWatchCacheTests && tt.ignoreForWatchCache {
if watchCacheEnabled && tt.ignoreForWatchCache {
t.Skip()
}
@ -1589,7 +1604,9 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, inc
Predicate: tt.pred,
Recursive: true,
}
err := store.GetList(ctx, tt.prefix, storageOpts, out)
recorderKey := t.Name()
listCtx := context.WithValue(ctx, recorderContextKey, recorderKey)
err := store.GetList(listCtx, tt.prefix, storageOpts, out)
if tt.expectRVTooLarge {
// TODO: Clasify etcd future revision error as TooLargeResourceVersion
if err == nil || !(storage.IsTooLargeResourceVersion(err) || strings.Contains(err.Error(), "etcdserver: mvcc: required revision is a future revision")) {
@ -1636,6 +1653,13 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, inc
if !cmp.Equal(tt.expectedRemainingItemCount, out.RemainingItemCount) {
t.Fatalf("unexpected remainingItemCount, diff: %s", cmp.Diff(tt.expectedRemainingItemCount, out.RemainingItemCount))
}
if watchCacheEnabled && tt.expectEtcdRequest != nil {
expectEtcdLists := tt.expectEtcdRequest()
etcdLists := recorder.ListRequestForKey(recorderKey)
if !cmp.Equal(expectEtcdLists, etcdLists) {
t.Fatalf("unexpected etcd requests, diff: %s", cmp.Diff(expectEtcdLists, etcdLists))
}
}
})
}
}