add unit test to simulate an enqued request that times out
Kubernetes-commit: 1d691ddb44e8dfa54008977469201a811410f7e9
This commit is contained in:
parent
47ff55d998
commit
c4f555497d
|
@ -22,6 +22,7 @@ import (
|
|||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -48,8 +49,14 @@ import (
|
|||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
klog.InitFlags(nil)
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
type mockDecision int
|
||||
|
||||
const (
|
||||
|
@ -344,57 +351,25 @@ func TestApfCancelWaitRequest(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestPriorityAndFairnessWithPanicRecoverAndTimeoutFilter(t *testing.T) {
|
||||
func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
||||
fcmetrics.Register()
|
||||
|
||||
t.Run("priority level concurrency is set to 1, request handler panics, next request should not be rejected", func(t *testing.T) {
|
||||
const (
|
||||
requestTimeout = time.Minute
|
||||
requestTimeout = 1 * time.Minute
|
||||
userName = "alice"
|
||||
fsName = "test-fs"
|
||||
plName = "test-pl"
|
||||
serverConcurrency, plConcurrencyShares, plConcurrency = 1, 1, 1
|
||||
)
|
||||
|
||||
objects := newConfiguration(fsName, plName, userName, flowcontrol.LimitResponseTypeReject, plConcurrencyShares)
|
||||
clientset := newClientset(t, objects...)
|
||||
// this test does not rely on resync, so resync period is set to zero
|
||||
factory := informers.NewSharedInformerFactory(clientset, 0)
|
||||
controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta1(), serverConcurrency, requestTimeout/4)
|
||||
|
||||
stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{})
|
||||
factory.Start(stopCh)
|
||||
|
||||
// wait for the informer cache to sync.
|
||||
timeout, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||
defer cancel()
|
||||
cacheSyncDone := factory.WaitForCacheSync(timeout.Done())
|
||||
if names := unsyncedInformers(cacheSyncDone); len(names) > 0 {
|
||||
t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
|
||||
}
|
||||
|
||||
var controllerErr error
|
||||
go func() {
|
||||
defer close(controllerCompletedCh)
|
||||
controllerErr = controller.Run(stopCh)
|
||||
}()
|
||||
|
||||
// make sure that apf controller syncs the priority level configuration object we are using in this test.
|
||||
// read the metrics and ensure the concurrency limit for our priority level is set to the expected value.
|
||||
pollErr := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
|
||||
if err := gaugeValueMatch("apiserver_flowcontrol_request_concurrency_limit", map[string]string{"priority_level": plName}, plConcurrency); err != nil {
|
||||
t.Logf("polling retry - error: %s", err)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if pollErr != nil {
|
||||
t.Fatalf("expected the apf controller to sync the priotity level configuration object: %s", "test-pl")
|
||||
}
|
||||
apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
|
||||
stopCh := make(chan struct{})
|
||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
|
||||
|
||||
var executed bool
|
||||
// we will raise a panic for the first request.
|
||||
firstRequestPathPanic := "/request/panic"
|
||||
firstRequestPathPanic := "/request/panic-as-designed"
|
||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
executed = true
|
||||
expectMatchingAPFHeaders(t, w, fsName, plName)
|
||||
|
@ -411,32 +386,353 @@ func TestPriorityAndFairnessWithPanicRecoverAndTimeoutFilter(t *testing.T) {
|
|||
var err error
|
||||
_, err = requestGetter(firstRequestPathPanic)
|
||||
if !executed {
|
||||
t.Errorf("expected inner handler to be executed for request: %s", firstRequestPathPanic)
|
||||
t.Errorf("Expected inner handler to be executed for request: %q", firstRequestPathPanic)
|
||||
}
|
||||
expectResetStreamError(t, err)
|
||||
|
||||
executed = false
|
||||
// the second request should be served successfully.
|
||||
secondRequestPathShouldWork := "/request/should-work"
|
||||
secondRequestPathShouldWork := "/request/should-succeed-as-expected"
|
||||
response, err := requestGetter(secondRequestPathShouldWork)
|
||||
if !executed {
|
||||
t.Errorf("expected inner handler to be executed for request: %s", secondRequestPathShouldWork)
|
||||
t.Errorf("Expected inner handler to be executed for request: %s", secondRequestPathShouldWork)
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("expected request: %s to succeed, but got error: %#v", secondRequestPathShouldWork, err)
|
||||
t.Fatalf("Expected request: %q to get a response, but got error: %#v", secondRequestPathShouldWork, err)
|
||||
}
|
||||
if response.StatusCode != http.StatusOK {
|
||||
t.Errorf("expected HTTP status code: %d for request: %s, but got: %#v", http.StatusOK, secondRequestPathShouldWork, response)
|
||||
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusOK, secondRequestPathShouldWork, response)
|
||||
}
|
||||
|
||||
close(stopCh)
|
||||
t.Log("waiting for the controller to shutdown")
|
||||
<-controllerCompletedCh
|
||||
t.Log("Waiting for the controller to shutdown")
|
||||
|
||||
controllerErr := <-controllerCompletedCh
|
||||
if controllerErr != nil {
|
||||
t.Errorf("expected a nil error from controller, but got: %#v", controllerErr)
|
||||
t.Errorf("Expected no error from the controller, but got: %#v", controllerErr)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("priority level concurrency is set to 1, request times out and inner handler hasn't written to the response yet", func(t *testing.T) {
|
||||
const (
|
||||
requestTimeout = 3 * time.Second
|
||||
userName = "alice"
|
||||
fsName = "test-fs"
|
||||
plName = "test-pl"
|
||||
serverConcurrency, plConcurrencyShares, plConcurrency = 1, 1, 1
|
||||
)
|
||||
|
||||
apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
|
||||
stopCh := make(chan struct{})
|
||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
|
||||
|
||||
var executed bool
|
||||
rquestTimesOutPath := "/request/time-out-as-designed"
|
||||
reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
executed = true
|
||||
expectMatchingAPFHeaders(t, w, fsName, plName)
|
||||
|
||||
if r.URL.Path == rquestTimesOutPath {
|
||||
defer close(reqHandlerCompletedCh)
|
||||
|
||||
// this will force the request to time out.
|
||||
<-callerRoundTripDoneCh
|
||||
}
|
||||
})
|
||||
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
||||
|
||||
server, requestGetter := newHTTP2ServerWithClient(handler)
|
||||
defer server.Close()
|
||||
|
||||
var (
|
||||
response *http.Response
|
||||
err error
|
||||
)
|
||||
func() {
|
||||
defer close(callerRoundTripDoneCh)
|
||||
|
||||
t.Logf("Waiting for the request: %q to time out", rquestTimesOutPath)
|
||||
response, err = requestGetter(rquestTimesOutPath)
|
||||
}()
|
||||
|
||||
if !executed {
|
||||
t.Errorf("Expected inner handler to be executed for request: %q", rquestTimesOutPath)
|
||||
}
|
||||
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
||||
<-reqHandlerCompletedCh
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err)
|
||||
}
|
||||
if response.StatusCode != http.StatusGatewayTimeout {
|
||||
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, rquestTimesOutPath, response)
|
||||
}
|
||||
|
||||
close(stopCh)
|
||||
t.Log("Waiting for the controller to shutdown")
|
||||
|
||||
controllerErr := <-controllerCompletedCh
|
||||
if controllerErr != nil {
|
||||
t.Errorf("Expected no error from the controller, but got: %#v", controllerErr)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("priority level concurrency is set to 1, inner handler panics after the request times out", func(t *testing.T) {
|
||||
const (
|
||||
requestTimeout = 3 * time.Second
|
||||
userName = "alice"
|
||||
fsName = "test-fs"
|
||||
plName = "test-pl"
|
||||
serverConcurrency, plConcurrencyShares, plConcurrency = 1, 1, 1
|
||||
)
|
||||
|
||||
apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
|
||||
stopCh := make(chan struct{})
|
||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
|
||||
|
||||
reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
||||
rquestTimesOutPath := "/request/time-out-as-designed"
|
||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
expectMatchingAPFHeaders(t, w, fsName, plName)
|
||||
|
||||
if r.URL.Path == rquestTimesOutPath {
|
||||
defer close(reqHandlerCompletedCh)
|
||||
<-callerRoundTripDoneCh
|
||||
|
||||
// we expect the timeout handler to have timed out this request by now and any attempt
|
||||
// to write to the response should return a http.ErrHandlerTimeout error.
|
||||
if _, err := w.Write([]byte("foo")); err != http.ErrHandlerTimeout {
|
||||
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
||||
}
|
||||
|
||||
panic(http.ErrAbortHandler)
|
||||
}
|
||||
})
|
||||
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
||||
|
||||
server, requestGetter := newHTTP2ServerWithClient(handler)
|
||||
defer server.Close()
|
||||
|
||||
var (
|
||||
response *http.Response
|
||||
err error
|
||||
)
|
||||
func() {
|
||||
defer close(callerRoundTripDoneCh)
|
||||
t.Logf("Waiting for the request: %q to time out", rquestTimesOutPath)
|
||||
response, err = requestGetter(rquestTimesOutPath)
|
||||
}()
|
||||
|
||||
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
||||
<-reqHandlerCompletedCh
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err)
|
||||
}
|
||||
if response.StatusCode != http.StatusGatewayTimeout {
|
||||
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, rquestTimesOutPath, response)
|
||||
}
|
||||
|
||||
close(stopCh)
|
||||
t.Log("Waiting for the controller to shutdown")
|
||||
|
||||
controllerErr := <-controllerCompletedCh
|
||||
if controllerErr != nil {
|
||||
t.Errorf("Expected no error from the controller, but got: %#v", controllerErr)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("priority level concurrency is set to 1, inner handler writes to the response before request times out", func(t *testing.T) {
|
||||
const (
|
||||
requestTimeout = 3 * time.Second
|
||||
userName = "alice"
|
||||
fsName = "test-fs"
|
||||
plName = "test-pl"
|
||||
serverConcurrency, plConcurrencyShares, plConcurrency = 1, 1, 1
|
||||
)
|
||||
|
||||
apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
|
||||
stopCh := make(chan struct{})
|
||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
|
||||
|
||||
rquestTimesOutPath := "/request/time-out-as-designed"
|
||||
reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
expectMatchingAPFHeaders(t, w, fsName, plName)
|
||||
|
||||
if r.URL.Path == rquestTimesOutPath {
|
||||
defer close(reqHandlerCompletedCh)
|
||||
|
||||
// inner handler writes header and then let the request time out.
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
<-callerRoundTripDoneCh
|
||||
|
||||
// we expect the timeout handler to have timed out this request by now and any attempt
|
||||
// to write to the response should return a http.ErrHandlerTimeout error.
|
||||
if _, err := w.Write([]byte("foo")); err != http.ErrHandlerTimeout {
|
||||
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
||||
|
||||
server, requestGetter := newHTTP2ServerWithClient(handler)
|
||||
defer server.Close()
|
||||
|
||||
var err error
|
||||
func() {
|
||||
defer close(callerRoundTripDoneCh)
|
||||
t.Logf("Waiting for the request: %q to time out", rquestTimesOutPath)
|
||||
_, err = requestGetter(rquestTimesOutPath)
|
||||
}()
|
||||
|
||||
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
||||
<-reqHandlerCompletedCh
|
||||
|
||||
expectResetStreamError(t, err)
|
||||
|
||||
close(stopCh)
|
||||
t.Log("Waiting for the controller to shutdown")
|
||||
|
||||
controllerErr := <-controllerCompletedCh
|
||||
if controllerErr != nil {
|
||||
t.Errorf("Expected no error from the controller, but got: %#v", controllerErr)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("priority level concurrency is set to 1, queue length is 1, first request should time out and second (enqueued) request should time out as well", func(t *testing.T) {
|
||||
const (
|
||||
requestTimeout = 3 * time.Second
|
||||
userName = "alice"
|
||||
fsName = "test-fs"
|
||||
plName = "test-pl"
|
||||
serverConcurrency, plConcurrencyShares, plConcurrency, queueLength = 1, 1, 1, 1
|
||||
)
|
||||
|
||||
apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, queueLength)
|
||||
stopCh := make(chan struct{})
|
||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
|
||||
|
||||
firstRequestTimesOutPath := "/request/first/time-out-as-designed"
|
||||
secondRequestEnqueuedPath := "/request/second/enqueued-as-designed"
|
||||
firstReqHandlerCompletedCh, firstReqInProgressCh := make(chan struct{}), make(chan struct{})
|
||||
firstReqRoundTripDoneCh, secondReqRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
expectMatchingAPFHeaders(t, w, fsName, plName)
|
||||
|
||||
if r.URL.Path == firstRequestTimesOutPath {
|
||||
defer close(firstReqHandlerCompletedCh)
|
||||
|
||||
close(firstReqInProgressCh)
|
||||
<-firstReqRoundTripDoneCh
|
||||
|
||||
// make sure we wait until the caller of the second request returns, this is to
|
||||
// ensure that second request never has a chance to be executed (to avoid flakes)
|
||||
<-secondReqRoundTripDoneCh
|
||||
|
||||
// we expect the timeout handler to have timed out this request by now and any attempt
|
||||
// to write to the response should return a http.ErrHandlerTimeout error.
|
||||
if _, err := w.Write([]byte("foo")); err != http.ErrHandlerTimeout {
|
||||
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if r.URL.Path == secondRequestEnqueuedPath {
|
||||
// we expect the concurrency to be set to 1 and so this request should never be executed.
|
||||
t.Fatalf("Expected request to be enqueued: %q", secondRequestEnqueuedPath)
|
||||
}
|
||||
})
|
||||
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
||||
|
||||
server, requestGetter := newHTTP2ServerWithClient(handler)
|
||||
defer server.Close()
|
||||
|
||||
var firstReqErr, secondReqErr error
|
||||
var resp1, resp2 *http.Response
|
||||
go func() {
|
||||
defer close(firstReqRoundTripDoneCh)
|
||||
t.Logf("Waiting for the request: %q to time out", firstRequestTimesOutPath)
|
||||
resp1, firstReqErr = requestGetter(firstRequestTimesOutPath)
|
||||
}()
|
||||
func() {
|
||||
defer close(secondReqRoundTripDoneCh)
|
||||
|
||||
// we must wait for the "first" request to start executing first
|
||||
<-firstReqInProgressCh
|
||||
resp2, secondReqErr = requestGetter(secondRequestEnqueuedPath)
|
||||
}()
|
||||
|
||||
<-firstReqRoundTripDoneCh
|
||||
|
||||
t.Logf("Waiting for the inner handler of the request: %q to complete", firstRequestTimesOutPath)
|
||||
<-firstReqHandlerCompletedCh
|
||||
|
||||
// first request is expected to time out.
|
||||
if firstReqErr != nil {
|
||||
t.Fatalf("Expected request: %q to get a response, but got error: %#v", firstRequestTimesOutPath, firstReqErr)
|
||||
}
|
||||
if resp1.StatusCode != http.StatusGatewayTimeout {
|
||||
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, firstRequestTimesOutPath, resp1)
|
||||
}
|
||||
|
||||
// second request is expected to either be rejected (ideal behavior) or time out (current approximation of the ideal behavior)
|
||||
if secondReqErr != nil {
|
||||
t.Fatalf("Expected request: %q to get a response, but got error: %#v", secondRequestEnqueuedPath, secondReqErr)
|
||||
}
|
||||
if !(resp2.StatusCode == http.StatusTooManyRequests || resp2.StatusCode == http.StatusGatewayTimeout) {
|
||||
t.Errorf("Expected HTTP status code: %d or %d for request: %q, but got: %#v", http.StatusTooManyRequests, http.StatusGatewayTimeout, secondRequestEnqueuedPath, resp2)
|
||||
}
|
||||
|
||||
close(stopCh)
|
||||
t.Log("Waiting for the controller to shutdown")
|
||||
|
||||
controllerErr := <-controllerCompletedCh
|
||||
if controllerErr != nil {
|
||||
t.Errorf("Expected no error from the controller, but got: %#v", controllerErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func startAPFController(t *testing.T, stopCh <-chan struct{}, apfConfiguration []runtime.Object, serverConcurrency int,
|
||||
requestWaitLimit time.Duration, plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) {
|
||||
clientset := newClientset(t, apfConfiguration...)
|
||||
// this test does not rely on resync, so resync period is set to zero
|
||||
factory := informers.NewSharedInformerFactory(clientset, 0)
|
||||
controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta1(), serverConcurrency, requestWaitLimit)
|
||||
|
||||
factory.Start(stopCh)
|
||||
|
||||
// wait for the informer cache to sync.
|
||||
timeout, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||
defer cancel()
|
||||
cacheSyncDone := factory.WaitForCacheSync(timeout.Done())
|
||||
if names := unsyncedInformers(cacheSyncDone); len(names) > 0 {
|
||||
t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
|
||||
}
|
||||
|
||||
controllerCompletedCh := make(chan error)
|
||||
var controllerErr error
|
||||
go func() {
|
||||
controllerErr = controller.Run(stopCh)
|
||||
controllerCompletedCh <- controllerErr
|
||||
}()
|
||||
|
||||
// make sure that apf controller syncs the priority level configuration object we are using in this test.
|
||||
// read the metrics and ensure the concurrency limit for our priority level is set to the expected value.
|
||||
pollErr := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
|
||||
if err := gaugeValueMatch("apiserver_flowcontrol_request_concurrency_limit", map[string]string{"priority_level": plName}, plConcurrency); err != nil {
|
||||
t.Logf("polling retry - error: %s", err)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if pollErr != nil {
|
||||
t.Fatalf("expected the apf controller to sync the priotity level configuration object: %s", plName)
|
||||
}
|
||||
|
||||
return controller, controllerCompletedCh
|
||||
}
|
||||
|
||||
// returns a started http2 server, with a client function to send request to the server.
|
||||
|
@ -510,6 +806,8 @@ func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol.
|
|||
})
|
||||
|
||||
handler = WithTimeoutForNonLongRunningRequests(handler, longRunningRequestCheck)
|
||||
// we don't have any request with invalid timeout, so leaving audit policy and sink nil.
|
||||
handler = apifilters.WithRequestDeadline(handler, nil, nil, longRunningRequestCheck, nil, requestTimeout)
|
||||
handler = apifilters.WithRequestInfo(handler, requestInfoFactory)
|
||||
handler = WithPanicRecovery(handler, requestInfoFactory)
|
||||
return handler
|
||||
|
@ -527,7 +825,7 @@ func unsyncedInformers(status map[reflect.Type]bool) []string {
|
|||
return names
|
||||
}
|
||||
|
||||
func newConfiguration(fsName, plName, user string, responseType flowcontrol.LimitResponseType, concurrency int32) []runtime.Object {
|
||||
func newConfiguration(fsName, plName, user string, concurrency int32, queueLength int32) []runtime.Object {
|
||||
fs := &flowcontrol.FlowSchema{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fsName,
|
||||
|
@ -562,6 +860,18 @@ func newConfiguration(fsName, plName, user string, responseType flowcontrol.Limi
|
|||
},
|
||||
}
|
||||
|
||||
var (
|
||||
responseType flowcontrol.LimitResponseType = flowcontrol.LimitResponseTypeReject
|
||||
qcfg *flowcontrol.QueuingConfiguration
|
||||
)
|
||||
if queueLength > 0 {
|
||||
responseType = flowcontrol.LimitResponseTypeQueue
|
||||
qcfg = &flowcontrol.QueuingConfiguration{
|
||||
Queues: 1,
|
||||
QueueLengthLimit: queueLength,
|
||||
HandSize: 1,
|
||||
}
|
||||
}
|
||||
pl := &flowcontrol.PriorityLevelConfiguration{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: plName,
|
||||
|
@ -572,7 +882,8 @@ func newConfiguration(fsName, plName, user string, responseType flowcontrol.Limi
|
|||
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
|
||||
AssuredConcurrencyShares: concurrency,
|
||||
LimitResponse: flowcontrol.LimitResponse{
|
||||
Type: responseType,
|
||||
Type: responseType,
|
||||
Queuing: qcfg,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue