Merge pull request #1164 from XiShanYongYe-Chang/support-clusterproxy-for-pull

completes ClusterSpec and create impernator secret for pull mode cluster
This commit is contained in:
karmada-bot 2021-12-28 10:44:37 +08:00 committed by GitHub
commit b06925128a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 163 additions and 128 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
kubeclientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
@ -21,6 +22,7 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/execution"
"github.com/karmada-io/karmada/pkg/controllers/mcs"
"github.com/karmada-io/karmada/pkg/controllers/status"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
"github.com/karmada-io/karmada/pkg/karmadactl"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/util"
@ -102,7 +104,7 @@ func run(ctx context.Context, karmadaConfig karmadactl.KarmadaConfig, opts *opti
return err
}
err = registerWithControlPlaneAPIServer(controlPlaneRestConfig, opts.ClusterName)
err = registerWithControlPlaneAPIServer(controlPlaneRestConfig, clusterConfig, opts)
if err != nil {
return fmt.Errorf("failed to register with karmada control plane: %s", err.Error())
}
@ -251,25 +253,74 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error)
return true, nil
}
func registerWithControlPlaneAPIServer(controlPlaneRestConfig *restclient.Config, memberClusterName string) error {
client := gclient.NewForConfigOrDie(controlPlaneRestConfig)
func registerWithControlPlaneAPIServer(controlPlaneRestConfig, clusterRestConfig *restclient.Config, opts *options.Options) error {
controlPlaneKubeClient := kubeclientset.NewForConfigOrDie(controlPlaneRestConfig)
namespaceObj := &corev1.Namespace{}
namespaceObj.Name = util.NamespaceClusterLease
if err := util.CreateNamespaceIfNotExist(client, namespaceObj); err != nil {
if _, err := util.CreateNamespace(controlPlaneKubeClient, namespaceObj); err != nil {
klog.Errorf("Failed to create namespace(%s) object, error: %v", namespaceObj.Name, err)
return err
}
clusterObj := &clusterv1alpha1.Cluster{}
clusterObj.Name = memberClusterName
clusterObj.Spec.SyncMode = clusterv1alpha1.Pull
if err := util.CreateClusterIfNotExist(client, clusterObj); err != nil {
klog.Errorf("Failed to create cluster(%s) object, error: %v", clusterObj.Name, err)
clusterObj, err := generateClusterInControllerPlane(controlPlaneRestConfig, opts)
if err != nil {
return err
}
clusterImpersonatorSecret, err := obtainCredentialsFromMemberCluster(clusterRestConfig, opts)
if err != nil {
return err
}
// create secret to store impersonation info in control plane
impersonationSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: opts.ClusterNamespace,
Name: names.GenerateImpersonationSecretName(opts.ClusterName),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(clusterObj, clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster")),
},
},
Data: map[string][]byte{
clusterv1alpha1.SecretCADataKey: clusterImpersonatorSecret.Data["ca.crt"],
clusterv1alpha1.SecretTokenKey: clusterImpersonatorSecret.Data[clusterv1alpha1.SecretTokenKey],
},
}
_, err = util.CreateSecret(controlPlaneKubeClient, impersonationSecret)
if err != nil {
return fmt.Errorf("failed to create impersonator secret in control plane. error: %v", err)
}
return nil
}
func generateClusterInControllerPlane(controlPlaneRestConfig *restclient.Config, opts *options.Options) (*clusterv1alpha1.Cluster, error) {
clusterObj := &clusterv1alpha1.Cluster{}
clusterObj.Name = opts.ClusterName
clusterObj.Spec.SyncMode = clusterv1alpha1.Pull
clusterObj.Spec.APIEndpoint = opts.ClusterAPIEndpoint
clusterObj.Spec.ProxyURL = opts.ProxyServerAddress
controlPlaneKarmadaClient := karmadaclientset.NewForConfigOrDie(controlPlaneRestConfig)
cluster, err := util.CreateOrUpdateClusterObject(controlPlaneKarmadaClient, clusterObj)
if err != nil {
klog.Errorf("Failed to create cluster(%s) object, error: %v", clusterObj.Name, err)
return nil, err
}
return cluster, nil
}
func obtainCredentialsFromMemberCluster(clusterRestConfig *restclient.Config, opts *options.Options) (*corev1.Secret, error) {
clusterKubeClient := kubeclientset.NewForConfigOrDie(clusterRestConfig)
impersonationSA := &corev1.ServiceAccount{}
impersonationSA.Namespace = opts.ClusterNamespace
impersonationSA.Name = names.GenerateServiceAccountName("impersonator")
impersonatorSecret, err := util.WaitForServiceAccountSecretCreation(clusterKubeClient, impersonationSA)
if err != nil {
return nil, fmt.Errorf("failed to get serviceAccount secret for impersonation from cluster(%s), error: %v", opts.ClusterName, err)
}
return impersonatorSecret, nil
}

View File

@ -47,6 +47,11 @@ type Options struct {
KubeAPIBurst int
ClusterCacheSyncTimeout metav1.Duration
// ClusterAPIEndpoint holds the apiEndpoint of the cluster.
ClusterAPIEndpoint string
// ProxyServerAddress holds the proxy server address that is used to proxy to the cluster.
ProxyServerAddress string
}
// NewOptions builds an default scheduler options.
@ -86,4 +91,6 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string) {
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.DurationVar(&o.ClusterCacheSyncTimeout.Duration, "cluster-cache-sync-timeout", util.CacheSyncTimeout, "Timeout period waiting for cluster cache to sync.")
fs.StringVar(&o.ClusterAPIEndpoint, "cluster-api-endpoint", o.ClusterAPIEndpoint, "APIEndpoint of the cluster.")
fs.StringVar(&o.ProxyServerAddress, "proxy-server-address", o.ProxyServerAddress, "Address of the proxy server that is used to proxy to the cluster.")
}

