Merge pull request #594 from XiShanYongYe-Chang/cluster-api-v4
Add cluster lifecycle management using the cluster-api
Commit 90fdd649aa
@@ -5,12 +5,12 @@ import (
    "flag"
    "fmt"
    "os"
    "time"

    "github.com/spf13/cobra"
    "k8s.io/client-go/discovery"
    "k8s.io/client-go/dynamic"
    kubeclientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog/v2"
    controllerruntime "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/event"
@@ -19,6 +19,7 @@ import (
    "github.com/karmada-io/karmada/cmd/controller-manager/app/options"
    clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
    "github.com/karmada-io/karmada/pkg/clusterdiscovery/clusterapi"
    "github.com/karmada-io/karmada/pkg/controllers/binding"
    "github.com/karmada-io/karmada/pkg/controllers/cluster"
    "github.com/karmada-io/karmada/pkg/controllers/execution"
@@ -27,6 +28,7 @@
    "github.com/karmada-io/karmada/pkg/controllers/namespace"
    "github.com/karmada-io/karmada/pkg/controllers/propagationpolicy"
    "github.com/karmada-io/karmada/pkg/controllers/status"
    "github.com/karmada-io/karmada/pkg/karmadactl"
    "github.com/karmada-io/karmada/pkg/util"
    "github.com/karmada-io/karmada/pkg/util/detector"
    "github.com/karmada-io/karmada/pkg/util/gclient"
@@ -112,6 +114,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
    for _, ns := range opts.SkippedPropagatingNamespaces {
        skippedPropagatingNamespaces[ns] = struct{}{}
    }

    resourceDetector := &detector.ResourceDetector{
        DiscoveryClientSet: discoverClientSet,
        Client:             mgr.GetClient(),
@@ -121,13 +124,12 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
        SkippedResourceConfig:        skippedResourceConfig,
        SkippedPropagatingNamespaces: skippedPropagatingNamespaces,
    }

    resourceDetector.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(resourceDetector.EventFilter, resourceDetector.OnAdd, resourceDetector.OnUpdate, resourceDetector.OnDelete)
    resourceDetector.Processor = util.NewAsyncWorker("resource detector", time.Microsecond, detector.ClusterWideKeyFunc, resourceDetector.Reconcile)
    if err := mgr.Add(resourceDetector); err != nil {
        klog.Fatalf("Failed to setup resource detector: %v", err)
    }

    setupClusterAPIClusterDetector(mgr, opts)

    clusterController := &cluster.Controller{
        Client:        mgr.GetClient(),
        EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName),
@@ -281,3 +283,36 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
        klog.Fatalf("Failed to setup ServiceImport controller: %v", err)
    }
}

// setupClusterAPIClusterDetector initializes the cluster detector for the cluster-api management cluster.
func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options) {
    if len(opts.ClusterAPIKubeconfig) == 0 {
        return
    }

    klog.Infof("Begin to setup cluster-api cluster detector")

    karmadaConfig := karmadactl.NewKarmadaConfig(clientcmd.NewDefaultPathOptions())
    clusterAPIRestConfig, err := karmadaConfig.GetRestConfig(opts.ClusterAPIContext, opts.ClusterAPIKubeconfig)
    if err != nil {
        klog.Fatalf("Failed to get cluster-api management cluster rest config. context: %s, kubeconfig: %s, err: %v", opts.ClusterAPIContext, opts.ClusterAPIKubeconfig, err)
    }

    clusterAPIClient, err := gclient.NewForConfig(clusterAPIRestConfig)
    if err != nil {
        klog.Fatalf("Failed to build a client from clusterAPIRestConfig: %v", err)
    }

    clusterAPIClusterDetector := &clusterapi.ClusterDetector{
        KarmadaConfig:         karmadaConfig,
        ControllerPlaneConfig: mgr.GetConfig(),
        ClusterAPIConfig:      clusterAPIRestConfig,
        ClusterAPIClient:      clusterAPIClient,
        InformerManager:       informermanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0),
    }
    if err := mgr.Add(clusterAPIClusterDetector); err != nil {
        klog.Fatalf("Failed to setup cluster-api cluster detector: %v", err)
    }

    klog.Infof("Succeeded in setting up cluster-api cluster detector")
}
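The detector is wired into the same controller-runtime manager as the built-in controllers. As a hedged aside (not taken from this diff), mgr.Add accepts it because its Start(ctx context.Context) error method satisfies controller-runtime's manager.Runnable interface; a minimal compile-time check could look like this:

```go
package main

import (
	"sigs.k8s.io/controller-runtime/pkg/manager"

	"github.com/karmada-io/karmada/pkg/clusterdiscovery/clusterapi"
)

// Compile-time assertion: the build fails if ClusterDetector stops implementing Runnable.
var _ manager.Runnable = &clusterapi.ClusterDetector{}

func main() {}
```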
@@ -47,6 +47,11 @@ type Options struct {
    SkippedPropagatingAPIs string
    // SkippedPropagatingNamespaces is a list of namespaces that will be skipped from propagating.
    SkippedPropagatingNamespaces []string
    // ClusterAPIContext is the name of the cluster context in the cluster-api management cluster KUBECONFIG file.
    // The default value is the current-context.
    ClusterAPIContext string
    // ClusterAPIKubeconfig holds the path to the cluster-api management cluster KUBECONFIG file.
    ClusterAPIKubeconfig string
}

// NewOptions builds an empty options.
@@ -86,4 +91,6 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
        "<group>/<version>/<kind>,<kind> for skip one or more specific resource(e.g. networking.k8s.io/v1beta1/Ingress,IngressClass) where the kinds are case-insensitive.")
    flags.StringSliceVar(&o.SkippedPropagatingNamespaces, "skipped-propagating-namespaces", []string{},
        "Comma-separated namespaces that should be skipped from propagating in addition to the default skipped namespaces(namespaces prefixed by kube- and karmada-).")
    flags.StringVar(&o.ClusterAPIContext, "cluster-api-context", "", "Name of the cluster context in cluster-api management cluster kubeconfig file.")
    flags.StringVar(&o.ClusterAPIKubeconfig, "cluster-api-kubeconfig", "", "Path to the cluster-api management cluster kubeconfig file.")
}
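In practice (an illustrative example, not taken from this diff), the feature is switched on by passing the two new flags to the controller manager, for instance `--cluster-api-kubeconfig=/root/.kube/mgmt.config --cluster-api-context=mgmt`; the path and context name here are hypothetical. When `--cluster-api-kubeconfig` is left empty, setupClusterAPIClusterDetector returns immediately and cluster-api discovery stays disabled.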
@@ -0,0 +1,210 @@
package clusterapi

import (
    "context"
    "fmt"
    "io/ioutil"
    "time"

    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
    "k8s.io/klog/v2"
    clusterapiv1alpha4 "sigs.k8s.io/cluster-api/api/v1alpha4"
    "sigs.k8s.io/controller-runtime/pkg/client"

    "github.com/karmada-io/karmada/pkg/karmadactl"
    "github.com/karmada-io/karmada/pkg/karmadactl/options"
    "github.com/karmada-io/karmada/pkg/util"
    "github.com/karmada-io/karmada/pkg/util/informermanager"
    "github.com/karmada-io/karmada/pkg/util/informermanager/keys"
    "github.com/karmada-io/karmada/pkg/util/names"
)

var (
    clusterGVRs = []schema.GroupVersionResource{
        {Group: clusterapiv1alpha4.GroupVersion.Group, Version: clusterapiv1alpha4.GroupVersion.Version, Resource: "clusters"},
    }
)

// ClusterDetector is a cluster watcher that watches cluster objects in the cluster-api management cluster and reconciles the events.
type ClusterDetector struct {
    KarmadaConfig         karmadactl.KarmadaConfig
    ControllerPlaneConfig *rest.Config
    ClusterAPIConfig      *rest.Config
    ClusterAPIClient      client.Client
    InformerManager       informermanager.SingleClusterInformerManager
    EventHandler          cache.ResourceEventHandler
    Processor             util.AsyncWorker

    stopCh <-chan struct{}
}

// Start runs the detector; it does not return until stopCh is closed.
func (d *ClusterDetector) Start(ctx context.Context) error {
    klog.Infof("Starting cluster-api cluster detector.")
    d.stopCh = ctx.Done()

    d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
    d.Processor = util.NewAsyncWorker("cluster-api cluster detector", time.Second, ClusterWideKeyFunc, d.Reconcile)
    d.Processor.Run(1, d.stopCh)
    d.discoveryCluster()

    <-d.stopCh
    klog.Infof("Stopped as stopCh closed.")
    return nil
}

func (d *ClusterDetector) discoveryCluster() {
    for _, gvr := range clusterGVRs {
        if !d.InformerManager.IsHandlerExist(gvr, d.EventHandler) {
            klog.Infof("Setup informer for %s", gvr.String())
            d.InformerManager.ForResource(gvr, d.EventHandler)
        }
    }

    d.InformerManager.Start(d.stopCh)
}

// OnAdd handles the object add event and pushes the object to the queue.
func (d *ClusterDetector) OnAdd(obj interface{}) {
    runtimeObj, ok := obj.(runtime.Object)
    if !ok {
        return
    }
    d.Processor.EnqueueRateLimited(runtimeObj)
}

// OnUpdate handles the object update event and pushes the object to the queue.
func (d *ClusterDetector) OnUpdate(oldObj, newObj interface{}) {
    d.OnAdd(newObj)
}

// OnDelete handles the object delete event and pushes the object to the queue.
func (d *ClusterDetector) OnDelete(obj interface{}) {
    d.OnAdd(obj)
}

// Reconcile performs a full reconciliation for the object referred to by the key.
// The key will be re-queued if the returned error is non-nil.
func (d *ClusterDetector) Reconcile(key util.QueueKey) error {
    clusterWideKey, ok := key.(keys.ClusterWideKey)
    if !ok {
        klog.Errorf("invalid key")
        return fmt.Errorf("invalid key")
    }
    klog.Infof("Reconciling cluster-api object: %s", clusterWideKey)

    object, err := d.GetUnstructuredObject(clusterWideKey)
    if err != nil {
        if apierrors.IsNotFound(err) {
            return d.unJoinClusterAPICluster(clusterWideKey.Name)
        }
        klog.Errorf("Failed to get unstructured object(%s), error: %v", clusterWideKey, err)
        return err
    }

    clusterPhase, ok, err := unstructured.NestedString(object.Object, "status", "phase")
    if err != nil {
        klog.Errorf("Failed to retrieve status phase from cluster: %v", err)
        return err
    }

    if ok && clusterPhase == string(clusterapiv1alpha4.ClusterPhaseProvisioned) {
        return d.joinClusterAPICluster(clusterWideKey.Name)
    }

    return nil
}

// GetUnstructuredObject retrieves the object by key and returns it as an unstructured object.
func (d *ClusterDetector) GetUnstructuredObject(objectKey keys.ClusterWideKey) (*unstructured.Unstructured, error) {
    objectGVR := schema.GroupVersionResource{
        Group:    objectKey.Group,
        Version:  objectKey.Version,
        Resource: "clusters",
    }

    object, err := d.InformerManager.Lister(objectGVR).Get(objectKey.NamespaceKey())
    if err != nil {
        if !apierrors.IsNotFound(err) {
            klog.Errorf("Failed to get object(%s), error: %v", objectKey, err)
        }
        return nil, err
    }

    uncastObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object)
    if err != nil {
        klog.Errorf("Failed to transform object(%s), error: %v", objectKey, err)
        return nil, err
    }

    return &unstructured.Unstructured{Object: uncastObj}, nil
}

func (d *ClusterDetector) joinClusterAPICluster(clusterName string) error {
    klog.Infof("Begin to join cluster-api's Cluster(%s) to karmada", clusterName)

    secret := &corev1.Secret{}
    secretKey := types.NamespacedName{
        Namespace: metav1.NamespaceDefault,
        Name:      names.GenerateClusterAPISecretName(clusterName),
    }
    err := d.ClusterAPIClient.Get(context.TODO(), secretKey, secret)
    if err != nil {
        if apierrors.IsNotFound(err) {
            klog.Errorf("Cannot find secret(%s): %v", secretKey.String(), err)
        } else {
            klog.Errorf("Failed to get secret(%s): %v", secretKey.String(), err)
        }
        return err
    }

    kubeconfigPath, err := generateKubeconfigFile(clusterName, secret.Data["value"])
    if err != nil {
        return err
    }

    clusterRestConfig, err := d.KarmadaConfig.GetRestConfig("", kubeconfigPath)
    if err != nil {
        klog.Fatalf("Failed to get rest config of the cluster-api cluster. kubeconfig: %s, err: %v", kubeconfigPath, err)
    }

    err = karmadactl.JoinCluster(d.ControllerPlaneConfig, clusterRestConfig, options.DefaultKarmadaClusterNamespace, clusterName, false)
    if err != nil {
        klog.Errorf("Failed to join cluster-api's cluster(%s): %v", clusterName, err)
        return err
    }

    klog.Infof("Finished joining cluster-api's Cluster(%s) to karmada", clusterName)
    return nil
}

func (d *ClusterDetector) unJoinClusterAPICluster(clusterName string) error {
    klog.Infof("Begin to unjoin cluster-api's Cluster(%s) from karmada", clusterName)
    err := karmadactl.UnJoinCluster(d.ControllerPlaneConfig, nil, options.DefaultKarmadaClusterNamespace, clusterName, false, false)
    if err != nil {
        klog.Errorf("Failed to unjoin cluster-api's cluster(%s): %v", clusterName, err)
        return err
    }

    klog.Infof("Finished unjoining cluster-api's Cluster(%s) from karmada", clusterName)
    return nil
}

func generateKubeconfigFile(clusterName string, kubeconfigData []byte) (string, error) {
    kubeconfigPath := fmt.Sprintf("/etc/%s.kubeconfig", clusterName)
    err := ioutil.WriteFile(kubeconfigPath, kubeconfigData, 0600)
    if err != nil {
        klog.Errorf("Failed to write file %s: %v", kubeconfigPath, err)
        return "", err
    }

    return kubeconfigPath, nil
}
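For orientation, a sketch of the contract the join path above relies on (not part of the diff, under stated assumptions): the detector expects cluster-api to store each workload cluster's kubeconfig in a Secret named `<cluster>-kubeconfig` in the `default` namespace of the management cluster, with the raw kubeconfig bytes under the `value` data key. The cluster name below is hypothetical.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	clusterName := "member1" // hypothetical name, for illustration only

	// Mirrors names.GenerateClusterAPISecretName introduced in this commit.
	secret := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: metav1.NamespaceDefault,
			Name:      fmt.Sprintf("%s-kubeconfig", clusterName),
		},
		Data: map[string][]byte{
			// joinClusterAPICluster reads this key and writes it to /etc/<cluster>.kubeconfig
			// before calling karmadactl.JoinCluster.
			"value": []byte("apiVersion: v1\nkind: Config\n# ..."),
		},
	}
	fmt.Printf("expected secret: %s/%s\n", secret.Namespace, secret.Name)
}
```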
@@ -0,0 +1,11 @@
package clusterapi

import (
    "github.com/karmada-io/karmada/pkg/util"
    "github.com/karmada-io/karmada/pkg/util/informermanager/keys"
)

// ClusterWideKeyFunc generates a ClusterWideKey for the object.
func ClusterWideKeyFunc(obj interface{}) (util.QueueKey, error) {
    return keys.ClusterWideKeyFunc(obj)
}
@@ -18,6 +18,7 @@ import (
    utilerrors "k8s.io/apimachinery/pkg/util/errors"
    "k8s.io/apimachinery/pkg/util/wait"
    kubeclient "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/klog/v2"

    clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@@ -150,8 +151,6 @@ func (j *CommandJoinOption) AddFlags(flags *pflag.FlagSet) {
}

// RunJoin is the implementation of the 'join' command.
//nolint:gocyclo
// Note: ignore the cyclomatic complexity issue to get gocyclo on board. Tracked by: https://github.com/karmada-io/karmada/issues/460
func RunJoin(cmdOut io.Writer, karmadaConfig KarmadaConfig, opts CommandJoinOption) error {
    klog.V(1).Infof("joining cluster. cluster name: %s", opts.ClusterName)
    klog.V(1).Infof("joining cluster. cluster namespace: %s", opts.ClusterNamespace)
@@ -164,33 +163,40 @@ func RunJoin(cmdOut io.Writer, karmadaConfig KarmadaConfig, opts CommandJoinOpti
        return err
    }

    controlPlaneKarmadaClient := karmadaclientset.NewForConfigOrDie(controlPlaneRestConfig)
    controlPlaneKubeClient := kubeclient.NewForConfigOrDie(controlPlaneRestConfig)

    // Get cluster config
    clusterConfig, err := karmadaConfig.GetRestConfig(opts.ClusterContext, opts.ClusterKubeConfig)
    if err != nil {
        klog.V(1).Infof("failed to get joining cluster config. error: %v", err)
        return err
    }

    return JoinCluster(controlPlaneRestConfig, clusterConfig, opts.ClusterNamespace, opts.ClusterName, opts.DryRun)
}

// JoinCluster joins the cluster into karmada.
//nolint:gocyclo
// Note: ignore the cyclomatic complexity issue to get gocyclo on board. Tracked by: https://github.com/karmada-io/karmada/issues/460
func JoinCluster(controlPlaneRestConfig, clusterConfig *rest.Config, clusterNamespace, clusterName string, dryRun bool) (err error) {
    controlPlaneKarmadaClient := karmadaclientset.NewForConfigOrDie(controlPlaneRestConfig)
    controlPlaneKubeClient := kubeclient.NewForConfigOrDie(controlPlaneRestConfig)
    clusterKubeClient := kubeclient.NewForConfigOrDie(clusterConfig)

    klog.V(1).Infof("joining cluster config. endpoint: %s", clusterConfig.Host)

    // ensure the namespace where the cluster object is stored exists in the control plane.
    if _, err := ensureNamespaceExist(controlPlaneKubeClient, opts.ClusterNamespace, opts.DryRun); err != nil {
    if _, err := ensureNamespaceExist(controlPlaneKubeClient, clusterNamespace, dryRun); err != nil {
        return err
    }
    // ensure the namespace where the karmada control plane credential is stored exists in the cluster.
    if _, err := ensureNamespaceExist(clusterKubeClient, opts.ClusterNamespace, opts.DryRun); err != nil {
    if _, err := ensureNamespaceExist(clusterKubeClient, clusterNamespace, dryRun); err != nil {
        return err
    }

    // create a ServiceAccount in the cluster.
    serviceAccountObj := &corev1.ServiceAccount{}
    serviceAccountObj.Namespace = opts.ClusterNamespace
    serviceAccountObj.Name = names.GenerateServiceAccountName(opts.ClusterName)
    if serviceAccountObj, err = ensureServiceAccountExist(clusterKubeClient, serviceAccountObj, opts.DryRun); err != nil {
    serviceAccountObj.Namespace = clusterNamespace
    serviceAccountObj.Name = names.GenerateServiceAccountName(clusterName)
    if serviceAccountObj, err = ensureServiceAccountExist(clusterKubeClient, serviceAccountObj, dryRun); err != nil {
        return err
    }

@@ -198,7 +204,7 @@ func RunJoin(cmdOut io.Writer, karmadaConfig KarmadaConfig, opts CommandJoinOpti
    clusterRole := &rbacv1.ClusterRole{}
    clusterRole.Name = names.GenerateRoleName(serviceAccountObj.Name)
    clusterRole.Rules = clusterPolicyRules
    if _, err := ensureClusterRoleExist(clusterKubeClient, clusterRole, opts.DryRun); err != nil {
    if _, err := ensureClusterRoleExist(clusterKubeClient, clusterRole, dryRun); err != nil {
        return err
    }

@@ -207,11 +213,11 @@ func RunJoin(cmdOut io.Writer, karmadaConfig KarmadaConfig, opts CommandJoinOpti
    clusterRoleBinding.Name = clusterRole.Name
    clusterRoleBinding.Subjects = buildRoleBindingSubjects(serviceAccountObj.Name, serviceAccountObj.Namespace)
    clusterRoleBinding.RoleRef = buildClusterRoleReference(clusterRole.Name)
    if _, err := ensureClusterRoleBindingExist(clusterKubeClient, clusterRoleBinding, opts.DryRun); err != nil {
    if _, err := ensureClusterRoleBindingExist(clusterKubeClient, clusterRoleBinding, dryRun); err != nil {
        return err
    }

    if opts.DryRun {
    if dryRun {
        return nil
    }

@@ -223,7 +229,7 @@ func RunJoin(cmdOut io.Writer, karmadaConfig KarmadaConfig, opts CommandJoinOpti
            klog.Errorf("Failed to retrieve service account(%s/%s) from cluster. err: %v", serviceAccountObj.Namespace, serviceAccountObj.Name, err)
            return false, err
        }
        clusterSecret, err = util.GetTargetSecret(clusterKubeClient, serviceAccountObj.Secrets, corev1.SecretTypeServiceAccountToken, opts.ClusterNamespace)
        clusterSecret, err = util.GetTargetSecret(clusterKubeClient, serviceAccountObj.Secrets, corev1.SecretTypeServiceAccountToken, clusterNamespace)
        if err != nil {
            return false, err
        }
@@ -236,8 +242,8 @@ func RunJoin(cmdOut io.Writer, karmadaConfig KarmadaConfig, opts CommandJoinOpti
    }

    // create secret in control plane
    secretNamespace := opts.ClusterNamespace
    secretName := opts.ClusterName
    secretNamespace := clusterNamespace
    secretName := clusterName
    secret := &corev1.Secret{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: secretNamespace,
@@ -255,7 +261,7 @@ func RunJoin(cmdOut io.Writer, karmadaConfig KarmadaConfig, opts CommandJoinOpti
    }

    clusterObj := &clusterv1alpha1.Cluster{}
    clusterObj.Name = opts.ClusterName
    clusterObj.Name = clusterName
    clusterObj.Spec.SyncMode = clusterv1alpha1.Push
    clusterObj.Spec.APIEndpoint = clusterConfig.Host
    clusterObj.Spec.SecretRef = &clusterv1alpha1.LocalSecretReference{
@@ -278,7 +284,7 @@ func RunJoin(cmdOut io.Writer, karmadaConfig KarmadaConfig, opts CommandJoinOpti

    cluster, err := CreateClusterObject(controlPlaneKarmadaClient, clusterObj, false)
    if err != nil {
        klog.Errorf("failed to create cluster object. cluster name: %s, error: %v", opts.ClusterName, err)
        klog.Errorf("failed to create cluster object. cluster name: %s, error: %v", clusterName, err)
        return err
    }

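The point of the refactor above is that the join logic, previously reachable only through the `karmadactl join` command, is now exported as JoinCluster and can be called programmatically; the cluster-api detector is its first such caller. A minimal sketch of such a call, assuming the two rest configs are obtained elsewhere and using a hypothetical cluster name:

```go
package example

import (
	"k8s.io/client-go/rest"
	"k8s.io/klog/v2"

	"github.com/karmada-io/karmada/pkg/karmadactl"
	"github.com/karmada-io/karmada/pkg/karmadactl/options"
)

// joinMember registers an already-provisioned member cluster with the karmada control plane,
// mirroring the call made by the cluster-api detector (default cluster namespace, no dry run).
func joinMember(controlPlaneConfig, memberConfig *rest.Config) error {
	if err := karmadactl.JoinCluster(controlPlaneConfig, memberConfig, options.DefaultKarmadaClusterNamespace, "member1", false); err != nil {
		klog.Errorf("failed to join cluster: %v", err)
		return err
	}
	return nil
}
```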
@@ -12,6 +12,7 @@ import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    kubeclient "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/klog/v2"

    karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
@@ -115,46 +116,55 @@ func RunUnjoin(cmdOut io.Writer, karmadaConfig KarmadaConfig, opts CommandUnjoin
        return err
    }

    var clusterConfig *rest.Config
    if opts.ClusterKubeConfig != "" {
        // Get cluster config
        clusterConfig, err = karmadaConfig.GetRestConfig(opts.ClusterContext, opts.ClusterKubeConfig)
        if err != nil {
            klog.V(1).Infof("failed to get unjoining cluster config. error: %v", err)
            return err
        }
    }

    return UnJoinCluster(controlPlaneRestConfig, clusterConfig, opts.ClusterNamespace, opts.ClusterName, opts.forceDeletion, opts.DryRun)
}

// UnJoinCluster unjoins the cluster from karmada.
func UnJoinCluster(controlPlaneRestConfig, clusterConfig *rest.Config, clusterNamespace, clusterName string, forceDeletion, dryRun bool) (err error) {
    controlPlaneKarmadaClient := karmadaclientset.NewForConfigOrDie(controlPlaneRestConfig)

    // delete the cluster object in host cluster that associates the unjoining cluster
    err = deleteClusterObject(controlPlaneKarmadaClient, opts.ClusterName, opts.DryRun)
    err = deleteClusterObject(controlPlaneKarmadaClient, clusterName, dryRun)
    if err != nil {
        klog.Errorf("Failed to delete cluster object. cluster name: %s, error: %v", opts.ClusterName, err)
        klog.Errorf("Failed to delete cluster object. cluster name: %s, error: %v", clusterName, err)
        return err
    }

    // Attempt to delete the cluster role, cluster rolebindings and service account from the unjoining cluster
    // if the user provides the kubeconfig of the cluster
    if opts.ClusterKubeConfig != "" {
        // Get cluster config
        clusterConfig, err := karmadaConfig.GetRestConfig(opts.ClusterContext, opts.ClusterKubeConfig)
        if err != nil {
            klog.V(1).Infof("failed to get unjoining cluster config. error: %v", err)
            return err
        }
    if clusterConfig != nil {
        clusterKubeClient := kubeclient.NewForConfigOrDie(clusterConfig)

        klog.V(1).Infof("unjoining cluster config. endpoint: %s", clusterConfig.Host)

        // delete RBAC resource from unjoining cluster
        err = deleteRBACResources(clusterKubeClient, opts.ClusterName, opts.forceDeletion, opts.DryRun)
        err = deleteRBACResources(clusterKubeClient, clusterName, forceDeletion, dryRun)
        if err != nil {
            klog.Errorf("Failed to delete RBAC resource in unjoining cluster %q: %v", opts.ClusterName, err)
            klog.Errorf("Failed to delete RBAC resource in unjoining cluster %q: %v", clusterName, err)
            return err
        }

        // delete service account from unjoining cluster
        err = deleteServiceAccount(clusterKubeClient, opts.ClusterNamespace, opts.ClusterName, opts.forceDeletion, opts.DryRun)
        err = deleteServiceAccount(clusterKubeClient, clusterNamespace, clusterName, forceDeletion, dryRun)
        if err != nil {
            klog.Errorf("Failed to delete service account in unjoining cluster %q: %v", opts.ClusterName, err)
            klog.Errorf("Failed to delete service account in unjoining cluster %q: %v", clusterName, err)
            return err
        }

        // delete namespace from unjoining cluster
        err = deleteNamespaceFromUnjoinCluster(clusterKubeClient, opts.ClusterNamespace, opts.ClusterName, opts.forceDeletion, opts.DryRun)
        err = deleteNamespaceFromUnjoinCluster(clusterKubeClient, clusterNamespace, clusterName, forceDeletion, dryRun)
        if err != nil {
            klog.Errorf("Failed to delete namespace in unjoining cluster %q: %v", opts.ClusterName, err)
            klog.Errorf("Failed to delete namespace in unjoining cluster %q: %v", clusterName, err)
            return err
        }
    }
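UnJoinCluster mirrors that split and, notably, tolerates a nil member-cluster config: in that case only the Cluster object on the control plane is deleted, and the member-side RBAC, service-account and namespace cleanup is skipped. That is exactly how the cluster-api detector calls it, since it only holds management-cluster credentials. A hedged sketch (hypothetical cluster name):

```go
package example

import (
	"k8s.io/client-go/rest"

	"github.com/karmada-io/karmada/pkg/karmadactl"
	"github.com/karmada-io/karmada/pkg/karmadactl/options"
)

// unjoinMember removes only the control-plane Cluster object; the nil cluster config
// means no cleanup is attempted inside the member cluster itself.
func unjoinMember(controlPlaneConfig *rest.Config) error {
	var memberConfig *rest.Config // nil on purpose
	return karmadactl.UnJoinCluster(controlPlaneConfig, memberConfig, options.DefaultKarmadaClusterNamespace, "member1", false, false)
}
```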
@@ -132,6 +132,8 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
    clusterBindingHandler := informermanager.NewHandlerOnEvents(d.OnClusterResourceBindingAdd, d.OnClusterResourceBindingUpdate, d.OnClusterResourceBindingDelete)
    d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler)

    d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete)
    d.Processor = util.NewAsyncWorker("resource detector", time.Microsecond, ClusterWideKeyFunc, d.Reconcile)
    d.Processor.Run(1, d.stopCh)
    go d.discoverResources(30 * time.Second)
@@ -515,7 +517,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
func (d *ResourceDetector) GetUnstructuredObject(objectKey keys.ClusterWideKey) (*unstructured.Unstructured, error) {
    objectGVR, err := restmapper.GetGroupVersionResource(d.RESTMapper, objectKey.GroupVersionKind())
    if err != nil {
        klog.Errorf("Failed to get GVK of object: %s, error: %v", objectKey, err)
        klog.Errorf("Failed to get GVR of object: %s, error: %v", objectKey, err)
        return nil, err
    }

@@ -87,3 +87,8 @@ func GenerateEndpointSliceName(endpointSliceName string, cluster string) string
func GenerateDerivedServiceName(serviceName string) string {
    return fmt.Sprintf("%s-%s", derivedServicePrefix, serviceName)
}

// GenerateClusterAPISecretName generates the name of the secret that stores the cluster's authentication credentials in cluster-api.
func GenerateClusterAPISecretName(clusterName string) string {
    return fmt.Sprintf("%s-kubeconfig", clusterName)
}