/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flowcontrol

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"reflect"
	"sync"
	"testing"
	"time"

	flowcontrol "k8s.io/api/flowcontrol/v1beta2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
	fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
	"k8s.io/client-go/informers"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
	fcclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2"
	"k8s.io/klog/v2"
	"k8s.io/utils/clock"
)

// Some tests print a lot of debug logs, which slows them down considerably
// and can even cause them to time out.
var testDebugLogs = false

func TestMain(m *testing.M) {
	klog.InitFlags(nil)
	os.Exit(m.Run())
}

var mandPLs = func() map[string]*flowcontrol.PriorityLevelConfiguration {
	ans := make(map[string]*flowcontrol.PriorityLevelConfiguration)
	for _, mand := range fcboot.MandatoryPriorityLevelConfigurations {
		ans[mand.Name] = mand
	}
	return ans
}()

// in general usage, the boolean returned may be inaccurate by the time the caller examines it.
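// A caller that needs a dependable answer should poll until the condition
// holds rather than act on one call, e.g. (with ctlr a *configController
// and plName in scope, as in TestAPFControllerWithGracefulShutdown below):
//
//	err := wait.PollImmediate(100*time.Millisecond, 5*time.Second,
//		func() (bool, error) { return ctlr.hasPriorityLevelState(plName), nil })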
func (cfgCtlr *configController) hasPriorityLevelState(plName string) bool {
	cfgCtlr.lock.Lock()
	defer cfgCtlr.lock.Unlock()
	return cfgCtlr.priorityLevelStates[plName] != nil
}

// ctlrTestState holds the fake QueueSetFactory and the bookkeeping that the
// tests do around it.
type ctlrTestState struct {
	t               *testing.T
	cfgCtlr         *configController
	fcIfc           fcclient.FlowcontrolV1beta2Interface
	existingPLs     map[string]*flowcontrol.PriorityLevelConfiguration
	existingFSs     map[string]*flowcontrol.FlowSchema
	heldRequestsMap map[string][]heldRequest
	requestWG       sync.WaitGroup
	lock            sync.Mutex
	queues          map[string]*ctlrTestQueueSet
}

type heldRequest struct {
	rd       RequestDigest
	finishCh chan struct{}
}

var _ fq.QueueSetFactory = (*ctlrTestState)(nil)

type ctlrTestQueueSetCompleter struct {
	cts *ctlrTestState
	cqs *ctlrTestQueueSet
	qc  fq.QueuingConfig
}

type ctlrTestQueueSet struct {
	cts         *ctlrTestState
	qc          fq.QueuingConfig
	dc          fq.DispatchingConfig
	countActive int
}

type ctlrTestRequest struct {
	cqs            *ctlrTestQueueSet
	qsName         string
	descr1, descr2 interface{}
}

func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedChangeObserverPair, eso metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
	return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
}

func (cqs *ctlrTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) {
	return ctlrTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
}

func (cqs *ctlrTestQueueSet) UpdateObservations() {
}

func (cqs *ctlrTestQueueSet) Dump(bool) debug.QueueSetDump {
	return debug.QueueSetDump{}
}

func (cqc ctlrTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet {
	cqc.cts.lock.Lock()
	defer cqc.cts.lock.Unlock()
	qs := cqc.cqs
	if qs == nil {
		qs = &ctlrTestQueueSet{cts: cqc.cts, qc: cqc.qc, dc: dc}
		cqc.cts.queues[cqc.qc.Name] = qs
	} else {
		qs.qc, qs.dc = cqc.qc, dc
	}
	return qs
}

func (cqs *ctlrTestQueueSet) IsIdle() bool {
	cqs.cts.lock.Lock()
	defer cqs.cts.lock.Unlock()
	klog.V(7).Infof("For %p QS %s, countActive==%d", cqs, cqs.qc.Name, cqs.countActive)
	return cqs.countActive == 0
}

func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width *fcrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) {
	cqs.cts.lock.Lock()
	defer cqs.cts.lock.Unlock()
	cqs.countActive++
	if testDebugLogs {
		cqs.cts.t.Logf("Queued %q %#+v %#+v for %p QS=%s, countActive:=%d", fsName, descr1, descr2, cqs, cqs.qc.Name, cqs.countActive)
	}
	return &ctlrTestRequest{cqs, cqs.qc.Name, descr1, descr2}, false
}

func (ctr *ctlrTestRequest) Finish(execute func()) bool {
	execute()
	ctr.cqs.cts.lock.Lock()
	defer ctr.cqs.cts.lock.Unlock()
	ctr.cqs.countActive--
	if testDebugLogs {
		ctr.cqs.cts.t.Logf("Finished %#+v %#+v for %p QS=%s, countActive:=%d", ctr.descr1, ctr.descr2, ctr.cqs, ctr.cqs.qc.Name, ctr.cqs.countActive)
	}
	return ctr.cqs.countActive == 0
}

func (cts *ctlrTestState) getQueueSetNames() sets.String {
	cts.lock.Lock()
	defer cts.lock.Unlock()
	return sets.StringKeySet(cts.queues)
}

func (cts *ctlrTestState) getNonIdleQueueSetNames() sets.String {
	cts.lock.Lock()
	defer cts.lock.Unlock()
	ans := sets.NewString()
	for name, qs := range cts.queues {
		if qs.countActive > 0 {
			ans.Insert(name)
		}
	}
	return ans
}

func (cts *ctlrTestState) hasNonIdleQueueSet(name string) bool {
	cts.lock.Lock()
	defer cts.lock.Unlock()
	qs := cts.queues[name]
	return qs != nil && qs.countActive > 0
}

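// addHeldRequest and popHeldRequest track requests whose execution is
// parked on a channel, keyed by the name of the priority level they are
// expected to land in, so that a test can release them later and let the
// corresponding queue sets go idle.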
func (cts *ctlrTestState) addHeldRequest(plName string, rd RequestDigest, finishCh chan struct{}) {
	cts.lock.Lock()
	defer cts.lock.Unlock()
	hrs := cts.heldRequestsMap[plName]
	hrs = append(hrs, heldRequest{rd, finishCh})
	cts.heldRequestsMap[plName] = hrs
	if testDebugLogs {
		cts.t.Logf("Holding %#+v for %s, count:=%d", rd, plName, len(hrs))
	}
}

func (cts *ctlrTestState) popHeldRequest() (plName string, hr *heldRequest, nCount int) {
	cts.lock.Lock()
	defer cts.lock.Unlock()
	var hrs []heldRequest
	for {
		for plName, hrs = range cts.heldRequestsMap {
			goto GotOne
		}
		return "", nil, 0
	GotOne:
		if nhr := len(hrs); nhr > 0 {
			hrv := hrs[nhr-1]
			hrs = hrs[:nhr-1]
			hr = &hrv
		}
		if len(hrs) == 0 {
			delete(cts.heldRequestsMap, plName)
		} else {
			cts.heldRequestsMap[plName] = hrs
		}
		if hr != nil {
			nCount = len(hrs)
			return
		}
	}
}

var mandQueueSetNames, exclQueueSetNames = func() (sets.String, sets.String) {
	mandQueueSetNames := sets.NewString()
	exclQueueSetNames := sets.NewString()
	for _, mpl := range fcboot.MandatoryPriorityLevelConfigurations {
		if mpl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
			exclQueueSetNames.Insert(mpl.Name)
		} else {
			mandQueueSetNames.Insert(mpl.Name)
		}
	}
	return mandQueueSetNames, exclQueueSetNames
}()

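// TestConfigConsumer runs ten trials.  Each trial repeatedly generates a
// random collection of PriorityLevelConfigurations and FlowSchemas, digests
// it, and then checks that exactly the expected queue sets exist, that only
// expected queue sets are non-idle, and that requests are classified to the
// expected FlowSchema and priority level (see checkNewFS).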
func TestConfigConsumer(t *testing.T) {
	rngOuter := rand.New(rand.NewSource(1234567890123456789))
	for i := 1; i <= 10; i++ {
		rng := rand.New(rand.NewSource(int64(rngOuter.Uint64())))
		t.Run(fmt.Sprintf("trial%d:", i), func(t *testing.T) {
			clientset := clientsetfake.NewSimpleClientset()
			informerFactory := informers.NewSharedInformerFactory(clientset, 0)
			flowcontrolClient := clientset.FlowcontrolV1beta2()
			cts := &ctlrTestState{t: t,
				fcIfc:           flowcontrolClient,
				existingFSs:     map[string]*flowcontrol.FlowSchema{},
				existingPLs:     map[string]*flowcontrol.PriorityLevelConfiguration{},
				heldRequestsMap: map[string][]heldRequest{},
				queues:          map[string]*ctlrTestQueueSet{},
			}
			ctlr := newTestableController(TestableConfig{
				Name:                   "Controller",
				Clock:                  clock.RealClock{},
				AsFieldManager:         ConfigConsumerAsFieldManager,
				FoundToDangling:        func(found bool) bool { return !found },
				InformerFactory:        informerFactory,
				FlowcontrolClient:      flowcontrolClient,
				ServerConcurrencyLimit: 100,         // server concurrency limit
				RequestWaitLimit:       time.Minute, // request wait limit
				ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
				ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
				QueueSetFactory:        cts,
			})
			cts.cfgCtlr = ctlr
			persistingPLNames := sets.NewString()
			trialStep := fmt.Sprintf("trial%d-0", i)
			_, _, desiredPLNames, newBadPLNames := genPLs(rng, trialStep, persistingPLNames, 0)
			_, _, newFTRs, newCatchAlls := genFSs(t, rng, trialStep, desiredPLNames, newBadPLNames, 0)
			for j := 0; ; {
				if testDebugLogs {
					t.Logf("For %s, desiredPLNames=%#+v", trialStep, desiredPLNames)
					t.Logf("For %s, newFTRs=%#+v", trialStep, newFTRs)
				}

				// Check that the latest digestion did the right thing
				nextPLNames := sets.NewString()
				for oldPLName := range persistingPLNames {
					if mandPLs[oldPLName] != nil || cts.hasNonIdleQueueSet(oldPLName) {
						nextPLNames.Insert(oldPLName)
					}
				}
				persistingPLNames = nextPLNames.Union(desiredPLNames)
				expectedQueueSetNames := persistingPLNames.Union(mandQueueSetNames).Difference(exclQueueSetNames)
				allQueueSetNames := cts.getQueueSetNames()
				missingQueueSetNames := expectedQueueSetNames.Difference(allQueueSetNames)
				if len(missingQueueSetNames) > 0 {
					t.Errorf("Fail: missing QueueSets %v", missingQueueSetNames)
				}
				nonIdleQueueSetNames := cts.getNonIdleQueueSetNames()
				extraQueueSetNames := nonIdleQueueSetNames.Difference(expectedQueueSetNames)
				if len(extraQueueSetNames) > 0 {
					t.Errorf("Fail: unexpected QueueSets %v", extraQueueSetNames)
				}

				for plName, hr, nCount := cts.popHeldRequest(); hr != nil; plName, hr, nCount = cts.popHeldRequest() {
					desired := desiredPLNames.Has(plName) || mandPLs[plName] != nil
					if testDebugLogs {
						t.Logf("Releasing held request %#+v, desired=%v, plName=%s, count:=%d", hr.rd, desired, plName, nCount)
					}
					close(hr.finishCh)
				}
				cts.requestWG.Wait()
				for _, ftr := range newFTRs {
					checkNewFS(cts, rng, trialStep, ftr, newCatchAlls)
				}

				j++
				if j > 20 {
					break
				}

				// Calculate expected survivors

				// Now create a new config and digest it
				trialStep = fmt.Sprintf("trial%d-%d", i, j)
				var newPLs []*flowcontrol.PriorityLevelConfiguration
				var newFSs []*flowcontrol.FlowSchema
				newPLs, _, desiredPLNames, newBadPLNames = genPLs(rng, trialStep, persistingPLNames, 1+rng.Intn(4))
				newFSs, _, newFTRs, newCatchAlls = genFSs(t, rng, trialStep, desiredPLNames, newBadPLNames, 1+rng.Intn(6))

				if testDebugLogs {
					for _, newPL := range newPLs {
						t.Logf("For %s, digesting newPL=%s", trialStep, fcfmt.Fmt(newPL))
					}
					for _, newFS := range newFSs {
						t.Logf("For %s, digesting newFS=%s", trialStep, fcfmt.Fmt(newFS))
					}
				}
				_ = ctlr.lockAndDigestConfigObjects(newPLs, newFSs)
			}
			for plName, hr, nCount := cts.popHeldRequest(); hr != nil; plName, hr, nCount = cts.popHeldRequest() {
				if testDebugLogs {
					desired := desiredPLNames.Has(plName) || mandPLs[plName] != nil
					t.Logf("Releasing held request %#+v, desired=%v, plName=%s, count:=%d", hr.rd, desired, plName, nCount)
				}
				close(hr.finishCh)
			}
			cts.requestWG.Wait()
		})
	}
}

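// TestAPFControllerWithGracefulShutdown checks that the controller's Run
// method returns, with a nil error, once the stop channel is closed.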
t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names) } go func() { defer close(controllerCompletedCh) controllerErr = controller.Run(stopCh) }() // ensure that the controller has run its first loop. err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { return controller.hasPriorityLevelState(plName), nil }) if err != nil { t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plName, err) } close(stopCh) t.Log("waiting for the controller Run function to shutdown gracefully") <-controllerCompletedCh if controllerErr != nil { t.Errorf("expected nil error from controller Run function, but got: %#v", controllerErr) } } func unsynced(status map[reflect.Type]bool) []string { names := make([]string, 0) for objType, synced := range status { if !synced { names = append(names, objType.Name()) } } return names } func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*flowcontrol.FlowSchema) { t := cts.t ctlr := cts.cfgCtlr fs := ftr.fs expectedPLName := fs.Spec.PriorityLevelConfiguration.Name ctx := context.Background() // Use this to make sure all these requests have started executing // before the next reconfiguration var startWG sync.WaitGroup for matches, digests1 := range ftr.digests { for isResource, digests2 := range digests1 { for _, rd := range digests2 { finishCh := make(chan struct{}) rdu := uniqify(rd) cts.requestWG.Add(1) startWG.Add(1) go func(matches, isResource bool, rdu RequestDigest) { expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name) ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) { matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt if testDebugLogs { t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt) } if a := matchFS.Name == fs.Name; expectedMatch != a { t.Errorf("Fail at %s/%s: rd=%#+v, expectedMatch=%v, actualMatch=%v, matchFSName=%q, catchAlls=%#+v", trialName, fs.Name, rdu, expectedMatch, a, matchFS.Name, catchAlls) } if matchFS.Name == fs.Name { if fs.Spec.PriorityLevelConfiguration.Name != matchPL.Name { t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name) } } }, func() fcrequest.WorkEstimate { return fcrequest.WorkEstimate{InitialSeats: 1} }, func(inQueue bool) { }, func() { startWG.Done() _ = <-finishCh }) cts.requestWG.Done() }(matches, isResource, rdu) if rng.Float32() < 0.8 { if testDebugLogs { t.Logf("Immediate request %#+v, plName=%s", rdu, expectedPLName) } close(finishCh) } else { cts.addHeldRequest(expectedPLName, rdu, finishCh) } } } } startWG.Wait() } func genPLs(rng *rand.Rand, trial string, oldPLNames sets.String, n int) (pls []*flowcontrol.PriorityLevelConfiguration, plMap map[string]*flowcontrol.PriorityLevelConfiguration, goodNames, badNames sets.String) { pls = make([]*flowcontrol.PriorityLevelConfiguration, 0, n) plMap = make(map[string]*flowcontrol.PriorityLevelConfiguration, n) goodNames = sets.NewString() badNames = sets.NewString(trial+"-nopl1", trial+"-nopl2") addGood := func(pl *flowcontrol.PriorityLevelConfiguration) { pls = append(pls, pl) plMap[pl.Name] = pl 
func genPLs(rng *rand.Rand, trial string, oldPLNames sets.String, n int) (pls []*flowcontrol.PriorityLevelConfiguration, plMap map[string]*flowcontrol.PriorityLevelConfiguration, goodNames, badNames sets.String) {
	pls = make([]*flowcontrol.PriorityLevelConfiguration, 0, n)
	plMap = make(map[string]*flowcontrol.PriorityLevelConfiguration, n)
	goodNames = sets.NewString()
	badNames = sets.NewString(trial+"-nopl1", trial+"-nopl2")
	addGood := func(pl *flowcontrol.PriorityLevelConfiguration) {
		pls = append(pls, pl)
		plMap[pl.Name] = pl
		goodNames.Insert(pl.Name)
	}
	for i := 1; i <= n; i++ {
		pl := genPL(rng, fmt.Sprintf("%s-pl%d", trial, i))
		addGood(pl)
	}
	for oldPLName := range oldPLNames {
		if _, has := mandPLs[oldPLName]; has {
			continue
		}
		if rng.Float32() < 0.67 {
			pl := genPL(rng, oldPLName)
			addGood(pl)
		}
	}
	for _, pl := range mandPLs {
		if n > 0 && rng.Float32() < 0.5 && !(goodNames.Has(pl.Name) || badNames.Has(pl.Name)) {
			addGood(pl)
		}
	}
	return
}

func genFSs(t *testing.T, rng *rand.Rand, trial string, goodPLNames, badPLNames sets.String, n int) (newFSs []*flowcontrol.FlowSchema, newFSMap map[string]*flowcontrol.FlowSchema, newFTRs map[string]*fsTestingRecord, catchAlls map[bool]*flowcontrol.FlowSchema) {
	newFTRs = map[string]*fsTestingRecord{}
	catchAlls = map[bool]*flowcontrol.FlowSchema{
		false: fcboot.MandatoryFlowSchemaCatchAll,
		true:  fcboot.MandatoryFlowSchemaCatchAll}
	newFSMap = map[string]*flowcontrol.FlowSchema{}
	add := func(ftr *fsTestingRecord) {
		newFSs = append(newFSs, ftr.fs)
		newFSMap[ftr.fs.Name] = ftr.fs
		newFTRs[ftr.fs.Name] = ftr
		if ftr.wellFormed {
			if ftr.matchesAllNonResourceRequests && fsPrecedes(ftr.fs, catchAlls[false]) {
				catchAlls[false] = ftr.fs
			}
			if ftr.matchesAllResourceRequests && fsPrecedes(ftr.fs, catchAlls[true]) {
				catchAlls[true] = ftr.fs
			}
		}
		if testDebugLogs {
			t.Logf("For trial %s, adding wf=%v FlowSchema %s", trial, ftr.wellFormed, fcfmt.Fmt(ftr.fs))
		}
	}
	if n == 0 || rng.Float32() < 0.5 {
		add(mandFTRCatchAll)
	}
	for i := 1; i <= n; i++ {
		ftr := genFS(t, rng, fmt.Sprintf("%s-fs%d", trial, i), false, goodPLNames, badPLNames)
		add(ftr)
	}
	if n == 0 || rng.Float32() < 0.5 {
		add(mandFTRExempt)
	}
	return
}

// fsPrecedes reports whether FlowSchema a is matched in preference to b:
// lower MatchingPrecedence wins, with the name as the tiebreaker.
func fsPrecedes(a, b *flowcontrol.FlowSchema) bool {
	if a.Spec.MatchingPrecedence < b.Spec.MatchingPrecedence {
		return true
	}
	if a.Spec.MatchingPrecedence == b.Spec.MatchingPrecedence {
		return a.Name < b.Name
	}
	return false
}