Merge pull request #592 from mrlihanbo/bugfix-job
Support propagation and retain Job resource
This commit is contained in:
commit
f2ac7afd5d
|
@ -60,6 +60,8 @@ const (
|
||||||
DeploymentKind = "Deployment"
|
DeploymentKind = "Deployment"
|
||||||
// ServiceKind indicates the target resource is a service
|
// ServiceKind indicates the target resource is a service
|
||||||
ServiceKind = "Service"
|
ServiceKind = "Service"
|
||||||
|
// JobKind indicates the target resource is a job
|
||||||
|
JobKind = "Job"
|
||||||
// PodKind indicates the target resource is a pod
|
// PodKind indicates the target resource is a pod
|
||||||
PodKind = "Pod"
|
PodKind = "Pod"
|
||||||
// ServiceAccountKind indicates the target resource is a serviceaccount
|
// ServiceAccountKind indicates the target resource is a serviceaccount
|
||||||
|
|
|
@ -43,8 +43,9 @@ func RetainClusterFields(desiredObj, clusterObj *unstructured.Unstructured) erro
|
||||||
return retainServiceAccountFields(desiredObj, clusterObj)
|
return retainServiceAccountFields(desiredObj, clusterObj)
|
||||||
case util.PersistentVolumeClaimKind:
|
case util.PersistentVolumeClaimKind:
|
||||||
return retainPersistentVolumeClaimFields(desiredObj, clusterObj)
|
return retainPersistentVolumeClaimFields(desiredObj, clusterObj)
|
||||||
|
case util.JobKind:
|
||||||
|
return retainJobSelectorFields(desiredObj, clusterObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,6 +190,30 @@ func retainPersistentVolumeClaimFields(desiredObj, clusterObj *unstructured.Unst
|
||||||
return fmt.Errorf("error setting volumeName for pvc: %w", err)
|
return fmt.Errorf("error setting volumeName for pvc: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func retainJobSelectorFields(desiredObj, clusterObj *unstructured.Unstructured) error {
|
||||||
|
matchLabels, exist, err := unstructured.NestedStringMap(clusterObj.Object, "spec", "selector", "matchLabels")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if exist {
|
||||||
|
err = unstructured.SetNestedStringMap(desiredObj.Object, matchLabels, "spec", "selector", "matchLabels")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
templateLabels, exist, err := unstructured.NestedStringMap(clusterObj.Object, "spec", "template", "metadata", "labels")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if exist {
|
||||||
|
err = unstructured.SetNestedStringMap(desiredObj.Object, templateLabels, "spec", "template", "metadata", "labels")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
batchv1 "k8s.io/api/batch/v1"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
@ -44,7 +45,11 @@ func (a *MutatingAdmission) Handle(ctx context.Context, req admission.Request) a
|
||||||
return admission.Errored(http.StatusInternalServerError, err)
|
return admission.Errored(http.StatusInternalServerError, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
removeIrrelevantField(workloadObj)
|
err = removeIrrelevantField(workloadObj)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Failed to remove irrelevant field for work(%s): %v", work.Name, err)
|
||||||
|
return admission.Errored(http.StatusInternalServerError, err)
|
||||||
|
}
|
||||||
|
|
||||||
workloadJSON, err := workloadObj.MarshalJSON()
|
workloadJSON, err := workloadObj.MarshalJSON()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -72,7 +77,7 @@ func (a *MutatingAdmission) InjectDecoder(d *admission.Decoder) error {
|
||||||
|
|
||||||
// removeIrrelevantField used to remove fields that generated by kube-apiserver and no need(or can't) propagate to
|
// removeIrrelevantField used to remove fields that generated by kube-apiserver and no need(or can't) propagate to
|
||||||
// member clusters.
|
// member clusters.
|
||||||
func removeIrrelevantField(workload *unstructured.Unstructured) {
|
func removeIrrelevantField(workload *unstructured.Unstructured) error {
|
||||||
// populated by the kubernetes.
|
// populated by the kubernetes.
|
||||||
unstructured.RemoveNestedField(workload.Object, "metadata", "creationTimestamp")
|
unstructured.RemoveNestedField(workload.Object, "metadata", "creationTimestamp")
|
||||||
|
|
||||||
|
@ -81,6 +86,11 @@ func removeIrrelevantField(workload *unstructured.Unstructured) {
|
||||||
// member clusters.
|
// member clusters.
|
||||||
unstructured.RemoveNestedField(workload.Object, "metadata", "deletionTimestamp")
|
unstructured.RemoveNestedField(workload.Object, "metadata", "deletionTimestamp")
|
||||||
|
|
||||||
|
// populated by the kubernetes.
|
||||||
|
// The kubernetes will set this fields in case of graceful deletion. This field is read-only and can't propagate to
|
||||||
|
// member clusters.
|
||||||
|
unstructured.RemoveNestedField(workload.Object, "metadata", "deletionGracePeriodSeconds")
|
||||||
|
|
||||||
// populated by the kubernetes.
|
// populated by the kubernetes.
|
||||||
unstructured.RemoveNestedField(workload.Object, "metadata", "generation")
|
unstructured.RemoveNestedField(workload.Object, "metadata", "generation")
|
||||||
|
|
||||||
|
@ -106,4 +116,54 @@ func removeIrrelevantField(workload *unstructured.Unstructured) {
|
||||||
unstructured.RemoveNestedField(workload.Object, "spec", "clusterIP")
|
unstructured.RemoveNestedField(workload.Object, "spec", "clusterIP")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if workload.GetKind() == util.JobKind {
|
||||||
|
job := &batchv1.Job{}
|
||||||
|
err := runtime.DefaultUnstructuredConverter.FromUnstructured(workload.UnstructuredContent(), job)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if job.Spec.ManualSelector == nil || !*job.Spec.ManualSelector {
|
||||||
|
return removeGenerateSelectorOfJob(workload)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func removeGenerateSelectorOfJob(workload *unstructured.Unstructured) error {
|
||||||
|
matchLabels, exist, err := unstructured.NestedStringMap(workload.Object, "spec", "selector", "matchLabels")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if exist {
|
||||||
|
if util.GetLabelValue(matchLabels, "controller-uid") != "" {
|
||||||
|
delete(matchLabels, "controller-uid")
|
||||||
|
}
|
||||||
|
err = unstructured.SetNestedStringMap(workload.Object, matchLabels, "spec", "selector", "matchLabels")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
templateLabels, exist, err := unstructured.NestedStringMap(workload.Object, "spec", "template", "metadata", "labels")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if exist {
|
||||||
|
if util.GetLabelValue(templateLabels, "controller-uid") != "" {
|
||||||
|
delete(templateLabels, "controller-uid")
|
||||||
|
}
|
||||||
|
|
||||||
|
if util.GetLabelValue(templateLabels, "job-name") != "" {
|
||||||
|
delete(templateLabels, "job-name")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = unstructured.SetNestedStringMap(workload.Object, templateLabels, "spec", "template", "metadata", "labels")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -596,4 +596,129 @@ var _ = ginkgo.Describe("[BasicPropagation] basic propagation testing", func() {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
ginkgo.Context("Job propagation testing", func() {
|
||||||
|
policyNamespace := testNamespace
|
||||||
|
policyName := jobNamePrefix + rand.String(RandomStrLength)
|
||||||
|
jobNamespace := testNamespace
|
||||||
|
jobName := policyName
|
||||||
|
|
||||||
|
job := testhelper.NewJob(jobNamespace, jobName)
|
||||||
|
policy := testhelper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
|
||||||
|
{
|
||||||
|
APIVersion: job.APIVersion,
|
||||||
|
Kind: job.Kind,
|
||||||
|
Name: job.Name,
|
||||||
|
},
|
||||||
|
}, policyv1alpha1.Placement{
|
||||||
|
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
|
||||||
|
ClusterNames: clusterNames,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
ginkgo.BeforeEach(func() {
|
||||||
|
ginkgo.By(fmt.Sprintf("creating policy(%s/%s)", policyNamespace, policyName), func() {
|
||||||
|
_, err := karmadaClient.PolicyV1alpha1().PropagationPolicies(policyNamespace).Create(context.TODO(), policy, metav1.CreateOptions{})
|
||||||
|
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
ginkgo.AfterEach(func() {
|
||||||
|
ginkgo.By(fmt.Sprintf("removing policy(%s/%s)", policyNamespace, policyName), func() {
|
||||||
|
err := karmadaClient.PolicyV1alpha1().PropagationPolicies(policyNamespace).Delete(context.TODO(), policyName, metav1.DeleteOptions{})
|
||||||
|
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
ginkgo.It("job propagation testing", func() {
|
||||||
|
ginkgo.By(fmt.Sprintf("creating job(%s/%s)", jobNamespace, jobName), func() {
|
||||||
|
_, err := kubeClient.BatchV1().Jobs(jobNamespace).Create(context.TODO(), job, metav1.CreateOptions{})
|
||||||
|
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
ginkgo.By("check if job present on member clusters", func() {
|
||||||
|
for _, cluster := range clusters {
|
||||||
|
clusterClient := getClusterClient(cluster.Name)
|
||||||
|
gomega.Expect(clusterClient).ShouldNot(gomega.BeNil())
|
||||||
|
|
||||||
|
klog.Infof("Waiting for job(%s/%s) present on cluster(%s)", jobNamespace, jobName, cluster.Name)
|
||||||
|
err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
|
||||||
|
_, err = clusterClient.BatchV1().Jobs(jobNamespace).Get(context.TODO(), jobName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
if errors.IsNotFound(err) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
ginkgo.By(fmt.Sprintf("updating job(%s/%s)", jobNamespace, jobName), func() {
|
||||||
|
patch := []map[string]interface{}{
|
||||||
|
{
|
||||||
|
"op": "replace",
|
||||||
|
"path": "/spec/backoffLimit",
|
||||||
|
"value": pointer.Int32Ptr(updateBackoffLimit),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
bytes, err := json.Marshal(patch)
|
||||||
|
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||||
|
|
||||||
|
_, err = kubeClient.BatchV1().Jobs(jobNamespace).Patch(context.TODO(), jobName, types.JSONPatchType, bytes, metav1.PatchOptions{})
|
||||||
|
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
ginkgo.By("check if update has been synced to member clusters", func() {
|
||||||
|
for _, cluster := range clusters {
|
||||||
|
clusterClient := getClusterClient(cluster.Name)
|
||||||
|
gomega.Expect(clusterClient).ShouldNot(gomega.BeNil())
|
||||||
|
|
||||||
|
klog.Infof("Waiting for job(%s/%s) synced on cluster(%s)", jobNamespace, jobName, cluster.Name)
|
||||||
|
err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
|
||||||
|
newJob, err := clusterClient.BatchV1().Jobs(jobNamespace).Get(context.TODO(), jobName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if *newJob.Spec.BackoffLimit == updateBackoffLimit {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
ginkgo.By(fmt.Sprintf("removing job(%s/%s)", jobNamespace, jobName), func() {
|
||||||
|
foregroundDelete := metav1.DeletePropagationForeground
|
||||||
|
err := kubeClient.BatchV1().Jobs(jobNamespace).Delete(context.TODO(), jobName, metav1.DeleteOptions{PropagationPolicy: &foregroundDelete})
|
||||||
|
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
ginkgo.By("check if job has been deleted from member clusters", func() {
|
||||||
|
for _, cluster := range clusters {
|
||||||
|
clusterClient := getClusterClient(cluster.Name)
|
||||||
|
gomega.Expect(clusterClient).ShouldNot(gomega.BeNil())
|
||||||
|
|
||||||
|
klog.Infof("Waiting for job(%s/%s) disappear on cluster(%s)", jobNamespace, jobName, cluster.Name)
|
||||||
|
err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
|
||||||
|
_, err = clusterClient.BatchV1().Jobs(jobNamespace).Get(context.TODO(), jobName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
if errors.IsNotFound(err) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -55,11 +55,13 @@ const (
|
||||||
serviceNamePrefix = "service-"
|
serviceNamePrefix = "service-"
|
||||||
podNamePrefix = "pod-"
|
podNamePrefix = "pod-"
|
||||||
crdNamePrefix = "cr-"
|
crdNamePrefix = "cr-"
|
||||||
|
jobNamePrefix = "job-"
|
||||||
|
|
||||||
updateDeploymentReplicas = 6
|
updateDeploymentReplicas = 6
|
||||||
updateServicePort = 81
|
updateServicePort = 81
|
||||||
updatePodImage = "nginx:latest"
|
updatePodImage = "nginx:latest"
|
||||||
updateCRnamespace = "e2e-test"
|
updateCRnamespace = "e2e-test"
|
||||||
|
updateBackoffLimit = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
appsv1 "k8s.io/api/apps/v1"
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
|
batchv1 "k8s.io/api/batch/v1"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
@ -211,3 +212,33 @@ func NewCustomResource(apiVersion, kind, namespace, name string) *unstructured.U
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewJob will build a job object.
|
||||||
|
func NewJob(namespace string, name string) *batchv1.Job {
|
||||||
|
return &batchv1.Job{
|
||||||
|
TypeMeta: metav1.TypeMeta{
|
||||||
|
APIVersion: "batch/v1",
|
||||||
|
Kind: "Job",
|
||||||
|
},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: namespace,
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
Spec: batchv1.JobSpec{
|
||||||
|
Template: corev1.PodTemplateSpec{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
Spec: corev1.PodSpec{
|
||||||
|
Containers: []corev1.Container{{
|
||||||
|
Name: "pi",
|
||||||
|
Image: "perl",
|
||||||
|
Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"},
|
||||||
|
}},
|
||||||
|
RestartPolicy: corev1.RestartPolicyNever,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
BackoffLimit: pointer.Int32Ptr(4),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue