mirror of https://github.com/knative/pkg.git
Start the webhook before informers sync. (#1180)
* Start the webhook before informers sync. Some webhooks (e.g. conversion) are required to list resources, so by delaying those until after informers have synced, we create a deadlock when they run in the same process. This change has two key parts: 1. Start the webhook immediately when our process starts, and issue a callback from sharedmain when the informers have synced. 2. Block `Admit` calls until informers have synced (all conversions are exempt), unless they have been designated by implementing `webhook.StatelessAdmissionController`. Our built-in admission controllers (defaulting, validation, configmap validation) have all been marked as stateless, the main case where we want to block `Admit` calls is when we require the informer to have synchronized to populate indices for Bindings. * Add missing err declaration
This commit is contained in:
parent
f40c61abc6
commit
e2ee5bed78
|
@ -232,6 +232,26 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
|
|||
WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
|
||||
WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component)
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
eg.Go(profilingServer.ListenAndServe)
|
||||
|
||||
// If we have one or more admission controllers, then start the webhook
|
||||
// and pass them in.
|
||||
var wh *webhook.Webhook
|
||||
var err error
|
||||
if len(webhooks) > 0 {
|
||||
// Register webhook metrics
|
||||
webhook.RegisterMetrics()
|
||||
|
||||
wh, err = webhook.New(ctx, webhooks)
|
||||
if err != nil {
|
||||
logger.Fatalw("Failed to create webhook", zap.Error(err))
|
||||
}
|
||||
eg.Go(func() error {
|
||||
return wh.Run(ctx.Done())
|
||||
})
|
||||
}
|
||||
|
||||
logger.Info("Starting configuration manager...")
|
||||
if err := cmw.Start(ctx.Done()); err != nil {
|
||||
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
|
||||
|
@ -240,27 +260,12 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
|
|||
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
|
||||
logger.Fatalw("Failed to start informers", zap.Error(err))
|
||||
}
|
||||
if wh != nil {
|
||||
wh.InformersHaveSynced()
|
||||
}
|
||||
logger.Info("Starting controllers...")
|
||||
go controller.StartAll(ctx.Done(), controllers...)
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
eg.Go(profilingServer.ListenAndServe)
|
||||
|
||||
// If we have one or more admission controllers, then start the webhook
|
||||
// and pass them in.
|
||||
if len(webhooks) > 0 {
|
||||
// Register webhook metrics
|
||||
webhook.RegisterMetrics()
|
||||
|
||||
wh, err := webhook.New(ctx, webhooks)
|
||||
if err != nil {
|
||||
logger.Fatalw("Failed to create webhook", zap.Error(err))
|
||||
}
|
||||
eg.Go(func() error {
|
||||
return wh.Run(ctx.Done())
|
||||
})
|
||||
}
|
||||
|
||||
// This will block until either a signal arrives or one of the grouped functions
|
||||
// returns an error.
|
||||
<-egCtx.Done()
|
||||
|
|
|
@ -39,6 +39,14 @@ type AdmissionController interface {
|
|||
Admit(context.Context, *admissionv1beta1.AdmissionRequest) *admissionv1beta1.AdmissionResponse
|
||||
}
|
||||
|
||||
// StatelessAdmissionController is implemented by AdmissionControllers where Admit may be safely
|
||||
// called before informers have finished syncing. This is implemented by inlining
|
||||
// StatelessAdmissionImpl in your Go type.
|
||||
type StatelessAdmissionController interface {
|
||||
// A silly name that should avoid collisions.
|
||||
ThisTypeDoesNotDependOnInformerState()
|
||||
}
|
||||
|
||||
// MakeErrorStatus creates an 'BadRequest' error AdmissionResponse
|
||||
func MakeErrorStatus(reason string, args ...interface{}) *admissionv1beta1.AdmissionResponse {
|
||||
result := apierrors.NewBadRequest(fmt.Sprintf(reason, args...)).Status()
|
||||
|
@ -48,8 +56,17 @@ func MakeErrorStatus(reason string, args ...interface{}) *admissionv1beta1.Admis
|
|||
}
|
||||
}
|
||||
|
||||
func admissionHandler(rootLogger *zap.SugaredLogger, stats StatsReporter, c AdmissionController) http.HandlerFunc {
|
||||
func admissionHandler(rootLogger *zap.SugaredLogger, stats StatsReporter, c AdmissionController, synced <-chan struct{}) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if _, ok := c.(StatelessAdmissionController); ok {
|
||||
// Stateless admission controllers do not require Informers to have
|
||||
// finished syncing before Admit is called.
|
||||
} else {
|
||||
// Don't allow admission control requests through until we have been
|
||||
// notified that informers have been synchronized.
|
||||
<-synced
|
||||
}
|
||||
|
||||
var ttStart = time.Now()
|
||||
logger := rootLogger
|
||||
logger.Infof("Webhook ServeHTTP request=%#v", r)
|
||||
|
@ -92,3 +109,8 @@ func admissionHandler(rootLogger *zap.SugaredLogger, stats StatsReporter, c Admi
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Inline this type to implement StatelessAdmissionController.
|
||||
type StatelessAdmissionImpl struct{}
|
||||
|
||||
func (sai StatelessAdmissionImpl) ThisTypeDoesNotDependOnInformerState() {}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"path"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
jsonpatch "gomodules.xyz/jsonpatch/v2"
|
||||
|
@ -126,6 +127,9 @@ func TestAdmissionValidResponseForResource(t *testing.T) {
|
|||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
response, err := tlsClient.Do(req)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get response %v", err)
|
||||
|
@ -147,6 +151,24 @@ func TestAdmissionValidResponseForResource(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("Failed to decode response: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Check that Admit calls block when they are initiated before informers sync.
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-doneCh:
|
||||
t.Fatal("Admit was called before informers had synced.")
|
||||
}
|
||||
|
||||
// Signal the webhook that informers have synced.
|
||||
wh.InformersHaveSynced()
|
||||
|
||||
// Check that after informers have synced that things start completing immediately (including outstanding requests).
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Error("Timed out waiting on Admit to complete after informers synced.")
|
||||
}
|
||||
|
||||
metricstest.CheckStatsReported(t, requestCountName, requestLatenciesName)
|
||||
}
|
||||
|
@ -164,6 +186,7 @@ func TestAdmissionInvalidResponseForResource(t *testing.T) {
|
|||
|
||||
eg, _ := errgroup.WithContext(ctx)
|
||||
eg.Go(func() error { return wh.Run(ctx.Done()) })
|
||||
wh.InformersHaveSynced()
|
||||
defer func() {
|
||||
cancel()
|
||||
if err := eg.Wait(); err != nil {
|
||||
|
|
|
@ -43,6 +43,8 @@ import (
|
|||
|
||||
// reconciler implements the AdmissionController for ConfigMaps
|
||||
type reconciler struct {
|
||||
webhook.StatelessAdmissionImpl
|
||||
|
||||
name string
|
||||
path string
|
||||
constructors map[string]reflect.Value
|
||||
|
@ -56,6 +58,7 @@ type reconciler struct {
|
|||
|
||||
var _ controller.Reconciler = (*reconciler)(nil)
|
||||
var _ webhook.AdmissionController = (*reconciler)(nil)
|
||||
var _ webhook.StatelessAdmissionController = (*reconciler)(nil)
|
||||
|
||||
// Reconcile implements controller.Reconciler
|
||||
func (ac *reconciler) Reconcile(ctx context.Context, key string) error {
|
||||
|
|
|
@ -50,6 +50,8 @@ var errMissingNewObject = errors.New("the new object may not be nil")
|
|||
|
||||
// reconciler implements the AdmissionController for resources
|
||||
type reconciler struct {
|
||||
webhook.StatelessAdmissionImpl
|
||||
|
||||
name string
|
||||
path string
|
||||
handlers map[schema.GroupVersionKind]resourcesemantics.GenericCRD
|
||||
|
@ -66,6 +68,7 @@ type reconciler struct {
|
|||
|
||||
var _ controller.Reconciler = (*reconciler)(nil)
|
||||
var _ webhook.AdmissionController = (*reconciler)(nil)
|
||||
var _ webhook.StatelessAdmissionController = (*reconciler)(nil)
|
||||
|
||||
// Reconcile implements controller.Reconciler
|
||||
func (ac *reconciler) Reconcile(ctx context.Context, key string) error {
|
||||
|
|
|
@ -48,6 +48,8 @@ var errMissingNewObject = errors.New("the new object may not be nil")
|
|||
|
||||
// reconciler implements the AdmissionController for resources
|
||||
type reconciler struct {
|
||||
webhook.StatelessAdmissionImpl
|
||||
|
||||
name string
|
||||
path string
|
||||
handlers map[schema.GroupVersionKind]resourcesemantics.GenericCRD
|
||||
|
@ -64,6 +66,7 @@ type reconciler struct {
|
|||
|
||||
var _ controller.Reconciler = (*reconciler)(nil)
|
||||
var _ webhook.AdmissionController = (*reconciler)(nil)
|
||||
var _ webhook.StatelessAdmissionController = (*reconciler)(nil)
|
||||
|
||||
// Reconcile implements controller.Reconciler
|
||||
func (ac *reconciler) Reconcile(ctx context.Context, key string) error {
|
||||
|
|
|
@ -65,6 +65,9 @@ type Webhook struct {
|
|||
Options Options
|
||||
Logger *zap.SugaredLogger
|
||||
|
||||
// synced is function that is called when the informers have been synced.
|
||||
synced context.CancelFunc
|
||||
|
||||
mux http.ServeMux
|
||||
secretlister corelisters.SecretLister
|
||||
}
|
||||
|
@ -105,11 +108,14 @@ func New(
|
|||
opts.StatsReporter = reporter
|
||||
}
|
||||
|
||||
syncCtx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
webhook = &Webhook{
|
||||
Client: client,
|
||||
Options: *opts,
|
||||
secretlister: secretInformer.Lister(),
|
||||
Logger: logger,
|
||||
synced: cancel,
|
||||
}
|
||||
|
||||
webhook.mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -117,26 +123,31 @@ func New(
|
|||
})
|
||||
|
||||
for _, controller := range controllers {
|
||||
var handler http.Handler
|
||||
var path string
|
||||
|
||||
switch c := controller.(type) {
|
||||
case AdmissionController:
|
||||
handler = admissionHandler(logger, opts.StatsReporter, c)
|
||||
path = c.Path()
|
||||
handler := admissionHandler(logger, opts.StatsReporter, c, syncCtx.Done())
|
||||
webhook.mux.Handle(c.Path(), handler)
|
||||
|
||||
case ConversionController:
|
||||
handler = conversionHandler(logger, opts.StatsReporter, c)
|
||||
path = c.Path()
|
||||
handler := conversionHandler(logger, opts.StatsReporter, c)
|
||||
webhook.mux.Handle(c.Path(), handler)
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown webhook controller type: %T", controller)
|
||||
}
|
||||
|
||||
webhook.mux.Handle(path, handler)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// InformersHaveSynced is called when the informers have all been synced, which allows any outstanding
|
||||
// admission webhooks through.
|
||||
func (wh *Webhook) InformersHaveSynced() {
|
||||
wh.synced()
|
||||
wh.Logger.Info("Informers have been synced, unblocking admission webhooks.")
|
||||
}
|
||||
|
||||
// Run implements the admission controller run loop.
|
||||
func (wh *Webhook) Run(stop <-chan struct{}) error {
|
||||
logger := wh.Logger
|
||||
|
|
|
@ -58,6 +58,7 @@ func TestMissingContentType(t *testing.T) {
|
|||
|
||||
eg, _ := errgroup.WithContext(ctx)
|
||||
eg.Go(func() error { return wh.Run(ctx.Done()) })
|
||||
wh.InformersHaveSynced()
|
||||
defer func() {
|
||||
cancel()
|
||||
if err := eg.Wait(); err != nil {
|
||||
|
@ -111,6 +112,7 @@ func testEmptyRequestBody(t *testing.T, controller interface{}) {
|
|||
|
||||
eg, _ := errgroup.WithContext(ctx)
|
||||
eg.Go(func() error { return wh.Run(ctx.Done()) })
|
||||
wh.InformersHaveSynced()
|
||||
defer func() {
|
||||
cancel()
|
||||
if err := eg.Wait(); err != nil {
|
||||
|
|
Loading…
Reference in New Issue