Fix unnecessary too-old-errors from watch cache
Kubernetes-commit: 12021725922efc3a80c8a0673b28826a524eb0a0
This commit is contained in:
		
							parent
							
								
									f0a843124c
								
							
						
					
					
						commit
						d3536986da
					
				| 
						 | 
					@ -127,6 +127,9 @@ type watchCache struct {
 | 
				
			||||||
	// ResourceVersion up to which the watchCache is propagated.
 | 
						// ResourceVersion up to which the watchCache is propagated.
 | 
				
			||||||
	resourceVersion uint64
 | 
						resourceVersion uint64
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// ResourceVersion of the last list result (populated via Replace() method).
 | 
				
			||||||
 | 
						listResourceVersion uint64
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// This handler is run at the end of every successful Replace() method.
 | 
						// This handler is run at the end of every successful Replace() method.
 | 
				
			||||||
	onReplace func()
 | 
						onReplace func()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -147,16 +150,17 @@ func newWatchCache(
 | 
				
			||||||
	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error),
 | 
						getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error),
 | 
				
			||||||
	versioner storage.Versioner) *watchCache {
 | 
						versioner storage.Versioner) *watchCache {
 | 
				
			||||||
	wc := &watchCache{
 | 
						wc := &watchCache{
 | 
				
			||||||
		capacity:        capacity,
 | 
							capacity:            capacity,
 | 
				
			||||||
		keyFunc:         keyFunc,
 | 
							keyFunc:             keyFunc,
 | 
				
			||||||
		getAttrsFunc:    getAttrsFunc,
 | 
							getAttrsFunc:        getAttrsFunc,
 | 
				
			||||||
		cache:           make([]watchCacheElement, capacity),
 | 
							cache:               make([]watchCacheElement, capacity),
 | 
				
			||||||
		startIndex:      0,
 | 
							startIndex:          0,
 | 
				
			||||||
		endIndex:        0,
 | 
							endIndex:            0,
 | 
				
			||||||
		store:           cache.NewStore(storeElementKey),
 | 
							store:               cache.NewStore(storeElementKey),
 | 
				
			||||||
		resourceVersion: 0,
 | 
							resourceVersion:     0,
 | 
				
			||||||
		clock:           clock.RealClock{},
 | 
							listResourceVersion: 0,
 | 
				
			||||||
		versioner:       versioner,
 | 
							clock:               clock.RealClock{},
 | 
				
			||||||
 | 
							versioner:           versioner,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	wc.cond = sync.NewCond(wc.RLocker())
 | 
						wc.cond = sync.NewCond(wc.RLocker())
 | 
				
			||||||
	return wc
 | 
						return wc
 | 
				
			||||||
| 
						 | 
					@ -390,6 +394,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
 | 
				
			||||||
	if err := w.store.Replace(toReplace, resourceVersion); err != nil {
 | 
						if err := w.store.Replace(toReplace, resourceVersion); err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						w.listResourceVersion = version
 | 
				
			||||||
	w.resourceVersion = version
 | 
						w.resourceVersion = version
 | 
				
			||||||
	if w.onReplace != nil {
 | 
						if w.onReplace != nil {
 | 
				
			||||||
		w.onReplace()
 | 
							w.onReplace()
 | 
				
			||||||
| 
						 | 
					@ -412,12 +417,26 @@ func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
 | 
					func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
 | 
				
			||||||
	size := w.endIndex - w.startIndex
 | 
						size := w.endIndex - w.startIndex
 | 
				
			||||||
	// if we have no watch events in our cache, the oldest one we can successfully deliver to a watcher
 | 
						var oldest uint64
 | 
				
			||||||
	// is the *next* event we'll receive, which will be at least one greater than our current resourceVersion
 | 
						switch {
 | 
				
			||||||
	oldest := w.resourceVersion + 1
 | 
						case size >= w.capacity:
 | 
				
			||||||
	if size > 0 {
 | 
							// Once the watch event buffer is full, the oldest watch event we can deliver
 | 
				
			||||||
 | 
							// is the first one in the buffer.
 | 
				
			||||||
		oldest = w.cache[w.startIndex%w.capacity].resourceVersion
 | 
							oldest = w.cache[w.startIndex%w.capacity].resourceVersion
 | 
				
			||||||
 | 
						case w.listResourceVersion > 0:
 | 
				
			||||||
 | 
							// If the watch event buffer isn't full, the oldest watch event we can deliver
 | 
				
			||||||
 | 
							// is one greater than the resource version of the last full list.
 | 
				
			||||||
 | 
							oldest = w.listResourceVersion + 1
 | 
				
			||||||
 | 
						case size > 0:
 | 
				
			||||||
 | 
							// If we've never completed a list, use the resourceVersion of the oldest event
 | 
				
			||||||
 | 
							// in the buffer.
 | 
				
			||||||
 | 
							// This should only happen in unit tests that populate the buffer without
 | 
				
			||||||
 | 
							// performing list/replace operations.
 | 
				
			||||||
 | 
							oldest = w.cache[w.startIndex%w.capacity].resourceVersion
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("watch cache isn't correctly initialized")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if resourceVersion == 0 {
 | 
						if resourceVersion == 0 {
 | 
				
			||||||
		// resourceVersion = 0 means that we don't require any specific starting point
 | 
							// resourceVersion = 0 means that we don't require any specific starting point
 | 
				
			||||||
		// and we would like to start watching from ~now.
 | 
							// and we would like to start watching from ~now.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -19,6 +19,7 @@ package cacher
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
 | 
						"strings"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -278,6 +279,41 @@ func TestEvents(t *testing.T) {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestMarker(t *testing.T) {
 | 
				
			||||||
 | 
						store := newTestWatchCache(3)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// First thing that is called when propagated from storage is Replace.
 | 
				
			||||||
 | 
						store.Replace([]interface{}{
 | 
				
			||||||
 | 
							makeTestPod("pod1", 5),
 | 
				
			||||||
 | 
							makeTestPod("pod2", 9),
 | 
				
			||||||
 | 
						}, "9")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						_, err := store.GetAllEventsSince(8)
 | 
				
			||||||
 | 
						if err == nil || !strings.Contains(err.Error(), "too old resource version") {
 | 
				
			||||||
 | 
							t.Errorf("unexpected error: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Getting events from 8 should return no events,
 | 
				
			||||||
 | 
						// even though there is a marker there.
 | 
				
			||||||
 | 
						result, err := store.GetAllEventsSince(9)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("unexpected error: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(result) != 0 {
 | 
				
			||||||
 | 
							t.Errorf("unexpected result: %#v, expected no events", result)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pod := makeTestPod("pods", 12)
 | 
				
			||||||
 | 
						store.Add(pod)
 | 
				
			||||||
 | 
						// Getting events from 8 should still work and return one event.
 | 
				
			||||||
 | 
						result, err = store.GetAllEventsSince(9)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("unexpected error: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(result) != 1 || !apiequality.Semantic.DeepEqual(result[0].Object, pod) {
 | 
				
			||||||
 | 
							t.Errorf("unexpected result: %#v, expected %v", result, pod)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestWaitUntilFreshAndList(t *testing.T) {
 | 
					func TestWaitUntilFreshAndList(t *testing.T) {
 | 
				
			||||||
	store := newTestWatchCache(3)
 | 
						store := newTestWatchCache(3)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue