Merge pull request #3658 from RainbowMango/pr_implements_increase_nodegroup_size

[CA] implements node group increase size
This commit is contained in:
Kubernetes Prow Robot 2020-11-02 06:40:53 -08:00 committed by GitHub
commit ef0497e31f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 118 additions and 2 deletions

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3/core/sdktime"
huaweicloudsdkas "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3/services/as/v1"
huaweicloudsdkasmodel "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3/services/as/v1/model"
huaweicloudsdkecs "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2"
@ -188,9 +189,75 @@ func (csm *cloudServiceManager) GetInstances(groupID string) ([]cloudprovider.In
return instances, nil
}
// IncreaseSizeInstance increases a scaling group's instance size.
// The workflow works as follows:
// 1. create scaling policy with scheduled type.
// 2. execute the scaling policy immediately(not waiting the policy's launch time).
// 3. wait for the instance number be increased and remove the scaling policy.
func (csm *cloudServiceManager) IncreaseSizeInstance(groupID string, delta int) error {
// TODO(RainbowMango) finish implementation later
return nil
originalInstanceSize, err := csm.GetDesireInstanceNumber(groupID)
if err != nil {
return err
}
// create a scaling policy
launchTime := sdktime.SdkTime(time.Now().Add(time.Hour))
addOperation := huaweicloudsdkasmodel.GetScalingPolicyActionOperationEnum().ADD
instanceNum := int32(delta)
opts := &huaweicloudsdkasmodel.CreateScalingPolicyRequest{
Body: &huaweicloudsdkasmodel.CreateScalingPolicyRequestBody{
// It's not mandatory for AS service to set a unique policy name.
ScalingPolicyName: "huaweicloudautoscaler",
ScalingGroupId: groupID,
ScalingPolicyType: huaweicloudsdkasmodel.GetCreateScalingPolicyRequestBodyScalingPolicyTypeEnum().SCHEDULED,
ScheduledPolicy: &huaweicloudsdkasmodel.ScheduledPolicy{
LaunchTime: &launchTime,
},
ScalingPolicyAction: &huaweicloudsdkasmodel.ScalingPolicyAction{
Operation: &addOperation,
InstanceNumber: &instanceNum,
},
},
}
spID, err := csm.createScalingPolicy(opts)
if err != nil {
return err
}
// make sure scaling policy will be cleaned up.
deletePolicyOps := &huaweicloudsdkasmodel.DeleteScalingPolicyRequest{
ScalingPolicyId: spID,
}
defer csm.deleteScalingPolicy(deletePolicyOps)
// execute policy immediately
executeAction := huaweicloudsdkasmodel.GetExecuteScalingPolicyRequestBodyActionEnum()
executeOpts := &huaweicloudsdkasmodel.ExecuteScalingPolicyRequest{
ScalingPolicyId: spID,
Body: &huaweicloudsdkasmodel.ExecuteScalingPolicyRequestBody{
Action: &executeAction.EXECUTE,
},
}
err = csm.executeScalingPolicy(executeOpts)
if err != nil {
return err
}
// wait for instance number indeed be increased
return wait.Poll(5*time.Second, 300*time.Second, func() (done bool, err error) {
currentInstanceSize, err := csm.GetDesireInstanceNumber(groupID)
if err != nil {
return false, err
}
if currentInstanceSize == originalInstanceSize+delta {
return true, nil
}
klog.V(1).Infof("waiting instance increase from %d to %d, now is: %d", originalInstanceSize, originalInstanceSize+delta, currentInstanceSize)
return false, nil
})
}
func (csm *cloudServiceManager) ListScalingGroups() ([]AutoScalingGroup, error) {
@ -268,3 +335,52 @@ func (csm *cloudServiceManager) transformInstanceState(lifeCycleState huaweiclou
return instanceStatus
}
func (csm *cloudServiceManager) createScalingPolicy(opts *huaweicloudsdkasmodel.CreateScalingPolicyRequest) (scalingPolicyID string, err error) {
asClient := csm.getASClientFunc()
if asClient == nil {
return "", fmt.Errorf("failed to get as client")
}
response, err := asClient.CreateScalingPolicy(opts)
if err != nil {
klog.Warningf("create scaling policy failed. policy: %s, error: %v", opts.String(), err)
return "", err
}
klog.V(1).Infof("create scaling policy succeed. policy id: %s", *(response.ScalingPolicyId))
return *(response.ScalingPolicyId), nil
}
func (csm *cloudServiceManager) executeScalingPolicy(opts *huaweicloudsdkasmodel.ExecuteScalingPolicyRequest) error {
asClient := csm.getASClientFunc()
if asClient == nil {
return fmt.Errorf("failed to get as client")
}
_, err := asClient.ExecuteScalingPolicy(opts)
if err != nil {
klog.Warningf("execute scaling policy failed. policy id: %s, error: %v", opts.ScalingPolicyId, err)
return err
}
klog.V(1).Infof("execute scaling policy succeed. policy id: %s", opts.ScalingPolicyId)
return nil
}
func (csm *cloudServiceManager) deleteScalingPolicy(opts *huaweicloudsdkasmodel.DeleteScalingPolicyRequest) error {
asClient := csm.getASClientFunc()
if asClient == nil {
return fmt.Errorf("failed to get as client")
}
_, err := asClient.DeleteScalingPolicy(opts)
if err != nil {
klog.Warningf("failed to delete scaling policy. policy id: %s, error: %v", opts.ScalingPolicyId, err)
return err
}
klog.V(1).Infof("delete scaling policy succeed. policy id: %s", opts.ScalingPolicyId)
return nil
}