diff --git a/api/v1alpha1/deployment_types.go b/api/v1alpha1/deployment_types.go new file mode 100644 index 0000000..c4bda99 --- /dev/null +++ b/api/v1alpha1/deployment_types.go @@ -0,0 +1,81 @@ +package v1alpha1 + +import ( + apps "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +const ( + // DeploymentStrategyAnnotation is annotation for deployment, + // which is strategy fields of Advanced Deployment. + DeploymentStrategyAnnotation = "rollouts.kruise.io/deployment-strategy" + + // DeploymentExtraStatusAnnotation is annotation for deployment, + // which is extra status field of Advanced Deployment. + DeploymentExtraStatusAnnotation = "rollouts.kruise.io/deployment-extra-status" +) + +// DeploymentStrategy is strategy field for Advanced Deployment +type DeploymentStrategy struct { + // RollingStyle define the behavior of rolling for deployment. + RollingStyle RollingStyleType `json:"rollingStyle,omitempty"` + // original deployment strategy rolling update fields + RollingUpdate *apps.RollingUpdateDeployment `json:"rollingUpdate,omitempty"` + // Paused = true will block the upgrade of Pods + Paused bool `json:"paused,omitempty"` + // Partition describe how many Pods should be updated during rollout. + // We use this field to implement partition-style rolling update. + Partition intstr.IntOrString `json:"partition,omitempty"` +} + +type RollingStyleType string + +const ( + // PartitionRollingStyleType means rolling in batches just like CloneSet, and will NOT create any extra Deployment; + PartitionRollingStyleType RollingStyleType = "Partition" + // CanaryRollingStyleType means rolling in canary way, and will create a canary Deployment. + CanaryRollingStyleType RollingStyleType = "Canary" +) + +// DeploymentExtraStatus is extra status field for Advanced Deployment +type DeploymentExtraStatus struct { + // ObservedGeneration record the generation of deployment this status observed. 
+	ObservedGeneration int64 `json:"observedGeneration,omitempty"`
+	// UpdatedReadyReplicas the number of pods that has been updated and ready.
+	UpdatedReadyReplicas int32 `json:"updatedReadyReplicas,omitempty"`
+	// ExpectedUpdatedReplicas is an absolute number calculated based on Partition
+	// and Deployment.Spec.Replicas, means how many pods are expected be updated under
+	// current strategy.
+	// This field is designed to avoid users to fall into the details of algorithm
+	// for Partition calculation.
+	ExpectedUpdatedReplicas int32 `json:"expectedUpdatedReplicas,omitempty"`
+}
+
+func SetDefaultDeploymentStrategy(strategy *DeploymentStrategy) {
+	if strategy.RollingStyle == CanaryRollingStyleType {
+		return
+	}
+	if strategy.RollingUpdate == nil {
+		strategy.RollingUpdate = &apps.RollingUpdateDeployment{}
+	}
+	if strategy.RollingUpdate.MaxUnavailable == nil {
+		// Set MaxUnavailable as 25% by default
+		maxUnavailable := intstr.FromString("25%")
+		strategy.RollingUpdate.MaxUnavailable = &maxUnavailable
+	}
+	if strategy.RollingUpdate.MaxSurge == nil {
+		// Set MaxSurge as 25% by default
+		maxSurge := intstr.FromString("25%")
+		strategy.RollingUpdate.MaxSurge = &maxSurge // fix: was mistakenly assigned to MaxUnavailable, leaving MaxSurge nil
+	}
+
+	// Cannot allow maxSurge==0 && MaxUnavailable==0, otherwise, no pod can be updated when rolling update.
+ maxSurge, _ := intstr.GetScaledValueFromIntOrPercent(strategy.RollingUpdate.MaxSurge, 100, true) + maxUnavailable, _ := intstr.GetScaledValueFromIntOrPercent(strategy.RollingUpdate.MaxUnavailable, 100, true) + if maxSurge == 0 && maxUnavailable == 0 { + strategy.RollingUpdate = &apps.RollingUpdateDeployment{ + MaxSurge: &intstr.IntOrString{Type: intstr.Int, IntVal: 0}, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + } + } +} diff --git a/api/v1alpha1/rollout_types.go b/api/v1alpha1/rollout_types.go index 0aba58f..db17404 100644 --- a/api/v1alpha1/rollout_types.go +++ b/api/v1alpha1/rollout_types.go @@ -41,6 +41,13 @@ const ( // RollbackInBatchAnnotation is set to rollout annotations. // RollbackInBatchAnnotation allow use disable quick rollback, and will roll back in batch style. RollbackInBatchAnnotation = "rollouts.kruise.io/rollback-in-batch" + + // DeploymentRolloutStyleAnnotation define the rolling behavior for Deployment. + // must be "partition" or "canary": + // * "partition" means rolling Deployment in batches just like CloneSet, and will NOT create any extra Deployment; + // * "canary" means rolling in canary way, and will create a canary Deployment. + // Defaults to canary + DeploymentRolloutStyleAnnotation = "rollouts.kruise.io/deployment-rolling-style" ) // RolloutSpec defines the desired state of Rollout diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 558c0c3..fcf7dc1 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -22,6 +22,7 @@ limitations under the License. package v1alpha1 import ( + "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/gateway-api/apis/v1alpha2" @@ -265,6 +266,42 @@ func (in *CanaryStrategy) DeepCopy() *CanaryStrategy { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *DeploymentExtraStatus) DeepCopyInto(out *DeploymentExtraStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeploymentExtraStatus. +func (in *DeploymentExtraStatus) DeepCopy() *DeploymentExtraStatus { + if in == nil { + return nil + } + out := new(DeploymentExtraStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DeploymentStrategy) DeepCopyInto(out *DeploymentStrategy) { + *out = *in + if in.RollingUpdate != nil { + in, out := &in.RollingUpdate, &out.RollingUpdate + *out = new(v1.RollingUpdateDeployment) + (*in).DeepCopyInto(*out) + } + out.Partition = in.Partition +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeploymentStrategy. +func (in *DeploymentStrategy) DeepCopy() *DeploymentStrategy { + if in == nil { + return nil + } + out := new(DeploymentStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *GatewayTrafficRouting) DeepCopyInto(out *GatewayTrafficRouting) { *out = *in diff --git a/pkg/controller/deployment/controller.go b/pkg/controller/deployment/controller.go index 222131d..a1bf24d 100644 --- a/pkg/controller/deployment/controller.go +++ b/pkg/controller/deployment/controller.go @@ -19,12 +19,10 @@ package deployment import ( "context" + "encoding/json" "flag" "reflect" - "github.com/openkruise/rollouts/pkg/feature" - clientutil "github.com/openkruise/rollouts/pkg/util/client" - utilfeature "github.com/openkruise/rollouts/pkg/util/feature" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -44,6 +42,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + + rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" + deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util" + "github.com/openkruise/rollouts/pkg/feature" + clientutil "github.com/openkruise/rollouts/pkg/util/client" + utilfeature "github.com/openkruise/rollouts/pkg/util/feature" ) func init() { @@ -135,6 +139,9 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { updateHandler := func(e event.UpdateEvent) bool { oldObject := e.ObjectOld.(*appsv1.Deployment) newObject := e.ObjectNew.(*appsv1.Deployment) + if !deploymentutil.IsUnderRolloutControl(newObject) { + return false + } if oldObject.Generation != newObject.Generation || newObject.DeletionTimestamp != nil { klog.V(3).Infof("Observed updated Spec for Deployment: %s/%s", newObject.Namespace, newObject.Name) return true @@ -153,7 +160,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { // Reconcile reads that state of the cluster for a Deployment object and makes changes based on the state read // and what is in the Deployment.Spec and Deployment.Annotations // Automatically generate RBAC rules to allow the Controller to read and write 
ReplicaSets -func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Request) (res reconcile.Result, retErr error) { +func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) { deployment := new(appsv1.Deployment) err := r.Get(context.TODO(), request.NamespacedName, deployment) if err != nil { @@ -167,12 +174,12 @@ func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Req } // TODO: create new controller only when deployment is under our control - dc, err := r.controllerFactory.NewController(deployment) - if err != nil { + dc := r.controllerFactory.NewController(deployment) + if dc == nil { return reconcile.Result{}, nil } - err = dc.syncDeployment(context.Background(), request.NamespacedName.String()) + err = dc.syncDeployment(context.Background(), deployment) return ctrl.Result{}, err } @@ -180,7 +187,27 @@ type controllerFactory DeploymentController // NewController create a new DeploymentController // TODO: create new controller only when deployment is under our control -func (f *controllerFactory) NewController(_ *appsv1.Deployment) (*DeploymentController, error) { +func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *DeploymentController { + if !deploymentutil.IsUnderRolloutControl(deployment) { + klog.Warningf("Deployment %v is not under rollout control, ignore", klog.KObj(deployment)) + return nil + } + + strategy := rolloutsv1alpha1.DeploymentStrategy{} + strategyAnno := deployment.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation] + if err := json.Unmarshal([]byte(strategyAnno), &strategy); err != nil { + klog.Errorf("Failed to unmarshal strategy for deployment %v: %v", klog.KObj(deployment), strategyAnno) + return nil + } + + // We do NOT process such deployment with canary rolling style + if strategy.RollingStyle == rolloutsv1alpha1.CanaryRollingStyleType { + return nil + } + + marshaled, _ := json.Marshal(&strategy) + 
klog.V(4).Infof("Processing deployment %v strategy %v", klog.KObj(deployment), string(marshaled)) + return &DeploymentController{ client: f.client, eventBroadcaster: f.eventBroadcaster, @@ -188,5 +215,6 @@ func (f *controllerFactory) NewController(_ *appsv1.Deployment) (*DeploymentCont dLister: f.dLister, rsLister: f.rsLister, podLister: f.podLister, - }, nil + strategy: strategy, + } } diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index b56ec28..b075fd6 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -22,20 +22,24 @@ package deployment import ( "context" + "encoding/json" "fmt" "reflect" + "strings" "time" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" appslisters "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + + rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" + deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util" ) const ( @@ -64,6 +68,9 @@ type DeploymentController struct { rsLister appslisters.ReplicaSetLister // podLister can list/get pods from the shared informer's store podLister corelisters.PodLister + + // we will use this strategy to replace spec.strategy of deployment + strategy rolloutsv1alpha1.DeploymentStrategy } // getReplicaSetsForDeployment uses ControllerRefManager to reconcile @@ -81,28 +88,13 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, // syncDeployment will sync the deployment with the given key. // This function is not meant to be invoked concurrently with the same key. 
-func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
-	namespace, name, err := cache.SplitMetaNamespaceKey(key)
-	if err != nil {
-		klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
-		return err
-	}
-
+func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment *apps.Deployment) (err error) {
 	startTime := time.Now()
-	klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
+	klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KObj(deployment), "startTime", startTime)
 	defer func() {
-		klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
+		klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KObj(deployment), "duration", time.Since(startTime))
 	}()
 
-	deployment, err := dc.dLister.Deployments(namespace).Get(name)
-	if errors.IsNotFound(err) {
-		klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
-		return nil
-	}
-	if err != nil {
-		return err
-	}
-
 	// Deep-copy otherwise we are mutating our cache.
 	// TODO: Deep-copy only when needed.
 	d := deployment.DeepCopy()
@@ -114,39 +106,85 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, key string)
 			d.Status.ObservedGeneration = d.Generation
 			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
 		}
-		return nil
+		return
 	}
 
 	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
 	// through adoption/orphaning.
 	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
 	if err != nil {
-		return err
+		return
 	}
 
 	if d.DeletionTimestamp != nil {
 		return dc.syncStatusOnly(ctx, d, rsList)
 	}
 
+	// Refresh the extra-status annotation at the end of every sync, but never
+	// let its result overwrite an earlier reconcile error with nil: doing so
+	// would silently drop the error and skip the requeue.
+	defer func() {
+		if updateErr := dc.updateExtraStatus(deployment, rsList); updateErr != nil && err == nil {
+			err = updateErr
+		}
+	}()
+
 	// Update deployment conditions with an Unknown condition when pausing/resuming
 	// a deployment. In this way, we can be sure that we won't timeout when a user
 	// resumes a Deployment with a set progressDeadlineSeconds.
 	if err = dc.checkPausedConditions(ctx, d); err != nil {
-		return err
+		return
 	}
 
 	if d.Spec.Paused {
-		return dc.sync(ctx, d, rsList)
+		err = dc.sync(ctx, d, rsList)
+		return
 	}
 
 	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
 	if err != nil {
-		return err
+		return
 	}
 
 	if scalingEvent {
-		return dc.sync(ctx, d, rsList)
+		err = dc.sync(ctx, d, rsList)
+		return
 	}
 
-	return dc.rolloutRolling(ctx, d, rsList)
+	err = dc.rolloutRolling(ctx, d, rsList)
+	return
+}
+
+// updateExtraStatus will update extra status for advancedStatus
+func (dc *DeploymentController) updateExtraStatus(deployment *apps.Deployment, rsList []*apps.ReplicaSet) error {
+	newRS, _, err := dc.getAllReplicaSetsAndSyncRevision(context.TODO(), deployment, rsList, false)
+	if err != nil {
+		return err
+	}
+
+	updatedReadyReplicas := int32(0)
+	if newRS != nil {
+		updatedReadyReplicas = newRS.Status.ReadyReplicas
+	}
+
+	extraStatus := &rolloutsv1alpha1.DeploymentExtraStatus{
+		ObservedGeneration:      deployment.Generation,
+		UpdatedReadyReplicas:    updatedReadyReplicas,
+		ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, deployment),
+	}
+
+	extraStatusByte, err := json.Marshal(extraStatus)
+	if err != nil {
+		klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(deployment), err)
+		return nil // no need to retry
+	}
+
+	extraStatusAnno := string(extraStatusByte)
+	if deployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] == extraStatusAnno {
+		return nil // no need to update
+	}
+
+	extraStatusAnno = strings.Replace(extraStatusAnno, `"`, `\"`, -1)
+	body := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, rolloutsv1alpha1.DeploymentExtraStatusAnnotation, extraStatusAnno))
+	_, err = dc.client.AppsV1().Deployments(deployment.Namespace).Patch(context.TODO(), deployment.Name, types.MergePatchType, body, metav1.PatchOptions{})
+	return err
+}
diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go
index f4879ae..693c30b 100644
--- a/pkg/controller/deployment/util/deployment_util.go
+++ b/pkg/controller/deployment/util/deployment_util.go
@@ -34,6 +34,8 @@ import (
 	intstrutil "k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/integer"
+
+	"github.com/openkruise/rollouts/pkg/util"
 )
 
 const (
@@ -894,3 +896,25 @@ func (o ReplicaSetsBySizeNewer) Less(i, j int) bool {
 	---------------------------------  END  ---------------------------------------
 	**** Copied from "k8s.io/kubernetes/pkg/controller/controller_utils.go" ****
 */
+
+// IsUnderRolloutControl return true if this deployment should be controlled by our controller.
+func IsUnderRolloutControl(deployment *apps.Deployment) bool {
+	if deployment.Annotations[util.BatchReleaseControlAnnotation] == "" {
+		return false
+	}
+	if deployment.Spec.Strategy.Type != apps.RecreateDeploymentStrategyType {
+		return false
+	}
+	return deployment.Spec.Paused
+}
+
+// NewRSReplicasLimit return a limited replicas of new RS calculated via partition.
+func NewRSReplicasLimit(partition intstrutil.IntOrString, deployment *apps.Deployment) int32 {
+	// Spec.Replicas is an optional pointer field; Kubernetes defaults it to 1,
+	// so mirror that default here instead of panicking on a nil dereference.
+	replicas := 1
+	if deployment.Spec.Replicas != nil {
+		replicas = int(*deployment.Spec.Replicas)
+	}
+	replicaLimit, _ := intstrutil.GetScaledValueFromIntOrPercent(&partition, replicas, true)
+	replicaLimit = integer.IntMax(integer.IntMin(replicaLimit, replicas), 0)
+	// A percentage partition below 100% must always leave at least one pod on
+	// the stable revision.
+	if replicas > 1 && partition.Type == intstrutil.String && partition.String() != "100%" {
+		replicaLimit = integer.IntMin(replicaLimit, replicas-1)
+	}
+	return int32(replicaLimit)
+}
diff --git a/pkg/controller/deployment/util/deployment_util_test.go b/pkg/controller/deployment/util/deployment_util_test.go
index fa32272..16bc1ed 100644
--- a/pkg/controller/deployment/util/deployment_util_test.go
+++ b/pkg/controller/deployment/util/deployment_util_test.go
@@ -31,6 +31,8 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/utils/integer"
+	"k8s.io/utils/pointer"
 )
 
 func newDControllerRef(d *apps.Deployment) *metav1.OwnerReference {
@@ -1167,3 +1169,30 @@ func TestReplicasAnnotationsNeedUpdate(t *testing.T) {
 		})
 	}
 }
