package framework import ( "context" "fmt" "os" "strings" "time" "github.com/onsi/gomega" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" karmada "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" "github.com/karmada-io/karmada/pkg/util" ) const ( // MinimumCluster represents the minimum number of member clusters to run E2E test. MinimumCluster = 2 ) var ( clusters []*clusterv1alpha1.Cluster clusterNames []string clusterClients []*util.ClusterClient clusterDynamicClients []*util.DynamicClusterClient pullModeClusters map[string]string ) // Clusters will return all member clusters we have. func Clusters() []*clusterv1alpha1.Cluster { return clusters } // ClusterNames will return all member clusters' names we have. func ClusterNames() []string { return clusterNames } // InitClusterInformation init the E2E test's cluster information. func InitClusterInformation(karmadaClient karmada.Interface, controlPlaneClient client.Client) { var err error pullModeClusters, err = fetchPullBasedClusters() gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) clusters, err = fetchClusters(karmadaClient) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) var meetRequirement bool meetRequirement, err = isClusterMeetRequirements(clusters) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) gomega.Expect(meetRequirement).Should(gomega.BeTrue()) for _, cluster := range clusters { clusterClient, clusterDynamicClient, err := newClusterClientSet(controlPlaneClient, cluster) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) clusterNames = append(clusterNames, cluster.Name) clusterClients = append(clusterClients, clusterClient) clusterDynamicClients = append(clusterDynamicClients, clusterDynamicClient) err = setClusterLabel(controlPlaneClient, cluster.Name) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) } gomega.Expect(clusterNames).Should(gomega.HaveLen(len(clusters))) } // GetClusterClient get cluster client func GetClusterClient(clusterName string) kubernetes.Interface { for _, clusterClient := range clusterClients { if clusterClient.ClusterName == clusterName { return clusterClient.KubeClient } } return nil } // GetClusterDynamicClient get cluster dynamicClient func GetClusterDynamicClient(clusterName string) dynamic.Interface { for _, clusterClient := range clusterDynamicClients { if clusterClient.ClusterName == clusterName { return clusterClient.DynamicClientSet } } return nil } func fetchPullBasedClusters() (map[string]string, error) { pullBasedClusters := os.Getenv("PULL_BASED_CLUSTERS") if pullBasedClusters == "" { return nil, nil } pullBasedClustersMap := make(map[string]string) pullBasedClusters = strings.TrimSuffix(pullBasedClusters, ";") clusterInfo := strings.Split(pullBasedClusters, ";") for _, cluster := range clusterInfo { clusterNameAndConfigPath := strings.Split(cluster, ":") if len(clusterNameAndConfigPath) != 2 { return nil, fmt.Errorf("failed to parse config path for cluster: %s", cluster) } pullBasedClustersMap[clusterNameAndConfigPath[0]] = clusterNameAndConfigPath[1] } return pullBasedClustersMap, nil } // fetchClusters will fetch all member clusters we have. func fetchClusters(client karmada.Interface) ([]*clusterv1alpha1.Cluster, error) { clusterList, err := client.ClusterV1alpha1().Clusters().List(context.TODO(), metav1.ListOptions{}) if err != nil { return nil, err } clusters := make([]*clusterv1alpha1.Cluster, 0, len(clusterList.Items)) for _, cluster := range clusterList.Items { pinedCluster := cluster if pinedCluster.Spec.SyncMode == clusterv1alpha1.Pull { if _, exist := pullModeClusters[cluster.Name]; !exist { continue } } clusters = append(clusters, &pinedCluster) } return clusters, nil } // fetchCluster will fetch member cluster by name. func fetchCluster(client karmada.Interface, clusterName string) (*clusterv1alpha1.Cluster, error) { cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{}) if err != nil { return nil, err } return cluster, nil } // isClusterMeetRequirements checks if current environment meet the requirements of E2E. func isClusterMeetRequirements(clusters []*clusterv1alpha1.Cluster) (bool, error) { // check if member cluster number meets requirements if len(clusters) < MinimumCluster { return false, fmt.Errorf("needs at lease %d member cluster to run, but got: %d", MinimumCluster, len(clusters)) } // check if all member cluster status is ready for _, cluster := range clusters { if !util.IsClusterReady(&cluster.Status) { return false, fmt.Errorf("cluster %s not ready", cluster.GetName()) } } klog.Infof("Got %d member cluster and all in ready state.", len(clusters)) return true, nil } func newClusterClientSet(controlPlaneClient client.Client, c *clusterv1alpha1.Cluster) (*util.ClusterClient, *util.DynamicClusterClient, error) { if c.Spec.SyncMode == clusterv1alpha1.Push { clusterClient, err := util.NewClusterClientSet(c.Name, controlPlaneClient, nil) if err != nil { return nil, nil, err } clusterDynamicClient, err := util.NewClusterDynamicClientSet(c.Name, controlPlaneClient) if err != nil { return nil, nil, err } return clusterClient, clusterDynamicClient, nil } clusterConfigPath := pullModeClusters[c.Name] clusterConfig, err := clientcmd.BuildConfigFromFlags("", clusterConfigPath) if err != nil { return nil, nil, err } clusterClientSet := util.ClusterClient{ClusterName: c.Name} clusterDynamicClientSet := util.DynamicClusterClient{ClusterName: c.Name} clusterClientSet.KubeClient = kubernetes.NewForConfigOrDie(clusterConfig) clusterDynamicClientSet.DynamicClientSet = dynamic.NewForConfigOrDie(clusterConfig) return &clusterClientSet, &clusterDynamicClientSet, nil } // setClusterLabel set cluster label of E2E func setClusterLabel(c client.Client, clusterName string) error { err := wait.PollImmediate(2*time.Second, 10*time.Second, func() (done bool, err error) { clusterObj := &clusterv1alpha1.Cluster{} if err := c.Get(context.TODO(), client.ObjectKey{Name: clusterName}, clusterObj); err != nil { if apierrors.IsConflict(err) { return false, nil } return false, err } if clusterObj.Labels == nil { clusterObj.Labels = make(map[string]string) } clusterObj.Labels["location"] = "CHN" if clusterObj.Spec.SyncMode == clusterv1alpha1.Push { clusterObj.Labels["sync-mode"] = "Push" } if err := c.Update(context.TODO(), clusterObj); err != nil { if apierrors.IsConflict(err) { return false, nil } return false, err } return true, nil }) return err } // GetClusterNamesFromClusters will get Clusters' names form Clusters Object. func GetClusterNamesFromClusters(clusters []*clusterv1alpha1.Cluster) []string { clusterNames := make([]string, 0, len(clusters)) for _, cluster := range clusters { clusterNames = append(clusterNames, cluster.Name) } return clusterNames } // WaitClusterFitWith wait cluster fit with fit func. func WaitClusterFitWith(c client.Client, clusterName string, fit func(cluster *clusterv1alpha1.Cluster) bool) { gomega.Eventually(func() (bool, error) { currentCluster, err := util.GetCluster(c, clusterName) if err != nil { return false, err } return fit(currentCluster), nil }, pollTimeout, pollInterval).Should(gomega.Equal(true)) }