Merge pull request #4078 from chaosi-zju/retain-2

Retain replicas for HPA-controlled Deployment resources (labels method)
karmada-bot 2023-09-21 22:52:00 +08:00 committed by GitHub
commit 681515fbf6
9 changed files with 370 additions and 32 deletions
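
For orientation, the "labels method" named in the title works like this: the new hpaReplicasSyncer controller labels the HPA's scale target with resourcetemplate.karmada.io/retain-replicas: "true", and the default Deployment retention interpreter then keeps the spec.replicas observed in the member cluster instead of overwriting it with the template's value. The snippet below is only a rough illustrative sketch (not part of this commit) of what such a labeled Deployment template looks like, built with the same types the new unit test uses; the package, function name, and values are arbitrary.

package example

import (
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/karmada-io/karmada/pkg/util"
)

// labeledDeployment builds a Deployment carrying the retain-replicas label,
// which tells the retention interpreter added in this PR to keep the replicas
// observed in the member cluster. Name and replica count are placeholders.
func labeledDeployment(replicas int32) *appsv1.Deployment {
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name: "nginx",
			Labels: map[string]string{
				// i.e. "resourcetemplate.karmada.io/retain-replicas": "true"
				util.RetainReplicasLabel: util.RetainReplicasValue,
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
		},
	}
}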

@@ -30,6 +30,7 @@ spec:
 - --cluster-status-update-frequency=10s
 - --secure-port=10357
 - --failover-eviction-timeout=30s
+- --controllers=*,hpaReplicasSyncer
 - --feature-gates=PropagationPolicyPreemption=true
 - --v=4
 livenessProbe:

@@ -603,6 +603,7 @@ func startHPAReplicasSyncerController(ctx controllerscontext.Context) (enabled b
 	hpaReplicasSyncer := hpareplicassyncer.HPAReplicasSyncer{
 		Client:        ctx.Mgr.GetClient(),
+		DynamicClient: ctx.DynamicClientSet,
 		RESTMapper:    ctx.Mgr.GetRESTMapper(),
 		ScaleClient:   scaleClient,
 	}

@@ -9,48 +9,45 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/scale"
 	"k8s.io/klog/v2"
 	controllerruntime "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/event"
-	"sigs.k8s.io/controller-runtime/pkg/predicate"
+
+	"github.com/karmada-io/karmada/pkg/util"
 )
 
-var hpaPredicate = predicate.Funcs{
-	CreateFunc: func(e event.CreateEvent) bool {
-		return false
-	},
-	UpdateFunc: func(e event.UpdateEvent) bool {
-		oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler)
-		if !ok {
-			return false
-		}
-		newHPA, ok := e.ObjectNew.(*autoscalingv2.HorizontalPodAutoscaler)
-		if !ok {
-			return false
-		}
-		return oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas
-	},
-	DeleteFunc: func(e event.DeleteEvent) bool {
-		return false
-	},
-}
+const (
+	// ControllerName is the controller name that will be used when reporting events.
+	ControllerName = "hpa-replicas-syncer"
+	// scaleRefWorkerNum is the async Worker number
+	scaleRefWorkerNum = 1
+)
 
 // HPAReplicasSyncer is to sync replicas from status of HPA to resource template.
 type HPAReplicasSyncer struct {
 	Client        client.Client
+	DynamicClient dynamic.Interface
 	RESTMapper    meta.RESTMapper
 	ScaleClient   scale.ScalesGetter
+
+	scaleRefWorker util.AsyncWorker
 }
 
 // SetupWithManager creates a controller and register to controller manager.
 func (r *HPAReplicasSyncer) SetupWithManager(mgr controllerruntime.Manager) error {
-	return controllerruntime.NewControllerManagedBy(mgr).Named("replicas-syncer").
-		For(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(hpaPredicate)).
+	scaleRefWorkerOptions := util.Options{
+		Name:          "scale ref worker",
+		ReconcileFunc: r.reconcileScaleRef,
+	}
+	r.scaleRefWorker = util.NewAsyncWorker(scaleRefWorkerOptions)
+	r.scaleRefWorker.Run(scaleRefWorkerNum, context.Background().Done())
+
+	return controllerruntime.NewControllerManagedBy(mgr).
+		Named(ControllerName).
+		For(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(r)).
 		Complete(r)
 }

@@ -0,0 +1,77 @@
package hpareplicassyncer

import (
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	"k8s.io/klog/v2"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

var _ predicate.Predicate = &HPAReplicasSyncer{}

func (r *HPAReplicasSyncer) Create(e event.CreateEvent) bool {
	hpa, ok := e.Object.(*autoscalingv2.HorizontalPodAutoscaler)
	if !ok {
		klog.Errorf("create predicates in hpa controller called, but obj is not hpa type")
		return false
	}

	// If the HPA exists and has been propagated, add the label to its scale ref resource.
	if hasBeenPropagated(hpa) {
		r.scaleRefWorker.Add(labelEvent{addLabelEvent, hpa})
	}

	return false
}

func (r *HPAReplicasSyncer) Update(e event.UpdateEvent) bool {
	oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler)
	if !ok {
		klog.Errorf("update predicates in hpa controller called, but old obj is not hpa type")
		return false
	}

	newHPA, ok := e.ObjectNew.(*autoscalingv2.HorizontalPodAutoscaler)
	if !ok {
		klog.Errorf("update predicates in hpa controller called, but new obj is not hpa type")
		return false
	}

	// The scale ref changed: remove the label from the old scale ref and label the new one.
	if oldHPA.Spec.ScaleTargetRef.String() != newHPA.Spec.ScaleTargetRef.String() {
		// If the old scale ref carries the label, remove it; otherwise this is a no-op.
		r.scaleRefWorker.Add(labelEvent{deleteLabelEvent, oldHPA})
	}

	// If the new HPA has been propagated, add the label to its scale ref resource.
	if hasBeenPropagated(newHPA) {
		r.scaleRefWorker.Add(labelEvent{addLabelEvent, newHPA})
	}

	return oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas
}

func (r *HPAReplicasSyncer) Delete(e event.DeleteEvent) bool {
	hpa, ok := e.Object.(*autoscalingv2.HorizontalPodAutoscaler)
	if !ok {
		klog.Errorf("delete predicates in hpa controller called, but obj is not hpa type")
		return false
	}

	// If the scale ref carries the label, remove it; otherwise this is a no-op.
	r.scaleRefWorker.Add(labelEvent{deleteLabelEvent, hpa})

	return false
}

func (r *HPAReplicasSyncer) Generic(e event.GenericEvent) bool {
	return false
}

func hasBeenPropagated(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
	_, ppExist := hpa.GetLabels()[policyv1alpha1.PropagationPolicyUIDLabel]
	_, cppExist := hpa.GetLabels()[policyv1alpha1.ClusterPropagationPolicyUIDLabel]
	return ppExist || cppExist
}

@@ -0,0 +1,137 @@
package hpareplicassyncer

import (
	"context"
	"fmt"

	autoscalingv2 "k8s.io/api/autoscaling/v2"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/klog/v2"

	"github.com/karmada-io/karmada/pkg/util"
	"github.com/karmada-io/karmada/pkg/util/helper"
)

type labelEventKind int

const (
	// addLabelEvent refers to adding util.RetainReplicasLabel to the resource scaled by the HPA.
	addLabelEvent labelEventKind = iota
	// deleteLabelEvent refers to deleting util.RetainReplicasLabel from the resource scaled by the HPA.
	deleteLabelEvent
)

type labelEvent struct {
	kind labelEventKind
	hpa  *autoscalingv2.HorizontalPodAutoscaler
}

func (r *HPAReplicasSyncer) reconcileScaleRef(key util.QueueKey) (err error) {
	event, ok := key.(labelEvent)
	if !ok {
		klog.Errorf("Found invalid key when reconciling hpa scale ref: %+v", key)
		return nil
	}

	switch event.kind {
	case addLabelEvent:
		err = r.addHPALabelToScaleRef(context.TODO(), event.hpa)
	case deleteLabelEvent:
		err = r.deleteHPALabelFromScaleRef(context.TODO(), event.hpa)
	default:
		klog.Errorf("Found invalid key when reconciling hpa scale ref: %+v", key)
		return nil
	}

	if err != nil {
		klog.Errorf("reconcile scale ref failed: %+v", err)
	}
	return err
}

func (r *HPAReplicasSyncer) addHPALabelToScaleRef(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
	targetGVK := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
	mapping, err := r.RESTMapper.RESTMapping(targetGVK.GroupKind(), targetGVK.Version)
	if err != nil {
		return fmt.Errorf("unable to recognize scale ref resource, %s/%v, err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
	}

	scaleRef, err := r.DynamicClient.Resource(mapping.Resource).Namespace(hpa.Namespace).Get(ctx, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
			return nil
		}
		return fmt.Errorf("failed to find scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
	}

	// Patch is preferable to Update here: if the object is modified between the Get and this write,
	// a merge patch can still succeed while an Update would fail with a conflict.
	newScaleRef := scaleRef.DeepCopy()
	util.MergeLabel(newScaleRef, util.RetainReplicasLabel, util.RetainReplicasValue)

	patchBytes, err := helper.GenMergePatch(scaleRef, newScaleRef)
	if err != nil {
		return fmt.Errorf("failed to gen merge patch (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
	}
	if len(patchBytes) == 0 {
		klog.Infof("hpa labels already exist, skip adding (%s/%v)", hpa.Namespace, hpa.Spec.ScaleTargetRef)
		return nil
	}

	_, err = r.DynamicClient.Resource(mapping.Resource).Namespace(newScaleRef.GetNamespace()).
		Patch(ctx, newScaleRef.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
			return nil
		}
		return fmt.Errorf("failed to patch scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
	}

	klog.Infof("add hpa labels to %s/%v success", hpa.Namespace, hpa.Spec.ScaleTargetRef)
	return nil
}

func (r *HPAReplicasSyncer) deleteHPALabelFromScaleRef(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
	targetGVK := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
	mapping, err := r.RESTMapper.RESTMapping(targetGVK.GroupKind(), targetGVK.Version)
	if err != nil {
		return fmt.Errorf("unable to recognize scale ref resource, %s/%v, err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
	}

	scaleRef, err := r.DynamicClient.Resource(mapping.Resource).Namespace(hpa.Namespace).Get(ctx, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
			return nil
		}
		return fmt.Errorf("failed to find scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
	}

	// Patch is preferable to Update here: if the object is modified between the Get and this write,
	// a merge patch can still succeed while an Update would fail with a conflict.
	newScaleRef := scaleRef.DeepCopy()
	util.RemoveLabels(newScaleRef, util.RetainReplicasLabel)

	patchBytes, err := helper.GenMergePatch(scaleRef, newScaleRef)
	if err != nil {
		return fmt.Errorf("failed to gen merge patch (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
	}
	if len(patchBytes) == 0 {
		klog.Infof("hpa labels not exist, skip deleting (%s/%v)", hpa.Namespace, hpa.Spec.ScaleTargetRef)
		return nil
	}

	_, err = r.DynamicClient.Resource(mapping.Resource).Namespace(newScaleRef.GetNamespace()).
		Patch(ctx, newScaleRef.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
			return nil
		}
		return fmt.Errorf("failed to patch scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
	}

	klog.Infof("delete hpa labels from %s/%+v success", hpa.Namespace, hpa.Spec.ScaleTargetRef)
	return nil
}

@@ -574,7 +574,8 @@ func aggregateHorizontalPodAutoscalerStatus(object *unstructured.Unstructured, a
 		if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
 			return nil, err
 		}
-		klog.V(3).Infof("Grab hpa(%s/%s) status from cluster(%s), CurrentReplicas: %d", temp.CurrentReplicas)
+		klog.V(3).Infof("Grab hpa(%s/%s) status from cluster(%s), CurrentReplicas: %d, DesiredReplicas: %d",
+			hpa.Namespace, hpa.Name, item.ClusterName, temp.CurrentReplicas, temp.DesiredReplicas)
 		newStatus.CurrentReplicas += temp.CurrentReplicas
 		newStatus.DesiredReplicas += temp.DesiredReplicas

@@ -3,6 +3,7 @@ package native
 import (
 	"fmt"
 
+	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -18,6 +19,7 @@ type retentionInterpreter func(desired *unstructured.Unstructured, observed *uns
 func getAllDefaultRetentionInterpreter() map[schema.GroupVersionKind]retentionInterpreter {
 	s := make(map[schema.GroupVersionKind]retentionInterpreter)
+	s[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)] = retainWorkloadReplicas
 	s[corev1.SchemeGroupVersion.WithKind(util.PodKind)] = retainPodFields
 	s[corev1.SchemeGroupVersion.WithKind(util.ServiceKind)] = lifted.RetainServiceFields
 	s[corev1.SchemeGroupVersion.WithKind(util.ServiceAccountKind)] = lifted.RetainServiceAccountFields
@@ -122,3 +124,23 @@ func retainJobSelectorFields(desired, observed *unstructured.Unstructured) (*uns
 	}
 	return desired, nil
 }
+
+func retainWorkloadReplicas(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+	labels, _, err := unstructured.NestedStringMap(desired.Object, "metadata", "labels")
+	if err != nil {
+		return nil, fmt.Errorf("failed to get metadata.label from desired.Object: %+v", err)
+	}
+
+	if label, labelExist := labels[util.RetainReplicasLabel]; labelExist && label == util.RetainReplicasValue {
+		replicas, exist, err := unstructured.NestedInt64(observed.Object, "spec", "replicas")
+		if err != nil || !exist {
+			return nil, fmt.Errorf("failed to get spec.replicas from %s %s/%s", observed.GetKind(), observed.GetNamespace(), observed.GetName())
+		}
+		err = unstructured.SetNestedField(desired.Object, replicas, "spec", "replicas")
+		if err != nil {
+			return nil, fmt.Errorf("failed to set spec.replicas to %s %s/%s", desired.GetKind(), desired.GetNamespace(), desired.GetName())
+		}
+	}
+	return desired, nil
+}

@@ -0,0 +1,94 @@
package native

import (
	"testing"

	"github.com/stretchr/testify/assert"
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

	"github.com/karmada-io/karmada/pkg/util"
	"github.com/karmada-io/karmada/pkg/util/helper"
)

func Test_retainK8sWorkloadReplicas(t *testing.T) {
	desiredNum, observedNum := int32(2), int32(4)
	observed, _ := helper.ToUnstructured(&appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name: "nginx",
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &observedNum,
		},
	})
	desired, _ := helper.ToUnstructured(&appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name: "nginx",
			Labels: map[string]string{
				util.RetainReplicasLabel: util.RetainReplicasValue,
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &desiredNum,
		},
	})
	want, _ := helper.ToUnstructured(&appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name: "nginx",
			Labels: map[string]string{
				util.RetainReplicasLabel: util.RetainReplicasValue,
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &observedNum,
		},
	})
	desired2, _ := helper.ToUnstructured(&appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name: "nginx",
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &desiredNum,
		},
	})

	type args struct {
		desired  *unstructured.Unstructured
		observed *unstructured.Unstructured
	}
	tests := []struct {
		name    string
		args    args
		want    *unstructured.Unstructured
		wantErr bool
	}{
		{
			name: "deployment is controlled by hpa",
			args: args{
				desired:  desired,
				observed: observed,
			},
			want:    want,
			wantErr: false,
		},
		{
			name: "deployment is not controlled by hpa",
			args: args{
				desired:  desired2,
				observed: observed,
			},
			want:    desired2,
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := retainWorkloadReplicas(tt.args.desired, tt.args.observed)
			if (err != nil) != tt.wantErr {
				t.Errorf("retainWorkloadReplicas() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			assert.Equalf(t, tt.want, got, "retainWorkloadReplicas(%v, %v)", tt.args.desired, tt.args.observed)
		})
	}
}

@@ -28,6 +28,14 @@ const (
 	// ManagedByKarmadaLabelValue indicates that resources are managed by karmada controllers.
 	ManagedByKarmadaLabelValue = "true"
+
+	// RetainReplicasLabel is a reserved label that indicates whether the replicas of a resource
+	// template should be retained, e.g.:
+	//   resourcetemplate.karmada.io/retain-replicas: "true"  // retain the replicas observed in the member cluster
+	//   resourcetemplate.karmada.io/retain-replicas: "false" // this and any other value means do not retain
+	RetainReplicasLabel = "resourcetemplate.karmada.io/retain-replicas"
+
+	// RetainReplicasValue is the value of RetainReplicasLabel that enables retention.
+	RetainReplicasValue = "true"
 )
 
 // Define annotations used by karmada system.
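
To make the constants above concrete, here is a minimal sketch (an illustration under assumptions, not code from this commit) of how a resource template is flagged for retention via util.MergeLabel, mirroring what addHPALabelToScaleRef does before it generates the merge patch; the package and the helper name markRetainReplicas are hypothetical.

package example

import (
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

	"github.com/karmada-io/karmada/pkg/util"
)

// markRetainReplicas flags a resource template so that the Deployment
// retention interpreter keeps the replicas observed in the member cluster.
func markRetainReplicas(workload *unstructured.Unstructured) {
	// Equivalent to setting resourcetemplate.karmada.io/retain-replicas: "true".
	util.MergeLabel(workload, util.RetainReplicasLabel, util.RetainReplicasValue)
}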