Merge pull request #87754 from MikeSpreitzer/apf-filter5

Add twice refactored filter and config consumer for API Priority and Fairness

Kubernetes-commit: 77e8c75f32ae3e35a655fde08ae3b188d2a8e92e
This commit is contained in:
Kubernetes Publisher 2020-02-13 16:54:46 -08:00
commit 18e37e63c4
19 changed files with 2928 additions and 288 deletions

View File

@ -60,6 +60,7 @@ import (
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/routes"
serverstore "k8s.io/apiserver/pkg/server/storage"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/informers"
restclient "k8s.io/client-go/rest"
"k8s.io/component-base/logs"
@ -107,6 +108,9 @@ type Config struct {
AdmissionControl admission.Interface
CorsAllowedOriginList []string
// FlowControl, if not nil, gives priority and fairness to request handling
FlowControl utilflowcontrol.Interface
EnableIndex bool
EnableProfiling bool
EnableDiscovery bool
@ -606,6 +610,21 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
}
}
const priorityAndFairnessConfigConsumerHookName = "priority-and-fairness-config-consumer"
if s.isPostStartHookRegistered(priorityAndFairnessConfigConsumerHookName) {
} else if c.FlowControl != nil {
err := s.AddPostStartHook(priorityAndFairnessConfigConsumerHookName, func(context PostStartHookContext) error {
go c.FlowControl.Run(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
// TODO(yue9944882): plumb pre-shutdown-hook for request-management system?
} else {
klog.V(3).Infof("Not requested to run hook %s", priorityAndFairnessConfigConsumerHookName)
}
for _, delegateCheck := range delegationTarget.HealthzChecks() {
skip := false
for _, existingCheck := range c.HealthzChecks {
@ -638,7 +657,11 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
if c.FlowControl != nil {
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
} else {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
}
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)

View File

@ -0,0 +1,110 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"context"
"fmt"
"net/http"
// TODO: decide whether to also generate the old metrics, which
// categorize according to mutating vs readonly.
// "k8s.io/apiserver/pkg/endpoints/metrics"
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
apitypes "k8s.io/apimachinery/pkg/types"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/klog"
)
type priorityAndFairnessKeyType int
const priorityAndFairnessKey priorityAndFairnessKeyType = iota
// PriorityAndFairnessClassification identifies the results of
// classification for API Priority and Fairness
type PriorityAndFairnessClassification struct {
FlowSchemaName string
FlowSchemaUID apitypes.UID
PriorityLevelName string
PriorityLevelUID apitypes.UID
}
// GetClassification returns the classification associated with the
// given context, if any, otherwise nil
func GetClassification(ctx context.Context) *PriorityAndFairnessClassification {
return ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification)
}
// WithPriorityAndFairness limits the number of in-flight
// requests in a fine-grained way.
func WithPriorityAndFairness(
handler http.Handler,
longRunningRequestCheck apirequest.LongRunningRequestCheck,
fcIfc utilflowcontrol.Interface,
) http.Handler {
if fcIfc == nil {
klog.Warningf("priority and fairness support not found, skipping")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no RequestInfo found in context"))
return
}
user, ok := apirequest.UserFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no User found in context"))
return
}
// Skip tracking long running requests.
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
handler.ServeHTTP(w, r)
return
}
var classification *PriorityAndFairnessClassification
note := func(fs *fcv1a1.FlowSchema, pl *fcv1a1.PriorityLevelConfiguration) {
classification = &PriorityAndFairnessClassification{
FlowSchemaName: fs.Name,
FlowSchemaUID: fs.UID,
PriorityLevelName: pl.Name,
PriorityLevelUID: pl.UID}
}
var served bool
execute := func() {
served = true
innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification)
innerReq := r.Clone(innerCtx)
handler.ServeHTTP(w, innerReq)
}
digest := utilflowcontrol.RequestDigest{requestInfo, user}
fcIfc.Handle(ctx, digest, note, execute)
if !served {
tooManyRequests(r, w)
return
}
})
}

View File

@ -18,12 +18,15 @@ package options
import (
"github.com/spf13/pflag"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/kubernetes"
"k8s.io/component-base/featuregate"
)
@ -125,7 +128,14 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
if err := o.EgressSelector.ApplyTo(&config.Config); err != nil {
return err
}
if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
config.FlowControl = utilflowcontrol.New(
config.SharedInformerFactory,
kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1alpha1(),
config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight,
config.RequestTimeout/4,
)
}
return nil
}

View File