View File

@ -1,21 +1,17 @@
package karmadactl
import (
"context"
"errors"
"fmt"
"io"
"strings"
"time"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
kubeclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
@ -241,12 +237,12 @@ func obtainCredentialsFromMemberCluster(clusterKubeClient kubeclient.Interface,
return nil, nil, nil
}
clusterSecret, err := waitForServiceAccountSecretCreation(clusterKubeClient, serviceAccountObj, clusterNamespace)
clusterSecret, err := util.WaitForServiceAccountSecretCreation(clusterKubeClient, serviceAccountObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to get serviceAccount secret from cluster(%s), error: %v", clusterName, err)
}
impersonatorSecret, err := waitForServiceAccountSecretCreation(clusterKubeClient, impersonationSA, clusterNamespace)
impersonatorSecret, err := util.WaitForServiceAccountSecretCreation(clusterKubeClient, impersonationSA)
if err != nil {
return nil, nil, fmt.Errorf("failed to get serviceAccount secret for impersonation from cluster(%s), error: %v", clusterName, err)
}
@ -254,32 +250,6 @@ func obtainCredentialsFromMemberCluster(clusterKubeClient kubeclient.Interface,
return clusterSecret, impersonatorSecret, nil
}
func waitForServiceAccountSecretCreation(clusterKubeClient kubeclient.Interface, serviceAccountObj *corev1.ServiceAccount, clusterNamespace string) (*corev1.Secret, error) {
var clusterSecret *corev1.Secret
err := wait.Poll(1*time.Second, 30*time.Second, func() (done bool, err error) {
serviceAccount, err := clusterKubeClient.CoreV1().ServiceAccounts(serviceAccountObj.Namespace).Get(context.TODO(), serviceAccountObj.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, fmt.Errorf("failed to retrieve service account(%s/%s) from cluster, err: %v", serviceAccountObj.Namespace, serviceAccountObj.Name, err)
}
clusterSecret, err = util.GetTargetSecret(clusterKubeClient, serviceAccount.Secrets, corev1.SecretTypeServiceAccountToken, clusterNamespace)
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
})
if err != nil {
return nil, fmt.Errorf("failed to get serviceAccount secret, error: %v", err)
}
return clusterSecret, nil
}
func registerClusterInControllerPlane(opts CommandJoinOption, controlPlaneRestConfig, clusterConfig *rest.Config, controlPlaneKubeClient kubeclient.Interface, clusterSecret, clusterImpersonatorSecret *corev1.Secret) error {
// create secret in control plane
secret := &corev1.Secret{
@ -336,7 +306,7 @@ func registerClusterInControllerPlane(opts CommandJoinOption, controlPlaneRestCo
err = util.PatchSecret(controlPlaneKubeClient, impersonationSecret.Namespace, impersonationSecret.Name, types.MergePatchType, patchSecretBody)
if err != nil {
return fmt.Errorf("failed to patch impersonator secret %s/%s, error: %v", secret.Namespace, secret.Name, err)
return fmt.Errorf("failed to patch impersonator secret %s/%s, error: %v", impersonationSecret.Namespace, impersonationSecret.Name, err)
}
return nil
@ -369,7 +339,7 @@ func generateClusterInControllerPlane(controlPlaneConfig, clusterConfig *rest.Co
}
controlPlaneKarmadaClient := karmadaclientset.NewForConfigOrDie(controlPlaneConfig)
cluster, err := CreateClusterObject(controlPlaneKarmadaClient, clusterObj)
cluster, err := util.CreateClusterObject(controlPlaneKarmadaClient, clusterObj)
if err != nil {
return nil, fmt.Errorf("failed to create cluster(%s) object. error: %v", opts.ClusterName, err)
}
@ -425,51 +395,6 @@ func ensureClusterRoleBindingExist(client kubeclient.Interface, clusterRoleBindi
return createdObj, nil
}
// CreateClusterObject create cluster object in karmada control plane
func CreateClusterObject(controlPlaneClient *karmadaclientset.Clientset, clusterObj *clusterv1alpha1.Cluster) (*clusterv1alpha1.Cluster, error) {
cluster, exist, err := GetCluster(controlPlaneClient, clusterObj.Name)
if err != nil {
return nil, err
}
if exist {
return cluster, fmt.Errorf("cluster(%s) already exist", clusterObj.Name)
}
if cluster, err = CreateCluster(controlPlaneClient, clusterObj); err != nil {
klog.Warningf("failed to create cluster(%s). error: %v", clusterObj.Name, err)
return nil, err
}
return cluster, nil
}
// GetCluster tells if a cluster already joined to control plane.
func GetCluster(client karmadaclientset.Interface, name string) (*clusterv1alpha1.Cluster, bool, error) {
cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil, false, nil
}
klog.Warningf("failed to retrieve cluster(%s). error: %v", cluster.Name, err)
return nil, false, err
}
return cluster, true, nil
}
// CreateCluster creates a new cluster object in control plane.
func CreateCluster(controlPlaneClient karmadaclientset.Interface, cluster *clusterv1alpha1.Cluster) (*clusterv1alpha1.Cluster, error) {
newCluster, err := controlPlaneClient.ClusterV1alpha1().Clusters().Create(context.TODO(), cluster, metav1.CreateOptions{})
if err != nil {
klog.Warningf("failed to create cluster(%s). error: %v", cluster.Name, err)
return nil, err
}
return newCluster, nil
}
// buildRoleBindingSubjects will generate a subject as per service account.
// The subject used by RoleBinding or ClusterRoleBinding.
func buildRoleBindingSubjects(serviceAccountName, serviceAccountNamespace string) []rbacv1.Subject {

View File

@ -71,17 +71,9 @@ func (r *ProxyREST) Connect(ctx context.Context, id string, options runtime.Obje
}
func (r *ProxyREST) getImpersonateToken(clusterName string) (string, error) {
cluster, err := r.karmadaClient.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
if err != nil {
return "", err
}
if cluster.Spec.SecretRef == nil {
return "", fmt.Errorf("the secretRef of cluster %s is nil", clusterName)
}
secret, err := r.kubeClient.CoreV1().Secrets(cluster.Spec.SecretRef.Namespace).Get(context.TODO(),
names.GenerateImpersonationSecretName(cluster.Spec.SecretRef.Name), metav1.GetOptions{})
// TODO: add impersonation secretRef to Cluster api to indicate impersonation secret.
secret, err := r.kubeClient.CoreV1().Secrets(names.NamespaceKarmadaCluster).Get(context.TODO(),
names.GenerateImpersonationSecretName(clusterName), metav1.GetOptions{})
if err != nil {
return "", err
}

View File

@ -2,13 +2,18 @@ package util
import (
"context"
"fmt"
"reflect"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
)
const (
@ -30,16 +35,87 @@ func GetCluster(hostClient client.Client, clusterName string) (*clusterv1alpha1.
return cluster, nil
}
// CreateClusterIfNotExist try to create the cluster if it does not exist.
func CreateClusterIfNotExist(client client.Client, clusterObj *clusterv1alpha1.Cluster) error {
cluster := &clusterv1alpha1.Cluster{}
if err := client.Get(context.TODO(), types.NamespacedName{Name: clusterObj.Name}, cluster); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
if err := client.Create(context.TODO(), clusterObj); err != nil {
return err
}
// CreateClusterObject create cluster object in karmada control plane
func CreateClusterObject(controlPlaneClient *karmadaclientset.Clientset, clusterObj *clusterv1alpha1.Cluster) (*clusterv1alpha1.Cluster, error) {
cluster, exist, err := GetClusterWithKarmadaClient(controlPlaneClient, clusterObj.Name)
if err != nil {
return nil, err
}
return nil
if exist {
return cluster, fmt.Errorf("cluster(%s) already exist", clusterObj.Name)
}
if cluster, err = createCluster(controlPlaneClient, clusterObj); err != nil {
klog.Warningf("failed to create cluster(%s). error: %v", clusterObj.Name, err)
return nil, err
}
return cluster, nil
}
// CreateOrUpdateClusterObject create cluster object in karmada control plane,
// if cluster object has been existed and different from input clusterObj, update it.
func CreateOrUpdateClusterObject(controlPlaneClient *karmadaclientset.Clientset, clusterObj *clusterv1alpha1.Cluster) (*clusterv1alpha1.Cluster, error) {
cluster, exist, err := GetClusterWithKarmadaClient(controlPlaneClient, clusterObj.Name)
if err != nil {
return nil, err
}
if exist {
if reflect.DeepEqual(cluster.Spec, clusterObj.Spec) {
klog.Warningf("cluster(%s) already exist and newest", clusterObj.Name)
return cluster, nil
}
cluster.Spec = clusterObj.Spec
cluster, err = updateCluster(controlPlaneClient, cluster)
if err != nil {
klog.Warningf("failed to create cluster(%s). error: %v", clusterObj.Name, err)
return nil, err
}
return cluster, nil
}
if cluster, err = createCluster(controlPlaneClient, clusterObj); err != nil {
klog.Warningf("failed to create cluster(%s). error: %v", clusterObj.Name, err)
return nil, err
}
return cluster, nil
}
// GetClusterWithKarmadaClient tells if a cluster already joined to control plane.
func GetClusterWithKarmadaClient(client karmadaclientset.Interface, name string) (*clusterv1alpha1.Cluster, bool, error) {
cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil, false, nil
}
klog.Warningf("failed to retrieve cluster(%s). error: %v", cluster.Name, err)
return nil, false, err
}
return cluster, true, nil
}
func createCluster(controlPlaneClient karmadaclientset.Interface, cluster *clusterv1alpha1.Cluster) (*clusterv1alpha1.Cluster, error) {
newCluster, err := controlPlaneClient.ClusterV1alpha1().Clusters().Create(context.TODO(), cluster, metav1.CreateOptions{})
if err != nil {
klog.Warningf("failed to create cluster(%s). error: %v", cluster.Name, err)
return nil, err
}
return newCluster, nil
}
func updateCluster(controlPlaneClient karmadaclientset.Interface, cluster *clusterv1alpha1.Cluster) (*clusterv1alpha1.Cluster, error) {
newCluster, err := controlPlaneClient.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
if err != nil {
klog.Warningf("failed to update cluster(%s). error: %v", cluster.Name, err)
return nil, err
}
return newCluster, nil
}

View File

@ -7,9 +7,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kubeclient "k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// IsNamespaceExist tells if specific already exists.
@ -49,20 +47,6 @@ func DeleteNamespace(client kubeclient.Interface, namespace string) error {
return nil
}
// CreateNamespaceIfNotExist try to create the namespace if it does not exist.
func CreateNamespaceIfNotExist(client client.Client, namespaceObj *corev1.Namespace) error {
namespace := &corev1.Namespace{}
if err := client.Get(context.TODO(), types.NamespacedName{Name: namespaceObj.Name}, namespace); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
if err := client.Create(context.TODO(), namespaceObj); err != nil {
return err
}
}
return nil
}
// EnsureNamespaceExist makes sure that the specific namespace exist in cluster.
// If namespace not exit, just create it.
func EnsureNamespaceExist(client kubeclient.Interface, namespace string, dryRun bool) (*corev1.Namespace, error) {