Refactor some watchcache tests
Kubernetes-commit: 1eca720dcc727b5deeeeb1164689d42c6cc316eb
This commit is contained in:
		
							parent
							
								
									bd86839aba
								
							
						
					
					
						commit
						d9c1a1d082
					
				| 
						 | 
				
			
			@ -62,7 +62,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
 | 
			
		|||
	}
 | 
			
		||||
	// set the size of the buffer of w.result to 0, so that the writes to
 | 
			
		||||
	// w.result is blocked.
 | 
			
		||||
	w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
	w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
 | 
			
		||||
	w.Stop()
 | 
			
		||||
	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
 | 
			
		||||
| 
						 | 
				
			
			@ -182,7 +182,7 @@ TestCase:
 | 
			
		|||
			testCase.events[j].ResourceVersion = uint64(j) + 1
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
		w := newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
		go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
 | 
			
		||||
 | 
			
		||||
		ch := w.ResultChan()
 | 
			
		||||
| 
						 | 
				
			
			@ -219,7 +219,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 | 
			
		|||
	// timeout to zero and run the Stop goroutine concurrently.
 | 
			
		||||
	// May sure that the watch will not be blocked on Stop.
 | 
			
		||||
	for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
 | 
			
		||||
		w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
		w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
		go w.Stop()
 | 
			
		||||
		select {
 | 
			
		||||
		case <-done:
 | 
			
		||||
| 
						 | 
				
			
			@ -231,7 +231,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 | 
			
		|||
	deadline := time.Now().Add(time.Hour)
 | 
			
		||||
	// After that, verifies the cacheWatcher.process goroutine works correctly.
 | 
			
		||||
	for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
 | 
			
		||||
		w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
		w = newCacheWatcher(2, filter, emptyFunc, storage.APIObjectVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
		w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
 | 
			
		||||
		ctx, cancel := context.WithDeadline(context.Background(), deadline)
 | 
			
		||||
		defer cancel()
 | 
			
		||||
| 
						 | 
				
			
			@ -308,7 +308,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) {
 | 
			
		|||
	filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true }
 | 
			
		||||
	forget := func(_ bool) {}
 | 
			
		||||
	deadline := time.Now().Add(time.Minute)
 | 
			
		||||
	w := newCacheWatcher(numObjects+1, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
	w := newCacheWatcher(numObjects+1, filter, forget, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
 | 
			
		||||
	// Simulate a situation when the last event will that was already in
 | 
			
		||||
	// the state, wasn't yet processed by cacher and will be delivered
 | 
			
		||||
| 
						 | 
				
			
			@ -351,7 +351,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
 | 
			
		|||
	forget := func(bool) {}
 | 
			
		||||
 | 
			
		||||
	newWatcher := func(deadline time.Time) *cacheWatcher {
 | 
			
		||||
		w := newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
		w := newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
		w.setBookmarkAfterResourceVersion(0)
 | 
			
		||||
		return w
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -418,7 +418,7 @@ func TestCacheWatcherDraining(t *testing.T) {
 | 
			
		|||
		makeWatchCacheEvent(5),
 | 
			
		||||
		makeWatchCacheEvent(6),
 | 
			
		||||
	}
 | 
			
		||||
	w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
	w = newCacheWatcher(1, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
 | 
			
		||||
	if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
 | 
			
		||||
		t.Fatal("failed adding an even to the watcher")
 | 
			
		||||
| 
						 | 
				
			
			@ -459,7 +459,7 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
 | 
			
		|||
		makeWatchCacheEvent(5),
 | 
			
		||||
		makeWatchCacheEvent(6),
 | 
			
		||||
	}
 | 
			
		||||
	w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
	w = newCacheWatcher(1, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
 | 
			
		||||
	if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
 | 
			
		||||
		t.Fatal("failed adding an even to the watcher")
 | 
			
		||||
| 
						 | 
				
			
			@ -496,7 +496,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived(t *testing.T
 | 
			
		|||
		{Object: &v1.Pod{}},
 | 
			
		||||
		{Object: &v1.Pod{}},
 | 
			
		||||
	}
 | 
			
		||||
	w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
	w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
	w.setBookmarkAfterResourceVersion(10)
 | 
			
		||||
	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
 | 
			
		||||
	if w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) {
 | 
			
		||||
| 
						 | 
				
			
			@ -542,7 +542,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) {
 | 
			
		|||
		w.stopLocked()
 | 
			
		||||
	}
 | 
			
		||||
	initEvents := []*watchCacheEvent{{Object: makePod(1)}, {Object: makePod(2)}}
 | 
			
		||||
	w = newCacheWatcher(2, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
	w = newCacheWatcher(2, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
			
		||||
	w.setBookmarkAfterResourceVersion(10)
 | 
			
		||||
	go w.processInterval(ctx, intervalFromEvents(initEvents), 0)
 | 
			
		||||
	watchInitializationSignal.Wait()
 | 
			
		||||
| 
						 | 
				
			
			@ -596,7 +596,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) {
 | 
			
		|||
 | 
			
		||||
func TestBookmarkAfterResourceVersionWatchers(t *testing.T) {
 | 
			
		||||
	newWatcher := func(id string, deadline time.Time) *cacheWatcher {
 | 
			
		||||
		w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id)
 | 
			
		||||
		w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id)
 | 
			
		||||
		w.setBookmarkAfterResourceVersion(10)
 | 
			
		||||
		return w
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -33,7 +33,6 @@ import (
 | 
			
		|||
	"k8s.io/apimachinery/pkg/api/apitesting"
 | 
			
		||||
	apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/meta"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/fields"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
| 
						 | 
				
			
			@ -58,42 +57,6 @@ import (
 | 
			
		|||
	"k8s.io/utils/pointer"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type testVersioner struct{}
 | 
			
		||||
 | 
			
		||||
func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
 | 
			
		||||
	return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10))
 | 
			
		||||
}
 | 
			
		||||
func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string, count *int64) error {
 | 
			
		||||
	listAccessor, err := meta.ListAccessor(obj)
 | 
			
		||||
	if err != nil || listAccessor == nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	listAccessor.SetResourceVersion(strconv.FormatUint(resourceVersion, 10))
 | 
			
		||||
	listAccessor.SetContinue(continueValue)
 | 
			
		||||
	listAccessor.SetRemainingItemCount(count)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
 | 
			
		||||
	return fmt.Errorf("unimplemented")
 | 
			
		||||
}
 | 
			
		||||
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
 | 
			
		||||
	accessor, err := meta.Accessor(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	version := accessor.GetResourceVersion()
 | 
			
		||||
	if len(version) == 0 {
 | 
			
		||||
		return 0, nil
 | 
			
		||||
	}
 | 
			
		||||
	return strconv.ParseUint(version, 10, 64)
 | 
			
		||||
}
 | 
			
		||||
func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
 | 
			
		||||
	if len(resourceVersion) == 0 {
 | 
			
		||||
		return 0, nil
 | 
			
		||||
	}
 | 
			
		||||
	return strconv.ParseUint(resourceVersion, 10, 64)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	scheme   = runtime.NewScheme()
 | 
			
		||||
	codecs   = serializer.NewCodecFactory(scheme)
 | 
			
		||||
| 
						 | 
				
			
			@ -111,7 +74,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
 | 
			
		|||
	prefix := "pods"
 | 
			
		||||
	config := Config{
 | 
			
		||||
		Storage:        s,
 | 
			
		||||
		Versioner:      testVersioner{},
 | 
			
		||||
		Versioner:      storage.APIObjectVersioner{},
 | 
			
		||||
		GroupResource:  schema.GroupResource{Resource: "pods"},
 | 
			
		||||
		ResourcePrefix: prefix,
 | 
			
		||||
		KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
 | 
			
		||||
| 
						 | 
				
			
			@ -133,7 +96,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
 | 
			
		|||
		Clock:       clock.RealClock{},
 | 
			
		||||
	}
 | 
			
		||||
	cacher, err := NewCacherFromConfig(config)
 | 
			
		||||
	return cacher, testVersioner{}, err
 | 
			
		||||
	return cacher, storage.APIObjectVersioner{}, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type dummyStorage struct {
 | 
			
		||||
| 
						 | 
				
			
			@ -348,6 +311,90 @@ func TestWatchCacheBypass(t *testing.T) {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestEmptyWatchEventCache(t *testing.T) {
 | 
			
		||||
	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
 | 
			
		||||
	defer server.Terminate(t)
 | 
			
		||||
 | 
			
		||||
	// add a few objects
 | 
			
		||||
	v := storage.APIObjectVersioner{}
 | 
			
		||||
	lastRV := uint64(0)
 | 
			
		||||
	for i := 0; i < 5; i++ {
 | 
			
		||||
		pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: "test-ns"}}
 | 
			
		||||
		out := &example.Pod{}
 | 
			
		||||
		key := computePodKey(pod)
 | 
			
		||||
		if err := etcdStorage.Create(context.Background(), key, pod, out, 0); err != nil {
 | 
			
		||||
			t.Fatalf("Create failed: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		var err error
 | 
			
		||||
		if lastRV, err = v.ParseResourceVersion(out.ResourceVersion); err != nil {
 | 
			
		||||
			t.Fatalf("Unexpected error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cacher, _, err := newTestCacher(etcdStorage)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Couldn't create cacher: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer cacher.Stop()
 | 
			
		||||
 | 
			
		||||
	// Given that cacher is always initialized from the "current" version of etcd,
 | 
			
		||||
	// we now have a cacher with an empty cache of watch events and a resourceVersion of rv.
 | 
			
		||||
	// It should support establishing watches from rv and higher, but not older.
 | 
			
		||||
 | 
			
		||||
	expectedResourceExpiredError := apierrors.NewResourceExpired("").ErrStatus
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name            string
 | 
			
		||||
		resourceVersion uint64
 | 
			
		||||
		expectedEvent   *watch.Event
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name:            "RV-1",
 | 
			
		||||
			resourceVersion: lastRV - 1,
 | 
			
		||||
			expectedEvent:   &watch.Event{Type: watch.Error, Object: &expectedResourceExpiredError},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:            "RV",
 | 
			
		||||
			resourceVersion: lastRV,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:            "RV+1",
 | 
			
		||||
			resourceVersion: lastRV + 1,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			opts := storage.ListOptions{
 | 
			
		||||
				ResourceVersion: strconv.Itoa(int(tt.resourceVersion)),
 | 
			
		||||
				Predicate:       storage.Everything,
 | 
			
		||||
			}
 | 
			
		||||
			watcher, err := cacher.Watch(context.Background(), "/pods/test-ns", opts)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("Failed to create watch: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			defer watcher.Stop()
 | 
			
		||||
			select {
 | 
			
		||||
			case event := <-watcher.ResultChan():
 | 
			
		||||
				if tt.expectedEvent == nil {
 | 
			
		||||
					t.Errorf("Unexpected event: type=%#v, object=%#v", event.Type, event.Object)
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
				if e, a := tt.expectedEvent.Type, event.Type; e != a {
 | 
			
		||||
					t.Errorf("Expected: %s, got: %s", e, a)
 | 
			
		||||
				}
 | 
			
		||||
				if e, a := tt.expectedEvent.Object, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
 | 
			
		||||
					t.Errorf("Expected: %#v, got: %#v", e, a)
 | 
			
		||||
				}
 | 
			
		||||
			case <-time.After(3 * time.Second):
 | 
			
		||||
				if tt.expectedEvent != nil {
 | 
			
		||||
					t.Errorf("Failed to get an event")
 | 
			
		||||
				}
 | 
			
		||||
				// watch remained established successfully
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWatchNotHangingOnStartupFailure(t *testing.T) {
 | 
			
		||||
	// Configure cacher so that it can't initialize, because of
 | 
			
		||||
	// constantly failing lists to the underlying storage.
 | 
			
		||||
| 
						 | 
				
			
			@ -378,7 +425,7 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) {
 | 
			
		|||
 | 
			
		||||
func TestWatcherNotGoingBackInTime(t *testing.T) {
 | 
			
		||||
	backingStorage := &dummyStorage{}
 | 
			
		||||
	cacher, _, err := newTestCacher(backingStorage)
 | 
			
		||||
	cacher, v, err := newTestCacher(backingStorage)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Couldn't create cacher: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -448,7 +495,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
 | 
			
		|||
				shouldContinue = false
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
			rv, err := testVersioner{}.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion())
 | 
			
		||||
			rv, err := v.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("unexpected parsing error: %v", err)
 | 
			
		||||
			} else {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -35,6 +35,10 @@ import (
 | 
			
		|||
 | 
			
		||||
func newPod() runtime.Object { return &example.Pod{} }
 | 
			
		||||
 | 
			
		||||
func computePodKey(obj *example.Pod) string {
 | 
			
		||||
	return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
 | 
			
		||||
	server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
 | 
			
		||||
	storage := etcd3.New(
 | 
			
		||||
| 
						 | 
				
			
			@ -61,7 +65,7 @@ func TestCacherListerWatcher(t *testing.T) {
 | 
			
		|||
	}
 | 
			
		||||
	for _, obj := range objects {
 | 
			
		||||
		out := &example.Pod{}
 | 
			
		||||
		key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
 | 
			
		||||
		key := computePodKey(obj)
 | 
			
		||||
		if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
 | 
			
		||||
			t.Fatalf("Create failed: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -97,7 +101,7 @@ func TestCacherListerWatcherPagination(t *testing.T) {
 | 
			
		|||
	}
 | 
			
		||||
	for _, obj := range objects {
 | 
			
		||||
		out := &example.Pod{}
 | 
			
		||||
		key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
 | 
			
		||||
		key := computePodKey(obj)
 | 
			
		||||
		if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
 | 
			
		||||
			t.Fatalf("Create failed: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -112,10 +112,6 @@ func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3
 | 
			
		|||
	return server, storage
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newTestCacher(s storage.Interface) (*cacherstorage.Cacher, storage.Versioner, error) {
 | 
			
		||||
	return newTestCacherWithClock(s, clock.RealClock{})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) {
 | 
			
		||||
	prefix := "pods"
 | 
			
		||||
	v := storage.APIObjectVersioner{}
 | 
			
		||||
| 
						 | 
				
			
			@ -490,73 +486,6 @@ func TestWatchDeprecated(t *testing.T) {
 | 
			
		|||
	verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestEmptyWatchEventCache(t *testing.T) {
 | 
			
		||||
	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true)
 | 
			
		||||
	defer server.Terminate(t)
 | 
			
		||||
 | 
			
		||||
	// add a few objects
 | 
			
		||||
	updatePod(t, etcdStorage, makeTestPod("pod1"), nil)
 | 
			
		||||
	updatePod(t, etcdStorage, makeTestPod("pod2"), nil)
 | 
			
		||||
	updatePod(t, etcdStorage, makeTestPod("pod3"), nil)
 | 
			
		||||
	updatePod(t, etcdStorage, makeTestPod("pod4"), nil)
 | 
			
		||||
	updatePod(t, etcdStorage, makeTestPod("pod5"), nil)
 | 
			
		||||
 | 
			
		||||
	fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
 | 
			
		||||
 | 
			
		||||
	cacher, v, err := newTestCacher(etcdStorage)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Couldn't create cacher: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer cacher.Stop()
 | 
			
		||||
 | 
			
		||||
	// get rv of last pod created
 | 
			
		||||
	rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// We now have a cacher with an empty cache of watch events and a resourceVersion of rv.
 | 
			
		||||
	// It should support establishing watches from rv and higher, but not older.
 | 
			
		||||
 | 
			
		||||
	{
 | 
			
		||||
		watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv - 1)), Predicate: storage.Everything})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatalf("Unexpected error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		defer watcher.Stop()
 | 
			
		||||
		expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
 | 
			
		||||
		verifyWatchEvent(t, watcher, watch.Error, &expectedResourceExpiredError)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	{
 | 
			
		||||
		watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv + 1)), Predicate: storage.Everything})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatalf("Unexpected error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		defer watcher.Stop()
 | 
			
		||||
		select {
 | 
			
		||||
		case e := <-watcher.ResultChan():
 | 
			
		||||
			t.Errorf("unexpected event %#v", e)
 | 
			
		||||
		case <-time.After(3 * time.Second):
 | 
			
		||||
			// watch from rv+1 remained established successfully
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	{
 | 
			
		||||
		watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv)), Predicate: storage.Everything})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatalf("Unexpected error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		defer watcher.Stop()
 | 
			
		||||
		select {
 | 
			
		||||
		case e := <-watcher.ResultChan():
 | 
			
		||||
			t.Errorf("unexpected event %#v", e)
 | 
			
		||||
		case <-time.After(3 * time.Second):
 | 
			
		||||
			// watch from rv remained established successfully
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWatchDispatchBookmarkEvents(t *testing.T) {
 | 
			
		||||
	ctx, cacher, terminate := testSetup(t)
 | 
			
		||||
	t.Cleanup(terminate)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue