client/pkg/wait/poll_watcher.go

189 lines
4.9 KiB
Go

// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wait
import (
"context"
"sync"
"time"
api_errors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
)
const pollInterval = time.Second
// PollInterval determines when you should poll. Useful to mock out, or for
// replacing with exponential backoff later.
type PollInterval interface {
PollChan() <-chan time.Time
Stop()
}
type pollingWatcher struct {
c rest.Interface
ns string
resource string
name string
timeout time.Duration
done chan bool
result chan watch.Event
wg *sync.WaitGroup
// we can mock the interface for testing.
pollInterval PollInterval
// mock hook for testing.
poll func() (runtime.Object, error)
}
type watchF func(context.Context, v1.ListOptions) (watch.Interface, error)
type tickerPollInterval struct {
t *time.Ticker
}
func (t *tickerPollInterval) PollChan() <-chan time.Time {
return t.t.C
}
func (t *tickerPollInterval) Stop() {
t.t.Stop()
}
func newTickerPollInterval(d time.Duration) *tickerPollInterval {
return &tickerPollInterval{time.NewTicker(d)}
}
// NewWatcher makes a watch.Interface on the given resource in the client,
// falling back to polling if the server does not support Watch.
func NewWatcher(ctx context.Context, watchFunc watchF, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) {
native, err := nativeWatch(ctx, watchFunc, name, timeout)
if err == nil {
return native, nil
}
polling := &pollingWatcher{
c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{},
newTickerPollInterval(pollInterval), nativePoll(ctx, c, ns, resource, name)}
polling.start()
return polling, nil
}
func (w *pollingWatcher) start() {
w.wg.Add(1)
go func() {
defer w.wg.Done()
defer w.pollInterval.Stop()
var err error
var old, new runtime.Object
done := false
for !done {
old = new
select {
case <-w.pollInterval.PollChan():
new, err = w.poll()
newObj, ok1 := new.(v1.Object)
oldObj, ok2 := old.(v1.Object)
if err != nil && api_errors.IsNotFound(err) {
if old != nil {
// Deleted
w.result <- watch.Event{
Type: watch.Deleted,
Object: old,
}
}
//... Otherwise maybe just doesn't exist.
} else if err != nil {
// Just an error
w.result <- watch.Event{
Type: watch.Error,
}
} else if old == nil && new != nil {
// Added
w.result <- watch.Event{
Type: watch.Added,
Object: new,
}
} else if !(ok1 && ok2) {
// Error wrong types
w.result <- watch.Event{
Type: watch.Error,
}
} else if newObj.GetUID() != oldObj.GetUID() {
// Deleted and readded.
w.result <- watch.Event{
Type: watch.Deleted,
Object: old,
}
w.result <- watch.Event{
Type: watch.Added,
Object: new,
}
} else if newObj.GetResourceVersion() != oldObj.GetResourceVersion() {
// Modified.
w.result <- watch.Event{
Type: watch.Modified,
Object: new,
}
}
case done = <-w.done:
break
}
}
}()
}
func (w *pollingWatcher) ResultChan() <-chan watch.Event {
return w.result
}
func (w *pollingWatcher) Stop() {
w.done <- true
w.wg.Wait()
close(w.result)
close(w.done)
}
func nativeWatch(ctx context.Context, watchFunc watchF, name string, timeout time.Duration) (watch.Interface, error) {
opts := v1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
}
opts.Watch = true
addWatchTimeout(&opts, timeout)
return watchFunc(ctx, opts)
}
func nativePoll(ctx context.Context, c rest.Interface, ns, resource, name string) func() (runtime.Object, error) {
return func() (runtime.Object, error) {
return c.Get().Namespace(ns).Resource(resource).Name(name).Do(ctx).Get()
}
}
func addWatchTimeout(opts *v1.ListOptions, timeout time.Duration) {
if timeout == 0 {
return
}
// Wait for service to enter 'Ready' state, with a timeout of which is slightly larger than
// the provided timeout. We have our own timeout which fires after "timeout" seconds
// and stops the watch
timeOutWatchSeconds := int64((timeout + 30*time.Second) / time.Second)
opts.TimeoutSeconds = &timeOutWatchSeconds
}