@ -26,9 +26,6 @@ import (
"k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
// add the generic feature gates
"k8s.io/apiserver/pkg/features"
"github.com/spf13/pflag"
)
@ -67,6 +64,7 @@ func NewServerRunOptions() *ServerRunOptions {
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnableInflightQuotaHandler: true,
}
}
@ -116,26 +114,11 @@ func (s *ServerRunOptions) Validate() []error {
errors = append(errors, fmt.Errorf("--livez-grace-period can not be a negative value"))
}
if s.EnableInflightQuotaHandler {
if !utilfeature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
errors = append(errors, fmt.Errorf("--enable-inflight-quota-handler can not be set if feature "+
"gate APIPriorityAndFairness is disabled"))
}
if s.MaxMutatingRequestsInFlight != 0 {
errors = append(errors, fmt.Errorf("--max-mutating-requests-inflight=%v "+
"can not be set if enabled inflight quota handler", s.MaxMutatingRequestsInFlight))
}
if s.MaxRequestsInFlight != 0 {
errors = append(errors, fmt.Errorf("--max-requests-inflight=%v "+
"can not be set if enabled inflight quota handler", s.MaxRequestsInFlight))
}
} else {
if s.MaxRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-requests-inflight can not be negative value"))
}
if s.MaxMutatingRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-mutating-requests-inflight can not be negative value"))
}
if s.MaxRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-requests-inflight can not be negative value"))
}
if s.MaxMutatingRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-mutating-requests-inflight can not be negative value"))
}
if s.RequestTimeout.Nanoseconds() < 0 {
@ -211,7 +194,7 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"to spread out load.")
fs.BoolVar(&s.EnableInflightQuotaHandler, "enable-inflight-quota-handler", s.EnableInflightQuotaHandler, ""+
"If true, replace the max-in-flight handler with an enhanced one that queues and dispatches with priority and fairness")
"If true and the APIPriorityAndFairness feature gate is enabled, replace the max-in-flight handler with an enhanced one that queues and dispatches with priority and fairness")
fs.DurationVar(&s.ShutdownDelayDuration, "shutdown-delay-duration", s.ShutdownDelayDuration, ""+
"Time to delay the termination. During that time the server keeps serving requests normally and /healthz "+

View File

@ -0,0 +1,725 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"context"
"crypto/sha256"
"encoding/binary"
"encoding/json"
"fmt"
"math"
"sort"
"sync"
"time"
"github.com/pkg/errors"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
apitypes "k8s.io/apimachinery/pkg/types"
apierrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/apihelpers"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
fctypesv1a1 "k8s.io/api/flowcontrol/v1alpha1"
fcclientv1a1 "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1"
fclistersv1a1 "k8s.io/client-go/listers/flowcontrol/v1alpha1"
)
// This file contains a simple local (to the apiserver) controller
// that digests API Priority and Fairness config objects (FlowSchema
// and PriorityLevelConfiguration) into the data structure that the
// filter uses. At this first level of development this controller
// takes the simplest possible approach: whenever notified of any
// change to any config object, or when any priority level that is
// undesired becomes completely unused, all the config objects are
// read and processed as a whole.
// StartFunction begins the process of handlig a request. If the
// request gets queued then this function uses the given hashValue as
// the source of entropy as it shuffle-shards the request into a
// queue. The descr1 and descr2 values play no role in the logic but
// appear in log messages. This method does not return until the
// queuing, if any, for this request is done. If `execute` is false
// then `afterExecution` is irrelevant and the request should be
// rejected. Otherwise the request should be executed and
// `afterExecution` must be called exactly once.
type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, afterExecution func())
// RequestDigest holds necessary info from request for flow-control
type RequestDigest struct {
RequestInfo *request.RequestInfo
User user.Info
}
// `*configController` maintains eventual consistency with the API
// objects that configure API Priority and Fairness, and provides a
// procedural interface to the configured behavior. The methods of
// this type and cfgMeal follow the convention that the suffix
// "Locked" means that the caller must hold the configController lock.
type configController struct {
queueSetFactory fq.QueueSetFactory
// configQueue holds `(interface{})(0)` when the configuration
// objects need to be reprocessed.
configQueue workqueue.RateLimitingInterface
plLister fclistersv1a1.PriorityLevelConfigurationLister
plInformerSynced cache.InformerSynced
fsLister fclistersv1a1.FlowSchemaLister
fsInformerSynced cache.InformerSynced
flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface
// serverConcurrencyLimit is the limit on the server's total
// number of non-exempt requests being served at once. This comes
// from server configuration.
serverConcurrencyLimit int
// requestWaitLimit comes from server configuration.
requestWaitLimit time.Duration
// This must be locked while accessing flowSchemas or
// priorityLevelStates. It is the lock involved in
// LockingWriteMultiple.
lock sync.Mutex
// flowSchemas holds the flow schema objects, sorted by increasing
// numerical (decreasing logical) matching precedence. Every
// FlowSchema in this slice is immutable.
flowSchemas apihelpers.FlowSchemaSequence
// priorityLevelStates maps the PriorityLevelConfiguration object
// name to the state for that level. Every name referenced from a
// member of `flowSchemas` has an entry here.
priorityLevelStates map[string]*priorityLevelState
}
// priorityLevelState holds the state specific to a priority level.
type priorityLevelState struct {
// the API object or prototype prescribing this level. Nothing
// reached through this pointer is mutable.
pl *fctypesv1a1.PriorityLevelConfiguration
// qsCompleter holds the QueueSetCompleter derived from `config`
// and `queues` if config is not exempt, nil otherwise.
qsCompleter fq.QueueSetCompleter
// The QueueSet for this priority level. This is nil if and only
// if the priority level is exempt.
queues fq.QueueSet
// quiescing==true indicates that this priority level should be
// removed when its queues have all drained. May be true only if
// queues is non-nil.
quiescing bool
// number of goroutines between Controller::Match and calling the
// returned StartFunction
numPending int
}
// NewTestableController is extra flexible to facilitate testing
func newTestableController(
informerFactory kubeinformers.SharedInformerFactory,
flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface,
serverConcurrencyLimit int,
requestWaitLimit time.Duration,
queueSetFactory fq.QueueSetFactory,
) *configController {
cfgCtl := &configController{
queueSetFactory: queueSetFactory,
serverConcurrencyLimit: serverConcurrencyLimit,
requestWaitLimit: requestWaitLimit,
flowcontrolClient: flowcontrolClient,
priorityLevelStates: make(map[string]*priorityLevelState),
}
klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", serverConcurrencyLimit, requestWaitLimit)
cfgCtl.initializeConfigController(informerFactory)
// ensure the data structure reflects the mandatory config
cfgCtl.lockAndDigestConfigObjects(nil, nil)
return cfgCtl
}
// initializeConfigController sets up the controller that processes
// config API objects.
func (cfgCtl *configController) initializeConfigController(informerFactory kubeinformers.SharedInformerFactory) {
cfgCtl.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
fci := informerFactory.Flowcontrol().V1alpha1()
pli := fci.PriorityLevelConfigurations()
fsi := fci.FlowSchemas()
cfgCtl.plLister = pli.Lister()
cfgCtl.plInformerSynced = pli.Informer().HasSynced
cfgCtl.fsLister = fsi.Lister()
cfgCtl.fsInformerSynced = fsi.Informer().HasSynced
pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pl := obj.(*fctypesv1a1.PriorityLevelConfiguration)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of PLC %s", pl.Name)
cfgCtl.configQueue.Add(0)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newPL := newObj.(*fctypesv1a1.PriorityLevelConfiguration)
oldPL := oldObj.(*fctypesv1a1.PriorityLevelConfiguration)
if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) {
klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of PLC %s", newPL.Name)
cfgCtl.configQueue.Add(0)
}
},
DeleteFunc: func(obj interface{}) {
name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of PLC %s", name)
cfgCtl.configQueue.Add(0)
}})
fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
fs := obj.(*fctypesv1a1.FlowSchema)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of FS %s", fs.Name)
cfgCtl.configQueue.Add(0)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newFS := newObj.(*fctypesv1a1.FlowSchema)
oldFS := oldObj.(*fctypesv1a1.FlowSchema)
if !apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) {
klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of FS %s", newFS.Name)
cfgCtl.configQueue.Add(0)
}
},
DeleteFunc: func(obj interface{}) {
name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of FS %s", name)
cfgCtl.configQueue.Add(0)
}})
}
func (cfgCtl *configController) Run(stopCh <-chan struct{}) error {
defer cfgCtl.configQueue.ShutDown()
klog.Info("Starting API Priority and Fairness config controller")
if ok := cache.WaitForCacheSync(stopCh, cfgCtl.plInformerSynced, cfgCtl.fsInformerSynced); !ok {
return fmt.Errorf("Never achieved initial sync")
}
klog.Info("Running API Priority and Fairness config worker")
wait.Until(cfgCtl.runWorker, time.Second, stopCh)
klog.Info("Shutting down API Priority and Fairness config worker")
return nil
}
func (cfgCtl *configController) runWorker() {
for cfgCtl.processNextWorkItem() {
}
}
func (cfgCtl *configController) processNextWorkItem() bool {
obj, shutdown := cfgCtl.configQueue.Get()
if shutdown {
return false
}
func(obj interface{}) {
defer cfgCtl.configQueue.Done(obj)
if !cfgCtl.syncOne() {
cfgCtl.configQueue.AddRateLimited(obj)
} else {
cfgCtl.configQueue.Forget(obj)
}
}(obj)
return true
}
// syncOne attempts to sync all the API Priority and Fairness config
// objects. It either succeeds and returns `true` or logs an error
// and returns `false`.
func (cfgCtl *configController) syncOne() bool {
all := labels.Everything()
newPLs, err := cfgCtl.plLister.List(all)
if err != nil {
klog.Errorf("Unable to list PriorityLevelConfiguration objects: %s", err.Error())
return false
}
newFSs, err := cfgCtl.fsLister.List(all)
if err != nil {
klog.Errorf("Unable to list FlowSchema objects: %s", err.Error())
return false
}
err = cfgCtl.digestConfigObjects(newPLs, newFSs)
if err == nil {
return true
}
klog.Error(err)
return false
}
// cfgMeal is the data involved in the process of digesting the API
// objects that configure API Priority and Fairness. All the config
// objects are digested together, because this is the simplest way to
// cope with the various dependencies between objects. The process of
// digestion is done in four passes over config objects --- three
// passes over PriorityLevelConfigurations and one pass over the
// FlowSchemas --- with the work dvided among the passes according to
// those dependencies.
type cfgMeal struct {
cfgCtl *configController
newPLStates map[string]*priorityLevelState
// The sum of the concurrency shares of the priority levels in the
// new configuration
shareSum float64
// These keep track of which mandatory priority level config
// objects have been digested
haveExemptPL, haveCatchAllPL bool
// Buffered FlowSchema status updates to do. Do them when the
// lock is not held, to avoid a deadlock due to such a request
// provoking a call into this controller while the lock held
// waiting on that request to complete.
fsStatusUpdates []fsStatusUpdate
}
// A buffered set of status updates for a FlowSchema
type fsStatusUpdate struct {
flowSchema *fctypesv1a1.FlowSchema
condition fctypesv1a1.FlowSchemaCondition
oldValue fctypesv1a1.FlowSchemaCondition
}
// digestConfigObjects is given all the API objects that configure
// cfgCtl and writes its consequent new configState.
func (cfgCtl *configController) digestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) error {
fsStatusUpdates := cfgCtl.lockAndDigestConfigObjects(newPLs, newFSs)
var errs []error
for _, fsu := range fsStatusUpdates {
enc, err := json.Marshal(fsu.condition)
if err != nil {
// should never happen because these conditions are created here and well formed
panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error()))
}
klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue))
_, err = cfgCtl.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: "api-priority-and-fairness-config-consumer"}, "status")
if err != nil {
errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name)))
}
}
if len(errs) == 0 {
return nil
}
return apierrors.NewAggregate(errs)
}
func (cfgCtl *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) []fsStatusUpdate {
cfgCtl.lock.Lock()
defer cfgCtl.lock.Unlock()
meal := cfgMeal{
cfgCtl: cfgCtl,
newPLStates: make(map[string]*priorityLevelState),
}
meal.digestNewPLsLocked(newPLs)
meal.digestFlowSchemasLocked(newFSs)
meal.processOldPLsLocked()
// Supply missing mandatory PriorityLevelConfiguration objects
if !meal.haveExemptPL {
meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtl.requestWaitLimit)
}
if !meal.haveCatchAllPL {
meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtl.requestWaitLimit)
}
meal.finishQueueSetReconfigsLocked()
// The new config has been constructed
cfgCtl.priorityLevelStates = meal.newPLStates
klog.V(5).Infof("Switched to new API Priority and Fairness configuration")
return meal.fsStatusUpdates
}
// Digest the new set of PriorityLevelConfiguration objects.
// Pretend broken ones do not exist.
func (meal *cfgMeal) digestNewPLsLocked(newPLs []*fctypesv1a1.PriorityLevelConfiguration) {
for _, pl := range newPLs {
state := meal.cfgCtl.priorityLevelStates[pl.Name]
if state == nil {
state = &priorityLevelState{}
}
qsCompleter, err := qscOfPL(meal.cfgCtl.queueSetFactory, state.queues, pl, meal.cfgCtl.requestWaitLimit)
if err != nil {
klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
continue
}
meal.newPLStates[pl.Name] = state
state.pl = pl
state.qsCompleter = qsCompleter
if state.quiescing { // it was undesired, but no longer
klog.V(3).Infof("Priority level %q was undesired and has become desired again", pl.Name)
state.quiescing = false
}
if state.pl.Spec.Limited != nil {
meal.shareSum += float64(state.pl.Spec.Limited.AssuredConcurrencyShares)
}
meal.haveExemptPL = meal.haveExemptPL || pl.Name == fctypesv1a1.PriorityLevelConfigurationNameExempt
meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == fctypesv1a1.PriorityLevelConfigurationNameCatchAll
}
}
// Digest the given FlowSchema objects. Ones that reference a missing
// or broken priority level are not to be passed on to the filter for
// use. We do this before holding over old priority levels so that
// requests stop going to those levels and FlowSchemaStatus values
// reflect this. This function also adds any missing mandatory
// FlowSchema objects. The given objects must all have distinct
// names.
func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*fctypesv1a1.FlowSchema) {
fsSeq := make(apihelpers.FlowSchemaSequence, 0, len(newFSs))
fsMap := make(map[string]*fctypesv1a1.FlowSchema, len(newFSs))
var haveExemptFS, haveCatchAllFS bool
for i, fs := range newFSs {
otherFS := fsMap[fs.Name]
if otherFS != nil {
// This client is forbidden to do this.
panic(fmt.Sprintf("Given two FlowSchema objects with the same name: %s and %s", fcfmt.Fmt(otherFS), fcfmt.Fmt(fs)))
}
fsMap[fs.Name] = fs
_, goodPriorityRef := meal.newPLStates[fs.Spec.PriorityLevelConfiguration.Name]
// Ensure the object's status reflects whether its priority
// level reference is broken.
//
// TODO: consider not even trying if server is not handling
// requests yet.
meal.presyncFlowSchemaStatus(fs, !goodPriorityRef, fs.Spec.PriorityLevelConfiguration.Name)
if !goodPriorityRef {
klog.V(6).Infof("Ignoring FlowSchema %s because of bad priority level reference %q", fs.Name, fs.Spec.PriorityLevelConfiguration.Name)
continue
}
fsSeq = append(fsSeq, newFSs[i])
haveExemptFS = haveExemptFS || fs.Name == fctypesv1a1.FlowSchemaNameExempt
haveCatchAllFS = haveCatchAllFS || fs.Name == fctypesv1a1.FlowSchemaNameCatchAll
}
// sort into the order to be used for matching
sort.Sort(fsSeq)
// Supply missing mandatory FlowSchemas, in correct position
if !haveExemptFS {
fsSeq = append(apihelpers.FlowSchemaSequence{fcboot.MandatoryFlowSchemaExempt}, fsSeq...)
}
if !haveCatchAllFS {
fsSeq = append(fsSeq, fcboot.MandatoryFlowSchemaCatchAll)
}
meal.cfgCtl.flowSchemas = fsSeq
if klog.V(5) {
for _, fs := range fsSeq {
klog.Infof("Using FlowSchema %s", fcfmt.Fmt(fs))
}
}
}
// Consider all the priority levels in the previous configuration.
// Keep the ones that are in the new config, supply mandatory
// behavior, or are still busy; for the rest: drop it if it has no
// queues, otherwise start the quiescing process if that has not
// already been started.
func (meal *cfgMeal) processOldPLsLocked() {
for plName, plState := range meal.cfgCtl.priorityLevelStates {
if meal.newPLStates[plName] != nil {
// Still desired and already updated
continue
}
if plName == fctypesv1a1.PriorityLevelConfigurationNameExempt && !meal.haveExemptPL || plName == fctypesv1a1.PriorityLevelConfigurationNameCatchAll && !meal.haveCatchAllPL {
// BTW, we know the Spec has not changed because the
// mandatory objects have immutable Specs
klog.V(3).Infof("Retaining mandatory priority level %q despite lack of API object", plName)
} else {
if plState.queues == nil || plState.numPending == 0 && plState.queues.IsIdle() {
// Either there are no queues or they are done
// draining and no use is coming from another
// goroutine
klog.V(3).Infof("Removing undesired priority level %q (nilQueues=%v), Type=%v", plName, plState.queues == nil, plState.pl.Spec.Type)
continue
}
if !plState.quiescing {
klog.V(3).Infof("Priority level %q became undesired", plName)
plState.quiescing = true
}
}
var err error
plState.qsCompleter, err = qscOfPL(meal.cfgCtl.queueSetFactory, plState.queues, plState.pl, meal.cfgCtl.requestWaitLimit)
if err != nil {
// This can not happen because qscOfPL already approved this config
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
}
if plState.pl.Spec.Limited != nil {
// We deliberately include the lingering priority levels
// here so that their queues get some concurrency and they
// continue to drain. During this interim a lingering
// priority level continues to get a concurrency
// allocation determined by all the share values in the
// regular way.
meal.shareSum += float64(plState.pl.Spec.Limited.AssuredConcurrencyShares)
}
meal.haveExemptPL = meal.haveExemptPL || plName == fctypesv1a1.PriorityLevelConfigurationNameExempt
meal.haveCatchAllPL = meal.haveCatchAllPL || plName == fctypesv1a1.PriorityLevelConfigurationNameCatchAll
meal.newPLStates[plName] = plState
}
}
// For all the priority levels of the new config, divide up the
// server's total concurrency limit among them and create/update their
// QueueSets.
func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
for plName, plState := range meal.newPLStates {
if plState.pl.Spec.Limited == nil {
klog.V(5).Infof("Using exempt priority level %q: quiescing=%v", plName, plState.quiescing)
continue
}
// The use of math.Ceil here means that the results might sum
// to a little more than serverConcurrencyLimit but the
// difference will be negligible.
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtl.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum))
metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
if plState.queues == nil {
klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum)
} else {
klog.V(5).Infof("Retaining queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum)
}
plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: concurrencyLimit})
}
}
// qscOfPL returns a pointer to an appropriate QueuingConfig or nil
// if no limiting is called for. Returns nil and an error if the given
// object is malformed in a way that is a problem for this package.
func qscOfPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration) (fq.QueueSetCompleter, error) {
if (pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
return nil, errors.New("broken union structure at the top")
}
if (pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt) != (pl.Name == fctypesv1a1.PriorityLevelConfigurationNameExempt) {
// This package does not attempt to cope with a priority level dynamically switching between exempt and not.
return nil, errors.New("non-alignment between name and type")
}
if pl.Spec.Limited == nil {
return nil, nil
}
if (pl.Spec.Limited.LimitResponse.Type == fctypesv1a1.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
return nil, errors.New("broken union structure for limit response")
}
qcAPI := pl.Spec.Limited.LimitResponse.Queuing
qcQS := fq.QueuingConfig{Name: pl.Name}
if qcAPI != nil {
qcQS = fq.QueuingConfig{Name: pl.Name,
DesiredNumQueues: int(qcAPI.Queues),
QueueLengthLimit: int(qcAPI.QueueLengthLimit),
HandSize: int(qcAPI.HandSize),
RequestWaitLimit: requestWaitLimit,
}
}
var qsc fq.QueueSetCompleter
var err error
if queues != nil {
qsc, err = queues.BeginConfigChange(qcQS)
} else {
qsc, err = qsf.BeginConstruction(qcQS)
}
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("priority level %q has QueuingConfiguration %#+v, which is invalid", pl.Name, *qcAPI))
}
return qsc, err
}
func (meal *cfgMeal) presyncFlowSchemaStatus(fs *fctypesv1a1.FlowSchema, isDangling bool, plName string) {
danglingCondition := apihelpers.GetFlowSchemaConditionByType(fs, fctypesv1a1.FlowSchemaConditionDangling)
if danglingCondition == nil {
danglingCondition = &fctypesv1a1.FlowSchemaCondition{
Type: fctypesv1a1.FlowSchemaConditionDangling,
}
}
desiredStatus := fctypesv1a1.ConditionFalse
var desiredReason, desiredMessage string
if isDangling {
desiredStatus = fctypesv1a1.ConditionTrue
desiredReason = "NotFound"
desiredMessage = fmt.Sprintf("This FlowSchema references the PriorityLevelConfiguration object named %q but there is no such object", plName)
} else {
desiredReason = "Found"
desiredMessage = fmt.Sprintf("This FlowSchema references the PriorityLevelConfiguration object named %q and it exists", plName)
}
if danglingCondition.Status == desiredStatus && danglingCondition.Reason == desiredReason && danglingCondition.Message == desiredMessage {
return
}
meal.fsStatusUpdates = append(meal.fsStatusUpdates, fsStatusUpdate{
flowSchema: fs,
condition: fctypesv1a1.FlowSchemaCondition{
Type: fctypesv1a1.FlowSchemaConditionDangling,
Status: desiredStatus,
LastTransitionTime: metav1.Now(),
Reason: desiredReason,
Message: desiredMessage,
},
oldValue: *danglingCondition})
}
// imaginePL adds a priority level based on one of the mandatory ones
func (meal *cfgMeal) imaginePL(proto *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
qsCompleter, err := qscOfPL(meal.cfgCtl.queueSetFactory, nil, proto, requestWaitLimit)
if err != nil {
// This can not happen because proto is one of the mandatory
// objects and these are not erroneous
panic(err)
}
meal.newPLStates[proto.Name] = &priorityLevelState{
pl: proto,
qsCompleter: qsCompleter,
}
if proto.Spec.Limited != nil {
meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares)
}
return
}
type immediateRequest struct{}
func (immediateRequest) Finish(execute func()) bool {
execute()
return false
}
// startRequest classifies and, if appropriate, enqueues the request.
// Returns a nil Request if and only if the request is to be rejected.
// The returned bool indicates whether the request is exempt from
// limitation. The startWaitingTime is when the request started
// waiting in its queue, or `Time{}` if this did not happen.
func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDigest) (fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
klog.V(7).Infof("startRequest(%#+v)", rd)
cfgCtl.lock.Lock()
defer cfgCtl.lock.Unlock()
for _, fs := range cfgCtl.flowSchemas {
if matchesFlowSchema(rd, fs) {
plName := fs.Spec.PriorityLevelConfiguration.Name
plState := cfgCtl.priorityLevelStates[plName]
if plState.pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt {
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, fs.Name, fs.Spec.DistinguisherMethod, plName)
return fs, plState.pl, true, immediateRequest{}, time.Time{}
}
var numQueues int32
if plState.pl.Spec.Limited.LimitResponse.Type == fctypesv1a1.LimitResponseTypeQueue {
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
}
var hashValue uint64
if numQueues > 1 {
flowDistinguisher := computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod)
hashValue = hashFlowID(fs.Name, flowDistinguisher)
}
startWaitingTime = time.Now()
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, hashValue, rd.RequestInfo, rd.User)
if idle {
cfgCtl.maybeReapLocked(plName, plState)
}
return fs, plState.pl, false, req, startWaitingTime
}
}
// This can never happen because every configState has a
// FlowSchema that matches everything. If somehow control reaches
// here, panic with some relevant information.
var catchAll *fctypesv1a1.FlowSchema
for _, fs := range cfgCtl.flowSchemas {
if fs.Name == fctypesv1a1.FlowSchemaNameCatchAll {
catchAll = fs
}
}
panic(fmt.Sprintf("No match; rd=%#+v, catchAll=%s", rd, fcfmt.Fmt(catchAll)))
}
// Call this after getting a clue that the given priority level is undesired and idle
func (cfgCtl *configController) maybeReap(plName string) {
cfgCtl.lock.Lock()
defer cfgCtl.lock.Unlock()
plState := cfgCtl.priorityLevelStates[plName]
if plState == nil {
klog.V(7).Infof("plName=%s, plState==nil", plName)
return
}
if plState.queues != nil {
useless := plState.quiescing && plState.numPending == 0 && plState.queues.IsIdle()
klog.V(7).Infof("plState.quiescing=%v, plState.numPending=%d, useless=%v", plState.quiescing, plState.numPending, useless)
if !useless {
return
}
}
klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName)
cfgCtl.configQueue.Add(0)
}
// Call this if both (1) plState.queues is non-nil and reported being
// idle, and (2) cfgCtl's lock has not been released since then.
func (cfgCtl *configController) maybeReapLocked(plName string, plState *priorityLevelState) {
if !(plState.quiescing && plState.numPending == 0) {
return
}
klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName)
cfgCtl.configQueue.Add(0)
}
// computeFlowDistinguisher extracts the flow distinguisher according to the given method
func computeFlowDistinguisher(rd RequestDigest, method *fctypesv1a1.FlowDistinguisherMethod) string {
if method == nil {
return ""
}
switch method.Type {
case fctypesv1a1.FlowDistinguisherMethodByUserType:
return rd.User.GetName()
case fctypesv1a1.FlowDistinguisherMethodByNamespaceType:
return rd.RequestInfo.Namespace
default:
// this line shall never reach
panic("invalid flow-distinguisher method")
}
}
func hashFlowID(fsName, fDistinguisher string) uint64 {
hash := sha256.New()
var sep = [1]byte{0}
hash.Write([]byte(fsName))
hash.Write(sep[:])
hash.Write([]byte(fDistinguisher))
var sum [32]byte
hash.Sum(sum[:0])
return binary.LittleEndian.Uint64(sum[:8])
}

