diff --git a/controller/controller.go b/controller/controller.go index 551088341..e32db29bd 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -205,20 +205,40 @@ type Impl struct { statsReporter StatsReporter } +// ControllerOptions encapsulates options for creating a new controller, +// including throttling and stats behavior. +type ControllerOptions struct { + WorkQueueName string + Logger *zap.SugaredLogger + Reporter StatsReporter + RateLimiter workqueue.RateLimiter +} + // NewImpl instantiates an instance of our controller that will feed work to the // provided Reconciler as it is enqueued. func NewImpl(r Reconciler, logger *zap.SugaredLogger, workQueueName string) *Impl { - return NewImplWithStats(r, logger, workQueueName, MustNewStatsReporter(workQueueName, logger)) + return NewImplFull(r, ControllerOptions{WorkQueueName: workQueueName, Logger: logger}) } func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName string, reporter StatsReporter) *Impl { - logger = logger.Named(workQueueName) + return NewImplFull(r, ControllerOptions{WorkQueueName: workQueueName, Logger: logger, Reporter: reporter}) +} + +// NewImplFull accepts the full set of options available to all controllers. +func NewImplFull(r Reconciler, options ControllerOptions) *Impl { + logger := options.Logger.Named(options.WorkQueueName) + if options.RateLimiter == nil { + options.RateLimiter = workqueue.DefaultControllerRateLimiter() + } + if options.Reporter == nil { + options.Reporter = MustNewStatsReporter(options.WorkQueueName, options.Logger) + } return &Impl{ - Name: workQueueName, + Name: options.WorkQueueName, Reconciler: r, - workQueue: newTwoLaneWorkQueue(workQueueName), + workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter), logger: logger, - statsReporter: reporter, + statsReporter: options.Reporter, } } diff --git a/controller/controller_test.go b/controller/controller_test.go index 56e63c676..70ec2e5f0 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -424,6 +424,17 @@ func (nr *NopReconciler) Reconcile(context.Context, string) error { return nil } +type testRateLimiter struct { + t *testing.T + delay time.Duration +} + +func (t testRateLimiter) When(interface{}) time.Duration { return t.delay } +func (t testRateLimiter) Forget(interface{}) {} +func (t testRateLimiter) NumRequeues(interface{}) int { return 0 } + +var _ workqueue.RateLimiter = (*testRateLimiter)(nil) + func TestEnqueues(t *testing.T) { tests := []struct { name string @@ -723,6 +734,24 @@ func TestEnqueues(t *testing.T) { }, }} + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Cleanup(ClearAll) + var rl workqueue.RateLimiter = testRateLimiter{t, 100 * time.Millisecond} + impl := NewImplFull(&NopReconciler{}, ControllerOptions{WorkQueueName: "Testing", Logger: TestLogger(t), RateLimiter: rl}) + test.work(impl) + + // The rate limit on our queue delays when things are added to the queue. + time.Sleep(150 * time.Millisecond) + impl.WorkQueue().ShutDown() + gotQueue := drainWorkQueue(impl.WorkQueue()) + + if diff := cmp.Diff(test.wantQueue, gotQueue); diff != "" { + t.Errorf("unexpected queue (-want +got): %s", diff) + } + }) + } + for _, test := range tests { t.Run(test.name, func(t *testing.T) { t.Cleanup(ClearAll) diff --git a/controller/two_lane_queue.go b/controller/two_lane_queue.go index ebd53df5b..919cc9006 100644 --- a/controller/two_lane_queue.go +++ b/controller/two_lane_queue.go @@ -37,8 +37,7 @@ type twoLaneQueue struct { } // Creates a new twoLaneQueue. -func newTwoLaneWorkQueue(name string) *twoLaneQueue { - rl := workqueue.DefaultControllerRateLimiter() +func newTwoLaneWorkQueue(name string, rl workqueue.RateLimiter) *twoLaneQueue { tlq := &twoLaneQueue{ RateLimitingInterface: workqueue.NewNamedRateLimitingQueue( rl, diff --git a/controller/two_lane_queue_test.go b/controller/two_lane_queue_test.go index 9c512f35f..a6998e735 100644 --- a/controller/two_lane_queue_test.go +++ b/controller/two_lane_queue_test.go @@ -22,10 +22,75 @@ import ( "time" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" ) +type chanRateLimiter struct { + t *testing.T + // Called when this ratelimiter is consulted for when to process a value. + whenCalled chan interface{} + retryCount map[interface{}]int +} + +func (r chanRateLimiter) When(item interface{}) time.Duration { + r.whenCalled <- item + return 0 * time.Second +} +func (r chanRateLimiter) Forget(item interface{}) { + r.t.Fatalf("Forgetting item %+v, we should not be forgetting any items.", item) +} +func (r chanRateLimiter) NumRequeues(item interface{}) int { + return r.retryCount[item] +} + +var _ workqueue.RateLimiter = &chanRateLimiter{} + +func TestRateLimit(t *testing.T) { + whenCalled := make(chan interface{}, 2) + rl := chanRateLimiter{ + t, + whenCalled, + make(map[interface{}]int), + } + q := newTwoLaneWorkQueue("live-in-the-limited-lane", rl) + q.SlowLane().Add("1") + q.Add("2") + + k, done := q.Get() + q.Done(k) + q.AddRateLimited(k) + rlK := <-whenCalled + if got, want := k.(string), rlK.(string); got != want { + t.Errorf(`Got = %q, want: "%q"`, got, want) + } + if done { + t.Error("The queue is unexpectedly shutdown") + } + + k2, done := q.Get() + q.Done(k2) + q.AddRateLimited(k2) + rlK2 := <-whenCalled + if got, want := k2.(string), rlK2.(string); got != want { + t.Errorf(`Got = %q, want: "%q"`, got, want) + } + if s1, s2 := k.(string), k2.(string); (s1 != "1" || s2 != "2") && (s1 != "2" || s2 != "1") { + t.Errorf("Expected to see 1 and 2, instead saw %q and %q", s1, s2) + } + if done { + t.Error("The queue is unexpectedly shutdown") + } + + // Queue has async moving parts so if we check at the wrong moment, this might still be 0 or 1. + if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) { + return q.Len() == 2, nil + }) != nil { + t.Error("Queue length was never 2") + } +} + func TestSlowQueue(t *testing.T) { - q := newTwoLaneWorkQueue("live-in-the-fast-lane") + q := newTwoLaneWorkQueue("live-in-the-fast-lane", workqueue.DefaultControllerRateLimiter()) q.SlowLane().Add("1") // Queue has async moving parts so if we check at the wrong moment, this might still be 0. if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) { @@ -53,7 +118,7 @@ func TestSlowQueue(t *testing.T) { func TestDoubleKey(t *testing.T) { // Verifies that we don't get double concurrent processing of the same key. - q := newTwoLaneWorkQueue("live-in-the-fast-lane") + q := newTwoLaneWorkQueue("live-in-the-fast-lane", workqueue.DefaultControllerRateLimiter()) q.Add("1") t.Cleanup(q.ShutDown) @@ -97,7 +162,7 @@ func TestDoubleKey(t *testing.T) { func TestOrder(t *testing.T) { // Verifies that we read from the fast queue first. - q := newTwoLaneWorkQueue("live-in-the-fast-lane") + q := newTwoLaneWorkQueue("live-in-the-fast-lane", workqueue.DefaultControllerRateLimiter()) stop := make(chan struct{}) t.Cleanup(func() { close(stop)