mirror of https://github.com/knative/pkg.git
Reduce delays in controller tests (#1603)
* TestEnqueue: remove unnecessary calls to Sleep The rate limiter applies only when multiple items are put onto the workqueue, which is not the case in those tests. Execution: ~7.6s -> ~2.1s * TestEnqueueAfter: remove assumptions on execution times Instead of sleeping for a conservative amount of time, keep watching the state of the workqueue in a goroutine, and notify the test logic as soon as the item is observed. Execution: ~1s -> ~0.05s * TestEnqueueKeyAfter: remove assumptions on execution times Instead of sleeping for a conservative amount of time, keep watching the state of the workqueue in a goroutine, and notify the test logic as soon as the item is observed. Execution: ~1s -> ~0.05s * TestStartAndShutdownWithErroringWork: remove sleep Instead of sleeping for a conservative amount of time, keep watching the state number of requeues in a goroutine, and notify the test logic as soon as the expected threshold is reached. Logs, for an idea of timings ---------------------------- Started workers Processing from queue bar (depth: 0) Reconcile error {"error": "I always error"} Requeuing key bar due to non-permanent error (depth: 0) Reconcile failed. Time taken: 104µs {"knative.dev/key": "bar"} Processing from queue bar (depth: 0) Reconcile error {"error": "I always error"} Requeuing key bar due to non-permanent error (depth: 0) Reconcile failed. Time taken: 48.2µs {"knative.dev/key": "bar"} Execution: ~1s -> ~0.01s * TestStart*/TestRun*: reduce sleep time There is no need to sleep for that long. If an error was returned, it would activate the second select case immediately. Execution: ~1s -> ~0.05s * TestImplGlobalResync: reduce sleep time We know the fast lane is empty in this test, so we can safely assume immediate enqueuing of all items on the slow lane. Logs, for an idea of timings ---------------------------- Started workers Processing from queue foo/bar (depth: 0) Reconcile succeeded. Time taken: 11.5µs {"knative.dev/key": "foo/bar"} Processing from queue bar/foo (depth: 1) Processing from queue fizz/buzz (depth: 0) Reconcile succeeded. Time taken: 9.7µs {"knative.dev/key": "fizz/buzz"} Reconcile succeeded. Time taken: 115µs {"knative.dev/key": "bar/foo"} Shutting down workers Execution: ~4s -> ~0.05s * review: Replace for/select with PollUntil * review: Remove redundant duration multiplier * review: Replace defer with t.Cleanup
This commit is contained in:
parent
2465d13e72
commit
c56f5e203b
|
@ -29,6 +29,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
fakekube "k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
@ -435,7 +436,7 @@ func (t testRateLimiter) NumRequeues(interface{}) int { return 0 }
|
|||
|
||||
var _ workqueue.RateLimiter = (*testRateLimiter)(nil)
|
||||
|
||||
func TestEnqueues(t *testing.T) {
|
||||
func TestEnqueue(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
work func(*Impl)
|
||||
|
@ -741,8 +742,6 @@ func TestEnqueues(t *testing.T) {
|
|||
impl := NewImplFull(&NopReconciler{}, ControllerOptions{WorkQueueName: "Testing", Logger: TestLogger(t), RateLimiter: rl})
|
||||
test.work(impl)
|
||||
|
||||
// The rate limit on our queue delays when things are added to the queue.
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
impl.WorkQueue().ShutDown()
|
||||
gotQueue := drainWorkQueue(impl.WorkQueue())
|
||||
|
||||
|
@ -758,8 +757,6 @@ func TestEnqueues(t *testing.T) {
|
|||
impl := NewImplWithStats(&NopReconciler{}, TestLogger(t), "Testing", &FakeStatsReporter{})
|
||||
test.work(impl)
|
||||
|
||||
// The rate limit on our queue delays when things are added to the queue.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
impl.WorkQueue().ShutDown()
|
||||
gotQueue := drainWorkQueue(impl.WorkQueue())
|
||||
|
||||
|
@ -771,59 +768,149 @@ func TestEnqueues(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEnqueueAfter(t *testing.T) {
|
||||
const (
|
||||
// longDelay is longer than we expect the test to run.
|
||||
longDelay = time.Minute
|
||||
// shortDelay is short enough for the test to execute quickly, but long
|
||||
// enough to reasonably delay the enqueuing of an item.
|
||||
shortDelay = 50 * time.Millisecond
|
||||
|
||||
// time we allow the queue length checker to keep polling the
|
||||
// workqueue.
|
||||
queueCheckTimeout = shortDelay + 500*time.Millisecond
|
||||
)
|
||||
|
||||
t.Cleanup(ClearAll)
|
||||
|
||||
impl := NewImplWithStats(&NopReconciler{}, TestLogger(t), "Testing", &FakeStatsReporter{})
|
||||
t.Cleanup(func() {
|
||||
impl.WorkQueue().ShutDown()
|
||||
})
|
||||
|
||||
// Enqueue two items with a long delay.
|
||||
impl.EnqueueAfter(&Resource{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "for",
|
||||
Namespace: "waiting",
|
||||
},
|
||||
}, 5*time.Second)
|
||||
}, longDelay)
|
||||
impl.EnqueueAfter(&Resource{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "waterfall",
|
||||
Namespace: "the",
|
||||
},
|
||||
}, 500*time.Millisecond)
|
||||
}, longDelay)
|
||||
|
||||
// Enqueue one item with a short delay.
|
||||
enqueueTime := time.Now()
|
||||
impl.EnqueueAfter(&Resource{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "to",
|
||||
Namespace: "fall",
|
||||
Name: "fall",
|
||||
Namespace: "to",
|
||||
},
|
||||
}, 20*time.Second)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if got, want := impl.WorkQueue().Len(), 0; got != want {
|
||||
t.Errorf("|Queue| = %d, want: %d", got, want)
|
||||
}, shortDelay)
|
||||
|
||||
// Keep checking the queue length until 'to/fall' gets enqueued, send to channel to indicate success.
|
||||
queuePopulated := make(chan struct{})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), queueCheckTimeout)
|
||||
|
||||
t.Cleanup(func() {
|
||||
close(queuePopulated)
|
||||
cancel()
|
||||
})
|
||||
|
||||
var successCheck wait.ConditionFunc = func() (bool, error) {
|
||||
if impl.WorkQueue().Len() > 0 {
|
||||
queuePopulated <- struct{}{}
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
// Sleep the remaining time.
|
||||
time.Sleep(time.Second)
|
||||
if got, want := impl.WorkQueue().Len(), 1; got != want {
|
||||
t.Errorf("|Queue| = %d, want: %d", got, want)
|
||||
go wait.PollImmediateUntil(5*time.Millisecond, successCheck, ctx.Done())
|
||||
|
||||
select {
|
||||
case <-queuePopulated:
|
||||
if enqueueDelay := time.Since(enqueueTime); enqueueDelay < shortDelay {
|
||||
t.Errorf("Item enqueued within %v, expected at least a %v delay", enqueueDelay, shortDelay)
|
||||
}
|
||||
if got, want := impl.WorkQueue().Len(), 1; got != want {
|
||||
t.Errorf("|Queue| = %d, want: %d", got, want)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
t.Fatal("Timed out waiting for item to be put onto the workqueue")
|
||||
}
|
||||
|
||||
impl.WorkQueue().ShutDown()
|
||||
if got, want := drainWorkQueue(impl.WorkQueue()), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) {
|
||||
t.Errorf("Queue = %v, want: %v, diff: %s", got, want, cmp.Diff(got, want))
|
||||
|
||||
got, want := drainWorkQueue(impl.WorkQueue()), []types.NamespacedName{{Namespace: "to", Name: "fall"}}
|
||||
if diff := cmp.Diff(want, got); diff != "" {
|
||||
t.Errorf("Unexpected workqueue state (-:expect, +:got):\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnqeueKeyAfter(t *testing.T) {
|
||||
func TestEnqueueKeyAfter(t *testing.T) {
|
||||
const (
|
||||
// longDelay is longer than we expect the test to run.
|
||||
longDelay = time.Minute
|
||||
// shortDelay is short enough for the test to execute quickly, but long
|
||||
// enough to reasonably delay the enqueuing of an item.
|
||||
shortDelay = 50 * time.Millisecond
|
||||
)
|
||||
|
||||
t.Cleanup(ClearAll)
|
||||
|
||||
impl := NewImplWithStats(&NopReconciler{}, TestLogger(t), "Testing", &FakeStatsReporter{})
|
||||
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "waiting", Name: "for"}, 5*time.Second)
|
||||
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "the", Name: "waterfall"}, 500*time.Millisecond)
|
||||
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "to", Name: "fall"}, 20*time.Second)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if got, want := impl.WorkQueue().Len(), 0; got != want {
|
||||
t.Errorf("|Queue| = %d, want: %d", got, want)
|
||||
t.Cleanup(func() {
|
||||
impl.WorkQueue().ShutDown()
|
||||
})
|
||||
|
||||
// Enqueue two items with a long delay.
|
||||
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "waiting", Name: "for"}, longDelay)
|
||||
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "the", Name: "waterfall"}, longDelay)
|
||||
|
||||
// Enqueue one item with a short delay.
|
||||
enqueueTime := time.Now()
|
||||
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "to", Name: "fall"}, shortDelay)
|
||||
|
||||
// Keep checking the queue length until 'to/fall' gets enqueued, send to channel to indicate success.
|
||||
queuePopulated := make(chan struct{})
|
||||
|
||||
const queueCheckTimeout = shortDelay + 500*time.Millisecond
|
||||
ctx, cancel := context.WithTimeout(context.Background(), queueCheckTimeout)
|
||||
|
||||
t.Cleanup(func() {
|
||||
close(queuePopulated)
|
||||
cancel()
|
||||
})
|
||||
|
||||
var successCheck wait.ConditionFunc = func() (bool, error) {
|
||||
if impl.WorkQueue().Len() > 0 {
|
||||
queuePopulated <- struct{}{}
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
// Sleep the remaining time.
|
||||
time.Sleep(time.Second)
|
||||
if got, want := impl.WorkQueue().Len(), 1; got != want {
|
||||
t.Errorf("|Queue| = %d, want: %d", got, want)
|
||||
go wait.PollImmediateUntil(5*time.Millisecond, successCheck, ctx.Done())
|
||||
|
||||
select {
|
||||
case <-queuePopulated:
|
||||
if enqueueDelay := time.Since(enqueueTime); enqueueDelay < shortDelay {
|
||||
t.Errorf("Item enqueued within %v, expected at least a %v delay", enqueueDelay, shortDelay)
|
||||
}
|
||||
if got, want := impl.WorkQueue().Len(), 1; got != want {
|
||||
t.Errorf("|Queue| = %d, want: %d", got, want)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
t.Fatal("Timed out waiting for item to be put onto the workqueue")
|
||||
}
|
||||
|
||||
impl.WorkQueue().ShutDown()
|
||||
if got, want := drainWorkQueue(impl.WorkQueue()), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) {
|
||||
t.Errorf("Queue = %v, want: %v, diff: %s", got, want, cmp.Diff(got, want))
|
||||
|
||||
got, want := drainWorkQueue(impl.WorkQueue()), []types.NamespacedName{{Namespace: "to", Name: "fall"}}
|
||||
if diff := cmp.Diff(want, got); diff != "" {
|
||||
t.Errorf("Unexpected workqueue state (-:expect, +:got):\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1090,17 +1177,19 @@ func (er *ErrorReconciler) Reconcile(context.Context, string) error {
|
|||
}
|
||||
|
||||
func TestStartAndShutdownWithErroringWork(t *testing.T) {
|
||||
const testTimeout = 500 * time.Millisecond
|
||||
|
||||
item := types.NamespacedName{Namespace: "", Name: "bar"}
|
||||
|
||||
t.Cleanup(ClearAll)
|
||||
r := &ErrorReconciler{}
|
||||
reporter := &FakeStatsReporter{}
|
||||
impl := NewImplWithStats(r, TestLogger(t), "Testing", reporter)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
impl := NewImplWithStats(&ErrorReconciler{}, TestLogger(t), "Testing", &FakeStatsReporter{})
|
||||
impl.EnqueueKey(item)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
impl.EnqueueKey(types.NamespacedName{Namespace: "", Name: "bar"})
|
||||
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
// StartAll blocks until all the worker threads finish, which shouldn't
|
||||
|
@ -1108,33 +1197,33 @@ func TestStartAndShutdownWithErroringWork(t *testing.T) {
|
|||
StartAll(ctx, impl)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(1 * time.Second):
|
||||
// We don't expect completion before the context is cancelled,
|
||||
// but the workers should spin on the erroring work.
|
||||
// Keep checking the number of requeues, send to channel to indicate success.
|
||||
itemRequeued := make(chan struct{})
|
||||
defer close(itemRequeued)
|
||||
|
||||
case <-doneCh:
|
||||
t.Error("StartAll finished early.")
|
||||
var successCheck wait.ConditionFunc = func() (bool, error) {
|
||||
// Check that the work was requeued in RateLimiter, as NumRequeues
|
||||
// can't fully reflect the real state of queue length.
|
||||
// Here we need to wait for NumRequeues to be more than 1, to ensure
|
||||
// the key get re-queued and reprocessed as expect.
|
||||
if impl.WorkQueue().NumRequeues(item) > 1 {
|
||||
itemRequeued <- struct{}{}
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// By cancelling the context all the workers should complete and
|
||||
// we should close the doneCh.
|
||||
cancel()
|
||||
go wait.PollImmediateUntil(5*time.Millisecond, successCheck, ctx.Done())
|
||||
|
||||
select {
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Error("Timed out waiting for controller to finish.")
|
||||
case <-doneCh:
|
||||
// We expect any outstanding work to complete, for the worker
|
||||
// threads to complete and for doneCh to close in a timely manner.
|
||||
}
|
||||
case <-itemRequeued:
|
||||
// shut down reconciler
|
||||
cancel()
|
||||
|
||||
// Check that the work was requeued in RateLimiter.
|
||||
// As NumRequeues can't fully reflect the real state of queue length.
|
||||
// Here we need to wait for NumRequeues to be more than 1, to ensure
|
||||
// the key get re-queued and reprocessed as expect.
|
||||
if got, wantAtLeast := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "", Name: "bar"}), 2; got < wantAtLeast {
|
||||
t.Errorf("Requeue count = %v, wanted at least %v", got, wantAtLeast)
|
||||
case <-doneCh:
|
||||
t.Fatal("StartAll finished early")
|
||||
|
||||
case <-ctx.Done():
|
||||
t.Fatal("Timed out waiting for item to be requeued")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1254,9 +1343,10 @@ func TestImplGlobalResync(t *testing.T) {
|
|||
impl.GlobalResync(&dummyInformer{})
|
||||
|
||||
// The global resync delays enqueuing things by a second with a jitter that
|
||||
// goes up to len(dummyObjs) times a second.
|
||||
// goes up to len(dummyObjs) times a second: time.Duration(1+len(dummyObjs)) * time.Second.
|
||||
// In this test, the fast lane is empty, so we can assume immediate enqueuing.
|
||||
select {
|
||||
case <-time.After((1 + 3) * time.Second):
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// We don't expect completion before the context is cancelled.
|
||||
case <-doneCh:
|
||||
t.Error("StartAll finished early.")
|
||||
|
@ -1361,10 +1451,10 @@ func TestStartInformersEventualSuccess(t *testing.T) {
|
|||
}()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
t.Errorf("Unexpected send on errCh: %v", err)
|
||||
case <-time.After(1 * time.Second):
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Wait a brief period to ensure nothing is sent.
|
||||
case err := <-errCh:
|
||||
t.Fatal("Unexpected send on errCh:", err)
|
||||
}
|
||||
|
||||
// Let the Sync complete.
|
||||
|
@ -1392,10 +1482,10 @@ func TestStartInformersFailure(t *testing.T) {
|
|||
}()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
t.Errorf("Unexpected send on errCh: %v", err)
|
||||
case <-time.After(1 * time.Second):
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Wait a brief period to ensure nothing is sent.
|
||||
case err := <-errCh:
|
||||
t.Fatal("Unexpected send on errCh:", err)
|
||||
}
|
||||
|
||||
// Now close the stopCh and we should see an error sent.
|
||||
|
@ -1448,10 +1538,10 @@ func TestRunInformersEventualSuccess(t *testing.T) {
|
|||
}()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
t.Fatalf("Unexpected send on errCh: %v", err)
|
||||
case <-time.After(1 * time.Second):
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Wait a brief period to ensure nothing is sent.
|
||||
case err := <-errCh:
|
||||
t.Fatal("Unexpected send on errCh:", err)
|
||||
}
|
||||
|
||||
// Let the Sync complete.
|
||||
|
@ -1482,10 +1572,10 @@ func TestRunInformersFailure(t *testing.T) {
|
|||
}()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
t.Errorf("Unexpected send on errCh: %v", err)
|
||||
case <-time.After(1 * time.Second):
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Wait a brief period to ensure nothing is sent.
|
||||
case err := <-errCh:
|
||||
t.Fatal("Unexpected send on errCh:", err)
|
||||
}
|
||||
|
||||
// Now close the stopCh and we should see an error sent.
|
||||
|
|
Loading…
Reference in New Issue