This PR implements AdmissionOptions.ApplyTo
ApplyTo adds the admission chain to the server configuration the method lazily initializes a generic plugin that is appended to the list of pluginInitializers. apiserver.Config will hold an instance of SharedInformerFactory to ensure we only have once instance. The field will be initialized in apisever.SecureServingOptions Kubernetes-commit: 8cea69aa9812d6627ebdfa4f8b9c1d7624a8f3f5
This commit is contained in:
parent
53a5f2e3f3
commit
ceeef3670c
|
@ -54,6 +54,7 @@ import (
|
|||
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/apiserver/pkg/server/routes"
|
||||
"k8s.io/client-go/informers"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
certutil "k8s.io/client-go/util/cert"
|
||||
|
||||
|
@ -109,6 +110,8 @@ type Config struct {
|
|||
// Will default to a value based on secure serving info and available ipv4 IPs.
|
||||
ExternalAddress string
|
||||
|
||||
// SharedInformerFactory provides shared informers for resources
|
||||
SharedInformerFactory informers.SharedInformerFactory
|
||||
//===========================================================================
|
||||
// Fields you probably don't care about changing
|
||||
//===========================================================================
|
||||
|
@ -405,6 +408,16 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ
|
|||
s.postStartHooks[k] = v
|
||||
}
|
||||
|
||||
genericApiServerHookName := "generic-apiserver-start-informers"
|
||||
if c.SharedInformerFactory != nil && !s.isHookRegistered(genericApiServerHookName) {
|
||||
err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
|
||||
c.SharedInformerFactory.Start(context.StopCh)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for _, delegateCheck := range delegationTarget.HealthzChecks() {
|
||||
skip := false
|
||||
for _, existingCheck := range c.HealthzChecks {
|
||||
|
|
|
@ -28,6 +28,8 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
|
@ -38,6 +40,11 @@ func TestNewWithDelegate(t *testing.T) {
|
|||
delegateConfig.LegacyAPIGroupPrefixes = sets.NewString("/api")
|
||||
delegateConfig.LoopbackClientConfig = &rest.Config{}
|
||||
delegateConfig.SwaggerConfig = DefaultSwaggerConfig()
|
||||
clientset := fake.NewSimpleClientset()
|
||||
if clientset == nil {
|
||||
t.Fatal("unable to create fake client set")
|
||||
}
|
||||
delegateConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, delegateConfig.LoopbackClientConfig.Timeout)
|
||||
|
||||
delegateHealthzCalled := false
|
||||
delegateConfig.HealthzChecks = append(delegateConfig.HealthzChecks, healthz.NamedCheck("delegate-health", func(r *http.Request) error {
|
||||
|
@ -66,6 +73,7 @@ func TestNewWithDelegate(t *testing.T) {
|
|||
wrappingConfig.LegacyAPIGroupPrefixes = sets.NewString("/api")
|
||||
wrappingConfig.LoopbackClientConfig = &rest.Config{}
|
||||
wrappingConfig.SwaggerConfig = DefaultSwaggerConfig()
|
||||
wrappingConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, wrappingConfig.LoopbackClientConfig.Timeout)
|
||||
|
||||
wrappingHealthzCalled := false
|
||||
wrappingConfig.HealthzChecks = append(wrappingConfig.HealthzChecks, healthz.NamedCheck("wrapping-health", func(r *http.Request) error {
|
||||
|
@ -102,6 +110,7 @@ func TestNewWithDelegate(t *testing.T) {
|
|||
"/healthz/delegate-health",
|
||||
"/healthz/ping",
|
||||
"/healthz/poststarthook/delegate-post-start-hook",
|
||||
"/healthz/poststarthook/generic-apiserver-start-informers",
|
||||
"/healthz/poststarthook/wrapping-post-start-hook",
|
||||
"/healthz/wrapping-health",
|
||||
"/swaggerapi/"
|
||||
|
@ -110,6 +119,7 @@ func TestNewWithDelegate(t *testing.T) {
|
|||
checkPath(server.URL+"/healthz", http.StatusInternalServerError, `[+]ping ok
|
||||
[-]wrapping-health failed: reason withheld
|
||||
[-]delegate-health failed: reason withheld
|
||||
[+]poststarthook/generic-apiserver-start-informers ok
|
||||
[+]poststarthook/delegate-post-start-hook ok
|
||||
[+]poststarthook/wrapping-post-start-hook ok
|
||||
healthz check failed
|
||||
|
|
|
@ -48,6 +48,8 @@ import (
|
|||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
|
@ -90,6 +92,12 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
|
|||
config.LegacyAPIGroupPrefixes = sets.NewString("/api")
|
||||
config.LoopbackClientConfig = &restclient.Config{}
|
||||
|
||||
clientset := fake.NewSimpleClientset()
|
||||
if clientset == nil {
|
||||
t.Fatal("unable to create fake client set")
|
||||
}
|
||||
config.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, config.LoopbackClientConfig.Timeout)
|
||||
|
||||
// TODO restore this test, but right now, eliminate our cycle
|
||||
// config.OpenAPIConfig = DefaultOpenAPIConfig(testGetOpenAPIDefinitions, runtime.NewScheme())
|
||||
// config.OpenAPIConfig.Info = &spec.Info{
|
||||
|
@ -300,17 +308,13 @@ func TestPrepareRun(t *testing.T) {
|
|||
defer etcdserver.Terminate(t)
|
||||
|
||||
assert.NotNil(config.SwaggerConfig)
|
||||
// assert.NotNil(config.OpenAPIConfig)
|
||||
|
||||
server := httptest.NewServer(s.Handler.GoRestfulContainer.ServeMux)
|
||||
defer server.Close()
|
||||
done := make(chan struct{})
|
||||
|
||||
s.PrepareRun()
|
||||
|
||||
// openapi is installed in PrepareRun
|
||||
// resp, err := http.Get(server.URL + "/swagger.json")
|
||||
// assert.NoError(err)
|
||||
// assert.Equal(http.StatusOK, resp.StatusCode)
|
||||
s.RunPostStartHooks(done)
|
||||
|
||||
// swagger is installed in PrepareRun
|
||||
resp, err := http.Get(server.URL + "/swaggerapi/")
|
||||
|
|
|
@ -106,6 +106,14 @@ func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) {
|
|||
}
|
||||
}
|
||||
|
||||
// isHookRegistered checks whether a given hook is registered
|
||||
func (s *GenericAPIServer) isHookRegistered(name string) bool {
|
||||
s.postStartHookLock.Lock()
|
||||
defer s.postStartHookLock.Unlock()
|
||||
_, exists := s.postStartHooks[name]
|
||||
return exists
|
||||
}
|
||||
|
||||
func runPostStartHook(name string, entry postStartHookEntry, context PostStartHookContext) {
|
||||
var err error
|
||||
func() {
|
||||
|
@ -117,7 +125,6 @@ func runPostStartHook(name string, entry postStartHookEntry, context PostStartHo
|
|||
if err != nil {
|
||||
glog.Fatalf("PostStartHook %q failed: %v", name, err)
|
||||
}
|
||||
|
||||
close(entry.done)
|
||||
}
|
||||
|
||||
|
|
|
@ -17,33 +17,70 @@ limitations under the License.
|
|||
package options
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/admission/initializer"
|
||||
"k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
// AdmissionOptions holds the admission options
|
||||
type AdmissionOptions struct {
|
||||
Control string
|
||||
ControlConfigFile string
|
||||
Plugins *admission.Plugins
|
||||
PluginNames []string
|
||||
ConfigFile string
|
||||
Plugins *admission.Plugins
|
||||
}
|
||||
|
||||
// NewAdmissionOptions creates a new instance of AdmissionOptions
|
||||
func NewAdmissionOptions(plugins *admission.Plugins) *AdmissionOptions {
|
||||
return &AdmissionOptions{
|
||||
Plugins: plugins,
|
||||
Control: "AlwaysAdmit",
|
||||
Plugins: plugins,
|
||||
PluginNames: []string{},
|
||||
}
|
||||
}
|
||||
|
||||
// AddFlags adds flags related to admission for a specific APIServer to the specified FlagSet
|
||||
func (a *AdmissionOptions) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.StringVar(&a.Control, "admission-control", a.Control, ""+
|
||||
fs.StringSliceVar(&a.PluginNames, "admission-control", a.PluginNames, ""+
|
||||
"Ordered list of plug-ins to do admission control of resources into cluster. "+
|
||||
"Comma-delimited list of: "+strings.Join(a.Plugins.Registered(), ", ")+".")
|
||||
|
||||
fs.StringVar(&a.ControlConfigFile, "admission-control-config-file", a.ControlConfigFile,
|
||||
fs.StringVar(&a.ConfigFile, "admission-control-config-file", a.ConfigFile,
|
||||
"File with admission control configuration.")
|
||||
}
|
||||
|
||||
// ApplyTo adds the admission chain to the server configuration
|
||||
// the method lazily initializes a generic plugin that is appended to the list of pluginInitializers
|
||||
// note this method uses:
|
||||
// genericconfig.LoopbackClientConfig
|
||||
// genericconfig.SharedInformerFactory
|
||||
// genericconfig.Authorizer
|
||||
func (a *AdmissionOptions) ApplyTo(serverCfg *server.Config, pluginInitializers ...admission.PluginInitializer) error {
|
||||
pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(a.PluginNames, a.ConfigFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read plugin config: %v", err)
|
||||
}
|
||||
|
||||
clientset, err := kubernetes.NewForConfig(serverCfg.LoopbackClientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
genericInitializer, err := initializer.New(clientset, serverCfg.SharedInformerFactory, serverCfg.Authorizer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
initializersChain := admission.PluginInitializers{}
|
||||
pluginInitializers = append(pluginInitializers, genericInitializer)
|
||||
initializersChain = append(initializersChain, pluginInitializers...)
|
||||
|
||||
admissionChain, err := a.Plugins.NewFromPlugins(a.PluginNames, pluginsConfigProvider, initializersChain)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
serverCfg.AdmissionControl = admissionChain
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@ import (
|
|||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apiserver/pkg/server"
|
||||
utilflag "k8s.io/apiserver/pkg/util/flag"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
certutil "k8s.io/client-go/util/cert"
|
||||
)
|
||||
|
||||
|
@ -167,6 +169,13 @@ func (s *SecureServingOptions) ApplyTo(c *server.Config) error {
|
|||
c.SecureServingInfo.SNICerts[server.LoopbackClientServerNameOverride] = &tlsCert
|
||||
}
|
||||
|
||||
// create shared informers
|
||||
clientset, err := kubernetes.NewForConfig(c.LoopbackClientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, c.LoopbackClientConfig.Timeout)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue