diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go
index a350faf6a..d89d57431 100644
--- a/pkg/storage/cacher/cacher.go
+++ b/pkg/storage/cacher/cacher.go
@@ -104,7 +104,7 @@ type Config struct {
     Codec runtime.Codec

-    Clock clock.Clock
+    Clock clock.WithTicker
 }

 type watchersMap map[int]*cacheWatcher
@@ -329,6 +329,10 @@ type Cacher struct {
     expiredBookmarkWatchers []*cacheWatcher
 }

+func (c *Cacher) RequestWatchProgress(ctx context.Context) error {
+    return c.storage.RequestWatchProgress(ctx)
+}
+
 // NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
 // its internal cache and updating its cache in the background based on the
 // given configuration.
@@ -397,9 +401,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
         // so that future reuse does not get a spurious timeout.
         <-cacher.timer.C
     }
-
+    progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock)
     watchCache := newWatchCache(
-        config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
+        config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
     listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
     reflectorName := "storage/cacher.go:" + config.ResourcePrefix

@@ -419,6 +423,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
     cacher.reflector = reflector

     go cacher.dispatchEvents()
+    go progressRequester.Run(stopCh)

     cacher.stopWg.Add(1)
     go func() {
diff --git a/pkg/storage/cacher/cacher_test.go b/pkg/storage/cacher/cacher_test.go
index 21400048a..018ecfdcd 100644
--- a/pkg/storage/cacher/cacher_test.go
+++ b/pkg/storage/cacher/cacher_test.go
@@ -328,7 +328,7 @@ type setupOptions struct {
     keyFunc       func(runtime.Object) (string, error)
     indexerFuncs  map[string]storage.IndexerFunc
     pagingEnabled bool
-    clock         clock.Clock
+    clock         clock.WithTicker
 }

 type setupOption func(*setupOptions)
diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go
index 47219676e..ae77e5497 100644
--- a/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/pkg/storage/cacher/cacher_whitebox_test.go
@@ -90,6 +90,10 @@ type dummyStorage struct {
     watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
 }

+func (d *dummyStorage) RequestWatchProgress(ctx context.Context) error {
+    return nil
+}
+
 type dummyWatch struct {
     ch chan watch.Event
 }
diff --git a/pkg/storage/cacher/watch_cache.go b/pkg/storage/cacher/watch_cache.go
index 9e7259c9a..bbffffc19 100644
--- a/pkg/storage/cacher/watch_cache.go
+++ b/pkg/storage/cacher/watch_cache.go
@@ -196,6 +196,10 @@ type watchCache struct {

     // For testing cache interval invalidation.
     indexValidator indexValidator
+
+    // Requests progress notification if there are requests waiting for the
+    // watch to be fresh.
+    waitingUntilFresh *conditionalProgressRequester
 }

 func newWatchCache(
@@ -204,8 +208,9 @@
     getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
     versioner storage.Versioner,
     indexers *cache.Indexers,
-    clock clock.Clock,
-    groupResource schema.GroupResource) *watchCache {
+    clock clock.WithTicker,
+    groupResource schema.GroupResource,
+    progressRequester *conditionalProgressRequester) *watchCache {
     wc := &watchCache{
         capacity:          defaultLowerBoundCapacity,
         keyFunc:           keyFunc,
@@ -222,6 +227,7 @@
         clock:             clock,
         versioner:         versioner,
         groupResource:     groupResource,
+        waitingUntilFresh: progressRequester,
     }
     metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity))
     wc.cond = sync.NewCond(wc.RLocker())
diff --git a/pkg/storage/cacher/watch_cache_interval_test.go b/pkg/storage/cacher/watch_cache_interval_test.go
index 14562da10..fb5c3fb10 100644
--- a/pkg/storage/cacher/watch_cache_interval_test.go
+++ b/pkg/storage/cacher/watch_cache_interval_test.go
@@ -287,6 +287,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) {
     for _, c := range cases {
         t.Run(c.name, func(t *testing.T) {
             wc := newTestWatchCache(capacity, &cache.Indexers{})
+            defer wc.Stop()
             for i := 0; i < c.eventsAddedToWatchcache; i++ {
                 wc.Add(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i)))
             }
diff --git a/pkg/storage/cacher/watch_cache_test.go b/pkg/storage/cacher/watch_cache_test.go
index d5571bb43..1a4c071e9 100644
--- a/pkg/storage/cacher/watch_cache_test.go
+++ b/pkg/storage/cacher/watch_cache_test.go
@@ -68,6 +68,9 @@ func makeTestStoreElement(pod *v1.Pod) *storeElement {

 type testWatchCache struct {
     *watchCache
+
+    bookmarkRevision chan int64
+    stopCh           chan struct{}
 }

 func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
@@ -112,7 +115,13 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
     }
     versioner := storage.APIObjectVersioner{}
     mockHandler := func(*watchCacheEvent) {}
-    wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"})
+    wc := &testWatchCache{}
+    wc.bookmarkRevision = make(chan int64, 1)
+    wc.stopCh = make(chan struct{})
+    clock := testingclock.NewFakeClock(time.Now())
+    pr := newConditionalProgressRequester(wc.RequestWatchProgress, clock)
+    go pr.Run(wc.stopCh)
+    wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, clock, schema.GroupResource{Resource: "pods"}, pr)
     // To preserve behavior of tests that assume a given capacity,
     // resize it to th expected size.
     wc.capacity = capacity
@@ -120,11 +129,28 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
     wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity)
     wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity)

-    return &testWatchCache{watchCache: wc}
+    return wc
+}
+
+func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error {
+    go func() {
+        select {
+        case rev := <-w.bookmarkRevision:
+            w.UpdateResourceVersion(fmt.Sprintf("%d", rev))
+        case <-ctx.Done():
+            return
+        }
+    }()
+    return nil
+}
+
+func (w *testWatchCache) Stop() {
+    close(w.stopCh)
 }

 func TestWatchCacheBasic(t *testing.T) {
     store := newTestWatchCache(2, &cache.Indexers{})
+    defer store.Stop()

     // Test Add/Update/Delete.
     pod1 := makeTestPod("pod", 1)
@@ -202,6 +228,7 @@ func TestWatchCacheBasic(t *testing.T) {

 func TestEvents(t *testing.T) {
     store := newTestWatchCache(5, &cache.Indexers{})
+    defer store.Stop()

     // no dynamic-size cache to fit old tests.
     store.lowerBoundCapacity = 5
@@ -326,6 +353,7 @@ func TestEvents(t *testing.T) {

 func TestMarker(t *testing.T) {
     store := newTestWatchCache(3, &cache.Indexers{})
+    defer store.Stop()

     // First thing that is called when propagated from storage is Replace.
     store.Replace([]interface{}{
@@ -380,7 +408,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
             return []string{pod.Spec.NodeName}, nil
         },
     })
-
+    defer store.Stop()
     // In background, update the store.
     go func() {
         store.Add(makeTestPodDetails("pod1", 2, "node1", map[string]string{"label": "value1"}))
@@ -463,6 +491,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
 func TestWaitUntilFreshAndGet(t *testing.T) {
     ctx := context.Background()
     store := newTestWatchCache(3, &cache.Indexers{})
+    defer store.Stop()

     // In background, update the store.
     go func() {
@@ -489,6 +518,7 @@ func TestWaitUntilFreshAndGet(t *testing.T) {
 func TestWaitUntilFreshAndListTimeout(t *testing.T) {
     ctx := context.Background()
     store := newTestWatchCache(3, &cache.Indexers{})
+    defer store.Stop()
     fc := store.clock.(*testingclock.FakeClock)

     // In background, step clock after the below call starts the timer.
@@ -529,6 +559,7 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
 func TestReflectorForWatchCache(t *testing.T) {
     ctx := context.Background()
     store := newTestWatchCache(5, &cache.Indexers{})
+    defer store.Stop()

     {
         _, version, _, err := store.WaitUntilFreshAndList(ctx, 0, nil)
@@ -792,6 +823,7 @@ func TestDynamicCache(t *testing.T) {
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
             store := newTestWatchCache(test.cacheCapacity, &cache.Indexers{})
+            defer store.Stop()
             store.cache = make([]*watchCacheEvent, test.cacheCapacity)
             store.startIndex = test.startIndex
             store.lowerBoundCapacity = test.lowerBoundCapacity
@@ -840,6 +872,7 @@ func checkCacheElements(cache *testWatchCache) bool {

 func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
     store := newTestWatchCache(2, &cache.Indexers{})
+    defer store.Stop()
     now := store.clock.Now()

     addEvent := func(key string, rv uint64, t time.Time) {
@@ -988,6 +1021,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
     for _, test := range testCases {
         t.Run(test.name, func(t *testing.T) {
             store := newTestWatchCache(test.capacity, &cache.Indexers{})
+            defer store.Stop()
             got := store.suggestedWatchChannelSize(test.indexExists, test.triggerUsed)
             if got != test.expected {
                 t.Errorf("unexpected channel size got: %v, expected: %v", got, test.expected)
@@ -998,6 +1032,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {

 func BenchmarkWatchCache_updateCache(b *testing.B) {
     store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
+    defer store.Stop()
     store.cache = store.cache[:0]
     store.upperBoundCapacity = defaultUpperBoundCapacity
     loadEventWithDuration(store, defaultUpperBoundCapacity, 0)
diff --git a/pkg/storage/cacher/watch_progress.go b/pkg/storage/cacher/watch_progress.go
new file mode 100644
index 000000000..332c27b5d
--- /dev/null
+++ b/pkg/storage/cacher/watch_progress.go
@@ -0,0 +1,117 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cacher
+
+import (
+    "context"
+    "sync"
+    "time"
+
+    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+    "k8s.io/apimachinery/pkg/util/wait"
+
+    "k8s.io/klog/v2"
+    "k8s.io/utils/clock"
+)
+
+const (
+    // progressRequestPeriod determines the period of requesting progress
+    // from etcd when there is a request waiting for the watch cache to be fresh.
+    progressRequestPeriod = 100 * time.Millisecond
+)
+
+func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock clock.WithTicker) *conditionalProgressRequester {
+    pr := &conditionalProgressRequester{
+        clock:                clock,
+        requestWatchProgress: requestWatchProgress,
+    }
+    pr.cond = sync.NewCond(pr.mux.RLocker())
+    return pr
+}
+
+type WatchProgressRequester func(ctx context.Context) error
+
+// conditionalProgressRequester will request progress notification if there
+// is a request waiting for the watch cache to be fresh.
+type conditionalProgressRequester struct {
+    clock                clock.WithTicker
+    requestWatchProgress WatchProgressRequester
+
+    mux     sync.RWMutex
+    cond    *sync.Cond
+    waiting int
+    stopped bool
+}
+
+func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
+    ctx := wait.ContextForChannel(stopCh)
+    go func() {
+        defer utilruntime.HandleCrash()
+        <-stopCh
+        pr.mux.Lock()
+        defer pr.mux.Unlock()
+        pr.stopped = true
+        pr.cond.Signal()
+    }()
+    ticker := pr.clock.NewTicker(progressRequestPeriod)
+    defer ticker.Stop()
+    for {
+        stopped := func() bool {
+            pr.mux.RLock()
+            defer pr.mux.RUnlock()
+            for pr.waiting == 0 && !pr.stopped {
+                pr.cond.Wait()
+            }
+            return pr.stopped
+        }()
+        if stopped {
+            return
+        }
+
+        select {
+        case <-ticker.C():
+            shouldRequest := func() bool {
+                pr.mux.RLock()
+                defer pr.mux.RUnlock()
+                return pr.waiting > 0 && !pr.stopped
+            }()
+            if !shouldRequest {
+                continue
+            }
+            err := pr.requestWatchProgress(ctx)
+            if err != nil {
+                klog.V(4).InfoS("Error requesting bookmark", "err", err)
+            }
+        case <-stopCh:
+            return
+        }
+    }
+}
+
+func (pr *conditionalProgressRequester) Add() {
+    pr.mux.Lock()
+    defer pr.mux.Unlock()
+    pr.waiting += 1
+    pr.cond.Signal()
+}
+
+func (pr *conditionalProgressRequester) Remove() {
+    pr.mux.Lock()
+    defer pr.mux.Unlock()
+    pr.waiting -= 1
+    pr.cond.Signal()
+}
diff --git a/pkg/storage/cacher/watch_progress_test.go b/pkg/storage/cacher/watch_progress_test.go
new file mode 100644
index 000000000..e8cc13c3c
--- /dev/null
+++ b/pkg/storage/cacher/watch_progress_test.go
@@ -0,0 +1,129 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cacher
+
+import (
+    "context"
+    "sync/atomic"
+    "testing"
+    "time"
+
+    "k8s.io/apimachinery/pkg/util/wait"
+
+    "k8s.io/klog/v2"
+    "k8s.io/klog/v2/ktesting"
+    "k8s.io/utils/clock"
+    testingclock "k8s.io/utils/clock/testing"
+)
+
+var (
+    pollPeriod      = time.Millisecond
+    minimalNoChange = 20 * time.Millisecond
+    pollTimeout     = 5 * time.Second
+)
+
+func TestConditionalProgressRequester(t *testing.T) {
+    _, ctx := ktesting.NewTestContext(t)
+    logger := klog.FromContext(ctx)
+
+    clock := testingclock.NewFakeClock(time.Now())
+    pr := newTestConditionalProgressRequester(clock)
+    stopCh := make(chan struct{})
+    go pr.Run(stopCh)
+    var wantRequestsSent int32
+    var requestsSent int32
+
+    logger.Info("No progress requests if no-one is waiting")
+    clock.Step(progressRequestPeriod * 2)
+
+    if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
+        requestsSent = pr.progressRequestsSentCount.Load()
+        return requestsSent == wantRequestsSent
+    }); err != nil {
+        t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
+    }
+
+    logger.Info("Adding allows progress request to be sent every period")
+    pr.Add()
+    for wantRequestsSent < 10 {
+        clock.Step(progressRequestPeriod)
+        wantRequestsSent++
+
+        if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
+            requestsSent = pr.progressRequestsSentCount.Load()
+            return requestsSent == wantRequestsSent
+        }); err != nil {
+            t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
+        }
+    }
+    pr.Remove()
+
+    logger.Info("No progress requests if no-one is waiting")
+    clock.Step(progressRequestPeriod * 2)
+    if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
+        requestsSent = pr.progressRequestsSentCount.Load()
+        return requestsSent == wantRequestsSent
+    }); err != nil {
+        t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
+    }
+
+    logger.Info("No progress after stopping")
+    close(stopCh)
+    if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
+        requestsSent = pr.progressRequestsSentCount.Load()
+        return requestsSent == wantRequestsSent
+    }); err != nil {
+        t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
+    }
+    pr.Add()
+    clock.Step(progressRequestPeriod * 2)
+    if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
+        requestsSent = pr.progressRequestsSentCount.Load()
+        return requestsSent == wantRequestsSent
+    }); err != nil {
+        t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
+    }
+}
+
+func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester {
+    pr := &testConditionalProgressRequester{}
+    pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock)
+    return pr
+}
+
+type testConditionalProgressRequester struct {
+    *conditionalProgressRequester
+    progressRequestsSentCount atomic.Int32
+}
+
+func (pr *testConditionalProgressRequester) RequestWatchProgress(ctx context.Context) error {
+    pr.progressRequestsSentCount.Add(1)
+    return nil
+}
+
+func pollConditionNoChange(interval, stable, timeout time.Duration, condition func() bool) error {
+    passCounter := 0
+    requiredNumberOfPasses := int(stable/interval) + 1
+    return wait.Poll(interval,
+        timeout, func() (done bool, err error) {
+            if condition() {
+                passCounter++
+            } else {
+                passCounter = 0
+            }
+            return passCounter >= requiredNumberOfPasses, nil
+        })
+}
diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go
index ff700bf60..737415223 100644
--- a/pkg/storage/etcd3/store.go
+++ b/pkg/storage/etcd3/store.go
@@ -85,6 +85,12 @@ type store struct {
     leaseManager *leaseManager
 }

+func (s *store) RequestWatchProgress(ctx context.Context) error {
+    // Use watchContext to match the ctx metadata provided when creating the watch.
+    // In the best case scenario we would use the same context the watch was created with, but there is no way to access it from watchCache.
+    return s.client.RequestProgress(s.watchContext(ctx))
+}
+
 type objState struct {
     obj  runtime.Object
     meta *storage.ResponseMeta
diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go
index a4f524997..d4929bd9d 100644
--- a/pkg/storage/etcd3/watcher.go
+++ b/pkg/storage/etcd3/watcher.go
@@ -215,6 +215,10 @@ func (wc *watchChan) ResultChan() <-chan watch.Event {
     return wc.resultChan
 }

+func (wc *watchChan) RequestWatchProgress() error {
+    return wc.watcher.client.RequestProgress(wc.ctx)
+}
+
 // sync tries to retrieve existing data and send them to process.
 // The revision to watch will be set to the revision in response.
 // All events sent will have isCreated=true
diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go
index daf30a242..76123fde8 100644
--- a/pkg/storage/interfaces.go
+++ b/pkg/storage/interfaces.go
@@ -236,6 +236,21 @@ type Interface interface {

     // Count returns number of different entries under the key (generally being path prefix).
     Count(key string) (int64, error)
+
+    // RequestWatchProgress requests that a watch stream progress status be sent in the
+    // watch response stream as soon as possible.
+    // Used to monitor watch progress even if the watched resources see no changes.
+    //
+    // If the watch is lagging, the progress status might:
+    // * point to a stale resource version. Use an etcd KV request to get a linearizable resource version.
+    // * not be delivered at all. It's recommended to request progress periodically.
+    //
+    // Note: Only watches with matching context grpc metadata will be notified.
+    // https://github.com/kubernetes/kubernetes/blob/9325a57125e8502941d1b0c7379c4bb80a678d5c/vendor/go.etcd.io/etcd/client/v3/watch.go#L1037-L1042
+    //
+    // TODO: Remove when storage.Interface is separated from etcd3.store.
+    // Deprecated: Added temporarily to simplify exposing RequestProgress for the watch cache.
+    RequestWatchProgress(ctx context.Context) error
 }

 // GetOptions provides the options that may be provided for storage get operations.
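
Note (not part of the patch): the diff adds watchCache.waitingUntilFresh but does not include the call site that registers waiting readers, so the intended interaction is easy to miss. The sketch below shows the expected pattern of bracketing a blocking freshness wait with Add/Remove inside package cacher; the function name and the waitUntilFresh callback are illustrative assumptions, not code from this change.

// Illustrative sketch only: progress notifications are requested solely while
// at least one caller is blocked waiting for the watch cache to catch up.
func waitWithProgress(pr *conditionalProgressRequester, waitUntilFresh func() error) error {
    // Add wakes the Run loop out of its idle cond.Wait phase; while the
    // waiting counter is non-zero, Run calls requestWatchProgress every
    // progressRequestPeriod (100ms).
    pr.Add()
    // Remove returns the Run loop to the idle phase once nobody is waiting.
    defer pr.Remove()
    return waitUntilFresh()
}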
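
On the storage.Interface side, the new doc comment warns that a progress notification can point at a stale resource version or may never be delivered, so callers are expected to re-request progress periodically rather than rely on a single call. A minimal caller-side sketch of that recommendation follows; the isFresh predicate, the 100ms period, and the function itself are illustrative assumptions, not part of this change.

// Sketch under assumptions: s is any storage.Interface implementation and
// isFresh reports whether the watcher has observed the revision we need.
func pollWatchProgress(ctx context.Context, s storage.Interface, isFresh func() bool) error {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    for !isFresh() {
        select {
        case <-ticker.C:
            // Delivery is best effort, so keep asking until the watch catches up.
            if err := s.RequestWatchProgress(ctx); err != nil {
                return err
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}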