add unit test & split workload mutating webhook

Signed-off-by: yunbo <yunbo10124scut@gmail.com>
This commit is contained in:
yunbo 2024-12-10 17:40:45 +08:00
parent 0cb11a8203
commit 4811ca7dd5
16 changed files with 796 additions and 303 deletions

View File

@ -65,48 +65,6 @@ webhooks:
resources:
- deployments
sideEffects: None
- admissionReviewVersions:
- v1
- v1beta1
clientConfig:
service:
name: webhook-service
namespace: system
path: /mutate-apps-v1-statefulset
failurePolicy: Fail
name: mstatefulset.kb.io
rules:
- apiGroups:
- apps
apiVersions:
- v1
operations:
- UPDATE
resources:
- statefulsets
sideEffects: None
- admissionReviewVersions:
- v1
- v1beta1
clientConfig:
service:
name: webhook-service
namespace: system
path: /mutate-apps-kruise-io-statefulset
failurePolicy: Fail
name: madvancedstatefulset.kb.io
rules:
- apiGroups:
- apps.kruise.io
apiVersions:
- v1alpha1
- v1beta1
operations:
- CREATE
- UPDATE
resources:
- statefulsets
sideEffects: None
- admissionReviewVersions:
- v1
- v1beta1

View File

@ -18,16 +18,16 @@ webhooks:
matchExpressions:
- key: rollouts.kruise.io/workload-type
operator: Exists
- name: mstatefulset.kb.io
objectSelector:
matchExpressions:
- key: rollouts.kruise.io/workload-type
operator: Exists
- name: madvancedstatefulset.kb.io
objectSelector:
matchExpressions:
- key: rollouts.kruise.io/workload-type
operator: Exists
# - name: mstatefulset.kb.io
# objectSelector:
# matchExpressions:
# - key: rollouts.kruise.io/workload-type
# operator: Exists
# - name: madvancedstatefulset.kb.io
# objectSelector:
# matchExpressions:
# - key: rollouts.kruise.io/workload-type
# operator: Exists
- name: mdeployment.kb.io
objectSelector:
matchExpressions:

View File

