send bookmark right now after sending all items in watchCache store
Kubernetes-commit: 723920253349ee3c272c5b5a77e9d19548c1533c
This commit is contained in:
parent
18cfc4e4d4
commit
f7eddd4bda
|
|
@ -510,6 +510,10 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
|
||||||
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
|
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// send bookmark after sending all events in cacheInterval for watchlist request
|
||||||
|
if cacheInterval.initialEventsEndBookmark != nil {
|
||||||
|
c.sendWatchCacheEvent(cacheInterval.initialEventsEndBookmark)
|
||||||
|
}
|
||||||
c.process(ctx, resourceVersion)
|
c.process(ctx, resourceVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -653,6 +653,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
||||||
return newErrWatcher(err), nil
|
return newErrWatcher(err), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.setInitialEventsEndBookmarkIfRequested(cacheInterval, opts, c.watchCache.resourceVersion)
|
||||||
|
|
||||||
addedWatcher := false
|
addedWatcher := false
|
||||||
func() {
|
func() {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
|
|
@ -1448,6 +1450,26 @@ func (c *Cacher) Wait(ctx context.Context) error {
|
||||||
return c.ready.wait(ctx)
|
return c.ready.wait(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setInitialEventsEndBookmarkIfRequested sets initialEventsEndBookmark field in watchCacheInterval for watchlist request
|
||||||
|
func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCacheInterval, opts storage.ListOptions, currentResourceVersion uint64) {
|
||||||
|
if opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks {
|
||||||
|
// We don't need to set the InitialEventsAnnotation for this bookmark event,
|
||||||
|
// because this will be automatically set during event conversion in cacheWatcher.convertToWatchEvent method
|
||||||
|
initialEventsEndBookmark := &watchCacheEvent{
|
||||||
|
Type: watch.Bookmark,
|
||||||
|
Object: c.newFunc(),
|
||||||
|
ResourceVersion: currentResourceVersion,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.versioner.UpdateObject(initialEventsEndBookmark.Object, initialEventsEndBookmark.ResourceVersion); err != nil {
|
||||||
|
klog.Errorf("failure to set resourceVersion to %d on initialEventsEndBookmark event %+v for watchlist request and wait for bookmark trigger to send", initialEventsEndBookmark.ResourceVersion, initialEventsEndBookmark.Object)
|
||||||
|
initialEventsEndBookmark = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheInterval.initialEventsEndBookmark = initialEventsEndBookmark
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// errWatcher implements watch.Interface to return a single error
|
// errWatcher implements watch.Interface to return a single error
|
||||||
type errWatcher struct {
|
type errWatcher struct {
|
||||||
result chan watch.Event
|
result chan watch.Event
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,7 @@ import (
|
||||||
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
||||||
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
||||||
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
|
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
|
||||||
|
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||||
k8smetrics "k8s.io/component-base/metrics"
|
k8smetrics "k8s.io/component-base/metrics"
|
||||||
|
|
@ -1171,6 +1172,106 @@ func TestCacherSendBookmarkEvents(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInitialEventsEndBookmark(t *testing.T) {
|
||||||
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)
|
||||||
|
forceRequestWatchProgressSupport(t)
|
||||||
|
|
||||||
|
backingStorage := &dummyStorage{}
|
||||||
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
|
defer cacher.Stop()
|
||||||
|
|
||||||
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
makePod := func(index uint64) *example.Pod {
|
||||||
|
return &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: fmt.Sprintf("pod-%d", index),
|
||||||
|
Namespace: "ns",
|
||||||
|
ResourceVersion: fmt.Sprintf("%v", 100+index),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
numberOfPods := 3
|
||||||
|
var expectedPodEvents []watch.Event
|
||||||
|
for i := 1; i <= numberOfPods; i++ {
|
||||||
|
pod := makePod(uint64(i))
|
||||||
|
if err := cacher.watchCache.Add(pod); err != nil {
|
||||||
|
t.Fatalf("failed to add a pod: %v", err)
|
||||||
|
}
|
||||||
|
expectedPodEvents = append(expectedPodEvents, watch.Event{Type: watch.Added, Object: pod})
|
||||||
|
}
|
||||||
|
var currentResourceVersion uint64 = 100 + 3
|
||||||
|
|
||||||
|
trueVal, falseVal := true, false
|
||||||
|
|
||||||
|
scenarios := []struct {
|
||||||
|
name string
|
||||||
|
allowWatchBookmarks bool
|
||||||
|
sendInitialEvents *bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "allowWatchBookmarks=false, sendInitialEvents=false",
|
||||||
|
allowWatchBookmarks: false,
|
||||||
|
sendInitialEvents: &falseVal,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "allowWatchBookmarks=false, sendInitialEvents=true",
|
||||||
|
allowWatchBookmarks: false,
|
||||||
|
sendInitialEvents: &trueVal,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "allowWatchBookmarks=true, sendInitialEvents=true",
|
||||||
|
allowWatchBookmarks: true,
|
||||||
|
sendInitialEvents: &trueVal,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "allowWatchBookmarks=true, sendInitialEvents=false",
|
||||||
|
allowWatchBookmarks: true,
|
||||||
|
sendInitialEvents: &falseVal,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "allowWatchBookmarks=false, sendInitialEvents=nil",
|
||||||
|
allowWatchBookmarks: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, scenario := range scenarios {
|
||||||
|
t.Run(scenario.name, func(t *testing.T) {
|
||||||
|
expectedWatchEvents := expectedPodEvents
|
||||||
|
if scenario.allowWatchBookmarks && scenario.sendInitialEvents != nil && *scenario.sendInitialEvents {
|
||||||
|
expectedWatchEvents = append(expectedWatchEvents, watch.Event{
|
||||||
|
Type: watch.Bookmark,
|
||||||
|
Object: &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
ResourceVersion: strconv.FormatUint(currentResourceVersion, 10),
|
||||||
|
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pred := storage.Everything
|
||||||
|
pred.AllowWatchBookmarks = scenario.allowWatchBookmarks
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "100", SendInitialEvents: scenario.sendInitialEvents, Predicate: pred})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create watch: %v", err)
|
||||||
|
}
|
||||||
|
storagetesting.TestCheckResultsInStrictOrder(t, w, expectedWatchEvents)
|
||||||
|
storagetesting.TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
|
func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _, err := newTestCacher(backingStorage)
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
|
|
|
||||||
|
|
@ -101,6 +101,9 @@ type watchCacheInterval struct {
|
||||||
// Given that indexer and indexValidator only read state, if
|
// Given that indexer and indexValidator only read state, if
|
||||||
// possible, Locker obtained through RLocker() is provided.
|
// possible, Locker obtained through RLocker() is provided.
|
||||||
lock sync.Locker
|
lock sync.Locker
|
||||||
|
|
||||||
|
// initialEventsEndBookmark will be sent after sending all events in cacheInterval
|
||||||
|
initialEventsEndBookmark *watchCacheEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
|
type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
|
||||||
|
|
|
||||||
|
|
@ -169,6 +169,36 @@ func testCheckResultFunc(t *testing.T, w watch.Interface, check func(actualEvent
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testCheckResultWithIgnoreFunc(t *testing.T, w watch.Interface, expectedEvents []watch.Event, ignore func(watch.Event) bool) {
|
||||||
|
checkIndex := 0
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event := <-w.ResultChan():
|
||||||
|
obj := event.Object
|
||||||
|
if co, ok := obj.(runtime.CacheableObject); ok {
|
||||||
|
event.Object = co.GetObject()
|
||||||
|
}
|
||||||
|
if ignore != nil && ignore(event) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if checkIndex < len(expectedEvents) {
|
||||||
|
expectNoDiff(t, "incorrect event", expectedEvents[checkIndex], event)
|
||||||
|
checkIndex++
|
||||||
|
} else {
|
||||||
|
t.Fatalf("cannot receive correct event, expect no event, but get a event: %+v", event)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
// wait 100ms forcibly in order to receive watchEvents including bookmark event.
|
||||||
|
// we cannot guarantee that we will receive all bookmark events within 100ms,
|
||||||
|
// but too large timeout value will lead to exceed the timeout of package test.
|
||||||
|
if checkIndex < len(expectedEvents) {
|
||||||
|
t.Fatalf("cannot receive enough events within specific time, rest expected events: %+v", expectedEvents[checkIndex:])
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func testCheckStop(t *testing.T, w watch.Interface) {
|
func testCheckStop(t *testing.T, w watch.Interface) {
|
||||||
select {
|
select {
|
||||||
case e, ok := <-w.ResultChan():
|
case e, ok := <-w.ResultChan():
|
||||||
|
|
@ -187,16 +217,18 @@ func testCheckStop(t *testing.T, w watch.Interface) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
|
func TestCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
|
||||||
for _, expectedEvent := range expectedEvents {
|
for _, expectedEvent := range expectedEvents {
|
||||||
testCheckResult(t, w, expectedEvent)
|
testCheckResult(t, w, expectedEvent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCheckNoMoreResults(t *testing.T, w watch.Interface) {
|
func TestCheckNoMoreResultsWithIgnoreFunc(t *testing.T, w watch.Interface, ignore func(watch.Event) bool) {
|
||||||
select {
|
select {
|
||||||
case e := <-w.ResultChan():
|
case e := <-w.ResultChan():
|
||||||
t.Errorf("Unexpected: %#v event received, expected no events", e)
|
if ignore == nil || !ignore(e) {
|
||||||
|
t.Errorf("Unexpected: %#v event received, expected no events", e)
|
||||||
|
}
|
||||||
// We consciously make the timeout short here to speed up tests.
|
// We consciously make the timeout short here to speed up tests.
|
||||||
case <-time.After(100 * time.Millisecond):
|
case <-time.After(100 * time.Millisecond):
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -1493,7 +1493,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
|
||||||
defer w.Stop()
|
defer w.Stop()
|
||||||
|
|
||||||
// make sure we only get initial events
|
// make sure we only get initial events
|
||||||
testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods))
|
TestCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods))
|
||||||
|
|
||||||
// make sure that the actual bookmark has at least RV >= to the expected one
|
// make sure that the actual bookmark has at least RV >= to the expected one
|
||||||
if scenario.expectedInitialEventsBookmarkWithMinimalRV != nil {
|
if scenario.expectedInitialEventsBookmarkWithMinimalRV != nil {
|
||||||
|
|
@ -1527,8 +1527,9 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
|
||||||
require.NoError(t, err, "failed to add a pod: %v")
|
require.NoError(t, err, "failed to add a pod: %v")
|
||||||
createdPods = append(createdPods, out)
|
createdPods = append(createdPods, out)
|
||||||
}
|
}
|
||||||
testCheckResultsInStrictOrder(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods))
|
ignoreEventsFn := func(event watch.Event) bool { return event.Type == watch.Bookmark }
|
||||||
testCheckNoMoreResults(t, w)
|
testCheckResultWithIgnoreFunc(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods), ignoreEventsFn)
|
||||||
|
TestCheckNoMoreResultsWithIgnoreFunc(t, w, ignoreEventsFn)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1582,8 +1583,8 @@ func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, st
|
||||||
|
|
||||||
// make sure we only get initial events from the first ns
|
// make sure we only get initial events from the first ns
|
||||||
// followed by the bookmark with the global RV
|
// followed by the bookmark with the global RV
|
||||||
testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(initialPods, otherNsPod.ResourceVersion))
|
TestCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(initialPods, otherNsPod.ResourceVersion))
|
||||||
testCheckNoMoreResults(t, w)
|
TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
|
@ -1637,8 +1638,8 @@ func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.In
|
||||||
|
|
||||||
// make sure we only get a single pod matching the field selector
|
// make sure we only get a single pod matching the field selector
|
||||||
// followed by the bookmark with the global RV
|
// followed by the bookmark with the global RV
|
||||||
testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(expectedPod, lastAddedPod.ResourceVersion))
|
TestCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(expectedPod, lastAddedPod.ResourceVersion))
|
||||||
testCheckNoMoreResults(t, w)
|
TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func makePod(namePrefix string) *example.Pod {
|
func makePod(namePrefix string) *example.Pod {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue