Eliminate MaintainObservations function in P&F
Kubernetes-commit: badf436ac4451590e5e84e537f2234e3632ea3b4
This commit is contained in:
		
							parent
							
								
									fc846fd428
								
							
						
					
					
						commit
						8f7c120935
					
				|  | @ -713,7 +713,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G | ||||||
| 	if s.isPostStartHookRegistered(priorityAndFairnessConfigConsumerHookName) { | 	if s.isPostStartHookRegistered(priorityAndFairnessConfigConsumerHookName) { | ||||||
| 	} else if c.FlowControl != nil { | 	} else if c.FlowControl != nil { | ||||||
| 		err := s.AddPostStartHook(priorityAndFairnessConfigConsumerHookName, func(context PostStartHookContext) error { | 		err := s.AddPostStartHook(priorityAndFairnessConfigConsumerHookName, func(context PostStartHookContext) error { | ||||||
| 			go c.FlowControl.MaintainObservations(context.StopCh) |  | ||||||
| 			go c.FlowControl.Run(context.StopCh) | 			go c.FlowControl.Run(context.StopCh) | ||||||
| 			return nil | 			return nil | ||||||
| 		}) | 		}) | ||||||
|  |  | ||||||
|  | @ -82,9 +82,6 @@ type fakeApfFilter struct { | ||||||
| 	utilflowcontrol.WatchTracker | 	utilflowcontrol.WatchTracker | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) { |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t fakeApfFilter) Handle(ctx context.Context, | func (t fakeApfFilter) Handle(ctx context.Context, | ||||||
| 	requestDigest utilflowcontrol.RequestDigest, | 	requestDigest utilflowcontrol.RequestDigest, | ||||||
| 	noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), | 	noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), | ||||||
|  | @ -394,9 +391,6 @@ func (f *fakeWatchApfFilter) Handle(ctx context.Context, | ||||||
| 	f.inflight-- | 	f.inflight-- | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (f *fakeWatchApfFilter) MaintainObservations(stopCh <-chan struct{}) { |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (f *fakeWatchApfFilter) Run(stopCh <-chan struct{}) error { | func (f *fakeWatchApfFilter) Run(stopCh <-chan struct{}) error { | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -291,23 +291,6 @@ func newTestableController(config TestableConfig) *configController { | ||||||
| 	return cfgCtlr | 	return cfgCtlr | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // MaintainObservations keeps the observers from
 |  | ||||||
| // metrics.PriorityLevelConcurrencyPairVec from falling
 |  | ||||||
| // too far behind
 |  | ||||||
| func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) { |  | ||||||
| 	wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (cfgCtlr *configController) updateObservations() { |  | ||||||
| 	cfgCtlr.lock.RLock() |  | ||||||
| 	defer cfgCtlr.lock.RUnlock() |  | ||||||
| 	for _, plc := range cfgCtlr.priorityLevelStates { |  | ||||||
| 		if plc.queues != nil { |  | ||||||
| 			plc.queues.UpdateObservations() |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error { | func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error { | ||||||
| 	defer utilruntime.HandleCrash() | 	defer utilruntime.HandleCrash() | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -66,9 +66,6 @@ type Interface interface { | ||||||
| 		execFn func(), | 		execFn func(), | ||||||
| 	) | 	) | ||||||
| 
 | 
 | ||||||
| 	// MaintainObservations is a helper for maintaining statistics.
 |  | ||||||
| 	MaintainObservations(stopCh <-chan struct{}) |  | ||||||
| 
 |  | ||||||
| 	// Run monitors config objects from the main apiservers and causes
 | 	// Run monitors config objects from the main apiservers and causes
 | ||||||
| 	// any needed changes to local behavior.  This method ceases
 | 	// any needed changes to local behavior.  This method ceases
 | ||||||
| 	// activity and returns after the given channel is closed.
 | 	// activity and returns after the given channel is closed.
 | ||||||
|  |  | ||||||
|  | @ -113,9 +113,6 @@ func (cqs *ctlrTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSet | ||||||
| 	return ctlrTestQueueSetCompleter{cqs.cts, cqs, qc}, nil | 	return ctlrTestQueueSetCompleter{cqs.cts, cqs, qc}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (cqs *ctlrTestQueueSet) UpdateObservations() { |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (cqs *ctlrTestQueueSet) Dump(bool) debug.QueueSetDump { | func (cqs *ctlrTestQueueSet) Dump(bool) debug.QueueSetDump { | ||||||
| 	return debug.QueueSetDump{} | 	return debug.QueueSetDump{} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -86,10 +86,6 @@ type QueueSet interface { | ||||||
| 	// exactly once.
 | 	// exactly once.
 | ||||||
| 	StartRequest(ctx context.Context, width *request.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool) | 	StartRequest(ctx context.Context, width *request.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool) | ||||||
| 
 | 
 | ||||||
| 	// UpdateObservations makes sure any time-based statistics have
 |  | ||||||
| 	// caught up with the current clock reading
 |  | ||||||
| 	UpdateObservations() |  | ||||||
| 
 |  | ||||||
| 	// Dump saves and returns the instant internal state of the queue-set.
 | 	// Dump saves and returns the instant internal state of the queue-set.
 | ||||||
| 	// Note that dumping process will stop the queue-set from proceeding
 | 	// Note that dumping process will stop the queue-set from proceeding
 | ||||||
| 	// any requests.
 | 	// any requests.
 | ||||||
|  |  | ||||||
|  | @ -988,12 +988,6 @@ func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue { | ||||||
| 	return keptQueues | 	return keptQueues | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (qs *queueSet) UpdateObservations() { |  | ||||||
| 	qs.reqsGaugePair.RequestsWaiting.Add(0) |  | ||||||
| 	qs.reqsGaugePair.RequestsExecuting.Add(0) |  | ||||||
| 	qs.execSeatsGauge.Add(0) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { | func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { | ||||||
| 	qs.lock.Lock() | 	qs.lock.Lock() | ||||||
| 	defer qs.lock.Unlock() | 	defer qs.lock.Unlock() | ||||||
|  |  | ||||||
|  | @ -60,9 +60,6 @@ func (noRestraint) StartRequest(ctx context.Context, workEstimate *fcrequest.Wor | ||||||
| 	return noRestraintRequest{}, false | 	return noRestraintRequest{}, false | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (noRestraint) UpdateObservations() { |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (noRestraint) Dump(bool) debug.QueueSetDump { | func (noRestraint) Dump(bool) debug.QueueSetDump { | ||||||
| 	return debug.QueueSetDump{} | 	return debug.QueueSetDump{} | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue