From af86802d1a3371f0cf4d99769d156775731af3ad Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Wed, 9 Feb 2022 16:40:57 -0500 Subject: [PATCH] apiserver: stop http server after pre shutdown hooks Kubernetes-commit: 8d96cc2dfbd650544660e41c5e8efc8d1e1033a3 --- pkg/server/genericapiserver.go | 21 ++++-- ...ericapiserver_graceful_termination_test.go | 64 +++++++++++++++++++ 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/pkg/server/genericapiserver.go b/pkg/server/genericapiserver.go index d736465bd..7e410b8e5 100644 --- a/pkg/server/genericapiserver.go +++ b/pkg/server/genericapiserver.go @@ -423,7 +423,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { // close socket after delayed stopCh drainedCh := s.lifecycleSignals.InFlightRequestsDrained - stopHttpServerCh := delayedStopCh.Signaled() + delayedStopOrDrainedCh := delayedStopCh.Signaled() shutdownTimeout := s.ShutdownTimeout if s.ShutdownSendRetryAfter { // when this mode is enabled, we do the following: @@ -432,11 +432,20 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { // - once drained, http Server Shutdown is invoked with a timeout of 2s, // net/http waits for 1s for the peer to respond to a GO_AWAY frame, so // we should wait for a minimum of 2s - stopHttpServerCh = drainedCh.Signaled() + delayedStopOrDrainedCh = drainedCh.Signaled() shutdownTimeout = 2 * time.Second klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout) } + // pre-shutdown hooks need to finish before we stop the http server + preShutdownHooksHasStoppedCh, stopHttpServerCh := make(chan struct{}), make(chan struct{}) + go func() { + defer close(stopHttpServerCh) + + <-delayedStopOrDrainedCh + <-preShutdownHooksHasStoppedCh + }() + stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout) if err != nil { return err @@ -462,8 +471,12 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated") <-stopCh - // run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver. - err = s.RunPreShutdownHooks() + // run shutdown hooks directly. This includes deregistering from + // the kubernetes endpoint in case of kube-apiserver. + func() { + defer close(preShutdownHooksHasStoppedCh) + err = s.RunPreShutdownHooks() + }() if err != nil { return err } diff --git a/pkg/server/genericapiserver_graceful_termination_test.go b/pkg/server/genericapiserver_graceful_termination_test.go index 6c84271f0..446721bf4 100644 --- a/pkg/server/genericapiserver_graceful_termination_test.go +++ b/pkg/server/genericapiserver_graceful_termination_test.go @@ -26,6 +26,7 @@ import ( "net" "net/http" "net/http/httptrace" + "os" "reflect" "sync" "testing" @@ -35,11 +36,17 @@ import ( genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" "k8s.io/apiserver/pkg/server/dynamiccertificates" genericfilters "k8s.io/apiserver/pkg/server/filters" + "k8s.io/klog/v2" "github.com/google/go-cmp/cmp" "golang.org/x/net/http2" ) +func TestMain(m *testing.M) { + klog.InitFlags(nil) + os.Exit(m.Run()) +} + // doer sends a request to the server type doer func(client *http.Client, gci func(httptrace.GotConnInfo), path string, timeout time.Duration) result @@ -420,6 +427,63 @@ func TestMuxAndDiscoveryComplete(t *testing.T) { t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) } } + +func TestPreShutdownHooks(t *testing.T) { + s := newGenericAPIServer(t, true) + doer := setupDoer(t, s.SecureServingInfo) + + preShutdownHookErrCh := make(chan error) + err := s.AddPreShutdownHook("test-backend", func() error { + // this pre-shutdown hook waits for the requests in flight to drain + // and then send a series of requests to the apiserver, and + // we expect these series of requests to be completed successfully + <-s.lifecycleSignals.InFlightRequestsDrained.Signaled() + + // we send 5 requests, once every second + var r result + client := newClient(true) + for i := 0; i < 5; i++ { + r = doer.Do(client, func(httptrace.GotConnInfo) {}, fmt.Sprintf("/echo?message=attempt-%d", i), 100*time.Millisecond) + if r.err != nil { + break + } + time.Sleep(time.Second) + } + preShutdownHookErrCh <- r.err + return nil + }) + if err != nil { + t.Fatalf("Failed to add pre-shutdown hook - %v", err) + } + + // start the API server + stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + go func() { + defer func() { + // this test has an inherent race condition when we wait for two go routines + // to finish - the Run method and the pre-shutdown hook, each running in + // its own goroutine, give it a second before unblocking the test assert + <-time.After(time.Second) + close(runCompletedCh) + }() + s.PrepareRun().Run(stopCh) + }() + waitForAPIServerStarted(t, doer) + + close(stopCh) + + select { + case err := <-preShutdownHookErrCh: + if err != nil { + t.Fatalf("PreSHutdown hook can not access the API server - %v", err) + } + case <-runCompletedCh: + t.Fatalf("API Server exited without running the PreShutdown hooks") + case <-time.After(15 * time.Second): + t.Fatalf("test timed out after 15 seconds") + } +} + func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) { return func(ci httptrace.GotConnInfo) { if !ci.Reused {