Introduce storagebackend.ConfigForResource

This is a Config specialized for a GroupResource.
It will support generating new resource-specific metrics.

Kubernetes-commit: 85bcd243aa3c8769a5904a1aea44ce704f5e7174
This commit is contained in:
Mike Spreitzer 2021-08-29 01:06:12 -04:00 committed by Kubernetes Publisher
parent b68468b778
commit b225af44fe
11 changed files with 35 additions and 15 deletions

View File

@ -26,9 +26,9 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
) )
// RESTOptions is set of configuration options to generic registries. // RESTOptions is set of resource-specific configuration options to generic registries.
type RESTOptions struct { type RESTOptions struct {
StorageConfig *storagebackend.Config StorageConfig *storagebackend.ConfigForResource
Decorator StorageDecorator Decorator StorageDecorator
EnableGarbageCollection bool EnableGarbageCollection bool

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/apitesting" "k8s.io/apimachinery/pkg/api/apitesting"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1" examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
@ -38,7 +39,7 @@ import (
func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) { func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) {
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion) sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, destroy, err := factory.Create(*sc, nil) s, destroy, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), nil)
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %v", err) t.Fatalf("Error creating storage: %v", err)
} }

View File

@ -36,7 +36,7 @@ import (
// Creates a cacher based given storageConfig. // Creates a cacher based given storageConfig.
func StorageWithCacher() generic.StorageDecorator { func StorageWithCacher() generic.StorageDecorator {
return func( return func(
storageConfig *storagebackend.Config, storageConfig *storagebackend.ConfigForResource,
resourcePrefix string, resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error), keyFunc func(obj runtime.Object) (string, error),
newFunc func() runtime.Object, newFunc func() runtime.Object,

View File

@ -2230,7 +2230,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
newListFunc := func() runtime.Object { return &example.PodList{} } newListFunc := func() runtime.Object { return &example.PodList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion) sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc, newFunc) s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc)
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %v", err) t.Fatalf("Error creating storage: %v", err)
} }

View File

@ -27,7 +27,7 @@ import (
// StorageDecorator is a function signature for producing a storage.Interface // StorageDecorator is a function signature for producing a storage.Interface
// and an associated DestroyFunc from given parameters. // and an associated DestroyFunc from given parameters.
type StorageDecorator func( type StorageDecorator func(
config *storagebackend.Config, config *storagebackend.ConfigForResource,
resourcePrefix string, resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error), keyFunc func(obj runtime.Object) (string, error),
newFunc func() runtime.Object, newFunc func() runtime.Object,
@ -39,7 +39,7 @@ type StorageDecorator func(
// UndecoratedStorage returns the given a new storage from the given config // UndecoratedStorage returns the given a new storage from the given config
// without any decoration. // without any decoration.
func UndecoratedStorage( func UndecoratedStorage(
config *storagebackend.Config, config *storagebackend.ConfigForResource,
resourcePrefix string, resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error), keyFunc func(obj runtime.Object) (string, error),
newFunc func() runtime.Object, newFunc func() runtime.Object,
@ -53,6 +53,6 @@ func UndecoratedStorage(
// NewRawStorage creates the low level kv storage. This is a work-around for current // NewRawStorage creates the low level kv storage. This is a work-around for current
// two layer of same storage interface. // two layer of same storage interface.
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method. // TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
func NewRawStorage(config *storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) { func NewRawStorage(config *storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config, newFunc) return factory.Create(*config, newFunc)
} }

View File

@ -255,7 +255,7 @@ type SimpleRestOptionsFactory struct {
func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{ ret := generic.RESTOptions{
StorageConfig: &f.Options.StorageConfig, StorageConfig: f.Options.StorageConfig.ForResource(resource),
Decorator: generic.UndecoratedStorage, Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: f.Options.EnableGarbageCollection, EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,

View File

@ -46,7 +46,7 @@ type Backend struct {
type StorageFactory interface { type StorageFactory interface {
// New finds the storage destination for the given group and resource. It will // New finds the storage destination for the given group and resource. It will
// return an error if the group has no storage destination configured. // return an error if the group has no storage destination configured.
NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) NewConfig(groupResource schema.GroupResource) (*storagebackend.ConfigForResource, error)
// ResourcePrefix returns the overridden resource prefix for the GroupResource // ResourcePrefix returns the overridden resource prefix for the GroupResource
// This allows for cohabitation of resources with different native types and provides // This allows for cohabitation of resources with different native types and provides
@ -250,7 +250,7 @@ func (s *DefaultStorageFactory) getStorageGroupResource(groupResource schema.Gro
// New finds the storage destination for the given group and resource. It will // New finds the storage destination for the given group and resource. It will
// return an error if the group has no storage destination configured. // return an error if the group has no storage destination configured.
func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) { func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.ConfigForResource, error) {
chosenStorageResource := s.getStorageGroupResource(groupResource) chosenStorageResource := s.getStorageGroupResource(groupResource)
// operate on copy // operate on copy
@ -284,7 +284,7 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*
} }
klog.V(3).Infof("storing %v in %v, reading as %v from %#v", groupResource, codecConfig.StorageVersion, codecConfig.MemoryVersion, codecConfig.Config) klog.V(3).Infof("storing %v in %v, reading as %v from %#v", groupResource, codecConfig.StorageVersion, codecConfig.MemoryVersion, codecConfig.Config)
return &storageConfig, nil return storageConfig.ForResource(groupResource), nil
} }
// Backends returns all backends for all registered storage destinations. // Backends returns all backends for all registered storage destinations.

View File

@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/value" "k8s.io/apiserver/pkg/storage/value"
@ -91,6 +92,23 @@ type Config struct {
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
} }
// ConfigForResource is a Config specialized to a particular `schema.GroupResource`
type ConfigForResource struct {
// Config is the resource-independent configuration
Config
// GroupResource is the relevant one
GroupResource schema.GroupResource
}
// ForResource specializes to the given resource
func (config *Config) ForResource(resource schema.GroupResource) *ConfigForResource {
return &ConfigForResource{
Config: *config,
GroupResource: resource,
}
}
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
return &Config{ return &Config{
Paging: true, Paging: true,

View File

@ -244,7 +244,7 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
}, nil }, nil
} }
func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) { func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval) stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err

View File

@ -28,7 +28,7 @@ import (
type DestroyFunc func() type DestroyFunc func()
// Create creates a storage backend based on given config. // Create creates a storage backend based on given config.
func Create(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) { func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
switch c.Type { switch c.Type {
case storagebackend.StorageTypeETCD2: case storagebackend.StorageTypeETCD2:
return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type) return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)

View File

@ -29,6 +29,7 @@ import (
apitesting "k8s.io/apimachinery/pkg/api/apitesting" apitesting "k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/apis/example"
@ -78,7 +79,7 @@ func TestTLSConnection(t *testing.T) {
}, },
Codec: codec, Codec: codec,
} }
storage, destroyFunc, err := newETCD3Storage(cfg, nil) storage, destroyFunc, err := newETCD3Storage(*cfg.ForResource(schema.GroupResource{Resource: "pods"}), nil)
defer destroyFunc() defer destroyFunc()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)