Use slow lane to do global resync (#1528)

* Use slow lane to do global resync

* cmt

* yolo

* yolo v2

* fix log str

* fixes

* publicize things

* renamemove
This commit is contained in:
Victor Agababov 2020-07-21 13:11:54 -07:00 committed by GitHub
parent c0a9ec7f11
commit 08156c67f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 92 additions and 59 deletions

View File

@ -30,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
@ -186,12 +185,14 @@ type Impl struct {
// from the workqueue to process. Public for testing.
Reconciler Reconciler
// WorkQueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
WorkQueue workqueue.RateLimitingInterface
// workQueue is a rate-limited two-lane work queue.
// This is used to queue work to be processed instead of performing it as
// soon as a change happens. This means we can ensure we only process a
// fixed amount of resources at a time, and makes it easy to ensure we are
// never processing the same item simultaneously in two different workers.
// The slow queue is used for global resync and other background processes
// which are not required to complete at the highest priority.
workQueue *twoLaneQueue
// Sugared logger is easier to use but is not as performant as the
// raw logger. In performance critical paths, call logger.Desugar()
@ -215,12 +216,17 @@ func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName str
return &Impl{
Name: workQueueName,
Reconciler: r,
WorkQueue: newTwoLaneWorkQueue(workQueueName),
workQueue: newTwoLaneWorkQueue(workQueueName),
logger: logger,
statsReporter: reporter,
}
}
// WorkQueue permits direct access to the work queue.
// It returns the controller's unexported two-lane workQueue field typed as a
// workqueue.RateLimitingInterface.
func (c *Impl) WorkQueue() workqueue.RateLimitingInterface {
return c.workQueue
}
// EnqueueAfter takes a resource, converts it into a namespace/name string,
// and passes it to EnqueueKey.
func (c *Impl) EnqueueAfter(obj interface{}, after time.Duration) {
@ -232,6 +238,25 @@ func (c *Impl) EnqueueAfter(obj interface{}, after time.Duration) {
c.EnqueueKeyAfter(types.NamespacedName{Namespace: object.GetNamespace(), Name: object.GetName()}, after)
}
// EnqueueSlowKey adds the given namespace/name key to the slow lane of the
// work queue and then logs, at debug level, the total queue depth alongside
// the slow lane depth.
func (c *Impl) EnqueueSlowKey(key types.NamespacedName) {
	c.workQueue.SlowLane().Add(key)

	// Gather the log arguments in the same order the log call consumes them.
	keyStr := safeKey(key)
	totalDepth := c.workQueue.Len()
	slowDepth := c.workQueue.SlowLane().Len()
	c.logger.Debugf("Adding to the slow queue %s (depth(total/slow): %d/%d)", keyStr, totalDepth, slowDepth)
}
// EnqueueSlow extracts the namespaced name from the object and enqueues it on
// the slow lane of the work queue via EnqueueSlowKey. If an accessor cannot
// be obtained for the object, the error is logged and nothing is enqueued.
func (c *Impl) EnqueueSlow(obj interface{}) {
	accessor, err := kmeta.DeletionHandlingAccessor(obj)
	if err != nil {
		c.logger.Errorw("EnqueueSlow", zap.Error(err))
		return
	}
	c.EnqueueSlowKey(types.NamespacedName{
		Namespace: accessor.GetNamespace(),
		Name:      accessor.GetName(),
	})
}
// Enqueue takes a resource, converts it into a namespace/name string,
// and passes it to EnqueueKey.
func (c *Impl) Enqueue(obj interface{}) {
@ -342,23 +367,23 @@ func (c *Impl) EnqueueNamespaceOf(obj interface{}) {
// EnqueueKey takes a namespace/name string and puts it onto the work queue.
func (c *Impl) EnqueueKey(key types.NamespacedName) {
c.WorkQueue.Add(key)
c.logger.Debugf("Adding to queue %s (depth: %d)", safeKey(key), c.WorkQueue.Len())
c.workQueue.Add(key)
c.logger.Debugf("Adding to queue %s (depth: %d)", safeKey(key), c.workQueue.Len())
}
// MaybeEnqueueBucketKey takes a Bucket and namespace/name string and puts it onto the work queue.
// MaybeEnqueueBucketKey takes a Bucket and namespace/name string and puts it onto
// the slow work queue.
func (c *Impl) MaybeEnqueueBucketKey(bkt reconciler.Bucket, key types.NamespacedName) {
if bkt.Has(key) {
c.WorkQueue.Add(key)
c.logger.Debugf("Adding to queue %s (depth: %d)", safeKey(key), c.WorkQueue.Len())
c.EnqueueSlowKey(key)
}
}
// EnqueueKeyAfter takes a namespace/name string and schedules its execution in
// the work queue after given delay.
func (c *Impl) EnqueueKeyAfter(key types.NamespacedName, delay time.Duration) {
c.WorkQueue.AddAfter(key, delay)
c.logger.Debugf("Adding to queue %s (delay: %v, depth: %d)", safeKey(key), delay, c.WorkQueue.Len())
c.workQueue.AddAfter(key, delay)
c.logger.Debugf("Adding to queue %s (delay: %v, depth: %d)", safeKey(key), delay, c.workQueue.Len())
}
// RunContext starts the controller's worker threads, the number of which is threadiness.
@ -372,8 +397,8 @@ func (c *Impl) RunContext(ctx context.Context, threadiness int) error {
sg := sync.WaitGroup{}
defer sg.Wait()
defer func() {
c.WorkQueue.ShutDown()
for c.WorkQueue.Len() > 0 {
c.workQueue.ShutDown()
for c.workQueue.Len() > 0 {
time.Sleep(time.Millisecond * 100)
}
}()
@ -423,25 +448,25 @@ func (c *Impl) Run(threadiness int, stopCh <-chan struct{}) error {
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling Reconcile on our Reconciler.
func (c *Impl) processNextWorkItem() bool {
obj, shutdown := c.WorkQueue.Get()
obj, shutdown := c.workQueue.Get()
if shutdown {
return false
}
key := obj.(types.NamespacedName)
keyStr := safeKey(key)
c.logger.Debugf("Processing from queue %s (depth: %d)", safeKey(key), c.WorkQueue.Len())
c.logger.Debugf("Processing from queue %s (depth: %d)", safeKey(key), c.workQueue.Len())
startTime := time.Now()
// Send the metrics for the current queue depth
c.statsReporter.ReportQueueDepth(int64(c.WorkQueue.Len()))
c.statsReporter.ReportQueueDepth(int64(c.workQueue.Len()))
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if
// reconcile succeeds. If a transient error occurs, we do not call
// Forget and put the item back to the queue with an increased
// delay.
defer c.WorkQueue.Done(key)
defer c.workQueue.Done(key)
var err error
defer func() {
@ -467,7 +492,7 @@ func (c *Impl) processNextWorkItem() bool {
// Finally, if no error occurs we Forget this item so it does not
// have any delay when another change happens.
c.WorkQueue.Forget(key)
c.workQueue.Forget(key)
logger.Info("Reconcile succeeded. Time taken: ", time.Since(startTime))
return true
@ -480,16 +505,16 @@ func (c *Impl) handleErr(err error, key types.NamespacedName) {
// We want to check that the queue is shutting down here
// since controller Run might have exited by now (since while this item was
// being processed, queue.Len==0).
if !IsPermanentError(err) && !c.WorkQueue.ShuttingDown() {
c.WorkQueue.AddRateLimited(key)
c.logger.Debugf("Requeuing key %s due to non-permanent error (depth: %d)", safeKey(key), c.WorkQueue.Len())
if !IsPermanentError(err) && !c.workQueue.ShuttingDown() {
c.workQueue.AddRateLimited(key)
c.logger.Debugf("Requeuing key %s due to non-permanent error (depth: %d)", safeKey(key), c.workQueue.Len())
return
}
c.WorkQueue.Forget(key)
c.workQueue.Forget(key)
}
// GlobalResync enqueues (with a delay) all objects from the passed SharedInformer
// GlobalResync enqueues into the slow lane all objects from the passed SharedInformer
func (c *Impl) GlobalResync(si cache.SharedInformer) {
alwaysTrue := func(interface{}) bool { return true }
c.FilteredGlobalResync(alwaysTrue, si)
@ -498,14 +523,13 @@ func (c *Impl) GlobalResync(si cache.SharedInformer) {
// FilteredGlobalResync enqueues into the slow lane all objects from the
// SharedInformer that pass the filter function.
func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) {
if c.WorkQueue.ShuttingDown() {
if c.workQueue.ShuttingDown() {
return
}
list := si.GetStore().List()
count := float64(len(list))
for _, obj := range list {
if f(obj) {
c.EnqueueAfter(obj, wait.Jitter(time.Second, count))
c.EnqueueSlow(obj)
}
}
}

View File

@ -464,6 +464,17 @@ func TestEnqueues(t *testing.T) {
})
},
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "foo"}},
}, {
name: "enqueue resource slow",
work: func(impl *Impl) {
impl.EnqueueSlow(&Resource{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "bar",
},
})
},
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "foo"}},
}, {
name: "enqueue sentinel resource",
work: func(impl *Impl) {
@ -720,8 +731,8 @@ func TestEnqueues(t *testing.T) {
// The rate limit on our queue delays when things are added to the queue.
time.Sleep(50 * time.Millisecond)
impl.WorkQueue.ShutDown()
gotQueue := drainWorkQueue(impl.WorkQueue)
impl.WorkQueue().ShutDown()
gotQueue := drainWorkQueue(impl.WorkQueue())
if diff := cmp.Diff(test.wantQueue, gotQueue); diff != "" {
t.Errorf("unexpected queue (-want +got): %s", diff)
@ -730,7 +741,7 @@ func TestEnqueues(t *testing.T) {
}
}
func TestEnqeueAfter(t *testing.T) {
func TestEnqueueAfter(t *testing.T) {
t.Cleanup(ClearAll)
impl := NewImplWithStats(&NopReconciler{}, TestLogger(t), "Testing", &FakeStatsReporter{})
impl.EnqueueAfter(&Resource{
@ -752,16 +763,16 @@ func TestEnqeueAfter(t *testing.T) {
},
}, 20*time.Second)
time.Sleep(10 * time.Millisecond)
if got, want := impl.WorkQueue.Len(), 0; got != want {
if got, want := impl.WorkQueue().Len(), 0; got != want {
t.Errorf("|Queue| = %d, want: %d", got, want)
}
// Sleep the remaining time.
time.Sleep(time.Second)
if got, want := impl.WorkQueue.Len(), 1; got != want {
if got, want := impl.WorkQueue().Len(), 1; got != want {
t.Errorf("|Queue| = %d, want: %d", got, want)
}
impl.WorkQueue.ShutDown()
if got, want := drainWorkQueue(impl.WorkQueue), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) {
impl.WorkQueue().ShutDown()
if got, want := drainWorkQueue(impl.WorkQueue()), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) {
t.Errorf("Queue = %v, want: %v, diff: %s", got, want, cmp.Diff(got, want))
}
}
@ -773,16 +784,16 @@ func TestEnqeueKeyAfter(t *testing.T) {
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "the", Name: "waterfall"}, 500*time.Millisecond)
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "to", Name: "fall"}, 20*time.Second)
time.Sleep(10 * time.Millisecond)
if got, want := impl.WorkQueue.Len(), 0; got != want {
if got, want := impl.WorkQueue().Len(), 0; got != want {
t.Errorf("|Queue| = %d, want: %d", got, want)
}
// Sleep the remaining time.
time.Sleep(time.Second)
if got, want := impl.WorkQueue.Len(), 1; got != want {
if got, want := impl.WorkQueue().Len(), 1; got != want {
t.Errorf("|Queue| = %d, want: %d", got, want)
}
impl.WorkQueue.ShutDown()
if got, want := drainWorkQueue(impl.WorkQueue), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) {
impl.WorkQueue().ShutDown()
if got, want := drainWorkQueue(impl.WorkQueue()), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) {
t.Errorf("Queue = %v, want: %v, diff: %s", got, want, cmp.Diff(got, want))
}
}
@ -1007,7 +1018,7 @@ func TestStartAndShutdownWithWork(t *testing.T) {
if got, want := r.count, 1; got != want {
t.Errorf("reconcile count = %v, wanted %v", got, want)
}
if got, want := impl.WorkQueue.NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want {
if got, want := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want {
t.Errorf("requeues = %v, wanted %v", got, want)
}
@ -1093,7 +1104,7 @@ func TestStartAndShutdownWithErroringWork(t *testing.T) {
// NumRequeues can't fully reflect the real state of the queue length, so
// here we need to wait for NumRequeues to be more than 1, to ensure
// the key gets re-queued and reprocessed as expected.
if got, wantAtLeast := impl.WorkQueue.NumRequeues(types.NamespacedName{Namespace: "", Name: "bar"}), 2; got < wantAtLeast {
if got, wantAtLeast := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "", Name: "bar"}), 2; got < wantAtLeast {
t.Errorf("Requeue count = %v, wanted at least %v", got, wantAtLeast)
}
}
@ -1137,7 +1148,7 @@ func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) {
}
// Check that the work was not requeued in RateLimiter.
if got, want := impl.WorkQueue.NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want {
if got, want := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want {
t.Errorf("Requeue count = %v, wanted %v", got, want)
}

View File

@ -16,9 +16,7 @@ limitations under the License.
package controller
import (
"k8s.io/client-go/util/workqueue"
)
import "k8s.io/client-go/util/workqueue"
// twoLaneQueue is a rate limited queue that wraps around two queues
// -- fast queue (anonymously aliased), whose contents are processed with priority.

View File

@ -229,7 +229,7 @@ func TestNew(t *testing.T) {
t.Fatal("Expected NewController to return a non-nil value")
}
if want, got := 0, c.WorkQueue.Len(); want != got {
if want, got := 0, c.WorkQueue().Len(); want != got {
t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want)
}
@ -244,7 +244,7 @@ func TestNew(t *testing.T) {
// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return c.WorkQueue.Len() == 1, nil
return c.WorkQueue().Len() == 1, nil
}) != nil {
t.Error("Queue length was never 1")
}

View File

@ -327,7 +327,7 @@ func TestNew(t *testing.T) {
t.Fatal("Expected NewController to return a non-nil value")
}
if want, got := 0, c.WorkQueue.Len(); want != got {
if want, got := 0, c.WorkQueue().Len(); want != got {
t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want)
}
@ -342,7 +342,7 @@ func TestNew(t *testing.T) {
// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return c.WorkQueue.Len() == 1, nil
return c.WorkQueue().Len() == 1, nil
}) != nil {
t.Error("Queue length was never 1")
}

View File

@ -1039,7 +1039,7 @@ func TestNew(t *testing.T) {
t.Fatal("Expected NewController to return a non-nil value")
}
if want, got := 0, c.WorkQueue.Len(); want != got {
if want, got := 0, c.WorkQueue().Len(); want != got {
t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want)
}
@ -1054,7 +1054,7 @@ func TestNew(t *testing.T) {
// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return c.WorkQueue.Len() == 1, nil
return c.WorkQueue().Len() == 1, nil
}) != nil {
t.Error("Queue length was never 1")
}

View File

@ -357,7 +357,7 @@ func TestNew(t *testing.T) {
t.Fatal("Expected NewController to return a non-nil value")
}
if want, got := 0, c.WorkQueue.Len(); want != got {
if want, got := 0, c.WorkQueue().Len(); want != got {
t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want)
}
@ -372,7 +372,7 @@ func TestNew(t *testing.T) {
// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return c.WorkQueue.Len() == 1, nil
return c.WorkQueue().Len() == 1, nil
}) != nil {
t.Error("Queue length was never 1")
}

View File

@ -358,7 +358,7 @@ func TestNew(t *testing.T) {
t.Fatal("Expected NewController to return a non-nil value")
}
if want, got := 0, c.WorkQueue.Len(); want != got {
if want, got := 0, c.WorkQueue().Len(); want != got {
t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want)
}
@ -373,7 +373,7 @@ func TestNew(t *testing.T) {
// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return c.WorkQueue.Len() == 1, nil
return c.WorkQueue().Len() == 1, nil
}) != nil {
t.Error("Queue length was never 1")
}

View File

@ -648,7 +648,7 @@ func NewTestResourceAdmissionController(t *testing.T) *reconciler {
t.Fatal("Expected NewController to return a non-nil value")
}
if want, got := 0, c.WorkQueue.Len(); want != got {
if want, got := 0, c.WorkQueue().Len(); want != got {
t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want)
}
@ -663,7 +663,7 @@ func NewTestResourceAdmissionController(t *testing.T) *reconciler {
// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return c.WorkQueue.Len() == 1, nil
return c.WorkQueue().Len() == 1, nil
}) != nil {
t.Error("Queue length was never 1")
}