e2e: add test cases to cover job scheduling

Signed-off-by: lonelyCZ <531187475@qq.com>
lonelyCZ 2021-11-15 16:15:07 +08:00
parent 12b38d0ffa
commit 1af768774c
3 changed files with 243 additions and 0 deletions

test/e2e/framework/job.go (new file)

@@ -0,0 +1,36 @@
package framework

import (
"context"
"fmt"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// CreateJob creates the given Job.
func CreateJob(client kubernetes.Interface, job *batchv1.Job) {
ginkgo.By(fmt.Sprintf("Creating Job(%s/%s)", job.Namespace, job.Name), func() {
_, err := client.BatchV1().Jobs(job.Namespace).Create(context.TODO(), job, metav1.CreateOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
}

// GetJob gets the Job with the given namespace and name.
func GetJob(client kubernetes.Interface, namespace, name string) {
ginkgo.By(fmt.Sprintf("Getting Job(%s/%s)", namespace, name), func() {
_, err := client.BatchV1().Jobs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
}

// RemoveJob deletes the Job with the given namespace and name.
func RemoveJob(client kubernetes.Interface, namespace, name string) {
ginkgo.By(fmt.Sprintf("Removing Job(%s/%s)", namespace, name), func() {
err := client.BatchV1().Jobs(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
}
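For context, a minimal usage sketch showing how these helpers fit together in a spec. It assumes the wiring already present in the e2e suite (kubeClient, testNamespace, RandomStrLength, and helper.NewJob, all visible in the test below); the package name and import paths follow the karmada repo layout and are assumptions of this sketch, not part of the commit.

package e2e

import (
	"github.com/onsi/ginkgo"
	"k8s.io/apimachinery/pkg/util/rand"

	"github.com/karmada-io/karmada/test/e2e/framework"
	"github.com/karmada-io/karmada/test/helper"
)

// Sketch only: kubeClient, testNamespace and RandomStrLength come from the suite setup.
var _ = ginkgo.Describe("job framework helpers (usage sketch)", func() {
	ginkgo.It("creates, reads and removes a Job", func() {
		job := helper.NewJob(testNamespace, "job-"+rand.String(RandomStrLength))
		framework.CreateJob(kubeClient, job)
		framework.GetJob(kubeClient, job.Namespace, job.Name)
		framework.RemoveJob(kubeClient, job.Namespace, job.Name)
	})
})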


@@ -252,6 +252,137 @@ var _ = ginkgo.Describe("propagation with label and group constraints testing",
framework.RemoveClusterPropagationPolicy(karmadaClient, crdPolicy.Name)
})
})
ginkgo.Context("Job propagation testing", func() {
var groupMatchedClusters []*clusterv1alpha1.Cluster
var targetClusterNames []string
policyNamespace := testNamespace
policyName := jobNamePrefix + rand.String(RandomStrLength)
jobNamespace := testNamespace
jobName := policyName
job := helper.NewJob(jobNamespace, jobName)
maxGroups := rand.Intn(2) + 1
minGroups := maxGroups
// set MaxGroups=MinGroups=1 or 2, label is location=CHN.
policy := helper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
{
APIVersion: job.APIVersion,
Kind: job.Kind,
Name: job.Name,
},
}, policyv1alpha1.Placement{
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
LabelSelector: &metav1.LabelSelector{
MatchLabels: clusterLabels,
},
},
SpreadConstraints: []policyv1alpha1.SpreadConstraint{
{
SpreadByField: policyv1alpha1.SpreadByFieldCluster,
MaxGroups: maxGroups,
MinGroups: minGroups,
},
},
})
ginkgo.It("Job propagation with label and group constraints testing", func() {
framework.CreatePropagationPolicy(karmadaClient, policy)
framework.CreateJob(kubeClient, job)
framework.GetJob(kubeClient, job.Namespace, job.Name)
ginkgo.By("collect the target clusters in resource binding", func() {
bindingName := names.GenerateBindingName(job.Kind, job.Name)
binding := &workv1alpha2.ResourceBinding{}
fmt.Printf("MaxGroups= %v, MinGroups= %v\n", maxGroups, minGroups)
gomega.Eventually(func() int {
err := controlPlaneClient.Get(context.TODO(), client.ObjectKey{Namespace: policyNamespace, Name: bindingName}, binding)
if err != nil {
return -1
}
return len(binding.Spec.Clusters)
}, pollTimeout, pollInterval).Should(gomega.Equal(minGroups))
for _, cluster := range binding.Spec.Clusters {
targetClusterNames = append(targetClusterNames, cluster.Name)
}
fmt.Printf("target clusters in resource binding are %s\n", targetClusterNames)
})
ginkgo.By("check if the scheduled condition is true", func() {
err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
rb, err := getResourceBinding(job)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
return meta.IsStatusConditionTrue(rb.Status.Conditions, workv1alpha2.Scheduled), nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
ginkgo.By("check if the job is present on the right clusters", func() {
for _, targetClusterName := range targetClusterNames {
clusterClient := framework.GetClusterClient(targetClusterName)
gomega.Expect(clusterClient).ShouldNot(gomega.BeNil())
klog.Infof("Check whether job(%s/%s) is present on cluster(%s)", jobNamespace, jobName, targetClusterName)
err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
_, err = clusterClient.BatchV1().Jobs(jobNamespace).Get(context.TODO(), jobName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}
targetCluster, err := util.GetCluster(controlPlaneClient, targetClusterName)
if err != nil {
return false, err
}
groupMatchedClusters = append(groupMatchedClusters, targetCluster)
fmt.Printf("Job(%s/%s) is present on cluster(%s).\n", jobNamespace, jobName, targetClusterName)
return true, nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}
fmt.Printf("there are %d target clusters\n", len(groupMatchedClusters))
gomega.Expect(len(groupMatchedClusters)).Should(gomega.Equal(minGroups))
})
ginkgo.By("updating Job", func() {
patch := map[string]interface{}{
"spec": map[string]interface{}{
"parallelism": pointer.Int32Ptr(updateParallelism),
},
}
bytes, err := json.Marshal(patch)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
_, err = kubeClient.BatchV1().Jobs(jobNamespace).Patch(context.TODO(), jobName, types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
ginkgo.By("check if update has been synced to member clusters", func() {
for _, cluster := range groupMatchedClusters {
clusterClient := framework.GetClusterClient(cluster.Name)
gomega.Expect(clusterClient).ShouldNot(gomega.BeNil())
klog.Infof("Waiting for job(%s/%s) synced on cluster(%s)", jobNamespace, jobName, cluster.Name)
err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
memberJob, err := clusterClient.BatchV1().Jobs(jobNamespace).Get(context.TODO(), jobName, metav1.GetOptions{})
if err != nil {
return false, err
}
if *memberJob.Spec.Parallelism == updateParallelism {
return true, nil
}
return false, nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}
})
framework.RemoveJob(kubeClient, job.Namespace, job.Name)
framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
})
})
})
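Note that the policy's resource selector above uses job.APIVersion and job.Kind, so the helper.NewJob fixture (defined outside this diff) has to populate TypeMeta as well as a parallelism value that the later patch can bump. A hypothetical stand-in for that fixture follows, purely to illustrate the shape the test depends on; the concrete field values are assumptions, not the real helper.

// Hypothetical sketch of a helper.NewJob-like fixture. It sets TypeMeta,
// which the PropagationPolicy's resource selector relies on, plus a small
// pod template; all concrete values below are illustrative assumptions.
package helper

import (
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
)

func NewJobSketch(namespace, name string) *batchv1.Job {
	return &batchv1.Job{
		TypeMeta:   metav1.TypeMeta{APIVersion: "batch/v1", Kind: "Job"},
		ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
		Spec: batchv1.JobSpec{
			Parallelism: pointer.Int32Ptr(1),
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{{
						Name:    "pi",
						Image:   "perl",
						Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(100)"},
					}},
				},
			},
		},
	}
}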
/*
@@ -650,6 +781,81 @@ var _ = ginkgo.Describe("[ReplicaScheduling] ReplicaSchedulingStrategy testing",
})
})
// JobReplicaScheduling focus on job replica schedule testing.
var _ = ginkgo.Describe("[JobReplicaScheduling] JobReplicaSchedulingStrategy testing", func() {
// `ReplicaSchedulingType` value is `Divided`, `ReplicaDivisionPreference` value is `Weighted`,
// `WeightPreference` isn't nil, `spec.completions` isn't nil.
ginkgo.Context("ReplicaSchedulingType is Divided, ReplicaDivisionPreference is Weighted, WeightPreference isn't nil, spec.completions isn't nil.", func() {
policyNamespace := testNamespace
policyName := jobNamePrefix + rand.String(RandomStrLength)
jobNamespace := policyNamespace
jobName := policyName
job := helper.NewJob(jobNamespace, jobName)
policy := helper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
{
APIVersion: job.APIVersion,
Kind: job.Kind,
Name: job.Name,
},
}, policyv1alpha1.Placement{
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
ClusterNames: framework.ClusterNames(),
},
ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDivided,
ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
WeightPreference: &policyv1alpha1.ClusterPreferences{},
},
})
ginkgo.It("job replicas divided and weighted testing", func() {
sumWeight := 0
staticWeightLists := make([]policyv1alpha1.StaticClusterWeight, 0)
for index, clusterName := range framework.ClusterNames() {
staticWeightList := policyv1alpha1.StaticClusterWeight{
TargetCluster: policyv1alpha1.ClusterAffinity{
ClusterNames: []string{clusterName},
},
Weight: int64(index + 1),
}
sumWeight += index + 1
staticWeightLists = append(staticWeightLists, staticWeightList)
}
klog.Infof("StaticWeightList of policy is %+v", staticWeightLists)
policy.Spec.Placement.ReplicaScheduling.WeightPreference.StaticWeightList = staticWeightLists
klog.Infof("Sum weight of clusters is %d", sumWeight)
framework.CreatePropagationPolicy(karmadaClient, policy)
sumReplicas := int32(sumWeight)
job.Spec.Parallelism = &sumReplicas
job.Spec.Completions = &sumReplicas
framework.CreateJob(kubeClient, job)
ginkgo.By("check if job's parallelism is divided by weight among member clusters", func() {
for index, cluster := range framework.Clusters() {
expectedReplicas := int32(index + 1)
clusterClient := framework.GetClusterClient(cluster.Name)
gomega.Expect(clusterClient).ShouldNot(gomega.BeNil())
gomega.Eventually(func(g gomega.Gomega) (int32, error) {
memberJob, err := clusterClient.BatchV1().Jobs(jobNamespace).Get(context.TODO(), jobName, metav1.GetOptions{})
g.Expect(err).NotTo(gomega.HaveOccurred())
klog.Infof("Job(%s/%s)'s parallelism is %d on cluster(%s), expected: %d.",
jobNamespace, jobName, *memberJob.Spec.Parallelism, cluster.Name, expectedReplicas)
return *memberJob.Spec.Parallelism, nil
}, pollTimeout, pollInterval).Should(gomega.Equal(expectedReplicas))
}
})
framework.RemoveJob(kubeClient, job.Namespace, job.Name)
framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
})
})
})
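For reference, the arithmetic the parallelism assertion above relies on: clusters are weighted 1..N, the job's parallelism and completions are set to the weight sum, so an exact weighted split hands cluster i exactly i replicas. The standalone sketch below only illustrates that arithmetic; it is not Karmada's scheduler code.

package main

import "fmt"

// splitByStaticWeight divides total replicas across clusters in proportion to
// their static weights. When total equals the sum of the weights, as in the
// test above, the division is exact and cluster i receives weights[i] replicas.
func splitByStaticWeight(total int32, weights []int64) []int32 {
	var sum int64
	for _, w := range weights {
		sum += w
	}
	result := make([]int32, len(weights))
	for i, w := range weights {
		result[i] = int32(int64(total) * w / sum) // exact when total == sum
	}
	return result
}

func main() {
	weights := []int64{1, 2, 3}                  // e.g. three member clusters
	fmt.Println(splitByStaticWeight(6, weights)) // prints [1 2 3]
}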
// get the resource binding associated with the workload
func getResourceBinding(workload interface{}) (*workv1alpha2.ResourceBinding, error) {
uncastObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(workload)


@@ -57,6 +57,7 @@ const (
updatePodImage = "nginx:latest"
updateCRnamespace = "e2e-test"
updateBackoffLimit = 3
updateParallelism = 3
)
var (