add advanced deployment controller

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
This commit is contained in:
mingzhou.swx 2023-01-04 00:30:34 +08:00
parent 3165f4e8c6
commit 0babeee8af
18 changed files with 1592 additions and 162 deletions

View File

@ -0,0 +1,110 @@
name: E2E-Advanced-Deployment-1.19
on:
push:
branches:
- master
- release-*
pull_request: {}
workflow_dispatch: {}
env:
# Common versions
GO_VERSION: '1.17'
KIND_IMAGE: 'kindest/node:v1.19.16'
KIND_CLUSTER_NAME: 'ci-testing'
jobs:
rollout:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: ${{ env.GO_VERSION }}
- name: Setup Kind Cluster
uses: helm/kind-action@v1.2.0
with:
node_image: ${{ env.KIND_IMAGE }}
cluster_name: ${{ env.KIND_CLUSTER_NAME }}
config: ./test/kind-conf.yaml
- name: Build image
run: |
export IMAGE="openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID}"
docker build --pull --no-cache . -t $IMAGE
kind load docker-image --name=${KIND_CLUSTER_NAME} $IMAGE || { echo >&2 "kind not installed or error loading image: $IMAGE"; exit 1; }
- name: Install Kruise
run: |
set -ex
kubectl cluster-info
make helm
helm repo add openkruise https://openkruise.github.io/charts/
helm repo update
helm install kruise openkruise/kruise
for ((i=1;i<10;i++));
do
set +e
PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l)
set -e
if [ "$PODS" -eq "2" ]; then
break
fi
sleep 3
done
set +e
PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l)
set -e
if [ "$PODS" -eq "2" ]; then
echo "Wait for kruise-manager ready successfully"
else
echo "Timeout to wait for kruise-manager ready"
exit 1
fi
- name: Install Kruise Rollout
run: |
set -ex
kubectl cluster-info
IMG=openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh
for ((i=1;i<10;i++));
do
set +e
PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l)
set -e
if [ "$PODS" -eq "1" ]; then
break
fi
sleep 3
done
set +e
PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l)
kubectl get node -o yaml
kubectl get all -n kruise-rollout -o yaml
set -e
if [ "$PODS" -eq "1" ]; then
echo "Wait for kruise-rollout ready successfully"
else
echo "Timeout to wait for kruise-rollout ready"
exit 1
fi
- name: Run E2E Tests
run: |
export KUBECONFIG=/home/runner/.kube/config
make ginkgo
set +e
./bin/ginkgo -timeout 60m -v --focus='Advanced Deployment' test/e2e
retVal=$?
# kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout
restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}')
if [ "${restartCount}" -eq "0" ];then
echo "Kruise-rollout has not restarted"
else
kubectl get pod -n kruise-rollout --no-headers
echo "Kruise-rollout has restarted, abort!!!"
kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout
exit 1
fi
exit $retVal

View File

@ -0,0 +1,110 @@
name: E2E-Advanced-Deployment-1.23
on:
push:
branches:
- master
- release-*
pull_request: {}
workflow_dispatch: {}
env:
# Common versions
GO_VERSION: '1.17'
KIND_IMAGE: 'kindest/node:v1.23.3'
KIND_CLUSTER_NAME: 'ci-testing'
jobs:
rollout:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: ${{ env.GO_VERSION }}
- name: Setup Kind Cluster
uses: helm/kind-action@v1.2.0
with:
node_image: ${{ env.KIND_IMAGE }}
cluster_name: ${{ env.KIND_CLUSTER_NAME }}
config: ./test/kind-conf.yaml
- name: Build image
run: |
export IMAGE="openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID}"
docker build --pull --no-cache . -t $IMAGE
kind load docker-image --name=${KIND_CLUSTER_NAME} $IMAGE || { echo >&2 "kind not installed or error loading image: $IMAGE"; exit 1; }
- name: Install Kruise
run: |
set -ex
kubectl cluster-info
make helm
helm repo add openkruise https://openkruise.github.io/charts/
helm repo update
helm install kruise openkruise/kruise
for ((i=1;i<10;i++));
do
set +e
PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l)
set -e
if [ "$PODS" -eq "2" ]; then
break
fi
sleep 3
done
set +e
PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l)
set -e
if [ "$PODS" -eq "2" ]; then
echo "Wait for kruise-manager ready successfully"
else
echo "Timeout to wait for kruise-manager ready"
exit 1
fi
- name: Install Kruise Rollout
run: |
set -ex
kubectl cluster-info
IMG=openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh
for ((i=1;i<10;i++));
do
set +e
PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l)
set -e
if [ "$PODS" -eq "1" ]; then
break
fi
sleep 3
done
set +e
PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l)
kubectl get node -o yaml
kubectl get all -n kruise-rollout -o yaml
set -e
if [ "$PODS" -eq "1" ]; then
echo "Wait for kruise-rollout ready successfully"
else
echo "Timeout to wait for kruise-rollout ready"
exit 1
fi
- name: Run E2E Tests
run: |
export KUBECONFIG=/home/runner/.kube/config
make ginkgo
set +e
./bin/ginkgo -timeout 60m -v --focus='Advanced Deployment' test/e2e
retVal=$?
# kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout
restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}')
if [ "${restartCount}" -eq "0" ];then
echo "Kruise-rollout has not restarted"
else
kubectl get pod -n kruise-rollout --no-headers
echo "Kruise-rollout has restarted, abort!!!"
kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout
exit 1
fi
exit $retVal

View File

@ -31,16 +31,14 @@ type DeploymentStrategy struct {
type RollingStyleType string
const (
// PartitionRollingStyleType means rolling in batches just like CloneSet, and will NOT create any extra Deployment;
PartitionRollingStyleType RollingStyleType = "Partition"
// CanaryRollingStyleType means rolling in canary way, and will create a canary Deployment.
CanaryRollingStyleType RollingStyleType = "Canary"
// PartitionRollingStyle means rolling in batches just like CloneSet, and will NOT create any extra Deployment;
PartitionRollingStyle RollingStyleType = "Partition"
// CanaryRollingStyle means rolling in canary way, and will create a canary Deployment.
CanaryRollingStyle RollingStyleType = "Canary"
)
// DeploymentExtraStatus is extra status field for Advanced Deployment
type DeploymentExtraStatus struct {
// ObservedGeneration record the generation of deployment this status observed.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
// UpdatedReadyReplicas the number of pods that has been updated and ready.
UpdatedReadyReplicas int32 `json:"updatedReadyReplicas,omitempty"`
// ExpectedUpdatedReplicas is an absolute number calculated based on Partition
@ -52,7 +50,7 @@ type DeploymentExtraStatus struct {
}
func SetDefaultDeploymentStrategy(strategy *DeploymentStrategy) {
if strategy.RollingStyle == CanaryRollingStyleType {
if strategy.RollingStyle == CanaryRollingStyle {
return
}
if strategy.RollingUpdate == nil {

View File

@ -14,4 +14,5 @@ spec:
- "--health-probe-bind-address=:8081"
- "--metrics-bind-address=127.0.0.1:8080"
- "--leader-elect"
- "--feature-gates=AdvancedDeployment=true"
- "--v=3"

View File

@ -29,6 +29,7 @@ spec:
- /manager
args:
- --leader-elect
- --feature-gates=AdvancedDeployment=true
image: controller:latest
name: manager
securityContext:

View File

@ -24,6 +24,7 @@ import (
kruisev1beta1 "github.com/openkruise/kruise-api/apps/v1beta1"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
br "github.com/openkruise/rollouts/pkg/controller/batchrelease"
"github.com/openkruise/rollouts/pkg/controller/deployment"
"github.com/openkruise/rollouts/pkg/controller/rollout"
"github.com/openkruise/rollouts/pkg/controller/rollouthistory"
utilclient "github.com/openkruise/rollouts/pkg/util/client"
@ -116,6 +117,10 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "rollouthistory")
os.Exit(1)
}
if err = deployment.Add(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "advanceddeployment")
os.Exit(1)
}
//+kubebuilder:scaffold:builder
setupLog.Info("setup webhook")

View File

@ -22,14 +22,15 @@ import (
"encoding/json"
"flag"
"reflect"
"time"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
@ -51,9 +52,13 @@ import (
)
func init() {
flag.IntVar(&concurrentReconciles, "deployment-workers", concurrentReconciles, "Max concurrent workers for StatefulSet controller.")
flag.IntVar(&concurrentReconciles, "deployment-workers", concurrentReconciles, "Max concurrent workers for advanced deployment controller.")
}
const (
DefaultRetryDuration = 2 * time.Second
)
var (
concurrentReconciles = 3
)
@ -75,10 +80,6 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
cacher := mgr.GetCache()
podInformer, err := cacher.GetInformerForKind(context.TODO(), v1.SchemeGroupVersion.WithKind("Pod"))
if err != nil {
return nil, err
}
dInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("Deployment"))
if err != nil {
return nil, err
@ -91,7 +92,6 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
// Lister
dLister := appslisters.NewDeploymentLister(dInformer.(toolscache.SharedIndexInformer).GetIndexer())
rsLister := appslisters.NewReplicaSetLister(rsInformer.(toolscache.SharedIndexInformer).GetIndexer())
podLister := corelisters.NewPodLister(podInformer.(toolscache.SharedIndexInformer).GetIndexer())
// Client & Recorder
genericClient := clientutil.GetGenericClientWithName("advanced-deployment-controller")
@ -107,7 +107,6 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
eventRecorder: recorder,
dLister: dLister,
rsLister: rsLister,
podLister: podLister,
}
return &ReconcileDeployment{Client: mgr.GetClient(), controllerFactory: factory}, nil
}
@ -179,8 +178,22 @@ func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Req
return reconcile.Result{}, nil
}
errList := field.ErrorList{}
err = dc.syncDeployment(context.Background(), deployment)
return ctrl.Result{}, err
if err != nil {
errList = append(errList, field.InternalError(field.NewPath("syncDeployment"), err))
}
err = dc.patchExtraStatus(deployment)
if err != nil {
errList = append(errList, field.InternalError(field.NewPath("patchExtraStatus"), err))
}
err = deploymentutil.DeploymentRolloutSatisfied(deployment, dc.strategy.Partition)
if err != nil {
klog.V(3).Infof("Deployment %v is still rolling: %v", klog.KObj(deployment), err)
return reconcile.Result{RequeueAfter: DefaultRetryDuration}, errList.ToAggregate()
}
return ctrl.Result{}, errList.ToAggregate()
}
type controllerFactory DeploymentController
@ -201,7 +214,7 @@ func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *Deploy
}
// We do NOT process such deployment with canary rolling style
if strategy.RollingStyle == rolloutsv1alpha1.CanaryRollingStyleType {
if strategy.RollingStyle == rolloutsv1alpha1.CanaryRollingStyle {
return nil
}
@ -214,7 +227,6 @@ func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *Deploy
eventRecorder: f.eventRecorder,
dLister: f.dLister,
rsLister: f.rsLister,
podLister: f.podLister,
strategy: strategy,
}
}

