fix goroutine race bug

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
mingzhou.swx 2022-08-05 13:06:38 +08:00
parent 794003c150
commit 7a4631c1ea
12 changed files with 265 additions and 294 deletions
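The race: the old Executor cached the BatchRelease, its release plan, and its status as struct fields via SetReleaseInfo, so concurrent reconcile goroutines could read and write those shared fields at the same time. This commit makes the executor stateless: the reconciler passes the freshly fetched release into Do, and the executor works on a deep-copied status that it threads through executeBatchReleasePlan, progressBatches, and the workload controllers. Below is a minimal, self-contained sketch of the two patterns (hypothetical Release/Status types and executor names, not the project's actual API):

package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the real BatchRelease types.
type Status struct{ Phase string }

type Release struct {
	Name   string
	Status Status
}

// racyExecutor mirrors the old design: per-reconcile state is cached on a
// shared struct, so two reconcile goroutines can overwrite each other's data.
type racyExecutor struct {
	release *Release
}

func (e *racyExecutor) SetReleaseInfo(r *Release) { e.release = r }
func (e *racyExecutor) Do() string                { return e.release.Name }

// statelessExecutor mirrors the new design: Do receives the release as an
// argument and works on a local status copy, so nothing is shared between calls.
type statelessExecutor struct{}

func (e *statelessExecutor) Do(r *Release) (string, *Status) {
	newStatus := r.Status // local copy; the real code deep-copies the status
	return r.Name, &newStatus
}

func main() {
	racy := &racyExecutor{}
	safe := &statelessExecutor{}
	var wg sync.WaitGroup
	for _, r := range []*Release{{Name: "v1"}, {Name: "v2"}} {
		wg.Add(1)
		go func(r *Release) {
			defer wg.Done()
			racy.SetReleaseInfo(r) // goroutine A can overwrite goroutine B's release here
			_ = racy.Do()          // ...so this may read the other goroutine's object
			name, _ := safe.Do(r)  // always operates on its own arguments
			fmt.Println(name)
		}(r)
	}
	wg.Wait()
}

When run under `go run -race`, the racy half of the sketch should be reported as a data race while the stateless half is not; this matches how the diff below removes SetReleaseInfo and passes release/newStatus through the call chain instead.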

View File

@ -184,12 +184,9 @@ func (r *BatchReleaseReconciler) Reconcile(ctx context.Context, req ctrl.Request
return reconcile.Result{}, err
}
// set the release info for executor before executing.
r.executor.SetReleaseInfo(release)
// executor starts to execute the batch release plan.
startTimestamp := time.Now()
result, currentStatus, err := r.executor.Do()
result, currentStatus, err := r.executor.Do(release)
if err != nil {
return reconcile.Result{}, err
}

View File

@ -44,65 +44,45 @@ const (
type Executor struct {
client client.Client
recorder record.EventRecorder
release *v1alpha1.BatchRelease
releasePlan *v1alpha1.ReleasePlan
releaseStatus *v1alpha1.BatchReleaseStatus
releaseKey types.NamespacedName
workloadKey types.NamespacedName
}
// NewReleasePlanExecutor creates a release plan Executor
func NewReleasePlanExecutor(client client.Client, recorder record.EventRecorder) *Executor {
func NewReleasePlanExecutor(cli client.Client, recorder record.EventRecorder) *Executor {
return &Executor{
client: client,
client: cli,
recorder: recorder,
}
}
func (r *Executor) SetReleaseInfo(release *v1alpha1.BatchRelease) {
r.release = release
r.releaseStatus = release.Status.DeepCopy()
r.releasePlan = release.Spec.ReleasePlan.DeepCopy()
initializeStatusIfNeeds(r.releaseStatus)
r.releaseKey = client.ObjectKeyFromObject(release)
if release.Spec.TargetRef.WorkloadRef != nil {
r.workloadKey = types.NamespacedName{
Namespace: release.Namespace,
Name: release.Spec.TargetRef.WorkloadRef.Name,
}
}
}
// Do executes the release plan
func (r *Executor) Do() (reconcile.Result, *v1alpha1.BatchReleaseStatus, error) {
func (r *Executor) Do(release *v1alpha1.BatchRelease) (reconcile.Result, *v1alpha1.BatchReleaseStatus, error) {
klog.InfoS("Starting one round of reconciling release plan",
"BatchRelease", client.ObjectKeyFromObject(r.release),
"phase", r.releaseStatus.Phase,
"current-batch", r.releaseStatus.CanaryStatus.CurrentBatch,
"current-batch-state", r.releaseStatus.CanaryStatus.CurrentBatchState)
"BatchRelease", client.ObjectKeyFromObject(release),
"phase", release.Status.Phase,
"current-batch", release.Status.CanaryStatus.CurrentBatch,
"current-batch-state", release.Status.CanaryStatus.CurrentBatchState)
workloadController, err := r.GetWorkloadController()
newStatus := getInitializedStatus(&release.Status)
workloadController, err := r.getWorkloadController(release, newStatus)
if err != nil || workloadController == nil {
return reconcile.Result{}, r.releaseStatus, nil
return reconcile.Result{}, nil, nil
}
shouldStopThisRound, result, err := r.checkHealthBeforeExecution(workloadController)
if shouldStopThisRound || err != nil {
return result, r.releaseStatus, err
stop, result, err := r.syncStatusBeforeExecuting(release, newStatus, workloadController)
if stop || err != nil {
return result, newStatus, err
}
return r.executeBatchReleasePlan(workloadController)
return r.executeBatchReleasePlan(release, newStatus, workloadController)
}
func (r *Executor) executeBatchReleasePlan(workloadController workloads.WorkloadController) (reconcile.Result, *v1alpha1.BatchReleaseStatus, error) {
func (r *Executor) executeBatchReleasePlan(release *v1alpha1.BatchRelease, newStatus *v1alpha1.BatchReleaseStatus, workloadController workloads.WorkloadController) (reconcile.Result, *v1alpha1.BatchReleaseStatus, error) {
var err error
status := r.releaseStatus
result := reconcile.Result{}
klog.V(3).Infof("BatchRelease(%v) State Machine into '%s' state", r.releaseKey, status.Phase)
klog.V(3).Infof("BatchRelease(%v) State Machine into '%s' state", klog.KObj(release), newStatus.Phase)
switch status.Phase {
switch newStatus.Phase {
case v1alpha1.RolloutPhaseInitial:
// if this batchRelease was created but the workload does not exist,
// we should keep this phase and do nothing until the workload is created.
@ -113,11 +93,11 @@ func (r *Executor) executeBatchReleasePlan(workloadController workloads.Workload
verifiedDone, err = workloadController.VerifyWorkload()
switch {
case err != nil:
setCondition(status, v1alpha1.VerifyingBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
setCondition(newStatus, v1alpha1.VerifyingBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
case verifiedDone:
status.Phase = v1alpha1.RolloutPhasePreparing
newStatus.Phase = v1alpha1.RolloutPhasePreparing
result = reconcile.Result{RequeueAfter: DefaultDuration}
setCondition(status, v1alpha1.PreparingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is preparing for progress")
setCondition(newStatus, v1alpha1.PreparingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is preparing for progress")
}
case v1alpha1.RolloutPhasePreparing:
@ -126,21 +106,21 @@ func (r *Executor) executeBatchReleasePlan(workloadController workloads.Workload
preparedDone, err = workloadController.PrepareBeforeProgress()
switch {
case err != nil:
setCondition(status, v1alpha1.PreparingBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
setCondition(newStatus, v1alpha1.PreparingBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
case preparedDone:
status.Phase = v1alpha1.RolloutPhaseProgressing
newStatus.Phase = v1alpha1.RolloutPhaseProgressing
result = reconcile.Result{RequeueAfter: DefaultDuration}
setCondition(status, v1alpha1.ProgressingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is progressing")
setCondition(newStatus, v1alpha1.ProgressingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is progressing")
}
case v1alpha1.RolloutPhaseProgressing:
// progress the release plan in this state.
var progressDone bool
progressDone, result, err = r.progressBatches(workloadController)
progressDone, result, err = r.progressBatches(release, newStatus, workloadController)
switch {
case progressDone:
status.Phase = v1alpha1.RolloutPhaseFinalizing
setCondition(status, v1alpha1.FinalizingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is finalizing")
newStatus.Phase = v1alpha1.RolloutPhaseFinalizing
setCondition(newStatus, v1alpha1.FinalizingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is finalizing")
}
case v1alpha1.RolloutPhaseFinalizing:
@ -151,14 +131,14 @@ func (r *Executor) executeBatchReleasePlan(workloadController workloads.Workload
finalizedDone, err = workloadController.FinalizeProgress(false)
switch {
case err != nil:
setCondition(status, v1alpha1.CompletedBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
setCondition(newStatus, v1alpha1.CompletedBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
case finalizedDone:
if IsAllBatchReady(r.releasePlan, r.releaseStatus) {
status.Phase = v1alpha1.RolloutPhaseCompleted
setCondition(status, v1alpha1.CompletedBatchReleaseCondition, v1.ConditionTrue, v1alpha1.SucceededBatchReleaseConditionReason, "BatchRelease is completed")
if IsAllBatchReady(release) {
newStatus.Phase = v1alpha1.RolloutPhaseCompleted
setCondition(newStatus, v1alpha1.CompletedBatchReleaseCondition, v1.ConditionTrue, v1alpha1.SucceededBatchReleaseConditionReason, "BatchRelease is completed")
} else {
status.Phase = v1alpha1.RolloutPhaseCancelled
setCondition(status, v1alpha1.CancelledBatchReleaseCondition, v1.ConditionTrue, v1alpha1.SucceededBatchReleaseConditionReason, "BatchRelease is cancelled")
newStatus.Phase = v1alpha1.RolloutPhaseCancelled
setCondition(newStatus, v1alpha1.CancelledBatchReleaseCondition, v1.ConditionTrue, v1alpha1.SucceededBatchReleaseConditionReason, "BatchRelease is cancelled")
}
default:
result = reconcile.Result{RequeueAfter: DefaultDuration}
@ -169,9 +149,9 @@ func (r *Executor) executeBatchReleasePlan(workloadController workloads.Workload
finalizedDone, err = workloadController.FinalizeProgress(true)
switch {
case err != nil:
setCondition(status, v1alpha1.CompletedBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
setCondition(newStatus, v1alpha1.CompletedBatchReleaseCondition, v1.ConditionFalse, v1alpha1.FailedBatchReleaseConditionReason, err.Error())
case finalizedDone:
setCondition(status, v1alpha1.TerminatedBatchReleaseCondition, v1.ConditionTrue, v1alpha1.SucceededBatchReleaseConditionReason, "BatchRelease is terminated")
setCondition(newStatus, v1alpha1.TerminatedBatchReleaseCondition, v1.ConditionTrue, v1alpha1.SucceededBatchReleaseConditionReason, "BatchRelease is terminated")
default:
result = reconcile.Result{RequeueAfter: DefaultDuration}
}
@ -180,33 +160,32 @@ func (r *Executor) executeBatchReleasePlan(workloadController workloads.Workload
// these states indicate that the plan was executed/cancelled successfully; we should do nothing in them.
default:
klog.V(3).Infof("BatchRelease(%v) State Machine into %s state", r.releaseKey, "Unknown")
panic(fmt.Sprintf("illegal release status %+v", status))
klog.V(3).Infof("BatchRelease(%v) State Machine into %s state", klog.KObj(release), "Unknown")
panic(fmt.Sprintf("illegal release status %+v", newStatus))
}
return result, status, err
return result, newStatus, err
}
// progressBatches handles the reconcile logic when we are in the middle of a release; we have to go through the finalizing state before we can succeed or fail
func (r *Executor) progressBatches(workloadController workloads.WorkloadController) (bool, reconcile.Result, error) {
func (r *Executor) progressBatches(release *v1alpha1.BatchRelease, newStatus *v1alpha1.BatchReleaseStatus, workloadController workloads.WorkloadController) (bool, reconcile.Result, error) {
var err error
progressDone := false
status := r.releaseStatus
result := reconcile.Result{}
klog.V(3).Infof("BatchRelease(%v) Canary Batch State Machine into '%s' state", r.releaseKey, status.CanaryStatus.CurrentBatchState)
klog.V(3).Infof("BatchRelease(%v) Canary Batch State Machine into '%s' state", klog.KObj(release), newStatus.CanaryStatus.CurrentBatchState)
switch status.CanaryStatus.CurrentBatchState {
switch newStatus.CanaryStatus.CurrentBatchState {
case "", v1alpha1.UpgradingBatchState:
// modify workload replicas/partition based on release plan in this state.
upgradeDone, upgradeErr := workloadController.UpgradeOneBatch()
switch {
case upgradeErr != nil:
err = upgradeErr
setCondition(status, "Progressing", v1.ConditionFalse, "UpgradeBatchFailed", err.Error())
setCondition(newStatus, "Progressing", v1.ConditionFalse, "UpgradeBatchFailed", err.Error())
case upgradeDone:
result = reconcile.Result{RequeueAfter: DefaultDuration}
status.CanaryStatus.CurrentBatchState = v1alpha1.VerifyingBatchState
newStatus.CanaryStatus.CurrentBatchState = v1alpha1.VerifyingBatchState
}
case v1alpha1.VerifyingBatchState:
@ -216,47 +195,47 @@ func (r *Executor) progressBatches(workloadController workloads.WorkloadControll
switch {
case verifiedErr != nil:
err = verifiedErr
setCondition(status, "Progressing", v1.ConditionFalse, "VerifyBatchFailed", err.Error())
setCondition(newStatus, "Progressing", v1.ConditionFalse, "VerifyBatchFailed", err.Error())
case verified:
now := metav1.Now()
status.CanaryStatus.BatchReadyTime = &now
newStatus.CanaryStatus.BatchReadyTime = &now
result = reconcile.Result{RequeueAfter: DefaultDuration}
status.CanaryStatus.CurrentBatchState = v1alpha1.ReadyBatchState
newStatus.CanaryStatus.CurrentBatchState = v1alpha1.ReadyBatchState
default:
status.CanaryStatus.CurrentBatchState = v1alpha1.UpgradingBatchState
newStatus.CanaryStatus.CurrentBatchState = v1alpha1.UpgradingBatchState
}
case v1alpha1.ReadyBatchState:
if !IsPartitioned(r.releasePlan, r.releaseStatus) {
if !IsPartitioned(release) {
currentTimestamp := time.Now()
currentBatch := r.releasePlan.Batches[r.releaseStatus.CanaryStatus.CurrentBatch]
currentBatch := release.Spec.ReleasePlan.Batches[release.Status.CanaryStatus.CurrentBatch]
waitDuration := time.Duration(currentBatch.PauseSeconds) * time.Second
if waitDuration > 0 && r.releaseStatus.CanaryStatus.BatchReadyTime.Time.Add(waitDuration).After(currentTimestamp) {
restDuration := r.releaseStatus.CanaryStatus.BatchReadyTime.Time.Add(waitDuration).Sub(currentTimestamp)
if waitDuration > 0 && release.Status.CanaryStatus.BatchReadyTime.Time.Add(waitDuration).After(currentTimestamp) {
restDuration := release.Status.CanaryStatus.BatchReadyTime.Time.Add(waitDuration).Sub(currentTimestamp)
result = reconcile.Result{RequeueAfter: restDuration}
setCondition(status, "Progressing", v1.ConditionFalse, "Paused", fmt.Sprintf("BatchRelease will resume after %v", restDuration))
klog.Infof("BatchRelease (%v) paused and will continue to reconcile after %v", r.releaseKey, restDuration)
setCondition(newStatus, "Progressing", v1.ConditionFalse, "Paused", fmt.Sprintf("BatchRelease will resume after %v", restDuration))
klog.Infof("BatchRelease (%v) paused and will continue to reconcile after %v", klog.KObj(release), restDuration)
} else {
// expected pods in the batch are upgraded and the state is ready, then try to move to the next batch
progressDone = r.moveToNextBatch()
progressDone = r.moveToNextBatch(release, newStatus)
result = reconcile.Result{RequeueAfter: DefaultDuration}
setCondition(status, v1alpha1.ProgressingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is progressing")
setCondition(newStatus, v1alpha1.ProgressingBatchReleaseCondition, v1.ConditionTrue, "", "BatchRelease is progressing")
}
} else {
setCondition(status, "Progressing", v1.ConditionFalse, "Paused", fmt.Sprintf("BatchRelease is partitioned in %v-th batch", status.CanaryStatus.CurrentBatch))
setCondition(newStatus, "Progressing", v1.ConditionFalse, "Paused", fmt.Sprintf("BatchRelease is partitioned in %v-th batch", newStatus.CanaryStatus.CurrentBatch))
}
default:
klog.V(3).Infof("ReleasePlan Batch State Machine into %s state", "Unknown")
panic(fmt.Sprintf("illegal status %+v", r.releaseStatus))
panic(fmt.Sprintf("illegal status %+v", newStatus))
}
return progressDone, result, err
}
// GetWorkloadController picks the right workload controller to work on the workload
func (r *Executor) GetWorkloadController() (workloads.WorkloadController, error) {
targetRef := r.release.Spec.TargetRef.WorkloadRef
func (r *Executor) getWorkloadController(release *v1alpha1.BatchRelease, newStatus *v1alpha1.BatchReleaseStatus) (workloads.WorkloadController, error) {
targetRef := release.Spec.TargetRef.WorkloadRef
if targetRef == nil {
return nil, nil
}
@ -264,12 +243,12 @@ func (r *Executor) GetWorkloadController() (workloads.WorkloadController, error)
gvk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind)
if !util.IsSupportedWorkload(gvk) {
message := fmt.Sprintf("the workload type '%v' is not supported", gvk)
r.recorder.Event(r.release, v1.EventTypeWarning, "UnsupportedWorkload", message)
r.recorder.Event(release, v1.EventTypeWarning, "UnsupportedWorkload", message)
return nil, fmt.Errorf(message)
}
targetKey := types.NamespacedName{
Namespace: r.release.Namespace,
Namespace: release.Namespace,
Name: targetRef.Name,
}
@ -277,31 +256,31 @@ func (r *Executor) GetWorkloadController() (workloads.WorkloadController, error)
case appsv1alpha1.GroupVersion.String():
if targetRef.Kind == reflect.TypeOf(appsv1alpha1.CloneSet{}).Name() {
klog.InfoS("using cloneset batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
return workloads.NewCloneSetRolloutController(r.client, r.recorder, r.release, r.releasePlan, r.releaseStatus, targetKey), nil
return workloads.NewCloneSetRolloutController(r.client, r.recorder, release, newStatus, targetKey), nil
}
case apps.SchemeGroupVersion.String():
if targetRef.Kind == reflect.TypeOf(apps.Deployment{}).Name() {
klog.InfoS("using deployment batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
return workloads.NewDeploymentRolloutController(r.client, r.recorder, r.release, r.releasePlan, r.releaseStatus, targetKey), nil
return workloads.NewDeploymentRolloutController(r.client, r.recorder, release, newStatus, targetKey), nil
}
}
klog.InfoS("using statefulset-like batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
return workloads.NewUnifiedWorkloadRolloutControlPlane(workloads.NewStatefulSetLikeController, r.client, r.recorder, r.release, r.releaseStatus, targetKey, gvk), nil
return workloads.NewUnifiedWorkloadRolloutControlPlane(workloads.NewStatefulSetLikeController, r.client, r.recorder, release, newStatus, targetKey, gvk), nil
}
func (r *Executor) moveToNextBatch() bool {
currentBatch := int(r.releaseStatus.CanaryStatus.CurrentBatch)
if currentBatch >= len(r.releasePlan.Batches)-1 {
klog.V(3).Infof("BatchRelease(%v) finished all batch, release current batch: %v", r.releaseKey, r.releaseStatus.CanaryStatus.CurrentBatch)
func (r *Executor) moveToNextBatch(release *v1alpha1.BatchRelease, status *v1alpha1.BatchReleaseStatus) bool {
currentBatch := int(status.CanaryStatus.CurrentBatch)
if currentBatch >= len(release.Spec.ReleasePlan.Batches)-1 {
klog.V(3).Infof("BatchRelease(%v) finished all batch, release current batch: %v", klog.KObj(release), status.CanaryStatus.CurrentBatch)
return true
} else {
if r.releasePlan.BatchPartition == nil || *r.releasePlan.BatchPartition > r.releaseStatus.CanaryStatus.CurrentBatch {
r.releaseStatus.CanaryStatus.CurrentBatch++
if release.Spec.ReleasePlan.BatchPartition == nil || *release.Spec.ReleasePlan.BatchPartition > status.CanaryStatus.CurrentBatch {
status.CanaryStatus.CurrentBatch++
}
r.releaseStatus.CanaryStatus.CurrentBatchState = v1alpha1.UpgradingBatchState
klog.V(3).Infof("BatchRelease(%v) finished one batch, release current batch: %v", r.releaseKey, r.releaseStatus.CanaryStatus.CurrentBatch)
status.CanaryStatus.CurrentBatchState = v1alpha1.UpgradingBatchState
klog.V(3).Infof("BatchRelease(%v) finished one batch, release current batch: %v", klog.KObj(release), status.CanaryStatus.CurrentBatch)
return false
}
}

View File

@ -29,15 +29,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadController) (bool, reconcile.Result, error) {
func (r *Executor) syncStatusBeforeExecuting(release *v1alpha1.BatchRelease, newStatus *v1alpha1.BatchReleaseStatus, controller workloads.WorkloadController) (bool, reconcile.Result, error) {
var err error
var reason string
var message string
var needRetry bool
needStopThisRound := false
result := reconcile.Result{}
oldStatus := r.releaseStatus.DeepCopy()
// sync the workload info and watch the workload change event
workloadEvent, workloadInfo, err := controller.SyncWorkloadInfo()
@ -51,25 +49,25 @@ func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadContr
// (2). Plan is paused during rollout
// (3). Plan is changed during rollout
// (4). Plan status is unexpected/unhealthy
case isPlanTerminating(r.release, r.releaseStatus):
case isPlanTerminating(release):
// handle the case that the plan is deleted or is terminating
reason = "PlanTerminating"
message = "Release plan is deleted or cancelled, then terminate"
signalTerminating(r.releaseStatus)
signalTerminating(newStatus)
case isPlanPaused(workloadEvent, r.release, r.releaseStatus):
case isPlanPaused(workloadEvent, release):
// handle the case that releasePlan.paused = true
reason = "PlanPaused"
message = "release plan is paused, then stop reconcile"
needStopThisRound = true
case isPlanChanged(r.releasePlan, r.releaseStatus):
case isPlanChanged(release):
// handle the case that release plan is changed during progressing
reason = "PlanChanged"
message = "release plan is changed, then recalculate status"
signalRecalculate(r.release, r.releaseStatus)
signalRecalculate(release, newStatus)
case isPlanUnhealthy(r.releasePlan, r.releaseStatus):
case isPlanUnhealthy(release):
// handle the case that the release status is in chaos, which may lead to a panic
reason = "PlanStatusUnhealthy"
message = "release plan is unhealthy, then restart"
@ -92,40 +90,40 @@ func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadContr
reason = "GetWorkloadError"
message = err.Error()
case isWorkloadGone(workloadEvent, r.releaseStatus):
case isWorkloadGone(workloadEvent, release):
// handle the case that the workload is deleted
reason = "WorkloadGone"
message = "target workload has gone, then terminate"
signalTerminating(r.releaseStatus)
signalTerminating(newStatus)
case isWorkloadLocated(err, r.releaseStatus):
case isWorkloadLocated(err, release):
// handle the case that workload is newly created
reason = "WorkloadLocated"
message = "workload is located, then start"
signalLocated(r.releaseStatus)
signalLocated(newStatus)
case isWorkloadScaling(workloadEvent, r.releaseStatus):
case isWorkloadScaling(workloadEvent, release):
// handle the case that workload is scaling during progressing
reason = "ReplicasChanged"
message = "workload is scaling, then reinitialize batch status"
signalReinitializeBatch(r.releaseStatus)
signalReinitializeBatch(newStatus)
// we must ensure that this field is updated only when we have observed
// the workload scaling event, otherwise this event may be lost.
r.releaseStatus.ObservedWorkloadReplicas = *workloadInfo.Replicas
newStatus.ObservedWorkloadReplicas = *workloadInfo.Replicas
case isWorkloadChanged(workloadEvent, r.releaseStatus):
case isWorkloadChanged(workloadEvent, release):
// handle the case of continuous release v1 -> v2 -> v3
reason = "TargetRevisionChanged"
message = "workload revision was changed, then abort"
signalFinalize(r.releaseStatus)
signalFinalize(newStatus)
case isWorkloadUnhealthy(workloadEvent, r.releaseStatus):
case isWorkloadUnhealthy(workloadEvent, release):
// handle the case that workload is unhealthy, and rollout plan cannot go on
reason = "WorkloadUnHealthy"
message = "workload is UnHealthy, then stop"
needStopThisRound = true
case isWorkloadUnstable(workloadEvent, r.releaseStatus):
case isWorkloadUnstable(workloadEvent, release):
// handle the case that workload.Generation != workload.Status.ObservedGeneration
reason = "WorkloadNotStable"
message = "workload status is not stable, then wait"
@ -134,18 +132,18 @@ func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadContr
// log the special event info
if len(message) > 0 {
r.recorder.Eventf(r.release, v1.EventTypeWarning, reason, message)
klog.Warningf("Special case occurred in BatchRelease(%v), message: %v", r.releaseKey, message)
r.recorder.Eventf(release, v1.EventTypeWarning, reason, message)
klog.Warningf("Special case occurred in BatchRelease(%v), message: %v", klog.KObj(release), message)
}
// sync workload info with status
refreshStatus(r.release, r.releaseStatus, workloadInfo)
refreshStatus(release, newStatus, workloadInfo)
// If we need to retry, or the status phase or state has changed, we should
// stop and retry. This is because we must ensure that the phase
// and state are persisted in etcd, otherwise the state machine may fall into chaos.
if !reflect.DeepEqual(oldStatus, r.releaseStatus) {
if !reflect.DeepEqual(&release.Status, newStatus) {
needRetry = true
}
@ -172,46 +170,46 @@ func refreshStatus(release *v1alpha1.BatchRelease, newStatus *v1alpha1.BatchRele
}
}
func isPlanTerminating(release *v1alpha1.BatchRelease, status *v1alpha1.BatchReleaseStatus) bool {
return release.DeletionTimestamp != nil || status.Phase == v1alpha1.RolloutPhaseTerminating
func isPlanTerminating(release *v1alpha1.BatchRelease) bool {
return release.DeletionTimestamp != nil || release.Status.Phase == v1alpha1.RolloutPhaseTerminating
}
func isPlanChanged(plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseStatus) bool {
return status.ObservedReleasePlanHash != util.HashReleasePlanBatches(plan) && status.Phase == v1alpha1.RolloutPhaseProgressing
func isPlanChanged(release *v1alpha1.BatchRelease) bool {
return release.Status.ObservedReleasePlanHash != util.HashReleasePlanBatches(&release.Spec.ReleasePlan) && release.Status.Phase == v1alpha1.RolloutPhaseProgressing
}
func isPlanUnhealthy(plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseStatus) bool {
return int(status.CanaryStatus.CurrentBatch) >= len(plan.Batches) && status.Phase == v1alpha1.RolloutPhaseProgressing
func isPlanUnhealthy(release *v1alpha1.BatchRelease) bool {
return int(release.Status.CanaryStatus.CurrentBatch) >= len(release.Spec.ReleasePlan.Batches) && release.Status.Phase == v1alpha1.RolloutPhaseProgressing
}
func isPlanPaused(event workloads.WorkloadEventType, release *v1alpha1.BatchRelease, status *v1alpha1.BatchReleaseStatus) bool {
return release.Spec.Paused && status.Phase == v1alpha1.RolloutPhaseProgressing && !isWorkloadGone(event, status)
func isPlanPaused(event workloads.WorkloadEventType, release *v1alpha1.BatchRelease) bool {
return release.Spec.Paused && release.Status.Phase == v1alpha1.RolloutPhaseProgressing && !isWorkloadGone(event, release)
}
func isGetWorkloadInfoError(err error) bool {
return err != nil && !errors.IsNotFound(err)
}
func isWorkloadLocated(err error, status *v1alpha1.BatchReleaseStatus) bool {
return err == nil && status.Phase == v1alpha1.RolloutPhaseInitial
func isWorkloadLocated(err error, release *v1alpha1.BatchRelease) bool {
return err == nil && release.Status.Phase == v1alpha1.RolloutPhaseInitial
}
func isWorkloadGone(event workloads.WorkloadEventType, status *v1alpha1.BatchReleaseStatus) bool {
return event == workloads.WorkloadHasGone && status.Phase != v1alpha1.RolloutPhaseInitial
func isWorkloadGone(event workloads.WorkloadEventType, release *v1alpha1.BatchRelease) bool {
return event == workloads.WorkloadHasGone && release.Status.Phase != v1alpha1.RolloutPhaseInitial
}
func isWorkloadScaling(event workloads.WorkloadEventType, status *v1alpha1.BatchReleaseStatus) bool {
return event == workloads.WorkloadReplicasChanged && status.Phase == v1alpha1.RolloutPhaseProgressing
func isWorkloadScaling(event workloads.WorkloadEventType, release *v1alpha1.BatchRelease) bool {
return event == workloads.WorkloadReplicasChanged && release.Status.Phase == v1alpha1.RolloutPhaseProgressing
}
func isWorkloadChanged(event workloads.WorkloadEventType, status *v1alpha1.BatchReleaseStatus) bool {
return event == workloads.WorkloadPodTemplateChanged && status.Phase == v1alpha1.RolloutPhaseProgressing
func isWorkloadChanged(event workloads.WorkloadEventType, release *v1alpha1.BatchRelease) bool {
return event == workloads.WorkloadPodTemplateChanged && release.Status.Phase == v1alpha1.RolloutPhaseProgressing
}
func isWorkloadUnhealthy(event workloads.WorkloadEventType, status *v1alpha1.BatchReleaseStatus) bool {
return event == workloads.WorkloadUnHealthy && status.Phase == v1alpha1.RolloutPhaseProgressing
func isWorkloadUnhealthy(event workloads.WorkloadEventType, release *v1alpha1.BatchRelease) bool {
return event == workloads.WorkloadUnHealthy && release.Status.Phase == v1alpha1.RolloutPhaseProgressing
}
func isWorkloadUnstable(event workloads.WorkloadEventType, _ *v1alpha1.BatchReleaseStatus) bool {
func isWorkloadUnstable(event workloads.WorkloadEventType, _ *v1alpha1.BatchRelease) bool {
return event == workloads.WorkloadStillReconciling
}

View File

@ -36,10 +36,12 @@ func HasTerminatingCondition(status v1alpha1.BatchReleaseStatus) bool {
return false
}
func initializeStatusIfNeeds(status *v1alpha1.BatchReleaseStatus) {
func getInitializedStatus(status *v1alpha1.BatchReleaseStatus) *v1alpha1.BatchReleaseStatus {
newStatus := status.DeepCopy()
if len(status.Phase) == 0 {
resetStatus(status)
resetStatus(newStatus)
}
return newStatus
}
func signalReinitializeBatch(status *v1alpha1.BatchReleaseStatus) {
@ -120,10 +122,10 @@ func setCondition(status *v1alpha1.BatchReleaseStatus, condType v1alpha1.Rollout
}
}
func IsPartitioned(plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseStatus) bool {
return plan.BatchPartition != nil && *plan.BatchPartition <= status.CanaryStatus.CurrentBatch
func IsPartitioned(release *v1alpha1.BatchRelease) bool {
return release.Spec.ReleasePlan.BatchPartition != nil && *release.Spec.ReleasePlan.BatchPartition <= release.Status.CanaryStatus.CurrentBatch
}
func IsAllBatchReady(plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseStatus) bool {
return len(plan.Batches)-1 == int(status.CanaryStatus.CurrentBatch) && status.CanaryStatus.CurrentBatchState == v1alpha1.ReadyBatchState
func IsAllBatchReady(release *v1alpha1.BatchRelease) bool {
return len(release.Spec.ReleasePlan.Batches)-1 == int(release.Status.CanaryStatus.CurrentBatch) && release.Status.CanaryStatus.CurrentBatchState == v1alpha1.ReadyBatchState
}

View File

@ -39,15 +39,14 @@ type CloneSetRolloutController struct {
}
// NewCloneSetRolloutController creates a new CloneSet rollout controller
func NewCloneSetRolloutController(cli client.Client, recorder record.EventRecorder, release *v1alpha1.BatchRelease, plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseStatus, targetNamespacedName types.NamespacedName) *CloneSetRolloutController {
func NewCloneSetRolloutController(cli client.Client, recorder record.EventRecorder, release *v1alpha1.BatchRelease, newStatus *v1alpha1.BatchReleaseStatus, targetNamespacedName types.NamespacedName) *CloneSetRolloutController {
return &CloneSetRolloutController{
cloneSetController: cloneSetController{
workloadController: workloadController{
client: cli,
recorder: recorder,
parentController: release,
releasePlan: plan,
releaseStatus: status,
client: cli,
recorder: recorder,
release: release,
newStatus: newStatus,
},
releasePlanKey: client.ObjectKeyFromObject(release),
targetNamespacedName: targetNamespacedName,
@ -61,7 +60,7 @@ func (c *CloneSetRolloutController) VerifyWorkload() (bool, error) {
var message string
defer func() {
if err != nil {
c.recorder.Event(c.parentController, v1.EventTypeWarning, "VerifyFailed", err.Error())
c.recorder.Event(c.release, v1.EventTypeWarning, "VerifyFailed", err.Error())
} else if message != "" {
klog.Warningf(message)
}
@ -90,7 +89,7 @@ func (c *CloneSetRolloutController) VerifyWorkload() (bool, error) {
return false, nil
}
c.recorder.Event(c.parentController, v1.EventTypeNormal, "VerifiedSuccessfully", "ReleasePlan and the CloneSet resource are verified")
c.recorder.Event(c.release, v1.EventTypeNormal, "VerifiedSuccessfully", "ReleasePlan and the CloneSet resource are verified")
return true, nil
}
@ -108,7 +107,7 @@ func (c *CloneSetRolloutController) PrepareBeforeProgress() (bool, error) {
// record revisions and replicas info to BatchRelease.Status
c.recordCloneSetRevisionAndReplicas()
c.recorder.Event(c.parentController, v1.EventTypeNormal, "InitializedSuccessfully", "Rollout resource are initialized")
c.recorder.Event(c.release, v1.EventTypeNormal, "InitializedSuccessfully", "Rollout resource are initialized")
return true, nil
}
@ -119,7 +118,7 @@ func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) {
return false, err
}
if c.releaseStatus.ObservedWorkloadReplicas == 0 {
if c.newStatus.ObservedWorkloadReplicas == 0 {
klog.Infof("BatchRelease(%v) observed workload replicas is 0, no need to upgrade", c.releasePlanKey)
return true, nil
}
@ -129,25 +128,25 @@ func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) {
return false, nil
}
currentBatch := c.parentController.Status.CanaryStatus.CurrentBatch
currentBatch := c.release.Status.CanaryStatus.CurrentBatch
// the number of canary pods we should have in the current batch
canaryGoal := c.calculateCurrentCanary(c.releaseStatus.ObservedWorkloadReplicas)
canaryGoal := c.calculateCurrentCanary(c.newStatus.ObservedWorkloadReplicas)
// the number of stable pods we should have in the current batch
stableGoal := c.calculateCurrentStable(c.releaseStatus.ObservedWorkloadReplicas)
stableGoal := c.calculateCurrentStable(c.newStatus.ObservedWorkloadReplicas)
// the number of canary pods we now have in the current state
currentCanaryReplicas := c.clone.Status.UpdatedReplicas
// workload partition calculated
workloadPartition, _ := intstr.GetValueFromIntOrPercent(c.clone.Spec.UpdateStrategy.Partition,
int(c.releaseStatus.ObservedWorkloadReplicas), true)
int(c.newStatus.ObservedWorkloadReplicas), true)
// if canaryReplicas is int, then we use int;
// if canaryReplicas is percentage, then we use percentage.
var partitionGoal intstr.IntOrString
canaryIntOrStr := c.releasePlan.Batches[c.releaseStatus.CanaryStatus.CurrentBatch].CanaryReplicas
canaryIntOrStr := c.release.Spec.ReleasePlan.Batches[c.newStatus.CanaryStatus.CurrentBatch].CanaryReplicas
if canaryIntOrStr.Type == intstr.Int {
partitionGoal = intstr.FromInt(int(stableGoal))
} else if c.releaseStatus.ObservedWorkloadReplicas > 0 {
partitionGoal = ParseIntegerAsPercentageIfPossible(stableGoal, c.releaseStatus.ObservedWorkloadReplicas, &canaryIntOrStr)
} else if c.newStatus.ObservedWorkloadReplicas > 0 {
partitionGoal = ParseIntegerAsPercentageIfPossible(stableGoal, c.newStatus.ObservedWorkloadReplicas, &canaryIntOrStr)
}
klog.V(3).InfoS("upgraded one batch, current info:",
@ -176,8 +175,8 @@ func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) {
return false, err
}
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "SetBatchDone",
"Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch)
c.recorder.Eventf(c.release, v1.EventTypeNormal, "SetBatchDone",
"Finished submitting all upgrade quests for batch %d", c.newStatus.CanaryStatus.CurrentBatch)
return true, nil
}
@ -200,12 +199,12 @@ func (c *CloneSetRolloutController) CheckOneBatchReady() (bool, error) {
canaryReadyReplicas := c.clone.Status.UpdatedReadyReplicas
// the number of stable pods we expect to have in the current batch; this number may
// be inconsistent with the real canary goal due to the accuracy of percent-type partition
expectedStableGoal := c.calculateCurrentStable(c.releaseStatus.ObservedWorkloadReplicas)
expectedStableGoal := c.calculateCurrentStable(c.newStatus.ObservedWorkloadReplicas)
// the real number of canary pods we should have in the current batch
originalGoal := &c.releasePlan.Batches[c.releaseStatus.CanaryStatus.CurrentBatch].CanaryReplicas
canaryGoal := CalculateRealCanaryReplicasGoal(expectedStableGoal, c.releaseStatus.ObservedWorkloadReplicas, originalGoal)
originalGoal := &c.release.Spec.ReleasePlan.Batches[c.newStatus.CanaryStatus.CurrentBatch].CanaryReplicas
canaryGoal := CalculateRealCanaryReplicasGoal(expectedStableGoal, c.newStatus.ObservedWorkloadReplicas, originalGoal)
// the real number of stable pods we should have in the current batch
stableGoal := c.releaseStatus.ObservedWorkloadReplicas - canaryGoal
stableGoal := c.newStatus.ObservedWorkloadReplicas - canaryGoal
// the number of max unavailable canary pods allowed by this workload
maxUnavailable := 0
if c.clone.Spec.UpdateStrategy.MaxUnavailable != nil {
@ -214,7 +213,7 @@ func (c *CloneSetRolloutController) CheckOneBatchReady() (bool, error) {
klog.InfoS("checking the batch releasing progress",
"BatchRelease", c.releasePlanKey,
"current-batch", c.releaseStatus.CanaryStatus.CurrentBatch,
"current-batch", c.newStatus.CanaryStatus.CurrentBatch,
"canary-goal", canaryGoal,
"stable-goal", stableGoal,
"stable-replicas", stableReplicas,
@ -222,10 +221,10 @@ func (c *CloneSetRolloutController) CheckOneBatchReady() (bool, error) {
"canary-ready-replicas", canaryReadyReplicas)
// maybe the workload replicas were scaled; we should requeue and handle the workload scaling event
if c.clone.Status.Replicas != c.releaseStatus.ObservedWorkloadReplicas {
if c.clone.Status.Replicas != c.newStatus.ObservedWorkloadReplicas {
err := fmt.Errorf("CloneSet(%v) replicas don't match ObservedWorkloadReplicas, workload status replicas: %v, observed workload replicas: %v",
c.targetNamespacedName, c.clone.Status.Replicas, c.releaseStatus.ObservedWorkloadReplicas)
klog.ErrorS(err, "the batch is not valid", "current-batch", c.releaseStatus.CanaryStatus.CurrentBatch)
c.targetNamespacedName, c.clone.Status.Replicas, c.newStatus.ObservedWorkloadReplicas)
klog.ErrorS(err, "the batch is not valid", "current-batch", c.newStatus.CanaryStatus.CurrentBatch)
return false, nil
}
@ -240,13 +239,13 @@ func (c *CloneSetRolloutController) CheckOneBatchReady() (bool, error) {
if currentBatchIsNotReadyYet() {
klog.InfoS("the batch is not ready yet", "BatchRelease",
c.releasePlanKey, "current-batch", c.releaseStatus.CanaryStatus.CurrentBatch)
c.releasePlanKey, "current-batch", c.newStatus.CanaryStatus.CurrentBatch)
return false, nil
}
klog.Infof("All pods of CloneSet(%v) in current batch are ready, BatchRelease(%v), current-batch=%v",
c.targetNamespacedName, c.releasePlanKey, c.releaseStatus.CanaryStatus.CurrentBatch)
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "BatchAvailable", "Batch %d is available", c.releaseStatus.CanaryStatus.CurrentBatch)
c.targetNamespacedName, c.releasePlanKey, c.newStatus.CanaryStatus.CurrentBatch)
c.recorder.Eventf(c.release, v1.EventTypeNormal, "BatchAvailable", "Batch %d is available", c.newStatus.CanaryStatus.CurrentBatch)
return true, nil
}
@ -260,14 +259,14 @@ func (c *CloneSetRolloutController) FinalizeProgress(cleanup bool) (bool, error)
return false, err
}
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "FinalizedSuccessfully", "Rollout resource are finalized: cleanup=%v", cleanup)
c.recorder.Eventf(c.release, v1.EventTypeNormal, "FinalizedSuccessfully", "Rollout resource are finalized: cleanup=%v", cleanup)
return true, nil
}
// SyncWorkloadInfo returns the change type if the workload was changed during the release
func (c *CloneSetRolloutController) SyncWorkloadInfo() (WorkloadEventType, *util.WorkloadInfo, error) {
// ignore the sync if the release plan is deleted
if c.parentController.DeletionTimestamp != nil {
if c.release.DeletionTimestamp != nil {
return IgnoreWorkloadEvent, nil, nil
}
@ -298,17 +297,17 @@ func (c *CloneSetRolloutController) SyncWorkloadInfo() (WorkloadEventType, *util
}
// handle the case that the workload is scaling
if *c.clone.Spec.Replicas != c.releaseStatus.ObservedWorkloadReplicas && c.releaseStatus.ObservedWorkloadReplicas != -1 {
if *c.clone.Spec.Replicas != c.newStatus.ObservedWorkloadReplicas && c.newStatus.ObservedWorkloadReplicas != -1 {
workloadInfo.Replicas = c.clone.Spec.Replicas
klog.Warningf("CloneSet(%v) replicas changed during releasing, should pause and wait for it to complete, "+
"replicas from: %v -> %v", c.targetNamespacedName, c.releaseStatus.ObservedWorkloadReplicas, *c.clone.Spec.Replicas)
"replicas from: %v -> %v", c.targetNamespacedName, c.newStatus.ObservedWorkloadReplicas, *c.clone.Spec.Replicas)
return WorkloadReplicasChanged, workloadInfo, nil
}
// handle the case that the workload was changed
if c.clone.Status.UpdateRevision != c.releaseStatus.UpdateRevision {
if c.clone.Status.UpdateRevision != c.newStatus.UpdateRevision {
klog.Warningf("CloneSet(%v) updateRevision changed during releasing, should try to restart the release plan, "+
"updateRevision from: %v -> %v", c.targetNamespacedName, c.releaseStatus.UpdateRevision, c.clone.Status.UpdateRevision)
"updateRevision from: %v -> %v", c.targetNamespacedName, c.newStatus.UpdateRevision, c.clone.Status.UpdateRevision)
return WorkloadPodTemplateChanged, workloadInfo, nil
}
@ -323,7 +322,7 @@ func (c *CloneSetRolloutController) fetchCloneSet() error {
clone := &kruiseappsv1alpha1.CloneSet{}
if err := c.client.Get(context.TODO(), c.targetNamespacedName, clone); err != nil {
if !apierrors.IsNotFound(err) {
c.recorder.Event(c.parentController, v1.EventTypeWarning, "GetCloneSetFailed", err.Error())
c.recorder.Event(c.release, v1.EventTypeWarning, "GetCloneSetFailed", err.Error())
}
return err
}
@ -332,13 +331,13 @@ func (c *CloneSetRolloutController) fetchCloneSet() error {
}
func (c *CloneSetRolloutController) recordCloneSetRevisionAndReplicas() {
c.releaseStatus.ObservedWorkloadReplicas = *c.clone.Spec.Replicas
c.releaseStatus.StableRevision = c.clone.Status.CurrentRevision
c.releaseStatus.UpdateRevision = c.clone.Status.UpdateRevision
c.newStatus.ObservedWorkloadReplicas = *c.clone.Spec.Replicas
c.newStatus.StableRevision = c.clone.Status.CurrentRevision
c.newStatus.UpdateRevision = c.clone.Status.UpdateRevision
}
func (c *CloneSetRolloutController) patchPodBatchLabel(canaryGoal int32) (bool, error) {
rolloutID, exist := c.parentController.Labels[util.RolloutIDLabel]
rolloutID, exist := c.release.Labels[util.RolloutIDLabel]
if !exist || rolloutID == "" {
return true, nil
}
@ -349,7 +348,7 @@ func (c *CloneSetRolloutController) patchPodBatchLabel(canaryGoal int32) (bool,
return false, err
}
batchID := c.parentController.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.parentController.Status.UpdateRevision
batchID := c.release.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.release.Status.UpdateRevision
return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, c.releasePlanKey)
}

View File

@ -45,7 +45,7 @@ func (c *cloneSetController) claimCloneSet(clone *kruiseappsv1alpha1.CloneSet) (
if controlInfo, ok := clone.Annotations[util.BatchReleaseControlAnnotation]; ok && controlInfo != "" {
ref := &metav1.OwnerReference{}
err := json.Unmarshal([]byte(controlInfo), ref)
if err == nil && ref.UID == c.parentController.UID {
if err == nil && ref.UID == c.release.UID {
controlled = true
klog.V(3).Infof("CloneSet(%v) has been controlled by this BatchRelease(%v), no need to claim again",
c.targetNamespacedName, c.releasePlanKey)
@ -57,7 +57,7 @@ func (c *cloneSetController) claimCloneSet(clone *kruiseappsv1alpha1.CloneSet) (
patch := map[string]interface{}{}
switch {
// if the cloneSet has been claimed by this parentController
// if the cloneSet has been claimed by this release
case controlled:
// make sure paused=false
if clone.Spec.UpdateStrategy.Paused {
@ -80,7 +80,7 @@ func (c *cloneSetController) claimCloneSet(clone *kruiseappsv1alpha1.CloneSet) (
},
}
controlInfo := metav1.NewControllerRef(c.parentController, c.parentController.GetObjectKind().GroupVersionKind())
controlInfo := metav1.NewControllerRef(c.release, c.release.GetObjectKind().GroupVersionKind())
controlByte, _ := json.Marshal(controlInfo)
patch["metadata"] = map[string]interface{}{
"annotations": map[string]string{
@ -93,7 +93,7 @@ func (c *cloneSetController) claimCloneSet(clone *kruiseappsv1alpha1.CloneSet) (
cloneObj := clone.DeepCopy()
patchByte, _ := json.Marshal(patch)
if err := c.client.Patch(context.TODO(), cloneObj, client.RawPatch(types.MergePatchType, patchByte)); err != nil {
c.recorder.Eventf(c.parentController, v1.EventTypeWarning, "ClaimCloneSetFailed", err.Error())
c.recorder.Eventf(c.release, v1.EventTypeWarning, "ClaimCloneSetFailed", err.Error())
return false, err
}
}
@ -115,7 +115,7 @@ func (c *cloneSetController) releaseCloneSet(clone *kruiseappsv1alpha1.CloneSet,
if err := json.Unmarshal([]byte(refByte), ref); err != nil {
found = false
klog.Errorf("failed to decode controller annotations of BatchRelease")
} else if ref.UID != c.parentController.UID {
} else if ref.UID != c.release.UID {
found = false
}
}
@ -128,7 +128,7 @@ func (c *cloneSetController) releaseCloneSet(clone *kruiseappsv1alpha1.CloneSet,
cloneObj := clone.DeepCopy()
patchByte := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, util.BatchReleaseControlAnnotation))
if err := c.client.Patch(context.TODO(), cloneObj, client.RawPatch(types.MergePatchType, patchByte)); err != nil {
c.recorder.Eventf(c.parentController, v1.EventTypeWarning, "ReleaseCloneSetFailed", err.Error())
c.recorder.Eventf(c.release, v1.EventTypeWarning, "ReleaseCloneSetFailed", err.Error())
return false, err
}
@ -149,24 +149,24 @@ func (c *cloneSetController) patchCloneSetPartition(clone *kruiseappsv1alpha1.Cl
cloneObj := clone.DeepCopy()
patchByte, _ := json.Marshal(patch)
if err := c.client.Patch(context.TODO(), cloneObj, client.RawPatch(types.MergePatchType, patchByte)); err != nil {
c.recorder.Eventf(c.parentController, v1.EventTypeWarning, "PatchPartitionFailed",
c.recorder.Eventf(c.release, v1.EventTypeWarning, "PatchPartitionFailed",
"Failed to update the CloneSet(%v) to the correct target partition %d, error: %v",
c.targetNamespacedName, partition, err)
return err
}
klog.InfoS("Submitted modified partition quest for CloneSet", "CloneSet", c.targetNamespacedName,
"target partition size", partition, "batch", c.releaseStatus.CanaryStatus.CurrentBatch)
"target partition size", partition, "batch", c.newStatus.CanaryStatus.CurrentBatch)
return nil
}
// the canary workload size for the current batch
func (c *cloneSetController) calculateCurrentCanary(totalSize int32) int32 {
targetSize := int32(util.CalculateNewBatchTarget(c.releasePlan, int(totalSize), int(c.releaseStatus.CanaryStatus.CurrentBatch)))
targetSize := int32(util.CalculateNewBatchTarget(&c.release.Spec.ReleasePlan, int(totalSize), int(c.newStatus.CanaryStatus.CurrentBatch)))
klog.InfoS("Calculated the number of pods in the target CloneSet after current batch",
"CloneSet", c.targetNamespacedName, "BatchRelease", c.releasePlanKey,
"current batch", c.releaseStatus.CanaryStatus.CurrentBatch, "workload updateRevision size", targetSize)
"current batch", c.newStatus.CanaryStatus.CurrentBatch, "workload updateRevision size", targetSize)
return targetSize
}
@ -175,7 +175,7 @@ func (c *cloneSetController) calculateCurrentStable(totalSize int32) int32 {
sourceSize := totalSize - c.calculateCurrentCanary(totalSize)
klog.InfoS("Calculated the number of pods in the source CloneSet after current batch",
"CloneSet", c.targetNamespacedName, "BatchRelease", c.releasePlanKey,
"current batch", c.releaseStatus.CanaryStatus.CurrentBatch, "workload stableRevision size", sourceSize)
"current batch", c.newStatus.CanaryStatus.CurrentBatch, "workload stableRevision size", sourceSize)
return sourceSize
}

View File

@ -175,11 +175,10 @@ func TestCloneSetController(t *testing.T) {
rec := record.NewFakeRecorder(100)
c := cloneSetController{
workloadController: workloadController{
client: cli,
recorder: rec,
parentController: releaseClone,
releasePlan: &releaseClone.Spec.ReleasePlan,
releaseStatus: &releaseClone.Status,
client: cli,
recorder: rec,
release: releaseClone,
newStatus: &releaseClone.Status,
},
targetNamespacedName: client.ObjectKeyFromObject(stableClone),
}

View File

@ -44,15 +44,14 @@ type DeploymentsRolloutController struct {
}
// NewDeploymentRolloutController creates a new Deployment rollout controller
func NewDeploymentRolloutController(cli client.Client, recorder record.EventRecorder, release *v1alpha1.BatchRelease, plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseStatus, stableNamespacedName types.NamespacedName) *DeploymentsRolloutController {
func NewDeploymentRolloutController(cli client.Client, recorder record.EventRecorder, release *v1alpha1.BatchRelease, newStatus *v1alpha1.BatchReleaseStatus, stableNamespacedName types.NamespacedName) *DeploymentsRolloutController {
return &DeploymentsRolloutController{
deploymentController: deploymentController{
workloadController: workloadController{
client: cli,
recorder: recorder,
parentController: release,
releasePlan: plan,
releaseStatus: status,
client: cli,
recorder: recorder,
release: release,
newStatus: newStatus,
},
stableNamespacedName: stableNamespacedName,
canaryNamespacedName: stableNamespacedName,
@ -67,7 +66,7 @@ func (c *DeploymentsRolloutController) VerifyWorkload() (bool, error) {
var message string
defer func() {
if err != nil {
c.recorder.Event(c.parentController, v1.EventTypeWarning, "VerifyFailed", err.Error())
c.recorder.Event(c.release, v1.EventTypeWarning, "VerifyFailed", err.Error())
} else if message != "" {
klog.Warningf(message)
}
@ -106,7 +105,7 @@ func (c *DeploymentsRolloutController) VerifyWorkload() (bool, error) {
return false, err
}
c.recorder.Event(c.parentController, v1.EventTypeNormal, "Verified", "ReleasePlan and the Deployment resource are verified")
c.recorder.Event(c.release, v1.EventTypeNormal, "Verified", "ReleasePlan and the Deployment resource are verified")
return true, nil
}
@ -118,7 +117,7 @@ func (c *DeploymentsRolloutController) PrepareBeforeProgress() (bool, error) {
return false, err
}
c.recorder.Event(c.parentController, v1.EventTypeNormal, "Initialized", "Rollout resource are initialized")
c.recorder.Event(c.release, v1.EventTypeNormal, "Initialized", "Rollout resource are initialized")
return true, nil
}
@ -136,12 +135,12 @@ func (c *DeploymentsRolloutController) UpgradeOneBatch() (bool, error) {
currentCanaryReplicas := *c.canary.Spec.Replicas
// canary goal we should achieve
canaryGoal := c.calculateCurrentCanary(c.releaseStatus.ObservedWorkloadReplicas)
canaryGoal := c.calculateCurrentCanary(c.newStatus.ObservedWorkloadReplicas)
klog.V(3).InfoS("upgraded one batch, but no need to update replicas of canary Deployment",
"Deployment", client.ObjectKeyFromObject(c.canary),
"BatchRelease", c.releaseKey,
"current-batch", c.releaseStatus.CanaryStatus.CurrentBatch,
"current-batch", c.newStatus.CanaryStatus.CurrentBatch,
"canary-goal", canaryGoal,
"current-canary-replicas", currentCanaryReplicas,
"current-canary-status-replicas", c.canary.Status.UpdatedReplicas)
@ -159,7 +158,7 @@ func (c *DeploymentsRolloutController) UpgradeOneBatch() (bool, error) {
return false, err
}
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "Batch Rollout", "Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch)
c.recorder.Eventf(c.release, v1.EventTypeNormal, "Batch Rollout", "Finished submitting all upgrade quests for batch %d", c.newStatus.CanaryStatus.CurrentBatch)
return true, nil
}
@ -182,7 +181,7 @@ func (c *DeploymentsRolloutController) CheckOneBatchReady() (bool, error) {
// canary pods that have been available
availableCanaryPodCount := c.canary.Status.AvailableReplicas
// the canary goal we should have in the current batch
canaryGoal := c.calculateCurrentCanary(c.releaseStatus.ObservedWorkloadReplicas)
canaryGoal := c.calculateCurrentCanary(c.newStatus.ObservedWorkloadReplicas)
// max unavailable allowed replicas
maxUnavailable := 0
if c.canary.Spec.Strategy.RollingUpdate != nil &&
@ -192,7 +191,7 @@ func (c *DeploymentsRolloutController) CheckOneBatchReady() (bool, error) {
klog.InfoS("checking the batch releasing progress",
"BatchRelease", c.releaseKey,
"current-batch", c.releaseStatus.CanaryStatus.CurrentBatch,
"current-batch", c.newStatus.CanaryStatus.CurrentBatch,
"canary-goal", canaryGoal,
"canary-available-pod-count", availableCanaryPodCount,
"stable-pod-status-replicas", c.stable.Status.Replicas,
@ -209,11 +208,11 @@ func (c *DeploymentsRolloutController) CheckOneBatchReady() (bool, error) {
// make sure there is at least one pod available
if currentBatchIsNotReadyYet() {
klog.Infof("BatchRelease(%v) batch is not ready yet, current batch=%v", c.releaseKey, c.releaseStatus.CanaryStatus.CurrentBatch)
klog.Infof("BatchRelease(%v) batch is not ready yet, current batch=%v", c.releaseKey, c.newStatus.CanaryStatus.CurrentBatch)
return false, nil
}
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "BatchReady", "Batch %d is available", c.releaseStatus.CanaryStatus.CurrentBatch)
c.recorder.Eventf(c.release, v1.EventTypeNormal, "BatchReady", "Batch %d is available", c.newStatus.CanaryStatus.CurrentBatch)
return true, nil
}
@ -230,7 +229,7 @@ func (c *DeploymentsRolloutController) FinalizeProgress(cleanup bool) (bool, err
return false, err
}
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "Finalized", "Finalized: cleanup=%v", cleanup)
c.recorder.Eventf(c.release, v1.EventTypeNormal, "Finalized", "Finalized: cleanup=%v", cleanup)
return true, nil
}
@ -238,7 +237,7 @@ func (c *DeploymentsRolloutController) FinalizeProgress(cleanup bool) (bool, err
// TODO: abstract a WorkloadEventTypeJudge interface for the following `if` clauses
func (c *DeploymentsRolloutController) SyncWorkloadInfo() (WorkloadEventType, *util.WorkloadInfo, error) {
// ignore the sync if the release plan is deleted
if c.parentController.DeletionTimestamp != nil {
if c.release.DeletionTimestamp != nil {
return IgnoreWorkloadEvent, nil, nil
}
@ -282,15 +281,15 @@ func (c *DeploymentsRolloutController) SyncWorkloadInfo() (WorkloadEventType, *u
}
// handle the case that the workload is scaling up/down
if *c.stable.Spec.Replicas != c.releaseStatus.ObservedWorkloadReplicas && c.releaseStatus.ObservedWorkloadReplicas != -1 {
if *c.stable.Spec.Replicas != c.newStatus.ObservedWorkloadReplicas && c.newStatus.ObservedWorkloadReplicas != -1 {
workloadInfo.Replicas = c.stable.Spec.Replicas
klog.Warningf("Deployment(%v) replicas changed during releasing, should pause and wait for it to complete, replicas from: %v -> %v",
c.stableNamespacedName, c.releaseStatus.ObservedWorkloadReplicas, *c.stable.Spec.Replicas)
c.stableNamespacedName, c.newStatus.ObservedWorkloadReplicas, *c.stable.Spec.Replicas)
return WorkloadReplicasChanged, workloadInfo, nil
}
// handle the case that the workload revision was changed
if hashRevision := util.ComputeHash(&c.stable.Spec.Template, nil); hashRevision != c.releaseStatus.UpdateRevision {
if hashRevision := util.ComputeHash(&c.stable.Spec.Template, nil); hashRevision != c.newStatus.UpdateRevision {
workloadInfo.Status.UpdateRevision = hashRevision
klog.Warningf("Deployment(%v) updateRevision changed during releasing", c.stableNamespacedName)
return WorkloadPodTemplateChanged, workloadInfo, nil
@ -365,14 +364,14 @@ func (c *DeploymentsRolloutController) recordDeploymentRevisionAndReplicas() err
if err != nil {
return err
}
c.releaseStatus.StableRevision = stableRevision
c.releaseStatus.UpdateRevision = updateRevision
c.releaseStatus.ObservedWorkloadReplicas = *c.stable.Spec.Replicas
c.newStatus.StableRevision = stableRevision
c.newStatus.UpdateRevision = updateRevision
c.newStatus.ObservedWorkloadReplicas = *c.stable.Spec.Replicas
return nil
}
func (c *DeploymentsRolloutController) patchPodBatchLabel(canaryGoal int32) (bool, error) {
rolloutID, exist := c.parentController.Labels[util.RolloutIDLabel]
rolloutID, exist := c.release.Labels[util.RolloutIDLabel]
if !exist || rolloutID == "" || c.canary == nil {
return true, nil
}
@ -383,7 +382,7 @@ func (c *DeploymentsRolloutController) patchPodBatchLabel(canaryGoal int32) (boo
return false, err
}
batchID := c.parentController.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.parentController.Status.UpdateRevision
batchID := c.release.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.release.Status.UpdateRevision
return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, c.releaseKey)
}

View File

@ -50,7 +50,7 @@ func (c *deploymentController) claimDeployment(stableDeploy, canaryDeploy *apps.
if controlInfo, ok := stableDeploy.Annotations[util.BatchReleaseControlAnnotation]; ok && controlInfo != "" {
ref := &metav1.OwnerReference{}
err := json.Unmarshal([]byte(controlInfo), ref)
if err == nil && ref.UID == c.parentController.UID {
if err == nil && ref.UID == c.release.UID {
klog.V(3).Infof("Deployment(%v) has been controlled by this BatchRelease(%v), no need to claim again",
c.stableNamespacedName, c.releaseKey)
controlled = true
@ -62,7 +62,7 @@ func (c *deploymentController) claimDeployment(stableDeploy, canaryDeploy *apps.
// patch control info to the stable deployment if needed
if !controlled {
controlInfo, _ := json.Marshal(metav1.NewControllerRef(c.parentController, c.parentController.GetObjectKind().GroupVersionKind()))
controlInfo, _ := json.Marshal(metav1.NewControllerRef(c.release, c.release.GetObjectKind().GroupVersionKind()))
patchedInfo := map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]string{
@ -121,11 +121,11 @@ func (c *deploymentController) createCanaryDeployment(stableDeploy *apps.Deploym
}
canaryDeploy.Finalizers = append(canaryDeploy.Finalizers, util.CanaryDeploymentFinalizer)
canaryDeploy.OwnerReferences = append(canaryDeploy.OwnerReferences, *metav1.NewControllerRef(c.parentController, c.parentController.GroupVersionKind()))
canaryDeploy.OwnerReferences = append(canaryDeploy.OwnerReferences, *metav1.NewControllerRef(c.release, c.release.GroupVersionKind()))
// set extra labels & annotations
canaryDeploy.Labels[util.CanaryDeploymentLabel] = c.stableNamespacedName.Name
owner := metav1.NewControllerRef(c.parentController, c.parentController.GroupVersionKind())
owner := metav1.NewControllerRef(c.release, c.release.GroupVersionKind())
if owner != nil {
ownerInfo, _ := json.Marshal(owner)
canaryDeploy.Annotations[util.BatchReleaseControlAnnotation] = string(ownerInfo)
@ -212,13 +212,13 @@ func (c *deploymentController) patchDeploymentReplicas(deploy *apps.Deployment,
cloneObj := deploy.DeepCopy()
patchByte, _ := json.Marshal(patch)
if err := c.client.Patch(context.TODO(), cloneObj, client.RawPatch(types.MergePatchType, patchByte)); err != nil {
c.recorder.Eventf(c.parentController, v1.EventTypeWarning, "PatchPartitionFailed",
c.recorder.Eventf(c.release, v1.EventTypeWarning, "PatchPartitionFailed",
"Failed to update the canary Deployment to the correct canary replicas %d, error: %v", replicas, err)
return err
}
klog.InfoS("Submitted modified partition quest for canary Deployment", "Deployment",
client.ObjectKeyFromObject(deploy), "target canary replicas size", replicas, "batch", c.releaseStatus.CanaryStatus.CurrentBatch)
client.ObjectKeyFromObject(deploy), "target canary replicas size", replicas, "batch", c.newStatus.CanaryStatus.CurrentBatch)
return nil
}
@ -284,7 +284,7 @@ func (c *deploymentController) listCanaryDeployment(options ...client.ListOption
for i := range dList.Items {
d := &dList.Items[i]
o := metav1.GetControllerOf(d)
if o == nil || o.UID != c.parentController.UID {
if o == nil || o.UID != c.release.UID {
continue
}
ds = append(ds, d)
@ -295,10 +295,10 @@ func (c *deploymentController) listCanaryDeployment(options ...client.ListOption
// the target workload size for the current batch
func (c *deploymentController) calculateCurrentCanary(totalSize int32) int32 {
targetSize := int32(util.CalculateNewBatchTarget(c.releasePlan, int(totalSize), int(c.releaseStatus.CanaryStatus.CurrentBatch)))
targetSize := int32(util.CalculateNewBatchTarget(&c.release.Spec.ReleasePlan, int(totalSize), int(c.newStatus.CanaryStatus.CurrentBatch)))
klog.InfoS("Calculated the number of pods in the canary Deployment after current batch",
"Deployment", c.stableNamespacedName, "BatchRelease", c.releaseKey,
"current batch", c.releaseStatus.CanaryStatus.CurrentBatch, "workload updateRevision size", targetSize)
"current batch", c.newStatus.CanaryStatus.CurrentBatch, "workload updateRevision size", targetSize)
return targetSize
}
@ -307,6 +307,6 @@ func (c *deploymentController) calculateCurrentStable(totalSize int32) int32 {
sourceSize := totalSize - c.calculateCurrentCanary(totalSize)
klog.InfoS("Calculated the number of pods in the stable Deployment after current batch",
"Deployment", c.stableNamespacedName, "BatchRelease", c.releaseKey,
"current batch", c.releaseStatus.CanaryStatus.CurrentBatch, "workload stableRevision size", sourceSize)
"current batch", c.newStatus.CanaryStatus.CurrentBatch, "workload stableRevision size", sourceSize)
return sourceSize
}
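
The canary and stable goals above are always derived from the same release and newStatus for the reconcile, and the stable goal is simply the total minus the canary goal. A tiny worked example of that split, with a hypothetical fraction-based resolver standing in for util.CalculateNewBatchTarget (whose real logic is not shown in this diff):

package sketch

// goalsForBatch is illustrative only: it resolves the canary goal for a batch
// as a fraction of the total and derives the stable goal from it.
func goalsForBatch(total int32, fractions []float64, batch int) (canary, stable int32) {
	if len(fractions) == 0 {
		return 0, total
	}
	if batch >= len(fractions) {
		batch = len(fractions) - 1
	}
	canary = int32(float64(total) * fractions[batch])
	return canary, total - canary
}

// For example, goalsForBatch(10, []float64{0.2, 0.5, 1.0}, 1) returns (5, 5):
// five pods move to the canary revision and five stay on the stable revision.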

View File

@ -157,11 +157,10 @@ func TestDeploymentController(t *testing.T) {
rec := record.NewFakeRecorder(100)
c := deploymentController{
workloadController: workloadController{
client: cli,
recorder: rec,
parentController: releaseDeploy,
releasePlan: &releaseDeploy.Spec.ReleasePlan,
releaseStatus: &releaseDeploy.Status,
client: cli,
recorder: rec,
release: releaseDeploy,
newStatus: &releaseDeploy.Status,
},
stableNamespacedName: client.ObjectKeyFromObject(stableDeploy),
canaryNamespacedName: client.ObjectKeyFromObject(stableDeploy),

View File

@ -43,10 +43,10 @@ type UnifiedWorkloadController interface {
// UnifiedWorkloadRolloutControlPlane is responsible for handling rollout StatefulSet type of workloads
type UnifiedWorkloadRolloutControlPlane struct {
UnifiedWorkloadController
client client.Client
recorder record.EventRecorder
planController *v1alpha1.BatchRelease
newStatus *v1alpha1.BatchReleaseStatus
client client.Client
recorder record.EventRecorder
release *v1alpha1.BatchRelease
newStatus *v1alpha1.BatchReleaseStatus
}
type NewUnifiedControllerFunc = func(c client.Client, r record.EventRecorder, p *v1alpha1.BatchRelease, n types.NamespacedName, gvk schema.GroupVersionKind) UnifiedWorkloadController
@ -56,7 +56,7 @@ func NewUnifiedWorkloadRolloutControlPlane(f NewUnifiedControllerFunc, c client.
return &UnifiedWorkloadRolloutControlPlane{
client: c,
recorder: r,
planController: p,
release: p,
newStatus: newStatus,
UnifiedWorkloadController: f(c, r, p, n, gvk),
}
@ -68,7 +68,7 @@ func (c *UnifiedWorkloadRolloutControlPlane) VerifyWorkload() (bool, error) {
var message string
defer func() {
if err != nil {
c.recorder.Event(c.planController, v1.EventTypeWarning, "VerifyFailed", err.Error())
c.recorder.Event(c.release, v1.EventTypeWarning, "VerifyFailed", err.Error())
} else if message != "" {
klog.Warningf(message)
}
@ -97,7 +97,7 @@ func (c *UnifiedWorkloadRolloutControlPlane) VerifyWorkload() (bool, error) {
return false, nil
}
c.recorder.Event(c.planController, v1.EventTypeNormal, "VerifiedSuccessfully", "ReleasePlan and the workload resource are verified")
c.recorder.Event(c.release, v1.EventTypeNormal, "VerifiedSuccessfully", "ReleasePlan and the workload resource are verified")
return true, nil
}
@ -115,7 +115,7 @@ func (c *UnifiedWorkloadRolloutControlPlane) PrepareBeforeProgress() (bool, erro
return false, err
}
c.recorder.Event(c.planController, v1.EventTypeNormal, "InitializedSuccessfully", "Rollout resource are initialized")
c.recorder.Event(c.release, v1.EventTypeNormal, "InitializedSuccessfully", "Rollout resource are initialized")
return true, nil
}
@ -127,8 +127,8 @@ func (c *UnifiedWorkloadRolloutControlPlane) UpgradeOneBatch() (bool, error) {
return false, err
}
if c.planController.Status.ObservedWorkloadReplicas == 0 {
klog.Infof("BatchRelease(%v) observed workload replicas is 0, no need to upgrade", client.ObjectKeyFromObject(c.planController))
if c.release.Status.ObservedWorkloadReplicas == 0 {
klog.Infof("BatchRelease(%v) observed workload replicas is 0, no need to upgrade", client.ObjectKeyFromObject(c.release))
return true, nil
}
@ -139,15 +139,15 @@ func (c *UnifiedWorkloadRolloutControlPlane) UpgradeOneBatch() (bool, error) {
currentBatch := c.newStatus.CanaryStatus.CurrentBatch
// the number of canary pods should have in current batch
canaryGoal := c.calculateCurrentCanary(c.planController.Status.ObservedWorkloadReplicas)
canaryGoal := c.calculateCurrentCanary(c.release.Status.ObservedWorkloadReplicas)
// the number of stable pods should have in current batch
stableGoal := c.calculateCurrentStable(c.planController.Status.ObservedWorkloadReplicas)
stableGoal := c.calculateCurrentStable(c.release.Status.ObservedWorkloadReplicas)
// the number of canary pods now we have in current state
currentCanaryReplicas := workloadInfo.Status.UpdatedReplicas
// in case of no need to upgrade pods
klog.V(3).InfoS("upgraded one batch, status info:",
"BatchRelease", client.ObjectKeyFromObject(c.planController),
"BatchRelease", client.ObjectKeyFromObject(c.release),
"current-batch", currentBatch,
"canary-goal", canaryGoal,
"stable-goal", stableGoal,
@ -163,8 +163,8 @@ func (c *UnifiedWorkloadRolloutControlPlane) UpgradeOneBatch() (bool, error) {
return false, err
}
c.recorder.Eventf(c.planController, v1.EventTypeNormal, "SetBatchDone",
"Finished submitting all upgrade quests for batch %d", c.planController.Status.CanaryStatus.CurrentBatch)
c.recorder.Eventf(c.release, v1.EventTypeNormal, "SetBatchDone",
"Finished submitting all upgrade quests for batch %d", c.release.Status.CanaryStatus.CurrentBatch)
return true, nil
}
@ -175,8 +175,8 @@ func (c *UnifiedWorkloadRolloutControlPlane) CheckOneBatchReady() (bool, error)
return false, err
}
if c.planController.Status.ObservedWorkloadReplicas == 0 {
klog.Infof("BatchRelease(%v) observed workload replicas is 0, no need to check", client.ObjectKeyFromObject(c.planController))
if c.release.Status.ObservedWorkloadReplicas == 0 {
klog.Infof("BatchRelease(%v) observed workload replicas is 0, no need to check", client.ObjectKeyFromObject(c.release))
return true, nil
}
@ -192,18 +192,18 @@ func (c *UnifiedWorkloadRolloutControlPlane) CheckOneBatchReady() (bool, error)
// the number of canary pods that have been ready in current state
canaryReadyReplicas := workloadInfo.Status.UpdatedReadyReplicas
// the number of the real canary pods should have in current batch
canaryGoal := c.calculateCurrentCanary(c.planController.Status.ObservedWorkloadReplicas)
canaryGoal := c.calculateCurrentCanary(c.release.Status.ObservedWorkloadReplicas)
// the number of the real stable pods should have in current batch
stableGoal := c.calculateCurrentStable(c.planController.Status.ObservedWorkloadReplicas)
stableGoal := c.calculateCurrentStable(c.release.Status.ObservedWorkloadReplicas)
// the number of max unavailable canary pods allowed by this workload
maxUnavailable := 0
if workloadInfo.MaxUnavailable != nil {
maxUnavailable, _ = intstr.GetValueFromIntOrPercent(workloadInfo.MaxUnavailable, int(c.planController.Status.ObservedWorkloadReplicas), true)
maxUnavailable, _ = intstr.GetValueFromIntOrPercent(workloadInfo.MaxUnavailable, int(c.release.Status.ObservedWorkloadReplicas), true)
}
klog.InfoS("checking the batch releasing progress",
"BatchRelease", client.ObjectKeyFromObject(c.planController),
"current-batch", c.planController.Status.CanaryStatus.CurrentBatch,
"BatchRelease", client.ObjectKeyFromObject(c.release),
"current-batch", c.release.Status.CanaryStatus.CurrentBatch,
"canary-goal", canaryGoal,
"stable-goal", stableGoal,
"stable-replicas", stableReplicas,
@ -211,22 +211,22 @@ func (c *UnifiedWorkloadRolloutControlPlane) CheckOneBatchReady() (bool, error)
"maxUnavailable", maxUnavailable)
// maybe the workload replicas were scaled; we should requeue and handle the workload scaling event
if workloadInfo.Status.Replicas != c.planController.Status.ObservedWorkloadReplicas {
if workloadInfo.Status.Replicas != c.release.Status.ObservedWorkloadReplicas {
err := fmt.Errorf("%v replicas don't match ObservedWorkloadReplicas, workload status replicas: %v, observed workload replicas: %v",
workloadInfo.GVKWithName, workloadInfo.Status.Replicas, c.planController.Status.ObservedWorkloadReplicas)
klog.ErrorS(err, "the batch is not valid", "current-batch", c.planController.Status.CanaryStatus.CurrentBatch)
workloadInfo.GVKWithName, workloadInfo.Status.Replicas, c.release.Status.ObservedWorkloadReplicas)
klog.ErrorS(err, "the batch is not valid", "current-batch", c.release.Status.CanaryStatus.CurrentBatch)
return false, err
}
if ready, err := c.IsBatchReady(canaryGoal, stableGoal); err != nil || !ready {
klog.InfoS("the batch is not ready yet", "Workload", workloadInfo.GVKWithName,
"ReleasePlan", client.ObjectKeyFromObject(c.planController), "current-batch", c.planController.Status.CanaryStatus.CurrentBatch)
"ReleasePlan", client.ObjectKeyFromObject(c.release), "current-batch", c.release.Status.CanaryStatus.CurrentBatch)
return false, nil
}
klog.Infof("All pods of %v in current batch are ready, BatchRelease(%v), current-batch=%v",
workloadInfo.GVKWithName, client.ObjectKeyFromObject(c.planController), c.planController.Status.CanaryStatus.CurrentBatch)
c.recorder.Eventf(c.planController, v1.EventTypeNormal, "BatchAvailable", "Batch %d is available", c.planController.Status.CanaryStatus.CurrentBatch)
workloadInfo.GVKWithName, client.ObjectKeyFromObject(c.release), c.release.Status.CanaryStatus.CurrentBatch)
c.recorder.Eventf(c.release, v1.EventTypeNormal, "BatchAvailable", "Batch %d is available", c.release.Status.CanaryStatus.CurrentBatch)
return true, nil
}
@ -235,14 +235,14 @@ func (c *UnifiedWorkloadRolloutControlPlane) FinalizeProgress(cleanup bool) (boo
if _, err := c.ReleaseWorkload(cleanup); err != nil {
return false, err
}
c.recorder.Eventf(c.planController, v1.EventTypeNormal, "FinalizedSuccessfully", "Rollout resource are finalized: cleanup=%v", cleanup)
c.recorder.Eventf(c.release, v1.EventTypeNormal, "FinalizedSuccessfully", "Rollout resource are finalized: cleanup=%v", cleanup)
return true, nil
}
// SyncWorkloadInfo returns the change type if the workload was changed during the release
func (c *UnifiedWorkloadRolloutControlPlane) SyncWorkloadInfo() (WorkloadEventType, *util.WorkloadInfo, error) {
// ignore the sync if the release plan is deleted
if c.planController.DeletionTimestamp != nil {
if c.release.DeletionTimestamp != nil {
return IgnoreWorkloadEvent, nil, nil
}
@ -267,16 +267,16 @@ func (c *UnifiedWorkloadRolloutControlPlane) SyncWorkloadInfo() (WorkloadEventTy
}
// in case the workload is scaling
if *workloadInfo.Replicas != c.planController.Status.ObservedWorkloadReplicas {
if *workloadInfo.Replicas != c.release.Status.ObservedWorkloadReplicas {
klog.Warningf("%v replicas changed during releasing, should pause and wait for it to complete, "+
"replicas from: %v -> %v", workloadInfo.GVKWithName, c.planController.Status.ObservedWorkloadReplicas, *workloadInfo.Replicas)
"replicas from: %v -> %v", workloadInfo.GVKWithName, c.release.Status.ObservedWorkloadReplicas, *workloadInfo.Replicas)
return WorkloadReplicasChanged, workloadInfo, nil
}
// in case the workload was changed
if workloadInfo.Status.UpdateRevision != c.planController.Status.UpdateRevision {
if workloadInfo.Status.UpdateRevision != c.release.Status.UpdateRevision {
klog.Warningf("%v updateRevision changed during releasing, should try to restart the release plan, "+
"updateRevision from: %v -> %v", workloadInfo.GVKWithName, c.planController.Status.UpdateRevision, workloadInfo.Status.UpdateRevision)
"updateRevision from: %v -> %v", workloadInfo.GVKWithName, c.release.Status.UpdateRevision, workloadInfo.Status.UpdateRevision)
return WorkloadPodTemplateChanged, workloadInfo, nil
}
@ -285,17 +285,17 @@ func (c *UnifiedWorkloadRolloutControlPlane) SyncWorkloadInfo() (WorkloadEventTy
// the canary workload size for the current batch
func (c *UnifiedWorkloadRolloutControlPlane) calculateCurrentCanary(totalSize int32) int32 {
canaryGoal := int32(util.CalculateNewBatchTarget(&c.planController.Spec.ReleasePlan, int(totalSize), int(c.planController.Status.CanaryStatus.CurrentBatch)))
klog.InfoS("Calculated the number of pods in the target workload after current batch", "BatchRelease", client.ObjectKeyFromObject(c.planController),
"current batch", c.planController.Status.CanaryStatus.CurrentBatch, "workload canary goal replicas goal", canaryGoal)
canaryGoal := int32(util.CalculateNewBatchTarget(&c.release.Spec.ReleasePlan, int(totalSize), int(c.release.Status.CanaryStatus.CurrentBatch)))
klog.InfoS("Calculated the number of pods in the target workload after current batch", "BatchRelease", client.ObjectKeyFromObject(c.release),
"current batch", c.release.Status.CanaryStatus.CurrentBatch, "workload canary goal replicas goal", canaryGoal)
return canaryGoal
}
// the source workload size for the current batch
func (c *UnifiedWorkloadRolloutControlPlane) calculateCurrentStable(totalSize int32) int32 {
stableGoal := totalSize - c.calculateCurrentCanary(totalSize)
klog.InfoS("Calculated the number of pods in the target workload after current batch", "BatchRelease", client.ObjectKeyFromObject(c.planController),
"current batch", c.planController.Status.CanaryStatus.CurrentBatch, "workload stable goal replicas goal", stableGoal)
klog.InfoS("Calculated the number of pods in the target workload after current batch", "BatchRelease", client.ObjectKeyFromObject(c.release),
"current batch", c.release.Status.CanaryStatus.CurrentBatch, "workload stable goal replicas goal", stableGoal)
return stableGoal
}
@ -312,7 +312,7 @@ func (c *UnifiedWorkloadRolloutControlPlane) RecordWorkloadRevisionAndReplicas()
}
func (c *UnifiedWorkloadRolloutControlPlane) patchPodBatchLabel(workloadInfo *util.WorkloadInfo, canaryGoal int32) (bool, error) {
rolloutID, exist := c.planController.Labels[util.RolloutIDLabel]
rolloutID, exist := c.release.Labels[util.RolloutIDLabel]
if !exist || rolloutID == "" {
return true, nil
}
@ -323,7 +323,7 @@ func (c *UnifiedWorkloadRolloutControlPlane) patchPodBatchLabel(workloadInfo *ut
return false, err
}
batchID := c.planController.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.planController.Status.UpdateRevision
return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, client.ObjectKeyFromObject(c.planController))
batchID := c.release.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.release.Status.UpdateRevision
return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, client.ObjectKeyFromObject(c.release))
}
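
The same substitution runs through this control plane: planController becomes the release passed in for the current reconcile, and the status written during the batch lives in newStatus. To make the class of bug concrete, here is a small standalone program (hypothetical types, not kruise-rollout code) that the race detector flags when a single shared executor caches state across concurrent calls; run it with go run -race:

package main

import "sync"

type status struct{ currentBatch int32 }

// sharedExecutor mimics the old pattern: one executor instance whose cached
// field is rewritten by every call.
type sharedExecutor struct {
	status *status
}

func (e *sharedExecutor) do(s *status) {
	e.status = s            // concurrent writes to the same field...
	e.status.currentBatch++ // ...and reads through it: a data race
}

func main() {
	exec := &sharedExecutor{}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() { // two "reconciles" running at once
			defer wg.Done()
			exec.do(&status{})
		}()
	}
	wg.Wait()
	// Passing the status per call (as this commit does with the deep-copied
	// newStatus) removes the shared field and, with it, the race.
}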

View File

@ -41,11 +41,10 @@ const (
)
type workloadController struct {
client client.Client
recorder record.EventRecorder
releasePlan *v1alpha1.ReleasePlan
releaseStatus *v1alpha1.BatchReleaseStatus
parentController *v1alpha1.BatchRelease
client client.Client
recorder record.EventRecorder
newStatus *v1alpha1.BatchReleaseStatus
release *v1alpha1.BatchRelease
}
// WorkloadController is the interface that all types of cloneSet controllers implement
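
With the releasePlan and releaseStatus fields gone from this base struct, each concrete controller is meant to be assembled per reconcile around the release object and a status copy owned by that reconcile, mirroring the test wiring shown earlier in this diff. A sketch of that construction for the deployment case, assuming it sits in the same package (the constructor name is invented; newStatus is expected to be a deep copy made by the executor, never the live release.Status):

// Sketch only: not part of this commit. deploymentController, workloadController,
// and the imported types are the package's own, as shown in the diff above.
func newPerReconcileDeploymentController(
	cli client.Client,
	rec record.EventRecorder,
	release *v1alpha1.BatchRelease,
	newStatus *v1alpha1.BatchReleaseStatus,
	stable, canary types.NamespacedName,
) *deploymentController {
	return &deploymentController{
		workloadController: workloadController{
			client:    cli,
			recorder:  rec,
			release:   release,
			newStatus: newStatus,
		},
		stableNamespacedName: stable,
		canaryNamespacedName: canary,
	}
}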