patch batch index to pods during rollout

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
This commit is contained in:
mingzhou.swx 2022-06-09 11:16:26 +08:00
parent 53d32dccb2
commit 12ccfca53f
21 changed files with 672 additions and 136 deletions

View File

@ -42,10 +42,6 @@ type ReleasePlan struct {
// BatchPartition start from 0.
// +optional
BatchPartition *int32 `json:"batchPartition,omitempty"`
// Paused the rollout, the release progress will be paused util paused is false.
// default is false
// +optional
Paused bool `json:"paused,omitempty"`
}
// ReleaseBatch is used to describe how each batch release should be
@ -82,7 +78,7 @@ type BatchReleaseStatus struct {
// newest canary Deployment.
// +optional
CollisionCount *int32 `json:"collisionCount,omitempty"`
// ObservedReleasePlanHash is a hash code of observed itself releasePlan.Batches.
// ObservedReleasePlanHash is a hash code of observed itself spec.releasePlan.
ObservedReleasePlanHash string `json:"observedReleasePlanHash,omitempty"`
// Phase is the release plan phase, which indicates the current state of release
// plan state machine in BatchRelease controller.

View File

@ -44,6 +44,10 @@ type BatchReleaseSpec struct {
TargetRef ObjectRef `json:"targetReference"`
// ReleasePlan is the details on how to rollout the resources
ReleasePlan ReleasePlan `json:"releasePlan"`
// Paused the rollout, the release progress will be paused util paused is false.
// default is false
// +optional
Paused bool `json:"paused,omitempty"`
}
type DeploymentReleaseStrategyType string

View File

@ -52,6 +52,10 @@ spec:
description: BatchReleaseSpec defines how to describe an update between
different compRevision
properties:
paused:
description: Paused the rollout, the release progress will be paused
util paused is false. default is false
type: boolean
releasePlan:
description: ReleasePlan is the details on how to rollout the resources
properties:
@ -93,10 +97,6 @@ spec:
- canaryReplicas
type: object
type: array
paused:
description: Paused the rollout, the release progress will be
paused util paused is false. default is false
type: boolean
type: object
targetReference:
description: TargetRef contains the GVK and name of the workload that
@ -211,7 +211,7 @@ spec:
type: integer
observedReleasePlanHash:
description: ObservedReleasePlanHash is a hash code of observed itself
releasePlan.Batches.
spec.releasePlan.
type: string
observedWorkloadReplicas:
description: ObservedWorkloadReplicas is observed replicas of target

View File

@ -48,9 +48,9 @@ spec:
resources:
limits:
cpu: 100m
memory: 30Mi
memory: 100Mi
requests:
cpu: 100m
memory: 20Mi
memory: 100Mi
serviceAccountName: controller-manager
terminationGracePeriodSeconds: 10

View File

@ -54,14 +54,22 @@ type podEventHandler struct {
}
func (p podEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
pod, ok := evt.Object.(*corev1.Pod)
if !ok {
return
}
p.enqueue(pod, q)
}
func (p podEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
}
func (p podEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
}
func (p podEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
oldPod := evt.ObjectOld.(*corev1.Pod)
newPod := evt.ObjectNew.(*corev1.Pod)
oldPod, oldOK := evt.ObjectOld.(*corev1.Pod)
newPod, newOK := evt.ObjectNew.(*corev1.Pod)
if !oldOK || !newOK {
return
}
if oldPod.ResourceVersion == newPod.ResourceVersion || util.IsPodReady(oldPod) == util.IsPodReady(newPod) {
return
}
@ -79,9 +87,9 @@ func (p podEventHandler) enqueue(pod *corev1.Pod, q workqueue.RateLimitingInterf
Name: owner.Name, Namespace: pod.Namespace,
}
workloadGVK := schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind)
workloadObj := util.GetEmptyWorkloadObject(workloadGVK)
err := p.Get(context.TODO(), workloadNamespacedName, workloadObj)
if err != nil {
workloadObj, err := util.GetOwnerWorkload(p.Reader, pod)
if err != nil || workloadObj == nil {
klog.Errorf("Failed to get owner workload for pod %v, err: %v", client.ObjectKeyFromObject(pod), err)
return
}

View File

@ -24,6 +24,7 @@ import (
appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/controller/batchrelease/workloads"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -261,6 +262,12 @@ func (r *Executor) GetWorkloadController() (workloads.WorkloadController, error)
}
gvk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind)
if !util.IsSupportedWorkload(gvk) {
message := fmt.Sprintf("the workload type '%v' is not supported", gvk)
r.recorder.Event(r.release, v1.EventTypeWarning, "UnsupportedWorkload", message)
return nil, fmt.Errorf(message)
}
targetKey := types.NamespacedName{
Namespace: r.release.Namespace,
Name: targetRef.Name,

View File

@ -57,7 +57,7 @@ func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadContr
message = "Release plan is deleted or cancelled, then terminate"
signalTerminating(r.releaseStatus)
case isPlanPaused(workloadEvent, r.releasePlan, r.releaseStatus):
case isPlanPaused(workloadEvent, r.release, r.releaseStatus):
// handle the case that releasePlan.paused = true
reason = "PlanPaused"
message = "release plan is paused, then stop reconcile"
@ -184,8 +184,8 @@ func isPlanUnhealthy(plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseSt
return int(status.CanaryStatus.CurrentBatch) >= len(plan.Batches) && status.Phase == v1alpha1.RolloutPhaseProgressing
}
func isPlanPaused(event workloads.WorkloadEventType, plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseStatus) bool {
return plan.Paused && status.Phase == v1alpha1.RolloutPhaseProgressing && !isWorkloadGone(event, status)
func isPlanPaused(event workloads.WorkloadEventType, release *v1alpha1.BatchRelease, status *v1alpha1.BatchReleaseStatus) bool {
return release.Spec.Paused && status.Phase == v1alpha1.RolloutPhaseProgressing && !isWorkloadGone(event, status)
}
func isGetWorkloadInfoError(err error) bool {

View File

@ -170,6 +170,12 @@ func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) {
}
}
// patch current batch label to pods
patchDone, err := c.patchPodBatchLabel(canaryGoal)
if !patchDone || err != nil {
return false, err
}
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "SetBatchDone",
"Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
@ -330,3 +336,20 @@ func (c *CloneSetRolloutController) recordCloneSetRevisionAndReplicas() {
c.releaseStatus.StableRevision = c.clone.Status.CurrentRevision
c.releaseStatus.UpdateRevision = c.clone.Status.UpdateRevision
}
func (c *CloneSetRolloutController) patchPodBatchLabel(canaryGoal int32) (bool, error) {
rolloutID, exist := c.parentController.Labels[util.RolloutIDLabel]
if !exist || rolloutID == "" {
return true, nil
}
pods, err := util.ListOwnedPods(c.client, c.clone)
if err != nil {
klog.Errorf("Failed to list pods for CloneSet %v", c.targetNamespacedName)
return false, err
}
batchID := c.parentController.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.parentController.Status.UpdateRevision
return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, c.releasePlanKey)
}

View File

@ -153,6 +153,12 @@ func (c *DeploymentsRolloutController) UpgradeOneBatch() (bool, error) {
}
}
// patch current batch label to pods
patchDone, err := c.patchPodBatchLabel(canaryGoal)
if !patchDone || err != nil {
return false, err
}
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "Batch Rollout", "Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
}
@ -364,3 +370,20 @@ func (c *DeploymentsRolloutController) recordDeploymentRevisionAndReplicas() err
c.releaseStatus.ObservedWorkloadReplicas = *c.stable.Spec.Replicas
return nil
}
func (c *DeploymentsRolloutController) patchPodBatchLabel(canaryGoal int32) (bool, error) {
rolloutID, exist := c.parentController.Labels[util.RolloutIDLabel]
if !exist || rolloutID == "" || c.canary == nil {
return true, nil
}
pods, err := util.ListOwnedPods(c.client, c.canary)
if err != nil {
klog.Errorf("Failed to list pods for Deployment %v", c.stableNamespacedName)
return false, err
}
batchID := c.parentController.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.parentController.Status.UpdateRevision
return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, c.releaseKey)
}

View File

@ -18,14 +18,12 @@ package workloads
import (
"context"
"fmt"
appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
utilclient "github.com/openkruise/rollouts/pkg/util/client"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@ -75,6 +73,14 @@ func (c *StatefulSetLikeController) GetWorkloadInfo() (*util.WorkloadInfo, error
workloadInfo := util.ParseStatefulSetInfo(set, c.namespacedName)
workloadInfo.Paused = true
if workloadInfo.Status.UpdatedReadyReplicas <= 0 {
updatedReadyReplicas, err := c.countUpdatedReadyPods(workloadInfo.Status.UpdateRevision)
if err != nil {
return nil, err
}
workloadInfo.Status.UpdatedReadyReplicas = updatedReadyReplicas
}
return workloadInfo, nil
}
@ -85,6 +91,7 @@ func (c *StatefulSetLikeController) ClaimWorkload() (bool, error) {
}
err = util.ClaimWorkload(c.Client, c.planController, set, map[string]interface{}{
"type": apps.RollingUpdateStatefulSetStrategyType,
"rollingUpdate": map[string]interface{}{
"partition": pointer.Int32(util.ParseReplicasFrom(set)),
},
@ -122,9 +129,9 @@ func (c *StatefulSetLikeController) UpgradeBatch(canaryReplicasGoal, stableRepli
return false, err
}
observedReplicas := canaryReplicasGoal + stableReplicasGoal
if observedReplicas != util.ParseReplicasFrom(set) {
return false, fmt.Errorf("StatefulSet(%v) scaled, should handle scale event first", c.namespacedName)
// if no needs to patch partition
if isStatefulSetUpgradedDone(set, canaryReplicasGoal, stableReplicasGoal) {
return true, nil
}
err = util.PatchSpec(c.Client, set, map[string]interface{}{
@ -142,20 +149,6 @@ func (c *StatefulSetLikeController) UpgradeBatch(canaryReplicasGoal, stableRepli
return true, nil
}
func (c *StatefulSetLikeController) IsBatchUpgraded(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) {
set, err := c.GetWorkloadObject()
if err != nil {
return false, err
}
if !util.IsStatefulSetRollingUpdate(set) {
return false, fmt.Errorf("StatefulSet(%v) rollingUpdate configuration is nil, should check it manually", c.namespacedName)
}
partition := util.GetStatefulSetPartition(set)
return partition <= stableReplicasGoal, nil
}
func (c *StatefulSetLikeController) IsBatchReady(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) {
workloadInfo, err := c.GetWorkloadInfo()
if err != nil {
@ -198,7 +191,7 @@ func (c *StatefulSetLikeController) IsBatchReady(canaryReplicasGoal, stableRepli
return secondCheckPointReady(), nil
}
func (c *StatefulSetLikeController) listOwnedPods() ([]*v1.Pod, error) {
func (c *StatefulSetLikeController) ListOwnedPods() ([]*v1.Pod, error) {
if c.pods != nil {
return c.pods, nil
}
@ -206,37 +199,18 @@ func (c *StatefulSetLikeController) listOwnedPods() ([]*v1.Pod, error) {
if err != nil {
return nil, err
}
selector, err := util.ParseSelectorFrom(set)
if err != nil || selector == nil {
return nil, err
}
podLister := &v1.PodList{}
err = c.List(context.TODO(), podLister, &client.ListOptions{LabelSelector: selector}, utilclient.DisableDeepCopy)
if err != nil {
return nil, err
}
c.pods = make([]*v1.Pod, 0)
for i := range podLister.Items {
pod := &podLister.Items[i]
if !pod.DeletionTimestamp.IsZero() {
continue
}
owner := metav1.GetControllerOf(pod)
if owner == nil || owner.UID != set.GetUID() {
continue
}
c.pods = append(c.pods, pod)
}
return c.pods, nil
c.pods, err = util.ListOwnedPods(c.Client, set)
return c.pods, err
}
func (c *StatefulSetLikeController) countUpdatedReadyPods(updateRevision string) (int32, error) {
pods, err := c.listOwnedPods()
pods, err := c.ListOwnedPods()
if err != nil {
return 0, err
}
activePods := util.FilterActivePods(pods)
updatedReadyReplicas := int32(0)
for _, pod := range pods {
for _, pod := range activePods {
if util.IsConsistentWithRevision(pod, updateRevision) && util.IsPodReady(pod) {
updatedReadyReplicas++
}
@ -244,3 +218,13 @@ func (c *StatefulSetLikeController) countUpdatedReadyPods(updateRevision string)
klog.V(3).Infof("BatchRelease(%v) observed %d updatedReadyReplicas")
return updatedReadyReplicas, nil
}
func isStatefulSetUpgradedDone(set *unstructured.Unstructured, canaryReplicasGoal, stableReplicasGoal int32) bool {
partition := util.GetStatefulSetPartition(set)
if partition <= stableReplicasGoal {
return true
}
updatedReplicas := util.ParseStatusIntFrom(set, "updatedReplicas")
observedGeneration := util.ParseStatusIntFrom(set, "observedGeneration")
return set.GetGeneration() == observedGeneration && int(updatedReplicas) >= int(canaryReplicasGoal)
}

View File

@ -37,8 +37,8 @@ type UnifiedWorkloadController interface {
ClaimWorkload() (bool, error)
ReleaseWorkload(cleanup bool) (bool, error)
UpgradeBatch(canaryReplicasGoal, stableReplicasGoal int32) (bool, error)
IsBatchUpgraded(canaryReplicasGoal, stableReplicasGoal int32) (bool, error)
IsBatchReady(canaryReplicasGoal, stableReplicasGoal int32) (bool, error)
ListOwnedPods() ([]*v1.Pod, error)
}
// UnifiedWorkloadRolloutControlPlane is responsible for handling rollout StatefulSet type of workloads
@ -154,13 +154,14 @@ func (c *UnifiedWorkloadRolloutControlPlane) UpgradeOneBatch() (bool, error) {
"stable-goal", stableGoal,
"canary-replicas", currentCanaryReplicas)
upgradeDone, err := c.IsBatchUpgraded(canaryGoal, stableGoal)
if err != nil {
isUpgradedDone, err := c.UpgradeBatch(canaryGoal, stableGoal)
if err != nil || !isUpgradedDone {
return false, nil
}
isPatchedDone, err := c.patchPodBatchLabel(workloadInfo, canaryGoal)
if err != nil || !isPatchedDone {
return false, err
} else if !upgradeDone {
if succeed, err := c.UpgradeBatch(canaryGoal, stableGoal); err != nil || !succeed {
return false, nil
}
}
c.recorder.Eventf(c.planController, v1.EventTypeNormal, "SetBatchDone",
@ -313,3 +314,20 @@ func (c *UnifiedWorkloadRolloutControlPlane) RecordWorkloadRevisionAndReplicas()
c.newStatus.UpdateRevision = workloadInfo.Status.UpdateRevision
return nil
}
func (c *UnifiedWorkloadRolloutControlPlane) patchPodBatchLabel(workloadInfo *util.WorkloadInfo, canaryGoal int32) (bool, error) {
rolloutID, exist := c.planController.Labels[util.RolloutIDLabel]
if !exist || rolloutID == "" {
return true, nil
}
pods, err := c.ListOwnedPods()
if err != nil {
klog.Errorf("Failed to list pods for %v", workloadInfo.GVKWithName)
return false, err
}
batchID := c.planController.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.planController.Status.UpdateRevision
return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, client.ObjectKeyFromObject(c.planController))
}

View File

@ -132,11 +132,11 @@ func (r *innerBatchRelease) Promote(index int32, checkReady bool) (bool, error)
klog.Errorf("error getting updated BatchRelease(%s/%s) from client", batch.Namespace, batch.Name)
return err
}
if !batch.Spec.ReleasePlan.Paused && *batch.Spec.ReleasePlan.BatchPartition == index {
if !batch.Spec.Paused && *batch.Spec.ReleasePlan.BatchPartition == index {
return nil
}
batch.Spec.ReleasePlan.BatchPartition = utilpointer.Int32Ptr(index)
batch.Spec.ReleasePlan.Paused = false
batch.Spec.Paused = false
if err := r.Client.Update(context.TODO(), batch); err != nil {
return err
}
@ -322,6 +322,10 @@ func createBatchRelease(rollout *rolloutv1alpha1.Rollout, batchName string) *rol
},
},
}
if rollout.Spec.RolloutID != "" {
br.Labels[util.RolloutIDLabel] = rollout.Spec.RolloutID
}
return br
}

View File

@ -42,27 +42,26 @@ func (r *rolloutContext) runCanary() error {
}
// update canary status
canaryStatus.CanaryReplicas = r.workload.CanaryReplicas
canaryStatus.CanaryReadyReplicas = r.workload.CanaryReadyReplicas
batch, err := r.batchControl.FetchBatchRelease()
if err != nil {
canaryStatus.CanaryReplicas = r.workload.CanaryReplicas
canaryStatus.CanaryReadyReplicas = r.workload.CanaryReadyReplicas
} else {
canaryStatus.CanaryReplicas = batch.Status.CanaryStatus.UpdatedReplicas
canaryStatus.CanaryReadyReplicas = batch.Status.CanaryStatus.UpdatedReadyReplicas
}
switch canaryStatus.CurrentStepState {
case rolloutv1alpha1.CanaryStepStateUpgrade:
klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", r.rollout.Namespace, r.rollout.Name, rolloutv1alpha1.CanaryStepStateUpgrade)
// If the last step is 100%, there is no need to execute the canary process at this time
if r.rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1].Weight == 100 {
klog.Infof("rollout(%s/%s) last step is 100%, there is no need to execute the canary process at this time, and set state=%s",
r.rollout.Namespace, r.rollout.Name, canaryStatus.CurrentStepIndex-1, canaryStatus.CurrentStepIndex, rolloutv1alpha1.CanaryStepStateCompleted)
done, err := r.doCanaryUpgrade()
if err != nil {
return err
} else if done {
canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateTrafficRouting
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateCompleted
} else {
done, err := r.doCanaryUpgrade()
if err != nil {
return err
} else if done {
canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateTrafficRouting
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", r.rollout.Namespace, r.rollout.Name,
canaryStatus.CurrentStepIndex, rolloutv1alpha1.CanaryStepStateUpgrade, canaryStatus.CurrentStepState)
}
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", r.rollout.Namespace, r.rollout.Name,
canaryStatus.CurrentStepIndex, rolloutv1alpha1.CanaryStepStateUpgrade, canaryStatus.CurrentStepState)
}
case rolloutv1alpha1.CanaryStepStateTrafficRouting:
@ -268,6 +267,9 @@ func (r *rolloutContext) removeRolloutStateInWorkload() error {
workloadGVK := schema.FromAPIVersionAndKind(workloadRef.APIVersion, workloadRef.Kind)
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
obj := util.GetEmptyWorkloadObject(workloadGVK)
if obj == nil {
return nil
}
if err := r.Get(context.TODO(), types.NamespacedName{Name: r.workload.Name, Namespace: r.workload.Namespace}, obj); err != nil {
klog.Errorf("getting updated workload(%s.%s) failed: %s", r.workload.Namespace, r.workload.Name, err.Error())
return err

19
pkg/feature/switch.go Normal file
View File

@ -0,0 +1,19 @@
package feature
import "flag"
var (
// If workloadTypeFilterSwitch is true, webhook and controllers
// will filter out unsupported workload type. Currently, rollout
// support Deployment, CloneSet, StatefulSet and Advanced StatefulSet.
// Default to true.
workloadTypeFilterSwitch bool
)
func init() {
flag.BoolVar(&workloadTypeFilterSwitch, "filter-workload-type", true, "filter known workload gvk for rollout controller")
}
func NeedFilterWorkloadType() bool {
return workloadTypeFilterSwitch
}

View File

@ -103,6 +103,7 @@ func (r *ControllerFinder) finders() []ControllerFinderFunc {
}
var (
ControllerKindRS = apps.SchemeGroupVersion.WithKind("ReplicaSet")
ControllerKindDep = apps.SchemeGroupVersion.WithKind("Deployment")
ControllerKindSts = apps.SchemeGroupVersion.WithKind("StatefulSet")
ControllerKruiseKindCS = appsv1alpha1.SchemeGroupVersion.WithKind("CloneSet")

View File

@ -1,10 +1,17 @@
package util
import (
"context"
"fmt"
"strings"
utilclient "github.com/openkruise/rollouts/pkg/util/client"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// IsPodReady returns true if a pod is ready; false otherwise.
@ -57,3 +64,82 @@ func IsConsistentWithRevision(pod *v1.Pod, revision string) bool {
}
return false
}
func FilterActivePods(pods []*v1.Pod) []*v1.Pod {
var activePods []*v1.Pod
for _, pod := range pods {
if pod.DeletionTimestamp.IsZero() {
activePods = append(activePods, pod)
}
}
return activePods
}
func IsCompletedPod(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded
}
func ListOwnedPods(c client.Client, object client.Object) ([]*v1.Pod, error) {
selector, err := parseSelector(object)
if err != nil {
return nil, err
}
podLister := &v1.PodList{}
err = c.List(context.TODO(), podLister, &client.ListOptions{LabelSelector: selector}, utilclient.DisableDeepCopy)
if err != nil {
return nil, err
}
pods := make([]*v1.Pod, 0, len(podLister.Items))
for i := range podLister.Items {
pod := &podLister.Items[i]
if IsCompletedPod(pod) {
continue
}
owner := metav1.GetControllerOf(pod)
if owner == nil || owner.UID != object.GetUID() {
continue
}
pods = append(pods, pod)
}
return pods, nil
}
func PatchPodBatchLabel(c client.Client, pods []*v1.Pod, rolloutID string, batchID int32, updateRevision string, canaryGoal int32, logKey types.NamespacedName) (bool, error) {
// the number of active pods that has been patched successfully.
patchedUpdatedReplicas := int32(0)
for _, pod := range pods {
podRolloutID := pod.Labels[RolloutIDLabel]
if pod.DeletionTimestamp.IsZero() {
// we don't patch label for the active old revision pod
if !IsConsistentWithRevision(pod, updateRevision) {
continue
}
// if it has been patched, count and ignore
if podRolloutID == rolloutID {
patchedUpdatedReplicas++
continue
}
}
// for such terminating pod and others, if it has been patched, just ignore
if podRolloutID == rolloutID {
continue
}
podClone := pod.DeepCopy()
by := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s","%s":"%d"}}}`, RolloutIDLabel, rolloutID, RolloutBatchIDLabel, batchID)
err := c.Patch(context.TODO(), podClone, client.RawPatch(types.StrategicMergePatchType, []byte(by)))
if err != nil {
klog.Errorf("Failed to patch Pod(%v) batchID, err: %v", client.ObjectKeyFromObject(podClone), err)
return false, err
}
if pod.DeletionTimestamp.IsZero() && IsConsistentWithRevision(pod, updateRevision) {
patchedUpdatedReplicas++
}
}
klog.V(3).Infof("Patch %v pods with batchID for batchRelease %v, goal is %d pods", patchedUpdatedReplicas, logKey, canaryGoal)
return patchedUpdatedReplicas >= canaryGoal, nil
}

View File

@ -18,18 +18,15 @@ package util
import (
"encoding/json"
"strconv"
"time"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
kruiseappsv1beta1 "github.com/openkruise/kruise-api/apps/v1beta1"
rolloutv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util/client"
apps "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller"
@ -45,6 +42,13 @@ const (
KruiseRolloutFinalizer = "rollouts.kruise.io/rollout"
// rollout spec hash
RolloutHashAnnotation = "rollouts.kruise.io/hash"
// RolloutIDLabel is designed to distinguish each workload revision publications.
// The value of RolloutIDLabel corresponds Rollout.Spec.RolloutID.
RolloutIDLabel = "apps.kruise.io/rollout-id"
// RolloutBatchIDLabel is the label key of batch id that will be patched to pods during rollout.
// Only when RolloutIDLabel is set, RolloutBatchIDLabel will be patched.
// Users can use RolloutIDLabel and RolloutBatchIDLabel to select the pods that are upgraded in some certain batch and release.
RolloutBatchIDLabel = "apps.kruise.io/rollout-batch-id"
)
// RolloutState is annotation[rollouts.kruise.io/in-progressing] value
@ -97,25 +101,6 @@ func AddWorkloadWatcher(c controller.Controller, handler handler.EventHandler) e
return nil
}
func ReCalculateCanaryStepIndex(rollout *rolloutv1alpha1.Rollout, workloadReplicas, currentReplicas int) int32 {
var stepIndex int32
for i := range rollout.Spec.Strategy.Canary.Steps {
step := rollout.Spec.Strategy.Canary.Steps[i]
var desiredReplicas int
if step.Replicas != nil {
desiredReplicas, _ = intstr.GetScaledValueFromIntOrPercent(step.Replicas, workloadReplicas, true)
} else {
replicas := intstr.FromString(strconv.Itoa(int(step.Weight)) + "%")
desiredReplicas, _ = intstr.GetScaledValueFromIntOrPercent(&replicas, workloadReplicas, true)
}
stepIndex = int32(i + 1)
if currentReplicas <= desiredReplicas {
break
}
}
return stepIndex
}
func DiscoverGVK(gvk schema.GroupVersionKind) bool {
genericClient := client.GetGenericClient()
if genericClient == nil {

View File

@ -29,6 +29,7 @@ import (
"github.com/davecgh/go-spew/spew"
appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/feature"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
@ -60,6 +61,15 @@ const (
alphanums = "bcdfghjklmnpqrstvwxz2456789"
)
var (
knownWorkloadGVKs = []*schema.GroupVersionKind{
&ControllerKindDep,
&ControllerKindSts,
&ControllerKruiseKindCS,
&ControllerKruiseKindSts,
}
)
type WorkloadStatus struct {
Replicas int32
ReadyReplicas int32
@ -153,7 +163,7 @@ func EqualIgnoreHash(template1, template2 *v1.PodTemplateSpec) bool {
}
func HashReleasePlanBatches(releasePlan *v1alpha1.ReleasePlan) string {
by, _ := json.Marshal(releasePlan.Batches)
by, _ := json.Marshal(releasePlan)
md5Hash := sha256.Sum256(by)
return hex.EncodeToString(md5Hash[:])
}
@ -208,6 +218,10 @@ func PatchSpec(c client.Client, object client.Object, spec map[string]interface{
}
func GetEmptyWorkloadObject(gvk schema.GroupVersionKind) client.Object {
if !IsSupportedWorkload(gvk) {
return nil
}
switch gvk.Kind {
case ControllerKindDep.Kind:
return &apps.Deployment{}
@ -316,7 +330,7 @@ func GetStatefulSetMaxUnavailable(object *unstructured.Unstructured) *intstr.Int
func ParseStatefulSetInfo(object *unstructured.Unstructured, namespacedName types.NamespacedName) *WorkloadInfo {
workloadGVKWithName := fmt.Sprintf("%v(%v)", object.GroupVersionKind().String(), namespacedName)
selector, err := ParseSelectorFrom(object)
selector, err := parseSelector(object)
if err != nil {
klog.Errorf("Failed to parse selector for workload(%v)", workloadGVKWithName)
}
@ -428,16 +442,26 @@ func parseMetadataFrom(object *unstructured.Unstructured) *metav1.ObjectMeta {
return meta
}
// ParseSelectorFrom can find labelSelector and parse it as selector for unstructured object
func ParseSelectorFrom(object *unstructured.Unstructured) (labels.Selector, error) {
m, found, err := unstructured.NestedFieldNoCopy(object.Object, "spec", "selector")
if err != nil || !found {
return nil, err
// parseSelector can find labelSelector and parse it as labels.Selector for client object
func parseSelector(object client.Object) (labels.Selector, error) {
switch o := object.(type) {
case *apps.Deployment:
return metav1.LabelSelectorAsSelector(o.Spec.Selector)
case *appsv1alpha1.CloneSet:
return metav1.LabelSelectorAsSelector(o.Spec.Selector)
case *unstructured.Unstructured:
m, found, err := unstructured.NestedFieldNoCopy(o.Object, "spec", "selector")
if err != nil || !found {
return nil, err
}
byteInfo, _ := json.Marshal(m)
labelSelector := &metav1.LabelSelector{}
_ = json.Unmarshal(byteInfo, labelSelector)
return metav1.LabelSelectorAsSelector(labelSelector)
default:
panic("unsupported workload type to ParseSelector function")
}
byteInfo, _ := json.Marshal(m)
labelSelector := &metav1.LabelSelector{}
_ = json.Unmarshal(byteInfo, labelSelector)
return metav1.LabelSelectorAsSelector(labelSelector)
}
func unmarshalIntStr(m interface{}) *intstr.IntOrString {
@ -461,3 +485,43 @@ func GenRandomStr(length int) string {
randStr := rand.String(length)
return rand.SafeEncodeString(randStr)
}
func IsSupportedWorkload(gvk schema.GroupVersionKind) bool {
if !feature.NeedFilterWorkloadType() {
return true
}
for _, known := range knownWorkloadGVKs {
if gvk.Group == known.Group && gvk.Kind == known.Kind {
return true
}
}
return false
}
func GetOwnerWorkload(r client.Reader, object client.Object) (client.Object, error) {
owner := metav1.GetControllerOf(object)
if owner == nil {
return nil, nil
}
ownerGvk := schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind)
ownerKey := types.NamespacedName{Namespace: object.GetNamespace(), Name: owner.Name}
if ownerGvk.Group == ControllerKindRS.Group && ownerGvk.Kind == ControllerKindRS.Kind {
replicaset := &apps.ReplicaSet{}
err := r.Get(context.TODO(), ownerKey, replicaset)
if err != nil {
return nil, err
}
return GetOwnerWorkload(r, replicaset)
}
ownerObj := GetEmptyWorkloadObject(ownerGvk)
if ownerObj == nil {
return nil, nil
}
err := r.Get(context.TODO(), ownerKey, ownerObj)
if err != nil {
return nil, err
}
return ownerObj, nil
}

View File

@ -23,8 +23,10 @@ import (
"reflect"
appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
utilclient "github.com/openkruise/rollouts/pkg/util/client"
addmissionv1 "k8s.io/api/admission/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/validation/field"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -146,6 +148,11 @@ func validateRolloutSpecObjectRef(objectRef *appsv1alpha1.ObjectRef, fldPath *fi
if objectRef.WorkloadRef == nil {
return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), objectRef.WorkloadRef, "WorkloadRef is required")}
}
gvk := schema.FromAPIVersionAndKind(objectRef.WorkloadRef.APIVersion, objectRef.WorkloadRef.Kind)
if !util.IsSupportedWorkload(gvk) {
return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), objectRef.WorkloadRef, "WorkloadRef kind is not supported")}
}
return nil
}

View File

@ -85,7 +85,10 @@ var _ = SIGDescribe("Rollout", func() {
if err != nil {
return err
}
clone.Spec = *object.Spec.DeepCopy()
clone.Spec.Replicas = utilpointer.Int32(*object.Spec.Replicas)
clone.Spec.Template = *object.Spec.Template.DeepCopy()
clone.Labels = mergeMap(clone.Labels, object.Labels)
clone.Annotations = mergeMap(clone.Annotations, object.Annotations)
return k8sClient.Update(context.TODO(), clone)
})).NotTo(HaveOccurred())
@ -100,7 +103,10 @@ var _ = SIGDescribe("Rollout", func() {
if err != nil {
return err
}
clone.Spec = *object.Spec.DeepCopy()
clone.Spec.Replicas = utilpointer.Int32(*object.Spec.Replicas)
clone.Spec.Template = *object.Spec.Template.DeepCopy()
clone.Labels = mergeMap(clone.Labels, object.Labels)
clone.Annotations = mergeMap(clone.Annotations, object.Annotations)
return k8sClient.Update(context.TODO(), clone)
})).NotTo(HaveOccurred())
@ -115,7 +121,10 @@ var _ = SIGDescribe("Rollout", func() {
if err != nil {
return err
}
clone.Spec = *object.Spec.DeepCopy()
clone.Spec.Replicas = utilpointer.Int32(*object.Spec.Replicas)
clone.Spec.Template = *object.Spec.Template.DeepCopy()
clone.Labels = mergeMap(clone.Labels, object.Labels)
clone.Annotations = mergeMap(clone.Annotations, object.Annotations)
return k8sClient.Update(context.TODO(), clone)
})).NotTo(HaveOccurred())
@ -130,7 +139,10 @@ var _ = SIGDescribe("Rollout", func() {
if err != nil {
return err
}
clone.Spec = *object.Spec.DeepCopy()
clone.Spec.Replicas = utilpointer.Int32(*object.Spec.Replicas)
clone.Spec.Template = *object.Spec.Template.DeepCopy()
clone.Labels = mergeMap(clone.Labels, object.Labels)
clone.Annotations = mergeMap(clone.Annotations, object.Annotations)
return k8sClient.Update(context.TODO(), clone)
})).NotTo(HaveOccurred())
@ -274,10 +286,10 @@ var _ = SIGDescribe("Rollout", func() {
return &canaryList.Items[0], nil
}
GetPodsOfDeployment := func(obj *apps.Deployment) ([]*v1.Pod, error) {
ListPods := func(namespace string, labelSelector *metav1.LabelSelector) ([]*v1.Pod, error) {
appList := &v1.PodList{}
selector, _ := metav1.LabelSelectorAsSelector(obj.Spec.Selector)
err := k8sClient.List(context.TODO(), appList, &client.ListOptions{Namespace: obj.Namespace, LabelSelector: selector})
selector, _ := metav1.LabelSelectorAsSelector(labelSelector)
err := k8sClient.List(context.TODO(), appList, &client.ListOptions{Namespace: namespace, LabelSelector: selector})
if err != nil {
return nil, err
}
@ -291,6 +303,20 @@ var _ = SIGDescribe("Rollout", func() {
return apps, nil
}
CheckPodBatchLabel := func(namespace string, labelSelector *metav1.LabelSelector, rolloutID, batchID string, expected int) {
pods, err := ListPods(namespace, labelSelector)
Expect(err).NotTo(HaveOccurred())
count := 0
for _, pod := range pods {
if pod.Labels[util.RolloutIDLabel] == rolloutID &&
pod.Labels[util.RolloutBatchIDLabel] == batchID {
count++
}
}
Expect(count).Should(BeNumerically("==", expected))
}
BeforeEach(func() {
namespace = randomNamespaceName("rollout")
ns := v1.Namespace{
@ -650,7 +676,7 @@ var _ = SIGDescribe("Rollout", func() {
Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred())
CreateObject(workload)
WaitDeploymentAllPodsReady(workload)
pods, err := GetPodsOfDeployment(workload)
pods, err := ListPods(workload.Name, workload.Spec.Selector)
Expect(err).NotTo(HaveOccurred())
appNames := make(map[string]struct{})
for _, app := range pods {
@ -717,7 +743,7 @@ var _ = SIGDescribe("Rollout", func() {
}
}
// deployment pods not changed
cpods, err := GetPodsOfDeployment(workload)
cpods, err := ListPods(workload.Name, workload.Spec.Selector)
Expect(err).NotTo(HaveOccurred())
cappNames := make(map[string]struct{})
for _, pod := range cpods {
@ -911,7 +937,9 @@ var _ = SIGDescribe("Rollout", func() {
},
{
Weight: 100,
Pause: rolloutsv1alpha1.RolloutPause{},
Pause: rolloutsv1alpha1.RolloutPause{
Duration: utilpointer.Int32(0),
},
},
}
CreateObject(rollout)
@ -1040,7 +1068,9 @@ var _ = SIGDescribe("Rollout", func() {
},
{
Weight: 100,
Pause: rolloutsv1alpha1.RolloutPause{},
Pause: rolloutsv1alpha1.RolloutPause{
Duration: utilpointer.Int32(0),
},
},
}
CreateObject(rollout)
@ -1173,7 +1203,9 @@ var _ = SIGDescribe("Rollout", func() {
},
{
Weight: 100,
Pause: rolloutsv1alpha1.RolloutPause{},
Pause: rolloutsv1alpha1.RolloutPause{
Duration: utilpointer.Int32(0),
},
},
}
CreateObject(rollout)
@ -3099,6 +3131,271 @@ var _ = SIGDescribe("Rollout", func() {
WaitRolloutWorkloadGenration(rollout.Name, workload.Generation)
})
})
KruiseDescribe("Rolout Patch pod batch ID", func() {
It("Normal Case", func() {
By("Creating Rollout...")
rollout := &rolloutsv1alpha1.Rollout{}
Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred())
rollout.Spec.ObjectRef.WorkloadRef = &rolloutsv1alpha1.WorkloadRef{
APIVersion: "apps.kruise.io/v1beta1",
Kind: "StatefulSet",
Name: "echoserver",
}
rollout.Spec.Strategy.Canary.TrafficRoutings = nil
rollout.Spec.RolloutID = "1"
CreateObject(rollout)
By("Creating workload and waiting for all pods ready...")
// headless-service
headlessService := &v1.Service{}
Expect(ReadYamlToObject("./test_data/rollout/headless_service.yaml", headlessService)).ToNot(HaveOccurred())
CreateObject(headlessService)
// service
service := &v1.Service{}
Expect(ReadYamlToObject("./test_data/rollout/service.yaml", service)).ToNot(HaveOccurred())
CreateObject(service)
// ingress
ingress := &netv1.Ingress{}
Expect(ReadYamlToObject("./test_data/rollout/nginx_ingress.yaml", ingress)).ToNot(HaveOccurred())
CreateObject(ingress)
// workload
workload := &appsv1beta1.StatefulSet{}
Expect(ReadYamlToObject("./test_data/rollout/advanced_statefulset.yaml", workload)).ToNot(HaveOccurred())
CreateObject(workload)
WaitAdvancedStatefulSetPodsReady(workload)
By("Update statefulset env NODE_NAME from(version1) -> to(version2)")
newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"})
workload.Spec.Template.Spec.Containers[0].Env = newEnvs
UpdateAdvancedStatefulSet(workload)
// wait step 1 complete
By("wait step(1) pause")
WaitRolloutCanaryStepPaused(rollout.Name, 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1)
// resume rollout canary
ResumeRolloutCanary(rollout.Name)
By("resume rollout, and wait next step(2)")
WaitRolloutCanaryStepPaused(rollout.Name, 2)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 1)
// resume rollout
ResumeRolloutCanary(rollout.Name)
By("check rollout canary status success, resume rollout, and wait rollout canary complete")
WaitRolloutStatusPhase(rollout.Name, rolloutsv1alpha1.RolloutPhaseHealthy)
WaitAdvancedStatefulSetPodsReady(workload)
// check batch id after rollout
By("rollout completed, and check pod batch label")
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "3", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "4", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "5", 1)
})
It("Scaling Case", func() {
By("Creating Rollout...")
rollout := &rolloutsv1alpha1.Rollout{}
Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred())
rollout.Spec.ObjectRef.WorkloadRef = &rolloutsv1alpha1.WorkloadRef{
APIVersion: "apps.kruise.io/v1alpha1",
Kind: "CloneSet",
Name: "echoserver",
}
rollout.Spec.Strategy.Canary.Steps = []rolloutsv1alpha1.CanaryStep{
{
Weight: 20,
Pause: rolloutsv1alpha1.RolloutPause{
Duration: utilpointer.Int32(10),
},
},
{
Weight: 40,
Pause: rolloutsv1alpha1.RolloutPause{},
},
{
Weight: 60,
Pause: rolloutsv1alpha1.RolloutPause{
Duration: utilpointer.Int32(10),
},
},
{
Weight: 100,
Pause: rolloutsv1alpha1.RolloutPause{
Duration: utilpointer.Int32(10),
},
},
}
rollout.Spec.RolloutID = "1"
CreateObject(rollout)
By("Creating workload and waiting for all pods ready...")
// service
service := &v1.Service{}
Expect(ReadYamlToObject("./test_data/rollout/service.yaml", service)).ToNot(HaveOccurred())
CreateObject(service)
// ingress
ingress := &netv1.Ingress{}
Expect(ReadYamlToObject("./test_data/rollout/nginx_ingress.yaml", ingress)).ToNot(HaveOccurred())
CreateObject(ingress)
// workload
workload := &appsv1alpha1.CloneSet{}
Expect(ReadYamlToObject("./test_data/rollout/cloneset.yaml", workload)).ToNot(HaveOccurred())
CreateObject(workload)
WaitCloneSetAllPodsReady(workload)
// v1 -> v2, start rollout action
By("Update cloneset env NODE_NAME from(version1) -> to(version2)")
newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"})
workload.Spec.Template.Spec.Containers[0].Env = newEnvs
UpdateCloneSet(workload)
time.Sleep(time.Second * 2)
// wait step 2 complete
By("wait step(2) pause")
WaitRolloutCanaryStepPaused(rollout.Name, 2)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 1)
// scale up replicas, 5 -> 10
By("scaling up CloneSet from 5 -> 10")
Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred())
workload.Spec.Replicas = utilpointer.Int32(10)
UpdateCloneSet(workload)
Eventually(func() int32 {
object := &appsv1alpha1.CloneSet{}
Expect(GetObject(workload.Name, object)).NotTo(HaveOccurred())
return object.Status.UpdatedReplicas
}, 5*time.Minute, time.Second).Should(Equal(int32(4)))
// check pod batch label after scale
By("check pod batch label after scale")
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 3)
// resume rollout canary
By("check rollout canary status success, resume rollout, and wait rollout canary complete")
ResumeRolloutCanary(rollout.Name)
WaitRolloutStatusPhase(rollout.Name, rolloutsv1alpha1.RolloutPhaseHealthy)
WaitCloneSetAllPodsReady(workload)
By("rollout completed, and check pod batch label")
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 3)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "3", 2)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "4", 4)
})
/*
It("Rollback Case", func() {
By("Creating Rollout...")
rollout := &rolloutsv1alpha1.Rollout{}
Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred())
rollout.Spec.ObjectRef.WorkloadRef = &rolloutsv1alpha1.WorkloadRef{
APIVersion: "apps.kruise.io/v1alpha1",
Kind: "CloneSet",
Name: "echoserver",
}
rollout.Spec.Strategy.Canary.TrafficRoutings = nil
rollout.Annotations = map[string]string{
util.RollbackInBatchAnnotation: "true",
}
rollout.Spec.Strategy.Canary.Steps = []rolloutsv1alpha1.CanaryStep{
{
Weight: 20,
Pause: rolloutsv1alpha1.RolloutPause{},
},
{
Weight: 40,
Pause: rolloutsv1alpha1.RolloutPause{},
},
{
Weight: 60,
Pause: rolloutsv1alpha1.RolloutPause{},
},
{
Weight: 80,
Pause: rolloutsv1alpha1.RolloutPause{},
},
{
Weight: 100,
Pause: rolloutsv1alpha1.RolloutPause{
Duration: utilpointer.Int32(0),
},
},
}
CreateObject(rollout)
By("Creating workload and waiting for all pods ready...")
workload := &appsv1alpha1.CloneSet{}
Expect(ReadYamlToObject("./test_data/rollout/cloneset.yaml", workload)).ToNot(HaveOccurred())
CreateObject(workload)
WaitCloneSetAllPodsReady(workload)
// v1 -> v2, start rollout action
By("Update cloneSet env NODE_NAME from(version1) -> to(version2)")
workload.Labels[util.RolloutIDLabel] = "1"
newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"})
workload.Spec.Template.Spec.Containers[0].Env = newEnvs
UpdateCloneSet(workload)
By("wait step(1) pause")
WaitRolloutCanaryStepPaused(rollout.Name, 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1)
By("wait step(2) pause")
ResumeRolloutCanary(rollout.Name)
WaitRolloutCanaryStepPaused(rollout.Name, 2)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 1)
By("wait step(3) pause")
ResumeRolloutCanary(rollout.Name)
WaitRolloutCanaryStepPaused(rollout.Name, 3)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "3", 1)
By("Update cloneSet env NODE_NAME from(version2) -> to(version1)")
Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred())
workload.Labels[util.RolloutIDLabel] = "2"
newEnvs = mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version1"})
workload.Spec.Template.Spec.Containers[0].Env = newEnvs
UpdateCloneSet(workload)
time.Sleep(10 * time.Second)
// make sure disable quickly rollback policy
By("Wait step (1) paused")
WaitRolloutCanaryStepPaused(rollout.Name, 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "1", 1)
By("wait step(2) pause")
ResumeRolloutCanary(rollout.Name)
WaitRolloutCanaryStepPaused(rollout.Name, 2)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "2", 1)
By("wait step(3) pause")
ResumeRolloutCanary(rollout.Name)
WaitRolloutCanaryStepPaused(rollout.Name, 3)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "3", 1)
By("wait step(4) pause")
ResumeRolloutCanary(rollout.Name)
WaitRolloutCanaryStepPaused(rollout.Name, 4)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "4", 1)
By("Wait rollout complete")
ResumeRolloutCanary(rollout.Name)
WaitRolloutStatusPhase(rollout.Name, rolloutsv1alpha1.RolloutPhaseHealthy)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "1", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "2", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "3", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "4", 1)
CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "5", 1)
})
*/
})
})
func mergeEnvVar(original []v1.EnvVar, add v1.EnvVar) []v1.EnvVar {
@ -3112,3 +3409,10 @@ func mergeEnvVar(original []v1.EnvVar, add v1.EnvVar) []v1.EnvVar {
newEnvs = append(newEnvs, add)
return newEnvs
}
func mergeMap(dst, patch map[string]string) map[string]string {
for k1, v1 := range patch {
dst[k1] = v1
}
return dst
}

View File

@ -20,6 +20,7 @@ spec:
- weight: 80
pause: {duration: 10}
- weight: 100
pause: {duration: 0}
trafficRoutings:
- service: echoserver
type: nginx