Use the generic/typed workqueue throughout

This change switches the project to the generic (typed) workqueue throughout,
in order to improve type safety and the readability of the code.

Kubernetes-commit: 6d0ac8c561a7ac66c21e4ee7bd1976c2ecedbf32
Alvaro Aleman 2024-04-28 18:26:18 +02:00 committed by Kubernetes Publisher
parent e7f40e3bda
commit da88853b95
10 changed files with 56 additions and 46 deletions
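
For orientation, here is a minimal sketch of the pattern this commit applies, assuming a client-go release that ships the typed workqueue API used in the hunks below; the queue name, the key, and the reconcile helper are illustrative only and do not come from the changed files. Because the queue is constructed with a string element type, Get returns a string directly, so the interface{} type assertion that untyped queues forced on every worker can be dropped.

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

// reconcile stands in for a controller's sync function (hypothetical helper).
func reconcile(key string) error {
    fmt.Println("reconciling", key)
    return nil
}

func main() {
    // The element type is fixed at construction time, so Add, Get, Done,
    // Forget and AddRateLimited all operate on plain strings.
    queue := workqueue.NewTypedRateLimitingQueueWithConfig(
        workqueue.DefaultTypedControllerRateLimiter[string](),
        workqueue.TypedRateLimitingQueueConfig[string]{Name: "example"},
    )
    defer queue.ShutDown()

    queue.Add("default/some-object")

    // Get hands back a string; no obj.(string) assertion is needed.
    key, shutdown := queue.Get()
    if shutdown {
        return
    }
    defer queue.Done(key)

    if err := reconcile(key); err != nil {
        queue.AddRateLimited(key) // retry later with backoff
        return
    }
    queue.Forget(key)
}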

View File

@@ -40,7 +40,7 @@ var _ Controller[runtime.Object] = &controller[runtime.Object]{}
 type controller[T runtime.Object] struct {
 informer Informer[T]
-queue workqueue.RateLimitingInterface
+queue workqueue.TypedRateLimitingInterface[string]
 // Returns an error if there was a transient error during reconciliation
 // and the object should be tried again later.
@@ -99,7 +99,10 @@ func (c *controller[T]) Run(ctx context.Context) error {
 klog.Infof("starting %s", c.options.Name)
 defer klog.Infof("stopping %s", c.options.Name)
-c.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), c.options.Name)
+c.queue = workqueue.NewTypedRateLimitingQueueWithConfig(
+workqueue.DefaultTypedControllerRateLimiter[string](),
+workqueue.TypedRateLimitingQueueConfig[string]{Name: c.options.Name},
+)
 // Forcefully shutdown workqueue. Drop any enqueued items.
 // Important to do this in a `defer` at the start of `Run`.
@@ -219,7 +222,7 @@ func (c *controller[T]) runWorker() {
 }
 // We wrap this block in a func so we can defer c.workqueue.Done.
-err := func(obj interface{}) error {
+err := func(obj string) error {
 // We call Done here so the workqueue knows we have finished
 // processing this item. We also must remember to call Forget if we
 // do not want this work item being re-queued. For example, we do
@@ -227,19 +230,6 @@ func (c *controller[T]) runWorker() {
 // put back on the workqueue and attempted again after a back-off
 // period.
 defer c.queue.Done(obj)
-var key string
-var ok bool
-// We expect strings to come off the workqueue. These are of the
-// form namespace/name. We do this as the delayed nature of the
-// workqueue means the items in the informer cache may actually be
-// more up to date that when the item was initially put onto the
-// workqueue.
-if key, ok = obj.(string); !ok {
-// How did an incorrectly formatted key get in the workqueue?
-// Done is sufficient. (Forget resets rate limiter for the key,
-// but the key is invalid so there is no point in doing that)
-return fmt.Errorf("expected string in workqueue but got %#v", obj)
-}
 defer c.hasProcessed.Finished(key)
 if err := c.reconcile(key); err != nil {

View File

@@ -61,7 +61,7 @@ type quotaEvaluator struct {
 // The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
 // We could move this into a library if another component needed it.
 // queue is indexed by namespace, so that we bundle up on a per-namespace basis
-queue *workqueue.Type
+queue *workqueue.Typed[string]
 workLock sync.Mutex
 work map[string][]*admissionWaiter
 dirtyWork map[string][]*admissionWaiter
@@ -122,7 +122,7 @@ func NewQuotaEvaluator(quotaAccessor QuotaAccessor, ignoredResources map[schema.
 ignoredResources: ignoredResources,
 registry: quotaRegistry,
-queue: workqueue.NewTypedWithConfig[any](workqueue.TypedQueueConfig[any]{Name: "admission_quota_controller"}),
+queue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{Name: "admission_quota_controller"}),
 work: map[string][]*admissionWaiter{},
 dirtyWork: map[string][]*admissionWaiter{},
 inProgress: sets.String{},
@@ -666,11 +666,10 @@ func (e *quotaEvaluator) completeWork(ns string) {
 // returned namespace (regardless of whether the work item list is
 // empty).
 func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
-uncastNS, shutdown := e.queue.Get()
+ns, shutdown := e.queue.Get()
 if shutdown {
 return "", []*admissionWaiter{}, shutdown
 }
-ns := uncastNS.(string)
 e.workLock.Lock()
 defer e.workLock.Unlock()

View File

@@ -20,6 +20,7 @@ import (
 "context"
 "encoding/json"
 "fmt"
+"sync/atomic"
 "time"
 corev1 "k8s.io/api/core/v1"
@@ -35,7 +36,6 @@ import (
 "k8s.io/client-go/tools/cache"
 "k8s.io/client-go/util/workqueue"
 "k8s.io/klog/v2"
-"sync/atomic"
 )
 const (
@@ -74,7 +74,7 @@ type RequestHeaderAuthRequestController struct {
 configmapInformer cache.SharedIndexInformer
 configmapInformerSynced cache.InformerSynced
-queue workqueue.RateLimitingInterface
+queue workqueue.TypedRateLimitingInterface[string]
 // exportedRequestHeaderBundle is a requestHeaderBundle that contains the last read, non-zero length content of the configmap
 exportedRequestHeaderBundle atomic.Value
@@ -104,7 +104,10 @@ func NewRequestHeaderAuthRequestController(
 extraHeaderPrefixesKey: extraHeaderPrefixesKey,
 allowedClientNamesKey: allowedClientNamesKey,
-queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "RequestHeaderAuthRequestController"),
+queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+workqueue.DefaultTypedControllerRateLimiter[string](),
+workqueue.TypedRateLimitingQueueConfig[string]{Name: "RequestHeaderAuthRequestController"},
+),
 }
 // we construct our own informer because we need such a small subset of the information available. Just one namespace.

View File

@@ -54,7 +54,7 @@ type ConfigMapCAController struct {
 listeners []Listener
-queue workqueue.RateLimitingInterface
+queue workqueue.TypedRateLimitingInterface[string]
 // preRunCaches are the caches to sync before starting the work of this control loop
 preRunCaches []cache.InformerSynced
 }
@@ -94,7 +94,10 @@ func NewDynamicCAFromConfigMapController(purpose, namespace, name, key string, k
 configmapLister: configmapLister,
 configMapInformer: uncastConfigmapInformer,
-queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("DynamicConfigMapCABundle-%s", purpose)),
+queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+workqueue.DefaultTypedControllerRateLimiter[string](),
+workqueue.TypedRateLimitingQueueConfig[string]{Name: fmt.Sprintf("DynamicConfigMapCABundle-%s", purpose)},
+),
 preRunCaches: []cache.InformerSynced{uncastConfigmapInformer.HasSynced},
 }

View File

@@ -60,7 +60,7 @@ type DynamicFileCAContent struct {
 listeners []Listener
 // queue only ever has one item, but it has nice error handling backoff/retry semantics
-queue workqueue.RateLimitingInterface
+queue workqueue.TypedRateLimitingInterface[string]
 }
 var _ Notifier = &DynamicFileCAContent{}
@@ -82,7 +82,10 @@ func NewDynamicCAContentFromFile(purpose, filename string) (*DynamicFileCAConten
 ret := &DynamicFileCAContent{
 name: name,
 filename: filename,
-queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("DynamicCABundle-%s", purpose)),
+queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+workqueue.DefaultTypedControllerRateLimiter[string](),
+workqueue.TypedRateLimitingQueueConfig[string]{Name: fmt.Sprintf("DynamicCABundle-%s", purpose)},
+),
 }
 if err := ret.loadCABundle(); err != nil {
 return nil, err

View File

@@ -47,7 +47,7 @@ type DynamicCertKeyPairContent struct {
 listeners []Listener
 // queue only ever has one item, but it has nice error handling backoff/retry semantics
-queue workqueue.RateLimitingInterface
+queue workqueue.TypedRateLimitingInterface[string]
 }
 var _ CertKeyContentProvider = &DynamicCertKeyPairContent{}
@@ -64,7 +64,10 @@ func NewDynamicServingContentFromFiles(purpose, certFile, keyFile string) (*Dyna
 name: name,
 certFile: certFile,
 keyFile: keyFile,
-queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("DynamicCABundle-%s", purpose)),
+queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+workqueue.DefaultTypedControllerRateLimiter[string](),
+workqueue.TypedRateLimitingQueueConfig[string]{Name: fmt.Sprintf("DynamicCABundle-%s", purpose)},
+),
 }
 if err := ret.loadCertKeyPair(); err != nil {
 return nil, err

View File

@@ -56,7 +56,7 @@ type DynamicServingCertificateController struct {
 currentServingTLSConfig atomic.Value
 // queue only ever has one item, but it has nice error handling backoff/retry semantics
-queue workqueue.RateLimitingInterface
+queue workqueue.TypedRateLimitingInterface[string]
 eventRecorder events.EventRecorder
 }
@@ -76,7 +76,10 @@ func NewDynamicServingCertificateController(
 servingCert: servingCert,
 sniCerts: sniCerts,
-queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DynamicServingCertificateController"),
+queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+workqueue.DefaultTypedControllerRateLimiter[string](),
+workqueue.TypedRateLimitingQueueConfig[string]{Name: "DynamicServingCertificateController"},
+),
 eventRecorder: eventRecorder,
 }

View File

@@ -49,7 +49,7 @@ type DynamicEncryptionConfigContent struct {
 lastLoadedEncryptionConfigHash string
 // queue for processing changes in encryption config file.
-queue workqueue.RateLimitingInterface
+queue workqueue.TypedRateLimitingInterface[string]
 // dynamicTransformers updates the transformers when encryption config file changes.
 dynamicTransformers *encryptionconfig.DynamicTransformers
@@ -78,8 +78,11 @@ func NewDynamicEncryptionConfiguration(
 filePath: filePath,
 lastLoadedEncryptionConfigHash: configContentHash,
 dynamicTransformers: dynamicTransformers,
-queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
-apiServerID: apiServerID,
+queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+workqueue.DefaultTypedControllerRateLimiter[string](),
+workqueue.TypedRateLimitingQueueConfig[string]{Name: name},
+),
+apiServerID: apiServerID,
 getEncryptionConfigHash: func(_ context.Context, filepath string) (string, error) {
 return encryptionconfig.GetEncryptionConfigHash(filepath)
 },
@@ -150,7 +153,7 @@ func (d *DynamicEncryptionConfigContent) processNextWorkItem(serverCtx context.C
 return true
 }
-func (d *DynamicEncryptionConfigContent) processWorkItem(serverCtx context.Context, workqueueKey interface{}) {
+func (d *DynamicEncryptionConfigContent) processWorkItem(serverCtx context.Context, workqueueKey string) {
 var (
 updatedEffectiveConfig bool
 err error
View File

@@ -349,7 +349,7 @@ apiserver_encryption_config_controller_automatic_reloads_total{apiserver_id_hash
 }
 type mockWorkQueue struct {
-workqueue.RateLimitingInterface // will panic if any unexpected method is called
+workqueue.TypedRateLimitingInterface[string] // will panic if any unexpected method is called
 closeOnce sync.Once
 addCalled chan struct{}
@@ -362,33 +362,33 @@ type mockWorkQueue struct {
 addRateLimitedCount atomic.Uint64
 }
-func (m *mockWorkQueue) Done(item interface{}) {
+func (m *mockWorkQueue) Done(item string) {
 m.count.Add(1)
 m.wasCanceled = m.ctx.Err() != nil
 m.cancel()
 }
-func (m *mockWorkQueue) Get() (item interface{}, shutdown bool) {
+func (m *mockWorkQueue) Get() (item string, shutdown bool) {
 <-m.addCalled
 switch m.count.Load() {
 case 0:
-return nil, false
+return "", false
 case 1:
-return nil, true
+return "", true
 default:
 panic("too many calls to Get")
 }
 }
-func (m *mockWorkQueue) Add(item interface{}) {
+func (m *mockWorkQueue) Add(item string) {
 m.closeOnce.Do(func() {
 close(m.addCalled)
 })
 }
-func (m *mockWorkQueue) ShutDown() {}
-func (m *mockWorkQueue) AddRateLimited(item interface{}) { m.addRateLimitedCount.Add(1) }
+func (m *mockWorkQueue) ShutDown() {}
+func (m *mockWorkQueue) AddRateLimited(item string) { m.addRateLimitedCount.Add(1) }
 type mockHealthChecker struct {
 pluginName string
View File

@@ -135,7 +135,7 @@ type configController struct {
 // configQueue holds `(interface{})(0)` when the configuration
 // objects need to be reprocessed.
-configQueue workqueue.RateLimitingInterface
+configQueue workqueue.TypedRateLimitingInterface[int]
 plLister flowcontrollister.PriorityLevelConfigurationLister
 plInformerSynced cache.InformerSynced
@@ -292,7 +292,10 @@ func newTestableController(config TestableConfig) *configController {
 klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
 // Start with longish delay because conflicts will be between
 // different processes, so take some time to go away.
-cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
+cfgCtlr.configQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
+workqueue.NewTypedItemExponentialFailureRateLimiter[int](200*time.Millisecond, 8*time.Hour),
+workqueue.TypedRateLimitingQueueConfig[int]{Name: "priority_and_fairness_config_queue"},
+)
 // ensure the data structure reflects the mandatory config
 cfgCtlr.lockAndDigestConfigObjects(nil, nil)
 fci := config.InformerFactory.Flowcontrol().V1()
@@ -474,7 +477,7 @@ func (cfgCtlr *configController) processNextWorkItem() bool {
 return false
 }
-func(obj interface{}) {
+func(obj int) {
 defer cfgCtlr.configQueue.Done(obj)
 specificDelay, err := cfgCtlr.syncOne()
 switch {