From e2ee5bed78d74604ed87dcb761a6f6e10a2e355c Mon Sep 17 00:00:00 2001 From: Matt Moore Date: Mon, 30 Mar 2020 18:14:50 -0700 Subject: [PATCH] Start the webhook before informers sync. (#1180) * Start the webhook before informers sync. Some webhooks (e.g. conversion) are required to list resources, so by delaying those until after informers have synced, we create a deadlock when they run in the same process. This change has two key parts: 1. Start the webhook immediately when our process starts, and issue a callback from sharedmain when the informers have synced. 2. Block `Admit` calls until informers have synced (all conversions are exempt), unless they have been designated by implementing `webhook.StatelessAdmissionController`. Our built-in admission controllers (defaulting, validation, configmap validation) have all been marked as stateless, the main case where we want to block `Admit` calls is when we require the informer to have synchronized to populate indices for Bindings. * Add missing err declaration --- injection/sharedmain/main.go | 41 +++++++------ webhook/admission.go | 24 +++++++- webhook/admission_integration_test.go | 57 +++++++++++++------ webhook/configmaps/configmaps.go | 3 + .../defaulting/defaulting.go | 3 + .../validation/validation.go | 3 + webhook/webhook.go | 27 ++++++--- webhook/webhook_integration_test.go | 2 + 8 files changed, 116 insertions(+), 44 deletions(-) diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 0eb16ecd1..4165acbd4 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -232,6 +232,26 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component) WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component) + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(profilingServer.ListenAndServe) + + // If we have one or more admission controllers, then start the webhook + // and 
pass them in. + var wh *webhook.Webhook + var err error + if len(webhooks) > 0 { + // Register webhook metrics + webhook.RegisterMetrics() + + wh, err = webhook.New(ctx, webhooks) + if err != nil { + logger.Fatalw("Failed to create webhook", zap.Error(err)) + } + eg.Go(func() error { + return wh.Run(ctx.Done()) + }) + } + logger.Info("Starting configuration manager...") if err := cmw.Start(ctx.Done()); err != nil { logger.Fatalw("Failed to start configuration manager", zap.Error(err)) @@ -240,27 +260,12 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf if err := controller.StartInformers(ctx.Done(), informers...); err != nil { logger.Fatalw("Failed to start informers", zap.Error(err)) } + if wh != nil { + wh.InformersHaveSynced() + } logger.Info("Starting controllers...") go controller.StartAll(ctx.Done(), controllers...) - eg, egCtx := errgroup.WithContext(ctx) - eg.Go(profilingServer.ListenAndServe) - - // If we have one or more admission controllers, then start the webhook - // and pass them in. - if len(webhooks) > 0 { - // Register webhook metrics - webhook.RegisterMetrics() - - wh, err := webhook.New(ctx, webhooks) - if err != nil { - logger.Fatalw("Failed to create webhook", zap.Error(err)) - } - eg.Go(func() error { - return wh.Run(ctx.Done()) - }) - } - // This will block until either a signal arrives or one of the grouped functions // returns an error. <-egCtx.Done() diff --git a/webhook/admission.go b/webhook/admission.go index 4bcd64838..1e50b5885 100644 --- a/webhook/admission.go +++ b/webhook/admission.go @@ -39,6 +39,14 @@ type AdmissionController interface { Admit(context.Context, *admissionv1beta1.AdmissionRequest) *admissionv1beta1.AdmissionResponse } +// StatelessAdmissionController is implemented by AdmissionControllers where Admit may be safely +// called before informers have finished syncing. This is implemented by inlining +// StatelessAdmissionImpl in your Go type. 
+type StatelessAdmissionController interface { + // A silly name that should avoid collisions. + ThisTypeDoesNotDependOnInformerState() +} + // MakeErrorStatus creates an 'BadRequest' error AdmissionResponse func MakeErrorStatus(reason string, args ...interface{}) *admissionv1beta1.AdmissionResponse { result := apierrors.NewBadRequest(fmt.Sprintf(reason, args...)).Status() @@ -48,8 +56,17 @@ func MakeErrorStatus(reason string, args ...interface{}) *admissionv1beta1.Admis } } -func admissionHandler(rootLogger *zap.SugaredLogger, stats StatsReporter, c AdmissionController) http.HandlerFunc { +func admissionHandler(rootLogger *zap.SugaredLogger, stats StatsReporter, c AdmissionController, synced <-chan struct{}) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + if _, ok := c.(StatelessAdmissionController); ok { + // Stateless admission controllers do not require Informers to have + // finished syncing before Admit is called. + } else { + // Don't allow admission control requests through until we have been + // notified that informers have been synchronized. + <-synced + } + var ttStart = time.Now() logger := rootLogger logger.Infof("Webhook ServeHTTP request=%#v", r) @@ -92,3 +109,8 @@ func admissionHandler(rootLogger *zap.SugaredLogger, stats StatsReporter, c Admi } } } + +// Inline this type to implement StatelessAdmissionController. 
+type StatelessAdmissionImpl struct{} + +func (sai StatelessAdmissionImpl) ThisTypeDoesNotDependOnInformerState() {} diff --git a/webhook/admission_integration_test.go b/webhook/admission_integration_test.go index e476500be..f86349b0a 100644 --- a/webhook/admission_integration_test.go +++ b/webhook/admission_integration_test.go @@ -26,6 +26,7 @@ import ( "path" "strings" "testing" + "time" "golang.org/x/sync/errgroup" jsonpatch "gomodules.xyz/jsonpatch/v2" @@ -126,26 +127,47 @@ func TestAdmissionValidResponseForResource(t *testing.T) { } req.Header.Add("Content-Type", "application/json") - response, err := tlsClient.Do(req) - if err != nil { - t.Fatalf("Failed to get response %v", err) + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + response, err := tlsClient.Do(req) + if err != nil { + t.Fatalf("Failed to get response %v", err) + } + + if got, want := response.StatusCode, http.StatusOK; got != want { + t.Errorf("Response status code = %v, wanted %v", got, want) + } + + defer response.Body.Close() + responseBody, err := ioutil.ReadAll(response.Body) + if err != nil { + t.Fatalf("Failed to read response body %v", err) + } + + reviewResponse := admissionv1beta1.AdmissionReview{} + + err = json.NewDecoder(bytes.NewReader(responseBody)).Decode(&reviewResponse) + if err != nil { + t.Fatalf("Failed to decode response: %v", err) + } + }() + + // Check that Admit calls block when they are initiated before informers sync. + select { + case <-time.After(5 * time.Second): + case <-doneCh: + t.Fatal("Admit was called before informers had synced.") } - if got, want := response.StatusCode, http.StatusOK; got != want { - t.Errorf("Response status code = %v, wanted %v", got, want) - } + // Signal the webhook that informers have synced. 
+ wh.InformersHaveSynced() - defer response.Body.Close() - responseBody, err := ioutil.ReadAll(response.Body) - if err != nil { - t.Fatalf("Failed to read response body %v", err) - } - - reviewResponse := admissionv1beta1.AdmissionReview{} - - err = json.NewDecoder(bytes.NewReader(responseBody)).Decode(&reviewResponse) - if err != nil { - t.Fatalf("Failed to decode response: %v", err) + // Check that once informers have synced, things start completing immediately (including outstanding requests). + select { + case <-doneCh: + case <-time.After(5 * time.Second): + t.Error("Timed out waiting on Admit to complete after informers synced.") } metricstest.CheckStatsReported(t, requestCountName, requestLatenciesName) @@ -164,6 +186,7 @@ func TestAdmissionInvalidResponseForResource(t *testing.T) { eg, _ := errgroup.WithContext(ctx) eg.Go(func() error { return wh.Run(ctx.Done()) }) + wh.InformersHaveSynced() defer func() { cancel() if err := eg.Wait(); err != nil { diff --git a/webhook/configmaps/configmaps.go b/webhook/configmaps/configmaps.go index be28c7700..f4805f8cb 100644 --- a/webhook/configmaps/configmaps.go +++ b/webhook/configmaps/configmaps.go @@ -43,6 +43,8 @@ import ( // reconciler implements the AdmissionController for ConfigMaps type reconciler struct { + webhook.StatelessAdmissionImpl + name string path string constructors map[string]reflect.Value @@ -56,6 +58,7 @@ type reconciler struct { var _ controller.Reconciler = (*reconciler)(nil) var _ webhook.AdmissionController = (*reconciler)(nil) +var _ webhook.StatelessAdmissionController = (*reconciler)(nil) // Reconcile implements controller.Reconciler func (ac *reconciler) Reconcile(ctx context.Context, key string) error { diff --git a/webhook/resourcesemantics/defaulting/defaulting.go b/webhook/resourcesemantics/defaulting/defaulting.go index 0b1c129b0..3eb7a8aca 100644 --- a/webhook/resourcesemantics/defaulting/defaulting.go +++ b/webhook/resourcesemantics/defaulting/defaulting.go @@ -50,6 +50,8 @@ var
errMissingNewObject = errors.New("the new object may not be nil") // reconciler implements the AdmissionController for resources type reconciler struct { + webhook.StatelessAdmissionImpl + name string path string handlers map[schema.GroupVersionKind]resourcesemantics.GenericCRD @@ -66,6 +68,7 @@ type reconciler struct { var _ controller.Reconciler = (*reconciler)(nil) var _ webhook.AdmissionController = (*reconciler)(nil) +var _ webhook.StatelessAdmissionController = (*reconciler)(nil) // Reconcile implements controller.Reconciler func (ac *reconciler) Reconcile(ctx context.Context, key string) error { diff --git a/webhook/resourcesemantics/validation/validation.go b/webhook/resourcesemantics/validation/validation.go index f394c3968..80f7622aa 100644 --- a/webhook/resourcesemantics/validation/validation.go +++ b/webhook/resourcesemantics/validation/validation.go @@ -48,6 +48,8 @@ var errMissingNewObject = errors.New("the new object may not be nil") // reconciler implements the AdmissionController for resources type reconciler struct { + webhook.StatelessAdmissionImpl + name string path string handlers map[schema.GroupVersionKind]resourcesemantics.GenericCRD @@ -64,6 +66,7 @@ type reconciler struct { var _ controller.Reconciler = (*reconciler)(nil) var _ webhook.AdmissionController = (*reconciler)(nil) +var _ webhook.StatelessAdmissionController = (*reconciler)(nil) // Reconcile implements controller.Reconciler func (ac *reconciler) Reconcile(ctx context.Context, key string) error { diff --git a/webhook/webhook.go b/webhook/webhook.go index ec272a9c2..99cb48d09 100644 --- a/webhook/webhook.go +++ b/webhook/webhook.go @@ -65,6 +65,9 @@ type Webhook struct { Options Options Logger *zap.SugaredLogger + // synced is a function that is called when the informers have been synced.
+ synced context.CancelFunc + mux http.ServeMux secretlister corelisters.SecretLister } @@ -105,11 +108,14 @@ func New( opts.StatsReporter = reporter } + syncCtx, cancel := context.WithCancel(context.Background()) + webhook = &Webhook{ Client: client, Options: *opts, secretlister: secretInformer.Lister(), Logger: logger, + synced: cancel, } webhook.mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -117,26 +123,31 @@ func New( }) for _, controller := range controllers { - var handler http.Handler - var path string - switch c := controller.(type) { case AdmissionController: - handler = admissionHandler(logger, opts.StatsReporter, c) - path = c.Path() + handler := admissionHandler(logger, opts.StatsReporter, c, syncCtx.Done()) + webhook.mux.Handle(c.Path(), handler) + case ConversionController: - handler = conversionHandler(logger, opts.StatsReporter, c) - path = c.Path() + handler := conversionHandler(logger, opts.StatsReporter, c) + webhook.mux.Handle(c.Path(), handler) + default: return nil, fmt.Errorf("unknown webhook controller type: %T", controller) } - webhook.mux.Handle(path, handler) } return } +// InformersHaveSynced is called when the informers have all been synced, which allows any outstanding +// admission webhooks through. +func (wh *Webhook) InformersHaveSynced() { + wh.synced() + wh.Logger.Info("Informers have been synced, unblocking admission webhooks.") +} + // Run implements the admission controller run loop. 
func (wh *Webhook) Run(stop <-chan struct{}) error { logger := wh.Logger diff --git a/webhook/webhook_integration_test.go b/webhook/webhook_integration_test.go index 463aed1fa..e4c7ef96f 100644 --- a/webhook/webhook_integration_test.go +++ b/webhook/webhook_integration_test.go @@ -58,6 +58,7 @@ func TestMissingContentType(t *testing.T) { eg, _ := errgroup.WithContext(ctx) eg.Go(func() error { return wh.Run(ctx.Done()) }) + wh.InformersHaveSynced() defer func() { cancel() if err := eg.Wait(); err != nil { @@ -111,6 +112,7 @@ func testEmptyRequestBody(t *testing.T, controller interface{}) { eg, _ := errgroup.WithContext(ctx) eg.Go(func() error { return wh.Run(ctx.Done()) }) + wh.InformersHaveSynced() defer func() { cancel() if err := eg.Wait(); err != nil {