// Source: rollouts/pkg/controller/rollout/progressing.go (343 lines, 15 KiB, Go)

/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rollout
import (
"context"
"fmt"
"strconv"
"time"
rolloutv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/controller/rollout/batchrelease"
"github.com/openkruise/rollouts/pkg/util"
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
)
// defaultGracePeriodSeconds is the wait, in seconds, applied in this file both
// when scheduling a recheck for an incomplete step and as the settling window
// measured from the Progressing condition's last update.
var defaultGracePeriodSeconds int32 = 3
// reconcileRolloutProgressing advances a rollout according to the Reason of its
// Progressing condition:
//
//   - Initializing: verify the canary strategy, then switch to InRolling.
//   - InRolling: handle rollback, pause, continuous release and canary step
//     configuration changes; otherwise run the rolling logic.
//   - Finalising: finish a completed rollout, then switch to Succeeded.
//   - Paused: handle rollback, or switch back to InRolling once resumed.
//   - Cancelling: finish a rolled-back rollout, then switch to Canceled.
//   - Succeeded / Canceled: only logged.
//
// Nothing is done (nil, nil) when the workload is not found or its status is
// not consistent yet. Otherwise the modified copy of the status is persisted
// through updateRolloutStatusInternal before returning.
//
// The first result is the recheck time requested by the executed step, or nil
// when none was requested; the second is any error encountered. An error is
// also returned when the rollout carries no Progressing condition.
func (r *RolloutReconciler) reconcileRolloutProgressing(rollout *rolloutv1alpha1.Rollout) (*time.Time, error) {
	cond := util.GetRolloutCondition(rollout.Status, rolloutv1alpha1.RolloutConditionProgressing)
	if cond == nil {
		// Without the condition there is no reason to dispatch on; fail
		// explicitly instead of dereferencing a nil pointer below.
		return nil, fmt.Errorf("rollout(%s/%s) has no Progressing condition", rollout.Namespace, rollout.Name)
	}
	klog.Infof("reconcile rollout(%s/%s) progressing action", rollout.Namespace, rollout.Name)
	workload, err := r.Finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef)
	if err != nil {
		klog.Errorf("rollout(%s/%s) get workload failed: %s", rollout.Namespace, rollout.Name, err.Error())
		return nil, err
	} else if workload == nil {
		klog.Errorf("rollout(%s/%s) workload Not Found", rollout.Namespace, rollout.Name)
		return nil, nil
	} else if !workload.IsStatusConsistent {
		klog.Infof("rollout(%s/%s) workload status isn't consistent, then wait a moment", rollout.Namespace, rollout.Name)
		return nil, nil
	}

	var recheckTime *time.Time
	// All changes are made on a copy and written back at the end.
	newStatus := rollout.Status.DeepCopy()
	switch cond.Reason {
	case rolloutv1alpha1.ProgressingReasonInitializing:
		klog.Infof("rollout(%s/%s) is Progressing, and in reason(%s)", rollout.Namespace, rollout.Name, cond.Reason)
		// Start from a fresh canaryStatus.
		newStatus.CanaryStatus = &rolloutv1alpha1.CanaryStatus{}
		done, _, err := r.doProgressingInitializing(rollout, newStatus)
		if err != nil {
			klog.Errorf("rollout(%s/%s) doProgressingInitializing error(%s)", rollout.Namespace, rollout.Name, err.Error())
			return nil, err
		} else if done {
			progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonInRolling, "Rollout is in Progressing")
		} else {
			// Incomplete, recheck
			expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
			recheckTime = &expectedTime
			klog.Infof("rollout(%s/%s) doProgressingInitializing is incomplete, and recheck(%s)", rollout.Namespace, rollout.Name, expectedTime.String())
		}

	case rolloutv1alpha1.ProgressingReasonInRolling:
		// NOTE(review): this case reads newStatus.CanaryStatus without a nil
		// check; presumably it was populated by the Initializing state — confirm.
		if workload.IsInRollback {
			// rollout canceled, indicates rollback(v1 -> v2 -> v1)
			newStatus.CanaryStatus.CanaryRevision = workload.CanaryRevision
			r.Recorder.Eventf(rollout, corev1.EventTypeNormal, "Progressing", "workload has been rollback, then rollout is canceled")
			klog.Infof("rollout(%s/%s) workload has been rollback, then rollout canceled", rollout.Namespace, rollout.Name)
			progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonCancelling, "The workload has been rolled back and the rollout process will be cancelled")
		} else if rollout.Spec.Strategy.Paused {
			// paused rollout progress
			klog.Infof("rollout(%s/%s) is Progressing, but paused", rollout.Namespace, rollout.Name)
			progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonPaused, "Rollout has been paused, you can resume it by kube-cli")
		} else if newStatus.CanaryStatus.CanaryRevision != "" && workload.CanaryRevision != newStatus.CanaryStatus.CanaryRevision {
			// In case of continuous publishing(v1 -> v2 -> v3), then restart publishing
			r.Recorder.Eventf(rollout, corev1.EventTypeNormal, "Progressing", "workload continuous publishing canaryRevision, then restart publishing")
			klog.Infof("rollout(%s/%s) workload continuous publishing canaryRevision from(%s) -> to(%s), then restart publishing",
				rollout.Namespace, rollout.Name, newStatus.CanaryStatus.CanaryRevision, workload.CanaryRevision)
			done, err := r.doProgressingReset(rollout, newStatus)
			if err != nil {
				klog.Errorf("rollout(%s/%s) doProgressingReset failed: %s", rollout.Namespace, rollout.Name, err.Error())
				return nil, err
			} else if done {
				progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonInitializing, "Workload is continuous release")
				klog.Infof("rollout(%s/%s) workload is continuous publishing, reset complete", rollout.Namespace, rollout.Name)
			} else {
				// Incomplete, recheck
				expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
				recheckTime = &expectedTime
				klog.Infof("rollout(%s/%s) workload is continuous publishing, reset incomplete, and recheck(%s)", rollout.Namespace, rollout.Name, expectedTime.String())
			}
		} else if newStatus.CanaryStatus.RolloutHash != "" && newStatus.CanaryStatus.RolloutHash != rollout.Annotations[util.RolloutHashAnnotation] {
			// rollout canary steps configuration change
			batchControl := batchrelease.NewInnerBatchController(r.Client, rollout)
			newStepIndex, err := r.reCalculateCanaryStepIndex(rollout, batchControl)
			if err != nil {
				klog.Errorf("rollout(%s/%s) reCalculate Canary StepIndex failed: %s", rollout.Namespace, rollout.Name, err.Error())
				return nil, err
			}
			// canary step configuration change causes current step index change
			newStatus.CanaryStatus.CurrentStepIndex = newStepIndex
			newStatus.CanaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateUpgrade
			newStatus.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
			newStatus.CanaryStatus.RolloutHash = rollout.Annotations[util.RolloutHashAnnotation]
			klog.Infof("rollout(%s/%s) canary step configuration change, and stepIndex(%d) state(%s)",
				rollout.Namespace, rollout.Name, newStatus.CanaryStatus.CurrentStepIndex, newStatus.CanaryStatus.CurrentStepState)
		} else {
			klog.Infof("rollout(%s/%s) is Progressing, and in reason(%s)", rollout.Namespace, rollout.Name, cond.Reason)
			// check if canary is done
			if newStatus.CanaryStatus.CurrentStepState == rolloutv1alpha1.CanaryStepStateCompleted {
				klog.Infof("rollout(%s/%s) progressing rolling done", rollout.Namespace, rollout.Name)
				progressingStateTransition(newStatus, corev1.ConditionTrue, rolloutv1alpha1.ProgressingReasonFinalising, "Rollout has been completed and some closing work is being done")
			} else { // rollout is in rolling
				newStatus.CanaryStatus.PodTemplateHash = workload.PodTemplateHash
				recheckTime, err = r.doProgressingInRolling(rollout, newStatus)
				if err != nil {
					return nil, err
				}
			}
		}

	// after the normal completion of rollout, enter into the Finalising process
	case rolloutv1alpha1.ProgressingReasonFinalising:
		klog.Infof("rollout(%s/%s) is Progressing, and in reason(%s)", rollout.Namespace, rollout.Name, cond.Reason)
		var done bool
		done, recheckTime, err = r.doFinalising(rollout, newStatus, true)
		if err != nil {
			return nil, err
		} else if done {
			// finalizer is finished
			progressingStateTransition(newStatus, corev1.ConditionTrue, rolloutv1alpha1.ProgressingReasonSucceeded, "Rollout has been completed, and succeed")
		}

	case rolloutv1alpha1.ProgressingReasonPaused:
		if workload.IsInRollback {
			// rollout canceled, indicates rollback(v1 -> v2 -> v1)
			newStatus.CanaryStatus.CanaryRevision = workload.CanaryRevision
			r.Recorder.Eventf(rollout, corev1.EventTypeNormal, "Progressing", "workload has been rollback, then rollout is canceled")
			klog.Infof("rollout(%s/%s) workload has been rollback, then rollout canceled", rollout.Namespace, rollout.Name)
			progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonCancelling, "The workload has been rolled back and the rollout process will be cancelled")
		} else if !rollout.Spec.Strategy.Paused {
			// from paused to inRolling; the empty message keeps the previous
			// condition message (see progressingStateTransition).
			klog.Infof("rollout(%s/%s) is resumed from paused, and continue Progressing", rollout.Namespace, rollout.Name)
			progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonInRolling, "")
		}

	case rolloutv1alpha1.ProgressingReasonCancelling:
		klog.Infof("rollout(%s/%s) is Progressing, and in reason(%s)", rollout.Namespace, rollout.Name, cond.Reason)
		var done bool
		done, recheckTime, err = r.doFinalising(rollout, newStatus, false)
		if err != nil {
			return nil, err
		} else if done {
			// finalizer is finished
			progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonCanceled, "")
		}

	case rolloutv1alpha1.ProgressingReasonSucceeded, rolloutv1alpha1.ProgressingReasonCanceled:
		klog.Infof("rollout(%s/%s) is Progressing, and in reason(%s)", rollout.Namespace, rollout.Name, cond.Reason)
	}

	err = r.updateRolloutStatusInternal(rollout, *newStatus)
	if err != nil {
		klog.Errorf("update rollout(%s/%s) status failed: %s", rollout.Namespace, rollout.Name, err.Error())
		return nil, err
	}
	return recheckTime, nil
}
// progressingStateTransition sets the Progressing condition held in status to
// the given condition status and reason, then copies the resulting condition
// message into status.Message.
//
// If status has no Progressing condition yet, one is created from the
// arguments. For an existing condition, an empty message leaves the previous
// message untouched.
func progressingStateTransition(status *rolloutv1alpha1.RolloutStatus, condStatus corev1.ConditionStatus, reason, message string) {
	progressing := util.GetRolloutCondition(*status, rolloutv1alpha1.RolloutConditionProgressing)
	switch {
	case progressing == nil:
		progressing = util.NewRolloutCondition(rolloutv1alpha1.RolloutConditionProgressing, condStatus, reason, message)
	case message == "":
		progressing.Status, progressing.Reason = condStatus, reason
	default:
		progressing.Status, progressing.Reason, progressing.Message = condStatus, reason, message
	}

	util.SetRolloutCondition(status, *progressing)
	status.Message = progressing.Message
}
// doProgressingInitializing performs the checks of the Initializing state.
// It delegates entirely to verifyCanaryStrategy (canary release) and forwards
// its three results unchanged.
func (r *RolloutReconciler) doProgressingInitializing(rollout *rolloutv1alpha1.Rollout, newStatus *rolloutv1alpha1.RolloutStatus) (bool, string, error) {
	verified, message, err := r.verifyCanaryStrategy(rollout, newStatus)
	return verified, message, err
}
// doProgressingInRolling runs one reconcile pass of the rolling logic for the
// rollout's target workload.
//
// It returns the recheck time recorded by the rollout context. When the
// workload cannot be found, it returns a recheck time
// defaultGracePeriodSeconds from now. Lookup and reconcile errors are logged
// and returned.
func (r *RolloutReconciler) doProgressingInRolling(rollout *rolloutv1alpha1.Rollout, newStatus *rolloutv1alpha1.RolloutStatus) (*time.Time, error) {
	// Look up the workload the rollout points at.
	target, err := r.Finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef)
	if err != nil {
		klog.Errorf("rollout(%s/%s) GetWorkloadForRef failed: %s", rollout.Namespace, rollout.Name, err.Error())
		return nil, err
	}
	if target == nil {
		retryAt := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
		klog.Warningf("rollout(%s/%s) Fetch workload Not Found, and recheck(%s)", rollout.Namespace, rollout.Name, retryAt.String())
		return &retryAt, nil
	}

	batchControl := batchrelease.NewInnerBatchController(r.Client, rollout)
	rc := &rolloutContext{
		Client:       r.Client,
		rollout:      rollout,
		newStatus:    newStatus,
		workload:     target,
		batchControl: batchControl,
		recorder:     r.Recorder,
	}
	if err = rc.reconcile(); err != nil {
		klog.Errorf("rollout(%s/%s) Progressing failed: %s", rollout.Namespace, rollout.Name, err.Error())
		return nil, err
	}
	return rc.recheckTime, nil
}
// doProgressingReset undoes the in-flight release so publishing can restart
// (used for continuous release, v1 -> v2 -> v3).
//
// When traffic routings are configured, it first restores the stable service
// and then finalises traffic routing; finally it finalises the batch release.
// It stops at the first step that fails or is not done yet. The returned bool
// is true only once every step has completed.
func (r *RolloutReconciler) doProgressingReset(rollout *rolloutv1alpha1.Rollout, newStatus *rolloutv1alpha1.RolloutStatus) (bool, error) {
	rc := &rolloutContext{
		Client:       r.Client,
		rollout:      rollout,
		newStatus:    newStatus,
		batchControl: batchrelease.NewInnerBatchController(r.Client, rollout),
		recorder:     r.Recorder,
	}

	if rollout.Spec.Strategy.Canary.TrafficRoutings != nil {
		// 1. remove stable service podRevision selector
		if restored, err := rc.restoreStableService(); err != nil || !restored {
			return restored, err
		}
		// 2. route all traffic to stable service
		if routed, err := rc.doFinalisingTrafficRouting(); err != nil || !routed {
			return routed, err
		}
	}

	// 3. delete batchRelease CRD
	finalized, err := rc.batchControl.Finalize()
	if err != nil {
		klog.Errorf("rollout(%s/%s) DoFinalising batchRelease failed: %s", rollout.Namespace, rollout.Name, err.Error())
		return false, err
	}
	return finalized, nil
}
// verifyCanaryStrategy checks whether the canary strategy of rollout is ready
// to be executed.
//
// If at least one traffic routing is configured, the first one is verified
// with verifyTrafficRouting and a negative result is returned as is. After
// that, the function reports "not done" until defaultGracePeriodSeconds have
// passed since the Progressing condition in newStatus was last updated.
//
// It returns whether verification is complete, a message from the traffic
// routing check (empty otherwise), and any error.
func (r *RolloutReconciler) verifyCanaryStrategy(rollout *rolloutv1alpha1.Rollout, newStatus *rolloutv1alpha1.RolloutStatus) (bool, string, error) {
	canary := rollout.Spec.Strategy.Canary
	// Traffic routing. Test the length rather than nil: an empty but non-nil
	// list has no element 0 and indexing it would panic.
	if len(canary.TrafficRoutings) > 0 {
		if ok, msg, err := r.verifyTrafficRouting(rollout.Namespace, canary.TrafficRoutings[0]); !ok {
			return ok, msg, err
		}
	}

	// It is not allowed to modify the rollout.spec in progressing phase (validate webhook rollout),
	// but in many scenarios the user may modify the workload and rollout spec at the same time,
	// and there is a possibility that the workload is released first, and due to some network or other reasons the rollout spec is delayed by a few seconds,
	// so this is mainly compatible with this scenario.
	cond := util.GetRolloutCondition(*newStatus, rolloutv1alpha1.RolloutConditionProgressing)
	// A missing condition carries no timestamp to wait on, so only enforce the
	// grace period when the condition exists.
	if cond != nil {
		if verifyTime := cond.LastUpdateTime.Add(time.Second * time.Duration(defaultGracePeriodSeconds)); verifyTime.After(time.Now()) {
			klog.Infof("verify rollout(%s/%s) TrafficRouting done, and wait a moment", rollout.Namespace, rollout.Name)
			return false, "", nil
		}
	}
	return true, "", nil
}
// verifyTrafficRouting checks that the objects referenced by tr exist in
// namespace ns: first the Service named tr.Service, then an Ingress.
//
// A missing object yields (false, "<kind>(ns/name) is Not Found", nil); any
// other lookup failure yields (false, "", err). It returns (true, "", nil)
// when both lookups succeed.
func (r *RolloutReconciler) verifyTrafficRouting(ns string, tr *rolloutv1alpha1.TrafficRouting) (bool, string, error) {
	// The Service must exist.
	serviceKey := types.NamespacedName{Namespace: ns, Name: tr.Service}
	if err := r.Get(context.TODO(), serviceKey, &corev1.Service{}); err != nil {
		if errors.IsNotFound(err) {
			return false, fmt.Sprintf("Service(%s/%s) is Not Found", ns, tr.Service), nil
		}
		return false, "", err
	}

	// The Ingress must exist. Its name is only taken from tr for type "nginx".
	// NOTE(review): for any other type the lookup below runs with an empty
	// name — confirm whether that is intended.
	var ingressName string
	if tr.Type == "nginx" {
		ingressName = tr.Ingress.Name
	}
	ingressKey := types.NamespacedName{Namespace: ns, Name: ingressName}
	if err := r.Get(context.TODO(), ingressKey, &netv1.Ingress{}); err != nil {
		if errors.IsNotFound(err) {
			return false, fmt.Sprintf("Ingress(%s/%s) is Not Found", ns, ingressName), nil
		}
		return false, "", err
	}
	return true, "", nil
}
// reCalculateCanaryStepIndex determines which canary step the rollout is at
// after its step configuration changed.
//
// It scales the CanaryReplicas of the batch selected by the batch release's
// BatchPartition against the workload's replicas, then returns the 1-based
// index of the first step whose desired replicas (step.Replicas, or
// step.Weight as a percentage) are at least that many. If no step qualifies,
// the index of the last step is returned; with no steps configured the result
// is 0.
//
// It returns 1 when the batch release does not exist. An error is returned
// when fetching the batch release or the workload fails, when the workload is
// not found, or when BatchPartition is unset or outside the list of batches.
func (r *RolloutReconciler) reCalculateCanaryStepIndex(rollout *rolloutv1alpha1.Rollout, batchControl batchrelease.BatchRelease) (int32, error) {
	batch, err := batchControl.FetchBatchRelease()
	if errors.IsNotFound(err) {
		return 1, nil
	} else if err != nil {
		return 0, err
	}

	workload, err := r.Finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef)
	if err != nil {
		klog.Errorf("rollout(%s/%s) get workload failed: %s", rollout.Namespace, rollout.Name, err.Error())
		return 0, err
	}
	// The finder reports a missing workload as (nil, nil), as handled by the
	// other callers in this file; fail instead of dereferencing nil below.
	if workload == nil {
		return 0, fmt.Errorf("rollout(%s/%s) workload Not Found", rollout.Namespace, rollout.Name)
	}

	// Guard the pointer dereference and the index into Batches.
	partition := batch.Spec.ReleasePlan.BatchPartition
	batches := batch.Spec.ReleasePlan.Batches
	if partition == nil || int(*partition) < 0 || int(*partition) >= len(batches) {
		return 0, fmt.Errorf("rollout(%s/%s) batchRelease batchPartition is unset or out of range of its %d batches",
			rollout.Namespace, rollout.Name, len(batches))
	}

	totalReplicas := int(workload.Replicas)
	// Scaling errors are deliberately ignored here and below, as before: a
	// malformed value counts as 0 replicas.
	// NOTE(review): presumably these values are validated on admission — confirm.
	currentReplicas, _ := intstr.GetScaledValueFromIntOrPercent(&batches[*partition].CanaryReplicas, totalReplicas, true)

	var stepIndex int32
	for i := range rollout.Spec.Strategy.Canary.Steps {
		step := rollout.Spec.Strategy.Canary.Steps[i]
		var desiredReplicas int
		if step.Replicas != nil {
			desiredReplicas, _ = intstr.GetScaledValueFromIntOrPercent(step.Replicas, totalReplicas, true)
		} else {
			// No explicit replicas: derive them from the step weight as a percentage.
			replicas := intstr.FromString(strconv.Itoa(int(step.Weight)) + "%")
			desiredReplicas, _ = intstr.GetScaledValueFromIntOrPercent(&replicas, totalReplicas, true)
		}
		stepIndex = int32(i + 1)
		if currentReplicas <= desiredReplicas {
			break
		}
	}
	return stepIndex, nil
}