diff --git a/configmap/informed_watcher.go b/configmap/informer/informed_watcher.go similarity index 92% rename from configmap/informed_watcher.go rename to configmap/informer/informed_watcher.go index 16447b634..2231ee52f 100644 --- a/configmap/informed_watcher.go +++ b/configmap/informer/informed_watcher.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package configmap +package informer import ( "errors" @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/informers/internalinterfaces" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + "knative.dev/pkg/configmap" ) // NewInformedWatcherFromFactory watches a Kubernetes namespace for ConfigMap changes. @@ -38,7 +39,7 @@ func NewInformedWatcherFromFactory(sif informers.SharedInformerFactory, namespac return &InformedWatcher{ sif: sif, informer: sif.Core().V1().ConfigMaps(), - ManualWatcher: ManualWatcher{ + ManualWatcher: configmap.ManualWatcher{ Namespace: namespace, }, defaults: make(map[string]*corev1.ConfigMap), @@ -97,22 +98,22 @@ type InformedWatcher struct { // Embedding this struct allows us to reuse the logic // of registering and notifying observers. This simplifies the // InformedWatcher to just setting up the Kubernetes informer. - ManualWatcher + configmap.ManualWatcher } // Asserts that InformedWatcher implements Watcher. -var _ Watcher = (*InformedWatcher)(nil) +var _ configmap.Watcher = (*InformedWatcher)(nil) // Asserts that InformedWatcher implements DefaultingWatcher. -var _ DefaultingWatcher = (*InformedWatcher)(nil) +var _ configmap.DefaultingWatcher = (*InformedWatcher)(nil) // WatchWithDefault implements DefaultingWatcher. -func (i *InformedWatcher) WatchWithDefault(cm corev1.ConfigMap, o ...Observer) { +func (i *InformedWatcher) WatchWithDefault(cm corev1.ConfigMap, o ...configmap.Observer) { i.defaults[cm.Name] = &cm - i.m.Lock() + i.Lock() started := i.started - i.m.Unlock() + i.Unlock() if started { // TODO make both Watch and WatchWithDefault work after the InformedWatcher has started. // This likely entails changing this to `o(&cm)` and having Watch check started, if it has @@ -130,11 +131,12 @@ func (i *InformedWatcher) Start(stopCh <-chan struct{}) error { // Pretend that all the defaulted ConfigMaps were just created. This is done before we start // the informer to ensure that if a defaulted ConfigMap does exist, then the real value is // processed after the default one. - for k := range i.observers { + i.ForEach(func(k string, _ []configmap.Observer) error { if def, ok := i.defaults[k]; ok { i.addConfigMapEvent(def) } - } + return nil + }) if err := i.registerCallbackAndStartInformer(stopCh); err != nil { return err @@ -149,8 +151,8 @@ func (i *InformedWatcher) Start(stopCh <-chan struct{}) error { } func (i *InformedWatcher) registerCallbackAndStartInformer(stopCh <-chan struct{}) error { - i.m.Lock() - defer i.m.Unlock() + i.Lock() + defer i.Unlock() if i.started { return errors.New("watcher already started") } @@ -168,19 +170,19 @@ func (i *InformedWatcher) registerCallbackAndStartInformer(stopCh <-chan struct{ } func (i *InformedWatcher) checkObservedResourcesExist() error { - i.m.RLock() - defer i.m.RUnlock() + i.RLock() + defer i.RUnlock() // Check that all objects with Observers exist in our informers. - for k := range i.observers { + return i.ForEach(func(k string, _ []configmap.Observer) error { if _, err := i.informer.Lister().ConfigMaps(i.Namespace).Get(k); err != nil { if _, ok := i.defaults[k]; ok && k8serrors.IsNotFound(err) { // It is defaulted, so it is OK that it doesn't exist. - continue + return nil } return err } - } - return nil + return nil + }) } func (i *InformedWatcher) addConfigMapEvent(obj interface{}) { diff --git a/configmap/informed_watcher_test.go b/configmap/informer/informed_watcher_test.go similarity index 99% rename from configmap/informed_watcher_test.go rename to configmap/informer/informed_watcher_test.go index fa2057eef..c25194122 100644 --- a/configmap/informed_watcher_test.go +++ b/configmap/informer/informed_watcher_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package configmap +package informer import ( "context" diff --git a/configmap/manual_watcher.go b/configmap/manual_watcher.go index b1eb27633..bd28e7c23 100644 --- a/configmap/manual_watcher.go +++ b/configmap/manual_watcher.go @@ -27,7 +27,7 @@ type ManualWatcher struct { Namespace string // Guards observers - m sync.RWMutex + sync.RWMutex observers map[string][]Observer } @@ -35,8 +35,8 @@ var _ Watcher = (*ManualWatcher)(nil) // Watch implements Watcher func (w *ManualWatcher) Watch(name string, o ...Observer) { - w.m.Lock() - defer w.m.Unlock() + w.Lock() + defer w.Unlock() if w.observers == nil { w.observers = make(map[string][]Observer, 1) @@ -44,6 +44,16 @@ func (w *ManualWatcher) Watch(name string, o ...Observer) { w.observers[name] = append(w.observers[name], o...) } +// Watch implements Watcher +func (w *ManualWatcher) ForEach(f func(string, []Observer) error) error { + for k, v := range w.observers { + if err := f(k, v); err != nil { + return err + } + } + return nil +} + // Start implements Watcher func (w *ManualWatcher) Start(<-chan struct{}) error { return nil @@ -55,8 +65,8 @@ func (w *ManualWatcher) OnChange(configMap *corev1.ConfigMap) { return } // Within our namespace, take the lock and see if there are any registered observers. - w.m.RLock() - defer w.m.RUnlock() + w.RLock() + defer w.RUnlock() // Iterate over the observers and invoke their callbacks. for _, o := range w.observers[configMap.Name] { o(configMap) diff --git a/configmap/manual_watcher_test.go b/configmap/manual_watcher_test.go index 5da37d3b5..be7bdc9bb 100644 --- a/configmap/manual_watcher_test.go +++ b/configmap/manual_watcher_test.go @@ -17,12 +17,35 @@ limitations under the License. package configmap import ( + "sync" "testing" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +type counter struct { + name string + mu sync.RWMutex + cfg []*corev1.ConfigMap + wg *sync.WaitGroup +} + +func (c *counter) callback(cm *corev1.ConfigMap) { + c.mu.Lock() + defer c.mu.Unlock() + c.cfg = append(c.cfg, cm) + if c.wg != nil { + c.wg.Done() + } +} + +func (c *counter) count() int { + c.mu.RLock() + defer c.mu.RUnlock() + return len(c.cfg) +} + func TestManualStartNOOP(t *testing.T) { watcher := ManualWatcher{ Namespace: "default", diff --git a/configmap/nodep_test.go b/configmap/nodep_test.go new file mode 100644 index 000000000..fb8813e87 --- /dev/null +++ b/configmap/nodep_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap_test + +import ( + "testing" + + "knative.dev/pkg/depcheck" +) + +func TestNoDeps(t *testing.T) { + depcheck.AssertNoDependency(t, map[string][]string{ + "knative.dev/pkg/configmap": depcheck.KnownHeavyDependencies, + }) +} diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 72cab9496..b2067bb1b 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -38,7 +38,7 @@ import ( "k8s.io/client-go/rest" kubeclient "knative.dev/pkg/client/injection/kube/client" - "knative.dev/pkg/configmap" + cminformer "knative.dev/pkg/configmap/informer" "knative.dev/pkg/controller" "knative.dev/pkg/injection" "knative.dev/pkg/leaderelection" @@ -290,12 +290,12 @@ func CheckK8sClientMinimumVersionOrDie(ctx context.Context, logger *zap.SugaredL // SetupConfigMapWatchOrDie establishes a watch of the configmaps in the system // namespace that are labeled to be watched or dies by calling log.Fatalw. -func SetupConfigMapWatchOrDie(ctx context.Context, logger *zap.SugaredLogger) *configmap.InformedWatcher { +func SetupConfigMapWatchOrDie(ctx context.Context, logger *zap.SugaredLogger) *cminformer.InformedWatcher { kc := kubeclient.Get(ctx) // Create ConfigMaps watcher with optional label-based filter. var cmLabelReqs []labels.Requirement if cmLabel := system.ResourceLabel(); cmLabel != "" { - req, err := configmap.FilterConfigByLabelExists(cmLabel) + req, err := cminformer.FilterConfigByLabelExists(cmLabel) if err != nil { logger.Fatalw("Failed to generate requirement for label "+cmLabel, zap.Error(err)) } @@ -303,13 +303,13 @@ func SetupConfigMapWatchOrDie(ctx context.Context, logger *zap.SugaredLogger) *c cmLabelReqs = append(cmLabelReqs, *req) } // TODO(mattmoor): This should itself take a context and be injection-based. - return configmap.NewInformedWatcher(kc, system.Namespace(), cmLabelReqs...) + return cminformer.NewInformedWatcher(kc, system.Namespace(), cmLabelReqs...) } // WatchLoggingConfigOrDie establishes a watch of the logging config or dies by // calling log.Fatalw. Note, if the config does not exist, it will be defaulted // and this method will not die. -func WatchLoggingConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher, logger *zap.SugaredLogger, atomicLevel zap.AtomicLevel, component string) { +func WatchLoggingConfigOrDie(ctx context.Context, cmw *cminformer.InformedWatcher, logger *zap.SugaredLogger, atomicLevel zap.AtomicLevel, component string) { if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, logging.ConfigMapName(), metav1.GetOptions{}); err == nil { cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component)) @@ -321,7 +321,7 @@ func WatchLoggingConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher // WatchObservabilityConfigOrDie establishes a watch of the observability config // or dies by calling log.Fatalw. Note, if the config does not exist, it will be // defaulted and this method will not die. -func WatchObservabilityConfigOrDie(ctx context.Context, cmw *configmap.InformedWatcher, profilingHandler *profiling.Handler, logger *zap.SugaredLogger, component string) { +func WatchObservabilityConfigOrDie(ctx context.Context, cmw *cminformer.InformedWatcher, profilingHandler *profiling.Handler, logger *zap.SugaredLogger, component string) { if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, metrics.ConfigMapName(), metav1.GetOptions{}); err == nil { cmw.Watch(metrics.ConfigMapName(), @@ -351,7 +351,7 @@ func SecretFetcher(ctx context.Context) metrics.SecretFetcher { // ControllersAndWebhooksFromCtors returns a list of the controllers and a list // of the webhooks created from the given constructors. func ControllersAndWebhooksFromCtors(ctx context.Context, - cmw *configmap.InformedWatcher, + cmw *cminformer.InformedWatcher, ctors ...injection.ControllerConstructor) ([]*controller.Impl, []interface{}) { // Check whether the context has been infused with a leader elector builder. diff --git a/tracing/setup.go b/tracing/setup.go index 503e424e3..c5de76489 100644 --- a/tracing/setup.go +++ b/tracing/setup.go @@ -42,7 +42,7 @@ func SetupStaticPublishing(logger *zap.SugaredLogger, serviceName string, cfg *c // just ensures that if generated, they are collected appropriately. This is normally done by using // tracing.HTTPSpanMiddleware as a middleware HTTP handler. The configuration will be dynamically // updated when the ConfigMap is updated. -func SetupDynamicPublishing(logger *zap.SugaredLogger, configMapWatcher *configmap.InformedWatcher, serviceName, tracingConfigName string) error { +func SetupDynamicPublishing(logger *zap.SugaredLogger, configMapWatcher configmap.Watcher, serviceName, tracingConfigName string) error { oct := NewOpenCensusTracer(WithExporter(serviceName, logger)) tracerUpdater := func(name string, value interface{}) {