From 884be19fa3b3d8291158f2c63c2829392b7d145b Mon Sep 17 00:00:00 2001 From: zhihao jian Date: Mon, 14 Jul 2025 19:37:01 +0800 Subject: [PATCH] use runtimeClient to operate deployment uniformly Signed-off-by: zhihao jian remove dupl SetNewReplicaSetAnnotations use UnsafeDisableDeepCopy to optimize performance use optimisticLock for patch fix patch extra status always failed fix unit test add comment --- pkg/controller/deployment/controller.go | 21 --- .../deployment/deployment_controller.go | 63 ++++--- .../deployment/deployment_controller_test.go | 172 +++--------------- pkg/controller/deployment/progress.go | 6 +- pkg/controller/deployment/rolling_test.go | 19 -- pkg/controller/deployment/sync.go | 57 +++--- 6 files changed, 98 insertions(+), 240 deletions(-) diff --git a/pkg/controller/deployment/controller.go b/pkg/controller/deployment/controller.go index 313f873..313f6d0 100644 --- a/pkg/controller/deployment/controller.go +++ b/pkg/controller/deployment/controller.go @@ -31,8 +31,6 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" - appslisters "k8s.io/client-go/listers/apps/v1" - toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -83,19 +81,6 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { - cacher := mgr.GetCache() - dInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("Deployment")) - if err != nil { - return nil, err - } - rsInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("ReplicaSet")) - if err != nil { - return nil, err - } - - // Lister - dLister := appslisters.NewDeploymentLister(dInformer.(toolscache.SharedIndexInformer).GetIndexer()) - rsLister := appslisters.NewReplicaSetLister(rsInformer.(toolscache.SharedIndexInformer).GetIndexer()) // Client & Recorder genericClient := clientutil.GetGenericClientWithName("advanced-deployment-controller") @@ -106,12 +91,9 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { // Deployment controller factory factory := &controllerFactory{ - client: genericClient.KubeClient, runtimeClient: mgr.GetClient(), eventBroadcaster: eventBroadcaster, eventRecorder: recorder, - dLister: dLister, - rsLister: rsLister, } return &ReconcileDeployment{Client: mgr.GetClient(), controllerFactory: factory}, nil } @@ -268,12 +250,9 @@ func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *Deploy klog.V(4).Infof("Processing deployment %v strategy %v", klog.KObj(deployment), string(marshaled)) return &DeploymentController{ - client: f.client, runtimeClient: f.runtimeClient, eventBroadcaster: f.eventBroadcaster, eventRecorder: f.eventRecorder, - dLister: f.dLister, - rsLister: f.rsLister, strategy: strategy, } } diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index f56f7cd..5f584da 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -25,15 +25,12 @@ import ( "encoding/json" "fmt" "reflect" - "strings" "time" + utilclient "github.com/openkruise/rollouts/pkg/util/client" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - clientset "k8s.io/client-go/kubernetes" - appslisters "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -57,16 +54,9 @@ var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment") // DeploymentController is responsible for synchronizing Deployment objects stored // in the system with actual running replica sets and pods. type DeploymentController struct { - client clientset.Interface - eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder - // dLister can list/get deployments from the shared informer's store - dLister appslisters.DeploymentLister - // rsLister can list/get replica sets from the shared informer's store - rsLister appslisters.ReplicaSetLister - // we will use this strategy to replace spec.strategy of deployment strategy rolloutsv1alpha1.DeploymentStrategy @@ -81,15 +71,18 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, if err != nil { return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err) } - // List all ReplicaSets to find those we own but that no longer match our - // selector. They will be orphaned by ClaimReplicaSets(). - allRSs, err := dc.rsLister.ReplicaSets(d.Namespace).List(deploymentSelector) + + // List all ReplicaSets using runtimeClient + rsList := &apps.ReplicaSetList{} + err = dc.runtimeClient.List(ctx, rsList, client.InNamespace(d.Namespace), client.MatchingLabelsSelector{Selector: deploymentSelector}, utilclient.DisableDeepCopy) if err != nil { return nil, fmt.Errorf("list %s/%s rs failed:%v", d.Namespace, d.Name, err) } + // select rs owner by current deployment ownedRSs := make([]*apps.ReplicaSet, 0) - for _, rs := range allRSs { + for i := range rsList.Items { + rs := &rsList.Items[i] if !rs.DeletionTimestamp.IsZero() { continue } @@ -119,7 +112,10 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment * dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.") if d.Status.ObservedGeneration < d.Generation { d.Status.ObservedGeneration = d.Generation - dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + err := dc.runtimeClient.Status().Update(ctx, d) + if err != nil { + klog.Errorf("Failed to update deployment status: %v", err) + } } return nil } @@ -153,38 +149,51 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment * // patchExtraStatus will update extra status for advancedStatus func (dc *DeploymentController) patchExtraStatus(deployment *apps.Deployment) error { - rsList, err := dc.getReplicaSetsForDeployment(context.TODO(), deployment) + // It is necessary to fetch the latest Deployment here because previous steps in the reconcile loop + // may update the condition fields (such as lastTransitionTime and lastUpdateTime), causing the + // resourceVersion to change. This can lead to patch failures if we do not use the latest object. + // The deployment passed in here has an old resourceVersion, so we need to fetch the latest deployment + // to ensure patch success. + latestDeployment := &apps.Deployment{} + err := dc.runtimeClient.Get(context.TODO(), client.ObjectKeyFromObject(deployment), latestDeployment) + if err != nil { + klog.Errorf("Failed to get deployment: %v", err) + return err + } + rsList, err := dc.getReplicaSetsForDeployment(context.TODO(), latestDeployment) if err != nil { return err } updatedReadyReplicas := int32(0) - newRS := deploymentutil.FindNewReplicaSet(deployment, rsList) + newRS := deploymentutil.FindNewReplicaSet(latestDeployment, rsList) if newRS != nil { updatedReadyReplicas = newRS.Status.ReadyReplicas } extraStatus := &rolloutsv1alpha1.DeploymentExtraStatus{ UpdatedReadyReplicas: updatedReadyReplicas, - ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, deployment), + ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, latestDeployment), } extraStatusByte, err := json.Marshal(extraStatus) if err != nil { - klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(deployment), err) + klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(latestDeployment), err) return nil // no need to retry } extraStatusAnno := string(extraStatusByte) - if deployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] == extraStatusAnno { + if latestDeployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] == extraStatusAnno { return nil // no need to update } + deploymentCopy := latestDeployment.DeepCopy() + deploymentCopy.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] = extraStatusAnno - body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, - rolloutsv1alpha1.DeploymentExtraStatusAnnotation, - strings.Replace(extraStatusAnno, `"`, `\"`, -1)) - - _, err = dc.client.AppsV1().Deployments(deployment.Namespace). - Patch(context.TODO(), deployment.Name, types.MergePatchType, []byte(body), metav1.PatchOptions{}) + patch := client.MergeFromWithOptions(latestDeployment, client.MergeFromWithOptimisticLock{}) + err = dc.runtimeClient.Patch(context.TODO(), deploymentCopy, patch) + if err != nil { + klog.Errorf("Failed to patch deployment extra status: %v", err) + return err + } return err } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 53e8459..20d48d0 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -22,14 +22,11 @@ import ( "strconv" "strings" "testing" + "time" apps "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/api/errors" intstrutil "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/kubernetes/fake" - appslisters "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/record" "k8s.io/utils/pointer" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -39,123 +36,6 @@ import ( "github.com/openkruise/rollouts/pkg/controller/deployment/util" ) -// mockReplicaSetLister implements appslisters.ReplicaSetLister for testing -type mockReplicaSetLister struct { - client ctrlclient.Client -} - -func (m *mockReplicaSetLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) { - var rsList apps.ReplicaSetList - if err := m.client.List(context.TODO(), &rsList); err != nil { - return nil, err - } - - var result []*apps.ReplicaSet - for i := range rsList.Items { - if selector.Matches(labels.Set(rsList.Items[i].Labels)) { - result = append(result, &rsList.Items[i]) - } - } - return result, nil -} - -func (m *mockReplicaSetLister) ReplicaSets(namespace string) appslisters.ReplicaSetNamespaceLister { - return &mockReplicaSetNamespaceLister{ - client: m.client, - namespace: namespace, - } -} - -func (m *mockReplicaSetLister) GetPodReplicaSets(pod *v1.Pod) ([]*apps.ReplicaSet, error) { - // For testing purposes, return empty list - return []*apps.ReplicaSet{}, nil -} - -// mockDeploymentLister implements appslisters.DeploymentLister for testing -type mockDeploymentLister struct { - client ctrlclient.Client -} - -func (m *mockDeploymentLister) List(selector labels.Selector) ([]*apps.Deployment, error) { - var deploymentList apps.DeploymentList - if err := m.client.List(context.TODO(), &deploymentList); err != nil { - return nil, err - } - - var result []*apps.Deployment - for i := range deploymentList.Items { - if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) { - result = append(result, &deploymentList.Items[i]) - } - } - return result, nil -} - -func (m *mockDeploymentLister) Deployments(namespace string) appslisters.DeploymentNamespaceLister { - return &mockDeploymentNamespaceLister{ - client: m.client, - namespace: namespace, - } -} - -// mockDeploymentNamespaceLister implements appslisters.DeploymentNamespaceLister for testing -type mockDeploymentNamespaceLister struct { - client ctrlclient.Client - namespace string -} - -func (m *mockDeploymentNamespaceLister) List(selector labels.Selector) ([]*apps.Deployment, error) { - var deploymentList apps.DeploymentList - if err := m.client.List(context.TODO(), &deploymentList, ctrlclient.InNamespace(m.namespace)); err != nil { - return nil, err - } - - var result []*apps.Deployment - for i := range deploymentList.Items { - if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) { - result = append(result, &deploymentList.Items[i]) - } - } - return result, nil -} - -func (m *mockDeploymentNamespaceLister) Get(name string) (*apps.Deployment, error) { - var deployment apps.Deployment - if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &deployment); err != nil { - return nil, err - } - return &deployment, nil -} - -// mockReplicaSetNamespaceLister implements appslisters.ReplicaSetNamespaceLister for testing -type mockReplicaSetNamespaceLister struct { - client ctrlclient.Client - namespace string -} - -func (m *mockReplicaSetNamespaceLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) { - var rsList apps.ReplicaSetList - if err := m.client.List(context.TODO(), &rsList, ctrlclient.InNamespace(m.namespace)); err != nil { - return nil, err - } - - var result []*apps.ReplicaSet - for i := range rsList.Items { - if selector.Matches(labels.Set(rsList.Items[i].Labels)) { - result = append(result, &rsList.Items[i]) - } - } - return result, nil -} - -func (m *mockReplicaSetNamespaceLister) Get(name string) (*apps.ReplicaSet, error) { - var rs apps.ReplicaSet - if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &rs); err != nil { - return nil, err - } - return &rs, nil -} - func TestSyncDeployment(t *testing.T) { tests := map[string]struct { oldRSsReplicas []int32 @@ -298,30 +178,8 @@ func TestSyncDeployment(t *testing.T) { // Create a mock event recorder fakeRecord := record.NewFakeRecorder(10) - // Create a mock deployment lister - mockDeploymentLister := &mockDeploymentLister{client: fakeCtrlClient} - - // Create a fake client with the same objects - fakeClient := fake.NewSimpleClientset() - _, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("failed to create deployment in fake client: %v", err) - } - - for _, obj := range allObjects { - if rs, ok := obj.(*apps.ReplicaSet); ok { - _, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), rs, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("failed to create replicaset in fake client: %v", err) - } - } - } - dc := &DeploymentController{ - client: fakeClient, eventRecorder: fakeRecord, - dLister: mockDeploymentLister, - rsLister: &mockReplicaSetLister{client: fakeCtrlClient}, runtimeClient: fakeCtrlClient, strategy: rolloutsv1alpha1.DeploymentStrategy{ RollingUpdate: &apps.RollingUpdateDeployment{ @@ -332,9 +190,31 @@ func TestSyncDeployment(t *testing.T) { }, } - err = dc.syncDeployment(context.TODO(), &deployment) + // Retry syncDeployment to handle potential resource conflicts gracefully + // This simulates the behavior of controller-runtime's reconcile loop + var err error + maxRetries := 10 + for i := 0; i < maxRetries; i++ { + err = dc.syncDeployment(context.TODO(), &deployment) + if err == nil { + break + } + + // Check if it's a conflict error (409) + if errors.IsConflict(err) { + if i < maxRetries-1 { + // Wait a bit before retrying, simulating the reconcile delay + time.Sleep(1 * time.Second) + continue + } + } + + // For non-conflict errors or after max retries, break + break + } + if err != nil { - t.Fatalf("got unexpected error: %v", err) + t.Fatalf("got unexpected error after retries: %v", err) } var rsList apps.ReplicaSetList diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go index d8dc2e5..73199e2 100644 --- a/pkg/controller/deployment/progress.go +++ b/pkg/controller/deployment/progress.go @@ -25,7 +25,6 @@ import ( apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "github.com/openkruise/rollouts/pkg/controller/deployment/util" @@ -115,7 +114,10 @@ func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs [] newDeployment := d newDeployment.Status = newStatus - _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{}) + err := dc.runtimeClient.Status().Update(ctx, newDeployment) + if err != nil { + klog.Errorf("Failed to update deployment status in progress: %v", err) + } return err } diff --git a/pkg/controller/deployment/rolling_test.go b/pkg/controller/deployment/rolling_test.go index 642844e..04f1f7a 100644 --- a/pkg/controller/deployment/rolling_test.go +++ b/pkg/controller/deployment/rolling_test.go @@ -304,10 +304,7 @@ func TestReconcileNewReplicaSet(t *testing.T) { Build() dc := &DeploymentController{ - client: fakeClient, eventRecorder: fakeRecord, - dLister: &mockDeploymentLister{client: fakeCtrlClient}, - rsLister: &mockReplicaSetLister{client: fakeCtrlClient}, runtimeClient: fakeCtrlClient, strategy: rolloutsv1alpha1.DeploymentStrategy{ RollingUpdate: &apps.RollingUpdateDeployment{ @@ -399,7 +396,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - fakeClient := fake.NewSimpleClientset() fakeRecord := record.NewFakeRecorder(10) var deployment apps.Deployment @@ -417,10 +413,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { deployment.Status.UpdatedReplicas = test.newRSReplicas deployment.Status.Replicas = availableReplicas deployment.Status.AvailableReplicas = availableReplicas - _, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } } { for index, replicas := range test.oldRSsReplicas { @@ -433,10 +425,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { rs.Status.AvailableReplicas = test.oldRSsAvailable[index] allRSs = append(allRSs, &rs) oldRSs = append(oldRSs, &rs) - _, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } } } { @@ -447,10 +435,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { newRS.Status.ReadyReplicas = test.newRSAvailable newRS.Status.AvailableReplicas = test.newRSAvailable allRSs = append(allRSs, &newRS) - _, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } } // Create controller-runtime client with all objects @@ -465,10 +449,7 @@ func TestReconcileOldReplicaSet(t *testing.T) { Build() dc := &DeploymentController{ - client: fakeClient, eventRecorder: fakeRecord, - dLister: &mockDeploymentLister{client: fakeCtrlClient}, - rsLister: &mockReplicaSetLister{client: fakeCtrlClient}, runtimeClient: fakeCtrlClient, strategy: rolloutsv1alpha1.DeploymentStrategy{ RollingUpdate: &apps.RollingUpdateDeployment{ diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 26eac08..8a0a63d 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -118,16 +118,15 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars) minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds if annotationsUpdated || minReadySecondsNeedsUpdate { - // Use existing state directly for patching, let API Server handle conflicts - rsCopy = existingNewRS.DeepCopy() - deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars) + // Update the copy with the new minReadySeconds if minReadySecondsNeedsUpdate { rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds } - // Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error + // Use MergeFrom with optimistic lock for patching, if ResourceVersion conflicts, API Server will return 409 error // Controller-runtime will automatically reschedule for reconciliation - err := dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(existingNewRS)) + patch := client.MergeFromWithOptions(existingNewRS, client.MergeFromWithOptimisticLock{}) + err := dc.runtimeClient.Patch(ctx, rsCopy, patch) if err != nil { return nil, err } @@ -149,7 +148,7 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De if needsUpdate { var err error - if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil { + if err = dc.runtimeClient.Status().Update(ctx, d); err != nil { return nil, err } } @@ -201,14 +200,19 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De // hash collisions. If there is any other error, we need to report it in the status of // the Deployment. alreadyExists := false - createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{}) + var createdRS *apps.ReplicaSet + err = dc.runtimeClient.Create(ctx, &newRS) + if err == nil { + createdRS = &newRS + } switch { // We may end up hitting this due to a slow cache or a fast resync of the Deployment. case errors.IsAlreadyExists(err): alreadyExists = true // Fetch a copy of the ReplicaSet. - rs, rsErr := dc.rsLister.ReplicaSets(newRS.Namespace).Get(newRS.Name) + rs := &apps.ReplicaSet{} + rsErr := dc.runtimeClient.Get(ctx, client.ObjectKey{Namespace: newRS.Namespace, Name: newRS.Name}, rs) if rsErr != nil { return nil, rsErr } @@ -233,9 +237,11 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De *d.Status.CollisionCount++ // Update the collisionCount for the Deployment and let it requeue by returning the original // error. - _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + dErr := dc.runtimeClient.Status().Update(ctx, d) if dErr == nil { klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount) + } else { + klog.Errorf("Failed to update deployment collision count: %v", dErr) } return nil, err case errors.HasStatusCause(err, v1.NamespaceTerminatingCause): @@ -249,7 +255,9 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De // We don't really care about this error at this point, since we have a bigger issue to report. // TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account // these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568 - _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + if updateErr := dc.runtimeClient.Status().Update(ctx, d); updateErr != nil { + klog.Errorf("Failed to update deployment status after RS creation failure: %v", updateErr) + } } dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg) return nil, err @@ -266,7 +274,10 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De needsUpdate = true } if needsUpdate { - _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + if updateErr := dc.runtimeClient.Status().Update(ctx, d); updateErr != nil { + klog.Errorf("Failed to update deployment status: %v", updateErr) + err = updateErr + } } return createdRS, err } @@ -426,20 +437,13 @@ func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.Re // Use existing state directly for patching, let API Server handle conflicts rsCopy := rs.DeepCopy() + *(rsCopy.Spec.Replicas) = newScale + deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(deployment, &dc.strategy)) - if sizeNeedsUpdate { - rsCopy.Spec.Replicas = &newScale - } - if annotationsNeedUpdate { - // Set the annotations that need to be updated - desiredReplicas := *(deployment.Spec.Replicas) - maxReplicas := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(deployment, &dc.strategy) - deploymentutil.SetReplicasAnnotations(rsCopy, desiredReplicas, maxReplicas) - } - - // Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error + // Use MergeFrom with optimistic lock for patching, if ResourceVersion conflicts, API Server will return 409 error // Controller-runtime will automatically reschedule for reconciliation - err = dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(rs)) + patch := client.MergeFromWithOptions(rs, client.MergeFromWithOptimisticLock{}) + err = dc.runtimeClient.Patch(ctx, rsCopy, patch) if err != nil { return scaled, rs, err } @@ -482,7 +486,7 @@ func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs [] continue } klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name) - if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { + if err := dc.runtimeClient.Delete(ctx, rs); err != nil && !errors.IsNotFound(err) { // Return error instead of aggregating and continuing DELETEs on the theory // that we may be overloading the api server. return err @@ -502,7 +506,10 @@ func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs newDeployment := d newDeployment.Status = newStatus - _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{}) + err := dc.runtimeClient.Status().Update(ctx, newDeployment) + if err != nil { + klog.Errorf("Failed to sync deployment status: %v", err) + } return err }