add advanced deployment api (#106)

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
Co-authored-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
This commit is contained in:
Wei-Xiang Sun 2022-12-21 12:20:09 +08:00 committed by GitHub
parent 7bfc93cd73
commit 3165f4e8c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 276 additions and 36 deletions

View File

@ -0,0 +1,81 @@
package v1alpha1
import (
apps "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
// Annotation keys written on a Deployment to carry Advanced Deployment data.
const (
	// DeploymentStrategyAnnotation is the Deployment annotation key whose value
	// holds the strategy fields of an Advanced Deployment (see DeploymentStrategy).
	DeploymentStrategyAnnotation = "rollouts.kruise.io/deployment-strategy"
	// DeploymentExtraStatusAnnotation is the Deployment annotation key whose value
	// holds the extra status fields of an Advanced Deployment (see DeploymentExtraStatus).
	DeploymentExtraStatusAnnotation = "rollouts.kruise.io/deployment-extra-status"
)
// DeploymentStrategy is the strategy field for an Advanced Deployment.
// All fields are optional in the JSON form (every tag carries omitempty).
type DeploymentStrategy struct {
	// RollingStyle defines the rolling behavior of the deployment.
	RollingStyle RollingStyleType `json:"rollingStyle,omitempty"`
	// RollingUpdate carries the rolling-update fields of the original
	// deployment strategy (MaxSurge / MaxUnavailable).
	RollingUpdate *apps.RollingUpdateDeployment `json:"rollingUpdate,omitempty"`
	// Paused = true blocks the upgrade of Pods.
	Paused bool `json:"paused,omitempty"`
	// Partition describes how many Pods should be updated during the rollout.
	// This field is used to implement partition-style rolling updates.
	Partition intstr.IntOrString `json:"partition,omitempty"`
}
// RollingStyleType names the rolling behavior used for a Deployment.
type RollingStyleType string

// Supported values of RollingStyleType.
const (
	// PartitionRollingStyleType means rolling in batches just like CloneSet; it will NOT create any extra Deployment.
	PartitionRollingStyleType RollingStyleType = "Partition"
	// CanaryRollingStyleType means rolling in a canary way; it will create a canary Deployment.
	CanaryRollingStyleType RollingStyleType = "Canary"
)
// DeploymentExtraStatus is the extra status field for an Advanced Deployment.
type DeploymentExtraStatus struct {
	// ObservedGeneration records the generation of the deployment that this status observed.
	ObservedGeneration int64 `json:"observedGeneration,omitempty"`
	// UpdatedReadyReplicas is the number of pods that have been updated and are ready.
	UpdatedReadyReplicas int32 `json:"updatedReadyReplicas,omitempty"`
	// ExpectedUpdatedReplicas is an absolute number calculated from Partition
	// and Deployment.Spec.Replicas; it tells how many pods are expected to be
	// updated under the current strategy.
	// This field exists so that users do not have to understand the details of
	// the Partition calculation algorithm.
	ExpectedUpdatedReplicas int32 `json:"expectedUpdatedReplicas,omitempty"`
}
// SetDefaultDeploymentStrategy fills in the defaults of strategy in place.
//
// Strategies whose RollingStyle is CanaryRollingStyleType are left untouched.
// For every other style:
//   - a nil RollingUpdate is replaced by an empty one;
//   - a nil MaxUnavailable defaults to "25%";
//   - a nil MaxSurge defaults to "25%";
//   - if both values resolve to 0 they are reset to MaxSurge=0 and
//     MaxUnavailable=1, because otherwise no pod could be updated during a
//     rolling update.
//
// strategy must be non-nil.
func SetDefaultDeploymentStrategy(strategy *DeploymentStrategy) {
	if strategy.RollingStyle == CanaryRollingStyleType {
		return
	}
	if strategy.RollingUpdate == nil {
		strategy.RollingUpdate = &apps.RollingUpdateDeployment{}
	}
	if strategy.RollingUpdate.MaxUnavailable == nil {
		// Set MaxUnavailable as 25% by default
		maxUnavailable := intstr.FromString("25%")
		strategy.RollingUpdate.MaxUnavailable = &maxUnavailable
	}
	if strategy.RollingUpdate.MaxSurge == nil {
		// Set MaxSurge as 25% by default.
		// This must be stored in MaxSurge: writing it to MaxUnavailable would
		// leave MaxSurge nil and clobber a user-provided MaxUnavailable.
		maxSurge := intstr.FromString("25%")
		strategy.RollingUpdate.MaxSurge = &maxSurge
	}

	// Cannot allow maxSurge==0 && MaxUnavailable==0, otherwise, no pod can be updated when rolling update.
	// Both values are resolved against a nominal total of 100 purely to detect
	// the all-zero case. Conversion errors are deliberately ignored here: the
	// resulting value is then whatever the helper returns alongside the error.
	maxSurge, _ := intstr.GetScaledValueFromIntOrPercent(strategy.RollingUpdate.MaxSurge, 100, true)
	maxUnavailable, _ := intstr.GetScaledValueFromIntOrPercent(strategy.RollingUpdate.MaxUnavailable, 100, true)
	if maxSurge == 0 && maxUnavailable == 0 {
		strategy.RollingUpdate = &apps.RollingUpdateDeployment{
			MaxSurge:       &intstr.IntOrString{Type: intstr.Int, IntVal: 0},
			MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 1},
		}
	}
}

View File

@ -41,6 +41,13 @@ const (
// RollbackInBatchAnnotation is set to rollout annotations.
// RollbackInBatchAnnotation allows users to disable quick rollback, so that the rollback is performed in batches.
RollbackInBatchAnnotation = "rollouts.kruise.io/rollback-in-batch"
// DeploymentRolloutStyleAnnotation define the rolling behavior for Deployment.
// must be "partition" or "canary":
// * "partition" means rolling Deployment in batches just like CloneSet, and will NOT create any extra Deployment;
// * "canary" means rolling in canary way, and will create a canary Deployment.
// Defaults to canary
DeploymentRolloutStyleAnnotation = "rollouts.kruise.io/deployment-rolling-style"
)
// RolloutSpec defines the desired state of Rollout

View File

@ -22,6 +22,7 @@ limitations under the License.
package v1alpha1
import (
"k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/gateway-api/apis/v1alpha2"
@ -265,6 +266,42 @@ func (in *CanaryStrategy) DeepCopy() *CanaryStrategy {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DeploymentExtraStatus) DeepCopyInto(out *DeploymentExtraStatus) {
	// A plain value assignment copies every field of the struct.
	*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeploymentExtraStatus.
// A nil receiver yields a nil result.
func (in *DeploymentExtraStatus) DeepCopy() *DeploymentExtraStatus {
	if in == nil {
		return nil
	}
	// Allocate the copy and delegate the field-by-field work to DeepCopyInto.
	out := new(DeploymentExtraStatus)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DeploymentStrategy) DeepCopyInto(out *DeploymentStrategy) {
	// Start with a shallow copy of every field; the pointer field is re-allocated below.
	*out = *in
	if in.RollingUpdate != nil {
		// in and out are shadowed here: they now point at the RollingUpdate pointer fields.
		in, out := &in.RollingUpdate, &out.RollingUpdate
		// Give out its own RollingUpdateDeployment so it does not alias in's.
		*out = new(v1.RollingUpdateDeployment)
		(*in).DeepCopyInto(*out)
	}
	// Partition is copied by value assignment.
	out.Partition = in.Partition
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeploymentStrategy.
// A nil receiver yields a nil result.
func (in *DeploymentStrategy) DeepCopy() *DeploymentStrategy {
	if in == nil {
		return nil
	}
	// Allocate the copy and delegate the field-by-field work to DeepCopyInto.
	out := new(DeploymentStrategy)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *GatewayTrafficRouting) DeepCopyInto(out *GatewayTrafficRouting) {
*out = *in

View File

@ -19,12 +19,10 @@ package deployment
import (
"context"
"encoding/json"
"flag"
"reflect"
"github.com/openkruise/rollouts/pkg/feature"
clientutil "github.com/openkruise/rollouts/pkg/util/client"
utilfeature "github.com/openkruise/rollouts/pkg/util/feature"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@ -44,6 +42,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util"
"github.com/openkruise/rollouts/pkg/feature"
clientutil "github.com/openkruise/rollouts/pkg/util/client"
utilfeature "github.com/openkruise/rollouts/pkg/util/feature"
)
func init() {
@ -135,6 +139,9 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
updateHandler := func(e event.UpdateEvent) bool {
oldObject := e.ObjectOld.(*appsv1.Deployment)
newObject := e.ObjectNew.(*appsv1.Deployment)
if !deploymentutil.IsUnderRolloutControl(newObject) {
return false
}
if oldObject.Generation != newObject.Generation || newObject.DeletionTimestamp != nil {
klog.V(3).Infof("Observed updated Spec for Deployment: %s/%s", newObject.Namespace, newObject.Name)
return true
@ -153,7 +160,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Reconcile reads that state of the cluster for a Deployment object and makes changes based on the state read
// and what is in the Deployment.Spec and Deployment.Annotations
// Automatically generate RBAC rules to allow the Controller to read and write ReplicaSets
func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Request) (res reconcile.Result, retErr error) {
func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
deployment := new(appsv1.Deployment)
err := r.Get(context.TODO(), request.NamespacedName, deployment)
if err != nil {
@ -167,12 +174,12 @@ func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Req
}
// TODO: create new controller only when deployment is under our control
dc, err := r.controllerFactory.NewController(deployment)
if err != nil {
dc := r.controllerFactory.NewController(deployment)
if dc == nil {
return reconcile.Result{}, nil
}
err = dc.syncDeployment(context.Background(), request.NamespacedName.String())
err = dc.syncDeployment(context.Background(), deployment)
return ctrl.Result{}, err
}
@ -180,7 +187,27 @@ type controllerFactory DeploymentController
// NewController create a new DeploymentController
// TODO: create new controller only when deployment is under our control
func (f *controllerFactory) NewController(_ *appsv1.Deployment) (*DeploymentController, error) {
func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *DeploymentController {
if !deploymentutil.IsUnderRolloutControl(deployment) {
klog.Warningf("Deployment %v is not under rollout control, ignore", klog.KObj(deployment))
return nil
}
strategy := rolloutsv1alpha1.DeploymentStrategy{}
strategyAnno := deployment.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation]
if err := json.Unmarshal([]byte(strategyAnno), &strategy); err != nil {
klog.Errorf("Failed to unmarshal strategy for deployment %v: %v", klog.KObj(deployment), strategyAnno)
return nil
}
// We do NOT process such deployment with canary rolling style
if strategy.RollingStyle == rolloutsv1alpha1.CanaryRollingStyleType {
return nil
}
marshaled, _ := json.Marshal(&strategy)
klog.V(4).Infof("Processing deployment %v strategy %v", klog.KObj(deployment), string(marshaled))
return &DeploymentController{
client: f.client,
eventBroadcaster: f.eventBroadcaster,
@ -188,5 +215,6 @@ func (f *controllerFactory) NewController(_ *appsv1.Deployment) (*DeploymentCont
dLister: f.dLister,
rsLister: f.rsLister,
podLister: f.podLister,
}, nil
strategy: strategy,
}
}

View File

@ -22,20 +22,24 @@ package deployment
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"time"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util"
)
const (
@ -64,6 +68,9 @@ type DeploymentController struct {
rsLister appslisters.ReplicaSetLister
// podLister can list/get pods from the shared informer's store
podLister corelisters.PodLister
// we will use this strategy to replace spec.strategy of deployment
strategy rolloutsv1alpha1.DeploymentStrategy
}
// getReplicaSetsForDeployment uses ControllerRefManager to reconcile
@ -81,28 +88,13 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context,
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
return err
}
func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment *apps.Deployment) (err error) {
startTime := time.Now()
klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KObj(deployment), "startTime", startTime)
defer func() {
klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KObj(deployment), "duration", time.Since(startTime))
}()
deployment, err := dc.dLister.Deployments(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
return nil
}
if err != nil {
return err
}
// Deep-copy otherwise we are mutating our cache.
// TODO: Deep-copy only when needed.
d := deployment.DeepCopy()
@ -114,39 +106,81 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, key string)
d.Status.ObservedGeneration = d.Generation
dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
}
return nil
return
}
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
if err != nil {
return err
return
}
if d.DeletionTimestamp != nil {
return dc.syncStatusOnly(ctx, d, rsList)
}
defer func() {
err = dc.updateExtraStatus(deployment, rsList)
}()
// Update deployment conditions with an Unknown condition when pausing/resuming
// a deployment. In this way, we can be sure that we won't timeout when a user
// resumes a Deployment with a set progressDeadlineSeconds.
if err = dc.checkPausedConditions(ctx, d); err != nil {
return err
return
}
if d.Spec.Paused {
return dc.sync(ctx, d, rsList)
err = dc.sync(ctx, d, rsList)
return
}
scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
if err != nil {
return err
return
}
if scalingEvent {
return dc.sync(ctx, d, rsList)
err = dc.sync(ctx, d, rsList)
return
}
return dc.rolloutRolling(ctx, d, rsList)
err = dc.rolloutRolling(ctx, d, rsList)
return
}
// updateExtraStatus recomputes the DeploymentExtraStatus of the given
// deployment and, when its JSON form differs from the value currently stored
// under DeploymentExtraStatusAnnotation, writes it back with a merge patch.
//
// It returns the error of getAllReplicaSetsAndSyncRevision or of the Patch
// call. A marshalling failure is only logged and reported as nil, since
// retrying would not help.
func (dc *DeploymentController) updateExtraStatus(deployment *apps.Deployment, rsList []*apps.ReplicaSet) error {
	latestRS, _, err := dc.getAllReplicaSetsAndSyncRevision(context.TODO(), deployment, rsList, false)
	if err != nil {
		return err
	}

	// Without a new ReplicaSet there are no updated-and-ready pods to report.
	var readyOfLatest int32
	if latestRS != nil {
		readyOfLatest = latestRS.Status.ReadyReplicas
	}

	status := &rolloutsv1alpha1.DeploymentExtraStatus{
		ObservedGeneration:      deployment.Generation,
		UpdatedReadyReplicas:    readyOfLatest,
		ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, deployment),
	}

	encoded, err := json.Marshal(status)
	if err != nil {
		klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(deployment), err)
		return nil // no need to retry
	}

	desired := string(encoded)
	current := deployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation]
	if current == desired {
		return nil // no need to update
	}

	// The status JSON is embedded as a string value inside the patch document,
	// so its double quotes have to be escaped.
	escaped := strings.ReplaceAll(desired, `"`, `\"`)
	patch := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, rolloutsv1alpha1.DeploymentExtraStatusAnnotation, escaped)
	_, err = dc.client.AppsV1().Deployments(deployment.Namespace).Patch(
		context.TODO(), deployment.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{})
	return err
}

