Stabilize the controller unit tests. (#482)

This commit is contained in:
Matt Moore 2019-06-23 17:16:00 -07:00 committed by Knative Prow Robot
parent ca175939db
commit 7538724784
1 changed files with 35 additions and 21 deletions

View File

@ -482,6 +482,7 @@ func TestEnqueues(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
defer ClearAll()
impl := NewImplWithStats(&NopReconciler{}, TestLogger(t), "Testing", &FakeStatsReporter{})
test.work(impl)
@ -498,56 +499,58 @@ func TestEnqueues(t *testing.T) {
}
func TestEnqeueAfter(t *testing.T) {
defer ClearAll()
impl := NewImplWithStats(&NopReconciler{}, TestLogger(t), "Testing", &FakeStatsReporter{})
impl.EnqueueAfter(&Resource{
ObjectMeta: metav1.ObjectMeta{
Name: "for",
Namespace: "waiting",
},
}, time.Second)
}, 5*time.Second)
impl.EnqueueAfter(&Resource{
ObjectMeta: metav1.ObjectMeta{
Name: "waterfall",
Namespace: "the",
},
}, 300*time.Millisecond)
}, 500*time.Millisecond)
impl.EnqueueAfter(&Resource{
ObjectMeta: metav1.ObjectMeta{
Name: "to",
Namespace: "fall",
},
}, 2*time.Second)
time.Sleep(50 * time.Millisecond)
}, 20*time.Second)
time.Sleep(10 * time.Millisecond)
if got, want := impl.WorkQueue.Len(), 0; got != want {
t.Errorf("|Queue| = %d, want: %d", got, want)
}
// Sleep the remaining time.
time.Sleep(time.Second - 50*time.Millisecond)
if got, want := impl.WorkQueue.Len(), 2; got != want {
time.Sleep(time.Second)
if got, want := impl.WorkQueue.Len(), 1; got != want {
t.Errorf("|Queue| = %d, want: %d", got, want)
}
impl.WorkQueue.ShutDown()
if got, want := drainWorkQueue(impl.WorkQueue), []string{"the/waterfall", "waiting/for"}; !cmp.Equal(got, want) {
if got, want := drainWorkQueue(impl.WorkQueue), []string{"the/waterfall"}; !cmp.Equal(got, want) {
t.Errorf("Queue = %v, want: %v, diff: %s", got, want, cmp.Diff(got, want))
}
}
func TestEnqeueKeyAfter(t *testing.T) {
defer ClearAll()
impl := NewImplWithStats(&NopReconciler{}, TestLogger(t), "Testing", &FakeStatsReporter{})
impl.EnqueueKeyAfter("waiting/for", time.Second)
impl.EnqueueKeyAfter("the/waterfall", time.Second>>1)
impl.EnqueueKeyAfter("to/fall", time.Second<<1)
time.Sleep(50 * time.Millisecond)
impl.EnqueueKeyAfter("waiting/for", 5*time.Second)
impl.EnqueueKeyAfter("the/waterfall", 500*time.Millisecond)
impl.EnqueueKeyAfter("to/fall", 20*time.Second)
time.Sleep(10 * time.Millisecond)
if got, want := impl.WorkQueue.Len(), 0; got != want {
t.Errorf("|Queue| = %d, want: %d", got, want)
}
// Sleep the remaining time.
time.Sleep(time.Second - 50*time.Millisecond)
if got, want := impl.WorkQueue.Len(), 2; got != want {
time.Sleep(time.Second)
if got, want := impl.WorkQueue.Len(), 1; got != want {
t.Errorf("|Queue| = %d, want: %d", got, want)
}
impl.WorkQueue.ShutDown()
if got, want := drainWorkQueue(impl.WorkQueue), []string{"the/waterfall", "waiting/for"}; !cmp.Equal(got, want) {
if got, want := drainWorkQueue(impl.WorkQueue), []string{"the/waterfall"}; !cmp.Equal(got, want) {
t.Errorf("Queue = %v, want: %v, diff: %s", got, want, cmp.Diff(got, want))
}
}
@ -565,6 +568,7 @@ func (cr *CountingReconciler) Reconcile(context.Context, string) error {
}
func TestStartAndShutdown(t *testing.T) {
defer ClearAll()
r := &CountingReconciler{}
impl := NewImplWithStats(r, TestLogger(t), "Testing", &FakeStatsReporter{})
@ -597,6 +601,7 @@ func TestStartAndShutdown(t *testing.T) {
}
func TestStartAndShutdownWithWork(t *testing.T) {
defer ClearAll()
r := &CountingReconciler{}
reporter := &FakeStatsReporter{}
impl := NewImplWithStats(r, TestLogger(t), "Testing", reporter)
@ -643,6 +648,7 @@ func (er *ErrorReconciler) Reconcile(context.Context, string) error {
}
func TestStartAndShutdownWithErroringWork(t *testing.T) {
defer ClearAll()
r := &ErrorReconciler{}
reporter := &FakeStatsReporter{}
impl := NewImplWithStats(r, TestLogger(t), "Testing", reporter)
@ -654,33 +660,39 @@ func TestStartAndShutdownWithErroringWork(t *testing.T) {
go func() {
defer close(doneCh)
// StartAll blocks until all the worker threads finish, which shouldn't
// be until we close stopCh.
StartAll(stopCh, impl)
}()
select {
case <-time.After(20 * time.Millisecond):
// We don't expect completion before the stopCh closes.
case <-time.After(1 * time.Second):
// We don't expect completion before the stopCh closes,
// but the workers should spin on the erroring work.
case <-doneCh:
t.Error("StartAll finished early.")
}
// By closing the stopCh all the workers should complete and
// we should close the doneCh.
close(stopCh)
select {
case <-time.After(1 * time.Second):
t.Error("Timed out waiting for controller to finish.")
case <-doneCh:
// We expect the work to complete.
// We expect any outstanding work to complete, for the worker
// threads to complete and for doneCh to close in a timely manner.
}
// Check that the work was requeued in RateLimiter.
// As NumRequeues can't fully reflect the real state of queue length.
// Here we need to wait for NumRequeues to be more than 1, to ensure
// the key get re-queued and reprocessed as expect.
if got, want := impl.WorkQueue.NumRequeues("foo/bar"), 3; got != want {
t.Errorf("Requeue count = %v, wanted %v", got, want)
if got, wantAtLeast := impl.WorkQueue.NumRequeues("foo/bar"), 2; got < wantAtLeast {
t.Errorf("Requeue count = %v, wanted at least %v", got, wantAtLeast)
}
checkStats(t, reporter, 3, 0, 3, falseString)
}
type PermanentErrorReconciler struct{}
@ -691,6 +703,7 @@ func (er *PermanentErrorReconciler) Reconcile(context.Context, string) error {
}
func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) {
defer ClearAll()
r := &PermanentErrorReconciler{}
reporter := &FakeStatsReporter{}
impl := NewImplWithStats(r, TestLogger(t), "Testing", reporter)
@ -763,6 +776,7 @@ func (*dummyStore) List() []interface{} {
}
func TestImplGlobalResync(t *testing.T) {
defer ClearAll()
r := &CountingReconciler{}
impl := NewImplWithStats(r, TestLogger(t), "Testing", &FakeStatsReporter{})