add disabled field for rollout

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
This commit is contained in:
mingzhou.swx 2022-08-30 18:07:55 +08:00
parent 3d1df9c315
commit eae8246621
9 changed files with 153 additions and 12 deletions

View File

@ -72,6 +72,9 @@ type WorkloadRef struct {
// RolloutStrategy defines strategy to apply during next rollout
type RolloutStrategy struct {
// Disabled means the rollout will not take effect, just as if it had been deleted.
// Default value is false
Disabled bool `json:"disabled,omitempty"`
// Paused indicates that the Rollout is paused.
// Default value is false
Paused bool `json:"paused,omitempty"`
@ -212,6 +215,12 @@ const (
ProgressingReasonCanceled = "Canceled"
ProgressingReasonPaused = "Paused"
// Disabled condition
RolloutConditionDisabled RolloutConditionType = "Disabled"
// Disabled Reason
DisabledReasonDisabling = "InDisabling"
DisabledReasonCompleted = "Completed"
// Terminating condition
RolloutConditionTerminating RolloutConditionType = "Terminating"
// Terminating Reason
@ -271,6 +280,8 @@ const (
RolloutPhaseProgressing RolloutPhase = "Progressing"
// RolloutPhaseTerminating indicates a rollout is terminated
RolloutPhaseTerminating RolloutPhase = "Terminating"
// RolloutPhaseDisabled indicates a rollout is disabled
RolloutPhaseDisabled RolloutPhase = "Disabled"
)
// +genclient

View File

@ -172,6 +172,10 @@ spec:
type: object
type: array
type: object
disabled:
description: Disabled means the rollout will be not work, just
like it was deleted. Default value is false
type: boolean
paused:
description: Paused indicates that the Rollout is paused. Default
value is false

View File

@ -75,7 +75,6 @@ func (p podEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingI
return
}
klog.Infof("Pod %v ready condition changed, then enqueue", client.ObjectKeyFromObject(newPod))
p.enqueue(newPod, q)
}

View File

@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
)
@ -196,21 +197,27 @@ func (r *rolloutContext) doCanaryPaused() (bool, error) {
currentStep := r.rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1]
steps := len(r.rollout.Spec.Strategy.Canary.Steps)
cond := util.GetRolloutCondition(*r.newStatus, rolloutv1alpha1.RolloutConditionProgressing)
// need manual confirmation
if currentStep.Pause.Duration == nil {
if currentStep.Pause.Duration == nil && !isRolloutReallyCompleted(currentStep, r.workload.Replicas) {
klog.Infof("rollout(%s/%s) don't set pause duration, and need manual confirmation", r.rollout.Namespace, r.rollout.Name)
cond.Message = fmt.Sprintf("Rollout is in step(%d/%d), and you need manually confirm to enter the next step", canaryStatus.CurrentStepIndex, steps)
r.newStatus.Message = cond.Message
return false, nil
}
cond.Message = fmt.Sprintf("Rollout is in step(%d/%d), and wait duration(%d seconds) to enter the next step", canaryStatus.CurrentStepIndex, steps, *currentStep.Pause.Duration)
// check duration
var duration time.Duration
if currentStep.Pause.Duration != nil {
duration = time.Second * time.Duration(*currentStep.Pause.Duration)
}
cond.Message = fmt.Sprintf("Rollout is in step(%d/%d), and wait duration(%v) to enter the next step", canaryStatus.CurrentStepIndex, steps, duration)
r.newStatus.Message = cond.Message
// wait duration time, then go to next step
duration := time.Second * time.Duration(*currentStep.Pause.Duration)
expectedTime := canaryStatus.LastUpdateTime.Add(duration)
if expectedTime.Before(time.Now()) {
klog.Infof("rollout(%s/%s) canary step(%d) paused duration(%d seconds), and go to the next step",
r.rollout.Namespace, r.rollout.Name, canaryStatus.CurrentStepIndex, *currentStep.Pause.Duration)
klog.Infof("rollout(%s/%s) canary step(%d) paused duration(%v), and go to the next step",
r.rollout.Namespace, r.rollout.Name, canaryStatus.CurrentStepIndex, duration)
return true, nil
}
r.recheckTime = &expectedTime
@ -292,3 +299,15 @@ func (r *rolloutContext) removeRolloutStateInWorkload() error {
klog.Infof("remove rollout(%s/%s) workload(%s) annotation[%s] success", r.rollout.Namespace, r.rollout.Name, r.workload.Name, util.InRolloutProgressingAnnotation)
return nil
}
// isRolloutReallyCompleted reports whether the given canary step already
// covers the whole workload.
//
// The step's target replica count is taken from step.Replicas when it is set,
// otherwise it is derived from step.Weight interpreted as a percentage of
// replicas (scaled with round-up). The step counts as completed when that
// target is at least replicas and the weight is either unset or exactly 100.
func isRolloutReallyCompleted(step rolloutv1alpha1.CanaryStep, replicas int32) bool {
	total := int(replicas)
	target := 0

	switch {
	case step.Replicas != nil:
		// The scaling error is deliberately ignored; target keeps whatever
		// value the helper returned alongside it.
		target, _ = intstr.GetScaledValueFromIntOrPercent(step.Replicas, total, true)
	case step.Weight != nil:
		percent := intstr.FromString(fmt.Sprintf("%d%%", *step.Weight))
		target, _ = intstr.GetScaledValueFromIntOrPercent(&percent, total, true)
	}

	// Not every replica is covered by this step yet.
	if int32(target) < replicas {
		return false
	}
	// A replica-based step without a weight is enough; a weighted step must
	// route 100% to be considered complete.
	return step.Weight == nil || *step.Weight == 100
}

View File

@ -49,6 +49,29 @@ func (r *RolloutReconciler) reconcileRolloutTerminating(rollout *rolloutv1alpha1
return recheckTime, nil
}
// reconcileRolloutDisabled reconciles a rollout whose phase is Disabled.
//
// If the Disabled condition already carries the Completed reason there is
// nothing left to do. Otherwise it runs doFinalising against a copy of the
// status and, once finalising reports done, flips the Disabled condition to
// Completed/True. The resulting status is then persisted through
// updateRolloutStatusInternal.
//
// It returns the recheck time reported by doFinalising, or an error if either
// finalising or the status update failed.
func (r *RolloutReconciler) reconcileRolloutDisabled(rollout *rolloutv1alpha1.Rollout) (*time.Time, error) {
	// NOTE(review): this assumes the Disabled condition is always present when
	// the phase is Disabled — confirm GetRolloutCondition cannot return nil here.
	disabledCond := util.GetRolloutCondition(rollout.Status, rolloutv1alpha1.RolloutConditionDisabled)
	if disabledCond.Reason == rolloutv1alpha1.DisabledReasonCompleted {
		return nil, nil
	}

	// Work on a copy so the status update below can be applied as a whole.
	status := rollout.Status.DeepCopy()
	finished, recheckTime, err := r.doFinalising(rollout, status, false)
	if err != nil {
		return nil, err
	}
	if finished {
		klog.Infof("rollout(%s/%s) is disabled, and state from(%s) -> to(%s)", rollout.Namespace, rollout.Name, disabledCond.Reason, rolloutv1alpha1.DisabledReasonCompleted)
		disabledCond.Reason = rolloutv1alpha1.DisabledReasonCompleted
		disabledCond.Status = corev1.ConditionTrue
		util.SetRolloutCondition(status, *disabledCond)
	}

	if err = r.updateRolloutStatusInternal(rollout, *status); err != nil {
		klog.Errorf("update rollout(%s/%s) status failed: %s", rollout.Namespace, rollout.Name, err.Error())
		return nil, err
	}
	return recheckTime, nil
}
func (r *RolloutReconciler) doFinalising(rollout *rolloutv1alpha1.Rollout, newStatus *rolloutv1alpha1.RolloutStatus, isComplete bool) (bool, *time.Time, error) {
klog.Infof("reconcile rollout(%s/%s) doFinalising", rollout.Namespace, rollout.Name)
// fetch target workload

View File

@ -136,6 +136,8 @@ func (r *RolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
recheckTime, err = r.reconcileRolloutProgressing(rollout)
case rolloutv1alpha1.RolloutPhaseTerminating:
recheckTime, err = r.reconcileRolloutTerminating(rollout)
case rolloutv1alpha1.RolloutPhaseDisabled:
recheckTime, err = r.reconcileRolloutDisabled(rollout)
}
if err != nil {
return ctrl.Result{}, err

View File

@ -49,14 +49,23 @@ func (r *RolloutReconciler) updateRolloutStatus(rollout *rolloutv1alpha1.Rollout
rollout.Status = newStatus
}()
// delete rollout CRD
if !rollout.DeletionTimestamp.IsZero() && newStatus.Phase != rolloutv1alpha1.RolloutPhaseTerminating {
// delete rollout
newStatus.Phase = rolloutv1alpha1.RolloutPhaseTerminating
cond := util.NewRolloutCondition(rolloutv1alpha1.RolloutConditionTerminating, corev1.ConditionFalse, rolloutv1alpha1.TerminatingReasonInTerminating, "Rollout is in terminating")
util.SetRolloutCondition(&newStatus, *cond)
} else if rollout.Spec.Strategy.Disabled && newStatus.Phase != rolloutv1alpha1.RolloutPhaseDisabled {
// disable rollout
newStatus.Phase = rolloutv1alpha1.RolloutPhaseDisabled
cond := util.NewRolloutCondition(rolloutv1alpha1.RolloutConditionDisabled, corev1.ConditionFalse, rolloutv1alpha1.DisabledReasonDisabling, "Rollout is in disabling")
util.SetRolloutCondition(&newStatus, *cond)
} else if !rollout.Spec.Strategy.Disabled && newStatus.Phase == rolloutv1alpha1.RolloutPhaseDisabled {
// recover rollout
newStatus.Phase = rolloutv1alpha1.RolloutPhaseInitial
} else if newStatus.Phase == "" {
newStatus.Phase = rolloutv1alpha1.RolloutPhaseInitial
}
// get ref workload
workload, err := r.Finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef)
if err != nil {

View File

@ -176,7 +176,7 @@ func (h *WorkloadHandler) handleStatefulSetLikeWorkload(newObj, oldObj *unstruct
return
}
// 3. have matched rollout crd
rollout, err := h.fetchMatchedRollout(newObj)
rollout, err := h.fetchMatchedActiveRollout(newObj)
if err != nil {
return
} else if rollout == nil {
@ -236,7 +236,7 @@ func (h *WorkloadHandler) handleDeployment(newObj, oldObj *apps.Deployment) (cha
return
}
// 5. have matched rollout crd
rollout, err := h.fetchMatchedRollout(newObj)
rollout, err := h.fetchMatchedActiveRollout(newObj)
if err != nil {
return
} else if rollout == nil {
@ -267,7 +267,7 @@ func (h *WorkloadHandler) handleCloneSet(newObj, oldObj *kruiseappsv1alpha1.Clon
return
}
// 3. have matched rollout crd
rollout, err := h.fetchMatchedRollout(newObj)
rollout, err := h.fetchMatchedActiveRollout(newObj)
if err != nil {
return
} else if rollout == nil {
@ -288,7 +288,7 @@ func (h *WorkloadHandler) handleCloneSet(newObj, oldObj *kruiseappsv1alpha1.Clon
return
}
func (h *WorkloadHandler) fetchMatchedRollout(obj client.Object) (*appsv1alpha1.Rollout, error) {
func (h *WorkloadHandler) fetchMatchedActiveRollout(obj client.Object) (*appsv1alpha1.Rollout, error) {
oGv := obj.GetObjectKind().GroupVersionKind()
rolloutList := &appsv1alpha1.RolloutList{}
if err := h.Client.List(context.TODO(), rolloutList, utilclient.DisableDeepCopy,
@ -298,7 +298,7 @@ func (h *WorkloadHandler) fetchMatchedRollout(obj client.Object) (*appsv1alpha1.
}
for i := range rolloutList.Items {
rollout := &rolloutList.Items[i]
if !rollout.DeletionTimestamp.IsZero() || rollout.Spec.ObjectRef.WorkloadRef == nil {
if !rollout.DeletionTimestamp.IsZero() || rollout.Spec.Strategy.Disabled || rollout.Spec.ObjectRef.WorkloadRef == nil {
continue
}
ref := rollout.Spec.ObjectRef.WorkloadRef

View File

@ -3668,6 +3668,80 @@ var _ = SIGDescribe("Rollout", func() {
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "4", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "5", 1)
})
// End-to-end scenario: disable a rollout while a canary release is paused,
// update the workload while the rollout is disabled, then re-enable the
// rollout and check that a new canary release runs through its steps again.
It("disable rollout, then recover it", func() {
By("Creating Rollout...")
rollout := &rolloutsv1alpha1.Rollout{}
Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred())
// Point the base rollout at the CloneSet created below.
rollout.Spec.ObjectRef.WorkloadRef = &rolloutsv1alpha1.WorkloadRef{
APIVersion: "apps.kruise.io/v1alpha1",
Kind: "CloneSet",
Name: "echoserver",
}
// No traffic routing is configured for this scenario.
rollout.Spec.Strategy.Canary.TrafficRoutings = nil
rollout.Annotations = map[string]string{
util.RollbackInBatchAnnotation: "true",
}
// Three canary steps: the first sets a pause duration of 10, the second and
// third set no pause duration at all.
rollout.Spec.Strategy.Canary.Steps = []rolloutsv1alpha1.CanaryStep{
{
Weight: utilpointer.Int32(20),
Pause: rolloutsv1alpha1.RolloutPause{
Duration: utilpointer.Int32(10),
},
},
{
Weight: utilpointer.Int32(50),
},
{
Weight: utilpointer.Int32(100),
},
}
CreateObject(rollout)
By("Creating workload and waiting for all pods ready...")
workload := &appsv1alpha1.CloneSet{}
Expect(ReadYamlToObject("./test_data/rollout/cloneset.yaml", workload)).ToNot(HaveOccurred())
workload.Spec.Template.Spec.Containers[0].ImagePullPolicy = "IfNotPresent"
CreateObject(workload)
WaitCloneSetAllPodsReady(workload)
// First workload change: this is the release that is in flight when the
// rollout gets disabled.
By("Update cloneSet env NODE_NAME from(version1) -> to(version2)")
newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"})
workload.Spec.Template.Spec.Containers[0].Env = newEnvs
UpdateCloneSet(workload)
By("Disable rollout....")
// Wait until the canary is paused at step 2 before flipping Disabled, so the
// rollout is disabled in the middle of a release.
WaitRolloutCanaryStepPaused(rollout.Name, 2)
rollout.Spec.Strategy.Disabled = true
UpdateRollout(rollout)
By("Check rollout/workload status after disable")
WaitRolloutStatusPhase(rollout.Name, rolloutsv1alpha1.RolloutPhaseDisabled)
WaitCloneSetAllPodsReady(workload)
// Second workload change, made while the rollout is disabled; the test only
// waits for the CloneSet pods to become ready, not for any canary step.
By("Update cloneSet env NODE_NAME from(version2) -> to(version3)")
newEnvs = mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version3"})
workload.Spec.Template.Spec.Containers[0].Env = newEnvs
UpdateCloneSet(workload)
WaitCloneSetAllPodsReady(workload)
By("Recover rollout and check...")
// Re-enable the rollout and expect it to reach the Healthy phase.
rollout.Spec.Strategy.Disabled = false
UpdateRollout(rollout)
WaitRolloutStatusPhase(rollout.Name, rolloutsv1alpha1.RolloutPhaseHealthy)
// Third workload change: with the rollout enabled again, the canary steps are
// expected to apply to this release.
By("Update cloneSet env NODE_NAME from(version3) -> to(version4)")
newEnvs = mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version4"})
workload.Spec.Template.Spec.Containers[0].Env = newEnvs
UpdateCloneSet(workload)
// NOTE(review): fixed 10s sleep before polling — presumably gives the
// controller time to pick up the update; confirm it is still required.
time.Sleep(10 * time.Second)
By("Wait 2-th step paused and resume, then check...")
// Step 2 has no pause duration, so the test resumes it explicitly.
WaitRolloutCanaryStepPaused(rollout.Name, 2)
ResumeRolloutCanary(rollout.Name)
WaitRolloutStatusPhase(rollout.Name, rolloutsv1alpha1.RolloutPhaseHealthy)
WaitCloneSetAllPodsReady(workload)
})
})
})