Fix the flaky test. (#1632)

The test assumes the threads would schedule in particular way, but they don't.
But what we really care to check is that we thread in the proper RL and it works.
We don't need to check that underlying queue impl works, that's done in its own tests.
So just verify these two things.
This commit is contained in:
Victor Agababov 2020-08-18 17:06:13 -07:00 committed by GitHub
parent c53747eef4
commit c30ec2ffd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 37 deletions

View File

@ -38,7 +38,6 @@ func TestNewStatsReporterErrors(t *testing.T) {
if err == nil { if err == nil {
t.Errorf("Expected err to not be nil for value %q, got nil", v) t.Errorf("Expected err to not be nil for value %q, got nil", v)
} }
} }
} }

View File

@ -29,64 +29,60 @@ type chanRateLimiter struct {
t *testing.T t *testing.T
// Called when this ratelimiter is consulted for when to process a value. // Called when this ratelimiter is consulted for when to process a value.
whenCalled chan interface{} whenCalled chan interface{}
retryCount map[interface{}]int
} }
func (r chanRateLimiter) When(item interface{}) time.Duration { func (r *chanRateLimiter) When(item interface{}) time.Duration {
r.whenCalled <- item r.whenCalled <- item
return 0 * time.Second return 0
} }
func (r chanRateLimiter) Forget(item interface{}) {
func (r *chanRateLimiter) Forget(item interface{}) {
r.t.Fatalf("Forgetting item %+v, we should not be forgetting any items.", item) r.t.Fatalf("Forgetting item %+v, we should not be forgetting any items.", item)
} }
func (r chanRateLimiter) NumRequeues(item interface{}) int {
return r.retryCount[item] func (r *chanRateLimiter) NumRequeues(item interface{}) int {
return 0
} }
var _ workqueue.RateLimiter = &chanRateLimiter{} var _ workqueue.RateLimiter = &chanRateLimiter{}
func TestRateLimit(t *testing.T) { func TestRateLimit(t *testing.T) {
whenCalled := make(chan interface{}, 2) // Verifies that we properly pass the rate limiter to the queue.
rl := chanRateLimiter{ rl := &chanRateLimiter{
t, t: t,
whenCalled, whenCalled: make(chan interface{}, 1),
make(map[interface{}]int),
} }
q := newTwoLaneWorkQueue("live-in-the-limited-lane", rl) q := newTwoLaneWorkQueue("live-in-the-limited-lane", rl)
q.SlowLane().Add("1") // Verify the slow lane has the proper RL.
q.Add("2") q.SlowLane().AddRateLimited("1")
select {
k, done := q.Get() case <-rl.whenCalled:
q.Done(k) // As desired.
q.AddRateLimited(k) default:
rlK := <-whenCalled t.Error("Didn't go to the proper rate limiter.")
if got, want := k.(string), rlK.(string); got != want {
t.Errorf(`Got = %q, want: "%q"`, got, want)
}
if done {
t.Error("The queue is unexpectedly shutdown")
} }
k2, done := q.Get() // Verify the fast lane has the proper RL.
q.Done(k2) q.AddRateLimited("2")
q.AddRateLimited(k2) select {
rlK2 := <-whenCalled case <-rl.whenCalled:
if got, want := k2.(string), rlK2.(string); got != want { // As desired.
t.Errorf(`Got = %q, want: "%q"`, got, want) default:
} t.Error("Didn't go to the proper rate limiter.")
if s1, s2 := k.(string), k2.(string); (s1 != "1" || s2 != "2") && (s1 != "2" || s2 != "1") {
t.Errorf("Expected to see 1 and 2, instead saw %q and %q", s1, s2)
}
if done {
t.Error("The queue is unexpectedly shutdown")
} }
// Queue has async moving parts so if we check at the wrong moment, this might still be 0 or 1. // Verify the items were properly added for consumption.
if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) { if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return q.Len() == 2, nil return q.Len() == 2, nil
}) != nil { }) != nil {
t.Error("Queue length was never 2") t.Error("Queue length was never 2")
} }
// And drain.
q.ShutDown()
for q.Len() > 0 {
q.Get()
}
} }
func TestSlowQueue(t *testing.T) { func TestSlowQueue(t *testing.T) {