diff --git a/pkg/server/config.go b/pkg/server/config.go index b92063705..9699177ff 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -626,6 +626,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G if s.isPostStartHookRegistered(priorityAndFairnessConfigConsumerHookName) { } else if c.FlowControl != nil { err := s.AddPostStartHook(priorityAndFairnessConfigConsumerHookName, func(context PostStartHookContext) error { + go c.FlowControl.MaintainObservations(context.StopCh) go c.FlowControl.Run(context.StopCh) return nil }) diff --git a/pkg/util/flowcontrol/apf_controller.go b/pkg/util/flowcontrol/apf_controller.go index 91f357023..50eb33272 100644 --- a/pkg/util/flowcontrol/apf_controller.go +++ b/pkg/util/flowcontrol/apf_controller.go @@ -227,6 +227,23 @@ func (cfgCtlr *configController) initializeConfigController(informerFactory kube }}) } +// MaintainObservations keeps the observers from +// metrics.PriorityLevelConcurrencyObserverPairGenerator from falling +// too far behind +func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) { + wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh) +} + +func (cfgCtlr *configController) updateObservations() { + cfgCtlr.lock.Lock() + defer cfgCtlr.lock.Unlock() + for _, plc := range cfgCtlr.priorityLevelStates { + if plc.queues != nil { + plc.queues.UpdateObservations() + } + } +} + func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error { defer cfgCtlr.configQueue.ShutDown() klog.Info("Starting API Priority and Fairness config controller") diff --git a/pkg/util/flowcontrol/apf_filter.go b/pkg/util/flowcontrol/apf_filter.go index 99c36005b..5b8c03916 100644 --- a/pkg/util/flowcontrol/apf_filter.go +++ b/pkg/util/flowcontrol/apf_filter.go @@ -38,10 +38,13 @@ import ( type Interface interface { // Handle takes care of queuing and dispatching a request // characterized by the given digest. The given `noteFn` will be - // invoked with the results of request classification. If Handle - // decides that the request should be executed then `execute()` - // will be invoked once to execute the request; otherwise - // `execute()` will not be invoked. + // invoked with the results of request classification. If the + // request is queued then `queueNoteFn` will be called twice, + // first with `true` and then with `false`; otherwise + // `queueNoteFn` will not be called at all. If Handle decides + // that the request should be executed then `execute()` will be + // invoked once to execute the request; otherwise `execute()` will + // not be invoked. Handle(ctx context.Context, requestDigest RequestDigest, noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration), @@ -49,6 +52,9 @@ type Interface interface { execFn func(), ) + // MaintainObservations is a helper for maintaining statistics. + MaintainObservations(stopCh <-chan struct{}) + // Run monitors config objects from the main apiservers and causes // any needed changes to local behavior. This method ceases // activity and returns after the given channel is closed. diff --git a/pkg/util/flowcontrol/controller_test.go b/pkg/util/flowcontrol/controller_test.go index 1d762d4ba..cb6ff527e 100644 --- a/pkg/util/flowcontrol/controller_test.go +++ b/pkg/util/flowcontrol/controller_test.go @@ -97,6 +97,9 @@ func (cqs *ctlrTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSet return ctlrTestQueueSetCompleter{cqs.cts, cqs, qc}, nil } +func (cqs *ctlrTestQueueSet) UpdateObservations() { +} + func (cqs *ctlrTestQueueSet) Dump(bool) debug.QueueSetDump { return debug.QueueSetDump{} } diff --git a/pkg/util/flowcontrol/fairqueuing/interface.go b/pkg/util/flowcontrol/fairqueuing/interface.go index e0b628ecd..882a505c8 100644 --- a/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/pkg/util/flowcontrol/fairqueuing/interface.go @@ -82,6 +82,10 @@ type QueueSet interface { // exactly once. StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool) + // UpdateObservations makes sure any time-based statistics have + // caught up with the current clock reading + UpdateObservations() + // Dump saves and returns the instant internal state of the queue-set. // Note that dumping process will stop the queue-set from proceeding // any requests. diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index adcb56d85..b469a4ac5 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -743,6 +743,11 @@ func (qs *queueSet) goroutineDoneOrBlocked() { qs.counter.Add(-1) } +func (qs *queueSet) UpdateObservations() { + qs.obsPair.RequestsWaiting.Add(0) + qs.obsPair.RequestsExecuting.Add(0) +} + func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { qs.lock.Lock() defer qs.lock.Unlock() diff --git a/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 53d3795d0..0cc08b182 100644 --- a/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -59,6 +59,9 @@ func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDisti return noRestraintRequest{}, false } +func (noRestraint) UpdateObservations() { +} + func (noRestraint) Dump(bool) debug.QueueSetDump { return debug.QueueSetDump{} }