update canary message when revision or rollout-id changed

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
This commit is contained in:
mingzhou.swx 2022-10-09 11:55:02 +08:00
parent f21c3fb763
commit 147395d49b
5 changed files with 184 additions and 8 deletions

30
api/v1alpha1/helper.go Normal file
View File

@ -0,0 +1,30 @@
package v1alpha1
import "encoding/json"
// CanaryMessage record each changes of rollout-id in Rollout.Status.CanaryStatus.Message field.
// PaaS can use this message to judge whether pods would be upgraded when users publishing resources.
// For example, users may just modify Service instead of Workload, PaaS can use this message to distinguish such scene.
type CanaryMessage struct {
// RolloutID indicate the corresponding to rollout-id recorded in this message
RolloutID string `json:"rolloutID,omitempty"`
// RevisionChanged is true if workload revision changed when rollout-id equals the above RolloutID.
RevisionChanged bool `json:"revisionChanged,omitempty"`
}
// DecodeCanaryMessage decode message as CanaryMessage
func DecodeCanaryMessage(message string) *CanaryMessage {
structured := &CanaryMessage{}
_ = json.Unmarshal([]byte(message), structured)
return structured
}
// EncodeCanaryMessage build a new message based on parameters
func EncodeCanaryMessage(revisionChanged bool, rolloutID string) string {
structured := &CanaryMessage{
RevisionChanged: revisionChanged,
RolloutID: rolloutID,
}
message, _ := json.Marshal(structured)
return string(message)
}

View File

@ -154,6 +154,21 @@ func (in *BatchReleaseStatus) DeepCopy() *BatchReleaseStatus {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CanaryMessage) DeepCopyInto(out *CanaryMessage) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CanaryMessage.
func (in *CanaryMessage) DeepCopy() *CanaryMessage {
if in == nil {
return nil
}
out := new(CanaryMessage)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CanaryStatus) DeepCopyInto(out *CanaryStatus) {
*out = *in

View File

@ -57,7 +57,6 @@ func (r *RolloutReconciler) reconcileRolloutProgressing(rollout *rolloutv1alpha1
case rolloutv1alpha1.ProgressingReasonInitializing:
klog.Infof("rollout(%s/%s) is Progressing, and in reason(%s)", rollout.Namespace, rollout.Name, cond.Reason)
// new canaryStatus
newStatus.CanaryStatus = &rolloutv1alpha1.CanaryStatus{}
done, _, err := r.doProgressingInitializing(rollout, newStatus)
if err != nil {
klog.Errorf("rollout(%s/%s) doProgressingInitializing error(%s)", rollout.Namespace, rollout.Name, err.Error())
@ -141,6 +140,7 @@ func progressingStateTransition(status *rolloutv1alpha1.RolloutStatus, condStatu
func (r *RolloutReconciler) doProgressingInitializing(rollout *rolloutv1alpha1.Rollout, newStatus *rolloutv1alpha1.RolloutStatus) (bool, string, error) {
// canary release
resetCanaryStatus(newStatus)
return r.verifyCanaryStrategy(rollout, newStatus)
}
@ -246,3 +246,10 @@ func (r *RolloutReconciler) reCalculateCanaryStepIndex(rollout *rolloutv1alpha1.
}
return stepIndex, nil
}
func resetCanaryStatus(newStatus *rolloutv1alpha1.RolloutStatus) {
// Message cannot be cleaned up here
canaryStatus := &rolloutv1alpha1.CanaryStatus{}
canaryStatus.Message = newStatus.CanaryStatus.Message
newStatus.CanaryStatus = canaryStatus
}

View File

@ -77,15 +77,29 @@ func (r *RolloutReconciler) updateRolloutStatus(rollout *rolloutv1alpha1.Rollout
done = false
return
}
newStatus.StableRevision = workload.StableRevision
currentRolloutID := getRolloutID(workload, rollout)
// update CanaryStatus for newStatus if we need
// update workload generation to canaryStatus.ObservedWorkloadGeneration
// rollout is a target ref bypass, so there needs to be a field to identify the rollout execution process or results,
// which version of deployment is targeted, ObservedWorkloadGeneration that is to compare with the workload generation
if newStatus.CanaryStatus != nil && newStatus.CanaryStatus.CanaryRevision != "" &&
newStatus.CanaryStatus.CanaryRevision == workload.CanaryRevision {
newStatus.CanaryStatus.ObservedRolloutID = getRolloutID(workload, rollout)
newStatus.CanaryStatus.ObservedWorkloadGeneration = workload.Generation
if newStatus.CanaryStatus != nil && newStatus.CanaryStatus.CanaryRevision != "" {
// update canaryStatus message if we need
if isMeaninglessChangesOfRolloutID(&newStatus, workload, currentRolloutID) {
newStatus.CanaryStatus.Message = rolloutv1alpha1.EncodeCanaryMessage(false, currentRolloutID)
}
// update observed rollout-id and workload generation if we need
if isCanaryRevisionNotChanged(&newStatus, workload) {
newStatus.CanaryStatus.ObservedRolloutID = currentRolloutID
newStatus.CanaryStatus.ObservedWorkloadGeneration = workload.Generation
} else {
// revision changed, update the canary message
newStatus.CanaryStatus.Message = rolloutv1alpha1.EncodeCanaryMessage(true, currentRolloutID)
klog.Infof("checkpoint: update message meaningful %s", newStatus.CanaryStatus.Message)
}
}
// fresh and update stable revision
newStatus.StableRevision = workload.StableRevision
switch newStatus.Phase {
case rolloutv1alpha1.RolloutPhaseInitial:
@ -112,16 +126,16 @@ func (r *RolloutReconciler) updateRolloutStatus(rollout *rolloutv1alpha1.Rollout
// But at the first deployment of Rollout/Workload, CanaryStatus isn't set due to no rollout progression,
// and PaaS platform cannot judge whether the deployment is completed base on the code above. So we have
// to update the status just like the rollout was completed.
newStatus.CanaryStatus = &rolloutv1alpha1.CanaryStatus{
CanaryReplicas: workload.CanaryReplicas,
CanaryReadyReplicas: workload.CanaryReadyReplicas,
ObservedRolloutID: getRolloutID(workload, rollout),
ObservedRolloutID: currentRolloutID,
ObservedWorkloadGeneration: workload.Generation,
PodTemplateHash: workload.PodTemplateHash,
CanaryRevision: workload.CanaryRevision,
CurrentStepIndex: int32(len(rollout.Spec.Strategy.Canary.Steps)),
CurrentStepState: rolloutv1alpha1.CanaryStepStateCompleted,
Message: rolloutv1alpha1.EncodeCanaryMessage(false, currentRolloutID),
}
newStatus.Message = "workload deployment is completed"
}
@ -196,3 +210,26 @@ func (r *RolloutReconciler) calculateRolloutHash(rollout *rolloutv1alpha1.Rollou
func hash(data string) string {
return fmt.Sprintf("%x", sha256.Sum256([]byte(data)))
}
// isMeaninglessChangesOfRolloutID return true if rollout-id changed, but rollout is not triggered
func isMeaninglessChangesOfRolloutID(newStatus *rolloutv1alpha1.RolloutStatus, workload *util.Workload, newRolloutID string) bool {
if newRolloutID == "" {
return false
}
// just return false if no changes of rollout-id
if newRolloutID == newStatus.CanaryStatus.ObservedRolloutID {
return false
}
record := rolloutv1alpha1.DecodeCanaryMessage(newStatus.CanaryStatus.Message)
if record.RolloutID == newRolloutID {
return false
}
// rollout-id changed but revision changes has been observed by Rollout
return isCanaryRevisionNotChanged(newStatus, workload) && newStatus.StableRevision == workload.StableRevision
}
// observedCanaryRevision return true if newStatus.CanaryStatus.CanaryRevision == workload.CanaryRevision
func isCanaryRevisionNotChanged(newStatus *rolloutv1alpha1.RolloutStatus, workload *util.Workload) bool {
return newStatus.CanaryStatus.CanaryRevision == workload.CanaryRevision
}

View File

@ -3663,6 +3663,93 @@ var _ = SIGDescribe("Rollout", func() {
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "4", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "5", 1)
})
It("check canary status message", func() {
By("Creating Rollout...")
rollout := &rolloutsv1alpha1.Rollout{}
Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred())
rollout.Spec.ObjectRef.WorkloadRef = &rolloutsv1alpha1.WorkloadRef{
APIVersion: "apps.kruise.io/v1alpha1",
Kind: "CloneSet",
Name: "echoserver",
}
rollout.Annotations = map[string]string{
util.RollbackInBatchAnnotation: "true",
}
rollout.Spec.Strategy.Canary.TrafficRoutings = nil
rollout.Spec.Strategy.Canary.Steps = []rolloutsv1alpha1.CanaryStep{
{
Weight: utilpointer.Int32(20),
Pause: rolloutsv1alpha1.RolloutPause{
Duration: utilpointer.Int32(10),
},
},
{
Weight: utilpointer.Int32(50),
Pause: rolloutsv1alpha1.RolloutPause{},
},
{
Weight: utilpointer.Int32(100),
Pause: rolloutsv1alpha1.RolloutPause{
Duration: utilpointer.Int32(10),
},
},
}
CreateObject(rollout)
By("Creating workload and waiting for all pods ready...")
workload := &appsv1alpha1.CloneSet{}
Expect(ReadYamlToObject("./test_data/rollout/cloneset.yaml", workload)).ToNot(HaveOccurred())
workload.Labels[util.RolloutIDLabel] = "1"
CreateObject(workload)
WaitCloneSetAllPodsReady(workload)
By("check after first deployment")
Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred())
record := rolloutsv1alpha1.DecodeCanaryMessage(rollout.Status.CanaryStatus.Message)
Expect(record.RevisionChanged).Should(BeFalse())
Expect(record.RolloutID).Should(Equal("1"))
By("only update rollout-id from 1-> 2")
workload.Labels[util.RolloutIDLabel] = "2"
UpdateCloneSet(workload)
time.Sleep(5 * time.Second)
Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred())
record = rolloutsv1alpha1.DecodeCanaryMessage(rollout.Status.CanaryStatus.Message)
Expect(record.RevisionChanged).Should(BeFalse())
Expect(record.RolloutID).Should(Equal("2"))
By("update revision and check")
GetObject(workload.Name, workload)
By(fmt.Sprintf("%v", workload.Spec.Template.Spec.Containers[0].Env))
newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"})
workload.Spec.Template.Spec.Containers[0].Env = newEnvs
UpdateCloneSet(workload)
WaitRolloutCanaryStepPaused(rollout.Name, 2)
Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred())
record = rolloutsv1alpha1.DecodeCanaryMessage(rollout.Status.CanaryStatus.Message)
Expect(record.RevisionChanged).Should(BeTrue())
Expect(record.RolloutID).Should(Equal("2"))
By("Update rollout-id from 2 to 3, but revision is not changed")
workload.Labels[util.RolloutIDLabel] = "3"
UpdateCloneSet(workload)
time.Sleep(5 * time.Second)
Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred())
record = rolloutsv1alpha1.DecodeCanaryMessage(rollout.Status.CanaryStatus.Message)
Expect(record.RevisionChanged).Should(BeFalse())
Expect(record.RolloutID).Should(Equal("3"))
By("rollback revision, update rollout id 3 to 4")
newEnvs = mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version1"})
workload.Spec.Template.Spec.Containers[0].Env = newEnvs
workload.Labels[util.RolloutIDLabel] = "4"
UpdateCloneSet(workload)
WaitRolloutCanaryStepPaused(rollout.Name, 1)
Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred())
record = rolloutsv1alpha1.DecodeCanaryMessage(rollout.Status.CanaryStatus.Message)
Expect(record.RevisionChanged).Should(BeTrue())
Expect(record.RolloutID).Should(Equal("4"))
})
})
})