Add Informer manager interfaces and implementation
Co-authored-by: chenxianpao <chenxianpao@huawei.com>
This commit is contained in:
parent
148d5ed18b
commit
9b8cdab45c
|
|
@ -0,0 +1,87 @@
|
|||
package informermanager
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
)
|
||||
|
||||
// MultiClusterInformerManager manages dynamic shared informer for all resources, include Kubernetes resource and
|
||||
// custom resources defined by CustomResourceDefinition, across multi-cluster.
|
||||
type MultiClusterInformerManager interface {
|
||||
// ForCluster builds a informer manager for a specific cluster.
|
||||
ForCluster(cluster string, client dynamic.Interface, defaultResync time.Duration) SingleClusterInformerManager
|
||||
|
||||
// IsManagerExist checks if the informer manager for the cluster already created.
|
||||
IsManagerExist(cluster string) bool
|
||||
|
||||
// Start will run all informers for a specific cluster, it accepts a stop channel, the informers will keep running until channel closed.
|
||||
// Should call after 'ForCluster', otherwise no-ops.
|
||||
Start(cluster string, stopCh <-chan struct{})
|
||||
|
||||
// WaitForCacheSync waits for all caches to populate.
|
||||
// Should call after 'ForCluster', otherwise no-ops.
|
||||
WaitForCacheSync(cluster string, stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
|
||||
}
|
||||
|
||||
// NewMultiClusterInformerManagerImpl constructs a new instance of multiClusterInformerManagerImpl.
|
||||
func NewMultiClusterInformerManagerImpl() MultiClusterInformerManager {
|
||||
return &multiClusterInformerManagerImpl{
|
||||
managers: make(map[string]SingleClusterInformerManager),
|
||||
}
|
||||
}
|
||||
|
||||
type multiClusterInformerManagerImpl struct {
|
||||
managers map[string]SingleClusterInformerManager
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
func (m *multiClusterInformerManagerImpl) ForCluster(cluster string, client dynamic.Interface, defaultResync time.Duration) SingleClusterInformerManager {
|
||||
// If informer manager already exist, just return
|
||||
m.lock.RLock()
|
||||
manager, exist := m.managers[cluster]
|
||||
if exist {
|
||||
m.lock.RUnlock()
|
||||
return manager
|
||||
}
|
||||
m.lock.RUnlock()
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
manager = NewSingleClusterInformerManager(client, defaultResync)
|
||||
m.managers[cluster] = manager
|
||||
return manager
|
||||
}
|
||||
|
||||
func (m *multiClusterInformerManagerImpl) IsManagerExist(cluster string) bool {
|
||||
m.lock.RLock()
|
||||
defer m.lock.RUnlock()
|
||||
|
||||
_, exist := m.managers[cluster]
|
||||
|
||||
return exist
|
||||
}
|
||||
|
||||
func (m *multiClusterInformerManagerImpl) Start(cluster string, stopCh <-chan struct{}) {
|
||||
m.lock.RLock()
|
||||
defer m.lock.RUnlock()
|
||||
|
||||
// if informer manager haven't been created, just return with no-ops.
|
||||
manager, exist := m.managers[cluster]
|
||||
if !exist {
|
||||
return
|
||||
}
|
||||
manager.Start(stopCh)
|
||||
}
|
||||
|
||||
func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string, stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
|
||||
m.lock.RLock()
|
||||
defer m.lock.RUnlock()
|
||||
|
||||
manager, exist := m.managers[cluster]
|
||||
if !exist {
|
||||
return nil
|
||||
}
|
||||
return manager.WaitForCacheSync(stopCh)
|
||||
}
|
||||
|
|
@ -0,0 +1,94 @@
|
|||
package informermanager
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// SingleClusterInformerManager manages dynamic shared informer for all resources, include Kubernetes resource and
|
||||
// custom resources defined by CustomResourceDefinition.
|
||||
type SingleClusterInformerManager interface {
|
||||
// ForResource builds a dynamic shared informer for 'resource' then set event handler.
|
||||
// If the informer already exist, the event handler will be appended to the informer.
|
||||
// The handler should not be nil.
|
||||
ForResource(resource schema.GroupVersionResource, handler cache.ResourceEventHandler)
|
||||
|
||||
// IsHandlerExist checks if handler already added to a the informer that watches the 'resource'.
|
||||
IsHandlerExist(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) bool
|
||||
|
||||
// Start will run all informers with a stop channel, the informers will keep running until channel closed.
|
||||
// It is intended to be called after create new informer(s), and it's safe to call multi times.
|
||||
Start(stopCh <-chan struct{})
|
||||
|
||||
// WaitForCacheSync waits for all caches to populate.
|
||||
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
|
||||
}
|
||||
|
||||
// NewSingleClusterInformerManager constructs a new instance of singleClusterInformerManagerImpl.
|
||||
// defaultResync with value '0' means no re-sync.
|
||||
func NewSingleClusterInformerManager(client dynamic.Interface, defaultResync time.Duration) SingleClusterInformerManager {
|
||||
return &singleClusterInformerManagerImpl{
|
||||
informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, defaultResync),
|
||||
handlers: make(map[schema.GroupVersionResource][]cache.ResourceEventHandler),
|
||||
}
|
||||
}
|
||||
|
||||
type singleClusterInformerManagerImpl struct {
|
||||
informerFactory dynamicinformer.DynamicSharedInformerFactory
|
||||
|
||||
handlers map[schema.GroupVersionResource][]cache.ResourceEventHandler
|
||||
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
func (s *singleClusterInformerManagerImpl) ForResource(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) {
|
||||
// if handler already exist, just return, nothing changed.
|
||||
if s.IsHandlerExist(resource, handler) {
|
||||
return
|
||||
}
|
||||
|
||||
s.informerFactory.ForResource(resource).Informer().AddEventHandler(handler)
|
||||
s.appendHandler(resource, handler)
|
||||
}
|
||||
|
||||
func (s *singleClusterInformerManagerImpl) IsHandlerExist(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) bool {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
handlers, exist := s.handlers[resource]
|
||||
if !exist {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, h := range handlers {
|
||||
if h == handler {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *singleClusterInformerManagerImpl) appendHandler(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
// assume the handler list exist, caller should ensure for that.
|
||||
handlers := s.handlers[resource]
|
||||
|
||||
// assume the handler not exist in it, caller should ensure for that.
|
||||
s.handlers[resource] = append(handlers, handler)
|
||||
}
|
||||
|
||||
func (s *singleClusterInformerManagerImpl) Start(stopCh <-chan struct{}) {
|
||||
s.informerFactory.Start(stopCh)
|
||||
}
|
||||
|
||||
func (s *singleClusterInformerManagerImpl) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
|
||||
return s.informerFactory.WaitForCacheSync(stopCh)
|
||||
}
|
||||
Loading…
Reference in New Issue