Add logic to convert the watch.Interface. (#106)

The watch logic returns events containing `unstructured.Unstructured`.  This adds a proxy that converts those to the appropriate structured type.
This commit is contained in:
Matt Moore 2018-09-29 21:38:22 -07:00 committed by Knative Prow Robot
parent bb5b93d475
commit fd1e3cc5c9
3 changed files with 323 additions and 1 deletions

74
apis/duck/proxy.go Normal file
View File

@ -0,0 +1,74 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package duck
import (
"sync"
"k8s.io/apimachinery/pkg/watch"
)
// NewProxyWatcher is based on the same concept from Kubernetes apimachinery in 1.12 here:
// https://github.com/kubernetes/apimachinery/blob/c6dd271be/pkg/watch/watch.go#L272
// Replace this copy once we've update our client libraries.
// proxyWatcher lets you wrap your channel in watch.Interface. Threadsafe.
type proxyWatcher struct {
result chan watch.Event
stopCh chan struct{}
mutex sync.Mutex
stopped bool
}
var _ watch.Interface = (*proxyWatcher)(nil)
// NewProxyWatcher creates new proxyWatcher by wrapping a channel
func NewProxyWatcher(ch chan watch.Event) watch.Interface {
return &proxyWatcher{
result: ch,
stopCh: make(chan struct{}),
stopped: false,
}
}
// Stop implements Interface
func (pw *proxyWatcher) Stop() {
pw.mutex.Lock()
defer pw.mutex.Unlock()
if !pw.stopped {
pw.stopped = true
close(pw.stopCh)
}
}
// Stopping returns true if Stop() has been called
func (pw *proxyWatcher) Stopping() bool {
pw.mutex.Lock()
defer pw.mutex.Unlock()
return pw.stopped
}
// ResultChan implements watch.Interface
func (pw *proxyWatcher) ResultChan() <-chan watch.Event {
return pw.result
}
// StopChan returns stop channel
func (pw *proxyWatcher) StopChan() <-chan struct{} {
return pw.stopCh
}

View File

@ -18,12 +18,14 @@ package duck
import (
"fmt"
"net/http"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
@ -47,7 +49,7 @@ func (dif *TypedInformerFactory) Get(gvr schema.GroupVersionResource) (cache.Sha
listObj := dif.Type.GetListType()
lw := &cache.ListWatch{
ListFunc: asStructuredLister(dif.Client.Resource(gvr).List, listObj),
WatchFunc: dif.Client.Resource(gvr).Watch,
WatchFunc: AsStructuredWatcher(dif.Client.Resource(gvr).Watch, dif.Type),
}
inf := cache.NewSharedIndexInformer(lw, dif.Type, dif.ResyncPeriod, cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
@ -79,3 +81,61 @@ func asStructuredLister(ulist unstructuredLister, listObj runtime.Object) cache.
return res, nil
}
}
// AsStructuredWatcher is public for testing only.
// TODO(mattmoor): Move tests for this to `package duck` and make private.
func AsStructuredWatcher(wf cache.WatchFunc, obj runtime.Object) cache.WatchFunc {
return func(lo metav1.ListOptions) (watch.Interface, error) {
uw, err := wf(lo)
if err != nil {
return nil, err
}
structuredCh := make(chan watch.Event)
go func() {
defer close(structuredCh)
unstructuredCh := uw.ResultChan()
for {
select {
case ue, ok := <-unstructuredCh:
if !ok {
// Channel is closed.
return
}
unstructuredObj, ok := ue.Object.(*unstructured.Unstructured)
if !ok {
// If it isn't an unstructured object, then forward the
// event as-is. This is likely to happen when the event's
// Type is an Error.
structuredCh <- ue
continue
}
structuredObj := obj.DeepCopyObject()
err := FromUnstructured(unstructuredObj, structuredObj)
if err != nil {
// Pass back an error indicating that the object we got
// was invalid.
structuredCh <- watch.Event{
Type: watch.Error,
Object: &metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusUnprocessableEntity,
Reason: metav1.StatusReasonInvalid,
Message: err.Error(),
},
}
continue
}
// Send the structured event.
structuredCh <- watch.Event{
Type: ue.Type,
Object: structuredObj,
}
}
}
}()
return NewProxyWatcher(structuredCh), nil
}
}

View File

@ -17,11 +17,17 @@ limitations under the License.
package duck_test
import (
"encoding/json"
"errors"
"testing"
"time"
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic/fake"
"github.com/knative/pkg/apis/duck"
@ -85,3 +91,185 @@ func TestSimpleList(t *testing.T) {
// TODO(mattmoor): Access through informer
}
func TestAsStructuredWatcherNestedError(t *testing.T) {
want := errors.New("this is what we expect")
nwf := func(lo metav1.ListOptions) (watch.Interface, error) {
return nil, want
}
wf := duck.AsStructuredWatcher(nwf, &duckv1alpha1.Generational{})
_, got := wf(metav1.ListOptions{})
if got != want {
t.Errorf("WatchFunc() = %v, wanted %v", got, want)
}
}
func TestAsStructuredWatcherClosedChannel(t *testing.T) {
nwf := func(lo metav1.ListOptions) (watch.Interface, error) {
return watch.NewEmptyWatch(), nil
}
wf := duck.AsStructuredWatcher(nwf, &duckv1alpha1.Generational{})
wi, err := wf(metav1.ListOptions{})
if err != nil {
t.Errorf("WatchFunc() = %v", err)
}
ch := wi.ResultChan()
x, ok := <-ch
if ok {
t.Errorf("<-ch = %v, wanted closed", x)
}
}
func TestAsStructuredWatcherPassThru(t *testing.T) {
unstructuredCh := make(chan watch.Event)
nwf := func(lo metav1.ListOptions) (watch.Interface, error) {
return duck.NewProxyWatcher(unstructuredCh), nil
}
wf := duck.AsStructuredWatcher(nwf, &duckv1alpha1.Generational{})
wi, err := wf(metav1.ListOptions{})
if err != nil {
t.Errorf("WatchFunc() = %v", err)
}
defer wi.Stop()
ch := wi.ResultChan()
// Don't expect a message yet.
select {
case x, ok := <-ch:
t.Errorf("Saw unexpected message on channel: %v, %v.", x, ok)
case _ = <-time.After(100 * time.Millisecond):
// Expected path.
}
want := watch.Added
unstructuredCh <- watch.Event{
Type: want,
Object: &unstructured.Unstructured{},
}
// Expect a message when we send one though.
select {
case x, ok := <-ch:
if !ok {
t.Fatal("<-ch = closed, wanted *duckv1alpha1.Generational{}")
}
if got := x.Type; got != want {
t.Errorf("x.Type = %v, wanted %v", got, want)
}
if _, ok := x.Object.(*duckv1alpha1.Generational); !ok {
t.Errorf("<-ch = %T, wanted %T", x, &duckv1alpha1.Generational{})
}
case _ = <-time.After(100 * time.Millisecond):
t.Errorf("Didn't see expected message on channel.")
}
}
func TestAsStructuredWatcherPassThruErrors(t *testing.T) {
unstructuredCh := make(chan watch.Event)
nwf := func(lo metav1.ListOptions) (watch.Interface, error) {
return duck.NewProxyWatcher(unstructuredCh), nil
}
wf := duck.AsStructuredWatcher(nwf, &duckv1alpha1.Generational{})
wi, err := wf(metav1.ListOptions{})
if err != nil {
t.Errorf("WatchFunc() = %v", err)
}
defer wi.Stop()
ch := wi.ResultChan()
want := watch.Event{
Type: watch.Error,
Object: &metav1.Status{
Code: 42,
},
}
unstructuredCh <- want
// Expect a message when we send one though.
select {
case got, ok := <-ch:
if !ok {
t.Fatal("<-ch = closed, wanted *metav1.Status{}")
}
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("<-ch (-want, +got) = %v", diff)
}
case _ = <-time.After(100 * time.Millisecond):
t.Errorf("Didn't see expected message on channel.")
}
}
func TestAsStructuredWatcherErrorConverting(t *testing.T) {
unstructuredCh := make(chan watch.Event)
nwf := func(lo metav1.ListOptions) (watch.Interface, error) {
return duck.NewProxyWatcher(unstructuredCh), nil
}
wf := duck.AsStructuredWatcher(nwf, &badObject{})
wi, err := wf(metav1.ListOptions{})
if err != nil {
t.Errorf("WatchFunc() = %v", err)
}
defer wi.Stop()
ch := wi.ResultChan()
unstructuredCh <- watch.Event{
Type: watch.Added,
Object: &unstructured.Unstructured{
Object: map[string]interface{}{
"foo": "bar",
},
},
}
// Expect a message when we send one though.
select {
case x, ok := <-ch:
if !ok {
t.Fatal("<-ch = closed, wanted *duckv1alpha1.Generational{}")
}
if got, want := x.Type, watch.Error; got != want {
t.Errorf("<-ch = %v, wanted %v", got, want)
}
if status, ok := x.Object.(*metav1.Status); !ok {
t.Errorf("<-ch = %T, wanted %T", x, &metav1.Status{})
} else if got, want := status.Message, errNoUnmarshal.Error(); got != want {
t.Errorf("<-ch = %v, wanted %v", got, want)
}
case _ = <-time.After(100 * time.Millisecond):
t.Errorf("Didn't see expected message on channel.")
}
}
var errNoUnmarshal = errors.New("this cannot be unmarshalled")
type badObject struct {
Foo doNotUnmarshal `json:"foo"`
}
type doNotUnmarshal struct{}
var _ json.Unmarshaler = (*doNotUnmarshal)(nil)
func (*doNotUnmarshal) UnmarshalJSON([]byte) error {
return errNoUnmarshal
}
func (bo *badObject) GetObjectKind() schema.ObjectKind {
return &metav1.TypeMeta{}
}
func (bo *badObject) DeepCopyObject() runtime.Object {
return &badObject{}
}