From 08156c67f66343e0a143f24438a656348d9617eb Mon Sep 17 00:00:00 2001 From: Victor Agababov Date: Tue, 21 Jul 2020 13:11:54 -0700 Subject: [PATCH] Use slow lane to do global resync (#1528) * Use slow lane to do global resync * cmt * yolo * yolo v2 * fix log str * fixes * publicize things * renamemove --- controller/controller.go | 84 ++++++++++++------- controller/controller_test.go | 39 +++++---- controller/two_lane_queue.go | 4 +- webhook/certificates/certificates_test.go | 4 +- webhook/configmaps/table_test.go | 4 +- webhook/psbinding/table_test.go | 4 +- .../defaulting/table_test.go | 4 +- .../validation/reconcile_config_test.go | 4 +- .../validation/validation_admit_test.go | 4 +- 9 files changed, 92 insertions(+), 59 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index 966a1ffff..551088341 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -186,12 +185,14 @@ type Impl struct { // from the workqueue to process. Public for testing. Reconciler Reconciler - // WorkQueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - WorkQueue workqueue.RateLimitingInterface + // workQueue is a rate-limited two-lane work queue. + // This is used to queue work to be processed instead of performing it as + // soon as a change happens. This means we can ensure we only process a + // fixed amount of resources at a time, and makes it easy to ensure we are + // never processing the same item simultaneously in two different workers. + // The slow queue is used for global resync and other background processes + // which are not required to complete at the highest priority. + workQueue *twoLaneQueue // Sugared logger is easier to use but is not as performant as the // raw logger. In performance critical paths, call logger.Desugar() @@ -215,12 +216,17 @@ func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName str return &Impl{ Name: workQueueName, Reconciler: r, - WorkQueue: newTwoLaneWorkQueue(workQueueName), + workQueue: newTwoLaneWorkQueue(workQueueName), logger: logger, statsReporter: reporter, } } +// WorkQueue permits direct access to the work queue. +func (c *Impl) WorkQueue() workqueue.RateLimitingInterface { + return c.workQueue +} + // EnqueueAfter takes a resource, converts it into a namespace/name string, // and passes it to EnqueueKey. func (c *Impl) EnqueueAfter(obj interface{}, after time.Duration) { @@ -232,6 +238,25 @@ func (c *Impl) EnqueueAfter(obj interface{}, after time.Duration) { c.EnqueueKeyAfter(types.NamespacedName{Namespace: object.GetNamespace(), Name: object.GetName()}, after) } +// EnqueueSlowKey takes a resource, converts it into a namespace/name string, +// and enqueues that key in the slow lane. +func (c *Impl) EnqueueSlowKey(key types.NamespacedName) { + c.workQueue.SlowLane().Add(key) + c.logger.Debugf("Adding to the slow queue %s (depth(total/slow): %d/%d)", safeKey(key), c.workQueue.Len(), c.workQueue.SlowLane().Len()) +} + +// EnqueueSlow extracts namesspeced name from the object and enqueues it on the slow +// work queue. +func (c *Impl) EnqueueSlow(obj interface{}) { + object, err := kmeta.DeletionHandlingAccessor(obj) + if err != nil { + c.logger.Errorw("EnqueueSlow", zap.Error(err)) + return + } + key := types.NamespacedName{Namespace: object.GetNamespace(), Name: object.GetName()} + c.EnqueueSlowKey(key) +} + // Enqueue takes a resource, converts it into a namespace/name string, // and passes it to EnqueueKey. func (c *Impl) Enqueue(obj interface{}) { @@ -342,23 +367,23 @@ func (c *Impl) EnqueueNamespaceOf(obj interface{}) { // EnqueueKey takes a namespace/name string and puts it onto the work queue. func (c *Impl) EnqueueKey(key types.NamespacedName) { - c.WorkQueue.Add(key) - c.logger.Debugf("Adding to queue %s (depth: %d)", safeKey(key), c.WorkQueue.Len()) + c.workQueue.Add(key) + c.logger.Debugf("Adding to queue %s (depth: %d)", safeKey(key), c.workQueue.Len()) } -// MaybeEnqueueBucketKey takes a Bucket and namespace/name string and puts it onto the work queue. +// MaybeEnqueueBucketKey takes a Bucket and namespace/name string and puts it onto +// the slow work queue. func (c *Impl) MaybeEnqueueBucketKey(bkt reconciler.Bucket, key types.NamespacedName) { if bkt.Has(key) { - c.WorkQueue.Add(key) - c.logger.Debugf("Adding to queue %s (depth: %d)", safeKey(key), c.WorkQueue.Len()) + c.EnqueueSlowKey(key) } } // EnqueueKeyAfter takes a namespace/name string and schedules its execution in // the work queue after given delay. func (c *Impl) EnqueueKeyAfter(key types.NamespacedName, delay time.Duration) { - c.WorkQueue.AddAfter(key, delay) - c.logger.Debugf("Adding to queue %s (delay: %v, depth: %d)", safeKey(key), delay, c.WorkQueue.Len()) + c.workQueue.AddAfter(key, delay) + c.logger.Debugf("Adding to queue %s (delay: %v, depth: %d)", safeKey(key), delay, c.workQueue.Len()) } // RunContext starts the controller's worker threads, the number of which is threadiness. @@ -372,8 +397,8 @@ func (c *Impl) RunContext(ctx context.Context, threadiness int) error { sg := sync.WaitGroup{} defer sg.Wait() defer func() { - c.WorkQueue.ShutDown() - for c.WorkQueue.Len() > 0 { + c.workQueue.ShutDown() + for c.workQueue.Len() > 0 { time.Sleep(time.Millisecond * 100) } }() @@ -423,25 +448,25 @@ func (c *Impl) Run(threadiness int, stopCh <-chan struct{}) error { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling Reconcile on our Reconciler. func (c *Impl) processNextWorkItem() bool { - obj, shutdown := c.WorkQueue.Get() + obj, shutdown := c.workQueue.Get() if shutdown { return false } key := obj.(types.NamespacedName) keyStr := safeKey(key) - c.logger.Debugf("Processing from queue %s (depth: %d)", safeKey(key), c.WorkQueue.Len()) + c.logger.Debugf("Processing from queue %s (depth: %d)", safeKey(key), c.workQueue.Len()) startTime := time.Now() // Send the metrics for the current queue depth - c.statsReporter.ReportQueueDepth(int64(c.WorkQueue.Len())) + c.statsReporter.ReportQueueDepth(int64(c.workQueue.Len())) // We call Done here so the workqueue knows we have finished // processing this item. We also must remember to call Forget if // reconcile succeeds. If a transient error occurs, we do not call // Forget and put the item back to the queue with an increased // delay. - defer c.WorkQueue.Done(key) + defer c.workQueue.Done(key) var err error defer func() { @@ -467,7 +492,7 @@ func (c *Impl) processNextWorkItem() bool { // Finally, if no error occurs we Forget this item so it does not // have any delay when another change happens. - c.WorkQueue.Forget(key) + c.workQueue.Forget(key) logger.Info("Reconcile succeeded. Time taken: ", time.Since(startTime)) return true @@ -480,16 +505,16 @@ func (c *Impl) handleErr(err error, key types.NamespacedName) { // We want to check that the queue is shutting down here // since controller Run might have exited by now (since while this item was // being processed, queue.Len==0). - if !IsPermanentError(err) && !c.WorkQueue.ShuttingDown() { - c.WorkQueue.AddRateLimited(key) - c.logger.Debugf("Requeuing key %s due to non-permanent error (depth: %d)", safeKey(key), c.WorkQueue.Len()) + if !IsPermanentError(err) && !c.workQueue.ShuttingDown() { + c.workQueue.AddRateLimited(key) + c.logger.Debugf("Requeuing key %s due to non-permanent error (depth: %d)", safeKey(key), c.workQueue.Len()) return } - c.WorkQueue.Forget(key) + c.workQueue.Forget(key) } -// GlobalResync enqueues (with a delay) all objects from the passed SharedInformer +// GlobalResync enqueues into the slow lane all objects from the passed SharedInformer func (c *Impl) GlobalResync(si cache.SharedInformer) { alwaysTrue := func(interface{}) bool { return true } c.FilteredGlobalResync(alwaysTrue, si) @@ -498,14 +523,13 @@ func (c *Impl) GlobalResync(si cache.SharedInformer) { // FilteredGlobalResync enqueues (with a delay) all objects from the // SharedInformer that pass the filter function func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) { - if c.WorkQueue.ShuttingDown() { + if c.workQueue.ShuttingDown() { return } list := si.GetStore().List() - count := float64(len(list)) for _, obj := range list { if f(obj) { - c.EnqueueAfter(obj, wait.Jitter(time.Second, count)) + c.EnqueueSlow(obj) } } } diff --git a/controller/controller_test.go b/controller/controller_test.go index 629ecc0f0..56e63c676 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -464,6 +464,17 @@ func TestEnqueues(t *testing.T) { }) }, wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "foo"}}, + }, { + name: "enqueue resource slow", + work: func(impl *Impl) { + impl.EnqueueSlow(&Resource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + }, + }) + }, + wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "foo"}}, }, { name: "enqueue sentinel resource", work: func(impl *Impl) { @@ -720,8 +731,8 @@ func TestEnqueues(t *testing.T) { // The rate limit on our queue delays when things are added to the queue. time.Sleep(50 * time.Millisecond) - impl.WorkQueue.ShutDown() - gotQueue := drainWorkQueue(impl.WorkQueue) + impl.WorkQueue().ShutDown() + gotQueue := drainWorkQueue(impl.WorkQueue()) if diff := cmp.Diff(test.wantQueue, gotQueue); diff != "" { t.Errorf("unexpected queue (-want +got): %s", diff) @@ -730,7 +741,7 @@ func TestEnqueues(t *testing.T) { } } -func TestEnqeueAfter(t *testing.T) { +func TestEnqueueAfter(t *testing.T) { t.Cleanup(ClearAll) impl := NewImplWithStats(&NopReconciler{}, TestLogger(t), "Testing", &FakeStatsReporter{}) impl.EnqueueAfter(&Resource{ @@ -752,16 +763,16 @@ func TestEnqeueAfter(t *testing.T) { }, }, 20*time.Second) time.Sleep(10 * time.Millisecond) - if got, want := impl.WorkQueue.Len(), 0; got != want { + if got, want := impl.WorkQueue().Len(), 0; got != want { t.Errorf("|Queue| = %d, want: %d", got, want) } // Sleep the remaining time. time.Sleep(time.Second) - if got, want := impl.WorkQueue.Len(), 1; got != want { + if got, want := impl.WorkQueue().Len(), 1; got != want { t.Errorf("|Queue| = %d, want: %d", got, want) } - impl.WorkQueue.ShutDown() - if got, want := drainWorkQueue(impl.WorkQueue), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) { + impl.WorkQueue().ShutDown() + if got, want := drainWorkQueue(impl.WorkQueue()), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) { t.Errorf("Queue = %v, want: %v, diff: %s", got, want, cmp.Diff(got, want)) } } @@ -773,16 +784,16 @@ func TestEnqeueKeyAfter(t *testing.T) { impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "the", Name: "waterfall"}, 500*time.Millisecond) impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "to", Name: "fall"}, 20*time.Second) time.Sleep(10 * time.Millisecond) - if got, want := impl.WorkQueue.Len(), 0; got != want { + if got, want := impl.WorkQueue().Len(), 0; got != want { t.Errorf("|Queue| = %d, want: %d", got, want) } // Sleep the remaining time. time.Sleep(time.Second) - if got, want := impl.WorkQueue.Len(), 1; got != want { + if got, want := impl.WorkQueue().Len(), 1; got != want { t.Errorf("|Queue| = %d, want: %d", got, want) } - impl.WorkQueue.ShutDown() - if got, want := drainWorkQueue(impl.WorkQueue), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) { + impl.WorkQueue().ShutDown() + if got, want := drainWorkQueue(impl.WorkQueue()), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) { t.Errorf("Queue = %v, want: %v, diff: %s", got, want, cmp.Diff(got, want)) } } @@ -1007,7 +1018,7 @@ func TestStartAndShutdownWithWork(t *testing.T) { if got, want := r.count, 1; got != want { t.Errorf("reconcile count = %v, wanted %v", got, want) } - if got, want := impl.WorkQueue.NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want { + if got, want := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want { t.Errorf("requeues = %v, wanted %v", got, want) } @@ -1093,7 +1104,7 @@ func TestStartAndShutdownWithErroringWork(t *testing.T) { // As NumRequeues can't fully reflect the real state of queue length. // Here we need to wait for NumRequeues to be more than 1, to ensure // the key get re-queued and reprocessed as expect. - if got, wantAtLeast := impl.WorkQueue.NumRequeues(types.NamespacedName{Namespace: "", Name: "bar"}), 2; got < wantAtLeast { + if got, wantAtLeast := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "", Name: "bar"}), 2; got < wantAtLeast { t.Errorf("Requeue count = %v, wanted at least %v", got, wantAtLeast) } } @@ -1137,7 +1148,7 @@ func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) { } // Check that the work was not requeued in RateLimiter. - if got, want := impl.WorkQueue.NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want { + if got, want := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want { t.Errorf("Requeue count = %v, wanted %v", got, want) } diff --git a/controller/two_lane_queue.go b/controller/two_lane_queue.go index 165eb2df3..340a958c4 100644 --- a/controller/two_lane_queue.go +++ b/controller/two_lane_queue.go @@ -16,9 +16,7 @@ limitations under the License. package controller -import ( - "k8s.io/client-go/util/workqueue" -) +import "k8s.io/client-go/util/workqueue" // twoLaneQueue is a rate limited queue that wraps around two queues // -- fast queue (anonymously aliased), whose contents are processed with priority. diff --git a/webhook/certificates/certificates_test.go b/webhook/certificates/certificates_test.go index 776a8f1bf..c614fc2fe 100644 --- a/webhook/certificates/certificates_test.go +++ b/webhook/certificates/certificates_test.go @@ -229,7 +229,7 @@ func TestNew(t *testing.T) { t.Fatal("Expected NewController to return a non-nil value") } - if want, got := 0, c.WorkQueue.Len(); want != got { + if want, got := 0, c.WorkQueue().Len(); want != got { t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want) } @@ -244,7 +244,7 @@ func TestNew(t *testing.T) { // Queue has async moving parts so if we check at the wrong moment, this might still be 0. if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) { - return c.WorkQueue.Len() == 1, nil + return c.WorkQueue().Len() == 1, nil }) != nil { t.Error("Queue length was never 1") } diff --git a/webhook/configmaps/table_test.go b/webhook/configmaps/table_test.go index abb2e9ff7..ff9548b61 100644 --- a/webhook/configmaps/table_test.go +++ b/webhook/configmaps/table_test.go @@ -327,7 +327,7 @@ func TestNew(t *testing.T) { t.Fatal("Expected NewController to return a non-nil value") } - if want, got := 0, c.WorkQueue.Len(); want != got { + if want, got := 0, c.WorkQueue().Len(); want != got { t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want) } @@ -342,7 +342,7 @@ func TestNew(t *testing.T) { // Queue has async moving parts so if we check at the wrong moment, this might still be 0. if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) { - return c.WorkQueue.Len() == 1, nil + return c.WorkQueue().Len() == 1, nil }) != nil { t.Error("Queue length was never 1") } diff --git a/webhook/psbinding/table_test.go b/webhook/psbinding/table_test.go index 86952af91..1ae11be65 100644 --- a/webhook/psbinding/table_test.go +++ b/webhook/psbinding/table_test.go @@ -1039,7 +1039,7 @@ func TestNew(t *testing.T) { t.Fatal("Expected NewController to return a non-nil value") } - if want, got := 0, c.WorkQueue.Len(); want != got { + if want, got := 0, c.WorkQueue().Len(); want != got { t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want) } @@ -1054,7 +1054,7 @@ func TestNew(t *testing.T) { // Queue has async moving parts so if we check at the wrong moment, this might still be 0. if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) { - return c.WorkQueue.Len() == 1, nil + return c.WorkQueue().Len() == 1, nil }) != nil { t.Error("Queue length was never 1") } diff --git a/webhook/resourcesemantics/defaulting/table_test.go b/webhook/resourcesemantics/defaulting/table_test.go index 3aae9f28d..37c1bedea 100644 --- a/webhook/resourcesemantics/defaulting/table_test.go +++ b/webhook/resourcesemantics/defaulting/table_test.go @@ -357,7 +357,7 @@ func TestNew(t *testing.T) { t.Fatal("Expected NewController to return a non-nil value") } - if want, got := 0, c.WorkQueue.Len(); want != got { + if want, got := 0, c.WorkQueue().Len(); want != got { t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want) } @@ -372,7 +372,7 @@ func TestNew(t *testing.T) { // Queue has async moving parts so if we check at the wrong moment, this might still be 0. if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) { - return c.WorkQueue.Len() == 1, nil + return c.WorkQueue().Len() == 1, nil }) != nil { t.Error("Queue length was never 1") } diff --git a/webhook/resourcesemantics/validation/reconcile_config_test.go b/webhook/resourcesemantics/validation/reconcile_config_test.go index ba419df78..bb803208d 100644 --- a/webhook/resourcesemantics/validation/reconcile_config_test.go +++ b/webhook/resourcesemantics/validation/reconcile_config_test.go @@ -358,7 +358,7 @@ func TestNew(t *testing.T) { t.Fatal("Expected NewController to return a non-nil value") } - if want, got := 0, c.WorkQueue.Len(); want != got { + if want, got := 0, c.WorkQueue().Len(); want != got { t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want) } @@ -373,7 +373,7 @@ func TestNew(t *testing.T) { // Queue has async moving parts so if we check at the wrong moment, thist might still be 0. if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) { - return c.WorkQueue.Len() == 1, nil + return c.WorkQueue().Len() == 1, nil }) != nil { t.Error("Queue length was never 1") } diff --git a/webhook/resourcesemantics/validation/validation_admit_test.go b/webhook/resourcesemantics/validation/validation_admit_test.go index 87c9dc624..47dc65522 100644 --- a/webhook/resourcesemantics/validation/validation_admit_test.go +++ b/webhook/resourcesemantics/validation/validation_admit_test.go @@ -648,7 +648,7 @@ func NewTestResourceAdmissionController(t *testing.T) *reconciler { t.Fatal("Expected NewController to return a non-nil value") } - if want, got := 0, c.WorkQueue.Len(); want != got { + if want, got := 0, c.WorkQueue().Len(); want != got { t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want) } @@ -663,7 +663,7 @@ func NewTestResourceAdmissionController(t *testing.T) *reconciler { // Queue has async moving parts so if we check at the wrong moment, this might still be 0. if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) { - return c.WorkQueue.Len() == 1, nil + return c.WorkQueue().Len() == 1, nil }) != nil { t.Error("Queue length was never 1") }