View File

@ -34,6 +34,8 @@ import (
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"k8s.io/utils/integer"
"github.com/openkruise/rollouts/pkg/util"
)
const (
@ -894,3 +896,25 @@ func (o ReplicaSetsBySizeNewer) Less(i, j int) bool {
--------------------------------- END ---------------------------------------
**** Copied from "k8s.io/kubernetes/pkg/controller/controller_utils.go" ****
*/
// IsUnderRolloutControl reports whether this deployment should be handled by
// our controller. All three conditions must hold:
//   - the BatchReleaseControlAnnotation annotation is set to a non-empty value;
//   - the deployment strategy type is Recreate;
//   - the deployment is paused.
func IsUnderRolloutControl(deployment *apps.Deployment) bool {
	hasControlInfo := deployment.Annotations[util.BatchReleaseControlAnnotation] != ""
	isRecreate := deployment.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType
	return hasControlInfo && isRecreate && deployment.Spec.Paused
}
// NewRSReplicasLimit returns the upper bound on the replicas of the new
// ReplicaSet, derived from partition and deployment.Spec.Replicas.
//
// The partition is scaled against the desired replicas (rounding up) and
// clamped to [0, replicas]. When partition is a string other than "100%" and
// there is more than one replica, the bound is additionally capped at
// replicas-1.
//
// deployment.Spec.Replicas must be non-nil.
func NewRSReplicasLimit(partition intstrutil.IntOrString, deployment *apps.Deployment) int32 {
	desired := int(*deployment.Spec.Replicas)

	// The conversion error is deliberately discarded, as before.
	// NOTE(review): the helper presumably yields 0 for a malformed value — confirm.
	scaled, _ := intstrutil.GetScaledValueFromIntOrPercent(&partition, desired, true)

	// Pick the ceiling first: string partitions below "100%" may never cover
	// every replica when there is more than one.
	ceiling := desired
	if desired > 1 && partition.Type == intstrutil.String && partition.String() != "100%" {
		ceiling = desired - 1
	}

	limit := integer.IntMin(scaled, ceiling)
	if limit < 0 {
		limit = 0
	}
	return int32(limit)
}

View File

@ -31,6 +31,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/integer"
"k8s.io/utils/pointer"
)
func newDControllerRef(d *apps.Deployment) *metav1.OwnerReference {
@ -1167,3 +1169,30 @@ func TestReplicasAnnotationsNeedUpdate(t *testing.T) {
})
}
}
func TestNewRSReplicasLimit(t *testing.T) {
for partitionInt := 0; partitionInt < 1000; partitionInt++ {
partition := intstr.FromInt(partitionInt)
deployment := apps.Deployment{Spec: apps.DeploymentSpec{Replicas: pointer.Int32(100)}}
result := NewRSReplicasLimit(partition, &deployment)
expected := integer.Int32Min(int32(partitionInt), 100)
if result != expected {
t.Errorf("case[1]: Expected %v, Got: %v", expected, result)
}
}
for replicas := 0; replicas < 1000; replicas++ {
for partitionPercent := 0; partitionPercent <= 100; partitionPercent++ {
partition := intstr.FromString(fmt.Sprintf("%d%%", partitionPercent))
deployment := apps.Deployment{Spec: apps.DeploymentSpec{Replicas: pointer.Int32(int32(replicas))}}
result := NewRSReplicasLimit(partition, &deployment)
expected, _ := intstr.GetScaledValueFromIntOrPercent(&partition, replicas, true)
if partitionPercent != 100 && replicas > 1 {
expected = integer.IntMin(expected, replicas-1)
}
if result != int32(expected) {
t.Errorf("case[2]: Expected %v, Got: %v, replicas %d, partition %d%%", expected, result, replicas, partitionPercent)
}
}
}
}