Few changes to the configmap package (#59)

* Few changes to the configmap package

- New* methods return concrete types
- defaultImpl is now InformedWatcher
- fixedImpl is now StaticWatcher
- Included a new ManualWatcher that mimics the InformedWatcher
  in behaviour. ie. allows updates - making this a true 'mock'

* Include note about embedding the ManualWatcher in the InformedWatcher

* Update static_watcher.go

Fix some godoc
This commit is contained in:
Dave Protasowski 2018-09-12 10:42:16 -04:00 committed by Knative Prow Robot
parent 0122abd983
commit 8fc80deb20
8 changed files with 368 additions and 188 deletions

View File

@ -1,128 +0,0 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package configmap
import (
"errors"
"sync"
corev1 "k8s.io/api/core/v1"
informers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
)
// defaultImpl provides a default informer-based implementation of Watcher.
type defaultImpl struct {
sif informers.SharedInformerFactory
informer corev1informers.ConfigMapInformer
ns string
// Guards mutations to defaultImpl fields
m sync.Mutex
observers map[string][]Observer
started bool
}
// Asserts that defaultImpl implements Watcher.
var _ Watcher = (*defaultImpl)(nil)
// Watch implements Watcher
func (di *defaultImpl) Watch(name string, w Observer) {
di.m.Lock()
defer di.m.Unlock()
if di.observers == nil {
di.observers = make(map[string][]Observer)
}
wl, _ := di.observers[name]
di.observers[name] = append(wl, w)
}
// Start implements Watcher
func (di *defaultImpl) Start(stopCh <-chan struct{}) error {
if err := di.registerCallbackAndStartInformer(stopCh); err != nil {
return err
}
// Wait until it has been synced (WITHOUT holding the mutex, so callbacks happen)
if ok := cache.WaitForCacheSync(stopCh, di.informer.Informer().HasSynced); !ok {
return errors.New("Error waiting for ConfigMap informer to sync.")
}
return di.checkObservedResourcesExist()
}
func (di *defaultImpl) registerCallbackAndStartInformer(stopCh <-chan struct{}) error {
di.m.Lock()
defer di.m.Unlock()
if di.started {
return errors.New("Watcher already started!")
}
di.started = true
di.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: di.addConfigMapEvent,
UpdateFunc: di.updateConfigMapEvent,
})
// Start the shared informer factory (non-blocking)
di.sif.Start(stopCh)
return nil
}
func (di *defaultImpl) checkObservedResourcesExist() error {
di.m.Lock()
defer di.m.Unlock()
// Check that all objects with Observers exist in our informers.
for k := range di.observers {
_, err := di.informer.Lister().ConfigMaps(di.ns).Get(k)
if err != nil {
return err
}
}
return nil
}
func (di *defaultImpl) addConfigMapEvent(obj interface{}) {
// If the ConfigMap update is outside of our namespace, then quickly disregard it.
configMap := obj.(*corev1.ConfigMap)
if configMap.Namespace != di.ns {
// Outside of our namespace.
// This shouldn't happen due to our filtered informer.
return
}
// Within our namespace, take the lock and see if there are any registered observers.
di.m.Lock()
defer di.m.Unlock()
wl, ok := di.observers[configMap.Name]
if !ok {
return // No observers.
}
// Iterate over the observers and invoke their callbacks.
for _, w := range wl {
w(configMap)
}
}
func (di *defaultImpl) updateConfigMapEvent(old, new interface{}) {
di.addConfigMapEvent(new)
}

View File

@ -0,0 +1,122 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
istributed under the License is istributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package configmap
import (
"errors"
"time"
corev1 "k8s.io/api/core/v1"
informers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
// NewDefaultWatcher creates a new default configmap.Watcher instance.
//
// Deprecated: Use NewInformedWatcher
func NewDefaultWatcher(kc kubernetes.Interface, namespace string) *InformedWatcher {
return NewInformedWatcher(kc, namespace)
}
// NewInformedWatcher watchers a Kubernetes namespace for configmap changs
func NewInformedWatcher(kc kubernetes.Interface, namespace string) *InformedWatcher {
sif := informers.NewSharedInformerFactoryWithOptions(
kc,
5*time.Minute,
informers.WithNamespace(namespace),
)
return &InformedWatcher{
sif: sif,
informer: sif.Core().V1().ConfigMaps(),
ManualWatcher: ManualWatcher{
Namespace: namespace,
},
}
}
// InformedWatcher provides an informer-based implementation of Watcher.
type InformedWatcher struct {
sif informers.SharedInformerFactory
informer corev1informers.ConfigMapInformer
started bool
// Embedding this struct allows us to reuse the logic
// of registering and notifying observers. This simplifies the
// InformedWatcher to just setting up the Kubernetes informer
ManualWatcher
}
// Asserts that InformedWatcher implements Watcher.
var _ Watcher = (*InformedWatcher)(nil)
// Start implements Watcher
func (i *InformedWatcher) Start(stopCh <-chan struct{}) error {
if err := i.registerCallbackAndStartInformer(stopCh); err != nil {
return err
}
// Wait until it has been synced (WITHOUT holing the mutex, so callbacks happen)
if ok := cache.WaitForCacheSync(stopCh, i.informer.Informer().HasSynced); !ok {
return errors.New("Error waiting for ConfigMap informer to sync.")
}
return i.checkObservedResourcesExist()
}
func (i *InformedWatcher) registerCallbackAndStartInformer(stopCh <-chan struct{}) error {
i.m.Lock()
defer i.m.Unlock()
if i.started {
return errors.New("Watcher already started!")
}
i.started = true
i.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: i.addConfigMapEvent,
UpdateFunc: i.updateConfigMapEvent,
})
// Start the shared informer factory (non-blocking)
i.sif.Start(stopCh)
return nil
}
func (i *InformedWatcher) checkObservedResourcesExist() error {
i.m.Lock()
defer i.m.Unlock()
// Check that all objects with Observers exist in our informers.
for k := range i.observers {
_, err := i.informer.Lister().ConfigMaps(i.Namespace).Get(k)
if err != nil {
return err
}
}
return nil
}
func (i *InformedWatcher) addConfigMapEvent(obj interface{}) {
configMap := obj.(*corev1.ConfigMap)
i.OnChange(configMap)
}
func (i *InformedWatcher) updateConfigMapEvent(old, new interface{}) {
configMap := new.(*corev1.ConfigMap)
i.OnChange(configMap)
}

View File