View File

@ -0,0 +1,124 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"context"
"strconv"
"time"
// TODO: decide whether to use the existing metrics, which
// categorize according to mutating vs readonly, or make new
// metrics because this filter does not pay attention to that
// distinction
// "k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/klog"
fctypesv1a1 "k8s.io/api/flowcontrol/v1alpha1"
fcclientv1a1 "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1"
)
// Interface defines how the API Priority and Fairness filter interacts with the underlying system.
type Interface interface {
// Handle takes care of queuing and dispatching a request
// characterized by the given digest. The given `noteFn` will be
// invoked with the results of request classification. If Handle
// decides that the request should be executed then `execute()`
// will be invoked once to execute the request; otherwise
// `execute()` will not be invoked.
Handle(ctx context.Context,
requestDigest RequestDigest,
noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration),
execFn func(),
)
// Run monitors config objects from the main apiservers and causes
// any needed changes to local behavior. This method ceases
// activity and returns after the given channel is closed.
Run(stopCh <-chan struct{}) error
}
// This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
// New creates a new instance to implement API priority and fairness
func New(
informerFactory kubeinformers.SharedInformerFactory,
flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface,
serverConcurrencyLimit int,
requestWaitLimit time.Duration,
) Interface {
grc := counter.NoOp{}
return NewTestable(
informerFactory,
flowcontrolClient,
serverConcurrencyLimit,
requestWaitLimit,
fqs.NewQueueSetFactory(&clock.RealClock{}, grc),
)
}
// NewTestable is extra flexible to facilitate testing
func NewTestable(
informerFactory kubeinformers.SharedInformerFactory,
flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface,
serverConcurrencyLimit int,
requestWaitLimit time.Duration,
queueSetFactory fq.QueueSetFactory,
) Interface {
return newTestableController(informerFactory, flowcontrolClient, serverConcurrencyLimit, requestWaitLimit, queueSetFactory)
}
func (cfgCtl *configController) Handle(ctx context.Context, requestDigest RequestDigest,
noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration),
execFn func()) {
fs, pl, isExempt, req, startWaitingTime := cfgCtl.startRequest(ctx, requestDigest)
queued := startWaitingTime != time.Time{}
noteFn(fs, pl)
if req == nil {
if queued {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
}
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt)
return
}
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued)
var executed bool
idle := req.Finish(func() {
if queued {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
}
executed = true
startExecutionTime := time.Now()
execFn()
metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime))
})
if queued && !executed {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
}
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v, Finish() => idle=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued, idle)
if idle {
cfgCtl.maybeReap(pl.Name)
}
}

View File

@ -0,0 +1,419 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"context"
"fmt"
"math/rand"
"os"
"sync"
"testing"
"time"
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
"k8s.io/apimachinery/pkg/util/sets"
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
fcclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1"
"k8s.io/klog"
)
func TestMain(m *testing.M) {
klog.InitFlags(nil)
os.Exit(m.Run())
}
var mandPLs = func() map[string]*fcv1a1.PriorityLevelConfiguration {
ans := make(map[string]*fcv1a1.PriorityLevelConfiguration)
for _, mand := range fcboot.MandatoryPriorityLevelConfigurations {
ans[mand.Name] = mand
}
return ans
}()
type ctlTestState struct {
t *testing.T
cfgCtl *configController
fcIfc fcclient.FlowcontrolV1alpha1Interface
existingPLs map[string]*fcv1a1.PriorityLevelConfiguration
existingFSs map[string]*fcv1a1.FlowSchema
heldRequestsMap map[string][]heldRequest
requestWG sync.WaitGroup
lock sync.Mutex
queues map[string]*ctlTestQueueSet
}
type heldRequest struct {
rd RequestDigest
finishCh chan struct{}
}
var _ fq.QueueSetFactory = (*ctlTestState)(nil)
type ctlTestQueueSetCompleter struct {
cts *ctlTestState
cqs *ctlTestQueueSet
qc fq.QueuingConfig
}
type ctlTestQueueSet struct {
cts *ctlTestState
qc fq.QueuingConfig
dc fq.DispatchingConfig
countActive int
}
type ctlTestRequest struct {
cqs *ctlTestQueueSet
qsName string
descr1, descr2 interface{}
}
func (cts *ctlTestState) BeginConstruction(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) {
return ctlTestQueueSetCompleter{cts, nil, qc}, nil
}
func (cqs *ctlTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) {
return ctlTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
}
func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet {
cqc.cts.lock.Lock()
defer cqc.cts.lock.Unlock()
qs := cqc.cqs
if qs == nil {
qs = &ctlTestQueueSet{cts: cqc.cts, qc: cqc.qc, dc: dc}
cqc.cts.queues[cqc.qc.Name] = qs
} else {
qs.qc, qs.dc = cqc.qc, dc
}
return qs
}
func (cqs *ctlTestQueueSet) IsIdle() bool {
cqs.cts.lock.Lock()
defer cqs.cts.lock.Unlock()
klog.V(7).Infof("For %p QS %s, countActive==%d", cqs, cqs.qc.Name, cqs.countActive)
return cqs.countActive == 0
}
func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (req fq.Request, idle bool) {
cqs.cts.lock.Lock()
defer cqs.cts.lock.Unlock()
cqs.countActive++
cqs.cts.t.Logf("Queued %#+v %#+v for %p QS=%s, countActive:=%d", descr1, descr2, cqs, cqs.qc.Name, cqs.countActive)
return &ctlTestRequest{cqs, cqs.qc.Name, descr1, descr2}, false
}
func (ctr *ctlTestRequest) Finish(execute func()) bool {
execute()
ctr.cqs.cts.lock.Lock()
defer ctr.cqs.cts.lock.Unlock()
ctr.cqs.countActive--
ctr.cqs.cts.t.Logf("Finished %#+v %#+v for %p QS=%s, countActive:=%d", ctr.descr1, ctr.descr2, ctr.cqs, ctr.cqs.qc.Name, ctr.cqs.countActive)
return ctr.cqs.countActive == 0
}
func (cts *ctlTestState) getQueueSetNames() sets.String {
cts.lock.Lock()
defer cts.lock.Unlock()
return sets.StringKeySet(cts.queues)
}
func (cts *ctlTestState) getNonIdleQueueSetNames() sets.String {
cts.lock.Lock()
defer cts.lock.Unlock()
ans := sets.NewString()
for name, qs := range cts.queues {
if qs.countActive > 0 {
ans.Insert(name)
}
}
return ans
}
func (cts *ctlTestState) hasNonIdleQueueSet(name string) bool {
cts.lock.Lock()
defer cts.lock.Unlock()
qs := cts.queues[name]
return qs != nil && qs.countActive > 0
}
func (cts *ctlTestState) addHeldRequest(plName string, rd RequestDigest, finishCh chan struct{}) {
cts.lock.Lock()
defer cts.lock.Unlock()
hrs := cts.heldRequestsMap[plName]
hrs = append(hrs, heldRequest{rd, finishCh})
cts.heldRequestsMap[plName] = hrs
cts.t.Logf("Holding %#+v for %s, count:=%d", rd, plName, len(hrs))
}
func (cts *ctlTestState) popHeldRequest() (plName string, hr *heldRequest, nCount int) {
cts.lock.Lock()
defer cts.lock.Unlock()
var hrs []heldRequest
for {
for plName, hrs = range cts.heldRequestsMap {
goto GotOne
}
return "", nil, 0
GotOne:
if nhr := len(hrs); nhr > 0 {
hrv := hrs[nhr-1]
hrs = hrs[:nhr-1]
hr = &hrv
}
if len(hrs) == 0 {
delete(cts.heldRequestsMap, plName)
} else {
cts.heldRequestsMap[plName] = hrs
}
if hr != nil {
nCount = len(hrs)
return
}
}
}
var mandQueueSetNames, exclQueueSetNames = func() (sets.String, sets.String) {
mandQueueSetNames := sets.NewString()
exclQueueSetNames := sets.NewString()
for _, mpl := range fcboot.MandatoryPriorityLevelConfigurations {
if mpl.Spec.Type == fcv1a1.PriorityLevelEnablementExempt {
exclQueueSetNames.Insert(mpl.Name)
} else {
mandQueueSetNames.Insert(mpl.Name)
}
}
return mandQueueSetNames, exclQueueSetNames
}()
func TestConfigConsumer(t *testing.T) {
rngOuter := rand.New(rand.NewSource(1234567890123456789))
for i := 1; i <= 20; i++ {
rng := rand.New(rand.NewSource(int64(rngOuter.Uint64())))
t.Run(fmt.Sprintf("trial%d:", i), func(t *testing.T) {
clientset := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
flowcontrolClient := clientset.FlowcontrolV1alpha1()
cts := &ctlTestState{t: t,
fcIfc: flowcontrolClient,
existingFSs: map[string]*fcv1a1.FlowSchema{},
existingPLs: map[string]*fcv1a1.PriorityLevelConfiguration{},
heldRequestsMap: map[string][]heldRequest{},
queues: map[string]*ctlTestQueueSet{},
}
ctl := newTestableController(
informerFactory,
flowcontrolClient,
100, // server concurrency limit
time.Minute, // request wait limit
cts,
)
cts.cfgCtl = ctl
persistingPLNames := sets.NewString()
trialStep := fmt.Sprintf("trial%d-0", i)
_, _, desiredPLNames, newBadPLNames := genPLs(rng, trialStep, persistingPLNames, 0)
_, _, newFTRs, newCatchAlls := genFSs(t, rng, trialStep, desiredPLNames, newBadPLNames, 0)
for j := 0; ; {
t.Logf("For %s, desiredPLNames=%#+v", trialStep, desiredPLNames)
t.Logf("For %s, newFTRs=%#+v", trialStep, newFTRs)
// Check that the latest digestion did the right thing
nextPLNames := sets.NewString()
for oldPLName := range persistingPLNames {
if mandPLs[oldPLName] != nil || cts.hasNonIdleQueueSet(oldPLName) {
nextPLNames.Insert(oldPLName)
}
}
persistingPLNames = nextPLNames.Union(desiredPLNames)
expectedQueueSetNames := persistingPLNames.Union(mandQueueSetNames).Difference(exclQueueSetNames)
allQueueSetNames := cts.getQueueSetNames()
missingQueueSetNames := expectedQueueSetNames.Difference(allQueueSetNames)
if len(missingQueueSetNames) > 0 {
t.Errorf("Fail: missing QueueSets %v", missingQueueSetNames)
}
nonIdleQueueSetNames := cts.getNonIdleQueueSetNames()
extraQueueSetNames := nonIdleQueueSetNames.Difference(expectedQueueSetNames)
if len(extraQueueSetNames) > 0 {
t.Errorf("Fail: unexpected QueueSets %v", extraQueueSetNames)
}
for plName, hr, nCount := cts.popHeldRequest(); hr != nil; plName, hr, nCount = cts.popHeldRequest() {
desired := desiredPLNames.Has(plName) || mandPLs[plName] != nil
t.Logf("Releasing held request %#+v, desired=%v, plName=%s, count:=%d", hr.rd, desired, plName, nCount)
close(hr.finishCh)
}
cts.requestWG.Wait()
for _, ftr := range newFTRs {
checkNewFS(cts, rng, trialStep, ftr, newCatchAlls)
}
j++
if j > 20 {
break
}
// Calculate expected survivors
// Now create a new config and digest it
trialStep = fmt.Sprintf("trial%d-%d", i, j)
var newPLs []*fcv1a1.PriorityLevelConfiguration
var newFSs []*fcv1a1.FlowSchema
newPLs, _, desiredPLNames, newBadPLNames = genPLs(rng, trialStep, persistingPLNames, 1+rng.Intn(4))
newFSs, _, newFTRs, newCatchAlls = genFSs(t, rng, trialStep, desiredPLNames, newBadPLNames, 1+rng.Intn(6))
for _, newPL := range newPLs {
t.Logf("For %s, digesting newPL=%s", trialStep, fcfmt.Fmt(newPL))
}
for _, newFS := range newFSs {
t.Logf("For %s, digesting newFS=%s", trialStep, fcfmt.Fmt(newFS))
}
_ = ctl.lockAndDigestConfigObjects(newPLs, newFSs)
}
for plName, hr, nCount := cts.popHeldRequest(); hr != nil; plName, hr, nCount = cts.popHeldRequest() {
desired := desiredPLNames.Has(plName) || mandPLs[plName] != nil
t.Logf("Releasing held request %#+v, desired=%v, plName=%s, count:=%d", hr.rd, desired, plName, nCount)
close(hr.finishCh)
}
cts.requestWG.Wait()
})
}
}
func checkNewFS(cts *ctlTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*fcv1a1.FlowSchema) {
t := cts.t
ctl := cts.cfgCtl
fs := ftr.fs
expectedPLName := fs.Spec.PriorityLevelConfiguration.Name
ctx := context.Background()
// Use this to make sure all these requests have started executing
// before the next reconfiguration
var startWG sync.WaitGroup
for matches, digests1 := range ftr.digests {
for isResource, digests2 := range digests1 {
for _, rd := range digests2 {
finishCh := make(chan struct{})
rdu := uniqify(rd)
cts.requestWG.Add(1)
startWG.Add(1)
go func(matches, isResource bool, rdu RequestDigest) {
expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
ctl.Handle(ctx, rdu, func(matchFS *fcv1a1.FlowSchema, matchPL *fcv1a1.PriorityLevelConfiguration) {
matchIsExempt := matchPL.Spec.Type == fcv1a1.PriorityLevelEnablementExempt
t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
if e, a := expectedMatch, matchFS.Name == fs.Name; e != a {
t.Errorf("Fail at %s/%s: rd=%#+v, expectedMatch=%v, actualMatch=%v, matchFSName=%q, catchAlls=%#+v", trialName, fs.Name, rdu, e, a, matchFS.Name, catchAlls)
}
if matchFS.Name == fs.Name {
if e, a := fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name; e != a {
t.Errorf("Fail at %s/%s: e=%v, a=%v", trialName, fs.Name, e, a)
}
}
}, func() {
startWG.Done()
_ = <-finishCh
})
cts.requestWG.Done()
}(matches, isResource, rdu)
if rng.Float32() < 0.8 {
t.Logf("Immediate request %#+v, plName=%s", rdu, expectedPLName)
close(finishCh)
} else {
cts.addHeldRequest(expectedPLName, rdu, finishCh)
}
}
}
}
startWG.Wait()
}
func genPLs(rng *rand.Rand, trial string, oldPLNames sets.String, n int) (pls []*fcv1a1.PriorityLevelConfiguration, plMap map[string]*fcv1a1.PriorityLevelConfiguration, goodNames, badNames sets.String) {
pls = make([]*fcv1a1.PriorityLevelConfiguration, 0, n)
plMap = make(map[string]*fcv1a1.PriorityLevelConfiguration, n)
goodNames = sets.NewString()
badNames = sets.NewString(trial+"-nopl1", trial+"-nopl2")
addGood := func(pl *fcv1a1.PriorityLevelConfiguration) {
pls = append(pls, pl)
plMap[pl.Name] = pl
goodNames.Insert(pl.Name)
}
for i := 1; i <= n; i++ {
pl := genPL(rng, fmt.Sprintf("%s-pl%d", trial, i))
addGood(pl)
}
for oldPLName := range oldPLNames {
if _, has := mandPLs[oldPLName]; has {
continue
}
if rng.Float32() < 0.67 {
pl := genPL(rng, oldPLName)
addGood(pl)
}
}
for _, pl := range mandPLs {
if n > 0 && rng.Float32() < 0.5 && !(goodNames.Has(pl.Name) || badNames.Has(pl.Name)) {
addGood(pl)
}
}
return
}
func genFSs(t *testing.T, rng *rand.Rand, trial string, goodPLNames, badPLNames sets.String, n int) (newFSs []*fcv1a1.FlowSchema, newFSMap map[string]*fcv1a1.FlowSchema, newFTRs map[string]*fsTestingRecord, catchAlls map[bool]*fcv1a1.FlowSchema) {
newFTRs = map[string]*fsTestingRecord{}
catchAlls = map[bool]*fcv1a1.FlowSchema{
false: fcboot.MandatoryFlowSchemaCatchAll,
true: fcboot.MandatoryFlowSchemaCatchAll}
newFSMap = map[string]*fcv1a1.FlowSchema{}
add := func(ftr *fsTestingRecord) {
newFSs = append(newFSs, ftr.fs)
newFSMap[ftr.fs.Name] = ftr.fs
newFTRs[ftr.fs.Name] = ftr
if ftr.wellFormed {
if ftr.matchesAllNonResourceRequests && fsPrecedes(ftr.fs, catchAlls[false]) {
catchAlls[false] = ftr.fs
}
if ftr.matchesAllResourceRequests && fsPrecedes(ftr.fs, catchAlls[true]) {
catchAlls[true] = ftr.fs
}
}
t.Logf("For trial %s, adding wf=%v FlowSchema %s", trial, ftr.wellFormed, fcfmt.Fmt(ftr.fs))
}
if n == 0 || rng.Float32() < 0.5 {
add(mandFTRCatchAll)
}
for i := 1; i <= n; i++ {
ftr := genFS(t, rng, fmt.Sprintf("%s-fs%d", trial, i), false, goodPLNames, badPLNames)
add(ftr)
}
if n == 0 || rng.Float32() < 0.5 {
add(mandFTRExempt)
}
return
}
func fsPrecedes(a, b *fcv1a1.FlowSchema) bool {
if a.Spec.MatchingPrecedence < b.Spec.MatchingPrecedence {
return true
}
if a.Spec.MatchingPrecedence == b.Spec.MatchingPrecedence {
return a.Name < b.Name
}
return false
}

