From a4828cc410debcd1e4e9b194aa3a309dfb67785c Mon Sep 17 00:00:00 2001
From: chaosi-zju
Date: Tue, 19 Sep 2023 21:53:26 +0800
Subject: [PATCH] retain for hpa controlled Deployment resource

Signed-off-by: chaosi-zju
---
 .../deploy/karmada-controller-manager.yaml    |   1 +
 .../app/controllermanager.go                  |   7 +-
 .../hpa_replicas_syncer_controller.go         |  53 ++++---
 .../hpa_replicas_syncer_predicate.go          |  77 ++++++++++
 .../hpareplicassyncer/hpa_scale_ref_worker.go | 137 ++++++++++++++++++
 .../default/native/aggregatestatus.go         |   3 +-
 .../default/native/retain.go                  |  22 +++
 .../default/native/retain_test.go             |  94 ++++++++++++
 pkg/util/constants.go                         |   8 +
 9 files changed, 370 insertions(+), 32 deletions(-)
 create mode 100644 pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_predicate.go
 create mode 100644 pkg/controllers/hpareplicassyncer/hpa_scale_ref_worker.go
 create mode 100644 pkg/resourceinterpreter/default/native/retain_test.go

diff --git a/artifacts/deploy/karmada-controller-manager.yaml b/artifacts/deploy/karmada-controller-manager.yaml
index 67d0f14be..ab206f34d 100644
--- a/artifacts/deploy/karmada-controller-manager.yaml
+++ b/artifacts/deploy/karmada-controller-manager.yaml
@@ -30,6 +30,7 @@ spec:
             - --cluster-status-update-frequency=10s
             - --secure-port=10357
             - --failover-eviction-timeout=30s
+            - --controllers=*,hpaReplicasSyncer
             - --feature-gates=PropagationPolicyPreemption=true
             - --v=4
           livenessProbe:
diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go
index 60a9e3466..defa1c95b 100644
--- a/cmd/controller-manager/app/controllermanager.go
+++ b/cmd/controller-manager/app/controllermanager.go
@@ -602,9 +602,10 @@ func startHPAReplicasSyncerController(ctx controllerscontext.Context) (enabled b
     }

     hpaReplicasSyncer := hpareplicassyncer.HPAReplicasSyncer{
-        Client:      ctx.Mgr.GetClient(),
-        RESTMapper:  ctx.Mgr.GetRESTMapper(),
-        ScaleClient: scaleClient,
+        Client:        ctx.Mgr.GetClient(),
+        DynamicClient: ctx.DynamicClientSet,
+        RESTMapper:    ctx.Mgr.GetRESTMapper(),
+        ScaleClient:   scaleClient,
     }
     err = hpaReplicasSyncer.SetupWithManager(ctx.Mgr)
     if err != nil {
diff --git a/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller.go b/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller.go
index cac31d723..c14d1c4ca 100644
--- a/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller.go
+++ b/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller.go
@@ -9,48 +9,45 @@ import (
     "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/client-go/dynamic"
     "k8s.io/client-go/scale"
     "k8s.io/klog/v2"
     controllerruntime "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/builder"
     "sigs.k8s.io/controller-runtime/pkg/client"
-    "sigs.k8s.io/controller-runtime/pkg/event"
-    "sigs.k8s.io/controller-runtime/pkg/predicate"
+
+    "github.com/karmada-io/karmada/pkg/util"
 )

-var hpaPredicate = predicate.Funcs{
-    CreateFunc: func(e event.CreateEvent) bool {
-        return false
-    },
-    UpdateFunc: func(e event.UpdateEvent) bool {
-        oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler)
-        if !ok {
-            return false
-        }
-
-        newHPA, ok := e.ObjectNew.(*autoscalingv2.HorizontalPodAutoscaler)
-        if !ok {
-            return false
-        }
-
-        return oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas
-    },
-    DeleteFunc: func(e event.DeleteEvent) bool {
-        return false
-    },
-}
+const (
+    // ControllerName is the controller name that will be used when reporting events.
+    ControllerName = "hpa-replicas-syncer"
+    // scaleRefWorkerNum is the number of async workers processing scale ref events.
+    scaleRefWorkerNum = 1
+)

 // HPAReplicasSyncer is to sync replicas from status of HPA to resource template.
 type HPAReplicasSyncer struct {
-    Client      client.Client
-    RESTMapper  meta.RESTMapper
-    ScaleClient scale.ScalesGetter
+    Client        client.Client
+    DynamicClient dynamic.Interface
+    RESTMapper    meta.RESTMapper
+
+    ScaleClient    scale.ScalesGetter
+    scaleRefWorker util.AsyncWorker
 }

 // SetupWithManager creates a controller and register to controller manager.
 func (r *HPAReplicasSyncer) SetupWithManager(mgr controllerruntime.Manager) error {
-    return controllerruntime.NewControllerManagedBy(mgr).Named("replicas-syncer").
-        For(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(hpaPredicate)).
+    scaleRefWorkerOptions := util.Options{
+        Name:          "scale ref worker",
+        ReconcileFunc: r.reconcileScaleRef,
+    }
+    r.scaleRefWorker = util.NewAsyncWorker(scaleRefWorkerOptions)
+    r.scaleRefWorker.Run(scaleRefWorkerNum, context.Background().Done())
+
+    return controllerruntime.NewControllerManagedBy(mgr).
+        Named(ControllerName).
+        For(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(r)).
         Complete(r)
 }

diff --git a/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_predicate.go b/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_predicate.go
new file mode 100644
index 000000000..0d79e4877
--- /dev/null
+++ b/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_predicate.go
@@ -0,0 +1,77 @@
+package hpareplicassyncer
+
+import (
+    autoscalingv2 "k8s.io/api/autoscaling/v2"
+    "k8s.io/klog/v2"
+    "sigs.k8s.io/controller-runtime/pkg/event"
+    "sigs.k8s.io/controller-runtime/pkg/predicate"
+
+    policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+)
+
+var _ predicate.Predicate = &HPAReplicasSyncer{}
+
+func (r *HPAReplicasSyncer) Create(e event.CreateEvent) bool {
+    hpa, ok := e.Object.(*autoscalingv2.HorizontalPodAutoscaler)
+    if !ok {
+        klog.Errorf("create predicate in hpa controller called, but the object is not an HPA")
+        return false
+    }
+
+    // if the hpa exists and has been propagated, add the retain label to its scale ref resource
+    if hasBeenPropagated(hpa) {
+        r.scaleRefWorker.Add(labelEvent{addLabelEvent, hpa})
+    }
+
+    return false
+}
+
+func (r *HPAReplicasSyncer) Update(e event.UpdateEvent) bool {
+    oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler)
+    if !ok {
+        klog.Errorf("update predicate in hpa controller called, but the old object is not an HPA")
+        return false
+    }
+
+    newHPA, ok := e.ObjectNew.(*autoscalingv2.HorizontalPodAutoscaler)
+    if !ok {
+        klog.Errorf("update predicate in hpa controller called, but the new object is not an HPA")
+        return false
+    }
+
+    // the hpa scale ref changed: remove the label from the old scale ref and add it to the new one
+    if oldHPA.Spec.ScaleTargetRef.String() != newHPA.Spec.ScaleTargetRef.String() {
+        // if the old scale ref has the label, remove it, otherwise skip
+        r.scaleRefWorker.Add(labelEvent{deleteLabelEvent, oldHPA})
+    }
+
+    // if the new hpa exists and has been propagated, add the retain label to its scale ref resource
+    if hasBeenPropagated(newHPA) {
+        r.scaleRefWorker.Add(labelEvent{addLabelEvent, newHPA})
+    }
+
+    return oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas
+}
+
+func (r *HPAReplicasSyncer) Delete(e event.DeleteEvent) bool {
+    hpa, ok := e.Object.(*autoscalingv2.HorizontalPodAutoscaler)
+    if !ok {
+        klog.Errorf("delete predicate in hpa controller called, but the object is not an HPA")
+        return false
+    }
+
+    // if the scale ref has the label, remove it, otherwise skip
+    r.scaleRefWorker.Add(labelEvent{deleteLabelEvent, hpa})
+
+    return false
+}
+
+func (r *HPAReplicasSyncer) Generic(e event.GenericEvent) bool {
+    return false
+}
+
+func hasBeenPropagated(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
+    _, ppExist := hpa.GetLabels()[policyv1alpha1.PropagationPolicyUIDLabel]
+    _, cppExist := hpa.GetLabels()[policyv1alpha1.ClusterPropagationPolicyUIDLabel]
+    return ppExist || cppExist
+}
diff --git a/pkg/controllers/hpareplicassyncer/hpa_scale_ref_worker.go b/pkg/controllers/hpareplicassyncer/hpa_scale_ref_worker.go
new file mode 100644
index 000000000..604ed15c8
--- /dev/null
+++ b/pkg/controllers/hpareplicassyncer/hpa_scale_ref_worker.go
@@ -0,0 +1,137 @@
+package hpareplicassyncer
+
+import (
+    "context"
+    "fmt"
+
+    autoscalingv2 "k8s.io/api/autoscaling/v2"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/types"
+    "k8s.io/klog/v2"
+
+    "github.com/karmada-io/karmada/pkg/util"
+    "github.com/karmada-io/karmada/pkg/util/helper"
+)
+
+type labelEventKind int
+
+const (
+    // addLabelEvent refers to adding util.RetainReplicasLabel to the resource scaled by the HPA
+    addLabelEvent labelEventKind = iota
+    // deleteLabelEvent refers to deleting util.RetainReplicasLabel from the resource scaled by the HPA
+    deleteLabelEvent
+)
+
+type labelEvent struct {
+    kind labelEventKind
+    hpa  *autoscalingv2.HorizontalPodAutoscaler
+}
+
+func (r *HPAReplicasSyncer) reconcileScaleRef(key util.QueueKey) (err error) {
+    event, ok := key.(labelEvent)
+    if !ok {
+        klog.Errorf("Found invalid key when reconciling hpa scale ref: %+v", key)
+        return nil
+    }
+
+    switch event.kind {
+    case addLabelEvent:
+        err = r.addHPALabelToScaleRef(context.TODO(), event.hpa)
+    case deleteLabelEvent:
+        err = r.deleteHPALabelFromScaleRef(context.TODO(), event.hpa)
+    default:
+        klog.Errorf("Found invalid label event kind when reconciling hpa scale ref: %+v", key)
+        return nil
+    }
+
+    if err != nil {
+        klog.Errorf("reconcile scale ref failed: %+v", err)
+    }
+    return err
+}
+
+func (r *HPAReplicasSyncer) addHPALabelToScaleRef(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
+    targetGVK := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
+    mapping, err := r.RESTMapper.RESTMapping(targetGVK.GroupKind(), targetGVK.Version)
+    if err != nil {
+        return fmt.Errorf("unable to recognize scale ref resource, %s/%v, err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
+    }
+
+    scaleRef, err := r.DynamicClient.Resource(mapping.Resource).Namespace(hpa.Namespace).Get(ctx, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
+    if err != nil {
+        if apierrors.IsNotFound(err) {
+            klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
+            return nil
+        }
+        return fmt.Errorf("failed to find scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
+    }
+
+    // patch is preferred over update: if the object is modified after the Get, a merge patch can still succeed while an update would conflict
+    newScaleRef := scaleRef.DeepCopy()
+    util.MergeLabel(newScaleRef, util.RetainReplicasLabel, util.RetainReplicasValue)
+    patchBytes, err := helper.GenMergePatch(scaleRef, newScaleRef)
+    if err != nil {
+        return fmt.Errorf("failed to gen merge patch (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
+    }
+    if len(patchBytes) == 0 {
+        klog.Infof("hpa label already exists, skip adding (%s/%v)", hpa.Namespace, hpa.Spec.ScaleTargetRef)
+        return nil
+    }
+
+    _, err = r.DynamicClient.Resource(mapping.Resource).Namespace(newScaleRef.GetNamespace()).
+        Patch(ctx, newScaleRef.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
+    if err != nil {
+        if apierrors.IsNotFound(err) {
+            klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
+            return nil
+        }
+        return fmt.Errorf("failed to patch scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
+    }
+
+    klog.Infof("add hpa label to %s/%v successfully", hpa.Namespace, hpa.Spec.ScaleTargetRef)
+    return nil
+}
+
+func (r *HPAReplicasSyncer) deleteHPALabelFromScaleRef(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
+    targetGVK := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
+    mapping, err := r.RESTMapper.RESTMapping(targetGVK.GroupKind(), targetGVK.Version)
+    if err != nil {
+        return fmt.Errorf("unable to recognize scale ref resource, %s/%v, err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
+    }
+
+    scaleRef, err := r.DynamicClient.Resource(mapping.Resource).Namespace(hpa.Namespace).Get(ctx, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
+    if err != nil {
+        if apierrors.IsNotFound(err) {
+            klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
+            return nil
+        }
+        return fmt.Errorf("failed to find scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
+    }
+
+    // patch is preferred over update: if the object is modified after the Get, a merge patch can still succeed while an update would conflict
+    newScaleRef := scaleRef.DeepCopy()
+    util.RemoveLabels(newScaleRef, util.RetainReplicasLabel)
+    patchBytes, err := helper.GenMergePatch(scaleRef, newScaleRef)
+    if err != nil {
+        return fmt.Errorf("failed to gen merge patch (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
+    }
+    if len(patchBytes) == 0 {
+        klog.Infof("hpa label does not exist, skip deleting (%s/%v)", hpa.Namespace, hpa.Spec.ScaleTargetRef)
+        return nil
+    }
+
+    _, err = r.DynamicClient.Resource(mapping.Resource).Namespace(newScaleRef.GetNamespace()).
+        Patch(ctx, newScaleRef.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
+    if err != nil {
+        if apierrors.IsNotFound(err) {
+            klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
+            return nil
+        }
+        return fmt.Errorf("failed to patch scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
+    }
+
+    klog.Infof("delete hpa label from %s/%+v successfully", hpa.Namespace, hpa.Spec.ScaleTargetRef)
+    return nil
+}
diff --git a/pkg/resourceinterpreter/default/native/aggregatestatus.go b/pkg/resourceinterpreter/default/native/aggregatestatus.go
index dd1a27a1d..291416b03 100644
--- a/pkg/resourceinterpreter/default/native/aggregatestatus.go
+++ b/pkg/resourceinterpreter/default/native/aggregatestatus.go
@@ -574,7 +574,8 @@ func aggregateHorizontalPodAutoscalerStatus(object *unstructured.Unstructured, a
         if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
             return nil, err
         }
-        klog.V(3).Infof("Grab hpa(%s/%s) status from cluster(%s), CurrentReplicas: %d", temp.CurrentReplicas)
+        klog.V(3).Infof("Grab hpa(%s/%s) status from cluster(%s), CurrentReplicas: %d, DesiredReplicas: %d",
+            hpa.Namespace, hpa.Name, item.ClusterName, temp.CurrentReplicas, temp.DesiredReplicas)

         newStatus.CurrentReplicas += temp.CurrentReplicas
         newStatus.DesiredReplicas += temp.DesiredReplicas
diff --git a/pkg/resourceinterpreter/default/native/retain.go b/pkg/resourceinterpreter/default/native/retain.go
index 086351cf3..5fb720b51 100644
--- a/pkg/resourceinterpreter/default/native/retain.go
+++ b/pkg/resourceinterpreter/default/native/retain.go
@@ -3,6 +3,7 @@ package native
 import (
     "fmt"

+    appsv1 "k8s.io/api/apps/v1"
     batchv1 "k8s.io/api/batch/v1"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -18,6 +19,7 @@ type retentionInterpreter func(desired *unstructured.Unstructured, observed *uns

 func getAllDefaultRetentionInterpreter() map[schema.GroupVersionKind]retentionInterpreter {
     s := make(map[schema.GroupVersionKind]retentionInterpreter)
+    s[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)] = retainWorkloadReplicas
     s[corev1.SchemeGroupVersion.WithKind(util.PodKind)] = retainPodFields
     s[corev1.SchemeGroupVersion.WithKind(util.ServiceKind)] = lifted.RetainServiceFields
     s[corev1.SchemeGroupVersion.WithKind(util.ServiceAccountKind)] = lifted.RetainServiceAccountFields
@@ -122,3 +124,23 @@ func retainJobSelectorFields(desired, observed *unstructured.Unstructured) (*uns
     }
     return desired, nil
 }
+
+func retainWorkloadReplicas(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+    labels, _, err := unstructured.NestedStringMap(desired.Object, "metadata", "labels")
+    if err != nil {
+        return nil, fmt.Errorf("failed to get metadata.labels from desired.Object: %+v", err)
+    }
+
+    if label, labelExist := labels[util.RetainReplicasLabel]; labelExist && label == util.RetainReplicasValue {
+        replicas, exist, err := unstructured.NestedInt64(observed.Object, "spec", "replicas")
+        if err != nil || !exist {
+            return nil, fmt.Errorf("failed to get spec.replicas from %s %s/%s", observed.GetKind(), observed.GetNamespace(), observed.GetName())
+        }
+        err = unstructured.SetNestedField(desired.Object, replicas, "spec", "replicas")
+        if err != nil {
+            return nil, fmt.Errorf("failed to set spec.replicas to %s %s/%s", desired.GetKind(), desired.GetNamespace(), desired.GetName())
+        }
+    }
+
+    return desired, nil
+}
diff --git a/pkg/resourceinterpreter/default/native/retain_test.go b/pkg/resourceinterpreter/default/native/retain_test.go
new file mode 100644
index 000000000..2d32a3cdf
--- /dev/null
+++ b/pkg/resourceinterpreter/default/native/retain_test.go
@@ -0,0 +1,94 @@
+package native
+
+import (
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+    appsv1 "k8s.io/api/apps/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+
+    "github.com/karmada-io/karmada/pkg/util"
+    "github.com/karmada-io/karmada/pkg/util/helper"
+)
+
+func Test_retainK8sWorkloadReplicas(t *testing.T) {
+    desiredNum, observedNum := int32(2), int32(4)
+    observed, _ := helper.ToUnstructured(&appsv1.Deployment{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "nginx",
+        },
+        Spec: appsv1.DeploymentSpec{
+            Replicas: &observedNum,
+        },
+    })
+    desired, _ := helper.ToUnstructured(&appsv1.Deployment{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "nginx",
+            Labels: map[string]string{
+                util.RetainReplicasLabel: util.RetainReplicasValue,
+            },
+        },
+        Spec: appsv1.DeploymentSpec{
+            Replicas: &desiredNum,
+        },
+    })
+    want, _ := helper.ToUnstructured(&appsv1.Deployment{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "nginx",
+            Labels: map[string]string{
+                util.RetainReplicasLabel: util.RetainReplicasValue,
+            },
+        },
+        Spec: appsv1.DeploymentSpec{
+            Replicas: &observedNum,
+        },
+    })
+    desired2, _ := helper.ToUnstructured(&appsv1.Deployment{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "nginx",
+        },
+        Spec: appsv1.DeploymentSpec{
+            Replicas: &desiredNum,
+        },
+    })
+    type args struct {
+        desired  *unstructured.Unstructured
+        observed *unstructured.Unstructured
+    }
+    tests := []struct {
+        name    string
+        args    args
+        want    *unstructured.Unstructured
+        wantErr bool
+    }{
+        {
+            name: "deployment is controlled by hpa",
+            args: args{
+                desired:  desired,
+                observed: observed,
+            },
+            want:    want,
+            wantErr: false,
+        },
+        {
+            name: "deployment is not controlled by hpa",
+            args: args{
+                desired:  desired2,
+                observed: observed,
+            },
+            want:    desired2,
+            wantErr: false,
+        },
+    }
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            got, err := retainWorkloadReplicas(tt.args.desired, tt.args.observed)
+            if (err != nil) != tt.wantErr {
+                t.Errorf("retainWorkloadReplicas() error = %v, wantErr %v", err, tt.wantErr)
+                return
+            }
+            assert.Equalf(t, tt.want, got, "retainWorkloadReplicas(%v, %v)", tt.args.desired, tt.args.observed)
+        })
+    }
+}
diff --git a/pkg/util/constants.go b/pkg/util/constants.go
index 010f4d5db..9b560e3b9 100644
--- a/pkg/util/constants.go
+++ b/pkg/util/constants.go
@@ -28,6 +28,14 @@ const (

     // ManagedByKarmadaLabelValue indicates that resources are managed by karmada controllers.
     ManagedByKarmadaLabelValue = "true"
+
+    // RetainReplicasLabel is a reserved label to indicate whether the replicas should be retained, e.g.:
+    // resourcetemplate.karmada.io/retain-replicas: true  // the value `true` indicates retain
+    // resourcetemplate.karmada.io/retain-replicas: false // the value `false` (or any other value) indicates not to retain
+    RetainReplicasLabel = "resourcetemplate.karmada.io/retain-replicas"
+
+    // RetainReplicasValue is an optional value of RetainReplicasLabel, indicating retain
+    RetainReplicasValue = "true"
 )

 // Define annotations used by karmada system.
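
Note: the following is a minimal, hypothetical sketch (not something this patch adds) of the merge patch the scale-ref worker ends up sending. The kubeconfig path, the Deployment GVR, the namespace "default" and the name "nginx" are illustrative assumptions; the real worker resolves the GVR through the RESTMapper and computes the patch with util.MergeLabel and helper.GenMergePatch. Because the JSON merge patch touches only the retain-replicas label, a concurrent change to the Deployment between the Get and the write does not cause a conflict, and once the label is present, retainWorkloadReplicas keeps the observed spec.replicas when the resource template is synced.

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Assumption: a kubeconfig pointing at the apiserver that stores the resource templates.
    cfg, err := clientcmd.BuildConfigFromFlags("", "/etc/karmada/karmada-apiserver.config")
    if err != nil {
        panic(err)
    }
    dynClient, err := dynamic.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // Assumption: the HPA scale ref points at an apps/v1 Deployment named "nginx" in "default".
    deployGVR := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}

    // A JSON merge patch that only adds the retain-replicas label, so concurrent
    // writers touching other fields cannot cause a conflict.
    patch := []byte(`{"metadata":{"labels":{"resourcetemplate.karmada.io/retain-replicas":"true"}}}`)

    _, err = dynClient.Resource(deployGVR).Namespace("default").
        Patch(context.TODO(), "nginx", types.MergePatchType, patch, metav1.PatchOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Println("retain-replicas label ensured on deployments/nginx")
}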