+
+func TestNewRSReplicasLimit(t *testing.T) {
+	for partitionInt := 0; partitionInt < 1000; partitionInt++ {
+		partition := intstr.FromInt(partitionInt)
+		deployment := apps.Deployment{Spec: apps.DeploymentSpec{Replicas: pointer.Int32(100)}}
+		result := NewRSReplicasLimit(partition, &deployment)
+		expected := integer.Int32Min(int32(partitionInt), 100)
+		if result != expected {
+			t.Errorf("case[1]: Expected %v, Got: %v", expected, result)
+		}
+	}
+
+	for replicas := 0; replicas < 1000; replicas++ {
+		for partitionPercent := 0; partitionPercent <= 100; partitionPercent++ {
+			partition := intstr.FromString(fmt.Sprintf("%d%%", partitionPercent))
+			deployment := apps.Deployment{Spec: apps.DeploymentSpec{Replicas: pointer.Int32(int32(replicas))}}
+			result := NewRSReplicasLimit(partition, &deployment)
+			expected, _ := intstr.GetScaledValueFromIntOrPercent(&partition, replicas, true)
+			if partitionPercent != 100 && replicas > 1 {
+				expected = integer.IntMin(expected, replicas-1)
+			}
+			if result != int32(expected) {
+				t.Errorf("case[2]: Expected %v, Got: %v, replicas %d, partition %d%%", expected, result, replicas, partitionPercent)
+			}
+		}
+	}
+}