/* Copyright 2021 The Karmada Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package e2e import ( "context" "encoding/json" "fmt" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/util" utilhelper "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/test/e2e/framework" "github.com/karmada-io/karmada/test/helper" ) // BasicPropagation focus on basic propagation functionality testing. 
var _ = ginkgo.Describe("propagation with label and group constraints testing", func() { ginkgo.Context("Deployment propagation testing", func() { var groupMatchedClusters []string var targetClusterNames []string var policyNamespace, policyName string var deploymentNamespace, deploymentName string var deployment *appsv1.Deployment var maxGroups, minGroups int var policy *policyv1alpha1.PropagationPolicy ginkgo.BeforeEach(func() { policyNamespace = testNamespace policyName = deploymentNamePrefix + rand.String(RandomStrLength) deploymentNamespace = testNamespace deploymentName = policyName deployment = helper.NewDeployment(deploymentNamespace, deploymentName) maxGroups = rand.Intn(2) + 1 minGroups = maxGroups // set MaxGroups=MinGroups=1 or 2, label is location=CHN. policy = helper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{ { APIVersion: deployment.APIVersion, Kind: deployment.Kind, Name: deployment.Name, }, }, policyv1alpha1.Placement{ ClusterAffinity: &policyv1alpha1.ClusterAffinity{ LabelSelector: &metav1.LabelSelector{ MatchLabels: clusterLabels, }, }, SpreadConstraints: []policyv1alpha1.SpreadConstraint{ { SpreadByField: policyv1alpha1.SpreadByFieldCluster, MaxGroups: maxGroups, MinGroups: minGroups, }, }, }) }) ginkgo.BeforeEach(func() { framework.CreatePropagationPolicy(karmadaClient, policy) framework.CreateDeployment(kubeClient, deployment) ginkgo.DeferCleanup(func() { framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name) framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name) }) }) ginkgo.It("Deployment propagation with label and group constraints testing", func() { ginkgo.By("collect the target clusters in resource binding", func() { targetClusterNames = framework.ExtractTargetClustersFrom(controlPlaneClient, deployment) gomega.Expect(len(targetClusterNames) == minGroups).ShouldNot(gomega.BeFalse()) }) ginkgo.By("check if the scheduled condition is true", func() { 
gomega.Eventually(func(g gomega.Gomega) (bool, error) { rb, err := getResourceBinding(deployment) g.Expect(err).ShouldNot(gomega.HaveOccurred()) return meta.IsStatusConditionTrue(rb.Status.Conditions, workv1alpha2.Scheduled), nil }, pollTimeout, pollInterval).Should(gomega.Equal(true)) }) ginkgo.By("check if deployment present on right clusters", func() { for _, targetClusterName := range targetClusterNames { framework.WaitDeploymentPresentOnClusterFitWith(targetClusterName, deployment.Namespace, deployment.Name, func(*appsv1.Deployment) bool { return true }) groupMatchedClusters = append(groupMatchedClusters, targetClusterName) } fmt.Printf("there are %d target clusters\n", len(groupMatchedClusters)) gomega.Expect(minGroups == len(groupMatchedClusters)).ShouldNot(gomega.BeFalse()) }) framework.UpdateDeploymentReplicas(kubeClient, deployment, updateDeploymentReplicas) framework.WaitDeploymentPresentOnClustersFitWith(groupMatchedClusters, deployment.Namespace, deployment.Name, func(deployment *appsv1.Deployment) bool { return *deployment.Spec.Replicas == updateDeploymentReplicas }) }) }) ginkgo.Context("CustomResourceDefinition propagation testing", func() { var groupMatchedClusters []*clusterv1alpha1.Cluster var targetClusterNames []string var crdGroup string var randStr string var crdSpecNames apiextensionsv1.CustomResourceDefinitionNames var crd *apiextensionsv1.CustomResourceDefinition var maxGroups, minGroups int var crdPolicy *policyv1alpha1.ClusterPropagationPolicy var crdGVR schema.GroupVersionResource ginkgo.BeforeEach(func() { crdGroup = fmt.Sprintf("example-%s.karmada.io", rand.String(RandomStrLength)) randStr = rand.String(RandomStrLength) crdSpecNames = apiextensionsv1.CustomResourceDefinitionNames{ Kind: fmt.Sprintf("Foo%s", randStr), ListKind: fmt.Sprintf("Foo%sList", randStr), Plural: fmt.Sprintf("foo%ss", randStr), Singular: fmt.Sprintf("foo%s", randStr), } crd = helper.NewCustomResourceDefinition(crdGroup, crdSpecNames, 
apiextensionsv1.NamespaceScoped) maxGroups = rand.Intn(2) + 1 minGroups = maxGroups // set MaxGroups=MinGroups=1 or 2, label is location=CHN. crdPolicy = helper.NewClusterPropagationPolicy(crd.Name, []policyv1alpha1.ResourceSelector{ { APIVersion: crd.APIVersion, Kind: crd.Kind, Name: crd.Name, }, }, policyv1alpha1.Placement{ ClusterAffinity: &policyv1alpha1.ClusterAffinity{ LabelSelector: &metav1.LabelSelector{ MatchLabels: clusterLabels, }, }, SpreadConstraints: []policyv1alpha1.SpreadConstraint{ { SpreadByField: policyv1alpha1.SpreadByFieldCluster, MaxGroups: maxGroups, MinGroups: minGroups, }, }, }) crdGVR = schema.GroupVersionResource{Group: "apiextensions.k8s.io", Version: "v1", Resource: "customresourcedefinitions"} }) ginkgo.BeforeEach(func() { framework.CreateClusterPropagationPolicy(karmadaClient, crdPolicy) framework.CreateCRD(dynamicClient, crd) framework.GetCRD(dynamicClient, crd.Name) ginkgo.DeferCleanup(func() { framework.RemoveCRD(dynamicClient, crd.Name) framework.WaitCRDDisappearedOnClusters(framework.GetClusterNamesFromClusters(groupMatchedClusters), crd.Name) framework.RemoveClusterPropagationPolicy(karmadaClient, crdPolicy.Name) }) }) ginkgo.It("CRD with specified label and group constraints propagation testing", func() { ginkgo.By("collect the target clusters in cluster resource binding", func() { bindingName := names.GenerateBindingName(crd.Kind, crd.Name) fmt.Printf("crd kind is %s, name is %s\n", crd.Kind, crd.Name) binding := &workv1alpha2.ClusterResourceBinding{} fmt.Printf("MaxGroups= %v, MinGroups= %v\n", maxGroups, minGroups) gomega.Eventually(func() int { err := controlPlaneClient.Get(context.TODO(), client.ObjectKey{Name: bindingName}, binding) if err != nil { return -1 } return len(binding.Spec.Clusters) }, pollTimeout, pollInterval).Should(gomega.Equal(minGroups)) for _, cluster := range binding.Spec.Clusters { targetClusterNames = append(targetClusterNames, cluster.Name) } fmt.Printf("target clusters in cluster resource binding 
are %s\n", targetClusterNames) }) ginkgo.By("check if the scheduled condition is true", func() { gomega.Eventually(func(g gomega.Gomega) (bool, error) { crb, err := getClusterResourceBinding(crd) g.Expect(err).ShouldNot(gomega.HaveOccurred()) return meta.IsStatusConditionTrue(crb.Status.Conditions, workv1alpha2.Scheduled), nil }, pollTimeout, pollInterval).Should(gomega.Equal(true)) }) ginkgo.By("check if crd present on right clusters", func() { for _, targetClusterName := range targetClusterNames { clusterDynamicClient := framework.GetClusterDynamicClient(targetClusterName) gomega.Expect(clusterDynamicClient).ShouldNot(gomega.BeNil()) klog.Infof("Waiting for crd(%s) present on cluster(%s)", crd.Name, targetClusterName) err := wait.PollUntilContextTimeout(context.TODO(), pollInterval, pollTimeout, true, func(ctx context.Context) (done bool, err error) { _, err = clusterDynamicClient.Resource(crdGVR).Namespace(crd.Namespace).Get(ctx, crd.Name, metav1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { return false, nil } return false, err } targetCluster, _ := util.GetCluster(controlPlaneClient, targetClusterName) groupMatchedClusters = append(groupMatchedClusters, targetCluster) fmt.Printf("CRD(%s) is present on cluster(%s).\n", crd.Name, targetClusterName) return true, nil }) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) } fmt.Printf("there are %d target clusters\n", len(groupMatchedClusters)) gomega.Expect(minGroups == len(groupMatchedClusters)).ShouldNot(gomega.BeFalse()) }) }) }) ginkgo.Context("Job propagation testing", func() { var groupMatchedClusters []string var targetClusterNames []string var policyNamespace, policyName string var jobNamespace, jobName string var job *batchv1.Job var maxGroups, minGroups int var policy *policyv1alpha1.PropagationPolicy ginkgo.BeforeEach(func() { policyNamespace = testNamespace policyName = jobNamePrefix + rand.String(RandomStrLength) jobNamespace = testNamespace jobName = policyName job = 
helper.NewJob(jobNamespace, jobName) maxGroups = rand.Intn(2) + 1 minGroups = maxGroups // set MaxGroups=MinGroups=1 or 2, label is location=CHN. policy = helper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{ { APIVersion: job.APIVersion, Kind: job.Kind, Name: job.Name, }, }, policyv1alpha1.Placement{ ClusterAffinity: &policyv1alpha1.ClusterAffinity{ LabelSelector: &metav1.LabelSelector{ MatchLabels: clusterLabels, }, }, SpreadConstraints: []policyv1alpha1.SpreadConstraint{ { SpreadByField: policyv1alpha1.SpreadByFieldCluster, MaxGroups: maxGroups, MinGroups: minGroups, }, }, }) }) ginkgo.BeforeEach(func() { framework.CreatePropagationPolicy(karmadaClient, policy) framework.CreateJob(kubeClient, job) framework.GetJob(kubeClient, job.Namespace, job.Name) ginkgo.DeferCleanup(func() { framework.RemoveJob(kubeClient, job.Namespace, job.Name) framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name) }) }) ginkgo.It("Job propagation with label and group constraints testing", func() { ginkgo.By("collect the target clusters in resource binding", func() { bindingName := names.GenerateBindingName(job.Kind, job.Name) binding := &workv1alpha2.ResourceBinding{} fmt.Printf("MaxGroups= %v, MinGroups= %v\n", maxGroups, minGroups) gomega.Eventually(func() int { err := controlPlaneClient.Get(context.TODO(), client.ObjectKey{Namespace: policyNamespace, Name: bindingName}, binding) if err != nil { return -1 } return len(binding.Spec.Clusters) }, pollTimeout, pollInterval).Should(gomega.Equal(minGroups)) for _, cluster := range binding.Spec.Clusters { targetClusterNames = append(targetClusterNames, cluster.Name) } fmt.Printf("target clusters in cluster resource binding are %s\n", targetClusterNames) }) ginkgo.By("check if the scheduled condition is true", func() { gomega.Eventually(func(g gomega.Gomega) (bool, error) { rb, err := getResourceBinding(job) g.Expect(err).ShouldNot(gomega.HaveOccurred()) return 
meta.IsStatusConditionTrue(rb.Status.Conditions, workv1alpha2.Scheduled), nil }, pollTimeout, pollInterval).Should(gomega.Equal(true)) }) ginkgo.By("check if job present on right clusters", func() { for _, targetClusterName := range targetClusterNames { framework.WaitJobPresentOnClusterFitWith(targetClusterName, job.Namespace, job.Name, func(*batchv1.Job) bool { return true }) groupMatchedClusters = append(groupMatchedClusters, targetClusterName) } fmt.Printf("there are %d target clusters\n", len(groupMatchedClusters)) gomega.Expect(minGroups == len(groupMatchedClusters)).ShouldNot(gomega.BeFalse()) }) patch := map[string]interface{}{"spec": map[string]interface{}{"parallelism": pointer.Int32(updateParallelism)}} bytes, err := json.Marshal(patch) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) framework.UpdateJobWithPatchBytes(kubeClient, job.Namespace, job.Name, bytes, types.StrategicMergePatchType) framework.WaitJobPresentOnClustersFitWith(groupMatchedClusters, job.Namespace, job.Name, func(job *batchv1.Job) bool { return *job.Spec.Parallelism == updateParallelism }) }) }) }) /* ReplicaScheduling focus on dealing with the number of replicas testing when propagating resources that have replicas in spec (e.g. deployments, statefulsets) to member clusters with ReplicaSchedulingStrategy. Test Case Overview: Case 1: `ReplicaSchedulingType` value is `Duplicated`. Case 2: `ReplicaSchedulingType` value is `Duplicated`, trigger rescheduling when replicas have changed. Case 3: `ReplicaSchedulingType` value is `Divided`, `ReplicaDivisionPreference` value is `Weighted`, `WeightPreference` is nil. Case 4: `ReplicaSchedulingType` value is `Divided`, `ReplicaDivisionPreference` value is `Weighted`, `WeightPreference` is nil, trigger rescheduling when replicas have changed. Case 5: `ReplicaSchedulingType` value is `Divided`, `ReplicaDivisionPreference` value is `Weighted`, `WeightPreference` isn't nil. 
Case 6: `ReplicaSchedulingType` value is `Divided`, `ReplicaDivisionPreference` value is `Weighted`, `WeightPreference` isn't nil, trigger rescheduling when replicas have changed.
*/
var _ = ginkgo.Describe("[ReplicaScheduling] ReplicaSchedulingStrategy testing", func() {
	var policyNamespace, policyName string
	var deploymentNamespace, deploymentName string
	var deployment *appsv1.Deployment
	var policy *policyv1alpha1.PropagationPolicy

	// Build the default fixtures: a deployment and a policy that targets every
	// member cluster by name with the Duplicated strategy. Nested BeforeEach
	// nodes adjust them before JustBeforeEach creates them.
	ginkgo.BeforeEach(func() {
		policyNamespace = testNamespace
		policyName = deploymentNamePrefix + rand.String(RandomStrLength)
		deploymentNamespace = policyNamespace
		deploymentName = policyName

		deployment = helper.NewDeployment(deploymentNamespace, deploymentName)
		policy = helper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
			{
				APIVersion: deployment.APIVersion,
				Kind:       deployment.Kind,
				Name:       deployment.Name,
			},
		}, policyv1alpha1.Placement{
			ClusterAffinity: &policyv1alpha1.ClusterAffinity{
				ClusterNames: framework.ClusterNames(),
			},
			ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
				ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDuplicated,
			},
		})
	})

	// Creation happens in JustBeforeEach so that it runs after every nested
	// BeforeEach has had the chance to tweak policy and deployment.
	ginkgo.JustBeforeEach(func() {
		framework.CreatePropagationPolicy(karmadaClient, policy)
		framework.CreateDeployment(kubeClient, deployment)
		ginkgo.DeferCleanup(func() {
			framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name)
			framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
		})
	})

	// Case 1: `ReplicaSchedulingType` value is `Duplicated`.
	ginkgo.Context("ReplicaSchedulingType is Duplicated.", func() {
		ginkgo.It("replicas duplicated testing", func() {
			klog.Infof("check if deployment's replicas are duplicate on member clusters")
			framework.WaitDeploymentPresentOnClustersFitWith(framework.ClusterNames(), deployment.Namespace, deployment.Name,
				func(deploy *appsv1.Deployment) bool {
					klog.Infof("Deployment(%s/%s)'s replicas is %d, expected: %d.",
						deploy.Namespace, deploy.Name, *deploy.Spec.Replicas, *deployment.Spec.Replicas)
					return *deploy.Spec.Replicas == *deployment.Spec.Replicas
				})
		})
	})

	// Case 2: `ReplicaSchedulingType` value is `Duplicated`, trigger rescheduling when replicas have changed.
	ginkgo.Context("ReplicaSchedulingType is Duplicated, trigger rescheduling when replicas have changed.", func() {
		ginkgo.It("replicas duplicated testing when rescheduling", func() {
			klog.Infof("make sure deployment has been propagated to member clusters")
			framework.WaitDeploymentPresentOnClustersFitWith(framework.ClusterNames(), deployment.Namespace, deployment.Name,
				func(*appsv1.Deployment) bool {
					return true
				})

			framework.UpdateDeploymentReplicas(kubeClient, deployment, updateDeploymentReplicas)
			klog.Infof("check if deployment's replicas have been updated on member clusters")
			framework.WaitDeploymentPresentOnClustersFitWith(framework.ClusterNames(), deployment.Namespace, deployment.Name,
				func(deploy *appsv1.Deployment) bool {
					klog.Infof("Deployment(%s/%s)'s replicas is %d, expected: %d.",
						deploy.Namespace, deploy.Name, *deploy.Spec.Replicas, updateDeploymentReplicas)
					return *deploy.Spec.Replicas == updateDeploymentReplicas
				})
		})
	})

	// Case 3: `ReplicaSchedulingType` value is `Divided`, `ReplicaDivisionPreference` value is `Weighted`,
	// `WeightPreference` is nil.
	ginkgo.Context("ReplicaSchedulingType is Divided, ReplicaDivisionPreference is Weighted, WeightPreference is nil.", func() {
		var expectedReplicas int32

		ginkgo.BeforeEach(func() {
			policy.Spec.Placement.ReplicaScheduling = &policyv1alpha1.ReplicaSchedulingStrategy{
				ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
				ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
			}

			// Request expectedReplicas per member cluster in total, so the spec can
			// expect exactly expectedReplicas on each cluster.
			expectedReplicas = int32(2)
			updateReplicas := expectedReplicas * int32(len(framework.Clusters()))
			deployment.Spec.Replicas = &updateReplicas
		})

		ginkgo.It("replicas divided and weighted testing", func() {
			klog.Infof("check if deployment's replicas are divided equally on member clusters")
			framework.WaitDeploymentPresentOnClustersFitWith(framework.ClusterNames(), deployment.Namespace, deployment.Name,
				func(deploy *appsv1.Deployment) bool {
					klog.Infof("Deployment(%s/%s)'s replicas is %d, expected: %d.",
						deploy.Namespace, deploy.Name, *deploy.Spec.Replicas, expectedReplicas)
					return *deploy.Spec.Replicas == expectedReplicas
				})
		})
	})

	// Case 4: `ReplicaSchedulingType` value is `Divided`, `ReplicaDivisionPreference` value is `Weighted`,
	// `WeightPreference` is nil, trigger rescheduling when replicas have changed.
	ginkgo.Context("ReplicaSchedulingType is Divided, ReplicaDivisionPreference is Weighted, WeightPreference is "+
		"nil, trigger rescheduling when replicas have changed.", func() {
		ginkgo.BeforeEach(func() {
			policy.Spec.Placement.ReplicaScheduling = &policyv1alpha1.ReplicaSchedulingStrategy{
				ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
				ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
			}
		})

		ginkgo.It("replicas divided and weighted testing when rescheduling", func() {
			framework.WaitDeploymentPresentOnClustersFitWith(framework.ClusterNames(), deployment.Namespace, deployment.Name,
				func(*appsv1.Deployment) bool {
					return true
				})

			// Scale to expectedReplicas per member cluster in total.
			expectedReplicas := int32(3)
			updateReplicas := expectedReplicas * int32(len(framework.Clusters()))
			framework.UpdateDeploymentReplicas(kubeClient, deployment, updateReplicas)

			klog.Infof("check if deployment's replicas are divided equally on member clusters")
			framework.WaitDeploymentPresentOnClustersFitWith(framework.ClusterNames(), deployment.Namespace, deployment.Name,
				func(deploy *appsv1.Deployment) bool {
					klog.Infof("Deployment(%s/%s)'s replicas is %d, expected: %d.",
						deploy.Namespace, deploy.Name, *deploy.Spec.Replicas, expectedReplicas)
					return *deploy.Spec.Replicas == expectedReplicas
				})
		})
	})

	// Case 5: `ReplicaSchedulingType` value is `Divided`, `ReplicaDivisionPreference` value is `Weighted`,
	// `WeightPreference` isn't nil.
	ginkgo.Context("ReplicaSchedulingType is Divided, ReplicaDivisionPreference is Weighted, WeightPreference isn't nil.", func() {
		ginkgo.BeforeEach(func() {
			staticWeightLists, sumWeight := staticWeightsByClusterIndex(framework.ClusterNames())
			klog.Infof("StaticWeightList of policy is %+v", staticWeightLists)
			policy.Spec.Placement.ReplicaScheduling = &policyv1alpha1.ReplicaSchedulingStrategy{
				ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
				ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
				WeightPreference: &policyv1alpha1.ClusterPreferences{
					StaticWeightList: staticWeightLists,
				},
			}

			// Total replicas equal the sum of weights, so cluster i is expected to
			// receive exactly its weight (i+1) in replicas.
			klog.Infof("Sum weight of clusters is %d", sumWeight)
			sumReplicas := int32(sumWeight)
			deployment.Spec.Replicas = &sumReplicas
		})

		ginkgo.It("replicas divided and weighted testing", func() {
			ginkgo.By("check if deployment's replicas are divided equally on member clusters", func() {
				// NOTE(review): this relies on framework.Clusters() and
				// framework.ClusterNames() listing clusters in the same order.
				for index, cluster := range framework.Clusters() {
					expectedReplicas := int32(index + 1)

					clusterClient := framework.GetClusterClient(cluster.Name)
					gomega.Expect(clusterClient).ShouldNot(gomega.BeNil())

					gomega.Eventually(func(g gomega.Gomega) (int32, error) {
						memberDeployment, err := clusterClient.AppsV1().Deployments(deploymentNamespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
						g.Expect(err).NotTo(gomega.HaveOccurred())

						klog.Infof("Deployment(%s/%s)'s replicas is %d on cluster(%s), expected: %d.",
							deploymentNamespace, deploymentName, *memberDeployment.Spec.Replicas, cluster.Name, expectedReplicas)
						return *memberDeployment.Spec.Replicas, nil
					}, pollTimeout, pollInterval).Should(gomega.Equal(expectedReplicas))
				}
			})
		})
	})

	// Case 6: `ReplicaSchedulingType` value is `Divided`, `ReplicaDivisionPreference` value is `Weighted`,
	// `WeightPreference` isn't nil, trigger rescheduling when replicas have changed.
	ginkgo.Context("ReplicaSchedulingType is Divided, ReplicaDivisionPreference is Weighted, WeightPreference isn't "+
		"nil, trigger rescheduling when replicas have changed.", func() {
		ginkgo.BeforeEach(func() {
			staticWeightLists, sumWeight := staticWeightsByClusterIndex(framework.ClusterNames())
			klog.Infof("StaticWeightList of policy is %+v", staticWeightLists)
			policy.Spec.Placement.ReplicaScheduling = &policyv1alpha1.ReplicaSchedulingStrategy{
				ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
				ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
				WeightPreference: &policyv1alpha1.ClusterPreferences{
					StaticWeightList: staticWeightLists,
				},
			}

			sumReplicas := int32(sumWeight)
			deployment.Spec.Replicas = &sumReplicas
		})

		ginkgo.It("replicas divided and weighted testing when rescheduling", func() {
			framework.WaitDeploymentPresentOnClustersFitWith(framework.ClusterNames(), deployment.Namespace, deployment.Name,
				func(*appsv1.Deployment) bool {
					return true
				})

			// Double the total replicas; each cluster is then expected to hold
			// twice its weight.
			_, sumWeight := staticWeightsByClusterIndex(framework.ClusterNames())
			klog.Infof("sumWeight of clusters is %d", sumWeight)
			updateReplicas := 2 * int32(sumWeight)
			framework.UpdateDeploymentReplicas(kubeClient, deployment, updateReplicas)

			ginkgo.By("check if deployment's replicas are divided equally on member clusters", func() {
				// NOTE(review): this relies on framework.Clusters() and
				// framework.ClusterNames() listing clusters in the same order.
				for index, cluster := range framework.Clusters() {
					expectedReplicas := 2 * int32(index+1)

					clusterClient := framework.GetClusterClient(cluster.Name)
					gomega.Expect(clusterClient).ShouldNot(gomega.BeNil())

					gomega.Eventually(func(g gomega.Gomega) (int32, error) {
						memberDeployment, err := clusterClient.AppsV1().Deployments(deploymentNamespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
						g.Expect(err).NotTo(gomega.HaveOccurred())

						klog.Infof("Deployment(%s/%s)'s replicas is %d on cluster(%s), expected: %d.",
							deploymentNamespace, deploymentName, *memberDeployment.Spec.Replicas, cluster.Name, expectedReplicas)
						return *memberDeployment.Spec.Replicas, nil
					}, pollTimeout, pollInterval).Should(gomega.Equal(expectedReplicas))
				}
			})
		})
	})
})

