diff --git a/pkg/controllers/membercluster/membercluster_client.go b/pkg/controllers/membercluster/membercluster_client.go new file mode 100644 index 000000000..a23ebd6f3 --- /dev/null +++ b/pkg/controllers/membercluster/membercluster_client.go @@ -0,0 +1,76 @@ +package membercluster + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + + "github.com/huawei-cloudnative/karmada/pkg/apis/membercluster/v1alpha1" +) + +const ( + // kubeAPIQPS is the maximum QPS to the master from this client + kubeAPIQPS = 20.0 + // kubeAPIBurst is the Maximum burst for throttle + kubeAPIBurst = 30 + tokenKey = "token" + cADataKey = "caBundle" +) + +// ClusterClient stands for a ClusterClient for the given member cluster +type ClusterClient struct { + KubeClient *kubeclientset.Clientset + clusterName string +} + +// NewClusterClientSet returns a ClusterClient for the given member cluster. +func NewClusterClientSet(c *v1alpha1.MemberCluster, client kubeclientset.Interface, namespace string) (*ClusterClient, error) { + clusterConfig, err := buildMemberClusterConfig(c, client, namespace) + if err != nil { + return nil, err + } + var clusterClientSet = ClusterClient{clusterName: c.Name} + + if clusterConfig != nil { + clusterClientSet.KubeClient = kubeclientset.NewForConfigOrDie(clusterConfig) + } + return &clusterClientSet, nil +} + +func buildMemberClusterConfig(memberCluster *v1alpha1.MemberCluster, client kubeclientset.Interface, namespace string) (*rest.Config, error) { + clusterName := memberCluster.Name + apiEndpoint := memberCluster.Spec.APIEndpoint + if apiEndpoint == "" { + return nil, fmt.Errorf("the api endpoint of cluster %s is empty", clusterName) + } + + secretName := memberCluster.Spec.SecretRef.Name + if secretName == "" { + return nil, fmt.Errorf("cluster %s does not have a secret name", clusterName) + } + + secret, err := client.CoreV1().Secrets(namespace).Get(context.TODO(), secretName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + token, tokenFound := secret.Data[tokenKey] + if !tokenFound || len(token) == 0 { + return nil, fmt.Errorf("the secret for cluster %s is missing a non-empty value for %q", clusterName, tokenKey) + } + + clusterConfig, err := clientcmd.BuildConfigFromFlags(apiEndpoint, "") + if err != nil { + return nil, err + } + clusterConfig.CAData = secret.Data[cADataKey] + clusterConfig.BearerToken = string(token) + clusterConfig.QPS = kubeAPIQPS + clusterConfig.Burst = kubeAPIBurst + + return clusterConfig, nil +} diff --git a/pkg/controllers/membercluster/membercluster_controller.go b/pkg/controllers/membercluster/membercluster_controller.go index 71e53da79..47cb43b4d 100644 --- a/pkg/controllers/membercluster/membercluster_controller.go +++ b/pkg/controllers/membercluster/membercluster_controller.go @@ -1,11 +1,14 @@ package membercluster import ( + "context" "fmt" "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -17,9 +20,10 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "github.com/huawei-cloudnative/karmada/pkg/apis/membercluster/v1alpha1" "github.com/huawei-cloudnative/karmada/pkg/controllers/util" clientset "github.com/huawei-cloudnative/karmada/pkg/generated/clientset/versioned" - multikubecheme "github.com/huawei-cloudnative/karmada/pkg/generated/clientset/versioned/scheme" + karmadakubecheme "github.com/huawei-cloudnative/karmada/pkg/generated/clientset/versioned/scheme" informers "github.com/huawei-cloudnative/karmada/pkg/generated/informers/externalversions" listers "github.com/huawei-cloudnative/karmada/pkg/generated/listers/membercluster/v1alpha1" ) @@ -59,7 +63,7 @@ func StartMemberClusterController(config *util.ControllerConfig, stopChan <-chan klog.Infof("Starting member cluster controller") go wait.Until(func() { - if err := controller.Run(1, stopChan); err != nil { + if err := controller.Run(2, stopChan); err != nil { klog.Errorf("controller exit unexpected! will restart later, controller: %s, error: %v", controllerAgentName, err) } }, 1*time.Second, stopChan) @@ -74,11 +78,11 @@ func newMemberClusterController(config *util.ControllerConfig) (*Controller, err kubeClientSet := kubernetes.NewForConfigOrDie(headClusterConfig) karmadaClientSet := clientset.NewForConfigOrDie(headClusterConfig) - karmadaInformerFactory := informers.NewSharedInformerFactory(karmadaClientSet, 0) + karmadaInformerFactory := informers.NewSharedInformerFactory(karmadaClientSet, 10*time.Second) memberclusterInformer := karmadaInformerFactory.Membercluster().V1alpha1().MemberClusters() - // Add multikube types to the default Kubernetes Scheme so Events can be logged for karmada types. - utilruntime.Must(multikubecheme.AddToScheme(scheme.Scheme)) + // Add karmada types to the default Kubernetes Scheme so Events can be logged for karmada types. + utilruntime.Must(karmadakubecheme.AddToScheme(scheme.Scheme)) // Create event broadcaster klog.V(1).Infof("Creating event broadcaster for %s", controllerAgentName) @@ -106,7 +110,29 @@ func newMemberClusterController(config *util.ControllerConfig) (*Controller, err controller.enqueueEventResource(new) }, DeleteFunc: func(obj interface{}) { - klog.Infof("Received delete event. Do nothing just log.") + castObj, ok := obj.(*v1alpha1.MemberCluster) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.Errorf("Couldn't get object from tombstone %#v", obj) + return + } + castObj, ok = tombstone.Obj.(*v1alpha1.MemberCluster) + if !ok { + klog.Errorf("Tombstone contained object that is not expected %#v", obj) + return + } + } + + // TODO: postpone delete namespace if there is any work object which should be deleted by binding controller. + + // delete member cluster workspace when member cluster unjoined + if err := controller.kubeClientSet.CoreV1().Namespaces().Delete(context.TODO(), castObj.Name, v1.DeleteOptions{}); err != nil { + if !apierrors.IsNotFound(err) { + klog.Errorf("Error while deleting namespace %s: %s", castObj.Name, err) + return + } + } }, }) @@ -229,6 +255,36 @@ func (c *Controller) syncHandler(key string) error { return err } + // create member cluster workspace when member cluster joined + _, err = c.kubeClientSet.CoreV1().Namespaces().Get(context.TODO(), membercluster.Name, v1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + memberclusterNS := &corev1.Namespace{ + ObjectMeta: v1.ObjectMeta{ + Name: membercluster.Name, + }, + } + _, err = c.kubeClientSet.CoreV1().Namespaces().Create(context.TODO(), memberclusterNS, v1.CreateOptions{}) + if err != nil { + + return err + } + } else { + klog.V(2).Infof("Could not get %s namespace: %v", membercluster.Name, err) + return err + } + } + + // create a ClusterClient for the given member cluster + memberclusterClient, err := NewClusterClientSet(membercluster, c.kubeClientSet, membercluster.Spec.SecretRef.Namespace) + if err != nil { + c.eventRecorder.Eventf(membercluster, corev1.EventTypeWarning, "MalformedClusterConfig", err.Error()) + return err + } + + // update status of the given member cluster + updateIndividualClusterStatus(membercluster, c.karmadaClientSet, memberclusterClient) + klog.Infof("Sync member cluster: %s/%s", membercluster.Namespace, membercluster.Name) return nil diff --git a/pkg/controllers/membercluster/membercluster_status.go b/pkg/controllers/membercluster/membercluster_status.go new file mode 100644 index 000000000..a3dfe59e8 --- /dev/null +++ b/pkg/controllers/membercluster/membercluster_status.go @@ -0,0 +1,110 @@ +package membercluster + +import ( + "context" + "strings" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + + "github.com/huawei-cloudnative/karmada/pkg/apis/membercluster/v1alpha1" + clientset "github.com/huawei-cloudnative/karmada/pkg/generated/clientset/versioned" +) + +const ( + clusterReady = "ClusterReady" + healthzOk = "/healthz responded with ok" + clusterNotReady = "ClusterNotReady" + healthzNotOk = "/healthz responded without ok" + clusterNotReachableReason = "ClusterNotReachable" + clusterNotReachableMsg = "cluster is not reachable" + clusterReachableReason = "ClusterReachable" + clusterReachableMsg = "cluster is reachable" + clusterOffline = "Offline" +) + +func updateIndividualClusterStatus(cluster *v1alpha1.MemberCluster, hostClient clientset.Interface, clusterClient *ClusterClient) { + // update the health status of member cluster + currentClusterStatus, err := getMemberClusterHealthStatus(clusterClient) + if err != nil { + klog.Warningf("Failed to get health status of the member cluster: %v, err is : %v", cluster.Name, err) + cluster.Status = *currentClusterStatus + _, err = hostClient.MemberclusterV1alpha1().MemberClusters("karmada-cluster").Update(context.TODO(), cluster, v1.UpdateOptions{}) + if err != nil { + klog.Warningf("Failed to update health status of the member cluster: %v, err is : %v", cluster.Name, err) + return + } + return + } + + // update the cluster version of member cluster + clusterVersion, err := clusterClient.KubeClient.Discovery().ServerVersion() + if err != nil { + klog.Warningf("Failed to get server version of the member cluster: %v, err is : %v", cluster.Name, err) + } + + currentClusterStatus.KubernetesVersion = clusterVersion.GitVersion + cluster.Status = *currentClusterStatus + _, err = hostClient.MemberclusterV1alpha1().MemberClusters("karmada-cluster").Update(context.TODO(), cluster, v1.UpdateOptions{}) + if err != nil { + klog.Warningf("Failed to update health status of the member cluster: %v, err is : %v", cluster.Name, err) + return + } +} + +func getMemberClusterHealthStatus(clusterClient *ClusterClient) (*v1alpha1.MemberClusterStatus, error) { + clusterStatus := v1alpha1.MemberClusterStatus{} + currentTime := v1.Now() + clusterReady := clusterReady + healthzOk := healthzOk + newClusterReadyCondition := v1.Condition{ + Type: clusterReady, + Status: v1.ConditionTrue, + Reason: clusterReady, + Message: healthzOk, + LastTransitionTime: currentTime, + } + + clusterNotReady := clusterNotReady + healthzNotOk := healthzNotOk + newClusterNotReadyCondition := v1.Condition{ + Type: clusterReady, + Status: v1.ConditionFalse, + Reason: clusterNotReady, + Message: healthzNotOk, + LastTransitionTime: currentTime, + } + + clusterNotReachableReason := clusterNotReachableReason + clusterNotReachableMsg := clusterNotReachableMsg + newClusterOfflineCondition := v1.Condition{ + Type: clusterOffline, + Status: v1.ConditionTrue, + Reason: clusterNotReachableReason, + Message: clusterNotReachableMsg, + LastTransitionTime: currentTime, + } + clusterReachableReason := clusterReachableReason + clusterReachableMsg := clusterReachableMsg + newClusterNotOfflineCondition := v1.Condition{ + Type: clusterOffline, + Status: v1.ConditionFalse, + Reason: clusterReachableReason, + Message: clusterReachableMsg, + LastTransitionTime: currentTime, + } + + body, err := clusterClient.KubeClient.DiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).Raw() + if err != nil { + klog.Warningf("Failed to do cluster health check for cluster %v, err is : %v ", clusterClient.clusterName, err) + clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterOfflineCondition) + } else { + if !strings.EqualFold(string(body), "ok") { + clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterNotReadyCondition, newClusterNotOfflineCondition) + } else { + clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterReadyCondition) + } + } + + return &clusterStatus, err +}