Add multi request test
Kubernetes-commit: 833ce487b9fab1650d5aaba2a8b295f8a90e07bd
This commit is contained in:
parent
bb16b18666
commit
7498f28769
|
@ -22,6 +22,7 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -39,8 +40,10 @@ import (
|
|||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
)
|
||||
|
||||
type mockDecision int
|
||||
|
||||
const (
|
||||
decisionNoQueuingExecute = iota
|
||||
decisionNoQueuingExecute mockDecision = iota
|
||||
decisionQueuingExecute
|
||||
decisionCancelWait
|
||||
decisionReject
|
||||
|
@ -48,7 +51,7 @@ const (
|
|||
)
|
||||
|
||||
type fakeApfFilter struct {
|
||||
mockDecision int
|
||||
mockDecision mockDecision
|
||||
postEnqueue func()
|
||||
postDequeue func()
|
||||
}
|
||||
|
@ -92,29 +95,41 @@ func (t fakeApfFilter) Run(stopCh <-chan struct{}) error {
|
|||
func (t fakeApfFilter) Install(c *mux.PathRecorderMux) {
|
||||
}
|
||||
|
||||
func newApfServer(decision int, t *testing.T) *httptest.Server {
|
||||
func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptest.Server {
|
||||
onExecuteFunc := func() {
|
||||
if decision == decisionCancelWait {
|
||||
t.Errorf("execute should not be invoked")
|
||||
}
|
||||
// atomicReadOnlyExecuting can be either 0 or 1 as we test one request at a time.
|
||||
if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 {
|
||||
t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting)
|
||||
}
|
||||
}
|
||||
postExecuteFunc := func() {}
|
||||
// atomicReadOnlyWaiting can be either 0 or 1 as we test one request at a time.
|
||||
postEnqueueFunc := func() {
|
||||
if atomicReadOnlyWaiting != 1 {
|
||||
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
|
||||
}
|
||||
}
|
||||
postDequeueFunc := func() {
|
||||
if atomicReadOnlyWaiting != 0 {
|
||||
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
|
||||
}
|
||||
}
|
||||
return newApfServerWithHooks(decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t)
|
||||
}
|
||||
|
||||
func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server {
|
||||
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
|
||||
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
|
||||
|
||||
apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if decision == decisionCancelWait {
|
||||
t.Errorf("execute should not be invoked")
|
||||
}
|
||||
if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 {
|
||||
t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting)
|
||||
}
|
||||
onExecute()
|
||||
}), longRunningRequestCheck, fakeApfFilter{
|
||||
mockDecision: decision,
|
||||
postEnqueue: func() {
|
||||
if atomicReadOnlyWaiting != 1 {
|
||||
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
|
||||
}
|
||||
},
|
||||
postDequeue: func() {
|
||||
if atomicReadOnlyWaiting != 0 {
|
||||
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
|
||||
}
|
||||
},
|
||||
postEnqueue: postEnqueue,
|
||||
postDequeue: postDequeue,
|
||||
})
|
||||
|
||||
handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -122,6 +137,7 @@ func newApfServer(decision int, t *testing.T) *httptest.Server {
|
|||
Groups: []string{user.AllUnauthenticated},
|
||||
}))
|
||||
apfHandler.ServeHTTP(w, r)
|
||||
postExecute()
|
||||
if atomicReadOnlyExecuting != 0 {
|
||||
t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting)
|
||||
}
|
||||
|
@ -134,9 +150,10 @@ func newApfServer(decision int, t *testing.T) *httptest.Server {
|
|||
func TestApfSkipLongRunningRequest(t *testing.T) {
|
||||
epmetrics.Register()
|
||||
|
||||
server := newApfServer(decisionSkipFilter, t)
|
||||
server := newApfServerWithSingleRequest(decisionSkipFilter, t)
|
||||
defer server.Close()
|
||||
|
||||
// send a watch request to test skipping long running request
|
||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
|
||||
// request should not be rejected
|
||||
t.Error(err)
|
||||
|
@ -146,7 +163,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
|
|||
func TestApfRejectRequest(t *testing.T) {
|
||||
epmetrics.Register()
|
||||
|
||||
server := newApfServer(decisionReject, t)
|
||||
server := newApfServerWithSingleRequest(decisionReject, t)
|
||||
defer server.Close()
|
||||
|
||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
|
||||
|
@ -163,10 +180,11 @@ func TestApfExemptRequest(t *testing.T) {
|
|||
epmetrics.Register()
|
||||
fcmetrics.Register()
|
||||
|
||||
// wait the first sampleAndWaterMark metrics to be collected
|
||||
// Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator,
|
||||
// so that an observation will cause some data to go into the Prometheus metrics.
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
|
||||
server := newApfServer(decisionNoQueuingExecute, t)
|
||||
server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t)
|
||||
defer server.Close()
|
||||
|
||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
|
||||
|
@ -184,10 +202,11 @@ func TestApfExecuteRequest(t *testing.T) {
|
|||
epmetrics.Register()
|
||||
fcmetrics.Register()
|
||||
|
||||
// wait the first sampleAndWaterMark metrics to be collected
|
||||
// Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator,
|
||||
// so that an observation will cause some data to go into the Prometheus metrics.
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
|
||||
server := newApfServer(decisionQueuingExecute, t)
|
||||
server := newApfServerWithSingleRequest(decisionQueuingExecute, t)
|
||||
defer server.Close()
|
||||
|
||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
|
||||
|
@ -202,10 +221,81 @@ func TestApfExecuteRequest(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestApfExecuteMultipleRequests(t *testing.T) {
|
||||
epmetrics.Register()
|
||||
fcmetrics.Register()
|
||||
|
||||
// Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator,
|
||||
// so that an observation will cause some data to go into the Prometheus metrics.
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
|
||||
concurrentRequests := 5
|
||||
var preStartExecute, postStartExecute, preEnqueue, postEnqueue, preDequeue, postDequeue, finishExecute sync.WaitGroup
|
||||
for _, wg := range []*sync.WaitGroup{&preStartExecute, &postStartExecute, &preEnqueue, &postEnqueue, &preDequeue, &postDequeue, &finishExecute} {
|
||||
wg.Add(concurrentRequests)
|
||||
}
|
||||
|
||||
onExecuteFunc := func() {
|
||||
preStartExecute.Done()
|
||||
preStartExecute.Wait()
|
||||
if int(atomicReadOnlyExecuting) != concurrentRequests {
|
||||
t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, atomicReadOnlyExecuting)
|
||||
}
|
||||
postStartExecute.Done()
|
||||
postStartExecute.Wait()
|
||||
}
|
||||
|
||||
postEnqueueFunc := func() {
|
||||
preEnqueue.Done()
|
||||
preEnqueue.Wait()
|
||||
if int(atomicReadOnlyWaiting) != concurrentRequests {
|
||||
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
|
||||
|
||||
}
|
||||
postEnqueue.Done()
|
||||
postEnqueue.Wait()
|
||||
}
|
||||
|
||||
postDequeueFunc := func() {
|
||||
preDequeue.Done()
|
||||
preDequeue.Wait()
|
||||
if atomicReadOnlyWaiting != 0 {
|
||||
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
|
||||
}
|
||||
postDequeue.Done()
|
||||
postDequeue.Wait()
|
||||
}
|
||||
|
||||
postExecuteFunc := func() {
|
||||
finishExecute.Done()
|
||||
finishExecute.Wait()
|
||||
}
|
||||
|
||||
server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t)
|
||||
defer server.Close()
|
||||
|
||||
for i := 0; i < concurrentRequests; i++ {
|
||||
var err error
|
||||
go func() {
|
||||
err = expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK)
|
||||
}()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
checkForExpectedMetricsWithRetry(t, []string{
|
||||
"apiserver_current_inflight_requests",
|
||||
"apiserver_current_inqueue_requests",
|
||||
"apiserver_flowcontrol_read_vs_write_request_count_watermarks",
|
||||
"apiserver_flowcontrol_read_vs_write_request_count_samples",
|
||||
})
|
||||
}
|
||||
|
||||
func TestApfCancelWaitRequest(t *testing.T) {
|
||||
epmetrics.Register()
|
||||
|
||||
server := newApfServer(decisionCancelWait, t)
|
||||
server := newApfServerWithSingleRequest(decisionCancelWait, t)
|
||||
defer server.Close()
|
||||
|
||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
|
||||
|
@ -232,7 +322,6 @@ func checkForExpectedMetricsWithRetry(t *testing.T, expectedMetrics []string) {
|
|||
|
||||
metrics := map[string]interface{}{}
|
||||
for _, mf := range metricsFamily {
|
||||
mf := mf
|
||||
metrics[*mf.Name] = mf
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue