Enable propagration of HasSynced

* Add tracker types and tests
* Modify ResourceEventHandler interface's OnAdd member
* Add additional ResourceEventHandlerDetailedFuncs struct
* Fix SharedInformer to let users track HasSynced for their handlers
* Fix in-tree controllers which weren't computing HasSynced correctly
* Deprecate the cache.Pop function

Kubernetes-commit: 8100efc7b3122ad119ee8fa4bbbedef3b90f2e0d
This commit is contained in:
Daniel Smith 2022-11-18 00:12:50 +00:00 committed by Kubernetes Publisher
parent b973647620
commit d053de6ca3
6 changed files with 106 additions and 101 deletions

View File

@ -36,11 +36,6 @@ type mutatingWebhookConfigurationManager struct {
configuration *atomic.Value
lister admissionregistrationlisters.MutatingWebhookConfigurationLister
hasSynced func() bool
// initialConfigurationSynced tracks if
// the existing webhook configs have been synced (honored) by the
// manager at startup-- the informer has synced and either has no items
// or has finished executing updateConfiguration() once.
initialConfigurationSynced *atomic.Bool
}
var _ generic.Source = &mutatingWebhookConfigurationManager{}
@ -48,23 +43,25 @@ var _ generic.Source = &mutatingWebhookConfigurationManager{}
func NewMutatingWebhookConfigurationManager(f informers.SharedInformerFactory) generic.Source {
informer := f.Admissionregistration().V1().MutatingWebhookConfigurations()
manager := &mutatingWebhookConfigurationManager{
configuration: &atomic.Value{},
lister: informer.Lister(),
hasSynced: informer.Informer().HasSynced,
initialConfigurationSynced: &atomic.Bool{},
configuration: &atomic.Value{},
lister: informer.Lister(),
}
// Start with an empty list
manager.configuration.Store([]webhook.WebhookAccessor{})
manager.initialConfigurationSynced.Store(false)
// On any change, rebuild the config
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO: the initial sync for this is N ^ 2, ideally we should make it N.
handler, _ := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) { manager.updateConfiguration() },
UpdateFunc: func(_, _ interface{}) { manager.updateConfiguration() },
DeleteFunc: func(_ interface{}) { manager.updateConfiguration() },
})
// Since our processing is synchronous, this is all we need to do to
// see if we have processed everything or not.
manager.hasSynced = handler.HasSynced
return manager
}
@ -73,28 +70,9 @@ func (m *mutatingWebhookConfigurationManager) Webhooks() []webhook.WebhookAccess
return m.configuration.Load().([]webhook.WebhookAccessor)
}
// HasSynced returns true when the manager is synced with existing webhookconfig
// objects at startup-- which means the informer is synced and either has no items
// or updateConfiguration() has completed.
func (m *mutatingWebhookConfigurationManager) HasSynced() bool {
if !m.hasSynced() {
return false
}
if m.initialConfigurationSynced.Load() {
// the informer has synced and configuration has been updated
return true
}
if configurations, err := m.lister.List(labels.Everything()); err == nil && len(configurations) == 0 {
// the empty list we initially stored is valid to use.
// Setting initialConfigurationSynced to true, so subsequent checks
// would be able to take the fast path on the atomic boolean in a
// cluster without any admission webhooks configured.
m.initialConfigurationSynced.Store(true)
// the informer has synced and we don't have any items
return true
}
return false
}
// HasSynced returns true if the initial set of mutating webhook configurations
// has been loaded.
func (m *mutatingWebhookConfigurationManager) HasSynced() bool { return m.hasSynced() }
func (m *mutatingWebhookConfigurationManager) updateConfiguration() {
configurations, err := m.lister.List(labels.Everything())
@ -103,7 +81,6 @@ func (m *mutatingWebhookConfigurationManager) updateConfiguration() {
return
}
m.configuration.Store(mergeMutatingWebhookConfigurations(configurations))
m.initialConfigurationSynced.Store(true)
}
func mergeMutatingWebhookConfigurations(configurations []*v1.MutatingWebhookConfiguration) []webhook.WebhookAccessor {

View File

@ -36,11 +36,6 @@ type validatingWebhookConfigurationManager struct {
configuration *atomic.Value
lister admissionregistrationlisters.ValidatingWebhookConfigurationLister
hasSynced func() bool
// initialConfigurationSynced tracks if
// the existing webhook configs have been synced (honored) by the
// manager at startup-- the informer has synced and either has no items
// or has finished executing updateConfiguration() once.
initialConfigurationSynced *atomic.Bool
}
var _ generic.Source = &validatingWebhookConfigurationManager{}
@ -48,23 +43,25 @@ var _ generic.Source = &validatingWebhookConfigurationManager{}
func NewValidatingWebhookConfigurationManager(f informers.SharedInformerFactory) generic.Source {
informer := f.Admissionregistration().V1().ValidatingWebhookConfigurations()
manager := &validatingWebhookConfigurationManager{
configuration: &atomic.Value{},
lister: informer.Lister(),
hasSynced: informer.Informer().HasSynced,
initialConfigurationSynced: &atomic.Bool{},
configuration: &atomic.Value{},
lister: informer.Lister(),
}
// Start with an empty list
manager.configuration.Store([]webhook.WebhookAccessor{})
manager.initialConfigurationSynced.Store(false)
// On any change, rebuild the config
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO: the initial sync for this is N ^ 2, ideally we should make it N.
handle, _ := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) { manager.updateConfiguration() },
UpdateFunc: func(_, _ interface{}) { manager.updateConfiguration() },
DeleteFunc: func(_ interface{}) { manager.updateConfiguration() },
})
// Since our processing is synchronous, this is all we need to do to
// see if we have processed everything or not.
manager.hasSynced = handle.HasSynced
return manager
}
@ -73,29 +70,9 @@ func (v *validatingWebhookConfigurationManager) Webhooks() []webhook.WebhookAcce
return v.configuration.Load().([]webhook.WebhookAccessor)
}
// HasSynced returns true when the manager is synced with existing webhookconfig
// objects at startup-- which means the informer is synced and either has no items
// or updateConfiguration() has completed.
func (v *validatingWebhookConfigurationManager) HasSynced() bool {
if !v.hasSynced() {
return false
}
if v.initialConfigurationSynced.Load() {
// the informer has synced and configuration has been updated
return true
}
if configurations, err := v.lister.List(labels.Everything()); err == nil && len(configurations) == 0 {
// the empty list we initially stored is valid to use.
// Setting initialConfigurationSynced to true, so subsequent checks
// would be able to take the fast path on the atomic boolean in a
// cluster without any admission webhooks configured.
v.initialConfigurationSynced.Store(true)
// the informer has synced and we don't have any items
return true
}
return false
}
// HasSynced returns true if the initial set of mutating webhook configurations
// has been loaded.
func (v *validatingWebhookConfigurationManager) HasSynced() bool { return v.hasSynced() }
func (v *validatingWebhookConfigurationManager) updateConfiguration() {
configurations, err := v.lister.List(labels.Everything())
@ -104,7 +81,6 @@ func (v *validatingWebhookConfigurationManager) updateConfiguration() {
return
}
v.configuration.Store(mergeValidatingWebhookConfigurations(configurations))
v.initialConfigurationSynced.Store(true)
}
func mergeValidatingWebhookConfigurations(configurations []*v1.ValidatingWebhookConfiguration) []webhook.WebhookAccessor {

View File

@ -39,6 +39,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
var _ CELPolicyEvaluator = &celAdmissionController{}
@ -165,6 +166,7 @@ func NewAdmissionController(
}
func (c *celAdmissionController) Run(stopCh <-chan struct{}) {
// TODO: Doesn't this comparison need a lock?
if c.runningContext != nil {
return
}
@ -302,9 +304,10 @@ func (c *celAdmissionController) Validate(
// If the param informer for this admission policy has not yet
// had time to perform an initial listing, don't attempt to use
// it.
//!TOOD(alexzielenski): add a wait for a very short amount of
// time for the cache to sync
if !paramInfo.controller.HasSynced() {
//!TODO(alexzielenski): Add a shorter timeout
// than "forever" to this wait.
if !cache.WaitForCacheSync(c.runningContext.Done(), paramInfo.controller.HasSynced) {
addConfigError(fmt.Errorf("paramKind kind `%v` not yet synced to use for admission",
paramKind.String()), definition, binding)
continue

View File

@ -128,7 +128,7 @@ func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name strin
c.dynamicClient,
paramsGVR.Resource,
corev1.NamespaceAll,
30*time.Second,
30*time.Second, // TODO: do we really need to ever resync these?
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
nil,
)

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
kerrors "k8s.io/apimachinery/pkg/api/errors"
@ -30,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/cache/synctrack"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
@ -45,6 +47,11 @@ type controller[T runtime.Object] struct {
reconciler func(namespace, name string, newObj T) error
options ControllerOptions
// must hold a func() bool or nil
notificationsDelivered atomic.Value
hasProcessed synctrack.AsyncTracker[string]
}
type ControllerOptions struct {
@ -69,12 +76,20 @@ func NewController[T runtime.Object](
options.Name = fmt.Sprintf("%T-controller", *new(T))
}
return &controller[T]{
c := &controller[T]{
options: options,
informer: informer,
reconciler: reconciler,
queue: nil,
}
c.hasProcessed.UpstreamHasSynced = func() bool {
f := c.notificationsDelivered.Load()
if f == nil {
return false
}
return f.(func() bool)()
}
return c
}
// Runs the controller and returns an error explaining why running was stopped.
@ -92,20 +107,22 @@ func (c *controller[T]) Run(ctx context.Context) error {
// would never shut down the workqueue
defer c.queue.ShutDown()
enqueue := func(obj interface{}) {
enqueue := func(obj interface{}, isInInitialList bool) {
var key string
var err error
if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
if isInInitialList {
c.hasProcessed.Start(key)
}
c.queue.Add(key)
}
registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
enqueue(obj)
},
registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{
AddFunc: enqueue,
UpdateFunc: func(oldObj, newObj interface{}) {
oldMeta, err1 := meta.Accessor(oldObj)
newMeta, err2 := meta.Accessor(newObj)
@ -126,13 +143,14 @@ func (c *controller[T]) Run(ctx context.Context) error {
return
}
enqueue(newObj)
enqueue(newObj, false)
},
DeleteFunc: func(obj interface{}) {
// Enqueue
enqueue(obj)
enqueue(obj, false)
},
})
c.notificationsDelivered.Store(registration.HasSynced)
// Error might be raised if informer was started and stopped already
if err != nil {
@ -142,6 +160,7 @@ func (c *controller[T]) Run(ctx context.Context) error {
// Make sure event handler is removed from informer in case return early from
// an error
defer func() {
c.notificationsDelivered.Store(func() bool { return false })
// Remove event handler and Handle Error here. Error should only be raised
// for improper usage of event handler API.
if err := c.informer.RemoveEventHandler(registration); err != nil {
@ -188,7 +207,7 @@ func (c *controller[T]) Run(ctx context.Context) error {
}
func (c *controller[T]) HasSynced() bool {
return c.informer.HasSynced()
return c.hasProcessed.HasSynced()
}
func (c *controller[T]) runWorker() {
@ -220,6 +239,7 @@ func (c *controller[T]) runWorker() {
// but the key is invalid so there is no point in doing that)
return fmt.Errorf("expected string in workqueue but got %#v", obj)
}
defer c.hasProcessed.Finished(key)
if err := c.reconcile(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.

View File

@ -106,6 +106,7 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim
controller generic.Controller[*unstructured.Unstructured],
informer *testInformer,
waitForReconcile func(runtime.Object) error,
verifyNoMoreEvents func() bool,
) {
tracker = clienttesting.NewObjectTracker(scheme, codecs.UniversalDecoder())
reconciledObjects := make(chan runtime.Object)
@ -127,7 +128,11 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim
if customReconciler != nil {
err = customReconciler(namespace, name, newObj)
}
reconciledObjects <- copied
select {
case reconciledObjects <- copied:
case <-ctx.Done():
panic("timed out attempting to deliver reconcile event")
}
return err
}
@ -149,23 +154,24 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim
generic.ControllerOptions{},
)
go func() {
<-ctx.Done()
close(reconciledObjects)
verifyNoMoreEvents = func() bool {
close(reconciledObjects) // closing means that a future attempt to send will crash
for leftover := range reconciledObjects {
panic(fmt.Errorf("leftover object which was not anticipated by test: %v", leftover))
}
}()
// TODO(alexzielenski): this effectively doesn't test anything since the
// controller drops any pending events when it shuts down.
return true
}
return tracker, myController, informer, waitForReconcile
return tracker, myController, informer, waitForReconcile, verifyNoMoreEvents
}
func TestReconcile(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
tracker, myController, informer, waitForReconcile := setupTest(testContext, nil)
tracker, myController, informer, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, nil)
// Add object to informer
initialObject := &unstructured.Unstructured{}
@ -196,11 +202,16 @@ func TestReconcile(t *testing.T) {
require.ErrorIs(t, stopReason, context.Canceled)
}()
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// The controller is blocked because the reconcile function sends on an
// unbuffered channel.
require.False(t, myController.HasSynced())
// Wait for all enqueued reconciliations
require.NoError(t, waitForReconcile(initialObject))
// Now it is safe to wait for it to Sync
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// Updated object
updatedObject := &unstructured.Unstructured{}
updatedObject.SetUnstructuredContent(map[string]interface{}{
@ -220,13 +231,15 @@ func TestReconcile(t *testing.T) {
testCancel()
wg.Wait()
verifyNoMoreEvents()
}
func TestShutdown(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
_, myController, informer, _ := setupTest(testContext, nil)
_, myController, informer, _, verifyNoMoreEvents := setupTest(testContext, nil)
wg := sync.WaitGroup{}
@ -256,6 +269,8 @@ func TestShutdown(t *testing.T) {
// Ensure the event handler was cleaned up
require.Empty(t, informer.registrations)
verifyNoMoreEvents()
}
// Show an error is thrown informer isn't started when the controller runs
@ -263,7 +278,7 @@ func TestInformerNeverStarts(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 400*time.Millisecond)
defer testCancel()
_, myController, informer, _ := setupTest(testContext, nil)
_, myController, informer, _, verifyNoMoreEvents := setupTest(testContext, nil)
wg := sync.WaitGroup{}
@ -283,6 +298,8 @@ func TestInformerNeverStarts(t *testing.T) {
// Ensure there are no event handlers
require.Empty(t, informer.registrations)
verifyNoMoreEvents()
}
// Shows that if RV does not change, the reconciler does not get called
@ -290,7 +307,7 @@ func TestIgnoredUpdate(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
tracker, myController, informer, waitForReconcile := setupTest(testContext, nil)
tracker, myController, informer, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, nil)
// Add object to informer
initialObject := &unstructured.Unstructured{}
@ -321,11 +338,16 @@ func TestIgnoredUpdate(t *testing.T) {
require.ErrorIs(t, stopReason, context.Canceled)
}()
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// The controller is blocked because the reconcile function sends on an
// unbuffered channel.
require.False(t, myController.HasSynced())
// Wait for all enqueued reconciliations
require.NoError(t, waitForReconcile(initialObject))
// Now it is safe to wait for it to Sync
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// Send update with the same object
require.NoError(t, tracker.Update(fakeGVR, initialObject, ""))
@ -334,8 +356,9 @@ func TestIgnoredUpdate(t *testing.T) {
testCancel()
wg.Wait()
// Test infrastructure has logic to panic if there are any reconciled objects
// that weren't "expected"
// TODO(alexzielenski): Find a better way to test this since the
// controller drops any pending events when it shuts down.
verifyNoMoreEvents()
}
// Shows that an object which fails reconciliation will retry
@ -345,7 +368,7 @@ func TestReconcileRetry(t *testing.T) {
calls := atomic.Uint64{}
success := atomic.Bool{}
tracker, myController, _, waitForReconcile := setupTest(testContext, func(s1, s2 string, o runtime.Object) error {
tracker, myController, _, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, func(s1, s2 string, o runtime.Object) error {
if calls.Add(1) > 2 {
// Suddenly start liking the object
@ -390,13 +413,14 @@ func TestReconcileRetry(t *testing.T) {
require.True(t, success.Load(), "last call to reconcile should return success")
testCancel()
wg.Wait()
verifyNoMoreEvents()
}
func TestInformerList(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
tracker, myController, _, _ := setupTest(testContext, nil)
tracker, myController, _, _, _ := setupTest(testContext, nil)
wg := sync.WaitGroup{}
@ -406,7 +430,12 @@ func TestInformerList(t *testing.T) {
myController.Informer().Run(testContext.Done())
}()
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
defer func() {
testCancel()
wg.Wait()
}()
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.Informer().HasSynced))
object1 := &unstructured.Unstructured{}
object1.SetUnstructuredContent(map[string]interface{}{