From f3ae70d0cf2ee5c7d5b41e058fc78c0d3e43ac10 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Tue, 10 Aug 2021 12:03:21 -0400 Subject: [PATCH] send retry-after until ready Kubernetes-commit: 6e3923d0a4f4720d2d9f628eb9c073d2d3ee291a --- pkg/server/config.go | 54 +++++- pkg/server/config_test.go | 154 ++++++++++++++++++ pkg/server/filters/with_retry_after.go | 35 +--- pkg/server/filters/with_retry_after_test.go | 96 ++--------- ...ericapiserver_graceful_termination_test.go | 3 +- pkg/server/options/server_run_options.go | 36 ++-- 6 files changed, 253 insertions(+), 125 deletions(-) diff --git a/pkg/server/config.go b/pkg/server/config.go index df5566bc5..0739a3baa 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -240,6 +240,15 @@ type Config struct { // rejected with a 429 status code and a 'Retry-After' response. ShutdownSendRetryAfter bool + // StartupSendRetryAfterUntilReady once set will reject incoming requests with + // a 429 status code and a 'Retry-After' response header until the apiserver + // hasn't fully initialized. + // This option ensures that the system stays consistent even when requests + // are received before the server has been initialized. + // In particular, it prevents child deletion in case of GC or/and orphaned + // content in case of the namespaces controller. + StartupSendRetryAfterUntilReady bool + //=========================================================================== // values below here are targets for removal //=========================================================================== @@ -473,6 +482,46 @@ func (c *Config) AddPostStartHookOrDie(name string, hook PostStartHookFunc) { } } +// shouldAddWithRetryAfterFilter returns an appropriate ShouldRespondWithRetryAfterFunc +// if the apiserver should respond with a Retry-After response header based on option +// 'shutdown-send-retry-after' or 'startup-send-retry-after-until-ready'. +func (c *Config) shouldAddWithRetryAfterFilter() genericfilters.ShouldRespondWithRetryAfterFunc { + if !(c.ShutdownSendRetryAfter || c.StartupSendRetryAfterUntilReady) { + return nil + } + + // follow lifecycle, avoiding go routines per request + const ( + startup int32 = iota + running + terminating + ) + state := startup + go func() { + <-c.lifecycleSignals.HasBeenReady.Signaled() + atomic.StoreInt32(&state, running) + <-c.lifecycleSignals.AfterShutdownDelayDuration.Signaled() + atomic.StoreInt32(&state, terminating) + }() + + return func() (*genericfilters.RetryAfterParams, bool) { + state := atomic.LoadInt32(&state) + switch { + case c.StartupSendRetryAfterUntilReady && state == startup: + return &genericfilters.RetryAfterParams{ + Message: "The apiserver hasn't been fully initialized yet, please try again later.", + }, true + case c.ShutdownSendRetryAfter && state == terminating: + return &genericfilters.RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true + default: + return nil, false + } + } +} + // Complete fills in any fields not set that are required to have valid data and can be derived // from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver. func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig { @@ -799,9 +848,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericapifilters.WithWarningRecorder(handler) handler = genericapifilters.WithCacheControl(handler) handler = genericfilters.WithHSTS(handler, c.HSTSDirectives) - if c.ShutdownSendRetryAfter { - shouldRetryAfterFn := genericfilters.NewShouldRespondWithRetryAfterFunc(c.ShutdownSendRetryAfter, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) - handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn) + if shouldRespondWithRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRespondWithRetryAfterFn != nil { + handler = genericfilters.WithRetryAfter(handler, shouldRespondWithRetryAfterFn) } handler = genericfilters.WithHTTPLogging(handler) if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { diff --git a/pkg/server/config_test.go b/pkg/server/config_test.go index 532cf49d3..e60f4a516 100644 --- a/pkg/server/config_test.go +++ b/pkg/server/config_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" + genericfilters "k8s.io/apiserver/pkg/server/filters" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -345,6 +346,159 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) { } } +func TestShouldRespondWithRetryAfterFunc(t *testing.T) { + tests := []struct { + name string + config *Config + addWithRetryAfterFilterExpected bool + sendRetryAfterExpected bool + retryAfterParamsExpected *genericfilters.RetryAfterParams + }{ + { + name: "both shutdown-send-retry-after and startup-send-retry-after-until-ready are not enabled", + config: &Config{ + StartupSendRetryAfterUntilReady: false, + ShutdownSendRetryAfter: false, + }, + addWithRetryAfterFilterExpected: false, + sendRetryAfterExpected: false, + retryAfterParamsExpected: nil, + }, + { + name: "shutdown-send-retry-after is enabled, the apserver is shutting down", + config: func() *Config { + c := &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: false, + ShutdownSendRetryAfter: true, + } + c.lifecycleSignals.HasBeenReady.Signal() + c.lifecycleSignals.AfterShutdownDelayDuration.Signal() + return c + }(), + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: true, + retryAfterParamsExpected: &genericfilters.RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, + }, + { + name: "shutdown-send-retry-after is enabled, the apserver is not in shutdown mode", + config: &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: false, + ShutdownSendRetryAfter: true, + }, + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: false, + retryAfterParamsExpected: nil, + }, + { + name: "startup-send-retry-after-until-ready is enabled, the apserver is not ready yet", + config: &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: true, + ShutdownSendRetryAfter: false, + }, + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: true, + retryAfterParamsExpected: &genericfilters.RetryAfterParams{ + TearDownConnection: false, + Message: "The apiserver hasn't been fully initialized yet, please try again later.", + }, + }, + { + name: "startup-send-retry-after-until-ready is enabled, the apserver is ready", + config: func() *Config { + c := &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: true, + ShutdownSendRetryAfter: false, + } + c.lifecycleSignals.HasBeenReady.Signal() + return c + }(), + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: false, + retryAfterParamsExpected: nil, + }, + { + name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is not ready", + config: &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: true, + ShutdownSendRetryAfter: true, + }, + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: true, + retryAfterParamsExpected: &genericfilters.RetryAfterParams{ + TearDownConnection: false, + Message: "The apiserver hasn't been fully initialized yet, please try again later.", + }, + }, + { + name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is ready", + config: func() *Config { + c := &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: true, + ShutdownSendRetryAfter: true, + } + c.lifecycleSignals.HasBeenReady.Signal() + return c + }(), + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: false, + retryAfterParamsExpected: nil, + }, + { + name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is shutting down", + config: func() *Config { + c := &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: true, + ShutdownSendRetryAfter: true, + } + c.lifecycleSignals.HasBeenReady.Signal() + c.lifecycleSignals.AfterShutdownDelayDuration.Signal() + return c + }(), + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: true, + retryAfterParamsExpected: &genericfilters.RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + shouldRespondWithRetryAfterFn := test.config.shouldAddWithRetryAfterFilter() + + // we need to sleep some time to allow the goroutine launched by + // shouldAddWithRetryAfterFilter to finish + time.Sleep(100 * time.Millisecond) + + if test.addWithRetryAfterFilterExpected != (shouldRespondWithRetryAfterFn != nil) { + t.Errorf("Expected add WithRetryAfter: %t, but got: %t", test.addWithRetryAfterFilterExpected, shouldRespondWithRetryAfterFn != nil) + } + if !test.addWithRetryAfterFilterExpected { + return + } + + paramsGot, sendRetryAfterGot := shouldRespondWithRetryAfterFn() + if test.sendRetryAfterExpected != sendRetryAfterGot { + t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot) + } + if !reflect.DeepEqual(test.retryAfterParamsExpected, paramsGot) { + t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, paramsGot)) + } + }) + } +} + type testBackend struct { events []*auditinternal.Event diff --git a/pkg/server/filters/with_retry_after.go b/pkg/server/filters/with_retry_after.go index 7b2343f07..c3c5cda1f 100644 --- a/pkg/server/filters/with_retry_after.go +++ b/pkg/server/filters/with_retry_after.go @@ -36,8 +36,8 @@ var ( // with a Retry-After response, otherwise it returns false. type isRequestExemptFunc func(*http.Request) bool -// retryAfterParams dictates how the Retry-After response is constructed -type retryAfterParams struct { +// RetryAfterParams dictates how the Retry-After response is constructed +type RetryAfterParams struct { // TearDownConnection is true when we should send a 'Connection: close' // header in the response so net/http can tear down the TCP connection. TearDownConnection bool @@ -48,9 +48,9 @@ type retryAfterParams struct { // ShouldRespondWithRetryAfterFunc returns true if the requests should // be rejected with a Retry-After response once certain conditions are met. -// The retryAfterParams returned contains instructions on how to +// The RetryAfterParams returned contains instructions on how to // construct the Retry-After response. -type ShouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool) +type ShouldRespondWithRetryAfterFunc func() (*RetryAfterParams, bool) // WithRetryAfter rejects any incoming new request(s) with a 429 // if the specified shutdownDelayDurationElapsedFn channel is closed @@ -68,33 +68,6 @@ func WithRetryAfter(handler http.Handler, when ShouldRespondWithRetryAfterFunc) return withRetryAfter(handler, isRequestExemptFromRetryAfter, when) } -// NewShouldRespondWithRetryAfterFunc returns a ShouldRespondWithRetryAfterFunc -func NewShouldRespondWithRetryAfterFunc(shutdownSendRetryAfter bool, shutdownCh <-chan struct{}) ShouldRespondWithRetryAfterFunc { - if !shutdownSendRetryAfter { - return func() (*retryAfterParams, bool) { - return nil, false - } - } - - return newShutdownRetryAfterFunc(shutdownCh) -} - -func newShutdownRetryAfterFunc(shutdownCh <-chan struct{}) ShouldRespondWithRetryAfterFunc { - shutdownRetryAfterParams := &retryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - } - - return func() (*retryAfterParams, bool) { - select { - case <-shutdownCh: - return shutdownRetryAfterParams, true - default: - return nil, false - } - } -} - func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn ShouldRespondWithRetryAfterFunc) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { params, send := shouldRespondWithRetryAfterFn() diff --git a/pkg/server/filters/with_retry_after_test.go b/pkg/server/filters/with_retry_after_test.go index 08b1e0809..0200e6b7c 100644 --- a/pkg/server/filters/with_retry_after_test.go +++ b/pkg/server/filters/with_retry_after_test.go @@ -17,10 +17,8 @@ limitations under the License. package filters import ( - "github.com/google/go-cmp/cmp" "net/http" "net/http/httptest" - "reflect" "testing" utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" @@ -41,7 +39,7 @@ func TestWithRetryAfter(t *testing.T) { }{ { name: "retry-after disabled", - when: func() (*retryAfterParams, bool) { + when: func() (*RetryAfterParams, bool) { return nil, false }, requestURL: "/api/v1/namespaces", @@ -53,8 +51,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is not exempt", - when: func() (*retryAfterParams, bool) { - return &retryAfterParams{ + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ TearDownConnection: true, Message: "The apiserver is shutting down, please try again later.", }, true @@ -68,8 +66,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is not exempt, no connection tear down", - when: func() (*retryAfterParams, bool) { - return &retryAfterParams{ + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ TearDownConnection: false, Message: "The apiserver is shutting down, please try again later.", }, true @@ -83,8 +81,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/metrics)", - when: func() (*retryAfterParams, bool) { - return &retryAfterParams{ + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ TearDownConnection: true, Message: "The apiserver is shutting down, please try again later.", }, true @@ -98,8 +96,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/livez)", - when: func() (*retryAfterParams, bool) { - return &retryAfterParams{ + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ TearDownConnection: true, Message: "The apiserver is shutting down, please try again later.", }, true @@ -113,8 +111,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/readyz)", - when: func() (*retryAfterParams, bool) { - return &retryAfterParams{ + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ TearDownConnection: true, Message: "The apiserver is shutting down, please try again later.", }, true @@ -128,8 +126,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/healthz)", - when: func() (*retryAfterParams, bool) { - return &retryAfterParams{ + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ TearDownConnection: true, Message: "The apiserver is shutting down, please try again later.", }, true @@ -143,8 +141,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(local loopback)", - when: func() (*retryAfterParams, bool) { - return &retryAfterParams{ + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ TearDownConnection: true, Message: "The apiserver is shutting down, please try again later.", }, true @@ -158,8 +156,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode", - when: func() (*retryAfterParams, bool) { - return &retryAfterParams{ + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ TearDownConnection: true, Message: "The apiserver is shutting down, please try again later.", }, true @@ -223,63 +221,3 @@ func TestWithRetryAfter(t *testing.T) { }) } } - -func TestNewShouldRespondWithRetryAfterFunc(t *testing.T) { - tests := []struct { - name string - shutdownSendRetryAfter bool - shutdownCh <-chan struct{} - sendRetryAfterExpected bool - retryAfterParamsExpected *retryAfterParams - }{ - { - name: "shutdown-send-retry-after is disabled", - shutdownSendRetryAfter: false, - shutdownCh: newChannel(true), - sendRetryAfterExpected: false, - retryAfterParamsExpected: nil, - }, - { - name: "shutdown-send-retry-after is enabled, shutting down", - shutdownSendRetryAfter: true, - shutdownCh: newChannel(true), - sendRetryAfterExpected: true, - retryAfterParamsExpected: &retryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - }, - }, - { - name: "shutdown-send-retry-after is enabled, not shutting down", - shutdownSendRetryAfter: true, - shutdownCh: newChannel(false), - sendRetryAfterExpected: false, - retryAfterParamsExpected: nil, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - fn := NewShouldRespondWithRetryAfterFunc(test.shutdownSendRetryAfter, test.shutdownCh) - if fn == nil { - t.Fatal("Expected a non nil ShouldRespondWithRetryAfterFunc") - } - - retryAfterParamsGot, sendRetryAfterGot := fn() - if test.sendRetryAfterExpected != sendRetryAfterGot { - t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot) - } - if !reflect.DeepEqual(test.retryAfterParamsExpected, retryAfterParamsGot) { - t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, retryAfterParamsGot)) - } - }) - } -} - -func newChannel(closed bool) <-chan struct{} { - ch := make(chan struct{}) - if closed { - close(ch) - } - return ch -} diff --git a/pkg/server/genericapiserver_graceful_termination_test.go b/pkg/server/genericapiserver_graceful_termination_test.go index 45047df9c..d3cffbdac 100644 --- a/pkg/server/genericapiserver_graceful_termination_test.go +++ b/pkg/server/genericapiserver_graceful_termination_test.go @@ -518,8 +518,7 @@ func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer { config.ShutdownSendRetryAfter = keepListening config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler { handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup) - if c.ShutdownSendRetryAfter { - shouldRetryAfterFn := genericfilters.NewShouldRespondWithRetryAfterFunc(c.ShutdownSendRetryAfter, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) + if shouldRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRetryAfterFn != nil { handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn) } handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) diff --git a/pkg/server/options/server_run_options.go b/pkg/server/options/server_run_options.go index 07a887a53..aebabbd76 100644 --- a/pkg/server/options/server_run_options.go +++ b/pkg/server/options/server_run_options.go @@ -63,21 +63,31 @@ type ServerRunOptions struct { // If enabled, after ShutdownDelayDuration elapses, any incoming request is // rejected with a 429 status code and a 'Retry-After' response. ShutdownSendRetryAfter bool + + // StartupSendRetryAfterUntilReady once set will reject incoming requests with + // a 429 status code and a 'Retry-After' response header until the apiserver + // hasn't fully initialized. + // This option ensures that the system stays consistent even when requests + // are received before the server has been initialized. + // In particular, it prevents child deletion in case of GC or/and orphaned + // content in case of the namespaces controller. + StartupSendRetryAfterUntilReady bool } func NewServerRunOptions() *ServerRunOptions { defaults := server.NewConfig(serializer.CodecFactory{}) return &ServerRunOptions{ - MaxRequestsInFlight: defaults.MaxRequestsInFlight, - MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight, - RequestTimeout: defaults.RequestTimeout, - LivezGracePeriod: defaults.LivezGracePeriod, - MinRequestTimeout: defaults.MinRequestTimeout, - ShutdownDelayDuration: defaults.ShutdownDelayDuration, - JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, - MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, - EnablePriorityAndFairness: true, - ShutdownSendRetryAfter: false, + MaxRequestsInFlight: defaults.MaxRequestsInFlight, + MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight, + RequestTimeout: defaults.RequestTimeout, + LivezGracePeriod: defaults.LivezGracePeriod, + MinRequestTimeout: defaults.MinRequestTimeout, + ShutdownDelayDuration: defaults.ShutdownDelayDuration, + JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, + MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, + EnablePriorityAndFairness: true, + ShutdownSendRetryAfter: false, + StartupSendRetryAfterUntilReady: false, } } @@ -97,6 +107,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error { c.MaxRequestBodyBytes = s.MaxRequestBodyBytes c.PublicAddress = s.AdvertiseAddress c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter + c.StartupSendRetryAfterUntilReady = s.StartupSendRetryAfterUntilReady return nil } @@ -261,5 +272,10 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { "during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+ "in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.") + fs.BoolVar(&s.StartupSendRetryAfterUntilReady, "startup-send-retry-after-until-ready", s.ShutdownSendRetryAfter, ""+ + "If true, incoming request(s) will be rejected with a '429' status code and a 'Retry-After' response header "+ + "until the apiserver has initialized. This option ensures that the system stays consistent even when requests "+ + "arrive at the server before it has been initialized.") + utilfeature.DefaultMutableFeatureGate.AddFlag(fs) }