Merge pull request #490 from Garrybest/informer-manager

Share the informer manager globally so all controllers reuse a single instance.
karmada-bot 2021-07-06 09:36:23 +08:00 committed by GitHub
commit 30c22865aa
7 changed files with 79 additions and 23 deletions

View File

@@ -95,7 +95,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 		KubeClient:                  kubeclientset.NewForConfigOrDie(mgr.GetConfig()),
 		EventRecorder:               mgr.GetEventRecorderFor(status.ControllerName),
 		PredicateFunc:               helper.NewClusterPredicateByAgent(opts.ClusterName),
-		InformerManager:             informermanager.NewMultiClusterInformerManager(),
+		InformerManager:             informermanager.GetInstance(),
 		StopChan:                    stopChan,
 		ClusterClientSetFunc:        util.NewClusterClientSetForAgent,
 		ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
@@ -124,7 +124,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 		Client:          mgr.GetClient(),
 		EventRecorder:   mgr.GetEventRecorderFor(status.WorkStatusControllerName),
 		RESTMapper:      mgr.GetRESTMapper(),
-		InformerManager: informermanager.NewMultiClusterInformerManager(),
+		InformerManager: informermanager.GetInstance(),
 		StopChan:        stopChan,
 		WorkerNumber:    1,
 		ObjectWatcher:   objectWatcher,
@@ -140,7 +140,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 		Client:          mgr.GetClient(),
 		EventRecorder:   mgr.GetEventRecorderFor(mcs.ControllerName),
 		RESTMapper:      mgr.GetRESTMapper(),
-		InformerManager: informermanager.NewMultiClusterInformerManager(),
+		InformerManager: informermanager.GetInstance(),
 		StopChan:        stopChan,
 		WorkerNumber:    1,
 		PredicateFunc:   helper.NewPredicateForServiceExportControllerByAgent(opts.ClusterName),

View File

@@ -151,7 +151,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 		KubeClient:                  kubeclientset.NewForConfigOrDie(mgr.GetConfig()),
 		EventRecorder:               mgr.GetEventRecorderFor(status.ControllerName),
 		PredicateFunc:               clusterPredicateFunc,
-		InformerManager:             informermanager.NewMultiClusterInformerManager(),
+		InformerManager:             informermanager.GetInstance(),
 		StopChan:                    stopChan,
 		ClusterClientSetFunc:        util.NewClusterClientSet,
 		ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
@@ -218,7 +218,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 		Client:          mgr.GetClient(),
 		EventRecorder:   mgr.GetEventRecorderFor(status.WorkStatusControllerName),
 		RESTMapper:      mgr.GetRESTMapper(),
-		InformerManager: informermanager.NewMultiClusterInformerManager(),
+		InformerManager: informermanager.GetInstance(),
 		StopChan:        stopChan,
 		WorkerNumber:    1,
 		ObjectWatcher:   objectWatcher,
@@ -242,7 +242,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 		Client:          mgr.GetClient(),
 		EventRecorder:   mgr.GetEventRecorderFor(mcs.ControllerName),
 		RESTMapper:      mgr.GetRESTMapper(),
-		InformerManager: informermanager.NewMultiClusterInformerManager(),
+		InformerManager: informermanager.GetInstance(),
 		StopChan:        stopChan,
 		WorkerNumber:    1,
 		PredicateFunc:   helper.NewPredicateForServiceExportController(mgr),

View File

@@ -163,9 +163,17 @@ func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1al
 		serviceExportGVR,
 		endpointSliceGVR,
 	}
 
+	allSynced := true
 	for _, gvr := range gvrTargets {
-		singleClusterInformerManager.ForResource(gvr, c.getEventHandler(cluster.Name))
+		if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, c.getEventHandler(cluster.Name)) {
+			allSynced = false
+			singleClusterInformerManager.ForResource(gvr, c.getEventHandler(cluster.Name))
+		}
 	}
+	if allSynced {
+		return nil
+	}
+
 	c.InformerManager.Start(cluster.Name, c.StopChan)
 	synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan)
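With one manager shared across controllers, registerInformersAndStart can now be hit repeatedly for a cluster whose informers already exist, so the loop above registers only what is missing and returns early when nothing changed. Below is a self-contained sketch of that idempotent-registration pattern against a trimmed-down interface; `registrar`, `ensureInformers`, and `fakeRegistrar` are illustrations for this document, not karmada APIs.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// registrar is a hypothetical stand-in for the parts of
// SingleClusterInformerManager that the pattern above needs.
type registrar interface {
	IsInformerSynced(gvr schema.GroupVersionResource) bool
	ForResource(gvr schema.GroupVersionResource)
	Start()
}

// ensureInformers mirrors the allSynced early return from the diff:
// register only missing informers, and restart the factory only when
// something new was actually registered.
func ensureInformers(r registrar, gvrs []schema.GroupVersionResource) {
	allSynced := true
	for _, gvr := range gvrs {
		if !r.IsInformerSynced(gvr) {
			allSynced = false
			r.ForResource(gvr)
		}
	}
	if allSynced {
		return // nothing new to watch; skip Start/WaitForCacheSync
	}
	r.Start()
}

// fakeRegistrar pretends every registered informer becomes synced on Start.
type fakeRegistrar struct {
	registered map[schema.GroupVersionResource]bool
	synced     map[schema.GroupVersionResource]bool
	starts     int
}

func (f *fakeRegistrar) IsInformerSynced(gvr schema.GroupVersionResource) bool { return f.synced[gvr] }
func (f *fakeRegistrar) ForResource(gvr schema.GroupVersionResource)           { f.registered[gvr] = true }
func (f *fakeRegistrar) Start() {
	f.starts++
	for gvr := range f.registered {
		f.synced[gvr] = true
	}
}

func main() {
	pods := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
	f := &fakeRegistrar{registered: map[schema.GroupVersionResource]bool{}, synced: map[schema.GroupVersionResource]bool{}}

	ensureInformers(f, []schema.GroupVersionResource{pods}) // registers and starts
	ensureInformers(f, []schema.GroupVersionResource{pods}) // already synced: no-op
	fmt.Println("factory starts:", f.starts)                // 1
}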

View File

@@ -208,26 +208,28 @@ func (c *ClusterStatusController) updateStatusIfNeeded(cluster *v1alpha1.Cluster
 // and pod and start it. If the informer manager exist, return it.
 func (c *ClusterStatusController) buildInformerForCluster(cluster *v1alpha1.Cluster) (informermanager.SingleClusterInformerManager, error) {
 	singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
-	if singleClusterInformerManager != nil {
-		return singleClusterInformerManager, nil
-	}
-	clusterClient, err := c.ClusterDynamicClientSetFunc(cluster, c.Client)
-	if err != nil {
-		klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
-		return nil, err
-	}
-	singleClusterInformerManager = c.InformerManager.ForCluster(clusterClient.ClusterName, clusterClient.DynamicClientSet, 0)
-	gvrs := []schema.GroupVersionResource{
-		nodeGVR,
-		podGVR,
+	if singleClusterInformerManager == nil {
+		clusterClient, err := c.ClusterDynamicClientSetFunc(cluster, c.Client)
+		if err != nil {
+			klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
+			return nil, err
+		}
+		singleClusterInformerManager = c.InformerManager.ForCluster(clusterClient.ClusterName, clusterClient.DynamicClientSet, 0)
 	}
+
+	gvrs := []schema.GroupVersionResource{nodeGVR, podGVR}
+
 	// create the informer for pods and nodes
+	allSynced := true
 	for _, gvr := range gvrs {
-		singleClusterInformerManager.Lister(gvr)
+		if !singleClusterInformerManager.IsInformerSynced(gvr) {
+			allSynced = false
+			singleClusterInformerManager.Lister(gvr)
+		}
 	}
+	if allSynced {
+		return singleClusterInformerManager, nil
+	}
+
 	c.InformerManager.Start(cluster.Name, c.StopChan)
 	synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan)
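For orientation: the SingleClusterInformerManager built here wraps a client-go dynamic shared informer factory (see NewSingleClusterInformerManager further down), and Lister/Start/WaitForCacheSync map onto the factory's own calls. The following self-contained sketch performs the same create-lister, start, wait-for-sync sequence with client-go directly; it assumes a kubeconfig at the default location and is plain client-go, not the karmada wrapper.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a dynamic client from the local kubeconfig (illustration only).
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := dynamic.NewForConfigOrDie(cfg)

	// One shared factory per cluster; resync 0 matches ForCluster(..., 0) above.
	factory := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)

	podGVR := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
	lister := factory.ForResource(podGVR).Lister() // creates the pod informer

	stopCh := make(chan struct{})
	factory.Start(stopCh)            // start every informer created so far
	factory.WaitForCacheSync(stopCh) // block until their caches are filled

	pods, _ := lister.List(labels.Everything())
	fmt.Printf("pods in cache: %d\n", len(pods))
	close(stopCh)
}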

View File

@@ -348,9 +348,16 @@ func (c *WorkStatusController) registerInformersAndStart(cluster *v1alpha1.Clust
 		return err
 	}
 
+	allSynced := true
 	for gvr := range gvrTargets {
-		singleClusterInformerManager.ForResource(gvr, c.getEventHandler())
+		if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, c.getEventHandler()) {
+			allSynced = false
+			singleClusterInformerManager.ForResource(gvr, c.getEventHandler())
+		}
 	}
+	if allSynced {
+		return nil
+	}
 
 	c.InformerManager.Start(cluster.Name, c.StopChan)
 	synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan)

View File

@@ -8,6 +8,19 @@ import (
 	"k8s.io/client-go/dynamic"
 )
 
+var (
+	instance MultiClusterInformerManager
+	once     sync.Once
+)
+
+// GetInstance returns a shared MultiClusterInformerManager instance.
+func GetInstance() MultiClusterInformerManager {
+	once.Do(func() {
+		instance = NewMultiClusterInformerManager()
+	})
+	return instance
+}
+
 // MultiClusterInformerManager manages dynamic shared informer for all resources, include Kubernetes resource and
 // custom resources defined by CustomResourceDefinition, across multi-cluster.
 type MultiClusterInformerManager interface {
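A quick sanity check of what the singleton buys: every caller of GetInstance sees the same manager, so per-cluster informer factories and caches are shared rather than duplicated. A minimal sketch, assuming the package lives at the karmada repo path shown below:

package main

import (
	"fmt"

	// Assumed import path for the package shown in this diff.
	"github.com/karmada-io/karmada/pkg/util/informermanager"
)

func main() {
	a := informermanager.GetInstance()
	b := informermanager.GetInstance()
	// sync.Once runs NewMultiClusterInformerManager exactly once, so both
	// variables hold the same manager (and the same per-cluster informers).
	fmt.Println(a == b) // true
}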

View File

@@ -18,6 +18,13 @@ type SingleClusterInformerManager interface {
 	// The handler should not be nil.
 	ForResource(resource schema.GroupVersionResource, handler cache.ResourceEventHandler)
 
+	// IsInformerSynced checks if the resource's informer is synced.
+	// An informer is considered synced when:
+	// - The informer has been created (by method 'ForResource' or 'Lister').
+	// - The informer has been started (by method 'Start').
+	// - The informer's cache has been synced.
+	IsInformerSynced(resource schema.GroupVersionResource) bool
+
 	// IsHandlerExist checks if handler already added to a the informer that watches the 'resource'.
 	IsHandlerExist(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) bool
@@ -39,12 +46,15 @@ func NewSingleClusterInformerManager(client dynamic.Interface, defaultResync tim
 	return &singleClusterInformerManagerImpl{
 		informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, defaultResync),
 		handlers:        make(map[schema.GroupVersionResource][]cache.ResourceEventHandler),
+		syncedInformers: make(map[schema.GroupVersionResource]struct{}),
 	}
 }
 
 type singleClusterInformerManagerImpl struct {
 	informerFactory dynamicinformer.DynamicSharedInformerFactory
 
+	syncedInformers map[schema.GroupVersionResource]struct{}
+
 	handlers map[schema.GroupVersionResource][]cache.ResourceEventHandler
 
 	lock sync.RWMutex
@@ -60,6 +70,14 @@ func (s *singleClusterInformerManagerImpl) ForResource(resource schema.GroupVers
 	s.appendHandler(resource, handler)
 }
 
+func (s *singleClusterInformerManagerImpl) IsInformerSynced(resource schema.GroupVersionResource) bool {
+	s.lock.RLock()
+	defer s.lock.RUnlock()
+
+	_, exist := s.syncedInformers[resource]
+	return exist
+}
+
 func (s *singleClusterInformerManagerImpl) IsHandlerExist(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) bool {
 	s.lock.RLock()
 	defer s.lock.RUnlock()
@@ -98,5 +116,13 @@ func (s *singleClusterInformerManagerImpl) Start(stopCh <-chan struct{}) {
 }
 
 func (s *singleClusterInformerManagerImpl) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
-	return s.informerFactory.WaitForCacheSync(stopCh)
+	s.lock.Lock()
+	defer s.lock.Unlock()
+	res := s.informerFactory.WaitForCacheSync(stopCh)
+	for resource, synced := range res {
+		if _, exist := s.syncedInformers[resource]; !exist && synced {
+			s.syncedInformers[resource] = struct{}{}
+		}
+	}
+	return res
 }
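Putting the pieces together: a GVR lands in syncedInformers only after WaitForCacheSync has reported it synced, which is exactly what IsInformerSynced later checks so controllers can skip re-registering and restarting. A hedged end-to-end sketch; the import path and the kubeconfig-based client are assumptions for illustration, while the method names are the ones visible in this diff.

package main

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"

	// Assumed import path for the shared manager.
	"github.com/karmada-io/karmada/pkg/util/informermanager"
)

func main() {
	// Stand-in for a member-cluster client; the real controllers build this
	// via ClusterDynamicClientSetFunc.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		klog.Fatal(err)
	}
	dynamicClient := dynamic.NewForConfigOrDie(cfg)

	mgr := informermanager.GetInstance()
	single := mgr.ForCluster("member1", dynamicClient, 0)

	podGVR := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
	single.Lister(podGVR)                                                  // informer created, not yet synced
	klog.Infof("synced before start: %v", single.IsInformerSynced(podGVR)) // false

	stopCh := make(chan struct{})
	mgr.Start("member1", stopCh)
	mgr.WaitForCacheSync("member1", stopCh) // records the pod informer in syncedInformers

	klog.Infof("synced after wait: %v", single.IsInformerSynced(podGVR)) // true
	close(stopCh)
}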