From 8fc80deb200e7853795e6baf4029f406ca9534b8 Mon Sep 17 00:00:00 2001 From: Dave Protasowski Date: Wed, 12 Sep 2018 10:42:16 -0400 Subject: [PATCH] Few changes to the configmap package (#59) * Few changes to the configmap package - New* methods return concrete types - defaultImpl is now InformedWatcher - fixedImpl is now StaticWatcher - Included a new ManualWatcher that mimics the InformedWatcher in behaviour. ie. allows updates - making this a true 'mock' * Include note about embedding the ManualWatcher in the InformedWatcher * Update static_watcher.go Fix some godoc --- configmap/default.go | 128 ------------------ configmap/informed_watcher.go | 122 +++++++++++++++++ ...fault_test.go => informed_watcher_test.go} | 24 ++-- configmap/manual_watcher.go | 71 ++++++++++ configmap/manual_watcher_test.go | 115 ++++++++++++++++ configmap/{fixed.go => static_watcher.go} | 58 ++++---- .../{fixed_test.go => static_watcher_test.go} | 22 +-- configmap/watcher.go | 16 --- 8 files changed, 368 insertions(+), 188 deletions(-) delete mode 100644 configmap/default.go create mode 100644 configmap/informed_watcher.go rename configmap/{default_test.go => informed_watcher_test.go} (92%) create mode 100644 configmap/manual_watcher.go create mode 100644 configmap/manual_watcher_test.go rename configmap/{fixed.go => static_watcher.go} (53%) rename configmap/{fixed_test.go => static_watcher_test.go} (80%) diff --git a/configmap/default.go b/configmap/default.go deleted file mode 100644 index 0f30cf6d9..000000000 --- a/configmap/default.go +++ /dev/null @@ -1,128 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package configmap - -import ( - "errors" - "sync" - - corev1 "k8s.io/api/core/v1" - informers "k8s.io/client-go/informers" - corev1informers "k8s.io/client-go/informers/core/v1" - "k8s.io/client-go/tools/cache" -) - -// defaultImpl provides a default informer-based implementation of Watcher. -type defaultImpl struct { - sif informers.SharedInformerFactory - informer corev1informers.ConfigMapInformer - ns string - - // Guards mutations to defaultImpl fields - m sync.Mutex - - observers map[string][]Observer - started bool -} - -// Asserts that defaultImpl implements Watcher. -var _ Watcher = (*defaultImpl)(nil) - -// Watch implements Watcher -func (di *defaultImpl) Watch(name string, w Observer) { - di.m.Lock() - defer di.m.Unlock() - - if di.observers == nil { - di.observers = make(map[string][]Observer) - } - - wl, _ := di.observers[name] - di.observers[name] = append(wl, w) -} - -// Start implements Watcher -func (di *defaultImpl) Start(stopCh <-chan struct{}) error { - if err := di.registerCallbackAndStartInformer(stopCh); err != nil { - return err - } - - // Wait until it has been synced (WITHOUT holding the mutex, so callbacks happen) - if ok := cache.WaitForCacheSync(stopCh, di.informer.Informer().HasSynced); !ok { - return errors.New("Error waiting for ConfigMap informer to sync.") - } - - return di.checkObservedResourcesExist() -} - -func (di *defaultImpl) registerCallbackAndStartInformer(stopCh <-chan struct{}) error { - di.m.Lock() - defer di.m.Unlock() - if di.started { - return errors.New("Watcher already started!") - } - di.started = true - - di.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: di.addConfigMapEvent, - UpdateFunc: di.updateConfigMapEvent, - }) - - // Start the shared informer factory (non-blocking) - di.sif.Start(stopCh) - return nil -} - -func (di *defaultImpl) checkObservedResourcesExist() error { - di.m.Lock() - defer di.m.Unlock() - // Check that all objects with Observers exist in our informers. - for k := range di.observers { - _, err := di.informer.Lister().ConfigMaps(di.ns).Get(k) - if err != nil { - return err - } - } - return nil -} - -func (di *defaultImpl) addConfigMapEvent(obj interface{}) { - // If the ConfigMap update is outside of our namespace, then quickly disregard it. - configMap := obj.(*corev1.ConfigMap) - if configMap.Namespace != di.ns { - // Outside of our namespace. - // This shouldn't happen due to our filtered informer. - return - } - - // Within our namespace, take the lock and see if there are any registered observers. - di.m.Lock() - defer di.m.Unlock() - wl, ok := di.observers[configMap.Name] - if !ok { - return // No observers. - } - - // Iterate over the observers and invoke their callbacks. - for _, w := range wl { - w(configMap) - } -} - -func (di *defaultImpl) updateConfigMapEvent(old, new interface{}) { - di.addConfigMapEvent(new) -} diff --git a/configmap/informed_watcher.go b/configmap/informed_watcher.go new file mode 100644 index 000000000..9da18f777 --- /dev/null +++ b/configmap/informed_watcher.go @@ -0,0 +1,122 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +istributed under the License is istributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "errors" + "time" + + corev1 "k8s.io/api/core/v1" + informers "k8s.io/client-go/informers" + corev1informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +// NewDefaultWatcher creates a new default configmap.Watcher instance. +// +// Deprecated: Use NewInformedWatcher +func NewDefaultWatcher(kc kubernetes.Interface, namespace string) *InformedWatcher { + return NewInformedWatcher(kc, namespace) +} + +// NewInformedWatcher watchers a Kubernetes namespace for configmap changs +func NewInformedWatcher(kc kubernetes.Interface, namespace string) *InformedWatcher { + sif := informers.NewSharedInformerFactoryWithOptions( + kc, + 5*time.Minute, + informers.WithNamespace(namespace), + ) + + return &InformedWatcher{ + sif: sif, + informer: sif.Core().V1().ConfigMaps(), + ManualWatcher: ManualWatcher{ + Namespace: namespace, + }, + } +} + +// InformedWatcher provides an informer-based implementation of Watcher. +type InformedWatcher struct { + sif informers.SharedInformerFactory + informer corev1informers.ConfigMapInformer + started bool + + // Embedding this struct allows us to reuse the logic + // of registering and notifying observers. This simplifies the + // InformedWatcher to just setting up the Kubernetes informer + ManualWatcher +} + +// Asserts that InformedWatcher implements Watcher. +var _ Watcher = (*InformedWatcher)(nil) + +// Start implements Watcher +func (i *InformedWatcher) Start(stopCh <-chan struct{}) error { + if err := i.registerCallbackAndStartInformer(stopCh); err != nil { + return err + } + + // Wait until it has been synced (WITHOUT holing the mutex, so callbacks happen) + if ok := cache.WaitForCacheSync(stopCh, i.informer.Informer().HasSynced); !ok { + return errors.New("Error waiting for ConfigMap informer to sync.") + } + + return i.checkObservedResourcesExist() +} + +func (i *InformedWatcher) registerCallbackAndStartInformer(stopCh <-chan struct{}) error { + i.m.Lock() + defer i.m.Unlock() + if i.started { + return errors.New("Watcher already started!") + } + i.started = true + + i.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: i.addConfigMapEvent, + UpdateFunc: i.updateConfigMapEvent, + }) + + // Start the shared informer factory (non-blocking) + i.sif.Start(stopCh) + return nil +} + +func (i *InformedWatcher) checkObservedResourcesExist() error { + i.m.Lock() + defer i.m.Unlock() + // Check that all objects with Observers exist in our informers. + for k := range i.observers { + _, err := i.informer.Lister().ConfigMaps(i.Namespace).Get(k) + if err != nil { + return err + } + } + return nil +} + +func (i *InformedWatcher) addConfigMapEvent(obj interface{}) { + configMap := obj.(*corev1.ConfigMap) + i.OnChange(configMap) +} + +func (i *InformedWatcher) updateConfigMapEvent(old, new interface{}) { + configMap := new.(*corev1.ConfigMap) + i.OnChange(configMap) +} diff --git a/configmap/default_test.go b/configmap/informed_watcher_test.go similarity index 92% rename from configmap/default_test.go rename to configmap/informed_watcher_test.go index c69997da7..b8cff0eed 100644 --- a/configmap/default_test.go +++ b/configmap/informed_watcher_test.go @@ -33,27 +33,27 @@ func (c *counter) callback(*corev1.ConfigMap) { c.count++ } -func TestWatch(t *testing.T) { +func TestInformedWatcher(t *testing.T) { fooCM := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "knative-system", + Namespace: "default", Name: "foo", }, } barCM := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "knative-system", + Namespace: "default", Name: "bar", }, } kc := fakekubeclientset.NewSimpleClientset(fooCM, barCM) - cm := NewDefaultWatcher(kc, "knative-system").(*defaultImpl) + cm := NewInformedWatcher(kc, "default") foo1 := &counter{name: "foo1"} - cm.Watch("foo", foo1.callback) foo2 := &counter{name: "foo2"} - cm.Watch("foo", foo2.callback) bar := &counter{name: "bar"} + cm.Watch("foo", foo1.callback) + cm.Watch("foo", foo2.callback) cm.Watch("bar", bar.callback) stopCh := make(chan struct{}) @@ -79,6 +79,7 @@ func TestWatch(t *testing.T) { t.Errorf("%v.count = %v, want %v", obj, got, want) } } + for _, obj := range []*counter{bar} { if got, want := obj.count, 1; got != want { t.Errorf("%v.count = %v, want %v", obj, got, want) @@ -109,9 +110,10 @@ func TestWatch(t *testing.T) { } // After an unwatched ConfigMap update, no change. + cm.updateConfigMapEvent(nil, &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "knative-system", + Namespace: "default", Name: "not-watched", }, }) @@ -124,7 +126,7 @@ func TestWatch(t *testing.T) { // After a change in an unrelated namespace, no change. cm.updateConfigMapEvent(nil, &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "different-system", + Namespace: "not-default", Name: "foo", }, }) @@ -137,7 +139,7 @@ func TestWatch(t *testing.T) { func TestWatchMissingFailsOnStart(t *testing.T) { kc := fakekubeclientset.NewSimpleClientset() - cm := NewDefaultWatcher(kc, "knative-system").(*defaultImpl) + cm := NewInformedWatcher(kc, "default") foo1 := &counter{name: "foo1"} cm.Watch("foo", foo1.callback) @@ -155,12 +157,12 @@ func TestWatchMissingFailsOnStart(t *testing.T) { func TestErrorOnMultipleStarts(t *testing.T) { fooCM := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "knative-system", + Namespace: "default", Name: "foo", }, } kc := fakekubeclientset.NewSimpleClientset(fooCM) - cm := NewDefaultWatcher(kc, "knative-system").(*defaultImpl) + cm := NewInformedWatcher(kc, "default") foo1 := &counter{name: "foo1"} cm.Watch("foo", foo1.callback) diff --git a/configmap/manual_watcher.go b/configmap/manual_watcher.go new file mode 100644 index 000000000..b14c5ac7b --- /dev/null +++ b/configmap/manual_watcher.go @@ -0,0 +1,71 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "sync" + + corev1 "k8s.io/api/core/v1" +) + +// ManualWatcher will notify Observers when a ConfigMap is manually reported as changed +type ManualWatcher struct { + Namespace string + + // Guards mutations to defaultImpl fields + m sync.Mutex + + started bool + observers map[string][]Observer +} + +var _ Watcher = (*ManualWatcher)(nil) + +// Watch implements Watcher +func (w *ManualWatcher) Watch(name string, o Observer) { + w.m.Lock() + defer w.m.Unlock() + + if w.observers == nil { + w.observers = make(map[string][]Observer) + } + + wl, _ := w.observers[name] + w.observers[name] = append(wl, o) +} + +func (w *ManualWatcher) Start(<-chan struct{}) error { + return nil +} + +func (w *ManualWatcher) OnChange(configMap *corev1.ConfigMap) { + if configMap.Namespace != w.Namespace { + return + } + // Within our namespace, take the lock and see if there are any registered observers. + w.m.Lock() + defer w.m.Unlock() + observers, ok := w.observers[configMap.Name] + if !ok { + return // No observers. + } + + // Iterate over the observers and invoke their callbacks. + for _, o := range observers { + o(configMap) + } +} diff --git a/configmap/manual_watcher_test.go b/configmap/manual_watcher_test.go new file mode 100644 index 000000000..b2e89a9da --- /dev/null +++ b/configmap/manual_watcher_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestManualStartNOOP(t *testing.T) { + watcher := ManualWatcher{ + Namespace: "default", + } + if err := watcher.Start(nil); err != nil { + t.Errorf("Unexpected error watcher.Start() = %v", err) + } +} + +func TestCallbackInvoked(t *testing.T) { + watcher := ManualWatcher{ + Namespace: "default", + } + + observer := counter{} + + watcher.Watch("foo", observer.callback) + watcher.OnChange(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo", + }, + }) + + if observer.count == 0 { + t.Errorf("Expected callback to be invoked - got invocations %v", observer.count) + } +} + +func TestDifferentNamespace(t *testing.T) { + watcher := ManualWatcher{ + Namespace: "default", + } + + observer := counter{} + + watcher.Watch("foo", observer.callback) + watcher.OnChange(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "not-default", + Name: "foo", + }, + }) + + if observer.count != 0 { + t.Errorf("Expected callback to be not be invoked - got invocations %v", observer.count) + } +} + +func TestLateRegistration(t *testing.T) { + watcher := ManualWatcher{ + Namespace: "default", + } + + observer := counter{} + + watcher.OnChange(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo", + }, + }) + + watcher.Watch("foo", observer.callback) + + if observer.count != 0 { + t.Errorf("Expected callback to be not be invoked - got invocations %v", observer.count) + } +} + +func TestDifferentConfigName(t *testing.T) { + watcher := ManualWatcher{ + Namespace: "default", + } + + observer := counter{} + + watcher.OnChange(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo", + }, + }) + + watcher.Watch("bar", observer.callback) + + if observer.count != 0 { + t.Errorf("Expected callback to be not be invoked - got invocations %v", observer.count) + } +} diff --git a/configmap/fixed.go b/configmap/static_watcher.go similarity index 53% rename from configmap/fixed.go rename to configmap/static_watcher.go index d915e7ae0..96a01140d 100644 --- a/configmap/fixed.go +++ b/configmap/static_watcher.go @@ -17,39 +17,47 @@ limitations under the License. package configmap import ( - "log" + "fmt" corev1 "k8s.io/api/core/v1" ) -// fixedImpl provides a fixed informer-based implementation of Watcher. -type fixedImpl struct { - cfgs map[string]*corev1.ConfigMap +// NewFixedWatcher returns a StaticWatcher that exposes a collection of ConfigMaps. +// +// Deprecated: Use NewStaticWatcher +func NewFixedWatcher(cms ...*corev1.ConfigMap) *StaticWatcher { + return NewStaticWatcher(cms...) } -// Asserts that fixedImpl implements Watcher. -var _ Watcher = (*fixedImpl)(nil) - -// Watch implements Watcher -func (di *fixedImpl) Watch(name string, w Observer) { - cm, ok := di.cfgs[name] - if ok { - w(cm) - } else { - log.Printf("Name %q is not found.", name) - } -} - -// Start implements Watcher -func (di *fixedImpl) Start(stopCh <-chan struct{}) error { - return nil -} - -// NewFixedWatcher returns an Watcher that exposes the fixed collection of ConfigMaps. -func NewFixedWatcher(cms ...*corev1.ConfigMap) Watcher { +// NewStaticWatcher returns an StaticWatcher that exposes a collection of ConfigMaps. +func NewStaticWatcher(cms ...*corev1.ConfigMap) *StaticWatcher { cmm := make(map[string]*corev1.ConfigMap) for _, cm := range cms { cmm[cm.Name] = cm } - return &fixedImpl{cfgs: cmm} + return &StaticWatcher{cfgs: cmm} +} + +// StaticWatcher is a Watcher with static ConfigMaps. Callbacks will +// occur when Watch is invoked for a specific Observer +type StaticWatcher struct { + cfgs map[string]*corev1.ConfigMap +} + +// Asserts that fixedImpl implements Watcher. +var _ Watcher = (*StaticWatcher)(nil) + +// Watch implements Watcher +func (di *StaticWatcher) Watch(name string, o Observer) { + cm, ok := di.cfgs[name] + if ok { + o(cm) + } else { + panic(fmt.Sprintf("Tried to watch unknown config with name %q", name)) + } +} + +// Start implements Watcher +func (di *StaticWatcher) Start(<-chan struct{}) error { + return nil } diff --git a/configmap/fixed_test.go b/configmap/static_watcher_test.go similarity index 80% rename from configmap/fixed_test.go rename to configmap/static_watcher_test.go index f4a35181e..29355029a 100644 --- a/configmap/fixed_test.go +++ b/configmap/static_watcher_test.go @@ -23,7 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestFixedWatch(t *testing.T) { +func TestStaticWatcher(t *testing.T) { fooCM := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: "knative-system", @@ -37,7 +37,7 @@ func TestFixedWatch(t *testing.T) { }, } - cm := NewFixedWatcher(fooCM, barCM) + cm := NewStaticWatcher(fooCM, barCM) foo1 := &counter{name: "foo1"} cm.Watch("foo", foo1.callback) @@ -45,13 +45,8 @@ func TestFixedWatch(t *testing.T) { cm.Watch("foo", foo2.callback) bar := &counter{name: "bar"} cm.Watch("bar", bar.callback) - // This won't increment bar. However, it will log to make it - // easier to debug failed lookups in tests. - cm.Watch("unknown", bar.callback) - stopCh := make(chan struct{}) - defer close(stopCh) - err := cm.Start(stopCh) + err := cm.Start(nil) if err != nil { t.Fatalf("cm.Start() = %v", err) } @@ -64,3 +59,14 @@ func TestFixedWatch(t *testing.T) { } } } + +func TestUnknownConfigMapName(t *testing.T) { + defer func() { + if recover() == nil { + t.Error("Expected calling Watch with an unknown configmap name to panic") + } + }() + + cm := NewStaticWatcher() + cm.Watch("unknown", func(*corev1.ConfigMap) {}) +} diff --git a/configmap/watcher.go b/configmap/watcher.go index ccb75f36c..d248bbd73 100644 --- a/configmap/watcher.go +++ b/configmap/watcher.go @@ -17,11 +17,7 @@ limitations under the License. package configmap import ( - "time" - corev1 "k8s.io/api/core/v1" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" ) // Observer is the signature of the callbacks that notify an observer of the latest @@ -40,15 +36,3 @@ type Watcher interface { // initial state of the ConfigMaps they are watching. Start(<-chan struct{}) error } - -// NewDefaultWatcher creates a new default configmap.Watcher instance. -func NewDefaultWatcher(kc kubernetes.Interface, ns string) Watcher { - sif := kubeinformers.NewFilteredSharedInformerFactory( - kc, 5*time.Minute, ns, nil) - - return &defaultImpl{ - sif: sif, - informer: sif.Core().V1().ConfigMaps(), - ns: ns, - } -}