From 6cab3e1a4b0a42c043c906a29474f18bb8728de7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?=
Date: Fri, 3 Mar 2023 13:22:13 +0100
Subject: [PATCH] Fix setting resource version after init events for RV=0 case

Kubernetes-commit: 6fb59e94a31cc0288c861e791e5b6e3d9903ca84
---
 pkg/storage/cacher/cache_watcher.go      | 11 ++++-
 pkg/storage/cacher/cache_watcher_test.go | 59 ++++++++++++++++++++++++
 2 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/pkg/storage/cacher/cache_watcher.go b/pkg/storage/cacher/cache_watcher.go
index 69cc9f7e8..478d2151d 100644
--- a/pkg/storage/cacher/cache_watcher.go
+++ b/pkg/storage/cacher/cache_watcher.go
@@ -488,10 +488,19 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
 			break
 		}
 		c.sendWatchCacheEvent(event)
+
 		// With some events already sent, update resourceVersion so that
 		// events that were buffered and not yet processed won't be delivered
 		// to this watcher a second time, causing it to go back in time.
-		resourceVersion = event.ResourceVersion
+		//
+		// There is one case where events are not necessarily ordered by
+		// resourceVersion, namely watching from resourceVersion=0, which
+		// at the beginning returns the current state of each object.
+		// To handle it, we take the maximum of the event's resource version
+		// and the one we have seen so far.
+		if event.ResourceVersion > resourceVersion {
+			resourceVersion = event.ResourceVersion
+		}
 		initEventCount++
 	}
 
diff --git a/pkg/storage/cacher/cache_watcher_test.go b/pkg/storage/cacher/cache_watcher_test.go
index c18325951..d408c7ae8 100644
--- a/pkg/storage/cacher/cache_watcher_test.go
+++ b/pkg/storage/cacher/cache_watcher_test.go
@@ -28,12 +28,14 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/diff"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/storage"
 	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
+	"k8s.io/client-go/tools/cache"
 	testingclock "k8s.io/utils/clock/testing"
 )
 
@@ -285,6 +287,63 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
 	}
 }
 
+func TestResourceVersionAfterInitEvents(t *testing.T) {
+	getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) {
+		return nil, nil, nil
+	}
+
+	const numObjects = 10
+	store := cache.NewIndexer(storeElementKey, storeElementIndexers(nil))
+
+	for i := 0; i < numObjects; i++ {
+		elem := makeTestStoreElement(makeTestPod(fmt.Sprintf("pod-%d", i), uint64(i)))
+		store.Add(elem)
+	}
+
+	wci, err := newCacheIntervalFromStore(numObjects, store, getAttrsFunc)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true }
+	forget := func(_ bool) {}
+	deadline := time.Now().Add(time.Minute)
+	w := newCacheWatcher(numObjects+1, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
+
+	// Simulate a situation when the last event that was already part of
+	// the state wasn't yet processed by the cacher and will be delivered
+	// via the channel again.
+	event := &watchCacheEvent{
+		Type:            watch.Added,
+		Object:          makeTestPod(fmt.Sprintf("pod-%d", numObjects-1), uint64(numObjects-1)),
+		ResourceVersion: uint64(numObjects - 1),
+	}
+	if !w.add(event, time.NewTimer(time.Second)) {
+		t.Fatalf("failed to add event")
+	}
+	w.stopLocked()
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		w.processInterval(context.Background(), wci, uint64(numObjects-1))
+	}()
+
+	// We expect all init events to be delivered.
+	for i := 0; i < numObjects; i++ {
+		<-w.ResultChan()
+	}
+	// We don't expect any other event to be delivered, thus we
+	// expect the ResultChan to be closed.
+	result, ok := <-w.ResultChan()
+	if ok {
+		t.Errorf("unexpected event: %#v", result)
+	}
+
+	wg.Wait()
+}
+
 func TestTimeBucketWatchersBasic(t *testing.T) {
 	filter := func(_ string, _ labels.Set, _ fields.Set) bool {
 		return true
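
Editor's note (illustration only, not part of the patch): the following is a minimal, self-contained Go sketch of the behavior the fix addresses. The watchCacheEvent type here is a hypothetical pared-down stand-in for the real struct, and the resource version numbers are made up; it only demonstrates why plain assignment can move the watcher's resourceVersion backwards across RV=0 init events, while taking the maximum cannot.

package main

import "fmt"

// Hypothetical stand-in: only ResourceVersion matters for this sketch.
type watchCacheEvent struct {
	ResourceVersion uint64
}

func main() {
	// Init events for a watch started at resourceVersion=0 reflect the
	// current state of each object and are not sorted by resourceVersion.
	initEvents := []watchCacheEvent{{ResourceVersion: 5}, {ResourceVersion: 9}, {ResourceVersion: 3}}

	var naive, maxed uint64
	for _, e := range initEvents {
		naive = e.ResourceVersion // before the fix: can move backwards
		if e.ResourceVersion > maxed {
			maxed = e.ResourceVersion // after the fix: keep the max seen so far
		}
	}

	// naive ends at 3, so a buffered event with ResourceVersion 9 would be
	// delivered to the watcher a second time ("going back in time");
	// maxed ends at 9 and correctly filters such an event out.
	fmt.Printf("naive=%d maxed=%d\n", naive, maxed)
}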