View File

@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
@ -66,8 +65,6 @@ type DeploymentController struct {
dLister appslisters.DeploymentLister
// rsLister can list/get replica sets from the shared informer's store
rsLister appslisters.ReplicaSetLister
// podLister can list/get pods from the shared informer's store
podLister corelisters.PodLister
// we will use this strategy to replace spec.strategy of deployment
strategy rolloutsv1alpha1.DeploymentStrategy
@ -88,7 +85,7 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context,
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment *apps.Deployment) (err error) {
func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment *apps.Deployment) error {
startTime := time.Now()
klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KObj(deployment), "startTime", startTime)
defer func() {
@ -106,64 +103,50 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment *
d.Status.ObservedGeneration = d.Generation
dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
}
return
return nil
}
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
if err != nil {
return
return err
}
if d.DeletionTimestamp != nil {
return dc.syncStatusOnly(ctx, d, rsList)
}
defer func() {
err = dc.updateExtraStatus(deployment, rsList)
}()
// Update deployment conditions with an Unknown condition when pausing/resuming
// a deployment. In this way, we can be sure that we won't timeout when a user
// resumes a Deployment with a set progressDeadlineSeconds.
if err = dc.checkPausedConditions(ctx, d); err != nil {
return
}
if d.Spec.Paused {
err = dc.sync(ctx, d, rsList)
return
if dc.strategy.Paused {
return dc.sync(ctx, d, rsList)
}
scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
if err != nil {
return
return err
}
if scalingEvent {
err = dc.sync(ctx, d, rsList)
return
return dc.sync(ctx, d, rsList)
}
err = dc.rolloutRolling(ctx, d, rsList)
return
return dc.rolloutRolling(ctx, d, rsList)
}
// updateExtraStatus will update extra status for advancedStatus
func (dc *DeploymentController) updateExtraStatus(deployment *apps.Deployment, rsList []*apps.ReplicaSet) error {
newRS, _, err := dc.getAllReplicaSetsAndSyncRevision(context.TODO(), deployment, rsList, false)
// patchExtraStatus will update extra status for advancedStatus
func (dc *DeploymentController) patchExtraStatus(deployment *apps.Deployment) error {
rsList, err := dc.getReplicaSetsForDeployment(context.TODO(), deployment)
if err != nil {
return err
}
updatedReadyReplicas := int32(0)
newRS := deploymentutil.FindNewReplicaSet(deployment, rsList)
if newRS != nil {
updatedReadyReplicas = newRS.Status.ReadyReplicas
}
extraStatus := &rolloutsv1alpha1.DeploymentExtraStatus{
ObservedGeneration: deployment.Generation,
UpdatedReadyReplicas: updatedReadyReplicas,
ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, deployment),
}
@ -179,8 +162,11 @@ func (dc *DeploymentController) updateExtraStatus(deployment *apps.Deployment, r
return nil // no need to update
}
extraStatusAnno = strings.Replace(extraStatusAnno, `"`, `\"`, -1)
body := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, rolloutsv1alpha1.DeploymentExtraStatusAnnotation, extraStatusAnno))
_, err = dc.client.AppsV1().Deployments(deployment.Namespace).Patch(context.TODO(), deployment.Name, types.MergePatchType, body, metav1.PatchOptions{})
body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`,
rolloutsv1alpha1.DeploymentExtraStatusAnnotation,
strings.Replace(extraStatusAnno, `"`, `\"`, -1))
_, err = dc.client.AppsV1().Deployments(deployment.Namespace).
Patch(context.TODO(), deployment.Name, types.MergePatchType, []byte(body), metav1.PatchOptions{})
return err
}

View File

@ -0,0 +1,216 @@
package deployment
import (
"context"
"fmt"
"strconv"
"strings"
"testing"
apps "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/controller/deployment/util"
)
func TestSyncDeployment(t *testing.T) {
tests := map[string]struct {
oldRSsReplicas []int32
newRSReplicas int32
dReplicas int32
oldRSsAvailable []int32
newRSAvailable int32
dAvailable int32
partition intstrutil.IntOrString
maxSurge intstrutil.IntOrString
maxUnavailable intstrutil.IntOrString
expectOldReplicas int32
expectNewReplicas int32
}{
"rolling new: surge new first, limited by maxSurge": {
[]int32{6, 4}, 0, 10,
[]int32{0, 0}, 0, 0,
intstrutil.FromInt(4), intstrutil.FromInt(2), intstrutil.FromString("50%"),
10, 2,
},
"rolling new: surge new first, limited by partition": {
[]int32{6, 4}, 0, 10,
[]int32{0, 0}, 0, 0,
intstrutil.FromInt(4), intstrutil.FromInt(10), intstrutil.FromString("50%"),
10, 4,
},
"rolling old: limited by maxUnavailable": {
[]int32{6, 4}, 2, 10,
[]int32{6, 4}, 0, 10,
intstrutil.FromInt(4), intstrutil.FromInt(2), intstrutil.FromString("20%"),
8, 2,
},
"rolling new: limited by partition": {
[]int32{6, 2}, 2, 10,
[]int32{6, 2}, 0, 8,
intstrutil.FromInt(3), intstrutil.FromInt(5), intstrutil.FromString("40%"),
8, 3,
},
"rolling new: scaling down old first, limited by partition": {
[]int32{6, 4}, 0, 10,
[]int32{6, 4}, 0, 10,
intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromString("40%"),
7, 0,
},
"rolling new: scaling down old first, limited by maxUnavailable": {
[]int32{6, 4}, 0, 10,
[]int32{6, 4}, 0, 10,
intstrutil.FromInt(5), intstrutil.FromInt(0), intstrutil.FromString("20%"),
8, 0,
},
"no op: partition satisfied, maxSurge>0, new pod unavailable": {
[]int32{3, 4}, 3, 10,
[]int32{3, 4}, 0, 7,
intstrutil.FromInt(3), intstrutil.FromInt(2), intstrutil.FromString("30%"),
7, 3,
},
"no op: partition satisfied, maxSurge>0, new pod available": {
[]int32{3, 4}, 3, 10,
[]int32{3, 4}, 3, 10,
intstrutil.FromInt(3), intstrutil.FromInt(2), intstrutil.FromString("30%"),
7, 3,
},
"rolling old: scale down old to satisfied replicas": {
[]int32{3}, 3, 5,
[]int32{3}, 3, 6,
intstrutil.FromInt(3), intstrutil.FromInt(2), intstrutil.FromString("25%"),
2, 3,
},
"scale up: scale down old first": {
[]int32{4}, 0, 5,
[]int32{4}, 0, 4,
intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromInt(1),
5, 0,
},
"scale up": {
[]int32{5}, 5, 20,
[]int32{5}, 5, 10,
intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromString("30%"),
10, 10,
},
"scale down": {
[]int32{12}, 8, 10,
[]int32{12}, 8, 20,
intstrutil.FromInt(5), intstrutil.FromInt(0), intstrutil.FromString("30%"),
6, 4,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fakeRecord := record.NewFakeRecorder(10)
informers := informers.NewSharedInformerFactory(fakeClient, 0)
rsInformer := informers.Apps().V1().ReplicaSets().Informer()
dInformer := informers.Apps().V1().Deployments().Informer()
var deployment apps.Deployment
var newRS apps.ReplicaSet
{
deployment = generateDeployment("busybox")
deployment.Spec.Replicas = pointer.Int32(test.dReplicas)
deployment.Status.ReadyReplicas = test.newRSReplicas
availableReplicas := test.newRSAvailable
for _, available := range test.oldRSsAvailable {
availableReplicas += available
}
deployment.Status.UpdatedReplicas = test.newRSReplicas
deployment.Status.Replicas = availableReplicas
deployment.Status.AvailableReplicas = availableReplicas
dInformer.GetIndexer().Add(&deployment)
_, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
{
for index, replicas := range test.oldRSsReplicas {
rs := generateRS(deployment)
rs.SetName(fmt.Sprintf("rs-%d", index))
rs.Spec.Replicas = pointer.Int32(replicas)
rs.Status.Replicas = replicas
if strings.HasPrefix(name, "scale") {
rs.Annotations = map[string]string{
util.DesiredReplicasAnnotation: strconv.Itoa(-1),
util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)),
}
}
rs.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("old-version-%d", index)
rs.Status.ReadyReplicas = test.oldRSsAvailable[index]
rs.Status.AvailableReplicas = test.oldRSsAvailable[index]
rsInformer.GetIndexer().Add(&rs)
_, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
}
{
newRS = generateRS(deployment)
newRS.SetName("rs-new")
newRS.Spec.Replicas = pointer.Int32(test.newRSReplicas)
if strings.HasPrefix(name, "scale") {
newRS.Annotations = map[string]string{
util.DesiredReplicasAnnotation: strconv.Itoa(-1),
util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)),
}
}
newRS.Status.Replicas = test.newRSReplicas
newRS.Status.ReadyReplicas = test.newRSAvailable
newRS.Status.AvailableReplicas = test.newRSAvailable
rsInformer.GetIndexer().Add(&newRS)
_, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
dc := &DeploymentController{
client: fakeClient,
eventRecorder: fakeRecord,
dLister: appslisters.NewDeploymentLister(dInformer.GetIndexer()),
rsLister: appslisters.NewReplicaSetLister(rsInformer.GetIndexer()),
strategy: rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: &apps.RollingUpdateDeployment{
MaxSurge: &test.maxSurge,
MaxUnavailable: &test.maxUnavailable,
},
Partition: test.partition,
},
}
err := dc.syncDeployment(context.TODO(), &deployment)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
rss, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
resultOld := int32(0)
resultNew := int32(0)
for _, rs := range rss.Items {
if rs.GetName() != "rs-new" {
resultOld += *rs.Spec.Replicas
} else {
resultNew = *rs.Spec.Replicas
}
}
if resultOld != test.expectOldReplicas || resultNew != test.expectNewReplicas {
t.Fatalf("expect new %d, but got new %d; expect old %d, but got old %d ", test.expectNewReplicas, resultNew, test.expectOldReplicas, resultOld)
}
})
}
}

View File

@ -35,7 +35,7 @@ import (
// for example a resync of the deployment after it was scaled up. In those cases,
// we shouldn't try to estimate any progress.
func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
newStatus := calculateStatus(allRSs, newRS, d)
newStatus := calculateStatus(allRSs, newRS, d, &dc.strategy)
// If there is no progressDeadlineSeconds set, remove any Progressing condition.
if !util.HasProgressDeadline(d) {

View File

@ -22,6 +22,7 @@ import (
"sort"
apps "k8s.io/api/apps/v1"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"k8s.io/utils/integer"
@ -56,12 +57,6 @@ func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Depl
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
if deploymentutil.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
return err
}
}
// Sync deployment status
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
@ -76,7 +71,7 @@ func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allR
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
return scaled, err
}
newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS, &dc.strategy)
if err != nil {
return false, err
}
@ -93,7 +88,14 @@ func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, all
allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
klog.V(4).Infof("New replica set %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRS.Status.AvailableReplicas)
maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
maxUnavailable := deploymentutil.MaxUnavailable(deployment, &dc.strategy)
// Old RSes should obey the limitation of partition.
ScaleDownOldLimit := ScaleDownLimitForOld(oldRSs, newRS, deployment, dc.strategy.Partition)
if ScaleDownOldLimit <= 0 {
// Old replica sets do not satisfied as partition expectation, scale up.
return dc.scaleUpOldReplicaSets(ctx, oldRSs, -ScaleDownOldLimit, deployment)
}
// Check if we can scale down. We can scale down in the following 2 cases:
// * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
@ -128,6 +130,8 @@ func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, all
minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
// But, do not exceed the number of the desired partition.
maxScaledDown = integer.Int32Min(maxScaledDown, ScaleDownOldLimit)
if maxScaledDown <= 0 {
return false, nil
}
@ -191,7 +195,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, ol
// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate".
// Need check maxUnavailable to ensure availability
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
maxUnavailable := deploymentutil.MaxUnavailable(deployment, &dc.strategy)
// Check if we can scale down.
minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
@ -203,10 +207,15 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx cont
}
klog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name)
sort.Sort(deploymentutil.ReplicaSetsByCreationTimestamp(oldRSs))
// We expected scaled down the middle revision firstly.
sort.Sort(deploymentutil.ReplicaSetsBySmallerRevision(oldRSs))
totalScaledDown := int32(0)
totalScaleDownCount := availablePodCount - minAvailable
newRS := deploymentutil.FindNewReplicaSet(deployment, allRSs)
// Old RSes should obey the limitation of partition.
ScaleDownOldLimit := ScaleDownLimitForOld(oldRSs, newRS, deployment, dc.strategy.Partition)
totalScaleDownCount = integer.Int32Min(totalScaleDownCount, ScaleDownOldLimit)
for _, targetRS := range oldRSs {
if totalScaledDown >= totalScaleDownCount {
// No further scaling required.
@ -232,3 +241,39 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx cont
return totalScaledDown, nil
}
// scaleUpOldReplicaSets is different from native deployment: consider partition limitation.
func (dc *DeploymentController) scaleUpOldReplicaSets(ctx context.Context, oldRSs []*apps.ReplicaSet, scaledUpCount int32, deployment *apps.Deployment) (bool, error) {
if scaledUpCount <= 0 || len(oldRSs) == 0 {
return false, nil
}
// Scale up the biggest one or older.
sort.Sort(deploymentutil.ReplicaSetsBySizeOlder(oldRSs))
newScale := (*oldRSs[0].Spec.Replicas) + scaledUpCount
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, oldRSs[0], newScale, deployment)
return scaled, err
}
// ScaleDownLimitForOld return the limitation of old replica sets under the partition settings.
func ScaleDownLimitForOld(oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment, partition intstrutil.IntOrString) int32 {
newRSUpdateLimit := deploymentutil.NewRSReplicasLimit(partition, deployment)
// Expected replicas of the new replica set under the partition settings.
newRSDesiredCount := integer.Int32Max(newRSUpdateLimit, *newRS.Spec.Replicas)
// Expected total replicas for old replica sets.
oldRSDesiredCount := *(deployment.Spec.Replicas) - newRSDesiredCount
// Actual total replicas for old replica sets.
oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
// oldRSDesiredDiff is the gap between the reality and the desired.
scaleDownOldLimit := oldPodsCount - oldRSDesiredCount
klog.V(4).InfoS("Calculate scale down limit for ",
"Deployment", klog.KObj(deployment),
// About the new replica set
"Replicas(New)", *(newRS.Spec.Replicas), "DesiredReplicas(New)", newRSDesiredCount,
// About the old replica sets
"ReplicaS(Old)", oldPodsCount, "DesiredReplicas(Old)", oldRSDesiredCount, "ScaleDownLimit(Old)", scaleDownOldLimit,
// About the deployment
"Replicas(Deployment)", *(deployment.Spec.Replicas), "Partition(Deployment)", newRSUpdateLimit)
return scaleDownOldLimit
}

View File

