Correctly handle empty watch event cache

Kubernetes-commit: 0df769f54061aaa1796e2ef496265b3711e6826a
This commit is contained in:
Jordan Liggitt 2017-08-01 22:16:39 -04:00 committed by Kubernetes Publisher
parent 8fcacc09b4
commit d986f949fd
2 changed files with 67 additions and 1 deletions

View File

@ -537,6 +537,70 @@ func TestStartingResourceVersion(t *testing.T) {
}
}
func TestEmptyWatchEventCache(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
// add a few objects
updatePod(t, etcdStorage, makeTestPod("pod1"), nil)
updatePod(t, etcdStorage, makeTestPod("pod2"), nil)
updatePod(t, etcdStorage, makeTestPod("pod3"), nil)
updatePod(t, etcdStorage, makeTestPod("pod4"), nil)
updatePod(t, etcdStorage, makeTestPod("pod5"), nil)
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
// get rv of last pod created
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
// We now have a cacher with an empty cache of watch events and a resourceVersion of rv.
// It should support establishing watches from rv and higher, but not older.
{
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv-1)), storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
expectedGoneError := errors.NewGone("").ErrStatus
verifyWatchEvent(t, watcher, watch.Error, &expectedGoneError)
}
{
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv+1)), storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
select {
case e := <-watcher.ResultChan():
t.Errorf("unexpected event %#v", e)
case <-time.After(3 * time.Second):
// watch from rv+1 remained established successfully
}
}
{
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv)), storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
select {
case e := <-watcher.ResultChan():
t.Errorf("unexpected event %#v", e)
case <-time.After(3 * time.Second):
// watch from rv remained established successfully
}
}
}
func TestRandomWatchDeliver(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)

View File

@ -412,7 +412,9 @@ func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
size := w.endIndex - w.startIndex
oldest := w.resourceVersion
// if we have no watch events in our cache, the oldest one we can successfully deliver to a watcher
// is the *next* event we'll receive, which will be at least one greater than our current resourceVersion
oldest := w.resourceVersion + 1
if size > 0 {
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
}