From 1a3a36a996dec7645bee7084153ef8b7273e1028 Mon Sep 17 00:00:00 2001
From: Matt Moore
Date: Sat, 24 Aug 2019 21:23:13 -0700
Subject: [PATCH] Rework how we GlobalResync. (#597)

* Rework how we GlobalResync.

This consolidates the logic for how we do GlobalResyncs in a few ways:
1. Remove SendGlobalUpdates and replace it with FilteredGlobalResync.
2. Have GlobalResync go through FilteredGlobalResync with a tautological
   predicate.
3. Change the way we enqueue things during a global resync to avoid
   flooding the system, by using EnqueueAfter and wait.Jitter to stagger
   how things are queued.

The background for this is that when I started to benchmark
reconciliation of a large number of resources, we'd see random large
stalls like [this](https://mako.dev/run?run_key=4882560844824576&~dl=1&~cl=1&~rl=1&~rvl=1&~il=1&~sksl=1&~pal=1)
fairly consistently. Looking at the logs, these stalls coincided with
incredibly deep work queues due to global resyncs triggered by
configmap notifications. When I disabled global resyncs, the spikes
[went away](https://mako.dev/run?run_key=4975897060835328&~dl=1&~cl=1&~rl=1&~rvl=1&~il=1&~sksl=1&~pal=1).

To mitigate the huge pile-up due to the global resync, we use
`wait.Jitter` to stagger how things are enqueued, with a `maxFactor` of
the length of the store. This also seems to
[keep things flowing](https://mako.dev/run?run_key=5701802099998720&~dl=1&~cl=1&~rl=1&~rvl=1&~il=1&~sksl=1&~pal=1),
although we will possibly need to tune things further.

* Update comment to mention the delay.
---
 controller/controller.go      | 23 ++++++++++++++++++++---
 controller/controller_test.go | 25 +++++++++++++++++++++++--
 controller/helper.go          | 15 ---------------
 controller/helper_test.go     | 18 ------------------
 4 files changed, 43 insertions(+), 38 deletions(-)

diff --git a/controller/controller.go b/controller/controller.go
index 7c6d6b422..6c9332754 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -28,6 +28,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
@@ -255,12 +256,14 @@ func (c *Impl) EnqueueLabelOfClusterScopedResource(nameLabel string) func(obj in
 // EnqueueKey takes a namespace/name string and puts it onto the work queue.
 func (c *Impl) EnqueueKey(key string) {
 	c.WorkQueue.Add(key)
+	c.logger.Debugf("Adding to queue %s (depth: %d)", key, c.WorkQueue.Len())
 }
 
 // EnqueueKeyAfter takes a namespace/name string and schedules its execution in
 // the work queue after the given delay.
 func (c *Impl) EnqueueKeyAfter(key string, delay time.Duration) {
 	c.WorkQueue.AddAfter(key, delay)
+	c.logger.Debugf("Adding to queue %s (delay: %v, depth: %d)", key, delay, c.WorkQueue.Len())
 }
 
 // Run starts the controller's worker threads, the number of which is threadiness.
@@ -300,6 +303,8 @@ func (c *Impl) processNextWorkItem() bool {
 	}
 	key := obj.(string)
+	c.logger.Debugf("Processing from queue %s (depth: %d)", key, c.WorkQueue.Len())
+
 	startTime := time.Now()
 	// Send the metrics for the current queue depth
 	c.statsReporter.ReportQueueDepth(int64(c.WorkQueue.Len()))
 
@@ -347,16 +352,28 @@ func (c *Impl) handleErr(err error, key string) {
 	// Re-queue the key if it's a transient error.
 	if !IsPermanentError(err) {
 		c.WorkQueue.AddRateLimited(key)
+		c.logger.Debugf("Requeuing key %s due to non-permanent error (depth: %d)", key, c.WorkQueue.Len())
 		return
 	}
 
 	c.WorkQueue.Forget(key)
 }
 
-// GlobalResync enqueues all objects from the passed SharedInformer
+// GlobalResync enqueues (with a delay) all objects from the passed SharedInformer
 func (c *Impl) GlobalResync(si cache.SharedInformer) {
-	for _, key := range si.GetStore().ListKeys() {
-		c.EnqueueKey(key)
+	alwaysTrue := func(interface{}) bool { return true }
+	c.FilteredGlobalResync(alwaysTrue, si)
+}
+
+// FilteredGlobalResync enqueues (with a delay) all objects from the
+// SharedInformer that pass the filter function
+func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) {
+	list := si.GetStore().List()
+	count := float64(len(list))
+	for _, obj := range list {
+		if f(obj) {
+			c.EnqueueAfter(obj, wait.Jitter(time.Second, count))
+		}
 	}
 }
 
diff --git a/controller/controller_test.go b/controller/controller_test.go
index c6cd0b4f8..6ef4884fc 100644
--- a/controller/controller_test.go
+++ b/controller/controller_test.go
@@ -765,7 +765,26 @@ func (*dummyInformer) GetStore() cache.Store {
 }
 
 var dummyKeys = []string{"foo/bar", "bar/foo", "fizz/buzz"}
-var dummyObjs = []interface{}{"foo/bar", "bar/foo", "fizz/buzz"}
+var dummyObjs = []interface{}{
+	&Resource{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "bar",
+			Namespace: "foo",
+		},
+	},
+	&Resource{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "foo",
+			Namespace: "bar",
+		},
+	},
+	&Resource{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "buzz",
+			Namespace: "fizz",
+		},
+	},
+}
 
 func (*dummyStore) ListKeys() []string {
 	return dummyKeys
@@ -790,8 +809,10 @@ func TestImplGlobalResync(t *testing.T) {
 
 	impl.GlobalResync(&dummyInformer{})
 
+	// The global resync delays enqueuing things by a second with a jitter that
+	// goes up to len(dummyObjs) times a second.
 	select {
-	case <-time.After(10 * time.Millisecond):
+	case <-time.After((1 + 3) * time.Second):
 		// We don't expect completion before the stopCh closes.
 	case <-doneCh:
 		t.Error("StartAll finished early.")
diff --git a/controller/helper.go b/controller/helper.go
index b326cc5b1..b8cc08fb0 100644
--- a/controller/helper.go
+++ b/controller/helper.go
@@ -19,7 +19,6 @@ package controller
 
 import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/client-go/tools/cache"
 	"knative.dev/pkg/kmeta"
 )
@@ -51,17 +50,3 @@ func EnsureTypeMeta(f Callback, gvk schema.GroupVersionKind) Callback {
 		f(copy)
 	}
 }
-
-// SendGlobalUpdates triggers an update event for all objects from the
-// passed SharedInformer.
-//
-// Since this is triggered not by a real update of these objects
-// themselves, we have no way of knowing the change to these objects
-// if any, so we call handler.OnUpdate(obj, obj) for all of them
-// regardless if they have changes or not.
-func SendGlobalUpdates(si cache.SharedInformer, handler cache.ResourceEventHandler) {
-	store := si.GetStore()
-	for _, obj := range store.List() {
-		handler.OnUpdate(obj, obj)
-	}
-}
diff --git a/controller/helper_test.go b/controller/helper_test.go
index 36c2e9781..71361de7c 100644
--- a/controller/helper_test.go
+++ b/controller/helper_test.go
@@ -28,24 +28,6 @@ import (
 	. "knative.dev/pkg/testing"
 )
 
-func TestSendGlobalUpdate(t *testing.T) {
-	called := make(map[interface{}]bool)
-	handler := cache.ResourceEventHandlerFuncs{
-		UpdateFunc: func(old, new interface{}) {
-			called[new] = true
-		},
-	}
-	SendGlobalUpdates(&dummyInformer{}, handler)
-	for _, obj := range dummyObjs {
-		if updated := called[obj]; !updated {
-			t.Errorf("Expected obj %v to be updated but wasn't", obj)
-		}
-	}
-	if len(dummyObjs) != len(called) {
-		t.Errorf("Expected to see %d updates, saw %d", len(dummyObjs), len(called))
-	}
-}
-
 func TestEnsureTypeMeta(t *testing.T) {
 	gvk := schema.GroupVersionKind{
 		Group: "foo.bar.com",