aggregate the HPA status from member clusters into control plane.

Signed-off-by: chaosi-zju <chaosi@zju.edu.cn>
This commit is contained in:
chaosi-zju 2023-09-15 10:23:49 +08:00
parent 5c77f4516e
commit 64ffa2a28e
4 changed files with 177 additions and 0 deletions

View File

@ -5,6 +5,7 @@ import (
"reflect"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
@ -35,6 +36,7 @@ func getAllDefaultAggregateStatusInterpreter() map[schema.GroupVersionKind]aggre
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeKind)] = aggregatePersistentVolumeStatus
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeClaimKind)] = aggregatePersistentVolumeClaimStatus
s[policyv1.SchemeGroupVersion.WithKind(util.PodDisruptionBudgetKind)] = aggregatePodDisruptionBudgetStatus
s[autoscalingv2.SchemeGroupVersion.WithKind(util.HorizontalPodAutoscalerKind)] = aggregateHorizontalPodAutoscalerStatus
return s
}
@ -554,3 +556,35 @@ func aggregatePodDisruptionBudgetStatus(object *unstructured.Unstructured, aggre
pdb.Status = *newStatus
return helper.ToUnstructured(pdb)
}
func aggregateHorizontalPodAutoscalerStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
hpa := &autoscalingv2.HorizontalPodAutoscaler{}
err := helper.ConvertToTypedObject(object, hpa)
if err != nil {
return nil, err
}
newStatus := &autoscalingv2.HorizontalPodAutoscalerStatus{}
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
temp := &autoscalingv2.HorizontalPodAutoscalerStatus{}
if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
return nil, err
}
klog.V(3).Infof("Grab hpa(%s/%s) status from cluster(%s), CurrentReplicas: %d", temp.CurrentReplicas)
newStatus.CurrentReplicas += temp.CurrentReplicas
newStatus.DesiredReplicas += temp.DesiredReplicas
}
if reflect.DeepEqual(hpa.Status, *newStatus) {
klog.V(3).Infof("ignore update hpa(%s/%s) status as up to date", hpa.Namespace, hpa.Name)
return object, nil
}
hpa.Status = *newStatus
return helper.ToUnstructured(hpa)
}

View File

