252 lines
9.1 KiB
Go
252 lines
9.1 KiB
Go
/*
|
|
Copyright 2022 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package flowcontrol
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
flowcontrol "k8s.io/api/flowcontrol/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/apiserver/pkg/authentication/user"
|
|
"k8s.io/apiserver/pkg/endpoints/request"
|
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
|
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
|
|
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
|
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
|
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
|
"k8s.io/client-go/informers"
|
|
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
|
"k8s.io/utils/ptr"
|
|
)
|
|
|
|
type borrowingTestConstraints struct {
|
|
lendable, borrowing int32
|
|
}
|
|
|
|
// TestBorrowing tests borrowing of concurrency between priority levels.
|
|
// It runs two scenarios, one where the borrowing hits the limit on
|
|
// lendable concurrency and one where the borrowing hits the limit on
|
|
// borrowing of concurrency.
|
|
// Both scenarios are the same except for the limits.
|
|
// The test defines two priority levels, identified as "flows" 0 and 1.
|
|
// Both priority levels have a nominal concurrency limit of 12.
|
|
// The test maintains 24 concurrent clients for priority level 0
|
|
// and 6 for level 1,
|
|
// using an exec func that simply sleeps for 250 ms, for
|
|
// 25 seconds. The first 10 seconds of behavior are ignored, allowing
|
|
// the borrowing to start at any point during that time. The test
|
|
// continues for another 15 seconds, and checks that the delivered
|
|
// concurrency is about 16 for flow 0 and 6 for flow 1.
|
|
func TestBorrowing(t *testing.T) {
|
|
clientsPerFlow := [2]int{24, 6}
|
|
metrics.Register()
|
|
for _, testCase := range []struct {
|
|
name string
|
|
constraints []borrowingTestConstraints
|
|
}{
|
|
{
|
|
name: "lendable-limited",
|
|
constraints: []borrowingTestConstraints{
|
|
{lendable: 50, borrowing: 67},
|
|
{lendable: 33, borrowing: 50},
|
|
}},
|
|
{
|
|
name: "borrowing-limited",
|
|
constraints: []borrowingTestConstraints{
|
|
{lendable: 50, borrowing: 33},
|
|
{lendable: 67, borrowing: 50},
|
|
}},
|
|
} {
|
|
t.Run(testCase.name, func(t *testing.T) {
|
|
fsObjs := make([]*flowcontrol.FlowSchema, 2)
|
|
plcObjs := make([]*flowcontrol.PriorityLevelConfiguration, 2)
|
|
usernames := make([]string, 2)
|
|
cfgObjs := []runtime.Object{}
|
|
for flow := 0; flow < 2; flow++ {
|
|
usernames[flow] = fmt.Sprintf("test-user%d", flow)
|
|
plName := fmt.Sprintf("test-pl%d", flow)
|
|
fsObjs[flow] = &flowcontrol.FlowSchema{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: fmt.Sprintf("test-fs%d", flow),
|
|
},
|
|
Spec: flowcontrol.FlowSchemaSpec{
|
|
MatchingPrecedence: 100,
|
|
PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
|
|
Name: plName,
|
|
},
|
|
DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
|
|
Type: flowcontrol.FlowDistinguisherMethodByUserType,
|
|
},
|
|
Rules: []flowcontrol.PolicyRulesWithSubjects{{
|
|
Subjects: []flowcontrol.Subject{{
|
|
Kind: flowcontrol.SubjectKindUser,
|
|
User: &flowcontrol.UserSubject{Name: usernames[flow]},
|
|
}},
|
|
NonResourceRules: []flowcontrol.NonResourcePolicyRule{{
|
|
Verbs: []string{"*"},
|
|
NonResourceURLs: []string{"*"},
|
|
}},
|
|
}},
|
|
},
|
|
}
|
|
plcObjs[flow] = &flowcontrol.PriorityLevelConfiguration{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: plName,
|
|
},
|
|
Spec: flowcontrol.PriorityLevelConfigurationSpec{
|
|
Type: flowcontrol.PriorityLevelEnablementLimited,
|
|
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
|
|
NominalConcurrencyShares: ptr.To(int32(100)),
|
|
LendablePercent: &testCase.constraints[flow].lendable,
|
|
BorrowingLimitPercent: &testCase.constraints[flow].borrowing,
|
|
LimitResponse: flowcontrol.LimitResponse{
|
|
Type: flowcontrol.LimitResponseTypeQueue,
|
|
Queuing: &flowcontrol.QueuingConfiguration{
|
|
Queues: 10,
|
|
HandSize: 2,
|
|
QueueLengthLimit: 10,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
cfgObjs = append(cfgObjs, fsObjs[flow], plcObjs[flow])
|
|
}
|
|
clientset := clientsetfake.NewSimpleClientset(cfgObjs...)
|
|
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second)
|
|
flowcontrolClient := clientset.FlowcontrolV1()
|
|
clk := eventclock.Real{}
|
|
controller := newTestableController(TestableConfig{
|
|
Name: "Controller",
|
|
Clock: clk,
|
|
AsFieldManager: ConfigConsumerAsFieldManager,
|
|
FoundToDangling: func(found bool) bool { return !found },
|
|
InformerFactory: informerFactory,
|
|
FlowcontrolClient: flowcontrolClient,
|
|
ServerConcurrencyLimit: 24,
|
|
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
|
|
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
|
|
QueueSetFactory: fqs.NewQueueSetFactory(clk),
|
|
})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
|
|
stopCh := ctx.Done()
|
|
controllerCompletionCh := make(chan error)
|
|
|
|
informerFactory.Start(stopCh)
|
|
|
|
status := informerFactory.WaitForCacheSync(ctx.Done())
|
|
if names := unsynced(status); len(names) > 0 {
|
|
t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
|
|
}
|
|
|
|
go func() {
|
|
controllerCompletionCh <- controller.Run(stopCh)
|
|
}()
|
|
|
|
// ensure that the controller has run its first loop.
|
|
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
|
return controller.hasPriorityLevelState(plcObjs[0].Name), nil
|
|
})
|
|
if err != nil {
|
|
t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plcObjs[0].Name, err)
|
|
}
|
|
|
|
concIntegrators := make([]fq.Integrator, 2)
|
|
reqInfo := &request.RequestInfo{
|
|
IsResourceRequest: false,
|
|
Path: "/foobar",
|
|
Verb: "GET",
|
|
}
|
|
noteFn := func(fs *flowcontrol.FlowSchema, plc *flowcontrol.PriorityLevelConfiguration, fd string) {}
|
|
workEstr := func() fcrequest.WorkEstimate { return fcrequest.WorkEstimate{InitialSeats: 1} }
|
|
qnf := fq.QueueNoteFn(func(bool) {})
|
|
var startWG sync.WaitGroup
|
|
startWG.Add(clientsPerFlow[0] + clientsPerFlow[1])
|
|
// Launch 20 client threads for each flow
|
|
for flow := 0; flow < 2; flow++ {
|
|
username := usernames[flow]
|
|
flowUser := testUser{name: username}
|
|
rd := RequestDigest{
|
|
RequestInfo: reqInfo,
|
|
User: flowUser,
|
|
}
|
|
concIntegrator := fq.NewNamedIntegrator(clk, username)
|
|
concIntegrators[flow] = concIntegrator
|
|
exec := func() {
|
|
concIntegrator.Inc()
|
|
clk.Sleep(250 * time.Millisecond)
|
|
concIntegrator.Dec()
|
|
}
|
|
nThreads := clientsPerFlow[flow]
|
|
for thread := 0; thread < nThreads; thread++ {
|
|
go func() {
|
|
startWG.Done()
|
|
wait.Until(func() { controller.Handle(ctx, rd, noteFn, workEstr, qnf, exec) }, 0, ctx.Done())
|
|
}()
|
|
}
|
|
}
|
|
startWG.Wait()
|
|
// Make sure the controller has had time to sense the load and adjust
|
|
clk.Sleep(10 * time.Second)
|
|
// Start the stats that matter from now
|
|
for _, ci := range concIntegrators {
|
|
ci.Reset()
|
|
}
|
|
// Run for 15 seconds
|
|
clk.Sleep(15 * time.Second)
|
|
// Collect the delivered concurrency stats
|
|
results0 := concIntegrators[0].Reset()
|
|
results1 := concIntegrators[1].Reset()
|
|
// shut down all the async stuff
|
|
cancel()
|
|
|
|
// Do the checking
|
|
|
|
t.Log("waiting for the controller Run function to shutdown gracefully")
|
|
controllerErr := <-controllerCompletionCh
|
|
close(controllerCompletionCh)
|
|
if controllerErr != nil {
|
|
t.Errorf("expected nil error from controller Run function, but got: %#v", controllerErr)
|
|
}
|
|
if results0.Average < 15.5 || results0.Average > 16.1 {
|
|
t.Errorf("Flow 0 got average concurrency of %v but expected about 16", results0.Average)
|
|
} else {
|
|
t.Logf("Flow 0 got average concurrency of %v and expected about 16", results0.Average)
|
|
}
|
|
if results1.Average < 5.5 || results1.Average > 6.1 {
|
|
t.Errorf("Flow 1 got average concurrency of %v but expected about 6", results1.Average)
|
|
} else {
|
|
t.Logf("Flow 1 got average concurrency of %v and expected about 6", results1.Average)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
type testUser struct{ name string }
|
|
|
|
func (tu testUser) GetName() string { return tu.name }
|
|
func (tu testUser) GetUID() string { return tu.name }
|
|
func (tu testUser) GetGroups() []string { return []string{user.AllAuthenticated} }
|
|
func (tu testUser) GetExtra() map[string][]string { return map[string][]string{} }
|