567 lines
18 KiB
Go
567 lines
18 KiB
Go
/*
|
|
Copyright 2019 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package flowcontrol
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"reflect"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
flowcontrol "k8s.io/api/flowcontrol/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
|
|
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
|
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
|
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
|
"k8s.io/client-go/informers"
|
|
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
|
fcclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1"
|
|
"k8s.io/klog/v2"
|
|
"k8s.io/utils/clock"
|
|
"k8s.io/utils/ptr"
|
|
)
|
|
|
|
// Some tests print a lot of debug logs which slows down tests considerably,
|
|
// causing them to even timeout.
|
|
var testDebugLogs = false
|
|
|
|
func TestMain(m *testing.M) {
|
|
klog.InitFlags(nil)
|
|
os.Exit(m.Run())
|
|
}
|
|
|
|
var mandPLs = func() map[string]*flowcontrol.PriorityLevelConfiguration {
|
|
ans := make(map[string]*flowcontrol.PriorityLevelConfiguration)
|
|
for _, mand := range fcboot.MandatoryPriorityLevelConfigurations {
|
|
ans[mand.Name] = mand
|
|
}
|
|
return ans
|
|
}()
|
|
|
|
// in general usage, the boolean returned may be inaccurate by the time the caller examines it.
|
|
func (cfgCtlr *configController) hasPriorityLevelState(plName string) bool {
|
|
cfgCtlr.lock.Lock()
|
|
defer cfgCtlr.lock.Unlock()
|
|
return cfgCtlr.priorityLevelStates[plName] != nil
|
|
}
|
|
|
|
type ctlrTestState struct {
|
|
t *testing.T
|
|
cfgCtlr *configController
|
|
fcIfc fcclient.FlowcontrolV1Interface
|
|
existingPLs map[string]*flowcontrol.PriorityLevelConfiguration
|
|
existingFSs map[string]*flowcontrol.FlowSchema
|
|
heldRequestsMap map[string][]heldRequest
|
|
requestWG sync.WaitGroup
|
|
lock sync.Mutex
|
|
queues map[string]*ctlrTestQueueSet
|
|
}
|
|
|
|
type heldRequest struct {
|
|
rd RequestDigest
|
|
finishCh chan struct{}
|
|
}
|
|
|
|
var _ fq.QueueSetFactory = (*ctlrTestState)(nil)
|
|
|
|
type ctlrTestQueueSetCompleter struct {
|
|
cts *ctlrTestState
|
|
cqs *ctlrTestQueueSet
|
|
qc fq.QueuingConfig
|
|
}
|
|
|
|
type ctlrTestQueueSet struct {
|
|
cts *ctlrTestState
|
|
qc fq.QueuingConfig
|
|
dc fq.DispatchingConfig
|
|
countActive int
|
|
}
|
|
|
|
type ctlrTestRequest struct {
|
|
cqs *ctlrTestQueueSet
|
|
qsName string
|
|
descr1, descr2 interface{}
|
|
}
|
|
|
|
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge, sdi metrics.Gauge) (fq.QueueSetCompleter, error) {
|
|
return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
|
|
}
|
|
|
|
func (cqs *ctlrTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) {
|
|
return ctlrTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
|
|
}
|
|
|
|
func (cqs *ctlrTestQueueSet) Dump(bool) debug.QueueSetDump {
|
|
return debug.QueueSetDump{}
|
|
}
|
|
|
|
func (cqc ctlrTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet {
|
|
cqc.cts.lock.Lock()
|
|
defer cqc.cts.lock.Unlock()
|
|
qs := cqc.cqs
|
|
if qs == nil {
|
|
qs = &ctlrTestQueueSet{cts: cqc.cts, qc: cqc.qc, dc: dc}
|
|
cqc.cts.queues[cqc.qc.Name] = qs
|
|
} else {
|
|
qs.qc, qs.dc = cqc.qc, dc
|
|
}
|
|
return qs
|
|
}
|
|
|
|
func (cqs *ctlrTestQueueSet) IsIdle() bool {
|
|
cqs.cts.lock.Lock()
|
|
defer cqs.cts.lock.Unlock()
|
|
klog.V(7).Infof("For %p QS %s, countActive==%d", cqs, cqs.qc.Name, cqs.countActive)
|
|
return cqs.countActive == 0
|
|
}
|
|
|
|
func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width *fcrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) {
|
|
cqs.cts.lock.Lock()
|
|
defer cqs.cts.lock.Unlock()
|
|
cqs.countActive++
|
|
if testDebugLogs {
|
|
cqs.cts.t.Logf("Queued %q %#+v %#+v for %p QS=%s, countActive:=%d", fsName, descr1, descr2, cqs, cqs.qc.Name, cqs.countActive)
|
|
}
|
|
return &ctlrTestRequest{cqs, cqs.qc.Name, descr1, descr2}, false
|
|
}
|
|
|
|
func (ctr *ctlrTestRequest) Finish(execute func()) bool {
|
|
execute()
|
|
ctr.cqs.cts.lock.Lock()
|
|
defer ctr.cqs.cts.lock.Unlock()
|
|
ctr.cqs.countActive--
|
|
if testDebugLogs {
|
|
ctr.cqs.cts.t.Logf("Finished %#+v %#+v for %p QS=%s, countActive:=%d", ctr.descr1, ctr.descr2, ctr.cqs, ctr.cqs.qc.Name, ctr.cqs.countActive)
|
|
}
|
|
return ctr.cqs.countActive == 0
|
|
}
|
|
|
|
func (cts *ctlrTestState) getQueueSetNames() sets.String {
|
|
cts.lock.Lock()
|
|
defer cts.lock.Unlock()
|
|
return sets.StringKeySet(cts.queues)
|
|
}
|
|
|
|
func (cts *ctlrTestState) getNonIdleQueueSetNames() sets.String {
|
|
cts.lock.Lock()
|
|
defer cts.lock.Unlock()
|
|
ans := sets.NewString()
|
|
for name, qs := range cts.queues {
|
|
if qs.countActive > 0 {
|
|
ans.Insert(name)
|
|
}
|
|
}
|
|
return ans
|
|
}
|
|
|
|
func (cts *ctlrTestState) hasNonIdleQueueSet(name string) bool {
|
|
cts.lock.Lock()
|
|
defer cts.lock.Unlock()
|
|
qs := cts.queues[name]
|
|
return qs != nil && qs.countActive > 0
|
|
}
|
|
|
|
func (cts *ctlrTestState) addHeldRequest(plName string, rd RequestDigest, finishCh chan struct{}) {
|
|
cts.lock.Lock()
|
|
defer cts.lock.Unlock()
|
|
hrs := cts.heldRequestsMap[plName]
|
|
hrs = append(hrs, heldRequest{rd, finishCh})
|
|
cts.heldRequestsMap[plName] = hrs
|
|
if testDebugLogs {
|
|
cts.t.Logf("Holding %#+v for %s, count:=%d", rd, plName, len(hrs))
|
|
}
|
|
}
|
|
|
|
func (cts *ctlrTestState) popHeldRequest() (plName string, hr *heldRequest, nCount int) {
|
|
cts.lock.Lock()
|
|
defer cts.lock.Unlock()
|
|
var hrs []heldRequest
|
|
for {
|
|
for plName, hrs = range cts.heldRequestsMap {
|
|
goto GotOne
|
|
}
|
|
return "", nil, 0
|
|
GotOne:
|
|
if nhr := len(hrs); nhr > 0 {
|
|
hrv := hrs[nhr-1]
|
|
hrs = hrs[:nhr-1]
|
|
hr = &hrv
|
|
}
|
|
if len(hrs) == 0 {
|
|
delete(cts.heldRequestsMap, plName)
|
|
} else {
|
|
cts.heldRequestsMap[plName] = hrs
|
|
}
|
|
if hr != nil {
|
|
nCount = len(hrs)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
var mandQueueSetNames = func() sets.String {
|
|
mandQueueSetNames := sets.NewString()
|
|
for _, mpl := range fcboot.MandatoryPriorityLevelConfigurations {
|
|
mandQueueSetNames.Insert(mpl.Name)
|
|
}
|
|
return mandQueueSetNames
|
|
}()
|
|
|
|
func TestConfigConsumer(t *testing.T) {
|
|
rngOuter := rand.New(rand.NewSource(1234567890123456789))
|
|
for i := 1; i <= 10; i++ {
|
|
rng := rand.New(rand.NewSource(int64(rngOuter.Uint64())))
|
|
t.Run(fmt.Sprintf("trial%d:", i), func(t *testing.T) {
|
|
clientset := clientsetfake.NewSimpleClientset()
|
|
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
|
|
flowcontrolClient := clientset.FlowcontrolV1()
|
|
cts := &ctlrTestState{t: t,
|
|
fcIfc: flowcontrolClient,
|
|
existingFSs: map[string]*flowcontrol.FlowSchema{},
|
|
existingPLs: map[string]*flowcontrol.PriorityLevelConfiguration{},
|
|
heldRequestsMap: map[string][]heldRequest{},
|
|
queues: map[string]*ctlrTestQueueSet{},
|
|
}
|
|
ctlr := newTestableController(TestableConfig{
|
|
Name: "Controller",
|
|
Clock: clock.RealClock{},
|
|
AsFieldManager: ConfigConsumerAsFieldManager,
|
|
FoundToDangling: func(found bool) bool { return !found },
|
|
InformerFactory: informerFactory,
|
|
FlowcontrolClient: flowcontrolClient,
|
|
ServerConcurrencyLimit: 100, // server concurrency limit
|
|
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
|
|
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
|
|
QueueSetFactory: cts,
|
|
})
|
|
cts.cfgCtlr = ctlr
|
|
persistingPLNames := sets.NewString()
|
|
trialStep := fmt.Sprintf("trial%d-0", i)
|
|
_, _, desiredPLNames, newBadPLNames := genPLs(rng, trialStep, persistingPLNames, 0)
|
|
_, _, newFTRs, newCatchAlls := genFSs(t, rng, trialStep, desiredPLNames, newBadPLNames, 0)
|
|
for j := 0; ; {
|
|
if testDebugLogs {
|
|
t.Logf("For %s, desiredPLNames=%#+v", trialStep, desiredPLNames)
|
|
t.Logf("For %s, newFTRs=%#+v", trialStep, newFTRs)
|
|
}
|
|
// Check that the latest digestion did the right thing
|
|
nextPLNames := sets.NewString()
|
|
for oldPLName := range persistingPLNames {
|
|
if mandPLs[oldPLName] != nil || cts.hasNonIdleQueueSet(oldPLName) {
|
|
nextPLNames.Insert(oldPLName)
|
|
}
|
|
}
|
|
persistingPLNames = nextPLNames.Union(desiredPLNames)
|
|
expectedQueueSetNames := persistingPLNames.Union(mandQueueSetNames)
|
|
allQueueSetNames := cts.getQueueSetNames()
|
|
missingQueueSetNames := expectedQueueSetNames.Difference(allQueueSetNames)
|
|
if len(missingQueueSetNames) > 0 {
|
|
t.Errorf("Fail: missing QueueSets %v", missingQueueSetNames)
|
|
}
|
|
nonIdleQueueSetNames := cts.getNonIdleQueueSetNames()
|
|
extraQueueSetNames := nonIdleQueueSetNames.Difference(expectedQueueSetNames)
|
|
if len(extraQueueSetNames) > 0 {
|
|
t.Errorf("Fail: unexpected QueueSets %v", extraQueueSetNames)
|
|
}
|
|
for plName, hr, nCount := cts.popHeldRequest(); hr != nil; plName, hr, nCount = cts.popHeldRequest() {
|
|
desired := desiredPLNames.Has(plName) || mandPLs[plName] != nil
|
|
if testDebugLogs {
|
|
t.Logf("Releasing held request %#+v, desired=%v, plName=%s, count:=%d", hr.rd, desired, plName, nCount)
|
|
}
|
|
close(hr.finishCh)
|
|
}
|
|
cts.requestWG.Wait()
|
|
for _, ftr := range newFTRs {
|
|
checkNewFS(cts, rng, trialStep, ftr, newCatchAlls)
|
|
}
|
|
|
|
j++
|
|
if j > 20 {
|
|
break
|
|
}
|
|
|
|
// Calculate expected survivors
|
|
|
|
// Now create a new config and digest it
|
|
trialStep = fmt.Sprintf("trial%d-%d", i, j)
|
|
var newPLs []*flowcontrol.PriorityLevelConfiguration
|
|
var newFSs []*flowcontrol.FlowSchema
|
|
newPLs, _, desiredPLNames, newBadPLNames = genPLs(rng, trialStep, persistingPLNames, 1+rng.Intn(4))
|
|
newFSs, _, newFTRs, newCatchAlls = genFSs(t, rng, trialStep, desiredPLNames, newBadPLNames, 1+rng.Intn(6))
|
|
|
|
if testDebugLogs {
|
|
for _, newPL := range newPLs {
|
|
t.Logf("For %s, digesting newPL=%s", trialStep, fcfmt.Fmt(newPL))
|
|
}
|
|
for _, newFS := range newFSs {
|
|
t.Logf("For %s, digesting newFS=%s", trialStep, fcfmt.Fmt(newFS))
|
|
}
|
|
}
|
|
_ = ctlr.lockAndDigestConfigObjects(newPLs, newFSs)
|
|
}
|
|
for plName, hr, nCount := cts.popHeldRequest(); hr != nil; plName, hr, nCount = cts.popHeldRequest() {
|
|
if testDebugLogs {
|
|
desired := desiredPLNames.Has(plName) || mandPLs[plName] != nil
|
|
t.Logf("Releasing held request %#+v, desired=%v, plName=%s, count:=%d", hr.rd, desired, plName, nCount)
|
|
}
|
|
close(hr.finishCh)
|
|
}
|
|
cts.requestWG.Wait()
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestAPFControllerWithGracefulShutdown(t *testing.T) {
|
|
const plName = "test-ps"
|
|
fs := &flowcontrol.FlowSchema{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "test-fs",
|
|
},
|
|
Spec: flowcontrol.FlowSchemaSpec{
|
|
MatchingPrecedence: 100,
|
|
PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
|
|
Name: plName,
|
|
},
|
|
DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
|
|
Type: flowcontrol.FlowDistinguisherMethodByUserType,
|
|
},
|
|
},
|
|
}
|
|
|
|
pl := &flowcontrol.PriorityLevelConfiguration{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: plName,
|
|
},
|
|
Spec: flowcontrol.PriorityLevelConfigurationSpec{
|
|
Type: flowcontrol.PriorityLevelEnablementLimited,
|
|
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
|
|
NominalConcurrencyShares: ptr.To(int32(10)),
|
|
LimitResponse: flowcontrol.LimitResponse{
|
|
Type: flowcontrol.LimitResponseTypeReject,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
clientset := clientsetfake.NewSimpleClientset(fs, pl)
|
|
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second)
|
|
flowcontrolClient := clientset.FlowcontrolV1()
|
|
cts := &ctlrTestState{t: t,
|
|
fcIfc: flowcontrolClient,
|
|
existingFSs: map[string]*flowcontrol.FlowSchema{},
|
|
existingPLs: map[string]*flowcontrol.PriorityLevelConfiguration{},
|
|
heldRequestsMap: map[string][]heldRequest{},
|
|
queues: map[string]*ctlrTestQueueSet{},
|
|
}
|
|
controller := newTestableController(TestableConfig{
|
|
Name: "Controller",
|
|
Clock: clock.RealClock{},
|
|
AsFieldManager: ConfigConsumerAsFieldManager,
|
|
FoundToDangling: func(found bool) bool { return !found },
|
|
InformerFactory: informerFactory,
|
|
FlowcontrolClient: flowcontrolClient,
|
|
ServerConcurrencyLimit: 100,
|
|
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
|
|
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
|
|
QueueSetFactory: cts,
|
|
})
|
|
|
|
stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{})
|
|
var controllerErr error
|
|
|
|
informerFactory.Start(stopCh)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
status := informerFactory.WaitForCacheSync(ctx.Done())
|
|
if names := unsynced(status); len(names) > 0 {
|
|
t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
|
|
}
|
|
|
|
go func() {
|
|
defer close(controllerCompletedCh)
|
|
controllerErr = controller.Run(stopCh)
|
|
}()
|
|
|
|
// ensure that the controller has run its first loop.
|
|
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
|
return controller.hasPriorityLevelState(plName), nil
|
|
})
|
|
if err != nil {
|
|
t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plName, err)
|
|
}
|
|
|
|
close(stopCh)
|
|
t.Log("waiting for the controller Run function to shutdown gracefully")
|
|
<-controllerCompletedCh
|
|
|
|
if controllerErr != nil {
|
|
t.Errorf("expected nil error from controller Run function, but got: %#v", controllerErr)
|
|
}
|
|
}
|
|
|
|
func unsynced(status map[reflect.Type]bool) []string {
|
|
names := make([]string, 0)
|
|
|
|
for objType, synced := range status {
|
|
if !synced {
|
|
names = append(names, objType.Name())
|
|
}
|
|
}
|
|
|
|
return names
|
|
}
|
|
|
|
func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*flowcontrol.FlowSchema) {
|
|
t := cts.t
|
|
ctlr := cts.cfgCtlr
|
|
fs := ftr.fs
|
|
expectedPLName := fs.Spec.PriorityLevelConfiguration.Name
|
|
ctx := context.Background()
|
|
// Use this to make sure all these requests have started executing
|
|
// before the next reconfiguration
|
|
var startWG sync.WaitGroup
|
|
for matches, digests1 := range ftr.digests {
|
|
for isResource, digests2 := range digests1 {
|
|
for _, rd := range digests2 {
|
|
finishCh := make(chan struct{})
|
|
rdu := uniqify(rd)
|
|
cts.requestWG.Add(1)
|
|
startWG.Add(1)
|
|
go func(matches, isResource bool, rdu RequestDigest) {
|
|
expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
|
|
ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) {
|
|
matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt
|
|
if testDebugLogs {
|
|
t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
|
|
}
|
|
if a := matchFS.Name == fs.Name; expectedMatch != a {
|
|
t.Errorf("Fail at %s/%s: rd=%#+v, expectedMatch=%v, actualMatch=%v, matchFSName=%q, catchAlls=%#+v", trialName, fs.Name, rdu, expectedMatch, a, matchFS.Name, catchAlls)
|
|
}
|
|
if matchFS.Name == fs.Name {
|
|
if fs.Spec.PriorityLevelConfiguration.Name != matchPL.Name {
|
|
t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name)
|
|
}
|
|
}
|
|
}, func() fcrequest.WorkEstimate {
|
|
return fcrequest.WorkEstimate{InitialSeats: 1}
|
|
}, func(inQueue bool) {
|
|
}, func() {
|
|
startWG.Done()
|
|
_ = <-finishCh
|
|
})
|
|
cts.requestWG.Done()
|
|
}(matches, isResource, rdu)
|
|
if rng.Float32() < 0.8 {
|
|
if testDebugLogs {
|
|
t.Logf("Immediate request %#+v, plName=%s", rdu, expectedPLName)
|
|
}
|
|
close(finishCh)
|
|
} else {
|
|
cts.addHeldRequest(expectedPLName, rdu, finishCh)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
startWG.Wait()
|
|
}
|
|
|
|
func genPLs(rng *rand.Rand, trial string, oldPLNames sets.String, n int) (pls []*flowcontrol.PriorityLevelConfiguration, plMap map[string]*flowcontrol.PriorityLevelConfiguration, goodNames, badNames sets.String) {
|
|
pls = make([]*flowcontrol.PriorityLevelConfiguration, 0, n)
|
|
plMap = make(map[string]*flowcontrol.PriorityLevelConfiguration, n)
|
|
goodNames = sets.NewString()
|
|
badNames = sets.NewString(trial+"-nopl1", trial+"-nopl2")
|
|
addGood := func(pl *flowcontrol.PriorityLevelConfiguration) {
|
|
pls = append(pls, pl)
|
|
plMap[pl.Name] = pl
|
|
goodNames.Insert(pl.Name)
|
|
}
|
|
for i := 1; i <= n; i++ {
|
|
pl := genPL(rng, fmt.Sprintf("%s-pl%d", trial, i))
|
|
addGood(pl)
|
|
}
|
|
for oldPLName := range oldPLNames {
|
|
if _, has := mandPLs[oldPLName]; has {
|
|
continue
|
|
}
|
|
if rng.Float32() < 0.67 {
|
|
pl := genPL(rng, oldPLName)
|
|
addGood(pl)
|
|
}
|
|
}
|
|
for _, pl := range mandPLs {
|
|
if n > 0 && rng.Float32() < 0.5 && !(goodNames.Has(pl.Name) || badNames.Has(pl.Name)) {
|
|
addGood(pl)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func genFSs(t *testing.T, rng *rand.Rand, trial string, goodPLNames, badPLNames sets.String, n int) (newFSs []*flowcontrol.FlowSchema, newFSMap map[string]*flowcontrol.FlowSchema, newFTRs map[string]*fsTestingRecord, catchAlls map[bool]*flowcontrol.FlowSchema) {
|
|
newFTRs = map[string]*fsTestingRecord{}
|
|
catchAlls = map[bool]*flowcontrol.FlowSchema{
|
|
false: fcboot.MandatoryFlowSchemaCatchAll,
|
|
true: fcboot.MandatoryFlowSchemaCatchAll}
|
|
newFSMap = map[string]*flowcontrol.FlowSchema{}
|
|
add := func(ftr *fsTestingRecord) {
|
|
newFSs = append(newFSs, ftr.fs)
|
|
newFSMap[ftr.fs.Name] = ftr.fs
|
|
newFTRs[ftr.fs.Name] = ftr
|
|
if ftr.wellFormed {
|
|
if ftr.matchesAllNonResourceRequests && fsPrecedes(ftr.fs, catchAlls[false]) {
|
|
catchAlls[false] = ftr.fs
|
|
}
|
|
if ftr.matchesAllResourceRequests && fsPrecedes(ftr.fs, catchAlls[true]) {
|
|
catchAlls[true] = ftr.fs
|
|
}
|
|
}
|
|
if testDebugLogs {
|
|
t.Logf("For trial %s, adding wf=%v FlowSchema %s", trial, ftr.wellFormed, fcfmt.Fmt(ftr.fs))
|
|
}
|
|
}
|
|
if n == 0 || rng.Float32() < 0.5 {
|
|
add(mandFTRCatchAll)
|
|
}
|
|
for i := 1; i <= n; i++ {
|
|
ftr := genFS(t, rng, fmt.Sprintf("%s-fs%d", trial, i), false, goodPLNames, badPLNames)
|
|
add(ftr)
|
|
}
|
|
if n == 0 || rng.Float32() < 0.5 {
|
|
add(mandFTRExempt)
|
|
}
|
|
return
|
|
}
|
|
|
|
func fsPrecedes(a, b *flowcontrol.FlowSchema) bool {
|
|
if a.Spec.MatchingPrecedence < b.Spec.MatchingPrecedence {
|
|
return true
|
|
}
|
|
if a.Spec.MatchingPrecedence == b.Spec.MatchingPrecedence {
|
|
return a.Name < b.Name
|
|
}
|
|
return false
|
|
}
|