diff --git a/pkg/admission/configuration/mutating_webhook_manager.go b/pkg/admission/configuration/mutating_webhook_manager.go index ea58e6c32..79fb0fc8f 100644 --- a/pkg/admission/configuration/mutating_webhook_manager.go +++ b/pkg/admission/configuration/mutating_webhook_manager.go @@ -36,11 +36,6 @@ type mutatingWebhookConfigurationManager struct { configuration *atomic.Value lister admissionregistrationlisters.MutatingWebhookConfigurationLister hasSynced func() bool - // initialConfigurationSynced tracks if - // the existing webhook configs have been synced (honored) by the - // manager at startup-- the informer has synced and either has no items - // or has finished executing updateConfiguration() once. - initialConfigurationSynced *atomic.Bool } var _ generic.Source = &mutatingWebhookConfigurationManager{} @@ -48,23 +43,25 @@ var _ generic.Source = &mutatingWebhookConfigurationManager{} func NewMutatingWebhookConfigurationManager(f informers.SharedInformerFactory) generic.Source { informer := f.Admissionregistration().V1().MutatingWebhookConfigurations() manager := &mutatingWebhookConfigurationManager{ - configuration: &atomic.Value{}, - lister: informer.Lister(), - hasSynced: informer.Informer().HasSynced, - initialConfigurationSynced: &atomic.Bool{}, + configuration: &atomic.Value{}, + lister: informer.Lister(), } // Start with an empty list manager.configuration.Store([]webhook.WebhookAccessor{}) - manager.initialConfigurationSynced.Store(false) // On any change, rebuild the config - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + // TODO: the initial sync for this is N ^ 2, ideally we should make it N. + handler, _ := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(_ interface{}) { manager.updateConfiguration() }, UpdateFunc: func(_, _ interface{}) { manager.updateConfiguration() }, DeleteFunc: func(_ interface{}) { manager.updateConfiguration() }, }) + // Since our processing is synchronous, this is all we need to do to + // see if we have processed everything or not. + manager.hasSynced = handler.HasSynced + return manager } @@ -73,28 +70,9 @@ func (m *mutatingWebhookConfigurationManager) Webhooks() []webhook.WebhookAccess return m.configuration.Load().([]webhook.WebhookAccessor) } -// HasSynced returns true when the manager is synced with existing webhookconfig -// objects at startup-- which means the informer is synced and either has no items -// or updateConfiguration() has completed. -func (m *mutatingWebhookConfigurationManager) HasSynced() bool { - if !m.hasSynced() { - return false - } - if m.initialConfigurationSynced.Load() { - // the informer has synced and configuration has been updated - return true - } - if configurations, err := m.lister.List(labels.Everything()); err == nil && len(configurations) == 0 { - // the empty list we initially stored is valid to use. - // Setting initialConfigurationSynced to true, so subsequent checks - // would be able to take the fast path on the atomic boolean in a - // cluster without any admission webhooks configured. - m.initialConfigurationSynced.Store(true) - // the informer has synced and we don't have any items - return true - } - return false -} +// HasSynced returns true if the initial set of mutating webhook configurations +// has been loaded. +func (m *mutatingWebhookConfigurationManager) HasSynced() bool { return m.hasSynced() } func (m *mutatingWebhookConfigurationManager) updateConfiguration() { configurations, err := m.lister.List(labels.Everything()) @@ -103,7 +81,6 @@ func (m *mutatingWebhookConfigurationManager) updateConfiguration() { return } m.configuration.Store(mergeMutatingWebhookConfigurations(configurations)) - m.initialConfigurationSynced.Store(true) } func mergeMutatingWebhookConfigurations(configurations []*v1.MutatingWebhookConfiguration) []webhook.WebhookAccessor { diff --git a/pkg/admission/configuration/validating_webhook_manager.go b/pkg/admission/configuration/validating_webhook_manager.go index 00f954251..da8035674 100644 --- a/pkg/admission/configuration/validating_webhook_manager.go +++ b/pkg/admission/configuration/validating_webhook_manager.go @@ -36,11 +36,6 @@ type validatingWebhookConfigurationManager struct { configuration *atomic.Value lister admissionregistrationlisters.ValidatingWebhookConfigurationLister hasSynced func() bool - // initialConfigurationSynced tracks if - // the existing webhook configs have been synced (honored) by the - // manager at startup-- the informer has synced and either has no items - // or has finished executing updateConfiguration() once. - initialConfigurationSynced *atomic.Bool } var _ generic.Source = &validatingWebhookConfigurationManager{} @@ -48,23 +43,25 @@ var _ generic.Source = &validatingWebhookConfigurationManager{} func NewValidatingWebhookConfigurationManager(f informers.SharedInformerFactory) generic.Source { informer := f.Admissionregistration().V1().ValidatingWebhookConfigurations() manager := &validatingWebhookConfigurationManager{ - configuration: &atomic.Value{}, - lister: informer.Lister(), - hasSynced: informer.Informer().HasSynced, - initialConfigurationSynced: &atomic.Bool{}, + configuration: &atomic.Value{}, + lister: informer.Lister(), } // Start with an empty list manager.configuration.Store([]webhook.WebhookAccessor{}) - manager.initialConfigurationSynced.Store(false) // On any change, rebuild the config - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + // TODO: the initial sync for this is N ^ 2, ideally we should make it N. + handle, _ := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(_ interface{}) { manager.updateConfiguration() }, UpdateFunc: func(_, _ interface{}) { manager.updateConfiguration() }, DeleteFunc: func(_ interface{}) { manager.updateConfiguration() }, }) + // Since our processing is synchronous, this is all we need to do to + // see if we have processed everything or not. + manager.hasSynced = handle.HasSynced + return manager } @@ -73,29 +70,9 @@ func (v *validatingWebhookConfigurationManager) Webhooks() []webhook.WebhookAcce return v.configuration.Load().([]webhook.WebhookAccessor) } -// HasSynced returns true when the manager is synced with existing webhookconfig -// objects at startup-- which means the informer is synced and either has no items -// or updateConfiguration() has completed. -func (v *validatingWebhookConfigurationManager) HasSynced() bool { - if !v.hasSynced() { - return false - } - if v.initialConfigurationSynced.Load() { - // the informer has synced and configuration has been updated - return true - } - if configurations, err := v.lister.List(labels.Everything()); err == nil && len(configurations) == 0 { - // the empty list we initially stored is valid to use. - // Setting initialConfigurationSynced to true, so subsequent checks - // would be able to take the fast path on the atomic boolean in a - // cluster without any admission webhooks configured. - v.initialConfigurationSynced.Store(true) - // the informer has synced and we don't have any items - return true - } - return false - -} +// HasSynced returns true if the initial set of mutating webhook configurations +// has been loaded. +func (v *validatingWebhookConfigurationManager) HasSynced() bool { return v.hasSynced() } func (v *validatingWebhookConfigurationManager) updateConfiguration() { configurations, err := v.lister.List(labels.Everything()) @@ -104,7 +81,6 @@ func (v *validatingWebhookConfigurationManager) updateConfiguration() { return } v.configuration.Store(mergeValidatingWebhookConfigurations(configurations)) - v.initialConfigurationSynced.Store(true) } func mergeValidatingWebhookConfigurations(configurations []*v1.ValidatingWebhookConfiguration) []webhook.WebhookAccessor { diff --git a/pkg/admission/plugin/validatingadmissionpolicy/controller.go b/pkg/admission/plugin/validatingadmissionpolicy/controller.go index 4398aa6b1..bdb2a0680 100644 --- a/pkg/admission/plugin/validatingadmissionpolicy/controller.go +++ b/pkg/admission/plugin/validatingadmissionpolicy/controller.go @@ -39,6 +39,7 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" ) var _ CELPolicyEvaluator = &celAdmissionController{} @@ -165,6 +166,7 @@ func NewAdmissionController( } func (c *celAdmissionController) Run(stopCh <-chan struct{}) { + // TODO: Doesn't this comparison need a lock? if c.runningContext != nil { return } @@ -302,9 +304,10 @@ func (c *celAdmissionController) Validate( // If the param informer for this admission policy has not yet // had time to perform an initial listing, don't attempt to use // it. - //!TOOD(alexzielenski): add a wait for a very short amount of - // time for the cache to sync - if !paramInfo.controller.HasSynced() { + //!TODO(alexzielenski): Add a shorter timeout + // than "forever" to this wait. + + if !cache.WaitForCacheSync(c.runningContext.Done(), paramInfo.controller.HasSynced) { addConfigError(fmt.Errorf("paramKind kind `%v` not yet synced to use for admission", paramKind.String()), definition, binding) continue diff --git a/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go b/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go index cbc89b518..17c4a9a64 100644 --- a/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go +++ b/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go @@ -128,7 +128,7 @@ func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name strin c.dynamicClient, paramsGVR.Resource, corev1.NamespaceAll, - 30*time.Second, + 30*time.Second, // TODO: do we really need to ever resync these? cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil, ) diff --git a/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go b/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go index bd5ea818d..e1e1b04eb 100644 --- a/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go +++ b/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/cache/synctrack" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" ) @@ -45,6 +47,11 @@ type controller[T runtime.Object] struct { reconciler func(namespace, name string, newObj T) error options ControllerOptions + + // must hold a func() bool or nil + notificationsDelivered atomic.Value + + hasProcessed synctrack.AsyncTracker[string] } type ControllerOptions struct { @@ -69,12 +76,20 @@ func NewController[T runtime.Object]( options.Name = fmt.Sprintf("%T-controller", *new(T)) } - return &controller[T]{ + c := &controller[T]{ options: options, informer: informer, reconciler: reconciler, queue: nil, } + c.hasProcessed.UpstreamHasSynced = func() bool { + f := c.notificationsDelivered.Load() + if f == nil { + return false + } + return f.(func() bool)() + } + return c } // Runs the controller and returns an error explaining why running was stopped. @@ -92,20 +107,22 @@ func (c *controller[T]) Run(ctx context.Context) error { // would never shut down the workqueue defer c.queue.ShutDown() - enqueue := func(obj interface{}) { + enqueue := func(obj interface{}, isInInitialList bool) { var key string var err error if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil { utilruntime.HandleError(err) return } + if isInInitialList { + c.hasProcessed.Start(key) + } + c.queue.Add(key) } - registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - enqueue(obj) - }, + registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{ + AddFunc: enqueue, UpdateFunc: func(oldObj, newObj interface{}) { oldMeta, err1 := meta.Accessor(oldObj) newMeta, err2 := meta.Accessor(newObj) @@ -126,13 +143,14 @@ func (c *controller[T]) Run(ctx context.Context) error { return } - enqueue(newObj) + enqueue(newObj, false) }, DeleteFunc: func(obj interface{}) { // Enqueue - enqueue(obj) + enqueue(obj, false) }, }) + c.notificationsDelivered.Store(registration.HasSynced) // Error might be raised if informer was started and stopped already if err != nil { @@ -142,6 +160,7 @@ func (c *controller[T]) Run(ctx context.Context) error { // Make sure event handler is removed from informer in case return early from // an error defer func() { + c.notificationsDelivered.Store(func() bool { return false }) // Remove event handler and Handle Error here. Error should only be raised // for improper usage of event handler API. if err := c.informer.RemoveEventHandler(registration); err != nil { @@ -188,7 +207,7 @@ func (c *controller[T]) Run(ctx context.Context) error { } func (c *controller[T]) HasSynced() bool { - return c.informer.HasSynced() + return c.hasProcessed.HasSynced() } func (c *controller[T]) runWorker() { @@ -220,6 +239,7 @@ func (c *controller[T]) runWorker() { // but the key is invalid so there is no point in doing that) return fmt.Errorf("expected string in workqueue but got %#v", obj) } + defer c.hasProcessed.Finished(key) if err := c.reconcile(key); err != nil { // Put the item back on the workqueue to handle any transient errors. diff --git a/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller_test.go b/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller_test.go index 2d3af72f0..cfd805750 100644 --- a/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller_test.go +++ b/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller_test.go @@ -106,6 +106,7 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim controller generic.Controller[*unstructured.Unstructured], informer *testInformer, waitForReconcile func(runtime.Object) error, + verifyNoMoreEvents func() bool, ) { tracker = clienttesting.NewObjectTracker(scheme, codecs.UniversalDecoder()) reconciledObjects := make(chan runtime.Object) @@ -127,7 +128,11 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim if customReconciler != nil { err = customReconciler(namespace, name, newObj) } - reconciledObjects <- copied + select { + case reconciledObjects <- copied: + case <-ctx.Done(): + panic("timed out attempting to deliver reconcile event") + } return err } @@ -149,23 +154,24 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim generic.ControllerOptions{}, ) - go func() { - <-ctx.Done() - close(reconciledObjects) - + verifyNoMoreEvents = func() bool { + close(reconciledObjects) // closing means that a future attempt to send will crash for leftover := range reconciledObjects { panic(fmt.Errorf("leftover object which was not anticipated by test: %v", leftover)) } - }() + // TODO(alexzielenski): this effectively doesn't test anything since the + // controller drops any pending events when it shuts down. + return true + } - return tracker, myController, informer, waitForReconcile + return tracker, myController, informer, waitForReconcile, verifyNoMoreEvents } func TestReconcile(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) defer testCancel() - tracker, myController, informer, waitForReconcile := setupTest(testContext, nil) + tracker, myController, informer, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, nil) // Add object to informer initialObject := &unstructured.Unstructured{} @@ -196,11 +202,16 @@ func TestReconcile(t *testing.T) { require.ErrorIs(t, stopReason, context.Canceled) }() - require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + // The controller is blocked because the reconcile function sends on an + // unbuffered channel. + require.False(t, myController.HasSynced()) // Wait for all enqueued reconciliations require.NoError(t, waitForReconcile(initialObject)) + // Now it is safe to wait for it to Sync + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + // Updated object updatedObject := &unstructured.Unstructured{} updatedObject.SetUnstructuredContent(map[string]interface{}{ @@ -220,13 +231,15 @@ func TestReconcile(t *testing.T) { testCancel() wg.Wait() + + verifyNoMoreEvents() } func TestShutdown(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) defer testCancel() - _, myController, informer, _ := setupTest(testContext, nil) + _, myController, informer, _, verifyNoMoreEvents := setupTest(testContext, nil) wg := sync.WaitGroup{} @@ -256,6 +269,8 @@ func TestShutdown(t *testing.T) { // Ensure the event handler was cleaned up require.Empty(t, informer.registrations) + + verifyNoMoreEvents() } // Show an error is thrown informer isn't started when the controller runs @@ -263,7 +278,7 @@ func TestInformerNeverStarts(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 400*time.Millisecond) defer testCancel() - _, myController, informer, _ := setupTest(testContext, nil) + _, myController, informer, _, verifyNoMoreEvents := setupTest(testContext, nil) wg := sync.WaitGroup{} @@ -283,6 +298,8 @@ func TestInformerNeverStarts(t *testing.T) { // Ensure there are no event handlers require.Empty(t, informer.registrations) + + verifyNoMoreEvents() } // Shows that if RV does not change, the reconciler does not get called @@ -290,7 +307,7 @@ func TestIgnoredUpdate(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) defer testCancel() - tracker, myController, informer, waitForReconcile := setupTest(testContext, nil) + tracker, myController, informer, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, nil) // Add object to informer initialObject := &unstructured.Unstructured{} @@ -321,11 +338,16 @@ func TestIgnoredUpdate(t *testing.T) { require.ErrorIs(t, stopReason, context.Canceled) }() - require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + // The controller is blocked because the reconcile function sends on an + // unbuffered channel. + require.False(t, myController.HasSynced()) // Wait for all enqueued reconciliations require.NoError(t, waitForReconcile(initialObject)) + // Now it is safe to wait for it to Sync + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + // Send update with the same object require.NoError(t, tracker.Update(fakeGVR, initialObject, "")) @@ -334,8 +356,9 @@ func TestIgnoredUpdate(t *testing.T) { testCancel() wg.Wait() - // Test infrastructure has logic to panic if there are any reconciled objects - // that weren't "expected" + // TODO(alexzielenski): Find a better way to test this since the + // controller drops any pending events when it shuts down. + verifyNoMoreEvents() } // Shows that an object which fails reconciliation will retry @@ -345,7 +368,7 @@ func TestReconcileRetry(t *testing.T) { calls := atomic.Uint64{} success := atomic.Bool{} - tracker, myController, _, waitForReconcile := setupTest(testContext, func(s1, s2 string, o runtime.Object) error { + tracker, myController, _, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, func(s1, s2 string, o runtime.Object) error { if calls.Add(1) > 2 { // Suddenly start liking the object @@ -390,13 +413,14 @@ func TestReconcileRetry(t *testing.T) { require.True(t, success.Load(), "last call to reconcile should return success") testCancel() wg.Wait() + + verifyNoMoreEvents() } func TestInformerList(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) - defer testCancel() - tracker, myController, _, _ := setupTest(testContext, nil) + tracker, myController, _, _, _ := setupTest(testContext, nil) wg := sync.WaitGroup{} @@ -406,7 +430,12 @@ func TestInformerList(t *testing.T) { myController.Informer().Run(testContext.Done()) }() - require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + defer func() { + testCancel() + wg.Wait() + }() + + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.Informer().HasSynced)) object1 := &unstructured.Unstructured{} object1.SetUnstructuredContent(map[string]interface{}{