improve code implementation (#35)

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>

Co-authored-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
This commit is contained in:
Wei-Xiang Sun 2022-05-25 17:06:43 +08:00 committed by GitHub
parent 15d5a77260
commit e557a759b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 425 additions and 611 deletions

View File

@ -21,6 +21,7 @@ import (
"os"
kruisev1aplphal "github.com/openkruise/kruise-api/apps/v1alpha1"
kruisev1beta "github.com/openkruise/kruise-api/apps/v1beta1"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
br "github.com/openkruise/rollouts/pkg/controller/batchrelease"
"github.com/openkruise/rollouts/pkg/controller/rollout"
@ -48,6 +49,7 @@ var (
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(kruisev1aplphal.AddToScheme(scheme))
utilruntime.Must(kruisev1beta.AddToScheme(scheme))
utilruntime.Must(rolloutsv1alpha1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
@ -92,6 +94,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("rollout-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Rollout")
os.Exit(1)
}
if err = br.Add(mgr); err != nil {

View File

@ -97,7 +97,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}
if util.DiscoverGVK(util.CloneSetGVK) {
if util.DiscoverGVK(util.ControllerKruiseKindCS) {
// Watch changes to CloneSet
err = c.Watch(&source.Kind{Type: &kruiseappsv1alpha1.CloneSet{}}, &workloadEventHandler{Reader: mgr.GetCache()})
if err != nil {
@ -162,7 +162,10 @@ func (r *BatchReleaseReconciler) Reconcile(ctx context.Context, req ctrl.Request
// executor start to execute the batch release plan.
startTimestamp := time.Now()
result, currentStatus := r.executor.Do()
result, currentStatus, err := r.executor.Do()
if err != nil {
return reconcile.Result{}, err
}
defer func() {
klog.InfoS("Finished one round of reconciling release plan",

View File

@ -22,10 +22,11 @@ import (
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/controller/batchrelease/workloads"
"github.com/openkruise/rollouts/pkg/util"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
@ -43,11 +44,6 @@ const (
DeleteEventAction EventAction = "Delete"
)
var (
controllerKruiseKindCS = kruiseappsv1alpha1.SchemeGroupVersion.WithKind("CloneSet")
controllerKindDep = appsv1.SchemeGroupVersion.WithKind("Deployment")
)
var _ handler.EventHandler = &workloadEventHandler{}
type workloadEventHandler struct {
@ -59,90 +55,30 @@ func (w workloadEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimi
}
func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
var oldAccessor, newAccessor *workloads.WorkloadInfo
var gvk schema.GroupVersionKind
switch evt.ObjectNew.(type) {
case *kruiseappsv1alpha1.CloneSet:
gvk = controllerKruiseKindCS
oldClone := evt.ObjectOld.(*kruiseappsv1alpha1.CloneSet)
newClone := evt.ObjectNew.(*kruiseappsv1alpha1.CloneSet)
var oldReplicas, newReplicas int32
if oldClone.Spec.Replicas != nil {
oldReplicas = *oldClone.Spec.Replicas
}
if newClone.Spec.Replicas != nil {
newReplicas = *newClone.Spec.Replicas
}
oldAccessor = &workloads.WorkloadInfo{
Replicas: &oldReplicas,
Paused: oldClone.Spec.UpdateStrategy.Paused,
Status: &workloads.WorkloadStatus{
Replicas: oldClone.Status.Replicas,
ReadyReplicas: oldClone.Status.ReadyReplicas,
UpdatedReplicas: oldClone.Status.UpdatedReplicas,
UpdatedReadyReplicas: oldClone.Status.UpdatedReadyReplicas,
ObservedGeneration: oldClone.Status.ObservedGeneration,
},
Metadata: &oldClone.ObjectMeta,
}
newAccessor = &workloads.WorkloadInfo{
Replicas: &newReplicas,
Paused: newClone.Spec.UpdateStrategy.Paused,
Status: &workloads.WorkloadStatus{
Replicas: newClone.Status.Replicas,
ReadyReplicas: newClone.Status.ReadyReplicas,
UpdatedReplicas: newClone.Status.UpdatedReplicas,
UpdatedReadyReplicas: newClone.Status.UpdatedReadyReplicas,
ObservedGeneration: newClone.Status.ObservedGeneration,
},
Metadata: &newClone.ObjectMeta,
}
case *appsv1.Deployment:
gvk = controllerKindDep
oldDeploy := evt.ObjectOld.(*appsv1.Deployment)
newDeploy := evt.ObjectNew.(*appsv1.Deployment)
var oldReplicas, newReplicas int32
if oldDeploy.Spec.Replicas != nil {
oldReplicas = *oldDeploy.Spec.Replicas
}
if newDeploy.Spec.Replicas != nil {
newReplicas = *newDeploy.Spec.Replicas
}
oldAccessor = &workloads.WorkloadInfo{
Replicas: &oldReplicas,
Paused: oldDeploy.Spec.Paused,
Status: &workloads.WorkloadStatus{
Replicas: oldDeploy.Status.Replicas,
ReadyReplicas: oldDeploy.Status.AvailableReplicas,
UpdatedReplicas: oldDeploy.Status.UpdatedReplicas,
ObservedGeneration: oldDeploy.Status.ObservedGeneration,
},
Metadata: &oldDeploy.ObjectMeta,
}
newAccessor = &workloads.WorkloadInfo{
Replicas: &newReplicas,
Paused: newDeploy.Spec.Paused,
Status: &workloads.WorkloadStatus{
Replicas: newDeploy.Status.Replicas,
ReadyReplicas: newDeploy.Status.AvailableReplicas,
UpdatedReplicas: newDeploy.Status.UpdatedReplicas,
ObservedGeneration: newDeploy.Status.ObservedGeneration,
},
Metadata: &newDeploy.ObjectMeta,
}
gvk = util.ControllerKindDep
case *kruiseappsv1alpha1.CloneSet:
gvk = util.ControllerKruiseKindCS
case *unstructured.Unstructured:
gvk = evt.ObjectNew.(*unstructured.Unstructured).GroupVersionKind()
default:
return
}
oldObject, err := runtime.DefaultUnstructuredConverter.ToUnstructured(evt.ObjectOld)
if err != nil {
return
}
newObject, err := runtime.DefaultUnstructuredConverter.ToUnstructured(evt.ObjectNew)
if err != nil {
return
}
objectKey := client.ObjectKeyFromObject(evt.ObjectNew)
oldAccessor := util.ParseWorkloadInfo(&unstructured.Unstructured{Object: oldObject}, objectKey)
newAccessor := util.ParseWorkloadInfo(&unstructured.Unstructured{Object: newObject}, objectKey)
if newAccessor.Metadata.ResourceVersion == oldAccessor.Metadata.ResourceVersion {
return
}
@ -157,7 +93,7 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi
Name: newAccessor.Metadata.Name,
}
brNsn, err := w.getBatchRelease(workloadNamespacedName, gvk, newAccessor.Metadata.Annotations[util.BatchReleaseControlAnnotation])
brNsn, err := getBatchRelease(w.Reader, workloadNamespacedName, gvk, newAccessor.Metadata.Annotations[util.BatchReleaseControlAnnotation])
if err != nil {
klog.Errorf("unable to get BatchRelease related with %s (%s/%s), error: %v",
gvk.Kind, workloadNamespacedName.Namespace, workloadNamespacedName.Name, err)
@ -179,26 +115,25 @@ func (w workloadEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimi
func (w workloadEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
}
func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,
obj client.Object, action EventAction) {
var controlInfo string
func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface, obj client.Object, action EventAction) {
var gvk schema.GroupVersionKind
switch obj.(type) {
case *kruiseappsv1alpha1.CloneSet:
gvk = controllerKruiseKindCS
controlInfo = obj.(*kruiseappsv1alpha1.CloneSet).Annotations[util.BatchReleaseControlAnnotation]
gvk = util.ControllerKruiseKindCS
case *appsv1.Deployment:
gvk = controllerKindDep
controlInfo = obj.(*appsv1.Deployment).Annotations[util.BatchReleaseControlAnnotation]
gvk = util.ControllerKindDep
case *unstructured.Unstructured:
gvk = obj.(*unstructured.Unstructured).GroupVersionKind()
default:
return
}
controlInfo := obj.GetAnnotations()[util.BatchReleaseControlAnnotation]
workloadNamespacedName := types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
brNsn, err := w.getBatchRelease(workloadNamespacedName, gvk, controlInfo)
brNsn, err := getBatchRelease(w.Reader, workloadNamespacedName, gvk, controlInfo)
if err != nil {
klog.Errorf("Unable to get BatchRelease related with %s (%s/%s), err: %v",
gvk.Kind, workloadNamespacedName.Namespace, workloadNamespacedName.Name, err)
@ -211,7 +146,7 @@ func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,
}
}
func (w *workloadEventHandler) getBatchRelease(workloadNamespaceName types.NamespacedName, gvk schema.GroupVersionKind, controlInfo string) (nsn types.NamespacedName, err error) {
func getBatchRelease(c client.Reader, workloadNamespaceName types.NamespacedName, gvk schema.GroupVersionKind, controlInfo string) (nsn types.NamespacedName, err error) {
if len(controlInfo) > 0 {
br := &metav1.OwnerReference{}
err = json.Unmarshal([]byte(controlInfo), br)
@ -228,7 +163,7 @@ func (w *workloadEventHandler) getBatchRelease(workloadNamespaceName types.Names
brList := &v1alpha1.BatchReleaseList{}
listOptions := &client.ListOptions{Namespace: workloadNamespaceName.Namespace}
if err = w.List(context.TODO(), brList, listOptions); err != nil {
if err = c.List(context.TODO(), brList, listOptions); err != nil {
klog.Errorf("List BatchRelease failed: %s", err.Error())
return
}
@ -250,17 +185,17 @@ func (w *workloadEventHandler) getBatchRelease(workloadNamespaceName types.Names
return
}
func observeGenerationChanged(newOne, oldOne *workloads.WorkloadInfo) bool {
func observeGenerationChanged(newOne, oldOne *util.WorkloadInfo) bool {
return newOne.Metadata.Generation != oldOne.Metadata.Generation
}
func observeLatestGeneration(newOne, oldOne *workloads.WorkloadInfo) bool {
func observeLatestGeneration(newOne, oldOne *util.WorkloadInfo) bool {
oldNot := oldOne.Metadata.Generation != oldOne.Status.ObservedGeneration
newDid := newOne.Metadata.Generation == newOne.Status.ObservedGeneration
return oldNot && newDid
}
func observeScaleEventDone(newOne, oldOne *workloads.WorkloadInfo) bool {
func observeScaleEventDone(newOne, oldOne *util.WorkloadInfo) bool {
_, controlled := newOne.Metadata.Annotations[util.BatchReleaseControlAnnotation]
if !controlled {
return false
@ -273,7 +208,7 @@ func observeScaleEventDone(newOne, oldOne *workloads.WorkloadInfo) bool {
return oldScaling && newDone
}
func observeReplicasChanged(newOne, oldOne *workloads.WorkloadInfo) bool {
func observeReplicasChanged(newOne, oldOne *util.WorkloadInfo) bool {
_, controlled := newOne.Metadata.Annotations[util.BatchReleaseControlAnnotation]
if !controlled {
return false
@ -283,5 +218,6 @@ func observeReplicasChanged(newOne, oldOne *workloads.WorkloadInfo) bool {
oldOne.Status.Replicas != newOne.Status.Replicas ||
oldOne.Status.ReadyReplicas != newOne.Status.ReadyReplicas ||
oldOne.Status.UpdatedReplicas != newOne.Status.UpdatedReplicas ||
oldOne.Status.AvailableReplicas != newOne.Status.AvailableReplicas ||
oldOne.Status.UpdatedReadyReplicas != newOne.Status.UpdatedReadyReplicas
}

View File

@ -73,7 +73,7 @@ func (r *Executor) SetReleaseInfo(release *v1alpha1.BatchRelease) {
}
// Do execute the release plan
func (r *Executor) Do() (reconcile.Result, *v1alpha1.BatchReleaseStatus) {
func (r *Executor) Do() (reconcile.Result, *v1alpha1.BatchReleaseStatus, error) {
klog.InfoS("Starting one round of reconciling release plan",
"BatchRelease", client.ObjectKeyFromObject(r.release),
"phase", r.releaseStatus.Phase,
@ -82,18 +82,19 @@ func (r *Executor) Do() (reconcile.Result, *v1alpha1.BatchReleaseStatus) {
workloadController, err := r.GetWorkloadController()
if err != nil || workloadController == nil {
return reconcile.Result{}, r.releaseStatus
return reconcile.Result{}, r.releaseStatus, nil
}
shouldStopThisRound, result := r.checkHealthBeforeExecution(workloadController)
if shouldStopThisRound {
return result, r.releaseStatus
shouldStopThisRound, result, err := r.checkHealthBeforeExecution(workloadController)
if shouldStopThisRound || err != nil {
return result, r.releaseStatus, err
}
return r.executeBatchReleasePlan(workloadController)
}
func (r *Executor) executeBatchReleasePlan(workloadController workloads.WorkloadController) (reconcile.Result, *v1alpha1.BatchReleaseStatus) {
func (r *Executor) executeBatchReleasePlan(workloadController workloads.WorkloadController) (reconcile.Result, *v1alpha1.BatchReleaseStatus, error) {
var err error
status := r.releaseStatus
result := reconcile.Result{}
@ -106,44 +107,50 @@ func (r *Executor) executeBatchReleasePlan(workloadController workloads.Workload
case v1alpha1.RolloutPhaseHealthy:
// verify whether the workload is ready to execute the release plan in this state.
requeue, err := workloadController.VerifyWorkload()
if err != nil {
var verifiedDone bool
verifiedDone, err = workloadController.VerifyWorkload()
switch {
case err != nil:
setCondition(status, v1alpha1.VerifyingBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
} else {
case verifiedDone:
status.Phase = v1alpha1.RolloutPhasePreparing
setCondition(status, v1alpha1.PreparingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is preparing for progress")
}
if requeue {
result = reconcile.Result{RequeueAfter: DefaultDuration}
setCondition(status, v1alpha1.PreparingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is preparing for progress")
}
case v1alpha1.RolloutPhasePreparing:
// prepare and initialize something before progressing in this state.
done, err := workloadController.PrepareBeforeProgress()
var preparedDone bool
preparedDone, err = workloadController.PrepareBeforeProgress()
switch {
case err != nil:
setCondition(status, v1alpha1.PreparingBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
case done:
setCondition(status, v1alpha1.ProgressingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is progressing")
case preparedDone:
status.Phase = v1alpha1.RolloutPhaseProgressing
fallthrough
default:
result = reconcile.Result{RequeueAfter: DefaultDuration}
setCondition(status, v1alpha1.ProgressingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is progressing")
}
case v1alpha1.RolloutPhaseProgressing:
// progress the release plan in this state.
var progressDone bool
if progressDone, result = r.progressBatches(workloadController); progressDone {
setCondition(status, v1alpha1.FinalizingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is finalizing")
progressDone, result, err = r.progressBatches(workloadController)
switch {
case progressDone:
status.Phase = v1alpha1.RolloutPhaseFinalizing
setCondition(status, v1alpha1.FinalizingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is finalizing")
}
case v1alpha1.RolloutPhaseFinalizing:
// finalize canary the resources when progressing done.
// Do not clean the canary resources, because rollout
// controller should set the traffic routing first.
if succeed := workloadController.FinalizeProgress(false); succeed {
var finalizedDone bool
finalizedDone, err = workloadController.FinalizeProgress(false)
switch {
case err != nil:
setCondition(status, v1alpha1.CompletedBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
case finalizedDone:
if IsAllBatchReady(r.releasePlan, r.releaseStatus) {
status.Phase = v1alpha1.RolloutPhaseCompleted
setCondition(status, v1alpha1.CompletedBatchReleaseCondition, v1.ConditionTrue, v1alpha1.SucceededBatchReleaseConditionReason, "BatchRelease is completed")
@ -151,15 +158,19 @@ func (r *Executor) executeBatchReleasePlan(workloadController workloads.Workload
status.Phase = v1alpha1.RolloutPhaseCancelled
setCondition(status, v1alpha1.CancelledBatchReleaseCondition, v1.ConditionTrue, v1alpha1.SucceededBatchReleaseConditionReason, "BatchRelease is cancelled")
}
} else {
default:
result = reconcile.Result{RequeueAfter: DefaultDuration}
}
case v1alpha1.RolloutPhaseTerminating:
// when batchRelease is deleted, should clean up extra canary resources
if succeed := workloadController.FinalizeProgress(true); succeed {
var finalizedDone bool
finalizedDone, err = workloadController.FinalizeProgress(true)
switch {
case err != nil:
setCondition(status, v1alpha1.CompletedBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
case finalizedDone:
setCondition(status, v1alpha1.TerminatedBatchReleaseCondition, v1.ConditionTrue, v1alpha1.SucceededBatchReleaseConditionReason, "BatchRelease is terminated")
} else {
default:
result = reconcile.Result{RequeueAfter: DefaultDuration}
}
@ -171,11 +182,12 @@ func (r *Executor) executeBatchReleasePlan(workloadController workloads.Workload
panic(fmt.Sprintf("illegal release status %+v", status))
}
return result, status
return result, status, err
}
// reconcile logic when we are in the middle of release, we have to go through finalizing state before succeed or fail
func (r *Executor) progressBatches(workloadController workloads.WorkloadController) (bool, reconcile.Result) {
func (r *Executor) progressBatches(workloadController workloads.WorkloadController) (bool, reconcile.Result, error) {
var err error
progressDone := false
status := r.releaseStatus
result := reconcile.Result{}
@ -185,28 +197,28 @@ func (r *Executor) progressBatches(workloadController workloads.WorkloadControll
switch status.CanaryStatus.CurrentBatchState {
case "", v1alpha1.UpgradingBatchState:
// modify workload replicas/partition based on release plan in this state.
upgradeDone, err := workloadController.UpgradeOneBatch()
upgradeDone, upgradeErr := workloadController.UpgradeOneBatch()
switch {
case err != nil:
case upgradeErr != nil:
err = upgradeErr
setCondition(status, "Progressing", v1.ConditionFalse, "UpgradeBatchFailed", err.Error())
case upgradeDone:
status.CanaryStatus.CurrentBatchState = v1alpha1.VerifyingBatchState
fallthrough
default:
result = reconcile.Result{RequeueAfter: DefaultDuration}
status.CanaryStatus.CurrentBatchState = v1alpha1.VerifyingBatchState
}
case v1alpha1.VerifyingBatchState:
// TODO: metrics analysis
// replicas/partition has been modified, should wait pod ready in this state.
verified, err := workloadController.CheckOneBatchReady()
verified, verifiedErr := workloadController.CheckOneBatchReady()
switch {
case err != nil:
case verifiedErr != nil:
err = verifiedErr
setCondition(status, "Progressing", v1.ConditionFalse, "VerifyBatchFailed", err.Error())
case verified:
result = reconcile.Result{RequeueAfter: DefaultDuration}
now := metav1.Now()
status.CanaryStatus.BatchReadyTime = &now
result = reconcile.Result{RequeueAfter: DefaultDuration}
status.CanaryStatus.CurrentBatchState = v1alpha1.ReadyBatchState
default:
status.CanaryStatus.CurrentBatchState = v1alpha1.UpgradingBatchState
@ -237,7 +249,7 @@ func (r *Executor) progressBatches(workloadController workloads.WorkloadControll
panic(fmt.Sprintf("illegal status %+v", r.releaseStatus))
}
return progressDone, result
return progressDone, result, err
}
// GetWorkloadController pick the right workload controller to work on the workload

View File

@ -25,13 +25,17 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadController) (needStopThisRound bool, result reconcile.Result) {
func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadController) (bool, reconcile.Result, error) {
var err error
var reason string
var message string
var needRetry bool
needStopThisRound := false
result := reconcile.Result{}
// sync the workload info and watch the workload change event
workloadEvent, workloadInfo, err := controller.SyncWorkloadInfo()
@ -49,7 +53,7 @@ func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadContr
case isPlanTerminating(r.release, r.releaseStatus):
// handle the case that the plan is deleted or is terminating
reason = "PlanTerminating"
message = "Release plan is deleted, then terminate"
message = "Release plan is deleted or cancelled, then terminate"
signalTerminating(r.releaseStatus)
case isPlanPaused(err, r.releasePlan, r.releaseStatus):
@ -86,7 +90,6 @@ func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadContr
// handle the case of IgnoreNotFound(err) != nil
reason = "GetWorkloadError"
message = err.Error()
needRetry = true
case isWorkloadGone(err, r.releaseStatus):
// handle the case that the workload is deleted
@ -105,12 +108,9 @@ func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadContr
reason = "ReplicasChanged"
message = "workload is scaling, then reinitialize batch status"
signalReinitializeBatch(r.releaseStatus)
case isWorkloadRollback(workloadEvent, r.releaseStatus):
// handle the case that workload is rolling back during progressing
reason = "StableOrRollback"
message = "workload is stable or rolling back, then abort"
signalFinalize(r.releaseStatus)
// we must ensure that this field is updated only when we have observed
// the workload scaling event, otherwise this event may be lost.
r.releaseStatus.ObservedWorkloadReplicas = *workloadInfo.Replicas
case isWorkloadChanged(workloadEvent, r.releaseStatus):
// handle the case of continuous release v1 -> v2 -> v3
@ -150,23 +150,18 @@ func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadContr
}
// will retry after 50ms
if needRetry {
err = client.IgnoreNotFound(err)
if needRetry && err == nil {
needStopThisRound = true
result = reconcile.Result{RequeueAfter: DefaultDuration}
}
return needStopThisRound, result
return needStopThisRound, result, err
}
func refreshStatus(release *v1alpha1.BatchRelease, newStatus *v1alpha1.BatchReleaseStatus, workloadInfo *workloads.WorkloadInfo) {
func refreshStatus(release *v1alpha1.BatchRelease, newStatus *v1alpha1.BatchReleaseStatus, workloadInfo *util.WorkloadInfo) {
// refresh workload info for status
if workloadInfo != nil {
if workloadInfo.Replicas != nil {
newStatus.ObservedWorkloadReplicas = *workloadInfo.Replicas
}
if workloadInfo.UpdateRevision != nil {
newStatus.UpdateRevision = *workloadInfo.UpdateRevision
}
if workloadInfo.Status != nil {
newStatus.CanaryStatus.UpdatedReplicas = workloadInfo.Status.UpdatedReplicas
newStatus.CanaryStatus.UpdatedReadyReplicas = workloadInfo.Status.UpdatedReadyReplicas
@ -209,16 +204,12 @@ func isWorkloadScaling(event workloads.WorkloadEventType, status *v1alpha1.Batch
return event == workloads.WorkloadReplicasChanged && status.Phase == v1alpha1.RolloutPhaseProgressing
}
func isWorkloadRollback(event workloads.WorkloadEventType, status *v1alpha1.BatchReleaseStatus) bool {
return event == workloads.WorkloadRollback && status.Phase == v1alpha1.RolloutPhaseProgressing
}
func isWorkloadChanged(event workloads.WorkloadEventType, status *v1alpha1.BatchReleaseStatus) bool {
return event == workloads.WorkloadPodTemplateChanged && status.Phase == v1alpha1.RolloutPhaseProgressing
}
func isWorkloadUnhealthy(event workloads.WorkloadEventType, _ *v1alpha1.BatchReleaseStatus) bool {
return event == workloads.WorkloadUnHealthy
func isWorkloadUnhealthy(event workloads.WorkloadEventType, status *v1alpha1.BatchReleaseStatus) bool {
return event == workloads.WorkloadUnHealthy && status.Phase == v1alpha1.RolloutPhaseProgressing
}
func isWorkloadUnstable(event workloads.WorkloadEventType, _ *v1alpha1.BatchReleaseStatus) bool {

View File

@ -29,7 +29,7 @@ import (
func HasTerminatingCondition(status v1alpha1.BatchReleaseStatus) bool {
for i := range status.Conditions {
c := status.Conditions[i]
if c.Type == "Terminated" && c.Status == v1.ConditionTrue {
if c.Type == v1alpha1.TerminatedBatchReleaseCondition && c.Status == v1.ConditionTrue {
return true
}
}

View File

@ -22,6 +22,7 @@ import (
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
@ -57,36 +58,38 @@ func NewCloneSetRolloutController(cli client.Client, recorder record.EventRecord
// VerifyWorkload verifies that the workload is ready to execute release plan
func (c *CloneSetRolloutController) VerifyWorkload() (bool, error) {
var err error
var message string
defer func() {
if err != nil {
klog.Warningf(err.Error())
c.recorder.Event(c.parentController, v1.EventTypeWarning, "VerifyFailed", err.Error())
} else if message != "" {
klog.Warningf(message)
}
}()
if err = c.fetchCloneSet(); err != nil {
return true, err
message = err.Error()
return false, err
}
// if the workload status is untrustworthy
if c.clone.Status.ObservedGeneration != c.clone.Generation {
err = fmt.Errorf("CloneSet(%v) is still reconciling, wait for it to be done", c.targetNamespacedName)
return false, err
message = fmt.Sprintf("CloneSet(%v) is still reconciling, wait for it to be done", c.targetNamespacedName)
return false, nil
}
// if the cloneSet has been promoted, no need to go on
if c.clone.Status.UpdatedReplicas == *c.clone.Spec.Replicas {
err = fmt.Errorf("CloneSet(%v) update revision has been promoted, no need to reconcile", c.targetNamespacedName)
return false, err
message = fmt.Sprintf("CloneSet(%v) update revision has been promoted, no need to reconcile", c.targetNamespacedName)
return false, nil
}
// if the cloneSet is not paused and is not under our control
if !c.clone.Spec.UpdateStrategy.Paused {
err = fmt.Errorf("CloneSet(%v) should be paused before execute the release plan", c.targetNamespacedName)
return false, err
message = fmt.Sprintf("CloneSet(%v) should be paused before execute the release plan", c.targetNamespacedName)
return false, nil
}
klog.V(3).Infof("Verified CloneSet(%v) Successfully,", c.targetNamespacedName)
c.recorder.Event(c.parentController, v1.EventTypeNormal, "VerifiedSuccessfully", "ReleasePlan and the CloneSet resource are verified")
return true, nil
}
@ -94,13 +97,12 @@ func (c *CloneSetRolloutController) VerifyWorkload() (bool, error) {
// PrepareBeforeProgress makes sure that the source and target CloneSet is under our control
func (c *CloneSetRolloutController) PrepareBeforeProgress() (bool, error) {
if err := c.fetchCloneSet(); err != nil {
//c.releaseStatus.RolloutRetry(err.Error())
return false, nil
return false, err
}
// claim the cloneSet is under our control
if _, err := c.claimCloneSet(c.clone); err != nil {
return false, nil
return false, err
}
// record revisions and replicas info to BatchRelease.Status
@ -114,7 +116,7 @@ func (c *CloneSetRolloutController) PrepareBeforeProgress() (bool, error) {
// and then set the partition accordingly
func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) {
if err := c.fetchCloneSet(); err != nil {
return false, nil
return false, err
}
if c.releaseStatus.ObservedWorkloadReplicas == 0 {
@ -127,6 +129,7 @@ func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) {
return false, nil
}
currentBatch := c.parentController.Status.CanaryStatus.CurrentBatch
// the number of canary pods should have in current batch
canaryGoal := c.calculateCurrentCanary(c.releaseStatus.ObservedWorkloadReplicas)
// the number of stable pods should have in current batch
@ -137,14 +140,6 @@ func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) {
workloadPartition, _ := intstr.GetValueFromIntOrPercent(c.clone.Spec.UpdateStrategy.Partition,
int(c.releaseStatus.ObservedWorkloadReplicas), true)
// in case of no need to upgrade pods
if currentCanaryReplicas >= canaryGoal && int32(workloadPartition) <= stableGoal {
klog.V(3).InfoS("upgraded one batch, but no need to update partition of cloneset",
"BatchRelease", c.releasePlanKey, "current-batch", c.releaseStatus.CanaryStatus.CurrentBatch,
"canary-goal", canaryGoal, "stable-goal", stableGoal, "canary-replicas", currentCanaryReplicas, "partition", workloadPartition)
return true, nil
}
// if canaryReplicas is int, then we use int;
// if canaryReplicas is percentage, then we use percentage.
var partitionGoal intstr.IntOrString
@ -155,13 +150,26 @@ func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) {
partitionGoal = ParseIntegerAsPercentageIfPossible(stableGoal, c.releaseStatus.ObservedWorkloadReplicas, &canaryIntOrStr)
}
// upgrade pods
if err := c.patchCloneSetPartition(c.clone, &partitionGoal); err != nil {
return false, nil
klog.V(3).InfoS("upgraded one batch, current info:",
"BatchRelease", c.releasePlanKey,
"current-batch", currentBatch,
"canary-goal", canaryGoal,
"stable-goal", stableGoal,
"partition-goal", partitionGoal,
"partition-current", workloadPartition,
"canary-replicas", currentCanaryReplicas)
// in case of no need to upgrade pods
IsUpgradedDone := func() bool {
return currentCanaryReplicas >= canaryGoal && int32(workloadPartition) <= stableGoal
}
if !IsUpgradedDone() {
if err := c.patchCloneSetPartition(c.clone, &partitionGoal); err != nil {
return false, err
}
}
klog.V(3).InfoS("upgraded one batch", "BatchRelease", c.releasePlanKey,
"current batch", c.releaseStatus.CanaryStatus.CurrentBatch, "updateRevision size", canaryGoal)
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "SetBatchDone",
"Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
@ -170,7 +178,7 @@ func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) {
// CheckOneBatchReady checks to see if the pods are all available according to the rollout plan
func (c *CloneSetRolloutController) CheckOneBatchReady() (bool, error) {
if err := c.fetchCloneSet(); err != nil {
return false, nil
return false, err
}
// if the workload status is untrustworthy
@ -198,9 +206,14 @@ func (c *CloneSetRolloutController) CheckOneBatchReady() (bool, error) {
maxUnavailable, _ = intstr.GetValueFromIntOrPercent(c.clone.Spec.UpdateStrategy.MaxUnavailable, int(c.releaseStatus.ObservedWorkloadReplicas), true)
}
klog.InfoS("checking the batch releasing progress", "BatchRelease", c.releasePlanKey,
"current-batch", c.releaseStatus.CanaryStatus.CurrentBatch, "canary-ready-replicas", canaryReadyReplicas,
"stable-replicas", stableReplicas, "max-unavailable", maxUnavailable, "canary-goal", canaryGoal, "stable-goal", stableGoal)
klog.InfoS("checking the batch releasing progress",
"BatchRelease", c.releasePlanKey,
"current-batch", c.releaseStatus.CanaryStatus.CurrentBatch,
"canary-goal", canaryGoal,
"stable-goal", stableGoal,
"stable-replicas", stableReplicas,
"max-unavailable", maxUnavailable,
"canary-ready-replicas", canaryReadyReplicas)
// maybe, the workload replicas was scaled, we should requeue and handle the workload scaling event
if c.clone.Status.Replicas != c.releaseStatus.ObservedWorkloadReplicas {
@ -220,8 +233,8 @@ func (c *CloneSetRolloutController) CheckOneBatchReady() (bool, error) {
}
if currentBatchIsNotReadyYet() {
klog.InfoS("the batch is not ready yet", "CloneSet", c.targetNamespacedName,
"ReleasePlan", c.releasePlanKey, "current-batch", c.releaseStatus.CanaryStatus.CurrentBatch)
klog.InfoS("the batch is not ready yet", "BatchRelease",
c.releasePlanKey, "current-batch", c.releaseStatus.CanaryStatus.CurrentBatch)
return false, nil
}
@ -231,27 +244,22 @@ func (c *CloneSetRolloutController) CheckOneBatchReady() (bool, error) {
return true, nil
}
// FinalizeOneBatch isn't needed in this mode.
func (c *CloneSetRolloutController) FinalizeOneBatch() (bool, error) {
return true, nil
}
// FinalizeProgress makes sure the CloneSet is all upgraded
func (c *CloneSetRolloutController) FinalizeProgress(cleanup bool) bool {
func (c *CloneSetRolloutController) FinalizeProgress(cleanup bool) (bool, error) {
if err := c.fetchCloneSet(); client.IgnoreNotFound(err) != nil {
return false
return false, client.IgnoreNotFound(err)
}
if _, err := c.releaseCloneSet(c.clone, cleanup); err != nil {
return false
return false, err
}
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "FinalizedSuccessfully", "Rollout resource are finalized: cleanup=%v", cleanup)
return true
return true, nil
}
// SyncWorkloadInfo return change type if workload was changed during release
func (c *CloneSetRolloutController) SyncWorkloadInfo() (WorkloadEventType, *WorkloadInfo, error) {
func (c *CloneSetRolloutController) SyncWorkloadInfo() (WorkloadEventType, *util.WorkloadInfo, error) {
// ignore the sync if the release plan is deleted
if c.parentController.DeletionTimestamp != nil {
return IgnoreWorkloadEvent, nil, nil
@ -268,8 +276,8 @@ func (c *CloneSetRolloutController) SyncWorkloadInfo() (WorkloadEventType, *Work
return WorkloadStillReconciling, nil, nil
}
workloadInfo := &WorkloadInfo{
Status: &WorkloadStatus{
workloadInfo := &util.WorkloadInfo{
Status: &util.WorkloadStatus{
UpdatedReplicas: c.clone.Status.UpdatedReplicas,
UpdatedReadyReplicas: c.clone.Status.UpdatedReadyReplicas,
},
@ -280,14 +288,8 @@ func (c *CloneSetRolloutController) SyncWorkloadInfo() (WorkloadEventType, *Work
return IgnoreWorkloadEvent, workloadInfo, nil
}
// in case of that the workload is rolling back
if c.clone.Status.CurrentRevision == c.clone.Status.UpdateRevision && c.parentController.Status.UpdateRevision != c.clone.Status.UpdateRevision {
klog.Warningf("CloneSet(%v) is rolling back", c.targetNamespacedName)
return WorkloadRollback, workloadInfo, nil
}
// in case of that the workload is scaling
if *c.clone.Spec.Replicas != c.releaseStatus.ObservedWorkloadReplicas {
if *c.clone.Spec.Replicas != c.releaseStatus.ObservedWorkloadReplicas && c.releaseStatus.ObservedWorkloadReplicas != -1 {
workloadInfo.Replicas = c.clone.Spec.Replicas
klog.Warningf("CloneSet(%v) replicas changed during releasing, should pause and wait for it to complete, "+
"replicas from: %v -> %v", c.targetNamespacedName, c.releaseStatus.ObservedWorkloadReplicas, *c.clone.Spec.Replicas)

View File

@ -63,47 +63,48 @@ func NewDeploymentRolloutController(cli client.Client, recorder record.EventReco
// VerifyWorkload verifies that the workload is ready to execute release plan
func (c *DeploymentsRolloutController) VerifyWorkload() (bool, error) {
var err error
var message string
defer func() {
if err != nil {
klog.Error(err)
c.recorder.Event(c.parentController, v1.EventTypeWarning, "VerifyFailed", err.Error())
} else if message != "" {
klog.Warningf(message)
}
}()
if err = c.fetchStableDeployment(); err != nil {
return true, err
return false, err
}
if err = c.fetchCanaryDeployment(); client.IgnoreNotFound(err) != nil {
return true, err
return false, err
}
// if the workload status is untrustworthy, return and retry
if c.stable.Status.ObservedGeneration != c.stable.Generation {
err = fmt.Errorf("deployment(%v) is still reconciling, wait for it to be done", c.stableNamespacedName)
return false, err
message = fmt.Sprintf("deployment(%v) is still reconciling, wait for it to be done", c.stableNamespacedName)
return false, nil
}
// if the workload has been promoted, return and not retry
if c.stable.Status.UpdatedReplicas == *c.stable.Spec.Replicas {
err = fmt.Errorf("deployment(%v) update revision has been promoted, no need to rollout", c.stableNamespacedName)
return false, err
message = fmt.Sprintf("deployment(%v) update revision has been promoted, no need to rollout", c.stableNamespacedName)
return false, nil
}
// if the workload is not paused, no need to progress it
if !c.stable.Spec.Paused {
err = fmt.Errorf("deployment(%v) should be paused before execute the release plan", c.stableNamespacedName)
return false, err
message = fmt.Sprintf("deployment(%v) should be paused before execute the release plan", c.stableNamespacedName)
return false, nil
}
// claim the deployment is under our control, and create canary deployment if it needs.
// Do not move this function to Preparing phase., otherwise multi canary deployments
// Do not move this function to Preparing phase, otherwise multi canary deployments
// will be repeatedly created due to informer cache latency.
if _, err = c.claimDeployment(c.stable, c.canary); err != nil {
return true, err
return false, err
}
klog.Infof("Verified Deployment(%v) Successfully, Status %+v", c.stableNamespacedName, c.releaseStatus)
c.recorder.Event(c.parentController, v1.EventTypeNormal, "Verified", "ReleasePlan and the Deployment resource are verified")
return true, nil
}
@ -113,7 +114,7 @@ func (c *DeploymentsRolloutController) PrepareBeforeProgress() (bool, error) {
// the workload is verified, and we should record revision and replicas info before progressing
if err := c.recordDeploymentRevisionAndReplicas(); err != nil {
klog.Errorf("Failed to record deployment(%v) revision and replicas info, error: %v", c.stableNamespacedName, err)
return false, nil
return false, err
}
c.recorder.Event(c.parentController, v1.EventTypeNormal, "Initialized", "Rollout resource are initialized")
@ -124,10 +125,10 @@ func (c *DeploymentsRolloutController) PrepareBeforeProgress() (bool, error) {
// according to the release plan and then set the canary deployment replicas
func (c *DeploymentsRolloutController) UpgradeOneBatch() (bool, error) {
if err := c.fetchStableDeployment(); err != nil {
return false, nil
return false, err
}
if err := c.fetchCanaryDeployment(); err != nil {
return false, nil
return false, err
}
// canary replicas now we have at current state
@ -136,22 +137,21 @@ func (c *DeploymentsRolloutController) UpgradeOneBatch() (bool, error) {
// canary goal we should achieve
canaryGoal := c.calculateCurrentCanary(c.releaseStatus.ObservedWorkloadReplicas)
// in case of no need to upgrade in current batch
if currentCanaryReplicas >= canaryGoal {
klog.V(3).InfoS("upgraded one batch, but no need to update replicas of canary Deployment",
"Deployment", client.ObjectKeyFromObject(c.canary), "BatchRelease", c.releaseKey,
"current-batch", c.releaseStatus.CanaryStatus.CurrentBatch, "goal-canary-replicas", canaryGoal,
"current-canary-replicas", currentCanaryReplicas, "current-canary-pod-count", c.canary.Status.UpdatedReplicas)
return true, nil
}
klog.V(3).InfoS("upgraded one batch, but no need to update replicas of canary Deployment",
"Deployment", client.ObjectKeyFromObject(c.canary),
"BatchRelease", c.releaseKey,
"current-batch", c.releaseStatus.CanaryStatus.CurrentBatch,
"canary-goal", canaryGoal,
"current-canary-replicas", currentCanaryReplicas,
"current-canary-status-replicas", c.canary.Status.UpdatedReplicas)
// upgrade pods if it needs
if err := c.patchDeploymentReplicas(c.canary, canaryGoal); err != nil {
return false, nil
if currentCanaryReplicas < canaryGoal {
if err := c.patchDeploymentReplicas(c.canary, canaryGoal); err != nil {
return false, err
}
}
klog.V(3).Infof("Deployment(%v) upgraded one batch, BatchRelease(%v), current batch=%v, canary goal size=%v",
client.ObjectKeyFromObject(c.canary), c.releaseKey, c.releaseStatus.CanaryStatus.CurrentBatch, canaryGoal)
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "Batch Rollout", "Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
}
@ -159,10 +159,10 @@ func (c *DeploymentsRolloutController) UpgradeOneBatch() (bool, error) {
// CheckOneBatchReady checks to see if the pods are all available according to the rollout plan
func (c *DeploymentsRolloutController) CheckOneBatchReady() (bool, error) {
if err := c.fetchStableDeployment(); err != nil {
return false, nil
return false, err
}
if err := c.fetchCanaryDeployment(); err != nil {
return false, nil
return false, err
}
// in case of workload status is Untrustworthy
@ -184,9 +184,12 @@ func (c *DeploymentsRolloutController) CheckOneBatchReady() (bool, error) {
}
klog.InfoS("checking the batch releasing progress",
"BatchRelease", c.releaseKey, "current-batch", c.releaseStatus.CanaryStatus.CurrentBatch,
"canary-available-pod-count", availableCanaryPodCount, "stable-pod-count", c.stable.Status.Replicas,
"maxUnavailable-pod-allowed", maxUnavailable, "canary-goal", canaryGoal)
"BatchRelease", c.releaseKey,
"current-batch", c.releaseStatus.CanaryStatus.CurrentBatch,
"canary-goal", canaryGoal,
"canary-available-pod-count", availableCanaryPodCount,
"stable-pod-status-replicas", c.stable.Status.Replicas,
"maxUnavailable", maxUnavailable)
currentBatchIsNotReadyYet := func() bool {
// the number of upgrade pods does not achieve the goal
@ -203,36 +206,30 @@ func (c *DeploymentsRolloutController) CheckOneBatchReady() (bool, error) {
return false, nil
}
klog.InfoS("Deployment all pods in current batch are ready", "Deployment", client.ObjectKeyFromObject(c.canary), "current batch", c.releaseStatus.CanaryStatus.CurrentBatch)
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "Batch Available", "Batch %d is available", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
}
// FinalizeOneBatch isn't needed in this mode.
func (c *DeploymentsRolloutController) FinalizeOneBatch() (bool, error) {
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "BatchReady", "Batch %d is available", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
}
// FinalizeProgress makes sure restore deployments and clean up some canary settings
func (c *DeploymentsRolloutController) FinalizeProgress(cleanup bool) bool {
func (c *DeploymentsRolloutController) FinalizeProgress(cleanup bool) (bool, error) {
if err := c.fetchStableDeployment(); client.IgnoreNotFound(err) != nil {
return false
return false, err
}
// make the deployment ride out of our control, and clean up canary resources
succeed, err := c.releaseDeployment(c.stable, cleanup)
if !succeed || err != nil {
klog.Errorf("Failed to finalize deployment(%v), error: %v", c.stableNamespacedName, err)
return false
return false, err
}
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "Finalized", "Finalized: cleanup=%v", cleanup)
return true
return true, nil
}
// SyncWorkloadInfo return workloadInfo if workload info is changed during rollout
// TODO: abstract a WorkloadEventTypeJudge interface for these following `if` clauses
func (c *DeploymentsRolloutController) SyncWorkloadInfo() (WorkloadEventType, *WorkloadInfo, error) {
func (c *DeploymentsRolloutController) SyncWorkloadInfo() (WorkloadEventType, *util.WorkloadInfo, error) {
// ignore the sync if the release plan is deleted
if c.parentController.DeletionTimestamp != nil {
return IgnoreWorkloadEvent, nil, nil
@ -249,9 +246,9 @@ func (c *DeploymentsRolloutController) SyncWorkloadInfo() (WorkloadEventType, *W
return "", nil, err
}
workloadInfo := &WorkloadInfo{}
workloadInfo := util.NewWorkloadInfo()
if c.canary != nil {
workloadInfo.Status = &WorkloadStatus{
workloadInfo.Status = &util.WorkloadStatus{
UpdatedReplicas: c.canary.Status.Replicas,
UpdatedReadyReplicas: c.canary.Status.AvailableReplicas,
}
@ -274,13 +271,8 @@ func (c *DeploymentsRolloutController) SyncWorkloadInfo() (WorkloadEventType, *W
return IgnoreWorkloadEvent, workloadInfo, nil
}
// in case of that the workload needs to rollback
if needsRollBack, _ := c.isDeploymentRollBack(); needsRollBack {
return WorkloadRollback, workloadInfo, nil
}
// in case of that the workload is scaling up/down
if *c.stable.Spec.Replicas != c.releaseStatus.ObservedWorkloadReplicas {
if *c.stable.Spec.Replicas != c.releaseStatus.ObservedWorkloadReplicas && c.releaseStatus.ObservedWorkloadReplicas != -1 {
workloadInfo.Replicas = c.stable.Spec.Replicas
klog.Warningf("Deployment(%v) replicas changed during releasing, should pause and wait for it to complete, replicas from: %v -> %v",
c.stableNamespacedName, c.releaseStatus.ObservedWorkloadReplicas, *c.stable.Spec.Replicas)
@ -288,7 +280,8 @@ func (c *DeploymentsRolloutController) SyncWorkloadInfo() (WorkloadEventType, *W
}
// in case of that the workload revision was changed
if util.ComputeHash(&c.stable.Spec.Template, nil) != c.releaseStatus.UpdateRevision {
if hashRevision := util.ComputeHash(&c.stable.Spec.Template, nil); hashRevision != c.releaseStatus.UpdateRevision {
workloadInfo.Status.UpdateRevision = hashRevision
klog.Warningf("Deployment(%v) updateRevision changed during releasing", c.stableNamespacedName)
return WorkloadPodTemplateChanged, workloadInfo, nil
}
@ -371,29 +364,3 @@ func (c *DeploymentsRolloutController) recordDeploymentRevisionAndReplicas() err
c.releaseStatus.ObservedWorkloadReplicas = *c.stable.Spec.Replicas
return nil
}
// isDeploymentRollBack returns 'true' if the workload needs to rollback
func (c *DeploymentsRolloutController) isDeploymentRollBack() (bool, error) {
if c.canary != nil {
return false, nil
}
rss, err := c.listReplicaSetsFor(c.stable)
if err != nil {
return false, err
}
var stableRS *apps.ReplicaSet
for _, rs := range rss {
if rs.Spec.Replicas != nil && *rs.Spec.Replicas > 0 &&
rs.Labels[apps.DefaultDeploymentUniqueLabelKey] == c.releaseStatus.StableRevision {
stableRS = rs
break
}
}
if stableRS != nil && util.EqualIgnoreHash(&stableRS.Spec.Template, &c.stable.Spec.Template) {
return true, nil
}
return false, nil
}

View File

@ -18,7 +18,7 @@ package workloads
import (
"github.com/openkruise/rollouts/api/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/openkruise/rollouts/pkg/util"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@ -27,29 +27,12 @@ type WorkloadEventType string
const (
IgnoreWorkloadEvent WorkloadEventType = "workload-not-cared"
WorkloadRollback WorkloadEventType = "workload-is-rolling-back"
WorkloadPodTemplateChanged WorkloadEventType = "workload-pod-template-changed"
WorkloadReplicasChanged WorkloadEventType = "workload-replicas-changed"
WorkloadStillReconciling WorkloadEventType = "workload-is-reconciling"
WorkloadUnHealthy WorkloadEventType = "workload-is-unhealthy"
)
type WorkloadStatus struct {
Replicas int32
ReadyReplicas int32
UpdatedReplicas int32
UpdatedReadyReplicas int32
ObservedGeneration int64
}
type WorkloadInfo struct {
Paused bool
Replicas *int32
UpdateRevision *string
Status *WorkloadStatus
Metadata *metav1.ObjectMeta
}
type workloadController struct {
client client.Client
recorder record.EventRecorder
@ -61,9 +44,8 @@ type workloadController struct {
// WorkloadController is the interface that all type of cloneSet controller implements
type WorkloadController interface {
// VerifyWorkload makes sure that the workload can be upgraded according to the release plan.
// it returns 'true', the controller will requeue the request and continue to reconcile after a short duration.
// it returns 'false', the controller will not requeue the request.
// if err == nil, if the verification is successful.
// it returns 'true', if this verification is successful.
// it returns 'false' or err != nil, if this verification is failed.
// it returns not-empty error if the verification has something wrong, and should not retry.
VerifyWorkload() (bool, error)
@ -87,12 +69,6 @@ type WorkloadController interface {
// it returns not-empty error if the check operation has something wrong, and should not retry.
CheckOneBatchReady() (bool, error)
// FinalizeOneBatch makes sure that the rollout can start the next batch
// it returns 'true' if the operation is succeeded.
// it returns 'false' if the operation should be retried.
// it returns not-empty error if the check operation has something wrong, and should not retry.
FinalizeOneBatch() (bool, error)
// FinalizeProgress makes sure the resources are in a good final state.
// It might depend on if the rollout succeeded or not.
// For example, we may remove the objects which created by batchRelease.
@ -100,10 +76,10 @@ type WorkloadController interface {
// parameters:
// - pause: 'nil' means keep current state, 'true' means pause workload, 'false' means do not pause workload
// - cleanup: 'true' means clean up canary settings, 'false' means do not clean up.
FinalizeProgress(cleanup bool) bool
FinalizeProgress(cleanup bool) (bool, error)
// SyncWorkloadInfo will watch and compare the status recorded in BatchRelease.Status
// and the real-time workload info. If workload status is inconsistent with that recorded
// in release.status, will return the corresponding WorkloadEventType and info.
SyncWorkloadInfo() (WorkloadEventType, *WorkloadInfo, error)
SyncWorkloadInfo() (WorkloadEventType, *util.WorkloadInfo, error)
}

View File

@ -21,16 +21,14 @@ import (
"fmt"
"time"
appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
rolloutv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
func (r *rolloutContext) runCanary() error {
@ -172,7 +170,7 @@ func (r *rolloutContext) doCanaryUpgrade() (bool, error) {
// check whether batchRelease is ready
if batch.Status.CanaryStatus.CurrentBatchState != rolloutv1alpha1.ReadyBatchState ||
batch.Status.CanaryStatus.CurrentBatch+1 != canaryStatus.CurrentStepIndex {
batch.Status.CanaryStatus.CurrentBatch+1 < canaryStatus.CurrentStepIndex {
klog.Infof("rollout(%s/%s) batch(%s) state(%s), and wait a moment",
r.rollout.Namespace, r.rollout.Name, batchData, batch.Status.CanaryStatus.CurrentBatchState)
return false, nil
@ -260,21 +258,16 @@ func (r *rolloutContext) doCanaryFinalising() (bool, error) {
}
func (r *rolloutContext) removeRolloutStateInWorkload() error {
if r.workload == nil {
if r.workload == nil || r.rollout.Spec.ObjectRef.WorkloadRef == nil {
return nil
}
if _, ok := r.workload.Annotations[util.InRolloutProgressingAnnotation]; !ok {
return nil
}
var obj client.Object
// cloneSet
if r.workload.Kind == util.ControllerKruiseKindCS.Kind {
obj = &appsv1alpha1.CloneSet{}
// deployment
} else {
obj = &apps.Deployment{}
}
workloadRef := r.rollout.Spec.ObjectRef.WorkloadRef
workloadGVK := schema.FromAPIVersionAndKind(workloadRef.APIVersion, workloadRef.Kind)
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
obj := util.GetEmptyWorkloadObject(workloadGVK)
if err := r.Get(context.TODO(), types.NamespacedName{Name: r.workload.Name, Namespace: r.workload.Namespace}, obj); err != nil {
klog.Errorf("getting updated workload(%s.%s) failed: %s", r.workload.Namespace, r.workload.Name, err.Error())
return err

View File

@ -118,7 +118,7 @@ func (r *RolloutReconciler) SetupWithManager(mgr ctrl.Manager) error {
return err
}
if util.DiscoverGVK(util.CloneSetGVK) {
if util.DiscoverGVK(util.ControllerKruiseKindCS) {
// Watch for changes to cloneset
if err = c.Watch(&source.Kind{Type: &appsv1alpha1.CloneSet{}}, &enqueueRequestForWorkload{reader: mgr.GetCache(), scheme: r.Scheme}); err != nil {
return err

View File

@ -34,7 +34,6 @@ import (
// Workload is used to return (controller, scale, selector) fields from the
// controller finder functions.
type Workload struct {
metav1.TypeMeta
metav1.ObjectMeta
// replicas
@ -127,7 +126,6 @@ func (r *ControllerFinder) getKruiseCloneSet(namespace string, ref *rolloutv1alp
CanaryReplicas: cloneSet.Status.UpdatedReplicas,
CanaryReadyReplicas: cloneSet.Status.UpdatedReadyReplicas,
ObjectMeta: cloneSet.ObjectMeta,
TypeMeta: cloneSet.TypeMeta,
Replicas: *cloneSet.Spec.Replicas,
PodTemplateHash: cloneSet.Status.UpdateRevision[strings.LastIndex(cloneSet.Status.UpdateRevision, "-")+1:],
IsStatusConsistent: true,
@ -172,7 +170,6 @@ func (r *ControllerFinder) getDeployment(namespace string, ref *rolloutv1alpha1.
workload := &Workload{
ObjectMeta: stable.ObjectMeta,
TypeMeta: stable.TypeMeta,
Replicas: *stable.Spec.Replicas,
IsStatusConsistent: true,
StableRevision: stableRs.Labels[apps.DefaultDeploymentUniqueLabelKey],

View File

@ -38,7 +38,7 @@ const (
// RolloutState is annotation[rollouts.kruise.io/in-progressing] value
type RolloutState struct {
RolloutName string
RolloutName string `json:"rolloutName"`
}
func GetRolloutState(annotations map[string]string) (*RolloutState, error) {

View File

@ -33,11 +33,16 @@ import (
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
@ -54,9 +59,32 @@ const (
alphanums = "bcdfghjklmnpqrstvwxz2456789"
)
var (
CloneSetGVK = appsv1alpha1.SchemeGroupVersion.WithKind("CloneSet")
)
type WorkloadStatus struct {
Replicas int32
ReadyReplicas int32
UpdatedReplicas int32
UpdatedReadyReplicas int32
AvailableReplicas int32
ObservedGeneration int64
UpdateRevision string
StableRevision string
}
type WorkloadInfo struct {
Paused bool
Replicas *int32
GVKWithName string
Selector labels.Selector
MaxUnavailable *intstr.IntOrString
Metadata *metav1.ObjectMeta
Status *WorkloadStatus
}
func NewWorkloadInfo() *WorkloadInfo {
return &WorkloadInfo{
Status: &WorkloadStatus{},
}
}
// DeepHashObject writes specified object to hash using the spew library
// which follows pointers and prints actual values of the nested objects
@ -99,20 +127,6 @@ func SafeEncodeString(s string) string {
return string(r)
}
func IsControlledBy(object, owner metav1.Object) bool {
controlInfo, controlled := object.GetAnnotations()[BatchReleaseControlAnnotation]
if !controlled {
return false
}
o := &metav1.OwnerReference{}
if err := json.Unmarshal([]byte(controlInfo), o); err != nil {
return false
}
return o.UID == owner.GetUID()
}
func CalculateNewBatchTarget(rolloutSpec *v1alpha1.ReleasePlan, workloadReplicas, currentBatch int) int {
batchSize, _ := intstr.GetValueFromIntOrPercent(&rolloutSpec.Batches[currentBatch].CanaryReplicas, workloadReplicas, true)
if batchSize > workloadReplicas {
@ -197,3 +211,125 @@ func GenRandomStr(length int) string {
randStr := rand.String(length)
return rand.SafeEncodeString(randStr)
}
func ParseReplicasFrom(object *unstructured.Unstructured) int32 {
replicas := int32(1)
field, found, err := unstructured.NestedInt64(object.Object, "spec", "replicas")
if err == nil && found {
replicas = int32(field)
}
return replicas
}
func ParseStatusIntFrom(object *unstructured.Unstructured, field string) int64 {
value, found, err := unstructured.NestedInt64(object.Object, "status", field)
if err == nil && found {
return value
}
return 0
}
func ParseStatusStringFrom(object *unstructured.Unstructured, field string) string {
value, found, err := unstructured.NestedFieldNoCopy(object.Object, "status", field)
if err == nil && found {
return value.(string)
}
return ""
}
func ParseMetadataFrom(object *unstructured.Unstructured) *metav1.ObjectMeta {
m, found, err := unstructured.NestedMap(object.Object, "metadata")
if err != nil || !found {
return nil
}
data, _ := json.Marshal(m)
meta := &metav1.ObjectMeta{}
_ = json.Unmarshal(data, meta)
return meta
}
func ParseMaxUnavailableFrom(object *unstructured.Unstructured) *intstr.IntOrString {
// case 1: object is statefulset
m, found, err := unstructured.NestedFieldCopy(object.Object, "spec", "updateStrategy", "rollingUpdate", "maxUnavailable")
if err == nil && found {
return parseIntStr(m)
}
// case2: object is cloneset
m, found, err = unstructured.NestedFieldCopy(object.Object, "spec", "updateStrategy", "maxUnavailable")
if err == nil && found {
return parseIntStr(m)
}
// case3: object is deployment
m, found, err = unstructured.NestedFieldCopy(object.Object, "spec", "strategy", "rollingUpdate", "maxUnavailable")
if err == nil && found {
return parseIntStr(m)
}
return nil
}
func ParseSelector(object *unstructured.Unstructured) (labels.Selector, error) {
m, found, err := unstructured.NestedFieldNoCopy(object.Object, "spec", "selector")
if err != nil || !found {
return nil, err
}
byteInfo, _ := json.Marshal(m)
labelSelector := &metav1.LabelSelector{}
_ = json.Unmarshal(byteInfo, labelSelector)
return metav1.LabelSelectorAsSelector(labelSelector)
}
func GetEmptyWorkloadObject(gvk schema.GroupVersionKind) client.Object {
switch gvk.Kind {
case ControllerKindDep.Kind:
return &apps.Deployment{}
case ControllerKruiseKindCS.Kind:
return &appsv1alpha1.CloneSet{}
default:
unstructuredObject := &unstructured.Unstructured{}
unstructuredObject.SetGroupVersionKind(gvk)
return unstructuredObject
}
}
func parseIntStr(m interface{}) *intstr.IntOrString {
field := &intstr.IntOrString{}
data, _ := json.Marshal(m)
_ = json.Unmarshal(data, field)
return field
}
func ParseWorkloadInfo(object *unstructured.Unstructured, namespacedName types.NamespacedName) *WorkloadInfo {
workloadGVKWithName := fmt.Sprintf("%v(%v)", object.GroupVersionKind().String(), namespacedName)
updateRevision := ParseStatusStringFrom(object, "updateRevision")
if len(updateRevision) > 0 {
updateRevision = updateRevision[len(object.GetName())+1:]
}
stableRevision := ParseStatusStringFrom(object, "currentRevision")
if len(stableRevision) > 0 {
stableRevision = stableRevision[len(object.GetName())+1:]
}
selector, err := ParseSelector(object)
if err != nil {
klog.Errorf("Failed to parse selector for workload(%v)", workloadGVKWithName)
}
return &WorkloadInfo{
Metadata: ParseMetadataFrom(object),
MaxUnavailable: ParseMaxUnavailableFrom(object),
Replicas: pointer.Int32(ParseReplicasFrom(object)),
GVKWithName: workloadGVKWithName,
Selector: selector,
Status: &WorkloadStatus{
ObservedGeneration: int64(ParseStatusIntFrom(object, "observedGeneration")),
Replicas: int32(ParseStatusIntFrom(object, "replicas")),
ReadyReplicas: int32(ParseStatusIntFrom(object, "readyReplicas")),
UpdatedReplicas: int32(ParseStatusIntFrom(object, "updatedReplicas")),
AvailableReplicas: int32(ParseStatusIntFrom(object, "availableReplicas")),
UpdatedReadyReplicas: int32(ParseStatusIntFrom(object, "updatedReadyReplicas")),
UpdateRevision: updateRevision,
StableRevision: stableRevision,
},
}
}

View File

@ -196,7 +196,7 @@ func TestHandlerDeployment(t *testing.T) {
expectObj: func() *apps.Deployment {
obj := deploymentDemo.DeepCopy()
obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo"}`
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo"}`
obj.Spec.Paused = true
return obj
},
@ -265,18 +265,18 @@ func TestHandlerDeployment(t *testing.T) {
getObjs: func() (*apps.Deployment, *apps.Deployment) {
oldObj := deploymentDemo.DeepCopy()
oldObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
oldObj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":false}`
oldObj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo","RolloutDone":false}`
oldObj.Spec.Paused = true
newObj := deploymentDemo.DeepCopy()
newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
newObj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":false}`
newObj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo","RolloutDone":false}`
newObj.Spec.Paused = false
return oldObj, newObj
},
expectObj: func() *apps.Deployment {
obj := deploymentDemo.DeepCopy()
obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":false}`
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo","RolloutDone":false}`
obj.Spec.Paused = true
return obj
},
@ -293,18 +293,18 @@ func TestHandlerDeployment(t *testing.T) {
getObjs: func() (*apps.Deployment, *apps.Deployment) {
oldObj := deploymentDemo.DeepCopy()
oldObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
oldObj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":true}`
oldObj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo","RolloutDone":true}`
oldObj.Spec.Paused = true
newObj := deploymentDemo.DeepCopy()
newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
newObj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":true}`
newObj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo","RolloutDone":true}`
newObj.Spec.Paused = false
return oldObj, newObj
},
expectObj: func() *apps.Deployment {
obj := deploymentDemo.DeepCopy()
obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":true}`
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo","RolloutDone":true}`
obj.Spec.Paused = true
return obj
},
@ -372,7 +372,7 @@ func TestHandlerCloneSet(t *testing.T) {
expectObj: func() *kruisev1aplphal.CloneSet {
obj := cloneSetDemo.DeepCopy()
obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo"}`
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"rolloutName":"rollout-demo"}`
obj.Spec.UpdateStrategy.Paused = true
return obj
},

View File

@ -43,6 +43,21 @@ import (
var _ = SIGDescribe("BatchRelease", func() {
var namespace string
DumpAllResources := func() {
rollout := &rolloutsv1alpha1.RolloutList{}
k8sClient.List(context.TODO(), rollout, client.InNamespace(namespace))
fmt.Println(workloads.DumpJSON(rollout))
batch := &rolloutsv1alpha1.BatchReleaseList{}
k8sClient.List(context.TODO(), batch, client.InNamespace(namespace))
fmt.Println(workloads.DumpJSON(batch))
deploy := &apps.DeploymentList{}
k8sClient.List(context.TODO(), deploy, client.InNamespace(namespace))
fmt.Println(workloads.DumpJSON(deploy))
rs := &apps.ReplicaSetList{}
k8sClient.List(context.TODO(), rs, client.InNamespace(namespace))
fmt.Println(workloads.DumpJSON(rs))
}
CreateObject := func(object client.Object, options ...client.CreateOption) {
object.SetNamespace(namespace)
Expect(k8sClient.Create(context.TODO(), object)).NotTo(HaveOccurred())
@ -157,6 +172,20 @@ var _ = SIGDescribe("BatchRelease", func() {
}, 20*time.Minute, time.Second).Should(BeTrue())
}
WaitBatchReleaseProgressing := func(object *rolloutsv1alpha1.BatchRelease) {
startTime := time.Now()
Eventually(func() rolloutsv1alpha1.RolloutPhase {
release := &rolloutsv1alpha1.BatchRelease{}
Expect(GetObject(object.Namespace, object.Name, release)).NotTo(HaveOccurred())
if startTime.Add(time.Minute * 5).Before(time.Now()) {
DumpAllResources()
Fail("Timeout waiting batchRelease to 'Progressing' phase")
}
fmt.Println(release.Status.Phase)
return release.Status.Phase
}, 10*time.Minute, time.Second).Should(Equal(rolloutsv1alpha1.RolloutPhaseProgressing))
}
BeforeEach(func() {
namespace = randomNamespaceName("batchrelease")
ns := v1.Namespace{
@ -170,7 +199,7 @@ var _ = SIGDescribe("BatchRelease", func() {
AfterEach(func() {
By("[TEST] Clean up resources after an integration test")
k8sClient.DeleteAllOf(context.TODO(), &apps.Deployment{}, client.InNamespace(namespace))
if workloads.DiscoverGVK(workloads.CloneSetGVK) {
if workloads.DiscoverGVK(workloads.ControllerKruiseKindCS) {
k8sClient.DeleteAllOf(context.TODO(), &kruiseappsv1alpha1.CloneSet{}, client.InNamespace(namespace))
}
k8sClient.DeleteAllOf(context.TODO(), &rolloutsv1alpha1.BatchRelease{}, client.InNamespace(namespace))
@ -178,7 +207,6 @@ var _ = SIGDescribe("BatchRelease", func() {
})
KruiseDescribe("CloneSet BatchRelease Checker", func() {
It("V1->V2: Percentage, 100%, Succeeded", func() {
By("Creating BatchRelease...")
release := &rolloutsv1alpha1.BatchRelease{}
@ -280,90 +308,6 @@ var _ = SIGDescribe("BatchRelease", func() {
}, 15*time.Minute, 5*time.Second).Should(Equal(int32(expectedReplicas)))
})
//It("V1->V2(Completed)->V3: Percentage, 100%, Succeeded", func() {
// By("Creating BatchRelease....")
// By("Creating BatchRelease...")
// release := &rolloutsv1alpha1.BatchRelease{}
// Expect(ReadYamlToObject("./test_data/batchrelease/cloneset_percentage_100.yaml", release)).ToNot(HaveOccurred())
// CreateObject(release)
//
// By("Creating CloneSet and waiting for all pods ready....")
// By("Creating workload and waiting for all pods ready...")
// cloneset := &kruiseappsv1alpha1.CloneSet{}
// Expect(ReadYamlToObject("./test_data/batchrelease/cloneset.yaml", cloneset)).ToNot(HaveOccurred())
// cloneset.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV1)
// CreateObject(cloneset)
// WaitCloneSetAllPodsReady(cloneset)
// stableRevisionV1 := GetUpdateRevision(cloneset)
//
// /*************************************************************************************
// Start to release V1->V2
// *************************************************************************************/
// By("Start to release V1->V2....")
// cloneset.Spec.UpdateStrategy.Paused = true
// cloneset.Spec.Replicas = pointer.Int32Ptr(5)
// cloneset.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV2)
// UpdateCloneSet(cloneset)
//
// // record canary revision --> v2
// canaryRevisionV2 := GetUpdateRevision(cloneset)
// Expect(canaryRevisionV2).ShouldNot(Equal(stableRevisionV1))
//
// By("V1->V2: Checking CloneSet updated replicas...")
// for i := range release.Spec.ReleasePlan.Batches {
// By(fmt.Sprintf("\tWaiting for batch[%v] completed...", i))
// batch := &release.Spec.ReleasePlan.Batches[i]
// expectedUpdatedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(&batch.CanaryReplicas, int(*cloneset.Spec.Replicas), true)
// Eventually(func() int32 {
// clone := &kruiseappsv1alpha1.CloneSet{}
// Expect(GetObject(cloneset.Namespace, cloneset.Name, clone)).NotTo(HaveOccurred())
// return clone.Status.UpdatedReplicas
// }, 5*time.Minute, time.Second).Should(Equal(int32(expectedUpdatedReplicas)))
// time.Sleep(time.Duration(batch.PauseSeconds) * time.Second)
// }
//
// By("V1->V2: Checking BatchRelease status...")
// Eventually(func() rolloutsv1alpha1.RolloutPhase {
// clone := &rolloutsv1alpha1.BatchRelease{}
// Expect(GetObject(release.Namespace, release.Name, clone)).NotTo(HaveOccurred())
// return clone.Status.Phase
// }, 15*time.Minute, 5*time.Second).Should(Equal(rolloutsv1alpha1.RolloutPhaseCompleted))
//
// /*************************************************************************************
// V1->V2 Succeeded, Start to release V2->V3
// *************************************************************************************/
// By("Start to release V2->V3....")
// cloneset.Spec.UpdateStrategy.Paused = true
// cloneset.Spec.Replicas = pointer.Int32Ptr(5)
// cloneset.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV3)
// UpdateCloneSet(cloneset)
//
// // record canary revision --> v3
// canaryRevisionV3 := GetUpdateRevision(cloneset)
// Expect(canaryRevisionV3).ShouldNot(Equal(stableRevisionV1))
// Expect(canaryRevisionV3).ShouldNot(Equal(canaryRevisionV2))
//
// By("V2->V3: Checking CloneSet updated replicas...")
// for i := range release.Spec.ReleasePlan.Batches {
// By(fmt.Sprintf("\tWaiting for batch[%v] completed...", i))
// batch := &release.Spec.ReleasePlan.Batches[i]
// expectedUpdatedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(&batch.CanaryReplicas, int(*cloneset.Spec.Replicas), true)
// Eventually(func() int32 {
// clone := &kruiseappsv1alpha1.CloneSet{}
// Expect(GetObject(cloneset.Namespace, cloneset.Name, clone)).NotTo(HaveOccurred())
// return clone.Status.UpdatedReplicas
// }, 5*time.Minute, time.Second).Should(Equal(int32(expectedUpdatedReplicas)))
// time.Sleep(time.Duration(batch.PauseSeconds) * time.Second)
// }
//
// By("V2->V3: Checking BatchRelease status...")
// Eventually(func() rolloutsv1alpha1.RolloutPhase {
// clone := &rolloutsv1alpha1.BatchRelease{}
// Expect(GetObject(release.Namespace, release.Name, clone)).NotTo(HaveOccurred())
// return clone.Status.Phase
// }, 15*time.Minute, 5*time.Second).Should(Equal(rolloutsv1alpha1.RolloutPhaseCompleted))
//})
It("V1->V2(UnCompleted)->V3: Percentage, 100%, Succeeded", func() {
By("Creating BatchRelease....")
By("Creating BatchRelease...")
@ -677,56 +621,6 @@ var _ = SIGDescribe("BatchRelease", func() {
return clone.Status.UpdatedReplicas == *clone.Spec.Replicas
}, 15*time.Minute, 5*time.Second).Should(BeTrue())
})
/*It("Rollback V1->V2->V1: Percentage, 100%, Succeeded", func() {
release := &rolloutsv1alpha1.BatchRelease{}
Expect(ReadYamlToObject("./test_data/batchrelease/cloneset_percentage_100.yaml", release)).ToNot(HaveOccurred())
CreateObject(release)
By("Creating workload and waiting for all pods ready...")
cloneset := &kruiseappsv1alpha1.CloneSet{}
Expect(ReadYamlToObject("./test_data/batchrelease/cloneset.yaml", cloneset)).ToNot(HaveOccurred())
cloneset.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV1)
cloneset.Spec.Template.Spec.Containers[0].ImagePullPolicy = v1.PullIfNotPresent
CreateObject(cloneset)
WaitCloneSetAllPodsReady(cloneset)
// record stable revision --> v1
stableRevision := GetUpdateRevision(cloneset)
cloneset.Spec.UpdateStrategy.Paused = true
// todo
//cloneset.Spec.Replicas = pointer.Int32Ptr(10)
cloneset.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.FailedImage)
UpdateCloneSet(cloneset)
// record canary revision --> v2
canaryRevision := GetUpdateRevision(cloneset)
Expect(canaryRevision).ShouldNot(Equal(stableRevision))
By("Waiting a minute and checking failed revision...")
time.Sleep(time.Minute)
for i := 0; i < 30; i++ {
fetchedRelease := &rolloutsv1alpha1.BatchRelease{}
Expect(GetObject(release.Namespace, release.Name, fetchedRelease)).NotTo(HaveOccurred())
Expect(fetchedRelease.Status.CanaryStatus.UpdatedReplicas).Should(Equal(int32(1)))
Expect(fetchedRelease.Status.CanaryStatus.UpdatedReadyReplicas).Should(Equal(int32(0)))
Expect(fetchedRelease.Status.CanaryStatus.CurrentBatch).Should(Equal(int32(0)))
time.Sleep(time.Second)
}
By("Updating cloneset to V1...")
cloneset.Spec.UpdateStrategy.Partition = nil
cloneset.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV1)
UpdateCloneSet(cloneset)
By("Checking BatchRelease completed status phase...")
Eventually(func() rolloutsv1alpha1.RolloutPhase {
clone := &rolloutsv1alpha1.BatchRelease{}
Expect(GetObject(release.Namespace, release.Name, clone)).NotTo(HaveOccurred())
return clone.Status.Phase
}, 15*time.Minute, 5*time.Second).Should(Equal(rolloutsv1alpha1.RolloutPhaseCancelled))
})*/
})
KruiseDescribe("Deployment BatchRelease Checker", func() {
@ -751,15 +645,13 @@ var _ = SIGDescribe("BatchRelease", func() {
deployment.Spec.Replicas = pointer.Int32Ptr(5)
deployment.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV2)
UpdateDeployment(deployment)
WaitBatchReleaseProgressing(release)
// record canary revision --> v2
canaryRevision := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
Expect(canaryRevision).ShouldNot(Equal(stableRevision))
By("Checking Deployment updated replicas...")
Eventually(func() *apps.Deployment {
return GetCanaryDeployment(release)
}, 5*time.Minute, time.Second).ShouldNot(BeNil())
for i := range release.Spec.ReleasePlan.Batches {
By(fmt.Sprintf("\tWaiting for batch[%v] completed...", i))
batch := &release.Spec.ReleasePlan.Batches[i]
@ -804,11 +696,9 @@ var _ = SIGDescribe("BatchRelease", func() {
// record canary revision --> v2
canaryRevision := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
Expect(canaryRevision).ShouldNot(Equal(stableRevision))
WaitBatchReleaseProgressing(release)
By("Checking Deployment updated replicas...")
Eventually(func() *apps.Deployment {
return GetCanaryDeployment(release)
}, 5*time.Minute, time.Second).ShouldNot(BeNil())
for i := range release.Spec.ReleasePlan.Batches {
By(fmt.Sprintf("\tWaiting for batch[%v] completed...", i))
batch := &release.Spec.ReleasePlan.Batches[i]
@ -838,88 +728,6 @@ var _ = SIGDescribe("BatchRelease", func() {
}, 15*time.Minute, 5*time.Second).Should(Equal(int32(expectedReplicas)))
})
//It("V1->V2(Completed)->V3: Percentage, 100%, Succeeded", func() {
// By("Creating BatchRelease....")
// By("Creating BatchRelease...")
// release := &rolloutsv1alpha1.BatchRelease{}
// Expect(ReadYamlToObject("./test_data/batchrelease/deployment_percentage_100.yaml", release)).ToNot(HaveOccurred())
// CreateObject(release)
//
// By("Creating Deployment and waiting for all pods ready....")
// By("Creating workload and waiting for all pods ready...")
// deployment := &apps.Deployment{}
// Expect(ReadYamlToObject("./test_data/batchrelease/deployment.yaml", deployment)).ToNot(HaveOccurred())
// deployment.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV1)
// CreateObject(deployment)
// WaitDeploymentAllPodsReady(deployment)
// stableRevisionV1 := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
//
// /*************************************************************************************
// Start to release V1->V2
// *************************************************************************************/
// By("Start to release V1->V2....")
// deployment.Spec.Paused = true
// deployment.Spec.Replicas = pointer.Int32Ptr(5)
// deployment.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV2)
// UpdateDeployment(deployment)
//
// // record canary revision --> v2
// canaryRevisionV2 := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
// Expect(canaryRevisionV2).ShouldNot(Equal(stableRevisionV1))
//
// By("V1->V2: Checking Deployment updated replicas...")
// for i := range release.Spec.ReleasePlan.Batches {
// By(fmt.Sprintf("\tWaiting for batch[%v] completed...", i))
// batch := &release.Spec.ReleasePlan.Batches[i]
// expectedUpdatedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(&batch.CanaryReplicas, int(*deployment.Spec.Replicas), true)
// Eventually(func() int32 {
// clone := GetCanaryDeployment(release)
// return clone.Status.UpdatedReplicas
// }, 5*time.Minute, time.Second).Should(Equal(int32(expectedUpdatedReplicas)))
// time.Sleep(time.Duration(batch.PauseSeconds) * time.Second)
// }
//
// By("V1->V2: Checking BatchRelease status...")
// Eventually(func() rolloutsv1alpha1.RolloutPhase {
// clone := &rolloutsv1alpha1.BatchRelease{}
// Expect(GetObject(release.Namespace, release.Name, clone)).NotTo(HaveOccurred())
// return clone.Status.Phase
// }, 15*time.Minute, 5*time.Second).Should(Equal(rolloutsv1alpha1.RolloutPhaseCompleted))
//
// /*************************************************************************************
// V1->V2 Succeeded, Start to release V2->V3
// *************************************************************************************/
// By("Start to release V2->V3....")
// deployment.Spec.Paused = true
// deployment.Spec.Replicas = pointer.Int32Ptr(5)
// deployment.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV3)
// UpdateDeployment(deployment)
//
// // record canary revision --> v3
// canaryRevisionV3 := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
// Expect(canaryRevisionV3).ShouldNot(Equal(stableRevisionV1))
// Expect(canaryRevisionV3).ShouldNot(Equal(canaryRevisionV2))
//
// By("V2->V3: Checking Deployment updated replicas...")
// for i := range release.Spec.ReleasePlan.Batches {
// By(fmt.Sprintf("\tWaiting for batch[%v] completed...", i))
// batch := &release.Spec.ReleasePlan.Batches[i]
// expectedUpdatedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(&batch.CanaryReplicas, int(*deployment.Spec.Replicas), true)
// Eventually(func() int32 {
// clone := GetCanaryDeployment(release)
// return clone.Status.UpdatedReplicas
// }, 5*time.Minute, time.Second).Should(Equal(int32(expectedUpdatedReplicas)))
// time.Sleep(time.Duration(batch.PauseSeconds) * time.Second)
// }
//
// By("V2->V3: Checking BatchRelease status...")
// Eventually(func() rolloutsv1alpha1.RolloutPhase {
// clone := &rolloutsv1alpha1.BatchRelease{}
// Expect(GetObject(release.Namespace, release.Name, clone)).NotTo(HaveOccurred())
// return clone.Status.Phase
// }, 15*time.Minute, 5*time.Second).Should(Equal(rolloutsv1alpha1.RolloutPhaseCompleted))
//})
It("V1->V2(UnCompleted)->V3: Percentage, 100%, Succeeded", func() {
By("Creating BatchRelease....")
By("Creating BatchRelease...")
@ -944,15 +752,13 @@ var _ = SIGDescribe("BatchRelease", func() {
deployment.Spec.Replicas = pointer.Int32Ptr(5)
deployment.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV2)
UpdateDeployment(deployment)
WaitBatchReleaseProgressing(release)
// record canary revision --> v2
canaryRevisionV2 := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
Expect(canaryRevisionV2).ShouldNot(Equal(stableRevisionV1))
By("V1->V2: Checking Deployment updated replicas...")
Eventually(func() *apps.Deployment {
return GetCanaryDeployment(release)
}, 5*time.Minute, time.Second).ShouldNot(BeNil())
for i := 0; i < len(release.Spec.ReleasePlan.Batches)-2; i++ {
By(fmt.Sprintf("\tWaiting for batch[%v] completed...", i))
batch := &release.Spec.ReleasePlan.Batches[i]
@ -1012,15 +818,13 @@ var _ = SIGDescribe("BatchRelease", func() {
deployment.Spec.Replicas = pointer.Int32Ptr(5)
deployment.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV2)
UpdateDeployment(deployment)
WaitBatchReleaseProgressing(release)
// record canary revision --> v2
canaryRevision := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
Expect(canaryRevision).ShouldNot(Equal(stableRevision))
By("Checking Deployment updated replicas...")
Eventually(func() *apps.Deployment {
return GetCanaryDeployment(release)
}, 5*time.Minute, time.Second).ShouldNot(BeNil())
for i := range release.Spec.ReleasePlan.Batches {
By(fmt.Sprintf("\tWaiting for batch[%v] completed...", i))
batch := &release.Spec.ReleasePlan.Batches[i]
@ -1082,15 +886,13 @@ var _ = SIGDescribe("BatchRelease", func() {
deployment.Spec.Replicas = pointer.Int32Ptr(10)
deployment.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV2)
UpdateDeployment(deployment)
WaitBatchReleaseProgressing(release)
// record canary revision --> v2
canaryRevision := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
Expect(canaryRevision).ShouldNot(Equal(stableRevision))
By("Checking Deployment updated replicas...")
Eventually(func() *apps.Deployment {
return GetCanaryDeployment(release)
}, 5*time.Minute, time.Second).ShouldNot(BeNil())
for i := range release.Spec.ReleasePlan.Batches {
By(fmt.Sprintf("\tWaiting for batch[%v] completed...", i))
batch := &release.Spec.ReleasePlan.Batches[i]
@ -1152,15 +954,13 @@ var _ = SIGDescribe("BatchRelease", func() {
deployment.Spec.Replicas = pointer.Int32Ptr(5)
deployment.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV2)
UpdateDeployment(deployment)
WaitBatchReleaseProgressing(release)
// record canary revision --> v2
canaryRevision := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
Expect(canaryRevision).ShouldNot(Equal(stableRevision))
By("Checking Deployment updated replicas...")
Eventually(func() *apps.Deployment {
return GetCanaryDeployment(release)
}, 5*time.Minute, time.Second).ShouldNot(BeNil())
for i := range release.Spec.ReleasePlan.Batches {
batch := &release.Spec.ReleasePlan.Batches[i]
fetchedDeployment := &apps.Deployment{}
@ -1221,15 +1021,13 @@ var _ = SIGDescribe("BatchRelease", func() {
deployment.Spec.Replicas = pointer.Int32Ptr(5)
deployment.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.BusyBoxV2)
UpdateDeployment(deployment)
WaitBatchReleaseProgressing(release)
// record canary revision --> v2
canaryRevision := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
Expect(canaryRevision).ShouldNot(Equal(stableRevision))
By("Checking Deployment updated replicas...")
Eventually(func() *apps.Deployment {
return GetCanaryDeployment(release)
}, 5*time.Minute, time.Second).ShouldNot(BeNil())
for i := range release.Spec.ReleasePlan.Batches {
By(fmt.Sprintf("\tWaiting for batch[%v] completed...", i))
batch := &release.Spec.ReleasePlan.Batches[i]
@ -1292,6 +1090,7 @@ var _ = SIGDescribe("BatchRelease", func() {
//deployment.Spec.Replicas = pointer.Int32Ptr(10)
deployment.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.FailedImage)
UpdateDeployment(deployment)
WaitBatchReleaseProgressing(release)
// record canary revision --> v2
canaryRevisionV2 := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
@ -1342,9 +1141,10 @@ var _ = SIGDescribe("BatchRelease", func() {
// record stable revision --> v1
stableRevisionV1 := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
deployment.Spec.Paused = false
deployment.Spec.Paused = true
deployment.Spec.Template.Spec.Containers[0].Image = images.GetE2EImage(images.FailedImage)
UpdateDeployment(deployment)
WaitBatchReleaseProgressing(release)
// record canary revision --> v2
canaryRevisionV2 := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
@ -1409,6 +1209,7 @@ var _ = SIGDescribe("BatchRelease", func() {
// record canary revision --> v2
canaryRevision := workloads.ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
Expect(canaryRevision).ShouldNot(Equal(stableRevision))
WaitBatchReleaseProgressing(release)
var fetchedRelease *rolloutsv1alpha1.BatchRelease
Eventually(func() bool {
@ -1440,9 +1241,6 @@ var _ = SIGDescribe("BatchRelease", func() {
}, 15*time.Minute, 5*time.Second).Should(Equal(rolloutsv1alpha1.RolloutPhaseCompleted))
By("Checking all pod were updated when release completed...")
Eventually(func() *apps.Deployment {
return GetCanaryDeployment(release)
}, 5*time.Minute, time.Second).ShouldNot(BeNil())
Expect(GetObject(deployment.Namespace, deployment.Name, deployment)).NotTo(HaveOccurred())
Eventually(func() int32 {
canary := GetCanaryDeployment(release)

View File

@ -160,7 +160,7 @@ var _ = SIGDescribe("Rollout", func() {
Expect(GetObject(deployment.Name, clone)).NotTo(HaveOccurred())
return clone.Status.ObservedGeneration == clone.Generation &&
*clone.Spec.Replicas == clone.Status.ReadyReplicas && *clone.Spec.Replicas == clone.Status.Replicas
}, 5*time.Minute, time.Second).Should(BeTrue())
}, 10*time.Minute, time.Second).Should(BeTrue())
}
WaitRolloutCanaryStepPaused := func(name string, stepIndex int32) {

View File

@ -28,7 +28,8 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
kruise "github.com/openkruise/kruise-api/apps/v1alpha1"
kruisev1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
kruisev1beta1 "github.com/openkruise/kruise-api/apps/v1beta1"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
crdv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -63,7 +64,9 @@ var _ = BeforeSuite(func(done Done) {
Expect(err).Should(BeNil())
err = crdv1.AddToScheme(scheme)
Expect(err).Should(BeNil())
err = kruise.AddToScheme(scheme)
err = kruisev1beta1.AddToScheme(scheme)
Expect(err).Should(BeNil())
err = kruisev1alpha1.AddToScheme(scheme)
Expect(err).Should(BeNil())
By("Setting up kubernetes client")
k8sClient, err = client.New(config.GetConfigOrDie(), client.Options{Scheme: scheme})

View File

@ -4,7 +4,6 @@ metadata:
name: release-deployment-number-100
spec:
targetReference:
type: workloadRef
workloadRef:
apiVersion: apps/v1
kind: Deployment

View File

@ -4,13 +4,11 @@ metadata:
name: rollouts-demo
spec:
objectRef:
type: workloadRef
workloadRef:
apiVersion: apps/v1
kind: Deployment
name: echoserver
strategy:
type: canary
canary:
steps:
- weight: 20