diff --git a/pkg/server/filters/priority-and-fairness_test.go b/pkg/server/filters/priority-and-fairness_test.go index 70af7839f..04772a7de 100644 --- a/pkg/server/filters/priority-and-fairness_test.go +++ b/pkg/server/filters/priority-and-fairness_test.go @@ -687,7 +687,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) stopCh := make(chan struct{}) - controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} var executed bool @@ -757,7 +757,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) stopCh := make(chan struct{}) - controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} var executed bool @@ -833,7 +833,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) stopCh := make(chan struct{}) - controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} var innerHandlerWriteErr error @@ -911,7 +911,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) stopCh := make(chan struct{}) - controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} var innerHandlerWriteErr error @@ -986,7 +986,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, queueLength) stopCh := make(chan struct{}) - controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} var firstRequestInnerHandlerWriteErr error @@ -1118,11 +1118,11 @@ func fmtError(err error) string { } func startAPFController(t *testing.T, stopCh <-chan struct{}, apfConfiguration []runtime.Object, serverConcurrency int, - requestWaitLimit time.Duration, plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) { + plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) { clientset := newClientset(t, apfConfiguration...) // this test does not rely on resync, so resync period is set to zero factory := informers.NewSharedInformerFactory(clientset, 0) - controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(), serverConcurrency, requestWaitLimit) + controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(), serverConcurrency) factory.Start(stopCh) diff --git a/pkg/server/options/recommended.go b/pkg/server/options/recommended.go index 69f8fb515..5d031e202 100644 --- a/pkg/server/options/recommended.go +++ b/pkg/server/options/recommended.go @@ -154,7 +154,6 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error { config.SharedInformerFactory, kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1beta3(), config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight, - config.RequestTimeout/4, ) } else { klog.Warningf("Neither kubeconfig is provided nor service-account is mounted, so APIPriorityAndFairness will be disabled") diff --git a/pkg/util/flowcontrol/apf_controller.go b/pkg/util/flowcontrol/apf_controller.go index f2df57ccc..8c90811bf 100644 --- a/pkg/util/flowcontrol/apf_controller.go +++ b/pkg/util/flowcontrol/apf_controller.go @@ -150,9 +150,6 @@ type configController struct { // from server configuration. serverConcurrencyLimit int - // requestWaitLimit comes from server configuration. - requestWaitLimit time.Duration - // watchTracker implements the necessary WatchTracker interface. WatchTracker @@ -287,13 +284,12 @@ func newTestableController(config TestableConfig) *configController { asFieldManager: config.AsFieldManager, foundToDangling: config.FoundToDangling, serverConcurrencyLimit: config.ServerConcurrencyLimit, - requestWaitLimit: config.RequestWaitLimit, flowcontrolClient: config.FlowcontrolClient, priorityLevelStates: make(map[string]*priorityLevelState), WatchTracker: NewWatchTracker(), MaxSeatsTracker: NewMaxSeatsTracker(), } - klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager) + klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.name, cfgCtlr.asFieldManager) // Start with longish delay because conflicts will be between // different processes, so take some time to go away. cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue") @@ -433,7 +429,7 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta plState := plStates[plName] if setCompleters { qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues, - plState.pl, cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, + plState.pl, plState.reqsGaugePair, plState.execSeatsObs, metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge)) if err != nil { klog.ErrorS(err, "Inconceivable! Configuration error in existing priority level", "pl", plState.pl) @@ -657,10 +653,10 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontro // Supply missing mandatory PriorityLevelConfiguration objects if !meal.haveExemptPL { - meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtlr.requestWaitLimit) + meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt) } if !meal.haveCatchAllPL { - meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtlr.requestWaitLimit) + meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll) } meal.finishQueueSetReconfigsLocked() @@ -692,7 +688,7 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi } } qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, - pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, + pl, state.reqsGaugePair, state.execSeatsObs, metrics.NewUnionGauge(state.seatDemandIntegrator, state.seatDemandRatioedGauge)) if err != nil { klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err) @@ -798,7 +794,7 @@ func (meal *cfgMeal) processOldPLsLocked() { } var err error plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, - plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, + plState.pl, plState.reqsGaugePair, plState.execSeatsObs, metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge)) if err != nil { // This can not happen because queueSetCompleterForPL already approved this config @@ -880,7 +876,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { // queueSetCompleterForPL returns an appropriate QueueSetCompleter for the // given priority level configuration. Returns nil and an error if the given // object is malformed in a way that is a problem for this package. -func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) { +func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) { if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementLimited) != (pl.Spec.Limited != nil) { return nil, errors.New("broken union structure at the top, for Limited") } @@ -902,7 +898,6 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow DesiredNumQueues: int(qcAPI.Queues), QueueLengthLimit: int(qcAPI.QueueLengthLimit), HandSize: int(qcAPI.HandSize), - RequestWaitLimit: requestWaitLimit, } } } else { @@ -956,16 +951,15 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl // imaginePL adds a priority level based on one of the mandatory ones // that does not actually exist (right now) as a real API object. -func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) { +func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration) { klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name) labelValues := []string{proto.Name} reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues) execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues) seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name) seatDemandRatioedGauge := metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{proto.Name}) - qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, - requestWaitLimit, reqsGaugePair, execSeatsObs, - metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge)) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, reqsGaugePair, + execSeatsObs, metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge)) if err != nil { // This can not happen because proto is one of the mandatory // objects and these are not erroneous diff --git a/pkg/util/flowcontrol/apf_filter.go b/pkg/util/flowcontrol/apf_filter.go index 76782623a..05f4f5e53 100644 --- a/pkg/util/flowcontrol/apf_filter.go +++ b/pkg/util/flowcontrol/apf_filter.go @@ -90,7 +90,6 @@ func New( informerFactory kubeinformers.SharedInformerFactory, flowcontrolClient flowcontrolclient.FlowcontrolV1beta3Interface, serverConcurrencyLimit int, - requestWaitLimit time.Duration, ) Interface { clk := eventclock.Real{} return NewTestable(TestableConfig{ @@ -101,7 +100,6 @@ func New( InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: serverConcurrencyLimit, - RequestWaitLimit: requestWaitLimit, ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: fqs.NewQueueSetFactory(clk), @@ -139,9 +137,6 @@ type TestableConfig struct { // ServerConcurrencyLimit for the controller to enforce ServerConcurrencyLimit int - // RequestWaitLimit configured on the server - RequestWaitLimit time.Duration - // GaugeVec for metrics about requests, broken down by phase and priority_level ReqsGaugeVec metrics.RatioedGaugeVec diff --git a/pkg/util/flowcontrol/apf_filter_test.go b/pkg/util/flowcontrol/apf_filter_test.go index 840fecd75..dcc4de38a 100644 --- a/pkg/util/flowcontrol/apf_filter_test.go +++ b/pkg/util/flowcontrol/apf_filter_test.go @@ -109,7 +109,6 @@ func TestQueueWaitTimeLatencyTracker(t *testing.T) { InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: 24, - RequestWaitLimit: time.Minute, ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: fqs.NewQueueSetFactory(clk), diff --git a/pkg/util/flowcontrol/borrowing_test.go b/pkg/util/flowcontrol/borrowing_test.go index 8511730f0..e6ab27bea 100644 --- a/pkg/util/flowcontrol/borrowing_test.go +++ b/pkg/util/flowcontrol/borrowing_test.go @@ -143,7 +143,6 @@ func TestBorrowing(t *testing.T) { InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: 24, - RequestWaitLimit: time.Minute, ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: fqs.NewQueueSetFactory(clk), diff --git a/pkg/util/flowcontrol/controller_test.go b/pkg/util/flowcontrol/controller_test.go index ec7c58a9b..a50c09680 100644 --- a/pkg/util/flowcontrol/controller_test.go +++ b/pkg/util/flowcontrol/controller_test.go @@ -251,8 +251,7 @@ func TestConfigConsumer(t *testing.T) { FoundToDangling: func(found bool) bool { return !found }, InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, - ServerConcurrencyLimit: 100, // server concurrency limit - RequestWaitLimit: time.Minute, // request wait limit + ServerConcurrencyLimit: 100, // server concurrency limit ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: cts, @@ -384,7 +383,6 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) { InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: 100, - RequestWaitLimit: time.Minute, ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: cts, diff --git a/pkg/util/flowcontrol/fairqueuing/interface.go b/pkg/util/flowcontrol/fairqueuing/interface.go index 013fd41e0..3b0ad1638 100644 --- a/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/pkg/util/flowcontrol/fairqueuing/interface.go @@ -18,7 +18,6 @@ package fairqueuing import ( "context" - "time" "k8s.io/apiserver/pkg/util/flowcontrol/debug" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" @@ -117,7 +116,7 @@ type QueuingConfig struct { // DesiredNumQueues is the number of queues that the API says // should exist now. This may be non-positive, in which case - // QueueLengthLimit, HandSize, and RequestWaitLimit are ignored. + // QueueLengthLimit, and HandSize are ignored. // A value of zero means to respect the ConcurrencyLimit of the DispatchingConfig. // A negative value means to always dispatch immediately upon arrival // (i.e., the requests are "exempt" from limitation). @@ -129,10 +128,6 @@ type QueuingConfig struct { // HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly // dealing a "hand" of this many queues and then picking one of minimum length. HandSize int - - // RequestWaitLimit is the maximum amount of time that a request may wait in a queue. - // If, by the end of that time, the request has not been dispatched then it is rejected. - RequestWaitLimit time.Duration } // DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet. diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 319b684eb..c16288246 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -272,7 +272,6 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, } else { qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit qCfg.HandSize = qs.qCfg.HandSize - qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit } qs.qCfg = qCfg diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index f1f7b4d6a..5e9399605 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -551,7 +551,6 @@ func TestBaseline(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 8, HandSize: 3, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject") qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -590,7 +589,6 @@ func TestExampt(t *testing.T) { DesiredNumQueues: -1, QueueLengthLimit: 2, HandSize: 3, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject") qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -663,7 +661,6 @@ func TestSeparations(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 8, HandSize: 3, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, caseName+" seatDemandSubject") qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -704,7 +701,6 @@ func TestUniformFlowsHandSize1(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 8, HandSize: 1, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject") qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -743,7 +739,6 @@ func TestUniformFlowsHandSize3(t *testing.T) { DesiredNumQueues: 8, QueueLengthLimit: 16, HandSize: 3, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -781,7 +776,6 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 8, HandSize: 1, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -823,7 +817,6 @@ func TestSeatSecondsRollover(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 8, HandSize: 1, - RequestWaitLimit: 40 * Quarter, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -863,7 +856,6 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 6, HandSize: 1, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -902,7 +894,6 @@ func TestDifferentWidths(t *testing.T) { DesiredNumQueues: 64, QueueLengthLimit: 13, HandSize: 7, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -940,7 +931,6 @@ func TestTooWide(t *testing.T) { DesiredNumQueues: 64, QueueLengthLimit: 35, HandSize: 7, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -1003,7 +993,6 @@ func TestWindup(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 6, HandSize: 1, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -1093,7 +1082,6 @@ func TestContextCancel(t *testing.T) { DesiredNumQueues: 11, QueueLengthLimit: 11, HandSize: 1, - RequestWaitLimit: 15 * time.Second, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -1200,7 +1188,6 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { qCfg := fq.QueuingConfig{ Name: "TestTotalRequestsExecutingWithPanic", DesiredNumQueues: 0, - RequestWaitLimit: 15 * time.Second, } qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), fq.NewNamedIntegrator(clk, qCfg.Name)) if err != nil { diff --git a/pkg/util/flowcontrol/gen_test.go b/pkg/util/flowcontrol/gen_test.go index 34e67386c..4b0d7dba4 100644 --- a/pkg/util/flowcontrol/gen_test.go +++ b/pkg/util/flowcontrol/gen_test.go @@ -21,7 +21,6 @@ import ( "math/rand" "sync/atomic" "testing" - "time" "k8s.io/utils/clock" @@ -60,7 +59,7 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration QueueLengthLimit: 5} } labelVals := []string{"test"} - _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals), fq.NewNamedIntegrator(clock.RealClock{}, name)) + _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals), fq.NewNamedIntegrator(clock.RealClock{}, name)) if err != nil { panic(err) } diff --git a/pkg/util/flowcontrol/max_seats_test.go b/pkg/util/flowcontrol/max_seats_test.go index 23697acfa..92c0367f2 100644 --- a/pkg/util/flowcontrol/max_seats_test.go +++ b/pkg/util/flowcontrol/max_seats_test.go @@ -108,7 +108,6 @@ func Test_GetMaxSeats(t *testing.T) { // for the purposes of this test, serverCL ~= nominalCL since there is // only 1 PL with large concurrency shares, making mandatory PLs negligible. ServerConcurrencyLimit: testcase.nominalCL, - RequestWaitLimit: time.Minute, ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: fqs.NewQueueSetFactory(clk),