Use two lane queue instead of the regular workqueue (#1514)

* Use two lane queue instead of the regular workqueue

- we need to poll for len in the webhook tests because we have async propagation now, and check at the wrong time will be not correct.
- otherwise just a drop in replacement.

* update test

* cmt

* tests hardened
This commit is contained in:
Victor Agababov 2020-07-19 14:01:34 -07:00 committed by GitHub
parent e193c4be24
commit 1cea86c85f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 43 additions and 18 deletions

View File

@ -213,12 +213,9 @@ func NewImpl(r Reconciler, logger *zap.SugaredLogger, workQueueName string) *Imp
func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName string, reporter StatsReporter) *Impl { func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName string, reporter StatsReporter) *Impl {
logger = logger.Named(workQueueName) logger = logger.Named(workQueueName)
return &Impl{ return &Impl{
Name: workQueueName, Name: workQueueName,
Reconciler: r, Reconciler: r,
WorkQueue: workqueue.NewNamedRateLimitingQueue( WorkQueue: newTwoLaneWorkQueue(workQueueName),
workqueue.DefaultControllerRateLimiter(),
workQueueName,
),
logger: logger, logger: logger,
statsReporter: reporter, statsReporter: reporter,
} }

View File

@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientgotesting "k8s.io/client-go/testing" clientgotesting "k8s.io/client-go/testing"
"knative.dev/pkg/configmap" "knative.dev/pkg/configmap"
"knative.dev/pkg/controller" "knative.dev/pkg/controller"
@ -42,7 +43,10 @@ import (
) )
func TestReconcile(t *testing.T) { func TestReconcile(t *testing.T) {
secretName, serviceName := "webhook-secret", "webhook-service" const (
secretName = "webhook-secret"
serviceName = "webhook-service"
)
secret, err := certresources.MakeSecret(context.Background(), secret, err := certresources.MakeSecret(context.Background(),
secretName, system.Namespace(), serviceName) secretName, system.Namespace(), serviceName)
if err != nil { if err != nil {
@ -238,13 +242,16 @@ func TestNew(t *testing.T) {
t.Errorf("Promote() = %v", err) t.Errorf("Promote() = %v", err)
} }
if want, got := 1, c.WorkQueue.Len(); want != got { // Queue has async moving parts so if we check at the wrong moment, this might still be 0.
t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want) if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return c.WorkQueue.Len() == 1, nil
}) != nil {
t.Error("Queue length was never 1")
} }
} }
func secretWithCertData(t *testing.T, expiration time.Time) *corev1.Secret { func secretWithCertData(t *testing.T, expiration time.Time) *corev1.Secret {
secretName := "webhook-secret" const secretName = "webhook-secret"
serverKey, serverCert, caCert, err := certresources.CreateCerts(context.Background(), "webhook-service", system.Namespace(), expiration) serverKey, serverCert, caCert, err := certresources.CreateCerts(context.Background(), "webhook-service", system.Namespace(), expiration)
if err != nil { if err != nil {
t.Fatalf("Failed to create cert: %v", err) t.Fatalf("Failed to create cert: %v", err)

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"reflect" "reflect"
"testing" "testing"
"time"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake" kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake" _ "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake"
@ -30,6 +31,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientgotesting "k8s.io/client-go/testing" clientgotesting "k8s.io/client-go/testing"
"knative.dev/pkg/configmap" "knative.dev/pkg/configmap"
"knative.dev/pkg/controller" "knative.dev/pkg/controller"
@ -338,7 +340,10 @@ func TestNew(t *testing.T) {
t.Errorf("Promote() = %v", err) t.Errorf("Promote() = %v", err)
} }
if want, got := 1, c.WorkQueue.Len(); want != got { // Queue has async moving parts so if we check at the wrong moment, this might still be 0.
t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want) if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return c.WorkQueue.Len() == 1, nil
}) != nil {
t.Error("Queue length was never 1")
} }
} }

View File

@ -19,6 +19,7 @@ package defaulting
import ( import (
"context" "context"
"testing" "testing"
"time"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake" kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake" _ "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake"
@ -30,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientgotesting "k8s.io/client-go/testing" clientgotesting "k8s.io/client-go/testing"
"knative.dev/pkg/configmap" "knative.dev/pkg/configmap"
"knative.dev/pkg/controller" "knative.dev/pkg/controller"
@ -368,7 +370,10 @@ func TestNew(t *testing.T) {
t.Errorf("Promote() = %v", err) t.Errorf("Promote() = %v", err)
} }
if want, got := 1, c.WorkQueue.Len(); want != got { // Queue has async moving parts so if we check at the wrong moment, this might still be 0.
t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want) if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return c.WorkQueue.Len() == 1, nil
}) != nil {
t.Error("Queue length was never 1")
} }
} }

View File

@ -19,6 +19,7 @@ package validation
import ( import (
"context" "context"
"testing" "testing"
"time"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake" kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake" _ "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake"
@ -30,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientgotesting "k8s.io/client-go/testing" clientgotesting "k8s.io/client-go/testing"
"knative.dev/pkg/configmap" "knative.dev/pkg/configmap"
"knative.dev/pkg/controller" "knative.dev/pkg/controller"
@ -369,7 +371,11 @@ func TestNew(t *testing.T) {
t.Errorf("Promote() = %v", err) t.Errorf("Promote() = %v", err)
} }
if want, got := 1, c.WorkQueue.Len(); want != got { // Queue has async moving parts so if we check at the wrong moment, thist might still be 0.
t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want) if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return c.WorkQueue.Len() == 1, nil
}) != nil {
t.Error("Queue length was never 1")
} }
} }

View File

@ -21,6 +21,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"testing" "testing"
"time"
// Injection stuff // Injection stuff
_ "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/client/fake"
@ -35,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
fakekubeclientset "k8s.io/client-go/kubernetes/fake" fakekubeclientset "k8s.io/client-go/kubernetes/fake"
"knative.dev/pkg/apis" "knative.dev/pkg/apis"
"knative.dev/pkg/system" "knative.dev/pkg/system"
@ -659,8 +661,11 @@ func NewTestResourceAdmissionController(t *testing.T) *reconciler {
t.Errorf("Promote() = %v", err) t.Errorf("Promote() = %v", err)
} }
if want, got := 1, c.WorkQueue.Len(); want != got { // Queue has async moving parts so if we check at the wrong moment, this might still be 0.
t.Errorf("WorkQueue.Len() = %d, wanted %d", got, want) if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return c.WorkQueue.Len() == 1, nil
}) != nil {
t.Error("Queue length was never 1")
} }
return c.Reconciler.(*reconciler) return c.Reconciler.(*reconciler)