apf: ensure exempt request notes the classification

Kubernetes-commit: 8b2dd74c277d6a56a14e99830d39b23c5788c62e
This commit is contained in:
Abu Kashem 2021-12-05 11:29:15 -05:00 committed by Kubernetes Publisher
parent ca9ab58ee1
commit 1d83e4074a
5 changed files with 49 additions and 19 deletions

View File

@ -101,7 +101,7 @@ func WithPriorityAndFairness(
}
var classification *PriorityAndFairnessClassification
estimateWork := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) flowcontrolrequest.WorkEstimate {
noteFn := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
classification = &PriorityAndFairnessClassification{
FlowSchemaName: fs.Name,
FlowSchemaUID: fs.UID,
@ -111,7 +111,19 @@ func WithPriorityAndFairness(
httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
httplog.AddKeyValue(ctx, "apf_fd", truncateLogField(flowDistinguisher))
return workEstimator(r, fs.Name, pl.Name)
}
// estimateWork is called, if at all, after noteFn
estimateWork := func() flowcontrolrequest.WorkEstimate {
if classification == nil {
// workEstimator is being invoked before classification of
// the request has completed, we should never be here though.
klog.ErrorS(fmt.Errorf("workEstimator is being invoked before classification of the request has completed"),
"Using empty FlowSchema and PriorityLevelConfiguration name", "verb", r.Method, "URI", r.RequestURI)
return workEstimator(r, "", "")
}
return workEstimator(r, classification.FlowSchemaName, classification.PriorityLevelName)
}
var served bool
@ -235,7 +247,7 @@ func WithPriorityAndFairness(
// Note that Handle will return irrespective of whether the request
// executes or is rejected. In the latter case, the function will return
// without calling the passed `execute` function.
fcIfc.Handle(handleCtx, digest, estimateWork, queueNote, execute)
fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
}()
select {
@ -266,7 +278,7 @@ func WithPriorityAndFairness(
handler.ServeHTTP(w, r)
}
fcIfc.Handle(ctx, digest, estimateWork, queueNote, execute)
fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
}
if !served {

View File

@ -87,14 +87,15 @@ func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) {
func (t fakeApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn,
execFn func(),
) {
if t.mockDecision == decisionSkipFilter {
panic("Handle should not be invoked")
}
workEstimator(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
switch t.mockDecision {
case decisionNoQueuingExecute:
execFn()
@ -390,7 +391,8 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
func (f *fakeWatchApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
_ func() fcrequest.WorkEstimate,
_ fq.QueueNoteFn,
execFn func(),
) {
@ -640,11 +642,13 @@ type fakeFilterRequestDigest struct {
func (f *fakeFilterRequestDigest) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
_ fq.QueueNoteFn, _ func(),
) {
f.requestDigestGot = &requestDigest
f.workEstimateGot = workEstimator(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "")
noteFn(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "")
f.workEstimateGot = workEstimator()
}
func TestApfWithRequestDigest(t *testing.T) {

View File

@ -799,7 +799,10 @@ func (immediateRequest) Finish(execute func()) bool {
// The returned bool indicates whether the request is exempt from
// limitation. The startWaitingTime is when the request started
// waiting in its queue, or `Time{}` if this did not happen.
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
klog.V(7).Infof("startRequest(%#+v)", rd)
cfgCtlr.lock.RLock()
defer cfgCtlr.lock.RUnlock()
@ -830,6 +833,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
plState := cfgCtlr.priorityLevelStates[plName]
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
noteFn(selectedFlowSchema, plState.pl, "")
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
}
@ -843,7 +847,10 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
}
workEstimate := workEstimator(selectedFlowSchema, plState.pl, flowDistinguisher)
noteFn(selectedFlowSchema, plState.pl, flowDistinguisher)
workEstimate := workEstimator()
startWaitingTime = time.Now()
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)

View File

@ -42,10 +42,14 @@ const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer-
// Interface defines how the API Priority and Fairness filter interacts with the underlying system.
type Interface interface {
// Handle takes care of queuing and dispatching a request
// characterized by the given digest. The given `workEstimator` will be
// invoked with the results of request classification and must return the
// work parameters for the request. If the
// request is queued then `queueNoteFn` will be called twice,
// characterized by the given digest. The given `noteFn` will be
// invoked with the results of request classification.
// The given `workEstimator` is called, if at all, after noteFn.
// `workEstimator` will be invoked only when the request
// is classified as non 'exempt'.
// 'workEstimator', when invoked, must return the
// work parameters for the request.
// If the request is queued then `queueNoteFn` will be called twice,
// first with `true` and then with `false`; otherwise
// `queueNoteFn` will not be called at all. If Handle decides
// that the request should be executed then `execute()` will be
@ -55,7 +59,8 @@ type Interface interface {
// ctx is cancelled or times out.
Handle(ctx context.Context,
requestDigest RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn,
execFn func(),
)
@ -150,10 +155,11 @@ func NewTestable(config TestableConfig) Interface {
}
func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn,
execFn func()) {
fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, workEstimator, queueNoteFn)
fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, noteFn, workEstimator, queueNoteFn)
queued := startWaitingTime != time.Time{}
if req == nil {
if queued {

View File

@ -462,7 +462,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
startWG.Add(1)
go func(matches, isResource bool, rdu RequestDigest) {
expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) fcrequest.WorkEstimate {
ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) {
matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt
if testDebugLogs {
t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
@ -475,6 +475,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name)
}
}
}, func() fcrequest.WorkEstimate {
return fcrequest.WorkEstimate{InitialSeats: 1}
}, func(inQueue bool) {
}, func() {