View File

@ -0,0 +1,25 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package counter
// NoOp is a GoRoutineCounter that does not actually count
type NoOp struct{}
var _ GoRoutineCounter = NoOp{}
// Add would adjust the count, if a count were being kept
func (NoOp) Add(int) {}

View File

@ -47,47 +47,46 @@ type QueueSetCompleter interface {
// . Some day we may have connections between priority levels, but
// today is not that day.
type QueueSet interface {
// BeginConfigChange starts the two-step process of updating
// the configuration. No change is made until Complete is
// called. If `C := X.BeginConstruction(q)` then
// `C.Complete(d)` returns the same value `X`. If the
// QueuingConfig's DesiredNumQueues field is zero then the other
// queuing-specific config parameters are not changed, so that the
// queues continue draining as before.
// BeginConfigChange starts the two-step process of updating the
// configuration. No change is made until Complete is called. If
// `C := X.BeginConstruction(q)` then `C.Complete(d)` returns the
// same value `X`. If the QueuingConfig's DesiredNumQueues field
// is zero then the other queuing-specific config parameters are
// not changed, so that the queues continue draining as before.
// In any case, reconfiguration does not discard any queue unless
// and until it is undesired and empty.
BeginConfigChange(QueuingConfig) (QueueSetCompleter, error)
// Quiesce controls whether the QueueSet is operating normally or
// is quiescing. A quiescing QueueSet drains as normal but does
// not admit any new requests. Passing a non-nil handler means the
// system should be quiescing, a nil handler means the system
// should operate normally. A call to Wait while the system is
// quiescing will be rebuffed by returning tryAnother=true. If all
// the queues have no requests waiting nor executing while the
// system is quiescing then the handler will eventually be called
// with no locks held (even if the system becomes non-quiescing
// between the triggering state and the required call). In Go
// Memory Model terms, the triggering state happens before the
// call to the EmptyHandler.
Quiesce(EmptyHandler)
// IsIdle returns a bool indicating whether the QueueSet was idle
// at the moment of the return. Idle means the QueueSet has zero
// requests queued and zero executing. This bit can change only
// (1) during a call to StartRequest and (2) during a call to
// Request::Finish. In the latter case idleness can only change
// from false to true.
IsIdle() bool
// Wait uses the given hashValue as the source of entropy as it
// shuffle-shards a request into a queue and waits for a decision
// on what to do with that request. The descr1 and descr2 values
// play no role in the logic but appear in log messages. If
// tryAnother==true at return then the QueueSet has become
// undesirable and the client should try to find a different
// QueueSet to use; execute and afterExecution are irrelevant in
// this case. In the terms of the Go Memory Model, there was a
// call to Quiesce with a non-nil handler that happened before
// this return from Wait. Otherwise, if execute then the client
// should start executing the request and, once the request
// finishes execution or is canceled, call afterExecution().
// Otherwise the client should not execute the request and
// afterExecution is irrelevant. Canceling the context while the
// request is waiting in its queue will cut short that wait and
// cause a return with tryAnother and execute both false; later
// cancellations are the caller's problem.
Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func())
// StartRequest begins the process of handling a request. If the
// request gets queued and the number of queues is greater than
// 1 then Wait uses the given hashValue as the source of entropy
// as it shuffle-shards the request into a queue. The descr1 and
// descr2 values play no role in the logic but appear in log
// messages. This method always returns quickly (without waiting
// for the request to be dequeued). If this method returns a nil
// Request value then caller should reject the request and the
// returned bool indicates whether the QueueSet was idle at the
// moment of the return. Otherwise idle==false and the client
// must call the Wait method of the Request exactly once.
StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (req Request, idle bool)
}
// Request represents the remainder of the handling of one request
type Request interface {
// Finish determines whether to execute or reject the request and
// invokes `execute` if the decision is to execute the request.
// The returned `idle bool` value indicates whether the QueueSet
// was idle when the value was calculated, but might no longer be
// accurate by the time the client examines that value.
Finish(execute func()) (idle bool)
}
// QueuingConfig defines the configuration of the queuing aspect of a QueueSet.

View File

