Merge pull request #554 from sttts/sttts-engine-cached-client

controller/engine: use local cache for client read requests
This commit is contained in:
Nic Cope 2023-10-13 14:17:18 -07:00 committed by GitHub
commit 80a3e75d99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 406 additions and 24 deletions

287
pkg/controller/cache.go Normal file
View File

@ -0,0 +1,287 @@
/*
Copyright 2023 The Crossplane Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"strings"
"sync"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"github.com/crossplane/crossplane-runtime/pkg/errors"
)
// GVKRoutedCache is a cache that routes requests by GVK to other caches.
type GVKRoutedCache struct {
scheme *runtime.Scheme
fallback cache.Cache
lock sync.RWMutex
delegates map[schema.GroupVersionKind]cache.Cache
}
// NewGVKRoutedCache returns a new routed cache.
func NewGVKRoutedCache(scheme *runtime.Scheme, fallback cache.Cache) *GVKRoutedCache {
return &GVKRoutedCache{
scheme: scheme,
fallback: fallback,
delegates: make(map[schema.GroupVersionKind]cache.Cache),
}
}
var _ cache.Cache = &GVKRoutedCache{}
// AddDelegate adds a delegated cache for a given GVK.
func (c *GVKRoutedCache) AddDelegate(gvk schema.GroupVersionKind, delegate cache.Cache) {
c.lock.Lock()
defer c.lock.Unlock()
c.delegates[gvk] = delegate
}
// RemoveDelegate removes a delegated cache for a given GVK.
func (c *GVKRoutedCache) RemoveDelegate(gvk schema.GroupVersionKind) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.delegates, gvk)
}
// Get retrieves an object for a given ObjectKey backed by a cache.
func (c *GVKRoutedCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return errors.Errorf("failed to get GVK for type %T: %w", obj, err)
}
c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()
if ok {
return delegate.Get(ctx, key, obj, opts...)
}
return c.fallback.Get(ctx, key, obj, opts...)
}
// List lists objects for a given ObjectList backed by a cache.
func (c *GVKRoutedCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
gvk, err := apiutil.GVKForObject(list, c.scheme)
if err != nil {
return errors.Errorf("failed to get GVK for type %T: %w", list, err)
}
if !strings.HasSuffix(gvk.Kind, "List") {
// following controller-runtime here which does not support non
// <Kind>List types.
return errors.Errorf("non-list type %T (kind %q) passed as output", list, gvk)
}
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()
if ok {
return delegate.List(ctx, list, opts...)
}
return c.fallback.List(ctx, list, opts...)
}
// GetInformer returns an informer for the given object.
func (c *GVKRoutedCache) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return nil, errors.Errorf("failed to get GVK for type %T: %w", obj, err)
}
c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()
if ok {
return delegate.GetInformer(ctx, obj, opts...)
}
return c.fallback.GetInformer(ctx, obj, opts...)
}
// GetInformerForKind returns an informer for the given GVK.
func (c *GVKRoutedCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) {
c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()
if ok {
return delegate.GetInformerForKind(ctx, gvk, opts...)
}
return c.fallback.GetInformerForKind(ctx, gvk, opts...)
}
// Start for a GVKRoutedCache is a no-op. Start must be called for each delegate.
func (c *GVKRoutedCache) Start(_ context.Context) error {
return nil
}
// WaitForCacheSync for a GVKRoutedCache waits for all delegates to sync, and
// returns false if any of them fails to sync.
func (c *GVKRoutedCache) WaitForCacheSync(ctx context.Context) bool {
c.lock.RLock()
syncedCh := make(chan bool, len(c.delegates)+1)
cas := make([]cache.Cache, 0, len(c.delegates))
for _, ca := range c.delegates {
cas = append(cas, ca)
}
cas = append(cas, c.fallback)
c.lock.RUnlock()
var wg sync.WaitGroup
ctx, cancelFn := context.WithCancel(ctx)
for _, ca := range cas {
wg.Add(1)
go func(ca cache.Cache) {
defer wg.Done()
synced := ca.WaitForCacheSync(ctx)
if !synced {
// first unsynced cache breaks the whole wait
cancelFn()
}
syncedCh <- synced
}(ca)
}
wg.Wait()
close(syncedCh)
cancelFn()
// any not synced?
for synced := range syncedCh {
if !synced {
return false
}
}
return true
}
// IndexField adds an index with the given field name on the given object type
// by using the given function to extract the value for that field.
func (c *GVKRoutedCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return errors.Errorf("failed to get GVK for type %T: %w", obj, err)
}
c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()
if ok {
return delegate.IndexField(ctx, obj, field, extractValue)
}
return c.fallback.IndexField(ctx, obj, field, extractValue)
}
// cachedRoutedClient wraps a client and routes read requests by GVK to a cache.
type cachedRoutedClient struct {
client.Client
scheme *runtime.Scheme
cache *GVKRoutedCache
}
func (c *cachedRoutedClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return errors.Errorf("failed to get GVK for type %T: %w", obj, err)
}
c.cache.lock.RLock()
delegate, ok := c.cache.delegates[gvk]
c.cache.lock.RUnlock()
if ok {
return delegate.Get(ctx, key, obj, opts...)
}
return c.Client.Get(ctx, key, obj, opts...)
}
func (c *cachedRoutedClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
gvk, err := apiutil.GVKForObject(list, c.scheme)
if err != nil {
return errors.Errorf("failed to get GVK for type %T: %w", list, err)
}
if !strings.HasSuffix(gvk.Kind, "List") {
// following controller-runtime here which does not support non
// <Kind>List types.
return errors.Errorf("non-list type %T (kind %q) passed as output", list, gvk)
}
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
c.cache.lock.RLock()
delegate, ok := c.cache.delegates[gvk]
c.cache.lock.RUnlock()
if ok {
return delegate.List(ctx, list, opts...)
}
return c.Client.List(ctx, list, opts...)
}
// WithGVKRoutedCache returns a manager backed by a GVKRoutedCache. The client
// returned by the manager will route read requests to cached GVKs.
func WithGVKRoutedCache(c *GVKRoutedCache, mgr controllerruntime.Manager) controllerruntime.Manager {
return &routedManager{
Manager: mgr,
client: &cachedRoutedClient{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
cache: c,
},
cache: c,
}
}
type routedManager struct {
controllerruntime.Manager
client client.Client
cache cache.Cache
}
func (m *routedManager) GetClient() client.Client {
return m.client
}
func (m *routedManager) GetCache() cache.Cache {
return m.cache
}

View File

@ -24,6 +24,7 @@ import (
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
@ -176,51 +177,107 @@ func TriggeredBy(source source.Source, h handler.EventHandler, p ...predicate.Pr
// the supplied options, and configured with the supplied watches. Start does
// not block.
func (e *Engine) Start(name string, o controller.Options, w ...Watch) error {
if e.IsRunning(name) {
return nil
c, err := e.Create(name, o, w...)
if err != nil {
return err
}
return c.Start(context.Background())
}
ctx, stop := context.WithCancel(context.Background())
e.mx.Lock()
e.started[name] = stop
e.errors[name] = nil
e.mx.Unlock()
// NamedController is a controller that's not yet started. It gives access to
// the underlying cache, which may be used e.g. to add indexes.
type NamedController interface {
Start(ctx context.Context) error
GetCache() cache.Cache
}
// Each controller gets its own cache because there's currently no way to
// stop an informer. In practice a controller-runtime cache is a map of
// kinds to informers. If we delete the CRD for a kind we need to stop the
// relevant informer, or it will spew errors about the kind not existing. We
// work around this by stopping the entire cache.
type namedController struct {
name string
e *Engine
ca cache.Cache
ctrl controller.Controller
}
// Create the named controller. Each controller gets its own cache
// whose lifecycle is coupled to the controller. The controller is created with
// the supplied options, and configured with the supplied watches. It is not
// started yet.
func (e *Engine) Create(name string, o controller.Options, w ...Watch) (NamedController, error) {
// Each controller gets its own cache for the GVKs it owns. This cache is
// wrapped by a GVKRoutedCache that routes requests to other GVKs to the
// manager's cache. This way we can share informers for composed resources
// (that's where this is primarily used) with other controllers, but get
// control about the lifecycle of the owned GVKs' informers.
ca, err := e.newCache(e.mgr.GetConfig(), cache.Options{Scheme: e.mgr.GetScheme(), Mapper: e.mgr.GetRESTMapper()})
if err != nil {
return errors.Wrap(err, errCreateCache)
return nil, errors.Wrap(err, errCreateCache)
}
ctrl, err := e.newCtrl(name, e.mgr, o)
// Wrap the existing manager to use our cache for the GVKs of this controller.
rc := NewGVKRoutedCache(e.mgr.GetScheme(), e.mgr.GetCache())
rm := &routedManager{
Manager: e.mgr,
client: &cachedRoutedClient{
Client: e.mgr.GetClient(),
scheme: e.mgr.GetScheme(),
cache: rc,
},
cache: rc,
}
ctrl, err := e.newCtrl(name, rm, o)
if err != nil {
return errors.Wrap(err, errCreateController)
return nil, errors.Wrap(err, errCreateController)
}
for _, wt := range w {
if wt.customSource != nil {
if err := ctrl.Watch(wt.customSource, wt.handler, wt.predicates...); err != nil {
return errors.Wrap(err, errWatch)
return nil, errors.Wrap(err, errWatch)
}
continue
}
// route cache and client (read) requests to our cache for this GVK.
gvk, err := apiutil.GVKForObject(wt.kind, e.mgr.GetScheme())
if err != nil {
return nil, errors.Wrapf(err, "failed to get GVK for type %T", wt.kind)
}
rc.AddDelegate(gvk, ca)
if err := ctrl.Watch(source.Kind(ca, wt.kind), wt.handler, wt.predicates...); err != nil {
return errors.Wrap(err, errWatch)
return nil, errors.Wrap(err, errWatch)
}
}
return &namedController{name: name, e: e, ca: ca, ctrl: ctrl}, nil
}
// Start the named controller. Start does not block.
func (c *namedController) Start(ctx context.Context) error {
if c.e.IsRunning(c.name) {
return nil
}
ctx, stop := context.WithCancel(ctx)
c.e.mx.Lock()
c.e.started[c.name] = stop
c.e.errors[c.name] = nil
c.e.mx.Unlock()
go func() {
<-e.mgr.Elected()
e.done(name, errors.Wrap(ca.Start(ctx), errCrashCache))
<-c.e.mgr.Elected()
c.e.done(c.name, errors.Wrap(c.ca.Start(ctx), errCrashCache))
}()
go func() {
<-e.mgr.Elected()
e.done(name, errors.Wrap(ctrl.Start(ctx), errCrashController))
<-c.e.mgr.Elected()
c.e.done(c.name, errors.Wrap(c.ctrl.Start(ctx), errCrashController))
}()
return nil
}
// GetCache returns the cache used by the named controller.
func (c *namedController) GetCache() cache.Cache {
return c.ca
}

View File

@ -22,6 +22,8 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/controller"
@ -92,7 +94,11 @@ func TestEngine(t *testing.T) {
},
"NewControllerError": {
reason: "Errors creating a new controller should be returned",
e: NewEngine(&fake.Manager{},
e: NewEngine(
&fake.Manager{
Scheme: runtime.NewScheme(),
Cache: &MockCache{},
},
WithNewCacheFn(func(*rest.Config, cache.Options) (cache.Cache, error) { return nil, nil }),
WithNewControllerFn(func(string, manager.Manager, controller.Options) (controller.Controller, error) { return nil, errBoom }),
),
@ -105,7 +111,11 @@ func TestEngine(t *testing.T) {
},
"WatchError": {
reason: "Errors adding a watch should be returned",
e: NewEngine(&fake.Manager{},
e: NewEngine(
&fake.Manager{
Scheme: runtime.NewScheme(),
Cache: &MockCache{},
},
WithNewCacheFn(func(*rest.Config, cache.Options) (cache.Cache, error) { return nil, nil }),
WithNewControllerFn(func(string, manager.Manager, controller.Options) (controller.Controller, error) {
c := &MockController{MockWatch: func(source.Source, handler.EventHandler, ...predicate.Predicate) error { return errBoom }}
@ -114,12 +124,35 @@ func TestEngine(t *testing.T) {
),
args: args{
name: "coolcontroller",
w: []Watch{For(&fake.Managed{}, nil)},
w: []Watch{For(&unstructured.Unstructured{
Object: map[string]interface{}{"apiVersion": "example.org/v1", "kind": "Thing"},
}, nil)},
},
want: want{
err: errors.Wrap(errBoom, errWatch),
},
},
"SchemeError": {
reason: "Passing an object of unknown GVK",
e: NewEngine(
&fake.Manager{
Scheme: runtime.NewScheme(),
Cache: &MockCache{},
},
WithNewCacheFn(func(*rest.Config, cache.Options) (cache.Cache, error) { return nil, nil }),
WithNewControllerFn(func(string, manager.Manager, controller.Options) (controller.Controller, error) {
c := &MockController{MockWatch: func(source.Source, handler.EventHandler, ...predicate.Predicate) error { return errBoom }}
return c, nil
}),
),
args: args{
name: "coolcontroller",
w: []Watch{For(&unstructured.Unstructured{}, nil)},
},
want: want{
err: errors.Wrap(runtime.NewMissingKindErr("unstructured object has no kind"), "failed to get GVK for type *unstructured.Unstructured"),
},
},
"CacheCrashError": {
reason: "Errors starting or running a cache should be returned",
e: NewEngine(&fake.Manager{},

View File

@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
@ -442,6 +443,7 @@ func (m *CompositeClaim) DeepCopyObject() runtime.Object {
type Manager struct {
manager.Manager
Cache cache.Cache
Client client.Client
Scheme *runtime.Scheme
Config *rest.Config
@ -456,6 +458,9 @@ func (m *Manager) Elected() <-chan struct{} {
return e
}
// GetCache returns the cache.
func (m *Manager) GetCache() cache.Cache { return m.Cache }
// GetClient returns the client.
func (m *Manager) GetClient() client.Client { return m.Client }