diff --git a/pkg/registry/generic/options.go b/pkg/registry/generic/options.go index af651371f..fb9adba1a 100644 --- a/pkg/registry/generic/options.go +++ b/pkg/registry/generic/options.go @@ -47,6 +47,6 @@ type RESTOptionsGetter interface { // StoreOptions is set of configuration options used to complete generic registries. type StoreOptions struct { RESTOptions RESTOptionsGetter - TriggerFunc storage.TriggerPublisherFunc + TriggerFunc storage.TriggerPublisherFuncs AttrFunc storage.AttrFunc } diff --git a/pkg/registry/generic/registry/storage_factory.go b/pkg/registry/generic/registry/storage_factory.go index 6b34fc1e4..acefc2386 100644 --- a/pkg/registry/generic/registry/storage_factory.go +++ b/pkg/registry/generic/registry/storage_factory.go @@ -39,7 +39,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator { newFunc func() runtime.Object, newListFunc func() runtime.Object, getAttrsFunc storage.AttrFunc, - triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) { + triggerFuncs storage.TriggerPublisherFuncs) (storage.Interface, factory.DestroyFunc, error) { s, d, err := generic.NewRawStorage(storageConfig) if err != nil { @@ -56,16 +56,16 @@ func StorageWithCacher(capacity int) generic.StorageDecorator { // TODO: we would change this later to make storage always have cacher and hide low level KV layer inside. // Currently it has two layers of same storage interface -- cacher and low level kv. cacherConfig := cacherstorage.Config{ - CacheCapacity: capacity, - Storage: s, - Versioner: etcd3.APIObjectVersioner{}, - ResourcePrefix: resourcePrefix, - KeyFunc: keyFunc, - NewFunc: newFunc, - NewListFunc: newListFunc, - GetAttrsFunc: getAttrsFunc, - TriggerPublisherFunc: triggerFunc, - Codec: storageConfig.Codec, + CacheCapacity: capacity, + Storage: s, + Versioner: etcd3.APIObjectVersioner{}, + ResourcePrefix: resourcePrefix, + KeyFunc: keyFunc, + NewFunc: newFunc, + NewListFunc: newListFunc, + GetAttrsFunc: getAttrsFunc, + TriggerPublisherFuncs: triggerFuncs, + Codec: storageConfig.Codec, } cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig) if err != nil { diff --git a/pkg/registry/generic/registry/store.go b/pkg/registry/generic/registry/store.go index ee3c4fb8d..be03caa41 100644 --- a/pkg/registry/generic/registry/store.go +++ b/pkg/registry/generic/registry/store.go @@ -1287,11 +1287,6 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { return e.KeyFunc(genericapirequest.NewContext(), accessor.GetName()) } - triggerFunc := options.TriggerFunc - if triggerFunc == nil { - triggerFunc = storage.NoTriggerPublisher - } - if e.DeleteCollectionWorkers == 0 { e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers } @@ -1318,7 +1313,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { e.NewFunc, e.NewListFunc, attrFunc, - triggerFunc, + options.TriggerFunc, ) if err != nil { return err diff --git a/pkg/registry/generic/storage_decorator.go b/pkg/registry/generic/storage_decorator.go index 6509ef831..1c553413e 100644 --- a/pkg/registry/generic/storage_decorator.go +++ b/pkg/registry/generic/storage_decorator.go @@ -32,7 +32,7 @@ type StorageDecorator func( newFunc func() runtime.Object, newListFunc func() runtime.Object, getAttrsFunc storage.AttrFunc, - trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) + trigger storage.TriggerPublisherFuncs) (storage.Interface, factory.DestroyFunc, error) // UndecoratedStorage returns the given a new storage from the given config // without any decoration. @@ -43,7 +43,7 @@ func UndecoratedStorage( newFunc func() runtime.Object, newListFunc func() runtime.Object, getAttrsFunc storage.AttrFunc, - trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) { + trigger storage.TriggerPublisherFuncs) (storage.Interface, factory.DestroyFunc, error) { return NewRawStorage(config) } diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index 2c4c1a354..2b701084d 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -88,9 +88,9 @@ type Config struct { // GetAttrsFunc is used to get object labels, fields GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, err error) - // TriggerPublisherFunc is used for optimizing amount of watchers that + // TriggerPublisherFuncs is used for optimizing amount of watchers that // needs to process an incoming event. - TriggerPublisherFunc storage.TriggerPublisherFunc + TriggerPublisherFuncs storage.TriggerPublisherFuncs // NewFunc is a function that creates new empty object storing a object of type Type. NewFunc func() runtime.Object @@ -209,6 +209,11 @@ func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher { type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool +type indexedTriggerFunc struct { + indexName string + triggerFunc storage.TriggerPublisherFunc +} + // Cacher is responsible for serving WATCH and LIST requests for a given // resource from its internal cache and updating its cache in the background // based on the underlying storage contents. @@ -248,9 +253,9 @@ type Cacher struct { // newFunc is a function that creates new empty object storing a object of type Type. newFunc func() runtime.Object - // triggerFunc is used for optimizing amount of watchers that needs to process + // indexedTrigger is used for optimizing amount of watchers that needs to process // an incoming event. - triggerFunc storage.TriggerPublisherFunc + indexedTrigger *indexedTriggerFunc // watchers is mapping from the value of trigger function that a // watcher is interested into the watchers watcherIdx int @@ -300,15 +305,32 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { return nil, fmt.Errorf("storage codec doesn't seem to match given type: %v", err) } + var indexedTrigger *indexedTriggerFunc + if config.TriggerPublisherFuncs != nil { + // For now, we don't support multiple trigger functions defined + // for a given resource. + if len(config.TriggerPublisherFuncs) > 1 { + return nil, fmt.Errorf("cacher %s doesn't support more than one TriggerPublisherFunc: ", reflect.TypeOf(obj).String()) + } + for key, value := range config.TriggerPublisherFuncs { + if value != nil { + indexedTrigger = &indexedTriggerFunc{ + indexName: key, + triggerFunc: value, + } + } + } + } + clock := clock.RealClock{} cacher := &Cacher{ - ready: newReady(), - storage: config.Storage, - objectType: reflect.TypeOf(obj), - versioner: config.Versioner, - newFunc: config.NewFunc, - triggerFunc: config.TriggerPublisherFunc, - watcherIdx: 0, + ready: newReady(), + storage: config.Storage, + objectType: reflect.TypeOf(obj), + versioner: config.Versioner, + newFunc: config.NewFunc, + indexedTrigger: indexedTrigger, + watcherIdx: 0, watchers: indexedWatchers{ allWatchers: make(map[int]*cacheWatcher), valueWatchers: make(map[string]watchersMap), @@ -419,23 +441,27 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.ready.wait() triggerValue, triggerSupported := "", false - // TODO: Currently we assume that in a given Cacher object, any that is - // passed here is aware of exactly the same trigger (at most one). - // Thus, either 0 or 1 values will be returned. - if matchValues := pred.MatcherIndex(); len(matchValues) > 0 { - triggerValue, triggerSupported = matchValues[0].Value, true + if c.indexedTrigger != nil { + for _, field := range pred.IndexFields { + if field == c.indexedTrigger.indexName { + if value, ok := pred.Field.RequiresExactMatch(field); ok { + triggerValue, triggerSupported = value, true + } + } + } } - // If there is triggerFunc defined, but triggerSupported is false, + // If there is indexedTrigger defined, but triggerSupported is false, // we can't narrow the amount of events significantly at this point. // - // That said, currently triggerFunc is defined only for Pods and Nodes, - // and there is only constant number of watchers for which triggerSupported - // is false (excluding those issues explicitly by users). + // That said, currently indexedTrigger is defined only for couple resources: + // Pods, Nodes, Secrets and ConfigMaps and there is only a constant + // number of watchers for which triggerSupported is false (excluding those + // issued explicitly by users). // Thus, to reduce the risk of those watchers blocking all watchers of a // given resource in the system, we increase the sizes of buffers for them. chanSize := 10 - if c.triggerFunc != nil && !triggerSupported { + if c.indexedTrigger != nil && !triggerSupported { // TODO: We should tune this value and ideally make it dependent on the // number of objects of a given type and/or their churn. chanSize = 1000 @@ -711,29 +737,20 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) { } func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) { - // TODO: Currently we assume that in a given Cacher object, its - // is aware of exactly the same trigger (at most one). Thus calling: - // c.triggerFunc() - // can return only 0 or 1 values. - // That means, that triggerValues itself may return up to 2 different values. - if c.triggerFunc == nil { + if c.indexedTrigger == nil { return nil, false } + result := make([]string, 0, 2) - matchValues := c.triggerFunc(event.Object) - if len(matchValues) > 0 { - result = append(result, matchValues[0].Value) - } + result = append(result, c.indexedTrigger.triggerFunc(event.Object)) if event.PrevObject == nil { - return result, len(result) > 0 + return result, true } - prevMatchValues := c.triggerFunc(event.PrevObject) - if len(prevMatchValues) > 0 { - if len(result) == 0 || result[0] != prevMatchValues[0].Value { - result = append(result, prevMatchValues[0].Value) - } + prevTriggerValue := c.indexedTrigger.triggerFunc(event.PrevObject) + if result[0] != prevTriggerValue { + result = append(result, prevTriggerValue) } - return result, len(result) > 0 + return result, true } func (c *Cacher) processEvent(event *watchCacheEvent) { diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index e649c87f5..c850b2484 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -73,16 +73,15 @@ type ResponseMeta struct { ResourceVersion uint64 } -// MatchValue defines a pair (, ). -type MatchValue struct { - IndexName string - Value string -} +// TriggerPublisherFunc is a function that for a given object computes +// for a particular . +// TODO(wojtek-t): Rename to IndexerFunc? +type TriggerPublisherFunc func(obj runtime.Object) string -// TriggerPublisherFunc is a function that takes an object, and returns a list of pairs -// (, ) for all indexes known -// to that function. -type TriggerPublisherFunc func(obj runtime.Object) []MatchValue +// TriggerPublisherFuncs is a mapping from to function that +// for a given object computes . +// TODO(wojtek-t): Rename to IndexerFuncs? +type TriggerPublisherFuncs map[string]TriggerPublisherFunc // Everything accepts all objects. var Everything = SelectionPredicate{ diff --git a/pkg/storage/selection_predicate.go b/pkg/storage/selection_predicate.go index b2f8c8e88..66d8d1f95 100644 --- a/pkg/storage/selection_predicate.go +++ b/pkg/storage/selection_predicate.go @@ -124,20 +124,6 @@ func (s *SelectionPredicate) MatchesSingle() (string, bool) { return "", false } -// For any index defined by IndexFields, if a matcher can match only (a subset) -// of objects that return for a given index, a pair (, ) -// wil be returned. -// TODO: Consider supporting also labels. -func (s *SelectionPredicate) MatcherIndex() []MatchValue { - var result []MatchValue - for _, field := range s.IndexFields { - if value, ok := s.Field.RequiresExactMatch(field); ok { - result = append(result, MatchValue{IndexName: field, Value: value}) - } - } - return result -} - // Empty returns true if the predicate performs no filtering. func (s *SelectionPredicate) Empty() bool { return s.Label.Empty() && s.Field.Empty() diff --git a/pkg/storage/util.go b/pkg/storage/util.go index 8c571b1c8..9da8d9713 100644 --- a/pkg/storage/util.go +++ b/pkg/storage/util.go @@ -39,14 +39,6 @@ func EverythingFunc(runtime.Object) bool { return true } -func NoTriggerFunc() []MatchValue { - return nil -} - -func NoTriggerPublisher(runtime.Object) []MatchValue { - return nil -} - func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) { meta, err := meta.Accessor(obj) if err != nil {