Merge pull request #2385 from RainbowMango/pr_lease_context

Start lease controller with separated context.
This commit is contained in:
karmada-bot 2022-08-16 20:59:44 +08:00 committed by GitHub
commit 39400eda1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 24 additions and 7 deletions

View File

@ -77,7 +77,10 @@ type ClusterStatusController struct {
// ClusterLeaseRenewIntervalFraction is a fraction coordinated with ClusterLeaseDuration that // ClusterLeaseRenewIntervalFraction is a fraction coordinated with ClusterLeaseDuration that
// how long the current holder of a lease has last updated the lease. // how long the current holder of a lease has last updated the lease.
ClusterLeaseRenewIntervalFraction float64 ClusterLeaseRenewIntervalFraction float64
// ClusterLeaseControllers store clusters and their corresponding lease controllers. // ClusterLeaseControllers stores context canceler function for each lease controller.
// Each lease controller is started with a separated context.
// key: cluster name of the lease controller servers for.
// value: context canceler function to stop the controller after cluster is un-registered.
ClusterLeaseControllers sync.Map ClusterLeaseControllers sync.Map
// ClusterSuccessThreshold is the duration of successes for the cluster to be considered healthy after recovery. // ClusterSuccessThreshold is the duration of successes for the cluster to be considered healthy after recovery.
ClusterSuccessThreshold metav1.Duration ClusterSuccessThreshold metav1.Duration
@ -102,7 +105,16 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerr
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
c.GenericInformerManager.Stop(req.NamespacedName.Name) c.GenericInformerManager.Stop(req.NamespacedName.Name)
c.TypedInformerManager.Stop(req.NamespacedName.Name) c.TypedInformerManager.Stop(req.NamespacedName.Name)
c.clusterConditionCache.delete(req.Name) c.clusterConditionCache.delete(req.NamespacedName.Name)
// stop lease controller after the cluster is gone.
// only used for clusters in Pull mode because no need to set up lease syncing for Push clusters.
canceller, exists := c.ClusterLeaseControllers.LoadAndDelete(req.NamespacedName.Name)
if exists {
if cf, ok := canceller.(context.CancelFunc); ok {
cf()
}
}
return controllerruntime.Result{}, nil return controllerruntime.Result{}, nil
} }
@ -170,7 +182,7 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
if cluster.Spec.SyncMode == clusterv1alpha1.Pull { if cluster.Spec.SyncMode == clusterv1alpha1.Pull {
// init the lease controller for pull mode clusters // init the lease controller for pull mode clusters
c.initLeaseController(clusterInformerManager.Context(), cluster) c.initLeaseController(cluster)
} }
clusterVersion, err := getKubernetesVersion(clusterClient) clusterVersion, err := getKubernetesVersion(clusterClient)
@ -304,7 +316,7 @@ func (c *ClusterStatusController) buildInformerForCluster(clusterClient *util.Cl
return singleClusterInformerManager, nil return singleClusterInformerManager, nil
} }
func (c *ClusterStatusController) initLeaseController(ctx context.Context, cluster *clusterv1alpha1.Cluster) { func (c *ClusterStatusController) initLeaseController(cluster *clusterv1alpha1.Cluster) {
// If lease controller has been registered, we skip this function. // If lease controller has been registered, we skip this function.
if _, exists := c.ClusterLeaseControllers.Load(cluster.Name); exists { if _, exists := c.ClusterLeaseControllers.Load(cluster.Name); exists {
return return
@ -323,13 +335,18 @@ func (c *ClusterStatusController) initLeaseController(ctx context.Context, clust
util.NamespaceClusterLease, util.NamespaceClusterLease,
util.SetLeaseOwnerFunc(c.Client, cluster.Name)) util.SetLeaseOwnerFunc(c.Client, cluster.Name))
c.ClusterLeaseControllers.Store(cluster.Name, clusterLeaseController) ctx, cancelFunc := context.WithCancel(context.TODO())
c.ClusterLeaseControllers.Store(cluster.Name, cancelFunc)
// start syncing lease // start syncing lease
go func() { go func() {
klog.Infof("Starting syncing lease for cluster: %s", cluster.Name)
// lease controller will keep running until the stop channel is closed(context is canceled)
clusterLeaseController.Run(ctx.Done()) clusterLeaseController.Run(ctx.Done())
<-ctx.Done()
c.ClusterLeaseControllers.Delete(cluster.Name) klog.Infof("Stop syncing lease for cluster: %s", cluster.Name)
c.ClusterLeaseControllers.Delete(cluster.Name) // ensure the cache is clean
}() }()
} }