Revert "Merge pull request #104281 from tkashem/not-ready-429"

This reverts commit fc5863b8b276e0789f717859e8cce58d7d060181, reversing
changes made to 027fe2554fd18343b8be39eddc8ff6570a6c390f.

Kubernetes-commit: f9f08725907b7db2104ee5fe9f82ab0752726533
This commit is contained in:
Abu Kashem 2021-08-31 10:10:46 -04:00 committed by Kubernetes Publisher
parent a687b3b7a9
commit 033ff70436
6 changed files with 80 additions and 303 deletions

View File

@ -239,15 +239,6 @@ type Config struct {
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool
// StartupSendRetryAfterUntilReady once set will reject incoming requests with
// a 429 status code and a 'Retry-After' response header until the apiserver
// hasn't fully initialized.
// This option ensures that the system stays consistent even when requests
// are received before the server has been initialized.
// In particular, it prevents child deletion in case of GC or/and orphaned
// content in case of the namespaces controller.
StartupSendRetryAfterUntilReady bool
//===========================================================================
// values below here are targets for removal
//===========================================================================
@ -481,46 +472,6 @@ func (c *Config) AddPostStartHookOrDie(name string, hook PostStartHookFunc) {
}
}
// shouldAddWithRetryAfterFilter returns an appropriate ShouldRespondWithRetryAfterFunc
// if the apiserver should respond with a Retry-After response header based on option
// 'shutdown-send-retry-after' or 'startup-send-retry-after-until-ready'.
func (c *Config) shouldAddWithRetryAfterFilter() genericfilters.ShouldRespondWithRetryAfterFunc {
if !(c.ShutdownSendRetryAfter || c.StartupSendRetryAfterUntilReady) {
return nil
}
// follow lifecycle, avoiding go routines per request
const (
startup int32 = iota
running
terminating
)
state := startup
go func() {
<-c.lifecycleSignals.HasBeenReady.Signaled()
atomic.StoreInt32(&state, running)
<-c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()
atomic.StoreInt32(&state, terminating)
}()
return func() (*genericfilters.RetryAfterParams, bool) {
state := atomic.LoadInt32(&state)
switch {
case c.StartupSendRetryAfterUntilReady && state == startup:
return &genericfilters.RetryAfterParams{
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
}, true
case c.ShutdownSendRetryAfter && state == terminating:
return &genericfilters.RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
default:
return nil, false
}
}
}
// Complete fills in any fields not set that are required to have valid data and can be derived
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {
@ -847,8 +798,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
if shouldRespondWithRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRespondWithRetryAfterFn != nil {
handler = genericfilters.WithRetryAfter(handler, shouldRespondWithRetryAfterFn)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
}
handler = genericfilters.WithHTTPLogging(handler)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {

View File

@ -37,7 +37,6 @@ import (
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
@ -346,159 +345,6 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) {
}
}
func TestShouldRespondWithRetryAfterFunc(t *testing.T) {
tests := []struct {
name string
config *Config
addWithRetryAfterFilterExpected bool
sendRetryAfterExpected bool
retryAfterParamsExpected *genericfilters.RetryAfterParams
}{
{
name: "both shutdown-send-retry-after and startup-send-retry-after-until-ready are not enabled",
config: &Config{
StartupSendRetryAfterUntilReady: false,
ShutdownSendRetryAfter: false,
},
addWithRetryAfterFilterExpected: false,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "shutdown-send-retry-after is enabled, the apserver is shutting down",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: false,
ShutdownSendRetryAfter: true,
}
c.lifecycleSignals.HasBeenReady.Signal()
c.lifecycleSignals.AfterShutdownDelayDuration.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
},
},
{
name: "shutdown-send-retry-after is enabled, the apserver is not in shutdown mode",
config: &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: false,
ShutdownSendRetryAfter: true,
},
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "startup-send-retry-after-until-ready is enabled, the apserver is not ready yet",
config: &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: false,
},
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: false,
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
},
},
{
name: "startup-send-retry-after-until-ready is enabled, the apserver is ready",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: false,
}
c.lifecycleSignals.HasBeenReady.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is not ready",
config: &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: true,
},
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: false,
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
},
},
{
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is ready",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: true,
}
c.lifecycleSignals.HasBeenReady.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is shutting down",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: true,
}
c.lifecycleSignals.HasBeenReady.Signal()
c.lifecycleSignals.AfterShutdownDelayDuration.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
shouldRespondWithRetryAfterFn := test.config.shouldAddWithRetryAfterFilter()
// we need to sleep some time to allow the goroutine launched by
// shouldAddWithRetryAfterFilter to finish
time.Sleep(100 * time.Millisecond)
if test.addWithRetryAfterFilterExpected != (shouldRespondWithRetryAfterFn != nil) {
t.Errorf("Expected add WithRetryAfter: %t, but got: %t", test.addWithRetryAfterFilterExpected, shouldRespondWithRetryAfterFn != nil)
}
if !test.addWithRetryAfterFilterExpected {
return
}
paramsGot, sendRetryAfterGot := shouldRespondWithRetryAfterFn()
if test.sendRetryAfterExpected != sendRetryAfterGot {
t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot)
}
if !reflect.DeepEqual(test.retryAfterParamsExpected, paramsGot) {
t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, paramsGot))
}
})
}
}
type testBackend struct {
events []*auditinternal.Event

View File

@ -36,8 +36,8 @@ var (
// with a Retry-After response, otherwise it returns false.
type isRequestExemptFunc func(*http.Request) bool
// RetryAfterParams dictates how the Retry-After response is constructed
type RetryAfterParams struct {
// retryAfterParams dictates how the Retry-After response is constructed
type retryAfterParams struct {
// TearDownConnection is true when we should send a 'Connection: close'
// header in the response so net/http can tear down the TCP connection.
TearDownConnection bool
@ -46,11 +46,11 @@ type RetryAfterParams struct {
Message string
}
// ShouldRespondWithRetryAfterFunc returns true if the requests should
// shouldRespondWithRetryAfterFunc returns true if the requests should
// be rejected with a Retry-After response once certain conditions are met.
// The RetryAfterParams returned contains instructions on how to
// The retryAfterParams returned contains instructions on how to
// construct the Retry-After response.
type ShouldRespondWithRetryAfterFunc func() (*RetryAfterParams, bool)
type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
// WithRetryAfter rejects any incoming new request(s) with a 429
// if the specified shutdownDelayDurationElapsedFn channel is closed
@ -62,13 +62,25 @@ type ShouldRespondWithRetryAfterFunc func() (*RetryAfterParams, bool)
// - 'Connection: close': tear down the TCP connection
//
// TODO: is there a way to merge WithWaitGroup and this filter?
func WithRetryAfter(handler http.Handler, when ShouldRespondWithRetryAfterFunc) http.Handler {
func WithRetryAfter(handler http.Handler, shutdownDelayDurationElapsedCh <-chan struct{}) http.Handler {
shutdownRetryAfterParams := &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}
// NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter,
// otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully.
return withRetryAfter(handler, isRequestExemptFromRetryAfter, when)
return withRetryAfter(handler, isRequestExemptFromRetryAfter, func() (*retryAfterParams, bool) {
select {
case <-shutdownDelayDurationElapsedCh:
return shutdownRetryAfterParams, true
default:
return nil, false
}
})
}
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn ShouldRespondWithRetryAfterFunc) http.Handler {
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn shouldRespondWithRetryAfterFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
params, send := shouldRespondWithRetryAfterFn()
if !send || isRequestExemptFn(req) {

View File

@ -27,20 +27,20 @@ import (
func TestWithRetryAfter(t *testing.T) {
tests := []struct {
name string
when ShouldRespondWithRetryAfterFunc
requestURL string
userAgent string
safeWaitGroupIsWaiting bool
handlerInvoked int
closeExpected string
retryAfterExpected string
statusCodeExpected int
name string
shutdownDelayDurationElapsedFn func() <-chan struct{}
requestURL string
userAgent string
safeWaitGroupIsWaiting bool
handlerInvoked int
closeExpected string
retryAfterExpected string
statusCodeExpected int
}{
{
name: "retry-after disabled",
when: func() (*RetryAfterParams, bool) {
return nil, false
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(false)
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
@ -51,11 +51,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is not exempt",
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
@ -64,28 +61,10 @@ func TestWithRetryAfter(t *testing.T) {
retryAfterExpected: "5",
statusCodeExpected: http.StatusTooManyRequests,
},
{
name: "retry-after enabled, request is not exempt, no connection tear down",
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: false,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
handlerInvoked: 0,
closeExpected: "",
retryAfterExpected: "5",
statusCodeExpected: http.StatusTooManyRequests,
},
{
name: "retry-after enabled, request is exempt(/metrics)",
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/metrics?foo=bar",
userAgent: "foo",
@ -96,11 +75,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/livez)",
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/livez?verbose",
userAgent: "foo",
@ -111,11 +87,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/readyz)",
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/readyz?verbose",
userAgent: "foo",
@ -126,11 +99,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/healthz)",
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/healthz?verbose",
userAgent: "foo",
@ -141,11 +111,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(local loopback)",
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/api/v1/namespaces",
userAgent: "kube-apiserver/",
@ -154,13 +121,22 @@ func TestWithRetryAfter(t *testing.T) {
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "nil channel",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return nil
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
handlerInvoked: 1,
closeExpected: "",
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode",
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
},
requestURL: "/readyz?verbose",
userAgent: "foo",
@ -189,7 +165,7 @@ func TestWithRetryAfter(t *testing.T) {
wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool {
return false
}, safeWG)
wrapped = WithRetryAfter(wrapped, test.when)
wrapped = WithRetryAfter(wrapped, test.shutdownDelayDurationElapsedFn())
req, err := http.NewRequest(http.MethodGet, test.requestURL, nil)
if err != nil {
@ -221,3 +197,11 @@ func TestWithRetryAfter(t *testing.T) {
})
}
}
func newChannel(closed bool) <-chan struct{} {
ch := make(chan struct{})
if closed {
close(ch)
}
return ch
}

View File

@ -518,8 +518,8 @@ func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer {
config.ShutdownSendRetryAfter = keepListening
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
if shouldRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRetryAfterFn != nil {
handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
}
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
return handler

View File

@ -63,31 +63,21 @@ type ServerRunOptions struct {
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool
// StartupSendRetryAfterUntilReady once set will reject incoming requests with
// a 429 status code and a 'Retry-After' response header until the apiserver
// hasn't fully initialized.
// This option ensures that the system stays consistent even when requests
// are received before the server has been initialized.
// In particular, it prevents child deletion in case of GC or/and orphaned
// content in case of the namespaces controller.
StartupSendRetryAfterUntilReady bool
}
func NewServerRunOptions() *ServerRunOptions {
defaults := server.NewConfig(serializer.CodecFactory{})
return &ServerRunOptions{
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
RequestTimeout: defaults.RequestTimeout,
LivezGracePeriod: defaults.LivezGracePeriod,
MinRequestTimeout: defaults.MinRequestTimeout,
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnablePriorityAndFairness: true,
ShutdownSendRetryAfter: false,
StartupSendRetryAfterUntilReady: false,
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
RequestTimeout: defaults.RequestTimeout,
LivezGracePeriod: defaults.LivezGracePeriod,
MinRequestTimeout: defaults.MinRequestTimeout,
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnablePriorityAndFairness: true,
ShutdownSendRetryAfter: false,
}
}
@ -107,7 +97,6 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
c.PublicAddress = s.AdvertiseAddress
c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter
c.StartupSendRetryAfterUntilReady = s.StartupSendRetryAfterUntilReady
return nil
}
@ -272,10 +261,5 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+
"in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.")
fs.BoolVar(&s.StartupSendRetryAfterUntilReady, "startup-send-retry-after-until-ready", s.ShutdownSendRetryAfter, ""+
"If true, incoming request(s) will be rejected with a '429' status code and a 'Retry-After' response header "+
"until the apiserver has initialized. This option ensures that the system stays consistent even when requests "+
"arrive at the server before it has been initialized.")
utilfeature.DefaultMutableFeatureGate.AddFlag(fs)
}