Make sampleAndWaterMarkHistograms not fall very far behind

Kubernetes-commit: 9e89b92a92c02cdd2c70c0f52a30936e9c3309c7
This commit is contained in:
Mike Spreitzer 2020-08-20 16:43:11 -04:00 committed by Kubernetes Publisher
parent b06200931e
commit b1ede52e21
7 changed files with 43 additions and 4 deletions

View File

@ -626,6 +626,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
if s.isPostStartHookRegistered(priorityAndFairnessConfigConsumerHookName) {
} else if c.FlowControl != nil {
err := s.AddPostStartHook(priorityAndFairnessConfigConsumerHookName, func(context PostStartHookContext) error {
go c.FlowControl.MaintainObservations(context.StopCh)
go c.FlowControl.Run(context.StopCh)
return nil
})

View File

@ -227,6 +227,23 @@ func (cfgCtlr *configController) initializeConfigController(informerFactory kube
}})
}
// MaintainObservations keeps the observers from
// metrics.PriorityLevelConcurrencyObserverPairGenerator from falling
// too far behind
func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) {
wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh)
}
func (cfgCtlr *configController) updateObservations() {
cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock()
for _, plc := range cfgCtlr.priorityLevelStates {
if plc.queues != nil {
plc.queues.UpdateObservations()
}
}
}
func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
defer cfgCtlr.configQueue.ShutDown()
klog.Info("Starting API Priority and Fairness config controller")

View File

@ -38,10 +38,13 @@ import (
type Interface interface {
// Handle takes care of queuing and dispatching a request
// characterized by the given digest. The given `noteFn` will be
// invoked with the results of request classification. If Handle
// decides that the request should be executed then `execute()`
// will be invoked once to execute the request; otherwise
// `execute()` will not be invoked.
// invoked with the results of request classification. If the
// request is queued then `queueNoteFn` will be called twice,
// first with `true` and then with `false`; otherwise
// `queueNoteFn` will not be called at all. If Handle decides
// that the request should be executed then `execute()` will be
// invoked once to execute the request; otherwise `execute()` will
// not be invoked.
Handle(ctx context.Context,
requestDigest RequestDigest,
noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration),
@ -49,6 +52,9 @@ type Interface interface {
execFn func(),
)
// MaintainObservations is a helper for maintaining statistics.
MaintainObservations(stopCh <-chan struct{})
// Run monitors config objects from the main apiservers and causes
// any needed changes to local behavior. This method ceases
// activity and returns after the given channel is closed.

View File

@ -97,6 +97,9 @@ func (cqs *ctlrTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSet
return ctlrTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
}
func (cqs *ctlrTestQueueSet) UpdateObservations() {
}
func (cqs *ctlrTestQueueSet) Dump(bool) debug.QueueSetDump {
return debug.QueueSetDump{}
}

View File

@ -82,6 +82,10 @@ type QueueSet interface {
// exactly once.
StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)
// UpdateObservations makes sure any time-based statistics have
// caught up with the current clock reading
UpdateObservations()
// Dump saves and returns the instant internal state of the queue-set.
// Note that dumping process will stop the queue-set from proceeding
// any requests.

View File

@ -743,6 +743,11 @@ func (qs *queueSet) goroutineDoneOrBlocked() {
qs.counter.Add(-1)
}
func (qs *queueSet) UpdateObservations() {
qs.obsPair.RequestsWaiting.Add(0)
qs.obsPair.RequestsExecuting.Add(0)
}
func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
qs.lock.Lock()
defer qs.lock.Unlock()

View File

@ -59,6 +59,9 @@ func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDisti
return noRestraintRequest{}, false
}
func (noRestraint) UpdateObservations() {
}
func (noRestraint) Dump(bool) debug.QueueSetDump {
return debug.QueueSetDump{}
}