@ -0,0 +1,449 @@
package deployment
import (
"context"
"fmt"
"math/rand"
"strconv"
"testing"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
)
func newDControllerRef(d *apps.Deployment) *metav1.OwnerReference {
isController := true
return &metav1.OwnerReference{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: d.GetName(),
UID: d.GetUID(),
Controller: &isController,
}
}
// generateRS creates a replica set, with the input deployment's template as its template
func generateRS(deployment apps.Deployment) apps.ReplicaSet {
template := deployment.Spec.Template.DeepCopy()
return apps.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
UID: randomUID(),
Name: randomName("replicaset"),
Labels: template.Labels,
OwnerReferences: []metav1.OwnerReference{*newDControllerRef(&deployment)},
},
Spec: apps.ReplicaSetSpec{
Replicas: new(int32),
Template: *template,
Selector: &metav1.LabelSelector{MatchLabels: template.Labels},
},
}
}
func randomUID() types.UID {
return types.UID(strconv.FormatInt(rand.Int63(), 10))
}
func randomName(prefix string) string {
return fmt.Sprintf("%s-%s", prefix, strconv.FormatInt(5, 10))
}
// generateDeployment creates a deployment, with the input image as its template
func generateDeployment(image string) apps.Deployment {
podLabels := map[string]string{"name": image}
terminationSec := int64(30)
enableServiceLinks := v1.DefaultEnableServiceLinks
return apps.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: image,
Annotations: make(map[string]string),
},
Spec: apps.DeploymentSpec{
Replicas: func(i int32) *int32 { return &i }(1),
Selector: &metav1.LabelSelector{MatchLabels: podLabels},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: image,
Image: image,
ImagePullPolicy: v1.PullAlways,
TerminationMessagePath: v1.TerminationMessagePathDefault,
},
},
DNSPolicy: v1.DNSClusterFirst,
TerminationGracePeriodSeconds: &terminationSec,
RestartPolicy: v1.RestartPolicyAlways,
SecurityContext: &v1.PodSecurityContext{},
EnableServiceLinks: &enableServiceLinks,
},
},
},
}
}
func TestScaleDownLimitForOld(t *testing.T) {
tDeployment := apps.Deployment{Spec: apps.DeploymentSpec{Replicas: pointer.Int32(10)}}
tReplicaSet := &apps.ReplicaSet{Spec: apps.ReplicaSetSpec{Replicas: pointer.Int32(5)}}
tOldRSes := func() []*apps.ReplicaSet {
return []*apps.ReplicaSet{
{Spec: apps.ReplicaSetSpec{Replicas: pointer.Int32(2)}},
{Spec: apps.ReplicaSetSpec{Replicas: pointer.Int32(2)}},
{Spec: apps.ReplicaSetSpec{Replicas: pointer.Int32(2)}},
}
}
tests := map[string]struct {
deployment func() *apps.Deployment
oldRSes func() []*apps.ReplicaSet
newRS func() *apps.ReplicaSet
partition intstrutil.IntOrString
expect int32
}{
"ScaleDownLimit > 0": {
deployment: func() *apps.Deployment {
return tDeployment.DeepCopy()
},
oldRSes: func() []*apps.ReplicaSet {
return tOldRSes()
},
newRS: func() *apps.ReplicaSet {
return tReplicaSet.DeepCopy()
},
partition: intstrutil.FromInt(5),
expect: 1,
},
"ScaleDownLimit = 0": {
deployment: func() *apps.Deployment {
return tDeployment.DeepCopy()
},
oldRSes: func() []*apps.ReplicaSet {
return tOldRSes()
},
newRS: func() *apps.ReplicaSet {
newRS := tReplicaSet.DeepCopy()
newRS.Spec.Replicas = pointer.Int32(4)
return newRS
},
partition: intstrutil.FromInt(4),
expect: 0,
},
"ScaleDownLimit < 0": {
deployment: func() *apps.Deployment {
return tDeployment.DeepCopy()
},
oldRSes: func() []*apps.ReplicaSet {
return tOldRSes()
},
newRS: func() *apps.ReplicaSet {
newRS := tReplicaSet.DeepCopy()
newRS.Spec.Replicas = pointer.Int32(2)
return newRS
},
partition: intstrutil.FromInt(2),
expect: -2,
},
"newRS replicas > partition": {
deployment: func() *apps.Deployment {
return tDeployment.DeepCopy()
},
oldRSes: func() []*apps.ReplicaSet {
return tOldRSes()
},
newRS: func() *apps.ReplicaSet {
return tReplicaSet.DeepCopy()
},
partition: intstrutil.FromInt(2),
expect: 1,
},
"newRS replicas < partition": {
deployment: func() *apps.Deployment {
return tDeployment.DeepCopy()
},
oldRSes: func() []*apps.ReplicaSet {
return tOldRSes()
},
newRS: func() *apps.ReplicaSet {
newRS := tReplicaSet.DeepCopy()
newRS.Spec.Replicas = pointer.Int32(2)
return newRS
},
partition: intstrutil.FromInt(5),
expect: 1,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
result := ScaleDownLimitForOld(test.oldRSes(), test.newRS(), test.deployment(), test.partition)
if result != test.expect {
t.Fatalf("expect %d, but got %d", test.expect, result)
}
})
}
}
func TestReconcileNewReplicaSet(t *testing.T) {
tests := map[string]struct {
oldRSs []int32
newRS int32
deployment int32
partition intstrutil.IntOrString
maxSurge intstrutil.IntOrString
expect int32
}{
"limited by partition": {
[]int32{2, 3}, 3, 10,
intstrutil.FromInt(4), intstrutil.FromInt(0), 4,
},
"limited by deployment replicas": {
[]int32{2, 3}, 3, 10,
intstrutil.FromInt(10), intstrutil.FromInt(0), 5,
},
"surge first": {
[]int32{10}, 0, 10,
intstrutil.FromInt(3), intstrutil.FromInt(2), 2,
},
"surge first, but limited by partition": {
[]int32{10}, 0, 10,
intstrutil.FromInt(2), intstrutil.FromInt(3), 2,
},
"partition satisfied, no scale": {
[]int32{7}, 3, 10,
intstrutil.FromInt(3), intstrutil.FromInt(3), 3,
},
"partition satisfied, no scale even though deployment replicas not reach": {
[]int32{5}, 3, 10,
intstrutil.FromInt(3), intstrutil.FromInt(3), 3,
},
"new replica set has been greater than partition, no scale down": {
[]int32{7}, 3, 10,
intstrutil.FromInt(1), intstrutil.FromInt(3), 3,
},
"total replicas are more than deployment desired, no scale down": {
[]int32{7}, 3, 10,
intstrutil.FromInt(6), intstrutil.FromInt(0), 3,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fakeRecord := record.NewFakeRecorder(10)
dc := &DeploymentController{
client: fakeClient,
eventRecorder: fakeRecord,
strategy: rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: &apps.RollingUpdateDeployment{
MaxSurge: &test.maxSurge,
},
Partition: test.partition,
},
}
var deployment apps.Deployment
var newRS apps.ReplicaSet
var allRSs []*apps.ReplicaSet
{
deployment = generateDeployment("busybox")
deployment.Spec.Replicas = pointer.Int32(test.deployment)
_, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
{
for index, replicas := range test.oldRSs {
rs := generateRS(deployment)
rs.SetName(fmt.Sprintf("rs-%d", index))
rs.Spec.Replicas = pointer.Int32(replicas)
allRSs = append(allRSs, &rs)
_, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
}
{
newRS = generateRS(deployment)
newRS.SetName("rs-new")
newRS.Spec.Replicas = pointer.Int32(test.newRS)
allRSs = append(allRSs, &newRS)
_, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
_, err := dc.reconcileNewReplicaSet(context.TODO(), allRSs, &newRS, &deployment)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
result, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).Get(context.TODO(), newRS.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
if *result.Spec.Replicas != test.expect {
t.Fatalf("expect %d, but got %d", test.expect, *result.Spec.Replicas)
}
})
}
}
func TestReconcileOldReplicaSet(t *testing.T) {
tests := map[string]struct {
oldRSsReplicas []int32
newRSReplicas int32
dReplicas int32
oldRSsAvailable []int32
newRSAvailable int32
dAvailable int32
partition intstrutil.IntOrString
maxSurge intstrutil.IntOrString
maxUnavailable intstrutil.IntOrString
expectOldReplicas int32
}{
"scale down: all unavailable, limited by partition": {
[]int32{6, 4}, 0, 10,
[]int32{0, 0}, 0, 0,
intstrutil.FromInt(4), intstrutil.FromInt(2), intstrutil.FromString("50%"),
6,
},
"scale down: all available, limited by maxUnavailable": {
[]int32{6, 4}, 0, 10,
[]int32{6, 4}, 0, 10,
intstrutil.FromInt(4), intstrutil.FromInt(2), intstrutil.FromString("20%"),
8,
},
"scale down: scale unavailable first, then scale available, limited by maxUnavailable": {
[]int32{6, 4}, 0, 10,
[]int32{6, 2}, 0, 8,
intstrutil.FromInt(5), intstrutil.FromInt(0), intstrutil.FromString("40%"),
6,
},
"scale down: scale unavailable first, then scale available, limited by partition": {
[]int32{6, 4}, 0, 10,
[]int32{6, 2}, 0, 8,
intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromString("40%"),
7,
},
"no op: newRS replicas more than partition, limited by newRS replicas": {
[]int32{0, 5}, 5, 10,
[]int32{0, 5}, 5, 10,
intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromString("40%"),
5,
},
"no op: limited by unavailable newRS": {
[]int32{3, 4}, 3, 10,
[]int32{3, 4}, 0, 7,
intstrutil.FromInt(5), intstrutil.FromInt(0), intstrutil.FromString("30%"),
7,
},
"scale up oldRS": {
[]int32{3, 0}, 3, 10,
[]int32{3, 0}, 3, 6,
intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromString("30%"),
7,
},
"scale down oldRS": {
[]int32{3}, 3, 5,
[]int32{3}, 3, 6,
intstrutil.FromString("60%"), intstrutil.FromString("25%"), intstrutil.FromString("25%"),
2,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fakeRecord := record.NewFakeRecorder(10)
dc := &DeploymentController{
client: fakeClient,
eventRecorder: fakeRecord,
strategy: rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: &apps.RollingUpdateDeployment{
MaxSurge: &test.maxSurge,
MaxUnavailable: &test.maxUnavailable,
},
Partition: test.partition,
},
}
var deployment apps.Deployment
var newRS apps.ReplicaSet
var allRSs []*apps.ReplicaSet
var oldRSs []*apps.ReplicaSet
{
deployment = generateDeployment("busybox:latest")
deployment.Spec.Replicas = pointer.Int32(test.dReplicas)
deployment.Status.ReadyReplicas = test.newRSReplicas
availableReplicas := test.newRSAvailable
for _, available := range test.oldRSsAvailable {
availableReplicas += available
}
deployment.Status.UpdatedReplicas = test.newRSReplicas
deployment.Status.Replicas = availableReplicas
deployment.Status.AvailableReplicas = availableReplicas
_, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
{
for index, replicas := range test.oldRSsReplicas {
rs := generateRS(deployment)
rs.SetName(fmt.Sprintf("rs-%d", index))
rs.Spec.Replicas = pointer.Int32(replicas)
rs.Status.Replicas = replicas
rs.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("old-version-%d", index)
rs.Status.ReadyReplicas = test.oldRSsAvailable[index]
rs.Status.AvailableReplicas = test.oldRSsAvailable[index]
allRSs = append(allRSs, &rs)
oldRSs = append(oldRSs, &rs)
_, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
}
{
newRS = generateRS(deployment)
newRS.SetName("rs-new")
newRS.Spec.Replicas = pointer.Int32(test.newRSReplicas)
newRS.Status.Replicas = test.newRSReplicas
newRS.Status.ReadyReplicas = test.newRSAvailable
newRS.Status.AvailableReplicas = test.newRSAvailable
allRSs = append(allRSs, &newRS)
_, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
_, err := dc.reconcileOldReplicaSets(context.TODO(), allRSs, oldRSs, &newRS, &deployment)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
rss, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
result := int32(0)
for _, rs := range rss.Items {
if rs.GetName() != "rs-new" {
result += *rs.Spec.Replicas
}
}
if result != test.expectOldReplicas {
t.Fatalf("expect %d, but got %d", test.expectOldReplicas, result)
}
})
}
}

View File

@ -28,7 +28,9 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/utils/integer"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util"
"github.com/openkruise/rollouts/pkg/util"
labelsutil "github.com/openkruise/rollouts/pkg/util/labels"
@ -62,40 +64,6 @@ func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rs
return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}
// checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition.
// These conditions are needed so that we won't accidentally report lack of progress for resumed deployments
// that were paused for longer than progressDeadlineSeconds.
func (dc *DeploymentController) checkPausedConditions(ctx context.Context, d *apps.Deployment) error {
if !deploymentutil.HasProgressDeadline(d) {
return nil
}
cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
// If we have reported lack of progress, do not overwrite it with a paused condition.
return nil
}
pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason
needsUpdate := false
if d.Spec.Paused && !pausedCondExists {
condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused")
deploymentutil.SetDeploymentCondition(&d.Status, *condition)
needsUpdate = true
} else if !d.Spec.Paused && pausedCondExists {
condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed")
deploymentutil.SetDeploymentCondition(&d.Status, *condition)
needsUpdate = true
}
if !needsUpdate {
return nil
}
var err error
_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
return err
}
// getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated.
//
// rsList should come from getReplicaSetsForDeployment(d).
@ -145,7 +113,7 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De
rsCopy := existingNewRS.DeepCopy()
// Set existing new replica set's annotation
annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, newRevision, true, maxRevHistoryLengthInChars)
annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars)
minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
if annotationsUpdated || minReadySecondsNeedsUpdate {
rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
@ -202,14 +170,19 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De
},
}
allRSs := append(oldRSs, &newRS)
newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS)
newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS, &dc.strategy)
if err != nil {
return nil, err
}
*(newRS.Spec.Replicas) = newReplicasCount
// We ensure that newReplicasLowerBound is greater than 0 unless deployment is 0,
// this is because if we set new replicas as 0, the native deployment controller
// will flight with ours.
newReplicasLowerBound := deploymentutil.NewRSReplicasLowerBound(d, &dc.strategy)
*(newRS.Spec.Replicas) = integer.Int32Max(newReplicasCount, newReplicasLowerBound)
// Set new replica set's annotation
deploymentutil.SetNewReplicaSetAnnotations(d, &newRS, newRevision, false, maxRevHistoryLengthInChars)
deploymentutil.SetNewReplicaSetAnnotations(d, &newRS, &dc.strategy, newRevision, false, maxRevHistoryLengthInChars)
// Create the new ReplicaSet. If it already exists, then we need to check for possible
// hash collisions. If there is any other error, we need to report it in the status of
// the Deployment.
@ -320,7 +293,7 @@ func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Depl
allowedSize := int32(0)
if *(deployment.Spec.Replicas) > 0 {
allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
allowedSize = *(deployment.Spec.Replicas)
}
// Number of additional replicas that can be either added or removed from the total
@ -355,7 +328,7 @@ func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Depl
// Estimate proportions if we have replicas to add, otherwise simply populate
// nameToSize with the current sizes for each replica set.
if deploymentReplicasToAdd != 0 {
proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
proportion := deploymentutil.GetProportion(rs, *deployment, &dc.strategy, deploymentReplicasToAdd, deploymentReplicasAdded)
nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
deploymentReplicasAdded += proportion
@ -406,7 +379,7 @@ func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.Re
sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale
annotationsNeedUpdate := deploymentutil.ReplicasAnnotationsNeedUpdate(rs, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
annotationsNeedUpdate := deploymentutil.ReplicasAnnotationsNeedUpdate(rs, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(deployment, &dc.strategy))
scaled := false
var err error
@ -414,7 +387,7 @@ func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.Re
oldScale := *(rs.Spec.Replicas)
rsCopy := rs.DeepCopy()
*(rsCopy.Spec.Replicas) = newScale
deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(deployment, &dc.strategy))
rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
if err == nil && sizeNeedsUpdate {
scaled = true
@ -465,7 +438,7 @@ func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs []
// syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
newStatus := calculateStatus(allRSs, newRS, d)
newStatus := calculateStatus(allRSs, newRS, d, &dc.strategy)
if reflect.DeepEqual(d.Status, newStatus) {
return nil
@ -478,7 +451,7 @@ func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs
}
// calculateStatus calculates the latest status for the provided deployment by looking into the provided replica sets.
func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) apps.DeploymentStatus {
func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) apps.DeploymentStatus {
availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
unavailableReplicas := totalReplicas - availableReplicas
@ -505,7 +478,7 @@ func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployme
status.Conditions = append(status.Conditions, conditions[i])
}
if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(deployment, strategy) {
minAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.")
deploymentutil.SetDeploymentCondition(&status, *minAvailability)
} else {

View File

@ -35,6 +35,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/utils/integer"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
)
@ -200,7 +201,7 @@ func Revision(obj runtime.Object) (int64, error) {
// SetNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
// copying required deployment annotations to it; it returns true if replica set's annotation is changed.
func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.ReplicaSet, newRevision string, exists bool, revHistoryLimitInChars int) bool {
func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.ReplicaSet, strategy *rolloutsv1alpha1.DeploymentStrategy, newRevision string, exists bool, revHistoryLimitInChars int) bool {
// First, copy deployment's annotations (except for apply and revision annotations)
annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS)
// Then, update replica set's revision annotation
@ -256,7 +257,7 @@ func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.Replic
}
}
// If the new replica set is about to be created, we need to add replica annotations to it.
if !exists && SetReplicasAnnotations(newRS, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+MaxSurge(*deployment)) {
if !exists && SetReplicasAnnotations(newRS, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+MaxSurge(deployment, strategy)) {
annotationChanged = true
}
return annotationChanged
@ -382,12 +383,12 @@ func ReplicasAnnotationsNeedUpdate(rs *apps.ReplicaSet, desiredReplicas, maxRepl
}
// MaxUnavailable returns the maximum unavailable pods a rolling deployment can take.
func MaxUnavailable(deployment apps.Deployment) int32 {
if !IsRollingUpdate(&deployment) || *(deployment.Spec.Replicas) == 0 {
func MaxUnavailable(deployment *apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) int32 {
if strategy == nil || strategy.RollingUpdate == nil || *(deployment.Spec.Replicas) == 0 {
return int32(0)
}
// Error caught by validation
_, maxUnavailable, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas))
_, maxUnavailable, _ := ResolveFenceposts(strategy.RollingUpdate.MaxSurge, strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas))
if maxUnavailable > *deployment.Spec.Replicas {
return *deployment.Spec.Replicas
}
@ -395,32 +396,32 @@ func MaxUnavailable(deployment apps.Deployment) int32 {
}
// MinAvailable returns the minimum available pods of a given deployment
func MinAvailable(deployment *apps.Deployment) int32 {
if !IsRollingUpdate(deployment) {
func MinAvailable(deployment *apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) int32 {
if strategy == nil || strategy.RollingUpdate == nil {
return int32(0)
}
return *(deployment.Spec.Replicas) - MaxUnavailable(*deployment)
return *(deployment.Spec.Replicas) - MaxUnavailable(deployment, strategy)
}
// MaxSurge returns the maximum surge pods a rolling deployment can take.
func MaxSurge(deployment apps.Deployment) int32 {
if !IsRollingUpdate(&deployment) {
func MaxSurge(deployment *apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) int32 {
if strategy == nil || strategy.RollingUpdate == nil {
return int32(0)
}
// Error caught by validation
maxSurge, _, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas))
maxSurge, _, _ := ResolveFenceposts(strategy.RollingUpdate.MaxSurge, strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas))
return maxSurge
}
// GetProportion will estimate the proportion for the provided replica set using 1. the current size
// of the parent deployment, 2. the replica count that needs be added on the replica sets of the
// deployment, and 3. the total replicas added in the replica sets of the deployment so far.
func GetProportion(rs *apps.ReplicaSet, d apps.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 {
func GetProportion(rs *apps.ReplicaSet, d apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 {
if rs == nil || *(rs.Spec.Replicas) == 0 || deploymentReplicasToAdd == 0 || deploymentReplicasToAdd == deploymentReplicasAdded {
return int32(0)
}
rsFraction := getReplicaSetFraction(*rs, d)
rsFraction := getReplicaSetFraction(*rs, d, strategy)
allowed := deploymentReplicasToAdd - deploymentReplicasAdded
if deploymentReplicasToAdd > 0 {
@ -437,13 +438,13 @@ func GetProportion(rs *apps.ReplicaSet, d apps.Deployment, deploymentReplicasToA
// getReplicaSetFraction estimates the fraction of replicas a replica set can have in
// 1. a scaling event during a rollout or 2. when scaling a paused deployment.
func getReplicaSetFraction(rs apps.ReplicaSet, d apps.Deployment) int32 {
func getReplicaSetFraction(rs apps.ReplicaSet, d apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) int32 {
// If we are scaling down to zero then the fraction of this replica set is its whole size (negative)
if *(d.Spec.Replicas) == int32(0) {
return -*(rs.Spec.Replicas)
}
deploymentReplicas := *(d.Spec.Replicas) + MaxSurge(d)
deploymentReplicas := *(d.Spec.Replicas) + MaxSurge(&d, strategy)
annotatedReplicas, ok := getMaxReplicasAnnotation(&rs)
if !ok {
// If we cannot find the annotation then fallback to the current deployment size. Note that this
@ -622,8 +623,8 @@ func GetAvailableReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int3
}
// IsRollingUpdate returns true if the strategy type is a rolling update.
func IsRollingUpdate(deployment *apps.Deployment) bool {
return deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType
func IsRollingUpdate(_ *apps.Deployment) bool {
return true
}
// DeploymentComplete considers a deployment to be complete once all of its desired replicas
@ -704,17 +705,19 @@ func DeploymentTimedOut(deployment *apps.Deployment, newStatus *apps.DeploymentS
// When one of the followings is true, we're rolling out the deployment; otherwise, we're scaling it.
// 1) The new RS is saturated: newRS's replicas == deployment's replicas
// 2) Max number of pods allowed is reached: deployment's replicas + maxSurge == all RSs' replicas
func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) {
switch deployment.Spec.Strategy.Type {
case apps.RollingUpdateDeploymentStrategyType:
// Check if we can scale up.
maxSurge, err := intstrutil.GetScaledValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true)
if err != nil {
return 0, err
func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, strategy *rolloutsv1alpha1.DeploymentStrategy) (int32, error) {
// Find the total number of pods
currentPodCount := GetReplicaCountForReplicaSets(allRSs)
switch {
case currentPodCount > *newRS.Spec.Replicas:
// Do not scale down due to partition settings.
scaleUpLimit := NewRSReplicasLimit(strategy.Partition, deployment)
if *newRS.Spec.Replicas >= scaleUpLimit {
// Cannot scale up.
return *(newRS.Spec.Replicas), nil
}
// Find the total number of pods
currentPodCount := GetReplicaCountForReplicaSets(allRSs)
maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge)
// Do not scale up due to exceeded current replicas.
maxTotalPods := *(deployment.Spec.Replicas) + MaxSurge(deployment, strategy)
if currentPodCount >= maxTotalPods {
// Cannot scale up.
return *(newRS.Spec.Replicas), nil
@ -723,11 +726,11 @@ func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, ne
scaleUpCount := maxTotalPods - currentPodCount
// Do not exceed the number of desired replicas.
scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas))))
return *(newRS.Spec.Replicas) + scaleUpCount, nil
case apps.RecreateDeploymentStrategyType:
return *(deployment.Spec.Replicas), nil
// Do not exceed the number of partition replicas.
return integer.Int32Min(*(newRS.Spec.Replicas)+scaleUpCount, scaleUpLimit), nil
default:
return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
// If there is ONLY ONE active replica set, just be in line with deployment replicas.
return *(deployment.Spec.Replicas), nil
}
}
@ -813,6 +816,21 @@ func (o ReplicaSetsByRevision) Less(i, j int) bool {
-------------------------------- BEGIN --------------------------------------
*/
// ReplicaSetsBySmallerRevision sorts a list of ReplicaSet by revision in desc, using their creation timestamp or name as a tie breaker.
// By using the creation timestamp, this sorts from old to new replica sets.
type ReplicaSetsBySmallerRevision []*apps.ReplicaSet
func (o ReplicaSetsBySmallerRevision) Len() int { return len(o) }
func (o ReplicaSetsBySmallerRevision) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsBySmallerRevision) Less(i, j int) bool {
revision1, err1 := Revision(o[i])
revision2, err2 := Revision(o[j])
if err1 != nil || err2 != nil || revision1 == revision2 {
return ReplicaSetsByCreationTimestamp(o).Less(i, j)
}
return revision1 > revision2
}
// FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods.
func FilterActiveReplicaSets(replicaSets []*apps.ReplicaSet) []*apps.ReplicaSet {
activeFilter := func(rs *apps.ReplicaSet) bool {
@ -918,3 +936,32 @@ func NewRSReplicasLimit(partition intstrutil.IntOrString, deployment *apps.Deplo
}
return int32(replicaLimit)
}
// DeploymentRolloutSatisfied return nil if deployment has satisfied partition and replicas,
// or will return an error.
func DeploymentRolloutSatisfied(deployment *apps.Deployment, partition intstrutil.IntOrString) error {
if deployment.Status.ObservedGeneration < deployment.Generation {
return fmt.Errorf("deployment %v observed generation %d less than generation %d",
klog.KObj(deployment), deployment.Status.ObservedGeneration, deployment.Generation)
}
if deployment.Status.Replicas != *(deployment.Spec.Replicas) {
return fmt.Errorf("deployment %v status replicas %d not equals to replicas %d",
klog.KObj(deployment), deployment.Status.Replicas, *deployment.Spec.Replicas)
}
newRSReplicasLimit := NewRSReplicasLimit(partition, deployment)
if deployment.Status.UpdatedReplicas < newRSReplicasLimit {
return fmt.Errorf("deployment %v updated replicas %d less than partition %d",
klog.KObj(deployment), deployment.Status.UpdatedReplicas, newRSReplicasLimit)
}
return nil
}
// NewRSReplicasLowerBound ensure that newReplicasLowerBound is greater than 0 when create newRS
// unless deployment is 0 or MaxSurge > 0, this is because if we set new replicas as 0, the native
// deployment controller will flight with ours.
func NewRSReplicasLowerBound(deployment *apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) int32 {
if MaxSurge(deployment, strategy) > 0 {
return int32(0)
}
return integer.Int32Min(int32(1), *deployment.Spec.Replicas)
}

View File

@ -33,6 +33,8 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/integer"
"k8s.io/utils/pointer"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
)
func newDControllerRef(d *apps.Deployment) *metav1.OwnerReference {
@ -481,9 +483,19 @@ func newString(s string) *string {
}
func TestNewRSNewReplicas(t *testing.T) {
newDeployment := generateDeployment("nginx")
newRC := generateRS(newDeployment)
rss := make([]*apps.ReplicaSet, 10)
for i := range rss {
rs := generateRS(newDeployment)
rs.Spec.Replicas = pointer.Int32(int32(i))
rss[i] = &rs
}
tests := []struct {
Name string
strategyType apps.DeploymentStrategyType
oldRSs []*apps.ReplicaSet
partition intstr.IntOrString
depReplicas int32
newRSReplicas int32
maxSurge int
@ -491,41 +503,61 @@ func TestNewRSNewReplicas(t *testing.T) {
}{
{
"can not scale up - to newRSReplicas",
apps.RollingUpdateDeploymentStrategyType,
[]*apps.ReplicaSet{rss[5]},
intstr.FromInt(1),
1, 5, 1, 5,
},
{
"scale up - to depReplicas",
apps.RollingUpdateDeploymentStrategyType,
[]*apps.ReplicaSet{rss[0]},
intstr.FromString("100%"),
6, 2, 10, 6,
},
{
"recreate - to depReplicas",
apps.RecreateDeploymentStrategyType,
3, 1, 1, 3,
"scale up - to int-type partition",
[]*apps.ReplicaSet{rss[8]},
intstr.FromInt(4),
10, 2, 10, 4,
},
{
"scala up without old - to depReplicas",
[]*apps.ReplicaSet{},
intstr.FromInt(2),
10, 2, 10, 10,
},
{
"cannot scale due to partition - to newRSReplica",
[]*apps.ReplicaSet{rss[8]},
intstr.FromInt(2),
10, 2, 10, 2,
},
{
"cannot scale because new replicas grater than partition - to newRSReplica",
[]*apps.ReplicaSet{rss[5]},
intstr.FromInt(2),
10, 5, 10, 5,
},
}
newDeployment := generateDeployment("nginx")
newRC := generateRS(newDeployment)
rs5 := generateRS(newDeployment)
*(rs5.Spec.Replicas) = 5
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
*(newDeployment.Spec.Replicas) = test.depReplicas
newDeployment.Spec.Strategy = apps.DeploymentStrategy{Type: test.strategyType}
newDeployment.Spec.Strategy.RollingUpdate = &apps.RollingUpdateDeployment{
MaxUnavailable: func(i int) *intstr.IntOrString {
x := intstr.FromInt(i)
return &x
}(1),
MaxSurge: func(i int) *intstr.IntOrString {
x := intstr.FromInt(i)
return &x
}(test.maxSurge),
strategy := &rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: &apps.RollingUpdateDeployment{
MaxUnavailable: func(i int) *intstr.IntOrString {
x := intstr.FromInt(i)
return &x
}(1),
MaxSurge: func(i int) *intstr.IntOrString {
x := intstr.FromInt(i)
return &x
}(test.maxSurge),
},
Partition: test.partition,
}
*(newRC.Spec.Replicas) = test.newRSReplicas
rs, err := NewRSNewReplicas(&newDeployment, []*apps.ReplicaSet{&rs5}, &newRC)
allRSs := append(test.oldRSs, &newRC)
rs, err := NewRSNewReplicas(&newDeployment, allRSs, &newRC, strategy)
if err != nil {
t.Errorf("In test case %s, got unexpected error %v", test.Name, err)
}
@ -1023,7 +1055,10 @@ func TestMaxUnavailable(t *testing.T) {
for _, test := range tests {
t.Log(test.name)
t.Run(test.name, func(t *testing.T) {
maxUnavailable := MaxUnavailable(test.deployment)
strategy := rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: test.deployment.Spec.Strategy.RollingUpdate,
}
maxUnavailable := MaxUnavailable(&test.deployment, &strategy)
if test.expected != maxUnavailable {
t.Fatalf("expected:%v, got:%v", test.expected, maxUnavailable)
}
@ -1045,7 +1080,10 @@ func TestAnnotationUtils(t *testing.T) {
for i := 10; i < 20; i++ {
nextRevision := fmt.Sprintf("%d", i+1)
SetNewReplicaSetAnnotations(&tDeployment, &tRS, nextRevision, true, 5)
strategy := rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: tDeployment.Spec.Strategy.RollingUpdate,
}
SetNewReplicaSetAnnotations(&tDeployment, &tRS, &strategy, nextRevision, true, 5)
//Now the ReplicaSets Revision Annotation should be i+1
if i >= 12 {
@ -1196,3 +1234,71 @@ func TestNewRSReplicasLimit(t *testing.T) {
}
}
}
func TestDeploymentRolloutSatisfied(t *testing.T) {
tPartition := intstr.FromInt(3)
tDeployment := apps.Deployment{
ObjectMeta: metav1.ObjectMeta{Generation: 2},
Spec: apps.DeploymentSpec{Replicas: pointer.Int32(10)},
Status: apps.DeploymentStatus{Replicas: 10, UpdatedReplicas: 3, ObservedGeneration: 2},
}
tests := map[string]struct {
deployment func() *apps.Deployment
partition intstr.IntOrString
expect bool
}{
"generation unsatisfied": {
deployment: func() *apps.Deployment {
d := tDeployment.DeepCopy()
d.Status.ObservedGeneration = 1
return d
},
partition: tPartition,
expect: false,
},
"status.replicas greater than replicas": {
deployment: func() *apps.Deployment {
d := tDeployment.DeepCopy()
d.Status.Replicas = 11
return d
},
partition: tPartition,
expect: false,
},
"status.replicas less than replicas": {
deployment: func() *apps.Deployment {
d := tDeployment.DeepCopy()
d.Status.Replicas = 9
return d
},
partition: tPartition,
expect: false,
},
"partition greater than new replica set": {
deployment: func() *apps.Deployment {
d := tDeployment.DeepCopy()
d.Status.UpdatedReplicas = 2
return d
},
partition: tPartition,
expect: false,
},
"partition less than new replicas set": {
deployment: func() *apps.Deployment {
d := tDeployment.DeepCopy()
d.Status.UpdatedReplicas = 5
return d
},
partition: tPartition,
expect: true,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
err := DeploymentRolloutSatisfied(test.deployment(), test.partition)
if (test.expect && err != nil) || (!test.expect && err == nil) {
t.Fatalf("expect error %v, but got %v", test.expect, err)
}
})
}
}

339
test/e2e/deployment_test.go Normal file
View File

@ -0,0 +1,339 @@
package e2e
import (
"context"
"encoding/json"
"fmt"
"reflect"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
)
var _ = SIGDescribe("Advanced Deployment", func() {
var namespace string
defaultRetry := wait.Backoff{
Steps: 10,
Duration: 10 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
}
CreateObject := func(object client.Object, options ...client.CreateOption) {
By(fmt.Sprintf("create deployment %v", client.ObjectKeyFromObject(object)))
object.SetNamespace(namespace)
Expect(k8sClient.Create(context.TODO(), object)).NotTo(HaveOccurred())
}
GetObject := func(namespace, name string, object client.Object) error {
key := types.NamespacedName{Namespace: namespace, Name: name}
return k8sClient.Get(context.TODO(), key, object)
}
UpdateDeployment := func(deployment *apps.Deployment, version string) *apps.Deployment {
By(fmt.Sprintf("update deployment %v to version: %v", client.ObjectKeyFromObject(deployment), version))
var clone *apps.Deployment
Expect(retry.RetryOnConflict(defaultRetry, func() error {
clone = &apps.Deployment{}
err := GetObject(deployment.Namespace, deployment.Name, clone)
if err != nil {
return err
}
clone.Spec.Template.Spec.Containers[0].Image = deployment.Spec.Template.Spec.Containers[0].Image
clone.Spec.Template.Spec.Containers[0].Env[0].Value = version
strategy := unmarshal(clone.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation])
strategy.Paused = true
clone.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation] = marshal(strategy)
return k8sClient.Update(context.TODO(), clone)
})).NotTo(HaveOccurred())
Eventually(func() bool {
clone = &apps.Deployment{}
err := GetObject(deployment.Namespace, deployment.Name, clone)
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("image: %s, version env: %s", clone.Spec.Template.Spec.Containers[0].Image, clone.Spec.Template.Spec.Containers[0].Env[0].Value))
return clone.Status.ObservedGeneration >= clone.Generation
}, time.Minute, time.Second).Should(BeTrue())
return clone
}
UpdatePartitionWithoutCheck := func(deployment *apps.Deployment, desired intstr.IntOrString) *apps.Deployment {
By(fmt.Sprintf("update deployment %v to desired: %v", client.ObjectKeyFromObject(deployment), desired))
var clone *apps.Deployment
Expect(retry.RetryOnConflict(defaultRetry, func() error {
clone = &apps.Deployment{}
err := GetObject(deployment.Namespace, deployment.Name, clone)
if err != nil {
return err
}
strategy := unmarshal(clone.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation])
if reflect.DeepEqual(desired, strategy.Partition) {
return nil
}
strategy.Paused = false
strategy.Partition = desired
clone.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation] = marshal(strategy)
return k8sClient.Update(context.TODO(), clone)
})).NotTo(HaveOccurred())
return clone
}
ListPods := func(namespace string, labelSelector *metav1.LabelSelector) ([]*v1.Pod, error) {
appList := &v1.PodList{}
selector, _ := metav1.LabelSelectorAsSelector(labelSelector)
err := k8sClient.List(context.TODO(), appList, &client.ListOptions{Namespace: namespace, LabelSelector: selector})
if err != nil {
return nil, err
}
apps := make([]*v1.Pod, 0)
for i := range appList.Items {
pod := &appList.Items[i]
if pod.DeletionTimestamp.IsZero() {
apps = append(apps, pod)
}
}
return apps, nil
}
ListReplicaSets := func(namespace string, labelSelector *metav1.LabelSelector) ([]*apps.ReplicaSet, error) {
appList := &apps.ReplicaSetList{}
selector, _ := metav1.LabelSelectorAsSelector(labelSelector)
err := k8sClient.List(context.TODO(), appList, &client.ListOptions{Namespace: namespace, LabelSelector: selector})
if err != nil {
return nil, err
}
apps := make([]*apps.ReplicaSet, 0)
for i := range appList.Items {
pod := &appList.Items[i]
if pod.DeletionTimestamp.IsZero() {
apps = append(apps, pod)
}
}
return apps, nil
}
CheckReplicas := func(deployment *apps.Deployment, replicas, available, updated int32) {
var clone *apps.Deployment
Eventually(func() bool {
clone = &apps.Deployment{}
err := GetObject(deployment.Namespace, deployment.Name, clone)
Expect(err).NotTo(HaveOccurred())
fmt.Printf("replicas %d, available: %d, updated: %d\n",
clone.Status.Replicas, clone.Status.AvailableReplicas, clone.Status.UpdatedReplicas)
return clone.Status.Replicas == replicas && clone.Status.AvailableReplicas == available && clone.Status.UpdatedReplicas == updated
}, 10*time.Minute, time.Second).Should(BeTrue())
Eventually(func() int {
pods, err := ListPods(deployment.Namespace, deployment.Spec.Selector)
Expect(err).NotTo(HaveOccurred())
return len(pods)
}, 10*time.Second, time.Second).Should(BeNumerically("==", replicas))
rss, err := ListReplicaSets(deployment.Namespace, deployment.Spec.Selector)
Expect(err).NotTo(HaveOccurred())
var rsReplicas, rsAvailable, rsUpdated int32
for _, rs := range rss {
if !rs.DeletionTimestamp.IsZero() {
continue
}
if util.EqualIgnoreHash(&rs.Spec.Template, &clone.Spec.Template) {
rsUpdated = rs.Status.Replicas
}
rsReplicas += rs.Status.Replicas
rsAvailable += rs.Status.AvailableReplicas
}
Expect(rsUpdated).Should(BeNumerically("==", updated))
Expect(rsReplicas).Should(BeNumerically("==", replicas))
Expect(rsAvailable).Should(BeNumerically("==", available))
}
ScaleDeployment := func(deployment *apps.Deployment, replicas int32) *apps.Deployment {
By(fmt.Sprintf("update deployment %v to replicas: %v", client.ObjectKeyFromObject(deployment), replicas))
var clone *apps.Deployment
Expect(retry.RetryOnConflict(defaultRetry, func() error {
clone = &apps.Deployment{}
err := GetObject(deployment.Namespace, deployment.Name, clone)
if err != nil {
return err
}
clone.Spec.Replicas = pointer.Int32(replicas)
return k8sClient.Update(context.TODO(), clone)
})).NotTo(HaveOccurred())
Eventually(func() bool {
clone = &apps.Deployment{}
err := GetObject(deployment.Namespace, deployment.Name, clone)
Expect(err).NotTo(HaveOccurred())
return clone.Status.ObservedGeneration >= clone.Generation
}, time.Minute, time.Second).Should(BeTrue())
return clone
}
UpdatePartitionWithCheck := func(deployment *apps.Deployment, desired intstr.IntOrString) {
By(fmt.Sprintf("update deployment %v to desired: %v, strategy: %v, and check",
client.ObjectKeyFromObject(deployment), deployment.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation], desired))
clone := UpdatePartitionWithoutCheck(deployment, desired)
count := 5
for count > 0 {
desiredUpdatedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(&desired, int(*deployment.Spec.Replicas), true)
CheckReplicas(deployment, *clone.Spec.Replicas, *clone.Spec.Replicas, int32(desiredUpdatedReplicas))
time.Sleep(time.Second)
count--
}
}
BeforeEach(func() {
namespace = randomNamespaceName("deployment")
ns := v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
},
}
Expect(k8sClient.Create(context.TODO(), &ns)).Should(SatisfyAny(BeNil()))
})
AfterEach(func() {
By("[TEST] Clean up resources after an integration test")
k8sClient.DeleteAllOf(context.TODO(), &apps.Deployment{}, client.InNamespace(namespace))
k8sClient.DeleteAllOf(context.TODO(), &v1.Service{}, client.InNamespace(namespace))
k8sClient.DeleteAllOf(context.TODO(), &netv1.Ingress{}, client.InNamespace(namespace))
Expect(k8sClient.Delete(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, client.PropagationPolicy(metav1.DeletePropagationForeground))).Should(Succeed())
time.Sleep(time.Second * 3)
})
KruiseDescribe("Advanced Deployment Checker", func() {
It("update with partition", func() {
deployment := &apps.Deployment{}
deployment.Namespace = namespace
Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred())
CreateObject(deployment)
CheckReplicas(deployment, 5, 5, 5)
UpdateDeployment(deployment, "version2")
UpdatePartitionWithCheck(deployment, intstr.FromInt(0))
UpdatePartitionWithCheck(deployment, intstr.FromInt(1))
UpdatePartitionWithCheck(deployment, intstr.FromInt(2))
UpdatePartitionWithCheck(deployment, intstr.FromInt(3))
UpdatePartitionWithCheck(deployment, intstr.FromInt(5))
})
It("update with scale up", func() {
deployment := &apps.Deployment{}
deployment.Namespace = namespace
Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred())
CreateObject(deployment)
UpdateDeployment(deployment, "version2")
UpdatePartitionWithCheck(deployment, intstr.FromInt(0))
UpdatePartitionWithCheck(deployment, intstr.FromInt(1))
UpdatePartitionWithCheck(deployment, intstr.FromInt(2))
deployment = ScaleDeployment(deployment, 10)
CheckReplicas(deployment, 10, 10, 4)
UpdatePartitionWithCheck(deployment, intstr.FromInt(7))
UpdatePartitionWithCheck(deployment, intstr.FromInt(10))
})
It("update with scale down", func() {
deployment := &apps.Deployment{}
deployment.Namespace = namespace
Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred())
deployment.Spec.Replicas = pointer.Int32(10)
CreateObject(deployment)
UpdateDeployment(deployment, "version2")
UpdatePartitionWithCheck(deployment, intstr.FromString("0%"))
UpdatePartitionWithCheck(deployment, intstr.FromString("40%"))
deployment = ScaleDeployment(deployment, 5)
CheckReplicas(deployment, 5, 5, 2)
UpdatePartitionWithCheck(deployment, intstr.FromString("60%"))
UpdatePartitionWithCheck(deployment, intstr.FromString("100%"))
})
It("update with MaxSurge=1, MaxUnavailable=0", func() {
deployment := &apps.Deployment{}
deployment.Namespace = namespace
Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred())
deployment.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation] =
`{"rollingStyle":"Partition","rollingUpdate":{"maxUnavailable":0,"maxSurge":1}}`
CreateObject(deployment)
CheckReplicas(deployment, 5, 5, 5)
deployment.Spec.Template.Spec.Containers[0].Image = "failed_image:failed"
UpdateDeployment(deployment, "version2")
UpdatePartitionWithCheck(deployment, intstr.FromInt(0))
UpdatePartitionWithoutCheck(deployment, intstr.FromInt(3))
CheckReplicas(deployment, 6, 5, 1)
})
It("update with MaxSurge=0, MaxUnavailable=1", func() {
deployment := &apps.Deployment{}
deployment.Namespace = namespace
Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred())
deployment.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation] =
`{"rollingStyle":"Partition","rollingUpdate":{"maxUnavailable":1,"maxSurge":0}}`
deployment.Spec.MinReadySeconds = 10
CreateObject(deployment)
UpdateDeployment(deployment, "version2")
UpdatePartitionWithCheck(deployment, intstr.FromInt(0))
UpdatePartitionWithoutCheck(deployment, intstr.FromInt(3))
CheckReplicas(deployment, 5, 4, 1)
CheckReplicas(deployment, 5, 4, 2)
CheckReplicas(deployment, 5, 4, 3)
UpdatePartitionWithoutCheck(deployment, intstr.FromInt(5))
CheckReplicas(deployment, 5, 4, 4)
CheckReplicas(deployment, 5, 5, 5)
})
It("continuous update", func() {
deployment := &apps.Deployment{}
deployment.Namespace = namespace
Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred())
CreateObject(deployment)
UpdateDeployment(deployment, "version2")
UpdatePartitionWithCheck(deployment, intstr.FromInt(0))
UpdatePartitionWithCheck(deployment, intstr.FromInt(2))
UpdateDeployment(deployment, "version3")
UpdatePartitionWithCheck(deployment, intstr.FromInt(0))
UpdatePartitionWithCheck(deployment, intstr.FromInt(3))
UpdatePartitionWithCheck(deployment, intstr.FromInt(5))
})
It("rollback", func() {
deployment := &apps.Deployment{}
deployment.Namespace = namespace
Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred())
CreateObject(deployment)
UpdateDeployment(deployment, "version2")
UpdatePartitionWithCheck(deployment, intstr.FromInt(0))
UpdatePartitionWithCheck(deployment, intstr.FromInt(2))
UpdateDeployment(deployment, "version3")
UpdatePartitionWithCheck(deployment, intstr.FromInt(0))
UpdatePartitionWithCheck(deployment, intstr.FromInt(3))
UpdateDeployment(deployment, "version2")
UpdatePartitionWithCheck(deployment, intstr.FromInt(2))
UpdatePartitionWithCheck(deployment, intstr.FromInt(3))
UpdatePartitionWithCheck(deployment, intstr.FromInt(5))
})
})
})
func unmarshal(strategyAnno string) *rolloutsv1alpha1.DeploymentStrategy {
strategy := &rolloutsv1alpha1.DeploymentStrategy{}
_ = json.Unmarshal([]byte(strategyAnno), strategy)
return strategy
}
func marshal(strategy *rolloutsv1alpha1.DeploymentStrategy) string {
strategyAnno, _ := json.Marshal(strategy)
return string(strategyAnno)
}

