storage/cacher/cache_watcher: add RV to watchCacheInterval
Kubernetes-commit: f87e4a19c88fa908eb176ee7925f211bafba9b45
This commit is contained in:
parent
c34f5e1bba
commit
743b5776f9
|
|
@ -746,7 +746,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string,
|
|||
indexerFunc := func(i int) *watchCacheEvent {
|
||||
return w.cache[i%w.capacity]
|
||||
}
|
||||
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, w.RWMutex.RLocker())
|
||||
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, resourceVersion, w.RWMutex.RLocker())
|
||||
return ci, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -91,6 +91,10 @@ type watchCacheInterval struct {
|
|||
// lock on each invocation of Next().
|
||||
buffer *watchCacheIntervalBuffer
|
||||
|
||||
// resourceVersion is the resourceVersion from which
|
||||
// the interval was constructed.
|
||||
resourceVersion uint64
|
||||
|
||||
// lock effectively protects access to the underlying source
|
||||
// of events through - indexer and indexValidator.
|
||||
//
|
||||
|
|
@ -103,13 +107,14 @@ type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
|
|||
type indexerFunc func(int) *watchCacheEvent
|
||||
type indexValidator func(int) bool
|
||||
|
||||
func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, locker sync.Locker) *watchCacheInterval {
|
||||
func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, resourceVersion uint64, locker sync.Locker) *watchCacheInterval {
|
||||
return &watchCacheInterval{
|
||||
startIndex: startIndex,
|
||||
endIndex: endIndex,
|
||||
indexer: indexer,
|
||||
indexValidator: indexValidator,
|
||||
buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
|
||||
resourceVersion: resourceVersion,
|
||||
lock: locker,
|
||||
}
|
||||
}
|
||||
|
|
@ -174,6 +179,7 @@ func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAt
|
|||
// Simulate that we already have all the events we're looking for.
|
||||
endIndex: 0,
|
||||
buffer: buffer,
|
||||
resourceVersion: resourceVersion,
|
||||
}
|
||||
|
||||
return ci, nil
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ func intervalFromEvents(events []*watchCacheEvent) *watchCacheInterval {
|
|||
}
|
||||
indexValidator := func(_ int) bool { return true }
|
||||
|
||||
return newCacheInterval(startIndex, endIndex, indexer, indexValidator, locker)
|
||||
return newCacheInterval(startIndex, endIndex, indexer, indexValidator, 0, locker)
|
||||
}
|
||||
|
||||
func bufferFromEvents(events []*watchCacheEvent) *watchCacheIntervalBuffer {
|
||||
|
|
@ -300,6 +300,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) {
|
|||
wc.endIndex,
|
||||
indexerFunc,
|
||||
wc.isIndexValidLocked,
|
||||
wc.resourceVersion,
|
||||
&wc.RWMutex,
|
||||
)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue