Bump k8s watch intialization timeout, cleanup logging (#166)

Signed-off-by: Kevin Lingerfelt <kl@buoyant.io>
This commit is contained in:
Kevin Lingerfelt 2018-01-17 15:31:01 -08:00 committed by GitHub
parent 43e6229363
commit e56be9bf0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 51 additions and 23 deletions

View File

@ -129,7 +129,7 @@ type informer struct {
func (i *informer) run() error {
go i.informer.Run(i.stopCh)
return initializeWatcher(i.informer)
return newWatcher(i.informer, endpointResource).run()
}
func (i *informer) stop() {

View File

@ -46,7 +46,7 @@ func NewPodIndex(clientset *kubernetes.Clientset, index cache.IndexFunc) (*PodIn
func (p *PodIndex) Run() error {
go p.reflector.ListAndWatch(p.stopCh)
return initializeWatcher(p.reflector)
return newWatcher(p.reflector, podResource).run()
}
func (p *PodIndex) Stop() {

View File

@ -47,7 +47,7 @@ func NewReplicaSetStore(clientset *kubernetes.Clientset) (*ReplicaSetStore, erro
func (p *ReplicaSetStore) Run() error {
go p.reflector.ListAndWatch(p.stopCh)
return initializeWatcher(p.reflector)
return newWatcher(p.reflector, replicaSetResource).run()
}
func (p *ReplicaSetStore) Stop() {

View File

@ -8,15 +8,29 @@ import (
)
var (
initializationTimeout = 5 * time.Second
sleepBetweenChecks = 100 * time.Millisecond
initializationTimeout = 30 * time.Second
sleepBetweenChecks = 500 * time.Millisecond
)
type watcher interface {
type resourceToWatch interface {
LastSyncResourceVersion() string
}
func initializeWatcher(w watcher) error {
type watcher struct {
resource resourceToWatch
resourceType string
timeout time.Duration
}
func newWatcher(resource resourceToWatch, resourceType string) *watcher {
return &watcher{
resource: resource,
resourceType: resourceType,
timeout: initializationTimeout,
}
}
func (w *watcher) run() error {
timedOut := make(chan struct{}, 1)
defer close(timedOut)
initialized := make(chan struct{}, 1)
@ -26,15 +40,15 @@ func initializeWatcher(w watcher) error {
for {
select {
case <-timedOut:
log.Warn("Watcher timed out")
log.Warnf("[%s watcher] timed out", w.resourceType)
return
case <-time.Tick(sleepBetweenChecks):
if w.LastSyncResourceVersion() != "" {
log.Info("Watcher initialized")
if w.resource.LastSyncResourceVersion() != "" {
log.Infof("[%s watcher] initialized", w.resourceType)
initialized <- struct{}{}
return
}
log.Debug("Waiting for watcher to initialize")
log.Debugf("[%s watcher] waiting for initialization", w.resourceType)
}
}
}()
@ -42,8 +56,8 @@ func initializeWatcher(w watcher) error {
select {
case <-initialized:
return nil
case <-time.After(initializationTimeout):
case <-time.After(w.timeout):
timedOut <- struct{}{}
return fmt.Errorf("Watcher initialization timed out")
return fmt.Errorf("[%s watcher] timed out", w.resourceType)
}
}

View File

@ -1,38 +1,52 @@
package k8s
import (
"sync"
"testing"
"time"
)
type WatcherImpl struct {
type resourceToWatchImpl struct {
sync.RWMutex
lastSyncResourceVersion string
}
func (w *WatcherImpl) LastSyncResourceVersion() string {
func (w *resourceToWatchImpl) LastSyncResourceVersion() string {
w.RLock()
defer w.RUnlock()
return w.lastSyncResourceVersion
}
func (w *resourceToWatchImpl) SetLastSyncResourceVersion(version string) {
w.Lock()
defer w.Unlock()
w.lastSyncResourceVersion = version
}
func TestWatcher(t *testing.T) {
t.Run("Returns nil if the watcher initializes in the time limit", func(t *testing.T) {
watcher := WatcherImpl{}
t.Run("Returns nil if the resource initializes in the time limit", func(t *testing.T) {
resource := &resourceToWatchImpl{}
watcher := newWatcher(resource, "resourcestring")
watcher.timeout = 2 * time.Second
go func() {
time.Sleep(1 * time.Second)
watcher.lastSyncResourceVersion = "synced"
resource.SetLastSyncResourceVersion("synced")
}()
err := initializeWatcher(&watcher)
err := watcher.run()
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
})
t.Run("Returns error if the watcher does not initialize in the time limit", func(t *testing.T) {
watcher := WatcherImpl{}
resource := &resourceToWatchImpl{}
watcher := newWatcher(resource, "resourcestring")
watcher.timeout = 2 * time.Second
go func() {
time.Sleep(6 * time.Second)
watcher.lastSyncResourceVersion = "synced"
time.Sleep(3 * time.Second)
resource.SetLastSyncResourceVersion("synced")
}()
err := initializeWatcher(&watcher)
err := watcher.run()
if err == nil {
t.Fatalf("Expected error, got nil")
}