mirror of https://github.com/knative/client.git
191 lines
4.9 KiB
Go
191 lines
4.9 KiB
Go
// Copyright © 2019 The Knative Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package wait
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
api_errors "k8s.io/apimachinery/pkg/api/errors"
|
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/fields"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/client-go/rest"
|
|
)
|
|
|
|
// PollInterval determines when the polling watcher should poll. Keeping it
// behind an interface makes it easy to mock out in tests, or to replace with
// an exponential backoff later.
type PollInterval interface {
	// PollChan returns the channel that signals when the next poll is due.
	PollChan() <-chan time.Time
	// Stop is invoked once polling has finished so the implementation can
	// release whatever it uses to produce the signals.
	Stop()
}
|
|
|
|
type pollingWatcher struct {
|
|
c rest.Interface
|
|
ns string
|
|
resource string
|
|
name string
|
|
timeout time.Duration
|
|
done chan bool
|
|
result chan watch.Event
|
|
wg *sync.WaitGroup
|
|
// we can mock the interface for testing.
|
|
pollInterval PollInterval
|
|
// mock hook for testing.
|
|
poll func() (runtime.Object, error)
|
|
}
|
|
|
|
// watchF is the signature of the function NewWatcher uses to open a native
// watch: it receives a context and the list options to watch with, and
// returns the resulting watch.Interface or an error.
type watchF func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error)
|
|
|
|
// tickerPollInterval is the PollInterval implementation backed by a
// time.Ticker, i.e. it signals a poll at a fixed period.
type tickerPollInterval struct {
	t *time.Ticker
}

// PollChan exposes the ticker's channel; a value arrives once per period.
func (p *tickerPollInterval) PollChan() <-chan time.Time {
	ticker := p.t
	return ticker.C
}

// Stop turns the underlying ticker off.
func (p *tickerPollInterval) Stop() {
	ticker := p.t
	ticker.Stop()
}

// newTickerPollInterval builds a tickerPollInterval that ticks every d.
func newTickerPollInterval(d time.Duration) *tickerPollInterval {
	interval := new(tickerPollInterval)
	interval.t = time.NewTicker(d)
	return interval
}
|
|
|
|
// NewWatcher makes a watch.Interface on the given resource in the client,
|
|
// falling back to polling if the server does not support Watch.
|
|
func NewWatcher(watchFunc watchF, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) {
|
|
native, err := nativeWatch(watchFunc, name, timeout)
|
|
if err == nil {
|
|
return native, nil
|
|
}
|
|
polling := &pollingWatcher{
|
|
c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{},
|
|
newTickerPollInterval(time.Second), nativePoll(c, ns, resource, name)}
|
|
err = polling.start()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return polling, nil
|
|
}
|
|
|
|
func (w *pollingWatcher) start() error {
|
|
w.wg.Add(1)
|
|
|
|
go func() {
|
|
defer w.wg.Done()
|
|
defer w.pollInterval.Stop()
|
|
var err error
|
|
var old, new runtime.Object
|
|
done := false
|
|
for !done {
|
|
old = new
|
|
|
|
select {
|
|
case <-w.pollInterval.PollChan():
|
|
new, err = w.poll()
|
|
newObj, ok1 := new.(v1.Object)
|
|
oldObj, ok2 := old.(v1.Object)
|
|
|
|
if err != nil && api_errors.IsNotFound(err) {
|
|
if old != nil {
|
|
// Deleted
|
|
w.result <- watch.Event{
|
|
Type: watch.Deleted,
|
|
Object: old,
|
|
}
|
|
}
|
|
//... Otherwise maybe just doesn't exist.
|
|
} else if err != nil {
|
|
// Just an error
|
|
w.result <- watch.Event{
|
|
Type: watch.Error,
|
|
}
|
|
} else if old == nil && new != nil {
|
|
// Added
|
|
w.result <- watch.Event{
|
|
Type: watch.Added,
|
|
Object: new,
|
|
}
|
|
} else if !(ok1 && ok2) {
|
|
// Error wrong types
|
|
w.result <- watch.Event{
|
|
Type: watch.Error,
|
|
}
|
|
} else if newObj.GetUID() != oldObj.GetUID() {
|
|
// Deleted and readded.
|
|
w.result <- watch.Event{
|
|
Type: watch.Deleted,
|
|
Object: old,
|
|
}
|
|
w.result <- watch.Event{
|
|
Type: watch.Added,
|
|
Object: new,
|
|
}
|
|
} else if newObj.GetResourceVersion() != oldObj.GetResourceVersion() {
|
|
// Modified.
|
|
w.result <- watch.Event{
|
|
Type: watch.Modified,
|
|
Object: new,
|
|
}
|
|
}
|
|
case done = <-w.done:
|
|
break
|
|
}
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (w *pollingWatcher) ResultChan() <-chan watch.Event {
|
|
return w.result
|
|
}
|
|
|
|
func (w *pollingWatcher) Stop() {
|
|
w.done <- true
|
|
w.wg.Wait()
|
|
close(w.result)
|
|
close(w.done)
|
|
}
|
|
|
|
func nativeWatch(watchFunc watchF, name string, timeout time.Duration) (watch.Interface, error) {
|
|
opts := v1.ListOptions{
|
|
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
|
|
}
|
|
opts.Watch = true
|
|
addWatchTimeout(&opts, timeout)
|
|
return watchFunc(context.TODO(), opts)
|
|
}
|
|
|
|
func nativePoll(c rest.Interface, ns, resource, name string) func() (runtime.Object, error) {
|
|
return func() (runtime.Object, error) {
|
|
return c.Get().Namespace(ns).Resource(resource).Name(name).Do(context.TODO()).Get()
|
|
}
|
|
}
|
|
|
|
func addWatchTimeout(opts *v1.ListOptions, timeout time.Duration) {
|
|
if timeout == 0 {
|
|
return
|
|
}
|
|
// Wait for service to enter 'Ready' state, with a timeout of which is slightly larger than
|
|
// the provided timeout. We have our own timeout which fires after "timeout" seconds
|
|
// and stops the watch
|
|
timeOutWatchSeconds := int64((timeout + 30*time.Second) / time.Second)
|
|
opts.TimeoutSeconds = &timeOutWatchSeconds
|
|
}
|