change workload controller to use patch instead of update

Signed-off-by: zhihao jian <zhihao.jian@shopee.com>

fix test

use MergeFrom func to patch data

get latest rs

fix unit test

fix lint

do not get latest status before patch
This commit is contained in:
zhihao jian 2025-07-02 15:00:20 +08:00
parent deaa5f38b5
commit 51cebc1d44
6 changed files with 300 additions and 101 deletions

View File

@ -107,6 +107,7 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
// Deployment controller factory
factory := &controllerFactory{
client: genericClient.KubeClient,
runtimeClient: mgr.GetClient(),
eventBroadcaster: eventBroadcaster,
eventRecorder: recorder,
dLister: dLister,
@ -268,6 +269,7 @@ func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *Deploy
return &DeploymentController{
client: f.client,
runtimeClient: f.runtimeClient,
eventBroadcaster: f.eventBroadcaster,
eventRecorder: f.eventRecorder,
dLister: f.dLister,

View File

@ -39,6 +39,7 @@ import (
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const (
@ -68,6 +69,8 @@ type DeploymentController struct {
// we will use this strategy to replace spec.strategy of deployment
strategy rolloutsv1alpha1.DeploymentStrategy
runtimeClient client.Client
}
// getReplicaSetsForDeployment uses ControllerRefManager to reconcile

View File

@ -24,18 +24,138 @@ import (
"testing"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/controller/deployment/util"
)
// mockReplicaSetLister implements appslisters.ReplicaSetLister for testing
type mockReplicaSetLister struct {
client ctrlclient.Client
}
func (m *mockReplicaSetLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) {
var rsList apps.ReplicaSetList
if err := m.client.List(context.TODO(), &rsList); err != nil {
return nil, err
}
var result []*apps.ReplicaSet
for i := range rsList.Items {
if selector.Matches(labels.Set(rsList.Items[i].Labels)) {
result = append(result, &rsList.Items[i])
}
}
return result, nil
}
func (m *mockReplicaSetLister) ReplicaSets(namespace string) appslisters.ReplicaSetNamespaceLister {
return &mockReplicaSetNamespaceLister{
client: m.client,
namespace: namespace,
}
}
func (m *mockReplicaSetLister) GetPodReplicaSets(pod *v1.Pod) ([]*apps.ReplicaSet, error) {
// For testing purposes, return empty list
return []*apps.ReplicaSet{}, nil
}
// mockDeploymentLister implements appslisters.DeploymentLister for testing
type mockDeploymentLister struct {
client ctrlclient.Client
}
func (m *mockDeploymentLister) List(selector labels.Selector) ([]*apps.Deployment, error) {
var deploymentList apps.DeploymentList
if err := m.client.List(context.TODO(), &deploymentList); err != nil {
return nil, err
}
var result []*apps.Deployment
for i := range deploymentList.Items {
if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) {
result = append(result, &deploymentList.Items[i])
}
}
return result, nil
}
func (m *mockDeploymentLister) Deployments(namespace string) appslisters.DeploymentNamespaceLister {
return &mockDeploymentNamespaceLister{
client: m.client,
namespace: namespace,
}
}
// mockDeploymentNamespaceLister implements appslisters.DeploymentNamespaceLister for testing
type mockDeploymentNamespaceLister struct {
client ctrlclient.Client
namespace string
}
func (m *mockDeploymentNamespaceLister) List(selector labels.Selector) ([]*apps.Deployment, error) {
var deploymentList apps.DeploymentList
if err := m.client.List(context.TODO(), &deploymentList, ctrlclient.InNamespace(m.namespace)); err != nil {
return nil, err
}
var result []*apps.Deployment
for i := range deploymentList.Items {
if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) {
result = append(result, &deploymentList.Items[i])
}
}
return result, nil
}
func (m *mockDeploymentNamespaceLister) Get(name string) (*apps.Deployment, error) {
var deployment apps.Deployment
if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &deployment); err != nil {
return nil, err
}
return &deployment, nil
}
// mockReplicaSetNamespaceLister implements appslisters.ReplicaSetNamespaceLister for testing
type mockReplicaSetNamespaceLister struct {
client ctrlclient.Client
namespace string
}
func (m *mockReplicaSetNamespaceLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) {
var rsList apps.ReplicaSetList
if err := m.client.List(context.TODO(), &rsList, ctrlclient.InNamespace(m.namespace)); err != nil {
return nil, err
}
var result []*apps.ReplicaSet
for i := range rsList.Items {
if selector.Matches(labels.Set(rsList.Items[i].Labels)) {
result = append(result, &rsList.Items[i])
}
}
return result, nil
}
func (m *mockReplicaSetNamespaceLister) Get(name string) (*apps.ReplicaSet, error) {
var rs apps.ReplicaSet
if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &rs); err != nil {
return nil, err
}
return &rs, nil
}
func TestSyncDeployment(t *testing.T) {
tests := map[string]struct {
oldRSsReplicas []int32
@ -126,78 +246,83 @@ func TestSyncDeployment(t *testing.T) {
for name, test := range tests {
t.Run(name, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fakeRecord := record.NewFakeRecorder(10)
informers := informers.NewSharedInformerFactory(fakeClient, 0)
rsInformer := informers.Apps().V1().ReplicaSets().Informer()
dInformer := informers.Apps().V1().Deployments().Informer()
deployment := generateDeployment("busybox")
deployment.Spec.Replicas = pointer.Int32(test.dReplicas)
deployment.Status.ReadyReplicas = test.newRSReplicas
availableReplicas := test.newRSAvailable
for _, available := range test.oldRSsAvailable {
availableReplicas += available
}
deployment.Status.UpdatedReplicas = test.newRSReplicas
deployment.Status.Replicas = availableReplicas
deployment.Status.AvailableReplicas = availableReplicas
var deployment apps.Deployment
var newRS apps.ReplicaSet
{
deployment = generateDeployment("busybox")
deployment.Spec.Replicas = pointer.Int32(test.dReplicas)
deployment.Status.ReadyReplicas = test.newRSReplicas
availableReplicas := test.newRSAvailable
for _, available := range test.oldRSsAvailable {
availableReplicas += available
}
deployment.Status.UpdatedReplicas = test.newRSReplicas
deployment.Status.Replicas = availableReplicas
deployment.Status.AvailableReplicas = availableReplicas
dInformer.GetIndexer().Add(&deployment)
_, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
{
for index, replicas := range test.oldRSsReplicas {
rs := generateRS(deployment)
rs.SetName(fmt.Sprintf("rs-%d", index))
rs.Spec.Replicas = pointer.Int32(replicas)
rs.Status.Replicas = replicas
if strings.HasPrefix(name, "scale") {
rs.Annotations = map[string]string{
util.ReplicasAnnotation: strconv.Itoa(-1),
util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)),
}
}
rs.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("old-version-%d", index)
rs.Status.ReadyReplicas = test.oldRSsAvailable[index]
rs.Status.AvailableReplicas = test.oldRSsAvailable[index]
rsInformer.GetIndexer().Add(&rs)
_, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
}
{
newRS = generateRS(deployment)
newRS.SetName("rs-new")
newRS.Spec.Replicas = pointer.Int32(test.newRSReplicas)
var allObjects []ctrlclient.Object
allObjects = append(allObjects, &deployment)
for index, replicas := range test.oldRSsReplicas {
rs := generateRS(deployment)
rs.SetName(fmt.Sprintf("rs-%d", index))
rs.Spec.Replicas = pointer.Int32(replicas)
rs.Status.Replicas = replicas
if strings.HasPrefix(name, "scale") {
newRS.Annotations = map[string]string{
rs.Annotations = map[string]string{
util.ReplicasAnnotation: strconv.Itoa(-1),
util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)),
}
}
newRS.Status.Replicas = test.newRSReplicas
newRS.Status.ReadyReplicas = test.newRSAvailable
newRS.Status.AvailableReplicas = test.newRSAvailable
rsInformer.GetIndexer().Add(&newRS)
_, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
rs.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("old-version-%d", index)
rs.Status.ReadyReplicas = test.oldRSsAvailable[index]
rs.Status.AvailableReplicas = test.oldRSsAvailable[index]
allObjects = append(allObjects, &rs)
}
newRS := generateRS(deployment)
newRS.SetName("rs-new")
newRS.Spec.Replicas = pointer.Int32(test.newRSReplicas)
if strings.HasPrefix(name, "scale") {
newRS.Annotations = map[string]string{
util.ReplicasAnnotation: strconv.Itoa(-1),
util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)),
}
}
newRS.Status.Replicas = test.newRSReplicas
newRS.Status.ReadyReplicas = test.newRSAvailable
newRS.Status.AvailableReplicas = test.newRSAvailable
allObjects = append(allObjects, &newRS)
fakeCtrlClient := ctrlfake.NewClientBuilder().
WithObjects(allObjects...).
Build()
// Create a mock event recorder
fakeRecord := record.NewFakeRecorder(10)
// Create a mock deployment lister
mockDeploymentLister := &mockDeploymentLister{client: fakeCtrlClient}
// Create a fake client with the same objects
fakeClient := fake.NewSimpleClientset()
_, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create deployment in fake client: %v", err)
}
for _, obj := range allObjects {
if rs, ok := obj.(*apps.ReplicaSet); ok {
_, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), rs, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create replicaset in fake client: %v", err)
}
}
}
dc := &DeploymentController{
client: fakeClient,
eventRecorder: fakeRecord,
dLister: appslisters.NewDeploymentLister(dInformer.GetIndexer()),
rsLister: appslisters.NewReplicaSetLister(rsInformer.GetIndexer()),
dLister: mockDeploymentLister,
rsLister: &mockReplicaSetLister{client: fakeCtrlClient},
runtimeClient: fakeCtrlClient,
strategy: rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: &apps.RollingUpdateDeployment{
MaxSurge: &test.maxSurge,
@ -207,17 +332,18 @@ func TestSyncDeployment(t *testing.T) {
},
}
err := dc.syncDeployment(context.TODO(), &deployment)
err = dc.syncDeployment(context.TODO(), &deployment)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
rss, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
var rsList apps.ReplicaSetList
if err := fakeCtrlClient.List(context.TODO(), &rsList); err != nil {
t.Fatalf("list rs error: %v", err)
}
resultOld := int32(0)
resultNew := int32(0)
for _, rs := range rss.Items {
for _, rs := range rsList.Items {
if rs.GetName() != "rs-new" {
resultOld += *rs.Spec.Replicas
} else {

View File

@ -33,6 +33,8 @@ import (
"k8s.io/utils/pointer"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
)
func newDControllerRef(d *apps.Deployment) *metav1.OwnerReference {
@ -255,16 +257,6 @@ func TestReconcileNewReplicaSet(t *testing.T) {
t.Run(name, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fakeRecord := record.NewFakeRecorder(10)
dc := &DeploymentController{
client: fakeClient,
eventRecorder: fakeRecord,
strategy: rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: &apps.RollingUpdateDeployment{
MaxSurge: &test.maxSurge,
},
Partition: test.partition,
},
}
var deployment apps.Deployment
var newRS apps.ReplicaSet
@ -299,16 +291,44 @@ func TestReconcileNewReplicaSet(t *testing.T) {
t.Fatalf("got unexpected error: %v", err)
}
}
// Create controller-runtime client with all objects
var allObjects []ctrlclient.Object
allObjects = append(allObjects, &deployment)
for _, rs := range allRSs {
allObjects = append(allObjects, rs)
}
fakeCtrlClient := ctrlfake.NewClientBuilder().
WithObjects(allObjects...).
Build()
dc := &DeploymentController{
client: fakeClient,
eventRecorder: fakeRecord,
dLister: &mockDeploymentLister{client: fakeCtrlClient},
rsLister: &mockReplicaSetLister{client: fakeCtrlClient},
runtimeClient: fakeCtrlClient,
strategy: rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: &apps.RollingUpdateDeployment{
MaxSurge: &test.maxSurge,
},
Partition: test.partition,
},
}
_, err := dc.reconcileNewReplicaSet(context.TODO(), allRSs, &newRS, &deployment)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
result, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).Get(context.TODO(), newRS.Name, metav1.GetOptions{})
if err != nil {
// Check result from runtimeClient instead of fakeClient
var resultRS apps.ReplicaSet
if err := fakeCtrlClient.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: newRS.Namespace, Name: newRS.Name}, &resultRS); err != nil {
t.Fatalf("got unexpected error: %v", err)
}
if *result.Spec.Replicas != test.expect {
t.Fatalf("expect %d, but got %d", test.expect, *result.Spec.Replicas)
if *resultRS.Spec.Replicas != test.expect {
t.Fatalf("expect %d, but got %d", test.expect, *resultRS.Spec.Replicas)
}
})
}
@ -381,17 +401,6 @@ func TestReconcileOldReplicaSet(t *testing.T) {
t.Run(name, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fakeRecord := record.NewFakeRecorder(10)
dc := &DeploymentController{
client: fakeClient,
eventRecorder: fakeRecord,
strategy: rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: &apps.RollingUpdateDeployment{
MaxSurge: &test.maxSurge,
MaxUnavailable: &test.maxUnavailable,
},
Partition: test.partition,
},
}
var deployment apps.Deployment
var newRS apps.ReplicaSet
@ -443,16 +452,45 @@ func TestReconcileOldReplicaSet(t *testing.T) {
t.Fatalf("got unexpected error: %v", err)
}
}
// Create controller-runtime client with all objects
var allObjects []ctrlclient.Object
allObjects = append(allObjects, &deployment)
for _, rs := range allRSs {
allObjects = append(allObjects, rs)
}
fakeCtrlClient := ctrlfake.NewClientBuilder().
WithObjects(allObjects...).
Build()
dc := &DeploymentController{
client: fakeClient,
eventRecorder: fakeRecord,
dLister: &mockDeploymentLister{client: fakeCtrlClient},
rsLister: &mockReplicaSetLister{client: fakeCtrlClient},
runtimeClient: fakeCtrlClient,
strategy: rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: &apps.RollingUpdateDeployment{
MaxSurge: &test.maxSurge,
MaxUnavailable: &test.maxUnavailable,
},
Partition: test.partition,
},
}
_, err := dc.reconcileOldReplicaSets(context.TODO(), allRSs, oldRSs, &newRS, &deployment)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
rss, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
// Check result from runtimeClient instead of fakeClient
var rsList apps.ReplicaSetList
if err := fakeCtrlClient.List(context.TODO(), &rsList); err != nil {
t.Fatalf("got unexpected error: %v", err)
}
result := int32(0)
for _, rs := range rss.Items {
for _, rs := range rsList.Items {
if rs.GetName() != "rs-new" {
result += *rs.Spec.Replicas
}

View File

@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/utils/integer"
"sigs.k8s.io/controller-runtime/pkg/client"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util"
@ -117,8 +118,20 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De
annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars)
minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
if annotationsUpdated || minReadySecondsNeedsUpdate {
rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
// Use existing state directly for patching, let API Server handle conflicts
rsCopy = existingNewRS.DeepCopy()
deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars)
if minReadySecondsNeedsUpdate {
rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
}
// Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error
// Controller-runtime will automatically reschedule for reconciliation
err := dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(existingNewRS))
if err != nil {
return nil, err
}
return rsCopy, nil
}
// Should use the revision in existingNewRS's annotation, since it set by before
@ -410,11 +423,29 @@ func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.Re
var err error
if sizeNeedsUpdate || annotationsNeedUpdate {
oldScale := *(rs.Spec.Replicas)
// Use existing state directly for patching, let API Server handle conflicts
rsCopy := rs.DeepCopy()
*(rsCopy.Spec.Replicas) = newScale
deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(deployment, &dc.strategy))
rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
if err == nil && sizeNeedsUpdate {
if sizeNeedsUpdate {
rsCopy.Spec.Replicas = &newScale
}
if annotationsNeedUpdate {
// Set the annotations that need to be updated
desiredReplicas := *(deployment.Spec.Replicas)
maxReplicas := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(deployment, &dc.strategy)
deploymentutil.SetReplicasAnnotations(rsCopy, desiredReplicas, maxReplicas)
}
// Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error
// Controller-runtime will automatically reschedule for reconciliation
err = dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(rs))
if err != nil {
return scaled, rs, err
}
rs = rsCopy
if sizeNeedsUpdate {
scaled = true
dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d from %d", scalingOperation, rs.Name, newScale, oldScale)
}

View File

@ -41,5 +41,4 @@ func TestCommonPatch(t *testing.T) {
if !reflect.DeepEqual(patchReq.String(), expectedPatchBody) {
t.Fatalf("Not equal: \n%s \n%s", expectedPatchBody, patchReq.String())
}
}