mirror of https://github.com/rancher/lasso.git
259 lines
7.8 KiB
Go
259 lines
7.8 KiB
Go
package controller
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/rancher/lasso/pkg/cache"
|
|
"github.com/rancher/lasso/pkg/client"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/util/workqueue"
|
|
)
|
|
|
|
type SharedControllerFactory interface {
|
|
ForObject(obj runtime.Object) (SharedController, error)
|
|
ForKind(gvk schema.GroupVersionKind) (SharedController, error)
|
|
ForResource(gvr schema.GroupVersionResource, namespaced bool) SharedController
|
|
ForResourceKind(gvr schema.GroupVersionResource, kind string, namespaced bool) SharedController
|
|
SharedCacheFactory() cache.SharedCacheFactory
|
|
Start(ctx context.Context, workers int) error
|
|
}
|
|
|
|
type SharedControllerFactoryOptions struct {
|
|
CacheOptions *cache.SharedCacheFactoryOptions
|
|
|
|
DefaultRateLimiter workqueue.RateLimiter
|
|
DefaultWorkers int
|
|
|
|
KindRateLimiter map[schema.GroupVersionKind]workqueue.RateLimiter
|
|
KindWorkers map[schema.GroupVersionKind]int
|
|
|
|
// SyncOnlyChangedObjects causes the handle function to only proceed if the object was actually updated.
|
|
// This is intended to be used by applications with many objects and/or controllers types that have
|
|
// alternative means of rerunning when necessary. When the informer's resync their cache the update
|
|
// function is run. If this setting is enabled, when the update handler is triggered the overhead is
|
|
// reduced but has the tradeoff of not rerunning handlers. Handlers that rely on external objects or
|
|
// services, or experience a bug might need to rerun despite the respective object not changing. If this
|
|
// is enabled, it is the responsibility of the app to ensure logic is retried when needed. The result is
|
|
// that running the handler func on resync will mostly only serve the purpose of catching missed cache
|
|
// events.
|
|
SyncOnlyChangedObjects bool
|
|
}
|
|
|
|
type sharedControllerFactory struct {
|
|
controllerLock sync.RWMutex
|
|
|
|
sharedCacheFactory cache.SharedCacheFactory
|
|
controllers map[schema.GroupVersionResource]*sharedController
|
|
|
|
rateLimiter workqueue.RateLimiter
|
|
workers int
|
|
kindRateLimiter map[schema.GroupVersionKind]workqueue.RateLimiter
|
|
kindWorkers map[schema.GroupVersionKind]int
|
|
|
|
syncOnlyChangedObjects bool
|
|
}
|
|
|
|
func NewSharedControllerFactoryFromConfig(config *rest.Config, scheme *runtime.Scheme) (SharedControllerFactory, error) {
|
|
cf, err := client.NewSharedClientFactory(config, &client.SharedClientFactoryOptions{
|
|
Scheme: scheme,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return NewSharedControllerFactory(cache.NewSharedCachedFactory(cf, nil), nil), nil
|
|
}
|
|
|
|
// NewSharedControllerFactoryFromConfigWithOptions accepts options for configuring a new SharedControllerFactory and its
|
|
// cache.
|
|
func NewSharedControllerFactoryFromConfigWithOptions(config *rest.Config, scheme *runtime.Scheme, opts *SharedControllerFactoryOptions) (SharedControllerFactory, error) {
|
|
cf, err := client.NewSharedClientFactory(config, &client.SharedClientFactoryOptions{
|
|
Scheme: scheme,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var cacheOpts *cache.SharedCacheFactoryOptions
|
|
if opts != nil {
|
|
cacheOpts = opts.CacheOptions
|
|
}
|
|
return NewSharedControllerFactory(cache.NewSharedCachedFactory(cf, cacheOpts), opts), nil
|
|
}
|
|
|
|
func NewSharedControllerFactory(cacheFactory cache.SharedCacheFactory, opts *SharedControllerFactoryOptions) SharedControllerFactory {
|
|
opts = applyDefaultSharedOptions(opts)
|
|
return &sharedControllerFactory{
|
|
sharedCacheFactory: cacheFactory,
|
|
controllers: map[schema.GroupVersionResource]*sharedController{},
|
|
workers: opts.DefaultWorkers,
|
|
kindWorkers: opts.KindWorkers,
|
|
rateLimiter: opts.DefaultRateLimiter,
|
|
kindRateLimiter: opts.KindRateLimiter,
|
|
syncOnlyChangedObjects: opts.SyncOnlyChangedObjects,
|
|
}
|
|
}
|
|
|
|
func applyDefaultSharedOptions(opts *SharedControllerFactoryOptions) *SharedControllerFactoryOptions {
|
|
var newOpts SharedControllerFactoryOptions
|
|
if opts != nil {
|
|
newOpts = *opts
|
|
}
|
|
if newOpts.DefaultWorkers == 0 {
|
|
newOpts.DefaultWorkers = 5
|
|
}
|
|
return &newOpts
|
|
}
|
|
|
|
func (s *sharedControllerFactory) EnableSyncOnlyChangedObjects() {
|
|
s.syncOnlyChangedObjects = true
|
|
}
|
|
|
|
func (s *sharedControllerFactory) Start(ctx context.Context, defaultWorkers int) error {
|
|
s.controllerLock.Lock()
|
|
defer s.controllerLock.Unlock()
|
|
|
|
if err := s.sharedCacheFactory.Start(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
// copy so we can release the lock during cache wait
|
|
controllersCopy := map[schema.GroupVersionResource]*sharedController{}
|
|
for k, v := range s.controllers {
|
|
controllersCopy[k] = v
|
|
}
|
|
|
|
// Do not hold lock while waiting because this can cause a deadlock if
|
|
// one of the handlers you are waiting on tries to acquire this lock (by looking up
|
|
// shared controller)
|
|
s.controllerLock.Unlock()
|
|
s.sharedCacheFactory.WaitForCacheSync(ctx)
|
|
s.controllerLock.Lock()
|
|
|
|
for gvr, controller := range controllersCopy {
|
|
w, err := s.getWorkers(gvr, defaultWorkers)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := controller.Start(ctx, w); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *sharedControllerFactory) ForObject(obj runtime.Object) (SharedController, error) {
|
|
gvk, err := s.sharedCacheFactory.SharedClientFactory().GVKForObject(obj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.ForKind(gvk)
|
|
}
|
|
|
|
func (s *sharedControllerFactory) ForKind(gvk schema.GroupVersionKind) (SharedController, error) {
|
|
gvr, nsed, err := s.sharedCacheFactory.SharedClientFactory().ResourceForGVK(gvk)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return s.ForResourceKind(gvr, gvk.Kind, nsed), nil
|
|
}
|
|
|
|
func (s *sharedControllerFactory) ForResource(gvr schema.GroupVersionResource, namespaced bool) SharedController {
|
|
return s.ForResourceKind(gvr, "", namespaced)
|
|
}
|
|
|
|
func (s *sharedControllerFactory) ForResourceKind(gvr schema.GroupVersionResource, kind string, namespaced bool) SharedController {
|
|
controllerResult := s.byResource(gvr)
|
|
if controllerResult != nil {
|
|
return controllerResult
|
|
}
|
|
|
|
s.controllerLock.Lock()
|
|
defer s.controllerLock.Unlock()
|
|
|
|
controllerResult = s.controllers[gvr]
|
|
if controllerResult != nil {
|
|
return controllerResult
|
|
}
|
|
|
|
client := s.sharedCacheFactory.SharedClientFactory().ForResourceKind(gvr, kind, namespaced)
|
|
|
|
handler := &SharedHandler{ControllerName: gvr.String()}
|
|
|
|
controllerResult = &sharedController{
|
|
deferredController: func() (Controller, error) {
|
|
var (
|
|
gvk schema.GroupVersionKind
|
|
err error
|
|
)
|
|
|
|
if kind == "" {
|
|
gvk, err = s.sharedCacheFactory.SharedClientFactory().GVKForResource(gvr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
gvk = gvr.GroupVersion().WithKind(kind)
|
|
}
|
|
|
|
cache, err := s.sharedCacheFactory.ForResourceKind(gvr, kind, namespaced)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rateLimiter, ok := s.kindRateLimiter[gvk]
|
|
if !ok {
|
|
rateLimiter = s.rateLimiter
|
|
}
|
|
|
|
starter := func(ctx context.Context) error {
|
|
return s.sharedCacheFactory.StartGVK(ctx, gvk)
|
|
}
|
|
|
|
c := New(gvk.String(), cache, starter, handler, &Options{
|
|
RateLimiter: rateLimiter,
|
|
SyncOnlyChangedObjects: s.syncOnlyChangedObjects,
|
|
})
|
|
|
|
return c, err
|
|
},
|
|
handler: handler,
|
|
client: client,
|
|
}
|
|
|
|
s.controllers[gvr] = controllerResult
|
|
return controllerResult
|
|
}
|
|
|
|
func (s *sharedControllerFactory) getWorkers(gvr schema.GroupVersionResource, workers int) (int, error) {
|
|
gvk, err := s.sharedCacheFactory.SharedClientFactory().GVKForResource(gvr)
|
|
if meta.IsNoMatchError(err) {
|
|
return workers, nil
|
|
} else if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
w, ok := s.kindWorkers[gvk]
|
|
if ok {
|
|
return w, nil
|
|
}
|
|
if workers > 0 {
|
|
return workers, nil
|
|
}
|
|
return s.workers, nil
|
|
}
|
|
|
|
func (s *sharedControllerFactory) byResource(gvr schema.GroupVersionResource) *sharedController {
|
|
s.controllerLock.RLock()
|
|
defer s.controllerLock.RUnlock()
|
|
return s.controllers[gvr]
|
|
}
|
|
|
|
func (s *sharedControllerFactory) SharedCacheFactory() cache.SharedCacheFactory {
|
|
return s.sharedCacheFactory
|
|
}
|