Allow creating controller with custom RateLimiter. (#1546)

* Allow creating controller with custom RateLimiter.

Which was possible before via field modification.
Not switching to a builder pattern mostly for speed of resolution.
Happy to consider alternatives.

* Add tests for new functionality.

Specifically, these test that the Wait() function is notified about
the item, and that the RateLimiter is passed through to the queue.

* Add Options. Gophers love Options.

* Even moar controller GenericOptions.

* Attempt to appease lint, don't create struct for typecheck.

* GenericOptions -> ControllerOptions

* Public struct fields.
This commit is contained in:
Jon Donovan 2020-08-03 14:31:28 -07:00 committed by GitHub
parent ba855eb0ac
commit 7be5c0a87b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 123 additions and 10 deletions

View File

@ -205,20 +205,40 @@ type Impl struct {
statsReporter StatsReporter
}
// ControllerOptions encapsulates options for creating a new controller,
// including throttling and stats behavior.
type ControllerOptions struct {
WorkQueueName string
Logger *zap.SugaredLogger
Reporter StatsReporter
RateLimiter workqueue.RateLimiter
}
// NewImpl instantiates an instance of our controller that will feed work to the
// provided Reconciler as it is enqueued.
func NewImpl(r Reconciler, logger *zap.SugaredLogger, workQueueName string) *Impl {
return NewImplWithStats(r, logger, workQueueName, MustNewStatsReporter(workQueueName, logger))
return NewImplFull(r, ControllerOptions{WorkQueueName: workQueueName, Logger: logger})
}
func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName string, reporter StatsReporter) *Impl {
logger = logger.Named(workQueueName)
return NewImplFull(r, ControllerOptions{WorkQueueName: workQueueName, Logger: logger, Reporter: reporter})
}
// NewImplFull accepts the full set of options available to all controllers.
func NewImplFull(r Reconciler, options ControllerOptions) *Impl {
logger := options.Logger.Named(options.WorkQueueName)
if options.RateLimiter == nil {
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
}
if options.Reporter == nil {
options.Reporter = MustNewStatsReporter(options.WorkQueueName, options.Logger)
}
return &Impl{
Name: workQueueName,
Name: options.WorkQueueName,
Reconciler: r,
workQueue: newTwoLaneWorkQueue(workQueueName),
workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter),
logger: logger,
statsReporter: reporter,
statsReporter: options.Reporter,
}
}

View File

@ -424,6 +424,17 @@ func (nr *NopReconciler) Reconcile(context.Context, string) error {
return nil
}
type testRateLimiter struct {
t *testing.T
delay time.Duration
}
func (t testRateLimiter) When(interface{}) time.Duration { return t.delay }
func (t testRateLimiter) Forget(interface{}) {}
func (t testRateLimiter) NumRequeues(interface{}) int { return 0 }
var _ workqueue.RateLimiter = (*testRateLimiter)(nil)
func TestEnqueues(t *testing.T) {
tests := []struct {
name string
@ -723,6 +734,24 @@ func TestEnqueues(t *testing.T) {
},
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Cleanup(ClearAll)
var rl workqueue.RateLimiter = testRateLimiter{t, 100 * time.Millisecond}
impl := NewImplFull(&NopReconciler{}, ControllerOptions{WorkQueueName: "Testing", Logger: TestLogger(t), RateLimiter: rl})
test.work(impl)
// The rate limit on our queue delays when things are added to the queue.
time.Sleep(150 * time.Millisecond)
impl.WorkQueue().ShutDown()
gotQueue := drainWorkQueue(impl.WorkQueue())
if diff := cmp.Diff(test.wantQueue, gotQueue); diff != "" {
t.Errorf("unexpected queue (-want +got): %s", diff)
}
})
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Cleanup(ClearAll)

View File

@ -37,8 +37,7 @@ type twoLaneQueue struct {
}
// Creates a new twoLaneQueue.
func newTwoLaneWorkQueue(name string) *twoLaneQueue {
rl := workqueue.DefaultControllerRateLimiter()
func newTwoLaneWorkQueue(name string, rl workqueue.RateLimiter) *twoLaneQueue {
tlq := &twoLaneQueue{
RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(
rl,

View File

@ -22,10 +22,75 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)
type chanRateLimiter struct {
t *testing.T
// Called when this ratelimiter is consulted for when to process a value.
whenCalled chan interface{}
retryCount map[interface{}]int
}
func (r chanRateLimiter) When(item interface{}) time.Duration {
r.whenCalled <- item
return 0 * time.Second
}
func (r chanRateLimiter) Forget(item interface{}) {
r.t.Fatalf("Forgetting item %+v, we should not be forgetting any items.", item)
}
func (r chanRateLimiter) NumRequeues(item interface{}) int {
return r.retryCount[item]
}
var _ workqueue.RateLimiter = &chanRateLimiter{}
func TestRateLimit(t *testing.T) {
whenCalled := make(chan interface{}, 2)
rl := chanRateLimiter{
t,
whenCalled,
make(map[interface{}]int),
}
q := newTwoLaneWorkQueue("live-in-the-limited-lane", rl)
q.SlowLane().Add("1")
q.Add("2")
k, done := q.Get()
q.Done(k)
q.AddRateLimited(k)
rlK := <-whenCalled
if got, want := k.(string), rlK.(string); got != want {
t.Errorf(`Got = %q, want: "%q"`, got, want)
}
if done {
t.Error("The queue is unexpectedly shutdown")
}
k2, done := q.Get()
q.Done(k2)
q.AddRateLimited(k2)
rlK2 := <-whenCalled
if got, want := k2.(string), rlK2.(string); got != want {
t.Errorf(`Got = %q, want: "%q"`, got, want)
}
if s1, s2 := k.(string), k2.(string); (s1 != "1" || s2 != "2") && (s1 != "2" || s2 != "1") {
t.Errorf("Expected to see 1 and 2, instead saw %q and %q", s1, s2)
}
if done {
t.Error("The queue is unexpectedly shutdown")
}
// Queue has async moving parts so if we check at the wrong moment, this might still be 0 or 1.
if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return q.Len() == 2, nil
}) != nil {
t.Error("Queue length was never 2")
}
}
func TestSlowQueue(t *testing.T) {
q := newTwoLaneWorkQueue("live-in-the-fast-lane")
q := newTwoLaneWorkQueue("live-in-the-fast-lane", workqueue.DefaultControllerRateLimiter())
q.SlowLane().Add("1")
// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
@ -53,7 +118,7 @@ func TestSlowQueue(t *testing.T) {
func TestDoubleKey(t *testing.T) {
// Verifies that we don't get double concurrent processing of the same key.
q := newTwoLaneWorkQueue("live-in-the-fast-lane")
q := newTwoLaneWorkQueue("live-in-the-fast-lane", workqueue.DefaultControllerRateLimiter())
q.Add("1")
t.Cleanup(q.ShutDown)
@ -97,7 +162,7 @@ func TestDoubleKey(t *testing.T) {
func TestOrder(t *testing.T) {
// Verifies that we read from the fast queue first.
q := newTwoLaneWorkQueue("live-in-the-fast-lane")
q := newTwoLaneWorkQueue("live-in-the-fast-lane", workqueue.DefaultControllerRateLimiter())
stop := make(chan struct{})
t.Cleanup(func() {
close(stop)