@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
@ -1061,3 +1062,73 @@ func Test_aggregateCronJobStatus(t *testing.T) {
})
}
}
func Test_aggregateHorizontalPodAutoscalerStatus(t *testing.T) {
curHPA, _ := helper.ToUnstructured(&autoscalingv2.HorizontalPodAutoscaler{
Status: autoscalingv2.HorizontalPodAutoscalerStatus{
CurrentReplicas: 0,
DesiredReplicas: 0,
},
})
aggregatedStatusItem1, _ := helper.BuildStatusRawExtension(map[string]interface{}{
"currentReplicas": 2,
"desiredReplicas": 2,
})
aggregatedStatusItem2, _ := helper.BuildStatusRawExtension(map[string]interface{}{
"currentReplicas": 4,
"desiredReplicas": 4,
})
expectHPA, _ := helper.ToUnstructured(&autoscalingv2.HorizontalPodAutoscaler{
Status: autoscalingv2.HorizontalPodAutoscalerStatus{
CurrentReplicas: 6,
DesiredReplicas: 6,
},
})
type args struct {
object *unstructured.Unstructured
aggregatedStatusItems []workv1alpha2.AggregatedStatusItem
}
tests := []struct {
name string
args args
want *unstructured.Unstructured
wantErr bool
}{
{
name: "update hpa status",
args: args{
object: curHPA,
aggregatedStatusItems: []workv1alpha2.AggregatedStatusItem{
{ClusterName: "member1", Status: aggregatedStatusItem1, Applied: true},
{ClusterName: "member2", Status: aggregatedStatusItem2, Applied: true},
{ClusterName: "member3", Status: nil, Applied: true},
},
},
want: expectHPA,
wantErr: false,
},
{
name: "hpa status update to dates",
args: args{
object: expectHPA,
aggregatedStatusItems: []workv1alpha2.AggregatedStatusItem{
{ClusterName: "member1", Status: aggregatedStatusItem1, Applied: true},
{ClusterName: "member2", Status: aggregatedStatusItem2, Applied: true},
{ClusterName: "member3", Status: nil, Applied: true},
},
},
want: expectHPA,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := aggregateHorizontalPodAutoscalerStatus(tt.args.object, tt.args.aggregatedStatusItems)
if (err != nil) != tt.wantErr {
t.Errorf("Test_aggregateHorizontalPodAutoscalerStatus() err = %v, wantErr %v", err, tt.wantErr)
}
assert.Equalf(t, tt.want, got, "aggregateHorizontalPodAutoscalerStatus(%v, %v)", tt.args.object, tt.args.aggregatedStatusItems)
})
}
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
@ -28,6 +29,7 @@ func getAllDefaultReflectStatusInterpreter() map[schema.GroupVersionKind]reflect
s[appsv1.SchemeGroupVersion.WithKind(util.DaemonSetKind)] = reflectDaemonSetStatus
s[appsv1.SchemeGroupVersion.WithKind(util.StatefulSetKind)] = reflectStatefulSetStatus
s[policyv1.SchemeGroupVersion.WithKind(util.PodDisruptionBudgetKind)] = reflectPodDisruptionBudgetStatus
s[autoscalingv2.SchemeGroupVersion.WithKind(util.HorizontalPodAutoscalerKind)] = reflectHorizontalPodAutoscalerStatus
return s
}
@ -220,6 +222,32 @@ func reflectPodDisruptionBudgetStatus(object *unstructured.Unstructured) (*runti
return helper.BuildStatusRawExtension(grabStatus)
}
func reflectHorizontalPodAutoscalerStatus(object *unstructured.Unstructured) (*runtime.RawExtension, error) {
statusMap, exist, err := unstructured.NestedMap(object.Object, "status")
if err != nil {
klog.Errorf("Failed to get status field from %s(%s/%s), error: %v",
object.GetKind(), object.GetNamespace(), object.GetName(), err)
return nil, err
}
if !exist {
klog.Errorf("Failed to grab status from %s(%s/%s) which should have status field.",
object.GetKind(), object.GetNamespace(), object.GetName())
return nil, nil
}
hpaStatus := &autoscalingv2.HorizontalPodAutoscalerStatus{}
err = helper.ConvertToTypedObject(statusMap, hpaStatus)
if err != nil {
return nil, fmt.Errorf("failed to convert HorizontalPodAutoscaler from map[string]interface{}: %v", err)
}
grabStatus := autoscalingv2.HorizontalPodAutoscalerStatus{
CurrentReplicas: hpaStatus.CurrentReplicas,
DesiredReplicas: hpaStatus.DesiredReplicas,
}
return helper.BuildStatusRawExtension(grabStatus)
}
func reflectWholeStatus(object *unstructured.Unstructured) (*runtime.RawExtension, error) {
statusMap, exist, err := unstructured.NestedMap(object.Object, "status")
if err != nil {

View File

@ -4,6 +4,8 @@ import (
"reflect"
"testing"
"github.com/stretchr/testify/assert"
autoscalingv2 "k8s.io/api/autoscaling/v2"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
@ -136,3 +138,45 @@ func Test_reflectPodDisruptionBudgetStatus(t *testing.T) {
})
}
}
func Test_reflectHorizontalPodAutoscalerStatus(t *testing.T) {
hpaStatus, _ := helper.ToUnstructured(&autoscalingv2.HorizontalPodAutoscaler{
Status: autoscalingv2.HorizontalPodAutoscalerStatus{
CurrentReplicas: 2,
DesiredReplicas: 2,
},
})
grabStatus, _ := helper.BuildStatusRawExtension(autoscalingv2.HorizontalPodAutoscalerStatus{
CurrentReplicas: 2,
DesiredReplicas: 2,
})
nilHpaStatus, _ := helper.ToUnstructured(&map[string]interface{}{})
tests := []struct {
name string
object *unstructured.Unstructured
want *runtime.RawExtension
wantErr bool
}{
{
name: "reflect hap status",
object: hpaStatus,
want: grabStatus,
wantErr: false,
},
{
name: "reflect nil hpa status",
object: nilHpaStatus,
want: nil,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := reflectHorizontalPodAutoscalerStatus(tt.object)
if (err != nil) != tt.wantErr {
t.Errorf("Test_reflectHorizontalPodAutoscalerStatus() err = %v, wantErr %v", err, tt.wantErr)
}
assert.Equalf(t, tt.want, got, "reflectHorizontalPodAutoscalerStatus(%v)", tt.object)
})
}
}