@ -33,27 +33,27 @@ func (c *counter) callback(*corev1.ConfigMap) {
c.count++
}
func TestWatch(t *testing.T) {
func TestInformedWatcher(t *testing.T) {
fooCM := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "knative-system",
Namespace: "default",
Name: "foo",
},
}
barCM := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "knative-system",
Namespace: "default",
Name: "bar",
},
}
kc := fakekubeclientset.NewSimpleClientset(fooCM, barCM)
cm := NewDefaultWatcher(kc, "knative-system").(*defaultImpl)
cm := NewInformedWatcher(kc, "default")
foo1 := &counter{name: "foo1"}
cm.Watch("foo", foo1.callback)
foo2 := &counter{name: "foo2"}
cm.Watch("foo", foo2.callback)
bar := &counter{name: "bar"}
cm.Watch("foo", foo1.callback)
cm.Watch("foo", foo2.callback)
cm.Watch("bar", bar.callback)
stopCh := make(chan struct{})
@ -79,6 +79,7 @@ func TestWatch(t *testing.T) {
t.Errorf("%v.count = %v, want %v", obj, got, want)
}
}
for _, obj := range []*counter{bar} {
if got, want := obj.count, 1; got != want {
t.Errorf("%v.count = %v, want %v", obj, got, want)
@ -109,9 +110,10 @@ func TestWatch(t *testing.T) {
}
// After an unwatched ConfigMap update, no change.
cm.updateConfigMapEvent(nil, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "knative-system",
Namespace: "default",
Name: "not-watched",
},
})
@ -124,7 +126,7 @@ func TestWatch(t *testing.T) {
// After a change in an unrelated namespace, no change.
cm.updateConfigMapEvent(nil, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "different-system",
Namespace: "not-default",
Name: "foo",
},
})
@ -137,7 +139,7 @@ func TestWatch(t *testing.T) {
func TestWatchMissingFailsOnStart(t *testing.T) {
kc := fakekubeclientset.NewSimpleClientset()
cm := NewDefaultWatcher(kc, "knative-system").(*defaultImpl)
cm := NewInformedWatcher(kc, "default")
foo1 := &counter{name: "foo1"}
cm.Watch("foo", foo1.callback)
@ -155,12 +157,12 @@ func TestWatchMissingFailsOnStart(t *testing.T) {
func TestErrorOnMultipleStarts(t *testing.T) {
fooCM := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "knative-system",
Namespace: "default",
Name: "foo",
},
}
kc := fakekubeclientset.NewSimpleClientset(fooCM)
cm := NewDefaultWatcher(kc, "knative-system").(*defaultImpl)
cm := NewInformedWatcher(kc, "default")
foo1 := &counter{name: "foo1"}
cm.Watch("foo", foo1.callback)

View File

@ -0,0 +1,71 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package configmap
import (
"sync"
corev1 "k8s.io/api/core/v1"
)
// ManualWatcher will notify Observers when a ConfigMap is manually reported as changed
type ManualWatcher struct {
Namespace string
// Guards mutations to defaultImpl fields
m sync.Mutex
started bool
observers map[string][]Observer
}
var _ Watcher = (*ManualWatcher)(nil)
// Watch implements Watcher
func (w *ManualWatcher) Watch(name string, o Observer) {
w.m.Lock()
defer w.m.Unlock()
if w.observers == nil {
w.observers = make(map[string][]Observer)
}
wl, _ := w.observers[name]
w.observers[name] = append(wl, o)
}
func (w *ManualWatcher) Start(<-chan struct{}) error {
return nil
}
func (w *ManualWatcher) OnChange(configMap *corev1.ConfigMap) {
if configMap.Namespace != w.Namespace {
return
}
// Within our namespace, take the lock and see if there are any registered observers.
w.m.Lock()
defer w.m.Unlock()
observers, ok := w.observers[configMap.Name]
if !ok {
return // No observers.
}
// Iterate over the observers and invoke their callbacks.
for _, o := range observers {
o(configMap)
}
}

View File

@ -0,0 +1,115 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package configmap
import (
"testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestManualStartNOOP(t *testing.T) {
watcher := ManualWatcher{
Namespace: "default",
}
if err := watcher.Start(nil); err != nil {
t.Errorf("Unexpected error watcher.Start() = %v", err)
}
}
func TestCallbackInvoked(t *testing.T) {
watcher := ManualWatcher{
Namespace: "default",
}
observer := counter{}
watcher.Watch("foo", observer.callback)
watcher.OnChange(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "foo",
},
})
if observer.count == 0 {
t.Errorf("Expected callback to be invoked - got invocations %v", observer.count)
}
}
func TestDifferentNamespace(t *testing.T) {
watcher := ManualWatcher{
Namespace: "default",
}
observer := counter{}
watcher.Watch("foo", observer.callback)
watcher.OnChange(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "not-default",
Name: "foo",
},
})
if observer.count != 0 {
t.Errorf("Expected callback to be not be invoked - got invocations %v", observer.count)
}
}
func TestLateRegistration(t *testing.T) {
watcher := ManualWatcher{
Namespace: "default",
}
observer := counter{}
watcher.OnChange(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "foo",
},
})
watcher.Watch("foo", observer.callback)
if observer.count != 0 {
t.Errorf("Expected callback to be not be invoked - got invocations %v", observer.count)
}
}
func TestDifferentConfigName(t *testing.T) {
watcher := ManualWatcher{
Namespace: "default",
}
observer := counter{}
watcher.OnChange(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "foo",
},
})
watcher.Watch("bar", observer.callback)
if observer.count != 0 {
t.Errorf("Expected callback to be not be invoked - got invocations %v", observer.count)
}
}

View File

