diff --git a/controller/controller.go b/controller/controller.go index 895b05d01..300f76e82 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -216,10 +216,13 @@ type ControllerOptions struct { // NewImpl instantiates an instance of our controller that will feed work to the // provided Reconciler as it is enqueued. +// Deprecated: use NewImplFull. func NewImpl(r Reconciler, logger *zap.SugaredLogger, workQueueName string) *Impl { return NewImplFull(r, ControllerOptions{WorkQueueName: workQueueName, Logger: logger}) } +// NewImplWithStats creates a controller.Impl with stats reporter. +// Deprecated: use NewImplFull. func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName string, reporter StatsReporter) *Impl { return NewImplFull(r, ControllerOptions{WorkQueueName: workQueueName, Logger: logger, Reporter: reporter}) } @@ -416,15 +419,14 @@ func (c *Impl) EnqueueKeyAfter(key types.NamespacedName, delay time.Duration) { // internal work queue and waits for workers to finish processing their current // work items. func (c *Impl) RunContext(ctx context.Context, threadiness int) error { - logger := c.logger - defer runtime.HandleCrash() sg := sync.WaitGroup{} - defer sg.Wait() defer func() { c.workQueue.ShutDown() for c.workQueue.Len() > 0 { time.Sleep(time.Millisecond * 100) } + sg.Wait() + runtime.HandleCrash() }() if la, ok := c.Reconciler.(reconciler.LeaderAware); ok { @@ -441,7 +443,7 @@ func (c *Impl) RunContext(ctx context.Context, threadiness int) error { } // Launch workers to process resources that get enqueued to our workqueue. - logger.Info("Starting controller and workers") + c.logger.Info("Starting controller and workers") for i := 0; i < threadiness; i++ { sg.Add(1) go func() { @@ -451,9 +453,9 @@ func (c *Impl) RunContext(ctx context.Context, threadiness int) error { }() } - logger.Info("Started workers") + c.logger.Info("Started workers") <-ctx.Done() - logger.Info("Shutting down workers") + c.logger.Info("Shutting down workers") return nil } @@ -485,13 +487,6 @@ func (c *Impl) processNextWorkItem() bool { // Send the metrics for the current queue depth c.statsReporter.ReportQueueDepth(int64(c.workQueue.Len())) - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if - // reconcile succeeds. If a transient error occurs, we do not call - // Forget and put the item back to the queue with an increased - // delay. - defer c.workQueue.Done(key) - var err error defer func() { status := trueString @@ -499,6 +494,13 @@ func (c *Impl) processNextWorkItem() bool { status = falseString } c.statsReporter.ReportReconcile(time.Since(startTime), status) + + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if + // reconcile succeeds. If a transient error occurs, we do not call + // Forget and put the item back to the queue with an increased + // delay. + c.workQueue.Done(key) }() // Embed the key into the logger and attach that to the context we pass @@ -544,8 +546,8 @@ func (c *Impl) GlobalResync(si cache.SharedInformer) { c.FilteredGlobalResync(alwaysTrue, si) } -// FilteredGlobalResync enqueues (with a delay) all objects from the -// SharedInformer that pass the filter function +// FilteredGlobalResync enqueues all objects from the +// SharedInformer that pass the filter function in to the slow queue. func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) { if c.workQueue.ShuttingDown() { return