// JobReplicaScheduling focuses on job replica schedule testing.
var _ = ginkgo.Describe("[JobReplicaScheduling] JobReplicaSchedulingStrategy testing", func() {
	// `ReplicaSchedulingType` value is `Divided`, `ReplicaDivisionPreference` value is `Weighted`,
	// `WeightPreference` isn't nil, `spec.completions` isn't nil.
	ginkgo.Context("ReplicaSchedulingType is Divided, ReplicaDivisionPreference is Weighted, WeightPreference isn't nil, spec.completions isn`t nil.", func() {
		var policyNamespace, policyName string
		var jobNamespace, jobName string
		var job *batchv1.Job
		var policy *policyv1alpha1.PropagationPolicy

		ginkgo.BeforeEach(func() {
			policyNamespace = testNamespace
			policyName = jobNamePrefix + rand.String(RandomStrLength)
			jobNamespace = policyNamespace
			jobName = policyName

			job = helper.NewJob(jobNamespace, jobName)
			policy = helper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
				{
					APIVersion: job.APIVersion,
					Kind:       job.Kind,
					Name:       job.Name,
				},
			}, policyv1alpha1.Placement{
				ClusterAffinity: &policyv1alpha1.ClusterAffinity{
					ClusterNames: framework.ClusterNames(),
				},
				ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
					ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
					ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
					WeightPreference:          &policyv1alpha1.ClusterPreferences{},
				},
			})
		})

		ginkgo.It("Job replicas divided and weighted testing", func() {
			staticWeightLists, sumWeight := staticWeightsByClusterIndex(framework.ClusterNames())
			klog.Infof("StaticWeightList of policy is %+v", staticWeightLists)
			policy.Spec.Placement.ReplicaScheduling.WeightPreference.StaticWeightList = staticWeightLists
			klog.Infof("Sum weight of clusters is %d", sumWeight)

			// Cleanups are registered right after each create, so nothing is removed
			// that was never created. DeferCleanup callbacks run in reverse order of
			// registration: the job is removed first, then the policy.
			framework.CreatePropagationPolicy(karmadaClient, policy)
			ginkgo.DeferCleanup(func() {
				framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
			})

			// Parallelism and completions both equal the sum of weights, so cluster i
			// is expected to receive a parallelism of i+1.
			sumReplicas := int32(sumWeight)
			job.Spec.Parallelism = &sumReplicas
			job.Spec.Completions = &sumReplicas
			framework.CreateJob(kubeClient, job)
			ginkgo.DeferCleanup(func() {
				framework.RemoveJob(kubeClient, job.Namespace, job.Name)
			})

			ginkgo.By("check if job's parallelism are divided equally on member clusters", func() {
				// NOTE(review): this relies on framework.Clusters() and
				// framework.ClusterNames() listing clusters in the same order.
				for index, cluster := range framework.Clusters() {
					expectedReplicas := int32(index + 1)

					clusterClient := framework.GetClusterClient(cluster.Name)
					gomega.Expect(clusterClient).ShouldNot(gomega.BeNil())

					gomega.Eventually(func(g gomega.Gomega) (int32, error) {
						memberJob, err := clusterClient.BatchV1().Jobs(jobNamespace).Get(context.TODO(), jobName, metav1.GetOptions{})
						g.Expect(err).NotTo(gomega.HaveOccurred())

						klog.Infof("Job(%s/%s)'s parallelism is %d on cluster(%s), expected: %d.",
							jobNamespace, jobName, *memberJob.Spec.Parallelism, cluster.Name, expectedReplicas)
						return *memberJob.Spec.Parallelism, nil
					}, pollTimeout, pollInterval).Should(gomega.Equal(expectedReplicas))
				}
			})
		})
	})
})

