add typed shared informer

Signed-off-by: Poor12 <shentiecheng@huawei.com>
Poor12 2022-08-09 17:19:47 +08:00
parent 0cc00f7bcf
commit e528657382
2 changed files with 354 additions and 0 deletions


@@ -0,0 +1,146 @@
package typedManager

import (
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
)
var (
instance MultiClusterInformerManager
once sync.Once
stopOnce sync.Once
stopCh chan struct{}
)
func init() {
stopCh = make(chan struct{})
}
// GetInstance returns a shared MultiClusterInformerManager instance.
func GetInstance() MultiClusterInformerManager {
once.Do(func() {
instance = NewMultiClusterInformerManager(stopCh)
})
return instance
}
// StopInstance will stop the shared MultiClusterInformerManager instance.
func StopInstance() {
stopOnce.Do(func() {
close(stopCh)
})
}
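// A minimal usage sketch of the shared instance. `memberClient` is an assumed
// kubernetes.Interface, not defined in this package:
//
//	m := GetInstance()
//	m.ForCluster("member1", memberClient, 0)
//	m.Start("member1")
//	defer StopInstance()
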
// MultiClusterInformerManager manages typed shared informers across multiple clusters. Typed informers
// cover built-in Kubernetes API types only; custom resources defined by a CustomResourceDefinition need a dynamic informer manager.
type MultiClusterInformerManager interface {
// ForCluster builds an informer manager for a specific cluster.
ForCluster(cluster string, client kubernetes.Interface, defaultResync time.Duration) SingleClusterInformerManager
// GetSingleClusterManager gets the informer manager for a specific cluster.
// The informer manager should have been created beforehand (by 'ForCluster'); otherwise, nil will be returned.
GetSingleClusterManager(cluster string) SingleClusterInformerManager
// IsManagerExist checks if the informer manager for the cluster has already been created.
IsManagerExist(cluster string) bool
// Start will run all informers for a specific cluster.
// Should be called after 'ForCluster'; otherwise it is a no-op.
Start(cluster string)
// Stop will stop all informers for a specific cluster, and remove the cluster from the informer managers.
Stop(cluster string)
// WaitForCacheSync waits for all caches to populate.
// Should be called after 'ForCluster'; otherwise it is a no-op.
WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool
// WaitForCacheSyncWithTimeout waits for all caches to populate with a definitive timeout.
// Should be called after 'ForCluster'; otherwise it is a no-op.
WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool
}
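// A typical end-to-end flow (a sketch; `client` and `handler` are assumed
// placeholders, not defined in this package):
//
//	mcm := GetInstance()
//	scm := mcm.ForCluster("member1", client, 0)
//	_ = scm.ForResource(podGVR, handler)
//	mcm.Start("member1")
//	if synced := mcm.WaitForCacheSync("member1"); !synced[podGVR] {
//		// cache for pods not synced yet
//	}
//	defer mcm.Stop("member1")
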
// NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl.
func NewMultiClusterInformerManager(stopCh <-chan struct{}) MultiClusterInformerManager {
return &multiClusterInformerManagerImpl{
managers: make(map[string]SingleClusterInformerManager),
stopCh: stopCh,
}
}
type multiClusterInformerManagerImpl struct {
managers map[string]SingleClusterInformerManager
stopCh <-chan struct{}
lock sync.RWMutex
}
func (m *multiClusterInformerManagerImpl) getManager(cluster string) (SingleClusterInformerManager, bool) {
m.lock.RLock()
defer m.lock.RUnlock()
manager, exist := m.managers[cluster]
return manager, exist
}
func (m *multiClusterInformerManagerImpl) ForCluster(cluster string, client kubernetes.Interface, defaultResync time.Duration) SingleClusterInformerManager {
// If the informer manager already exists, just return it.
if manager, exist := m.getManager(cluster); exist {
return manager
}
m.lock.Lock()
defer m.lock.Unlock()
// Re-check under the write lock: another goroutine may have created the manager
// between the read-locked lookup above and acquiring the write lock.
if manager, exist := m.managers[cluster]; exist {
return manager
}
manager := NewSingleClusterInformerManager(client, defaultResync, m.stopCh)
m.managers[cluster] = manager
return manager
}
func (m *multiClusterInformerManagerImpl) GetSingleClusterManager(cluster string) SingleClusterInformerManager {
if manager, exist := m.getManager(cluster); exist {
return manager
}
return nil
}
func (m *multiClusterInformerManagerImpl) IsManagerExist(cluster string) bool {
_, exist := m.getManager(cluster)
return exist
}
func (m *multiClusterInformerManagerImpl) Start(cluster string) {
// If the informer manager hasn't been created yet, just return (no-op).
manager, exist := m.getManager(cluster)
if !exist {
return
}
manager.Start()
}
func (m *multiClusterInformerManagerImpl) Stop(cluster string) {
manager, exist := m.getManager(cluster)
if !exist {
return
}
manager.Stop()
m.lock.Lock()
defer m.lock.Unlock()
delete(m.managers, cluster)
}
func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool {
manager, exist := m.getManager(cluster)
if !exist {
return nil
}
return manager.WaitForCacheSync()
}
func (m *multiClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool {
manager, exist := m.getManager(cluster)
if !exist {
return nil
}
return manager.WaitForCacheSyncWithTimeout(cacheSyncTimeout)
}


@@ -0,0 +1,208 @@
package typedManager

import (
"context"
"reflect"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/karmada-io/karmada/pkg/util"
)
var (
nodeGVR = corev1.SchemeGroupVersion.WithResource("nodes")
podGVR = corev1.SchemeGroupVersion.WithResource("pods")
gvrTypeMap = map[reflect.Type]schema.GroupVersionResource{
reflect.TypeOf(&corev1.Node{}): nodeGVR,
reflect.TypeOf(&corev1.Pod{}): podGVR,
}
)
// SingleClusterInformerManager manages typed shared informers for a single cluster. Typed informers
// cover built-in Kubernetes API types only; custom resources defined by a CustomResourceDefinition need a dynamic informer manager.
type SingleClusterInformerManager interface {
// ForResource builds a typed shared informer for 'resource' and then sets the event handler.
// If the informer already exists, the event handler will be appended to it.
// The handler must not be nil.
ForResource(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) error
// IsInformerSynced checks if the resource's informer is synced.
// An informer being synced means:
// - The informer has been created (by method 'ForResource' or 'Lister').
// - The informer has been started (by method 'Start').
// - The informer's cache has been synced.
IsInformerSynced(resource schema.GroupVersionResource) bool
// IsHandlerExist checks if the handler has already been added to the informer that watches the 'resource'.
IsHandlerExist(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) bool
// Lister returns a lister used to get 'resource' from the informer's store.
// The informer for 'resource' will be created if it does not exist, but without any event handler.
Lister(resource schema.GroupVersionResource) (interface{}, error)
// Start will run all informers; the informers will keep running until the stop channel is closed.
// It is intended to be called after creating new informer(s), and it is safe to call multiple times.
Start()
// Stop stops all informers of the cluster. Once stopped, the manager cannot be
// started again.
Stop()
// WaitForCacheSync waits for all caches to populate.
WaitForCacheSync() map[schema.GroupVersionResource]bool
// WaitForCacheSyncWithTimeout waits for all caches to populate with a definitive timeout.
WaitForCacheSyncWithTimeout(cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool
// Context returns the single cluster context.
Context() context.Context
// GetClient returns the typed client.
GetClient() kubernetes.Interface
}
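// A usage sketch: register a pod handler, start, wait for sync, then read from
// the cache. `client` and `stopCh` are assumed placeholders; `corev1listers`
// stands for k8s.io/client-go/listers/core/v1 and `labels` for
// k8s.io/apimachinery/pkg/labels:
//
//	mgr := NewSingleClusterInformerManager(client, 0, stopCh)
//	_ = mgr.ForResource(podGVR, cache.ResourceEventHandlerFuncs{
//		AddFunc: func(obj interface{}) { /* react to pod add */ },
//	})
//	mgr.Start()
//	mgr.WaitForCacheSync()
//	l, _ := mgr.Lister(podGVR)
//	pods, _ := l.(corev1listers.PodLister).Pods("default").List(labels.Everything())
//	_ = pods
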
// NewSingleClusterInformerManager constructs a new instance of singleClusterInformerManagerImpl.
// defaultResync with value '0' means no re-sync.
func NewSingleClusterInformerManager(client kubernetes.Interface, defaultResync time.Duration, parentCh <-chan struct{}) SingleClusterInformerManager {
ctx, cancel := util.ContextForChannel(parentCh)
return &singleClusterInformerManagerImpl{
informerFactory: informers.NewSharedInformerFactory(client, defaultResync),
handlers: make(map[schema.GroupVersionResource][]cache.ResourceEventHandler),
syncedInformers: make(map[schema.GroupVersionResource]struct{}),
ctx: ctx,
cancel: cancel,
client: client,
}
}
type singleClusterInformerManagerImpl struct {
ctx context.Context
cancel context.CancelFunc
informerFactory informers.SharedInformerFactory
syncedInformers map[schema.GroupVersionResource]struct{}
handlers map[schema.GroupVersionResource][]cache.ResourceEventHandler
client kubernetes.Interface
lock sync.RWMutex
}
func (s *singleClusterInformerManagerImpl) ForResource(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) error {
// If the handler already exists, just return; nothing changes.
if s.IsHandlerExist(resource, handler) {
return nil
}
resourceInformer, err := s.informerFactory.ForResource(resource)
if err != nil {
return err
}
resourceInformer.Informer().AddEventHandler(handler)
s.appendHandler(resource, handler)
return nil
}
func (s *singleClusterInformerManagerImpl) IsInformerSynced(resource schema.GroupVersionResource) bool {
s.lock.RLock()
defer s.lock.RUnlock()
_, exist := s.syncedInformers[resource]
return exist
}
func (s *singleClusterInformerManagerImpl) IsHandlerExist(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) bool {
s.lock.RLock()
defer s.lock.RUnlock()
handlers, exist := s.handlers[resource]
if !exist {
return false
}
for _, h := range handlers {
if h == handler {
return true
}
}
return false
}
func (s *singleClusterInformerManagerImpl) Lister(resource schema.GroupVersionResource) (interface{}, error) {
if resource == nodeGVR {
return s.informerFactory.Core().V1().Nodes().Lister(), nil
}
if resource == podGVR {
return s.informerFactory.Core().V1().Pods().Lister(), nil
}
resourceInformer, err := s.informerFactory.ForResource(resource)
if err != nil {
return nil, err
}
return resourceInformer.Lister(), nil
}
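// Note: the concrete lister type varies per resource, which is why Lister
// returns interface{}; callers type-assert the result (e.g. to
// k8s.io/client-go/listers/core/v1.PodLister for pods), as in the sketch
// above the constructor.
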
func (s *singleClusterInformerManagerImpl) appendHandler(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) {
s.lock.Lock()
defer s.lock.Unlock()
// The handler list may not exist yet; append below handles a nil slice.
handlers := s.handlers[resource]
// Assume the handler is not already in the list; the caller (ForResource) ensures that.
s.handlers[resource] = append(handlers, handler)
}
func (s *singleClusterInformerManagerImpl) Start() {
s.informerFactory.Start(s.ctx.Done())
}
func (s *singleClusterInformerManagerImpl) Stop() {
s.cancel()
}
func (s *singleClusterInformerManagerImpl) WaitForCacheSync() map[schema.GroupVersionResource]bool {
return s.waitForCacheSync(s.ctx)
}
func (s *singleClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool {
ctx, cancel := context.WithTimeout(s.ctx, cacheSyncTimeout)
defer cancel()
return s.waitForCacheSync(ctx)
}
func (s *singleClusterInformerManagerImpl) waitForCacheSync(ctx context.Context) map[schema.GroupVersionResource]bool {
s.lock.Lock()
defer s.lock.Unlock()
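// The factory keys its sync result by the informer's Go type (reflect.Type);
// translate each entry back to its GroupVersionResource via gvrTypeMap.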
res := s.informerFactory.WaitForCacheSync(ctx.Done())
m := make(map[schema.GroupVersionResource]bool)
for resource, synced := range res {
if gvr, exist := gvrTypeMap[resource]; exist {
m[gvr] = synced
if _, exist = s.syncedInformers[gvr]; !exist && synced {
s.syncedInformers[gvr] = struct{}{}
}
}
}
return m
}
func (s *singleClusterInformerManagerImpl) Context() context.Context {
return s.ctx
}
func (s *singleClusterInformerManagerImpl) GetClient() kubernetes.Interface {
return s.client
}