mirror of https://github.com/knative/pkg.git
1748 lines
42 KiB
Go
1748 lines
42 KiB
Go
/*
|
|
Copyright 2017 The Knative Authors
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package controller
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/go-cmp/cmp"
|
|
|
|
coordinationv1 "k8s.io/api/coordination/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
fakekube "k8s.io/client-go/kubernetes/fake"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/tools/record"
|
|
"k8s.io/client-go/util/workqueue"
|
|
|
|
"knative.dev/pkg/leaderelection"
|
|
"knative.dev/pkg/ptr"
|
|
"knative.dev/pkg/reconciler"
|
|
"knative.dev/pkg/system"
|
|
|
|
. "knative.dev/pkg/controller/testing"
|
|
. "knative.dev/pkg/logging/testing"
|
|
_ "knative.dev/pkg/system/testing"
|
|
. "knative.dev/pkg/testing"
|
|
)
|
|
|
|
const (
|
|
oldObj = "foo"
|
|
newObj = "bar"
|
|
)
|
|
|
|
func TestPassNew(t *testing.T) {
|
|
PassNew(func(got interface{}) {
|
|
if newObj != got.(string) {
|
|
t.Errorf("PassNew() = %v, wanted %v", got, newObj)
|
|
}
|
|
})(oldObj, newObj)
|
|
}
|
|
|
|
func TestHandleAll(t *testing.T) {
|
|
ha := HandleAll(func(got interface{}) {
|
|
if newObj != got.(string) {
|
|
t.Errorf("HandleAll() = %v, wanted %v", got, newObj)
|
|
}
|
|
})
|
|
|
|
ha.OnAdd(newObj, false)
|
|
ha.OnUpdate(oldObj, newObj)
|
|
ha.OnDelete(newObj)
|
|
}
|
|
|
|
var gvk = schema.GroupVersionKind{
|
|
Group: "pkg.knative.dev",
|
|
Version: "v1meta1",
|
|
Kind: "Parent",
|
|
}
|
|
|
|
func TestFilterWithNameAndNamespace(t *testing.T) {
|
|
filter := FilterWithNameAndNamespace("test-namespace", "test-name")
|
|
|
|
tests := []struct {
|
|
name string
|
|
input interface{}
|
|
want bool
|
|
}{{
|
|
name: "not a metav1.Object",
|
|
input: "foo",
|
|
}, {
|
|
name: "nil",
|
|
input: nil,
|
|
}, {
|
|
name: "name matches, namespace does not",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "test-name",
|
|
Namespace: "wrong-namespace",
|
|
},
|
|
},
|
|
}, {
|
|
name: "namespace matches, name does not",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "wrong-name",
|
|
Namespace: "test-namespace",
|
|
},
|
|
},
|
|
}, {
|
|
name: "neither matches",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "wrong-name",
|
|
Namespace: "wrong-namespace",
|
|
},
|
|
},
|
|
}, {
|
|
name: "matches",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "test-name",
|
|
Namespace: "test-namespace",
|
|
},
|
|
},
|
|
want: true,
|
|
}}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
got := filter(test.input)
|
|
if test.want != got {
|
|
t.Errorf("FilterWithNameAndNamespace() = %v, wanted %v", got, test.want)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFilterWithName(t *testing.T) {
|
|
filter := FilterWithName("test-name")
|
|
|
|
tests := []struct {
|
|
name string
|
|
input interface{}
|
|
want bool
|
|
}{{
|
|
name: "not a metav1.Object",
|
|
input: "foo",
|
|
}, {
|
|
name: "nil",
|
|
input: nil,
|
|
}, {
|
|
name: "name matches, namespace does not",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "test-name",
|
|
Namespace: "wrong-namespace",
|
|
},
|
|
},
|
|
want: true, // Unlike FilterWithNameAndNamespace this passes
|
|
}, {
|
|
name: "namespace matches, name does not",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "wrong-name",
|
|
Namespace: "test-namespace",
|
|
},
|
|
},
|
|
}, {
|
|
name: "neither matches",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "wrong-name",
|
|
Namespace: "wrong-namespace",
|
|
},
|
|
},
|
|
}, {
|
|
name: "matches",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "test-name",
|
|
Namespace: "test-namespace",
|
|
},
|
|
},
|
|
want: true,
|
|
}}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
got := filter(test.input)
|
|
if test.want != got {
|
|
t.Errorf("FilterWithNameAndNamespace() = %v, wanted %v", got, test.want)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFilterGroupKind(t *testing.T) {
|
|
filter := FilterGroupKind(gvk.GroupKind())
|
|
|
|
tests := []struct {
|
|
name string
|
|
input interface{}
|
|
want bool
|
|
}{{
|
|
name: "not a metav1.Object",
|
|
input: "foo",
|
|
}, {
|
|
name: "nil",
|
|
input: nil,
|
|
}, {
|
|
name: "no owner reference",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
},
|
|
},
|
|
}, {
|
|
name: "wrong owner reference, not controller",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: "another.knative.dev/v1beta3",
|
|
Kind: "Parent",
|
|
Controller: ptr.Bool(false),
|
|
}},
|
|
},
|
|
},
|
|
}, {
|
|
name: "right owner reference, not controller",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: gvk.GroupVersion().String(),
|
|
Kind: gvk.Kind,
|
|
Controller: ptr.Bool(false),
|
|
}},
|
|
},
|
|
},
|
|
}, {
|
|
name: "wrong owner reference, but controller",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: "another.knative.dev/v1beta3",
|
|
Kind: "Parent",
|
|
Controller: ptr.Bool(true),
|
|
}},
|
|
},
|
|
},
|
|
want: false,
|
|
}, {
|
|
name: "right owner reference, is controller",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: gvk.GroupVersion().String(),
|
|
Kind: gvk.Kind,
|
|
Controller: ptr.Bool(true),
|
|
}},
|
|
},
|
|
},
|
|
want: true,
|
|
}, {
|
|
name: "right owner reference, is controller, different version",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: schema.GroupVersion{Group: gvk.Group, Version: "other"}.String(),
|
|
Kind: gvk.Kind,
|
|
Controller: ptr.Bool(true),
|
|
}},
|
|
},
|
|
},
|
|
want: true,
|
|
}}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
got := filter(test.input)
|
|
if test.want != got {
|
|
t.Errorf("Filter() = %v, wanted %v", got, test.want)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFilterGroupVersionKind(t *testing.T) {
|
|
filter := FilterGroupVersionKind(gvk)
|
|
|
|
tests := []struct {
|
|
name string
|
|
input interface{}
|
|
want bool
|
|
}{{
|
|
name: "not a metav1.Object",
|
|
input: "foo",
|
|
}, {
|
|
name: "nil",
|
|
input: nil,
|
|
}, {
|
|
name: "no owner reference",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
},
|
|
},
|
|
}, {
|
|
name: "wrong owner reference, not controller",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: "another.knative.dev/v1beta3",
|
|
Kind: "Parent",
|
|
Controller: ptr.Bool(false),
|
|
}},
|
|
},
|
|
},
|
|
}, {
|
|
name: "right owner reference, not controller",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: gvk.GroupVersion().String(),
|
|
Kind: gvk.Kind,
|
|
Controller: ptr.Bool(false),
|
|
}},
|
|
},
|
|
},
|
|
}, {
|
|
name: "wrong owner reference, but controller",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: "another.knative.dev/v1beta3",
|
|
Kind: "Parent",
|
|
Controller: ptr.Bool(true),
|
|
}},
|
|
},
|
|
},
|
|
}, {
|
|
name: "right owner reference, is controller",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: gvk.GroupVersion().String(),
|
|
Kind: gvk.Kind,
|
|
Controller: ptr.Bool(true),
|
|
}},
|
|
},
|
|
},
|
|
want: true,
|
|
}, {
|
|
name: "right owner reference, is controller, wrong version",
|
|
input: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: schema.GroupVersion{Group: gvk.Group, Version: "other"}.String(),
|
|
Kind: gvk.Kind,
|
|
Controller: ptr.Bool(true),
|
|
}},
|
|
},
|
|
},
|
|
}}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
got := filter(test.input)
|
|
if test.want != got {
|
|
t.Errorf("Filter() = %v, wanted %v", got, test.want)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
type nopReconciler struct{}
|
|
|
|
func (nr *nopReconciler) Reconcile(context.Context, string) error {
|
|
return nil
|
|
}
|
|
|
|
type testRateLimiter struct {
|
|
t *testing.T
|
|
delay time.Duration
|
|
}
|
|
|
|
func (t testRateLimiter) When(interface{}) time.Duration { return t.delay }
|
|
func (t testRateLimiter) Forget(interface{}) {}
|
|
func (t testRateLimiter) NumRequeues(interface{}) int { return 0 }
|
|
|
|
var _ workqueue.TypedRateLimiter[any] = (*testRateLimiter)(nil)
|
|
|
|
func TestEnqueue(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
work func(*Impl)
|
|
wantQueue []types.NamespacedName
|
|
}{{
|
|
name: "do nothing",
|
|
work: func(*Impl) {},
|
|
}, {
|
|
name: "enqueue key",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}},
|
|
}, {
|
|
name: "enqueue duplicate key",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
|
|
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
|
|
},
|
|
// The queue deduplicates.
|
|
wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}},
|
|
}, {
|
|
name: "enqueue different keys",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
|
|
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "baz"})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}, {Namespace: "foo", Name: "baz"}},
|
|
}, {
|
|
name: "enqueue resource",
|
|
work: func(impl *Impl) {
|
|
impl.Enqueue(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "foo"}},
|
|
}, {
|
|
name: "enqueue resource slow",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueSlow(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "foo"}},
|
|
}, {
|
|
name: "enqueue sentinel resource",
|
|
work: func(impl *Impl) {
|
|
e := impl.EnqueueSentinel(types.NamespacedName{Namespace: "foo", Name: "bar"})
|
|
e(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "foo",
|
|
Name: "baz",
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}},
|
|
}, {
|
|
name: "enqueue duplicate sentinel resource",
|
|
work: func(impl *Impl) {
|
|
e := impl.EnqueueSentinel(types.NamespacedName{Namespace: "foo", Name: "bar"})
|
|
e(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "foo",
|
|
Name: "baz-1",
|
|
},
|
|
})
|
|
e(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "foo",
|
|
Name: "baz-2",
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}},
|
|
}, {
|
|
name: "enqueue bad resource",
|
|
work: func(impl *Impl) {
|
|
impl.Enqueue("baz/blah")
|
|
},
|
|
}, {
|
|
name: "enqueue controller of bad resource",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueControllerOf("baz/blah")
|
|
},
|
|
}, {
|
|
name: "enqueue controller of resource without owner",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueControllerOf(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
},
|
|
})
|
|
},
|
|
}, {
|
|
name: "enqueue controller of resource with owner",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueControllerOf(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: gvk.GroupVersion().String(),
|
|
Kind: gvk.Kind,
|
|
Name: "baz",
|
|
Controller: ptr.Bool(true),
|
|
}},
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "baz"}},
|
|
}, {
|
|
name: "enqueue controller of deleted resource with owner",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueControllerOf(cache.DeletedFinalStateUnknown{
|
|
Key: "foo/bar",
|
|
Obj: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
OwnerReferences: []metav1.OwnerReference{{
|
|
APIVersion: gvk.GroupVersion().String(),
|
|
Kind: gvk.Kind,
|
|
Name: "baz",
|
|
Controller: ptr.Bool(true),
|
|
}},
|
|
},
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "baz"}},
|
|
}, {
|
|
name: "enqueue controller of deleted bad resource",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueControllerOf(cache.DeletedFinalStateUnknown{
|
|
Key: "foo/bar",
|
|
Obj: "bad-resource",
|
|
})
|
|
},
|
|
}, {
|
|
name: "enqueue label of namespaced resource bad resource",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfNamespaceScopedResource("test-ns", "test-name")("baz/blah")
|
|
},
|
|
}, {
|
|
name: "enqueue label of namespaced resource without label",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfNamespaceScopedResource("ns-key", "name-key")(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
Labels: map[string]string{
|
|
"ns-key": "bar",
|
|
},
|
|
},
|
|
})
|
|
},
|
|
}, {
|
|
name: "enqueue label of namespaced resource without namespace label",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfNamespaceScopedResource("ns-key", "name-key")(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
Labels: map[string]string{
|
|
"name-key": "baz",
|
|
},
|
|
},
|
|
})
|
|
},
|
|
}, {
|
|
name: "enqueue label of namespaced resource with labels",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfNamespaceScopedResource("ns-key", "name-key")(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
Labels: map[string]string{
|
|
"ns-key": "qux",
|
|
"name-key": "baz",
|
|
},
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "qux", Name: "baz"}},
|
|
}, {
|
|
name: "enqueue label of namespaced resource with empty namespace label",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfNamespaceScopedResource("", "name-key")(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
Labels: map[string]string{
|
|
"name-key": "baz",
|
|
},
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "baz"}},
|
|
}, {
|
|
name: "enqueue label of deleted namespaced resource with label",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfNamespaceScopedResource("ns-key", "name-key")(cache.DeletedFinalStateUnknown{
|
|
Key: "foo/bar",
|
|
Obj: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
Labels: map[string]string{
|
|
"ns-key": "qux",
|
|
"name-key": "baz",
|
|
},
|
|
},
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "qux", Name: "baz"}},
|
|
}, {
|
|
name: "enqueue label of deleted bad namespaced resource",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfNamespaceScopedResource("ns-key", "name-key")(cache.DeletedFinalStateUnknown{
|
|
Key: "foo/bar",
|
|
Obj: "bad-resource",
|
|
})
|
|
},
|
|
}, {
|
|
name: "enqueue label of cluster scoped resource bad resource",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfClusterScopedResource("name-key")("baz")
|
|
},
|
|
}, {
|
|
name: "enqueue label of cluster scoped resource without label",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfClusterScopedResource("name-key")(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
Labels: map[string]string{},
|
|
},
|
|
})
|
|
},
|
|
}, {
|
|
name: "enqueue label of cluster scoped resource with label",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfClusterScopedResource("name-key")(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
Labels: map[string]string{
|
|
"name-key": "baz",
|
|
},
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "", Name: "baz"}},
|
|
}, {
|
|
name: "enqueue label of deleted cluster scoped resource with label",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfClusterScopedResource("name-key")(cache.DeletedFinalStateUnknown{
|
|
Key: "foo/bar",
|
|
Obj: &Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
Labels: map[string]string{
|
|
"name-key": "baz",
|
|
},
|
|
},
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Namespace: "", Name: "baz"}},
|
|
}, {
|
|
name: "enqueue namespace of object",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueNamespaceOf(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
},
|
|
})
|
|
},
|
|
wantQueue: []types.NamespacedName{{Name: "bar"}},
|
|
}, {
|
|
name: "enqueue label of deleted bad cluster scoped resource",
|
|
work: func(impl *Impl) {
|
|
impl.EnqueueLabelOfClusterScopedResource("name-key")(cache.DeletedFinalStateUnknown{
|
|
Key: "bar",
|
|
Obj: "bad-resource",
|
|
})
|
|
},
|
|
}}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
var rl workqueue.TypedRateLimiter[any] = testRateLimiter{t, 100 * time.Millisecond}
|
|
impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{WorkQueueName: "Testing", Logger: TestLogger(t), RateLimiter: rl})
|
|
test.work(impl)
|
|
|
|
impl.WorkQueue().ShutDown()
|
|
gotQueue := drainWorkQueue(impl.WorkQueue())
|
|
|
|
if diff := cmp.Diff(test.wantQueue, gotQueue); diff != "" {
|
|
t.Error("unexpected queue (-want +got):", diff)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
const (
|
|
// longDelay is longer than we expect the test to run.
|
|
longDelay = time.Minute
|
|
// shortDelay is short enough for the test to execute quickly, but long
|
|
// enough to reasonably delay the enqueuing of an item.
|
|
shortDelay = 50 * time.Millisecond
|
|
|
|
// time we allow the queue length checker to keep polling the
|
|
// workqueue.
|
|
queueCheckTimeout = shortDelay + 500*time.Millisecond
|
|
)
|
|
|
|
func pollQ(q workqueue.TypedRateLimitingInterface[any], sig chan int) func(context.Context) (bool, error) {
|
|
return func(context.Context) (bool, error) {
|
|
if ql := q.Len(); ql > 0 {
|
|
sig <- ql
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
func TestEnqueueAfter(t *testing.T) {
|
|
impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{
|
|
Logger: TestLogger(t),
|
|
WorkQueueName: "Testing",
|
|
Reporter: &FakeStatsReporter{},
|
|
})
|
|
|
|
t.Cleanup(func() {
|
|
impl.WorkQueue().ShutDown()
|
|
})
|
|
|
|
// Enqueue two items with a long delay.
|
|
impl.EnqueueAfter(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "for",
|
|
Namespace: "waiting",
|
|
},
|
|
}, longDelay)
|
|
impl.EnqueueAfter(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "waterfall",
|
|
Namespace: "the",
|
|
},
|
|
}, longDelay)
|
|
|
|
// Enqueue one item with a short delay.
|
|
enqueueTime := time.Now()
|
|
impl.EnqueueAfter(&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "fall",
|
|
Namespace: "to",
|
|
},
|
|
}, shortDelay)
|
|
|
|
// Keep checking the queue length until 'to/fall' gets enqueued, send to channel to indicate success.
|
|
queuePopulated := make(chan int)
|
|
ctx, cancel := context.WithTimeout(context.Background(), queueCheckTimeout)
|
|
|
|
t.Cleanup(func() {
|
|
close(queuePopulated)
|
|
cancel()
|
|
})
|
|
|
|
go wait.PollUntilContextCancel(ctx, 5*time.Millisecond, true,
|
|
pollQ(impl.WorkQueue(), queuePopulated))
|
|
|
|
select {
|
|
case qlen := <-queuePopulated:
|
|
if enqueueDelay := time.Since(enqueueTime); enqueueDelay < shortDelay {
|
|
t.Errorf("Item enqueued within %v, expected at least a %v delay", enqueueDelay, shortDelay)
|
|
}
|
|
if got, want := qlen, 1; got != want {
|
|
t.Errorf("|Queue| = %d, want: %d", got, want)
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
t.Fatal("Timed out waiting for item to be put onto the workqueue")
|
|
}
|
|
|
|
impl.WorkQueue().ShutDown()
|
|
|
|
got, want := drainWorkQueue(impl.WorkQueue()), []types.NamespacedName{{Namespace: "to", Name: "fall"}}
|
|
if diff := cmp.Diff(want, got); diff != "" {
|
|
t.Errorf("Unexpected workqueue state (-:expect, +:got):\n%s", diff)
|
|
}
|
|
}
|
|
|
|
func TestEnqueueKeyAfter(t *testing.T) {
|
|
impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{
|
|
Logger: TestLogger(t),
|
|
WorkQueueName: "Testing",
|
|
Reporter: &FakeStatsReporter{},
|
|
})
|
|
t.Cleanup(func() {
|
|
impl.WorkQueue().ShutDown()
|
|
})
|
|
|
|
// Enqueue two items with a long delay.
|
|
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "waiting", Name: "for"}, longDelay)
|
|
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "the", Name: "waterfall"}, longDelay)
|
|
|
|
// Enqueue one item with a short delay.
|
|
enqueueTime := time.Now()
|
|
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "to", Name: "fall"}, shortDelay)
|
|
|
|
// Keep checking the queue length until 'to/fall' gets enqueued, send to channel to indicate success.
|
|
queuePopulated := make(chan int)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), queueCheckTimeout)
|
|
|
|
t.Cleanup(func() {
|
|
close(queuePopulated)
|
|
cancel()
|
|
})
|
|
|
|
go wait.PollUntilContextCancel(ctx, 5*time.Millisecond, true,
|
|
pollQ(impl.WorkQueue(), queuePopulated))
|
|
|
|
select {
|
|
case qlen := <-queuePopulated:
|
|
if enqueueDelay := time.Since(enqueueTime); enqueueDelay < shortDelay {
|
|
t.Errorf("Item enqueued within %v, expected at least a %v delay", enqueueDelay, shortDelay)
|
|
}
|
|
if got, want := qlen, 1; got != want {
|
|
t.Errorf("|Queue| = %d, want: %d", got, want)
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
t.Fatal("Timed out waiting for item to be put onto the workqueue")
|
|
}
|
|
|
|
impl.WorkQueue().ShutDown()
|
|
|
|
got, want := drainWorkQueue(impl.WorkQueue()), []types.NamespacedName{{Namespace: "to", Name: "fall"}}
|
|
if diff := cmp.Diff(want, got); diff != "" {
|
|
t.Errorf("Unexpected workqueue state (-:expect, +:got):\n%s", diff)
|
|
}
|
|
}
|
|
|
|
type CountingReconciler struct {
|
|
count atomic.Int32
|
|
}
|
|
|
|
func (cr *CountingReconciler) Reconcile(context.Context, string) error {
|
|
cr.count.Add(1)
|
|
return nil
|
|
}
|
|
|
|
func TestStartAndShutdown(t *testing.T) {
|
|
r := &CountingReconciler{}
|
|
impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{
|
|
Logger: TestLogger(t),
|
|
WorkQueueName: "Testing",
|
|
Reporter: &FakeStatsReporter{},
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
defer close(doneCh)
|
|
StartAll(ctx, impl)
|
|
}()
|
|
t.Cleanup(func() {
|
|
cancel()
|
|
<-doneCh
|
|
})
|
|
|
|
select {
|
|
case <-time.After(10 * time.Millisecond):
|
|
// We don't expect completion before the context is cancelled.
|
|
case <-doneCh:
|
|
t.Error("StartAll finished early.")
|
|
}
|
|
cancel()
|
|
|
|
select {
|
|
case <-time.After(time.Second):
|
|
t.Error("Timed out waiting for controller to finish.")
|
|
case <-doneCh:
|
|
// We expect the work to complete.
|
|
}
|
|
|
|
if got, want := r.count.Load(), int32(0); got != want {
|
|
t.Errorf("count = %v, wanted %v", got, want)
|
|
}
|
|
}
|
|
|
|
type countingLeaderAwareReconciler struct {
|
|
reconciler.LeaderAwareFuncs
|
|
|
|
reconcileCount atomic.Int32
|
|
promotionCount atomic.Int32
|
|
}
|
|
|
|
var _ reconciler.LeaderAware = (*countingLeaderAwareReconciler)(nil)
|
|
|
|
func (cr *countingLeaderAwareReconciler) Promote(b reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error {
|
|
cr.promotionCount.Add(1)
|
|
return cr.LeaderAwareFuncs.Promote(b, enq)
|
|
}
|
|
|
|
func (cr *countingLeaderAwareReconciler) Reconcile(ctx context.Context, key string) error {
|
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if cr.IsLeaderFor(types.NamespacedName{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
}) {
|
|
cr.reconcileCount.Add(1)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func TestStartAndShutdownWithLeaderAwareNoElection(t *testing.T) {
|
|
r := &countingLeaderAwareReconciler{
|
|
LeaderAwareFuncs: reconciler.LeaderAwareFuncs{
|
|
PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error {
|
|
t.Error("Promote should not be called when no leader election is enabled.")
|
|
return nil
|
|
},
|
|
},
|
|
}
|
|
|
|
impl := NewContext(context.TODO(), r, ControllerOptions{
|
|
Logger: TestLogger(t),
|
|
WorkQueueName: "Testing",
|
|
Reporter: &FakeStatsReporter{},
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
defer close(doneCh)
|
|
StartAll(ctx, impl)
|
|
}()
|
|
t.Cleanup(func() {
|
|
cancel()
|
|
<-doneCh
|
|
})
|
|
|
|
select {
|
|
case <-doneCh:
|
|
t.Fatal("StartAll finished early.")
|
|
case <-time.After(10 * time.Second):
|
|
// Give it some time to run.
|
|
}
|
|
|
|
cancel()
|
|
|
|
select {
|
|
case <-time.After(time.Second):
|
|
t.Error("Timed out waiting for controller to finish.")
|
|
case <-doneCh:
|
|
// We expect the work to complete.
|
|
}
|
|
|
|
if got, want := r.reconcileCount.Load(), int32(0); got != want {
|
|
t.Errorf("reconcile count = %v, wanted %v", got, want)
|
|
}
|
|
|
|
// Since the elector can't easily be mocked we assume:
|
|
// 1. We are using the unopposed elector
|
|
// 2. It has a single initial bucket (the Universe)
|
|
//
|
|
// Thus we expect two promotions to occur
|
|
// 1. It provides the initial set of buckets and these are promoted
|
|
// before reconciliation and leader election go routines start
|
|
// 2. When leader election starts the unopposed elector promotes
|
|
// that same bucket again
|
|
if got, want := r.promotionCount.Load(), int32(2); got != want {
|
|
t.Errorf("promotion count = %v, wanted %v", got, want)
|
|
}
|
|
}
|
|
|
|
func TestStartAndShutdownWithLeaderAwareWithLostElection(t *testing.T) {
|
|
promoted := make(chan struct{})
|
|
r := &countingLeaderAwareReconciler{
|
|
LeaderAwareFuncs: reconciler.LeaderAwareFuncs{
|
|
PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error {
|
|
close(promoted)
|
|
return nil
|
|
},
|
|
},
|
|
}
|
|
cc := leaderelection.ComponentConfig{
|
|
Component: "component",
|
|
LeaseDuration: 15 * time.Second,
|
|
RenewDeadline: 10 * time.Second,
|
|
RetryPeriod: 2 * time.Second,
|
|
}
|
|
kc := fakekube.NewSimpleClientset(
|
|
&coordinationv1.Lease{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: system.Namespace(),
|
|
Name: "component.testing.00-of-01",
|
|
},
|
|
Spec: coordinationv1.LeaseSpec{
|
|
HolderIdentity: ptr.String("not-us"),
|
|
LeaseDurationSeconds: ptr.Int32(3000),
|
|
AcquireTime: &metav1.MicroTime{Time: time.Now()},
|
|
RenewTime: &metav1.MicroTime{Time: time.Now().Add(3000 * time.Second)},
|
|
},
|
|
},
|
|
)
|
|
|
|
impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{
|
|
Logger: TestLogger(t),
|
|
WorkQueueName: "Testing",
|
|
Reporter: &FakeStatsReporter{},
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
ctx = leaderelection.WithStandardLeaderElectorBuilder(ctx, kc, cc)
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
defer close(doneCh)
|
|
StartAll(ctx, impl)
|
|
}()
|
|
t.Cleanup(func() {
|
|
cancel()
|
|
<-doneCh
|
|
})
|
|
|
|
select {
|
|
case <-promoted:
|
|
t.Fatal("Unexpected promotion.")
|
|
case <-time.After(3 * time.Second):
|
|
// Wait for 3 seconds for good measure.
|
|
case <-doneCh:
|
|
t.Error("StartAll finished early.")
|
|
}
|
|
|
|
cancel()
|
|
|
|
select {
|
|
case <-time.After(time.Second):
|
|
t.Error("Timed out waiting for controller to finish.")
|
|
case <-doneCh:
|
|
// We expect the work to complete.
|
|
}
|
|
|
|
if got, want := r.reconcileCount.Load(), int32(0); got != want {
|
|
t.Errorf("reconcile count = %v, wanted %v", got, want)
|
|
}
|
|
}
|
|
|
|
func TestStartAndShutdownWithWork(t *testing.T) {
|
|
r := &CountingReconciler{}
|
|
reporter := &FakeStatsReporter{}
|
|
impl := NewContext(context.TODO(), r, ControllerOptions{
|
|
Logger: TestLogger(t),
|
|
WorkQueueName: "Testing",
|
|
Reporter: reporter,
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
defer close(doneCh)
|
|
StartAll(ctx, impl)
|
|
}()
|
|
t.Cleanup(func() {
|
|
cancel()
|
|
<-doneCh
|
|
})
|
|
|
|
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
|
|
|
|
select {
|
|
case <-time.After(10 * time.Millisecond):
|
|
// We don't expect completion before the context is cancelled.
|
|
case <-doneCh:
|
|
t.Error("StartAll finished early.")
|
|
}
|
|
cancel()
|
|
|
|
select {
|
|
case <-time.After(time.Second):
|
|
t.Error("Timed out waiting for controller to finish.")
|
|
case <-doneCh:
|
|
// We expect the work to complete.
|
|
}
|
|
|
|
if got, want := r.count.Load(), int32(1); got != want {
|
|
t.Errorf("reconcile count = %v, wanted %v", got, want)
|
|
}
|
|
if got, want := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want {
|
|
t.Errorf("requeues = %v, wanted %v", got, want)
|
|
}
|
|
|
|
checkStats(t, reporter, 1, 0, 1, trueString)
|
|
}
|
|
|
|
type fakeError struct{}
|
|
|
|
var _ error = (*fakeError)(nil)
|
|
|
|
func (*fakeError) Error() string {
|
|
return "I always error"
|
|
}
|
|
|
|
func TestPermanentError(t *testing.T) {
|
|
err := new(fakeError)
|
|
permErr := NewPermanentError(err)
|
|
if !IsPermanentError(permErr) {
|
|
t.Errorf("Expected type %T to be a permanentError", permErr)
|
|
}
|
|
if IsPermanentError(err) {
|
|
t.Errorf("Expected type %T to not be a permanentError", err)
|
|
}
|
|
|
|
wrapPermErr := fmt.Errorf("wrapped: %w", permErr)
|
|
if !IsPermanentError(wrapPermErr) {
|
|
t.Error("Expected wrapped permanentError to be equivalent to a permanentError")
|
|
}
|
|
|
|
unwrapErr := new(fakeError)
|
|
if !errors.As(permErr, &unwrapErr) {
|
|
t.Errorf("Could not unwrap %T from permanentError", unwrapErr)
|
|
}
|
|
}
|
|
|
|
func TestRequeueKey(t *testing.T) {
|
|
err := new(fakeError)
|
|
reqErr := NewRequeueImmediately()
|
|
if ok, _ := IsRequeueKey(reqErr); !ok {
|
|
t.Errorf("Expected type %T to be a requeueKeyError", reqErr)
|
|
}
|
|
if ok, _ := IsRequeueKey(err); ok {
|
|
t.Errorf("Expected type %T to not be a requeueKeyError", err)
|
|
}
|
|
|
|
want := 10 * time.Minute
|
|
reqErr = NewRequeueAfter(want)
|
|
wrapReqErr := fmt.Errorf("wrapped: %w", reqErr)
|
|
if ok, got := IsRequeueKey(wrapReqErr); !ok {
|
|
t.Error("Expected wrapped requeueKeyError to be equivalent to a requeueKeyError")
|
|
} else if want != got {
|
|
t.Errorf("IsRequeueKey() = (true, %v), wanted (true, %v)", got, want)
|
|
}
|
|
}
|
|
|
|
type errorReconciler struct{}
|
|
|
|
func (er *errorReconciler) Reconcile(context.Context, string) error {
|
|
return new(fakeError)
|
|
}
|
|
|
|
func TestStartAndShutdownWithErroringWork(t *testing.T) {
|
|
const testTimeout = 500 * time.Millisecond
|
|
|
|
item := types.NamespacedName{Namespace: "", Name: "bar"}
|
|
|
|
impl := NewContext(context.TODO(), &errorReconciler{}, ControllerOptions{
|
|
Logger: TestLogger(t),
|
|
WorkQueueName: "Testing",
|
|
Reporter: &FakeStatsReporter{},
|
|
})
|
|
impl.EnqueueKey(item)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
defer close(doneCh)
|
|
// StartAll blocks until all the worker threads finish, which shouldn't
|
|
// be until we cancel the context.
|
|
StartAll(ctx, impl)
|
|
}()
|
|
t.Cleanup(func() {
|
|
cancel()
|
|
<-doneCh
|
|
})
|
|
|
|
// Keep checking the number of requeues, send to channel to indicate success.
|
|
itemRequeued := make(chan struct{})
|
|
defer close(itemRequeued)
|
|
|
|
var successCheck wait.ConditionWithContextFunc = func(context.Context) (bool, error) {
|
|
// Check that the work was requeued in RateLimiter, as NumRequeues
|
|
// can't fully reflect the real state of queue length.
|
|
// Here we need to wait for NumRequeues to be more than 1, to ensure
|
|
// the key get re-queued and reprocessed as expect.
|
|
if impl.WorkQueue().NumRequeues(item) > 1 {
|
|
itemRequeued <- struct{}{}
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
}
|
|
go wait.PollUntilContextCancel(ctx, 5*time.Millisecond, true, successCheck)
|
|
|
|
select {
|
|
case <-itemRequeued:
|
|
// shut down reconciler
|
|
cancel()
|
|
|
|
case <-doneCh:
|
|
t.Fatal("StartAll finished early")
|
|
|
|
case <-ctx.Done():
|
|
t.Fatal("Timed out waiting for item to be requeued")
|
|
}
|
|
}
|
|
|
|
type permanentErrorReconciler struct{}
|
|
|
|
func (er *permanentErrorReconciler) Reconcile(context.Context, string) error {
|
|
return NewPermanentError(new(fakeError))
|
|
}
|
|
|
|
func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) {
|
|
r := &permanentErrorReconciler{}
|
|
reporter := &FakeStatsReporter{}
|
|
impl := NewContext(context.TODO(), r, ControllerOptions{
|
|
Logger: TestLogger(t),
|
|
WorkQueueName: "Testing",
|
|
Reporter: reporter,
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
defer close(doneCh)
|
|
StartAll(ctx, impl)
|
|
}()
|
|
t.Cleanup(func() {
|
|
cancel()
|
|
<-doneCh
|
|
})
|
|
|
|
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
|
|
|
|
select {
|
|
case <-time.After(20 * time.Millisecond):
|
|
// We don't expect completion before the context is cancelled.
|
|
case <-doneCh:
|
|
t.Error("StartAll finished early.")
|
|
}
|
|
cancel()
|
|
|
|
select {
|
|
case <-time.After(time.Second):
|
|
t.Error("Timed out waiting for controller to finish.")
|
|
case <-doneCh:
|
|
// We expect the work to complete.
|
|
}
|
|
|
|
// Check that the work was not requeued in RateLimiter.
|
|
if got, want := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want {
|
|
t.Errorf("Requeue count = %v, wanted %v", got, want)
|
|
}
|
|
|
|
checkStats(t, reporter, 1, 0, 1, falseString)
|
|
}
|
|
|
|
type requeueAfterReconciler struct {
|
|
duration time.Duration
|
|
}
|
|
|
|
func (er *requeueAfterReconciler) Reconcile(context.Context, string) error {
|
|
return NewRequeueAfter(er.duration)
|
|
}
|
|
|
|
func TestStartAndShutdownWithRequeuingWork(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
minimum int
|
|
duration time.Duration
|
|
}{{
|
|
name: "no duration",
|
|
minimum: 100,
|
|
}, {
|
|
name: "small duration",
|
|
minimum: 2,
|
|
duration: 100 * time.Millisecond,
|
|
}}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
r := &requeueAfterReconciler{duration: test.duration}
|
|
reporter := &FakeStatsReporter{}
|
|
impl := NewContext(context.TODO(), r, ControllerOptions{
|
|
Logger: TestLogger(t),
|
|
WorkQueueName: "Testing",
|
|
Reporter: reporter,
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
defer close(doneCh)
|
|
StartAll(ctx, impl)
|
|
}()
|
|
t.Cleanup(func() {
|
|
cancel()
|
|
<-doneCh
|
|
})
|
|
|
|
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
|
|
|
|
select {
|
|
case <-time.After(20 * time.Millisecond):
|
|
// We don't expect completion before the context is cancelled.
|
|
case <-doneCh:
|
|
t.Error("StartAll finished early.")
|
|
}
|
|
cancel()
|
|
|
|
select {
|
|
case <-time.After(time.Second):
|
|
t.Error("Timed out waiting for controller to finish.")
|
|
case <-doneCh:
|
|
// We expect the work to complete.
|
|
}
|
|
|
|
// Check that the work was not requeued in RateLimiter.
|
|
if got, wantAtLeast := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), test.minimum; got >= wantAtLeast {
|
|
t.Errorf("Requeue count = %v, wanted at least %v", got, wantAtLeast)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func drainWorkQueue(wq workqueue.TypedRateLimitingInterface[any]) (hasQueue []types.NamespacedName) {
|
|
for {
|
|
key, shutdown := wq.Get()
|
|
if key == nil && shutdown {
|
|
break
|
|
}
|
|
hasQueue = append(hasQueue, key.(types.NamespacedName))
|
|
}
|
|
return
|
|
}
|
|
|
|
type fakeInformer struct {
|
|
cache.SharedInformer
|
|
}
|
|
|
|
type fakeStore struct {
|
|
cache.Store
|
|
}
|
|
|
|
func (*fakeInformer) GetStore() cache.Store {
|
|
return &fakeStore{}
|
|
}
|
|
|
|
var (
|
|
fakeKeys = []string{"foo/bar", "bar/foo", "fizz/buzz"}
|
|
fakeObjs = []interface{}{
|
|
&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "bar",
|
|
Namespace: "foo",
|
|
},
|
|
},
|
|
&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foo",
|
|
Namespace: "bar",
|
|
},
|
|
},
|
|
&Resource{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "buzz",
|
|
Namespace: "fizz",
|
|
},
|
|
},
|
|
}
|
|
)
|
|
|
|
func (*fakeStore) ListKeys() []string {
|
|
return fakeKeys
|
|
}
|
|
|
|
func (*fakeStore) List() []interface{} {
|
|
return fakeObjs
|
|
}
|
|
|
|
func TestImplGlobalResync(t *testing.T) {
|
|
r := &CountingReconciler{}
|
|
impl := NewContext(context.TODO(), r, ControllerOptions{
|
|
Logger: TestLogger(t),
|
|
WorkQueueName: "Testing",
|
|
Reporter: &FakeStatsReporter{},
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
defer close(doneCh)
|
|
StartAll(ctx, impl)
|
|
}()
|
|
t.Cleanup(func() {
|
|
cancel()
|
|
<-doneCh
|
|
})
|
|
|
|
impl.GlobalResync(&fakeInformer{})
|
|
|
|
// The global resync delays enqueuing things by a second with a jitter that
|
|
// goes up to len(fakeObjs) times a second: time.Duration(1+len(fakeObjs)) * time.Second.
|
|
// In this test, the fast lane is empty, so we can assume immediate enqueuing.
|
|
select {
|
|
case <-time.After(50 * time.Millisecond):
|
|
// We don't expect completion before the context is cancelled.
|
|
case <-doneCh:
|
|
t.Error("StartAll finished early.")
|
|
}
|
|
cancel()
|
|
|
|
select {
|
|
case <-time.After(time.Second):
|
|
t.Error("Timed out waiting for controller to finish.")
|
|
case <-doneCh:
|
|
// We expect the work to complete.
|
|
}
|
|
|
|
if got, want := r.count.Load(), int32(3); want != got {
|
|
t.Errorf("GlobalResync: want = %v, got = %v", want, got)
|
|
}
|
|
}
|
|
|
|
func checkStats(t *testing.T, r *FakeStatsReporter, reportCount, lastQueueDepth, reconcileCount int, lastReconcileSuccess string) {
|
|
qd := r.GetQueueDepths()
|
|
if got, want := len(qd), reportCount; got != want {
|
|
t.Errorf("Queue depth reports = %v, wanted %v", got, want)
|
|
}
|
|
if got, want := qd[len(qd)-1], int64(lastQueueDepth); got != want {
|
|
t.Errorf("Queue depth report = %v, wanted %v", got, want)
|
|
}
|
|
rd := r.GetReconcileData()
|
|
if got, want := len(rd), reconcileCount; got != want {
|
|
t.Errorf("Reconcile reports = %v, wanted %v", got, want)
|
|
}
|
|
if got, want := rd[len(rd)-1].Success, lastReconcileSuccess; got != want {
|
|
t.Errorf("Reconcile success = %v, wanted %v", got, want)
|
|
}
|
|
}
|
|
|
|
type fixedInformer struct {
|
|
m sync.Mutex
|
|
sunk bool
|
|
done bool
|
|
}
|
|
|
|
var _ Informer = (*fixedInformer)(nil)
|
|
|
|
func (fi *fixedInformer) Run(stopCh <-chan struct{}) {
|
|
<-stopCh
|
|
|
|
fi.m.Lock()
|
|
defer fi.m.Unlock()
|
|
fi.done = true
|
|
}
|
|
|
|
func (fi *fixedInformer) HasSynced() bool {
|
|
fi.m.Lock()
|
|
defer fi.m.Unlock()
|
|
return fi.sunk
|
|
}
|
|
|
|
func (fi *fixedInformer) ToggleSynced(b bool) {
|
|
fi.m.Lock()
|
|
defer fi.m.Unlock()
|
|
fi.sunk = b
|
|
}
|
|
|
|
func (fi *fixedInformer) Done() bool {
|
|
fi.m.Lock()
|
|
defer fi.m.Unlock()
|
|
return fi.done
|
|
}
|
|
|
|
func TestStartInformersSuccess(t *testing.T) {
|
|
errCh := make(chan error)
|
|
defer close(errCh)
|
|
|
|
fi := &fixedInformer{sunk: true}
|
|
|
|
stopCh := make(chan struct{})
|
|
defer close(stopCh)
|
|
go func() {
|
|
errCh <- StartInformers(stopCh, fi)
|
|
}()
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
t.Error("Unexpected error:", err)
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Error("Timed out waiting for informers to sync.")
|
|
}
|
|
}
|
|
|
|
func TestStartInformersEventualSuccess(t *testing.T) {
|
|
errCh := make(chan error)
|
|
defer close(errCh)
|
|
|
|
fi := &fixedInformer{sunk: false}
|
|
|
|
stopCh := make(chan struct{})
|
|
defer close(stopCh)
|
|
go func() {
|
|
errCh <- StartInformers(stopCh, fi)
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Wait a brief period to ensure nothing is sent.
|
|
case err := <-errCh:
|
|
t.Fatal("Unexpected send on errCh:", err)
|
|
}
|
|
|
|
// Let the Sync complete.
|
|
fi.ToggleSynced(true)
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
t.Error("Unexpected error:", err)
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Error("Timed out waiting for informers to sync.")
|
|
}
|
|
}
|
|
|
|
func TestStartInformersFailure(t *testing.T) {
|
|
errCh := make(chan error)
|
|
defer close(errCh)
|
|
|
|
fi := &fixedInformer{sunk: false}
|
|
|
|
stopCh := make(chan struct{})
|
|
go func() {
|
|
errCh <- StartInformers(stopCh, fi)
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Wait a brief period to ensure nothing is sent.
|
|
case err := <-errCh:
|
|
t.Fatal("Unexpected send on errCh:", err)
|
|
}
|
|
|
|
// Now close the stopCh and we should see an error sent.
|
|
close(stopCh)
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
if err == nil {
|
|
t.Error("Unexpected success syncing informers after stopCh closed.")
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Error("Timed out waiting for informers to sync.")
|
|
}
|
|
}
|
|
|
|
func TestRunInformersSuccess(t *testing.T) {
|
|
errCh := make(chan error)
|
|
defer close(errCh)
|
|
|
|
fi := &fixedInformer{sunk: true}
|
|
|
|
stopCh := make(chan struct{})
|
|
go func() {
|
|
_, err := RunInformers(stopCh, fi)
|
|
errCh <- err
|
|
}()
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatal("Timed out waiting for informers to sync.")
|
|
}
|
|
|
|
close(stopCh)
|
|
}
|
|
|
|
func TestRunInformersEventualSuccess(t *testing.T) {
|
|
errCh := make(chan error)
|
|
defer close(errCh)
|
|
|
|
fi := &fixedInformer{sunk: false}
|
|
|
|
stopCh := make(chan struct{})
|
|
go func() {
|
|
_, err := RunInformers(stopCh, fi)
|
|
errCh <- err
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Wait a brief period to ensure nothing is sent.
|
|
case err := <-errCh:
|
|
t.Fatal("Unexpected send on errCh:", err)
|
|
}
|
|
|
|
// Let the Sync complete.
|
|
fi.ToggleSynced(true)
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatal("Timed out waiting for informers to sync.")
|
|
}
|
|
|
|
close(stopCh)
|
|
}
|
|
|
|
func TestRunInformersFailure(t *testing.T) {
|
|
errCh := make(chan error)
|
|
defer close(errCh)
|
|
|
|
fi := &fixedInformer{sunk: false}
|
|
|
|
stopCh := make(chan struct{})
|
|
go func() {
|
|
_, err := RunInformers(stopCh, fi)
|
|
errCh <- err
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Wait a brief period to ensure nothing is sent.
|
|
case err := <-errCh:
|
|
t.Fatal("Unexpected send on errCh:", err)
|
|
}
|
|
|
|
// Now close the stopCh and we should see an error sent.
|
|
close(stopCh)
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
if err == nil {
|
|
t.Fatal("Unexpected success syncing informers after stopCh closed.")
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatal("Timed out waiting for informers to sync.")
|
|
}
|
|
}
|
|
|
|
func TestRunInformersFinished(t *testing.T) {
|
|
fi := &fixedInformer{sunk: true}
|
|
defer func() {
|
|
if !fi.Done() {
|
|
t.Fatalf("Test didn't wait for informers to finish")
|
|
}
|
|
}()
|
|
|
|
ctx, cancel := context.WithCancel(TestContextWithLogger(t))
|
|
t.Cleanup(cancel)
|
|
|
|
waitInformers, err := RunInformers(ctx.Done(), fi)
|
|
if err != nil {
|
|
t.Fatal("Failed to start informers:", err)
|
|
}
|
|
|
|
cancel()
|
|
|
|
ch := make(chan struct{})
|
|
go func() {
|
|
waitInformers()
|
|
ch <- struct{}{}
|
|
}()
|
|
|
|
select {
|
|
case <-ch:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("Timed out waiting for informers to finish.")
|
|
}
|
|
}
|
|
|
|
func TestGetResyncPeriod(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
if got := GetResyncPeriod(ctx); got != DefaultResyncPeriod {
|
|
t.Errorf("GetResyncPeriod() = %v, wanted %v", got, nil)
|
|
}
|
|
|
|
bob := 30 * time.Second
|
|
ctx = WithResyncPeriod(ctx, bob)
|
|
|
|
if want, got := bob, GetResyncPeriod(ctx); got != want {
|
|
t.Errorf("GetResyncPeriod() = %v, wanted %v", got, want)
|
|
}
|
|
|
|
tribob := 90 * time.Second
|
|
if want, got := tribob, GetTrackerLease(ctx); got != want {
|
|
t.Errorf("GetTrackerLease() = %v, wanted %v", got, want)
|
|
}
|
|
}
|
|
|
|
func TestGetEventRecorder(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
if got := GetEventRecorder(ctx); got != nil {
|
|
t.Errorf("GetEventRecorder() = %v, wanted nil", got)
|
|
}
|
|
|
|
ctx = WithEventRecorder(ctx, record.NewFakeRecorder(1000))
|
|
|
|
if got := GetEventRecorder(ctx); got == nil {
|
|
t.Error("GetEventRecorder() = nil, wanted non-nil")
|
|
}
|
|
}
|