/* Copyright 2022 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package generic_test import ( "context" "errors" "fmt" "reflect" "sync" "sync/atomic" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic" clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" ) type testInformer struct { cache.SharedIndexInformer lock sync.Mutex registrations map[interface{}]struct{} } func (t *testInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { res, err := t.SharedIndexInformer.AddEventHandler(handler) if err != nil { return res, err } func() { t.lock.Lock() defer t.lock.Unlock() if t.registrations == nil { t.registrations = make(map[interface{}]struct{}) } t.registrations[res] = struct{}{} }() return res, err } func (t *testInformer) RemoveEventHandler(registration cache.ResourceEventHandlerRegistration) error { func() { t.lock.Lock() defer t.lock.Unlock() if _, ok := t.registrations[registration]; !ok { panic("removing unknown event handler?") } delete(t.registrations, registration) }() return t.SharedIndexInformer.RemoveEventHandler(registration) } var ( scheme *runtime.Scheme = runtime.NewScheme() codecs serializer.CodecFactory = serializer.NewCodecFactory(scheme) fakeGVR schema.GroupVersionResource = schema.GroupVersionResource{ Group: "fake.example.com", Version: "v1", Resource: "fakes", } fakeGVK schema.GroupVersionKind = fakeGVR.GroupVersion().WithKind("Fake") fakeGVKList schema.GroupVersionKind = fakeGVR.GroupVersion().WithKind("FakeList") ) func init() { scheme.AddKnownTypeWithName(fakeGVK, &unstructured.Unstructured{}) scheme.AddKnownTypeWithName(fakeGVKList, &unstructured.UnstructuredList{}) } func setupTest(ctx context.Context, customReconciler func(string, string, runtime.Object) error) ( tracker clienttesting.ObjectTracker, controller generic.Controller[*unstructured.Unstructured], informer *testInformer, waitForReconcile func(runtime.Object) error, ) { tracker = clienttesting.NewObjectTracker(scheme, codecs.UniversalDecoder()) reconciledObjects := make(chan runtime.Object) // Set up fake informers that return instances of mock Policy definitoins // and mock policy bindings informer = &testInformer{SharedIndexInformer: cache.NewSharedIndexInformer(&cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return tracker.List(fakeGVR, fakeGVK, "") }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return tracker.Watch(fakeGVR, "") }, }, &unstructured.Unstructured{}, 30*time.Second, nil)} reconciler := func(namespace, name string, newObj *unstructured.Unstructured) error { var err error copied := newObj.DeepCopyObject() if customReconciler != nil { err = customReconciler(namespace, name, newObj) } reconciledObjects <- copied return err } waitForReconcile = func(obj runtime.Object) error { select { case reconciledObj := <-reconciledObjects: if reflect.DeepEqual(obj, reconciledObj) { return nil } return fmt.Errorf("expected equal objects: %v", cmp.Diff(obj, reconciledObj)) case <-ctx.Done(): return fmt.Errorf("context done before reconcile: %w", ctx.Err()) } } myController := generic.NewController( generic.NewInformer[*unstructured.Unstructured](informer), reconciler, generic.ControllerOptions{}, ) go func() { <-ctx.Done() close(reconciledObjects) for leftover := range reconciledObjects { panic(fmt.Errorf("leftover object which was not anticipated by test: %v", leftover)) } }() return tracker, myController, informer, waitForReconcile } func TestReconcile(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) defer testCancel() tracker, myController, informer, waitForReconcile := setupTest(testContext, nil) // Add object to informer initialObject := &unstructured.Unstructured{} initialObject.SetUnstructuredContent(map[string]interface{}{ "metadata": map[string]interface{}{ "name": "object1", "resourceVersion": "1", }, }) initialObject.SetGroupVersionKind(fakeGVK) require.NoError(t, tracker.Add(initialObject)) wg := sync.WaitGroup{} // Start informer wg.Add(1) go func() { defer wg.Done() informer.Run(testContext.Done()) }() // Start controller wg.Add(1) go func() { defer wg.Done() stopReason := myController.Run(testContext) require.ErrorIs(t, stopReason, context.Canceled) }() require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) // Wait for all enqueued reconciliations require.NoError(t, waitForReconcile(initialObject)) // Updated object updatedObject := &unstructured.Unstructured{} updatedObject.SetUnstructuredContent(map[string]interface{}{ "metadata": map[string]interface{}{ "name": "object1", "resourceVersion": "2", }, "newKey": "a key", }) updatedObject.SetGroupVersionKind(fakeGVK) require.NoError(t, tracker.Update(fakeGVR, updatedObject, "")) // Wait for all enqueued reconciliations require.NoError(t, waitForReconcile(updatedObject)) require.NoError(t, tracker.Delete(fakeGVR, updatedObject.GetNamespace(), updatedObject.GetName())) require.NoError(t, waitForReconcile(nil)) testCancel() wg.Wait() } func TestShutdown(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) defer testCancel() _, myController, informer, _ := setupTest(testContext, nil) wg := sync.WaitGroup{} // Start informer wg.Add(1) go func() { defer wg.Done() informer.Run(testContext.Done()) }() // Start controller wg.Add(1) go func() { defer wg.Done() stopReason := myController.Run(testContext) require.ErrorIs(t, stopReason, context.Canceled) }() // Wait for controller and informer to start up require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) // Stop the controller and informer testCancel() // Wait for controller and informer to stop wg.Wait() // Ensure the event handler was cleaned up require.Empty(t, informer.registrations) } // Show an error is thrown informer isn't started when the controller runs func TestInformerNeverStarts(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 400*time.Millisecond) defer testCancel() _, myController, informer, _ := setupTest(testContext, nil) wg := sync.WaitGroup{} // Start controller wg.Add(1) go func() { defer wg.Done() stopReason := myController.Run(testContext) require.ErrorIs(t, stopReason, context.DeadlineExceeded) }() // Wait for deadline to pass without syncing the cache require.False(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) // Wait for controller to stop (or context deadline will pass quickly) wg.Wait() // Ensure there are no event handlers require.Empty(t, informer.registrations) } // Shows that if RV does not change, the reconciler does not get called func TestIgnoredUpdate(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) defer testCancel() tracker, myController, informer, waitForReconcile := setupTest(testContext, nil) // Add object to informer initialObject := &unstructured.Unstructured{} initialObject.SetUnstructuredContent(map[string]interface{}{ "metadata": map[string]interface{}{ "name": "object1", "resourceVersion": "1", }, }) initialObject.SetGroupVersionKind(fakeGVK) require.NoError(t, tracker.Add(initialObject)) wg := sync.WaitGroup{} // Start informer wg.Add(1) go func() { defer wg.Done() informer.Run(testContext.Done()) }() // Start controller wg.Add(1) go func() { defer wg.Done() stopReason := myController.Run(testContext) require.ErrorIs(t, stopReason, context.Canceled) }() require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) // Wait for all enqueued reconciliations require.NoError(t, waitForReconcile(initialObject)) // Send update with the same object require.NoError(t, tracker.Update(fakeGVR, initialObject, "")) // Don't wait for it to be reconciled testCancel() wg.Wait() // Test infrastructure has logic to panic if there are any reconciled objects // that weren't "expected" } // Shows that an object which fails reconciliation will retry func TestReconcileRetry(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) defer testCancel() calls := atomic.Uint64{} success := atomic.Bool{} tracker, myController, _, waitForReconcile := setupTest(testContext, func(s1, s2 string, o runtime.Object) error { if calls.Add(1) > 2 { // Suddenly start liking the object success.Store(true) return nil } return errors.New("i dont like this object") }) // Start informer wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() myController.Informer().Run(testContext.Done()) }() // Start controller wg.Add(1) go func() { defer wg.Done() stopReason := myController.Run(testContext) require.ErrorIs(t, stopReason, context.Canceled) }() // Add object to informer initialObject := &unstructured.Unstructured{} initialObject.SetUnstructuredContent(map[string]interface{}{ "metadata": map[string]interface{}{ "name": "object1", "resourceVersion": "1", }, }) initialObject.SetGroupVersionKind(fakeGVK) require.NoError(t, tracker.Add(initialObject)) require.NoError(t, waitForReconcile(initialObject), "initial reconcile") require.NoError(t, waitForReconcile(initialObject), "previous reconcile failed, should retry quickly") require.NoError(t, waitForReconcile(initialObject), "previous reconcile failed, should retry quickly") // Will not try again since calls > 2 for last reconcile require.True(t, success.Load(), "last call to reconcile should return success") testCancel() wg.Wait() } func TestInformerList(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) defer testCancel() tracker, myController, _, _ := setupTest(testContext, nil) wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() myController.Informer().Run(testContext.Done()) }() require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) object1 := &unstructured.Unstructured{} object1.SetUnstructuredContent(map[string]interface{}{ "metadata": map[string]interface{}{ "name": "object1", "resourceVersion": "object1", }, }) object1.SetGroupVersionKind(fakeGVK) object1v2 := &unstructured.Unstructured{} object1v2.SetUnstructuredContent(map[string]interface{}{ "metadata": map[string]interface{}{ "name": "object1", "resourceVersion": "object1v2", }, }) object1v2.SetGroupVersionKind(fakeGVK) object2 := &unstructured.Unstructured{} object2.SetUnstructuredContent(map[string]interface{}{ "metadata": map[string]interface{}{ "name": "object2", "resourceVersion": "object2", }, }) object2.SetGroupVersionKind(fakeGVK) object3 := &unstructured.Unstructured{} object3.SetUnstructuredContent(map[string]interface{}{ "metadata": map[string]interface{}{ "name": "object3", "resourceVersion": "object3", }, }) object3.SetGroupVersionKind(fakeGVK) namespacedObject1 := &unstructured.Unstructured{} namespacedObject1.SetUnstructuredContent(map[string]interface{}{ "metadata": map[string]interface{}{ "name": "namespacedObject1", "namespace": "test", "resourceVersion": "namespacedObject1", }, }) namespacedObject1.SetGroupVersionKind(fakeGVK) namespacedObject2 := &unstructured.Unstructured{} namespacedObject2.SetUnstructuredContent(map[string]interface{}{ "metadata": map[string]interface{}{ "name": "namespacedObject2", "namespace": "test", "resourceVersion": "namespacedObject2", }, }) namespacedObject2.SetGroupVersionKind(fakeGVK) require.NoError(t, tracker.Add(object1)) require.NoError(t, tracker.Add(object2)) require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) { return myController.Informer().LastSyncResourceVersion() == object2.GetResourceVersion(), nil })) values, err := myController.Informer().List(labels.Everything()) require.NoError(t, err) require.ElementsMatch(t, []*unstructured.Unstructured{object1, object2}, values) require.NoError(t, tracker.Update(fakeGVR, object1v2, object1v2.GetNamespace())) require.NoError(t, tracker.Delete(fakeGVR, object2.GetNamespace(), object2.GetName())) require.NoError(t, tracker.Add(object3)) require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) { return myController.Informer().LastSyncResourceVersion() == object3.GetResourceVersion(), nil })) values, err = myController.Informer().List(labels.Everything()) require.NoError(t, err) require.ElementsMatch(t, []*unstructured.Unstructured{object1v2, object3}, values) require.NoError(t, tracker.Add(namespacedObject1)) require.NoError(t, tracker.Add(namespacedObject2)) require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) { return myController.Informer().LastSyncResourceVersion() == namespacedObject2.GetResourceVersion(), nil })) values, err = myController.Informer().Namespaced(namespacedObject1.GetNamespace()).List(labels.Everything()) require.NoError(t, err) require.ElementsMatch(t, []*unstructured.Unstructured{namespacedObject1, namespacedObject2}, values) value, err := myController.Informer().Get(object3.GetName()) require.NoError(t, err) require.Equal(t, value, object3) value, err = myController.Informer().Namespaced(namespacedObject1.GetNamespace()).Get(namespacedObject1.GetName()) require.NoError(t, err) require.Equal(t, value, namespacedObject1) _, err = myController.Informer().Get("fakeobjectname") require.True(t, k8serrors.IsNotFound(err)) _, err = myController.Informer().Namespaced("test").Get("fakeobjectname") require.True(t, k8serrors.IsNotFound(err)) _, err = myController.Informer().Namespaced("fakenamespace").Get("fakeobjectname") require.True(t, k8serrors.IsNotFound(err)) }