Implement conditionalProgressRequester that allows requesting watch progress notification if watch cache is not fresh

Kubernetes-commit: 98461be8ffa7383152c442414a16adb217e98080
Marek Siarkowicz 2023-07-10 18:10:49 +02:00 committed by Kubernetes Publisher
parent 6e247788f7
commit cd751eb82e
11 changed files with 331 additions and 9 deletions
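The new conditionalProgressRequester introduced below follows one pattern: requests that are blocked waiting for the watch cache to catch up register themselves, and a background loop asks etcd for a watch progress notification every period, but only while at least one request is waiting. The following standalone sketch only illustrates that pattern; it is not the code from this commit (the hypothetical progressRequester type, the atomic counter, and the plain time.Ticker stand in for the real implementation's sync.Cond and clock.WithTicker).

package main

import (
    "context"
    "fmt"
    "sync/atomic"
    "time"
)

// progressRequester is a simplified stand-in for the conditionalProgressRequester
// added by this commit: it asks the backend for watch progress only while at
// least one request is waiting for the cache to become fresh.
type progressRequester struct {
    waiting atomic.Int64                    // number of requests currently waiting
    request func(ctx context.Context) error // e.g. the storage RequestWatchProgress call
}

// Run polls every period and requests progress only if there are waiters.
func (p *progressRequester) Run(ctx context.Context, period time.Duration) {
    ticker := time.NewTicker(period)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            if p.waiting.Load() > 0 {
                _ = p.request(ctx) // errors are only logged in the real code
            }
        case <-ctx.Done():
            return
        }
    }
}

// Add registers a waiter; Remove deregisters it.
func (p *progressRequester) Add()    { p.waiting.Add(1) }
func (p *progressRequester) Remove() { p.waiting.Add(-1) }

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
    defer cancel()

    pr := &progressRequester{request: func(context.Context) error {
        fmt.Println("watch progress requested")
        return nil
    }}
    go pr.Run(ctx, 100*time.Millisecond)

    pr.Add()          // a request starts waiting for the cache to become fresh
    defer pr.Remove() // stop requesting progress once the wait ends
    <-ctx.Done()
}

In the actual change this loop lives in the new conditional_progress_requester.go file further down, and NewCacherFromConfig starts it with go progressRequester.Run(stopCh).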

View File

@@ -104,7 +104,7 @@ type Config struct {
    Codec runtime.Codec
-    Clock clock.Clock
+    Clock clock.WithTicker
}

type watchersMap map[int]*cacheWatcher
@@ -329,6 +329,10 @@ type Cacher struct {
    expiredBookmarkWatchers []*cacheWatcher
}

+func (c *Cacher) RequestWatchProgress(ctx context.Context) error {
+    return c.storage.RequestWatchProgress(ctx)
+}
+
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
@@ -397,9 +401,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
        // so that future reuse does not get a spurious timeout.
        <-cacher.timer.C
    }
+    progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock)
    watchCache := newWatchCache(
-        config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
+        config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
    listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
    reflectorName := "storage/cacher.go:" + config.ResourcePrefix
@@ -419,6 +423,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
    cacher.reflector = reflector
    go cacher.dispatchEvents()
+    go progressRequester.Run(stopCh)
    cacher.stopWg.Add(1)
    go func() {

View File

@@ -328,7 +328,7 @@ type setupOptions struct {
    keyFunc func(runtime.Object) (string, error)
    indexerFuncs map[string]storage.IndexerFunc
    pagingEnabled bool
-    clock clock.Clock
+    clock clock.WithTicker
}

type setupOption func(*setupOptions)

View File

@@ -90,6 +90,10 @@ type dummyStorage struct {
    watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
}

+func (d *dummyStorage) RequestWatchProgress(ctx context.Context) error {
+    return nil
+}
+
type dummyWatch struct {
    ch chan watch.Event
}

View File

@@ -196,6 +196,10 @@ type watchCache struct {
    // For testing cache interval invalidation.
    indexValidator indexValidator
+
+    // Requests progress notification if there are requests waiting for watch
+    // to be fresh
+    waitingUntilFresh *conditionalProgressRequester
}

func newWatchCache(
@@ -204,8 +208,9 @@ func newWatchCache(
    getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
    versioner storage.Versioner,
    indexers *cache.Indexers,
-    clock clock.Clock,
-    groupResource schema.GroupResource) *watchCache {
+    clock clock.WithTicker,
+    groupResource schema.GroupResource,
+    progressRequester *conditionalProgressRequester) *watchCache {
    wc := &watchCache{
        capacity: defaultLowerBoundCapacity,
        keyFunc: keyFunc,
@@ -222,6 +227,7 @@ func newWatchCache(
        clock: clock,
        versioner: versioner,
        groupResource: groupResource,
+        waitingUntilFresh: progressRequester,
    }
    metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity))
    wc.cond = sync.NewCond(wc.RLocker())

View File

@@ -287,6 +287,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) {
    for _, c := range cases {
        t.Run(c.name, func(t *testing.T) {
            wc := newTestWatchCache(capacity, &cache.Indexers{})
+            defer wc.Stop()
            for i := 0; i < c.eventsAddedToWatchcache; i++ {
                wc.Add(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i)))
            }

View File

@@ -68,6 +68,9 @@ func makeTestStoreElement(pod *v1.Pod) *storeElement {
type testWatchCache struct {
    *watchCache
+
+    bookmarkRevision chan int64
+    stopCh chan struct{}
}

func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
@@ -112,7 +115,13 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
    }
    versioner := storage.APIObjectVersioner{}
    mockHandler := func(*watchCacheEvent) {}
-    wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"})
+    wc := &testWatchCache{}
+    wc.bookmarkRevision = make(chan int64, 1)
+    wc.stopCh = make(chan struct{})
+    clock := testingclock.NewFakeClock(time.Now())
+    pr := newConditionalProgressRequester(wc.RequestWatchProgress, clock)
+    go pr.Run(wc.stopCh)
+    wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, clock, schema.GroupResource{Resource: "pods"}, pr)
    // To preserve behavior of tests that assume a given capacity,
    // resize it to the expected size.
    wc.capacity = capacity
@@ -120,11 +129,28 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
    wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity)
    wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity)
-    return &testWatchCache{watchCache: wc}
+    return wc
+}
+
+func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error {
+    go func() {
+        select {
+        case rev := <-w.bookmarkRevision:
+            w.UpdateResourceVersion(fmt.Sprintf("%d", rev))
+        case <-ctx.Done():
+            return
+        }
+    }()
+    return nil
+}
+
+func (w *testWatchCache) Stop() {
+    close(w.stopCh)
}

func TestWatchCacheBasic(t *testing.T) {
    store := newTestWatchCache(2, &cache.Indexers{})
+    defer store.Stop()

    // Test Add/Update/Delete.
    pod1 := makeTestPod("pod", 1)
@@ -202,6 +228,7 @@ func TestWatchCacheBasic(t *testing.T) {
func TestEvents(t *testing.T) {
    store := newTestWatchCache(5, &cache.Indexers{})
+    defer store.Stop()

    // no dynamic-size cache to fit old tests.
    store.lowerBoundCapacity = 5
@@ -326,6 +353,7 @@ func TestEvents(t *testing.T) {
func TestMarker(t *testing.T) {
    store := newTestWatchCache(3, &cache.Indexers{})
+    defer store.Stop()

    // First thing that is called when propagated from storage is Replace.
    store.Replace([]interface{}{
@@ -380,7 +408,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
            return []string{pod.Spec.NodeName}, nil
        },
    })
+    defer store.Stop()

    // In background, update the store.
    go func() {
        store.Add(makeTestPodDetails("pod1", 2, "node1", map[string]string{"label": "value1"}))
@@ -463,6 +491,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
func TestWaitUntilFreshAndGet(t *testing.T) {
    ctx := context.Background()
    store := newTestWatchCache(3, &cache.Indexers{})
+    defer store.Stop()

    // In background, update the store.
    go func() {
@@ -489,6 +518,7 @@ func TestWaitUntilFreshAndGet(t *testing.T) {
func TestWaitUntilFreshAndListTimeout(t *testing.T) {
    ctx := context.Background()
    store := newTestWatchCache(3, &cache.Indexers{})
+    defer store.Stop()
    fc := store.clock.(*testingclock.FakeClock)

    // In background, step clock after the below call starts the timer.
@@ -529,6 +559,7 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
func TestReflectorForWatchCache(t *testing.T) {
    ctx := context.Background()
    store := newTestWatchCache(5, &cache.Indexers{})
+    defer store.Stop()

    {
        _, version, _, err := store.WaitUntilFreshAndList(ctx, 0, nil)
@@ -792,6 +823,7 @@ func TestDynamicCache(t *testing.T) {
    for _, test := range tests {
        t.Run(test.name, func(t *testing.T) {
            store := newTestWatchCache(test.cacheCapacity, &cache.Indexers{})
+            defer store.Stop()
            store.cache = make([]*watchCacheEvent, test.cacheCapacity)
            store.startIndex = test.startIndex
            store.lowerBoundCapacity = test.lowerBoundCapacity
@@ -840,6 +872,7 @@ func checkCacheElements(cache *testWatchCache) bool {
func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
    store := newTestWatchCache(2, &cache.Indexers{})
+    defer store.Stop()

    now := store.clock.Now()
    addEvent := func(key string, rv uint64, t time.Time) {
@@ -988,6 +1021,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
    for _, test := range testCases {
        t.Run(test.name, func(t *testing.T) {
            store := newTestWatchCache(test.capacity, &cache.Indexers{})
+            defer store.Stop()
            got := store.suggestedWatchChannelSize(test.indexExists, test.triggerUsed)
            if got != test.expected {
                t.Errorf("unexpected channel size got: %v, expected: %v", got, test.expected)
@@ -998,6 +1032,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
func BenchmarkWatchCache_updateCache(b *testing.B) {
    store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
+    defer store.Stop()
    store.cache = store.cache[:0]
    store.upperBoundCapacity = defaultUpperBoundCapacity
    loadEventWithDuration(store, defaultUpperBoundCapacity, 0)

View File

@@ -0,0 +1,117 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cacher

import (
    "context"
    "sync"
    "time"

    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/klog/v2"
    "k8s.io/utils/clock"
)

const (
    // progressRequestPeriod determines period of requesting progress
    // from etcd when there is a request waiting for watch cache to be fresh.
    progressRequestPeriod = 100 * time.Millisecond
)

func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock clock.WithTicker) *conditionalProgressRequester {
    pr := &conditionalProgressRequester{
        clock: clock,
        requestWatchProgress: requestWatchProgress,
    }
    pr.cond = sync.NewCond(pr.mux.RLocker())
    return pr
}

type WatchProgressRequester func(ctx context.Context) error

// conditionalProgressRequester will request progress notification if there
// is a request waiting for watch cache to be fresh.
type conditionalProgressRequester struct {
    clock clock.WithTicker
    requestWatchProgress WatchProgressRequester

    mux sync.RWMutex
    cond *sync.Cond
    waiting int
    stopped bool
}

func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
    ctx := wait.ContextForChannel(stopCh)
    go func() {
        defer utilruntime.HandleCrash()
        <-stopCh
        pr.mux.Lock()
        defer pr.mux.Unlock()
        pr.stopped = true
        pr.cond.Signal()
    }()
    ticker := pr.clock.NewTicker(progressRequestPeriod)
    defer ticker.Stop()
    for {
        stopped := func() bool {
            pr.mux.RLock()
            defer pr.mux.RUnlock()
            for pr.waiting == 0 && !pr.stopped {
                pr.cond.Wait()
            }
            return pr.stopped
        }()
        if stopped {
            return
        }

        select {
        case <-ticker.C():
            shouldRequest := func() bool {
                pr.mux.RLock()
                defer pr.mux.RUnlock()
                return pr.waiting > 0 && !pr.stopped
            }()
            if !shouldRequest {
                continue
            }
            err := pr.requestWatchProgress(ctx)
            if err != nil {
                klog.V(4).InfoS("Error requesting bookmark", "err", err)
            }
        case <-stopCh:
            return
        }
    }
}

func (pr *conditionalProgressRequester) Add() {
    pr.mux.Lock()
    defer pr.mux.Unlock()
    pr.waiting += 1
    pr.cond.Signal()
}

func (pr *conditionalProgressRequester) Remove() {
    pr.mux.Lock()
    defer pr.mux.Unlock()
    pr.waiting -= 1
    pr.cond.Signal()
}
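
The call sites that drive waitingUntilFresh are not part of this diff; the snippet below is only a hypothetical illustration of the contract the type above defines: Run is started once per cacher, Add is called when a request starts waiting for the watch cache to be fresh, and Remove is called when that wait ends, whatever the outcome. Everything except newConditionalProgressRequester, Run, Add, Remove and WatchProgressRequester is made up for the example.

package cacher

import "k8s.io/utils/clock"

// exampleSetup wires a requester to the storage progress call, the way
// NewCacherFromConfig does with config.Storage.RequestWatchProgress.
func exampleSetup(stopCh <-chan struct{}, requestProgress WatchProgressRequester) *conditionalProgressRequester {
    pr := newConditionalProgressRequester(requestProgress, clock.RealClock{})
    go pr.Run(stopCh) // one background loop, shared by all waiters
    return pr
}

// exampleWait brackets a wait for cache freshness with Add/Remove so that
// progress is only requested while someone is actually blocked.
func exampleWait(pr *conditionalProgressRequester, waitForFreshCache func() error) error {
    pr.Add()
    defer pr.Remove()
    return waitForFreshCache()
}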

View File

@@ -0,0 +1,129 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cacher

import (
    "context"
    "sync/atomic"
    "testing"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/klog/v2"
    "k8s.io/klog/v2/ktesting"
    "k8s.io/utils/clock"
    testingclock "k8s.io/utils/clock/testing"
)

var (
    pollPeriod = time.Millisecond
    minimalNoChange = 20 * time.Millisecond
    pollTimeout = 5 * time.Second
)

func TestConditionalProgressRequester(t *testing.T) {
    _, ctx := ktesting.NewTestContext(t)
    logger := klog.FromContext(ctx)
    clock := testingclock.NewFakeClock(time.Now())
    pr := newTestConditionalProgressRequester(clock)
    stopCh := make(chan struct{})
    go pr.Run(stopCh)
    var wantRequestsSent int32
    var requestsSent int32

    logger.Info("No progress requests if no-one is waiting")
    clock.Step(progressRequestPeriod * 2)
    if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
        requestsSent = pr.progressRequestsSentCount.Load()
        return requestsSent == wantRequestsSent
    }); err != nil {
        t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
    }

    logger.Info("Adding allows progress request to be sent every period")
    pr.Add()
    for wantRequestsSent < 10 {
        clock.Step(progressRequestPeriod)
        wantRequestsSent++
        if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
            requestsSent = pr.progressRequestsSentCount.Load()
            return requestsSent == wantRequestsSent
        }); err != nil {
            t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
        }
    }
    pr.Remove()

    logger.Info("No progress requests if no-one is waiting")
    clock.Step(progressRequestPeriod * 2)
    if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
        requestsSent = pr.progressRequestsSentCount.Load()
        return requestsSent == wantRequestsSent
    }); err != nil {
        t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
    }

    logger.Info("No progress after stopping")
    close(stopCh)
    if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
        requestsSent = pr.progressRequestsSentCount.Load()
        return requestsSent == wantRequestsSent
    }); err != nil {
        t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
    }
    pr.Add()
    clock.Step(progressRequestPeriod * 2)
    if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
        requestsSent = pr.progressRequestsSentCount.Load()
        return requestsSent == wantRequestsSent
    }); err != nil {
        t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
    }
}

func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester {
    pr := &testConditionalProgressRequester{}
    pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock)
    return pr
}

type testConditionalProgressRequester struct {
    *conditionalProgressRequester
    progressRequestsSentCount atomic.Int32
}

func (pr *testConditionalProgressRequester) RequestWatchProgress(ctx context.Context) error {
    pr.progressRequestsSentCount.Add(1)
    return nil
}

func pollConditionNoChange(interval, stable, timeout time.Duration, condition func() bool) error {
    passCounter := 0
    requiredNumberOfPasses := int(stable/interval) + 1
    return wait.Poll(interval, timeout, func() (done bool, err error) {
        if condition() {
            passCounter++
        } else {
            passCounter = 0
        }
        return passCounter >= requiredNumberOfPasses, nil
    })
}

View File

@@ -85,6 +85,12 @@ type store struct {
    leaseManager *leaseManager
}

+func (s *store) RequestWatchProgress(ctx context.Context) error {
+    // Use watchContext to match the ctx metadata provided when creating the watch.
+    // Ideally we would use the same context the watch was created with, but there is no way to access it from watchCache.
+    return s.client.RequestProgress(s.watchContext(ctx))
+}
+
type objState struct {
    obj runtime.Object
    meta *storage.ResponseMeta

View File

@@ -215,6 +215,10 @@ func (wc *watchChan) ResultChan() <-chan watch.Event {
    return wc.resultChan
}

+func (wc *watchChan) RequestWatchProgress() error {
+    return wc.watcher.client.RequestProgress(wc.ctx)
+}
+
// sync tries to retrieve existing data and send them to process.
// The revision to watch will be set to the revision in response.
// All events sent will have isCreated=true

View File

@@ -236,6 +236,21 @@ type Interface interface {
    // Count returns number of different entries under the key (generally being path prefix).
    Count(key string) (int64, error)

+    // RequestWatchProgress requests that a watch stream progress status be sent in the
+    // watch response stream as soon as possible.
+    // Used to monitor watch progress even when watching resources with no changes.
+    //
+    // If the watch is lagging, the progress status might:
+    // * point to a stale resource version. Use an etcd KV request to get a linearizable resource version.
+    // * not be delivered at all. It's recommended to poll request progress periodically.
+    //
+    // Note: Only watches with matching context grpc metadata will be notified.
+    // https://github.com/kubernetes/kubernetes/blob/9325a57125e8502941d1b0c7379c4bb80a678d5c/vendor/go.etcd.io/etcd/client/v3/watch.go#L1037-L1042
+    //
+    // TODO: Remove when storage.Interface is separated from etcd3.store.
+    // Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
+    RequestWatchProgress(ctx context.Context) error
}

// GetOptions provides the options that may be provided for storage get operations.
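
Because a progress notification from a lagging watch can point at a stale revision or never arrive, the interface comment above recommends polling. Below is a hedged sketch of such a polling loop; the progressStorage interface, observedRV callback, and pollProgressUntil helper are illustrative names, not part of storage.Interface.

package example

import (
    "context"
    "fmt"
    "time"
)

// progressStorage is the small slice of the storage interface used here.
type progressStorage interface {
    RequestWatchProgress(ctx context.Context) error
}

// pollProgressUntil re-requests watch progress on every tick until the
// watcher has observed the target resource version or ctx is cancelled.
func pollProgressUntil(ctx context.Context, s progressStorage, interval time.Duration, target uint64, observedRV func() uint64) error {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        if observedRV() >= target {
            return nil
        }
        // Best effort: a single request may be dropped or report a stale
        // revision, so keep asking until the observed revision catches up.
        if err := s.RequestWatchProgress(ctx); err != nil {
            fmt.Println("progress request failed:", err)
        }
        select {
        case <-ticker.C:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}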