Try splitting configmap package (#1851)

This commit is contained in:
Matt Moore 2020-12-22 13:58:04 -08:00 committed by GitHub
parent e41409af6c
commit e2d6b4f845
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 96 additions and 32 deletions

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package configmap
package informer
import (
"errors"
@ -31,6 +31,7 @@ import (
"k8s.io/client-go/informers/internalinterfaces"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/configmap"
)
// NewInformedWatcherFromFactory watches a Kubernetes namespace for ConfigMap changes.
@ -38,7 +39,7 @@ func NewInformedWatcherFromFactory(sif informers.SharedInformerFactory, namespac
return &InformedWatcher{
sif: sif,
informer: sif.Core().V1().ConfigMaps(),
ManualWatcher: ManualWatcher{
ManualWatcher: configmap.ManualWatcher{
Namespace: namespace,
},
defaults: make(map[string]*corev1.ConfigMap),
@ -97,22 +98,22 @@ type InformedWatcher struct {
// Embedding this struct allows us to reuse the logic
// of registering and notifying observers. This simplifies the
// InformedWatcher to just setting up the Kubernetes informer.
ManualWatcher
configmap.ManualWatcher
}
// Asserts that InformedWatcher implements Watcher.
var _ Watcher = (*InformedWatcher)(nil)
var _ configmap.Watcher = (*InformedWatcher)(nil)
// Asserts that InformedWatcher implements DefaultingWatcher.
var _ DefaultingWatcher = (*InformedWatcher)(nil)
var _ configmap.DefaultingWatcher = (*InformedWatcher)(nil)
// WatchWithDefault implements DefaultingWatcher.
func (i *InformedWatcher) WatchWithDefault(cm corev1.ConfigMap, o ...Observer) {
func (i *InformedWatcher) WatchWithDefault(cm corev1.ConfigMap, o ...configmap.Observer) {
i.defaults[cm.Name] = &cm
i.m.Lock()
i.Lock()
started := i.started
i.m.Unlock()
i.Unlock()
if started {
// TODO make both Watch and WatchWithDefault work after the InformedWatcher has started.
// This likely entails changing this to `o(&cm)` and having Watch check started, if it has
@ -130,11 +131,12 @@ func (i *InformedWatcher) Start(stopCh <-chan struct{}) error {
// Pretend that all the defaulted ConfigMaps were just created. This is done before we start
// the informer to ensure that if a defaulted ConfigMap does exist, then the real value is
// processed after the default one.
for k := range i.observers {
i.ForEach(func(k string, _ []configmap.Observer) error {
if def, ok := i.defaults[k]; ok {
i.addConfigMapEvent(def)
}
}
return nil
})
if err := i.registerCallbackAndStartInformer(stopCh); err != nil {
return err
@ -149,8 +151,8 @@ func (i *InformedWatcher) Start(stopCh <-chan struct{}) error {
}
func (i *InformedWatcher) registerCallbackAndStartInformer(stopCh <-chan struct{}) error {
i.m.Lock()
defer i.m.Unlock()
i.Lock()
defer i.Unlock()
if i.started {
return errors.New("watcher already started")
}
@ -168,19 +170,19 @@ func (i *InformedWatcher) registerCallbackAndStartInformer(stopCh <-chan struct{
}
func (i *InformedWatcher) checkObservedResourcesExist() error {
i.m.RLock()
defer i.m.RUnlock()
i.RLock()
defer i.RUnlock()
// Check that all objects with Observers exist in our informers.
for k := range i.observers {
return i.ForEach(func(k string, _ []configmap.Observer) error {
if _, err := i.informer.Lister().ConfigMaps(i.Namespace).Get(k); err != nil {
if _, ok := i.defaults[k]; ok && k8serrors.IsNotFound(err) {
// It is defaulted, so it is OK that it doesn't exist.
continue
return nil
}
return err
}
}
return nil
return nil
})
}
func (i *InformedWatcher) addConfigMapEvent(obj interface{}) {

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package configmap
package informer
import (
"context"

View File

@ -27,7 +27,7 @@ type ManualWatcher struct {
Namespace string
// Guards observers
m sync.RWMutex
sync.RWMutex
observers map[string][]Observer
}
@ -35,8 +35,8 @@ var _ Watcher = (*ManualWatcher)(nil)
// Watch implements Watcher
func (w *ManualWatcher) Watch(name string, o ...Observer) {
w.m.Lock()
defer w.m.Unlock()
w.Lock()
defer w.Unlock()
if w.observers == nil {
w.observers = make(map[string][]Observer, 1)
@ -44,6 +44,16 @@ func (w *ManualWatcher) Watch(name string, o ...Observer) {
w.observers[name] = append(w.observers[name], o...)
}
// Watch implements Watcher
func (w *ManualWatcher) ForEach(f func(string, []Observer) error) error {
for k, v := range w.observers {
if err := f(k, v); err != nil {
return err
}
}
return nil
}
// Start implements Watcher
func (w *ManualWatcher) Start(<-chan struct{}) error {
return nil
@ -55,8 +65,8 @@ func (w *ManualWatcher) OnChange(configMap *corev1.ConfigMap) {
return
}
// Within our namespace, take the lock and see if there are any registered observers.
w.m.RLock()
defer w.m.RUnlock()
w.RLock()
defer w.RUnlock()
// Iterate over the observers and invoke their callbacks.
for _, o := range w.observers[configMap.Name] {
o(configMap)

View File

@ -17,12 +17,35 @@ limitations under the License.
package configmap
import (
"sync"
"testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type counter struct {
name string
mu sync.RWMutex
cfg []*corev1.ConfigMap
wg *sync.WaitGroup
}
func (c *counter) callback(cm *corev1.ConfigMap) {
c.mu.Lock()
defer c.mu.Unlock()
c.cfg = append(c.cfg, cm)
if c.wg != nil {
c.wg.Done()
}
}
func (c *counter) count() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.cfg)
}
func TestManualStartNOOP(t *testing.T) {
watcher := ManualWatcher{
Namespace: "default",

29
configmap/nodep_test.go Normal file
View File

@ -0,0 +1,29 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package configmap_test
import (
"testing"
"knative.dev/pkg/depcheck"
)
func TestNoDeps(t *testing.T) {
depcheck.AssertNoDependency(t, map[string][]string{
"knative.dev/pkg/configmap": depcheck.KnownHeavyDependencies,
})
}

View File

@ -38,7 +38,7 @@ import (
"k8s.io/client-go/rest"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
cminformer "knative.dev/pkg/configmap/informer"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/leaderelection"
@ -290,12 +290,12 @@ func CheckK8sClientMinimumVersionOrDie(ctx context.Context, logger *zap.SugaredL
// SetupConfigMapWatchOrDie establishes a watch of the configmaps in the system
// namespace that are labeled to be watched or dies by calling log.Fatalw.
func SetupConfigMapWatchOrDie(ctx context.Context, logger *zap.SugaredLogger) *configmap.InformedWatcher {
func SetupConfigMapWatchOrDie(ctx context.Context, logger *zap.SugaredLogger) *cminformer.InformedWatcher {
kc := kubeclient.Get(ctx)
// Create ConfigMaps watcher with optional label-based filter.
var cmLabelReqs []labels.Requirement
if cmLabel := system.ResourceLabel(); cmLabel != "" {
req, err := configmap.FilterConfigByLabelExists(cmLabel)
req, err := cminformer.FilterConfigByLabelExists(cmLabel)
if err != nil {
logger.Fatalw("Failed to generate requirement for label "+cmLabel, zap.Error(err))
}
@ -303,13 +303,13 @@ func SetupConfigMapWatchOrDie(ctx context.Context, logger *zap.SugaredLogger) *c
cmLabelReqs = append(cmLabelReqs, *req)
}
// TODO(mattmoor): This should itself take a context and be injection-based.
return configmap.NewInformedWatcher(kc, system.Namespace(), cmLabelReqs...)
return cminformer.NewInformedWatcher(kc, system.Namespace(), cmLabelReqs...)
}
// WatchLoggingConfigOrDie establishes a watch of the logging config or dies by
// calling log.Fatalw. Note, if the config does not exist, it will be defaulted
// and this method will not die.
func WatchLoggingConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher, logger *zap.SugaredLogger, atomicLevel zap.AtomicLevel, component string) {
func WatchLoggingConfigOrDie(ctx context.Context, cmw *cminformer.InformedWatcher, logger *zap.SugaredLogger, atomicLevel zap.AtomicLevel, component string) {
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, logging.ConfigMapName(),
metav1.GetOptions{}); err == nil {
cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
@ -321,7 +321,7 @@ func WatchLoggingConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher
// WatchObservabilityConfigOrDie establishes a watch of the observability config
// or dies by calling log.Fatalw. Note, if the config does not exist, it will be
// defaulted and this method will not die.
func WatchObservabilityConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher, profilingHandler *profiling.Handler, logger *zap.SugaredLogger, component string) {
func WatchObservabilityConfigOrDie(ctx context.Context, cmw *cminformer.InformedWatcher, profilingHandler *profiling.Handler, logger *zap.SugaredLogger, component string) {
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, metrics.ConfigMapName(),
metav1.GetOptions{}); err == nil {
cmw.Watch(metrics.ConfigMapName(),
@ -351,7 +351,7 @@ func SecretFetcher(ctx context.Context) metrics.SecretFetcher {
// ControllersAndWebhooksFromCtors returns a list of the controllers and a list
// of the webhooks created from the given constructors.
func ControllersAndWebhooksFromCtors(ctx context.Context,
cmw *configmap.InformedWatcher,
cmw *cminformer.InformedWatcher,
ctors ...injection.ControllerConstructor) ([]*controller.Impl, []interface{}) {
// Check whether the context has been infused with a leader elector builder.

View File

@ -42,7 +42,7 @@ func SetupStaticPublishing(logger *zap.SugaredLogger, serviceName string, cfg *c
// just ensures that if generated, they are collected appropriately. This is normally done by using
// tracing.HTTPSpanMiddleware as a middleware HTTP handler. The configuration will be dynamically
// updated when the ConfigMap is updated.
func SetupDynamicPublishing(logger *zap.SugaredLogger, configMapWatcher *configmap.InformedWatcher, serviceName, tracingConfigName string) error {
func SetupDynamicPublishing(logger *zap.SugaredLogger, configMapWatcher configmap.Watcher, serviceName, tracingConfigName string) error {
oct := NewOpenCensusTracer(WithExporter(serviceName, logger))
tracerUpdater := func(name string, value interface{}) {