@ -17,39 +17,47 @@ limitations under the License.
package configmap
import (
"log"
"fmt"
corev1 "k8s.io/api/core/v1"
)
// fixedImpl provides a fixed informer-based implementation of Watcher.
type fixedImpl struct {
cfgs map[string]*corev1.ConfigMap
// NewFixedWatcher returns a StaticWatcher that exposes a collection of ConfigMaps.
//
// Deprecated: Use NewStaticWatcher
func NewFixedWatcher(cms ...*corev1.ConfigMap) *StaticWatcher {
return NewStaticWatcher(cms...)
}
// Asserts that fixedImpl implements Watcher.
var _ Watcher = (*fixedImpl)(nil)
// Watch implements Watcher
func (di *fixedImpl) Watch(name string, w Observer) {
cm, ok := di.cfgs[name]
if ok {
w(cm)
} else {
log.Printf("Name %q is not found.", name)
}
}
// Start implements Watcher
func (di *fixedImpl) Start(stopCh <-chan struct{}) error {
return nil
}
// NewFixedWatcher returns an Watcher that exposes the fixed collection of ConfigMaps.
func NewFixedWatcher(cms ...*corev1.ConfigMap) Watcher {
// NewStaticWatcher returns an StaticWatcher that exposes a collection of ConfigMaps.
func NewStaticWatcher(cms ...*corev1.ConfigMap) *StaticWatcher {
cmm := make(map[string]*corev1.ConfigMap)
for _, cm := range cms {
cmm[cm.Name] = cm
}
return &fixedImpl{cfgs: cmm}
return &StaticWatcher{cfgs: cmm}
}
// StaticWatcher is a Watcher with static ConfigMaps. Callbacks will
// occur when Watch is invoked for a specific Observer
type StaticWatcher struct {
cfgs map[string]*corev1.ConfigMap
}
// Asserts that fixedImpl implements Watcher.
var _ Watcher = (*StaticWatcher)(nil)
// Watch implements Watcher
func (di *StaticWatcher) Watch(name string, o Observer) {
cm, ok := di.cfgs[name]
if ok {
o(cm)
} else {
panic(fmt.Sprintf("Tried to watch unknown config with name %q", name))
}
}
// Start implements Watcher
func (di *StaticWatcher) Start(<-chan struct{}) error {
return nil
}

View File

@ -23,7 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestFixedWatch(t *testing.T) {
func TestStaticWatcher(t *testing.T) {
fooCM := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "knative-system",
@ -37,7 +37,7 @@ func TestFixedWatch(t *testing.T) {
},
}
cm := NewFixedWatcher(fooCM, barCM)
cm := NewStaticWatcher(fooCM, barCM)
foo1 := &counter{name: "foo1"}
cm.Watch("foo", foo1.callback)
@ -45,13 +45,8 @@ func TestFixedWatch(t *testing.T) {
cm.Watch("foo", foo2.callback)
bar := &counter{name: "bar"}
cm.Watch("bar", bar.callback)
// This won't increment bar. However, it will log to make it
// easier to debug failed lookups in tests.
cm.Watch("unknown", bar.callback)
stopCh := make(chan struct{})
defer close(stopCh)
err := cm.Start(stopCh)
err := cm.Start(nil)
if err != nil {
t.Fatalf("cm.Start() = %v", err)
}
@ -64,3 +59,14 @@ func TestFixedWatch(t *testing.T) {
}
}
}
func TestUnknownConfigMapName(t *testing.T) {
defer func() {
if recover() == nil {
t.Error("Expected calling Watch with an unknown configmap name to panic")
}
}()
cm := NewStaticWatcher()
cm.Watch("unknown", func(*corev1.ConfigMap) {})
}

View File

@ -17,11 +17,7 @@ limitations under the License.
package configmap
import (
"time"
corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)
// Observer is the signature of the callbacks that notify an observer of the latest
@ -40,15 +36,3 @@ type Watcher interface {
// initial state of the ConfigMaps they are watching.
Start(<-chan struct{}) error
}
// NewDefaultWatcher creates a new default configmap.Watcher instance.
func NewDefaultWatcher(kc kubernetes.Interface, ns string) Watcher {
sif := kubeinformers.NewFilteredSharedInformerFactory(
kc, 5*time.Minute, ns, nil)
return &defaultImpl{
sif: sif,
informer: sif.Core().V1().ConfigMaps(),
ns: ns,
}
}