mirror of https://github.com/knative/pkg.git
Rework how we GlobalResync. (#597)
* Rework how we GlobalResync.

  This consolidates the logic for how we do GlobalResyncs in a few ways:
  1. Remove SendGlobalUpdates and replace it with FilteredGlobalResync.
  2. Have GlobalResync go through FilteredGlobalResync with a tautological predicate.
  3. Change the way we enqueue things during a global resync to avoid flooding the system, by using EnqueueAfter and wait.Jitter to stagger how things are queued.

  The background for this is that when I started to benchmark reconciliation of a large number of resources, we'd see random large stalls like [this](https://mako.dev/run?run_key=4882560844824576&~dl=1&~cl=1&~rl=1&~rvl=1&~il=1&~sksl=1&~pal=1) fairly consistently. Looking at the logs, these stalls coincide with incredibly deep work queues due to global resyncs triggered by configmap notifications. When I disabled global resyncs the spikes [went away](https://mako.dev/run?run_key=4975897060835328&~dl=1&~cl=1&~rl=1&~rvl=1&~il=1&~sksl=1&~pal=1).

  To mitigate the huge pile-up due to the global resync, we use `wait.Jitter` to stagger how things are enqueued, with a `maxFactor` of the length of the store. This also seems to [keep things flowing](https://mako.dev/run?run_key=5701802099998720&~dl=1&~cl=1&~rl=1&~rvl=1&~il=1&~sksl=1&~pal=1), although we will possibly need to tune things further.

* Update the comment to mention the delay.
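To make the staggering concrete, here is a minimal, self-contained sketch (not the knative/pkg code itself) of how `wait.Jitter` spreads enqueues out when a resync walks a store. The key list and the printed "would enqueue" action are hypothetical stand-ins for the informer store and `EnqueueKeyAfter`.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Hypothetical keys standing in for the informer store contents.
	keys := []string{"default/a", "default/b", "default/c", "default/d"}

	// maxFactor is the number of objects in the store, so each jittered
	// delay falls in [1s, 1s + len(keys)*1s).
	maxFactor := float64(len(keys))
	for _, key := range keys {
		delay := wait.Jitter(time.Second, maxFactor)
		// In the real controller this would be an EnqueueKeyAfter(key, delay);
		// here we only print the computed delay.
		fmt.Printf("would enqueue %s after %v\n", key, delay)
	}
}

Because each item gets an independent random delay, the work queue fills gradually instead of receiving the whole store at once.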
This commit is contained in:
parent 39a29cf1bf
commit 1a3a36a996
@@ -28,6 +28,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
@@ -255,12 +256,14 @@ func (c *Impl) EnqueueLabelOfClusterScopedResource(nameLabel string) func(obj interface{}) {
 // EnqueueKey takes a namespace/name string and puts it onto the work queue.
 func (c *Impl) EnqueueKey(key string) {
 	c.WorkQueue.Add(key)
+	c.logger.Debugf("Adding to queue %s (depth: %d)", key, c.WorkQueue.Len())
 }
 
 // EnqueueKeyAfter takes a namespace/name string and schedules its execution in
 // the work queue after given delay.
 func (c *Impl) EnqueueKeyAfter(key string, delay time.Duration) {
 	c.WorkQueue.AddAfter(key, delay)
+	c.logger.Debugf("Adding to queue %s (delay: %v, depth: %d)", key, delay, c.WorkQueue.Len())
 }
 
 // Run starts the controller's worker threads, the number of which is threadiness.
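For context on what AddAfter buys here: a delaying work queue holds an item back for the given duration before handing it to workers. A rough sketch using client-go's delaying queue directly (the item string and delay are illustrative, not knative/pkg API):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewDelayingQueue()
	defer q.ShutDown()

	// The item is not visible to Get() until roughly 500ms have elapsed.
	q.AddAfter("default/example", 500*time.Millisecond)

	start := time.Now()
	item, _ := q.Get() // blocks until the delay has passed
	fmt.Printf("got %v after %v\n", item, time.Since(start))
	q.Done(item)
}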
@@ -300,6 +303,8 @@ func (c *Impl) processNextWorkItem() bool {
 	}
 	key := obj.(string)
+
+	c.logger.Debugf("Processing from queue %s (depth: %d)", key, c.WorkQueue.Len())
 
 	startTime := time.Now()
 	// Send the metrics for the current queue depth
 	c.statsReporter.ReportQueueDepth(int64(c.WorkQueue.Len()))
@@ -347,16 +352,28 @@ func (c *Impl) handleErr(err error, key string) {
 	// Re-queue the key if it's an transient error.
 	if !IsPermanentError(err) {
 		c.WorkQueue.AddRateLimited(key)
+		c.logger.Debugf("Requeuing key %s due to non-permanent error (depth: %d)", key, c.WorkQueue.Len())
 		return
 	}
 
 	c.WorkQueue.Forget(key)
 }
 
-// GlobalResync enqueues all objects from the passed SharedInformer
+// GlobalResync enqueues (with a delay) all objects from the passed SharedInformer
 func (c *Impl) GlobalResync(si cache.SharedInformer) {
-	for _, key := range si.GetStore().ListKeys() {
-		c.EnqueueKey(key)
-	}
+	alwaysTrue := func(interface{}) bool { return true }
+	c.FilteredGlobalResync(alwaysTrue, si)
 }
+
+// FilteredGlobalResync enqueues (with a delay) all objects from the
+// SharedInformer that pass the filter function
+func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) {
+	list := si.GetStore().List()
+	count := float64(len(list))
+	for _, obj := range list {
+		if f(obj) {
+			c.EnqueueAfter(obj, wait.Jitter(time.Second, count))
+		}
+	}
+}
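A sketch of how a caller might use the new FilteredGlobalResync, for example from a config-watcher callback that only wants to resync objects in one namespace. The function, its arguments, and the namespace filter are illustrative assumptions, not part of this commit:

package example

import (
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/client-go/tools/cache"

	"knative.dev/pkg/controller"
)

// resyncNamespace shows one possible predicate: re-enqueue only the objects
// living in the given namespace. impl and si are assumed to be wired up
// elsewhere (e.g. in a reconciler's constructor).
func resyncNamespace(impl *controller.Impl, si cache.SharedInformer, ns string) {
	inNamespace := func(obj interface{}) bool {
		object, err := meta.Accessor(obj)
		if err != nil {
			return false
		}
		return object.GetNamespace() == ns
	}
	impl.FilteredGlobalResync(inNamespace, si)
}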
@@ -765,7 +765,26 @@ func (*dummyInformer) GetStore() cache.Store {
 }
 
 var dummyKeys = []string{"foo/bar", "bar/foo", "fizz/buzz"}
-var dummyObjs = []interface{}{"foo/bar", "bar/foo", "fizz/buzz"}
+var dummyObjs = []interface{}{
+	&Resource{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "bar",
+			Namespace: "foo",
+		},
+	},
+	&Resource{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "foo",
+			Namespace: "bar",
+		},
+	},
+	&Resource{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "buzz",
+			Namespace: "fizz",
+		},
+	},
+}
 
 func (*dummyStore) ListKeys() []string {
 	return dummyKeys
@@ -790,8 +809,10 @@ func TestImplGlobalResync(t *testing.T) {
 
 	impl.GlobalResync(&dummyInformer{})
 
+	// The global resync delays enqueuing things by a second with a jitter that
+	// goes up to len(dummyObjs) times a second.
 	select {
-	case <-time.After(10 * time.Millisecond):
+	case <-time.After((1 + 3) * time.Second):
 		// We don't expect completion before the stopCh closes.
 	case <-doneCh:
 		t.Error("StartAll finished early.")
@@ -19,7 +19,6 @@ package controller
 import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/client-go/tools/cache"
 
 	"knative.dev/pkg/kmeta"
 )
@@ -51,17 +50,3 @@ func EnsureTypeMeta(f Callback, gvk schema.GroupVersionKind) Callback {
 		f(copy)
 	}
 }
-
-// SendGlobalUpdates triggers an update event for all objects from the
-// passed SharedInformer.
-//
-// Since this is triggered not by a real update of these objects
-// themselves, we have no way of knowing the change to these objects
-// if any, so we call handler.OnUpdate(obj, obj) for all of them
-// regardless if they have changes or not.
-func SendGlobalUpdates(si cache.SharedInformer, handler cache.ResourceEventHandler) {
-	store := si.GetStore()
-	for _, obj := range store.List() {
-		handler.OnUpdate(obj, obj)
-	}
-}
@@ -28,24 +28,6 @@ import (
 	. "knative.dev/pkg/testing"
 )
 
-func TestSendGlobalUpdate(t *testing.T) {
-	called := make(map[interface{}]bool)
-	handler := cache.ResourceEventHandlerFuncs{
-		UpdateFunc: func(old, new interface{}) {
-			called[new] = true
-		},
-	}
-	SendGlobalUpdates(&dummyInformer{}, handler)
-	for _, obj := range dummyObjs {
-		if updated := called[obj]; !updated {
-			t.Errorf("Expected obj %v to be updated but wasn't", obj)
-		}
-	}
-	if len(dummyObjs) != len(called) {
-		t.Errorf("Expected to see %d updates, saw %d", len(dummyObjs), len(called))
-	}
-}
-
 func TestEnsureTypeMeta(t *testing.T) {
 	gvk := schema.GroupVersionKind{
 		Group: "foo.bar.com",