View File

@ -1391,8 +1391,6 @@ var _ = SIGDescribe("Rollout", func() {
WaitRolloutNotFound(rollout.Name)
Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred())
fmt.Println(util.DumpJSON(workload))
Expect(workload.Spec.Paused).Should(BeTrue())
Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 0))
workload.Spec.Paused = false
UpdateDeployment(workload)
By("Update deployment paused=false")

View File

@ -0,0 +1,34 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: sample
labels:
app: busybox
Annotations:
batchrelease.rollouts.kruise.io/control-info: '{"apiVersion":"rollouts.kruise.io/v1alpha1","kind":"BatchRelease","name":"rollouts-demo","uid":"45891961-8c29-4ea9-8e61-fd5a1fd19ffa","controller":true,"blockOwnerDeletion":true}'
rollouts.kruise.io/deployment-strategy: '{"rollingUpdate":{"maxUnavailable":"25%","maxSurge":"25%"}}'
spec:
paused: true
replicas: 5
strategy:
type: Recreate
selector:
matchLabels:
app: busybox
template:
metadata:
labels:
app: busybox
spec:
containers:
- name: busybox
image: busybox:1.32
imagePullPolicy: IfNotPresent
command: ["/bin/sh", "-c", "sleep 10000"]
env:
- name: VERSION
value: version1
resources:
limits:
memory: "10Mi"
cpu: "10m"