apiserver: add a new mode for graceful termination

add a new mode for graceful termination with the new server run option
'shutdown-send-retry-after':
- shutdown-send-retry-after=true: we initiate shutdown of the
  HTTP Server only after all in-flight request(s) have been drained. During
  this window all incoming requests are rejected with status code
  429 and the following response headers:
    - 'Retry-After: N' - the client should retry after N seconds
    - 'Connection: close' - tear down the TCP connection
- shutdown-send-retry-after=false: we initiate shutdown of the
  HTTP Server as soon as shutdown-delay-duration has elapsed. This
  preserves the current behavior.
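
For illustration, a minimal client-side sketch of how a caller could honor
this contract (not part of this change; the URL and retry bounds are
illustrative assumptions). Because the server also sets 'Connection: close',
net/http opens a fresh connection for the next attempt, giving a load
balancer the chance to route the retry to another instance:

package main

import (
    "fmt"
    "net/http"
    "strconv"
    "time"
)

// getWithRetryAfter retries a GET whenever the server answers 429,
// honoring the 'Retry-After: N' response header described above.
func getWithRetryAfter(client *http.Client, url string, maxAttempts int) (*http.Response, error) {
    var lastErr error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        resp, err := client.Get(url)
        if err != nil {
            lastErr = err
            continue
        }
        if resp.StatusCode != http.StatusTooManyRequests {
            return resp, nil
        }
        // back off for the number of seconds the server asked for,
        // falling back to 1s if the header is missing or malformed.
        seconds, convErr := strconv.Atoi(resp.Header.Get("Retry-After"))
        if convErr != nil {
            seconds = 1
        }
        resp.Body.Close()
        lastErr = fmt.Errorf("server returned 429")
        time.Sleep(time.Duration(seconds) * time.Second)
    }
    return nil, fmt.Errorf("gave up after %d attempts: %v", maxAttempts, lastErr)
}

func main() {
    // hypothetical endpoint, for illustration only
    resp, err := getWithRetryAfter(http.DefaultClient, "https://example.invalid/api/v1/namespaces", 3)
    if err != nil {
        fmt.Println("request failed:", err)
        return
    }
    defer resp.Body.Close()
    fmt.Println("status:", resp.Status)
}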

Kubernetes-commit: 3182b69e970bd1fd036ff839fdf811f14e790244
Abu Kashem 2021-07-14 10:39:29 -04:00, committed by Kubernetes Publisher
parent 5d12abfebe, commit ffb869e08f
8 changed files with 626 additions and 28 deletions


@@ -231,6 +231,15 @@ type Config struct {
// in the storage per resource, so we can estimate width of incoming requests.
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
// ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP
// Server during the graceful termination of the apiserver. If true, we wait
// for in-flight non-long-running requests to be drained and then initiate a
// shutdown of the HTTP Server. If false, we initiate a shutdown of the HTTP
// Server as soon as ShutdownDelayDuration has elapsed.
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool
//===========================================================================
// values below here are targets for removal
//===========================================================================
@@ -611,7 +620,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
maxRequestBodyBytes: c.MaxRequestBodyBytes,
livezClock: clock.RealClock{},
lifecycleSignals: c.lifecycleSignals,
ShutdownSendRetryAfter: c.ShutdownSendRetryAfter,
APIServerID: c.APIServerID,
StorageVersionManager: c.StorageVersionManager,
@@ -789,6 +799,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
}
handler = genericfilters.WithHTTPLogging(handler)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
handler = genericapifilters.WithTracing(handler, c.TracerProvider)
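
A note on ordering for readers less familiar with this handler-chain idiom:
each assignment wraps the previous handler, so a filter added later in
DefaultBuildHandlerChain sits further out and runs earlier per request. A
self-contained sketch (the two filter names are borrowed purely for
illustration):

package main

import (
    "fmt"
    "net/http"
    "net/http/httptest"
)

// wrap returns a filter that announces itself before delegating inward.
func wrap(name string, inner http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        fmt.Println("enter:", name)
        inner.ServeHTTP(w, r)
    })
}

func main() {
    var handler http.Handler = http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
        fmt.Println("api handler")
    })
    handler = wrap("WithRetryAfter", handler)  // wrapped first => inner of the two
    handler = wrap("WithHTTPLogging", handler) // wrapped last => runs first

    handler.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/", nil))
    // prints: enter: WithHTTPLogging, enter: WithRetryAfter, api handler
}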


@@ -32,6 +32,12 @@ import (
// WithWaitGroup adds all non-long-running requests to the wait group, which is used for graceful shutdown.
func WithWaitGroup(handler http.Handler, longRunning apirequest.LongRunningRequestCheck, wg *utilwaitgroup.SafeWaitGroup) http.Handler {
// NOTE: both WithWaitGroup and WithRetryAfter must use the exact same isRequestExemptFunc 'isRequestExemptFromRetryAfter',
// otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully.
return withWaitGroup(handler, longRunning, wg, isRequestExemptFromRetryAfter)
}
func withWaitGroup(handler http.Handler, longRunning apirequest.LongRunningRequestCheck, wg *utilwaitgroup.SafeWaitGroup, isRequestExemptFn isRequestExemptFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
@@ -41,21 +47,33 @@ func WithWaitGroup(handler http.Handler, longRunning apirequest.LongRunningReque
return
}
if !longRunning(req, requestInfo) {
if err := wg.Add(1); err != nil {
// When apiserver is shutting down, signal clients to retry
// There is a good chance the client will hit a different server on retry, so a tight retry is good for client responsiveness.
w.Header().Add("Retry-After", "1")
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
w.Header().Set("X-Content-Type-Options", "nosniff")
statusErr := apierrors.NewServiceUnavailable("apiserver is shutting down").Status()
w.WriteHeader(int(statusErr.Code))
fmt.Fprintln(w, runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &statusErr))
return
}
defer wg.Done()
if longRunning(req, requestInfo) {
handler.ServeHTTP(w, req)
return
}
if err := wg.Add(1); err != nil {
// the shutdown delay duration has elapsed and SafeWaitGroup.Wait has been invoked;
// this means 'WithRetryAfter' has started sending Retry-After responses.
// we exempt the same set of requests that WithRetryAfter exempts
// from being rejected with a Retry-After response.
if isRequestExemptFn(req) {
handler.ServeHTTP(w, req)
return
}
// When apiserver is shutting down, signal clients to retry
// There is a good chance the client will hit a different server on retry, so a tight retry is good for client responsiveness.
w.Header().Add("Retry-After", "1")
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
w.Header().Set("X-Content-Type-Options", "nosniff")
statusErr := apierrors.NewServiceUnavailable("apiserver is shutting down").Status()
w.WriteHeader(int(statusErr.Code))
fmt.Fprintln(w, runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &statusErr))
return
}
defer wg.Done()
handler.ServeHTTP(w, req)
})
}
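
The property this filter depends on is that SafeWaitGroup.Add returns an
error once Wait has been invoked; that error is how the filter detects that
draining has begun. A minimal standalone sketch of that behavior (the
printed error text is indicative, not guaranteed):

package main

import (
    "fmt"

    utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
)

func main() {
    wg := new(utilwaitgroup.SafeWaitGroup)

    // before Wait: Add succeeds and the in-flight request is tracked.
    if err := wg.Add(1); err != nil {
        fmt.Println("unexpected:", err)
    }
    wg.Done()

    // Wait blocks until the counter drops to zero (it already is here)
    // and flips the group into waiting mode.
    wg.Wait()

    // after Wait: Add fails, which withWaitGroup uses to reject the request.
    if err := wg.Add(1); err != nil {
        fmt.Println("rejected:", err)
    }
}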


@@ -0,0 +1,130 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"net/http"
"strings"
)
var (
// health probes and metrics scraping are never rejected; we will continue
// serving these requests after the shutdown delay duration elapses.
pathPrefixesExemptFromRetryAfter = []string{
"/readyz",
"/livez",
"/healthz",
"/metrics",
}
)
// isRequestExemptFunc returns true if the request should not be rejected
// with a Retry-After response; otherwise it returns false.
type isRequestExemptFunc func(*http.Request) bool
// retryAfterParams dictates how the Retry-After response is constructed
type retryAfterParams struct {
// TearDownConnection is true when we should send a 'Connection: close'
// header in the response so net/http can tear down the TCP connection.
TearDownConnection bool
// Message describes why the Retry-After response has been sent by the server
Message string
}
// shouldRespondWithRetryAfterFunc returns true if the request should
// be rejected with a Retry-After response once certain conditions are met.
// The retryAfterParams returned contains instructions on how to
// construct the Retry-After response.
type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
// WithRetryAfter rejects any incoming new request(s) with a 429
// if the specified shutdownDelayDurationElapsedCh channel is closed
//
// This applies to new request(s) arriving on either a new or an existing TCP connection.
// Any new request(s) arriving after shutdownDelayDurationElapsedCh is closed
// are replied to with a 429 and the following response headers:
// - 'Retry-After: N' (so the client can retry after N seconds, hopefully on a new apiserver instance)
// - 'Connection: close': tear down the TCP connection
//
// TODO: is there a way to merge WithWaitGroup and this filter?
func WithRetryAfter(handler http.Handler, shutdownDelayDurationElapsedCh <-chan struct{}) http.Handler {
shutdownRetryAfterParams := &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}
// NOTE: both WithRetryAfter and WithWaitGroup must use the exact same isRequestExemptFunc 'isRequestExemptFromRetryAfter',
// otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully.
return withRetryAfter(handler, isRequestExemptFromRetryAfter, func() (*retryAfterParams, bool) {
select {
case <-shutdownDelayDurationElapsedCh:
return shutdownRetryAfterParams, true
default:
return nil, false
}
})
}
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn shouldRespondWithRetryAfterFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
params, send := shouldRespondWithRetryAfterFn()
if !send || isRequestExemptFn(req) {
handler.ServeHTTP(w, req)
return
}
// If we are here, it's time to send the Retry-After response
//
// Copied from net/http2 library
// "Connection" headers aren't allowed in HTTP/2 (RFC 7540, 8.1.2.2),
// but respect "Connection" == "close" to mean sending a GOAWAY and tearing
// down the TCP connection when idle, like we do for HTTP/1.
if params.TearDownConnection {
w.Header().Set("Connection", "close")
}
// Return a 429 status asking the client to try again after 5 seconds
w.Header().Set("Retry-After", "5")
http.Error(w, params.Message, http.StatusTooManyRequests)
})
}
// isRequestExemptFromRetryAfter returns true if the given request should be exempt
// from being rejected with a 'Retry-After' response.
// NOTE: both 'WithRetryAfter' and 'WithWaitGroup' filters should use this function
// to exempt the set of requests from being rejected or tracked.
func isRequestExemptFromRetryAfter(r *http.Request) bool {
return isKubeApiserverUserAgent(r) || hasExemptPathPrefix(r)
}
// isKubeApiserverUserAgent returns true if the user-agent matches
// the one set by the local loopback.
// NOTE: we can't look up the authenticated user information from the
// request context since the authentication filter has not executed yet.
func isKubeApiserverUserAgent(req *http.Request) bool {
return strings.HasPrefix(req.UserAgent(), "kube-apiserver/")
}
func hasExemptPathPrefix(r *http.Request) bool {
for _, whiteListedPrefix := range pathPrefixesExemptFromRetryAfter {
if strings.HasPrefix(r.URL.Path, whiteListedPrefix) {
return true
}
}
return false
}
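
To make the filter's contract concrete, here is a hedged sketch of wiring
WithRetryAfter around a plain net/http mux outside the apiserver; the
SIGTERM plumbing, paths, and address are illustrative assumptions:

package main

import (
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"

    genericfilters "k8s.io/apiserver/pkg/server/filters"
)

func main() {
    mux := http.NewServeMux()
    // exempt path: still served while Retry-After responses are being sent
    mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    })
    mux.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("done"))
    })

    // closing this channel flips the filter into reject-with-Retry-After mode,
    // playing the role of the AfterShutdownDelayDuration signal in the apiserver.
    retryAfterCh := make(chan struct{})
    go func() {
        sigCh := make(chan os.Signal, 1)
        signal.Notify(sigCh, syscall.SIGTERM)
        <-sigCh
        close(retryAfterCh)
    }()

    log.Fatal(http.ListenAndServe("127.0.0.1:8080", genericfilters.WithRetryAfter(mux, retryAfterCh)))
}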


@@ -0,0 +1,207 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"net/http"
"net/http/httptest"
"testing"
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
)
func TestWithRetryAfter(t *testing.T) {
tests := []struct {
name string
shutdownDelayDurationElapsedFn func() <-chan struct{}
requestURL string
userAgent string
safeWaitGroupIsWaiting bool
handlerInvoked int
closeExpected string
retryAfterExpected string
statusCodeExpected int
}{
{
name: "retry-after disabled",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(false)
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
handlerInvoked: 1,
closeExpected: "",
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "retry-after enabled, request is not exempt",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
handlerInvoked: 0,
closeExpected: "close",
retryAfterExpected: "5",
statusCodeExpected: http.StatusTooManyRequests,
},
{
name: "retry-after enabled, request is exempt(/metrics)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/metrics?foo=bar",
userAgent: "foo",
handlerInvoked: 1,
closeExpected: "",
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "retry-after enabled, request is exempt(/livez)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/livez?verbose",
userAgent: "foo",
handlerInvoked: 1,
closeExpected: "",
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "retry-after enabled, request is exempt(/readyz)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/readyz?verbose",
userAgent: "foo",
handlerInvoked: 1,
closeExpected: "",
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "retry-after enabled, request is exempt(/healthz)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/healthz?verbose",
userAgent: "foo",
handlerInvoked: 1,
closeExpected: "",
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "retry-after enabled, request is exempt(local loopback)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/api/v1/namespaces",
userAgent: "kube-apiserver/",
handlerInvoked: 1,
closeExpected: "",
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "nil channel",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return nil
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
handlerInvoked: 1,
closeExpected: "",
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/readyz?verbose",
userAgent: "foo",
safeWaitGroupIsWaiting: true,
handlerInvoked: 1,
closeExpected: "",
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var handlerInvoked int
handler := http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {
handlerInvoked++
})
safeWG := new(utilwaitgroup.SafeWaitGroup)
if test.safeWaitGroupIsWaiting {
// mark the safe wait group as waiting, it's a blocking call
// but since the WaitGroup counter is zero it should not block
safeWG.Wait()
}
wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool {
return false
}, safeWG)
wrapped = WithRetryAfter(wrapped, test.shutdownDelayDurationElapsedFn())
req, err := http.NewRequest(http.MethodGet, test.requestURL, nil)
if err != nil {
t.Fatalf("failed to create new http request - %v", err)
}
req.Header.Set("User-Agent", test.userAgent)
req = req.WithContext(apirequest.WithRequestInfo(req.Context(), &apirequest.RequestInfo{}))
w := httptest.NewRecorder()
wrapped.ServeHTTP(w, req)
if test.handlerInvoked != handlerInvoked {
t.Errorf("expected the handler to be invoked: %d times, but got: %d", test.handlerInvoked, handlerInvoked)
}
if test.statusCodeExpected != w.Result().StatusCode {
t.Errorf("expected status code: %d, but got: %d", test.statusCodeExpected, w.Result().StatusCode)
}
closeGot := w.Header().Get("Connection")
if test.closeExpected != closeGot {
t.Errorf("expected Connection close: %s, but got: %s", test.closeExpected, closeGot)
}
retryAfterGot := w.Header().Get("Retry-After")
if test.retryAfterExpected != retryAfterGot {
t.Errorf("expected Retry-After: %s, but got: %s", test.retryAfterExpected, retryAfterGot)
}
})
}
}
func newChannel(closed bool) <-chan struct{} {
ch := make(chan struct{})
if closed {
close(ch)
}
return ch
}


@@ -213,6 +213,15 @@ type GenericAPIServer struct {
// lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
lifecycleSignals lifecycleSignals
// ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP
// Server during the graceful termination of the apiserver. If true, we wait
// for in-flight non-long-running requests to be drained and then initiate a
// shutdown of the HTTP Server. If false, we initiate a shutdown of the HTTP
// Server as soon as ShutdownDelayDuration has elapsed.
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool
}
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@@ -352,7 +361,22 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
}()
// close socket after delayed stopCh
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(delayedStopCh.Signaled())
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
stopHttpServerCh := delayedStopCh.Signaled()
shutdownTimeout := s.ShutdownTimeout
if s.ShutdownSendRetryAfter {
// when this mode is enabled, we do the following:
// - the server will continue to listen until all existing requests in flight
// (not including active long running requests) have been drained.
// - once drained, http Server Shutdown is invoked with a timeout of 2s,
// net/http waits for 1s for the peer to respond to a GOAWAY frame, so
// we should wait for a minimum of 2s
stopHttpServerCh = drainedCh.Signaled()
shutdownTimeout = 2 * time.Second
klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout)
}
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
if err != nil {
return err
}
@@ -363,7 +387,6 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
}()
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
go func() {
defer drainedCh.Signal()
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name())
@@ -397,7 +420,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
// The returned channel is closed when the (asynchronous) termination is finished.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
// Use a stop channel to allow graceful shutdown without dropping audit events
// after http server shutdown.
auditStopCh := make(chan struct{})
@@ -416,8 +439,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan
var listenerStoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
klog.V(1).Infof("[graceful-termination] ShutdownTimeout=%s", s.ShutdownTimeout)
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, s.ShutdownTimeout, internalStopCh)
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, shutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh)
close(auditStopCh)
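
For context, the shutdownTimeout plumbed through here ultimately bounds
net/http's graceful shutdown. A minimal sketch of that mechanism in
isolation (server construction and the sleep are illustrative):

package main

import (
    "context"
    "log"
    "net/http"
    "time"
)

func main() {
    srv := &http.Server{Addr: "127.0.0.1:8443"}

    go func() {
        // ListenAndServe returns http.ErrServerClosed once Shutdown is called.
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()
    time.Sleep(100 * time.Millisecond) // give the listener time to come up

    // Shutdown closes the listener immediately, then waits up to the context
    // deadline (2s here, mirroring the hard-coded timeout above) for active
    // connections to drain; idle HTTP/2 connections are sent a GOAWAY frame.
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    if err := srv.Shutdown(ctx); err != nil {
        log.Printf("connections still active after timeout: %v", err)
    }
}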


@@ -111,7 +111,7 @@ func newStep(fn func()) *step {
}
func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t *testing.T) {
s := newGenericAPIServer(t)
s := newGenericAPIServer(t, false)
// record the termination events in the order they are signaled
var signalOrderLock sync.Mutex
@@ -143,9 +143,9 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
delayedStopVerificationStepExecuted = true
t.Log("Before ShutdownDelayDuration elapses new request(s) should be served")
resultGot := doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-succeed", time.Second)
requestMustSucceed(t, resultGot)
assertResponse(t, resultGot, http.StatusOK)
resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
requestMustSucceed(t, resultGot)
assertResponse(t, resultGot, http.StatusOK)
})
steps := func(before bool, name string, e lifecycleSignal) {
// Before AfterShutdownDelayDuration event is signaled, the test
@@ -208,7 +208,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
case <-time.After(5 * time.Second):
t.Fatal("Expected the server to send a response")
}
requestMustSucceed(t, inFlightResultGot)
assertResponse(t, inFlightResultGot, http.StatusOK)
t.Log("Waiting for the apiserver Run method to return")
select {
@@ -232,6 +232,153 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
}()
}
func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t *testing.T) {
s := newGenericAPIServer(t, true)
// record the termination events in the order they are signaled
var signalOrderLock sync.Mutex
signalOrderGot := make([]string, 0)
recordOrderFn := func(before bool, name string, e lifecycleSignal) {
if !before {
return
}
signalOrderLock.Lock()
defer signalOrderLock.Unlock()
signalOrderGot = append(signalOrderGot, name)
}
// handler for a request that we want to keep in flight through to the end
inFlightRequestBlockedCh, inFlightStartedCh := make(chan result), make(chan struct{})
inFlightRequest := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
close(inFlightStartedCh)
// this request handler blocks until we deliberately unblock it.
<-inFlightRequestBlockedCh
w.WriteHeader(http.StatusOK)
})
s.Handler.NonGoRestfulMux.Handle("/in-flight-request-as-designed", inFlightRequest)
connReusingClient := newClient(false)
doer := setupDoer(t, s.SecureServingInfo)
var delayedStopVerificationStepExecuted bool
delayedStopVerificationStep := newStep(func() {
delayedStopVerificationStepExecuted = true
t.Log("Before ShutdownDelayDuration elapses new request(s) should be served")
resultGot := doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-succeed", time.Second)
assertResponse(t, resultGot, http.StatusOK)
resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
assertResponse(t, resultGot, http.StatusOK)
})
steps := func(before bool, name string, e lifecycleSignal) {
// Before AfterShutdownDelayDuration event is signaled, the test
// will send request(s) to assert on expected behavior.
if name == "AfterShutdownDelayDuration" && before {
// it unblocks the verification step and waits for it to complete
<-delayedStopVerificationStep.done()
}
}
// wrap the termination signals of the GenericAPIServer so the test can inject its own callback
wrapLifecycleSignals(t, &s.lifecycleSignals, func(before bool, name string, e lifecycleSignal) {
recordOrderFn(before, name, e)
steps(before, name, e)
})
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
}()
waitForAPIServerStarted(t, doer)
// step 1: fire a request that we want to keep in-flight through to the end
inFlightResultCh := make(chan result)
go func() {
resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/in-flight-request-as-designed", 0)
inFlightResultCh <- resultGot
}()
select {
case <-inFlightStartedCh:
case <-time.After(5 * time.Second):
t.Fatal("timed out after 5s waiting for the in-flight request to reach the server")
}
// step 2: /readyz should return OK
resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/readyz", time.Second)
assertResponse(t, resultGot, http.StatusOK)
// step 3: signal termination event: initiate a shutdown
close(stopCh)
// step 4: /readyz must return an error, but we need to give it some time
err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/readyz", time.Second)
// wait until we have a non-200 response
if resultGot.response != nil && resultGot.response.StatusCode == http.StatusOK {
return false, nil
}
assertResponse(t, resultGot, http.StatusInternalServerError)
return true, nil
})
if err != nil {
t.Errorf("Expected /readyz to return 500 status code, but got: %v", err)
}
// step 5: before ShutdownDelayDuration elapses, new request(s) should be served successfully.
delayedStopVerificationStep.execute()
if !delayedStopVerificationStepExecuted {
t.Fatal("Expected the AfterShutdownDelayDuration verification step to execute")
}
// step 6: ShutdownDelayDuration has elapsed; all incoming requests should receive 429
t.Log("Verify that new incoming request(s) get 429")
resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-fail-with-429", time.Second)
requestMustFailWithRetryHeader(t, resultGot, http.StatusTooManyRequests)
resultGot = doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-fail-with-429", time.Second)
requestMustFailWithRetryHeader(t, resultGot, http.StatusTooManyRequests)
// step 7: we still have a request in flight; let it unblock and we expect the request to succeed.
close(inFlightRequestBlockedCh)
var inFlightResultGot result
select {
case inFlightResultGot = <-inFlightResultCh:
case <-time.After(5 * time.Second):
t.Fatal("Expected the server to send a response")
}
assertResponse(t, inFlightResultGot, http.StatusOK)
// step 8: wait for the HTTP Server listener to have stopped
httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
select {
case <-httpServerStoppedListeningCh.Signaled():
case <-time.After(5 * time.Second):
t.Fatal("Expected the server to signal HTTPServerStoppedListening event")
}
t.Log("Waiting for the apiserver Run method to return")
select {
case <-runCompletedCh:
case <-time.After(5 * time.Second):
t.Fatal("Expected the apiserver Run method to return")
}
lifecycleSignalOrderExpected := []string{
string("ShutdownInitiated"),
string("AfterShutdownDelayDuration"),
string("InFlightRequestsDrained"),
string("HTTPServerStoppedListening"),
}
func() {
signalOrderLock.Lock()
defer signalOrderLock.Unlock()
if !reflect.DeepEqual(lifecycleSignalOrderExpected, signalOrderGot) {
t.Errorf("Expected order of termination event signal to match, diff: %s", cmp.Diff(lifecycleSignalOrderExpected, signalOrderGot))
}
}()
}
func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) {
return func(ci httptrace.GotConnInfo) {
if !ci.Reused {
@@ -248,13 +395,27 @@ func shouldUseNewConnection(t *testing.T) func(httptrace.GotConnInfo) {
}
}
func requestMustSucceed(t *testing.T, resultGot result) {
func assertResponse(t *testing.T, resultGot result, statusCodeExpected int) {
if resultGot.err != nil {
t.Errorf("Expected no error, but got: %v", resultGot.err)
return
}
if resultGot.response.StatusCode != http.StatusOK {
t.Errorf("Expected Status Code: %d, but got: %d", http.StatusOK, resultGot.response.StatusCode)
if resultGot.response.StatusCode != statusCodeExpected {
t.Errorf("Expected Status Code: %d, but got: %d", statusCodeExpected, resultGot.response.StatusCode)
}
}
func requestMustFailWithRetryHeader(t *testing.T, resultGot result, statusCodeExpected int) {
if resultGot.err != nil {
t.Errorf("Expected no error, but got: %v", resultGot.err)
return
}
if statusCodeExpected != resultGot.response.StatusCode {
t.Errorf("Expected Status Code: %d, but got: %d", statusCodeExpected, resultGot.response.StatusCode)
}
retryAfterGot := resultGot.response.Header.Get("Retry-After")
if retryAfterGot != "5" {
t.Errorf("Expected Retry-After Response Header, but got: %v", resultGot.response)
}
}
@@ -351,11 +512,15 @@ func newClient(useNewConnection bool) *http.Client {
}
}
func newGenericAPIServer(t *testing.T) *GenericAPIServer {
func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer {
config, _ := setUp(t)
config.ShutdownDelayDuration = 100 * time.Millisecond
config.ShutdownSendRetryAfter = keepListening
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
}
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
return handler
}


@@ -28,7 +28,9 @@ Events:
- InFlightRequestsDrained: all in flight request(s) have been drained
- HasBeenReady is signaled when the readyz endpoint succeeds for the first time
The following is a sequence of shutdown events that we expect to see with
'ShutdownSendRetryAfter' = false:
T0: ShutdownInitiated: TERM signal received
- /readyz starts returning red
- run pre shutdown hooks
@@ -54,6 +56,31 @@ T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have
any request in flight has a hard timeout of 60s.
- it's time to call 'Shutdown' on the audit events since all
in flight request(s) have drained.
The following is a sequence of shutdown events that we expect to see with
'ShutdownSendRetryAfter' = true:
T0: ShutdownInitiated: TERM signal received
- /readyz starts returning red
- run pre shutdown hooks
T0+70s: AfterShutdownDelayDuration: shutdown delay duration has passed
- the default value of 'ShutdownDelayDuration' is '70s'
- the HTTP Server will continue to listen
- the apiserver is not accepting new request(s)
- this applies to new request(s) on either a new or an existing TCP connection
- new request(s) arriving after this point are replied with a 429
and the response headers: 'Retry-After: 5' and 'Connection: close'
- note: these new request(s) will not show up in audit logs
T0 + 70s + up to 60s: InFlightRequestsDrained: existing in flight requests have been drained
- long running requests are outside of this scope
- up to 60s: the default value of 'ShutdownTimeout' is 60s; this means that
any request in flight has a hard timeout of 60s.
- server.Shutdown is called, the HTTP Server stops listening immediately
- the HTTP Server waits gracefully for existing requests to complete
up to '2s' (this is hard-coded for now)
*/
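
The lifecycle events listed above are, in essence, named close-once
channels. A minimal sketch of the idea (the type and method names below are
illustrative, not the actual implementation):

package main

import (
    "fmt"
    "sync"
)

// namedSignal is a close-once event, analogous in spirit to lifecycleSignal.
type namedSignal struct {
    name string
    once sync.Once
    ch   chan struct{}
}

func newNamedSignal(name string) *namedSignal {
    return &namedSignal{name: name, ch: make(chan struct{})}
}

// Signal fires the event; calling it more than once is safe.
func (s *namedSignal) Signal() { s.once.Do(func() { close(s.ch) }) }

// Signaled returns a channel that is closed once the event has fired.
func (s *namedSignal) Signaled() <-chan struct{} { return s.ch }

func main() {
    drained := newNamedSignal("InFlightRequestsDrained")
    go drained.Signal()
    <-drained.Signaled()
    fmt.Println(drained.name, "fired")
}
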
// lifecycleSignal encapsulates a named apiserver event


@@ -54,6 +54,15 @@ type ServerRunOptions struct {
// apiserver library can wire it to a flag.
MaxRequestBodyBytes int64
EnablePriorityAndFairness bool
// ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP
// Server during the graceful termination of the apiserver. If true, we wait
// for in-flight non-long-running requests to be drained and then initiate a
// shutdown of the HTTP Server. If false, we initiate a shutdown of the HTTP
// Server as soon as ShutdownDelayDuration has elapsed.
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool
}
func NewServerRunOptions() *ServerRunOptions {
@@ -68,6 +77,7 @@ func NewServerRunOptions() *ServerRunOptions {
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnablePriorityAndFairness: true,
ShutdownSendRetryAfter: false,
}
}
@@ -86,6 +96,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes
c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
c.PublicAddress = s.AdvertiseAddress
c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter
return nil
}
@@ -245,5 +256,10 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"will return success, but /readyz immediately returns failure. Graceful termination starts after this delay "+
"has elapsed. This can be used to allow load balancer to stop sending traffic to this server.")
fs.BoolVar(&s.ShutdownSendRetryAfter, "shutdown-send-retry-after", s.ShutdownSendRetryAfter, ""+
"If true the HTTP Server will continue listening until all non-long-running request(s) in flight have been drained. "+
"During this window all incoming requests will be rejected with status code 429 and a 'Retry-After' response header. "+
"In addition, a 'Connection: close' response header is set in order to tear down the TCP connection when idle.")
utilfeature.DefaultMutableFeatureGate.AddFlag(fs)
}
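
Finally, a hedged sketch of enabling the new mode programmatically through
these options rather than via the flag (the bare Config is illustrative;
real callers construct one via server.NewConfig):

package main

import (
    "log"

    "k8s.io/apiserver/pkg/server"
    "k8s.io/apiserver/pkg/server/options"
)

func main() {
    opts := options.NewServerRunOptions()
    // equivalent to passing --shutdown-send-retry-after=true on the command line
    opts.ShutdownSendRetryAfter = true

    config := &server.Config{}
    if err := opts.ApplyTo(config); err != nil {
        log.Fatal(err)
    }
    log.Printf("ShutdownSendRetryAfter=%v", config.ShutdownSendRetryAfter)
}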