Merge pull request #124692 from linxiulei/watchlist_opt
cacher: apply key for initial events Kubernetes-commit: e6d641651a676c50d861821e18706aa154bceed6
This commit is contained in:
commit
54ba047441
4
go.mod
4
go.mod
|
@ -45,8 +45,8 @@ require (
|
|||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
gopkg.in/square/go-jose.v2 v2.6.0
|
||||
k8s.io/api v0.0.0-20240531003526-c114cd746b5a
|
||||
k8s.io/apimachinery v0.0.0-20240530220031-733a95eb52c3
|
||||
k8s.io/client-go v0.0.0-20240531003927-52e5651101ed
|
||||
k8s.io/apimachinery v0.0.0-20240603234208-703232ea6da4
|
||||
k8s.io/client-go v0.0.0-20240604003933-7c6e307a725f
|
||||
k8s.io/component-base v0.0.0-20240531004836-3486e8d18caf
|
||||
k8s.io/klog/v2 v2.120.1
|
||||
k8s.io/kms v0.0.0-20240528085127-26a6cff8e4bf
|
||||
|
|
8
go.sum
8
go.sum
|
@ -382,10 +382,10 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
|
|||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
k8s.io/api v0.0.0-20240531003526-c114cd746b5a h1:P8nQ3iz4FxeKN26Y8fz9qoEkUZy/DkQPtBmkVyriAS0=
|
||||
k8s.io/api v0.0.0-20240531003526-c114cd746b5a/go.mod h1:2VfykmUr8OqDStfcJWPvSW182MtoxAMeWsJHXwxqzXo=
|
||||
k8s.io/apimachinery v0.0.0-20240530220031-733a95eb52c3 h1:wKdO12WPN63kX3M6aJQqn6IaacuD1sZw1CswMGfjmJk=
|
||||
k8s.io/apimachinery v0.0.0-20240530220031-733a95eb52c3/go.mod h1:ClkKrTMwhmMjgsEHpX2w3F+YKj0ctDOaAxqL7clxG0U=
|
||||
k8s.io/client-go v0.0.0-20240531003927-52e5651101ed h1:0caFt79cyI5V9Kx+Mgx+1Ew7Y7P5Xbs2JTg6Obt+UeQ=
|
||||
k8s.io/client-go v0.0.0-20240531003927-52e5651101ed/go.mod h1:eKPERGqe84mnn/p9CKDZxsnjDJAMSlwr4L/be7kWLh0=
|
||||
k8s.io/apimachinery v0.0.0-20240603234208-703232ea6da4 h1:On2XjWF6loaEJ/GLZH+ZyovYHWfOZUgl9qUdC8Mn05o=
|
||||
k8s.io/apimachinery v0.0.0-20240603234208-703232ea6da4/go.mod h1:ClkKrTMwhmMjgsEHpX2w3F+YKj0ctDOaAxqL7clxG0U=
|
||||
k8s.io/client-go v0.0.0-20240604003933-7c6e307a725f h1:7A1fNtVKUFjDecsZm2r/Ec3DtdVhuylFo6Hst/cD+HI=
|
||||
k8s.io/client-go v0.0.0-20240604003933-7c6e307a725f/go.mod h1:5farkvv+2p9AXn5JduImtwwykMh2JcZQO9mlYXp2qEA=
|
||||
k8s.io/component-base v0.0.0-20240531004836-3486e8d18caf h1:+fSEnCvnEYfGmGtp8/KAmuCG2H/DG91PNcbneTJTRe8=
|
||||
k8s.io/component-base v0.0.0-20240531004836-3486e8d18caf/go.mod h1:+RdT910JB0Ke8zGWTWKLba1mVLmboEEJzaj5m6AMT+o=
|
||||
k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=
|
||||
|
|
|
@ -300,7 +300,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) {
|
|||
store.Add(elem)
|
||||
}
|
||||
|
||||
wci, err := newCacheIntervalFromStore(numObjects, store, getAttrsFunc)
|
||||
wci, err := newCacheIntervalFromStore(numObjects, store, getAttrsFunc, "", false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -622,7 +622,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
|||
defer c.watchCache.RUnlock()
|
||||
|
||||
var cacheInterval *watchCacheInterval
|
||||
cacheInterval, err = c.watchCache.getAllEventsSinceLocked(requiredResourceVersion, opts)
|
||||
cacheInterval, err = c.watchCache.getAllEventsSinceLocked(requiredResourceVersion, key, opts)
|
||||
if err != nil {
|
||||
// To match the uncached watch implementation, once we have passed authn/authz/admission,
|
||||
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
|
||||
|
|
|
@ -712,9 +712,10 @@ func (w *watchCache) isIndexValidLocked(index int) bool {
|
|||
// getAllEventsSinceLocked returns a watchCacheInterval that can be used to
|
||||
// retrieve events since a certain resourceVersion. This function assumes to
|
||||
// be called under the watchCache lock.
|
||||
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) {
|
||||
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string, opts storage.ListOptions) (*watchCacheInterval, error) {
|
||||
_, matchesSingle := opts.Predicate.MatchesSingle()
|
||||
if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
|
||||
return w.getIntervalFromStoreLocked()
|
||||
return w.getIntervalFromStoreLocked(key, matchesSingle)
|
||||
}
|
||||
|
||||
size := w.endIndex - w.startIndex
|
||||
|
@ -743,7 +744,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storag
|
|||
// current state and only then start watching from that point.
|
||||
//
|
||||
// TODO: In v2 api, we should stop returning the current state - #13969.
|
||||
return w.getIntervalFromStoreLocked()
|
||||
return w.getIntervalFromStoreLocked(key, matchesSingle)
|
||||
}
|
||||
// SendInitialEvents = false and resourceVersion = 0
|
||||
// means that the request would like to start watching
|
||||
|
@ -769,8 +770,8 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storag
|
|||
// getIntervalFromStoreLocked returns a watchCacheInterval
|
||||
// that covers the entire storage state.
|
||||
// This function assumes to be called under the watchCache lock.
|
||||
func (w *watchCache) getIntervalFromStoreLocked() (*watchCacheInterval, error) {
|
||||
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
|
||||
func (w *watchCache) getIntervalFromStoreLocked(key string, matchesSingle bool) (*watchCacheInterval, error) {
|
||||
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc, key, matchesSingle)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -133,9 +133,22 @@ func (s sortableWatchCacheEvents) Swap(i, j int) {
|
|||
// returned by Next() need to be events from a List() done on the underlying store of
|
||||
// the watch cache.
|
||||
// The items returned in the interval will be sorted by Key.
|
||||
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc) (*watchCacheInterval, error) {
|
||||
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
|
||||
buffer := &watchCacheIntervalBuffer{}
|
||||
allItems := store.List()
|
||||
var allItems []interface{}
|
||||
|
||||
if matchesSingle {
|
||||
item, exists, err := store.GetByKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if exists {
|
||||
allItems = append(allItems, item)
|
||||
}
|
||||
} else {
|
||||
allItems = store.List()
|
||||
}
|
||||
buffer.buffer = make([]*watchCacheEvent, len(allItems))
|
||||
for i, item := range allItems {
|
||||
elem, ok := item.(*storeElement)
|
||||
|
|
|
@ -391,7 +391,7 @@ func TestCacheIntervalNextFromStore(t *testing.T) {
|
|||
store.Add(elem)
|
||||
}
|
||||
|
||||
wci, err := newCacheIntervalFromStore(rv, store, getAttrsFunc)
|
||||
wci, err := newCacheIntervalFromStore(rv, store, getAttrsFunc, "", false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts
|
|||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
|
||||
return w.getAllEventsSinceLocked(resourceVersion, opts)
|
||||
return w.getAllEventsSinceLocked(resourceVersion, "", opts)
|
||||
}
|
||||
|
||||
// newTestWatchCache just adds a fake clock.
|
||||
|
|
|
@ -118,7 +118,7 @@ func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set)
|
|||
// MatchesSingleNamespace will return (namespace, true) if and only if s.Field matches on the object's
|
||||
// namespace.
|
||||
func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) {
|
||||
if len(s.Continue) > 0 {
|
||||
if len(s.Continue) > 0 || s.Field == nil {
|
||||
return "", false
|
||||
}
|
||||
if namespace, ok := s.Field.RequiresExactMatch("metadata.namespace"); ok {
|
||||
|
@ -130,7 +130,7 @@ func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) {
|
|||
// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
|
||||
// name.
|
||||
func (s *SelectionPredicate) MatchesSingle() (string, bool) {
|
||||
if len(s.Continue) > 0 {
|
||||
if len(s.Continue) > 0 || s.Field == nil {
|
||||
return "", false
|
||||
}
|
||||
// TODO: should be namespace.name
|
||||
|
|
Loading…
Reference in New Issue