send retry-after until ready

Kubernetes-commit: 6e3923d0a4f4720d2d9f628eb9c073d2d3ee291a
This commit is contained in:
Abu Kashem 2021-08-10 12:03:21 -04:00 committed by Kubernetes Publisher
parent 030819c510
commit f3ae70d0cf
6 changed files with 253 additions and 125 deletions

View File

@ -240,6 +240,15 @@ type Config struct {
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool
// StartupSendRetryAfterUntilReady once set will reject incoming requests with
// a 429 status code and a 'Retry-After' response header until the apiserver
// hasn't fully initialized.
// This option ensures that the system stays consistent even when requests
// are received before the server has been initialized.
// In particular, it prevents child deletion in case of GC or/and orphaned
// content in case of the namespaces controller.
StartupSendRetryAfterUntilReady bool
//===========================================================================
// values below here are targets for removal
//===========================================================================
@ -473,6 +482,46 @@ func (c *Config) AddPostStartHookOrDie(name string, hook PostStartHookFunc) {
}
}
// shouldAddWithRetryAfterFilter returns an appropriate ShouldRespondWithRetryAfterFunc
// if the apiserver should respond with a Retry-After response header based on option
// 'shutdown-send-retry-after' or 'startup-send-retry-after-until-ready'.
func (c *Config) shouldAddWithRetryAfterFilter() genericfilters.ShouldRespondWithRetryAfterFunc {
if !(c.ShutdownSendRetryAfter || c.StartupSendRetryAfterUntilReady) {
return nil
}
// follow lifecycle, avoiding go routines per request
const (
startup int32 = iota
running
terminating
)
state := startup
go func() {
<-c.lifecycleSignals.HasBeenReady.Signaled()
atomic.StoreInt32(&state, running)
<-c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()
atomic.StoreInt32(&state, terminating)
}()
return func() (*genericfilters.RetryAfterParams, bool) {
state := atomic.LoadInt32(&state)
switch {
case c.StartupSendRetryAfterUntilReady && state == startup:
return &genericfilters.RetryAfterParams{
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
}, true
case c.ShutdownSendRetryAfter && state == terminating:
return &genericfilters.RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
default:
return nil, false
}
}
}
// Complete fills in any fields not set that are required to have valid data and can be derived
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {
@ -799,9 +848,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
if c.ShutdownSendRetryAfter {
shouldRetryAfterFn := genericfilters.NewShouldRespondWithRetryAfterFunc(c.ShutdownSendRetryAfter, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn)
if shouldRespondWithRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRespondWithRetryAfterFn != nil {
handler = genericfilters.WithRetryAfter(handler, shouldRespondWithRetryAfterFn)
}
handler = genericfilters.WithHTTPLogging(handler)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
@ -345,6 +346,159 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) {
}
}
func TestShouldRespondWithRetryAfterFunc(t *testing.T) {
tests := []struct {
name string
config *Config
addWithRetryAfterFilterExpected bool
sendRetryAfterExpected bool
retryAfterParamsExpected *genericfilters.RetryAfterParams
}{
{
name: "both shutdown-send-retry-after and startup-send-retry-after-until-ready are not enabled",
config: &Config{
StartupSendRetryAfterUntilReady: false,
ShutdownSendRetryAfter: false,
},
addWithRetryAfterFilterExpected: false,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "shutdown-send-retry-after is enabled, the apserver is shutting down",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: false,
ShutdownSendRetryAfter: true,
}
c.lifecycleSignals.HasBeenReady.Signal()
c.lifecycleSignals.AfterShutdownDelayDuration.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
},
},
{
name: "shutdown-send-retry-after is enabled, the apserver is not in shutdown mode",
config: &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: false,
ShutdownSendRetryAfter: true,
},
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "startup-send-retry-after-until-ready is enabled, the apserver is not ready yet",
config: &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: false,
},
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: false,
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
},
},
{
name: "startup-send-retry-after-until-ready is enabled, the apserver is ready",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: false,
}
c.lifecycleSignals.HasBeenReady.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is not ready",
config: &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: true,
},
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: false,
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
},
},
{
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is ready",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: true,
}
c.lifecycleSignals.HasBeenReady.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is shutting down",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: true,
}
c.lifecycleSignals.HasBeenReady.Signal()
c.lifecycleSignals.AfterShutdownDelayDuration.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
shouldRespondWithRetryAfterFn := test.config.shouldAddWithRetryAfterFilter()
// we need to sleep some time to allow the goroutine launched by
// shouldAddWithRetryAfterFilter to finish
time.Sleep(100 * time.Millisecond)
if test.addWithRetryAfterFilterExpected != (shouldRespondWithRetryAfterFn != nil) {
t.Errorf("Expected add WithRetryAfter: %t, but got: %t", test.addWithRetryAfterFilterExpected, shouldRespondWithRetryAfterFn != nil)
}
if !test.addWithRetryAfterFilterExpected {
return
}
paramsGot, sendRetryAfterGot := shouldRespondWithRetryAfterFn()
if test.sendRetryAfterExpected != sendRetryAfterGot {
t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot)
}
if !reflect.DeepEqual(test.retryAfterParamsExpected, paramsGot) {
t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, paramsGot))
}
})
}
}
type testBackend struct {
events []*auditinternal.Event

View File

@ -36,8 +36,8 @@ var (
// with a Retry-After response, otherwise it returns false.
type isRequestExemptFunc func(*http.Request) bool
// retryAfterParams dictates how the Retry-After response is constructed
type retryAfterParams struct {
// RetryAfterParams dictates how the Retry-After response is constructed
type RetryAfterParams struct {
// TearDownConnection is true when we should send a 'Connection: close'
// header in the response so net/http can tear down the TCP connection.
TearDownConnection bool
@ -48,9 +48,9 @@ type retryAfterParams struct {
// ShouldRespondWithRetryAfterFunc returns true if the requests should
// be rejected with a Retry-After response once certain conditions are met.
// The retryAfterParams returned contains instructions on how to
// The RetryAfterParams returned contains instructions on how to
// construct the Retry-After response.
type ShouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
type ShouldRespondWithRetryAfterFunc func() (*RetryAfterParams, bool)
// WithRetryAfter rejects any incoming new request(s) with a 429
// if the specified shutdownDelayDurationElapsedFn channel is closed
@ -68,33 +68,6 @@ func WithRetryAfter(handler http.Handler, when ShouldRespondWithRetryAfterFunc)
return withRetryAfter(handler, isRequestExemptFromRetryAfter, when)
}
// NewShouldRespondWithRetryAfterFunc returns a ShouldRespondWithRetryAfterFunc
func NewShouldRespondWithRetryAfterFunc(shutdownSendRetryAfter bool, shutdownCh <-chan struct{}) ShouldRespondWithRetryAfterFunc {
if !shutdownSendRetryAfter {
return func() (*retryAfterParams, bool) {
return nil, false
}
}
return newShutdownRetryAfterFunc(shutdownCh)
}
func newShutdownRetryAfterFunc(shutdownCh <-chan struct{}) ShouldRespondWithRetryAfterFunc {
shutdownRetryAfterParams := &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}
return func() (*retryAfterParams, bool) {
select {
case <-shutdownCh:
return shutdownRetryAfterParams, true
default:
return nil, false
}
}
}
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn ShouldRespondWithRetryAfterFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
params, send := shouldRespondWithRetryAfterFn()

View File

@ -17,10 +17,8 @@ limitations under the License.
package filters
import (
"github.com/google/go-cmp/cmp"
"net/http"
"net/http/httptest"
"reflect"
"testing"
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
@ -41,7 +39,7 @@ func TestWithRetryAfter(t *testing.T) {
}{
{
name: "retry-after disabled",
when: func() (*retryAfterParams, bool) {
when: func() (*RetryAfterParams, bool) {
return nil, false
},
requestURL: "/api/v1/namespaces",
@ -53,8 +51,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is not exempt",
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
@ -68,8 +66,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is not exempt, no connection tear down",
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: false,
Message: "The apiserver is shutting down, please try again later.",
}, true
@ -83,8 +81,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/metrics)",
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
@ -98,8 +96,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/livez)",
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
@ -113,8 +111,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/readyz)",
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
@ -128,8 +126,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/healthz)",
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
@ -143,8 +141,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(local loopback)",
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
@ -158,8 +156,8 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode",
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
@ -223,63 +221,3 @@ func TestWithRetryAfter(t *testing.T) {
})
}
}
func TestNewShouldRespondWithRetryAfterFunc(t *testing.T) {
tests := []struct {
name string
shutdownSendRetryAfter bool
shutdownCh <-chan struct{}
sendRetryAfterExpected bool
retryAfterParamsExpected *retryAfterParams
}{
{
name: "shutdown-send-retry-after is disabled",
shutdownSendRetryAfter: false,
shutdownCh: newChannel(true),
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "shutdown-send-retry-after is enabled, shutting down",
shutdownSendRetryAfter: true,
shutdownCh: newChannel(true),
sendRetryAfterExpected: true,
retryAfterParamsExpected: &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
},
},
{
name: "shutdown-send-retry-after is enabled, not shutting down",
shutdownSendRetryAfter: true,
shutdownCh: newChannel(false),
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fn := NewShouldRespondWithRetryAfterFunc(test.shutdownSendRetryAfter, test.shutdownCh)
if fn == nil {
t.Fatal("Expected a non nil ShouldRespondWithRetryAfterFunc")
}
retryAfterParamsGot, sendRetryAfterGot := fn()
if test.sendRetryAfterExpected != sendRetryAfterGot {
t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot)
}
if !reflect.DeepEqual(test.retryAfterParamsExpected, retryAfterParamsGot) {
t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, retryAfterParamsGot))
}
})
}
}
func newChannel(closed bool) <-chan struct{} {
ch := make(chan struct{})
if closed {
close(ch)
}
return ch
}

View File

@ -518,8 +518,7 @@ func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer {
config.ShutdownSendRetryAfter = keepListening
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
if c.ShutdownSendRetryAfter {
shouldRetryAfterFn := genericfilters.NewShouldRespondWithRetryAfterFunc(c.ShutdownSendRetryAfter, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
if shouldRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRetryAfterFn != nil {
handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn)
}
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)

View File

@ -63,21 +63,31 @@ type ServerRunOptions struct {
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool
// StartupSendRetryAfterUntilReady once set will reject incoming requests with
// a 429 status code and a 'Retry-After' response header until the apiserver
// hasn't fully initialized.
// This option ensures that the system stays consistent even when requests
// are received before the server has been initialized.
// In particular, it prevents child deletion in case of GC or/and orphaned
// content in case of the namespaces controller.
StartupSendRetryAfterUntilReady bool
}
func NewServerRunOptions() *ServerRunOptions {
defaults := server.NewConfig(serializer.CodecFactory{})
return &ServerRunOptions{
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
RequestTimeout: defaults.RequestTimeout,
LivezGracePeriod: defaults.LivezGracePeriod,
MinRequestTimeout: defaults.MinRequestTimeout,
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnablePriorityAndFairness: true,
ShutdownSendRetryAfter: false,
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
RequestTimeout: defaults.RequestTimeout,
LivezGracePeriod: defaults.LivezGracePeriod,
MinRequestTimeout: defaults.MinRequestTimeout,
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnablePriorityAndFairness: true,
ShutdownSendRetryAfter: false,
StartupSendRetryAfterUntilReady: false,
}
}
@ -97,6 +107,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
c.PublicAddress = s.AdvertiseAddress
c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter
c.StartupSendRetryAfterUntilReady = s.StartupSendRetryAfterUntilReady
return nil
}
@ -261,5 +272,10 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+
"in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.")
fs.BoolVar(&s.StartupSendRetryAfterUntilReady, "startup-send-retry-after-until-ready", s.ShutdownSendRetryAfter, ""+
"If true, incoming request(s) will be rejected with a '429' status code and a 'Retry-After' response header "+
"until the apiserver has initialized. This option ensures that the system stays consistent even when requests "+
"arrive at the server before it has been initialized.")
utilfeature.DefaultMutableFeatureGate.AddFlag(fs)
}