Merge pull request #552 from feiskyer/vmss-compatible

Reduce API calls and avoid touching Azure rate limits
This commit is contained in:
Marcin Wielgus 2018-01-17 15:22:28 +01:00 committed by GitHub
commit 93635e09fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 177 additions and 121 deletions

View File

@ -47,15 +47,9 @@ type AgentPool struct {
template map[string]interface{}
parameters map[string]interface{}
mutex sync.Mutex
targetSize int
provisioning bool
}
// VirtualMachineID contains VMID and ID of a virtual machine.
type VirtualMachineID struct {
ID string
VMID string
mutex sync.Mutex
lastRefresh time.Time
curSize int64
}
// NewAgentPool creates a new AgentPool.
@ -64,10 +58,9 @@ func NewAgentPool(spec *dynamic.NodeGroupSpec, az *AzureManager) (*AgentPool, er
azureRef: azureRef{
Name: spec.Name,
},
minSize: spec.MinSize,
maxSize: spec.MaxSize,
targetSize: -1,
manager: az,
minSize: spec.MinSize,
maxSize: spec.MaxSize,
manager: az,
}
if err := as.initialize(); err != nil {
@ -150,14 +143,14 @@ func (as *AgentPool) MaxSize() int {
}
// GetVMIndexes gets indexes of all virtual machines belonging to the agent pool.
func (as *AgentPool) GetVMIndexes() ([]int, map[int]VirtualMachineID, error) {
func (as *AgentPool) GetVMIndexes() ([]int, map[int]string, error) {
instances, err := as.GetVirtualMachines()
if err != nil {
return nil, nil, err
}
indexes := make([]int, 0)
indexToVM := make(map[int]VirtualMachineID)
indexToVM := make(map[int]string)
for _, instance := range instances {
index, err := GetVMNameIndex(instance.StorageProfile.OsDisk.OsType, *instance.Name)
if err != nil {
@ -165,10 +158,7 @@ func (as *AgentPool) GetVMIndexes() ([]int, map[int]VirtualMachineID, error) {
}
indexes = append(indexes, index)
indexToVM[index] = VirtualMachineID{
ID: "azure://" + strings.ToLower(*instance.ID),
VMID: "azure://" + strings.ToLower(*instance.VMID),
}
indexToVM[index] = "azure://" + strings.ToLower(*instance.ID)
}
sortedIndexes := sort.IntSlice(indexes)
@ -176,23 +166,35 @@ func (as *AgentPool) GetVMIndexes() ([]int, map[int]VirtualMachineID, error) {
return sortedIndexes, indexToVM, nil
}
// TargetSize returns the current TARGET size of the node group. It is possible that the
// number is different from the number of nodes registered in Kubernetes.
func (as *AgentPool) TargetSize() (int, error) {
func (as *AgentPool) getCurSize() (int64, error) {
as.mutex.Lock()
defer as.mutex.Unlock()
if as.targetSize != -1 {
return as.targetSize, nil
if as.lastRefresh.Add(15 * time.Second).After(time.Now()) {
return as.curSize, nil
}
glog.V(5).Infof("Get agent pool size for %q", as.Name)
indexes, _, err := as.GetVMIndexes()
if err != nil {
return 0, err
}
glog.V(5).Infof("Returning agent pool (%q) size: %d\n", as.Name, len(indexes))
as.targetSize = len(indexes)
return as.targetSize, nil
as.curSize = int64(len(indexes))
as.lastRefresh = time.Now()
return as.curSize, nil
}
// TargetSize reports the current target size of the agent pool, as provided
// by getCurSize. The value may differ from the number of nodes registered in
// Kubernetes. On failure it returns -1 together with the underlying error.
func (as *AgentPool) TargetSize() (int, error) {
	current, sizeErr := as.getCurSize()
	if sizeErr == nil {
		return int(current), nil
	}
	return -1, sizeErr
}
// IncreaseSize increases agent pool size
@ -215,8 +217,8 @@ func (as *AgentPool) IncreaseSize(delta int) error {
}
highestUsedIndex := indexes[len(indexes)-1]
countForTemplate := curSize + delta
as.targetSize = countForTemplate
expectedSize := curSize + delta
countForTemplate := expectedSize
if highestUsedIndex != 0 {
countForTemplate += highestUsedIndex + 1 - curSize
}
@ -234,7 +236,14 @@ func (as *AgentPool) IncreaseSize(delta int) error {
}
_, errChan := as.manager.azClient.deploymentsClient.CreateOrUpdate(as.manager.config.ResourceGroup, newDeploymentName, newDeployment, cancel)
glog.V(3).Infof("Waiting for deploymentsClient.CreateOrUpdate(%s, %s, %s)", as.manager.config.ResourceGroup, newDeploymentName, newDeployment)
return <-errChan
err = <-errChan
if err != nil {
return err
}
as.curSize = int64(expectedSize)
as.lastRefresh = time.Now()
return err
}
// GetVirtualMachines returns list of nodes for the given agent pool.
@ -289,13 +298,14 @@ func (as *AgentPool) DecreaseTargetSize(delta int) error {
return err
}
curTargetSize := as.targetSize
curTargetSize := int(as.curSize)
if curTargetSize+delta < len(nodes) {
return fmt.Errorf("attempt to delete existing nodes targetSize:%d delta:%d existingNodes: %d",
curTargetSize, delta, len(nodes))
}
as.targetSize = curTargetSize + delta
as.curSize = int64(curTargetSize + delta)
as.lastRefresh = time.Now()
return nil
}
@ -415,8 +425,12 @@ func (as *AgentPool) Nodes() ([]string, error) {
nodes := make([]string, 0, len(instances))
for _, instance := range instances {
// Convert to lower case because the casing of instance.ID differs between API calls (e.g. GET and LIST).
name := "azure://" + strings.ToLower(*instance.ID)
if len(*instance.ID) == 0 {
continue
}
// To keep consistent with providerID from kubernetes cloud provider, do not convert ID to lower case.
name := "azure://" + *instance.ID
nodes = append(nodes, name)
}
@ -517,20 +531,3 @@ func (as *AgentPool) deleteVirtualMachine(name string) error {
// getAzureRef returns a copy of the azureRef identifying this agent pool.
func (as *AgentPool) getAzureRef() azureRef {
	ref := as.azureRef
	return ref
}
// getInstanceIDs maps every virtual machine reported by GetVMIndexes to its
// index, rendered as a decimal string. Each key is an azureRef whose Name is
// the virtual machine's ID field. Any error from GetVMIndexes is returned
// unchanged along with a nil map.
func (as *AgentPool) getInstanceIDs() (map[azureRef]string, error) {
	_, vmsByIndex, err := as.GetVMIndexes()
	if err != nil {
		return nil, err
	}

	ids := make(map[azureRef]string)
	for index, vm := range vmsByIndex {
		ids[azureRef{Name: vm.ID}] = fmt.Sprintf("%d", index)
	}
	return ids, nil
}

View File

@ -19,6 +19,8 @@ package azure
import (
"fmt"
"reflect"
"regexp"
"strings"
"sync"
"time"
@ -27,18 +29,11 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)
// Asg is a wrapper over NodeGroup interface.
// It extends cloudprovider.NodeGroup with the getAzureRef and getInstanceIDs
// accessors.
type Asg interface {
cloudprovider.NodeGroup
// getAzureRef returns the azureRef of the node group.
getAzureRef() azureRef
// getInstanceIDs returns a string ID for each instance, keyed by azureRef.
// NOTE(review): the ID format appears to be implementation-specific — confirm.
getInstanceIDs() (map[azureRef]string, error)
}
var virtualMachineRE = regexp.MustCompile(`^azure://(?:.*)/providers/microsoft.compute/virtualmachines/(.+)$`)
type asgCache struct {
registeredAsgs []Asg
instanceToAsg map[azureRef]Asg
instanceToID map[azureRef]string
registeredAsgs []cloudprovider.NodeGroup
instanceToAsg map[azureRef]cloudprovider.NodeGroup
notInRegisteredAsg map[azureRef]bool
mutex sync.Mutex
interrupt chan struct{}
@ -46,9 +41,8 @@ type asgCache struct {
func newAsgCache() (*asgCache, error) {
cache := &asgCache{
registeredAsgs: make([]Asg, 0),
instanceToAsg: make(map[azureRef]Asg),
instanceToID: make(map[azureRef]string),
registeredAsgs: make([]cloudprovider.NodeGroup, 0),
instanceToAsg: make(map[azureRef]cloudprovider.NodeGroup),
notInRegisteredAsg: make(map[azureRef]bool),
interrupt: make(chan struct{}),
}
@ -65,7 +59,7 @@ func newAsgCache() (*asgCache, error) {
}
// Register registers a node group if it hasn't been registered.
func (m *asgCache) Register(asg Asg) bool {
func (m *asgCache) Register(asg cloudprovider.NodeGroup) bool {
m.mutex.Lock()
defer m.mutex.Unlock()
@ -94,11 +88,11 @@ func (m *asgCache) invalidateUnownedInstanceCache() {
}
// Unregister ASG. Returns true if the ASG was unregistered.
func (m *asgCache) Unregister(asg Asg) bool {
func (m *asgCache) Unregister(asg cloudprovider.NodeGroup) bool {
m.mutex.Lock()
defer m.mutex.Unlock()
updated := make([]Asg, 0, len(m.registeredAsgs))
updated := make([]cloudprovider.NodeGroup, 0, len(m.registeredAsgs))
changed := false
for _, existing := range m.registeredAsgs {
if existing.Id() == asg.Id() {
@ -112,27 +106,15 @@ func (m *asgCache) Unregister(asg Asg) bool {
return changed
}
func (m *asgCache) get() []Asg {
func (m *asgCache) get() []cloudprovider.NodeGroup {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.registeredAsgs
}
// getInstanceIDs looks up the cached ID of every given instance while holding
// the cache mutex. The result has the same length and order as instances; an
// instance that is absent from instanceToID yields an empty string.
func (m *asgCache) getInstanceIDs(instances []*azureRef) []string {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	ids := make([]string, 0, len(instances))
	for _, ref := range instances {
		ids = append(ids, m.instanceToID[*ref])
	}
	return ids
}
// FindForInstance returns Asg of the given Instance
func (m *asgCache) FindForInstance(instance *azureRef) (Asg, error) {
func (m *asgCache) FindForInstance(instance *azureRef, vmType string) (cloudprovider.NodeGroup, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
@ -142,6 +124,24 @@ func (m *asgCache) FindForInstance(instance *azureRef) (Asg, error) {
return nil, nil
}
if vmType == vmTypeVMSS {
// Omit virtual machines not managed by vmss.
if ok := virtualMachineRE.Match([]byte(instance.Name)); ok {
glog.V(3).Infof("Instance %q is not managed by vmss, omit it in autoscaler", instance.Name)
m.notInRegisteredAsg[*instance] = true
return nil, nil
}
}
if vmType == vmTypeStandard {
// Omit virtual machines with providerID not in Azure resource ID format.
if ok := virtualMachineRE.Match([]byte(instance.Name)); !ok {
glog.V(3).Infof("Instance %q is not in Azure resource ID format, omit it in autoscaler", instance.Name)
m.notInRegisteredAsg[*instance] = true
return nil, nil
}
}
if asg, found := m.instanceToAsg[*instance]; found {
return asg, nil
}
@ -163,7 +163,7 @@ func (m *asgCache) Cleanup() {
}
func (m *asgCache) regenerate() error {
newCache := make(map[azureRef]Asg)
newCache := make(map[azureRef]cloudprovider.NodeGroup)
for _, nsg := range m.registeredAsgs {
instances, err := nsg.Nodes()
@ -172,7 +172,8 @@ func (m *asgCache) regenerate() error {
}
for _, instance := range instances {
ref := azureRef{Name: instance}
// Convert to lower case because the casing of instance.ID differs between API calls (e.g. GET and LIST).
ref := azureRef{Name: strings.ToLower(instance)}
newCache[ref] = nsg
}
}

View File

@ -31,7 +31,7 @@ import (
func newTestAzureManager(t *testing.T) *AzureManager {
manager := &AzureManager{
env: azure.PublicCloud,
explicitlyConfigured: make(map[azureRef]bool),
explicitlyConfigured: make(map[string]bool),
config: &Config{
ResourceGroup: "test",
VMType: vmTypeVMSS,

View File

@ -49,7 +49,7 @@ type AzureManager struct {
asgCache *asgCache
lastRefresh time.Time
asgAutoDiscoverySpecs []cloudprovider.LabelAutoDiscoveryConfig
explicitlyConfigured map[azureRef]bool
explicitlyConfigured map[string]bool
}
// Config holds the configuration parsed from the --cloud-config flag
@ -161,7 +161,7 @@ func CreateAzureManager(configReader io.Reader, discoveryOpts cloudprovider.Node
config: &cfg,
env: env,
azClient: azClient,
explicitlyConfigured: make(map[azureRef]bool),
explicitlyConfigured: make(map[string]bool),
}
cache, err := newAsgCache()
@ -197,7 +197,7 @@ func (m *AzureManager) fetchExplicitAsgs(specs []string) error {
if m.RegisterAsg(asg) {
changed = true
}
m.explicitlyConfigured[asg.getAzureRef()] = true
m.explicitlyConfigured[asg.Id()] = true
}
if changed {
@ -208,7 +208,7 @@ func (m *AzureManager) fetchExplicitAsgs(specs []string) error {
return nil
}
func (m *AzureManager) buildAsgFromSpec(spec string) (Asg, error) {
func (m *AzureManager) buildAsgFromSpec(spec string) (cloudprovider.NodeGroup, error) {
s, err := dynamic.SpecFromString(spec, scaleToZeroSupported)
if err != nil {
return nil, fmt.Errorf("failed to parse node group spec: %v", err)
@ -252,11 +252,11 @@ func (m *AzureManager) fetchAutoAsgs() error {
}
changed := false
exists := make(map[azureRef]bool)
exists := make(map[string]bool)
for _, asg := range groups {
azRef := asg.getAzureRef()
exists[azRef] = true
if m.explicitlyConfigured[azRef] {
asgID := asg.Id()
exists[asgID] = true
if m.explicitlyConfigured[asgID] {
// This ASG was explicitly configured, but would also be
// autodiscovered. We want the explicitly configured min and max
// nodes to take precedence.
@ -270,8 +270,8 @@ func (m *AzureManager) fetchAutoAsgs() error {
}
for _, asg := range m.getAsgs() {
azRef := asg.getAzureRef()
if !exists[azRef] && !m.explicitlyConfigured[azRef] {
asgID := asg.Id()
if !exists[asgID] && !m.explicitlyConfigured[asgID] {
m.UnregisterAsg(asg)
changed = true
}
@ -286,27 +286,23 @@ func (m *AzureManager) fetchAutoAsgs() error {
return nil
}
func (m *AzureManager) getAsgs() []Asg {
func (m *AzureManager) getAsgs() []cloudprovider.NodeGroup {
return m.asgCache.get()
}
// getInstanceIDs returns the IDs of the given instances as recorded in the
// manager's asgCache.
func (m *AzureManager) getInstanceIDs(instances []*azureRef) []string {
	ids := m.asgCache.getInstanceIDs(instances)
	return ids
}
// RegisterAsg registers an ASG.
func (m *AzureManager) RegisterAsg(asg Asg) bool {
func (m *AzureManager) RegisterAsg(asg cloudprovider.NodeGroup) bool {
return m.asgCache.Register(asg)
}
// UnregisterAsg unregisters an ASG.
func (m *AzureManager) UnregisterAsg(asg Asg) bool {
func (m *AzureManager) UnregisterAsg(asg cloudprovider.NodeGroup) bool {
return m.asgCache.Unregister(asg)
}
// GetAsgForInstance returns AsgConfig of the given Instance
func (m *AzureManager) GetAsgForInstance(instance *azureRef) (Asg, error) {
return m.asgCache.FindForInstance(instance)
func (m *AzureManager) GetAsgForInstance(instance *azureRef) (cloudprovider.NodeGroup, error) {
return m.asgCache.FindForInstance(instance, m.config.VMType)
}
func (m *AzureManager) regenerateCache() error {
@ -320,7 +316,11 @@ func (m *AzureManager) Cleanup() {
m.asgCache.Cleanup()
}
func (m *AzureManager) getFilteredAutoscalingGroups(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []Asg, err error) {
func (m *AzureManager) getFilteredAutoscalingGroups(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []cloudprovider.NodeGroup, err error) {
if len(filter) == 0 {
return nil, nil
}
switch m.config.VMType {
case vmTypeVMSS:
asgs, err = m.listScaleSets(filter)
@ -337,7 +337,7 @@ func (m *AzureManager) getFilteredAutoscalingGroups(filter []cloudprovider.Label
}
// listScaleSets gets a list of scale sets and instanceIDs.
func (m *AzureManager) listScaleSets(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []Asg, err error) {
func (m *AzureManager) listScaleSets(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []cloudprovider.NodeGroup, err error) {
result, err := m.azClient.virtualMachineScaleSetsClient.List(m.config.ResourceGroup)
if err != nil {
glog.Errorf("VirtualMachineScaleSetsClient.List for %v failed: %v", m.config.ResourceGroup, err)
@ -385,7 +385,7 @@ func (m *AzureManager) listScaleSets(filter []cloudprovider.LabelAutoDiscoveryCo
// listAgentPools gets a list of agent pools and instanceIDs.
// Note: filter won't take effect for agent pools.
func (m *AzureManager) listAgentPools(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []Asg, err error) {
func (m *AzureManager) listAgentPools(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []cloudprovider.NodeGroup, err error) {
deploy, err := m.azClient.deploymentsClient.Get(m.config.ResourceGroup, m.config.Deployment)
if err != nil {
glog.Errorf("deploymentsClient.Get(%s, %s) failed: %v", m.config.ResourceGroup, m.config.Deployment, err)

View File

@ -19,6 +19,8 @@ package azure
import (
"fmt"
"strings"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/golang/glog"
@ -36,6 +38,10 @@ type ScaleSet struct {
minSize int
maxSize int
mutex sync.Mutex
lastRefresh time.Time
curSize int64
}
// NewScaleSet creates a new ScaleSet.
@ -47,6 +53,7 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager) (*ScaleSet, erro
minSize: spec.MinSize,
maxSize: spec.MaxSize,
manager: az,
curSize: -1,
}
return scaleSet, nil
@ -84,8 +91,14 @@ func (scaleSet *ScaleSet) MaxSize() int {
return scaleSet.maxSize
}
// GetScaleSetSize gets Scale Set size.
func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) {
func (scaleSet *ScaleSet) getCurSize() (int64, error) {
scaleSet.mutex.Lock()
defer scaleSet.mutex.Unlock()
if scaleSet.lastRefresh.Add(15 * time.Second).After(time.Now()) {
return scaleSet.curSize, nil
}
glog.V(5).Infof("Get scale set size for %q", scaleSet.Name)
resourceGroup := scaleSet.manager.config.ResourceGroup
set, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(resourceGroup, scaleSet.Name)
@ -93,11 +106,22 @@ func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) {
return -1, err
}
glog.V(5).Infof("Returning scale set (%q) capacity: %d\n", scaleSet.Name, *set.Sku.Capacity)
return *set.Sku.Capacity, nil
scaleSet.curSize = *set.Sku.Capacity
scaleSet.lastRefresh = time.Now()
return scaleSet.curSize, nil
}
// GetScaleSetSize returns the size of the scale set as reported by
// getCurSize, passing through any error unchanged.
func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) {
	size, err := scaleSet.getCurSize()
	return size, err
}
// SetScaleSetSize sets ScaleSet size.
func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error {
scaleSet.mutex.Lock()
defer scaleSet.mutex.Unlock()
resourceGroup := scaleSet.manager.config.ResourceGroup
op, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(resourceGroup, scaleSet.Name)
if err != nil {
@ -107,9 +131,16 @@ func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error {
op.Sku.Capacity = &size
op.VirtualMachineScaleSetProperties.ProvisioningState = nil
cancel := make(chan struct{})
_, errChan := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdate(resourceGroup, scaleSet.Name, op, cancel)
return <-errChan
err = <-errChan
if err != nil {
glog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, err)
return err
}
scaleSet.curSize = size
scaleSet.lastRefresh = time.Now()
return nil
}
// TargetSize returns the current TARGET size of the node group. It is possible that the
@ -217,11 +248,14 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
return nil
}
glog.V(3).Infof("Deleting vmss instances %q", instances)
commonAsg, err := scaleSet.manager.GetAsgForInstance(instances[0])
if err != nil {
return err
}
instanceIDs := []string{}
for _, instance := range instances {
asg, err := scaleSet.manager.GetAsgForInstance(instance)
if err != nil {
@ -229,13 +263,20 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
}
if asg != commonAsg {
return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.GetKey(), commonAsg)
return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg)
}
instanceID, err := getLastSegment(instance.Name)
if err != nil {
glog.Errorf("getLastSegment failed with error: %v", err)
return err
}
instanceIDs = append(instanceIDs, instanceID)
}
instanceIds := scaleSet.manager.getInstanceIDs(instances)
requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIds,
InstanceIds: &instanceIDs,
}
cancel := make(chan struct{})
resourceGroup := scaleSet.manager.config.ResourceGroup
@ -245,7 +286,7 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
// DeleteNodes deletes the nodes from the group.
func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error {
glog.V(8).Infof("Delete nodes requested: %v\n", nodes)
glog.V(8).Infof("Delete nodes requested: %q\n", nodes)
size, err := scaleSet.GetScaleSetSize()
if err != nil {
return err
@ -299,8 +340,11 @@ func (scaleSet *ScaleSet) Nodes() ([]string, error) {
result := make([]string, len(vms))
for i := range vms {
// Convert to lower case because the casing of instance.ID differs between API calls (e.g. GET and LIST).
name := "azure://" + strings.ToLower(*vms[i].ID)
if len(*vms[i].ID) == 0 {
continue
}
// To keep consistent with providerID from kubernetes cloud provider, do not convert ID to lower case.
name := "azure://" + *vms[i].ID
result = append(result, name)
}
@ -324,8 +368,3 @@ func (scaleSet *ScaleSet) getInstanceIDs() (map[azureRef]string, error) {
return result, nil
}
// getAzureRef returns a copy of the azureRef identifying this scale set.
func (scaleSet *ScaleSet) getAzureRef() azureRef {
	ref := scaleSet.azureRef
	return ref
}

View File

@ -350,7 +350,15 @@ func GetVMNameIndex(osType compute.OperatingSystemTypes, vmName string) (int, er
}
func matchDiscoveryConfig(labels map[string]*string, configs []cloudprovider.LabelAutoDiscoveryConfig) bool {
if len(configs) == 0 {
return false
}
for _, c := range configs {
if len(c.Selector) == 0 {
return false
}
for k, v := range c.Selector {
value, ok := labels[k]
if !ok {
@ -409,3 +417,14 @@ func validateConfig(cfg *Config) error {
return nil
}
// getLastSegment returns the portion of ID that follows its final '/',
// after trimming surrounding whitespace from ID. An ID containing no '/' is
// returned whole (trimmed). An error is returned when that portion is empty,
// i.e. when ID is blank or ends with '/'.
func getLastSegment(ID string) (string, error) {
	trimmed := strings.TrimSpace(ID)
	// LastIndex yields -1 when there is no '/', so the slice starts at 0 and
	// keeps the whole string in that case.
	segment := trimmed[strings.LastIndex(trimmed, "/")+1:]
	if segment == "" {
		return "", fmt.Errorf("identifier '/' not found in resource name %q", ID)
	}
	return segment, nil
}