// staticWeightsByClusterIndex assigns the cluster at position i (0-based) of
// clusterNames a static weight of i+1. It returns the resulting weight list,
// one entry per cluster in input order, together with the sum of all weights.
func staticWeightsByClusterIndex(clusterNames []string) ([]policyv1alpha1.StaticClusterWeight, int) {
	weights := make([]policyv1alpha1.StaticClusterWeight, 0, len(clusterNames))
	sumWeight := 0
	for index, clusterName := range clusterNames {
		weights = append(weights, policyv1alpha1.StaticClusterWeight{
			TargetCluster: policyv1alpha1.ClusterAffinity{
				ClusterNames: []string{clusterName},
			},
			Weight: int64(index + 1),
		})
		sumWeight += index + 1
	}
	return weights, sumWeight
}

// getResourceBinding fetches, through controlPlaneClient, the ResourceBinding
// associated with the workload. The binding name is derived from the
// workload's kind and name and it is looked up in the workload's namespace.
//
// A failure to convert the workload to unstructured form fails the running
// spec through gomega; a failed lookup is returned as the error, alongside the
// (then unpopulated) binding object.
func getResourceBinding(workload interface{}) (*workv1alpha2.ResourceBinding, error) {
	obj, err := utilhelper.ToUnstructured(workload)
	gomega.Expect(err).ShouldNot(gomega.HaveOccurred())

	key := client.ObjectKey{
		Namespace: obj.GetNamespace(),
		Name:      names.GenerateBindingName(obj.GetKind(), obj.GetName()),
	}
	binding := &workv1alpha2.ResourceBinding{}
	err = controlPlaneClient.Get(context.TODO(), key, binding)
	return binding, err
}

// getClusterResourceBinding fetches, through controlPlaneClient, the
// ClusterResourceBinding associated with the workload. The binding name is
// derived from the workload's kind and name; no namespace is used.
//
// A failure to convert the workload to unstructured form fails the running
// spec through gomega; a failed lookup is returned as the error, alongside the
// (then unpopulated) binding object.
func getClusterResourceBinding(workload interface{}) (*workv1alpha2.ClusterResourceBinding, error) {
	obj, err := utilhelper.ToUnstructured(workload)
	gomega.Expect(err).ShouldNot(gomega.HaveOccurred())

	key := client.ObjectKey{
		Name: names.GenerateBindingName(obj.GetKind(), obj.GetName()),
	}
	binding := &workv1alpha2.ClusterResourceBinding{}
	err = controlPlaneClient.Get(context.TODO(), key, binding)
	return binding, err
}