mirror of https://github.com/knative/pkg.git
Add RunAndSyncInformers helper that makes sure informers are synced in tests (#2055)
* Add RunAndSyncInformers helper that makes sure informers are synced in tests * Review stuff * Drop Println
This commit is contained in:
parent
9eeb66f2b0
commit
c326b70b83
|
|
@ -18,10 +18,19 @@ package testing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
|
clientgotesting "k8s.io/client-go/testing"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
|
|
||||||
"knative.dev/pkg/controller"
|
"knative.dev/pkg/controller"
|
||||||
|
|
@ -43,3 +52,73 @@ func SetupFakeContextWithCancel(t zaptest.TestingT) (context.Context, context.Ca
|
||||||
ctx, is := injection.Fake.SetupInformers(ctx, &rest.Config{})
|
ctx, is := injection.Fake.SetupInformers(ctx, &rest.Config{})
|
||||||
return ctx, c, is
|
return ctx, c, is
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fakeClient is an interface capturing the two functions we need from fake clients.
|
||||||
|
type fakeClient interface {
|
||||||
|
PrependWatchReactor(resource string, reaction clientgotesting.WatchReactionFunc)
|
||||||
|
PrependReactor(verb, resource string, reaction clientgotesting.ReactionFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
// withTracker is an interface capturing only the Tracker function. The dynamic client
|
||||||
|
// currently does not have that, so we need to special-case it.
|
||||||
|
type withTracker interface {
|
||||||
|
Tracker() clientgotesting.ObjectTracker
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunAndSyncInformers runs the given informers, then makes sure their caches are all
|
||||||
|
// synced and in addition makes sure that all the Watch calls have been properly setup.
|
||||||
|
// See https://github.com/kubernetes/kubernetes/issues/95372 for background on the Watch
|
||||||
|
// calls tragedy.
|
||||||
|
func RunAndSyncInformers(ctx context.Context, informers ...controller.Informer) (func(), error) {
|
||||||
|
var watchesPending atomic.Int32
|
||||||
|
|
||||||
|
for _, client := range injection.Fake.FetchAllClients(ctx) {
|
||||||
|
c := client.(fakeClient)
|
||||||
|
|
||||||
|
var tracker clientgotesting.ObjectTracker
|
||||||
|
if withTracker, ok := c.(withTracker); ok {
|
||||||
|
tracker = withTracker.Tracker()
|
||||||
|
} else {
|
||||||
|
// Required setup for the dynamic client as it doesn't define a Tracker() function.
|
||||||
|
// TODO(markusthoemmes): Drop this if https://github.com/kubernetes/kubernetes/pull/100085 lands.
|
||||||
|
scheme := runtime.NewScheme()
|
||||||
|
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "fake-dynamic-client-group", Version: "v1", Kind: "List"}, &unstructured.UnstructuredList{})
|
||||||
|
codecs := serializer.NewCodecFactory(scheme)
|
||||||
|
tracker = clientgotesting.NewObjectTracker(scheme, codecs.UniversalDecoder())
|
||||||
|
}
|
||||||
|
|
||||||
|
c.PrependReactor("list", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
// Every list (before actual informer usage) is going to be followed by a Watch call.
|
||||||
|
watchesPending.Inc()
|
||||||
|
return false, nil, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
c.PrependWatchReactor("*", func(action clientgotesting.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
// The actual Watch call. This is a reimplementation of the default Watch
|
||||||
|
// calls in fakes to guarantee we have actually **done** the work.
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := tracker.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
watchesPending.Dec()
|
||||||
|
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
wf, err := controller.RunInformers(ctx.Done(), informers...)
|
||||||
|
if err != nil {
|
||||||
|
return wf, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = wait.PollImmediate(time.Microsecond, wait.ForeverTestTimeout, func() (bool, error) {
|
||||||
|
if watchesPending.Load() == 0 {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
return wf, err
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
"knative.dev/pkg/controller"
|
|
||||||
|
|
||||||
// Make system.Namespace() work in tests.
|
// Make system.Namespace() work in tests.
|
||||||
_ "knative.dev/pkg/system/testing"
|
_ "knative.dev/pkg/system/testing"
|
||||||
|
|
@ -53,7 +52,7 @@ func newNonRunningTestWebhook(t *testing.T, options Options, acs ...interface{})
|
||||||
ctx, ctxCancel, informers := SetupFakeContextWithCancel(t)
|
ctx, ctxCancel, informers := SetupFakeContextWithCancel(t)
|
||||||
ctx = WithOptions(ctx, options)
|
ctx = WithOptions(ctx, options)
|
||||||
|
|
||||||
stopCb, err := controller.RunInformers(ctx.Done(), informers...)
|
stopCb, err := RunAndSyncInformers(ctx, informers...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("StartInformers() =", err)
|
t.Fatal("StartInformers() =", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue