diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 8295b1e30..522768d1a 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -151,6 +151,12 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop if err := serviceExportController.SetupWithManager(mgr); err != nil { klog.Fatalf("Failed to setup ServiceExport controller: %v", err) } + + // Ensure the InformerManager stops when the stop channel closes + go func() { + <-stopChan + informermanager.StopInstance() + }() } func registerWithControlPlaneAPIServer(controlPlaneRestConfig *restclient.Config, memberClusterName string) error { diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 6a87a5986..688ad923c 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -118,7 +118,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop resourceDetector := &detector.ResourceDetector{ DiscoveryClientSet: discoverClientSet, Client: mgr.GetClient(), - InformerManager: informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0), + InformerManager: informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan), RESTMapper: mgr.GetRESTMapper(), DynamicClient: dynamicClientSet, SkippedResourceConfig: skippedResourceConfig, @@ -128,7 +128,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop klog.Fatalf("Failed to setup resource detector: %v", err) } - setupClusterAPIClusterDetector(mgr, opts) + setupClusterAPIClusterDetector(mgr, opts, stopChan) clusterController := &cluster.Controller{ Client: mgr.GetClient(), @@ -282,10 +282,16 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop if err := serviceImportController.SetupWithManager(mgr); err != nil { klog.Fatalf("Failed to setup ServiceImport controller: %v", err) } + + // Ensure the 
InformerManager stops when the stop channel closes + go func() { + <-stopChan + informermanager.StopInstance() + }() } // setupClusterAPIClusterDetector initialize Cluster detector with the cluster-api management cluster. -func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options) { +func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) { if len(opts.ClusterAPIKubeconfig) == 0 { return } @@ -308,7 +314,7 @@ func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options ControllerPlaneConfig: mgr.GetConfig(), ClusterAPIConfig: clusterAPIRestConfig, ClusterAPIClient: clusterAPIClient, - InformerManager: informermanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0), + InformerManager: informermanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0, stopChan), } if err := mgr.Add(clusterAPIClusterDetector); err != nil { klog.Fatalf("Failed to setup cluster-api cluster detector: %v", err) diff --git a/pkg/clusterdiscovery/clusterapi/clusterapi.go b/pkg/clusterdiscovery/clusterapi/clusterapi.go index d15ed4900..e4b1f0ffe 100644 --- a/pkg/clusterdiscovery/clusterapi/clusterapi.go +++ b/pkg/clusterdiscovery/clusterapi/clusterapi.go @@ -69,7 +69,7 @@ func (d *ClusterDetector) discoveryCluster() { } } - d.InformerManager.Start(d.stopCh) + d.InformerManager.Start() } // OnAdd handles object add event and push the object to queue. 
diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index 58b4d2d77..579b7c3ab 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -175,8 +175,8 @@ func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1al return nil } - c.InformerManager.Start(cluster.Name, c.StopChan) - synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan) + c.InformerManager.Start(cluster.Name) + synced := c.InformerManager.WaitForCacheSync(cluster.Name) if synced == nil { klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name) return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go index fcf4fb997..1e3c1f9e3 100644 --- a/pkg/controllers/status/cluster_status_controller.go +++ b/pkg/controllers/status/cluster_status_controller.go @@ -79,13 +79,13 @@ type ClusterStatusController struct { // The Controller will requeue the Request to be processed again if an error is non-nil or // Result.Requeue is true, otherwise upon completion it will requeue the reconcile key after the duration. func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { - klog.V(4).Infof("Syncing cluster status: %s", req.NamespacedName.String()) + klog.V(4).Infof("Syncing cluster status: %s", req.NamespacedName.Name) cluster := &v1alpha1.Cluster{} if err := c.Client.Get(context.TODO(), req.NamespacedName, cluster); err != nil { // The resource may no longer exist, in which case we stop the informer. 
if errors.IsNotFound(err) { - // TODO(Garrybest): stop the informer and delete the cluster manager + c.InformerManager.Stop(req.NamespacedName.Name) return controllerruntime.Result{}, nil } @@ -227,8 +227,8 @@ func (c *ClusterStatusController) buildInformerForCluster(cluster *v1alpha1.Clus return singleClusterInformerManager, nil } - c.InformerManager.Start(cluster.Name, c.StopChan) - synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan) + c.InformerManager.Start(cluster.Name) + synced := c.InformerManager.WaitForCacheSync(cluster.Name) if synced == nil { klog.Errorf("The informer factory for cluster(%s) does not exist.", cluster.Name) return nil, fmt.Errorf("informer factory for cluster(%s) does not exist", cluster.Name) diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index a264455b5..9b90c196f 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -366,8 +366,8 @@ func (c *WorkStatusController) registerInformersAndStart(cluster *v1alpha1.Clust return nil } - c.InformerManager.Start(cluster.Name, c.StopChan) - synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan) + c.InformerManager.Start(cluster.Name) + synced := c.InformerManager.WaitForCacheSync(cluster.Name) if synced == nil { klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name) return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) diff --git a/pkg/util/context.go b/pkg/util/context.go new file mode 100644 index 000000000..65c5d381d --- /dev/null +++ b/pkg/util/context.go @@ -0,0 +1,22 @@ +package util + +import "context" + +// ContextForChannel derives a child context from a parent channel. +// +// The derived context's Done channel is closed when the returned cancel function +// is called or when the parent channel is closed, whichever happens first. 
+// +// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked. +func ContextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + select { + case <-parentCh: + cancel() + case <-ctx.Done(): + } + }() + return ctx, cancel +} diff --git a/pkg/util/detector/detector.go b/pkg/util/detector/detector.go index cc91cbad0..2e5795aa7 100644 --- a/pkg/util/detector/detector.go +++ b/pkg/util/detector/detector.go @@ -156,7 +156,7 @@ func (d *ResourceDetector) discoverResources(period time.Duration) { klog.Infof("Setup informer for %s", r.String()) d.InformerManager.ForResource(r, d.EventHandler) } - d.InformerManager.Start(d.stopCh) + d.InformerManager.Start() }, period, d.stopCh) } diff --git a/pkg/util/informermanager/multi-cluster-manager.go b/pkg/util/informermanager/multi-cluster-manager.go index b6eae3450..47e437b99 100644 --- a/pkg/util/informermanager/multi-cluster-manager.go +++ b/pkg/util/informermanager/multi-cluster-manager.go @@ -11,20 +11,35 @@ import ( var ( instance MultiClusterInformerManager once sync.Once + stopOnce sync.Once + stopCh chan struct{} ) +func init() { + stopCh = make(chan struct{}) +} + // GetInstance returns a shared MultiClusterInformerManager instance. func GetInstance() MultiClusterInformerManager { once.Do(func() { - instance = NewMultiClusterInformerManager() + instance = NewMultiClusterInformerManager(stopCh) }) return instance } +// StopInstance will stop the shared MultiClusterInformerManager instance. +// It is guarded by its own sync.Once, separate from the one used by GetInstance, +// so the stop channel is closed exactly once no matter which of the two is called first. +func StopInstance() { + stopOnce.Do(func() { + close(stopCh) + }) +} + // MultiClusterInformerManager manages dynamic shared informer for all resources, include Kubernetes resource and // custom resources defined by CustomResourceDefinition, across multi-cluster. type MultiClusterInformerManager interface { - // ForCluster builds a informer manager for a specific cluster. 
+ // ForCluster builds an informer manager for a specific cluster. ForCluster(cluster string, client dynamic.Interface, defaultResync time.Duration) SingleClusterInformerManager // GetSingleClusterManager gets the informer manager for a specific cluster. @@ -34,24 +46,29 @@ type MultiClusterInformerManager interface { // IsManagerExist checks if the informer manager for the cluster already created. IsManagerExist(cluster string) bool - // Start will run all informers for a specific cluster, it accepts a stop channel, the informers will keep running until channel closed. + // Start will run all informers for a specific cluster. // Should call after 'ForCluster', otherwise no-ops. - Start(cluster string, stopCh <-chan struct{}) + Start(cluster string) + + // Stop will stop all informers for a specific cluster, and delete the cluster from informer managers. + Stop(cluster string) // WaitForCacheSync waits for all caches to populate. // Should call after 'ForCluster', otherwise no-ops. - WaitForCacheSync(cluster string, stopCh <-chan struct{}) map[schema.GroupVersionResource]bool + WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool } // NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl. 
-func NewMultiClusterInformerManager() MultiClusterInformerManager { +func NewMultiClusterInformerManager(stopCh <-chan struct{}) MultiClusterInformerManager { return &multiClusterInformerManagerImpl{ managers: make(map[string]SingleClusterInformerManager), + stopCh: stopCh, } } type multiClusterInformerManagerImpl struct { managers map[string]SingleClusterInformerManager + stopCh <-chan struct{} lock sync.RWMutex } @@ -70,7 +87,7 @@ func (m *multiClusterInformerManagerImpl) ForCluster(cluster string, client dyna m.lock.Lock() defer m.lock.Unlock() - manager := NewSingleClusterInformerManager(client, defaultResync) + manager := NewSingleClusterInformerManager(client, defaultResync, m.stopCh) m.managers[cluster] = manager return manager } @@ -87,19 +104,30 @@ func (m *multiClusterInformerManagerImpl) IsManagerExist(cluster string) bool { return exist } -func (m *multiClusterInformerManagerImpl) Start(cluster string, stopCh <-chan struct{}) { +func (m *multiClusterInformerManagerImpl) Start(cluster string) { // if informer manager haven't been created, just return with no-ops. 
manager, exist := m.getManager(cluster) if !exist { return } - manager.Start(stopCh) + manager.Start() } -func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string, stopCh <-chan struct{}) map[schema.GroupVersionResource]bool { +func (m *multiClusterInformerManagerImpl) Stop(cluster string) { + m.lock.Lock() // single critical section: look up, stop and delete the same manager atomically + defer m.lock.Unlock() + manager, exist := m.managers[cluster] + if !exist { + return + } + manager.Stop() + delete(m.managers, cluster) +} + +func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool { manager, exist := m.getManager(cluster) if !exist { return nil } - return manager.WaitForCacheSync(stopCh) + return manager.WaitForCacheSync() } diff --git a/pkg/util/informermanager/single-cluster-manager.go b/pkg/util/informermanager/single-cluster-manager.go index d708fb4e9..7af9f7d29 100644 --- a/pkg/util/informermanager/single-cluster-manager.go +++ b/pkg/util/informermanager/single-cluster-manager.go @@ -1,6 +1,7 @@ package informermanager import ( + "context" "sync" "time" @@ -8,6 +9,8 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/tools/cache" + + "github.com/karmada-io/karmada/pkg/util" ) // SingleClusterInformerManager manages dynamic shared informer for all resources, include Kubernetes resource and @@ -32,25 +35,35 @@ type SingleClusterInformerManager interface { // The informer for 'resource' will be created if not exist, but without any event handler. Lister(resource schema.GroupVersionResource) cache.GenericLister - // Start will run all informers with a stop channel, the informers will keep running until channel closed. + // Start will run all informers, the informers will keep running until Stop is called or the parent stop channel is closed. // It is intended to be called after create new informer(s), and it's safe to call multi times. - Start(stopCh <-chan struct{}) + Start() + + // Stop stops all single cluster informers of a cluster. 
Once it is stopped, it will not be able + // to Start again. + Stop() // WaitForCacheSync waits for all caches to populate. - WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool + WaitForCacheSync() map[schema.GroupVersionResource]bool } // NewSingleClusterInformerManager constructs a new instance of singleClusterInformerManagerImpl. // defaultResync with value '0' means no re-sync. -func NewSingleClusterInformerManager(client dynamic.Interface, defaultResync time.Duration) SingleClusterInformerManager { +func NewSingleClusterInformerManager(client dynamic.Interface, defaultResync time.Duration, parentCh <-chan struct{}) SingleClusterInformerManager { + ctx, cancel := util.ContextForChannel(parentCh) return &singleClusterInformerManagerImpl{ informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, defaultResync), handlers: make(map[schema.GroupVersionResource][]cache.ResourceEventHandler), syncedInformers: make(map[schema.GroupVersionResource]struct{}), + ctx: ctx, + cancel: cancel, } } type singleClusterInformerManagerImpl struct { + ctx context.Context + cancel context.CancelFunc + informerFactory dynamicinformer.DynamicSharedInformerFactory syncedInformers map[schema.GroupVersionResource]struct{} @@ -111,14 +124,18 @@ func (s *singleClusterInformerManagerImpl) appendHandler(resource schema.GroupVe s.handlers[resource] = append(handlers, handler) } -func (s *singleClusterInformerManagerImpl) Start(stopCh <-chan struct{}) { - s.informerFactory.Start(stopCh) +func (s *singleClusterInformerManagerImpl) Start() { + s.informerFactory.Start(s.ctx.Done()) } -func (s *singleClusterInformerManagerImpl) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool { +func (s *singleClusterInformerManagerImpl) Stop() { + s.cancel() +} + +func (s *singleClusterInformerManagerImpl) WaitForCacheSync() map[schema.GroupVersionResource]bool { s.lock.Lock() defer s.lock.Unlock() - res := 
s.informerFactory.WaitForCacheSync(stopCh) + res := s.informerFactory.WaitForCacheSync(s.ctx.Done()) for resource, synced := range res { if _, exist := s.syncedInformers[resource]; !exist && synced { s.syncedInformers[resource] = struct{}{}