Merge pull request #115918 from yt2985/genericWatch
Partition watchers by namespace/name scope Kubernetes-commit: f2fdda8667228b357880ed353e494baabc902681
This commit is contained in:
commit
a5d1ee28dd
4
go.mod
4
go.mod
|
@ -42,7 +42,7 @@ require (
|
||||||
google.golang.org/protobuf v1.28.1
|
google.golang.org/protobuf v1.28.1
|
||||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||||
gopkg.in/square/go-jose.v2 v2.2.2
|
gopkg.in/square/go-jose.v2 v2.2.2
|
||||||
k8s.io/api v0.0.0-20230228090259-b5b22ca1babf
|
k8s.io/api v0.0.0-20230228090300-2ed8d24822d3
|
||||||
k8s.io/apimachinery v0.0.0-20230227225516-80f59387d3d1
|
k8s.io/apimachinery v0.0.0-20230227225516-80f59387d3d1
|
||||||
k8s.io/client-go v0.0.0-20230228090623-8f4ee7119f1d
|
k8s.io/client-go v0.0.0-20230228090623-8f4ee7119f1d
|
||||||
k8s.io/component-base v0.0.0-20230215215219-ae9be4dda9da
|
k8s.io/component-base v0.0.0-20230215215219-ae9be4dda9da
|
||||||
|
@ -124,7 +124,7 @@ require (
|
||||||
)
|
)
|
||||||
|
|
||||||
replace (
|
replace (
|
||||||
k8s.io/api => k8s.io/api v0.0.0-20230228090259-b5b22ca1babf
|
k8s.io/api => k8s.io/api v0.0.0-20230228090300-2ed8d24822d3
|
||||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230227225516-80f59387d3d1
|
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230227225516-80f59387d3d1
|
||||||
k8s.io/client-go => k8s.io/client-go v0.0.0-20230228090623-8f4ee7119f1d
|
k8s.io/client-go => k8s.io/client-go v0.0.0-20230228090623-8f4ee7119f1d
|
||||||
k8s.io/component-base => k8s.io/component-base v0.0.0-20230215215219-ae9be4dda9da
|
k8s.io/component-base => k8s.io/component-base v0.0.0-20230215215219-ae9be4dda9da
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -995,8 +995,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
|
||||||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||||
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||||
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||||
k8s.io/api v0.0.0-20230228090259-b5b22ca1babf h1:nOM4wjFnU0nlgQ5xSvS24ayMbBOFw1N/jyEj4ni0ugA=
|
k8s.io/api v0.0.0-20230228090300-2ed8d24822d3 h1:WGEoVJWNgow/pVi9lNmsWAuZC7Wr5gOJvE4niE+/WIc=
|
||||||
k8s.io/api v0.0.0-20230228090259-b5b22ca1babf/go.mod h1:XAyCRdxxjlGJLzvSsIhA8Ccnsryd6Xh0CgmB3ah3AmQ=
|
k8s.io/api v0.0.0-20230228090300-2ed8d24822d3/go.mod h1:XAyCRdxxjlGJLzvSsIhA8Ccnsryd6Xh0CgmB3ah3AmQ=
|
||||||
k8s.io/apimachinery v0.0.0-20230227225516-80f59387d3d1 h1:Any9/HFr7VOfgDnp+b98WhCql/Tup2fuQD4QYMMSt60=
|
k8s.io/apimachinery v0.0.0-20230227225516-80f59387d3d1 h1:Any9/HFr7VOfgDnp+b98WhCql/Tup2fuQD4QYMMSt60=
|
||||||
k8s.io/apimachinery v0.0.0-20230227225516-80f59387d3d1/go.mod h1:8B/+OdWlScxVvirboh1J5IZSHQrCreQ7fi/5UQntvX0=
|
k8s.io/apimachinery v0.0.0-20230227225516-80f59387d3d1/go.mod h1:8B/+OdWlScxVvirboh1J5IZSHQrCreQ7fi/5UQntvX0=
|
||||||
k8s.io/client-go v0.0.0-20230228090623-8f4ee7119f1d h1:AJ80TdJqiuvAWNAL4OnY8tD6Vpx1i4QRGLlLAZWjJ9w=
|
k8s.io/client-go v0.0.0-20230228090623-8f4ee7119f1d h1:AJ80TdJqiuvAWNAL4OnY8tD6Vpx1i4QRGLlLAZWjJ9w=
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
"k8s.io/apimachinery/pkg/api/validation"
|
||||||
"k8s.io/apimachinery/pkg/api/validation/path"
|
"k8s.io/apimachinery/pkg/api/validation/path"
|
||||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
@ -364,6 +365,16 @@ func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate,
|
||||||
Predicate: p,
|
Predicate: p,
|
||||||
Recursive: true,
|
Recursive: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if we're not already namespace-scoped, see if the field selector narrows the scope of the watch
|
||||||
|
if requestNamespace, _ := genericapirequest.NamespaceFrom(ctx); len(requestNamespace) == 0 {
|
||||||
|
if selectorNamespace, ok := p.MatchesSingleNamespace(); ok {
|
||||||
|
if len(validation.ValidateNamespaceName(selectorNamespace, false)) == 0 {
|
||||||
|
ctx = genericapirequest.WithNamespace(ctx, selectorNamespace)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if name, ok := p.MatchesSingle(); ok {
|
if name, ok := p.MatchesSingle(); ok {
|
||||||
if key, err := e.KeyFunc(ctx, name); err == nil {
|
if key, err := e.KeyFunc(ctx, name); err == nil {
|
||||||
storageOpts.Recursive = false
|
storageOpts.Recursive = false
|
||||||
|
@ -1279,6 +1290,15 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti
|
||||||
func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {
|
func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {
|
||||||
storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p, Recursive: true}
|
storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p, Recursive: true}
|
||||||
|
|
||||||
|
// if we're not already namespace-scoped, see if the field selector narrows the scope of the watch
|
||||||
|
if requestNamespace, _ := genericapirequest.NamespaceFrom(ctx); len(requestNamespace) == 0 {
|
||||||
|
if selectorNamespace, ok := p.MatchesSingleNamespace(); ok {
|
||||||
|
if len(validation.ValidateNamespaceName(selectorNamespace, false)) == 0 {
|
||||||
|
ctx = genericapirequest.WithNamespace(ctx, selectorNamespace)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
key := e.KeyRootFunc(ctx)
|
key := e.KeyRootFunc(ctx)
|
||||||
if name, ok := p.MatchesSingle(); ok {
|
if name, ok := p.MatchesSingle(); ok {
|
||||||
if k, err := e.KeyFunc(ctx, name); err == nil {
|
if k, err := e.KeyFunc(ctx, name); err == nil {
|
||||||
|
|
|
@ -37,6 +37,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/audit"
|
"k8s.io/apiserver/pkg/audit"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/apiserver/pkg/features"
|
"k8s.io/apiserver/pkg/features"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
||||||
|
@ -126,29 +127,37 @@ func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type indexedWatchers struct {
|
type indexedWatchers struct {
|
||||||
allWatchers watchersMap
|
allWatchers map[namespacedName]watchersMap
|
||||||
valueWatchers map[string]watchersMap
|
valueWatchers map[string]watchersMap
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
|
func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespacedName, value string, supported bool) {
|
||||||
if supported {
|
if supported {
|
||||||
if _, ok := i.valueWatchers[value]; !ok {
|
if _, ok := i.valueWatchers[value]; !ok {
|
||||||
i.valueWatchers[value] = watchersMap{}
|
i.valueWatchers[value] = watchersMap{}
|
||||||
}
|
}
|
||||||
i.valueWatchers[value].addWatcher(w, number)
|
i.valueWatchers[value].addWatcher(w, number)
|
||||||
} else {
|
} else {
|
||||||
i.allWatchers.addWatcher(w, number)
|
scopedWatchers, ok := i.allWatchers[scope]
|
||||||
|
if !ok {
|
||||||
|
scopedWatchers = watchersMap{}
|
||||||
|
i.allWatchers[scope] = scopedWatchers
|
||||||
|
}
|
||||||
|
scopedWatchers.addWatcher(w, number)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool, done func(*cacheWatcher)) {
|
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) {
|
||||||
if supported {
|
if supported {
|
||||||
i.valueWatchers[value].deleteWatcher(number, done)
|
i.valueWatchers[value].deleteWatcher(number, done)
|
||||||
if len(i.valueWatchers[value]) == 0 {
|
if len(i.valueWatchers[value]) == 0 {
|
||||||
delete(i.valueWatchers, value)
|
delete(i.valueWatchers, value)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
i.allWatchers.deleteWatcher(number, done)
|
i.allWatchers[scope].deleteWatcher(number, done)
|
||||||
|
if len(i.allWatchers[scope]) == 0 {
|
||||||
|
delete(i.allWatchers, scope)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,10 +169,13 @@ func (i *indexedWatchers) terminateAll(groupResource schema.GroupResource, done
|
||||||
if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
|
if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
|
||||||
klog.Warningf("Terminating all watchers from cacher %v", groupResource)
|
klog.Warningf("Terminating all watchers from cacher %v", groupResource)
|
||||||
}
|
}
|
||||||
i.allWatchers.terminateAll(done)
|
for _, watchers := range i.allWatchers {
|
||||||
|
watchers.terminateAll(done)
|
||||||
|
}
|
||||||
for _, watchers := range i.valueWatchers {
|
for _, watchers := range i.valueWatchers {
|
||||||
watchers.terminateAll(done)
|
watchers.terminateAll(done)
|
||||||
}
|
}
|
||||||
|
i.allWatchers = map[namespacedName]watchersMap{}
|
||||||
i.valueWatchers = map[string]watchersMap{}
|
i.valueWatchers = map[string]watchersMap{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,7 +373,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
|
||||||
indexedTrigger: indexedTrigger,
|
indexedTrigger: indexedTrigger,
|
||||||
watcherIdx: 0,
|
watcherIdx: 0,
|
||||||
watchers: indexedWatchers{
|
watchers: indexedWatchers{
|
||||||
allWatchers: make(map[int]*cacheWatcher),
|
allWatchers: make(map[namespacedName]watchersMap),
|
||||||
valueWatchers: make(map[string]watchersMap),
|
valueWatchers: make(map[string]watchersMap),
|
||||||
},
|
},
|
||||||
// TODO: Figure out the correct value for the buffer size.
|
// TODO: Figure out the correct value for the buffer size.
|
||||||
|
@ -478,6 +490,11 @@ func (c *Cacher) Delete(
|
||||||
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil)
|
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type namespacedName struct {
|
||||||
|
namespace string
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
// Watch implements storage.Interface.
|
// Watch implements storage.Interface.
|
||||||
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
||||||
pred := opts.Predicate
|
pred := opts.Predicate
|
||||||
|
@ -497,6 +514,19 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
||||||
return nil, errors.NewServiceUnavailable(err.Error())
|
return nil, errors.NewServiceUnavailable(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
|
||||||
|
scope := namespacedName{}
|
||||||
|
if requestNamespace, ok := request.NamespaceFrom(ctx); ok && len(requestNamespace) > 0 {
|
||||||
|
scope.namespace = requestNamespace
|
||||||
|
} else if selectorNamespace, ok := pred.Field.RequiresExactMatch("metadata.namespace"); ok {
|
||||||
|
scope.namespace = selectorNamespace
|
||||||
|
}
|
||||||
|
if requestInfo, ok := request.RequestInfoFrom(ctx); ok && requestInfo != nil && len(requestInfo.Name) > 0 {
|
||||||
|
scope.name = requestInfo.Name
|
||||||
|
} else if selectorName, ok := pred.Field.RequiresExactMatch("metadata.name"); ok {
|
||||||
|
scope.name = selectorName
|
||||||
|
}
|
||||||
|
|
||||||
triggerValue, triggerSupported := "", false
|
triggerValue, triggerSupported := "", false
|
||||||
if c.indexedTrigger != nil {
|
if c.indexedTrigger != nil {
|
||||||
for _, field := range pred.IndexFields {
|
for _, field := range pred.IndexFields {
|
||||||
|
@ -554,8 +584,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
// Update watcher.forget function once we can compute it.
|
// Update watcher.forget function once we can compute it.
|
||||||
watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, triggerValue, triggerSupported)
|
watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
|
||||||
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
|
c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
|
||||||
|
|
||||||
// Add it to the queue only when the client support watch bookmarks.
|
// Add it to the queue only when the client support watch bookmarks.
|
||||||
if watcher.allowWatchBookmarks {
|
if watcher.allowWatchBookmarks {
|
||||||
|
@ -986,10 +1016,32 @@ func (c *Cacher) startDispatching(event *watchCacheEvent) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate over "allWatchers" no matter what the trigger function is.
|
// iterate over watchers for each applicable namespace/name tuple
|
||||||
for _, watcher := range c.watchers.allWatchers {
|
namespace := event.ObjFields["metadata.namespace"]
|
||||||
|
name := event.ObjFields["metadata.name"]
|
||||||
|
if len(namespace) > 0 {
|
||||||
|
if len(name) > 0 {
|
||||||
|
// namespaced watchers scoped by name
|
||||||
|
for _, watcher := range c.watchers.allWatchers[namespacedName{namespace: namespace, name: name}] {
|
||||||
|
c.watchersBuffer = append(c.watchersBuffer, watcher)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// namespaced watchers not scoped by name
|
||||||
|
for _, watcher := range c.watchers.allWatchers[namespacedName{namespace: namespace}] {
|
||||||
|
c.watchersBuffer = append(c.watchersBuffer, watcher)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(name) > 0 {
|
||||||
|
// cluster-wide watchers scoped by name
|
||||||
|
for _, watcher := range c.watchers.allWatchers[namespacedName{name: name}] {
|
||||||
|
c.watchersBuffer = append(c.watchersBuffer, watcher)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// cluster-wide watchers unscoped by name
|
||||||
|
for _, watcher := range c.watchers.allWatchers[namespacedName{}] {
|
||||||
c.watchersBuffer = append(c.watchersBuffer, watcher)
|
c.watchersBuffer = append(c.watchersBuffer, watcher)
|
||||||
}
|
}
|
||||||
|
|
||||||
if supported {
|
if supported {
|
||||||
// Iterate over watchers interested in the given values of the trigger.
|
// Iterate over watchers interested in the given values of the trigger.
|
||||||
for _, triggerValue := range triggerValues {
|
for _, triggerValue := range triggerValues {
|
||||||
|
@ -1071,7 +1123,7 @@ func (c *Cacher) Stop() {
|
||||||
c.stopWg.Wait()
|
c.stopWg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func forgetWatcher(c *Cacher, w *cacheWatcher, index int, triggerValue string, triggerSupported bool) func(bool) {
|
func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName, triggerValue string, triggerSupported bool) func(bool) {
|
||||||
return func(drainWatcher bool) {
|
return func(drainWatcher bool) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
@ -1081,7 +1133,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, triggerValue string, t
|
||||||
// It's possible that the watcher is already not in the structure (e.g. in case of
|
// It's possible that the watcher is already not in the structure (e.g. in case of
|
||||||
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
|
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
|
||||||
// on a watcher multiple times.
|
// on a watcher multiple times.
|
||||||
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherLocked)
|
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -298,6 +298,7 @@ func TestWatchCacheBypass(t *testing.T) {
|
||||||
backingStorage.injectError(errDummy)
|
backingStorage.injectError(errDummy)
|
||||||
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
||||||
ResourceVersion: "0",
|
ResourceVersion: "0",
|
||||||
|
Predicate: storage.Everything,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
|
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
|
||||||
|
|
|
@ -34,6 +34,16 @@ func TestWatch(t *testing.T) {
|
||||||
storagetesting.RunTestWatch(ctx, t, store)
|
storagetesting.RunTestWatch(ctx, t, store)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClusterScopedWatch(t *testing.T) {
|
||||||
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.TestClusterScopedWatch(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNamespaceScopedWatch(t *testing.T) {
|
||||||
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.TestNamespaceScopedWatch(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
func TestDeleteTriggerWatch(t *testing.T) {
|
func TestDeleteTriggerWatch(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
|
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
|
||||||
|
|
|
@ -112,6 +112,18 @@ func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set)
|
||||||
return matched
|
return matched
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MatchesSingleNamespace will return (namespace, true) if and only if s.Field matches on the object's
|
||||||
|
// namespace.
|
||||||
|
func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) {
|
||||||
|
if len(s.Continue) > 0 {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
if namespace, ok := s.Field.RequiresExactMatch("metadata.namespace"); ok {
|
||||||
|
return namespace, true
|
||||||
|
}
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
||||||
// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
|
// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
|
||||||
// name.
|
// name.
|
||||||
func (s *SelectionPredicate) MatchesSingle() (string, bool) {
|
func (s *SelectionPredicate) MatchesSingle() (string, bool) {
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
|
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/value"
|
"k8s.io/apiserver/pkg/storage/value"
|
||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
|
@ -372,8 +373,548 @@ func RunOptionalTestProgressNotify(ctx context.Context, t *testing.T, store stor
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// It tests watches of cluster-scoped resources.
|
||||||
|
func TestClusterScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
// For watch request, the name of object is specified with field selector
|
||||||
|
// "metadata.name=objectName". So in this watch tests, we should set the
|
||||||
|
// requestedName and field selector "metadata.name=requestedName" at the
|
||||||
|
// same time or set neighter of them.
|
||||||
|
requestedName string
|
||||||
|
recursive bool
|
||||||
|
fieldSelector fields.Selector
|
||||||
|
indexFields []string
|
||||||
|
watchTests []*testWatchStruct
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "cluster-wide watch, request without name, without field selector",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.Everything(),
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{basePod("t1-foo1"), true, watch.Added},
|
||||||
|
{basePodUpdated("t1-foo1"), true, watch.Modified},
|
||||||
|
{basePodAssigned("t1-foo2", "t1-bar1"), true, watch.Added},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cluster-wide watch, request without name, field selector with spec.nodeName",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("spec.nodeName=t2-bar1"),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{basePod("t2-foo1"), false, ""},
|
||||||
|
{basePodAssigned("t2-foo1", "t2-bar1"), true, watch.Added},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cluster-wide watch, request without name, field selector with spec.nodeName to filter out watch",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("spec.nodeName!=t3-bar1"),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{basePod("t3-foo1"), true, watch.Added},
|
||||||
|
{basePod("t3-foo2"), true, watch.Added},
|
||||||
|
{basePodUpdated("t3-foo1"), true, watch.Modified},
|
||||||
|
{basePodAssigned("t3-foo1", "t3-bar1"), true, watch.Deleted},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cluster-wide watch, request with name, field selector with metadata.name",
|
||||||
|
requestedName: "t4-foo1",
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("metadata.name=t4-foo1"),
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{basePod("t4-foo1"), true, watch.Added},
|
||||||
|
{basePod("t4-foo2"), false, ""},
|
||||||
|
{basePodUpdated("t4-foo1"), true, watch.Modified},
|
||||||
|
{basePodUpdated("t4-foo2"), false, ""},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cluster-wide watch, request with name, field selector with metadata.name and spec.nodeName",
|
||||||
|
requestedName: "t5-foo1",
|
||||||
|
fieldSelector: fields.SelectorFromSet(fields.Set{
|
||||||
|
"metadata.name": "t5-foo1",
|
||||||
|
"spec.nodeName": "t5-bar1",
|
||||||
|
}),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{basePod("t5-foo1"), false, ""},
|
||||||
|
{basePod("t5-foo2"), false, ""},
|
||||||
|
{basePodUpdated("t5-foo1"), false, ""},
|
||||||
|
{basePodUpdated("t5-foo2"), false, ""},
|
||||||
|
{basePodAssigned("t5-foo1", "t5-bar1"), true, watch.Added},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cluster-wide watch, request with name, field selector with metadata.name, and with spec.nodeName to filter out watch",
|
||||||
|
requestedName: "t6-foo1",
|
||||||
|
fieldSelector: fields.AndSelectors(
|
||||||
|
fields.ParseSelectorOrDie("spec.nodeName!=t6-bar1"),
|
||||||
|
fields.SelectorFromSet(fields.Set{"metadata.name": "t6-foo1"}),
|
||||||
|
),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{basePod("t6-foo1"), true, watch.Added},
|
||||||
|
{basePod("t6-foo2"), false, ""},
|
||||||
|
{basePodUpdated("t6-foo1"), true, watch.Modified},
|
||||||
|
{basePodAssigned("t6-foo1", "t6-bar1"), true, watch.Deleted},
|
||||||
|
{basePodAssigned("t6-foo2", "t6-bar1"), false, ""},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
requestInfo := &genericapirequest.RequestInfo{}
|
||||||
|
requestInfo.Name = tt.requestedName
|
||||||
|
requestInfo.Namespace = ""
|
||||||
|
ctx = genericapirequest.WithRequestInfo(ctx, requestInfo)
|
||||||
|
ctx = genericapirequest.WithNamespace(ctx, "")
|
||||||
|
|
||||||
|
watchKey := "/pods"
|
||||||
|
if tt.requestedName != "" {
|
||||||
|
watchKey += "/" + tt.requestedName
|
||||||
|
}
|
||||||
|
|
||||||
|
predicate := createPodPredicate(tt.fieldSelector, false, tt.indexFields)
|
||||||
|
|
||||||
|
list := &example.PodList{}
|
||||||
|
opts := storage.ListOptions{
|
||||||
|
ResourceVersion: "",
|
||||||
|
Predicate: predicate,
|
||||||
|
Recursive: true,
|
||||||
|
}
|
||||||
|
if err := store.GetList(ctx, "/pods", opts, list); err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
opts.ResourceVersion = list.ResourceVersion
|
||||||
|
opts.Recursive = tt.recursive
|
||||||
|
|
||||||
|
w, err := store.Watch(ctx, watchKey, opts)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
currentObjs := map[string]*example.Pod{}
|
||||||
|
for _, watchTest := range tt.watchTests {
|
||||||
|
out := &example.Pod{}
|
||||||
|
key := "pods/" + watchTest.obj.Name
|
||||||
|
err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||||
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
|
obj := watchTest.obj.DeepCopy()
|
||||||
|
return obj, nil
|
||||||
|
}), nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectObj := out
|
||||||
|
if watchTest.watchType == watch.Deleted {
|
||||||
|
expectObj = currentObjs[watchTest.obj.Name]
|
||||||
|
expectObj.ResourceVersion = out.ResourceVersion
|
||||||
|
delete(currentObjs, watchTest.obj.Name)
|
||||||
|
} else {
|
||||||
|
currentObjs[watchTest.obj.Name] = out
|
||||||
|
}
|
||||||
|
if watchTest.expectEvent {
|
||||||
|
testCheckResult(t, watchTest.watchType, w, expectObj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.Stop()
|
||||||
|
testCheckStop(t, w)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// It tests watch of namespace-scoped resources.
|
||||||
|
func TestNamespaceScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
// For watch request, the name of object is specified with field selector
|
||||||
|
// "metadata.name=objectName". So in this watch tests, we should set the
|
||||||
|
// requestedName and field selector "metadata.name=requestedName" at the
|
||||||
|
// same time or set neighter of them.
|
||||||
|
requestedName string
|
||||||
|
requestedNamespace string
|
||||||
|
recursive bool
|
||||||
|
fieldSelector fields.Selector
|
||||||
|
indexFields []string
|
||||||
|
watchTests []*testWatchStruct
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request without name, request without namespace, without field selector",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.Everything(),
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t1-foo1", "t1-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPod("t1-foo2", "t1-ns2"), true, watch.Added},
|
||||||
|
{baseNamespacedPodUpdated("t1-foo1", "t1-ns1"), true, watch.Modified},
|
||||||
|
{baseNamespacedPodUpdated("t1-foo2", "t1-ns2"), true, watch.Modified},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request without name, request without namespace, field selector with metadata.namespace",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=t2-ns1"),
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t2-foo1", "t2-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPod("t2-foo1", "t2-ns2"), false, ""},
|
||||||
|
{baseNamespacedPodUpdated("t2-foo1", "t2-ns1"), true, watch.Modified},
|
||||||
|
{baseNamespacedPodUpdated("t2-foo1", "t2-ns2"), false, ""},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request without name, request without namespace, field selector with spec.nodename",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("spec.nodeName=t3-bar1"),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t3-foo1", "t3-ns1"), false, ""},
|
||||||
|
{baseNamespacedPod("t3-foo2", "t3-ns2"), false, ""},
|
||||||
|
{baseNamespacedPodAssigned("t3-foo1", "t3-ns1", "t3-bar1"), true, watch.Added},
|
||||||
|
{baseNamespacedPodAssigned("t3-foo2", "t3-ns2", "t3-bar1"), true, watch.Added},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request without name, request without namespace, field selector with spec.nodename to filter out watch",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("spec.nodeName!=t4-bar1"),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t4-foo1", "t4-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPod("t4-foo2", "t4-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPodUpdated("t4-foo1", "t4-ns1"), true, watch.Modified},
|
||||||
|
{baseNamespacedPodAssigned("t4-foo1", "t4-ns1", "t4-bar1"), true, watch.Deleted},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request without name, request with namespace, without field selector",
|
||||||
|
requestedNamespace: "t5-ns1",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.Everything(),
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t5-foo1", "t5-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPod("t5-foo1", "t5-ns2"), false, ""},
|
||||||
|
{baseNamespacedPod("t5-foo2", "t5-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPodUpdated("t5-foo1", "t5-ns1"), true, watch.Modified},
|
||||||
|
{baseNamespacedPodUpdated("t5-foo1", "t5-ns2"), false, ""},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request without name, request with namespace, field selector with matched metadata.namespace",
|
||||||
|
requestedNamespace: "t6-ns1",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=t6-ns1"),
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t6-foo1", "t6-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPod("t6-foo1", "t6-ns2"), false, ""},
|
||||||
|
{baseNamespacedPodUpdated("t6-foo1", "t6-ns1"), true, watch.Modified},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request without name, request with namespace, field selector with non-matched metadata.namespace",
|
||||||
|
requestedNamespace: "t7-ns1",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=t7-ns2"),
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t7-foo1", "t7-ns1"), false, ""},
|
||||||
|
{baseNamespacedPod("t7-foo1", "t7-ns2"), false, ""},
|
||||||
|
{baseNamespacedPodUpdated("t7-foo1", "t7-ns1"), false, ""},
|
||||||
|
{baseNamespacedPodUpdated("t7-foo1", "t7-ns2"), false, ""},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request without name, request with namespace, field selector with spec.nodename",
|
||||||
|
requestedNamespace: "t8-ns1",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("spec.nodeName=t8-bar2"),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t8-foo1", "t8-ns1"), false, ""},
|
||||||
|
{baseNamespacedPodAssigned("t8-foo1", "t8-ns1", "t8-bar1"), false, ""},
|
||||||
|
{baseNamespacedPodAssigned("t8-foo1", "t8-ns2", "t8-bar2"), false, ""},
|
||||||
|
{baseNamespacedPodAssigned("t8-foo1", "t8-ns1", "t8-bar2"), true, watch.Added},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request without name, request with namespace, field selector with spec.nodename to filter out watch",
|
||||||
|
requestedNamespace: "t9-ns2",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("spec.nodeName!=t9-bar1"),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t9-foo1", "t9-ns1"), false, ""},
|
||||||
|
{baseNamespacedPod("t9-foo1", "t9-ns2"), true, watch.Added},
|
||||||
|
{baseNamespacedPodAssigned("t9-foo1", "t9-ns2", "t9-bar1"), true, watch.Deleted},
|
||||||
|
{baseNamespacedPodAssigned("t9-foo1", "t9-ns2", "t9-bar2"), true, watch.Added},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request with name, request without namespace, field selector with metadata.name",
|
||||||
|
requestedName: "t10-foo1",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("metadata.name=t10-foo1"),
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t10-foo1", "t10-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPod("t10-foo1", "t10-ns2"), true, watch.Added},
|
||||||
|
{baseNamespacedPod("t10-foo2", "t10-ns1"), false, ""},
|
||||||
|
{baseNamespacedPodUpdated("t10-foo1", "t10-ns1"), true, watch.Modified},
|
||||||
|
{baseNamespacedPodAssigned("t10-foo1", "t10-ns1", "t10-bar1"), true, watch.Modified},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request with name, request without namespace, field selector with metadata.name and metadata.namespace",
|
||||||
|
requestedName: "t11-foo1",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.SelectorFromSet(fields.Set{
|
||||||
|
"metadata.name": "t11-foo1",
|
||||||
|
"metadata.namespace": "t11-ns1",
|
||||||
|
}),
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t11-foo1", "t11-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPod("t11-foo2", "t11-ns1"), false, ""},
|
||||||
|
{baseNamespacedPod("t11-foo1", "t11-ns2"), false, ""},
|
||||||
|
{baseNamespacedPodUpdated("t11-foo1", "t11-ns1"), true, watch.Modified},
|
||||||
|
{baseNamespacedPodAssigned("t11-foo1", "t11-ns1", "t11-bar1"), true, watch.Modified},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request with name, request without namespace, field selector with metadata.name and spec.nodeName",
|
||||||
|
requestedName: "t12-foo1",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.SelectorFromSet(fields.Set{
|
||||||
|
"metadata.name": "t12-foo1",
|
||||||
|
"spec.nodeName": "t12-bar1",
|
||||||
|
}),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t12-foo1", "t12-ns1"), false, ""},
|
||||||
|
{baseNamespacedPodUpdated("t12-foo1", "t12-ns1"), false, ""},
|
||||||
|
{baseNamespacedPodAssigned("t12-foo1", "t12-ns1", "t12-bar1"), true, watch.Added},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request with name, request without namespace, field selector with metadata.name, and with spec.nodeName to filter out watch",
|
||||||
|
requestedName: "t15-foo1",
|
||||||
|
recursive: true,
|
||||||
|
fieldSelector: fields.AndSelectors(
|
||||||
|
fields.ParseSelectorOrDie("spec.nodeName!=t15-bar1"),
|
||||||
|
fields.SelectorFromSet(fields.Set{"metadata.name": "t15-foo1"}),
|
||||||
|
),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t15-foo1", "t15-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPod("t15-foo2", "t15-ns1"), false, ""},
|
||||||
|
{baseNamespacedPodUpdated("t15-foo1", "t15-ns1"), true, watch.Modified},
|
||||||
|
{baseNamespacedPodAssigned("t15-foo1", "t15-ns1", "t15-bar1"), true, watch.Deleted},
|
||||||
|
{baseNamespacedPodAssigned("t15-foo1", "t15-ns1", "t15-bar2"), true, watch.Added},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request with name, request with namespace, with field selector metadata.name",
|
||||||
|
requestedName: "t16-foo1",
|
||||||
|
requestedNamespace: "t16-ns1",
|
||||||
|
fieldSelector: fields.ParseSelectorOrDie("metadata.name=t16-foo1"),
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t16-foo1", "t16-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPod("t16-foo2", "t16-ns1"), false, ""},
|
||||||
|
{baseNamespacedPodUpdated("t16-foo1", "t16-ns1"), true, watch.Modified},
|
||||||
|
{baseNamespacedPodAssigned("t16-foo1", "t16-ns1", "t16-bar1"), true, watch.Modified},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request with name, request with namespace, with field selector metadata.name and metadata.namespace",
|
||||||
|
requestedName: "t17-foo2",
|
||||||
|
requestedNamespace: "t17-ns1",
|
||||||
|
fieldSelector: fields.SelectorFromSet(fields.Set{
|
||||||
|
"metadata.name": "t17-foo2",
|
||||||
|
"metadata.namespace": "t17-ns1",
|
||||||
|
}),
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t17-foo1", "t17-ns1"), false, ""},
|
||||||
|
{baseNamespacedPod("t17-foo2", "t17-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPodUpdated("t17-foo1", "t17-ns1"), false, ""},
|
||||||
|
{baseNamespacedPodAssigned("t17-foo2", "t17-ns1", "t17-bar1"), true, watch.Modified},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request with name, request with namespace, with field selector metadata.name, metadata.namespace and spec.nodename",
|
||||||
|
requestedName: "t18-foo1",
|
||||||
|
requestedNamespace: "t18-ns1",
|
||||||
|
fieldSelector: fields.SelectorFromSet(fields.Set{
|
||||||
|
"metadata.name": "t18-foo1",
|
||||||
|
"metadata.namespace": "t18-ns1",
|
||||||
|
"spec.nodeName": "t18-bar1",
|
||||||
|
}),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t18-foo1", "t18-ns1"), false, ""},
|
||||||
|
{baseNamespacedPod("t18-foo2", "t18-ns1"), false, ""},
|
||||||
|
{baseNamespacedPod("t18-foo1", "t18-ns2"), false, ""},
|
||||||
|
{baseNamespacedPodUpdated("t18-foo1", "t18-ns1"), false, ""},
|
||||||
|
{baseNamespacedPodAssigned("t18-foo1", "t18-ns1", "t18-bar1"), true, watch.Added},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespaced watch, request with name, request with namespace, with field selector metadata.name, metadata.namespace, and with spec.nodename to filter out watch",
|
||||||
|
requestedName: "t19-foo2",
|
||||||
|
requestedNamespace: "t19-ns1",
|
||||||
|
fieldSelector: fields.AndSelectors(
|
||||||
|
fields.ParseSelectorOrDie("spec.nodeName!=t19-bar1"),
|
||||||
|
fields.SelectorFromSet(fields.Set{"metadata.name": "t19-foo2", "metadata.namespace": "t19-ns1"}),
|
||||||
|
),
|
||||||
|
indexFields: []string{"spec.nodeName"},
|
||||||
|
watchTests: []*testWatchStruct{
|
||||||
|
{baseNamespacedPod("t19-foo1", "t19-ns1"), false, ""},
|
||||||
|
{baseNamespacedPod("t19-foo2", "t19-ns2"), false, ""},
|
||||||
|
{baseNamespacedPod("t19-foo2", "t19-ns1"), true, watch.Added},
|
||||||
|
{baseNamespacedPodUpdated("t19-foo2", "t19-ns1"), true, watch.Modified},
|
||||||
|
{baseNamespacedPodAssigned("t19-foo2", "t19-ns1", "t19-bar1"), true, watch.Deleted},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
requestInfo := &genericapirequest.RequestInfo{}
|
||||||
|
requestInfo.Name = tt.requestedName
|
||||||
|
requestInfo.Namespace = tt.requestedNamespace
|
||||||
|
ctx = genericapirequest.WithRequestInfo(ctx, requestInfo)
|
||||||
|
ctx = genericapirequest.WithNamespace(ctx, tt.requestedNamespace)
|
||||||
|
|
||||||
|
watchKey := "/pods"
|
||||||
|
if tt.requestedNamespace != "" {
|
||||||
|
watchKey += "/" + tt.requestedNamespace
|
||||||
|
if tt.requestedName != "" {
|
||||||
|
watchKey += "/" + tt.requestedName
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
predicate := createPodPredicate(tt.fieldSelector, true, tt.indexFields)
|
||||||
|
|
||||||
|
list := &example.PodList{}
|
||||||
|
opts := storage.ListOptions{
|
||||||
|
ResourceVersion: "",
|
||||||
|
Predicate: predicate,
|
||||||
|
Recursive: true,
|
||||||
|
}
|
||||||
|
if err := store.GetList(ctx, "/pods", opts, list); err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
opts.ResourceVersion = list.ResourceVersion
|
||||||
|
opts.Recursive = tt.recursive
|
||||||
|
|
||||||
|
w, err := store.Watch(ctx, watchKey, opts)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
currentObjs := map[string]*example.Pod{}
|
||||||
|
for _, watchTest := range tt.watchTests {
|
||||||
|
out := &example.Pod{}
|
||||||
|
key := "pods/" + watchTest.obj.Namespace + "/" + watchTest.obj.Name
|
||||||
|
err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||||
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
|
obj := watchTest.obj.DeepCopy()
|
||||||
|
return obj, nil
|
||||||
|
}), nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectObj := out
|
||||||
|
podIdentifier := watchTest.obj.Namespace + "/" + watchTest.obj.Name
|
||||||
|
if watchTest.watchType == watch.Deleted {
|
||||||
|
expectObj = currentObjs[podIdentifier]
|
||||||
|
expectObj.ResourceVersion = out.ResourceVersion
|
||||||
|
delete(currentObjs, podIdentifier)
|
||||||
|
} else {
|
||||||
|
currentObjs[podIdentifier] = out
|
||||||
|
}
|
||||||
|
if watchTest.expectEvent {
|
||||||
|
testCheckResult(t, watchTest.watchType, w, expectObj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.Stop()
|
||||||
|
testCheckStop(t, w)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type testWatchStruct struct {
|
type testWatchStruct struct {
|
||||||
obj *example.Pod
|
obj *example.Pod
|
||||||
expectEvent bool
|
expectEvent bool
|
||||||
watchType watch.EventType
|
watchType watch.EventType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createPodPredicate(field fields.Selector, namespaceScoped bool, indexField []string) storage.SelectionPredicate {
|
||||||
|
return storage.SelectionPredicate{
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: field,
|
||||||
|
GetAttrs: determinePodGetAttrFunc(namespaceScoped, indexField),
|
||||||
|
IndexFields: indexField,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func determinePodGetAttrFunc(namespaceScoped bool, indexField []string) storage.AttrFunc {
|
||||||
|
if indexField != nil {
|
||||||
|
if namespaceScoped {
|
||||||
|
return namespacedScopedNodeNameAttrFunc
|
||||||
|
}
|
||||||
|
return clusterScopedNodeNameAttrFunc
|
||||||
|
}
|
||||||
|
if namespaceScoped {
|
||||||
|
return storage.DefaultNamespaceScopedAttr
|
||||||
|
}
|
||||||
|
return storage.DefaultClusterScopedAttr
|
||||||
|
}
|
||||||
|
|
||||||
|
func namespacedScopedNodeNameAttrFunc(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
return nil, fields.Set{
|
||||||
|
"spec.nodeName": pod.Spec.NodeName,
|
||||||
|
"metadata.name": pod.ObjectMeta.Name,
|
||||||
|
"metadata.namespace": pod.ObjectMeta.Namespace,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func clusterScopedNodeNameAttrFunc(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
return nil, fields.Set{
|
||||||
|
"spec.nodeName": pod.Spec.NodeName,
|
||||||
|
"metadata.name": pod.ObjectMeta.Name,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func basePod(podName string) *example.Pod {
|
||||||
|
return baseNamespacedPod(podName, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func basePodUpdated(podName string) *example.Pod {
|
||||||
|
return baseNamespacedPodUpdated(podName, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func basePodAssigned(podName, nodeName string) *example.Pod {
|
||||||
|
return baseNamespacedPodAssigned(podName, "", nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func baseNamespacedPod(podName, namespace string) *example.Pod {
|
||||||
|
return &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: podName, Namespace: namespace},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func baseNamespacedPodUpdated(podName, namespace string) *example.Pod {
|
||||||
|
return &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: podName, Namespace: namespace},
|
||||||
|
Status: example.PodStatus{Phase: "Running"},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func baseNamespacedPodAssigned(podName, namespace, nodeName string) *example.Pod {
|
||||||
|
return &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: podName, Namespace: namespace},
|
||||||
|
Spec: example.PodSpec{NodeName: nodeName},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -179,6 +179,18 @@ func TestList(t *testing.T) {
|
||||||
storagetesting.RunTestList(ctx, t, cacher, true)
|
storagetesting.RunTestList(ctx, t, cacher, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClusterScopedWatch(t *testing.T) {
|
||||||
|
ctx, cacher, terminate := testSetup(t, withClusterScopedKeyFunc, withSpecNodeNameIndexerFuncs)
|
||||||
|
t.Cleanup(terminate)
|
||||||
|
storagetesting.TestClusterScopedWatch(ctx, t, cacher)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNamespaceScopedWatch(t *testing.T) {
|
||||||
|
ctx, cacher, terminate := testSetup(t, withSpecNodeNameIndexerFuncs)
|
||||||
|
t.Cleanup(terminate)
|
||||||
|
storagetesting.TestNamespaceScopedWatch(ctx, t, cacher)
|
||||||
|
}
|
||||||
|
|
||||||
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
|
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
|
||||||
_, _, line, _ := goruntime.Caller(1)
|
_, _, line, _ := goruntime.Caller(1)
|
||||||
select {
|
select {
|
||||||
|
@ -759,6 +771,7 @@ type tearDownFunc func()
|
||||||
type setupOptions struct {
|
type setupOptions struct {
|
||||||
resourcePrefix string
|
resourcePrefix string
|
||||||
keyFunc func(runtime.Object) (string, error)
|
keyFunc func(runtime.Object) (string, error)
|
||||||
|
indexerFuncs map[string]storage.IndexerFunc
|
||||||
clock clock.Clock
|
clock clock.Clock
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -772,6 +785,24 @@ func withDefaults(options *setupOptions) {
|
||||||
options.clock = clock.RealClock{}
|
options.clock = clock.RealClock{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func withClusterScopedKeyFunc(options *setupOptions) {
|
||||||
|
options.keyFunc = func(obj runtime.Object) (string, error) {
|
||||||
|
return storage.NoNamespaceKeyFunc(options.resourcePrefix, obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func withSpecNodeNameIndexerFuncs(options *setupOptions) {
|
||||||
|
options.indexerFuncs = map[string]storage.IndexerFunc{
|
||||||
|
"spec.nodeName": func(obj runtime.Object) string {
|
||||||
|
pod, ok := obj.(*example.Pod)
|
||||||
|
if !ok {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return pod.Spec.NodeName
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstorage.Cacher, tearDownFunc) {
|
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstorage.Cacher, tearDownFunc) {
|
||||||
setupOpts := setupOptions{}
|
setupOpts := setupOptions{}
|
||||||
opts = append([]setupOption{withDefaults}, opts...)
|
opts = append([]setupOption{withDefaults}, opts...)
|
||||||
|
@ -795,6 +826,7 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstora
|
||||||
GetAttrsFunc: GetPodAttrs,
|
GetAttrsFunc: GetPodAttrs,
|
||||||
NewFunc: newPod,
|
NewFunc: newPod,
|
||||||
NewListFunc: newPodList,
|
NewListFunc: newPodList,
|
||||||
|
IndexerFuncs: setupOpts.indexerFuncs,
|
||||||
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
||||||
Clock: setupOpts.clock,
|
Clock: setupOpts.clock,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue