Fix watch rejections in P&F filter

Kubernetes-commit: 8054b0f808d116658ac086e4b71fb34d1502cd57
This commit is contained in:
wojtekt 2021-06-02 08:22:29 +02:00 committed by Kubernetes Publisher
parent d000916ae7
commit 9b71cff19c
2 changed files with 44 additions and 17 deletions

View File

@ -112,6 +112,9 @@ func WithPriorityAndFairness(
} }
} }
var resultCh chan interface{} var resultCh chan interface{}
if isWatchRequest {
resultCh = make(chan interface{})
}
execute := func() { execute := func() {
noteExecutingDelta(1) noteExecutingDelta(1)
defer noteExecutingDelta(-1) defer noteExecutingDelta(-1)
@ -129,7 +132,6 @@ func WithPriorityAndFairness(
setResponseHeaders(classification, w) setResponseHeaders(classification, w)
if isWatchRequest { if isWatchRequest {
resultCh = make(chan interface{})
go func() { go func() {
defer func() { defer func() {
err := recover() err := recover()
@ -179,7 +181,12 @@ func WithPriorityAndFairness(
} }
tooManyRequests(r, w) tooManyRequests(r, w)
} }
// In case of watch, from P&F POV it already finished, but we need to wait until the request itself finishes.
// For watch requests, from the APF point of view the request is already
// finished at this point. However, that doesn't mean it is already finished
// from the non-APF point of view. So we need to wait here until the request is:
// 1) finished being processed or
// 2) rejected
if isWatchRequest { if isWatchRequest {
err := <-resultCh err := <-resultCh
if err != nil { if err != nil {

View File

@ -112,7 +112,7 @@ func (t fakeApfFilter) Run(stopCh <-chan struct{}) error {
func (t fakeApfFilter) Install(c *mux.PathRecorderMux) { func (t fakeApfFilter) Install(c *mux.PathRecorderMux) {
} }
func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptest.Server { func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptest.Server {
onExecuteFunc := func() { onExecuteFunc := func() {
if decision == decisionCancelWait { if decision == decisionCancelWait {
t.Errorf("execute should not be invoked") t.Errorf("execute should not be invoked")
@ -134,24 +134,24 @@ func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptes
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
} }
} }
return newApfServerWithHooks(decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t) return newApfServerWithHooks(t, decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc)
} }
func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server { func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func()) *httptest.Server {
fakeFilter := fakeApfFilter{ fakeFilter := fakeApfFilter{
mockDecision: decision, mockDecision: decision,
postEnqueue: postEnqueue, postEnqueue: postEnqueue,
postDequeue: postDequeue, postDequeue: postDequeue,
} }
return newApfServerWithFilter(fakeFilter, onExecute, postExecute, t) return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute)
} }
func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) *httptest.Server { func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) *httptest.Server {
apfServer := httptest.NewServer(newApfHandlerWithFilter(flowControlFilter, onExecute, postExecute, t)) apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, onExecute, postExecute))
return apfServer return apfServer
} }
func newApfHandlerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) http.Handler { func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) http.Handler {
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
@ -176,7 +176,7 @@ func newApfHandlerWithFilter(flowControlFilter utilflowcontrol.Interface, onExec
func TestApfSkipLongRunningRequest(t *testing.T) { func TestApfSkipLongRunningRequest(t *testing.T) {
epmetrics.Register() epmetrics.Register()
server := newApfServerWithSingleRequest(decisionSkipFilter, t) server := newApfServerWithSingleRequest(t, decisionSkipFilter)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -193,7 +193,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
func TestApfRejectRequest(t *testing.T) { func TestApfRejectRequest(t *testing.T) {
epmetrics.Register() epmetrics.Register()
server := newApfServerWithSingleRequest(decisionReject, t) server := newApfServerWithSingleRequest(t, decisionReject)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -218,7 +218,7 @@ func TestApfExemptRequest(t *testing.T) {
// so that an observation will cause some data to go into the Prometheus metrics. // so that an observation will cause some data to go into the Prometheus metrics.
time.Sleep(time.Millisecond * 50) time.Sleep(time.Millisecond * 50)
server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t) server := newApfServerWithSingleRequest(t, decisionNoQueuingExecute)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -244,7 +244,7 @@ func TestApfExecuteRequest(t *testing.T) {
// so that an observation will cause some data to go into the Prometheus metrics. // so that an observation will cause some data to go into the Prometheus metrics.
time.Sleep(time.Millisecond * 50) time.Sleep(time.Millisecond * 50)
server := newApfServerWithSingleRequest(decisionQueuingExecute, t) server := newApfServerWithSingleRequest(t, decisionQueuingExecute)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -316,7 +316,7 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
finishExecute.Wait() finishExecute.Wait()
} }
server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t) server := newApfServerWithHooks(t, decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -444,7 +444,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
postExecuteFunc := func() {} postExecuteFunc := func() {}
server := newApfServerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t) server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
defer server.Close() defer server.Close()
var wg sync.WaitGroup var wg sync.WaitGroup
@ -473,6 +473,24 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
wg.Wait() wg.Wait()
} }
func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) {
fakeFilter := &fakeWatchApfFilter{
capacity: 0,
}
onExecuteFunc := func() {
t.Errorf("Request unexepectedly executing")
}
postExecuteFunc := func() {}
server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
defer server.Close()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusTooManyRequests); err != nil {
t.Error(err)
}
}
func TestApfWatchPanic(t *testing.T) { func TestApfWatchPanic(t *testing.T) {
fakeFilter := &fakeWatchApfFilter{ fakeFilter := &fakeWatchApfFilter{
capacity: 1, capacity: 1,
@ -483,7 +501,7 @@ func TestApfWatchPanic(t *testing.T) {
} }
postExecuteFunc := func() {} postExecuteFunc := func() {}
apfHandler := newApfHandlerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t) apfHandler := newApfHandlerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
handler := func(w http.ResponseWriter, r *http.Request) { handler := func(w http.ResponseWriter, r *http.Request) {
defer func() { defer func() {
if err := recover(); err == nil { if err := recover(); err == nil {
@ -504,6 +522,8 @@ func TestApfWatchPanic(t *testing.T) {
// automatically even if the server doesn't cancel is explicitly. // automatically even if the server doesn't cancel is explicitly.
// This is required to ensure we won't be leaking goroutines that wait for context // This is required to ensure we won't be leaking goroutines that wait for context
// cancelling (e.g. in queueset::StartRequest method). // cancelling (e.g. in queueset::StartRequest method).
// Even though in production we are not using httptest.Server, this logic is shared
// across these two.
func TestContextClosesOnRequestProcessed(t *testing.T) { func TestContextClosesOnRequestProcessed(t *testing.T) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
@ -528,7 +548,7 @@ func TestContextClosesOnRequestProcessed(t *testing.T) {
func TestApfCancelWaitRequest(t *testing.T) { func TestApfCancelWaitRequest(t *testing.T) {
epmetrics.Register() epmetrics.Register()
server := newApfServerWithSingleRequest(decisionCancelWait, t) server := newApfServerWithSingleRequest(t, decisionCancelWait)
defer server.Close() defer server.Close()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil { if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {