diff --git a/api/v1alpha1/batchrelease_plan_types.go b/api/v1alpha1/batchrelease_plan_types.go index fe820d1..4bd45ab 100644 --- a/api/v1alpha1/batchrelease_plan_types.go +++ b/api/v1alpha1/batchrelease_plan_types.go @@ -42,10 +42,6 @@ type ReleasePlan struct { // BatchPartition start from 0. // +optional BatchPartition *int32 `json:"batchPartition,omitempty"` - // Paused the rollout, the release progress will be paused util paused is false. - // default is false - // +optional - Paused bool `json:"paused,omitempty"` } // ReleaseBatch is used to describe how each batch release should be @@ -82,7 +78,7 @@ type BatchReleaseStatus struct { // newest canary Deployment. // +optional CollisionCount *int32 `json:"collisionCount,omitempty"` - // ObservedReleasePlanHash is a hash code of observed itself releasePlan.Batches. + // ObservedReleasePlanHash is a hash code of observed itself spec.releasePlan. ObservedReleasePlanHash string `json:"observedReleasePlanHash,omitempty"` // Phase is the release plan phase, which indicates the current state of release // plan state machine in BatchRelease controller. diff --git a/api/v1alpha1/batchrelease_types.go b/api/v1alpha1/batchrelease_types.go index 3233704..9981b8d 100644 --- a/api/v1alpha1/batchrelease_types.go +++ b/api/v1alpha1/batchrelease_types.go @@ -44,6 +44,10 @@ type BatchReleaseSpec struct { TargetRef ObjectRef `json:"targetReference"` // ReleasePlan is the details on how to rollout the resources ReleasePlan ReleasePlan `json:"releasePlan"` + // Paused the rollout, the release progress will be paused until paused is false. + // default is false + // +optional + Paused bool `json:"paused,omitempty"` } type DeploymentReleaseStrategyType string diff --git a/config/crd/bases/rollouts.kruise.io_batchreleases.yaml b/config/crd/bases/rollouts.kruise.io_batchreleases.yaml index 86e1977..2ac5887 100644 --- a/config/crd/bases/rollouts.kruise.io_batchreleases.yaml +++ b/config/crd/bases/rollouts.kruise.io_batchreleases.yaml @@ -52,6 +52,10 @@ spec: description: BatchReleaseSpec defines how to describe an update between different compRevision properties: + paused: + description: Paused the rollout, the release progress will be paused + until paused is false. default is false + type: boolean releasePlan: description: ReleasePlan is the details on how to rollout the resources properties: @@ -93,10 +97,6 @@ spec: - canaryReplicas type: object type: array - paused: - description: Paused the rollout, the release progress will be - paused util paused is false. default is false - type: boolean type: object targetReference: description: TargetRef contains the GVK and name of the workload that @@ -211,7 +211,7 @@ spec: type: integer observedReleasePlanHash: description: ObservedReleasePlanHash is a hash code of observed itself - releasePlan.Batches. + spec.releasePlan. 
type: string observedWorkloadReplicas: description: ObservedWorkloadReplicas is observed replicas of target diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 79adfe7..b8bdeb5 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -48,9 +48,9 @@ spec: resources: limits: cpu: 100m - memory: 30Mi + memory: 100Mi requests: cpu: 100m - memory: 20Mi + memory: 100Mi serviceAccountName: controller-manager terminationGracePeriodSeconds: 10 diff --git a/pkg/controller/batchrelease/batchrelease_event_handler.go b/pkg/controller/batchrelease/batchrelease_event_handler.go index cd43671..eae12b8 100644 --- a/pkg/controller/batchrelease/batchrelease_event_handler.go +++ b/pkg/controller/batchrelease/batchrelease_event_handler.go @@ -54,14 +54,22 @@ type podEventHandler struct { } func (p podEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { + pod, ok := evt.Object.(*corev1.Pod) + if !ok { + return + } + p.enqueue(pod, q) } func (p podEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { } func (p podEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { } func (p podEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { - oldPod := evt.ObjectOld.(*corev1.Pod) - newPod := evt.ObjectNew.(*corev1.Pod) + oldPod, oldOK := evt.ObjectOld.(*corev1.Pod) + newPod, newOK := evt.ObjectNew.(*corev1.Pod) + if !oldOK || !newOK { + return + } if oldPod.ResourceVersion == newPod.ResourceVersion || util.IsPodReady(oldPod) == util.IsPodReady(newPod) { return } @@ -79,9 +87,9 @@ func (p podEventHandler) enqueue(pod *corev1.Pod, q workqueue.RateLimitingInterf Name: owner.Name, Namespace: pod.Namespace, } workloadGVK := schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind) - workloadObj := util.GetEmptyWorkloadObject(workloadGVK) - err := p.Get(context.TODO(), workloadNamespacedName, workloadObj) - if err != nil { + workloadObj, err := util.GetOwnerWorkload(p.Reader, pod) + if err != nil || workloadObj == nil { + klog.Errorf("Failed to get owner workload for pod %v, err: %v", client.ObjectKeyFromObject(pod), err) return } diff --git a/pkg/controller/batchrelease/batchrelease_plan_executor.go b/pkg/controller/batchrelease/batchrelease_plan_executor.go index 84831ba..0dbf74a 100644 --- a/pkg/controller/batchrelease/batchrelease_plan_executor.go +++ b/pkg/controller/batchrelease/batchrelease_plan_executor.go @@ -24,6 +24,7 @@ import ( appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" "github.com/openkruise/rollouts/api/v1alpha1" "github.com/openkruise/rollouts/pkg/controller/batchrelease/workloads" + "github.com/openkruise/rollouts/pkg/util" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -261,6 +262,12 @@ func (r *Executor) GetWorkloadController() (workloads.WorkloadController, error) } gvk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind) + if !util.IsSupportedWorkload(gvk) { + message := fmt.Sprintf("the workload type '%v' is not supported", gvk) + r.recorder.Event(r.release, v1.EventTypeWarning, "UnsupportedWorkload", message) + return nil, fmt.Errorf(message) + } + targetKey := types.NamespacedName{ Namespace: r.release.Namespace, Name: targetRef.Name, diff --git a/pkg/controller/batchrelease/batchrelease_special_cases_handler.go b/pkg/controller/batchrelease/batchrelease_special_cases_handler.go index f4c4364..0f982b1 100644 --- 
a/pkg/controller/batchrelease/batchrelease_special_cases_handler.go +++ b/pkg/controller/batchrelease/batchrelease_special_cases_handler.go @@ -57,7 +57,7 @@ func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadContr message = "Release plan is deleted or cancelled, then terminate" signalTerminating(r.releaseStatus) - case isPlanPaused(workloadEvent, r.releasePlan, r.releaseStatus): + case isPlanPaused(workloadEvent, r.release, r.releaseStatus): // handle the case that releasePlan.paused = true reason = "PlanPaused" message = "release plan is paused, then stop reconcile" @@ -184,8 +184,8 @@ func isPlanUnhealthy(plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseSt return int(status.CanaryStatus.CurrentBatch) >= len(plan.Batches) && status.Phase == v1alpha1.RolloutPhaseProgressing } -func isPlanPaused(event workloads.WorkloadEventType, plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseStatus) bool { - return plan.Paused && status.Phase == v1alpha1.RolloutPhaseProgressing && !isWorkloadGone(event, status) +func isPlanPaused(event workloads.WorkloadEventType, release *v1alpha1.BatchRelease, status *v1alpha1.BatchReleaseStatus) bool { + return release.Spec.Paused && status.Phase == v1alpha1.RolloutPhaseProgressing && !isWorkloadGone(event, status) } func isGetWorkloadInfoError(err error) bool { diff --git a/pkg/controller/batchrelease/workloads/cloneset_control_plane.go b/pkg/controller/batchrelease/workloads/cloneset_control_plane.go index 004bb54..f3ae2fb 100644 --- a/pkg/controller/batchrelease/workloads/cloneset_control_plane.go +++ b/pkg/controller/batchrelease/workloads/cloneset_control_plane.go @@ -170,6 +170,12 @@ func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) { } } + // patch current batch label to pods + patchDone, err := c.patchPodBatchLabel(canaryGoal) + if !patchDone || err != nil { + return false, err + } + c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "SetBatchDone", "Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch) return true, nil @@ -330,3 +336,20 @@ func (c *CloneSetRolloutController) recordCloneSetRevisionAndReplicas() { c.releaseStatus.StableRevision = c.clone.Status.CurrentRevision c.releaseStatus.UpdateRevision = c.clone.Status.UpdateRevision } + +func (c *CloneSetRolloutController) patchPodBatchLabel(canaryGoal int32) (bool, error) { + rolloutID, exist := c.parentController.Labels[util.RolloutIDLabel] + if !exist || rolloutID == "" { + return true, nil + } + + pods, err := util.ListOwnedPods(c.client, c.clone) + if err != nil { + klog.Errorf("Failed to list pods for CloneSet %v", c.targetNamespacedName) + return false, err + } + + batchID := c.parentController.Status.CanaryStatus.CurrentBatch + 1 + updateRevision := c.parentController.Status.UpdateRevision + return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, c.releasePlanKey) +} diff --git a/pkg/controller/batchrelease/workloads/deployment_double_control_plane.go b/pkg/controller/batchrelease/workloads/deployment_double_control_plane.go index 337ecac..2c5d1e3 100644 --- a/pkg/controller/batchrelease/workloads/deployment_double_control_plane.go +++ b/pkg/controller/batchrelease/workloads/deployment_double_control_plane.go @@ -153,6 +153,12 @@ func (c *DeploymentsRolloutController) UpgradeOneBatch() (bool, error) { } } + // patch current batch label to pods + patchDone, err := c.patchPodBatchLabel(canaryGoal) + if !patchDone || err != nil { + return false, err + } + 
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "Batch Rollout", "Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch) return true, nil } @@ -364,3 +370,20 @@ func (c *DeploymentsRolloutController) recordDeploymentRevisionAndReplicas() err c.releaseStatus.ObservedWorkloadReplicas = *c.stable.Spec.Replicas return nil } + +func (c *DeploymentsRolloutController) patchPodBatchLabel(canaryGoal int32) (bool, error) { + rolloutID, exist := c.parentController.Labels[util.RolloutIDLabel] + if !exist || rolloutID == "" || c.canary == nil { + return true, nil + } + + pods, err := util.ListOwnedPods(c.client, c.canary) + if err != nil { + klog.Errorf("Failed to list pods for Deployment %v", c.stableNamespacedName) + return false, err + } + + batchID := c.parentController.Status.CanaryStatus.CurrentBatch + 1 + updateRevision := c.parentController.Status.UpdateRevision + return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, c.releaseKey) +} diff --git a/pkg/controller/batchrelease/workloads/statefulset_like_controller.go b/pkg/controller/batchrelease/workloads/statefulset_like_controller.go index 96d950e..e6fb279 100644 --- a/pkg/controller/batchrelease/workloads/statefulset_like_controller.go +++ b/pkg/controller/batchrelease/workloads/statefulset_like_controller.go @@ -18,14 +18,12 @@ package workloads import ( "context" - "fmt" appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" "github.com/openkruise/rollouts/pkg/util" - utilclient "github.com/openkruise/rollouts/pkg/util/client" + apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -75,6 +73,14 @@ func (c *StatefulSetLikeController) GetWorkloadInfo() (*util.WorkloadInfo, error workloadInfo := util.ParseStatefulSetInfo(set, c.namespacedName) workloadInfo.Paused = true + if workloadInfo.Status.UpdatedReadyReplicas <= 0 { + updatedReadyReplicas, err := c.countUpdatedReadyPods(workloadInfo.Status.UpdateRevision) + if err != nil { + return nil, err + } + workloadInfo.Status.UpdatedReadyReplicas = updatedReadyReplicas + } + return workloadInfo, nil } @@ -85,6 +91,7 @@ func (c *StatefulSetLikeController) ClaimWorkload() (bool, error) { } err = util.ClaimWorkload(c.Client, c.planController, set, map[string]interface{}{ + "type": apps.RollingUpdateStatefulSetStrategyType, "rollingUpdate": map[string]interface{}{ "partition": pointer.Int32(util.ParseReplicasFrom(set)), }, @@ -122,9 +129,9 @@ func (c *StatefulSetLikeController) UpgradeBatch(canaryReplicasGoal, stableRepli return false, err } - observedReplicas := canaryReplicasGoal + stableReplicasGoal - if observedReplicas != util.ParseReplicasFrom(set) { - return false, fmt.Errorf("StatefulSet(%v) scaled, should handle scale event first", c.namespacedName) + // if no needs to patch partition + if isStatefulSetUpgradedDone(set, canaryReplicasGoal, stableReplicasGoal) { + return true, nil } err = util.PatchSpec(c.Client, set, map[string]interface{}{ @@ -142,20 +149,6 @@ func (c *StatefulSetLikeController) UpgradeBatch(canaryReplicasGoal, stableRepli return true, nil } -func (c *StatefulSetLikeController) IsBatchUpgraded(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) { - set, err := c.GetWorkloadObject() - if err != nil { - return false, err - } - - if !util.IsStatefulSetRollingUpdate(set) 
{ - return false, fmt.Errorf("StatefulSet(%v) rollingUpdate configuration is nil, should check it manually", c.namespacedName) - } - - partition := util.GetStatefulSetPartition(set) - return partition <= stableReplicasGoal, nil -} - func (c *StatefulSetLikeController) IsBatchReady(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) { workloadInfo, err := c.GetWorkloadInfo() if err != nil { @@ -198,7 +191,7 @@ func (c *StatefulSetLikeController) IsBatchReady(canaryReplicasGoal, stableRepli return secondCheckPointReady(), nil } -func (c *StatefulSetLikeController) listOwnedPods() ([]*v1.Pod, error) { +func (c *StatefulSetLikeController) ListOwnedPods() ([]*v1.Pod, error) { if c.pods != nil { return c.pods, nil } @@ -206,37 +199,18 @@ func (c *StatefulSetLikeController) listOwnedPods() ([]*v1.Pod, error) { if err != nil { return nil, err } - selector, err := util.ParseSelectorFrom(set) - if err != nil || selector == nil { - return nil, err - } - podLister := &v1.PodList{} - err = c.List(context.TODO(), podLister, &client.ListOptions{LabelSelector: selector}, utilclient.DisableDeepCopy) - if err != nil { - return nil, err - } - c.pods = make([]*v1.Pod, 0) - for i := range podLister.Items { - pod := &podLister.Items[i] - if !pod.DeletionTimestamp.IsZero() { - continue - } - owner := metav1.GetControllerOf(pod) - if owner == nil || owner.UID != set.GetUID() { - continue - } - c.pods = append(c.pods, pod) - } - return c.pods, nil + c.pods, err = util.ListOwnedPods(c.Client, set) + return c.pods, err } func (c *StatefulSetLikeController) countUpdatedReadyPods(updateRevision string) (int32, error) { - pods, err := c.listOwnedPods() + pods, err := c.ListOwnedPods() if err != nil { return 0, err } + activePods := util.FilterActivePods(pods) updatedReadyReplicas := int32(0) - for _, pod := range pods { + for _, pod := range activePods { if util.IsConsistentWithRevision(pod, updateRevision) && util.IsPodReady(pod) { updatedReadyReplicas++ } @@ -244,3 +218,13 @@ func (c *StatefulSetLikeController) countUpdatedReadyPods(updateRevision string) klog.V(3).Infof("BatchRelease(%v) observed %d updatedReadyReplicas") return updatedReadyReplicas, nil } + +func isStatefulSetUpgradedDone(set *unstructured.Unstructured, canaryReplicasGoal, stableReplicasGoal int32) bool { + partition := util.GetStatefulSetPartition(set) + if partition <= stableReplicasGoal { + return true + } + updatedReplicas := util.ParseStatusIntFrom(set, "updatedReplicas") + observedGeneration := util.ParseStatusIntFrom(set, "observedGeneration") + return set.GetGeneration() == observedGeneration && int(updatedReplicas) >= int(canaryReplicasGoal) +} diff --git a/pkg/controller/batchrelease/workloads/unified_workload_control_plane.go b/pkg/controller/batchrelease/workloads/unified_workload_control_plane.go index 7fc2c30..a79df3c 100644 --- a/pkg/controller/batchrelease/workloads/unified_workload_control_plane.go +++ b/pkg/controller/batchrelease/workloads/unified_workload_control_plane.go @@ -37,8 +37,8 @@ type UnifiedWorkloadController interface { ClaimWorkload() (bool, error) ReleaseWorkload(cleanup bool) (bool, error) UpgradeBatch(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) - IsBatchUpgraded(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) IsBatchReady(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) + ListOwnedPods() ([]*v1.Pod, error) } // UnifiedWorkloadRolloutControlPlane is responsible for handling rollout StatefulSet type of workloads @@ -154,13 +154,14 @@ func (c 
*UnifiedWorkloadRolloutControlPlane) UpgradeOneBatch() (bool, error) { "stable-goal", stableGoal, "canary-replicas", currentCanaryReplicas) - upgradeDone, err := c.IsBatchUpgraded(canaryGoal, stableGoal) - if err != nil { + isUpgradedDone, err := c.UpgradeBatch(canaryGoal, stableGoal) + if err != nil || !isUpgradedDone { + return false, nil + } + + isPatchedDone, err := c.patchPodBatchLabel(workloadInfo, canaryGoal) + if err != nil || !isPatchedDone { return false, err - } else if !upgradeDone { - if succeed, err := c.UpgradeBatch(canaryGoal, stableGoal); err != nil || !succeed { - return false, nil - } } c.recorder.Eventf(c.planController, v1.EventTypeNormal, "SetBatchDone", @@ -313,3 +314,20 @@ func (c *UnifiedWorkloadRolloutControlPlane) RecordWorkloadRevisionAndReplicas() c.newStatus.UpdateRevision = workloadInfo.Status.UpdateRevision return nil } + +func (c *UnifiedWorkloadRolloutControlPlane) patchPodBatchLabel(workloadInfo *util.WorkloadInfo, canaryGoal int32) (bool, error) { + rolloutID, exist := c.planController.Labels[util.RolloutIDLabel] + if !exist || rolloutID == "" { + return true, nil + } + + pods, err := c.ListOwnedPods() + if err != nil { + klog.Errorf("Failed to list pods for %v", workloadInfo.GVKWithName) + return false, err + } + + batchID := c.planController.Status.CanaryStatus.CurrentBatch + 1 + updateRevision := c.planController.Status.UpdateRevision + return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, client.ObjectKeyFromObject(c.planController)) +} diff --git a/pkg/controller/rollout/batchrelease/inner_batchrelease.go b/pkg/controller/rollout/batchrelease/inner_batchrelease.go index d81e1f0..0b2d911 100644 --- a/pkg/controller/rollout/batchrelease/inner_batchrelease.go +++ b/pkg/controller/rollout/batchrelease/inner_batchrelease.go @@ -132,11 +132,11 @@ func (r *innerBatchRelease) Promote(index int32, checkReady bool) (bool, error) klog.Errorf("error getting updated BatchRelease(%s/%s) from client", batch.Namespace, batch.Name) return err } - if !batch.Spec.ReleasePlan.Paused && *batch.Spec.ReleasePlan.BatchPartition == index { + if !batch.Spec.Paused && *batch.Spec.ReleasePlan.BatchPartition == index { return nil } batch.Spec.ReleasePlan.BatchPartition = utilpointer.Int32Ptr(index) - batch.Spec.ReleasePlan.Paused = false + batch.Spec.Paused = false if err := r.Client.Update(context.TODO(), batch); err != nil { return err } @@ -322,6 +322,10 @@ func createBatchRelease(rollout *rolloutv1alpha1.Rollout, batchName string) *rol }, }, } + + if rollout.Spec.RolloutID != "" { + br.Labels[util.RolloutIDLabel] = rollout.Spec.RolloutID + } return br } diff --git a/pkg/controller/rollout/canary.go b/pkg/controller/rollout/canary.go index 5795f6b..7ff8382 100644 --- a/pkg/controller/rollout/canary.go +++ b/pkg/controller/rollout/canary.go @@ -42,27 +42,26 @@ func (r *rolloutContext) runCanary() error { } // update canary status - canaryStatus.CanaryReplicas = r.workload.CanaryReplicas - canaryStatus.CanaryReadyReplicas = r.workload.CanaryReadyReplicas + batch, err := r.batchControl.FetchBatchRelease() + if err != nil { + canaryStatus.CanaryReplicas = r.workload.CanaryReplicas + canaryStatus.CanaryReadyReplicas = r.workload.CanaryReadyReplicas + } else { + canaryStatus.CanaryReplicas = batch.Status.CanaryStatus.UpdatedReplicas + canaryStatus.CanaryReadyReplicas = batch.Status.CanaryStatus.UpdatedReadyReplicas + } + switch canaryStatus.CurrentStepState { case rolloutv1alpha1.CanaryStepStateUpgrade: klog.Infof("rollout(%s/%s) run canary 
strategy, and state(%s)", r.rollout.Namespace, r.rollout.Name, rolloutv1alpha1.CanaryStepStateUpgrade) - // If the last step is 100%, there is no need to execute the canary process at this time - if r.rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1].Weight == 100 { - klog.Infof("rollout(%s/%s) last step is 100%, there is no need to execute the canary process at this time, and set state=%s", - r.rollout.Namespace, r.rollout.Name, canaryStatus.CurrentStepIndex-1, canaryStatus.CurrentStepIndex, rolloutv1alpha1.CanaryStepStateCompleted) + done, err := r.doCanaryUpgrade() + if err != nil { + return err + } else if done { + canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateTrafficRouting canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()} - canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateCompleted - } else { - done, err := r.doCanaryUpgrade() - if err != nil { - return err - } else if done { - canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateTrafficRouting - canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()} - klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", r.rollout.Namespace, r.rollout.Name, - canaryStatus.CurrentStepIndex, rolloutv1alpha1.CanaryStepStateUpgrade, canaryStatus.CurrentStepState) - } + klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", r.rollout.Namespace, r.rollout.Name, + canaryStatus.CurrentStepIndex, rolloutv1alpha1.CanaryStepStateUpgrade, canaryStatus.CurrentStepState) } case rolloutv1alpha1.CanaryStepStateTrafficRouting: @@ -268,6 +267,9 @@ func (r *rolloutContext) removeRolloutStateInWorkload() error { workloadGVK := schema.FromAPIVersionAndKind(workloadRef.APIVersion, workloadRef.Kind) err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { obj := util.GetEmptyWorkloadObject(workloadGVK) + if obj == nil { + return nil + } if err := r.Get(context.TODO(), types.NamespacedName{Name: r.workload.Name, Namespace: r.workload.Namespace}, obj); err != nil { klog.Errorf("getting updated workload(%s.%s) failed: %s", r.workload.Namespace, r.workload.Name, err.Error()) return err diff --git a/pkg/feature/switch.go b/pkg/feature/switch.go new file mode 100644 index 0000000..364b394 --- /dev/null +++ b/pkg/feature/switch.go @@ -0,0 +1,19 @@ +package feature + +import "flag" + +var ( + // If workloadTypeFilterSwitch is true, webhook and controllers + // will filter out unsupported workload type. Currently, rollout + // support Deployment, CloneSet, StatefulSet and Advanced StatefulSet. + // Default to true. 
+ workloadTypeFilterSwitch bool +) + +func init() { + flag.BoolVar(&workloadTypeFilterSwitch, "filter-workload-type", true, "filter known workload gvk for rollout controller") +} + +func NeedFilterWorkloadType() bool { + return workloadTypeFilterSwitch +} diff --git a/pkg/util/controller_finder.go b/pkg/util/controller_finder.go index 312cc49..fb82db2 100644 --- a/pkg/util/controller_finder.go +++ b/pkg/util/controller_finder.go @@ -103,6 +103,7 @@ func (r *ControllerFinder) finders() []ControllerFinderFunc { } var ( + ControllerKindRS = apps.SchemeGroupVersion.WithKind("ReplicaSet") ControllerKindDep = apps.SchemeGroupVersion.WithKind("Deployment") ControllerKindSts = apps.SchemeGroupVersion.WithKind("StatefulSet") ControllerKruiseKindCS = appsv1alpha1.SchemeGroupVersion.WithKind("CloneSet") diff --git a/pkg/util/pod_utils.go b/pkg/util/pod_utils.go index 74607a9..b435dcd 100644 --- a/pkg/util/pod_utils.go +++ b/pkg/util/pod_utils.go @@ -1,10 +1,17 @@ package util import ( + "context" + "fmt" "strings" + utilclient "github.com/openkruise/rollouts/pkg/util/client" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" ) // IsPodReady returns true if a pod is ready; false otherwise. @@ -57,3 +64,82 @@ func IsConsistentWithRevision(pod *v1.Pod, revision string) bool { } return false } + +func FilterActivePods(pods []*v1.Pod) []*v1.Pod { + var activePods []*v1.Pod + for _, pod := range pods { + if pod.DeletionTimestamp.IsZero() { + activePods = append(activePods, pod) + } + } + return activePods +} + +func IsCompletedPod(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded +} + +func ListOwnedPods(c client.Client, object client.Object) ([]*v1.Pod, error) { + selector, err := parseSelector(object) + if err != nil { + return nil, err + } + + podLister := &v1.PodList{} + err = c.List(context.TODO(), podLister, &client.ListOptions{LabelSelector: selector}, utilclient.DisableDeepCopy) + if err != nil { + return nil, err + } + pods := make([]*v1.Pod, 0, len(podLister.Items)) + for i := range podLister.Items { + pod := &podLister.Items[i] + if IsCompletedPod(pod) { + continue + } + owner := metav1.GetControllerOf(pod) + if owner == nil || owner.UID != object.GetUID() { + continue + } + pods = append(pods, pod) + } + return pods, nil +} + +func PatchPodBatchLabel(c client.Client, pods []*v1.Pod, rolloutID string, batchID int32, updateRevision string, canaryGoal int32, logKey types.NamespacedName) (bool, error) { + // the number of active pods that has been patched successfully. 
+ patchedUpdatedReplicas := int32(0) + for _, pod := range pods { + podRolloutID := pod.Labels[RolloutIDLabel] + if pod.DeletionTimestamp.IsZero() { + // we don't patch label for the active old revision pod + if !IsConsistentWithRevision(pod, updateRevision) { + continue + } + // if it has been patched, count and ignore + if podRolloutID == rolloutID { + patchedUpdatedReplicas++ + continue + } + } + + // for such terminating pod and others, if it has been patched, just ignore + if podRolloutID == rolloutID { + continue + } + + podClone := pod.DeepCopy() + by := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s","%s":"%d"}}}`, RolloutIDLabel, rolloutID, RolloutBatchIDLabel, batchID) + err := c.Patch(context.TODO(), podClone, client.RawPatch(types.StrategicMergePatchType, []byte(by))) + if err != nil { + klog.Errorf("Failed to patch Pod(%v) batchID, err: %v", client.ObjectKeyFromObject(podClone), err) + return false, err + } + + if pod.DeletionTimestamp.IsZero() && IsConsistentWithRevision(pod, updateRevision) { + patchedUpdatedReplicas++ + } + } + + klog.V(3).Infof("Patch %v pods with batchID for batchRelease %v, goal is %d pods", patchedUpdatedReplicas, logKey, canaryGoal) + return patchedUpdatedReplicas >= canaryGoal, nil +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 74bb1ce..65b1e8f 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -18,18 +18,15 @@ package util import ( "encoding/json" - "strconv" "time" kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" kruiseappsv1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" - rolloutv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" "github.com/openkruise/rollouts/pkg/util/client" apps "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -45,6 +42,13 @@ const ( KruiseRolloutFinalizer = "rollouts.kruise.io/rollout" // rollout spec hash RolloutHashAnnotation = "rollouts.kruise.io/hash" + // RolloutIDLabel is designed to distinguish each revision publication of a workload. + // The value of RolloutIDLabel corresponds to Rollout.Spec.RolloutID. + RolloutIDLabel = "apps.kruise.io/rollout-id" + // RolloutBatchIDLabel is the label key of batch id that will be patched to pods during rollout. + // RolloutBatchIDLabel will be patched only when RolloutIDLabel is set. + // Users can use RolloutIDLabel and RolloutBatchIDLabel to select the pods that are upgraded in a certain batch of a certain release. 
+ RolloutBatchIDLabel = "apps.kruise.io/rollout-batch-id" ) // RolloutState is annotation[rollouts.kruise.io/in-progressing] value @@ -97,25 +101,6 @@ func AddWorkloadWatcher(c controller.Controller, handler handler.EventHandler) e return nil } -func ReCalculateCanaryStepIndex(rollout *rolloutv1alpha1.Rollout, workloadReplicas, currentReplicas int) int32 { - var stepIndex int32 - for i := range rollout.Spec.Strategy.Canary.Steps { - step := rollout.Spec.Strategy.Canary.Steps[i] - var desiredReplicas int - if step.Replicas != nil { - desiredReplicas, _ = intstr.GetScaledValueFromIntOrPercent(step.Replicas, workloadReplicas, true) - } else { - replicas := intstr.FromString(strconv.Itoa(int(step.Weight)) + "%") - desiredReplicas, _ = intstr.GetScaledValueFromIntOrPercent(&replicas, workloadReplicas, true) - } - stepIndex = int32(i + 1) - if currentReplicas <= desiredReplicas { - break - } - } - return stepIndex -} - func DiscoverGVK(gvk schema.GroupVersionKind) bool { genericClient := client.GetGenericClient() if genericClient == nil { diff --git a/pkg/util/workloads_utils.go b/pkg/util/workloads_utils.go index 1e37049..831c2e2 100644 --- a/pkg/util/workloads_utils.go +++ b/pkg/util/workloads_utils.go @@ -29,6 +29,7 @@ import ( "github.com/davecgh/go-spew/spew" appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" "github.com/openkruise/rollouts/api/v1alpha1" + "github.com/openkruise/rollouts/pkg/feature" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" @@ -60,6 +61,15 @@ const ( alphanums = "bcdfghjklmnpqrstvwxz2456789" ) +var ( + knownWorkloadGVKs = []*schema.GroupVersionKind{ + &ControllerKindDep, + &ControllerKindSts, + &ControllerKruiseKindCS, + &ControllerKruiseKindSts, + } +) + type WorkloadStatus struct { Replicas int32 ReadyReplicas int32 @@ -153,7 +163,7 @@ func EqualIgnoreHash(template1, template2 *v1.PodTemplateSpec) bool { } func HashReleasePlanBatches(releasePlan *v1alpha1.ReleasePlan) string { - by, _ := json.Marshal(releasePlan.Batches) + by, _ := json.Marshal(releasePlan) md5Hash := sha256.Sum256(by) return hex.EncodeToString(md5Hash[:]) } @@ -208,6 +218,10 @@ func PatchSpec(c client.Client, object client.Object, spec map[string]interface{ } func GetEmptyWorkloadObject(gvk schema.GroupVersionKind) client.Object { + if !IsSupportedWorkload(gvk) { + return nil + } + switch gvk.Kind { case ControllerKindDep.Kind: return &apps.Deployment{} @@ -316,7 +330,7 @@ func GetStatefulSetMaxUnavailable(object *unstructured.Unstructured) *intstr.Int func ParseStatefulSetInfo(object *unstructured.Unstructured, namespacedName types.NamespacedName) *WorkloadInfo { workloadGVKWithName := fmt.Sprintf("%v(%v)", object.GroupVersionKind().String(), namespacedName) - selector, err := ParseSelectorFrom(object) + selector, err := parseSelector(object) if err != nil { klog.Errorf("Failed to parse selector for workload(%v)", workloadGVKWithName) } @@ -428,16 +442,26 @@ func parseMetadataFrom(object *unstructured.Unstructured) *metav1.ObjectMeta { return meta } -// ParseSelectorFrom can find labelSelector and parse it as selector for unstructured object -func ParseSelectorFrom(object *unstructured.Unstructured) (labels.Selector, error) { - m, found, err := unstructured.NestedFieldNoCopy(object.Object, "spec", "selector") - if err != nil || !found { - return nil, err +// parseSelector can find labelSelector and parse it as labels.Selector for client object +func parseSelector(object client.Object) (labels.Selector, error) { + switch o := 
object.(type) { + case *apps.Deployment: + return metav1.LabelSelectorAsSelector(o.Spec.Selector) + case *appsv1alpha1.CloneSet: + return metav1.LabelSelectorAsSelector(o.Spec.Selector) + case *unstructured.Unstructured: + m, found, err := unstructured.NestedFieldNoCopy(o.Object, "spec", "selector") + if err != nil || !found { + return nil, err + } + byteInfo, _ := json.Marshal(m) + labelSelector := &metav1.LabelSelector{} + _ = json.Unmarshal(byteInfo, labelSelector) + return metav1.LabelSelectorAsSelector(labelSelector) + default: + panic("unsupported workload type to ParseSelector function") } - byteInfo, _ := json.Marshal(m) - labelSelector := &metav1.LabelSelector{} - _ = json.Unmarshal(byteInfo, labelSelector) - return metav1.LabelSelectorAsSelector(labelSelector) + } func unmarshalIntStr(m interface{}) *intstr.IntOrString { @@ -461,3 +485,43 @@ func GenRandomStr(length int) string { randStr := rand.String(length) return rand.SafeEncodeString(randStr) } + +func IsSupportedWorkload(gvk schema.GroupVersionKind) bool { + if !feature.NeedFilterWorkloadType() { + return true + } + for _, known := range knownWorkloadGVKs { + if gvk.Group == known.Group && gvk.Kind == known.Kind { + return true + } + } + return false +} + +func GetOwnerWorkload(r client.Reader, object client.Object) (client.Object, error) { + owner := metav1.GetControllerOf(object) + if owner == nil { + return nil, nil + } + + ownerGvk := schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind) + ownerKey := types.NamespacedName{Namespace: object.GetNamespace(), Name: owner.Name} + if ownerGvk.Group == ControllerKindRS.Group && ownerGvk.Kind == ControllerKindRS.Kind { + replicaset := &apps.ReplicaSet{} + err := r.Get(context.TODO(), ownerKey, replicaset) + if err != nil { + return nil, err + } + return GetOwnerWorkload(r, replicaset) + } + + ownerObj := GetEmptyWorkloadObject(ownerGvk) + if ownerObj == nil { + return nil, nil + } + err := r.Get(context.TODO(), ownerKey, ownerObj) + if err != nil { + return nil, err + } + return ownerObj, nil +} diff --git a/pkg/webhook/rollout/validating/rollout_create_update_handler.go b/pkg/webhook/rollout/validating/rollout_create_update_handler.go index 6a3766e..c95b19a 100644 --- a/pkg/webhook/rollout/validating/rollout_create_update_handler.go +++ b/pkg/webhook/rollout/validating/rollout_create_update_handler.go @@ -23,8 +23,10 @@ import ( "reflect" appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" + "github.com/openkruise/rollouts/pkg/util" utilclient "github.com/openkruise/rollouts/pkg/util/client" addmissionv1 "k8s.io/api/admission/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/validation/field" "sigs.k8s.io/controller-runtime/pkg/client" @@ -146,6 +148,11 @@ func validateRolloutSpecObjectRef(objectRef *appsv1alpha1.ObjectRef, fldPath *fi if objectRef.WorkloadRef == nil { return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), objectRef.WorkloadRef, "WorkloadRef is required")} } + + gvk := schema.FromAPIVersionAndKind(objectRef.WorkloadRef.APIVersion, objectRef.WorkloadRef.Kind) + if !util.IsSupportedWorkload(gvk) { + return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), objectRef.WorkloadRef, "WorkloadRef kind is not supported")} + } return nil } diff --git a/test/e2e/rollout_test.go b/test/e2e/rollout_test.go index 2aba257..b96da8a 100644 --- a/test/e2e/rollout_test.go +++ b/test/e2e/rollout_test.go @@ -85,7 +85,10 @@ var _ = SIGDescribe("Rollout", func() { if err != nil { 
return err } - clone.Spec = *object.Spec.DeepCopy() + clone.Spec.Replicas = utilpointer.Int32(*object.Spec.Replicas) + clone.Spec.Template = *object.Spec.Template.DeepCopy() + clone.Labels = mergeMap(clone.Labels, object.Labels) + clone.Annotations = mergeMap(clone.Annotations, object.Annotations) return k8sClient.Update(context.TODO(), clone) })).NotTo(HaveOccurred()) @@ -100,7 +103,10 @@ var _ = SIGDescribe("Rollout", func() { if err != nil { return err } - clone.Spec = *object.Spec.DeepCopy() + clone.Spec.Replicas = utilpointer.Int32(*object.Spec.Replicas) + clone.Spec.Template = *object.Spec.Template.DeepCopy() + clone.Labels = mergeMap(clone.Labels, object.Labels) + clone.Annotations = mergeMap(clone.Annotations, object.Annotations) return k8sClient.Update(context.TODO(), clone) })).NotTo(HaveOccurred()) @@ -115,7 +121,10 @@ var _ = SIGDescribe("Rollout", func() { if err != nil { return err } - clone.Spec = *object.Spec.DeepCopy() + clone.Spec.Replicas = utilpointer.Int32(*object.Spec.Replicas) + clone.Spec.Template = *object.Spec.Template.DeepCopy() + clone.Labels = mergeMap(clone.Labels, object.Labels) + clone.Annotations = mergeMap(clone.Annotations, object.Annotations) return k8sClient.Update(context.TODO(), clone) })).NotTo(HaveOccurred()) @@ -130,7 +139,10 @@ var _ = SIGDescribe("Rollout", func() { if err != nil { return err } - clone.Spec = *object.Spec.DeepCopy() + clone.Spec.Replicas = utilpointer.Int32(*object.Spec.Replicas) + clone.Spec.Template = *object.Spec.Template.DeepCopy() + clone.Labels = mergeMap(clone.Labels, object.Labels) + clone.Annotations = mergeMap(clone.Annotations, object.Annotations) return k8sClient.Update(context.TODO(), clone) })).NotTo(HaveOccurred()) @@ -274,10 +286,10 @@ var _ = SIGDescribe("Rollout", func() { return &canaryList.Items[0], nil } - GetPodsOfDeployment := func(obj *apps.Deployment) ([]*v1.Pod, error) { + ListPods := func(namespace string, labelSelector *metav1.LabelSelector) ([]*v1.Pod, error) { appList := &v1.PodList{} - selector, _ := metav1.LabelSelectorAsSelector(obj.Spec.Selector) - err := k8sClient.List(context.TODO(), appList, &client.ListOptions{Namespace: obj.Namespace, LabelSelector: selector}) + selector, _ := metav1.LabelSelectorAsSelector(labelSelector) + err := k8sClient.List(context.TODO(), appList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}) if err != nil { return nil, err } @@ -291,6 +303,20 @@ var _ = SIGDescribe("Rollout", func() { return apps, nil } + CheckPodBatchLabel := func(namespace string, labelSelector *metav1.LabelSelector, rolloutID, batchID string, expected int) { + pods, err := ListPods(namespace, labelSelector) + Expect(err).NotTo(HaveOccurred()) + + count := 0 + for _, pod := range pods { + if pod.Labels[util.RolloutIDLabel] == rolloutID && + pod.Labels[util.RolloutBatchIDLabel] == batchID { + count++ + } + } + Expect(count).Should(BeNumerically("==", expected)) + } + BeforeEach(func() { namespace = randomNamespaceName("rollout") ns := v1.Namespace{ @@ -650,7 +676,7 @@ var _ = SIGDescribe("Rollout", func() { Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) CreateObject(workload) WaitDeploymentAllPodsReady(workload) - pods, err := GetPodsOfDeployment(workload) + pods, err := ListPods(workload.Name, workload.Spec.Selector) Expect(err).NotTo(HaveOccurred()) appNames := make(map[string]struct{}) for _, app := range pods { @@ -717,7 +743,7 @@ var _ = SIGDescribe("Rollout", func() { } } // deployment pods not changed - cpods, err := 
GetPodsOfDeployment(workload) + cpods, err := ListPods(workload.Name, workload.Spec.Selector) Expect(err).NotTo(HaveOccurred()) cappNames := make(map[string]struct{}) for _, pod := range cpods { @@ -911,7 +937,9 @@ var _ = SIGDescribe("Rollout", func() { }, { Weight: 100, - Pause: rolloutsv1alpha1.RolloutPause{}, + Pause: rolloutsv1alpha1.RolloutPause{ + Duration: utilpointer.Int32(0), + }, }, } CreateObject(rollout) @@ -1040,7 +1068,9 @@ var _ = SIGDescribe("Rollout", func() { }, { Weight: 100, - Pause: rolloutsv1alpha1.RolloutPause{}, + Pause: rolloutsv1alpha1.RolloutPause{ + Duration: utilpointer.Int32(0), + }, }, } CreateObject(rollout) @@ -1173,7 +1203,9 @@ var _ = SIGDescribe("Rollout", func() { }, { Weight: 100, - Pause: rolloutsv1alpha1.RolloutPause{}, + Pause: rolloutsv1alpha1.RolloutPause{ + Duration: utilpointer.Int32(0), + }, }, } CreateObject(rollout) @@ -3099,6 +3131,271 @@ var _ = SIGDescribe("Rollout", func() { WaitRolloutWorkloadGenration(rollout.Name, workload.Generation) }) }) + + KruiseDescribe("Rolout Patch pod batch ID", func() { + It("Normal Case", func() { + By("Creating Rollout...") + rollout := &rolloutsv1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Spec.ObjectRef.WorkloadRef = &rolloutsv1alpha1.WorkloadRef{ + APIVersion: "apps.kruise.io/v1beta1", + Kind: "StatefulSet", + Name: "echoserver", + } + rollout.Spec.Strategy.Canary.TrafficRoutings = nil + rollout.Spec.RolloutID = "1" + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + // headless-service + headlessService := &v1.Service{} + Expect(ReadYamlToObject("./test_data/rollout/headless_service.yaml", headlessService)).ToNot(HaveOccurred()) + CreateObject(headlessService) + // service + service := &v1.Service{} + Expect(ReadYamlToObject("./test_data/rollout/service.yaml", service)).ToNot(HaveOccurred()) + CreateObject(service) + // ingress + ingress := &netv1.Ingress{} + Expect(ReadYamlToObject("./test_data/rollout/nginx_ingress.yaml", ingress)).ToNot(HaveOccurred()) + CreateObject(ingress) + // workload + workload := &appsv1beta1.StatefulSet{} + Expect(ReadYamlToObject("./test_data/rollout/advanced_statefulset.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitAdvancedStatefulSetPodsReady(workload) + + By("Update statefulset env NODE_NAME from(version1) -> to(version2)") + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateAdvancedStatefulSet(workload) + + // wait step 1 complete + By("wait step(1) pause") + WaitRolloutCanaryStepPaused(rollout.Name, 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1) + + // resume rollout canary + ResumeRolloutCanary(rollout.Name) + By("resume rollout, and wait next step(2)") + WaitRolloutCanaryStepPaused(rollout.Name, 2) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 1) + + // resume rollout + ResumeRolloutCanary(rollout.Name) + By("check rollout canary status success, resume rollout, and wait rollout canary complete") + WaitRolloutStatusPhase(rollout.Name, rolloutsv1alpha1.RolloutPhaseHealthy) + WaitAdvancedStatefulSetPodsReady(workload) + + // check batch id after rollout + By("rollout completed, and check pod batch label") + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1) + CheckPodBatchLabel(workload.Namespace, 
workload.Spec.Selector, "1", "2", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "3", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "4", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "5", 1) + }) + + It("Scaling Case", func() { + By("Creating Rollout...") + rollout := &rolloutsv1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Spec.ObjectRef.WorkloadRef = &rolloutsv1alpha1.WorkloadRef{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "CloneSet", + Name: "echoserver", + } + rollout.Spec.Strategy.Canary.Steps = []rolloutsv1alpha1.CanaryStep{ + { + Weight: 20, + Pause: rolloutsv1alpha1.RolloutPause{ + Duration: utilpointer.Int32(10), + }, + }, + { + Weight: 40, + Pause: rolloutsv1alpha1.RolloutPause{}, + }, + { + Weight: 60, + Pause: rolloutsv1alpha1.RolloutPause{ + Duration: utilpointer.Int32(10), + }, + }, + { + Weight: 100, + Pause: rolloutsv1alpha1.RolloutPause{ + Duration: utilpointer.Int32(10), + }, + }, + } + rollout.Spec.RolloutID = "1" + CreateObject(rollout) + By("Creating workload and waiting for all pods ready...") + // service + service := &v1.Service{} + Expect(ReadYamlToObject("./test_data/rollout/service.yaml", service)).ToNot(HaveOccurred()) + CreateObject(service) + // ingress + ingress := &netv1.Ingress{} + Expect(ReadYamlToObject("./test_data/rollout/nginx_ingress.yaml", ingress)).ToNot(HaveOccurred()) + CreateObject(ingress) + // workload + workload := &appsv1alpha1.CloneSet{} + Expect(ReadYamlToObject("./test_data/rollout/cloneset.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitCloneSetAllPodsReady(workload) + + // v1 -> v2, start rollout action + By("Update cloneset env NODE_NAME from(version1) -> to(version2)") + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateCloneSet(workload) + time.Sleep(time.Second * 2) + + // wait step 2 complete + By("wait step(2) pause") + WaitRolloutCanaryStepPaused(rollout.Name, 2) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 1) + + // scale up replicas, 5 -> 10 + By("scaling up CloneSet from 5 -> 10") + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + workload.Spec.Replicas = utilpointer.Int32(10) + UpdateCloneSet(workload) + Eventually(func() int32 { + object := &appsv1alpha1.CloneSet{} + Expect(GetObject(workload.Name, object)).NotTo(HaveOccurred()) + return object.Status.UpdatedReplicas + }, 5*time.Minute, time.Second).Should(Equal(int32(4))) + + // check pod batch label after scale + By("check pod batch label after scale") + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 3) + + // resume rollout canary + By("check rollout canary status success, resume rollout, and wait rollout canary complete") + ResumeRolloutCanary(rollout.Name) + WaitRolloutStatusPhase(rollout.Name, rolloutsv1alpha1.RolloutPhaseHealthy) + WaitCloneSetAllPodsReady(workload) + + By("rollout completed, and check pod batch label") + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 3) + CheckPodBatchLabel(workload.Namespace, 
workload.Spec.Selector, "1", "3", 2) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "4", 4) + }) + + /* + It("Rollback Case", func() { + By("Creating Rollout...") + rollout := &rolloutsv1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Spec.ObjectRef.WorkloadRef = &rolloutsv1alpha1.WorkloadRef{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "CloneSet", + Name: "echoserver", + } + rollout.Spec.Strategy.Canary.TrafficRoutings = nil + rollout.Annotations = map[string]string{ + util.RollbackInBatchAnnotation: "true", + } + rollout.Spec.Strategy.Canary.Steps = []rolloutsv1alpha1.CanaryStep{ + { + Weight: 20, + Pause: rolloutsv1alpha1.RolloutPause{}, + }, + { + Weight: 40, + Pause: rolloutsv1alpha1.RolloutPause{}, + }, + { + Weight: 60, + Pause: rolloutsv1alpha1.RolloutPause{}, + }, + { + Weight: 80, + Pause: rolloutsv1alpha1.RolloutPause{}, + }, + { + Weight: 100, + Pause: rolloutsv1alpha1.RolloutPause{ + Duration: utilpointer.Int32(0), + }, + }, + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + workload := &appsv1alpha1.CloneSet{} + Expect(ReadYamlToObject("./test_data/rollout/cloneset.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitCloneSetAllPodsReady(workload) + + // v1 -> v2, start rollout action + By("Update cloneSet env NODE_NAME from(version1) -> to(version2)") + workload.Labels[util.RolloutIDLabel] = "1" + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateCloneSet(workload) + + By("wait step(1) pause") + WaitRolloutCanaryStepPaused(rollout.Name, 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1) + + By("wait step(2) pause") + ResumeRolloutCanary(rollout.Name) + WaitRolloutCanaryStepPaused(rollout.Name, 2) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 1) + + By("wait step(3) pause") + ResumeRolloutCanary(rollout.Name) + WaitRolloutCanaryStepPaused(rollout.Name, 3) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "3", 1) + + By("Update cloneSet env NODE_NAME from(version2) -> to(version1)") + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + workload.Labels[util.RolloutIDLabel] = "2" + newEnvs = mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version1"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateCloneSet(workload) + time.Sleep(10 * time.Second) + + // make sure disable quickly rollback policy + By("Wait step (1) paused") + WaitRolloutCanaryStepPaused(rollout.Name, 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "1", 1) + + By("wait step(2) pause") + ResumeRolloutCanary(rollout.Name) + WaitRolloutCanaryStepPaused(rollout.Name, 2) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "2", 1) + + By("wait step(3) pause") + ResumeRolloutCanary(rollout.Name) + WaitRolloutCanaryStepPaused(rollout.Name, 3) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "3", 1) + + By("wait step(4) pause") + ResumeRolloutCanary(rollout.Name) + WaitRolloutCanaryStepPaused(rollout.Name, 4) + CheckPodBatchLabel(workload.Namespace, 
workload.Spec.Selector, "2", "4", 1) + + By("Wait rollout complete") + ResumeRolloutCanary(rollout.Name) + WaitRolloutStatusPhase(rollout.Name, rolloutsv1alpha1.RolloutPhaseHealthy) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "1", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "2", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "3", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "4", 1) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "5", 1) + }) + */ + }) }) func mergeEnvVar(original []v1.EnvVar, add v1.EnvVar) []v1.EnvVar { @@ -3112,3 +3409,10 @@ func mergeEnvVar(original []v1.EnvVar, add v1.EnvVar) []v1.EnvVar { newEnvs = append(newEnvs, add) return newEnvs } + +func mergeMap(dst, patch map[string]string) map[string]string { + for k1, v1 := range patch { + dst[k1] = v1 + } + return dst +} diff --git a/test/e2e/test_data/rollout/rollout_canary_base.yaml b/test/e2e/test_data/rollout/rollout_canary_base.yaml index e08a01a..67a47f2 100644 --- a/test/e2e/test_data/rollout/rollout_canary_base.yaml +++ b/test/e2e/test_data/rollout/rollout_canary_base.yaml @@ -20,6 +20,7 @@ spec: - weight: 80 pause: {duration: 10} - weight: 100 + pause: {duration: 0} trafficRoutings: - service: echoserver type: nginx