add cluster lease controller
Signed-off-by: Garrybest <garrybest@foxmail.com>
commit b9010cfe9b (parent 821742ec91)
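
This commit introduces per-cluster leases modeled on Kubernetes node leases. For every member cluster it reconciles, the cluster status controller now starts a lease controller (from k8s.io/component-helpers/apimachinery/lease) that periodically renews a coordinationv1.Lease named after the cluster in the "karmada-cluster" namespace of the karmada-apiserver; each lease carries an owner reference to its Cluster object. Two new flags, --cluster-lease-duration and --cluster-lease-renew-interval-fraction, control the lease lifetime and renew cadence in both karmada-agent and karmada-controller-manager.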
diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go
@@ -7,6 +7,7 @@ import (
     "os"
 
     "github.com/spf13/cobra"
+    kubeclientset "k8s.io/client-go/kubernetes"
     restclient "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/clientcmd"
     "k8s.io/klog/v2"
@@ -103,14 +104,17 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
     }
 
     clusterStatusController := &status.ClusterStatusController{
-        Client:                       mgr.GetClient(),
-        EventRecorder:                mgr.GetEventRecorderFor(status.ControllerName),
-        PredicateFunc:                predicateFun,
-        InformerManager:              informermanager.NewMultiClusterInformerManager(),
-        StopChan:                     stopChan,
-        ClusterClientSetFunc:         util.NewClusterClientSetForAgent,
-        ClusterDynamicClientSetFunc:  util.NewClusterDynamicClientSetForAgent,
-        ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
+        Client:                            mgr.GetClient(),
+        KubeClient:                        kubeclientset.NewForConfigOrDie(mgr.GetConfig()),
+        EventRecorder:                     mgr.GetEventRecorderFor(status.ControllerName),
+        PredicateFunc:                     predicateFun,
+        InformerManager:                   informermanager.NewMultiClusterInformerManager(),
+        StopChan:                          stopChan,
+        ClusterClientSetFunc:              util.NewClusterClientSetForAgent,
+        ClusterDynamicClientSetFunc:       util.NewClusterDynamicClientSetForAgent,
+        ClusterStatusUpdateFrequency:      opts.ClusterStatusUpdateFrequency,
+        ClusterLeaseDuration:              opts.ClusterLeaseDuration,
+        ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
     }
     if err := clusterStatusController.SetupWithManager(mgr); err != nil {
         klog.Fatalf("Failed to setup cluster status controller: %v", err)
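
The agent (above) and karmada-controller-manager (below) gain the same wiring: a new KubeClient field holding a typed client built with kubeclientset.NewForConfigOrDie(mgr.GetConfig()). That client talks to the karmada-apiserver and is what the lease controller uses to create and renew Lease objects; the controller-runtime client is kept for reading Cluster resources.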
diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go
@@ -12,11 +12,15 @@ type Options struct {
     KarmadaKubeConfig string
     ClusterName       string
 
-    // ClusterStatusUpdateFrequency is the frequency that karmada-agent computes cluster status.
-    // If cluster lease feature is not enabled, it is also the frequency that karmada-agent posts cluster status
-    // to karmada-apiserver. In that case, be cautious when changing the constant, it must work with
-    // ClusterMonitorGracePeriod(--cluster-monitor-grace-period) in karmada-controller-manager.
+    // ClusterStatusUpdateFrequency is the frequency at which the controller computes and reports cluster status.
+    // It must work with ClusterMonitorGracePeriod(--cluster-monitor-grace-period) in karmada-controller-manager.
     ClusterStatusUpdateFrequency metav1.Duration
+    // ClusterLeaseDuration is the duration that candidates for a lease need to wait to force acquire it.
+    // It is measured against the time of the last observed lease RenewTime.
+    ClusterLeaseDuration metav1.Duration
+    // ClusterLeaseRenewIntervalFraction is the fraction of ClusterLeaseDuration that determines
+    // how often the current holder of a lease renews it.
+    ClusterLeaseRenewIntervalFraction float64
 }
 
 // NewOptions builds an default scheduler options.
@@ -33,4 +37,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
     fs.StringVar(&o.KarmadaKubeConfig, "karmada-kubeconfig", o.KarmadaKubeConfig, "Path to karmada kubeconfig.")
     fs.StringVar(&o.ClusterName, "cluster-name", o.ClusterName, "Name of member cluster that the agent serves for.")
     fs.DurationVar(&o.ClusterStatusUpdateFrequency.Duration, "cluster-status-update-frequency", 10*time.Second, "Specifies how often karmada-agent posts cluster status to karmada-apiserver. Note: be cautious when changing the constant, it must work with ClusterMonitorGracePeriod in karmada-controller-manager.")
+    fs.DurationVar(&o.ClusterLeaseDuration.Duration, "cluster-lease-duration", 40*time.Second,
+        "Specifies the expiration period of a cluster lease.")
+    fs.Float64Var(&o.ClusterLeaseRenewIntervalFraction, "cluster-lease-renew-interval-fraction", 0.25,
+        "Specifies the cluster lease renew interval fraction.")
 }
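
With the defaults above (cluster-lease-duration=40s, renew fraction 0.25), the lease is renewed every 10s: the renew interval is simply the lease duration scaled by the fraction, the same computation initLeaseController performs further down. A minimal sketch of that arithmetic (names here are illustrative, not from the patch):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	leaseDuration := 40 * time.Second // --cluster-lease-duration
	renewFraction := 0.25             // --cluster-lease-renew-interval-fraction

	// Same arithmetic as initLeaseController in the status controller below.
	renewInterval := time.Duration(float64(leaseDuration.Nanoseconds()) * renewFraction)
	fmt.Println(renewInterval) // prints "10s"
}
```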
diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go
@@ -10,6 +10,7 @@ import (
     "github.com/spf13/cobra"
     "k8s.io/client-go/discovery"
     "k8s.io/client-go/dynamic"
+    kubeclientset "k8s.io/client-go/kubernetes"
     "k8s.io/component-base/logs"
     "k8s.io/klog/v2"
     controllerruntime "sigs.k8s.io/controller-runtime"
@@ -143,14 +144,17 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
     }
 
     clusterStatusController := &status.ClusterStatusController{
-        Client:                       mgr.GetClient(),
-        EventRecorder:                mgr.GetEventRecorderFor(status.ControllerName),
-        PredicateFunc:                clusterPredicateFunc,
-        InformerManager:              informermanager.NewMultiClusterInformerManager(),
-        StopChan:                     stopChan,
-        ClusterClientSetFunc:         util.NewClusterClientSet,
-        ClusterDynamicClientSetFunc:  util.NewClusterDynamicClientSet,
-        ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
+        Client:                            mgr.GetClient(),
+        KubeClient:                        kubeclientset.NewForConfigOrDie(mgr.GetConfig()),
+        EventRecorder:                     mgr.GetEventRecorderFor(status.ControllerName),
+        PredicateFunc:                     clusterPredicateFunc,
+        InformerManager:                   informermanager.NewMultiClusterInformerManager(),
+        StopChan:                          stopChan,
+        ClusterClientSetFunc:              util.NewClusterClientSet,
+        ClusterDynamicClientSetFunc:       util.NewClusterDynamicClientSet,
+        ClusterStatusUpdateFrequency:      opts.ClusterStatusUpdateFrequency,
+        ClusterLeaseDuration:              opts.ClusterLeaseDuration,
+        ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
     }
     if err := clusterStatusController.SetupWithManager(mgr); err != nil {
         klog.Fatalf("Failed to setup cluster status controller: %v", err)
diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go
@@ -30,11 +30,15 @@ type Options struct {
     // SecurePort is the port that the server serves at.
     // Note: We hope support https in the future once controller-runtime provides the functionality.
     SecurePort int
-    // ClusterStatusUpdateFrequency is the frequency that karmada-controller-manager computes cluster status.
-    // If cluster lease feature is not enabled, it is also the frequency that karmada-agent posts cluster status
-    // to karmada-apiserver. In that case, be cautious when changing the constant, it must work with
-    // ClusterMonitorGracePeriod(--cluster-monitor-grace-period) in karmada-controller-manager.
+    // ClusterStatusUpdateFrequency is the frequency at which the controller computes and reports cluster status.
+    // It must work with ClusterMonitorGracePeriod(--cluster-monitor-grace-period) in karmada-controller-manager.
     ClusterStatusUpdateFrequency metav1.Duration
+    // ClusterLeaseDuration is the duration that candidates for a lease need to wait to force acquire it.
+    // It is measured against the time of the last observed lease RenewTime.
+    ClusterLeaseDuration metav1.Duration
+    // ClusterLeaseRenewIntervalFraction is the fraction of ClusterLeaseDuration that determines
+    // how often the current holder of a lease renews it.
+    ClusterLeaseRenewIntervalFraction float64
 }
 
 // NewOptions builds an empty options.
@@ -79,4 +83,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
     flags.DurationVar(&o.ClusterStatusUpdateFrequency.Duration, "cluster-status-update-frequency", 10*time.Second,
         "Specifies how often karmada-controller-manager posts cluster status to karmada-apiserver.")
     flags.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", true, "Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.")
+    flags.DurationVar(&o.ClusterLeaseDuration.Duration, "cluster-lease-duration", 40*time.Second,
+        "Specifies the expiration period of a cluster lease.")
+    flags.Float64Var(&o.ClusterLeaseRenewIntervalFraction, "cluster-lease-renew-interval-fraction", 0.25,
+        "Specifies the cluster lease renew interval fraction.")
 }
diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "net/http"
+    "sync"
     "time"
 
     corev1 "k8s.io/api/core/v1"
@@ -15,9 +16,12 @@ import (
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/util/clock"
     "k8s.io/apimachinery/pkg/util/wait"
-    "k8s.io/client-go/kubernetes"
+    clientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/record"
+    "k8s.io/component-helpers/apimachinery/lease"
     "k8s.io/klog/v2"
     controllerruntime "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
@@ -52,6 +56,7 @@ var (
 // ClusterStatusController is to sync status of Cluster.
 type ClusterStatusController struct {
     client.Client // used to operate Cluster resources.
+    KubeClient      clientset.Interface
     EventRecorder   record.EventRecorder
     PredicateFunc   predicate.Predicate
     InformerManager informermanager.MultiClusterInformerManager
@@ -59,10 +64,16 @@ type ClusterStatusController struct {
     ClusterClientSetFunc        func(c *v1alpha1.Cluster, client client.Client) (*util.ClusterClient, error)
     ClusterDynamicClientSetFunc func(c *v1alpha1.Cluster, client client.Client) (*util.DynamicClusterClient, error)
 
-    // ClusterStatusUpdateFrequency is the frequency that controller computes cluster status.
-    // If cluster lease feature is not enabled, it is also the frequency that controller posts cluster status
-    // to karmada-apiserver.
+    // ClusterStatusUpdateFrequency is the frequency at which the controller computes and reports cluster status.
     ClusterStatusUpdateFrequency metav1.Duration
+    // ClusterLeaseDuration is the duration that candidates for a lease need to wait to force acquire it.
+    // It is measured against the time of the last observed lease RenewTime.
+    ClusterLeaseDuration metav1.Duration
+    // ClusterLeaseRenewIntervalFraction is the fraction of ClusterLeaseDuration that determines
+    // how often the current holder of a lease renews it.
+    ClusterLeaseRenewIntervalFraction float64
+    // ClusterLeaseControllers stores clusters and their corresponding lease controllers.
+    ClusterLeaseControllers sync.Map
 }
 
 // Reconcile syncs status of the given member cluster.
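
ClusterLeaseControllers is keyed by cluster name. syncClusterStatus runs on every reconcile of every cluster, so the Load check at the top of initLeaseController (later in this file) is what prevents a second lease controller from being spawned for a cluster that already has one.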
@@ -116,6 +127,9 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *v1alpha1.Cluster) (
         return controllerruntime.Result{Requeue: true}, err
     }
 
+    // init the lease controller for every cluster
+    c.initLeaseController(cluster)
+
     var currentClusterStatus = v1alpha1.ClusterStatus{}
 
     // get the health status of member cluster
@@ -224,6 +238,36 @@ func (c *ClusterStatusController) buildInformerForCluster(cluster *v1alpha1.Clus
     return singleClusterInformerManager, nil
 }
 
+func (c *ClusterStatusController) initLeaseController(cluster *v1alpha1.Cluster) {
+    // If a lease controller has already been registered for this cluster, skip.
+    if _, exists := c.ClusterLeaseControllers.Load(cluster.Name); exists {
+        return
+    }
+
+    // renewInterval is how often the lease renew time is updated.
+    renewInterval := time.Duration(float64(c.ClusterLeaseDuration.Nanoseconds()) * c.ClusterLeaseRenewIntervalFraction)
+
+    nodeLeaseController := lease.NewController(
+        clock.RealClock{},
+        c.KubeClient,
+        cluster.Name,
+        int32(c.ClusterLeaseDuration.Seconds()),
+        nil,
+        renewInterval,
+        util.NamespaceClusterLease,
+        util.SetLeaseOwnerFunc(c.Client, cluster.Name))
+
+    c.ClusterLeaseControllers.Store(cluster.Name, nodeLeaseController)
+
+    // start syncing lease
+    // todo(garryfang): stop the lease controller when cluster does not exist according to #384
+    go func() {
+        nodeLeaseController.Run(c.StopChan)
+        <-c.StopChan
+        c.ClusterLeaseControllers.Delete(cluster.Name)
+    }()
+}
+
 func getClusterHealthStatus(clusterClient *util.ClusterClient) (online, healthy bool) {
     healthStatus, err := healthEndpointCheck(clusterClient.KubeClient, "/readyz")
     if err != nil && healthStatus == http.StatusNotFound {
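
lease.NewController here is the same helper the kubelet uses for node leases: it creates the Lease if missing, then updates spec.renewTime every renewInterval, invoking SetLeaseOwnerFunc on each sync. Nothing in this commit consumes the lease yet; a control-plane monitor would presumably judge cluster liveness by lease staleness. A hedged sketch of such a consumer (leaseFresh and gracePeriod are hypothetical, not part of this patch):

```go
package monitor

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// leaseFresh reports whether the cluster lease in namespace ns was renewed
// within gracePeriod (e.g. ClusterMonitorGracePeriod).
func leaseFresh(kubeClient kubernetes.Interface, ns, clusterName string, gracePeriod time.Duration) bool {
	l, err := kubeClient.CoordinationV1().Leases(ns).Get(context.TODO(), clusterName, metav1.GetOptions{})
	if err != nil || l.Spec.RenewTime == nil {
		return false // lease missing or never renewed
	}
	return time.Since(l.Spec.RenewTime.Time) <= gracePeriod
}
```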
diff --git a/pkg/util/cluster.go b/pkg/util/cluster.go
@@ -10,6 +10,11 @@ import (
     "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 )
 
+const (
+    // NamespaceClusterLease is the namespace in which cluster leases are stored.
+    NamespaceClusterLease = "karmada-cluster"
+)
+
 // IsClusterReady tells whether the cluster status in 'Ready' condition.
 func IsClusterReady(clusterStatus *v1alpha1.ClusterStatus) bool {
     for _, condition := range clusterStatus.Conditions {
diff --git a/pkg/util/lease.go b/pkg/util/lease.go
@@ -0,0 +1,37 @@
+package util
+
+import (
+    "context"
+
+    coordinationv1 "k8s.io/api/coordination/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/klog/v2"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+
+    clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+)
+
+// SetLeaseOwnerFunc helps construct a newLeasePostProcessFunc which sets
+// a cluster OwnerReference on the given lease object.
+func SetLeaseOwnerFunc(c client.Client, clusterName string) func(lease *coordinationv1.Lease) error {
+    return func(lease *coordinationv1.Lease) error {
+        // Try to set the owner reference on every lease renewal until it succeeds.
+        if len(lease.OwnerReferences) == 0 {
+            clusterObj := &clusterv1alpha1.Cluster{}
+            if err := c.Get(context.TODO(), client.ObjectKey{Name: clusterName}, clusterObj); err == nil {
+                lease.OwnerReferences = []metav1.OwnerReference{
+                    {
+                        APIVersion: clusterObj.APIVersion,
+                        Kind:       clusterObj.Kind,
+                        Name:       clusterName,
+                        UID:        clusterObj.UID,
+                    },
+                }
+            } else {
+                klog.Errorf("failed to get cluster %q when trying to set owner ref to the cluster lease: %v", clusterName, err)
+                return err
+            }
+        }
+        return nil
+    }
+}
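
Because the Cluster object is set as the lease's owner, deleting a Cluster lets the karmada-apiserver garbage-collect its Lease, which softens the todo above about never stopping lease controllers. A hedged usage sketch, mirroring how initLeaseController plugs the func into lease.NewController (demoSetOwner and its arguments are illustrative only):

```go
package example

import (
	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/karmada-io/karmada/pkg/util"
)

// demoSetOwner applies the post-process func to a Lease directly; in the
// patch, the lease controller calls it on every renewal.
func demoSetOwner(c client.Client, clusterName string) {
	setOwner := util.SetLeaseOwnerFunc(c, clusterName)
	l := &coordinationv1.Lease{
		ObjectMeta: metav1.ObjectMeta{Name: clusterName, Namespace: util.NamespaceClusterLease},
	}
	if err := setOwner(l); err != nil {
		klog.Warningf("owner reference not set yet, will retry on next renewal: %v", err)
	}
}
```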