From 4811ca7dd5146a3bea4b022cb7ae6886f5337fd4 Mon Sep 17 00:00:00 2001 From: yunbo Date: Tue, 10 Dec 2024 17:40:45 +0800 Subject: [PATCH] add unit test & split workload mutating webhook Signed-off-by: yunbo --- config/webhook/manifests.yaml | 42 --- config/webhook/patch_manifests.yaml | 20 +- .../batchrelease/batchrelease_executor.go | 2 +- .../bluegreenstyle/cloneset/control.go | 40 ++- .../bluegreenstyle/cloneset/control_test.go | 156 +++++++++++- .../bluegreenstyle/deployment/control.go | 39 ++- .../bluegreenstyle/deployment/control_test.go | 187 +++++++++++++- .../control/bluegreenstyle/hpa/hpa_test.go | 2 - pkg/controller/rollout/rollout_progressing.go | 17 +- pkg/util/errors/types.go | 55 ++-- .../rollout_create_update_handler.go | 21 +- .../mutating/unified_update_handler.go | 241 ++++++++++++++++++ .../mutating/unified_update_handler_test.go | 110 ++++++++ pkg/webhook/workload/mutating/webhooks.go | 6 +- .../mutating/workload_update_handler.go | 84 +----- .../mutating/workload_update_handler_test.go | 77 ------ 16 files changed, 796 insertions(+), 303 deletions(-) create mode 100644 pkg/webhook/workload/mutating/unified_update_handler.go create mode 100644 pkg/webhook/workload/mutating/unified_update_handler_test.go diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index 9af263a..9d2ec03 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -65,48 +65,6 @@ webhooks: resources: - deployments sideEffects: None -- admissionReviewVersions: - - v1 - - v1beta1 - clientConfig: - service: - name: webhook-service - namespace: system - path: /mutate-apps-v1-statefulset - failurePolicy: Fail - name: mstatefulset.kb.io - rules: - - apiGroups: - - apps - apiVersions: - - v1 - operations: - - UPDATE - resources: - - statefulsets - sideEffects: None -- admissionReviewVersions: - - v1 - - v1beta1 - clientConfig: - service: - name: webhook-service - namespace: system - path: /mutate-apps-kruise-io-statefulset - failurePolicy: Fail - name: madvancedstatefulset.kb.io - rules: - - apiGroups: - - apps.kruise.io - apiVersions: - - v1alpha1 - - v1beta1 - operations: - - CREATE - - UPDATE - resources: - - statefulsets - sideEffects: None - admissionReviewVersions: - v1 - v1beta1 diff --git a/config/webhook/patch_manifests.yaml b/config/webhook/patch_manifests.yaml index 5022bca..18993e3 100644 --- a/config/webhook/patch_manifests.yaml +++ b/config/webhook/patch_manifests.yaml @@ -18,16 +18,16 @@ webhooks: matchExpressions: - key: rollouts.kruise.io/workload-type operator: Exists - - name: mstatefulset.kb.io - objectSelector: - matchExpressions: - - key: rollouts.kruise.io/workload-type - operator: Exists - - name: madvancedstatefulset.kb.io - objectSelector: - matchExpressions: - - key: rollouts.kruise.io/workload-type - operator: Exists + # - name: mstatefulset.kb.io + # objectSelector: + # matchExpressions: + # - key: rollouts.kruise.io/workload-type + # operator: Exists + # - name: madvancedstatefulset.kb.io + # objectSelector: + # matchExpressions: + # - key: rollouts.kruise.io/workload-type + # operator: Exists - name: mdeployment.kb.io objectSelector: matchExpressions: diff --git a/pkg/controller/batchrelease/batchrelease_executor.go b/pkg/controller/batchrelease/batchrelease_executor.go index 2f54fc3..bd83651 100644 --- a/pkg/controller/batchrelease/batchrelease_executor.go +++ b/pkg/controller/batchrelease/batchrelease_executor.go @@ -151,7 +151,7 @@ func (r *Executor) progressBatches(release *v1beta1.BatchRelease, newStatus *v1b result = reconcile.Result{RequeueAfter: DefaultDuration} removeProgressingCondition(newStatus) newStatus.CanaryStatus.CurrentBatchState = v1beta1.VerifyingBatchState - case errors.IsFatal(err): + case errors.IsBadRequest(err): progressingStateTransition(newStatus, v1.ConditionTrue, v1beta1.ProgressingReasonInRolling, err.Error()) fallthrough default: diff --git a/pkg/controller/batchrelease/control/bluegreenstyle/cloneset/control.go b/pkg/controller/batchrelease/control/bluegreenstyle/cloneset/control.go index 0af7e3d..1dadad0 100644 --- a/pkg/controller/batchrelease/control/bluegreenstyle/cloneset/control.go +++ b/pkg/controller/batchrelease/control/bluegreenstyle/cloneset/control.go @@ -94,7 +94,7 @@ func (rc *realController) Initialize(release *v1beta1.BatchRelease) error { // patch the cloneset setting, err := control.GetOriginalSetting(rc.object) if err != nil { - return errors.NewFatalError(fmt.Errorf("cannot get original setting for cloneset %v: %s from annotation", klog.KObj(rc.object), err.Error())) + return errors.NewBadRequestError(fmt.Errorf("cannot get original setting for cloneset %v: %s from annotation", klog.KObj(rc.object), err.Error())) } control.InitOriginalSetting(&setting, rc.object) patchData := patch.NewClonesetPatch() @@ -115,7 +115,7 @@ func (rc *realController) Initialize(release *v1beta1.BatchRelease) error { func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error { if err := control.ValidateReadyForBlueGreenRelease(rc.object); err != nil { - return errors.NewFatalError(fmt.Errorf("cannot upgrade batch, because cloneset %v doesn't satisfy conditions: %s", klog.KObj(rc.object), err.Error())) + return errors.NewBadRequestError(fmt.Errorf("cannot upgrade batch, because cloneset %v doesn't satisfy conditions: %s", klog.KObj(rc.object), err.Error())) } desired, _ := intstr.GetScaledValueFromIntOrPercent(&ctx.DesiredSurge, int(ctx.Replicas), true) current, _ := intstr.GetScaledValueFromIntOrPercent(&ctx.CurrentSurge, int(ctx.Replicas), true) @@ -133,10 +133,6 @@ func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error { } func (rc *realController) Finalize(release *v1beta1.BatchRelease) error { - if rc.finalized() { - return nil // No need to finalize again - } - if release.Spec.ReleasePlan.BatchPartition != nil { // continuous release (not supported yet) /* @@ -148,39 +144,37 @@ func (rc *realController) Finalize(release *v1beta1.BatchRelease) error { return nil } - c := util.GetEmptyObjectWithKey(rc.object) - setting, err := control.GetOriginalSetting(rc.object) - if err != nil { - return errors.NewFatalError(fmt.Errorf("cannot get original setting for cloneset %v: %s from annotation", klog.KObj(rc.object), err.Error())) - } - patchData := patch.NewClonesetPatch() - if rc.object.Spec.MinReadySeconds != setting.MinReadySeconds { - // restore the hpa - if err := hpa.RestoreHPA(rc.client, rc.object); err != nil { + // restore the original setting and remove annotation + if !rc.restored() { + c := util.GetEmptyObjectWithKey(rc.object) + setting, err := control.GetOriginalSetting(rc.object) + if err != nil { return err } - // restore the original setting + patchData := patch.NewClonesetPatch() patchData.UpdateMinReadySeconds(setting.MinReadySeconds) patchData.UpdateMaxSurge(setting.MaxSurge) patchData.UpdateMaxUnavailable(setting.MaxUnavailable) + patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation) + patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation) if err := rc.client.Patch(context.TODO(), c, patchData); err != nil { return err } + klog.InfoS("Finalize: cloneset bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object)) } - klog.InfoS("Finalize: cloneset bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object)) + // wait all pods updated and ready if rc.object.Status.ReadyReplicas != rc.object.Status.UpdatedReadyReplicas { - return errors.NewBenignError(fmt.Errorf("cloneset %v finalize not done, readyReplicas %d != updatedReadyReplicas %d, current policy %s", + return errors.NewRetryError(fmt.Errorf("cloneset %v finalize not done, readyReplicas %d != updatedReadyReplicas %d, current policy %s", klog.KObj(rc.object), rc.object.Status.ReadyReplicas, rc.object.Status.UpdatedReadyReplicas, release.Spec.ReleasePlan.FinalizingPolicy)) } klog.InfoS("Finalize: cloneset bluegreen release: all pods updated and ready") - // restore annotation - patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation) - patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation) - return rc.client.Patch(context.TODO(), c, patchData) + + // restore the hpa + return hpa.RestoreHPA(rc.client, rc.object) } -func (rc *realController) finalized() bool { +func (rc *realController) restored() bool { if rc.object == nil || rc.object.DeletionTimestamp != nil { return true } diff --git a/pkg/controller/batchrelease/control/bluegreenstyle/cloneset/control_test.go b/pkg/controller/batchrelease/control/bluegreenstyle/cloneset/control_test.go index 0df0649..06a7719 100644 --- a/pkg/controller/batchrelease/control/bluegreenstyle/cloneset/control_test.go +++ b/pkg/controller/batchrelease/control/bluegreenstyle/cloneset/control_test.go @@ -31,6 +31,7 @@ import ( control "github.com/openkruise/rollouts/pkg/controller/batchrelease/control" "github.com/openkruise/rollouts/pkg/util" apps "k8s.io/api/apps/v1" + autoscalingv1 "k8s.io/api/autoscaling/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -38,6 +39,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -101,7 +103,7 @@ var ( UpdateRevision: "version-2", CurrentRevision: "version-1", ObservedGeneration: 1, - CollisionCount: pointer.Int32Ptr(1), + CollisionCount: pointer.Int32(1), }, } @@ -141,14 +143,157 @@ var ( }, }, } + hpaDemo = &autoscalingv1.HorizontalPodAutoscaler{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "autoscaling/v1", + Kind: "HorizontalPodAutoscaler", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "hpa", + Namespace: cloneKey.Namespace, + }, + Spec: autoscalingv1.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "CloneSet", + Name: cloneDemo.Name, + }, + MinReplicas: pointer.Int32(1), + MaxReplicas: 10, + }, + } ) func init() { apps.AddToScheme(scheme) rolloutapi.AddToScheme(scheme) kruiseappsv1alpha1.AddToScheme(scheme) + autoscalingv1.AddToScheme(scheme) } +func TestControlPackage(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "CloneSet Control Package Suite") +} + +var _ = Describe("CloneSet Control", func() { + var ( + c client.Client + rc *realController + cloneset *kruiseappsv1alpha1.CloneSet + release *v1beta1.BatchRelease + hpa *autoscalingv1.HorizontalPodAutoscaler + ) + + BeforeEach(func() { + cloneset = cloneDemo.DeepCopy() + release = releaseDemo.DeepCopy() + hpa = hpaDemo.DeepCopy() + c = fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(cloneset, release, hpa). + Build() + rc = &realController{ + key: types.NamespacedName{Namespace: cloneset.Namespace, Name: cloneset.Name}, + client: c, + } + }) + + It("should initialize cloneset successfully", func() { + // build controller + _, err := rc.BuildController() + Expect(err).NotTo(HaveOccurred()) + // call Initialize method + err = retryFunction(3, func() error { + return rc.Initialize(release) + }) + Expect(err).NotTo(HaveOccurred()) + // inspect if HPA is disabled + disabledHPA := &autoscalingv1.HorizontalPodAutoscaler{} + err = c.Get(context.TODO(), types.NamespacedName{Namespace: hpa.Namespace, Name: hpa.Name}, disabledHPA) + Expect(err).NotTo(HaveOccurred()) + Expect(disabledHPA.Spec.ScaleTargetRef.Name).To(Equal(cloneset.Name + "-DisableByRollout")) + + // inspect if Cloneset is patched properly + updatedCloneset := &kruiseappsv1alpha1.CloneSet{} + err = c.Get(context.TODO(), client.ObjectKeyFromObject(cloneset), updatedCloneset) + Expect(err).NotTo(HaveOccurred()) + + // inspect if annotations are added + Expect(updatedCloneset.Annotations).To(HaveKey(v1beta1.OriginalDeploymentStrategyAnnotation)) + Expect(updatedCloneset.Annotations).To(HaveKey(util.BatchReleaseControlAnnotation)) + Expect(updatedCloneset.Annotations[util.BatchReleaseControlAnnotation]).To(Equal(getControlInfo(release))) + + // inspect if strategy is updated + Expect(updatedCloneset.Spec.UpdateStrategy.Paused).To(BeFalse()) + Expect(updatedCloneset.Spec.UpdateStrategy.MaxSurge.IntVal).To(Equal(int32(1))) + Expect(updatedCloneset.Spec.UpdateStrategy.MaxUnavailable.IntVal).To(Equal(int32(0))) + Expect(updatedCloneset.Spec.MinReadySeconds).To(Equal(int32(v1beta1.MaxReadySeconds))) + }) + + It("should finalize CloneSet successfully", func() { + // hack to patch cloneset status + cloneset.Status.UpdatedReadyReplicas = 10 + err := c.Status().Update(context.TODO(), cloneset) + Expect(err).NotTo(HaveOccurred()) + // build controller + rc.object = nil + _, err = rc.BuildController() + Expect(err).NotTo(HaveOccurred()) + // call Finalize method + err = retryFunction(3, func() error { + return rc.Finalize(release) + }) + Expect(err).NotTo(HaveOccurred()) + + // inspect if CloneSet is patched properly + updatedCloneset := &kruiseappsv1alpha1.CloneSet{} + err = c.Get(context.TODO(), client.ObjectKeyFromObject(cloneset), updatedCloneset) + Expect(err).NotTo(HaveOccurred()) + + // inspect if annotations are removed + Expect(updatedCloneset.Annotations).NotTo(HaveKey(v1beta1.OriginalDeploymentStrategyAnnotation)) + Expect(updatedCloneset.Annotations).NotTo(HaveKey(util.BatchReleaseControlAnnotation)) + + // inspect if strategy is restored + Expect(updatedCloneset.Spec.UpdateStrategy.MaxSurge).To(BeNil()) + Expect(*updatedCloneset.Spec.UpdateStrategy.MaxUnavailable).To(Equal(intstr.IntOrString{Type: intstr.Int, IntVal: 1})) + Expect(updatedCloneset.Spec.MinReadySeconds).To(Equal(int32(0))) + + // inspect if HPA is restored + restoredHPA := &autoscalingv1.HorizontalPodAutoscaler{} + err = c.Get(context.TODO(), types.NamespacedName{Namespace: hpa.Namespace, Name: hpa.Name}, restoredHPA) + Expect(err).NotTo(HaveOccurred()) + Expect(restoredHPA.Spec.ScaleTargetRef.Name).To(Equal(cloneset.Name)) + }) + + It("should upgradBatch for CloneSet successfully", func() { + // call Initialize method + _, err := rc.BuildController() + Expect(err).NotTo(HaveOccurred()) + err = retryFunction(3, func() error { + return rc.Initialize(release) + }) + Expect(err).NotTo(HaveOccurred()) + + // call UpgradeBatch method + rc.object = nil + _, err = rc.BuildController() + Expect(err).NotTo(HaveOccurred()) + batchContext, err := rc.CalculateBatchContext(release) + Expect(err).NotTo(HaveOccurred()) + err = rc.UpgradeBatch(batchContext) + Expect(err).NotTo(HaveOccurred()) + + // inspect if CloneSet is patched properly + updatedCloneset := &kruiseappsv1alpha1.CloneSet{} + err = c.Get(context.TODO(), client.ObjectKeyFromObject(cloneset), updatedCloneset) + Expect(err).NotTo(HaveOccurred()) + Expect(*updatedCloneset.Spec.UpdateStrategy.MaxSurge).To(Equal(intstr.IntOrString{Type: intstr.String, StrVal: "10%"})) + Expect(*updatedCloneset.Spec.UpdateStrategy.MaxUnavailable).To(Equal(intstr.IntOrString{Type: intstr.Int, IntVal: 0})) + }) +}) + func TestCalculateBatchContext(t *testing.T) { RegisterFailHandler(Fail) cases := map[string]struct { @@ -392,3 +537,12 @@ func getControlInfo(release *v1beta1.BatchRelease) string { owner, _ := json.Marshal(metav1.NewControllerRef(release, release.GetObjectKind().GroupVersionKind())) return string(owner) } + +func retryFunction(limit int, f func() error) (err error) { + for i := limit; i >= 0; i-- { + if err = f(); err == nil { + return nil + } + } + return err +} diff --git a/pkg/controller/batchrelease/control/bluegreenstyle/deployment/control.go b/pkg/controller/batchrelease/control/bluegreenstyle/deployment/control.go index da93a2f..9e073b3 100644 --- a/pkg/controller/batchrelease/control/bluegreenstyle/deployment/control.go +++ b/pkg/controller/batchrelease/control/bluegreenstyle/deployment/control.go @@ -111,7 +111,7 @@ func (rc *realController) Initialize(release *v1beta1.BatchRelease) error { func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error { if err := control.ValidateReadyForBlueGreenRelease(rc.object); err != nil { - return errors.NewFatalError(fmt.Errorf("cannot upgrade batch, because deployment %v doesn't satisfy conditions: %s", klog.KObj(rc.object), err.Error())) + return errors.NewBadRequestError(fmt.Errorf("cannot upgrade batch, because deployment %v doesn't satisfy conditions: %s", klog.KObj(rc.object), err.Error())) } desired, _ := intstr.GetScaledValueFromIntOrPercent(&ctx.DesiredSurge, int(ctx.Replicas), true) current, _ := intstr.GetScaledValueFromIntOrPercent(&ctx.CurrentSurge, int(ctx.Replicas), true) @@ -137,9 +137,6 @@ func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error { // set pause to false, restore the original setting, delete annotation func (rc *realController) Finalize(release *v1beta1.BatchRelease) error { - if rc.finalized() { - return nil // No need to finalize again. - } if release.Spec.ReleasePlan.BatchPartition != nil { // continuous release (not supported yet) /* @@ -153,41 +150,41 @@ func (rc *realController) Finalize(release *v1beta1.BatchRelease) error { return nil } + // restore the original setting and remove annotation d := util.GetEmptyObjectWithKey(rc.object) - setting, err := control.GetOriginalSetting(rc.object) - if err != nil { - return errors.NewFatalError(fmt.Errorf("cannot get original setting for cloneset %v: %s from annotation", klog.KObj(rc.object), err.Error())) - } - patchData := patch.NewDeploymentPatch() - if rc.object.Spec.MinReadySeconds != setting.MinReadySeconds { - // restore the hpa - if err := hpa.RestoreHPA(rc.client, rc.object); err != nil { + if !rc.restored() { + setting, err := control.GetOriginalSetting(rc.object) + if err != nil { return err } + patchData := patch.NewDeploymentPatch() // restore the original setting patchData.UpdatePaused(false) patchData.UpdateMinReadySeconds(setting.MinReadySeconds) patchData.UpdateProgressDeadlineSeconds(setting.ProgressDeadlineSeconds) patchData.UpdateMaxSurge(setting.MaxSurge) patchData.UpdateMaxUnavailable(setting.MaxUnavailable) + // restore label and annotation + patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation) + patchData.DeleteLabel(v1alpha1.DeploymentStableRevisionLabel) + patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation) if err := rc.client.Patch(context.TODO(), d, patchData); err != nil { return err } + klog.InfoS("Finalize: deployment bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object)) } - klog.InfoS("Finalize: deployment bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object)) + // wait all pods updated and ready if err := waitAllUpdatedAndReady(d.(*apps.Deployment)); err != nil { - return errors.NewBenignError(err) + return errors.NewRetryError(err) } klog.InfoS("Finalize: deployment is ready to resume, restore the original setting", "deployment", klog.KObj(rc.object)) - // restore label and annotation - patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation) - patchData.DeleteLabel(v1alpha1.DeploymentStableRevisionLabel) - patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation) - return rc.client.Patch(context.TODO(), d, patchData) + + // restore hpa + return hpa.RestoreHPA(rc.client, rc.object) } -func (rc *realController) finalized() bool { +func (rc *realController) restored() bool { if rc.object == nil || rc.object.DeletionTimestamp != nil { return true } @@ -313,7 +310,7 @@ func (rc *realController) patchStableRSMinReadySeconds(seconds int32) error { func (rc *realController) patchDeployment(release *v1beta1.BatchRelease) error { setting, err := control.GetOriginalSetting(rc.object) if err != nil { - return errors.NewFatalError(fmt.Errorf("cannot get original setting for deployment %v: %s", klog.KObj(rc.object), err.Error())) + return errors.NewBadRequestError(fmt.Errorf("cannot get original setting for deployment %v: %s", klog.KObj(rc.object), err.Error())) } control.InitOriginalSetting(&setting, rc.object) patchData := patch.NewDeploymentPatch() diff --git a/pkg/controller/batchrelease/control/bluegreenstyle/deployment/control_test.go b/pkg/controller/batchrelease/control/bluegreenstyle/deployment/control_test.go index bdb9bcc..f6fc934 100644 --- a/pkg/controller/batchrelease/control/bluegreenstyle/deployment/control_test.go +++ b/pkg/controller/batchrelease/control/bluegreenstyle/deployment/control_test.go @@ -35,6 +35,7 @@ import ( "github.com/openkruise/rollouts/pkg/util" "github.com/openkruise/rollouts/pkg/util/errors" apps "k8s.io/api/apps/v1" + autoscalingv1 "k8s.io/api/autoscaling/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -107,7 +108,7 @@ var ( UpdatedReplicas: 0, ReadyReplicas: 10, AvailableReplicas: 10, - CollisionCount: pointer.Int32Ptr(1), + CollisionCount: pointer.Int32(1), ObservedGeneration: 1, }, } @@ -128,7 +129,7 @@ var ( }, }, Spec: apps.DeploymentSpec{ - Replicas: pointer.Int32Ptr(10), + Replicas: pointer.Int32(10), Strategy: apps.DeploymentStrategy{ Type: apps.RollingUpdateDeploymentStrategyType, RollingUpdate: &apps.RollingUpdateDeployment{ @@ -192,14 +193,183 @@ var ( }, }, } + + hpaDemo = &autoscalingv1.HorizontalPodAutoscaler{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "autoscaling/v1", + Kind: "HorizontalPodAutoscaler", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "hpa", + Namespace: deploymentKey.Namespace, + }, + Spec: autoscalingv1.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{ + APIVersion: apps.SchemeGroupVersion.String(), + Kind: "Deployment", + Name: deploymentDemo.Name, + }, + MinReplicas: pointer.Int32(1), + MaxReplicas: 10, + }, + } ) func init() { apps.AddToScheme(scheme) rolloutapi.AddToScheme(scheme) kruiseappsv1alpha1.AddToScheme(scheme) + autoscalingv1.AddToScheme(scheme) } +func TestControlPackage(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Deployment Control Package Suite") +} + +var _ = Describe("Deployment Control", func() { + var ( + c client.Client + rc *realController + deployment *apps.Deployment + release *v1beta1.BatchRelease + hpa *autoscalingv1.HorizontalPodAutoscaler + stableRS *apps.ReplicaSet + canaryRS *apps.ReplicaSet + ) + + BeforeEach(func() { + deployment = deploymentDemo.DeepCopy() + release = releaseDemo.DeepCopy() + hpa = hpaDemo.DeepCopy() + + deployment = getStableWithReady(deployment, "v1").(*apps.Deployment) + stableRS = makeStableReplicaSets(deployment).(*apps.ReplicaSet) + stableRS.Spec.MinReadySeconds = 0 + stableRS.Status.ReadyReplicas = *deployment.Spec.Replicas + stableRS.Status.AvailableReplicas = *deployment.Spec.Replicas + + canaryRS = makeCanaryReplicaSets(deployment).(*apps.ReplicaSet) + canaryRS.Status.ReadyReplicas = 0 + canaryRS.Status.AvailableReplicas = 0 + + c = fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(deployment, release, hpa, stableRS, canaryRS). + Build() + rc = &realController{ + key: types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name}, + client: c, + finder: util.NewControllerFinder(c), + } + }) + + It("should initialize Deployment successfully", func() { + // build controller + _, err := rc.BuildController() + Expect(err).NotTo(HaveOccurred()) + // call Initialize method + err = retryFunction(3, func() error { + return rc.Initialize(release) + }) + Expect(err).NotTo(HaveOccurred()) + // inspect if HPA is disabled + disabledHPA := &autoscalingv1.HorizontalPodAutoscaler{} + err = c.Get(context.TODO(), types.NamespacedName{Namespace: hpa.Namespace, Name: hpa.Name}, disabledHPA) + Expect(err).NotTo(HaveOccurred()) + Expect(disabledHPA.Spec.ScaleTargetRef.Name).To(Equal(deployment.Name + "-DisableByRollout")) + + // inspect if MinReadySeconds of stable ReplicaSet is updated + stableRSAfter := &apps.ReplicaSet{} + err = c.Get(context.TODO(), client.ObjectKeyFromObject(stableRS), stableRSAfter) + Expect(err).NotTo(HaveOccurred()) + Expect(stableRSAfter.Spec.MinReadySeconds).To(Equal(int32(v1beta1.MaxReadySeconds))) + + // inspect if Deployment is patched properly + updatedDeployment := &apps.Deployment{} + err = c.Get(context.TODO(), client.ObjectKeyFromObject(deployment), updatedDeployment) + Expect(err).NotTo(HaveOccurred()) + + // inspect if annotations are added + Expect(updatedDeployment.Annotations).To(HaveKey(v1beta1.OriginalDeploymentStrategyAnnotation)) + Expect(updatedDeployment.Annotations).To(HaveKey(util.BatchReleaseControlAnnotation)) + Expect(updatedDeployment.Annotations[util.BatchReleaseControlAnnotation]).To(Equal(getControlInfo(release))) + + // inspect if strategy is updated + Expect(updatedDeployment.Spec.Strategy.RollingUpdate).NotTo(BeNil()) + Expect(updatedDeployment.Spec.Strategy.RollingUpdate.MaxSurge.IntVal).To(Equal(int32(1))) + Expect(updatedDeployment.Spec.Strategy.RollingUpdate.MaxUnavailable.IntVal).To(Equal(int32(0))) + Expect(updatedDeployment.Spec.MinReadySeconds).To(Equal(int32(v1beta1.MaxReadySeconds))) + Expect(*updatedDeployment.Spec.ProgressDeadlineSeconds).To(Equal(int32(v1beta1.MaxProgressSeconds))) + }) + + It("should finalize Deployment successfully", func() { + // build controller + rc.object = nil + _, err := rc.BuildController() + Expect(err).NotTo(HaveOccurred()) + // call Finalize method + err = retryFunction(3, func() error { + return rc.Finalize(release) + }) + Expect(err).NotTo(HaveOccurred()) + + // inspect if Deployment is patched properly + updatedDeployment := &apps.Deployment{} + err = c.Get(context.TODO(), client.ObjectKeyFromObject(deployment), updatedDeployment) + Expect(err).NotTo(HaveOccurred()) + + // inspect if annotations are removed + Expect(updatedDeployment.Annotations).NotTo(HaveKey(v1beta1.OriginalDeploymentStrategyAnnotation)) + Expect(updatedDeployment.Annotations).NotTo(HaveKey(util.BatchReleaseControlAnnotation)) + + // inspect if strategy is restored + Expect(updatedDeployment.Spec.Strategy.RollingUpdate).NotTo(BeNil()) + Expect(*updatedDeployment.Spec.Strategy.RollingUpdate.MaxSurge).To(Equal(intstr.IntOrString{Type: intstr.String, StrVal: "20%"})) + Expect(*updatedDeployment.Spec.Strategy.RollingUpdate.MaxUnavailable).To(Equal(intstr.IntOrString{Type: intstr.Int, IntVal: 1})) + Expect(updatedDeployment.Spec.MinReadySeconds).To(Equal(int32(0))) + Expect(updatedDeployment.Spec.ProgressDeadlineSeconds).To(BeNil()) + + // inspect if HPA is restored + restoredHPA := &autoscalingv1.HorizontalPodAutoscaler{} + err = c.Get(context.TODO(), types.NamespacedName{Namespace: hpa.Namespace, Name: hpa.Name}, restoredHPA) + Expect(err).NotTo(HaveOccurred()) + Expect(restoredHPA.Spec.ScaleTargetRef.Name).To(Equal(deployment.Name)) + + // inspect if MinReadySeconds of stable ReplicaSet is restored + stableRSAfter := &apps.ReplicaSet{} + err = c.Get(context.TODO(), client.ObjectKeyFromObject(stableRS), stableRSAfter) + Expect(err).NotTo(HaveOccurred()) + Expect(stableRSAfter.Spec.MinReadySeconds).To(Equal(int32(0))) + }) + + It("should upgradBatch for Deployment successfully", func() { + // call Initialize method + _, err := rc.BuildController() + Expect(err).NotTo(HaveOccurred()) + err = retryFunction(3, func() error { + return rc.Initialize(release) + }) + Expect(err).NotTo(HaveOccurred()) + + // call UpgradeBatch method + rc.object = nil + _, err = rc.BuildController() + Expect(err).NotTo(HaveOccurred()) + batchContext, err := rc.CalculateBatchContext(release) + Expect(err).NotTo(HaveOccurred()) + err = rc.UpgradeBatch(batchContext) + Expect(err).NotTo(HaveOccurred()) + // inspect if Deployment is patched properly + updatedDeployment := &apps.Deployment{} + err = c.Get(context.TODO(), client.ObjectKeyFromObject(deployment), updatedDeployment) + Expect(err).NotTo(HaveOccurred()) + Expect(updatedDeployment.Spec.Paused).To(BeFalse()) + Expect(*updatedDeployment.Spec.Strategy.RollingUpdate.MaxSurge).To(Equal(intstr.IntOrString{Type: intstr.String, StrVal: "50%"})) + Expect(*updatedDeployment.Spec.Strategy.RollingUpdate.MaxUnavailable).To(Equal(intstr.IntOrString{Type: intstr.Int, IntVal: 0})) + }) +}) + func TestCalculateBatchContext(t *testing.T) { RegisterFailHandler(Fail) cases := map[string]struct { @@ -218,7 +388,7 @@ func TestCalculateBatchContext(t *testing.T) { } // current partition, ie. maxSurge deployment.Spec.Strategy.RollingUpdate.MaxSurge = &intstr.IntOrString{Type: intstr.String, StrVal: "50%"} - deployment.Spec.Replicas = pointer.Int32Ptr(10) + deployment.Spec.Replicas = pointer.Int32(10) newRss := makeCanaryReplicaSets(deployment).(*apps.ReplicaSet) newRss.Status.ReadyReplicas = 2 return []client.Object{deployment, newRss, makeStableReplicaSets(deployment)} @@ -442,7 +612,7 @@ func TestRealController(t *testing.T) { release.Spec.ReleasePlan.BatchPartition = nil err = controller.Finalize(release) - Expect(errors.IsBenign(err)).Should(BeTrue()) + Expect(errors.IsRetryError(err)).Should(BeTrue()) fetch = &apps.Deployment{} Expect(cli.Get(context.TODO(), deploymentKey, fetch)).NotTo(HaveOccurred()) // check workload strategy @@ -547,3 +717,12 @@ func getStableWithReady(workload client.Object, version string) client.Object { } return nil } + +func retryFunction(limit int, f func() error) (err error) { + for i := limit; i >= 0; i-- { + if err = f(); err == nil { + return nil + } + } + return err +} diff --git a/pkg/controller/batchrelease/control/bluegreenstyle/hpa/hpa_test.go b/pkg/controller/batchrelease/control/bluegreenstyle/hpa/hpa_test.go index a2e2f64..57f94af 100644 --- a/pkg/controller/batchrelease/control/bluegreenstyle/hpa/hpa_test.go +++ b/pkg/controller/batchrelease/control/bluegreenstyle/hpa/hpa_test.go @@ -12,8 +12,6 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - // "github.com/openkruise/rollouts/api/v1alpha1" - // import hpa v1 ) var ( diff --git a/pkg/controller/rollout/rollout_progressing.go b/pkg/controller/rollout/rollout_progressing.go index 60fd756..e4ed86e 100644 --- a/pkg/controller/rollout/rollout_progressing.go +++ b/pkg/controller/rollout/rollout_progressing.go @@ -111,11 +111,12 @@ func (r *RolloutReconciler) reconcileRolloutProgressing(rollout *v1beta1.Rollout case v1alpha1.ProgressingReasonInRolling: klog.Infof("rollout(%s/%s) is Progressing, and in reason(%s)", rollout.Namespace, rollout.Name, cond.Reason) err = r.doProgressingInRolling(rolloutContext) - if utilerrors.IsFatal(err) { + if utilerrors.IsBadRequest(err) { // For fatal errors, do not retry as it wastes resources and has no effect. // therefore, we don't propagate the error, but just log it. // user should do sth instead, eg. for bluegreen continuous release scenario, user should do rollback klog.Warningf("rollout(%s/%s) doProgressingInRolling error(%s)", rollout.Namespace, rollout.Name, err.Error()) + return nil, nil } else if err != nil { return nil, err } @@ -231,6 +232,16 @@ func (r *RolloutReconciler) handleRolloutPaused(rollout *v1beta1.Rollout, newSta return nil } +/* +continuous release (or successive release) is not supported for bluegreen release, especially for cloneset, +here is why: +suppose we are releasing a cloneSet, which has pods of both v1 and v2 for now. If we release v3 before +v2 is fully released, the cloneSet controller might scale down pods without distinguishing between v1 and v2. +This is because our implementation is based on the minReadySeconds, pods of both v1 and v2 are "unavailable" +in the progress of rollout. +Deployment actually has the same problem, however it is possible to bypass this issue for Deployment by setting +minReadySeconds for replicaset separately; unfortunately this workaround seems not work for cloneset +*/ func (r *RolloutReconciler) handleContinuousRelease(c *RolloutContext) error { r.Recorder.Eventf(c.Rollout, corev1.EventTypeNormal, "Progressing", "workload continuous publishing canaryRevision, then restart publishing") klog.Infof("rollout(%s/%s) workload continuous publishing canaryRevision from(%s) -> to(%s), then restart publishing", @@ -239,9 +250,9 @@ func (r *RolloutReconciler) handleContinuousRelease(c *RolloutContext) error { // do nothing for blue-green release if c.Rollout.Spec.Strategy.IsBlueGreenRelease() { cond := util.GetRolloutCondition(*c.NewStatus, v1beta1.RolloutConditionProgressing) - cond.Message = "[warning] new version released in progress of blue-green release, please rollback first" + cond.Message = "new version releasing detected in the progress of blue-green release, please rollback first" c.NewStatus.Message = cond.Message - return utilerrors.NewFatalError(fmt.Errorf("cannot do continuous release for blue-green release, rollback firstly")) + return utilerrors.NewBadRequestError(fmt.Errorf("new version releasing detected in the progress of blue-green release, please rollback first")) } done, err := r.doProgressingReset(c) diff --git a/pkg/util/errors/types.go b/pkg/util/errors/types.go index 59f2f6a..94f7bce 100644 --- a/pkg/util/errors/types.go +++ b/pkg/util/errors/types.go @@ -5,66 +5,65 @@ import ( "fmt" ) -// BenignError represents a benign error that can be handled or ignored by the caller. +// RetryError represents a benign error that can be handled or ignored by the caller. // It encapsulates information that is non-critical and does not require immediate attention. -type BenignError struct { +type RetryError struct { Err error } -// Error implements the error interface for BenignError. +// Error implements the error interface for RetryError. // It returns the error message of the encapsulated error or a default message. -func (e *BenignError) Error() string { +func (e *RetryError) Error() string { if e.Err != nil { - return fmt.Sprintf("[benign]: %s", e.Err.Error()) + return fmt.Sprintf("[retry]: %s", e.Err.Error()) } - return "benign error" + return "retry error" } -// NewBenignError creates a new instance of BenignError. -// If the provided err is nil, it signifies a benign condition without a specific error message. -func NewBenignError(err error) *BenignError { - return &BenignError{Err: err} +// NewRetryError creates a new instance of RetryError. +func NewRetryError(err error) *RetryError { + return &RetryError{Err: err} } -func IsBenign(err error) bool { - var benignErr *BenignError - return errors.As(err, &benignErr) +func IsRetryError(err error) bool { + var re *RetryError + return errors.As(err, &re) } -func AsBenign(err error, target **BenignError) bool { +func AsRetryError(err error, target **RetryError) bool { return errors.As(err, target) } -// FatalError represents a fatal error that requires special handling. +// BadRequestError represents a fatal error that requires special handling. // Such errors are critical and may necessitate logging, alerts, or even program termination. -type FatalError struct { +type BadRequestError struct { Err error } -// Error implements the error interface for FatalError. +// Error implements the error interface for BadRequestError. // It returns the error message of the encapsulated error or a default message. -func (e *FatalError) Error() string { +func (e *BadRequestError) Error() string { if e.Err != nil { return e.Err.Error() } return "fatal error" } -// NewFatalError creates a new instance of FatalError. +// NewBadRequestError creates a new instance of BadRequestError. // It encapsulates the provided error, marking it as critical. -func NewFatalError(err error) *FatalError { - return &FatalError{Err: err} +func NewBadRequestError(err error) *BadRequestError { + return &BadRequestError{Err: err} } -// IsFatal checks whether the provided error is of type FatalError. -// It returns true if the error is a FatalError or wraps a FatalError, false otherwise. -func IsFatal(err error) bool { - var fatalErr *FatalError - return AsFatal(err, &fatalErr) +// IsBadRequest checks whether the provided error is of type BadRequestError. +// It returns true if the error is a BadRequestError or wraps a BadRequestError, false otherwise. +func IsBadRequest(err error) bool { + var brErr *BadRequestError + return AsBadRequest(err, &brErr) } -// AsFatal attempts to cast the provided error to a FatalError. +// AsBadRequest attempts to cast the provided error to a BadRequestError. // It returns true if the casting is successful, allowing the caller to handle it accordingly. -func AsFatal(err error, target **FatalError) bool { +func AsBadRequest(err error, target **BadRequestError) bool { return errors.As(err, target) } diff --git a/pkg/webhook/rollout/validating/rollout_create_update_handler.go b/pkg/webhook/rollout/validating/rollout_create_update_handler.go index 49b727c..6ed46b0 100644 --- a/pkg/webhook/rollout/validating/rollout_create_update_handler.go +++ b/pkg/webhook/rollout/validating/rollout_create_update_handler.go @@ -36,6 +36,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) +var ( + blueGreenSupportWorkloadGVKs = []*schema.GroupVersionKind{ + &util.ControllerKindDep, + &util.ControllerKruiseKindCS, + } +) + // RolloutCreateUpdateHandler handles Rollout type RolloutCreateUpdateHandler struct { // To use the client, you need to do the following: @@ -204,12 +211,12 @@ func (h *RolloutCreateUpdateHandler) validateRolloutConflict(rollout *appsv1beta } func validateRolloutSpec(c *validateContext, rollout *appsv1beta1.Rollout, fldPath *field.Path) field.ErrorList { - errList := validateRolloutSpecObjectRef(&rollout.Spec.WorkloadRef, fldPath.Child("ObjectRef")) + errList := validateRolloutSpecObjectRef(c, &rollout.Spec.WorkloadRef, fldPath.Child("ObjectRef")) errList = append(errList, validateRolloutSpecStrategy(c, &rollout.Spec.Strategy, fldPath.Child("Strategy"))...) return errList } -func validateRolloutSpecObjectRef(workloadRef *appsv1beta1.ObjectRef, fldPath *field.Path) field.ErrorList { +func validateRolloutSpecObjectRef(c *validateContext, workloadRef *appsv1beta1.ObjectRef, fldPath *field.Path) field.ErrorList { if workloadRef == nil { return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), workloadRef, "WorkloadRef is required")} } @@ -218,6 +225,14 @@ func validateRolloutSpecObjectRef(workloadRef *appsv1beta1.ObjectRef, fldPath *f if !util.IsSupportedWorkload(gvk) { return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), workloadRef, "WorkloadRef kind is not supported")} } + if c.style == string(appsv1beta1.BlueGreenRollingStyle) { + for _, allowed := range blueGreenSupportWorkloadGVKs { + if gvk.Group == allowed.Group && gvk.Kind == allowed.Kind { + return nil + } + } + return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), workloadRef, "WorkloadRef kind is not supported for bluegreen style")} + } return nil } @@ -378,7 +393,7 @@ func (h *RolloutCreateUpdateHandler) InjectDecoder(d *admission.Decoder) error { func GetContextFromv1beta1Rollout(rollout *appsv1beta1.Rollout) *validateContext { if rollout.Spec.Strategy.Canary == nil && rollout.Spec.Strategy.BlueGreen == nil { - return nil + return &validateContext{} } style := rollout.Spec.Strategy.GetRollingStyle() if appsv1beta1.IsRealPartition(rollout) { diff --git a/pkg/webhook/workload/mutating/unified_update_handler.go b/pkg/webhook/workload/mutating/unified_update_handler.go new file mode 100644 index 0000000..d29cae3 --- /dev/null +++ b/pkg/webhook/workload/mutating/unified_update_handler.go @@ -0,0 +1,241 @@ +/* +Copyright 2019 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mutating + +import ( + "context" + "encoding/json" + "math" + "net/http" + + kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" + appsv1beta1 "github.com/openkruise/rollouts/api/v1beta1" + "github.com/openkruise/rollouts/pkg/util" + utilclient "github.com/openkruise/rollouts/pkg/util/client" + util2 "github.com/openkruise/rollouts/pkg/webhook/util" + "github.com/openkruise/rollouts/pkg/webhook/util/configuration" + admissionv1 "k8s.io/api/admission/v1" + v1 "k8s.io/api/admissionregistration/v1" + apps "k8s.io/api/apps/v1" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + labels2 "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/runtime/inject" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +// UnifiedWorkloadHandler handles Pod +type UnifiedWorkloadHandler struct { + // To use the client, you need to do the following: + // - uncomment it + // - import sigs.k8s.io/controller-runtime/pkg/client + // - uncomment the InjectClient method at the bottom of this file. + Client client.Client + + // Decoder decodes objects + Decoder *admission.Decoder + Finder *util.ControllerFinder +} + +var _ admission.Handler = &UnifiedWorkloadHandler{} + +// Handle handles admission requests. +func (h *UnifiedWorkloadHandler) Handle(ctx context.Context, req admission.Request) admission.Response { + // if subResources, then ignore + if req.Operation != admissionv1.Update || req.SubResource != "" { + return admission.Allowed("") + } + + meetingRules, err := h.checkWorkloadRules(ctx, req) + if err != nil { + return admission.Errored(http.StatusBadRequest, err) + } + + if !meetingRules { + return admission.Allowed("") + } + + switch req.Kind.Group { + // kruise cloneSet + case kruiseappsv1alpha1.GroupVersion.Group: + switch req.Kind.Kind { + case util.ControllerKruiseKindCS.Kind, util.ControllerKruiseKindDS.Kind: + return admission.Allowed("") + } + // native k8s deloyment + case apps.SchemeGroupVersion.Group: + switch req.Kind.Kind { + case util.ControllerKindDep.Kind: + return admission.Allowed("") + } + } + + // handle other workload types, including native/advanced statefulset + { + newObj := &unstructured.Unstructured{} + newObj.SetGroupVersionKind(schema.GroupVersionKind{Group: req.Kind.Group, Version: req.Kind.Version, Kind: req.Kind.Kind}) + if err := h.Decoder.Decode(req, newObj); err != nil { + return admission.Errored(http.StatusBadRequest, err) + } + if !util.IsWorkloadType(newObj, util.StatefulSetType) && req.Kind.Kind != util.ControllerKindSts.Kind { + return admission.Allowed("") + } + oldObj := &unstructured.Unstructured{} + oldObj.SetGroupVersionKind(schema.GroupVersionKind{Group: req.Kind.Group, Version: req.Kind.Version, Kind: req.Kind.Kind}) + if err := h.Decoder.Decode( + admission.Request{AdmissionRequest: admissionv1.AdmissionRequest{Object: req.AdmissionRequest.OldObject}}, + oldObj); err != nil { + return admission.Errored(http.StatusBadRequest, err) + } + changed, err := h.handleStatefulSetLikeWorkload(newObj, oldObj) + if err != nil { + return admission.Errored(http.StatusBadRequest, err) + } + if !changed { + return admission.Allowed("") + } + marshalled, err := json.Marshal(newObj.Object) + if err != nil { + return admission.Errored(http.StatusInternalServerError, err) + } + return admission.PatchResponseFromRaw(req.AdmissionRequest.Object.Raw, marshalled) + } +} + +func (h *UnifiedWorkloadHandler) handleStatefulSetLikeWorkload(newObj, oldObj *unstructured.Unstructured) (bool, error) { + // indicate whether the workload can enter the rollout process + // 1. replicas > 0 + if util.GetReplicas(newObj) == 0 || !util.IsStatefulSetRollingUpdate(newObj) { + return false, nil + } + oldTemplate, newTemplate := util.GetTemplate(oldObj), util.GetTemplate(newObj) + if oldTemplate == nil || newTemplate == nil { + return false, nil + } + oldMetadata, newMetadata := util.GetMetadata(oldObj), util.GetMetadata(newObj) + if newMetadata.Annotations[appsv1beta1.RolloutIDLabel] != "" && + oldMetadata.Annotations[appsv1beta1.RolloutIDLabel] == newMetadata.Annotations[appsv1beta1.RolloutIDLabel] { + return false, nil + } else if newMetadata.Annotations[appsv1beta1.RolloutIDLabel] == "" && util.EqualIgnoreHash(oldTemplate, newTemplate) { + return false, nil + } + + rollout, err := h.fetchMatchedRollout(newObj) + if err != nil { + return false, err + } else if rollout == nil || rollout.Spec.Strategy.IsEmptyRelease() { + return false, nil + } + + util.SetStatefulSetPartition(newObj, math.MaxInt16) + state := &util.RolloutState{RolloutName: rollout.Name} + by, _ := json.Marshal(state) + annotation := newObj.GetAnnotations() + if annotation == nil { + annotation = map[string]string{} + } + annotation[util.InRolloutProgressingAnnotation] = string(by) + newObj.SetAnnotations(annotation) + klog.Infof("StatefulSet(%s/%s) will be released incrementally based on Rollout(%s)", newMetadata.Namespace, newMetadata.Name, rollout.Name) + return true, nil +} + +func (h *UnifiedWorkloadHandler) fetchMatchedRollout(obj client.Object) (*appsv1beta1.Rollout, error) { + oGv := obj.GetObjectKind().GroupVersionKind() + rolloutList := &appsv1beta1.RolloutList{} + if err := h.Client.List(context.TODO(), rolloutList, utilclient.DisableDeepCopy, + &client.ListOptions{Namespace: obj.GetNamespace()}); err != nil { + klog.Errorf("UnifiedWorkloadHandler List rollout failed: %s", err.Error()) + return nil, err + } + for i := range rolloutList.Items { + rollout := &rolloutList.Items[i] + if !rollout.DeletionTimestamp.IsZero() { + continue + } + if rollout.Status.Phase == appsv1beta1.RolloutPhaseDisabled { + klog.Infof("Disabled rollout(%s/%s) fetched when fetching matched rollout", rollout.Namespace, rollout.Name) + continue + } + ref := rollout.Spec.WorkloadRef + gv, err := schema.ParseGroupVersion(ref.APIVersion) + if err != nil { + klog.Warningf("ParseGroupVersion rollout(%s/%s) ref failed: %s", rollout.Namespace, rollout.Name, err.Error()) + continue + } + if oGv.Group == gv.Group && oGv.Kind == ref.Kind && obj.GetName() == ref.Name { + return rollout, nil + } + } + return nil, nil +} + +var _ inject.Client = &UnifiedWorkloadHandler{} + +// InjectClient injects the client into the UnifiedWorkloadHandler +func (h *UnifiedWorkloadHandler) InjectClient(c client.Client) error { + h.Client = c + h.Finder = util.NewControllerFinder(c) + return nil +} + +var _ admission.DecoderInjector = &UnifiedWorkloadHandler{} + +// InjectDecoder injects the decoder into the UnifiedWorkloadHandler +func (h *UnifiedWorkloadHandler) InjectDecoder(d *admission.Decoder) error { + h.Decoder = d + return nil +} + +func (h *UnifiedWorkloadHandler) checkWorkloadRules(ctx context.Context, req admission.Request) (bool, error) { + webhook := &v1.MutatingWebhookConfiguration{} + if err := h.Client.Get(ctx, types.NamespacedName{Name: configuration.MutatingWebhookConfigurationName}, webhook); err != nil { + return false, err + } + + newObject := unstructured.Unstructured{} + if err := h.Decoder.Decode(req, &newObject); err != nil { + return false, err + } + + labels := newObject.GetLabels() + + attr, err := constructAttr(req) + if err != nil { + return false, err + } + + for _, webhook := range webhook.Webhooks { + for _, rule := range webhook.Rules { + m := util2.Matcher{Rule: rule, Attr: attr} + if m.Matches() { + selector, err := v12.LabelSelectorAsSelector(webhook.ObjectSelector) + if err != nil { + return false, nil + } + if selector.Matches(labels2.Set(labels)) { + return true, nil + } + } + } + } + return false, nil +} diff --git a/pkg/webhook/workload/mutating/unified_update_handler_test.go b/pkg/webhook/workload/mutating/unified_update_handler_test.go new file mode 100644 index 0000000..3e70c28 --- /dev/null +++ b/pkg/webhook/workload/mutating/unified_update_handler_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mutating + +import ( + "context" + "encoding/json" + "math" + "reflect" + "testing" + + kruiseappsv1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" + appsv1beta1 "github.com/openkruise/rollouts/api/v1beta1" + "github.com/openkruise/rollouts/pkg/util" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +func TestHandleStatefulSet(t *testing.T) { + cases := []struct { + name string + getObjs func() (*kruiseappsv1beta1.StatefulSet, *kruiseappsv1beta1.StatefulSet) + expectObj func() *kruiseappsv1beta1.StatefulSet + getRollout func() *appsv1beta1.Rollout + isError bool + }{ + { + name: "cloneSet image v1->v2, matched rollout", + getObjs: func() (*kruiseappsv1beta1.StatefulSet, *kruiseappsv1beta1.StatefulSet) { + oldObj := statefulset.DeepCopy() + newObj := statefulset.DeepCopy() + newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2" + return oldObj, newObj + }, + expectObj: func() *kruiseappsv1beta1.StatefulSet { + obj := statefulset.DeepCopy() + obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2" + obj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo"}` + obj.Spec.UpdateStrategy.RollingUpdate.Partition = pointer.Int32(math.MaxInt16) + return obj + }, + getRollout: func() *appsv1beta1.Rollout { + obj := rolloutDemo.DeepCopy() + obj.Spec.WorkloadRef = appsv1beta1.ObjectRef{ + APIVersion: "apps.kruise.io/v1beta1", + Kind: "StatefulSet", + Name: "echoserver", + } + return obj + }, + }, + } + + decoder, _ := admission.NewDecoder(scheme) + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + client := fake.NewClientBuilder().WithScheme(scheme).Build() + h := UnifiedWorkloadHandler{ + Client: client, + Decoder: decoder, + Finder: util.NewControllerFinder(client), + } + rollout := cs.getRollout() + if err := client.Create(context.TODO(), rollout); err != nil { + t.Errorf(err.Error()) + } + + oldObj, newObj := cs.getObjs() + oldO, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(oldObj) + newO, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(newObj) + oldUnstructured := &unstructured.Unstructured{Object: oldO} + newUnstructured := &unstructured.Unstructured{Object: newO} + oldUnstructured.SetGroupVersionKind(newObj.GroupVersionKind()) + newUnstructured.SetGroupVersionKind(newObj.GroupVersionKind()) + _, err := h.handleStatefulSetLikeWorkload(newUnstructured, oldUnstructured) + if cs.isError && err == nil { + t.Fatal("handleStatefulSetLikeWorkload failed") + } else if !cs.isError && err != nil { + t.Fatalf(err.Error()) + } + newStructured := &kruiseappsv1beta1.StatefulSet{} + err = runtime.DefaultUnstructuredConverter.FromUnstructured(newUnstructured.Object, newStructured) + if err != nil { + t.Fatal("DefaultUnstructuredConvert failed") + } + expect := cs.expectObj() + if !reflect.DeepEqual(newStructured, expect) { + by, _ := json.Marshal(newStructured) + t.Fatalf("handlerCloneSet failed, and new(%s)", string(by)) + } + }) + } +} diff --git a/pkg/webhook/workload/mutating/webhooks.go b/pkg/webhook/workload/mutating/webhooks.go index 6ff5d4b..c31c2c0 100644 --- a/pkg/webhook/workload/mutating/webhooks.go +++ b/pkg/webhook/workload/mutating/webhooks.go @@ -23,8 +23,6 @@ import ( // +kubebuilder:webhook:path=/mutate-apps-kruise-io-v1alpha1-cloneset,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps.kruise.io,resources=clonesets,verbs=update,versions=v1alpha1,name=mcloneset.kb.io // +kubebuilder:webhook:path=/mutate-apps-kruise-io-v1alpha1-daemonset,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps.kruise.io,resources=daemonsets,verbs=update,versions=v1alpha1,name=mdaemonset.kb.io // +kubebuilder:webhook:path=/mutate-apps-v1-deployment,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps,resources=deployments,verbs=update,versions=v1,name=mdeployment.kb.io -// +kubebuilder:webhook:path=/mutate-apps-v1-statefulset,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps,resources=statefulsets,verbs=update,versions=v1,name=mstatefulset.kb.io -// +kubebuilder:webhook:path=/mutate-apps-kruise-io-statefulset,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps.kruise.io,resources=statefulsets,verbs=create;update,versions=v1alpha1;v1beta1,name=madvancedstatefulset.kb.io // +kubebuilder:webhook:path=/mutate-unified-workload,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=*,resources=*,verbs=create;update,versions=*,name=munifiedworload.kb.io var ( @@ -32,9 +30,7 @@ var ( HandlerMap = map[string]admission.Handler{ "mutate-apps-kruise-io-v1alpha1-cloneset": &WorkloadHandler{}, "mutate-apps-v1-deployment": &WorkloadHandler{}, - "mutate-apps-v1-statefulset": &WorkloadHandler{}, - "mutate-apps-kruise-io-statefulset": &WorkloadHandler{}, - "mutate-unified-workload": &WorkloadHandler{}, "mutate-apps-kruise-io-v1alpha1-daemonset": &WorkloadHandler{}, + "mutate-unified-workload": &UnifiedWorkloadHandler{}, } ) diff --git a/pkg/webhook/workload/mutating/workload_update_handler.go b/pkg/webhook/workload/mutating/workload_update_handler.go index 0170004..e337616 100644 --- a/pkg/webhook/workload/mutating/workload_update_handler.go +++ b/pkg/webhook/workload/mutating/workload_update_handler.go @@ -189,74 +189,7 @@ func (h *WorkloadHandler) Handle(ctx context.Context, req admission.Request) adm } } - // handle other workload types, including native/advanced statefulset - { - newObj := &unstructured.Unstructured{} - newObj.SetGroupVersionKind(schema.GroupVersionKind{Group: req.Kind.Group, Version: req.Kind.Version, Kind: req.Kind.Kind}) - if err := h.Decoder.Decode(req, newObj); err != nil { - return admission.Errored(http.StatusBadRequest, err) - } - if !util.IsWorkloadType(newObj, util.StatefulSetType) && req.Kind.Kind != util.ControllerKindSts.Kind { - return admission.Allowed("") - } - oldObj := &unstructured.Unstructured{} - oldObj.SetGroupVersionKind(schema.GroupVersionKind{Group: req.Kind.Group, Version: req.Kind.Version, Kind: req.Kind.Kind}) - if err := h.Decoder.Decode( - admission.Request{AdmissionRequest: admissionv1.AdmissionRequest{Object: req.AdmissionRequest.OldObject}}, - oldObj); err != nil { - return admission.Errored(http.StatusBadRequest, err) - } - changed, err := h.handleStatefulSetLikeWorkload(newObj, oldObj) - if err != nil { - return admission.Errored(http.StatusBadRequest, err) - } - if !changed { - return admission.Allowed("") - } - marshalled, err := json.Marshal(newObj.Object) - if err != nil { - return admission.Errored(http.StatusInternalServerError, err) - } - return admission.PatchResponseFromRaw(req.AdmissionRequest.Object.Raw, marshalled) - } -} - -func (h *WorkloadHandler) handleStatefulSetLikeWorkload(newObj, oldObj *unstructured.Unstructured) (bool, error) { - // indicate whether the workload can enter the rollout process - // 1. replicas > 0 - if util.GetReplicas(newObj) == 0 || !util.IsStatefulSetRollingUpdate(newObj) { - return false, nil - } - oldTemplate, newTemplate := util.GetTemplate(oldObj), util.GetTemplate(newObj) - if oldTemplate == nil || newTemplate == nil { - return false, nil - } - oldMetadata, newMetadata := util.GetMetadata(oldObj), util.GetMetadata(newObj) - if newMetadata.Annotations[appsv1beta1.RolloutIDLabel] != "" && - oldMetadata.Annotations[appsv1beta1.RolloutIDLabel] == newMetadata.Annotations[appsv1beta1.RolloutIDLabel] { - return false, nil - } else if newMetadata.Annotations[appsv1beta1.RolloutIDLabel] == "" && util.EqualIgnoreHash(oldTemplate, newTemplate) { - return false, nil - } - - rollout, err := h.fetchMatchedRollout(newObj) - if err != nil { - return false, err - } else if rollout == nil || rollout.Spec.Strategy.IsEmptyRelease() { - return false, nil - } - - util.SetStatefulSetPartition(newObj, math.MaxInt16) - state := &util.RolloutState{RolloutName: rollout.Name} - by, _ := json.Marshal(state) - annotation := newObj.GetAnnotations() - if annotation == nil { - annotation = map[string]string{} - } - annotation[util.InRolloutProgressingAnnotation] = string(by) - newObj.SetAnnotations(annotation) - klog.Infof("StatefulSet(%s/%s) will be released incrementally based on Rollout(%s)", newMetadata.Namespace, newMetadata.Name, rollout.Name) - return true, nil + return admission.Allowed("") } func (h *WorkloadHandler) handleDeployment(newObj, oldObj *apps.Deployment) (bool, error) { @@ -386,21 +319,6 @@ func (h *WorkloadHandler) handleCloneSet(newObj, oldObj *kruiseappsv1alpha1.Clon } else if rollout == nil || rollout.Spec.Strategy.IsEmptyRelease() { return false, nil } - /* - continuous release (or successive release) is not supported for bluegreen release, especially for cloneset, - here is why: - suppose we are releasing a cloneset, which has pods of both v1 and v2 for now. If we release v3 before - v2 release is done, the cloneset controller might scale down pods without distinguishing between v1 and v2. - This is because our implementation is based on the minReadySeconds, pods of both v1 and v2 are "unavailable" - in the progress of rollout. - Deployment actually has the same problem, however it is possible to bypass this issue for Deployment by setting - minReadySeconds for replicaset separately; unfortunately this workaround seems not work for cloneset - - // if rollout.Spec.Strategy.IsBlueGreenRelease() && revision.IsContinuousRelease(h.Client, oldObj, newObj) { - // err = fmt.Errorf("successive release is not supported for bluegreen for now, rollback first") - // return false, fmt.Errorf(err.Error()) - // } - */ // if traffic routing, there must only be one version of Pods if rollout.Spec.Strategy.HasTrafficRoutings() && newObj.Status.Replicas != newObj.Status.UpdatedReplicas { diff --git a/pkg/webhook/workload/mutating/workload_update_handler_test.go b/pkg/webhook/workload/mutating/workload_update_handler_test.go index 083c624..13282de 100644 --- a/pkg/webhook/workload/mutating/workload_update_handler_test.go +++ b/pkg/webhook/workload/mutating/workload_update_handler_test.go @@ -36,7 +36,6 @@ import ( apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -825,82 +824,6 @@ func TestHandlerDaemonSet(t *testing.T) { } } -func TestHandleStatefulSet(t *testing.T) { - cases := []struct { - name string - getObjs func() (*kruiseappsv1beta1.StatefulSet, *kruiseappsv1beta1.StatefulSet) - expectObj func() *kruiseappsv1beta1.StatefulSet - getRollout func() *appsv1beta1.Rollout - isError bool - }{ - { - name: "cloneSet image v1->v2, matched rollout", - getObjs: func() (*kruiseappsv1beta1.StatefulSet, *kruiseappsv1beta1.StatefulSet) { - oldObj := statefulset.DeepCopy() - newObj := statefulset.DeepCopy() - newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2" - return oldObj, newObj - }, - expectObj: func() *kruiseappsv1beta1.StatefulSet { - obj := statefulset.DeepCopy() - obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2" - obj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo"}` - obj.Spec.UpdateStrategy.RollingUpdate.Partition = pointer.Int32(math.MaxInt16) - return obj - }, - getRollout: func() *appsv1beta1.Rollout { - obj := rolloutDemo.DeepCopy() - obj.Spec.WorkloadRef = appsv1beta1.ObjectRef{ - APIVersion: "apps.kruise.io/v1beta1", - Kind: "StatefulSet", - Name: "echoserver", - } - return obj - }, - }, - } - - decoder, _ := admission.NewDecoder(scheme) - for _, cs := range cases { - t.Run(cs.name, func(t *testing.T) { - client := fake.NewClientBuilder().WithScheme(scheme).Build() - h := WorkloadHandler{ - Client: client, - Decoder: decoder, - Finder: util.NewControllerFinder(client), - } - rollout := cs.getRollout() - if err := client.Create(context.TODO(), rollout); err != nil { - t.Errorf(err.Error()) - } - - oldObj, newObj := cs.getObjs() - oldO, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(oldObj) - newO, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(newObj) - oldUnstructured := &unstructured.Unstructured{Object: oldO} - newUnstructured := &unstructured.Unstructured{Object: newO} - oldUnstructured.SetGroupVersionKind(newObj.GroupVersionKind()) - newUnstructured.SetGroupVersionKind(newObj.GroupVersionKind()) - _, err := h.handleStatefulSetLikeWorkload(newUnstructured, oldUnstructured) - if cs.isError && err == nil { - t.Fatal("handleStatefulSetLikeWorkload failed") - } else if !cs.isError && err != nil { - t.Fatalf(err.Error()) - } - newStructured := &kruiseappsv1beta1.StatefulSet{} - err = runtime.DefaultUnstructuredConverter.FromUnstructured(newUnstructured.Object, newStructured) - if err != nil { - t.Fatal("DefaultUnstructuredConvert failed") - } - expect := cs.expectObj() - if !reflect.DeepEqual(newStructured, expect) { - by, _ := json.Marshal(newStructured) - t.Fatalf("handlerCloneSet failed, and new(%s)", string(by)) - } - }) - } -} - func TestCheckWorkloadRule(t *testing.T) { ctx := context.Background()