Make QueueSet support exempt behavior; use it
Signed-off-by: Mike Spreitzer <mspreitz@us.ibm.com>

Kubernetes-commit: f269acd12b225f6a2dbbfae64a475f73f448b918

parent 4cf166b68a, commit 078694d35d
@@ -197,16 +197,15 @@ type priorityLevelState struct {
 	pl *flowcontrol.PriorityLevelConfiguration
 
 	// qsCompleter holds the QueueSetCompleter derived from `config`
-	// and `queues` if config is not exempt, nil otherwise.
+	// and `queues`.
 	qsCompleter fq.QueueSetCompleter
 
-	// The QueueSet for this priority level.  This is nil if and only
-	// if the priority level is exempt.
+	// The QueueSet for this priority level.
+	// Never nil.
 	queues fq.QueueSet
 
 	// quiescing==true indicates that this priority level should be
-	// removed when its queues have all drained.  May be true only if
-	// queues is non-nil.
+	// removed when its queues have all drained.
 	quiescing bool
 
 	// number of goroutines between Controller::Match and calling the
@@ -384,9 +383,6 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
 	items := make([]allocProblemItem, 0, len(plStates))
 	plNames := make([]string, 0, len(plStates))
 	for plName, plState := range plStates {
-		if plState.pl.Spec.Limited == nil {
-			continue
-		}
 		obs := plState.seatDemandIntegrator.Reset()
 		plState.seatDemandStats.update(obs)
 		// Lower bound on this priority level's adjusted concurreny limit is the lesser of:
@@ -403,7 +399,7 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
 		})
 	}
 	if len(items) == 0 && cfgCtlr.nominalCLSum > 0 {
-		klog.ErrorS(nil, "Impossible: no non-exempt priority levels", "plStates", cfgCtlr.priorityLevelStates)
+		klog.ErrorS(nil, "Impossible: no priority levels", "plStates", cfgCtlr.priorityLevelStates)
 		return
 	}
 	allocs, fairFrac, err := computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items)
@@ -412,17 +408,11 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
 		allocs = make([]float64, len(items))
 		for idx, plName := range plNames {
 			plState := plStates[plName]
-			if plState.pl.Spec.Limited == nil {
-				continue
-			}
 			allocs[idx] = float64(plState.currentCL)
 		}
 	}
 	for idx, plName := range plNames {
 		plState := plStates[plName]
-		if plState.pl.Spec.Limited == nil {
-			continue
-		}
 		if setCompleters {
 			qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
 				plState.pl, cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
@@ -441,8 +431,15 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
 		if relChange >= 0.05 {
 			logLevel = 2
 		}
-		klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "backstop", err != nil)
-		plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL})
+		var concurrencyDenominator int
+		if currentCL > 0 {
+			concurrencyDenominator = currentCL
+		} else {
+			concurrencyDenominator = int(math.Max(1, math.Round(float64(cfgCtlr.serverConcurrencyLimit)/10)))
+		}
+		plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyDenominator))
+		klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "concurrencyDenominator", concurrencyDenominator, "backstop", err != nil)
+		plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL, ConcurrencyDenominator: concurrencyDenominator})
 	}
 	metrics.SetFairFrac(float64(fairFrac))
 }
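The fallback in the new concurrencyDenominator logic is worth restating on its own: an exempt level runs with currentCL == 0, and a ratioed metric with a zero denominator would be meaningless, so the denominator falls back to roughly a tenth of the server-wide concurrency limit, floored at 1. The sketch below is ours, not code from the commit; the helper name pickConcurrencyDenominator is hypothetical.

package main

import (
	"fmt"
	"math"
)

// pickConcurrencyDenominator (hypothetical name) restates the rule added to
// updateBorrowingLocked above.
func pickConcurrencyDenominator(currentCL, serverConcurrencyLimit int) int {
	if currentCL > 0 {
		return currentCL
	}
	return int(math.Max(1, math.Round(float64(serverConcurrencyLimit)/10)))
}

func main() {
	fmt.Println(pickConcurrencyDenominator(25, 600)) // 25: a limited level keeps its own limit
	fmt.Println(pickConcurrencyDenominator(0, 600))  // 60: an exempt level gets ~10% of the server limit
}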
@@ -690,9 +687,8 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
 			klog.V(3).Infof("Priority level %q was undesired and has become desired again", pl.Name)
 			state.quiescing = false
 		}
-		if state.pl.Spec.Limited != nil {
-			meal.shareSum += float64(state.pl.Spec.Limited.NominalConcurrencyShares)
-		}
+		nominalConcurrencyShares, _, _ := plSpecCommons(state.pl)
+		meal.shareSum += float64(nominalConcurrencyShares)
 		meal.haveExemptPL = meal.haveExemptPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt
 		meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameCatchAll
 	}
@@ -765,15 +761,15 @@ func (meal *cfgMeal) processOldPLsLocked() {
 			continue
 		}
 		if plName == flowcontrol.PriorityLevelConfigurationNameExempt && !meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll && !meal.haveCatchAllPL {
-			// BTW, we know the Spec has not changed because the
-			// mandatory objects have immutable Specs
+			// BTW, we know the Spec has not changed what it says about queuing because the
+			// mandatory objects have immutable Specs as far as queuing is concerned.
 			klog.V(3).Infof("Retaining mandatory priority level %q despite lack of API object", plName)
 		} else {
-			if plState.queues == nil || plState.numPending == 0 && plState.queues.IsIdle() {
-				// Either there are no queues or they are done
+			if plState.numPending == 0 && plState.queues.IsIdle() {
+				// The QueueSet is done
 				// draining and no use is coming from another
 				// goroutine
-				klog.V(3).Infof("Removing undesired priority level %q (nilQueues=%v), Type=%v", plName, plState.queues == nil, plState.pl.Spec.Type)
+				klog.V(3).Infof("Removing undesired priority level %q, Type=%v", plName, plState.pl.Spec.Type)
 				continue
 			}
 			if !plState.quiescing {
@@ -789,15 +785,14 @@ func (meal *cfgMeal) processOldPLsLocked() {
 			// This can not happen because queueSetCompleterForPL already approved this config
 			panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
 		}
-		if plState.pl.Spec.Limited != nil {
-			// We deliberately include the lingering priority levels
-			// here so that their queues get some concurrency and they
-			// continue to drain.  During this interim a lingering
-			// priority level continues to get a concurrency
-			// allocation determined by all the share values in the
-			// regular way.
-			meal.shareSum += float64(plState.pl.Spec.Limited.NominalConcurrencyShares)
-		}
+		// We deliberately include the lingering priority levels
+		// here so that their queues get some concurrency and they
+		// continue to drain.  During this interim a lingering
+		// priority level continues to get a concurrency
+		// allocation determined by all the share values in the
+		// regular way.
+		nominalConcurrencyShares, _, _ := plSpecCommons(plState.pl)
+		meal.shareSum += float64(nominalConcurrencyShares)
 		meal.haveExemptPL = meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameExempt
 		meal.haveCatchAllPL = meal.haveCatchAllPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll
 		meal.newPLStates[plName] = plState
@@ -809,41 +804,35 @@ func (meal *cfgMeal) processOldPLsLocked() {
 // QueueSets.
 func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 	for plName, plState := range meal.newPLStates {
-		if plState.pl.Spec.Limited == nil {
-			klog.V(5).Infof("Using exempt priority level %q: quiescing=%v", plName, plState.quiescing)
-			continue
-		}
-
-		limited := plState.pl.Spec.Limited
+		nominalConcurrencyShares, lendablePercent, borrowingLimitPercent := plSpecCommons(plState.pl)
 		// The use of math.Ceil here means that the results might sum
 		// to a little more than serverConcurrencyLimit but the
 		// difference will be negligible.
-		concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(limited.NominalConcurrencyShares) / meal.shareSum))
+		concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(nominalConcurrencyShares) / meal.shareSum))
 		var lendableCL, borrowingCL int
-		if limited.LendablePercent != nil {
-			lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.LendablePercent) / 100))
+		if lendablePercent != nil {
+			lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*lendablePercent) / 100))
 		}
-		if limited.BorrowingLimitPercent != nil {
-			borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.BorrowingLimitPercent) / 100))
+		if borrowingLimitPercent != nil {
+			borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*borrowingLimitPercent) / 100))
 		} else {
 			borrowingCL = meal.cfgCtlr.serverConcurrencyLimit
 		}
+
 		metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit, concurrencyLimit-lendableCL, concurrencyLimit+borrowingCL)
-		plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyLimit))
 		cfgChanged := plState.nominalCL != concurrencyLimit || plState.minCL != concurrencyLimit-lendableCL || plState.maxCL != concurrencyLimit+borrowingCL
 		plState.nominalCL = concurrencyLimit
 		plState.minCL = concurrencyLimit - lendableCL
 		plState.maxCL = concurrencyLimit + borrowingCL
 		meal.maxExecutingRequests += concurrencyLimit
-		var waitLimit int
-		if qCfg := limited.LimitResponse.Queuing; qCfg != nil {
-			waitLimit = int(qCfg.Queues * qCfg.QueueLengthLimit)
+		if limited := plState.pl.Spec.Limited; limited != nil {
+			if qCfg := limited.LimitResponse.Queuing; qCfg != nil {
+				meal.maxWaitingRequests += int(qCfg.Queues * qCfg.QueueLengthLimit)
+			}
 		}
-		meal.maxWaitingRequests += waitLimit
-
 		if plState.queues == nil {
 			initialCL := concurrencyLimit - lendableCL/2
-			klog.V(2).Infof("Introducing queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, initialCL, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
+			klog.V(2).Infof("Introducing queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, initialCL, plState.quiescing, nominalConcurrencyShares, meal.shareSum)
 			plState.seatDemandStats = seatDemandStats{}
 			plState.currentCL = initialCL
 		} else {
@@ -851,7 +840,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 			if cfgChanged {
 				logLevel = 2
 			}
-			klog.V(logLevel).Infof("Retaining queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, plState.currentCL, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
+			klog.V(logLevel).Infof("Retaining queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, plState.currentCL, plState.quiescing, plState.numPending, nominalConcurrencyShares, meal.shareSum)
 		}
 	}
 	meal.cfgCtlr.nominalCLSum = meal.maxExecutingRequests
@@ -859,32 +848,32 @@
 }
 
 // queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
-// given priority level configuration.  Returns nil if that config
-// does not call for limiting.  Returns nil and an error if the given
+// given priority level configuration.  Returns nil and an error if the given
 // object is malformed in a way that is a problem for this package.
 func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
-	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
-		return nil, errors.New("broken union structure at the top")
+	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementLimited) != (pl.Spec.Limited != nil) {
+		return nil, errors.New("broken union structure at the top, for Limited")
 	}
 	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt) {
 		// This package does not attempt to cope with a priority level dynamically switching between exempt and not.
 		return nil, errors.New("non-alignment between name and type")
 	}
-	if pl.Spec.Limited == nil {
-		return nil, nil
-	}
-	if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
-		return nil, errors.New("broken union structure for limit response")
-	}
-	qcAPI := pl.Spec.Limited.LimitResponse.Queuing
 	qcQS := fq.QueuingConfig{Name: pl.Name}
-	if qcAPI != nil {
-		qcQS = fq.QueuingConfig{Name: pl.Name,
-			DesiredNumQueues: int(qcAPI.Queues),
-			QueueLengthLimit: int(qcAPI.QueueLengthLimit),
-			HandSize:         int(qcAPI.HandSize),
-			RequestWaitLimit: requestWaitLimit,
-		}
+	if pl.Spec.Limited != nil {
+		if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
+			return nil, errors.New("broken union structure for limit response")
+		}
+		qcAPI := pl.Spec.Limited.LimitResponse.Queuing
+		if qcAPI != nil {
+			qcQS = fq.QueuingConfig{Name: pl.Name,
+				DesiredNumQueues: int(qcAPI.Queues),
+				QueueLengthLimit: int(qcAPI.QueueLengthLimit),
+				HandSize:         int(qcAPI.HandSize),
+				RequestWaitLimit: requestWaitLimit,
+			}
+		}
+	} else {
+		qcQS = fq.QueuingConfig{Name: pl.Name, DesiredNumQueues: -1}
 	}
 	var qsc fq.QueueSetCompleter
 	var err error
@@ -894,7 +883,7 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
 		qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandGauge)
 	}
 	if err != nil {
-		err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
+		err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcQS, err)
 	}
 	return qsc, err
 }
@@ -962,13 +951,6 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re
 	}
 }
 
-type immediateRequest struct{}
-
-func (immediateRequest) Finish(execute func()) bool {
-	execute()
-	return false
-}
-
 // startRequest classifies and, if appropriate, enqueues the request.
 // Returns a nil Request if and only if the request is to be rejected.
 // The returned bool indicates whether the request is exempt from
@@ -1007,32 +989,31 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
 	}
 	plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
 	plState := cfgCtlr.priorityLevelStates[plName]
-	if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
-		noteFn(selectedFlowSchema, plState.pl, "")
-		klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
-		return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
-	}
 	var numQueues int32
-	if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
-		numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
-	}
-	var flowDistinguisher string
 	var hashValue uint64
-	if numQueues > 1 {
-		flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
-		hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
+	var flowDistinguisher string
+	if plState.pl.Spec.Type != flowcontrol.PriorityLevelEnablementExempt {
+		if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
+			numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
+		}
+		if numQueues > 1 {
+			flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
+			hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
+		}
 	}
 
 	noteFn(selectedFlowSchema, plState.pl, flowDistinguisher)
 	workEstimate := workEstimator()
 
-	startWaitingTime = cfgCtlr.clock.Now()
+	if plState.pl.Spec.Type != flowcontrol.PriorityLevelEnablementExempt {
+		startWaitingTime = cfgCtlr.clock.Now()
+	}
 	klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
 	req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
 	if idle {
 		cfgCtlr.maybeReapReadLocked(plName, plState)
 	}
-	return selectedFlowSchema, plState.pl, false, req, startWaitingTime
+	return selectedFlowSchema, plState.pl, plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt, req, startWaitingTime
 }
 
 // maybeReap will remove the last internal traces of the named
@@ -1046,10 +1027,6 @@ func (cfgCtlr *configController) maybeReap(plName string) {
 		klog.V(7).Infof("plName=%s, plState==nil", plName)
 		return
 	}
-	if plState.queues == nil {
-		klog.V(7).Infof("plName=%s, plState.queues==nil", plName)
-		return
-	}
 	useless := plState.quiescing && plState.numPending == 0 && plState.queues.IsIdle()
 	klog.V(7).Infof("plState.quiescing=%v, plState.numPending=%d, useless=%v", plState.quiescing, plState.numPending, useless)
 	if !useless {
@@ -1107,3 +1084,12 @@ func relDiff(x, y float64) float64 {
 	}
 	return diff / den
 }
+
+// plSpecCommons returns the (NominalConcurrencyShares, LendablePercent, BorrowingLimitPercent) of the given priority level config
+func plSpecCommons(pl *flowcontrol.PriorityLevelConfiguration) (int32, *int32, *int32) {
+	if limiter := pl.Spec.Limited; limiter != nil {
+		return limiter.NominalConcurrencyShares, limiter.LendablePercent, limiter.BorrowingLimitPercent
+	}
+	var zero int32
+	return 0, &zero, &zero
+}
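The plSpecCommons helper is what lets the exempt special cases above disappear: an exempt level reports zero nominal shares and zero lendable/borrowing percentages, so it flows through the share arithmetic without any branch at the call sites. A self-contained sketch of that accounting follows; the share values are illustrative, not taken from the commit.

package main

import "fmt"

func main() {
	// With plSpecCommons, exempt levels contribute 0 shares, so shareSum
	// needs no special case for them.
	shares := map[string]int32{"exempt": 0, "catch-all": 5, "workload-low": 100}
	var shareSum float64
	for _, s := range shares {
		shareSum += float64(s)
	}
	fmt.Println(shareSum) // 105
}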
@@ -75,22 +75,6 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht
 			continue
 		}
 
-		if plState.queues == nil {
-			tabPrint(tabWriter, row(
-				plState.pl.Name, // 1
-				"<none>",        // 2
-				"<none>",        // 3
-				"<none>",        // 4
-				"<none>",        // 5
-				"<none>",        // 6
-				"<none>",        // 7
-				"<none>",        // 8
-				"<none>",        // 9
-				"<none>",        // 10
-			))
-			endLine(tabWriter)
-			continue
-		}
 		queueSetDigest := plState.queues.Dump(false)
 		activeQueueNum := 0
 		for _, q := range queueSetDigest.Queues {
@@ -134,21 +118,6 @@ func (cfgCtlr *configController) dumpQueues(w http.ResponseWriter, r *http.Reque
 	tabPrint(tabWriter, rowForHeaders(columnHeaders))
 	endLine(tabWriter)
 	for _, plState := range cfgCtlr.priorityLevelStates {
-		if plState.queues == nil {
-			tabPrint(tabWriter, row(
-				plState.pl.Name, // 1
-				"<none>",        // 2
-				"<none>",        // 3
-				"<none>",        // 4
-				"<none>",        // 5
-				"<none>",        // 6
-				"<none>",        // 7
-				"<none>",        // 8
-				"<none>",        // 9
-			))
-			endLine(tabWriter)
-			continue
-		}
 		queueSetDigest := plState.queues.Dump(false)
 		for i, q := range queueSetDigest.Queues {
 			tabPrint(tabWriter, row(
@@ -201,9 +170,6 @@ func (cfgCtlr *configController) dumpRequests(w http.ResponseWriter, r *http.Req
 	}
 	endLine(tabWriter)
 	for _, plState := range cfgCtlr.priorityLevelStates {
-		if plState.queues == nil {
-			continue
-		}
 		queueSetDigest := plState.queues.Dump(includeRequestDetails)
 		for iq, q := range queueSetDigest.Queues {
 			for ir, r := range q.Requests {
@@ -221,17 +221,12 @@ func (cts *ctlrTestState) popHeldRequest() (plName string, hr *heldRequest, nCou
 	}
 }
 
-var mandQueueSetNames, exclQueueSetNames = func() (sets.String, sets.String) {
+var mandQueueSetNames = func() sets.String {
 	mandQueueSetNames := sets.NewString()
-	exclQueueSetNames := sets.NewString()
 	for _, mpl := range fcboot.MandatoryPriorityLevelConfigurations {
-		if mpl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
-			exclQueueSetNames.Insert(mpl.Name)
-		} else {
-			mandQueueSetNames.Insert(mpl.Name)
-		}
+		mandQueueSetNames.Insert(mpl.Name)
 	}
-	return mandQueueSetNames, exclQueueSetNames
+	return mandQueueSetNames
 }()
 
 func TestConfigConsumer(t *testing.T) {
@@ -280,7 +275,7 @@ func TestConfigConsumer(t *testing.T) {
 					}
 				}
 				persistingPLNames = nextPLNames.Union(desiredPLNames)
-				expectedQueueSetNames := persistingPLNames.Union(mandQueueSetNames).Difference(exclQueueSetNames)
+				expectedQueueSetNames := persistingPLNames.Union(mandQueueSetNames)
 				allQueueSetNames := cts.getQueueSetNames()
 				missingQueueSetNames := expectedQueueSetNames.Difference(allQueueSetNames)
 				if len(missingQueueSetNames) > 0 {
@@ -34,7 +34,10 @@ type QueueSetFactory interface {
 	// BeginConstruction does the first phase of creating a QueueSet.
 	// The RatioedGaugePair observes number of requests,
 	// execution covering just the regular phase.
+	// The denominator for the waiting phase is
+	// max(1, QueuingConfig.QueueLengthLimit) X max(1, QueuingConfig.DesiredNumQueues).
 	// The RatioedGauge observes number of seats occupied through all phases of execution.
+	// The denominator for all the ratioed concurrency gauges is supplied later in the DispatchingConfig.
 	// The Gauge observes the seat demand (executing + queued seats).
 	BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, metrics.Gauge) (QueueSetCompleter, error)
 }
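The waiting-phase denominator formula documented above can be checked with a few lines. The helper below is our illustration, not part of the fq package.

package main

import "fmt"

// waitingDenominator (our name) computes the documented waiting-phase
// denominator: max(1, QueueLengthLimit) * max(1, DesiredNumQueues).
func waitingDenominator(queueLengthLimit, desiredNumQueues int) int {
	qll, n := queueLengthLimit, desiredNumQueues
	if qll < 1 {
		qll = 1
	}
	if n < 1 {
		n = 1
	}
	return qll * n
}

func main() {
	fmt.Println(waitingDenominator(50, 64)) // 3200 for a typical queuing config
	fmt.Println(waitingDenominator(50, -1)) // 50 for an exempt config: the queue factor collapses to 1
}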
@@ -113,8 +116,11 @@ type QueuingConfig struct {
 	Name string
 
 	// DesiredNumQueues is the number of queues that the API says
-	// should exist now.  This may be zero, in which case
+	// should exist now.  This may be non-positive, in which case
 	// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
+	// A value of zero means to respect the ConcurrencyLimit of the DispatchingConfig.
+	// A negative value means to always dispatch immediately upon arrival
+	// (i.e., the requests are "exempt" from limitation).
 	DesiredNumQueues int
 
 	// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
@@ -133,4 +139,8 @@ type QueuingConfig struct {
 type DispatchingConfig struct {
 	// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
 	ConcurrencyLimit int
+
+	// ConcurrencyDenominator is used in relative metrics of concurrency.
+	// It equals ConcurrencyLimit except when that is zero.
+	ConcurrencyDenominator int
 }
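To summarize the three regimes that DesiredNumQueues now encodes, here is a sketch of our own; the switch is illustrative, not code from the package.

package main

import "fmt"

func describeDesiredNumQueues(n int) string {
	switch {
	case n > 0:
		return "queue requests, shuffle-sharding flows across queues"
	case n == 0:
		return "no queuing: reject once ConcurrencyLimit is exhausted"
	default:
		return "exempt: dispatch every request immediately upon arrival"
	}
}

func main() {
	for _, n := range []int{64, 0, -1} {
		fmt.Printf("%d => %s\n", n, describeDesiredNumQueues(n))
	}
}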
@@ -197,7 +197,7 @@ func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePa
 // calls for one, and returns a non-nil error if the given config is
 // invalid.
 func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) {
-	if qCfg.DesiredNumQueues == 0 {
+	if qCfg.DesiredNumQueues <= 0 {
 		return nil, nil
 	}
 	dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize)
@@ -280,8 +280,8 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
 		qll *= qCfg.DesiredNumQueues
 	}
 	qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll))
-	qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
-	qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit))
+	qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyDenominator))
+	qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyDenominator))
 
 	qs.dispatchAsMuchAsPossibleLocked()
 }
@@ -796,6 +796,9 @@ func (qs *queueSet) dispatchLocked() bool {
 // otherwise it returns false.
 func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
 	switch {
+	case qs.qCfg.DesiredNumQueues < 0:
+		// This is code for exemption from limitation
+		return true
 	case seats > qs.dCfg.ConcurrencyLimit:
 		// we have picked the queue with the minimum virtual finish time, but
 		// the number of seats this request asks for exceeds the concurrency limit.
@@ -514,7 +514,7 @@ func TestBaseline(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
+	qs := qsComplete(qsc, 1)
 
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
@@ -534,6 +534,47 @@
 	}.exercise(t)
 }
 
+func TestExempt(t *testing.T) {
+	metrics.Register()
+	for concurrencyLimit := 0; concurrencyLimit <= 2; concurrencyLimit += 2 {
+		t.Run(fmt.Sprintf("concurrency=%d", concurrencyLimit), func(t *testing.T) {
+			now := time.Now()
+			clk, counter := testeventclock.NewFake(now, 0, nil)
+			qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
+			qCfg := fq.QueuingConfig{
+				Name:             "TestExempt",
+				DesiredNumQueues: -1,
+				QueueLengthLimit: 2,
+				HandSize:         3,
+				RequestWaitLimit: 10 * time.Minute,
+			}
+			seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
+			qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
+			if err != nil {
+				t.Fatal(err)
+			}
+			qs := qsComplete(qsc, concurrencyLimit)
+			uniformScenario{name: qCfg.Name,
+				qs: qs,
+				clients: []uniformClient{
+					newUniformClient(1001001001, 5, 20, time.Second, time.Second).setInitWidth(3),
+				},
+				concurrencyLimit:            1,
+				evalDuration:                time.Second * 40,
+				expectedFair:                []bool{true}, // "fair" is a bit odd-sounding here; expectedFair here just means expect the averages in expectedAverages
+				expectedAverages:            []float64{7.5},
+				expectedFairnessMargin:      []float64{0.00000001},
+				expectAllRequests:           true,
+				evalInqueueMetrics:          false,
+				evalExecutingMetrics:        true,
+				clk:                         clk,
+				counter:                     counter,
+				seatDemandIntegratorSubject: seatDemandIntegratorSubject,
+			}.exercise(t)
+		})
+	}
+}
+
 func TestSeparations(t *testing.T) {
 	flts := func(avgs ...float64) []float64 { return avgs }
 	for _, seps := range []struct {
@@ -585,7 +626,7 @@ func TestSeparations(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: seps.conc})
+			qs := qsComplete(qsc, seps.conc)
 			uniformScenario{name: qCfg.Name,
 				qs: qs,
 				clients: []uniformClient{
@@ -626,7 +667,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
+	qs := qsComplete(qsc, 4)
 
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
@@ -665,7 +706,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
+	qs := qsComplete(qsc, 4)
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
 		clients: []uniformClient{
@@ -703,7 +744,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
+	qs := qsComplete(qsc, 4)
 
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
@@ -745,7 +786,7 @@ func TestSeatSecondsRollover(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 2000})
+	qs := qsComplete(qsc, 2000)
 
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
@@ -785,7 +826,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3})
+	qs := qsComplete(qsc, 3)
 
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
@@ -824,7 +865,7 @@ func TestDifferentWidths(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6})
+	qs := qsComplete(qsc, 6)
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
 		clients: []uniformClient{
@@ -862,7 +903,7 @@ func TestTooWide(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6})
+	qs := qsComplete(qsc, 6)
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
 		clients: []uniformClient{
@@ -925,7 +966,7 @@ func TestWindup(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3})
+			qs := qsComplete(qsc, 3)
 
 			uniformScenario{name: qCfg.Name, qs: qs,
 				clients: []uniformClient{
@@ -962,7 +1003,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
+	qs := qsComplete(qsc, 4)
 
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
@@ -1000,7 +1041,7 @@ func TestTimeout(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
+	qs := qsComplete(qsc, 1)
 
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
@@ -1053,7 +1094,7 @@ func TestContextCancel(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
+	qs := qsComplete(qsc, 1)
 	counter.Add(1) // account for main activity of the goroutine running this test
 	ctx1 := context.Background()
 	pZero := func() *int32 { var zero int32; return &zero }
@@ -1159,7 +1200,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
+	qs := qsComplete(qsc, 1)
 	counter.Add(1) // account for the goroutine running this test
 
 	queue, ok := qs.(*queueSet)
@@ -1545,3 +1586,11 @@ func float64NaNTo0(x float64) float64 {
 	}
 	return x
 }
+
+func qsComplete(qsc fq.QueueSetCompleter, concurrencyLimit int) fq.QueueSet {
+	concurrencyDenominator := concurrencyLimit
+	if concurrencyDenominator <= 0 {
+		concurrencyDenominator = 1
+	}
+	return qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: concurrencyLimit, ConcurrencyDenominator: concurrencyDenominator})
+}