@ -151,7 +151,7 @@ func (r *Executor) progressBatches(release *v1beta1.BatchRelease, newStatus *v1b
result = reconcile.Result{RequeueAfter: DefaultDuration}
removeProgressingCondition(newStatus)
newStatus.CanaryStatus.CurrentBatchState = v1beta1.VerifyingBatchState
case errors.IsFatal(err):
case errors.IsBadRequest(err):
progressingStateTransition(newStatus, v1.ConditionTrue, v1beta1.ProgressingReasonInRolling, err.Error())
fallthrough
default:

View File

@ -94,7 +94,7 @@ func (rc *realController) Initialize(release *v1beta1.BatchRelease) error {
// patch the cloneset
setting, err := control.GetOriginalSetting(rc.object)
if err != nil {
return errors.NewFatalError(fmt.Errorf("cannot get original setting for cloneset %v: %s from annotation", klog.KObj(rc.object), err.Error()))
return errors.NewBadRequestError(fmt.Errorf("cannot get original setting for cloneset %v: %s from annotation", klog.KObj(rc.object), err.Error()))
}
control.InitOriginalSetting(&setting, rc.object)
patchData := patch.NewClonesetPatch()
@ -115,7 +115,7 @@ func (rc *realController) Initialize(release *v1beta1.BatchRelease) error {
func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error {
if err := control.ValidateReadyForBlueGreenRelease(rc.object); err != nil {
return errors.NewFatalError(fmt.Errorf("cannot upgrade batch, because cloneset %v doesn't satisfy conditions: %s", klog.KObj(rc.object), err.Error()))
return errors.NewBadRequestError(fmt.Errorf("cannot upgrade batch, because cloneset %v doesn't satisfy conditions: %s", klog.KObj(rc.object), err.Error()))
}
desired, _ := intstr.GetScaledValueFromIntOrPercent(&ctx.DesiredSurge, int(ctx.Replicas), true)
current, _ := intstr.GetScaledValueFromIntOrPercent(&ctx.CurrentSurge, int(ctx.Replicas), true)
@ -133,10 +133,6 @@ func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error {
}
func (rc *realController) Finalize(release *v1beta1.BatchRelease) error {
if rc.finalized() {
return nil // No need to finalize again
}
if release.Spec.ReleasePlan.BatchPartition != nil {
// continuous release (not supported yet)
/*
@ -148,39 +144,37 @@ func (rc *realController) Finalize(release *v1beta1.BatchRelease) error {
return nil
}
c := util.GetEmptyObjectWithKey(rc.object)
setting, err := control.GetOriginalSetting(rc.object)
if err != nil {
return errors.NewFatalError(fmt.Errorf("cannot get original setting for cloneset %v: %s from annotation", klog.KObj(rc.object), err.Error()))
}
patchData := patch.NewClonesetPatch()
if rc.object.Spec.MinReadySeconds != setting.MinReadySeconds {
// restore the hpa
if err := hpa.RestoreHPA(rc.client, rc.object); err != nil {
// restore the original setting and remove annotation
if !rc.restored() {
c := util.GetEmptyObjectWithKey(rc.object)
setting, err := control.GetOriginalSetting(rc.object)
if err != nil {
return err
}
// restore the original setting
patchData := patch.NewClonesetPatch()
patchData.UpdateMinReadySeconds(setting.MinReadySeconds)
patchData.UpdateMaxSurge(setting.MaxSurge)
patchData.UpdateMaxUnavailable(setting.MaxUnavailable)
patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation)
patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation)
if err := rc.client.Patch(context.TODO(), c, patchData); err != nil {
return err
}
klog.InfoS("Finalize: cloneset bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object))
}
klog.InfoS("Finalize: cloneset bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object))
// wait all pods updated and ready
if rc.object.Status.ReadyReplicas != rc.object.Status.UpdatedReadyReplicas {
return errors.NewBenignError(fmt.Errorf("cloneset %v finalize not done, readyReplicas %d != updatedReadyReplicas %d, current policy %s",
return errors.NewRetryError(fmt.Errorf("cloneset %v finalize not done, readyReplicas %d != updatedReadyReplicas %d, current policy %s",
klog.KObj(rc.object), rc.object.Status.ReadyReplicas, rc.object.Status.UpdatedReadyReplicas, release.Spec.ReleasePlan.FinalizingPolicy))
}
klog.InfoS("Finalize: cloneset bluegreen release: all pods updated and ready")
// restore annotation
patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation)
patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation)
return rc.client.Patch(context.TODO(), c, patchData)
// restore the hpa
return hpa.RestoreHPA(rc.client, rc.object)
}
func (rc *realController) finalized() bool {
func (rc *realController) restored() bool {
if rc.object == nil || rc.object.DeletionTimestamp != nil {
return true
}

View File

@ -31,6 +31,7 @@ import (
control "github.com/openkruise/rollouts/pkg/controller/batchrelease/control"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -38,6 +39,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
@ -101,7 +103,7 @@ var (
UpdateRevision: "version-2",
CurrentRevision: "version-1",
ObservedGeneration: 1,
CollisionCount: pointer.Int32Ptr(1),
CollisionCount: pointer.Int32(1),
},
}
@ -141,14 +143,157 @@ var (
},
},
}
hpaDemo = &autoscalingv1.HorizontalPodAutoscaler{
TypeMeta: metav1.TypeMeta{
APIVersion: "autoscaling/v1",
Kind: "HorizontalPodAutoscaler",
},
ObjectMeta: metav1.ObjectMeta{
Name: "hpa",
Namespace: cloneKey.Namespace,
},
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
APIVersion: "apps.kruise.io/v1alpha1",
Kind: "CloneSet",
Name: cloneDemo.Name,
},
MinReplicas: pointer.Int32(1),
MaxReplicas: 10,
},
}
)
// init registers every API group the fake client in this test file must be
// able to decode/encode: core apps (CloneSet's pod-template types), the
// rollout CRDs (BatchRelease), Kruise apps v1alpha1 (CloneSet itself) and
// autoscaling/v1 (the HPA that Initialize/Finalize disable and restore).
func init() {
	apps.AddToScheme(scheme)
	rolloutapi.AddToScheme(scheme)
	kruiseappsv1alpha1.AddToScheme(scheme)
	// NOTE(review): AddToScheme errors are deliberately ignored here; fine for
	// tests, but utilruntime.Must would fail fast on a bad registration.
	autoscalingv1.AddToScheme(scheme)
}
// TestControlPackage is the `go test` entry point that runs all Ginkgo specs
// declared in this package (the Describe blocks below) as one suite.
func TestControlPackage(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "CloneSet Control Package Suite")
}
// Ginkgo spec for the CloneSet blue-green release controller (realController).
// Each It rebuilds a fresh fake client seeded with a demo CloneSet, a demo
// BatchRelease and an HPA targeting the CloneSet (see BeforeEach), then
// exercises one lifecycle phase: Initialize, Finalize, or UpgradeBatch.
var _ = Describe("CloneSet Control", func() {
	var (
		c        client.Client
		rc       *realController
		cloneset *kruiseappsv1alpha1.CloneSet
		release  *v1beta1.BatchRelease
		hpa      *autoscalingv1.HorizontalPodAutoscaler
	)

	// Fresh deep copies per spec so mutations in one It cannot leak into another.
	BeforeEach(func() {
		cloneset = cloneDemo.DeepCopy()
		release = releaseDemo.DeepCopy()
		hpa = hpaDemo.DeepCopy()
		c = fake.NewClientBuilder().
			WithScheme(scheme).
			WithObjects(cloneset, release, hpa).
			Build()
		rc = &realController{
			key:    types.NamespacedName{Namespace: cloneset.Namespace, Name: cloneset.Name},
			client: c,
		}
	})

	It("should initialize cloneset successfully", func() {
		// build controller
		_, err := rc.BuildController()
		Expect(err).NotTo(HaveOccurred())
		// call Initialize method; retried because Initialize patches several
		// objects and may return a retryable conflict from the fake client
		err = retryFunction(3, func() error {
			return rc.Initialize(release)
		})
		Expect(err).NotTo(HaveOccurred())
		// inspect if HPA is disabled — Initialize renames the scale target so
		// the HPA stops acting on the workload during the release
		disabledHPA := &autoscalingv1.HorizontalPodAutoscaler{}
		err = c.Get(context.TODO(), types.NamespacedName{Namespace: hpa.Namespace, Name: hpa.Name}, disabledHPA)
		Expect(err).NotTo(HaveOccurred())
		Expect(disabledHPA.Spec.ScaleTargetRef.Name).To(Equal(cloneset.Name + "-DisableByRollout"))
		// inspect if Cloneset is patched properly
		updatedCloneset := &kruiseappsv1alpha1.CloneSet{}
		err = c.Get(context.TODO(), client.ObjectKeyFromObject(cloneset), updatedCloneset)
		Expect(err).NotTo(HaveOccurred())
		// inspect if annotations are added (original strategy snapshot + owner info)
		Expect(updatedCloneset.Annotations).To(HaveKey(v1beta1.OriginalDeploymentStrategyAnnotation))
		Expect(updatedCloneset.Annotations).To(HaveKey(util.BatchReleaseControlAnnotation))
		Expect(updatedCloneset.Annotations[util.BatchReleaseControlAnnotation]).To(Equal(getControlInfo(release)))
		// inspect if strategy is updated: maxSurge=1/maxUnavailable=0 and the
		// large MinReadySeconds keep updated pods "unavailable" during rollout
		Expect(updatedCloneset.Spec.UpdateStrategy.Paused).To(BeFalse())
		Expect(updatedCloneset.Spec.UpdateStrategy.MaxSurge.IntVal).To(Equal(int32(1)))
		Expect(updatedCloneset.Spec.UpdateStrategy.MaxUnavailable.IntVal).To(Equal(int32(0)))
		Expect(updatedCloneset.Spec.MinReadySeconds).To(Equal(int32(v1beta1.MaxReadySeconds)))
	})

	It("should finalize CloneSet successfully", func() {
		// hack to patch cloneset status so Finalize sees all pods updated+ready
		cloneset.Status.UpdatedReadyReplicas = 10
		err := c.Status().Update(context.TODO(), cloneset)
		Expect(err).NotTo(HaveOccurred())
		// build controller (object reset to force a re-fetch of fresh status)
		rc.object = nil
		_, err = rc.BuildController()
		Expect(err).NotTo(HaveOccurred())
		// call Finalize method
		err = retryFunction(3, func() error {
			return rc.Finalize(release)
		})
		Expect(err).NotTo(HaveOccurred())
		// inspect if CloneSet is patched properly
		updatedCloneset := &kruiseappsv1alpha1.CloneSet{}
		err = c.Get(context.TODO(), client.ObjectKeyFromObject(cloneset), updatedCloneset)
		Expect(err).NotTo(HaveOccurred())
		// inspect if annotations are removed
		Expect(updatedCloneset.Annotations).NotTo(HaveKey(v1beta1.OriginalDeploymentStrategyAnnotation))
		Expect(updatedCloneset.Annotations).NotTo(HaveKey(util.BatchReleaseControlAnnotation))
		// inspect if strategy is restored to the pre-release snapshot
		Expect(updatedCloneset.Spec.UpdateStrategy.MaxSurge).To(BeNil())
		Expect(*updatedCloneset.Spec.UpdateStrategy.MaxUnavailable).To(Equal(intstr.IntOrString{Type: intstr.Int, IntVal: 1}))
		Expect(updatedCloneset.Spec.MinReadySeconds).To(Equal(int32(0)))
		// inspect if HPA is restored (scale target renamed back to the workload)
		restoredHPA := &autoscalingv1.HorizontalPodAutoscaler{}
		err = c.Get(context.TODO(), types.NamespacedName{Namespace: hpa.Namespace, Name: hpa.Name}, restoredHPA)
		Expect(err).NotTo(HaveOccurred())
		Expect(restoredHPA.Spec.ScaleTargetRef.Name).To(Equal(cloneset.Name))
	})

	It("should upgradBatch for CloneSet successfully", func() {
		// call Initialize method
		_, err := rc.BuildController()
		Expect(err).NotTo(HaveOccurred())
		err = retryFunction(3, func() error {
			return rc.Initialize(release)
		})
		Expect(err).NotTo(HaveOccurred())
		// call UpgradeBatch method (rebuild so the controller sees the
		// initialized object; batch context derives the desired surge)
		rc.object = nil
		_, err = rc.BuildController()
		Expect(err).NotTo(HaveOccurred())
		batchContext, err := rc.CalculateBatchContext(release)
		Expect(err).NotTo(HaveOccurred())
		err = rc.UpgradeBatch(batchContext)
		Expect(err).NotTo(HaveOccurred())
		// inspect if CloneSet is patched properly — presumably batch 0 of the
		// demo release plan maps to a 10% surge; verify against releaseDemo
		updatedCloneset := &kruiseappsv1alpha1.CloneSet{}
		err = c.Get(context.TODO(), client.ObjectKeyFromObject(cloneset), updatedCloneset)
		Expect(err).NotTo(HaveOccurred())
		Expect(*updatedCloneset.Spec.UpdateStrategy.MaxSurge).To(Equal(intstr.IntOrString{Type: intstr.String, StrVal: "10%"}))
		Expect(*updatedCloneset.Spec.UpdateStrategy.MaxUnavailable).To(Equal(intstr.IntOrString{Type: intstr.Int, IntVal: 0}))
	})
})
func TestCalculateBatchContext(t *testing.T) {
RegisterFailHandler(Fail)
cases := map[string]struct {
@ -392,3 +537,12 @@ func getControlInfo(release *v1beta1.BatchRelease) string {
owner, _ := json.Marshal(metav1.NewControllerRef(release, release.GetObjectKind().GroupVersionKind()))
return string(owner)
}
// retryFunction invokes f until it succeeds, making at most limit+1 attempts
// in total. It returns nil as soon as any attempt succeeds; otherwise it
// returns the error produced by the final attempt. A negative limit means f
// is never called and nil is returned.
func retryFunction(limit int, f func() error) (err error) {
	for attempt := 0; attempt <= limit; attempt++ {
		err = f()
		if err == nil {
			return nil
		}
	}
	return err
}

View File

@ -111,7 +111,7 @@ func (rc *realController) Initialize(release *v1beta1.BatchRelease) error {
func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error {
if err := control.ValidateReadyForBlueGreenRelease(rc.object); err != nil {
return errors.NewFatalError(fmt.Errorf("cannot upgrade batch, because deployment %v doesn't satisfy conditions: %s", klog.KObj(rc.object), err.Error()))
return errors.NewBadRequestError(fmt.Errorf("cannot upgrade batch, because deployment %v doesn't satisfy conditions: %s", klog.KObj(rc.object), err.Error()))
}
desired, _ := intstr.GetScaledValueFromIntOrPercent(&ctx.DesiredSurge, int(ctx.Replicas), true)
current, _ := intstr.GetScaledValueFromIntOrPercent(&ctx.CurrentSurge, int(ctx.Replicas), true)
@ -137,9 +137,6 @@ func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error {
// set pause to false, restore the original setting, delete annotation
func (rc *realController) Finalize(release *v1beta1.BatchRelease) error {
if rc.finalized() {
return nil // No need to finalize again.
}
if release.Spec.ReleasePlan.BatchPartition != nil {
// continuous release (not supported yet)
/*
@ -153,41 +150,41 @@ func (rc *realController) Finalize(release *v1beta1.BatchRelease) error {
return nil
}
// restore the original setting and remove annotation
d := util.GetEmptyObjectWithKey(rc.object)
setting, err := control.GetOriginalSetting(rc.object)
if err != nil {
return errors.NewFatalError(fmt.Errorf("cannot get original setting for cloneset %v: %s from annotation", klog.KObj(rc.object), err.Error()))
}
patchData := patch.NewDeploymentPatch()
if rc.object.Spec.MinReadySeconds != setting.MinReadySeconds {
// restore the hpa
if err := hpa.RestoreHPA(rc.client, rc.object); err != nil {
if !rc.restored() {
setting, err := control.GetOriginalSetting(rc.object)
if err != nil {
return err
}
patchData := patch.NewDeploymentPatch()
// restore the original setting
patchData.UpdatePaused(false)
patchData.UpdateMinReadySeconds(setting.MinReadySeconds)
patchData.UpdateProgressDeadlineSeconds(setting.ProgressDeadlineSeconds)
patchData.UpdateMaxSurge(setting.MaxSurge)
patchData.UpdateMaxUnavailable(setting.MaxUnavailable)
// restore label and annotation
patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation)
patchData.DeleteLabel(v1alpha1.DeploymentStableRevisionLabel)
patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation)
if err := rc.client.Patch(context.TODO(), d, patchData); err != nil {
return err
}
klog.InfoS("Finalize: deployment bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object))
}
klog.InfoS("Finalize: deployment bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object))
// wait all pods updated and ready
if err := waitAllUpdatedAndReady(d.(*apps.Deployment)); err != nil {
return errors.NewBenignError(err)
return errors.NewRetryError(err)
}
klog.InfoS("Finalize: deployment is ready to resume, restore the original setting", "deployment", klog.KObj(rc.object))
// restore label and annotation
patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation)
patchData.DeleteLabel(v1alpha1.DeploymentStableRevisionLabel)
patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation)
return rc.client.Patch(context.TODO(), d, patchData)
// restore hpa
return hpa.RestoreHPA(rc.client, rc.object)
}
func (rc *realController) finalized() bool {
func (rc *realController) restored() bool {
if rc.object == nil || rc.object.DeletionTimestamp != nil {
return true
}
@ -313,7 +310,7 @@ func (rc *realController) patchStableRSMinReadySeconds(seconds int32) error {
func (rc *realController) patchDeployment(release *v1beta1.BatchRelease) error {
setting, err := control.GetOriginalSetting(rc.object)
if err != nil {
return errors.NewFatalError(fmt.Errorf("cannot get original setting for deployment %v: %s", klog.KObj(rc.object), err.Error()))
return errors.NewBadRequestError(fmt.Errorf("cannot get original setting for deployment %v: %s", klog.KObj(rc.object), err.Error()))
}
control.InitOriginalSetting(&setting, rc.object)
patchData := patch.NewDeploymentPatch()

View File

@ -35,6 +35,7 @@ import (
"github.com/openkruise/rollouts/pkg/util"
"github.com/openkruise/rollouts/pkg/util/errors"
apps "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -107,7 +108,7 @@ var (
UpdatedReplicas: 0,
ReadyReplicas: 10,
AvailableReplicas: 10,
CollisionCount: pointer.Int32Ptr(1),
CollisionCount: pointer.Int32(1),
ObservedGeneration: 1,
},
}
@ -128,7 +129,7 @@ var (
},
},
Spec: apps.DeploymentSpec{
Replicas: pointer.Int32Ptr(10),
Replicas: pointer.Int32(10),
Strategy: apps.DeploymentStrategy{
Type: apps.RollingUpdateDeploymentStrategyType,
RollingUpdate: &apps.RollingUpdateDeployment{
@ -192,14 +193,183 @@ var (
},
},
}
hpaDemo = &autoscalingv1.HorizontalPodAutoscaler{
TypeMeta: metav1.TypeMeta{
APIVersion: "autoscaling/v1",
Kind: "HorizontalPodAutoscaler",
},
ObjectMeta: metav1.ObjectMeta{
Name: "hpa",
Namespace: deploymentKey.Namespace,
},
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
APIVersion: apps.SchemeGroupVersion.String(),
Kind: "Deployment",
Name: deploymentDemo.Name,
},
MinReplicas: pointer.Int32(1),
MaxReplicas: 10,
},
}
)
// init registers the API groups the fake client in this test file needs:
// core apps/v1 (Deployment, ReplicaSet), the rollout CRDs (BatchRelease),
// Kruise apps v1alpha1, and autoscaling/v1 for the HPA disable/restore tests.
func init() {
	apps.AddToScheme(scheme)
	rolloutapi.AddToScheme(scheme)
	kruiseappsv1alpha1.AddToScheme(scheme)
	// NOTE(review): AddToScheme errors are deliberately ignored here; fine for
	// tests, but utilruntime.Must would fail fast on a bad registration.
	autoscalingv1.AddToScheme(scheme)
}
// TestControlPackage is the `go test` entry point that runs all Ginkgo specs
// declared in this package (the Describe blocks below) as one suite.
func TestControlPackage(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Deployment Control Package Suite")
}
// Ginkgo spec for the Deployment blue-green release controller
// (realController). Each It seeds a fake client with a demo Deployment, its
// stable and canary ReplicaSets, a BatchRelease and an HPA targeting the
// Deployment, then exercises Initialize, Finalize, or UpgradeBatch.
var _ = Describe("Deployment Control", func() {
	var (
		c          client.Client
		rc         *realController
		deployment *apps.Deployment
		release    *v1beta1.BatchRelease
		hpa        *autoscalingv1.HorizontalPodAutoscaler
		stableRS   *apps.ReplicaSet
		canaryRS   *apps.ReplicaSet
	)

	// Fresh deep copies per spec so mutations in one It cannot leak into another.
	BeforeEach(func() {
		deployment = deploymentDemo.DeepCopy()
		release = releaseDemo.DeepCopy()
		hpa = hpaDemo.DeepCopy()
		// mark the deployment stable at revision v1, then derive its ReplicaSets:
		// stable RS fully ready, canary RS present but with no ready pods yet
		deployment = getStableWithReady(deployment, "v1").(*apps.Deployment)
		stableRS = makeStableReplicaSets(deployment).(*apps.ReplicaSet)
		stableRS.Spec.MinReadySeconds = 0
		stableRS.Status.ReadyReplicas = *deployment.Spec.Replicas
		stableRS.Status.AvailableReplicas = *deployment.Spec.Replicas
		canaryRS = makeCanaryReplicaSets(deployment).(*apps.ReplicaSet)
		canaryRS.Status.ReadyReplicas = 0
		canaryRS.Status.AvailableReplicas = 0
		c = fake.NewClientBuilder().
			WithScheme(scheme).
			WithObjects(deployment, release, hpa, stableRS, canaryRS).
			Build()
		rc = &realController{
			key:    types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name},
			client: c,
			finder: util.NewControllerFinder(c),
		}
	})

	It("should initialize Deployment successfully", func() {
		// build controller
		_, err := rc.BuildController()
		Expect(err).NotTo(HaveOccurred())
		// call Initialize method; retried because Initialize patches several
		// objects and may return a retryable conflict from the fake client
		err = retryFunction(3, func() error {
			return rc.Initialize(release)
		})
		Expect(err).NotTo(HaveOccurred())
		// inspect if HPA is disabled — Initialize renames the scale target so
		// the HPA stops acting on the workload during the release
		disabledHPA := &autoscalingv1.HorizontalPodAutoscaler{}
		err = c.Get(context.TODO(), types.NamespacedName{Namespace: hpa.Namespace, Name: hpa.Name}, disabledHPA)
		Expect(err).NotTo(HaveOccurred())
		Expect(disabledHPA.Spec.ScaleTargetRef.Name).To(Equal(deployment.Name + "-DisableByRollout"))
		// inspect if MinReadySeconds of stable ReplicaSet is updated (this is
		// the mechanism that keeps stable pods "available" during the release)
		stableRSAfter := &apps.ReplicaSet{}
		err = c.Get(context.TODO(), client.ObjectKeyFromObject(stableRS), stableRSAfter)
		Expect(err).NotTo(HaveOccurred())
		Expect(stableRSAfter.Spec.MinReadySeconds).To(Equal(int32(v1beta1.MaxReadySeconds)))
		// inspect if Deployment is patched properly
		updatedDeployment := &apps.Deployment{}
		err = c.Get(context.TODO(), client.ObjectKeyFromObject(deployment), updatedDeployment)
		Expect(err).NotTo(HaveOccurred())
		// inspect if annotations are added (original strategy snapshot + owner info)
		Expect(updatedDeployment.Annotations).To(HaveKey(v1beta1.OriginalDeploymentStrategyAnnotation))
		Expect(updatedDeployment.Annotations).To(HaveKey(util.BatchReleaseControlAnnotation))
		Expect(updatedDeployment.Annotations[util.BatchReleaseControlAnnotation]).To(Equal(getControlInfo(release)))
		// inspect if strategy is updated: surge 1 / unavailable 0, plus large
		// MinReadySeconds and ProgressDeadlineSeconds for the bluegreen phase
		Expect(updatedDeployment.Spec.Strategy.RollingUpdate).NotTo(BeNil())
		Expect(updatedDeployment.Spec.Strategy.RollingUpdate.MaxSurge.IntVal).To(Equal(int32(1)))
		Expect(updatedDeployment.Spec.Strategy.RollingUpdate.MaxUnavailable.IntVal).To(Equal(int32(0)))
		Expect(updatedDeployment.Spec.MinReadySeconds).To(Equal(int32(v1beta1.MaxReadySeconds)))
		Expect(*updatedDeployment.Spec.ProgressDeadlineSeconds).To(Equal(int32(v1beta1.MaxProgressSeconds)))
	})

	It("should finalize Deployment successfully", func() {
		// build controller (object reset to force a re-fetch)
		rc.object = nil
		_, err := rc.BuildController()
		Expect(err).NotTo(HaveOccurred())
		// call Finalize method
		err = retryFunction(3, func() error {
			return rc.Finalize(release)
		})
		Expect(err).NotTo(HaveOccurred())
		// inspect if Deployment is patched properly
		updatedDeployment := &apps.Deployment{}
		err = c.Get(context.TODO(), client.ObjectKeyFromObject(deployment), updatedDeployment)
		Expect(err).NotTo(HaveOccurred())
		// inspect if annotations are removed
		Expect(updatedDeployment.Annotations).NotTo(HaveKey(v1beta1.OriginalDeploymentStrategyAnnotation))
		Expect(updatedDeployment.Annotations).NotTo(HaveKey(util.BatchReleaseControlAnnotation))
		// inspect if strategy is restored to the pre-release snapshot
		// (20%/1 are the demo deployment's original rolling-update settings)
		Expect(updatedDeployment.Spec.Strategy.RollingUpdate).NotTo(BeNil())
		Expect(*updatedDeployment.Spec.Strategy.RollingUpdate.MaxSurge).To(Equal(intstr.IntOrString{Type: intstr.String, StrVal: "20%"}))
		Expect(*updatedDeployment.Spec.Strategy.RollingUpdate.MaxUnavailable).To(Equal(intstr.IntOrString{Type: intstr.Int, IntVal: 1}))
		Expect(updatedDeployment.Spec.MinReadySeconds).To(Equal(int32(0)))
		Expect(updatedDeployment.Spec.ProgressDeadlineSeconds).To(BeNil())
		// inspect if HPA is restored (scale target renamed back to the workload)
		restoredHPA := &autoscalingv1.HorizontalPodAutoscaler{}
		err = c.Get(context.TODO(), types.NamespacedName{Namespace: hpa.Namespace, Name: hpa.Name}, restoredHPA)
		Expect(err).NotTo(HaveOccurred())
		Expect(restoredHPA.Spec.ScaleTargetRef.Name).To(Equal(deployment.Name))
		// inspect if MinReadySeconds of stable ReplicaSet is restored
		stableRSAfter := &apps.ReplicaSet{}
		err = c.Get(context.TODO(), client.ObjectKeyFromObject(stableRS), stableRSAfter)
		Expect(err).NotTo(HaveOccurred())
		Expect(stableRSAfter.Spec.MinReadySeconds).To(Equal(int32(0)))
	})

	It("should upgradBatch for Deployment successfully", func() {
		// call Initialize method
		_, err := rc.BuildController()
		Expect(err).NotTo(HaveOccurred())
		err = retryFunction(3, func() error {
			return rc.Initialize(release)
		})
		Expect(err).NotTo(HaveOccurred())
		// call UpgradeBatch method (rebuild so the controller sees the
		// initialized object; batch context derives the desired surge)
		rc.object = nil
		_, err = rc.BuildController()
		Expect(err).NotTo(HaveOccurred())
		batchContext, err := rc.CalculateBatchContext(release)
		Expect(err).NotTo(HaveOccurred())
		err = rc.UpgradeBatch(batchContext)
		Expect(err).NotTo(HaveOccurred())
		// inspect if Deployment is patched properly — presumably the current
		// batch of the demo release plan maps to a 50% surge; verify against
		// releaseDemo
		updatedDeployment := &apps.Deployment{}
		err = c.Get(context.TODO(), client.ObjectKeyFromObject(deployment), updatedDeployment)
		Expect(err).NotTo(HaveOccurred())
		Expect(updatedDeployment.Spec.Paused).To(BeFalse())
		Expect(*updatedDeployment.Spec.Strategy.RollingUpdate.MaxSurge).To(Equal(intstr.IntOrString{Type: intstr.String, StrVal: "50%"}))
		Expect(*updatedDeployment.Spec.Strategy.RollingUpdate.MaxUnavailable).To(Equal(intstr.IntOrString{Type: intstr.Int, IntVal: 0}))
	})
})
func TestCalculateBatchContext(t *testing.T) {
RegisterFailHandler(Fail)
cases := map[string]struct {
@ -218,7 +388,7 @@ func TestCalculateBatchContext(t *testing.T) {
}
// current partition, ie. maxSurge
deployment.Spec.Strategy.RollingUpdate.MaxSurge = &intstr.IntOrString{Type: intstr.String, StrVal: "50%"}
deployment.Spec.Replicas = pointer.Int32Ptr(10)
deployment.Spec.Replicas = pointer.Int32(10)
newRss := makeCanaryReplicaSets(deployment).(*apps.ReplicaSet)
newRss.Status.ReadyReplicas = 2
return []client.Object{deployment, newRss, makeStableReplicaSets(deployment)}
@ -442,7 +612,7 @@ func TestRealController(t *testing.T) {
release.Spec.ReleasePlan.BatchPartition = nil
err = controller.Finalize(release)
Expect(errors.IsBenign(err)).Should(BeTrue())
Expect(errors.IsRetryError(err)).Should(BeTrue())
fetch = &apps.Deployment{}
Expect(cli.Get(context.TODO(), deploymentKey, fetch)).NotTo(HaveOccurred())
// check workload strategy
@ -547,3 +717,12 @@ func getStableWithReady(workload client.Object, version string) client.Object {
}
return nil
}
// retryFunction calls f repeatedly until it returns nil, performing at most
// limit+1 invocations. On success it returns nil immediately; if every
// invocation fails it returns the last error observed. With a negative limit
// f is never invoked and the zero-value (nil) error is returned.
func retryFunction(limit int, f func() error) (err error) {
	remaining := limit + 1
	for remaining > 0 {
		if err = f(); err == nil {
			return nil
		}
		remaining--
	}
	return err
}

View File

@ -12,8 +12,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
// "github.com/openkruise/rollouts/api/v1alpha1"
// import hpa v1
)
var (

View File

@ -111,11 +111,12 @@ func (r *RolloutReconciler) reconcileRolloutProgressing(rollout *v1beta1.Rollout
case v1alpha1.ProgressingReasonInRolling:
klog.Infof("rollout(%s/%s) is Progressing, and in reason(%s)", rollout.Namespace, rollout.Name, cond.Reason)
err = r.doProgressingInRolling(rolloutContext)
if utilerrors.IsFatal(err) {
if utilerrors.IsBadRequest(err) {
// For fatal errors, do not retry as it wastes resources and has no effect.
// therefore, we don't propagate the error, but just log it.
// user should do sth instead, eg. for bluegreen continuous release scenario, user should do rollback
klog.Warningf("rollout(%s/%s) doProgressingInRolling error(%s)", rollout.Namespace, rollout.Name, err.Error())
return nil, nil
} else if err != nil {
return nil, err
}
@ -231,6 +232,16 @@ func (r *RolloutReconciler) handleRolloutPaused(rollout *v1beta1.Rollout, newSta
return nil
}
/*
continuous release (or successive release) is not supported for bluegreen release, especially for cloneset,
here is why:
suppose we are releasing a cloneSet, which has pods of both v1 and v2 for now. If we release v3 before
v2 is fully released, the cloneSet controller might scale down pods without distinguishing between v1 and v2.
This is because our implementation is based on the minReadySeconds, pods of both v1 and v2 are "unavailable"
in the progress of rollout.
Deployment actually has the same problem, however it is possible to bypass this issue for Deployment by setting
minReadySeconds for replicaset separately; unfortunately this workaround seems not work for cloneset
*/
func (r *RolloutReconciler) handleContinuousRelease(c *RolloutContext) error {
r.Recorder.Eventf(c.Rollout, corev1.EventTypeNormal, "Progressing", "workload continuous publishing canaryRevision, then restart publishing")
klog.Infof("rollout(%s/%s) workload continuous publishing canaryRevision from(%s) -> to(%s), then restart publishing",
@ -239,9 +250,9 @@ func (r *RolloutReconciler) handleContinuousRelease(c *RolloutContext) error {
// do nothing for blue-green release
if c.Rollout.Spec.Strategy.IsBlueGreenRelease() {
cond := util.GetRolloutCondition(*c.NewStatus, v1beta1.RolloutConditionProgressing)
cond.Message = "[warning] new version released in progress of blue-green release, please rollback first"
cond.Message = "new version releasing detected in the progress of blue-green release, please rollback first"
c.NewStatus.Message = cond.Message
return utilerrors.NewFatalError(fmt.Errorf("cannot do continuous release for blue-green release, rollback firstly"))
return utilerrors.NewBadRequestError(fmt.Errorf("new version releasing detected in the progress of blue-green release, please rollback first"))
}
done, err := r.doProgressingReset(c)

View File

@ -5,66 +5,65 @@ import (
"fmt"
)
// BenignError represents a benign error that can be handled or ignored by the caller.
// RetryError represents a benign error that can be handled or ignored by the caller.
// It encapsulates information that is non-critical and does not require immediate attention.
type BenignError struct {
type RetryError struct {
Err error
}
// Error implements the error interface for BenignError.
// Error implements the error interface for RetryError.
// It returns the error message of the encapsulated error or a default message.
func (e *BenignError) Error() string {
func (e *RetryError) Error() string {
if e.Err != nil {
return fmt.Sprintf("[benign]: %s", e.Err.Error())
return fmt.Sprintf("[retry]: %s", e.Err.Error())
}
return "benign error"
return "retry error"
}
// NewBenignError creates a new instance of BenignError.
// If the provided err is nil, it signifies a benign condition without a specific error message.
func NewBenignError(err error) *BenignError {
return &BenignError{Err: err}
// NewRetryError creates a new instance of RetryError.
func NewRetryError(err error) *RetryError {
return &RetryError{Err: err}
}
func IsBenign(err error) bool {
var benignErr *BenignError
return errors.As(err, &benignErr)
func IsRetryError(err error) bool {
var re *RetryError
return errors.As(err, &re)
}
func AsBenign(err error, target **BenignError) bool {
func AsRetryError(err error, target **RetryError) bool {
return errors.As(err, target)
}
// FatalError represents a fatal error that requires special handling.
// BadRequestError represents a fatal error that requires special handling.
// Such errors are critical and may necessitate logging, alerts, or even program termination.
type FatalError struct {
type BadRequestError struct {
Err error
}
// Error implements the error interface for FatalError.
// Error implements the error interface for BadRequestError.
// It returns the error message of the encapsulated error or a default message.
func (e *FatalError) Error() string {
func (e *BadRequestError) Error() string {
if e.Err != nil {
return e.Err.Error()
}
return "fatal error"
}
// NewFatalError creates a new instance of FatalError.
// NewBadRequestError creates a new instance of BadRequestError.
// It encapsulates the provided error, marking it as critical.
func NewFatalError(err error) *FatalError {
return &FatalError{Err: err}
func NewBadRequestError(err error) *BadRequestError {
return &BadRequestError{Err: err}
}
// IsFatal checks whether the provided error is of type FatalError.
// It returns true if the error is a FatalError or wraps a FatalError, false otherwise.
func IsFatal(err error) bool {
var fatalErr *FatalError
return AsFatal(err, &fatalErr)
// IsBadRequest checks whether the provided error is of type BadRequestError.
// It returns true if the error is a BadRequestError or wraps a BadRequestError, false otherwise.
func IsBadRequest(err error) bool {
var brErr *BadRequestError
return AsBadRequest(err, &brErr)
}
// AsFatal attempts to cast the provided error to a FatalError.
// AsBadRequest attempts to cast the provided error to a BadRequestError.
// It returns true if the casting is successful, allowing the caller to handle it accordingly.
func AsFatal(err error, target **FatalError) bool {
func AsBadRequest(err error, target **BadRequestError) bool {
return errors.As(err, target)
}

View File

@ -36,6 +36,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
var (
blueGreenSupportWorkloadGVKs = []*schema.GroupVersionKind{
&util.ControllerKindDep,
&util.ControllerKruiseKindCS,
}
)
// RolloutCreateUpdateHandler handles Rollout
type RolloutCreateUpdateHandler struct {
// To use the client, you need to do the following:
@ -204,12 +211,12 @@ func (h *RolloutCreateUpdateHandler) validateRolloutConflict(rollout *appsv1beta
}
func validateRolloutSpec(c *validateContext, rollout *appsv1beta1.Rollout, fldPath *field.Path) field.ErrorList {
errList := validateRolloutSpecObjectRef(&rollout.Spec.WorkloadRef, fldPath.Child("ObjectRef"))
errList := validateRolloutSpecObjectRef(c, &rollout.Spec.WorkloadRef, fldPath.Child("ObjectRef"))
errList = append(errList, validateRolloutSpecStrategy(c, &rollout.Spec.Strategy, fldPath.Child("Strategy"))...)
return errList
}
func validateRolloutSpecObjectRef(workloadRef *appsv1beta1.ObjectRef, fldPath *field.Path) field.ErrorList {
func validateRolloutSpecObjectRef(c *validateContext, workloadRef *appsv1beta1.ObjectRef, fldPath *field.Path) field.ErrorList {
if workloadRef == nil {
return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), workloadRef, "WorkloadRef is required")}
}
@ -218,6 +225,14 @@ func validateRolloutSpecObjectRef(workloadRef *appsv1beta1.ObjectRef, fldPath *f
if !util.IsSupportedWorkload(gvk) {
return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), workloadRef, "WorkloadRef kind is not supported")}
}
if c.style == string(appsv1beta1.BlueGreenRollingStyle) {
for _, allowed := range blueGreenSupportWorkloadGVKs {
if gvk.Group == allowed.Group && gvk.Kind == allowed.Kind {
return nil
}
}
return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), workloadRef, "WorkloadRef kind is not supported for bluegreen style")}
}
return nil
}
@ -378,7 +393,7 @@ func (h *RolloutCreateUpdateHandler) InjectDecoder(d *admission.Decoder) error {
func GetContextFromv1beta1Rollout(rollout *appsv1beta1.Rollout) *validateContext {
if rollout.Spec.Strategy.Canary == nil && rollout.Spec.Strategy.BlueGreen == nil {
return nil
return &validateContext{}
}
style := rollout.Spec.Strategy.GetRollingStyle()
if appsv1beta1.IsRealPartition(rollout) {

View File

@ -0,0 +1,241 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mutating
import (
"context"
"encoding/json"
"math"
"net/http"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
appsv1beta1 "github.com/openkruise/rollouts/api/v1beta1"
"github.com/openkruise/rollouts/pkg/util"
utilclient "github.com/openkruise/rollouts/pkg/util/client"
util2 "github.com/openkruise/rollouts/pkg/webhook/util"
"github.com/openkruise/rollouts/pkg/webhook/util/configuration"
admissionv1 "k8s.io/api/admission/v1"
v1 "k8s.io/api/admissionregistration/v1"
apps "k8s.io/api/apps/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
labels2 "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
// UnifiedWorkloadHandler mutates workload UPDATE requests that are not served
// by the dedicated per-kind handlers — in particular native and Advanced
// StatefulSets and StatefulSet-like custom workloads — so that a matched
// Rollout can intercept their release process.
type UnifiedWorkloadHandler struct {
	// Client reads cluster state (Rollouts, webhook configuration).
	// To use the client, you need to do the following:
	// - uncomment it
	// - import sigs.k8s.io/controller-runtime/pkg/client
	// - uncomment the InjectClient method at the bottom of this file.
	Client client.Client

	// Decoder decodes objects
	Decoder *admission.Decoder

	// Finder resolves workload controllers; built from Client on injection.
	Finder *util.ControllerFinder
}
var _ admission.Handler = &UnifiedWorkloadHandler{}
// Handle handles admission requests.
func (h *UnifiedWorkloadHandler) Handle(ctx context.Context, req admission.Request) admission.Response {
// if subResources, then ignore
if req.Operation != admissionv1.Update || req.SubResource != "" {
return admission.Allowed("")
}
meetingRules, err := h.checkWorkloadRules(ctx, req)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
if !meetingRules {
return admission.Allowed("")
}
switch req.Kind.Group {
// kruise cloneSet
case kruiseappsv1alpha1.GroupVersion.Group:
switch req.Kind.Kind {
case util.ControllerKruiseKindCS.Kind, util.ControllerKruiseKindDS.Kind:
return admission.Allowed("")
}
// native k8s deloyment
case apps.SchemeGroupVersion.Group:
switch req.Kind.Kind {
case util.ControllerKindDep.Kind:
return admission.Allowed("")
}
}
// handle other workload types, including native/advanced statefulset
{
newObj := &unstructured.Unstructured{}
newObj.SetGroupVersionKind(schema.GroupVersionKind{Group: req.Kind.Group, Version: req.Kind.Version, Kind: req.Kind.Kind})
if err := h.Decoder.Decode(req, newObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
if !util.IsWorkloadType(newObj, util.StatefulSetType) && req.Kind.Kind != util.ControllerKindSts.Kind {
return admission.Allowed("")
}
oldObj := &unstructured.Unstructured{}
oldObj.SetGroupVersionKind(schema.GroupVersionKind{Group: req.Kind.Group, Version: req.Kind.Version, Kind: req.Kind.Kind})
if err := h.Decoder.Decode(
admission.Request{AdmissionRequest: admissionv1.AdmissionRequest{Object: req.AdmissionRequest.OldObject}},
oldObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
changed, err := h.handleStatefulSetLikeWorkload(newObj, oldObj)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
if !changed {
return admission.Allowed("")
}
marshalled, err := json.Marshal(newObj.Object)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.AdmissionRequest.Object.Raw, marshalled)
}
}
// handleStatefulSetLikeWorkload decides whether this update should be
// intercepted by a rollout and, if so, mutates newObj in place: it freezes the
// native rolling update by setting the partition to MaxInt16 and records the
// matched rollout in the in-progressing annotation. It returns true only when
// newObj was mutated.
func (h *UnifiedWorkloadHandler) handleStatefulSetLikeWorkload(newObj, oldObj *unstructured.Unstructured) (bool, error) {
	// indicate whether the workload can enter the rollout process
	// 1. replicas > 0
	if util.GetReplicas(newObj) == 0 || !util.IsStatefulSetRollingUpdate(newObj) {
		return false, nil
	}
	oldTemplate, newTemplate := util.GetTemplate(oldObj), util.GetTemplate(newObj)
	if oldTemplate == nil || newTemplate == nil {
		return false, nil
	}
	// 2. a new release must be detected: either the rollout-id annotation
	// changed, or — when no rollout-id is used — the pod template changed
	// (ignoring the revision hash).
	oldMetadata, newMetadata := util.GetMetadata(oldObj), util.GetMetadata(newObj)
	if newMetadata.Annotations[appsv1beta1.RolloutIDLabel] != "" &&
		oldMetadata.Annotations[appsv1beta1.RolloutIDLabel] == newMetadata.Annotations[appsv1beta1.RolloutIDLabel] {
		return false, nil
	} else if newMetadata.Annotations[appsv1beta1.RolloutIDLabel] == "" && util.EqualIgnoreHash(oldTemplate, newTemplate) {
		return false, nil
	}
	// 3. a non-empty rollout must reference this workload
	rollout, err := h.fetchMatchedRollout(newObj)
	if err != nil {
		return false, err
	} else if rollout == nil || rollout.Spec.Strategy.IsEmptyRelease() {
		return false, nil
	}

	// Pause the native controller: with partition at MaxInt16 no pod is
	// updated until the rollout controller lowers it batch by batch.
	util.SetStatefulSetPartition(newObj, math.MaxInt16)
	state := &util.RolloutState{RolloutName: rollout.Name}
	by, _ := json.Marshal(state)
	annotation := newObj.GetAnnotations()
	if annotation == nil {
		annotation = map[string]string{}
	}
	annotation[util.InRolloutProgressingAnnotation] = string(by)
	newObj.SetAnnotations(annotation)
	klog.Infof("StatefulSet(%s/%s) will be released incrementally based on Rollout(%s)", newMetadata.Namespace, newMetadata.Name, rollout.Name)
	return true, nil
}
// fetchMatchedRollout returns the active Rollout in obj's namespace whose
// workloadRef points at obj (same group, kind, and name), or nil when none
// matches.
func (h *UnifiedWorkloadHandler) fetchMatchedRollout(obj client.Object) (*appsv1beta1.Rollout, error) {
	objGVK := obj.GetObjectKind().GroupVersionKind()
	rolloutList := &appsv1beta1.RolloutList{}
	err := h.Client.List(context.TODO(), rolloutList, utilclient.DisableDeepCopy,
		&client.ListOptions{Namespace: obj.GetNamespace()})
	if err != nil {
		klog.Errorf("UnifiedWorkloadHandler List rollout failed: %s", err.Error())
		return nil, err
	}
	for idx := range rolloutList.Items {
		candidate := &rolloutList.Items[idx]
		// Rollouts being deleted or explicitly disabled never match.
		if !candidate.DeletionTimestamp.IsZero() {
			continue
		}
		if candidate.Status.Phase == appsv1beta1.RolloutPhaseDisabled {
			klog.Infof("Disabled rollout(%s/%s) fetched when fetching matched rollout", candidate.Namespace, candidate.Name)
			continue
		}
		ref := candidate.Spec.WorkloadRef
		refGV, parseErr := schema.ParseGroupVersion(ref.APIVersion)
		if parseErr != nil {
			klog.Warningf("ParseGroupVersion rollout(%s/%s) ref failed: %s", candidate.Namespace, candidate.Name, parseErr.Error())
			continue
		}
		if objGVK.Group == refGV.Group && objGVK.Kind == ref.Kind && obj.GetName() == ref.Name {
			return candidate, nil
		}
	}
	return nil, nil
}
var _ inject.Client = &UnifiedWorkloadHandler{}

// InjectClient stores the manager's client and builds the controller finder
// from it; invoked by the webhook machinery at startup.
func (h *UnifiedWorkloadHandler) InjectClient(c client.Client) error {
	h.Finder = util.NewControllerFinder(c)
	h.Client = c
	return nil
}
var _ admission.DecoderInjector = &UnifiedWorkloadHandler{}

// InjectDecoder stores the admission decoder used to deserialize request
// payloads; invoked by the webhook machinery at startup.
func (h *UnifiedWorkloadHandler) InjectDecoder(d *admission.Decoder) error {
	h.Decoder = d
	return nil
}
// checkWorkloadRules reports whether the request's object is covered by at
// least one rule of the rollout MutatingWebhookConfiguration AND matches that
// webhook's object selector. It filters out workloads that were never opted in
// to rollout handling.
func (h *UnifiedWorkloadHandler) checkWorkloadRules(ctx context.Context, req admission.Request) (bool, error) {
	whConf := &v1.MutatingWebhookConfiguration{}
	if err := h.Client.Get(ctx, types.NamespacedName{Name: configuration.MutatingWebhookConfigurationName}, whConf); err != nil {
		return false, err
	}

	newObject := unstructured.Unstructured{}
	if err := h.Decoder.Decode(req, &newObject); err != nil {
		return false, err
	}
	labels := newObject.GetLabels()

	attr, err := constructAttr(req)
	if err != nil {
		return false, err
	}

	for _, wh := range whConf.Webhooks {
		for _, rule := range wh.Rules {
			m := util2.Matcher{Rule: rule, Attr: attr}
			if !m.Matches() {
				continue
			}
			selector, err := v12.LabelSelectorAsSelector(wh.ObjectSelector)
			if err != nil {
				// A malformed object selector is a configuration error;
				// surface it instead of silently treating the workload as
				// unmatched (the original swallowed this error).
				return false, err
			}
			if selector.Matches(labels2.Set(labels)) {
				return true, nil
			}
		}
	}
	return false, nil
}

View File

@ -0,0 +1,110 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mutating
import (
"context"
"encoding/json"
"math"
"reflect"
"testing"
kruiseappsv1beta1 "github.com/openkruise/kruise-api/apps/v1beta1"
appsv1beta1 "github.com/openkruise/rollouts/api/v1beta1"
"github.com/openkruise/rollouts/pkg/util"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
// TestHandleStatefulSet verifies that handleStatefulSetLikeWorkload mutates an
// Advanced StatefulSet (partition frozen at MaxInt16, progressing annotation
// set) when a matched Rollout exists and the pod template changed.
func TestHandleStatefulSet(t *testing.T) {
	cases := []struct {
		// name describes the scenario.
		name string
		// getObjs returns (oldObj, newObj) simulating the UPDATE request.
		getObjs func() (*kruiseappsv1beta1.StatefulSet, *kruiseappsv1beta1.StatefulSet)
		// expectObj returns the expected mutated object.
		expectObj func() *kruiseappsv1beta1.StatefulSet
		// getRollout returns the Rollout pre-created in the fake cluster.
		getRollout func() *appsv1beta1.Rollout
		// isError indicates whether the handler is expected to fail.
		isError bool
	}{
		{
			// Fixed: the case previously said "cloneSet" although it targets
			// an Advanced StatefulSet.
			name: "statefulset image v1->v2, matched rollout",
			getObjs: func() (*kruiseappsv1beta1.StatefulSet, *kruiseappsv1beta1.StatefulSet) {
				oldObj := statefulset.DeepCopy()
				newObj := statefulset.DeepCopy()
				newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
				return oldObj, newObj
			},
			expectObj: func() *kruiseappsv1beta1.StatefulSet {
				obj := statefulset.DeepCopy()
				obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
				obj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo"}`
				obj.Spec.UpdateStrategy.RollingUpdate.Partition = pointer.Int32(math.MaxInt16)
				return obj
			},
			getRollout: func() *appsv1beta1.Rollout {
				obj := rolloutDemo.DeepCopy()
				obj.Spec.WorkloadRef = appsv1beta1.ObjectRef{
					APIVersion: "apps.kruise.io/v1beta1",
					Kind:       "StatefulSet",
					Name:       "echoserver",
				}
				return obj
			},
		},
	}

	decoder, _ := admission.NewDecoder(scheme)
	for _, cs := range cases {
		t.Run(cs.name, func(t *testing.T) {
			client := fake.NewClientBuilder().WithScheme(scheme).Build()
			h := UnifiedWorkloadHandler{
				Client:  client,
				Decoder: decoder,
				Finder:  util.NewControllerFinder(client),
			}
			rollout := cs.getRollout()
			// Fatal (not Errorf): without the rollout the handler cannot match
			// anything and the rest of the case is meaningless.
			if err := client.Create(context.TODO(), rollout); err != nil {
				t.Fatalf("create rollout failed: %v", err)
			}
			oldObj, newObj := cs.getObjs()
			// Conversion errors were previously discarded with `_`.
			oldO, err := runtime.DefaultUnstructuredConverter.ToUnstructured(oldObj)
			if err != nil {
				t.Fatalf("convert old object failed: %v", err)
			}
			newO, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newObj)
			if err != nil {
				t.Fatalf("convert new object failed: %v", err)
			}
			oldUnstructured := &unstructured.Unstructured{Object: oldO}
			newUnstructured := &unstructured.Unstructured{Object: newO}
			oldUnstructured.SetGroupVersionKind(newObj.GroupVersionKind())
			newUnstructured.SetGroupVersionKind(newObj.GroupVersionKind())
			_, err = h.handleStatefulSetLikeWorkload(newUnstructured, oldUnstructured)
			if cs.isError && err == nil {
				t.Fatal("handleStatefulSetLikeWorkload expected an error, got nil")
			} else if !cs.isError && err != nil {
				// Fixed: err.Error() was passed as a format string (go vet
				// printf violation).
				t.Fatalf("handleStatefulSetLikeWorkload failed: %v", err)
			}
			newStructured := &kruiseappsv1beta1.StatefulSet{}
			if err = runtime.DefaultUnstructuredConverter.FromUnstructured(newUnstructured.Object, newStructured); err != nil {
				t.Fatalf("FromUnstructured failed: %v", err)
			}
			expect := cs.expectObj()
			if !reflect.DeepEqual(newStructured, expect) {
				by, _ := json.Marshal(newStructured)
				t.Fatalf("handleStatefulSetLikeWorkload mismatch, got(%s)", string(by))
			}
		})
	}
}

View File

@ -23,8 +23,6 @@ import (
// +kubebuilder:webhook:path=/mutate-apps-kruise-io-v1alpha1-cloneset,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps.kruise.io,resources=clonesets,verbs=update,versions=v1alpha1,name=mcloneset.kb.io
// +kubebuilder:webhook:path=/mutate-apps-kruise-io-v1alpha1-daemonset,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps.kruise.io,resources=daemonsets,verbs=update,versions=v1alpha1,name=mdaemonset.kb.io
// +kubebuilder:webhook:path=/mutate-apps-v1-deployment,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps,resources=deployments,verbs=update,versions=v1,name=mdeployment.kb.io
// +kubebuilder:webhook:path=/mutate-apps-v1-statefulset,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps,resources=statefulsets,verbs=update,versions=v1,name=mstatefulset.kb.io
// +kubebuilder:webhook:path=/mutate-apps-kruise-io-statefulset,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps.kruise.io,resources=statefulsets,verbs=create;update,versions=v1alpha1;v1beta1,name=madvancedstatefulset.kb.io
// +kubebuilder:webhook:path=/mutate-unified-workload,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=*,resources=*,verbs=create;update,versions=*,name=munifiedworload.kb.io
var (
@ -32,9 +30,7 @@ var (
HandlerMap = map[string]admission.Handler{
"mutate-apps-kruise-io-v1alpha1-cloneset": &WorkloadHandler{},
"mutate-apps-v1-deployment": &WorkloadHandler{},
"mutate-apps-v1-statefulset": &WorkloadHandler{},
"mutate-apps-kruise-io-statefulset": &WorkloadHandler{},
"mutate-unified-workload": &WorkloadHandler{},
"mutate-apps-kruise-io-v1alpha1-daemonset": &WorkloadHandler{},
"mutate-unified-workload": &UnifiedWorkloadHandler{},
}
)

View File

@ -189,74 +189,7 @@ func (h *WorkloadHandler) Handle(ctx context.Context, req admission.Request) adm
}
}
// handle other workload types, including native/advanced statefulset
{
newObj := &unstructured.Unstructured{}
newObj.SetGroupVersionKind(schema.GroupVersionKind{Group: req.Kind.Group, Version: req.Kind.Version, Kind: req.Kind.Kind})
if err := h.Decoder.Decode(req, newObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
if !util.IsWorkloadType(newObj, util.StatefulSetType) && req.Kind.Kind != util.ControllerKindSts.Kind {
return admission.Allowed("")
}
oldObj := &unstructured.Unstructured{}
oldObj.SetGroupVersionKind(schema.GroupVersionKind{Group: req.Kind.Group, Version: req.Kind.Version, Kind: req.Kind.Kind})
if err := h.Decoder.Decode(
admission.Request{AdmissionRequest: admissionv1.AdmissionRequest{Object: req.AdmissionRequest.OldObject}},
oldObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
changed, err := h.handleStatefulSetLikeWorkload(newObj, oldObj)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
if !changed {
return admission.Allowed("")
}
marshalled, err := json.Marshal(newObj.Object)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.AdmissionRequest.Object.Raw, marshalled)
}
}
func (h *WorkloadHandler) handleStatefulSetLikeWorkload(newObj, oldObj *unstructured.Unstructured) (bool, error) {
// indicate whether the workload can enter the rollout process
// 1. replicas > 0
if util.GetReplicas(newObj) == 0 || !util.IsStatefulSetRollingUpdate(newObj) {
return false, nil
}
oldTemplate, newTemplate := util.GetTemplate(oldObj), util.GetTemplate(newObj)
if oldTemplate == nil || newTemplate == nil {
return false, nil
}
oldMetadata, newMetadata := util.GetMetadata(oldObj), util.GetMetadata(newObj)
if newMetadata.Annotations[appsv1beta1.RolloutIDLabel] != "" &&
oldMetadata.Annotations[appsv1beta1.RolloutIDLabel] == newMetadata.Annotations[appsv1beta1.RolloutIDLabel] {
return false, nil
} else if newMetadata.Annotations[appsv1beta1.RolloutIDLabel] == "" && util.EqualIgnoreHash(oldTemplate, newTemplate) {
return false, nil
}
rollout, err := h.fetchMatchedRollout(newObj)
if err != nil {
return false, err
} else if rollout == nil || rollout.Spec.Strategy.IsEmptyRelease() {
return false, nil
}
util.SetStatefulSetPartition(newObj, math.MaxInt16)
state := &util.RolloutState{RolloutName: rollout.Name}
by, _ := json.Marshal(state)
annotation := newObj.GetAnnotations()
if annotation == nil {
annotation = map[string]string{}
}
annotation[util.InRolloutProgressingAnnotation] = string(by)
newObj.SetAnnotations(annotation)
klog.Infof("StatefulSet(%s/%s) will be released incrementally based on Rollout(%s)", newMetadata.Namespace, newMetadata.Name, rollout.Name)
return true, nil
return admission.Allowed("")
}
func (h *WorkloadHandler) handleDeployment(newObj, oldObj *apps.Deployment) (bool, error) {
@ -386,21 +319,6 @@ func (h *WorkloadHandler) handleCloneSet(newObj, oldObj *kruiseappsv1alpha1.Clon
} else if rollout == nil || rollout.Spec.Strategy.IsEmptyRelease() {
return false, nil
}
/*
continuous release (or successive release) is not supported for bluegreen release, especially for cloneset,
here is why:
suppose we are releasing a cloneset, which has pods of both v1 and v2 for now. If we release v3 before
v2 release is done, the cloneset controller might scale down pods without distinguishing between v1 and v2.
This is because our implementation is based on the minReadySeconds, pods of both v1 and v2 are "unavailable"
in the progress of rollout.
Deployment actually has the same problem, however it is possible to bypass this issue for Deployment by setting
minReadySeconds for replicaset separately; unfortunately this workaround seems not work for cloneset
// if rollout.Spec.Strategy.IsBlueGreenRelease() && revision.IsContinuousRelease(h.Client, oldObj, newObj) {
// err = fmt.Errorf("successive release is not supported for bluegreen for now, rollback first")
// return false, fmt.Errorf(err.Error())
// }
*/
// if traffic routing, there must only be one version of Pods
if rollout.Spec.Strategy.HasTrafficRoutings() && newObj.Status.Replicas != newObj.Status.UpdatedReplicas {

View File

@ -36,7 +36,6 @@ import (
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@ -825,82 +824,6 @@ func TestHandlerDaemonSet(t *testing.T) {
}
}
func TestHandleStatefulSet(t *testing.T) {
cases := []struct {
name string
getObjs func() (*kruiseappsv1beta1.StatefulSet, *kruiseappsv1beta1.StatefulSet)
expectObj func() *kruiseappsv1beta1.StatefulSet
getRollout func() *appsv1beta1.Rollout
isError bool
}{
{
name: "cloneSet image v1->v2, matched rollout",
getObjs: func() (*kruiseappsv1beta1.StatefulSet, *kruiseappsv1beta1.StatefulSet) {
oldObj := statefulset.DeepCopy()
newObj := statefulset.DeepCopy()
newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
return oldObj, newObj
},
expectObj: func() *kruiseappsv1beta1.StatefulSet {
obj := statefulset.DeepCopy()
obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo"}`
obj.Spec.UpdateStrategy.RollingUpdate.Partition = pointer.Int32(math.MaxInt16)
return obj
},
getRollout: func() *appsv1beta1.Rollout {
obj := rolloutDemo.DeepCopy()
obj.Spec.WorkloadRef = appsv1beta1.ObjectRef{
APIVersion: "apps.kruise.io/v1beta1",
Kind: "StatefulSet",
Name: "echoserver",
}
return obj
},
},
}
decoder, _ := admission.NewDecoder(scheme)
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
client := fake.NewClientBuilder().WithScheme(scheme).Build()
h := WorkloadHandler{
Client: client,
Decoder: decoder,
Finder: util.NewControllerFinder(client),
}
rollout := cs.getRollout()
if err := client.Create(context.TODO(), rollout); err != nil {
t.Errorf(err.Error())
}
oldObj, newObj := cs.getObjs()
oldO, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(oldObj)
newO, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(newObj)
oldUnstructured := &unstructured.Unstructured{Object: oldO}
newUnstructured := &unstructured.Unstructured{Object: newO}
oldUnstructured.SetGroupVersionKind(newObj.GroupVersionKind())
newUnstructured.SetGroupVersionKind(newObj.GroupVersionKind())
_, err := h.handleStatefulSetLikeWorkload(newUnstructured, oldUnstructured)
if cs.isError && err == nil {
t.Fatal("handleStatefulSetLikeWorkload failed")
} else if !cs.isError && err != nil {
t.Fatalf(err.Error())
}
newStructured := &kruiseappsv1beta1.StatefulSet{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(newUnstructured.Object, newStructured)
if err != nil {
t.Fatal("DefaultUnstructuredConvert failed")
}
expect := cs.expectObj()
if !reflect.DeepEqual(newStructured, expect) {
by, _ := json.Marshal(newStructured)
t.Fatalf("handlerCloneSet failed, and new(%s)", string(by))
}
})
}
}
func TestCheckWorkloadRule(t *testing.T) {
ctx := context.Background()