apiserver: stop http server after pre shutdown hooks

Kubernetes-commit: 8d96cc2dfbd650544660e41c5e8efc8d1e1033a3
This commit is contained in:
Abu Kashem 2022-02-09 16:40:57 -05:00 committed by Kubernetes Publisher
parent 9e54246a1f
commit af86802d1a
2 changed files with 81 additions and 4 deletions

View File

@ -423,7 +423,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// close socket after delayed stopCh
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
stopHttpServerCh := delayedStopCh.Signaled()
delayedStopOrDrainedCh := delayedStopCh.Signaled()
shutdownTimeout := s.ShutdownTimeout
if s.ShutdownSendRetryAfter {
// when this mode is enabled, we do the following:
@ -432,11 +432,20 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// - once drained, http Server Shutdown is invoked with a timeout of 2s,
// net/http waits for 1s for the peer to respond to a GO_AWAY frame, so
// we should wait for a minimum of 2s
stopHttpServerCh = drainedCh.Signaled()
delayedStopOrDrainedCh = drainedCh.Signaled()
shutdownTimeout = 2 * time.Second
klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout)
}
// pre-shutdown hooks need to finish before we stop the http server
preShutdownHooksHasStoppedCh, stopHttpServerCh := make(chan struct{}), make(chan struct{})
go func() {
defer close(stopHttpServerCh)
<-delayedStopOrDrainedCh
<-preShutdownHooksHasStoppedCh
}()
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
if err != nil {
return err
@ -462,8 +471,12 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")
<-stopCh
// run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver.
err = s.RunPreShutdownHooks()
// run shutdown hooks directly. This includes deregistering from
// the kubernetes endpoint in case of kube-apiserver.
func() {
defer close(preShutdownHooksHasStoppedCh)
err = s.RunPreShutdownHooks()
}()
if err != nil {
return err
}

View File

@ -26,6 +26,7 @@ import (
"net"
"net/http"
"net/http/httptrace"
"os"
"reflect"
"sync"
"testing"
@ -35,11 +36,17 @@ import (
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/klog/v2"
"github.com/google/go-cmp/cmp"
"golang.org/x/net/http2"
)
func TestMain(m *testing.M) {
klog.InitFlags(nil)
os.Exit(m.Run())
}
// doer sends a request to the server
type doer func(client *http.Client, gci func(httptrace.GotConnInfo), path string, timeout time.Duration) result
@ -420,6 +427,63 @@ func TestMuxAndDiscoveryComplete(t *testing.T) {
t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
}
}
func TestPreShutdownHooks(t *testing.T) {
s := newGenericAPIServer(t, true)
doer := setupDoer(t, s.SecureServingInfo)
preShutdownHookErrCh := make(chan error)
err := s.AddPreShutdownHook("test-backend", func() error {
// this pre-shutdown hook waits for the requests in flight to drain
// and then send a series of requests to the apiserver, and
// we expect these series of requests to be completed successfully
<-s.lifecycleSignals.InFlightRequestsDrained.Signaled()
// we send 5 requests, once every second
var r result
client := newClient(true)
for i := 0; i < 5; i++ {
r = doer.Do(client, func(httptrace.GotConnInfo) {}, fmt.Sprintf("/echo?message=attempt-%d", i), 100*time.Millisecond)
if r.err != nil {
break
}
time.Sleep(time.Second)
}
preShutdownHookErrCh <- r.err
return nil
})
if err != nil {
t.Fatalf("Failed to add pre-shutdown hook - %v", err)
}
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
go func() {
defer func() {
// this test has an inherent race condition when we wait for two go routines
// to finish - the Run method and the pre-shutdown hook, each running in
// its own goroutine, give it a second before unblocking the test assert
<-time.After(time.Second)
close(runCompletedCh)
}()
s.PrepareRun().Run(stopCh)
}()
waitForAPIServerStarted(t, doer)
close(stopCh)
select {
case err := <-preShutdownHookErrCh:
if err != nil {
t.Fatalf("PreSHutdown hook can not access the API server - %v", err)
}
case <-runCompletedCh:
t.Fatalf("API Server exited without running the PreShutdown hooks")
case <-time.After(15 * time.Second):
t.Fatalf("test timed out after 15 seconds")
}
}
func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) {
return func(ci httptrace.GotConnInfo) {
if !ci.Reused {