Optimize and clean the controller code (#1639)

- fix outdated comments
- reorder the code to reduce the  number of defer calls
- various other nits
"
This commit is contained in:
Victor Agababov 2020-08-21 22:20:46 -07:00 committed by GitHub
parent f0c35fcd5b
commit d5c09d2aef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 17 additions and 15 deletions

View File

@ -216,10 +216,13 @@ type ControllerOptions struct {
// NewImpl instantiates an instance of our controller that will feed work to the // NewImpl instantiates an instance of our controller that will feed work to the
// provided Reconciler as it is enqueued. // provided Reconciler as it is enqueued.
// Deprecated: use NewImplFull.
func NewImpl(r Reconciler, logger *zap.SugaredLogger, workQueueName string) *Impl { func NewImpl(r Reconciler, logger *zap.SugaredLogger, workQueueName string) *Impl {
return NewImplFull(r, ControllerOptions{WorkQueueName: workQueueName, Logger: logger}) return NewImplFull(r, ControllerOptions{WorkQueueName: workQueueName, Logger: logger})
} }
// NewImplWithStats creates a controller.Impl with stats reporter.
// Deprecated: use NewImplFull.
func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName string, reporter StatsReporter) *Impl { func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName string, reporter StatsReporter) *Impl {
return NewImplFull(r, ControllerOptions{WorkQueueName: workQueueName, Logger: logger, Reporter: reporter}) return NewImplFull(r, ControllerOptions{WorkQueueName: workQueueName, Logger: logger, Reporter: reporter})
} }
@ -416,15 +419,14 @@ func (c *Impl) EnqueueKeyAfter(key types.NamespacedName, delay time.Duration) {
// internal work queue and waits for workers to finish processing their current // internal work queue and waits for workers to finish processing their current
// work items. // work items.
func (c *Impl) RunContext(ctx context.Context, threadiness int) error { func (c *Impl) RunContext(ctx context.Context, threadiness int) error {
logger := c.logger
defer runtime.HandleCrash()
sg := sync.WaitGroup{} sg := sync.WaitGroup{}
defer sg.Wait()
defer func() { defer func() {
c.workQueue.ShutDown() c.workQueue.ShutDown()
for c.workQueue.Len() > 0 { for c.workQueue.Len() > 0 {
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
} }
sg.Wait()
runtime.HandleCrash()
}() }()
if la, ok := c.Reconciler.(reconciler.LeaderAware); ok { if la, ok := c.Reconciler.(reconciler.LeaderAware); ok {
@ -441,7 +443,7 @@ func (c *Impl) RunContext(ctx context.Context, threadiness int) error {
} }
// Launch workers to process resources that get enqueued to our workqueue. // Launch workers to process resources that get enqueued to our workqueue.
logger.Info("Starting controller and workers") c.logger.Info("Starting controller and workers")
for i := 0; i < threadiness; i++ { for i := 0; i < threadiness; i++ {
sg.Add(1) sg.Add(1)
go func() { go func() {
@ -451,9 +453,9 @@ func (c *Impl) RunContext(ctx context.Context, threadiness int) error {
}() }()
} }
logger.Info("Started workers") c.logger.Info("Started workers")
<-ctx.Done() <-ctx.Done()
logger.Info("Shutting down workers") c.logger.Info("Shutting down workers")
return nil return nil
} }
@ -485,13 +487,6 @@ func (c *Impl) processNextWorkItem() bool {
// Send the metrics for the current queue depth // Send the metrics for the current queue depth
c.statsReporter.ReportQueueDepth(int64(c.workQueue.Len())) c.statsReporter.ReportQueueDepth(int64(c.workQueue.Len()))
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if
// reconcile succeeds. If a transient error occurs, we do not call
// Forget and put the item back to the queue with an increased
// delay.
defer c.workQueue.Done(key)
var err error var err error
defer func() { defer func() {
status := trueString status := trueString
@ -499,6 +494,13 @@ func (c *Impl) processNextWorkItem() bool {
status = falseString status = falseString
} }
c.statsReporter.ReportReconcile(time.Since(startTime), status) c.statsReporter.ReportReconcile(time.Since(startTime), status)
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if
// reconcile succeeds. If a transient error occurs, we do not call
// Forget and put the item back to the queue with an increased
// delay.
c.workQueue.Done(key)
}() }()
// Embed the key into the logger and attach that to the context we pass // Embed the key into the logger and attach that to the context we pass
@ -544,8 +546,8 @@ func (c *Impl) GlobalResync(si cache.SharedInformer) {
c.FilteredGlobalResync(alwaysTrue, si) c.FilteredGlobalResync(alwaysTrue, si)
} }
// FilteredGlobalResync enqueues (with a delay) all objects from the // FilteredGlobalResync enqueues all objects from the
// SharedInformer that pass the filter function // SharedInformer that pass the filter function in to the slow queue.
func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) { func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) {
if c.workQueue.ShuttingDown() { if c.workQueue.ShuttingDown() {
return return