Cleanup paging parameters in etcd3 store

Kubernetes-commit: 3f4d3b67682335db510f85deb65b322127a3a0a1
This commit is contained in:
Wojciech Tyczyński 2023-10-20 15:38:59 +02:00 committed by Kubernetes Publisher
parent 5bf4f58ab8
commit 324d2bc3cf
9 changed files with 13 additions and 116 deletions

View File

@ -179,12 +179,6 @@ func TestListWithListFromCache(t *testing.T) {
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true) storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
} }
func TestListWithoutPaging(t *testing.T) {
ctx, cacher, terminate := testSetup(t, withoutPaging)
t.Cleanup(terminate)
storagetesting.RunTestListWithoutPaging(ctx, t, cacher)
}
func TestGetListNonRecursive(t *testing.T) { func TestGetListNonRecursive(t *testing.T) {
ctx, cacher, terminate := testSetup(t) ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate) t.Cleanup(terminate)
@ -368,7 +362,6 @@ type setupOptions struct {
resourcePrefix string resourcePrefix string
keyFunc func(runtime.Object) (string, error) keyFunc func(runtime.Object) (string, error)
indexerFuncs map[string]storage.IndexerFunc indexerFuncs map[string]storage.IndexerFunc
pagingEnabled bool
clock clock.WithTicker clock clock.WithTicker
} }
@ -379,7 +372,6 @@ func withDefaults(options *setupOptions) {
options.resourcePrefix = prefix options.resourcePrefix = prefix
options.keyFunc = func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) } options.keyFunc = func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }
options.pagingEnabled = true
options.clock = clock.RealClock{} options.clock = clock.RealClock{}
} }
@ -401,10 +393,6 @@ func withSpecNodeNameIndexerFuncs(options *setupOptions) {
} }
} }
func withoutPaging(options *setupOptions) {
options.pagingEnabled = false
}
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) { func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) {
ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...) ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
return ctx, cacher, tearDown return ctx, cacher, tearDown
@ -417,7 +405,7 @@ func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context
opt(&setupOpts) opt(&setupOpts)
} }
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), setupOpts.pagingEnabled) server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
// Inject one list error to make sure we test the relist case. // Inject one list error to make sure we test the relist case.
wrappedStorage := &storagetesting.StorageInjectingListErrors{ wrappedStorage := &storagetesting.StorageInjectingListErrors{
Interface: etcdStorage, Interface: etcdStorage,

View File

@ -55,7 +55,7 @@ func init() {
func newPod() runtime.Object { return &example.Pod{} } func newPod() runtime.Object { return &example.Pod{} }
func newPodList() runtime.Object { return &example.PodList{} } func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3testing.EtcdTestServer, storage.Interface) { func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New( storage := etcd3.New(
server.V3Client, server.V3Client,
@ -66,7 +66,6 @@ func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3
"/pods", "/pods",
schema.GroupResource{Resource: "pods"}, schema.GroupResource{Resource: "pods"},
identity.NewEncryptCheckTransformer(), identity.NewEncryptCheckTransformer(),
pagingEnabled,
etcd3.NewDefaultLeaseManagerConfig()) etcd3.NewDefaultLeaseManagerConfig())
return server, storage return server, storage
} }

View File

@ -342,7 +342,7 @@ func TestWatchCacheBypass(t *testing.T) {
} }
func TestEmptyWatchEventCache(t *testing.T) { func TestEmptyWatchEventCache(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true) server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
// add a few objects // add a few objects

View File

@ -28,7 +28,7 @@ import (
func TestCacherListerWatcher(t *testing.T) { func TestCacherListerWatcher(t *testing.T) {
prefix := "pods" prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} } fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix, true) server, store := newEtcdTestStorage(t, prefix)
defer server.Terminate(t) defer server.Terminate(t)
objects := []*example.Pod{ objects := []*example.Pod{
@ -62,7 +62,7 @@ func TestCacherListerWatcher(t *testing.T) {
func TestCacherListerWatcherPagination(t *testing.T) { func TestCacherListerWatcherPagination(t *testing.T) {
prefix := "pods" prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} } fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix, true) server, store := newEtcdTestStorage(t, prefix)
defer server.Terminate(t) defer server.Terminate(t)
// We need the list to be sorted by name to later check the alphabetical order of // We need the list to be sorted by name to later check the alphabetical order of

View File

@ -77,7 +77,6 @@ type store struct {
groupResource schema.GroupResource groupResource schema.GroupResource
groupResourceString string groupResourceString string
watcher *watcher watcher *watcher
pagingEnabled bool
leaseManager *leaseManager leaseManager *leaseManager
} }
@ -96,11 +95,11 @@ type objState struct {
} }
// New returns an etcd3 implementation of storage.Interface. // New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface { func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, pagingEnabled, leaseManagerConfig) return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig)
} }
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store { func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) *store {
versioner := storage.APIObjectVersioner{} versioner := storage.APIObjectVersioner{}
// for compatibility with etcd2 impl. // for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'. // no-op for default prefix of '/registry'.
@ -129,7 +128,6 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
codec: codec, codec: codec,
versioner: versioner, versioner: versioner,
transformer: transformer, transformer: transformer,
pagingEnabled: pagingEnabled,
pathPrefix: pathPrefix, pathPrefix: pathPrefix,
groupResource: groupResource, groupResource: groupResource,
groupResourceString: groupResource.String(), groupResourceString: groupResource.String(),
@ -623,7 +621,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
limit := opts.Predicate.Limit limit := opts.Predicate.Limit
var paging bool var paging bool
options := make([]clientv3.OpOption, 0, 4) options := make([]clientv3.OpOption, 0, 4)
if s.pagingEnabled && opts.Predicate.Limit > 0 { if opts.Predicate.Limit > 0 {
paging = true paging = true
options = append(options, clientv3.WithLimit(limit)) options = append(options, clientv3.WithLimit(limit))
limitOption = &options[len(options)-1] limitOption = &options[len(options)-1]
@ -643,7 +641,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
var continueRV, withRev int64 var continueRV, withRev int64
var continueKey string var continueKey string
switch { switch {
case opts.Recursive && s.pagingEnabled && len(opts.Predicate.Continue) > 0: case opts.Recursive && len(opts.Predicate.Continue) > 0:
continueKey, continueRV, err = storage.DecodeContinue(opts.Predicate.Continue, keyPrefix) continueKey, continueRV, err = storage.DecodeContinue(opts.Predicate.Continue, keyPrefix)
if err != nil { if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
@ -668,7 +666,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
case metav1.ResourceVersionMatchExact: case metav1.ResourceVersionMatchExact:
withRev = int64(*fromRV) withRev = int64(*fromRV)
case "": // legacy case case "": // legacy case
if opts.Recursive && s.pagingEnabled && opts.Predicate.Limit > 0 && *fromRV > 0 { if opts.Recursive && opts.Predicate.Limit > 0 && *fromRV > 0 {
withRev = int64(*fromRV) withRev = int64(*fromRV)
} }
default: default:

View File

@ -216,11 +216,6 @@ func TestList(t *testing.T) {
storagetesting.RunTestList(ctx, t, store, compactStorage(client), false) storagetesting.RunTestList(ctx, t, store, compactStorage(client), false)
} }
func TestListWithoutPaging(t *testing.T) {
ctx, store, _ := testSetup(t, withoutPaging())
storagetesting.RunTestListWithoutPaging(ctx, t, store)
}
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation { func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) { return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) {
if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects { if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects {
@ -495,7 +490,6 @@ type setupOptions struct {
resourcePrefix string resourcePrefix string
groupResource schema.GroupResource groupResource schema.GroupResource
transformer value.Transformer transformer value.Transformer
pagingEnabled bool
leaseConfig LeaseManagerConfig leaseConfig LeaseManagerConfig
recorderEnabled bool recorderEnabled bool
@ -517,12 +511,6 @@ func withPrefix(prefix string) setupOption {
} }
} }
func withoutPaging() setupOption {
return func(options *setupOptions) {
options.pagingEnabled = false
}
}
func withLeaseConfig(leaseConfig LeaseManagerConfig) setupOption { func withLeaseConfig(leaseConfig LeaseManagerConfig) setupOption {
return func(options *setupOptions) { return func(options *setupOptions) {
options.leaseConfig = leaseConfig options.leaseConfig = leaseConfig
@ -546,7 +534,6 @@ func withDefaults(options *setupOptions) {
options.resourcePrefix = "/pods" options.resourcePrefix = "/pods"
options.groupResource = schema.GroupResource{Resource: "pods"} options.groupResource = schema.GroupResource{Resource: "pods"}
options.transformer = newTestTransformer() options.transformer = newTestTransformer()
options.pagingEnabled = true
options.leaseConfig = newTestLeaseManagerConfig() options.leaseConfig = newTestLeaseManagerConfig()
} }
@ -571,7 +558,6 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli
setupOpts.resourcePrefix, setupOpts.resourcePrefix,
setupOpts.groupResource, setupOpts.groupResource,
setupOpts.transformer, setupOpts.transformer,
setupOpts.pagingEnabled,
setupOpts.leaseConfig, setupOpts.leaseConfig,
) )
ctx := context.Background() ctx := context.Background()

View File

@ -454,7 +454,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
if transformer == nil { if transformer == nil {
transformer = identity.NewEncryptCheckTransformer() transformer = identity.NewEncryptCheckTransformer()
} }
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, true, c.LeaseManagerConfig), destroyFunc, nil return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig), destroyFunc, nil
} }
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

View File

@ -28,7 +28,6 @@ import (
"sync" "sync"
"testing" "testing"
"github.com/google/go-cmp/cmp"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
@ -1243,79 +1242,6 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com
} }
} }
func RunTestListWithoutPaging(ctx context.Context, t *testing.T, store storage.Interface) {
_, preset, err := seedMultiLevelData(ctx, store)
if err != nil {
t.Fatal(err)
}
getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil
}
tests := []struct {
name string
disablePaging bool
rv string
rvMatch metav1.ResourceVersionMatch
prefix string
pred storage.SelectionPredicate
expectedOut []*example.Pod
expectContinue bool
expectedRemainingItemCount *int64
expectError bool
}{
{
name: "test List with limit when paging disabled",
disablePaging: true,
prefix: "/pods/second/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
},
expectedOut: []*example.Pod{preset[1], preset[2]},
expectContinue: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.pred.GetAttrs == nil {
tt.pred.GetAttrs = getAttrs
}
out := &example.PodList{}
storageOpts := storage.ListOptions{
ResourceVersion: tt.rv,
ResourceVersionMatch: tt.rvMatch,
Predicate: tt.pred,
Recursive: true,
}
if err := store.GetList(ctx, tt.prefix, storageOpts, out); err != nil {
t.Fatalf("GetList failed: %v", err)
return
}
if (len(out.Continue) > 0) != tt.expectContinue {
t.Errorf("unexpected continue token: %q", out.Continue)
}
if len(tt.expectedOut) != len(out.Items) {
t.Fatalf("length of list want=%d, got=%d", len(tt.expectedOut), len(out.Items))
}
if diff := cmp.Diff(tt.expectedRemainingItemCount, out.ListMeta.GetRemainingItemCount()); diff != "" {
t.Errorf("incorrect remainingItemCount: %s", diff)
}
for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j]
expectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), wantPod, getPod)
}
})
}
}
// seedMultiLevelData creates a set of keys with a multi-level structure, returning a resourceVersion // seedMultiLevelData creates a set of keys with a multi-level structure, returning a resourceVersion
// from before any were created along with the full set of objects that were persisted // from before any were created along with the full set of objects that were persisted
func seedMultiLevelData(ctx context.Context, store storage.Interface) (string, []*example.Pod, error) { func seedMultiLevelData(ctx context.Context, store storage.Interface) (string, []*example.Pod, error) {

View File

@ -85,7 +85,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
// test data // test data
newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig()) storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig())
return server, storage return server, storage
} }
server, etcdStorage := newEtcdTestStorage(t, "") server, etcdStorage := newEtcdTestStorage(t, "")