apiserver: stratify versioned informer construction

Kubernetes-commit: ca3f7453464f6866a3bf467c8b9d8e132484cfb4
This commit is contained in:
Dr. Stefan Schimanski 2017-09-06 18:22:23 +02:00 committed by Kubernetes Publisher
parent 8ec769da6b
commit 75cf96f31e
7 changed files with 64 additions and 50 deletions

View File

@ -122,8 +122,6 @@ type Config struct {
// Will default to a value based on secure serving info and available ipv4 IPs. // Will default to a value based on secure serving info and available ipv4 IPs.
ExternalAddress string ExternalAddress string
// SharedInformerFactory provides shared informers for resources
SharedInformerFactory informers.SharedInformerFactory
//=========================================================================== //===========================================================================
// Fields you probably don't care about changing // Fields you probably don't care about changing
//=========================================================================== //===========================================================================
@ -187,6 +185,13 @@ type Config struct {
PublicAddress net.IP PublicAddress net.IP
} }
type RecommendedConfig struct {
Config
// SharedInformerFactory provides shared informers for resources
SharedInformerFactory informers.SharedInformerFactory
}
type SecureServingInfo struct { type SecureServingInfo struct {
// BindAddress is the ip:port to serve on // BindAddress is the ip:port to serve on
BindAddress string BindAddress string
@ -242,6 +247,13 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
} }
} }
// NewRecommendedConfig returns a RecommendedConfig struct with the default values
func NewRecommendedConfig(codecs serializer.CodecFactory) *RecommendedConfig {
return &RecommendedConfig{
Config: *NewConfig(codecs),
}
}
func DefaultOpenAPIConfig(getDefinitions openapicommon.GetOpenAPIDefinitions, scheme *runtime.Scheme) *openapicommon.Config { func DefaultOpenAPIConfig(getDefinitions openapicommon.GetOpenAPIDefinitions, scheme *runtime.Scheme) *openapicommon.Config {
defNamer := apiopenapi.NewDefinitionNamer(scheme) defNamer := apiopenapi.NewDefinitionNamer(scheme)
return &openapicommon.Config{ return &openapicommon.Config{
@ -301,6 +313,13 @@ func (c *Config) ApplyClientCert(clientCAFile string) (*Config, error) {
type completedConfig struct { type completedConfig struct {
*Config *Config
//===========================================================================
// values below here are filled in during completion
//===========================================================================
// SharedInformerFactory provides shared informers for resources
SharedInformerFactory informers.SharedInformerFactory
} }
type CompletedConfig struct { type CompletedConfig struct {
@ -310,7 +329,7 @@ type CompletedConfig struct {
// Complete fills in any fields not set that are required to have valid data and can be derived // Complete fills in any fields not set that are required to have valid data and can be derived
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver. // from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
func (c *Config) Complete() CompletedConfig { func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {
if len(c.ExternalAddress) == 0 && c.PublicAddress != nil { if len(c.ExternalAddress) == 0 && c.PublicAddress != nil {
hostAndPort := c.PublicAddress.String() hostAndPort := c.PublicAddress.String()
if c.ReadWritePort != 0 { if c.ReadWritePort != 0 {
@ -385,7 +404,13 @@ func (c *Config) Complete() CompletedConfig {
c.RequestInfoResolver = NewRequestInfoResolver(c) c.RequestInfoResolver = NewRequestInfoResolver(c)
} }
return CompletedConfig{&completedConfig{c}} return CompletedConfig{&completedConfig{c, informers}}
}
// Complete fills in any fields not set that are required to have valid data and can be derived
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
func (c *RecommendedConfig) Complete() CompletedConfig {
return c.Config.Complete(c.SharedInformerFactory)
} }
// New creates a new server which logically combines the handling chain with the passed server. // New creates a new server which logically combines the handling chain with the passed server.

View File

@ -44,7 +44,6 @@ func TestNewWithDelegate(t *testing.T) {
if clientset == nil { if clientset == nil {
t.Fatal("unable to create fake client set") t.Fatal("unable to create fake client set")
} }
delegateConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, delegateConfig.LoopbackClientConfig.Timeout)
delegateHealthzCalled := false delegateHealthzCalled := false
delegateConfig.HealthzChecks = append(delegateConfig.HealthzChecks, healthz.NamedCheck("delegate-health", func(r *http.Request) error { delegateConfig.HealthzChecks = append(delegateConfig.HealthzChecks, healthz.NamedCheck("delegate-health", func(r *http.Request) error {
@ -52,7 +51,8 @@ func TestNewWithDelegate(t *testing.T) {
return fmt.Errorf("delegate failed healthcheck") return fmt.Errorf("delegate failed healthcheck")
})) }))
delegateServer, err := delegateConfig.Complete().New("test", EmptyDelegate) sharedInformers := informers.NewSharedInformerFactory(clientset, delegateConfig.LoopbackClientConfig.Timeout)
delegateServer, err := delegateConfig.Complete(sharedInformers).New("test", EmptyDelegate)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -73,7 +73,6 @@ func TestNewWithDelegate(t *testing.T) {
wrappingConfig.LegacyAPIGroupPrefixes = sets.NewString("/api") wrappingConfig.LegacyAPIGroupPrefixes = sets.NewString("/api")
wrappingConfig.LoopbackClientConfig = &rest.Config{} wrappingConfig.LoopbackClientConfig = &rest.Config{}
wrappingConfig.SwaggerConfig = DefaultSwaggerConfig() wrappingConfig.SwaggerConfig = DefaultSwaggerConfig()
wrappingConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, wrappingConfig.LoopbackClientConfig.Timeout)
wrappingHealthzCalled := false wrappingHealthzCalled := false
wrappingConfig.HealthzChecks = append(wrappingConfig.HealthzChecks, healthz.NamedCheck("wrapping-health", func(r *http.Request) error { wrappingConfig.HealthzChecks = append(wrappingConfig.HealthzChecks, healthz.NamedCheck("wrapping-health", func(r *http.Request) error {
@ -81,7 +80,8 @@ func TestNewWithDelegate(t *testing.T) {
return fmt.Errorf("wrapping failed healthcheck") return fmt.Errorf("wrapping failed healthcheck")
})) }))
wrappingServer, err := wrappingConfig.Complete().New("test", delegateServer) sharedInformers = informers.NewSharedInformerFactory(clientset, wrappingConfig.LoopbackClientConfig.Timeout)
wrappingServer, err := wrappingConfig.Complete(sharedInformers).New("test", delegateServer)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -96,7 +96,6 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
if clientset == nil { if clientset == nil {
t.Fatal("unable to create fake client set") t.Fatal("unable to create fake client set")
} }
config.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, config.LoopbackClientConfig.Timeout)
// TODO restore this test, but right now, eliminate our cycle // TODO restore this test, but right now, eliminate our cycle
// config.OpenAPIConfig = DefaultOpenAPIConfig(testGetOpenAPIDefinitions, runtime.NewScheme()) // config.OpenAPIConfig = DefaultOpenAPIConfig(testGetOpenAPIDefinitions, runtime.NewScheme())
@ -107,7 +106,8 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
// }, // },
// } // }
config.SwaggerConfig = DefaultSwaggerConfig() config.SwaggerConfig = DefaultSwaggerConfig()
config.Complete() sharedInformers := informers.NewSharedInformerFactory(clientset, config.LoopbackClientConfig.Timeout)
config.Complete(sharedInformers)
return etcdServer, *config, assert.New(t) return etcdServer, *config, assert.New(t)
} }
@ -115,7 +115,7 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
func newMaster(t *testing.T) (*GenericAPIServer, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) { func newMaster(t *testing.T) (*GenericAPIServer, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
etcdserver, config, assert := setUp(t) etcdserver, config, assert := setUp(t)
s, err := config.Complete().New("test", EmptyDelegate) s, err := config.Complete(nil).New("test", EmptyDelegate)
if err != nil { if err != nil {
t.Fatalf("Error in bringing up the server: %v", err) t.Fatalf("Error in bringing up the server: %v", err)
} }
@ -147,7 +147,7 @@ func TestInstallAPIGroups(t *testing.T) {
config.LegacyAPIGroupPrefixes = sets.NewString("/apiPrefix") config.LegacyAPIGroupPrefixes = sets.NewString("/apiPrefix")
config.DiscoveryAddresses = discovery.DefaultAddresses{DefaultAddress: "ExternalAddress"} config.DiscoveryAddresses = discovery.DefaultAddresses{DefaultAddress: "ExternalAddress"}
s, err := config.Complete().New("test", EmptyDelegate) s, err := config.Complete(nil).New("test", EmptyDelegate)
if err != nil { if err != nil {
t.Fatalf("Error in bringing up the server: %v", err) t.Fatalf("Error in bringing up the server: %v", err)
} }
@ -351,7 +351,7 @@ func TestCustomHandlerChain(t *testing.T) {
called = true called = true
}) })
s, err := config.Complete().New("test", EmptyDelegate) s, err := config.Complete(nil).New("test", EmptyDelegate)
if err != nil { if err != nil {
t.Fatalf("Error in bringing up the server: %v", err) t.Fatalf("Error in bringing up the server: %v", err)
} }
@ -406,7 +406,7 @@ func TestNotRestRoutesHaveAuth(t *testing.T) {
kubeVersion := fakeVersion() kubeVersion := fakeVersion()
config.Version = &kubeVersion config.Version = &kubeVersion
s, err := config.Complete().New("test", EmptyDelegate) s, err := config.Complete(nil).New("test", EmptyDelegate)
if err != nil { if err != nil {
t.Fatalf("Error in bringing up the server: %v", err) t.Fatalf("Error in bringing up the server: %v", err)
} }

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer" "k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )
@ -63,17 +64,17 @@ func (a *AdmissionOptions) AddFlags(fs *pflag.FlagSet) {
// genericconfig.LoopbackClientConfig // genericconfig.LoopbackClientConfig
// genericconfig.SharedInformerFactory // genericconfig.SharedInformerFactory
// genericconfig.Authorizer // genericconfig.Authorizer
func (a *AdmissionOptions) ApplyTo(serverCfg *server.Config, pluginInitializers ...admission.PluginInitializer) error { func (a *AdmissionOptions) ApplyTo(c *server.Config, informers informers.SharedInformerFactory, pluginInitializers ...admission.PluginInitializer) error {
pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(a.PluginNames, a.ConfigFile) pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(a.PluginNames, a.ConfigFile)
if err != nil { if err != nil {
return fmt.Errorf("failed to read plugin config: %v", err) return fmt.Errorf("failed to read plugin config: %v", err)
} }
clientset, err := kubernetes.NewForConfig(serverCfg.LoopbackClientConfig) clientset, err := kubernetes.NewForConfig(c.LoopbackClientConfig)
if err != nil { if err != nil {
return err return err
} }
genericInitializer, err := initializer.New(clientset, serverCfg.SharedInformerFactory, serverCfg.Authorizer) genericInitializer, err := initializer.New(clientset, informers, c.Authorizer)
if err != nil { if err != nil {
return err return err
} }
@ -86,7 +87,7 @@ func (a *AdmissionOptions) ApplyTo(serverCfg *server.Config, pluginInitializers
return err return err
} }
serverCfg.AdmissionControl = admissionChain c.AdmissionControl = admissionChain
return nil return nil
} }

View File

@ -17,11 +17,16 @@ limitations under the License.
package options package options
import ( import (
"fmt"
"time"
"github.com/spf13/pflag" "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
) )
// RecommendedOptions contains the recommended options for running an API server // RecommendedOptions contains the recommended options for running an API server
@ -55,26 +60,33 @@ func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) {
o.Features.AddFlags(fs) o.Features.AddFlags(fs)
} }
func (o *RecommendedOptions) ApplyTo(config *server.Config) error { func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
if err := o.Etcd.ApplyTo(config); err != nil { if err := o.Etcd.ApplyTo(&config.Config); err != nil {
return err return err
} }
if err := o.SecureServing.ApplyTo(config); err != nil { if err := o.SecureServing.ApplyTo(&config.Config); err != nil {
return err return err
} }
if err := o.Authentication.ApplyTo(config); err != nil { if err := o.Authentication.ApplyTo(&config.Config); err != nil {
return err return err
} }
if err := o.Authorization.ApplyTo(config); err != nil { if err := o.Authorization.ApplyTo(&config.Config); err != nil {
return err return err
} }
if err := o.Audit.ApplyTo(config); err != nil { if err := o.Audit.ApplyTo(&config.Config); err != nil {
return err return err
} }
if err := o.Features.ApplyTo(config); err != nil { if err := o.Features.ApplyTo(&config.Config); err != nil {
return err return err
} }
// do convenience work for RecommendedOptions users
clientgoExternalClient, err := clientgoclientset.NewForConfig(config.LoopbackClientConfig)
if err != nil {
return fmt.Errorf("failed to create real external clientset: %v", err)
}
config.SharedInformerFactory = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
return nil return nil
} }

View File

@ -24,7 +24,6 @@ import (
"net" "net"
"path" "path"
"strconv" "strconv"
"time"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/pborman/uuid" "github.com/pborman/uuid"
@ -33,9 +32,6 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server"
utilflag "k8s.io/apiserver/pkg/util/flag" utilflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
certutil "k8s.io/client-go/util/cert" certutil "k8s.io/client-go/util/cert"
) )
@ -175,26 +171,6 @@ func (s *SecureServingOptions) ApplyTo(c *server.Config) error {
c.SecureServingInfo.SNICerts[server.LoopbackClientServerNameOverride] = &tlsCert c.SecureServingInfo.SNICerts[server.LoopbackClientServerNameOverride] = &tlsCert
} }
// create shared informers, if not explicitly set use in cluster config.
// do not fail on an error, this allows an external API server to startup
// outside of a kube cluster.
var clientCfg *rest.Config
err = nil
if s.useLoopbackCfg {
clientCfg = c.LoopbackClientConfig
} else {
clientCfg, err = rest.InClusterConfig()
}
if err != nil {
glog.Errorf("Couldn't create in cluster config due to %v. SharedInformerFactory will not be set.", err)
return nil
}
clientset, err := kubernetes.NewForConfig(clientCfg)
if err != nil {
glog.Errorf("Couldn't create clientset due to %v. SharedInformerFactory will not be set.", err)
return nil
}
c.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, 10*time.Minute)
return nil return nil
} }

View File

@ -487,7 +487,7 @@ NextTest:
return return
} }
s, err := config.Complete().New("test", server.EmptyDelegate) s, err := config.Complete(nil).New("test", server.EmptyDelegate)
if err != nil { if err != nil {
t.Errorf("%q - failed creating the server: %v", title, err) t.Errorf("%q - failed creating the server: %v", title, err)
return return