storage/factory: extend the Create method by newList and resourcePrefix params

Kubernetes-commit: ccabc01093a1344ebb27c32c946e9da3b8e91fd2
This commit is contained in:
Lukasz Szaszkiewicz 2023-07-28 09:53:01 +02:00 committed by Kubernetes Publisher
parent f42313972c
commit f75c503352
13 changed files with 43 additions and 26 deletions

View File

@ -78,7 +78,9 @@ type peerEndpointLeaseReconciler struct {
// NewPeerEndpointLeaseReconciler creates a new peer endpoint lease reconciler
func NewPeerEndpointLeaseReconciler(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (PeerEndpointLeaseReconciler, error) {
leaseStorage, destroyFn, err := storagefactory.Create(*config, nil)
// note that newFunc, newListFunc and resourcePrefix
// can be left blank unless the storage.Watch method is used
leaseStorage, destroyFn, err := storagefactory.Create(*config, nil, nil, "")
if err != nil {
return nil, fmt.Errorf("error creating storage factory: %v", err)
}

View File

@ -89,9 +89,10 @@ func TestPeerEndpointLeaseReconciler(t *testing.T) {
t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} }
newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc, newListFunc, "")
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}
@ -195,9 +196,10 @@ func TestPeerLeaseRemoveEndpoints(t *testing.T) {
t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} }
newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc, newListFunc, "")
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}

View File

@ -39,7 +39,7 @@ import (
func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) {
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, destroy, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), nil)
s, destroy, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), nil, nil, "")
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}

View File

@ -44,7 +44,7 @@ func StorageWithCacher() generic.StorageDecorator {
triggerFuncs storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
s, d, err := generic.NewRawStorage(storageConfig, newFunc)
s, d, err := generic.NewRawStorage(storageConfig, newFunc, newListFunc, resourcePrefix)
if err != nil {
return s, d, err
}

View File

@ -2325,7 +2325,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
newListFunc := func() runtime.Object { return &example.PodList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc, newListFunc, "/pods")
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}

View File

@ -47,12 +47,12 @@ func UndecoratedStorage(
getAttrsFunc storage.AttrFunc,
trigger storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
return NewRawStorage(config, newFunc)
return NewRawStorage(config, newFunc, newListFunc, resourcePrefix)
}
// NewRawStorage creates the low level kv storage. This is a work-around for current
// two layer of same storage interface.
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
func NewRawStorage(config *storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config, newFunc)
func NewRawStorage(config *storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config, newFunc, newListFunc, resourcePrefix)
}

View File

@ -61,7 +61,9 @@ func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3
server.V3Client,
apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
newPod,
newPodList,
prefix,
"/pods",
schema.GroupResource{Resource: "pods"},
identity.NewEncryptCheckTransformer(),
pagingEnabled,

View File

@ -97,11 +97,11 @@ type objState struct {
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, prefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
}
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
versioner := storage.APIObjectVersioner{}
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
@ -112,6 +112,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
pathPrefix += "/"
}
// TODO(p0lyn0mial): pass newListFunc and resourcePrefix to the watcher
w := &watcher{
client: c,
codec: codec,

View File

@ -64,6 +64,10 @@ func newPod() runtime.Object {
return &example.Pod{}
}
func newPodList() runtime.Object {
return &example.PodList{}
}
func checkStorageInvariants(etcdClient *clientv3.Client, codec runtime.Codec) storagetesting.KeyValidation {
return func(ctx context.Context, t *testing.T, key string) {
getResp, err := etcdClient.KV.Get(ctx, key)
@ -468,14 +472,16 @@ func (r *clientRecorder) GetReadsAndReset() uint64 {
}
type setupOptions struct {
client func(testing.TB) *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
prefix string
groupResource schema.GroupResource
transformer value.Transformer
pagingEnabled bool
leaseConfig LeaseManagerConfig
client func(testing.TB) *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
newListFunc func() runtime.Object
prefix string
resourcePrefix string
groupResource schema.GroupResource
transformer value.Transformer
pagingEnabled bool
leaseConfig LeaseManagerConfig
recorderEnabled bool
}
@ -520,7 +526,9 @@ func withDefaults(options *setupOptions) {
}
options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
options.newFunc = newPod
options.newListFunc = newPodList
options.prefix = ""
options.resourcePrefix = "/pods"
options.groupResource = schema.GroupResource{Resource: "pods"}
options.transformer = newTestTransformer()
options.pagingEnabled = true
@ -543,7 +551,9 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli
client,
setupOpts.codec,
setupOpts.newFunc,
setupOpts.newListFunc,
setupOpts.prefix,
setupOpts.resourcePrefix,
setupOpts.groupResource,
setupOpts.transformer,
setupOpts.pagingEnabled,

View File

@ -419,7 +419,7 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
}, nil
}
func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
if err != nil {
return nil, nil, err
@ -454,7 +454,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
if transformer == nil {
transformer = identity.NewEncryptCheckTransformer()
}
return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
}
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

View File

@ -30,12 +30,12 @@ import (
type DestroyFunc func()
// Create creates a storage backend based on given config.
func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
func Create(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c, newFunc)
return newETCD3Storage(c, newFunc, newListFunc, resourcePrefix)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}

View File

@ -81,7 +81,7 @@ func TestTLSConnection(t *testing.T) {
},
Codec: codec,
}
storage, destroyFunc, err := newETCD3Storage(*cfg.ForResource(schema.GroupResource{Resource: "pods"}), nil)
storage, destroyFunc, err := newETCD3Storage(*cfg.ForResource(schema.GroupResource{Resource: "pods"}), nil, nil, "")
defer destroyFunc()
if err != nil {
t.Fatal(err)

View File

@ -85,7 +85,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
// test data
newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
return server, storage
}
server, etcdStorage := newEtcdTestStorage(t, "")