@ -18,6 +18,7 @@ package queueset
import (
"context"
"fmt"
"math"
"sync"
"time"
@ -56,9 +57,11 @@ type queueSetCompleter struct {
// described in this package's doc, and a pointer to one implements
// the QueueSet interface. The clock, GoRoutineCounter, and estimated
// service time should not be changed; the fields listed after the
// lock must be accessed only while holding the lock.
// This is not yet designed to support limiting concurrency without
// queuing (this will need to be added soon).
// lock must be accessed only while holding the lock. The methods of
// this type follow the naming convention that the suffix "Locked"
// means the caller must hold the lock; for a method whose name does
// not end in "Locked" either acquires the lock or does not care about
// locking.
type queueSet struct {
clock clock.PassiveClock
counter counter.GoRoutineCounter
@ -102,8 +105,6 @@ type queueSet struct {
// sum, over all the queues, of the number of requests executing
// from that queue.
totRequestsExecuting int
emptyHandler fq.EmptyHandler
}
// NewQueueSetFactory creates a new QueueSetFactory object
@ -205,28 +206,6 @@ func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shuffleshard
qs.dispatchAsMuchAsPossibleLocked()
}
// Quiesce controls whether the QueueSet is operating normally or is quiescing.
// A quiescing QueueSet drains as normal but does not admit any
// new requests. Passing a non-nil handler means the system should
// be quiescing, a nil handler means the system should operate
// normally. A call to Wait while the system is quiescing
// will be rebuffed by returning tryAnother=true. If all the
// queues have no requests waiting nor executing while the system
// is quiescing then the handler will eventually be called with no
// locks held (even if the system becomes non-quiescing between the
// triggering state and the required call).
func (qs *queueSet) Quiesce(eh fq.EmptyHandler) {
qs.lock.Lock()
defer qs.lock.Unlock()
qs.emptyHandler = eh
if eh == nil {
return
}
// Here we check whether there are any requests queued or executing and
// if not then fork an invocation of the EmptyHandler.
qs.maybeForkEmptyHandlerLocked()
}
// A decision about a request
type requestDecision int
@ -235,142 +214,144 @@ const (
decisionExecute requestDecision = iota
decisionReject
decisionCancel
decisionTryAnother
)
// Wait uses the given hashValue as the source of entropy as it
// shuffle-shards a request into a queue and waits for a decision on
// what to do with that request. The descr1 and descr2 values play no
// role in the logic but appear in log messages; we use two because
// the main client characterizes a request by two items that, if
// bundled together in a larger data structure, would lose interesting
// details when formatted. If tryAnother==true at return then the
// QueueSet has become undesirable and the client should try to find a
// different QueueSet to use; execute and afterExecution are
// irrelevant in this case. Otherwise, if execute then the client
// should start executing the request and, once the request finishes
// execution or is canceled, call afterExecution(). Otherwise the
// client should not execute the request and afterExecution is
// irrelevant.
func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) {
// StartRequest begins the process of handling a request. We take the
// approach of updating the metrics about total requests queued and
// executing on each path out of this method and Request::Wait. We do
// not update those metrics in lower level functions because there can
// be multiple lower level changes in one invocation here.
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (fq.Request, bool) {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
var req *request
decision := func() requestDecision {
// Decide what to do and update metrics accordingly. Metrics
// about total requests queued and executing are updated on
// each way out of this function rather than in lower level
// functions because there can be multiple lower level changes
// in one invocation here.
qs.lockAndSyncTime()
defer qs.lock.Unlock()
// A call to Wait while the system is quiescing will be rebuffed by
// returning `tryAnother=true`.
if qs.emptyHandler != nil {
klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.qCfg.Name, descr1, descr2)
return decisionTryAnother
}
// ========================================================================
// Step 0:
// Apply only concurrency limit, if zero queues desired
if qs.qCfg.DesiredNumQueues < 1 {
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
return decisionReject
}
req = qs.dispatchSansQueue(descr1, descr2)
metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
return decisionExecute
}
// ========================================================================
// Step 1:
// 1) Start with shuffle sharding, to pick a queue.
// 2) Reject old requests that have been waiting too long
// 3) Reject current request if there is not enough concurrency shares and
// we are at max queue length
// 4) If not rejected, create a request and enqueue
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue, descr1, descr2)
defer metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting)
// req == nil means that the request was rejected - no remaining
// concurrency shares and at max queue length already
if req == nil {
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.qCfg.Name, descr1, descr2)
metrics.AddReject(qs.qCfg.Name, "queue-full")
return decisionReject
}
// ========================================================================
// Step 2:
// The next step is to invoke the method that dequeues as much
// as possible.
// This method runs a loop, as long as there are non-empty
// queues and the number currently executing is less than the
// assured concurrency value. The body of the loop uses the
// fair queuing technique to pick a queue and dispatch a
// request from that queue.
qs.dispatchAsMuchAsPossibleLocked()
defer metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
// ========================================================================
// Step 3:
// Set up a relay from the context's Done channel to the world
// of well-counted goroutines. We Are Told that every
// request's context's Done channel gets closed by the time
// the request is done being processed.
doneCh := ctx.Done()
if doneCh != nil {
qs.preCreateOrUnblockGoroutine()
go func() {
defer runtime.HandleCrash()
qs.goroutineDoneOrBlocked()
select {
case <-doneCh:
klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.qCfg.Name, descr1, descr2)
qs.cancelWait(req)
}
qs.goroutineDoneOrBlocked()
}()
}
// ========================================================================
// Step 4:
// The final step in Wait is to wait on a decision from
// somewhere and then act on it.
decisionAny := req.decision.GetLocked()
qs.syncTimeLocked()
decision, isDecision := decisionAny.(requestDecision)
if !isDecision {
klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, descr1, descr2)
decision = decisionExecute // yeah, this is a no-op
}
switch decision {
case decisionReject:
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, descr1, descr2)
metrics.AddReject(qs.qCfg.Name, "time-out")
case decisionCancel:
// TODO(aaron-prindle) add metrics for this case
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, descr1, descr2)
}
// At this point, if the qs is quiescing,
// has zero requests executing, and has zero requests enqueued
// then a call to the EmptyHandler should be forked.
qs.maybeForkEmptyHandlerLocked()
return decision
}()
switch decision {
case decisionTryAnother:
return true, false, func() {}
case decisionReject, decisionCancel:
return false, false, func() {}
default:
if decision != decisionExecute {
klog.Errorf("Impossible decision %q", decision)
}
return false, true, func() {
qs.finishRequestAndDispatchAsMuchAsPossible(req)
// ========================================================================
// Step 0:
// Apply only concurrency limit, if zero queues desired
if qs.qCfg.DesiredNumQueues < 1 {
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
return nil, qs.isIdleLocked()
}
req = qs.dispatchSansQueueLocked(ctx, descr1, descr2)
metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
return req, false
}
// ========================================================================
// Step 1:
// 1) Start with shuffle sharding, to pick a queue.
// 2) Reject old requests that have been waiting too long
// 3) Reject current request if there is not enough concurrency shares and
// we are at max queue length
// 4) If not rejected, create a request and enqueue
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, descr1, descr2)
defer metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting)
// req == nil means that the request was rejected - no remaining
// concurrency shares and at max queue length already
if req == nil {
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.qCfg.Name, descr1, descr2)
metrics.AddReject(qs.qCfg.Name, "queue-full")
return nil, qs.isIdleLocked()
}
// ========================================================================
// Step 2:
// The next step is to invoke the method that dequeues as much
// as possible.
// This method runs a loop, as long as there are non-empty
// queues and the number currently executing is less than the
// assured concurrency value. The body of the loop uses the
// fair queuing technique to pick a queue and dispatch a
// request from that queue.
qs.dispatchAsMuchAsPossibleLocked()
defer metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
// ========================================================================
// Step 3:
// Set up a relay from the context's Done channel to the world
// of well-counted goroutines. We Are Told that every
// request's context's Done channel gets closed by the time
// the request is done being processed.
doneCh := ctx.Done()
if doneCh != nil {
qs.preCreateOrUnblockGoroutine()
go func() {
defer runtime.HandleCrash()
qs.goroutineDoneOrBlocked()
_ = <-doneCh
// Whatever goroutine unblocked the preceding receive MUST
// have already either (a) incremented qs.counter or (b)
// known that said counter is not actually counting or (c)
// known that the count does not need to be accurate.
// BTW, the count only needs to be accurate in a test that
// uses FakeEventClock::Run().
klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.qCfg.Name, descr1, descr2)
qs.cancelWait(req)
qs.goroutineDoneOrBlocked()
}()
}
return req, false
}
func (req *request) Finish(execFn func()) bool {
exec, idle := req.wait()
if !exec {
return idle
}
execFn()
return req.qs.finishRequestAndDispatchAsMuchAsPossible(req)
}
func (req *request) wait() (bool, bool) {
qs := req.qs
qs.lock.Lock()
defer qs.lock.Unlock()
if req.waitStarted {
// This can not happen, because the client is forbidden to
// call Wait twice on the same request
panic(fmt.Sprintf("Multiple calls to the Wait method, QueueSet=%s, startTime=%s, descr1=%#+v, descr2=%#+v", req.qs.qCfg.Name, req.startTime, req.descr1, req.descr2))
}
req.waitStarted = true
// ========================================================================
// Step 4:
// The final step is to wait on a decision from
// somewhere and then act on it.
decisionAny := req.decision.GetLocked()
qs.syncTimeLocked()
decision, isDecision := decisionAny.(requestDecision)
if !isDecision {
panic(fmt.Sprintf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, req.descr1, req.descr2))
}
switch decision {
case decisionReject:
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
metrics.AddReject(qs.qCfg.Name, "time-out")
return false, qs.isIdleLocked()
case decisionCancel:
// TODO(aaron-prindle) add metrics for this case
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
return false, qs.isIdleLocked()
case decisionExecute:
klog.V(5).Infof("QS(%s): Dispatching request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
return true, false
default:
// This can not happen, all possible values are handled above
panic(decision)
}
}
func (qs *queueSet) IsIdle() bool {
qs.lock.Lock()
defer qs.lock.Unlock()
return qs.isIdleLocked()
}
func (qs *queueSet) isIdleLocked() bool {
return qs.totRequestsWaiting == 0 && qs.totRequestsExecuting == 0
}
// lockAndSyncTime acquires the lock and updates the virtual time.
@ -389,12 +370,12 @@ func (qs *queueSet) syncTimeLocked() {
realNow := qs.clock.Now()
timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds()
qs.lastRealTime = realNow
qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatio()
qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatioLocked()
}
// getVirtualTimeRatio calculates the rate at which virtual time has
// been advancing, according to the logic in `doc.go`.
func (qs *queueSet) getVirtualTimeRatio() float64 {
func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
activeQueues := 0
reqs := 0
for _, queue := range qs.queues {
@ -419,7 +400,7 @@ func (qs *queueSet) getVirtualTimeRatio() float64 {
// returns the enqueud request on a successful enqueue
// returns nil in the case that there is no available concurrency or
// the queuelengthlimit has been reached
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64, descr1, descr2 interface{}) *request {
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) *request {
// Start with the shuffle sharding, to pick a queue.
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx]
@ -431,6 +412,8 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64,
// Create a request and enqueue
req := &request{
qs: qs,
ctx: ctx,
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
arrivalTime: qs.clock.Now(),
queue: queue,
@ -451,9 +434,6 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
bestQueueLen := int(math.MaxInt32)
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
qs.dealer.Deal(hashValue, func(queueIdx int) {
if queueIdx < 0 || queueIdx >= len(qs.queues) {
return
}
thisLen := len(qs.queues[queueIdx].requests)
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen)
if thisLen < bestQueueLen {
@ -496,8 +476,10 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
}
}
// rejectOrEnqueueLocked rejects or enqueues the newly arrived request if
// resource criteria isn't met
// rejectOrEnqueueLocked rejects or enqueues the newly arrived
// request, which has been assigned to a queue. If up against the
// queue length limit and the concurrency limit then returns false.
// Otherwise enqueues and returns true.
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
queue := request.queue
curQueueLength := len(queue.requests)
@ -511,7 +493,7 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
return true
}
// enqueues a request into an queueSet
// enqueues a request into its queue.
func (qs *queueSet) enqueueLocked(request *request) {
queue := request.queue
if len(queue.requests) == 0 && queue.requestsExecuting == 0 {
@ -540,14 +522,18 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
}
}
func (qs *queueSet) dispatchSansQueue(descr1, descr2 interface{}) *request {
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, descr1, descr2 interface{}) *request {
now := qs.clock.Now()
req := &request{
qs: qs,
ctx: ctx,
startTime: now,
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
arrivalTime: now,
descr1: descr1,
descr2: descr2,
}
req.decision.SetLocked(decisionExecute)
qs.totRequestsExecuting++
if klog.V(5) {
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting)
@ -586,7 +572,8 @@ func (qs *queueSet) dispatchLocked() bool {
return ok
}
// cancelWait ensures the request is not waiting
// cancelWait ensures the request is not waiting. This is only
// applicable to a request that has been assigned to a queue.
func (qs *queueSet) cancelWait(req *request) {
qs.lock.Lock()
defer qs.lock.Unlock()
@ -639,8 +626,9 @@ func (qs *queueSet) selectQueueLocked() *queue {
// finishRequestAndDispatchAsMuchAsPossible is a convenience method
// which calls finishRequest for a given request and then dispatches
// as many requests as possible. This is all of what needs to be done
// once a request finishes execution or is canceled.
func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) {
// once a request finishes execution or is canceled. This returns a bool
// indicating whether the QueueSet is now idle.
func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
@ -648,6 +636,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) {
qs.dispatchAsMuchAsPossibleLocked()
metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting)
metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
return qs.isIdleLocked()
}
// finishRequestLocked is a callback that should be used when a
@ -688,11 +677,6 @@ func (qs *queueSet) finishRequestLocked(r *request) {
if qs.robinIndex >= r.queue.index {
qs.robinIndex--
}
// At this point, if the qs is quiescing,
// has zero requests executing, and has zero requests enqueued
// then a call to the EmptyHandler should be forked.
qs.maybeForkEmptyHandlerLocked()
}
}
@ -706,18 +690,6 @@ func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
return keptQueues
}
func (qs *queueSet) maybeForkEmptyHandlerLocked() {
if qs.emptyHandler != nil && qs.totRequestsWaiting == 0 &&
qs.totRequestsExecuting == 0 {
qs.preCreateOrUnblockGoroutine()
go func(eh fq.EmptyHandler) {
defer runtime.HandleCrash()
defer qs.goroutineDoneOrBlocked()
eh.HandleEmpty()
}(qs.emptyHandler)
}
}
// preCreateOrUnblockGoroutine needs to be called before creating a
// goroutine associated with this queueSet or unblocking a blocked
// one, to properly update the accounting used in testing.

View File

@ -66,21 +66,26 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet,
go func(i, j int, uc uniformClient, igr test.Integrator) {
for k := 0; k < uc.nCalls; k++ {
ClockWait(clk, counter, uc.thinkDuration)
for {
tryAnother, execute, afterExecute := qs.Wait(context.Background(), uc.hash, name, []int{i, j, k})
t.Logf("%s: %d, %d, %d got a=%v, e=%v", clk.Now().Format(nsTimeFmt), i, j, k, tryAnother, execute)
if tryAnother {
continue
}
if !execute {
atomic.AddUint64(&failedCount, 1)
break
}
req, idle := qs.StartRequest(context.Background(), uc.hash, name, []int{i, j, k})
t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle)
if req == nil {
atomic.AddUint64(&failedCount, 1)
break
}
if idle {
t.Error("got request but QueueSet reported idle")
}
var executed bool
idle2 := req.Finish(func() {
executed = true
t.Logf("%s: %d, %d, %d executing", clk.Now().Format(nsTimeFmt), i, j, k)
igr.Add(1)
ClockWait(clk, counter, uc.execDuration)
afterExecute()
igr.Add(-1)
break
})
t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", clk.Now().Format(nsTimeFmt), i, j, k, executed, idle2)
if !executed {
atomic.AddUint64(&failedCount, 1)
}
}
counter.Add(-1)
@ -262,29 +267,44 @@ func TestContextCancel(t *testing.T) {
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
counter.Add(1) // account for the goroutine running this test
ctx1 := context.Background()
another1, exec1, cleanup1 := qs.Wait(ctx1, 1, "test", "one")
if another1 || !exec1 {
t.Errorf("Unexpected: another1=%v, exec1=%v", another1, exec1)
req1, _ := qs.StartRequest(ctx1, 1, "test", "one")
if req1 == nil {
t.Error("Request rejected")
return
}
defer cleanup1()
ctx2, cancel2 := context.WithCancel(context.Background())
tBefore := time.Now()
go func() {
time.Sleep(time.Second)
cancel2()
}()
another2, exec2, cleanup2 := qs.Wait(ctx2, 2, "test", "two")
tAfter := time.Now()
if another2 || exec2 {
t.Errorf("Unexpected: another2=%v, exec2=%v", another2, exec2)
if exec2 {
defer cleanup2()
var executed1 bool
idle1 := req1.Finish(func() {
executed1 = true
ctx2, cancel2 := context.WithCancel(context.Background())
tBefore := time.Now()
go func() {
time.Sleep(time.Second)
// account for unblocking the goroutine that waits on cancelation
counter.Add(1)
cancel2()
}()
req2, idle2a := qs.StartRequest(ctx2, 2, "test", "two")
if idle2a {
t.Error("2nd StartRequest returned idle")
}
} else {
if req2 != nil {
idle2b := req2.Finish(func() {
t.Error("Executing req2")
})
if idle2b {
t.Error("2nd Finish returned idle")
}
}
tAfter := time.Now()
dt := tAfter.Sub(tBefore)
if dt < time.Second || dt > 2*time.Second {
t.Errorf("Unexpected: dt=%d", dt)
}
})
if !executed1 {
t.Errorf("Unexpected: executed1=%v", executed1)
}
if !idle1 {
t.Error("Not idle at the end")
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package queueset
import (
"context"
"time"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
@ -25,6 +26,11 @@ import (
// request is a temporary container for "requests" with additional
// tracking fields required for the functionality FQScheduler
type request struct {
qs *queueSet
ctx context.Context
// The relevant queue. Is nil if this request did not go through
// a queue.
queue *queue
// startTime is the real time when the request began executing
@ -43,6 +49,9 @@ type request struct {
// descr1 and descr2 are not used in any logic but they appear in
// log messages
descr1, descr2 interface{}
// Indicates whether client has called Request::Wait()
waitStarted bool
}
// queue is an array of requests with additional metadata required for

View File

@ -31,25 +31,33 @@ func NewNoRestraintFactory() fq.QueueSetFactory {
type noRestraintFactory struct{}
type noRestraintCompeter struct{}
type noRestraintCompleter struct{}
type noRestraint struct{}
type noRestraintRequest struct{}
func (noRestraintFactory) BeginConstruction(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
return noRestraintCompeter{}, nil
return noRestraintCompleter{}, nil
}
func (noRestraintCompeter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
func (noRestraintCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
return noRestraint{}
}
func (noRestraint) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
return noRestraintCompeter{}, nil
return noRestraintCompleter{}, nil
}
func (noRestraint) Quiesce(fq.EmptyHandler) {
func (noRestraint) IsIdle() bool {
return false
}
func (noRestraint) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (quiescent, execute bool, afterExecution func()) {
return false, true, func() {}
func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (fq.Request, bool) {
return noRestraintRequest{}, false
}
func (noRestraintRequest) Finish(execute func()) (idle bool) {
execute()
return false
}

View File

@ -0,0 +1,27 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"testing"
)
func BenchmarkHash(b *testing.B) {
for i := 0; i < b.N; i++ {
hashFlowID("supercalifragilistic expialidocious", "the sound of it is something quite atrocious")
}
}

View File

@ -0,0 +1,231 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package format
import (
"bytes"
"encoding/json"
"fmt"
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
)
// This file provides an easy way to mark a value for formatting to
// `%s` in full detail IF it is printed but without costing a lot of
// CPU or memory if the value is NOT printed. The API Priority and
// Fairness API objects are formatted into JSON. The other types of
// objects here are formatted into golang source.
// Stringer marks the given value for custom formatting by this package.
type Stringer struct{ val interface{} }
// Fmt marks the given value for custom formatting by this package.
func Fmt(val interface{}) Stringer {
return Stringer{val}
}
// String formats to a string in full detail
func (sr Stringer) String() string {
if sr.val == nil {
return "nil"
}
switch typed := sr.val.(type) {
case *fcv1a1.FlowSchema,
fcv1a1.FlowSchema,
fcv1a1.FlowSchemaSpec,
fcv1a1.FlowDistinguisherMethod,
*fcv1a1.FlowDistinguisherMethod,
*fcv1a1.PolicyRulesWithSubjects,
fcv1a1.PolicyRulesWithSubjects,
fcv1a1.Subject,
fcv1a1.ResourcePolicyRule,
fcv1a1.NonResourcePolicyRule,
fcv1a1.FlowSchemaCondition,
*fcv1a1.PriorityLevelConfiguration,
fcv1a1.PriorityLevelConfiguration,
fcv1a1.PriorityLevelConfigurationSpec,
*fcv1a1.LimitedPriorityLevelConfiguration,
fcv1a1.LimitedPriorityLevelConfiguration,
fcv1a1.LimitResponse,
*fcv1a1.QueuingConfiguration,
fcv1a1.QueuingConfiguration:
return ToJSON(sr.val)
case []user.Info:
return FmtUsers(typed)
case []*request.RequestInfo:
return FmtRequests(typed)
default:
return fmt.Sprintf("%#+v", sr.val)
}
}
// ToJSON converts using encoding/json and handles errors by
// formatting them
func ToJSON(val interface{}) string {
bs, err := json.Marshal(val)
str := string(bs)
if err != nil {
str = str + "<" + err.Error() + ">"
}
return str
}
// FmtPriorityLevelConfiguration returns a golang source expression
// equivalent to the given value
func FmtPriorityLevelConfiguration(pl *fcv1a1.PriorityLevelConfiguration) string {
if pl == nil {
return "nil"
}
var buf bytes.Buffer
buf.WriteString(fmt.Sprintf("&v1alpha1.PriorityLevelConfiguration{ObjectMeta: %#+v, Spec: ",
pl.ObjectMeta))
BufferPriorityLevelConfigurationSpec(&buf, &pl.Spec)
buf.WriteString(fmt.Sprintf(", Status: %#+v}", pl.Status))
return buf.String()
}
// FmtPriorityLevelConfigurationSpec returns a golang source
// expression equivalent to the given value
func FmtPriorityLevelConfigurationSpec(plSpec *fcv1a1.PriorityLevelConfigurationSpec) string {
var buf bytes.Buffer
BufferPriorityLevelConfigurationSpec(&buf, plSpec)
return buf.String()
}
// BufferPriorityLevelConfigurationSpec writes a golang source
// expression for the given value to the given buffer
func BufferPriorityLevelConfigurationSpec(buf *bytes.Buffer, plSpec *fcv1a1.PriorityLevelConfigurationSpec) {
buf.WriteString(fmt.Sprintf("v1alpha1.PriorityLevelConfigurationSpec{Type: %#v", plSpec.Type))
if plSpec.Limited != nil {
buf.WriteString(fmt.Sprintf(", Limited: &v1alpha1.LimitedPriorityLevelConfiguration{AssuredConcurrencyShares:%d, LimitResponse:v1alpha1.LimitResponse{Type:%#v", plSpec.Limited.AssuredConcurrencyShares, plSpec.Limited.LimitResponse.Type))
if plSpec.Limited.LimitResponse.Queuing != nil {
buf.WriteString(fmt.Sprintf(", Queuing:&%#+v", *plSpec.Limited.LimitResponse.Queuing))
}
buf.WriteString(" } }")
}
buf.WriteString("}")
}
// FmtFlowSchema produces a golang source expression of the value.
func FmtFlowSchema(fs *fcv1a1.FlowSchema) string {
if fs == nil {
return "nil"
}
var buf bytes.Buffer
buf.WriteString(fmt.Sprintf("&v1alpha1.FlowSchema{ObjectMeta: %#+v, Spec: ",
fs.ObjectMeta))
BufferFlowSchemaSpec(&buf, &fs.Spec)
buf.WriteString(fmt.Sprintf(", Status: %#+v}", fs.Status))
return buf.String()
}
// FmtFlowSchemaSpec produces a golang source expression equivalent to
// the given spec
func FmtFlowSchemaSpec(fsSpec *fcv1a1.FlowSchemaSpec) string {
var buf bytes.Buffer
BufferFlowSchemaSpec(&buf, fsSpec)
return buf.String()
}
// BufferFlowSchemaSpec writes a golang source expression for the
// given value to the given buffer
func BufferFlowSchemaSpec(buf *bytes.Buffer, fsSpec *fcv1a1.FlowSchemaSpec) {
buf.WriteString(fmt.Sprintf("v1alpha1.FlowSchemaSpec{PriorityLevelConfiguration: %#+v, MatchingPrecedence: %d, DistinguisherMethod: ",
fsSpec.PriorityLevelConfiguration,
fsSpec.MatchingPrecedence))
if fsSpec.DistinguisherMethod == nil {
buf.WriteString("nil")
} else {
buf.WriteString(fmt.Sprintf("&%#+v", *fsSpec.DistinguisherMethod))
}
buf.WriteString(", Rules: []v1alpha1.PolicyRulesWithSubjects{")
for idx, rule := range fsSpec.Rules {
if idx > 0 {
buf.WriteString(", ")
}
BufferFmtPolicyRulesWithSubjectsSlim(buf, rule)
}
buf.WriteString("}}")
}
// FmtPolicyRulesWithSubjects produces a golang source expression of the value.
func FmtPolicyRulesWithSubjects(rule fcv1a1.PolicyRulesWithSubjects) string {
return "v1alpha1.PolicyRulesWithSubjects" + FmtPolicyRulesWithSubjectsSlim(rule)
}
// FmtPolicyRulesWithSubjectsSlim produces a golang source expression
// of the value but without the leading type name. See above for an
// example context where this is useful.
func FmtPolicyRulesWithSubjectsSlim(rule fcv1a1.PolicyRulesWithSubjects) string {
var buf bytes.Buffer
BufferFmtPolicyRulesWithSubjectsSlim(&buf, rule)
return buf.String()
}
// BufferFmtPolicyRulesWithSubjectsSlim writes a golang source
// expression for the given value to the given buffer but excludes the
// leading type name
func BufferFmtPolicyRulesWithSubjectsSlim(buf *bytes.Buffer, rule fcv1a1.PolicyRulesWithSubjects) {
buf.WriteString("{Subjects: []v1alpha1.Subject{")
for jdx, subj := range rule.Subjects {
if jdx > 0 {
buf.WriteString(", ")
}
buf.WriteString(fmt.Sprintf("{Kind: %q", subj.Kind))
if subj.User != nil {
buf.WriteString(fmt.Sprintf(", User: &%#+v", *subj.User))
}
if subj.Group != nil {
buf.WriteString(fmt.Sprintf(", Group: &%#+v", *subj.Group))
}
if subj.ServiceAccount != nil {
buf.WriteString(fmt.Sprintf(", ServiceAcount: &%#+v", *subj.ServiceAccount))
}
buf.WriteString("}")
}
buf.WriteString(fmt.Sprintf("}, ResourceRules: %#+v, NonResourceRules: %#+v}", rule.ResourceRules, rule.NonResourceRules))
}
// FmtUsers produces a golang source expression of the value.
func FmtUsers(list []user.Info) string {
var buf bytes.Buffer
buf.WriteString("[]user.Info{")
for idx, member := range list {
if idx > 0 {
buf.WriteString(", ")
}
buf.WriteString(fmt.Sprintf("%#+v", member))
}
buf.WriteString("}")
return buf.String()
}
// FmtRequests produces a golang source expression of the value.
func FmtRequests(list []*request.RequestInfo) string {
var buf bytes.Buffer
buf.WriteString("[]*request.RequestInfo{")
for idx, member := range list {
if idx > 0 {
buf.WriteString(", ")
}
buf.WriteString(fmt.Sprintf("%#+v", member))
}
buf.WriteString("}")
return buf.String()
}

View File

@ -0,0 +1,40 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"fmt"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
)
var _ fmt.GoStringer = RequestDigest{}
// GoString produces a golang source expression of the value.
func (rd RequestDigest) GoString() string {
return fmt.Sprintf("RequestDigest{RequestInfo: %#+v, User: %#+v}", rd.RequestInfo, rd.User)
}
var _ fmt.GoStringer = (*priorityLevelState)(nil)
// GoString produces a golang source expression of the value.
func (pls *priorityLevelState) GoString() string {
if pls == nil {
return "nil"
}
return fmt.Sprintf("&priorityLevelState{pl:%s, qsCompleter:%#+v, queues:%#+v, quiescing:%#v, numPending:%d}", fcfmt.Fmt(pls.pl), pls.qsCompleter, pls.queues, pls.quiescing, pls.numPending)
}

View File

@ -0,0 +1,646 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"fmt"
"math/rand"
"sync/atomic"
"testing"
"time"
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
)
var noRestraintQSF = fqtesting.NewNoRestraintFactory()
// genPL creates a valid PriorityLevelConfiguration with the given
// name and randomly generated spec. The given name must not be one
// of the mandatory ones.
func genPL(rng *rand.Rand, name string) *fcv1a1.PriorityLevelConfiguration {
plc := &fcv1a1.PriorityLevelConfiguration{
ObjectMeta: metav1.ObjectMeta{Name: name},
Spec: fcv1a1.PriorityLevelConfigurationSpec{
Type: fcv1a1.PriorityLevelEnablementLimited,
Limited: &fcv1a1.LimitedPriorityLevelConfiguration{
AssuredConcurrencyShares: rng.Int31n(100) + 1,
LimitResponse: fcv1a1.LimitResponse{
Type: fcv1a1.LimitResponseTypeReject}}}}
if rng.Float32() < 0.95 {
plc.Spec.Limited.LimitResponse.Type = fcv1a1.LimitResponseTypeQueue
hs := rng.Int31n(5) + 1
plc.Spec.Limited.LimitResponse.Queuing = &fcv1a1.QueuingConfiguration{
Queues: hs + rng.Int31n(20),
HandSize: hs,
QueueLengthLimit: 5}
}
_, err := qscOfPL(noRestraintQSF, nil, plc, time.Minute)
if err != nil {
panic(err)
}
return plc
}
// A FlowSchema together with characteristics relevant to testing
type fsTestingRecord struct {
fs *fcv1a1.FlowSchema
// Does this reference an existing priority level?
wellFormed bool
matchesAllResourceRequests bool
matchesAllNonResourceRequests bool
// maps `matches bool` to `isResourceRequest bool` to digests
digests map[bool]map[bool][]RequestDigest
}
func (ftr *fsTestingRecord) addDigest(digest RequestDigest, matches bool) {
ftr.digests[matches][digest.RequestInfo.IsResourceRequest] = append(ftr.digests[matches][digest.RequestInfo.IsResourceRequest], digest)
}
func (ftr *fsTestingRecord) addDigests(digests []RequestDigest, matches bool) {
for _, digest := range digests {
ftr.addDigest(digest, matches)
}
}
var flowDistinguisherMethodTypes = sets.NewString(
string(fcv1a1.FlowDistinguisherMethodByUserType),
string(fcv1a1.FlowDistinguisherMethodByNamespaceType),
)
var mandFTRExempt = &fsTestingRecord{
fs: fcboot.MandatoryFlowSchemaExempt,
wellFormed: true,
digests: map[bool]map[bool][]RequestDigest{
false: {
false: {{
RequestInfo: &request.RequestInfo{
IsResourceRequest: false,
Path: "/foo/bar",
Verb: "frobulate"},
User: &user.DefaultInfo{
Name: "nobody",
Groups: []string{user.AllAuthenticated, "nogroup"},
},
}},
true: {{
RequestInfo: &request.RequestInfo{
IsResourceRequest: true,
Verb: "mandate",
APIGroup: "nogroup",
Namespace: "nospace",
Resource: "nons",
},
User: &user.DefaultInfo{
Name: "nobody",
Groups: []string{user.AllAuthenticated, "nogroup"},
},
}},
},
true: {
false: {{
RequestInfo: &request.RequestInfo{
IsResourceRequest: false,
Path: "/foo/bar",
Verb: "frobulate"},
User: &user.DefaultInfo{
Name: "nobody",
Groups: []string{user.AllAuthenticated, user.SystemPrivilegedGroup},
},
}},
true: {{
RequestInfo: &request.RequestInfo{
IsResourceRequest: true,
Verb: "mandate",
APIGroup: "nogroup",
Namespace: "nospace",
Resource: "nons",
},
User: &user.DefaultInfo{
Name: "nobody",
Groups: []string{user.AllAuthenticated, user.SystemPrivilegedGroup},
},
}},
},
},
}
var mandFTRCatchAll = &fsTestingRecord{
fs: fcboot.MandatoryFlowSchemaCatchAll,
wellFormed: true,
digests: map[bool]map[bool][]RequestDigest{
false: {},
true: {
false: {{
RequestInfo: &request.RequestInfo{
IsResourceRequest: false,
Path: "/foo/bar",
Verb: "frobulate"},
User: &user.DefaultInfo{
Name: "nobody",
Groups: []string{user.AllAuthenticated, "nogroup"},
},
}},
true: {{
RequestInfo: &request.RequestInfo{
IsResourceRequest: true,
Verb: "mandate",
APIGroup: "nogroup",
Namespace: "nospace",
Resource: "nons",
},
User: &user.DefaultInfo{
Name: "nobody",
Groups: []string{user.AllAuthenticated, "nogroup"},
},
}},
},
},
}
// genFS creates a valid FlowSchema with the given name and randomly
// generated spec, along with characteristics relevant to testing.
// When all the FlowSchemas in a collection are generated with
// different names: (a) the matching digests match only the schema for
// which they were generated, and (b) the non-matching digests do not
// match any schema in the collection. The generated spec is
// relatively likely to be well formed but might not be. An ill
// formed spec references a priority level drawn from badPLNames.
// goodPLNames may be empty, but badPLNames may not.
func genFS(t *testing.T, rng *rand.Rand, name string, mayMatchClusterScope bool, goodPLNames, badPLNames sets.String) *fsTestingRecord {
fs := &fcv1a1.FlowSchema{
ObjectMeta: metav1.ObjectMeta{Name: name},
Spec: fcv1a1.FlowSchemaSpec{}}
// 5% chance of zero rules, otherwise draw from 1--6 biased low
nRules := (1 + rng.Intn(3)) * (1 + rng.Intn(2)) * ((19 + rng.Intn(20)) / 20)
ftr := &fsTestingRecord{fs: fs,
wellFormed: true,
matchesAllResourceRequests: nRules > 0 && rng.Float32() < 0.1,
matchesAllNonResourceRequests: nRules > 0 && rng.Float32() < 0.1,
digests: map[bool]map[bool][]RequestDigest{
false: {false: {}, true: {}},
true: {false: {}, true: {}}},
}
dangleStatus := fcv1a1.ConditionFalse
if rng.Float32() < 0.9 && len(goodPLNames) > 0 {
fs.Spec.PriorityLevelConfiguration = fcv1a1.PriorityLevelConfigurationReference{pickSetString(rng, goodPLNames)}
} else {
fs.Spec.PriorityLevelConfiguration = fcv1a1.PriorityLevelConfigurationReference{pickSetString(rng, badPLNames)}
ftr.wellFormed = false
dangleStatus = fcv1a1.ConditionTrue
}
fs.Status.Conditions = []fcv1a1.FlowSchemaCondition{{
Type: fcv1a1.FlowSchemaConditionDangling,
Status: dangleStatus}}
fs.Spec.MatchingPrecedence = rng.Int31n(9997) + 2
if rng.Float32() < 0.8 {
fdmt := fcv1a1.FlowDistinguisherMethodType(pickSetString(rng, flowDistinguisherMethodTypes))
fs.Spec.DistinguisherMethod = &fcv1a1.FlowDistinguisherMethod{fdmt}
}
fs.Spec.Rules = []fcv1a1.PolicyRulesWithSubjects{}
everyResourceMatcher := -1
if ftr.matchesAllResourceRequests {
if mayMatchClusterScope {
everyResourceMatcher = 0
} else {
everyResourceMatcher = rng.Intn(nRules)
}
}
everyNonResourceMatcher := -1
if ftr.matchesAllNonResourceRequests {
if mayMatchClusterScope {
everyNonResourceMatcher = 0
} else {
everyNonResourceMatcher = rng.Intn(nRules)
}
}
// Allow only one rule if mayMatchClusterScope because that breaks
// cross-rule exclusion.
for i := 0; i < nRules && (i == 0 || !mayMatchClusterScope); i++ {
rule, ruleMatchingRDigests, ruleMatchingNDigests, ruleSkippingRDigests, ruleSkippingNDigests := genPolicyRuleWithSubjects(t, rng, fmt.Sprintf("%s-%d", name, i+1), mayMatchClusterScope, ftr.matchesAllResourceRequests, ftr.matchesAllNonResourceRequests, i == everyResourceMatcher, i == everyNonResourceMatcher)
fs.Spec.Rules = append(fs.Spec.Rules, rule)
ftr.addDigests(ruleMatchingRDigests, true)
ftr.addDigests(ruleMatchingNDigests, true)
ftr.addDigests(ruleSkippingRDigests, false)
ftr.addDigests(ruleSkippingNDigests, false)
}
if nRules == 0 {
var skippingRDigests, skippingNDigests []RequestDigest
_, _, _, skippingRDigests, skippingNDigests = genPolicyRuleWithSubjects(t, rng, name+"-1", false, false, false, false, false)
ftr.addDigests(skippingRDigests, false)
ftr.addDigests(skippingNDigests, false)
}
t.Logf("Returning name=%s, plRef=%q, wellFormed=%v, matchesAllResourceRequests=%v, matchesAllNonResourceRequests=%v for mayMatchClusterScope=%v", fs.Name, fs.Spec.PriorityLevelConfiguration.Name, ftr.wellFormed, ftr.matchesAllResourceRequests, ftr.matchesAllNonResourceRequests, mayMatchClusterScope)
return ftr
}
var noextra = make(map[string][]string)
// Generate one valid PolicyRulesWithSubjects. Also returns: matching
// resource-style digests, matching non-resource-style digests,
// skipping (i.e., not matching the generated rule) resource-style
// digests, skipping non-resource-style digests. When a collection of
// rules is generated with unique prefixes, the skipping digests for
// each rule match no rules in the collection. The
// someMatchesAllResourceRequests and
// someMatchesAllNonResourceRequests parameters indicate whether any
// rule in the collection matches all of the relevant sort of request;
// these imply the respective returned slice of counterexamples will
// be empty. The matchAllResourceRequests and
// matchAllNonResourceRequests parameters indicate whether the
// generated rule should match all of the relevant sort. The
// cross-rule exclusion is based on using names that start with the
// given prefix --- which can not be done for the namespace of a
// cluster-scoped request. Thus, these are normally excluded. When
// mayMatchClusterScope==true the generated rule may be cluster-scoped
// and there is no promise of cross-rule exclusion.
func genPolicyRuleWithSubjects(t *testing.T, rng *rand.Rand, pfx string, mayMatchClusterScope, someMatchesAllResourceRequests, someMatchesAllNonResourceRequests, matchAllResourceRequests, matchAllNonResourceRequests bool) (fcv1a1.PolicyRulesWithSubjects, []RequestDigest, []RequestDigest, []RequestDigest, []RequestDigest) {
subjects := []fcv1a1.Subject{}
matchingUIs := []user.Info{}
skippingUIs := []user.Info{}
resourceRules := []fcv1a1.ResourcePolicyRule{}
nonResourceRules := []fcv1a1.NonResourcePolicyRule{}
matchingRRIs := []*request.RequestInfo{}
skippingRRIs := []*request.RequestInfo{}
matchingNRIs := []*request.RequestInfo{}
skippingNRIs := []*request.RequestInfo{}
nSubj := rng.Intn(4)
for i := 0; i < nSubj; i++ {
subject, smus, ssus := genSubject(rng, fmt.Sprintf("%s-%d", pfx, i+1))
subjects = append(subjects, subject)
matchingUIs = append(matchingUIs, smus...)
skippingUIs = append(skippingUIs, ssus...)
}
if matchAllResourceRequests || matchAllNonResourceRequests {
switch rng.Intn(3) {
case 0:
subjects = append(subjects, mkUserSubject("*"))
case 1:
subjects = append(subjects, mkGroupSubject("*"))
default:
subjects = append(subjects, mkGroupSubject("system:authenticated"), mkGroupSubject("system:unauthenticated"))
}
matchingUIs = append(matchingUIs, skippingUIs...)
}
if someMatchesAllResourceRequests || someMatchesAllNonResourceRequests {
skippingUIs = []user.Info{}
} else if nSubj == 0 {
_, _, skippingUIs = genSubject(rng, pfx+"-o")
}
var nRR, nNRR int
for nRR+nNRR == 0 || matchAllResourceRequests && nRR == 0 || matchAllNonResourceRequests && nNRR == 0 {
nRR = rng.Intn(4)
nNRR = rng.Intn(4)
}
allResourceMatcher := -1
if matchAllResourceRequests {
allResourceMatcher = rng.Intn(nRR)
}
// Allow only one resource rule if mayMatchClusterScope because
// that breaks cross-rule exclusion.
for i := 0; i < nRR && (i == 0 || !mayMatchClusterScope); i++ {
rr, rmrs, rsrs := genResourceRule(rng, fmt.Sprintf("%s-%d", pfx, i+1), mayMatchClusterScope, i == allResourceMatcher, someMatchesAllResourceRequests)
resourceRules = append(resourceRules, rr)
matchingRRIs = append(matchingRRIs, rmrs...)
skippingRRIs = append(skippingRRIs, rsrs...)
}
if nRR == 0 {
_, _, skippingRRIs = genResourceRule(rng, pfx+"-o", mayMatchClusterScope, false, someMatchesAllResourceRequests)
}
allNonResourceMatcher := -1
if matchAllNonResourceRequests {
allNonResourceMatcher = rng.Intn(nNRR)
}
for i := 0; i < nNRR; i++ {
nrr, nmrs, nsrs := genNonResourceRule(rng, fmt.Sprintf("%s-%d", pfx, i+1), i == allNonResourceMatcher, someMatchesAllNonResourceRequests)
nonResourceRules = append(nonResourceRules, nrr)
matchingNRIs = append(matchingNRIs, nmrs...)
skippingNRIs = append(skippingNRIs, nsrs...)
}
if nRR == 0 {
_, _, skippingNRIs = genNonResourceRule(rng, pfx+"-o", false, someMatchesAllNonResourceRequests)
}
rule := fcv1a1.PolicyRulesWithSubjects{subjects, resourceRules, nonResourceRules}
t.Logf("For pfx=%s, mayMatchClusterScope=%v, someMatchesAllResourceRequests=%v, someMatchesAllNonResourceRequests=%v, marr=%v, manrr=%v: generated prws=%s, mu=%s, su=%s, mrr=%s, mnr=%s, srr=%s, snr=%s", pfx, mayMatchClusterScope, someMatchesAllResourceRequests, someMatchesAllNonResourceRequests, matchAllResourceRequests, matchAllNonResourceRequests, fcfmt.Fmt(rule), fcfmt.Fmt(matchingUIs), fcfmt.Fmt(skippingUIs), fcfmt.Fmt(matchingRRIs), fcfmt.Fmt(matchingNRIs), fcfmt.Fmt(skippingRRIs), fcfmt.Fmt(skippingNRIs))
matchingRDigests := cross(matchingUIs, matchingRRIs)
skippingRDigests := append(append(cross(matchingUIs, skippingRRIs),
cross(skippingUIs, matchingRRIs)...),
cross(skippingUIs, skippingRRIs)...)
matchingNDigests := cross(matchingUIs, matchingNRIs)
skippingNDigests := append(append(cross(matchingUIs, skippingNRIs),
cross(skippingUIs, matchingNRIs)...),
cross(skippingUIs, skippingNRIs)...)
matchingRDigests = shuffleAndTakeDigests(t, rng, &rule, true, matchingRDigests, (1+rng.Intn(2))*(1+rng.Intn(2)))
skippingRDigests = shuffleAndTakeDigests(t, rng, &rule, false, skippingRDigests, (1+rng.Intn(2))*(1+rng.Intn(2)))
matchingNDigests = shuffleAndTakeDigests(t, rng, &rule, true, matchingNDigests, (1+rng.Intn(2))*(1+rng.Intn(2)))
skippingNDigests = shuffleAndTakeDigests(t, rng, &rule, false, skippingNDigests, (1+rng.Intn(2))*(1+rng.Intn(2)))
return rule, matchingRDigests, matchingNDigests, skippingRDigests, skippingNDigests
}
func cross(uis []user.Info, ris []*request.RequestInfo) []RequestDigest {
ans := make([]RequestDigest, 0, len(uis)*len(ris))
for _, ui := range uis {
for _, ri := range ris {
ans = append(ans, RequestDigest{RequestInfo: ri, User: ui})
}
}
return ans
}
func shuffleAndTakeDigests(t *testing.T, rng *rand.Rand, rule *fcv1a1.PolicyRulesWithSubjects, toMatch bool, digests []RequestDigest, n int) []RequestDigest {
ans := make([]RequestDigest, 0, n)
for len(ans) < n && len(digests) > 0 {
i := rng.Intn(len(digests))
digest := digests[i]
ans = append(ans, digest)
digests[i] = digests[len(digests)-1]
digests = digests[:len(digests)-1]
if rule != nil {
thisMatches := matchesPolicyRule(digest, rule)
if toMatch {
t.Logf("Added matching digest %#+v", digest)
if !thisMatches {
t.Errorf("Fail in check: rule %s does not match digest %#+v", fcfmt.Fmt(rule), digest)
}
} else {
t.Logf("Added skipping digest %#+v", digest)
if thisMatches {
t.Errorf("Fail in check: rule %s matches digest %#+v", fcfmt.Fmt(rule), digest)
}
}
}
}
return ans
}
var uCounter uint32 = 1
func uniqify(in RequestDigest) RequestDigest {
u1 := in.User.(*user.DefaultInfo)
u2 := *u1
u2.Extra = map[string][]string{"u": {fmt.Sprintf("z%d", atomic.AddUint32(&uCounter, 1))}}
return RequestDigest{User: &u2, RequestInfo: in.RequestInfo}
}
// genSubject returns a randomly generated valid Subject that matches
// on some name(s) starting with the given prefix. The first returned
// list contains members that match the generated Subject and involve
// names that begin with the given prefix. The second returned list
// contains members that mismatch the generated Subject and involve
// names that begin with the given prefix.
func genSubject(rng *rand.Rand, pfx string) (fcv1a1.Subject, []user.Info, []user.Info) {
subject := fcv1a1.Subject{}
var matchingUIs, skippingUIs []user.Info
x := rng.Float32()
switch {
case x < 0.33:
subject.Kind = fcv1a1.SubjectKindUser
subject.User, matchingUIs, skippingUIs = genUser(rng, pfx)
case x < 0.67:
subject.Kind = fcv1a1.SubjectKindGroup
subject.Group, matchingUIs, skippingUIs = genGroup(rng, pfx)
default:
subject.Kind = fcv1a1.SubjectKindServiceAccount
subject.ServiceAccount, matchingUIs, skippingUIs = genServiceAccount(rng, pfx)
}
return subject, matchingUIs, skippingUIs
}
func genUser(rng *rand.Rand, pfx string) (*fcv1a1.UserSubject, []user.Info, []user.Info) {
mui := &user.DefaultInfo{
Name: pfx + "-u",
UID: "good-id",
Groups: []string{pfx + "-g1", mg(rng), pfx + "-g2"},
Extra: noextra}
skips := []user.Info{&user.DefaultInfo{
Name: mui.Name + "x",
UID: mui.UID,
Groups: mui.Groups,
Extra: mui.Extra}}
return &fcv1a1.UserSubject{mui.Name}, []user.Info{mui}, skips
}
var groupCover = []string{"system:authenticated", "system:unauthenticated"}
func mg(rng *rand.Rand) string {
return groupCover[rng.Intn(len(groupCover))]
}
func mkUserSubject(username string) fcv1a1.Subject {
return fcv1a1.Subject{
Kind: fcv1a1.SubjectKindUser,
User: &fcv1a1.UserSubject{username},
}
}
func mkGroupSubject(group string) fcv1a1.Subject {
return fcv1a1.Subject{
Kind: fcv1a1.SubjectKindGroup,
Group: &fcv1a1.GroupSubject{group},
}
}
func genGroup(rng *rand.Rand, pfx string) (*fcv1a1.GroupSubject, []user.Info, []user.Info) {
name := pfx + "-g"
ui := &user.DefaultInfo{
Name: pfx + "-u",
UID: "good-id",
Groups: []string{name},
Extra: noextra}
if rng.Intn(2) == 0 {
ui.Groups = append([]string{mg(rng)}, ui.Groups...)
} else {
ui.Groups = append(ui.Groups, mg(rng))
}
if rng.Intn(3) == 0 {
ui.Groups = append([]string{pfx + "-h"}, ui.Groups...)
}
if rng.Intn(3) == 0 {
ui.Groups = append(ui.Groups, pfx+"-i")
}
skipper := &user.DefaultInfo{
Name: pfx + "-u",
UID: "bad-id",
Groups: []string{pfx + "-j", mg(rng)},
Extra: noextra}
if rng.Intn(2) == 0 {
skipper.Groups = append(skipper.Groups, pfx+"-k")
}
return &fcv1a1.GroupSubject{name}, []user.Info{ui}, []user.Info{skipper}
}
func genServiceAccount(rng *rand.Rand, pfx string) (*fcv1a1.ServiceAccountSubject, []user.Info, []user.Info) {
ns := pfx + "-ns"
name := pfx + "-n"
mname := name
if rng.Float32() < 0.05 {
mname = "*"
}
mui := &user.DefaultInfo{
Name: fmt.Sprintf("system:serviceaccount:%s:%s", ns, name),
UID: "good-id",
Groups: []string{pfx + "-g1", mg(rng), pfx + "-g2"},
Extra: noextra}
var skips []user.Info
if mname == "*" || rng.Intn(2) == 0 {
skips = []user.Info{&user.DefaultInfo{
Name: fmt.Sprintf("system:serviceaccount:%sx:%s", ns, name),
UID: "bad-id",
Groups: mui.Groups,
Extra: mui.Extra}}
} else {
skips = []user.Info{&user.DefaultInfo{
Name: fmt.Sprintf("system:serviceaccount:%s:%sx", ns, name),
UID: "bad-id",
Groups: mui.Groups,
Extra: mui.Extra}}
}
return &fcv1a1.ServiceAccountSubject{Namespace: ns, Name: mname}, []user.Info{mui}, skips
}
// genResourceRule randomly generates a valid ResourcePolicyRule and lists
// of matching and non-matching `*request.RequestInfo`.
func genResourceRule(rng *rand.Rand, pfx string, mayMatchClusterScope, matchAllResources, someMatchesAllResources bool) (fcv1a1.ResourcePolicyRule, []*request.RequestInfo, []*request.RequestInfo) {
namespaces := []string{pfx + "-n1", pfx + "-n2", pfx + "-n3"}
rnamespaces := namespaces
if mayMatchClusterScope && rng.Float32() < 0.1 {
namespaces[0] = ""
rnamespaces = namespaces[1:]
}
rr := fcv1a1.ResourcePolicyRule{
Verbs: []string{pfx + "-v1", pfx + "-v2", pfx + "-v3"},
APIGroups: []string{pfx + ".g1", pfx + ".g2", pfx + ".g3"},
Resources: []string{pfx + "-r1s", pfx + "-r2s", pfx + "-r3s"},
ClusterScope: namespaces[0] == "",
Namespaces: rnamespaces}
matchingRIs := genRRIs(rng, 3, rr.Verbs, rr.APIGroups, rr.Resources, namespaces)
var skippingRIs []*request.RequestInfo
if !someMatchesAllResources {
skipNSs := []string{pfx + "-n4", pfx + "-n5", pfx + "-n6"}
if mayMatchClusterScope && rr.Namespaces[0] != "" && rng.Float32() < 0.1 {
skipNSs[0] = ""
}
skippingRIs = genRRIs(rng, 3,
[]string{pfx + "-v4", pfx + "-v5", pfx + "-v6"},
[]string{pfx + ".g4", pfx + ".g5", pfx + ".g6"},
[]string{pfx + "-r4s", pfx + "-r5s", pfx + "-r6s"},
skipNSs)
}
// choose a proper subset of fields to wildcard; only matters if not matching all
starMask := rng.Intn(15)
if matchAllResources || starMask&1 == 1 && rng.Float32() < 0.1 {
rr.Verbs = []string{fcv1a1.VerbAll}
}
if matchAllResources || starMask&2 == 2 && rng.Float32() < 0.1 {
rr.APIGroups = []string{fcv1a1.APIGroupAll}
}
if matchAllResources || starMask&4 == 4 && rng.Float32() < 0.1 {
rr.Resources = []string{fcv1a1.ResourceAll}
}
if matchAllResources || starMask&8 == 8 && rng.Float32() < 0.1 {
rr.ClusterScope = true
rr.Namespaces = []string{fcv1a1.NamespaceEvery}
}
return rr, matchingRIs, skippingRIs
}
func genRRIs(rng *rand.Rand, m int, verbs, apiGroups, resources, namespaces []string) []*request.RequestInfo {
nv := len(verbs)
ng := len(apiGroups)
nr := len(resources)
nn := len(namespaces)
coords := chooseInts(rng, nv*ng*nr*nn, m)
ans := make([]*request.RequestInfo, 0, m)
for _, coord := range coords {
ans = append(ans, &request.RequestInfo{
IsResourceRequest: true,
Verb: verbs[coord%nv],
APIGroup: apiGroups[coord/nv%ng],
Resource: resources[coord/nv/ng%nr],
Namespace: namespaces[coord/nv/ng/nr]})
}
return ans
}
func genNRRIs(rng *rand.Rand, m int, verbs, urls []string) []*request.RequestInfo {
nv := len(verbs)
nu := len(urls)
coords := chooseInts(rng, nv*nu, m)
ans := make([]*request.RequestInfo, 0, m)
for _, coord := range coords {
ans = append(ans, &request.RequestInfo{
IsResourceRequest: false,
Verb: verbs[coord%nv],
Path: urls[coord/nv]})
}
return ans
}
func chooseInts(rng *rand.Rand, n, m int) []int {
ans := sets.NewInt()
for len(ans) < m {
i := rng.Intn(n)
if ans.Has(i) {
continue
}
ans.Insert(i)
}
return ans.List()
}
// genNonResourceRule returns a randomly generated valid
// NonResourcePolicyRule and lists of matching and non-matching
// `*request.RequestInfo`.
func genNonResourceRule(rng *rand.Rand, pfx string, matchAllNonResources, someMatchesAllNonResources bool) (fcv1a1.NonResourcePolicyRule, []*request.RequestInfo, []*request.RequestInfo) {
nrr := fcv1a1.NonResourcePolicyRule{
Verbs: []string{pfx + "-v1", pfx + "-v2", pfx + "-v3"},
NonResourceURLs: []string{"/" + pfx + "/p1", "/" + pfx + "/p2", "/" + pfx + "/p3"},
}
matchingRIs := genNRRIs(rng, 3, nrr.Verbs, nrr.NonResourceURLs)
var skippingRIs []*request.RequestInfo
if !someMatchesAllNonResources {
skippingRIs = genNRRIs(rng, 3,
[]string{pfx + "-v4", pfx + "-v5", pfx + "-v6"},
[]string{"/" + pfx + "/p4", "/" + pfx + "/p5", "/" + pfx + "/p6"})
}
// choose a proper subset of fields to consider wildcarding; only matters if not matching all
starMask := rng.Intn(3)
if matchAllNonResources || starMask&1 == 1 && rng.Float32() < 0.1 {
nrr.Verbs = []string{fcv1a1.VerbAll}
}
if matchAllNonResources || starMask&2 == 2 && rng.Float32() < 0.1 {
nrr.NonResourceURLs = []string{"*"}
}
return nrr, matchingRIs, skippingRIs
}
func pickSetString(rng *rand.Rand, set sets.String) string {
i, n := 0, rng.Intn(len(set))
for s := range set {
if i == n {
return s
}
i++
}
panic("empty set")
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"fmt"
"math/rand"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
)
func TestMatching(t *testing.T) {
checkFTR(t, mandFTRExempt)
checkFTR(t, mandFTRCatchAll)
rngOuter := rand.New(rand.NewSource(42))
goodPLNames := sets.NewString("pl1", "pl2", "pl3", "pl4", "pl5")
badPLNames := sets.NewString("ql1", "ql2", "ql3", "ql4", "ql5")
for i := 0; i < 300; i++ {
rng := rand.New(rand.NewSource(int64(rngOuter.Uint64())))
t.Run(fmt.Sprintf("trial%d:", i), func(t *testing.T) {
ftr := genFS(t, rng, fmt.Sprintf("fs%d", i), rng.Float32() < 0.2, goodPLNames, badPLNames)
checkFTR(t, ftr)
})
}
}
func checkFTR(t *testing.T, ftr *fsTestingRecord) {
for expectMatch, digests1 := range ftr.digests {
t.Logf("%s.digests[%v] = %#+v", ftr.fs.Name, expectMatch, digests1)
for _, digests2 := range digests1 {
for _, digest := range digests2 {
actualMatch := matchesFlowSchema(digest, ftr.fs)
if expectMatch != actualMatch {
t.Errorf("Fail for %s vs %#+v: expectedMatch=%v, actualMatch=%v", fcfmt.Fmt(ftr.fs), digest, expectMatch, actualMatch)
}
}
}
}
}
func TestPolicyRules(t *testing.T) {
rngOuter := rand.New(rand.NewSource(42))
for i := 0; i < 300; i++ {
rng := rand.New(rand.NewSource(int64(rngOuter.Uint64())))
t.Run(fmt.Sprintf("trial%d:", i), func(t *testing.T) {
r := rng.Float32()
n := rng.Float32()
policyRule, matchingRDigests, matchingNDigests, skippingRDigests, skippingNDigests := genPolicyRuleWithSubjects(t, rng, fmt.Sprintf("t%d", i), rng.Float32() < 0.2, r < 0.10, n < 0.10, r < 0.05, n < 0.05)
t.Logf("policyRule=%s, mrd=%#+v, mnd=%#+v, srd=%#+v, snd=%#+v", fcfmt.Fmt(policyRule), matchingRDigests, matchingNDigests, skippingRDigests, skippingNDigests)
for _, digest := range append(matchingRDigests, matchingNDigests...) {
if !matchesPolicyRule(digest, &policyRule) {
t.Errorf("Fail: expected %s to match %#+v but it did not", fcfmt.Fmt(policyRule), digest)
}
}
for _, digest := range append(skippingRDigests, skippingNDigests...) {
if matchesPolicyRule(digest, &policyRule) {
t.Errorf("Fail: expected %s to not match %#+v but it did", fcfmt.Fmt(policyRule), digest)
}
}
})
}
}

View File

@ -0,0 +1,191 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"strings"
fctypesv1a1 "k8s.io/api/flowcontrol/v1alpha1"
"k8s.io/apiserver/pkg/authentication/serviceaccount"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
)
// Tests whether a given request and FlowSchema match. Nobody mutates
// either input.
func matchesFlowSchema(digest RequestDigest, flowSchema *fctypesv1a1.FlowSchema) bool {
for _, policyRule := range flowSchema.Spec.Rules {
if matchesPolicyRule(digest, &policyRule) {
return true
}
}
return false
}
func matchesPolicyRule(digest RequestDigest, policyRule *fctypesv1a1.PolicyRulesWithSubjects) bool {
if !matchesASubject(digest.User, policyRule.Subjects) {
return false
}
if digest.RequestInfo.IsResourceRequest {
return matchesAResourceRule(digest.RequestInfo, policyRule.ResourceRules)
}
return matchesANonResourceRule(digest.RequestInfo, policyRule.NonResourceRules)
}
func matchesASubject(user user.Info, subjects []fctypesv1a1.Subject) bool {
for _, subject := range subjects {
if matchesSubject(user, subject) {
return true
}
}
return false
}
func matchesSubject(user user.Info, subject fctypesv1a1.Subject) bool {
switch subject.Kind {
case fctypesv1a1.SubjectKindUser:
return subject.User != nil && (subject.User.Name == fctypesv1a1.NameAll || subject.User.Name == user.GetName())
case fctypesv1a1.SubjectKindGroup:
if subject.Group == nil {
return false
}
seek := subject.Group.Name
if seek == "*" {
return true
}
for _, userGroup := range user.GetGroups() {
if userGroup == seek {
return true
}
}
return false
case fctypesv1a1.SubjectKindServiceAccount:
if subject.ServiceAccount == nil {
return false
}
if subject.ServiceAccount.Name == fctypesv1a1.NameAll {
return serviceAccountMatchesNamespace(subject.ServiceAccount.Namespace, user.GetName())
}
return serviceaccount.MatchesUsername(subject.ServiceAccount.Namespace, subject.ServiceAccount.Name, user.GetName())
default:
return false
}
}
// serviceAccountMatchesNamespace checks whether the provided service account username matches the namespace, without
// allocating. Use this when checking a service account namespace against a known string.
// This is copied from `k8s.io/apiserver/pkg/authentication/serviceaccount::MatchesUsername` and simplified to not check the name part.
func serviceAccountMatchesNamespace(namespace string, username string) bool {
const (
ServiceAccountUsernamePrefix = "system:serviceaccount:"
ServiceAccountUsernameSeparator = ":"
)
if !strings.HasPrefix(username, ServiceAccountUsernamePrefix) {
return false
}
username = username[len(ServiceAccountUsernamePrefix):]
if !strings.HasPrefix(username, namespace) {
return false
}
username = username[len(namespace):]
return strings.HasPrefix(username, ServiceAccountUsernameSeparator)
}
func matchesAResourceRule(ri *request.RequestInfo, rules []fctypesv1a1.ResourcePolicyRule) bool {
for _, rr := range rules {
if matchesResourcePolicyRule(ri, rr) {
return true
}
}
return false
}
func matchesResourcePolicyRule(ri *request.RequestInfo, policyRule fctypesv1a1.ResourcePolicyRule) bool {
if !matchPolicyRuleVerb(policyRule.Verbs, ri.Verb) {
return false
}
if !matchPolicyRuleResource(policyRule.Resources, ri.Resource, ri.Subresource) {
return false
}
if !matchPolicyRuleAPIGroup(policyRule.APIGroups, ri.APIGroup) {
return false
}
if len(ri.Namespace) == 0 {
return policyRule.ClusterScope
}
return containsString(ri.Namespace, policyRule.Namespaces, fctypesv1a1.NamespaceEvery)
}
func matchesANonResourceRule(ri *request.RequestInfo, rules []fctypesv1a1.NonResourcePolicyRule) bool {
for _, rr := range rules {
if matchesNonResourcePolicyRule(ri, rr) {
return true
}
}
return false
}
func matchesNonResourcePolicyRule(ri *request.RequestInfo, policyRule fctypesv1a1.NonResourcePolicyRule) bool {
if !matchPolicyRuleVerb(policyRule.Verbs, ri.Verb) {
return false
}
return matchPolicyRuleNonResourceURL(policyRule.NonResourceURLs, ri.Path)
}
func matchPolicyRuleVerb(policyRuleVerbs []string, requestVerb string) bool {
return containsString(requestVerb, policyRuleVerbs, fctypesv1a1.VerbAll)
}
func matchPolicyRuleNonResourceURL(policyRuleRequestURLs []string, requestPath string) bool {
return containsString(requestPath, policyRuleRequestURLs, fctypesv1a1.NonResourceAll)
}
func matchPolicyRuleAPIGroup(policyRuleAPIGroups []string, requestAPIGroup string) bool {
return containsString(requestAPIGroup, policyRuleAPIGroups, fctypesv1a1.APIGroupAll)
}
func rsJoin(requestResource, requestSubresource string) string {
seekString := requestResource
if requestSubresource != "" {
seekString = requestResource + "/" + requestSubresource
}
return seekString
}
func matchPolicyRuleResource(policyRuleRequestResources []string, requestResource, requestSubresource string) bool {
return containsString(rsJoin(requestResource, requestSubresource), policyRuleRequestResources, fctypesv1a1.ResourceAll)
}
// containsString returns true if either `x` or `wildcard` is in
// `list`. The wildcard is not a pattern to match against `x`; rather
// the presence of the wildcard in the list is the caller's way of
// saying that all values of `x` should match the list. This function
// assumes that if `wildcard` is in `list` then it is the only member
// of the list, which is enforced by validation.
func containsString(x string, list []string, wildcard string) bool {
if len(list) == 1 && list[0] == wildcard {
return true
}
for _, y := range list {
if x == y {
return true
}
}
return false
}