diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 92124f700..0480123e2 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -95,7 +95,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()), EventRecorder: mgr.GetEventRecorderFor(status.ControllerName), PredicateFunc: helper.NewClusterPredicateByAgent(opts.ClusterName), - InformerManager: informermanager.NewMultiClusterInformerManager(), + InformerManager: informermanager.GetInstance(), StopChan: stopChan, ClusterClientSetFunc: util.NewClusterClientSetForAgent, ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, @@ -124,7 +124,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop Client: mgr.GetClient(), EventRecorder: mgr.GetEventRecorderFor(status.WorkStatusControllerName), RESTMapper: mgr.GetRESTMapper(), - InformerManager: informermanager.NewMultiClusterInformerManager(), + InformerManager: informermanager.GetInstance(), StopChan: stopChan, WorkerNumber: 1, ObjectWatcher: objectWatcher, @@ -140,7 +140,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop Client: mgr.GetClient(), EventRecorder: mgr.GetEventRecorderFor(mcs.ControllerName), RESTMapper: mgr.GetRESTMapper(), - InformerManager: informermanager.NewMultiClusterInformerManager(), + InformerManager: informermanager.GetInstance(), StopChan: stopChan, WorkerNumber: 1, PredicateFunc: helper.NewPredicateForServiceExportControllerByAgent(opts.ClusterName), diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index a0b86da56..19ed4b902 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -151,7 +151,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()), EventRecorder: mgr.GetEventRecorderFor(status.ControllerName), PredicateFunc: clusterPredicateFunc, - InformerManager: informermanager.NewMultiClusterInformerManager(), + InformerManager: informermanager.GetInstance(), StopChan: stopChan, ClusterClientSetFunc: util.NewClusterClientSet, ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, @@ -218,7 +218,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop Client: mgr.GetClient(), EventRecorder: mgr.GetEventRecorderFor(status.WorkStatusControllerName), RESTMapper: mgr.GetRESTMapper(), - InformerManager: informermanager.NewMultiClusterInformerManager(), + InformerManager: informermanager.GetInstance(), StopChan: stopChan, WorkerNumber: 1, ObjectWatcher: objectWatcher, @@ -242,7 +242,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop Client: mgr.GetClient(), EventRecorder: mgr.GetEventRecorderFor(mcs.ControllerName), RESTMapper: mgr.GetRESTMapper(), - InformerManager: informermanager.NewMultiClusterInformerManager(), + InformerManager: informermanager.GetInstance(), StopChan: stopChan, WorkerNumber: 1, PredicateFunc: helper.NewPredicateForServiceExportController(mgr), diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index 317303964..d9e2a679a 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -163,8 +163,16 @@ func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1al serviceExportGVR, endpointSliceGVR, } + + allSynced := true for _, gvr := range gvrTargets { - singleClusterInformerManager.ForResource(gvr, c.getEventHandler(cluster.Name)) + if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, c.getEventHandler(cluster.Name)) { + allSynced = false + singleClusterInformerManager.ForResource(gvr, c.getEventHandler(cluster.Name)) + } + } + if allSynced { + return nil } c.InformerManager.Start(cluster.Name, c.StopChan) diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go index 0e3a43cea..a2abd096a 100644 --- a/pkg/controllers/status/cluster_status_controller.go +++ b/pkg/controllers/status/cluster_status_controller.go @@ -208,25 +208,27 @@ func (c *ClusterStatusController) updateStatusIfNeeded(cluster *v1alpha1.Cluster // and pod and start it. If the informer manager exist, return it. func (c *ClusterStatusController) buildInformerForCluster(cluster *v1alpha1.Cluster) (informermanager.SingleClusterInformerManager, error) { singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name) - if singleClusterInformerManager != nil { - return singleClusterInformerManager, nil + if singleClusterInformerManager == nil { + clusterClient, err := c.ClusterDynamicClientSetFunc(cluster, c.Client) + if err != nil { + klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) + return nil, err + } + singleClusterInformerManager = c.InformerManager.ForCluster(clusterClient.ClusterName, clusterClient.DynamicClientSet, 0) } - clusterClient, err := c.ClusterDynamicClientSetFunc(cluster, c.Client) - if err != nil { - klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) - return nil, err - } - singleClusterInformerManager = c.InformerManager.ForCluster(clusterClient.ClusterName, clusterClient.DynamicClientSet, 0) - - gvrs := []schema.GroupVersionResource{ - nodeGVR, - podGVR, - } + gvrs := []schema.GroupVersionResource{nodeGVR, podGVR} // create the informer for pods and nodes + allSynced := true for _, gvr := range gvrs { - singleClusterInformerManager.Lister(gvr) + if !singleClusterInformerManager.IsInformerSynced(gvr) { + allSynced = false + singleClusterInformerManager.Lister(gvr) + } + } + if allSynced { + return singleClusterInformerManager, nil } c.InformerManager.Start(cluster.Name, c.StopChan) diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index faf0fe345..831ce84ce 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -348,8 +348,15 @@ func (c *WorkStatusController) registerInformersAndStart(cluster *v1alpha1.Clust return err } + allSynced := true for gvr := range gvrTargets { - singleClusterInformerManager.ForResource(gvr, c.getEventHandler()) + if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, c.getEventHandler()) { + allSynced = false + singleClusterInformerManager.ForResource(gvr, c.getEventHandler()) + } + } + if allSynced { + return nil } c.InformerManager.Start(cluster.Name, c.StopChan) diff --git a/pkg/util/informermanager/multi-cluster-manager.go b/pkg/util/informermanager/multi-cluster-manager.go index e2fbd76b1..417d91994 100644 --- a/pkg/util/informermanager/multi-cluster-manager.go +++ b/pkg/util/informermanager/multi-cluster-manager.go @@ -8,6 +8,19 @@ import ( "k8s.io/client-go/dynamic" ) +var ( + instance MultiClusterInformerManager + once sync.Once +) + +// GetInstance returns a shared MultiClusterInformerManager instance. +func GetInstance() MultiClusterInformerManager { + once.Do(func() { + instance = NewMultiClusterInformerManager() + }) + return instance +} + // MultiClusterInformerManager manages dynamic shared informer for all resources, include Kubernetes resource and // custom resources defined by CustomResourceDefinition, across multi-cluster. type MultiClusterInformerManager interface { diff --git a/pkg/util/informermanager/single-cluster-manager.go b/pkg/util/informermanager/single-cluster-manager.go index d104fe219..d708fb4e9 100644 --- a/pkg/util/informermanager/single-cluster-manager.go +++ b/pkg/util/informermanager/single-cluster-manager.go @@ -18,6 +18,13 @@ type SingleClusterInformerManager interface { // The handler should not be nil. ForResource(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) + // IsInformerSynced checks if the resource's informer is synced. + // A informer is synced means: + // - The informer has been created(by method 'ForResource' or 'Lister'). + // - The informer has started(by method 'Start'). + // - The informer's cache has been synced. + IsInformerSynced(resource schema.GroupVersionResource) bool + // IsHandlerExist checks if handler already added to a the informer that watches the 'resource'. IsHandlerExist(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) bool @@ -39,12 +46,15 @@ func NewSingleClusterInformerManager(client dynamic.Interface, defaultResync tim return &singleClusterInformerManagerImpl{ informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, defaultResync), handlers: make(map[schema.GroupVersionResource][]cache.ResourceEventHandler), + syncedInformers: make(map[schema.GroupVersionResource]struct{}), } } type singleClusterInformerManagerImpl struct { informerFactory dynamicinformer.DynamicSharedInformerFactory + syncedInformers map[schema.GroupVersionResource]struct{} + handlers map[schema.GroupVersionResource][]cache.ResourceEventHandler lock sync.RWMutex @@ -60,6 +70,14 @@ func (s *singleClusterInformerManagerImpl) ForResource(resource schema.GroupVers s.appendHandler(resource, handler) } +func (s *singleClusterInformerManagerImpl) IsInformerSynced(resource schema.GroupVersionResource) bool { + s.lock.RLock() + defer s.lock.RUnlock() + + _, exist := s.syncedInformers[resource] + return exist +} + func (s *singleClusterInformerManagerImpl) IsHandlerExist(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) bool { s.lock.RLock() defer s.lock.RUnlock() @@ -98,5 +116,13 @@ func (s *singleClusterInformerManagerImpl) Start(stopCh <-chan struct{}) { } func (s *singleClusterInformerManagerImpl) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool { - return s.informerFactory.WaitForCacheSync(stopCh) + s.lock.Lock() + defer s.lock.Unlock() + res := s.informerFactory.WaitForCacheSync(stopCh) + for resource, synced := range res { + if _, exist := s.syncedInformers[resource]; !exist && synced { + s.syncedInformers[resource] = struct{}{} + } + } + return res }