diff --git a/pkg/storage/cacher/cacher_test.go b/pkg/storage/cacher/cacher_test.go index 0d2dfe73a..5547bf065 100644 --- a/pkg/storage/cacher/cacher_test.go +++ b/pkg/storage/cacher/cacher_test.go @@ -186,7 +186,7 @@ func TestLists(t *testing.T) { t.Parallel() ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true) + storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, server.V3Client.Kubernetes.(*storagetesting.KubernetesRecorder)) }) t.Run("ConsistentList", func(t *testing.T) { diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index dff4930a7..6b6672cb4 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -24,7 +24,6 @@ import ( "os" "reflect" "strings" - "sync/atomic" "testing" "github.com/go-logr/logr" @@ -249,7 +248,7 @@ func TestTransformationFailure(t *testing.T) { func TestList(t *testing.T) { ctx, store, client := testSetup(t) - storagetesting.RunTestList(ctx, t, store, increaseRV(client.Client), false) + storagetesting.RunTestList(ctx, t, store, increaseRV(client.Client), false, client.Kubernetes.(*storagetesting.KubernetesRecorder)) } func TestConsistentList(t *testing.T) { @@ -257,7 +256,7 @@ func TestConsistentList(t *testing.T) { storagetesting.RunTestConsistentList(ctx, t, store, increaseRV(client.Client), false, true, false) } -func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation { +func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *storagetesting.KVRecorder) storagetesting.CallsValidation { return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) { if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects { t.Errorf("unexpected reads: %d, expected: %d", reads, estimatedProcessedObjects) @@ -288,23 +287,23 @@ func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, } func TestListContinuation(t *testing.T) { - ctx, store, client := testSetup(t, withRecorder()) + ctx, store, client := testSetup(t) validation := checkStorageCallsInvariants( - store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder)) + store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*storagetesting.KVRecorder)) storagetesting.RunTestListContinuation(ctx, t, store, validation) } func TestListPaginationRareObject(t *testing.T) { - ctx, store, client := testSetup(t, withRecorder()) + ctx, store, client := testSetup(t) validation := checkStorageCallsInvariants( - store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder)) + store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*storagetesting.KVRecorder)) storagetesting.RunTestListPaginationRareObject(ctx, t, store, validation) } func TestListContinuationWithFilter(t *testing.T) { - ctx, store, client := testSetup(t, withRecorder()) + ctx, store, client := testSetup(t) validation := checkStorageCallsInvariants( - store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder)) + store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*storagetesting.KVRecorder)) storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation) } @@ -521,20 +520,6 @@ func newTestTransformer() value.Transformer { return storagetesting.NewPrefixTransformer([]byte(defaultTestPrefix), false) } -type clientRecorder struct { - reads uint64 - clientv3.KV -} - -func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { - atomic.AddUint64(&r.reads, 1) - return r.KV.Get(ctx, key, opts...) -} - -func (r *clientRecorder) GetReadsAndReset() uint64 { - return atomic.SwapUint64(&r.reads, 0) -} - type setupOptions struct { client func(testing.TB) *kubernetes.Client codec runtime.Codec @@ -545,8 +530,6 @@ type setupOptions struct { groupResource schema.GroupResource transformer value.Transformer leaseConfig LeaseManagerConfig - - recorderEnabled bool } type setupOption func(*setupOptions) @@ -571,12 +554,6 @@ func withLeaseConfig(leaseConfig LeaseManagerConfig) setupOption { } } -func withRecorder() setupOption { - return func(options *setupOptions) { - options.recorderEnabled = true - } -} - func withDefaults(options *setupOptions) { options.client = func(t testing.TB) *kubernetes.Client { return testserver.RunEtcd(t, nil) @@ -600,9 +577,6 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *kub opt(&setupOpts) } client := setupOpts.client(t) - if setupOpts.recorderEnabled { - client.KV = &clientRecorder{KV: client.KV} - } versioner := storage.APIObjectVersioner{} store := newStore( client, diff --git a/pkg/storage/etcd3/testserver/test_server.go b/pkg/storage/etcd3/testserver/test_server.go index b8c5641c0..6d2f1c2f2 100644 --- a/pkg/storage/etcd3/testserver/test_server.go +++ b/pkg/storage/etcd3/testserver/test_server.go @@ -32,6 +32,7 @@ import ( "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" "google.golang.org/grpc" + storagetesting "k8s.io/apiserver/pkg/storage/testing" ) // getAvailablePort returns a TCP port that is available for binding. @@ -129,5 +130,7 @@ func RunEtcd(t testing.TB, cfg *embed.Config) *kubernetes.Client { if err != nil { t.Fatal(err) } + client.KV = storagetesting.NewKVRecorder(client.KV) + client.Kubernetes = storagetesting.NewKubernetesRecorder(client.Kubernetes) return client } diff --git a/pkg/storage/testing/recorder.go b/pkg/storage/testing/recorder.go new file mode 100644 index 000000000..7505c0e2b --- /dev/null +++ b/pkg/storage/testing/recorder.go @@ -0,0 +1,84 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + "sync" + "sync/atomic" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" +) + +type KVRecorder struct { + clientv3.KV + + reads uint64 +} + +func NewKVRecorder(kv clientv3.KV) *KVRecorder { + return &KVRecorder{KV: kv} +} + +func (r *KVRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + atomic.AddUint64(&r.reads, 1) + return r.KV.Get(ctx, key, opts...) +} + +func (r *KVRecorder) GetReadsAndReset() uint64 { + return atomic.SwapUint64(&r.reads, 0) +} + +type KubernetesRecorder struct { + kubernetes.Interface + + mux sync.Mutex + listsPerKey map[string][]RecordedList +} + +func NewKubernetesRecorder(client kubernetes.Interface) *KubernetesRecorder { + return &KubernetesRecorder{ + listsPerKey: make(map[string][]RecordedList), + Interface: client, + } +} + +func (r *KubernetesRecorder) List(ctx context.Context, key string, opts kubernetes.ListOptions) (kubernetes.ListResponse, error) { + recorderKey, ok := ctx.Value(recorderContextKey).(string) + if ok { + r.mux.Lock() + r.listsPerKey[recorderKey] = append(r.listsPerKey[recorderKey], RecordedList{Key: key, ListOptions: opts}) + r.mux.Unlock() + } + return r.Interface.List(ctx, key, opts) +} + +func (r *KubernetesRecorder) ListRequestForKey(key string) []RecordedList { + r.mux.Lock() + defer r.mux.Unlock() + return r.listsPerKey[key] +} + +type RecordedList struct { + Key string + kubernetes.ListOptions +} + +var recorderContextKey recorderKeyType + +type recorderKeyType struct{} diff --git a/pkg/storage/testing/store_tests.go b/pkg/storage/testing/store_tests.go index 7aa5838ba..6610edad1 100644 --- a/pkg/storage/testing/store_tests.go +++ b/pkg/storage/testing/store_tests.go @@ -32,6 +32,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.etcd.io/etcd/client/v3/kubernetes" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -39,8 +40,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/value" + utilfeature "k8s.io/apiserver/pkg/util/feature" utilpointer "k8s.io/utils/pointer" ) @@ -624,7 +627,7 @@ func RunTestPreconditionalDeleteWithOnlySuggestionPass(ctx context.Context, t *t expectNoDiff(t, "incorrect pod:", updatedPod, out) } -func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, ignoreWatchCacheTests bool) { +func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, watchCacheEnabled bool, recorder *KubernetesRecorder) { initialRV, createdPods, updatedPod, err := seedMultiLevelData(ctx, store) if err != nil { t.Fatal(err) @@ -670,6 +673,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, inc expectRVTooLarge bool expectRV string expectRVFunc func(string) error + expectEtcdRequest func() []RecordedList }{ { name: "rejects invalid resource version", @@ -813,6 +817,17 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, inc expectContinue: true, expectContinueExact: encodeContinueOrDie(createdPods[1].Name+"\x00", int64(mustAtoi(currentRV))), expectedRemainingItemCount: utilpointer.Int64(1), + expectEtcdRequest: func() []RecordedList { + if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) { + return nil + } + return []RecordedList{ + { + Key: "/registry/pods/second/", + ListOptions: kubernetes.ListOptions{Revision: 0, Limit: 1}, + }, + } + }, }, { name: "test List with limit at current resource version", @@ -1574,7 +1589,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, inc // doesn't automatically preclude some scenarios from happening. t.Parallel() - if ignoreWatchCacheTests && tt.ignoreForWatchCache { + if watchCacheEnabled && tt.ignoreForWatchCache { t.Skip() } @@ -1589,7 +1604,9 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, inc Predicate: tt.pred, Recursive: true, } - err := store.GetList(ctx, tt.prefix, storageOpts, out) + recorderKey := t.Name() + listCtx := context.WithValue(ctx, recorderContextKey, recorderKey) + err := store.GetList(listCtx, tt.prefix, storageOpts, out) if tt.expectRVTooLarge { // TODO: Clasify etcd future revision error as TooLargeResourceVersion if err == nil || !(storage.IsTooLargeResourceVersion(err) || strings.Contains(err.Error(), "etcdserver: mvcc: required revision is a future revision")) { @@ -1636,6 +1653,13 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, inc if !cmp.Equal(tt.expectedRemainingItemCount, out.RemainingItemCount) { t.Fatalf("unexpected remainingItemCount, diff: %s", cmp.Diff(tt.expectedRemainingItemCount, out.RemainingItemCount)) } + if watchCacheEnabled && tt.expectEtcdRequest != nil { + expectEtcdLists := tt.expectEtcdRequest() + etcdLists := recorder.ListRequestForKey(recorderKey) + if !cmp.Equal(expectEtcdLists, etcdLists) { + t.Fatalf("unexpected etcd requests, diff: %s", cmp.Diff(expectEtcdLists, etcdLists)) + } + } }) } }