use runtimeClient to operate deployment uniformly
Signed-off-by: zhihao jian <zhihao.jian@shopee.com> remove dupl SetNewReplicaSetAnnotations use UnsafeDisableDeepCopy to optimize performance use optimisticLock for patch fix patch extra status always failed fix unit test add comment
This commit is contained in:
parent
51cebc1d44
commit
884be19fa3
|
|
@ -31,8 +31,6 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
appslisters "k8s.io/client-go/listers/apps/v1"
|
||||
toolscache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/klog/v2"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
|
|
@ -83,19 +81,6 @@ func Add(mgr manager.Manager) error {
|
|||
|
||||
// newReconciler returns a new reconcile.Reconciler
|
||||
func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
|
||||
cacher := mgr.GetCache()
|
||||
dInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("Deployment"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rsInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("ReplicaSet"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Lister
|
||||
dLister := appslisters.NewDeploymentLister(dInformer.(toolscache.SharedIndexInformer).GetIndexer())
|
||||
rsLister := appslisters.NewReplicaSetLister(rsInformer.(toolscache.SharedIndexInformer).GetIndexer())
|
||||
|
||||
// Client & Recorder
|
||||
genericClient := clientutil.GetGenericClientWithName("advanced-deployment-controller")
|
||||
|
|
@ -106,12 +91,9 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
|
|||
|
||||
// Deployment controller factory
|
||||
factory := &controllerFactory{
|
||||
client: genericClient.KubeClient,
|
||||
runtimeClient: mgr.GetClient(),
|
||||
eventBroadcaster: eventBroadcaster,
|
||||
eventRecorder: recorder,
|
||||
dLister: dLister,
|
||||
rsLister: rsLister,
|
||||
}
|
||||
return &ReconcileDeployment{Client: mgr.GetClient(), controllerFactory: factory}, nil
|
||||
}
|
||||
|
|
@ -268,12 +250,9 @@ func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *Deploy
|
|||
klog.V(4).Infof("Processing deployment %v strategy %v", klog.KObj(deployment), string(marshaled))
|
||||
|
||||
return &DeploymentController{
|
||||
client: f.client,
|
||||
runtimeClient: f.runtimeClient,
|
||||
eventBroadcaster: f.eventBroadcaster,
|
||||
eventRecorder: f.eventRecorder,
|
||||
dLister: f.dLister,
|
||||
rsLister: f.rsLister,
|
||||
strategy: strategy,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,15 +25,12 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
utilclient "github.com/openkruise/rollouts/pkg/util/client"
|
||||
apps "k8s.io/api/apps/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
appslisters "k8s.io/client-go/listers/apps/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
|
|
@ -57,16 +54,9 @@ var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")
|
|||
// DeploymentController is responsible for synchronizing Deployment objects stored
|
||||
// in the system with actual running replica sets and pods.
|
||||
type DeploymentController struct {
|
||||
client clientset.Interface
|
||||
|
||||
eventBroadcaster record.EventBroadcaster
|
||||
eventRecorder record.EventRecorder
|
||||
|
||||
// dLister can list/get deployments from the shared informer's store
|
||||
dLister appslisters.DeploymentLister
|
||||
// rsLister can list/get replica sets from the shared informer's store
|
||||
rsLister appslisters.ReplicaSetLister
|
||||
|
||||
// we will use this strategy to replace spec.strategy of deployment
|
||||
strategy rolloutsv1alpha1.DeploymentStrategy
|
||||
|
||||
|
|
@ -81,15 +71,18 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context,
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
|
||||
}
|
||||
// List all ReplicaSets to find those we own but that no longer match our
|
||||
// selector. They will be orphaned by ClaimReplicaSets().
|
||||
allRSs, err := dc.rsLister.ReplicaSets(d.Namespace).List(deploymentSelector)
|
||||
|
||||
// List all ReplicaSets using runtimeClient
|
||||
rsList := &apps.ReplicaSetList{}
|
||||
err = dc.runtimeClient.List(ctx, rsList, client.InNamespace(d.Namespace), client.MatchingLabelsSelector{Selector: deploymentSelector}, utilclient.DisableDeepCopy)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list %s/%s rs failed:%v", d.Namespace, d.Name, err)
|
||||
}
|
||||
|
||||
// select rs owner by current deployment
|
||||
ownedRSs := make([]*apps.ReplicaSet, 0)
|
||||
for _, rs := range allRSs {
|
||||
for i := range rsList.Items {
|
||||
rs := &rsList.Items[i]
|
||||
if !rs.DeletionTimestamp.IsZero() {
|
||||
continue
|
||||
}
|
||||
|
|
@ -119,7 +112,10 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment *
|
|||
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
|
||||
if d.Status.ObservedGeneration < d.Generation {
|
||||
d.Status.ObservedGeneration = d.Generation
|
||||
dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
|
||||
err := dc.runtimeClient.Status().Update(ctx, d)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to update deployment status: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -153,38 +149,51 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment *
|
|||
|
||||
// patchExtraStatus will update extra status for advancedStatus
|
||||
func (dc *DeploymentController) patchExtraStatus(deployment *apps.Deployment) error {
|
||||
rsList, err := dc.getReplicaSetsForDeployment(context.TODO(), deployment)
|
||||
// It is necessary to fetch the latest Deployment here because previous steps in the reconcile loop
|
||||
// may update the condition fields (such as lastTransitionTime and lastUpdateTime), causing the
|
||||
// resourceVersion to change. This can lead to patch failures if we do not use the latest object.
|
||||
// The deployment passed in here has an old resourceVersion, so we need to fetch the latest deployment
|
||||
// to ensure patch success.
|
||||
latestDeployment := &apps.Deployment{}
|
||||
err := dc.runtimeClient.Get(context.TODO(), client.ObjectKeyFromObject(deployment), latestDeployment)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get deployment: %v", err)
|
||||
return err
|
||||
}
|
||||
rsList, err := dc.getReplicaSetsForDeployment(context.TODO(), latestDeployment)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
updatedReadyReplicas := int32(0)
|
||||
newRS := deploymentutil.FindNewReplicaSet(deployment, rsList)
|
||||
newRS := deploymentutil.FindNewReplicaSet(latestDeployment, rsList)
|
||||
if newRS != nil {
|
||||
updatedReadyReplicas = newRS.Status.ReadyReplicas
|
||||
}
|
||||
|
||||
extraStatus := &rolloutsv1alpha1.DeploymentExtraStatus{
|
||||
UpdatedReadyReplicas: updatedReadyReplicas,
|
||||
ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, deployment),
|
||||
ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, latestDeployment),
|
||||
}
|
||||
|
||||
extraStatusByte, err := json.Marshal(extraStatus)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(deployment), err)
|
||||
klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(latestDeployment), err)
|
||||
return nil // no need to retry
|
||||
}
|
||||
|
||||
extraStatusAnno := string(extraStatusByte)
|
||||
if deployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] == extraStatusAnno {
|
||||
if latestDeployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] == extraStatusAnno {
|
||||
return nil // no need to update
|
||||
}
|
||||
deploymentCopy := latestDeployment.DeepCopy()
|
||||
deploymentCopy.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] = extraStatusAnno
|
||||
|
||||
body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`,
|
||||
rolloutsv1alpha1.DeploymentExtraStatusAnnotation,
|
||||
strings.Replace(extraStatusAnno, `"`, `\"`, -1))
|
||||
|
||||
_, err = dc.client.AppsV1().Deployments(deployment.Namespace).
|
||||
Patch(context.TODO(), deployment.Name, types.MergePatchType, []byte(body), metav1.PatchOptions{})
|
||||
patch := client.MergeFromWithOptions(latestDeployment, client.MergeFromWithOptimisticLock{})
|
||||
err = dc.runtimeClient.Patch(context.TODO(), deploymentCopy, patch)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to patch deployment extra status: %v", err)
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,14 +22,11 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
apps "k8s.io/api/apps/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
appslisters "k8s.io/client-go/listers/apps/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/utils/pointer"
|
||||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
|
@ -39,123 +36,6 @@ import (
|
|||
"github.com/openkruise/rollouts/pkg/controller/deployment/util"
|
||||
)
|
||||
|
||||
// mockReplicaSetLister implements appslisters.ReplicaSetLister for testing
|
||||
type mockReplicaSetLister struct {
|
||||
client ctrlclient.Client
|
||||
}
|
||||
|
||||
func (m *mockReplicaSetLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) {
|
||||
var rsList apps.ReplicaSetList
|
||||
if err := m.client.List(context.TODO(), &rsList); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []*apps.ReplicaSet
|
||||
for i := range rsList.Items {
|
||||
if selector.Matches(labels.Set(rsList.Items[i].Labels)) {
|
||||
result = append(result, &rsList.Items[i])
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *mockReplicaSetLister) ReplicaSets(namespace string) appslisters.ReplicaSetNamespaceLister {
|
||||
return &mockReplicaSetNamespaceLister{
|
||||
client: m.client,
|
||||
namespace: namespace,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockReplicaSetLister) GetPodReplicaSets(pod *v1.Pod) ([]*apps.ReplicaSet, error) {
|
||||
// For testing purposes, return empty list
|
||||
return []*apps.ReplicaSet{}, nil
|
||||
}
|
||||
|
||||
// mockDeploymentLister implements appslisters.DeploymentLister for testing
|
||||
type mockDeploymentLister struct {
|
||||
client ctrlclient.Client
|
||||
}
|
||||
|
||||
func (m *mockDeploymentLister) List(selector labels.Selector) ([]*apps.Deployment, error) {
|
||||
var deploymentList apps.DeploymentList
|
||||
if err := m.client.List(context.TODO(), &deploymentList); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []*apps.Deployment
|
||||
for i := range deploymentList.Items {
|
||||
if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) {
|
||||
result = append(result, &deploymentList.Items[i])
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *mockDeploymentLister) Deployments(namespace string) appslisters.DeploymentNamespaceLister {
|
||||
return &mockDeploymentNamespaceLister{
|
||||
client: m.client,
|
||||
namespace: namespace,
|
||||
}
|
||||
}
|
||||
|
||||
// mockDeploymentNamespaceLister implements appslisters.DeploymentNamespaceLister for testing
|
||||
type mockDeploymentNamespaceLister struct {
|
||||
client ctrlclient.Client
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (m *mockDeploymentNamespaceLister) List(selector labels.Selector) ([]*apps.Deployment, error) {
|
||||
var deploymentList apps.DeploymentList
|
||||
if err := m.client.List(context.TODO(), &deploymentList, ctrlclient.InNamespace(m.namespace)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []*apps.Deployment
|
||||
for i := range deploymentList.Items {
|
||||
if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) {
|
||||
result = append(result, &deploymentList.Items[i])
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *mockDeploymentNamespaceLister) Get(name string) (*apps.Deployment, error) {
|
||||
var deployment apps.Deployment
|
||||
if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &deployment); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &deployment, nil
|
||||
}
|
||||
|
||||
// mockReplicaSetNamespaceLister implements appslisters.ReplicaSetNamespaceLister for testing
|
||||
type mockReplicaSetNamespaceLister struct {
|
||||
client ctrlclient.Client
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (m *mockReplicaSetNamespaceLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) {
|
||||
var rsList apps.ReplicaSetList
|
||||
if err := m.client.List(context.TODO(), &rsList, ctrlclient.InNamespace(m.namespace)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []*apps.ReplicaSet
|
||||
for i := range rsList.Items {
|
||||
if selector.Matches(labels.Set(rsList.Items[i].Labels)) {
|
||||
result = append(result, &rsList.Items[i])
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *mockReplicaSetNamespaceLister) Get(name string) (*apps.ReplicaSet, error) {
|
||||
var rs apps.ReplicaSet
|
||||
if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &rs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &rs, nil
|
||||
}
|
||||
|
||||
func TestSyncDeployment(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
oldRSsReplicas []int32
|
||||
|
|
@ -298,30 +178,8 @@ func TestSyncDeployment(t *testing.T) {
|
|||
// Create a mock event recorder
|
||||
fakeRecord := record.NewFakeRecorder(10)
|
||||
|
||||
// Create a mock deployment lister
|
||||
mockDeploymentLister := &mockDeploymentLister{client: fakeCtrlClient}
|
||||
|
||||
// Create a fake client with the same objects
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
_, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create deployment in fake client: %v", err)
|
||||
}
|
||||
|
||||
for _, obj := range allObjects {
|
||||
if rs, ok := obj.(*apps.ReplicaSet); ok {
|
||||
_, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), rs, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create replicaset in fake client: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dc := &DeploymentController{
|
||||
client: fakeClient,
|
||||
eventRecorder: fakeRecord,
|
||||
dLister: mockDeploymentLister,
|
||||
rsLister: &mockReplicaSetLister{client: fakeCtrlClient},
|
||||
runtimeClient: fakeCtrlClient,
|
||||
strategy: rolloutsv1alpha1.DeploymentStrategy{
|
||||
RollingUpdate: &apps.RollingUpdateDeployment{
|
||||
|
|
@ -332,9 +190,31 @@ func TestSyncDeployment(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = dc.syncDeployment(context.TODO(), &deployment)
|
||||
// Retry syncDeployment to handle potential resource conflicts gracefully
|
||||
// This simulates the behavior of controller-runtime's reconcile loop
|
||||
var err error
|
||||
maxRetries := 10
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
err = dc.syncDeployment(context.TODO(), &deployment)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Check if it's a conflict error (409)
|
||||
if errors.IsConflict(err) {
|
||||
if i < maxRetries-1 {
|
||||
// Wait a bit before retrying, simulating the reconcile delay
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// For non-conflict errors or after max retries, break
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("got unexpected error: %v", err)
|
||||
t.Fatalf("got unexpected error after retries: %v", err)
|
||||
}
|
||||
|
||||
var rsList apps.ReplicaSetList
|
||||
|
|
|
|||
|
|
@ -25,7 +25,6 @@ import (
|
|||
|
||||
apps "k8s.io/api/apps/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/openkruise/rollouts/pkg/controller/deployment/util"
|
||||
|
|
@ -115,7 +114,10 @@ func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs []
|
|||
|
||||
newDeployment := d
|
||||
newDeployment.Status = newStatus
|
||||
_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
|
||||
err := dc.runtimeClient.Status().Update(ctx, newDeployment)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to update deployment status in progress: %v", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -304,10 +304,7 @@ func TestReconcileNewReplicaSet(t *testing.T) {
|
|||
Build()
|
||||
|
||||
dc := &DeploymentController{
|
||||
client: fakeClient,
|
||||
eventRecorder: fakeRecord,
|
||||
dLister: &mockDeploymentLister{client: fakeCtrlClient},
|
||||
rsLister: &mockReplicaSetLister{client: fakeCtrlClient},
|
||||
runtimeClient: fakeCtrlClient,
|
||||
strategy: rolloutsv1alpha1.DeploymentStrategy{
|
||||
RollingUpdate: &apps.RollingUpdateDeployment{
|
||||
|
|
@ -399,7 +396,6 @@ func TestReconcileOldReplicaSet(t *testing.T) {
|
|||
|
||||
for name, test := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
fakeRecord := record.NewFakeRecorder(10)
|
||||
|
||||
var deployment apps.Deployment
|
||||
|
|
@ -417,10 +413,6 @@ func TestReconcileOldReplicaSet(t *testing.T) {
|
|||
deployment.Status.UpdatedReplicas = test.newRSReplicas
|
||||
deployment.Status.Replicas = availableReplicas
|
||||
deployment.Status.AvailableReplicas = availableReplicas
|
||||
_, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("got unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
{
|
||||
for index, replicas := range test.oldRSsReplicas {
|
||||
|
|
@ -433,10 +425,6 @@ func TestReconcileOldReplicaSet(t *testing.T) {
|
|||
rs.Status.AvailableReplicas = test.oldRSsAvailable[index]
|
||||
allRSs = append(allRSs, &rs)
|
||||
oldRSs = append(oldRSs, &rs)
|
||||
_, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("got unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
|
|
@ -447,10 +435,6 @@ func TestReconcileOldReplicaSet(t *testing.T) {
|
|||
newRS.Status.ReadyReplicas = test.newRSAvailable
|
||||
newRS.Status.AvailableReplicas = test.newRSAvailable
|
||||
allRSs = append(allRSs, &newRS)
|
||||
_, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("got unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create controller-runtime client with all objects
|
||||
|
|
@ -465,10 +449,7 @@ func TestReconcileOldReplicaSet(t *testing.T) {
|
|||
Build()
|
||||
|
||||
dc := &DeploymentController{
|
||||
client: fakeClient,
|
||||
eventRecorder: fakeRecord,
|
||||
dLister: &mockDeploymentLister{client: fakeCtrlClient},
|
||||
rsLister: &mockReplicaSetLister{client: fakeCtrlClient},
|
||||
runtimeClient: fakeCtrlClient,
|
||||
strategy: rolloutsv1alpha1.DeploymentStrategy{
|
||||
RollingUpdate: &apps.RollingUpdateDeployment{
|
||||
|
|
|
|||
|
|
@ -118,16 +118,15 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De
|
|||
annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars)
|
||||
minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
|
||||
if annotationsUpdated || minReadySecondsNeedsUpdate {
|
||||
// Use existing state directly for patching, let API Server handle conflicts
|
||||
rsCopy = existingNewRS.DeepCopy()
|
||||
deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars)
|
||||
// Update the copy with the new minReadySeconds
|
||||
if minReadySecondsNeedsUpdate {
|
||||
rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
|
||||
}
|
||||
|
||||
// Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error
|
||||
// Use MergeFrom with optimistic lock for patching, if ResourceVersion conflicts, API Server will return 409 error
|
||||
// Controller-runtime will automatically reschedule for reconciliation
|
||||
err := dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(existingNewRS))
|
||||
patch := client.MergeFromWithOptions(existingNewRS, client.MergeFromWithOptimisticLock{})
|
||||
err := dc.runtimeClient.Patch(ctx, rsCopy, patch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -149,7 +148,7 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De
|
|||
|
||||
if needsUpdate {
|
||||
var err error
|
||||
if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil {
|
||||
if err = dc.runtimeClient.Status().Update(ctx, d); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
|
@ -201,14 +200,19 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De
|
|||
// hash collisions. If there is any other error, we need to report it in the status of
|
||||
// the Deployment.
|
||||
alreadyExists := false
|
||||
createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{})
|
||||
var createdRS *apps.ReplicaSet
|
||||
err = dc.runtimeClient.Create(ctx, &newRS)
|
||||
if err == nil {
|
||||
createdRS = &newRS
|
||||
}
|
||||
switch {
|
||||
// We may end up hitting this due to a slow cache or a fast resync of the Deployment.
|
||||
case errors.IsAlreadyExists(err):
|
||||
alreadyExists = true
|
||||
|
||||
// Fetch a copy of the ReplicaSet.
|
||||
rs, rsErr := dc.rsLister.ReplicaSets(newRS.Namespace).Get(newRS.Name)
|
||||
rs := &apps.ReplicaSet{}
|
||||
rsErr := dc.runtimeClient.Get(ctx, client.ObjectKey{Namespace: newRS.Namespace, Name: newRS.Name}, rs)
|
||||
if rsErr != nil {
|
||||
return nil, rsErr
|
||||
}
|
||||
|
|
@ -233,9 +237,11 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De
|
|||
*d.Status.CollisionCount++
|
||||
// Update the collisionCount for the Deployment and let it requeue by returning the original
|
||||
// error.
|
||||
_, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
|
||||
dErr := dc.runtimeClient.Status().Update(ctx, d)
|
||||
if dErr == nil {
|
||||
klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount)
|
||||
} else {
|
||||
klog.Errorf("Failed to update deployment collision count: %v", dErr)
|
||||
}
|
||||
return nil, err
|
||||
case errors.HasStatusCause(err, v1.NamespaceTerminatingCause):
|
||||
|
|
@ -249,7 +255,9 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De
|
|||
// We don't really care about this error at this point, since we have a bigger issue to report.
|
||||
// TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account
|
||||
// these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568
|
||||
_, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
|
||||
if updateErr := dc.runtimeClient.Status().Update(ctx, d); updateErr != nil {
|
||||
klog.Errorf("Failed to update deployment status after RS creation failure: %v", updateErr)
|
||||
}
|
||||
}
|
||||
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
|
||||
return nil, err
|
||||
|
|
@ -266,7 +274,10 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De
|
|||
needsUpdate = true
|
||||
}
|
||||
if needsUpdate {
|
||||
_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
|
||||
if updateErr := dc.runtimeClient.Status().Update(ctx, d); updateErr != nil {
|
||||
klog.Errorf("Failed to update deployment status: %v", updateErr)
|
||||
err = updateErr
|
||||
}
|
||||
}
|
||||
return createdRS, err
|
||||
}
|
||||
|
|
@ -426,20 +437,13 @@ func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.Re
|
|||
|
||||
// Use existing state directly for patching, let API Server handle conflicts
|
||||
rsCopy := rs.DeepCopy()
|
||||
*(rsCopy.Spec.Replicas) = newScale
|
||||
deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(deployment, &dc.strategy))
|
||||
|
||||
if sizeNeedsUpdate {
|
||||
rsCopy.Spec.Replicas = &newScale
|
||||
}
|
||||
if annotationsNeedUpdate {
|
||||
// Set the annotations that need to be updated
|
||||
desiredReplicas := *(deployment.Spec.Replicas)
|
||||
maxReplicas := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(deployment, &dc.strategy)
|
||||
deploymentutil.SetReplicasAnnotations(rsCopy, desiredReplicas, maxReplicas)
|
||||
}
|
||||
|
||||
// Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error
|
||||
// Use MergeFrom with optimistic lock for patching, if ResourceVersion conflicts, API Server will return 409 error
|
||||
// Controller-runtime will automatically reschedule for reconciliation
|
||||
err = dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(rs))
|
||||
patch := client.MergeFromWithOptions(rs, client.MergeFromWithOptimisticLock{})
|
||||
err = dc.runtimeClient.Patch(ctx, rsCopy, patch)
|
||||
if err != nil {
|
||||
return scaled, rs, err
|
||||
}
|
||||
|
|
@ -482,7 +486,7 @@ func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs []
|
|||
continue
|
||||
}
|
||||
klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name)
|
||||
if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
|
||||
if err := dc.runtimeClient.Delete(ctx, rs); err != nil && !errors.IsNotFound(err) {
|
||||
// Return error instead of aggregating and continuing DELETEs on the theory
|
||||
// that we may be overloading the api server.
|
||||
return err
|
||||
|
|
@ -502,7 +506,10 @@ func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs
|
|||
|
||||
newDeployment := d
|
||||
newDeployment.Status = newStatus
|
||||
_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
|
||||
err := dc.runtimeClient.Status().Update(ctx, newDeployment)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to sync deployment status: %v", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue