Fix the bug of returning future data in watch

Kubernetes-commit: 0bc0ad01c9d91fe4baebcef449bdd538a0e91728
This commit is contained in:
Wojciech Tyczyński 2023-03-03 13:10:42 +01:00 committed by Kubernetes Publisher
parent a45b9813bc
commit 50f153f229
1 changed files with 17 additions and 8 deletions

View File

@ -156,14 +156,15 @@ type watchCache struct {
// getAttrsFunc is used to get labels and fields of an object.
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
// cache is used a cyclic buffer - its first element (with the smallest
// resourceVersion) is defined by startIndex, its last element is defined
// by endIndex (if cache is full it will be startIndex + capacity).
// Both startIndex and endIndex can be greater than buffer capacity -
// you should always apply modulo capacity to get an index in cache array.
// cache is used a cyclic buffer - the "current" contents of it are
// stored in [start_index%capacity, end_index%capacity) - so the
// "current" contents have exactly end_index-start_index items.
cache []*watchCacheEvent
startIndex int
endIndex int
// removedEventSinceRelist holds the information whether any of the events
// were already removed from the `cache` cyclic buffer since the last relist
removedEventSinceRelist bool
// store will effectively support LIST operation from the "end of cache
// history" i.e. from the moment just after the newest cached watched event.
@ -346,6 +347,7 @@ func (w *watchCache) updateCache(event *watchCacheEvent) {
if w.isCacheFullLocked() {
// Cache is full - remove the oldest element.
w.startIndex++
w.removedEventSinceRelist = true
}
w.cache[w.endIndex%w.capacity] = event
w.endIndex++
@ -572,8 +574,15 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
w.Lock()
defer w.Unlock()
w.startIndex = 0
w.endIndex = 0
// Ensure startIndex never decreases, so that existing watchCacheInterval
// instances get "invalid" errors if the try to download from the buffer
// using their own start/end indexes calculated from previous buffer
// content.
// Empty the cyclic buffer, ensuring startIndex doesn't decrease.
w.startIndex = w.endIndex
w.removedEventSinceRelist = false
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
return err
}
@ -664,7 +673,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
size := w.endIndex - w.startIndex
var oldest uint64
switch {
case w.listResourceVersion > 0 && w.startIndex == 0:
case w.listResourceVersion > 0 && !w.removedEventSinceRelist:
// If no event was removed from the buffer since last relist, the oldest watch
// event we can deliver is one greater than the resource version of the list.
oldest = w.listResourceVersion + 1