apiserver: refactor WithRetryAfter server filter

Kubernetes-commit: 83889ae5940036d89b9822a1e38f0f939308e408
This commit is contained in:
Abu Kashem 2021-08-09 18:25:29 -04:00 committed by Kubernetes Publisher
parent 36693d51c0
commit 030819c510
4 changed files with 144 additions and 49 deletions

View File

@ -800,7 +800,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
shouldRetryAfterFn := genericfilters.NewShouldRespondWithRetryAfterFunc(c.ShutdownSendRetryAfter, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn)
}
handler = genericfilters.WithHTTPLogging(handler)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {

View File

@ -46,11 +46,11 @@ type retryAfterParams struct {
Message string
}
// shouldRespondWithRetryAfterFunc returns true if the requests should
// ShouldRespondWithRetryAfterFunc returns true if the requests should
// be rejected with a Retry-After response once certain conditions are met.
// The retryAfterParams returned contains instructions on how to
// construct the Retry-After response.
type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
type ShouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
// WithRetryAfter rejects any incoming new request(s) with a 429
// if the specified shutdownDelayDurationElapsedFn channel is closed
@ -62,25 +62,40 @@ type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
// - 'Connection: close': tear down the TCP connection
//
// TODO: is there a way to merge WithWaitGroup and this filter?
func WithRetryAfter(handler http.Handler, shutdownDelayDurationElapsedCh <-chan struct{}) http.Handler {
func WithRetryAfter(handler http.Handler, when ShouldRespondWithRetryAfterFunc) http.Handler {
// NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter,
// otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully.
return withRetryAfter(handler, isRequestExemptFromRetryAfter, when)
}
// NewShouldRespondWithRetryAfterFunc returns a ShouldRespondWithRetryAfterFunc
func NewShouldRespondWithRetryAfterFunc(shutdownSendRetryAfter bool, shutdownCh <-chan struct{}) ShouldRespondWithRetryAfterFunc {
if !shutdownSendRetryAfter {
return func() (*retryAfterParams, bool) {
return nil, false
}
}
return newShutdownRetryAfterFunc(shutdownCh)
}
func newShutdownRetryAfterFunc(shutdownCh <-chan struct{}) ShouldRespondWithRetryAfterFunc {
shutdownRetryAfterParams := &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}
// NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter,
// otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully.
return withRetryAfter(handler, isRequestExemptFromRetryAfter, func() (*retryAfterParams, bool) {
return func() (*retryAfterParams, bool) {
select {
case <-shutdownDelayDurationElapsedCh:
case <-shutdownCh:
return shutdownRetryAfterParams, true
default:
return nil, false
}
})
}
}
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn shouldRespondWithRetryAfterFunc) http.Handler {
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn ShouldRespondWithRetryAfterFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
params, send := shouldRespondWithRetryAfterFn()
if !send || isRequestExemptFn(req) {

View File

@ -17,8 +17,10 @@ limitations under the License.
package filters
import (
"github.com/google/go-cmp/cmp"
"net/http"
"net/http/httptest"
"reflect"
"testing"
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
@ -27,20 +29,20 @@ import (
func TestWithRetryAfter(t *testing.T) {
tests := []struct {
name string
shutdownDelayDurationElapsedFn func() <-chan struct{}
requestURL string
userAgent string
safeWaitGroupIsWaiting bool
handlerInvoked int
closeExpected string
retryAfterExpected string
statusCodeExpected int
name string
when ShouldRespondWithRetryAfterFunc
requestURL string
userAgent string
safeWaitGroupIsWaiting bool
handlerInvoked int
closeExpected string
retryAfterExpected string
statusCodeExpected int
}{
{
name: "retry-after disabled",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(false)
when: func() (*retryAfterParams, bool) {
return nil, false
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
@ -51,8 +53,11 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is not exempt",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
@ -61,10 +66,28 @@ func TestWithRetryAfter(t *testing.T) {
retryAfterExpected: "5",
statusCodeExpected: http.StatusTooManyRequests,
},
{
name: "retry-after enabled, request is not exempt, no connection tear down",
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
TearDownConnection: false,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
handlerInvoked: 0,
closeExpected: "",
retryAfterExpected: "5",
statusCodeExpected: http.StatusTooManyRequests,
},
{
name: "retry-after enabled, request is exempt(/metrics)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/metrics?foo=bar",
userAgent: "foo",
@ -75,8 +98,11 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/livez)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/livez?verbose",
userAgent: "foo",
@ -87,8 +113,11 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/readyz)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/readyz?verbose",
userAgent: "foo",
@ -99,8 +128,11 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/healthz)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/healthz?verbose",
userAgent: "foo",
@ -111,8 +143,11 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(local loopback)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/api/v1/namespaces",
userAgent: "kube-apiserver/",
@ -121,22 +156,13 @@ func TestWithRetryAfter(t *testing.T) {
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "nil channel",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return nil
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
handlerInvoked: 1,
closeExpected: "",
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*retryAfterParams, bool) {
return &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/readyz?verbose",
userAgent: "foo",
@ -165,7 +191,7 @@ func TestWithRetryAfter(t *testing.T) {
wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool {
return false
}, safeWG)
wrapped = WithRetryAfter(wrapped, test.shutdownDelayDurationElapsedFn())
wrapped = WithRetryAfter(wrapped, test.when)
req, err := http.NewRequest(http.MethodGet, test.requestURL, nil)
if err != nil {
@ -198,6 +224,58 @@ func TestWithRetryAfter(t *testing.T) {
}
}
func TestNewShouldRespondWithRetryAfterFunc(t *testing.T) {
tests := []struct {
name string
shutdownSendRetryAfter bool
shutdownCh <-chan struct{}
sendRetryAfterExpected bool
retryAfterParamsExpected *retryAfterParams
}{
{
name: "shutdown-send-retry-after is disabled",
shutdownSendRetryAfter: false,
shutdownCh: newChannel(true),
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "shutdown-send-retry-after is enabled, shutting down",
shutdownSendRetryAfter: true,
shutdownCh: newChannel(true),
sendRetryAfterExpected: true,
retryAfterParamsExpected: &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
},
},
{
name: "shutdown-send-retry-after is enabled, not shutting down",
shutdownSendRetryAfter: true,
shutdownCh: newChannel(false),
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fn := NewShouldRespondWithRetryAfterFunc(test.shutdownSendRetryAfter, test.shutdownCh)
if fn == nil {
t.Fatal("Expected a non nil ShouldRespondWithRetryAfterFunc")
}
retryAfterParamsGot, sendRetryAfterGot := fn()
if test.sendRetryAfterExpected != sendRetryAfterGot {
t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot)
}
if !reflect.DeepEqual(test.retryAfterParamsExpected, retryAfterParamsGot) {
t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, retryAfterParamsGot))
}
})
}
}
func newChannel(closed bool) <-chan struct{} {
ch := make(chan struct{})
if closed {

View File

@ -519,7 +519,8 @@ func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer {
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
shouldRetryAfterFn := genericfilters.NewShouldRespondWithRetryAfterFunc(c.ShutdownSendRetryAfter, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn)
}
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
return handler