Merge pull request #894 from mrlihanbo/cluster_status_bugfix
add Timeout in WaitForCacheSync
This commit is contained in:
commit
51c911a60b
|
@ -95,6 +95,11 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr
|
|||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
if !util.IsClusterReady(&cluster.Status) {
|
||||
klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
|
||||
return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
|
||||
}
|
||||
|
||||
return c.buildResourceInformers(cluster)
|
||||
}
|
||||
|
||||
|
@ -175,16 +180,22 @@ func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1al
|
|||
}
|
||||
|
||||
c.InformerManager.Start(cluster.Name)
|
||||
synced := c.InformerManager.WaitForCacheSync(cluster.Name)
|
||||
if synced == nil {
|
||||
klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name)
|
||||
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
|
||||
}
|
||||
for _, gvr := range gvrTargets {
|
||||
if !synced[gvr] {
|
||||
klog.Errorf("Informer for %s hasn't synced.", gvr)
|
||||
return fmt.Errorf("informer for %s hasn't synced", gvr)
|
||||
|
||||
if err := func() error {
|
||||
synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout)
|
||||
if synced == nil {
|
||||
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
|
||||
}
|
||||
for _, gvr := range gvrTargets {
|
||||
if !synced[gvr] {
|
||||
return fmt.Errorf("informer for %s hasn't synced", gvr)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
|
||||
c.InformerManager.Stop(cluster.Name)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -118,16 +118,6 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
|
|||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
// get or create informer for pods and nodes in member cluster
|
||||
clusterInformerManager, err := c.buildInformerForCluster(cluster)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
|
||||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
// init the lease controller for every cluster
|
||||
c.initLeaseController(clusterInformerManager.Context(), cluster)
|
||||
|
||||
var currentClusterStatus = clusterv1alpha1.ClusterStatus{}
|
||||
|
||||
var online, healthy bool
|
||||
|
@ -146,9 +136,20 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
|
|||
klog.V(2).Infof("Cluster(%s) still offline after retry, ensuring offline is set.", cluster.Name)
|
||||
currentClusterStatus.Conditions = generateReadyCondition(false, false)
|
||||
setTransitionTime(&cluster.Status, &currentClusterStatus)
|
||||
c.InformerManager.Stop(cluster.Name)
|
||||
return c.updateStatusIfNeeded(cluster, currentClusterStatus)
|
||||
}
|
||||
|
||||
// get or create informer for pods and nodes in member cluster
|
||||
clusterInformerManager, err := c.buildInformerForCluster(cluster)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
|
||||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
// init the lease controller for every cluster
|
||||
c.initLeaseController(clusterInformerManager.Context(), cluster)
|
||||
|
||||
clusterVersion, err := getKubernetesVersion(clusterClient)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get server version of the member cluster: %v, err is : %v", cluster.Name, err)
|
||||
|
@ -227,17 +228,24 @@ func (c *ClusterStatusController) buildInformerForCluster(cluster *clusterv1alph
|
|||
}
|
||||
|
||||
c.InformerManager.Start(cluster.Name)
|
||||
synced := c.InformerManager.WaitForCacheSync(cluster.Name)
|
||||
if synced == nil {
|
||||
klog.Errorf("The informer factory for cluster(%s) does not exist.", cluster.Name)
|
||||
return nil, fmt.Errorf("informer factory for cluster(%s) does not exist", cluster.Name)
|
||||
}
|
||||
for _, gvr := range gvrs {
|
||||
if !synced[gvr] {
|
||||
klog.Errorf("Informer for %s hasn't synced.", gvr)
|
||||
return nil, fmt.Errorf("informer for %s hasn't synced", gvr)
|
||||
|
||||
if err := func() error {
|
||||
synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout)
|
||||
if synced == nil {
|
||||
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
|
||||
}
|
||||
for _, gvr := range gvrs {
|
||||
if !synced[gvr] {
|
||||
return fmt.Errorf("informer for %s hasn't synced", gvr)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
|
||||
c.InformerManager.Stop(cluster.Name)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return singleClusterInformerManager, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -83,6 +83,11 @@ func (c *WorkStatusController) Reconcile(ctx context.Context, req controllerrunt
|
|||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
if !util.IsClusterReady(&cluster.Status) {
|
||||
klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
|
||||
return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
|
||||
}
|
||||
|
||||
return c.buildResourceInformers(cluster, work)
|
||||
}
|
||||
|
||||
|
@ -391,17 +396,24 @@ func (c *WorkStatusController) registerInformersAndStart(cluster *clusterv1alpha
|
|||
}
|
||||
|
||||
c.InformerManager.Start(cluster.Name)
|
||||
synced := c.InformerManager.WaitForCacheSync(cluster.Name)
|
||||
if synced == nil {
|
||||
klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name)
|
||||
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
|
||||
}
|
||||
for gvr := range gvrTargets {
|
||||
if !synced[gvr] {
|
||||
klog.Errorf("Informer for %s hasn't synced.", gvr)
|
||||
return fmt.Errorf("informer for %s hasn't synced", gvr)
|
||||
|
||||
if err := func() error {
|
||||
synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout)
|
||||
if synced == nil {
|
||||
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
|
||||
}
|
||||
for gvr := range gvrTargets {
|
||||
if !synced[gvr] {
|
||||
return fmt.Errorf("informer for %s hasn't synced", gvr)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
|
||||
c.InformerManager.Stop(cluster.Name)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package util
|
||||
|
||||
import "time"
|
||||
|
||||
const (
|
||||
// ServiceNamespaceLabel is added to the work object, which is reported by the member cluster, to specify the service namespace associated with the EndpointSlice.
|
||||
ServiceNamespaceLabel = "endpointslice.karmada.io/namespace"
|
||||
|
@ -125,3 +127,8 @@ const (
|
|||
// ContextKeyObject is the context value key of a resource.
|
||||
ContextKeyObject ContextKey = "object"
|
||||
)
|
||||
|
||||
const (
|
||||
// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
|
||||
CacheSyncTimeout = 30 * time.Second
|
||||
)
|
||||
|
|
|
@ -57,6 +57,10 @@ type MultiClusterInformerManager interface {
|
|||
// WaitForCacheSync waits for all caches to populate.
|
||||
// Should call after 'ForCluster', otherwise no-ops.
|
||||
WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool
|
||||
|
||||
// WaitForCacheSyncWithTimeout waits for all caches to populate with a definitive timeout.
|
||||
// Should call after 'ForCluster', otherwise no-ops.
|
||||
WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool
|
||||
}
|
||||
|
||||
// NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl.
|
||||
|
@ -132,3 +136,11 @@ func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string) map[s
|
|||
}
|
||||
return manager.WaitForCacheSync()
|
||||
}
|
||||
|
||||
// WaitForCacheSyncWithTimeout looks up the informer manager registered for the
// named cluster and delegates to that manager's WaitForCacheSyncWithTimeout,
// passing cacheSyncTimeout through unchanged.
//
// It returns nil when m.getManager reports that no manager exists for the
// cluster; otherwise it returns the per-GroupVersionResource map produced by
// the single-cluster manager.
func (m *multiClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool {
|
||||
manager, exist := m.getManager(cluster)
|
||||
// A nil result is how callers in this change detect a missing manager.
if !exist {
|
||||
return nil
|
||||
}
|
||||
return manager.WaitForCacheSyncWithTimeout(cacheSyncTimeout)
|
||||
}
|
||||
|
|
|
@ -46,6 +46,9 @@ type SingleClusterInformerManager interface {
|
|||
// WaitForCacheSync waits for all caches to populate.
|
||||
WaitForCacheSync() map[schema.GroupVersionResource]bool
|
||||
|
||||
// WaitForCacheSyncWithTimeout waits for all caches to populate with a definitive timeout.
|
||||
WaitForCacheSyncWithTimeout(cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool
|
||||
|
||||
// Context returns the single cluster context.
|
||||
Context() context.Context
|
||||
}
|
||||
|
@ -136,9 +139,20 @@ func (s *singleClusterInformerManagerImpl) Stop() {
|
|||
}
|
||||
|
||||
// WaitForCacheSync delegates to s.waitForCacheSync using the manager's own
// context (s.ctx), i.e. without any additional timeout applied here, and
// returns that call's per-GroupVersionResource result map.
func (s *singleClusterInformerManagerImpl) WaitForCacheSync() map[schema.GroupVersionResource]bool {
|
||||
return s.waitForCacheSync(s.ctx)
|
||||
}
|
||||
|
||||
// WaitForCacheSyncWithTimeout delegates to s.waitForCacheSync with a context
// derived from s.ctx that is cancelled after cacheSyncTimeout (or earlier if
// s.ctx itself is done), and returns that call's per-GroupVersionResource
// result map.
func (s *singleClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool {
|
||||
ctx, cancel := context.WithTimeout(s.ctx, cacheSyncTimeout)
|
||||
// Release the timer associated with the derived context once this call returns.
defer cancel()
|
||||

||||
return s.waitForCacheSync(ctx)
|
||||
}
|
||||
|
||||
func (s *singleClusterInformerManagerImpl) waitForCacheSync(ctx context.Context) map[schema.GroupVersionResource]bool {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
res := s.informerFactory.WaitForCacheSync(s.ctx.Done())
|
||||
res := s.informerFactory.WaitForCacheSync(ctx.Done())
|
||||
for resource, synced := range res {
|
||||
if _, exist := s.syncedInformers[resource]; !exist && synced {
|
||||
s.syncedInformers[resource] = struct{}{}
|
||||
|
|
Loading…
Reference in New Issue