Start lease controller with a separate context.

Signed-off-by: RainbowMango <qdurenhongcai@gmail.com>
Author: RainbowMango, 2022-08-16 10:10:54 +08:00
parent c5ce2af608
commit 8bb253a483
1 changed file with 24 additions and 7 deletions


@@ -77,7 +77,10 @@ type ClusterStatusController struct {
 	// ClusterLeaseRenewIntervalFraction is a fraction coordinated with ClusterLeaseDuration,
 	// determining how often the current holder of a lease should renew it.
 	ClusterLeaseRenewIntervalFraction float64
-	// ClusterLeaseControllers store clusters and their corresponding lease controllers.
+	// ClusterLeaseControllers stores the context cancel function for each lease controller.
+	// Each lease controller is started with a separate context.
+	// key: the name of the cluster the lease controller serves.
+	// value: the context cancel function used to stop the controller after the cluster is unregistered.
 	ClusterLeaseControllers sync.Map
 	// ClusterSuccessThreshold is the duration of successes for the cluster to be considered healthy after recovery.
 	ClusterSuccessThreshold metav1.Duration
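
The new field stores context.CancelFunc values keyed by cluster name, so a lease controller can be stopped without holding a reference to the controller object itself. A minimal standalone sketch of that pattern (the map name and the "member1" cluster are illustrative, not part of this commit):

package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	var leaseControllers sync.Map // cluster name -> context.CancelFunc

	// Register a canceler for a hypothetical cluster "member1".
	ctx, cancel := context.WithCancel(context.Background())
	leaseControllers.Store("member1", cancel)

	// Un-registering the cluster later cancels its private context.
	if v, ok := leaseControllers.LoadAndDelete("member1"); ok {
		v.(context.CancelFunc)() // sync.Map values are untyped; assert back
	}
	fmt.Println(ctx.Err()) // prints "context canceled"
}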
@@ -102,7 +105,16 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
 		if apierrors.IsNotFound(err) {
 			c.GenericInformerManager.Stop(req.NamespacedName.Name)
 			c.TypedInformerManager.Stop(req.NamespacedName.Name)
-			c.clusterConditionCache.delete(req.Name)
+			c.clusterConditionCache.delete(req.NamespacedName.Name)
+
+			// Stop the lease controller after the cluster is gone.
+			// Only Pull-mode clusters have one; there is no need to set up lease syncing for Push clusters.
+			canceller, exists := c.ClusterLeaseControllers.LoadAndDelete(req.NamespacedName.Name)
+			if exists {
+				if cf, ok := canceller.(context.CancelFunc); ok {
+					cf()
+				}
+			}
 			return controllerruntime.Result{}, nil
 		}
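
This cleanup branch relies on sync.Map.LoadAndDelete being atomic: if two goroutines ever raced to tear down the same cluster, only one could obtain the canceler, so the map entry is consumed exactly once. A sketch of that property (stopLeaseController is a hypothetical helper, not from this commit):

package main

import (
	"context"
	"fmt"
	"sync"
)

// stopLeaseController mirrors the branch above: at most one caller
// wins the LoadAndDelete and invokes the cancel function.
func stopLeaseController(m *sync.Map, cluster string) bool {
	canceller, exists := m.LoadAndDelete(cluster)
	if !exists {
		return false
	}
	if cf, ok := canceller.(context.CancelFunc); ok {
		cf()
		return true
	}
	return false
}

func main() {
	var m sync.Map
	_, cancel := context.WithCancel(context.Background())
	m.Store("member1", cancel)

	results := make(chan bool, 2)
	for i := 0; i < 2; i++ {
		go func() { results <- stopLeaseController(&m, "member1") }()
	}
	wins := 0
	for i := 0; i < 2; i++ {
		if <-results {
			wins++
		}
	}
	fmt.Println("cancellations:", wins) // always 1
}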
@@ -170,7 +182,7 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) {
 	if cluster.Spec.SyncMode == clusterv1alpha1.Pull {
 		// init the lease controller for pull mode clusters
-		c.initLeaseController(clusterInformerManager.Context(), cluster)
+		c.initLeaseController(cluster)
 	}
 
 	clusterVersion, err := getKubernetesVersion(clusterClient)
@@ -304,7 +316,7 @@ func (c *ClusterStatusController) buildInformerForCluster(clusterClient *util.ClusterClient) (informermanager.SingleClusterInformerManager, error) {
 	return singleClusterInformerManager, nil
 }
 
-func (c *ClusterStatusController) initLeaseController(ctx context.Context, cluster *clusterv1alpha1.Cluster) {
+func (c *ClusterStatusController) initLeaseController(cluster *clusterv1alpha1.Cluster) {
 	// If lease controller has been registered, we skip this function.
 	if _, exists := c.ClusterLeaseControllers.Load(cluster.Name); exists {
 		return
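
The Load guard at the top of initLeaseController keeps repeated status syncs from starting duplicate lease goroutines. It is a check-then-act sequence, which is safe here because controller-runtime serializes reconciles for a given cluster; below is a standalone sketch (names are illustrative) that achieves the same idempotency atomically with LoadOrStore, for callers without that guarantee:

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var starts int32

// initOnce starts at most one worker per cluster. LoadOrStore makes
// the check-and-register step atomic, so racing callers lose cleanly.
func initOnce(m *sync.Map, cluster string) {
	ctx, cancel := context.WithCancel(context.Background())
	if _, loaded := m.LoadOrStore(cluster, context.CancelFunc(cancel)); loaded {
		cancel() // already registered; release the unused context
		return
	}
	go func() {
		atomic.AddInt32(&starts, 1)
		<-ctx.Done() // stand-in for the lease-renewal loop
	}()
}

func main() {
	var m sync.Map
	for i := 0; i < 3; i++ { // repeated syncs, as Reconcile would do
		initOnce(&m, "member1")
	}
	time.Sleep(50 * time.Millisecond)
	fmt.Println("workers started:", atomic.LoadInt32(&starts)) // 1
}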
@@ -323,13 +335,18 @@ func (c *ClusterStatusController) initLeaseController(cluster *clusterv1alpha1.Cluster) {
 		util.NamespaceClusterLease,
 		util.SetLeaseOwnerFunc(c.Client, cluster.Name))
-	c.ClusterLeaseControllers.Store(cluster.Name, clusterLeaseController)
+
+	ctx, cancelFunc := context.WithCancel(context.TODO())
+	c.ClusterLeaseControllers.Store(cluster.Name, cancelFunc)
 
 	// start syncing lease
 	go func() {
 		klog.Infof("Starting syncing lease for cluster: %s", cluster.Name)
+
+		// lease controller will keep running until the stop channel is closed (context is canceled)
 		clusterLeaseController.Run(ctx.Done())
-		<-ctx.Done()
-		c.ClusterLeaseControllers.Delete(cluster.Name)
+
 		klog.Infof("Stop syncing lease for cluster: %s", cluster.Name)
+		c.ClusterLeaseControllers.Delete(cluster.Name) // ensure the cache is clean
 	}()
 }
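
Taken together, the lifecycle the commit establishes is: derive a private context from context.TODO(), stash its cancel function in ClusterLeaseControllers, and let the goroutine run until that context is canceled, which now happens only when the cluster is unregistered rather than whenever the informer manager's context ends. A self-contained sketch of the same flow (runLease is a toy stand-in for clusterLeaseController.Run, which blocks until its stop channel closes; the cluster name is illustrative):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runLease stands in for clusterLeaseController.Run: it keeps renewing
// until the stop channel is closed.
func runLease(stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		case <-time.After(10 * time.Millisecond):
			// renew the lease here
		}
	}
}

func main() {
	var controllers sync.Map

	start := func(cluster string) {
		ctx, cancelFunc := context.WithCancel(context.TODO())
		controllers.Store(cluster, cancelFunc)
		go func() {
			fmt.Println("Starting syncing lease for cluster:", cluster)
			runLease(ctx.Done()) // blocks until cancelFunc is called
			fmt.Println("Stop syncing lease for cluster:", cluster)
			controllers.Delete(cluster) // ensure the cache is clean
		}()
	}

	start("member1")
	time.Sleep(30 * time.Millisecond)

	// Cluster unregistered: cancel its private context.
	if v, ok := controllers.LoadAndDelete("member1"); ok {
		v.(context.CancelFunc)()
	}
	time.Sleep(30 * time.Millisecond) // let the goroutine log and exit
}

The switch away from clusterInformerManager.Context() means only this explicit cancel can stop lease syncing, which appears to be the point of the "separate context" in the commit title.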