Merge pull request #590 from Garrybest/informer-manager

informer-manager: stop the informer before deleting the cluster
commit 67ded1f9ed by karmada-bot, 2021-08-16 19:48:25 +08:00 (committed by GitHub)
10 changed files with 112 additions and 33 deletions

View File

@@ -151,6 +151,12 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
     if err := serviceExportController.SetupWithManager(mgr); err != nil {
         klog.Fatalf("Failed to setup ServiceExport controller: %v", err)
     }
+
+    // Ensure the InformerManager stops when the stop channel closes
+    go func() {
+        <-stopChan
+        informermanager.StopInstance()
+    }()
 }

 func registerWithControlPlaneAPIServer(controlPlaneRestConfig *restclient.Config, memberClusterName string) error {

View File

@@ -118,7 +118,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
     resourceDetector := &detector.ResourceDetector{
         DiscoveryClientSet:    discoverClientSet,
         Client:                mgr.GetClient(),
-        InformerManager:       informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0),
+        InformerManager:       informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan),
         RESTMapper:            mgr.GetRESTMapper(),
         DynamicClient:         dynamicClientSet,
         SkippedResourceConfig: skippedResourceConfig,
@@ -128,7 +128,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
         klog.Fatalf("Failed to setup resource detector: %v", err)
     }

-    setupClusterAPIClusterDetector(mgr, opts)
+    setupClusterAPIClusterDetector(mgr, opts, stopChan)

     clusterController := &cluster.Controller{
         Client:        mgr.GetClient(),
@@ -282,10 +282,16 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
     if err := serviceImportController.SetupWithManager(mgr); err != nil {
         klog.Fatalf("Failed to setup ServiceImport controller: %v", err)
     }
+
+    // Ensure the InformerManager stops when the stop channel closes
+    go func() {
+        <-stopChan
+        informermanager.StopInstance()
+    }()
 }

 // setupClusterAPIClusterDetector initialize Cluster detector with the cluster-api management cluster.
-func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options) {
+func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
     if len(opts.ClusterAPIKubeconfig) == 0 {
         return
     }
@@ -308,7 +314,7 @@ func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options
         ControllerPlaneConfig: mgr.GetConfig(),
         ClusterAPIConfig:      clusterAPIRestConfig,
         ClusterAPIClient:      clusterAPIClient,
-        InformerManager:       informermanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0),
+        InformerManager:       informermanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0, stopChan),
     }
     if err := mgr.Add(clusterAPIClusterDetector); err != nil {
         klog.Fatalf("Failed to setup cluster-api cluster detector: %v", err)

View File

@@ -69,7 +69,7 @@ func (d *ClusterDetector) discoveryCluster() {
         }
     }

-    d.InformerManager.Start(d.stopCh)
+    d.InformerManager.Start()
 }

 // OnAdd handles object add event and push the object to queue.

View File

@@ -175,8 +175,8 @@ func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1al
         return nil
     }

-    c.InformerManager.Start(cluster.Name, c.StopChan)
-    synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan)
+    c.InformerManager.Start(cluster.Name)
+    synced := c.InformerManager.WaitForCacheSync(cluster.Name)
     if synced == nil {
         klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name)
         return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)

View File

@@ -79,13 +79,13 @@ type ClusterStatusController struct {
 // The Controller will requeue the Request to be processed again if an error is non-nil or
 // Result.Requeue is true, otherwise upon completion it will requeue the reconcile key after the duration.
 func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
-    klog.V(4).Infof("Syncing cluster status: %s", req.NamespacedName.String())
+    klog.V(4).Infof("Syncing cluster status: %s", req.NamespacedName.Name)
     cluster := &v1alpha1.Cluster{}
     if err := c.Client.Get(context.TODO(), req.NamespacedName, cluster); err != nil {
         // The resource may no longer exist, in which case we stop the informer.
         if errors.IsNotFound(err) {
-            // TODO(Garrybest): stop the informer and delete the cluster manager
+            c.InformerManager.Stop(req.NamespacedName.Name)
             return controllerruntime.Result{}, nil
         }
@@ -227,8 +227,8 @@ func (c *ClusterStatusController) buildInformerForCluster(cluster *v1alpha1.Clus
         return singleClusterInformerManager, nil
     }

-    c.InformerManager.Start(cluster.Name, c.StopChan)
-    synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan)
+    c.InformerManager.Start(cluster.Name)
+    synced := c.InformerManager.WaitForCacheSync(cluster.Name)
     if synced == nil {
         klog.Errorf("The informer factory for cluster(%s) does not exist.", cluster.Name)
         return nil, fmt.Errorf("informer factory for cluster(%s) does not exist", cluster.Name)

View File

@@ -366,8 +366,8 @@ func (c *WorkStatusController) registerInformersAndStart(cluster *v1alpha1.Clust
         return nil
     }

-    c.InformerManager.Start(cluster.Name, c.StopChan)
-    synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan)
+    c.InformerManager.Start(cluster.Name)
+    synced := c.InformerManager.WaitForCacheSync(cluster.Name)
     if synced == nil {
         klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name)
         return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)

pkg/util/context.go (new file, 22 lines)
View File

@@ -0,0 +1,22 @@
+package util
+
+import "context"
+
+// ContextForChannel derives a child context from a parent channel.
+//
+// The derived context's Done channel is closed when the returned cancel function
+// is called or when the parent channel is closed, whichever happens first.
+//
+// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked.
+func ContextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) {
+    ctx, cancel := context.WithCancel(context.Background())
+    go func() {
+        select {
+        case <-parentCh:
+            cancel()
+        case <-ctx.Done():
+        }
+    }()
+    return ctx, cancel
+}
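The helper above is the glue that lets the context-based informer manager honor a legacy stop channel. Below is a minimal usage sketch, assuming the package is importable as github.com/karmada-io/karmada/pkg/util; the channel wiring and timing are illustrative only and are not part of this PR:

package main

import (
    "fmt"
    "time"

    "github.com/karmada-io/karmada/pkg/util"
)

func main() {
    stopCh := make(chan struct{})

    // Derive a context that is cancelled when either stopCh closes
    // or the returned cancel function is called.
    ctx, cancel := util.ContextForChannel(stopCh)
    defer cancel() // always call cancel so the helper goroutine is released

    go func() {
        time.Sleep(100 * time.Millisecond)
        close(stopCh) // simulate the process-wide stop signal
    }()

    <-ctx.Done() // unblocks once stopCh is closed
    fmt.Println("stopped:", ctx.Err())
}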

View File

@@ -156,7 +156,7 @@ func (d *ResourceDetector) discoverResources(period time.Duration) {
             klog.Infof("Setup informer for %s", r.String())
             d.InformerManager.ForResource(r, d.EventHandler)
         }
-        d.InformerManager.Start(d.stopCh)
+        d.InformerManager.Start()
     }, period, d.stopCh)
 }

View File

@@ -11,20 +11,32 @@ import (
 var (
     instance MultiClusterInformerManager
     once     sync.Once
+    stopCh   chan struct{}
 )

+func init() {
+    stopCh = make(chan struct{})
+}
+
 // GetInstance returns a shared MultiClusterInformerManager instance.
 func GetInstance() MultiClusterInformerManager {
     once.Do(func() {
-        instance = NewMultiClusterInformerManager()
+        instance = NewMultiClusterInformerManager(stopCh)
     })
     return instance
 }

+// StopInstance will stop the shared MultiClusterInformerManager instance.
+func StopInstance() {
+    once.Do(func() {
+        close(stopCh)
+    })
+}
+
 // MultiClusterInformerManager manages dynamic shared informer for all resources, include Kubernetes resource and
 // custom resources defined by CustomResourceDefinition, across multi-cluster.
 type MultiClusterInformerManager interface {
-    // ForCluster builds a informer manager for a specific cluster.
+    // ForCluster builds an informer manager for a specific cluster.
     ForCluster(cluster string, client dynamic.Interface, defaultResync time.Duration) SingleClusterInformerManager

     // GetSingleClusterManager gets the informer manager for a specific cluster.
@@ -34,24 +46,29 @@ type MultiClusterInformerManager interface {
     // IsManagerExist checks if the informer manager for the cluster already created.
     IsManagerExist(cluster string) bool

-    // Start will run all informers for a specific cluster, it accepts a stop channel, the informers will keep running until channel closed.
+    // Start will run all informers for a specific cluster.
     // Should call after 'ForCluster', otherwise no-ops.
-    Start(cluster string, stopCh <-chan struct{})
+    Start(cluster string)
+
+    // Stop will stop all informers for a specific cluster, and delete the cluster from informer managers.
+    Stop(cluster string)

     // WaitForCacheSync waits for all caches to populate.
     // Should call after 'ForCluster', otherwise no-ops.
-    WaitForCacheSync(cluster string, stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
+    WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool
 }

 // NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl.
-func NewMultiClusterInformerManager() MultiClusterInformerManager {
+func NewMultiClusterInformerManager(stopCh <-chan struct{}) MultiClusterInformerManager {
     return &multiClusterInformerManagerImpl{
         managers: make(map[string]SingleClusterInformerManager),
+        stopCh:   stopCh,
     }
 }

 type multiClusterInformerManagerImpl struct {
     managers map[string]SingleClusterInformerManager
+    stopCh   <-chan struct{}

     lock sync.RWMutex
 }
@@ -70,7 +87,7 @@ func (m *multiClusterInformerManagerImpl) ForCluster(cluster string, client dyna
     m.lock.Lock()
     defer m.lock.Unlock()

-    manager := NewSingleClusterInformerManager(client, defaultResync)
+    manager := NewSingleClusterInformerManager(client, defaultResync, m.stopCh)
     m.managers[cluster] = manager
     return manager
 }
@@ -87,19 +104,30 @@ func (m *multiClusterInformerManagerImpl) IsManagerExist(cluster string) bool {
     return exist
 }

-func (m *multiClusterInformerManagerImpl) Start(cluster string, stopCh <-chan struct{}) {
+func (m *multiClusterInformerManagerImpl) Start(cluster string) {
     // if informer manager haven't been created, just return with no-ops.
     manager, exist := m.getManager(cluster)
     if !exist {
         return
     }
-    manager.Start(stopCh)
+    manager.Start()
 }

-func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string, stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
+func (m *multiClusterInformerManagerImpl) Stop(cluster string) {
+    manager, exist := m.getManager(cluster)
+    if !exist {
+        return
+    }
+    manager.Stop()
+    m.lock.Lock()
+    defer m.lock.Unlock()
+    delete(m.managers, cluster)
+}
+
+func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool {
     manager, exist := m.getManager(cluster)
     if !exist {
         return nil
     }
-    return manager.WaitForCacheSync(stopCh)
+    return manager.WaitForCacheSync()
 }
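Taken together, the new interface lets a caller bind the manager to a process-wide stop channel once, then stop (and forget) individual clusters as they are deleted. The following is a sketch of that lifecycle under the signatures in this diff; the package path pkg/util/informermanager, the cluster name, client, and handler are assumptions for illustration, not code from this PR:

package main

import (
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/tools/cache"

    "github.com/karmada-io/karmada/pkg/util/informermanager"
)

// demoLifecycle is a hypothetical helper, not part of this PR.
func demoLifecycle(client dynamic.Interface, handler cache.ResourceEventHandler, stopCh <-chan struct{}) {
    // One manager for the whole process, bound to the process stop channel.
    mgr := informermanager.NewMultiClusterInformerManager(stopCh)

    // Register a member cluster, wire up an informer, and start it.
    deployments := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
    single := mgr.ForCluster("member1", client, 0)
    single.ForResource(deployments, handler)
    mgr.Start("member1")
    mgr.WaitForCacheSync("member1")

    // When the cluster is deleted, stop its informers and drop its entry.
    mgr.Stop("member1")
}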

View File

@@ -1,6 +1,7 @@
 package informermanager

 import (
+    "context"
     "sync"
     "time"
@@ -8,6 +9,8 @@ import (
     "k8s.io/client-go/dynamic"
     "k8s.io/client-go/dynamic/dynamicinformer"
     "k8s.io/client-go/tools/cache"
+
+    "github.com/karmada-io/karmada/pkg/util"
 )

 // SingleClusterInformerManager manages dynamic shared informer for all resources, include Kubernetes resource and
@@ -32,25 +35,35 @@ type SingleClusterInformerManager interface {
     // The informer for 'resource' will be created if not exist, but without any event handler.
     Lister(resource schema.GroupVersionResource) cache.GenericLister

-    // Start will run all informers with a stop channel, the informers will keep running until channel closed.
+    // Start will run all informers, the informers will keep running until the channel closed.
     // It is intended to be called after create new informer(s), and it's safe to call multi times.
-    Start(stopCh <-chan struct{})
+    Start()
+
+    // Stop stops all single cluster informers of a cluster. Once it is stopped, it will be not able
+    // to Start again.
+    Stop()

     // WaitForCacheSync waits for all caches to populate.
-    WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
+    WaitForCacheSync() map[schema.GroupVersionResource]bool
 }

 // NewSingleClusterInformerManager constructs a new instance of singleClusterInformerManagerImpl.
 // defaultResync with value '0' means no re-sync.
-func NewSingleClusterInformerManager(client dynamic.Interface, defaultResync time.Duration) SingleClusterInformerManager {
+func NewSingleClusterInformerManager(client dynamic.Interface, defaultResync time.Duration, parentCh <-chan struct{}) SingleClusterInformerManager {
+    ctx, cancel := util.ContextForChannel(parentCh)
     return &singleClusterInformerManagerImpl{
         informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, defaultResync),
         handlers:        make(map[schema.GroupVersionResource][]cache.ResourceEventHandler),
         syncedInformers: make(map[schema.GroupVersionResource]struct{}),
+        ctx:             ctx,
+        cancel:          cancel,
     }
 }

 type singleClusterInformerManagerImpl struct {
+    ctx    context.Context
+    cancel context.CancelFunc
+
     informerFactory dynamicinformer.DynamicSharedInformerFactory

     syncedInformers map[schema.GroupVersionResource]struct{}
@@ -111,14 +124,18 @@ func (s *singleClusterInformerManagerImpl) appendHandler(resource schema.GroupVe
     s.handlers[resource] = append(handlers, handler)
 }

-func (s *singleClusterInformerManagerImpl) Start(stopCh <-chan struct{}) {
-    s.informerFactory.Start(stopCh)
+func (s *singleClusterInformerManagerImpl) Start() {
+    s.informerFactory.Start(s.ctx.Done())
 }

-func (s *singleClusterInformerManagerImpl) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
+func (s *singleClusterInformerManagerImpl) Stop() {
+    s.cancel()
+}
+
+func (s *singleClusterInformerManagerImpl) WaitForCacheSync() map[schema.GroupVersionResource]bool {
     s.lock.Lock()
     defer s.lock.Unlock()
-    res := s.informerFactory.WaitForCacheSync(stopCh)
+    res := s.informerFactory.WaitForCacheSync(s.ctx.Done())
     for resource, synced := range res {
         if _, exist := s.syncedInformers[resource]; !exist && synced {
             s.syncedInformers[resource] = struct{}{}
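The Stop comment above notes that a stopped manager cannot be started again, because Stop cancels the manager's internal context. A hedged sketch of what that implies for callers follows; restartForCluster is a hypothetical helper written for illustration, not code from this PR:

package main

import (
    "k8s.io/client-go/dynamic"

    "github.com/karmada-io/karmada/pkg/util/informermanager"
)

// restartForCluster shows the stop-once semantics: once Stop cancels the
// internal context, the old manager's informers shut down, and a fresh
// manager must be constructed to watch the cluster again.
func restartForCluster(client dynamic.Interface, parentCh <-chan struct{}) informermanager.SingleClusterInformerManager {
    old := informermanager.NewSingleClusterInformerManager(client, 0, parentCh)
    old.Start()
    old.Stop() // cancels the internal context; running informers stop

    // Build a new manager (and therefore a new derived context) to start over.
    fresh := informermanager.NewSingleClusterInformerManager(client, 0, parentCh)
    fresh.Start()
    return fresh
}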