diff --git a/injection/health_check.go b/injection/health_check.go new file mode 100644 index 000000000..2899c7e35 --- /dev/null +++ b/injection/health_check.go @@ -0,0 +1,109 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package injection + +import ( + "context" + "errors" + "net/http" + "strconv" + "time" + + "knative.dev/pkg/logging" +) + +// HealthCheckDefaultPort defines the default port number for health probes +const HealthCheckDefaultPort = 8080 + +// ServeHealthProbes sets up liveness and readiness probes. +// If user sets no probes explicitly via the context then defaults are added. +func ServeHealthProbes(ctx context.Context, port int) error { + logger := logging.FromContext(ctx) + server := http.Server{ReadHeaderTimeout: time.Minute, Handler: muxWithHandles(ctx), Addr: ":" + strconv.Itoa(port)} + go func() { + <-ctx.Done() + _ = server.Shutdown(ctx) + }() + + // start the web server on port and accept requests + logger.Infof("Probes server listening on port %s", port) + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil +} + +func muxWithHandles(ctx context.Context) *http.ServeMux { + mux := http.NewServeMux() + readiness := getReadinessHandleOrDefault(ctx) + liveness := getLivenessHandleOrDefault(ctx) + mux.HandleFunc("/readiness", *readiness) + mux.HandleFunc("/health", *liveness) + return mux +} + +func newDefaultProbesHandle(sigCtx context.Context) http.HandlerFunc { + logger := logging.FromContext(sigCtx) + return func(w http.ResponseWriter, r *http.Request) { + f := func() error { + select { + // When we get SIGTERM (sigCtx done), let readiness probes start failing. + case <-sigCtx.Done(): + logger.Info("Signal context canceled") + return errors.New("received SIGTERM from kubelet") + default: + return nil + } + } + if err := f(); err != nil { + logger.Errorf("Healthcheck failed: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + } +} + +type addReadinessKey struct{} + +// AddReadiness signals to probe setup logic to add a user provided probe handler +func AddReadiness(ctx context.Context, handlerFunc http.HandlerFunc) context.Context { + return context.WithValue(ctx, addReadinessKey{}, &handlerFunc) +} + +func getReadinessHandleOrDefault(ctx context.Context) *http.HandlerFunc { + if ctx.Value(addReadinessKey{}) != nil { + return ctx.Value(addReadinessKey{}).(*http.HandlerFunc) + } + defaultHandle := newDefaultProbesHandle(ctx) + return &defaultHandle +} + +type addLivenessKey struct{} + +// AddLiveness signals to probe setup logic to add a user provided probe handler +func AddLiveness(ctx context.Context, handlerFunc http.HandlerFunc) context.Context { + return context.WithValue(ctx, addLivenessKey{}, &handlerFunc) +} + +func getLivenessHandleOrDefault(ctx context.Context) *http.HandlerFunc { + if ctx.Value(addLivenessKey{}) != nil { + return ctx.Value(addLivenessKey{}).(*http.HandlerFunc) + } + defaultHandle := newDefaultProbesHandle(ctx) + return &defaultHandle +} diff --git a/injection/health_check_test.go b/injection/health_check_test.go new file mode 100644 index 000000000..745ef39af --- /dev/null +++ b/injection/health_check_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package injection + +import ( + "context" + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + +func TestHealthCheckHandler(t *testing.T) { + tests := []struct { + name string + ctx context.Context + expectedReadiness int + expectedLiveness int + }{{ + name: "user provided no handlers, default health check handlers are used", + ctx: context.Background(), + expectedReadiness: http.StatusOK, + expectedLiveness: http.StatusOK, + }, { + name: "user provided custom readiness health check handler, liveness default handler is used", + ctx: AddReadiness(context.Background(), testHandler()), + expectedReadiness: http.StatusBadGateway, + expectedLiveness: http.StatusOK, + }, { + name: "user provided custom liveness health check handler, readiness default handler is used", + ctx: AddLiveness(context.Background(), testHandler()), + expectedReadiness: http.StatusOK, + expectedLiveness: http.StatusBadGateway, + }, { + name: "user provided custom health check handlers", + ctx: AddReadiness(AddLiveness(context.Background(), testHandler()), testHandler()), + expectedReadiness: http.StatusBadGateway, + expectedLiveness: http.StatusBadGateway, + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mux := muxWithHandles(tc.ctx) + reqReadiness := http.Request{ + URL: &url.URL{ + Path: "/readiness", + }, + } + resp := httptest.NewRecorder() + mux.ServeHTTP(resp, &reqReadiness) + if got, want := resp.Code, tc.expectedReadiness; got != want { + t.Errorf("Probe status = %d, wanted %d", got, want) + } + reqLiveness := http.Request{ + URL: &url.URL{ + Path: "/health", + }, + } + resp = httptest.NewRecorder() + mux.ServeHTTP(resp, &reqLiveness) + if got, want := resp.Code, tc.expectedLiveness; got != want { + t.Errorf("Probe status = %d, wanted %d", got, want) + } + }) + } +} + +func testHandler() http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + http.Error(w, "test", http.StatusBadGateway) + } +} diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 8af9eb5d1..25562e965 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -313,6 +313,13 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto return controller.StartAll(ctx, controllers...) }) + // Setup default health checks to catch issues with cache sync etc. + if !healthProbesDisabled(ctx) { + eg.Go(func() error { + return injection.ServeHealthProbes(ctx, injection.HealthCheckDefaultPort) + }) + } + // This will block until either a signal arrives or one of the grouped functions // returns an error. <-egCtx.Done() @@ -324,6 +331,17 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto } } +type healthProbesDisabledKey struct{} + +// WithHealthProbesDisabled signals to MainWithContext that it should disable default probes (readiness and liveness). +func WithHealthProbesDisabled(ctx context.Context) context.Context { + return context.WithValue(ctx, healthProbesDisabledKey{}, struct{}{}) +} + +func healthProbesDisabled(ctx context.Context) bool { + return ctx.Value(healthProbesDisabledKey{}) != nil +} + func flush(logger *zap.SugaredLogger) { logger.Sync() metrics.FlushExporter()