From c8a680239a37a5f4b2e0866c2e93a18df788d3a2 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Tue, 16 Jan 2018 12:46:16 +0800 Subject: [PATCH 1/3] Ensure azureRef consistent with providerID from kubernetes cloud provider --- .../cloudprovider/azure/azure_agent_pool.go | 36 +++---------------- .../cloudprovider/azure/azure_cache.go | 19 ++-------- .../cloudprovider/azure/azure_manager.go | 4 --- .../cloudprovider/azure/azure_scale_set.go | 16 ++++++--- .../cloudprovider/azure/azure_util.go | 11 ++++++ 5 files changed, 31 insertions(+), 55 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go b/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go index 3cba1b04ea..eb91f19599 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go @@ -52,12 +52,6 @@ type AgentPool struct { provisioning bool } -// VirtualMachineID contains VMID and ID of a virtual machine. -type VirtualMachineID struct { - ID string - VMID string -} - // NewAgentPool creates a new AgentPool. func NewAgentPool(spec *dynamic.NodeGroupSpec, az *AzureManager) (*AgentPool, error) { as := &AgentPool{ @@ -150,14 +144,14 @@ func (as *AgentPool) MaxSize() int { } // GetVMIndexes gets indexes of all virtual machines belongting to the agent pool. 
-func (as *AgentPool) GetVMIndexes() ([]int, map[int]VirtualMachineID, error) { +func (as *AgentPool) GetVMIndexes() ([]int, map[int]string, error) { instances, err := as.GetVirtualMachines() if err != nil { return nil, nil, err } indexes := make([]int, 0) - indexToVM := make(map[int]VirtualMachineID) + indexToVM := make(map[int]string) for _, instance := range instances { index, err := GetVMNameIndex(instance.StorageProfile.OsDisk.OsType, *instance.Name) if err != nil { @@ -165,10 +159,7 @@ func (as *AgentPool) GetVMIndexes() ([]int, map[int]VirtualMachineID, error) { } indexes = append(indexes, index) - indexToVM[index] = VirtualMachineID{ - ID: "azure://" + strings.ToLower(*instance.ID), - VMID: "azure://" + strings.ToLower(*instance.VMID), - } + indexToVM[index] = "azure://" + strings.ToLower(*instance.ID) } sortedIndexes := sort.IntSlice(indexes) @@ -415,8 +406,8 @@ func (as *AgentPool) Nodes() ([]string, error) { nodes := make([]string, 0, len(instances)) for _, instance := range instances { - // Convert to lower because instance.ID is in different in different API calls (e.g. GET and LIST). - name := "azure://" + strings.ToLower(*instance.ID) + // To keep consistent with providerID from kubernetes cloud provider, do not convert ID to lower case. 
+ name := "azure://" + *instance.ID nodes = append(nodes, name) } @@ -517,20 +508,3 @@ func (as *AgentPool) deleteVirtualMachine(name string) error { func (as *AgentPool) getAzureRef() azureRef { return as.azureRef } - -func (as *AgentPool) getInstanceIDs() (map[azureRef]string, error) { - _, indexToVM, err := as.GetVMIndexes() - if err != nil { - return nil, err - } - - result := make(map[azureRef]string) - for i, vm := range indexToVM { - ref := azureRef{ - Name: vm.ID, - } - result[ref] = fmt.Sprintf("%d", i) - } - - return result, nil -} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_cache.go b/cluster-autoscaler/cloudprovider/azure/azure_cache.go index 2872fa8109..83154ff07a 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_cache.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_cache.go @@ -19,6 +19,7 @@ package azure import ( "fmt" "reflect" + "strings" "sync" "time" @@ -32,13 +33,11 @@ type Asg interface { cloudprovider.NodeGroup getAzureRef() azureRef - getInstanceIDs() (map[azureRef]string, error) } type asgCache struct { registeredAsgs []Asg instanceToAsg map[azureRef]Asg - instanceToID map[azureRef]string notInRegisteredAsg map[azureRef]bool mutex sync.Mutex interrupt chan struct{} @@ -48,7 +47,6 @@ func newAsgCache() (*asgCache, error) { cache := &asgCache{ registeredAsgs: make([]Asg, 0), instanceToAsg: make(map[azureRef]Asg), - instanceToID: make(map[azureRef]string), notInRegisteredAsg: make(map[azureRef]bool), interrupt: make(chan struct{}), } @@ -119,18 +117,6 @@ func (m *asgCache) get() []Asg { return m.registeredAsgs } -func (m *asgCache) getInstanceIDs(instances []*azureRef) []string { - m.mutex.Lock() - defer m.mutex.Unlock() - - instanceIds := make([]string, len(instances)) - for i, instance := range instances { - instanceIds[i] = m.instanceToID[*instance] - } - - return instanceIds -} - // FindForInstance returns Asg of the given Instance func (m *asgCache) FindForInstance(instance *azureRef) (Asg, error) { 
m.mutex.Lock() @@ -172,7 +158,8 @@ func (m *asgCache) regenerate() error { } for _, instance := range instances { - ref := azureRef{Name: instance} + // Convert to lower case because the casing of instance.ID differs between API calls (e.g. GET and LIST). + ref := azureRef{Name: strings.ToLower(instance)} newCache[ref] = nsg } } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager.go b/cluster-autoscaler/cloudprovider/azure/azure_manager.go index bfac02311e..80befc9428 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager.go @@ -290,10 +290,6 @@ func (m *AzureManager) getAsgs() []Asg { return m.asgCache.get() } -func (m *AzureManager) getInstanceIDs(instances []*azureRef) []string { - return m.asgCache.getInstanceIDs(instances) -} - // RegisterAsg registers an ASG. func (m *AzureManager) RegisterAsg(asg Asg) bool { return m.asgCache.Register(asg) diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go index 74e48cb67b..6d0f31eb1b 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go @@ -222,6 +222,7 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error { return err } + instanceIDs := []string{} for _, instance := range instances { asg, err := scaleSet.manager.GetAsgForInstance(instance) if err != nil { @@ -231,11 +232,18 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error { if asg != commonAsg { return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.GetKey(), commonAsg) } + + instanceID, err := getLastSegment(instance.GetKey()) + if err != nil { + glog.Errorf("getLastSegment failed with error: %v", err) + return err + } + + instanceIDs = append(instanceIDs, instanceID) } - instanceIds := scaleSet.manager.getInstanceIDs(instances) requiredIds 
:= &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{ - InstanceIds: &instanceIds, + InstanceIds: &instanceIDs, } cancel := make(chan struct{}) resourceGroup := scaleSet.manager.config.ResourceGroup @@ -299,8 +307,8 @@ func (scaleSet *ScaleSet) Nodes() ([]string, error) { result := make([]string, len(vms)) for i := range vms { - // Convert to lower because instance.ID is in different in different API calls (e.g. GET and LIST). - name := "azure://" + strings.ToLower(*vms[i].ID) + // To keep consistent with providerID from kubernetes cloud provider, do not convert ID to lower case. + name := "azure://" + *vms[i].ID result = append(result, name) } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_util.go b/cluster-autoscaler/cloudprovider/azure/azure_util.go index 963395c890..b4a6a6af1f 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_util.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_util.go @@ -409,3 +409,14 @@ func validateConfig(cfg *Config) error { return nil } + +// getLastSegment gets the last segment split by '/'. 
+func getLastSegment(ID string) (string, error) { + parts := strings.Split(strings.TrimSpace(ID), "/") + name := parts[len(parts)-1] + if len(name) == 0 { + return "", fmt.Errorf("identifier '/' not found in resource name %q", ID) + } + + return name, nil +} From 18847169e7b8a237bc26c307d440dddd169a36d8 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Tue, 16 Jan 2018 14:04:30 +0800 Subject: [PATCH 2/3] Reduce API calls for Azure VMSS --- .../cloudprovider/azure/azure_cache.go | 46 +++++++++------ .../azure/azure_cloud_provider_test.go | 2 +- .../cloudprovider/azure/azure_manager.go | 40 +++++++------ .../cloudprovider/azure/azure_scale_set.go | 57 ++++++++++++++----- 4 files changed, 97 insertions(+), 48 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/azure/azure_cache.go b/cluster-autoscaler/cloudprovider/azure/azure_cache.go index 83154ff07a..f6bfe54864 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_cache.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_cache.go @@ -19,6 +19,7 @@ package azure import ( "fmt" "reflect" + "regexp" "strings" "sync" "time" @@ -28,16 +29,11 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" ) -// Asg is a wrapper over NodeGroup interface. 
-type Asg interface { - cloudprovider.NodeGroup - - getAzureRef() azureRef -} +var virtualMachineRE = regexp.MustCompile(`^azure://(?:.*)/providers/microsoft.compute/virtualmachines/(.+)$`) type asgCache struct { - registeredAsgs []Asg - instanceToAsg map[azureRef]Asg + registeredAsgs []cloudprovider.NodeGroup + instanceToAsg map[azureRef]cloudprovider.NodeGroup notInRegisteredAsg map[azureRef]bool mutex sync.Mutex interrupt chan struct{} @@ -45,8 +41,8 @@ type asgCache struct { func newAsgCache() (*asgCache, error) { cache := &asgCache{ - registeredAsgs: make([]Asg, 0), - instanceToAsg: make(map[azureRef]Asg), + registeredAsgs: make([]cloudprovider.NodeGroup, 0), + instanceToAsg: make(map[azureRef]cloudprovider.NodeGroup), notInRegisteredAsg: make(map[azureRef]bool), interrupt: make(chan struct{}), } @@ -63,7 +59,7 @@ func newAsgCache() (*asgCache, error) { } // Register registers a node group if it hasn't been registered. -func (m *asgCache) Register(asg Asg) bool { +func (m *asgCache) Register(asg cloudprovider.NodeGroup) bool { m.mutex.Lock() defer m.mutex.Unlock() @@ -92,11 +88,11 @@ func (m *asgCache) invalidateUnownedInstanceCache() { } // Unregister ASG. Returns true if the ASG was unregistered. 
-func (m *asgCache) Unregister(asg Asg) bool { +func (m *asgCache) Unregister(asg cloudprovider.NodeGroup) bool { m.mutex.Lock() defer m.mutex.Unlock() - updated := make([]Asg, 0, len(m.registeredAsgs)) + updated := make([]cloudprovider.NodeGroup, 0, len(m.registeredAsgs)) changed := false for _, existing := range m.registeredAsgs { if existing.Id() == asg.Id() { @@ -110,7 +106,7 @@ func (m *asgCache) Unregister(asg Asg) bool { return changed } -func (m *asgCache) get() []Asg { +func (m *asgCache) get() []cloudprovider.NodeGroup { m.mutex.Lock() defer m.mutex.Unlock() @@ -118,7 +114,7 @@ func (m *asgCache) get() []Asg { } // FindForInstance returns Asg of the given Instance -func (m *asgCache) FindForInstance(instance *azureRef) (Asg, error) { +func (m *asgCache) FindForInstance(instance *azureRef, vmType string) (cloudprovider.NodeGroup, error) { m.mutex.Lock() defer m.mutex.Unlock() @@ -128,6 +124,24 @@ func (m *asgCache) FindForInstance(instance *azureRef) (Asg, error) { return nil, nil } + if vmType == vmTypeVMSS { + // Omit virtual machines not managed by vmss. + if ok := virtualMachineRE.Match([]byte(instance.Name)); ok { + glog.V(3).Infof("Instance %q is not managed by vmss, omit it in autoscaler", instance.Name) + m.notInRegisteredAsg[*instance] = true + return nil, nil + } + } + + if vmType == vmTypeStandard { + // Omit virtual machines with providerID not in Azure resource ID format. 
+ if ok := virtualMachineRE.Match([]byte(instance.Name)); ok { + glog.V(3).Infof("Instance %q is not in Azure resource ID format, omit it in autoscaler", instance.Name) + m.notInRegisteredAsg[*instance] = true + return nil, nil + } + } + if asg, found := m.instanceToAsg[*instance]; found { return asg, nil } @@ -149,7 +163,7 @@ func (m *asgCache) Cleanup() { } func (m *asgCache) regenerate() error { - newCache := make(map[azureRef]Asg) + newCache := make(map[azureRef]cloudprovider.NodeGroup) for _, nsg := range m.registeredAsgs { instances, err := nsg.Nodes() diff --git a/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go index b0f3a18225..9c615bdcdf 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go @@ -31,7 +31,7 @@ import ( func newTestAzureManager(t *testing.T) *AzureManager { manager := &AzureManager{ env: azure.PublicCloud, - explicitlyConfigured: make(map[azureRef]bool), + explicitlyConfigured: make(map[string]bool), config: &Config{ ResourceGroup: "test", VMType: vmTypeVMSS, diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager.go b/cluster-autoscaler/cloudprovider/azure/azure_manager.go index 80befc9428..2afe6191ca 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager.go @@ -49,7 +49,7 @@ type AzureManager struct { asgCache *asgCache lastRefresh time.Time asgAutoDiscoverySpecs []cloudprovider.LabelAutoDiscoveryConfig - explicitlyConfigured map[azureRef]bool + explicitlyConfigured map[string]bool } // Config holds the configuration parsed from the --cloud-config flag @@ -161,7 +161,7 @@ func CreateAzureManager(configReader io.Reader, discoveryOpts cloudprovider.Node config: &cfg, env: env, azClient: azClient, - explicitlyConfigured: make(map[azureRef]bool), + explicitlyConfigured: 
make(map[string]bool), } cache, err := newAsgCache() @@ -197,7 +197,7 @@ func (m *AzureManager) fetchExplicitAsgs(specs []string) error { if m.RegisterAsg(asg) { changed = true } - m.explicitlyConfigured[asg.getAzureRef()] = true + m.explicitlyConfigured[asg.Id()] = true } if changed { @@ -208,7 +208,7 @@ func (m *AzureManager) fetchExplicitAsgs(specs []string) error { return nil } -func (m *AzureManager) buildAsgFromSpec(spec string) (Asg, error) { +func (m *AzureManager) buildAsgFromSpec(spec string) (cloudprovider.NodeGroup, error) { s, err := dynamic.SpecFromString(spec, scaleToZeroSupported) if err != nil { return nil, fmt.Errorf("failed to parse node group spec: %v", err) @@ -252,11 +252,11 @@ func (m *AzureManager) fetchAutoAsgs() error { } changed := false - exists := make(map[azureRef]bool) + exists := make(map[string]bool) for _, asg := range groups { - azRef := asg.getAzureRef() - exists[azRef] = true - if m.explicitlyConfigured[azRef] { + asgID := asg.Id() + exists[asgID] = true + if m.explicitlyConfigured[asgID] { // This ASG was explicitly configured, but would also be // autodiscovered. We want the explicitly configured min and max // nodes to take precedence. @@ -270,8 +270,8 @@ func (m *AzureManager) fetchAutoAsgs() error { } for _, asg := range m.getAsgs() { - azRef := asg.getAzureRef() - if !exists[azRef] && !m.explicitlyConfigured[azRef] { + asgID := asg.Id() + if !exists[asgID] && !m.explicitlyConfigured[asgID] { m.UnregisterAsg(asg) changed = true } @@ -286,23 +286,23 @@ func (m *AzureManager) fetchAutoAsgs() error { return nil } -func (m *AzureManager) getAsgs() []Asg { +func (m *AzureManager) getAsgs() []cloudprovider.NodeGroup { return m.asgCache.get() } // RegisterAsg registers an ASG. -func (m *AzureManager) RegisterAsg(asg Asg) bool { +func (m *AzureManager) RegisterAsg(asg cloudprovider.NodeGroup) bool { return m.asgCache.Register(asg) } // UnregisterAsg unregisters an ASG. 
-func (m *AzureManager) UnregisterAsg(asg Asg) bool { +func (m *AzureManager) UnregisterAsg(asg cloudprovider.NodeGroup) bool { return m.asgCache.Unregister(asg) } // GetAsgForInstance returns AsgConfig of the given Instance -func (m *AzureManager) GetAsgForInstance(instance *azureRef) (Asg, error) { - return m.asgCache.FindForInstance(instance) +func (m *AzureManager) GetAsgForInstance(instance *azureRef) (cloudprovider.NodeGroup, error) { + return m.asgCache.FindForInstance(instance, m.config.VMType) } func (m *AzureManager) regenerateCache() error { @@ -316,7 +316,11 @@ func (m *AzureManager) Cleanup() { m.asgCache.Cleanup() } -func (m *AzureManager) getFilteredAutoscalingGroups(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []Asg, err error) { +func (m *AzureManager) getFilteredAutoscalingGroups(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []cloudprovider.NodeGroup, err error) { + if len(filter) == 0 { + return nil, nil + } + switch m.config.VMType { case vmTypeVMSS: asgs, err = m.listScaleSets(filter) @@ -333,7 +337,7 @@ func (m *AzureManager) getFilteredAutoscalingGroups(filter []cloudprovider.Label } // listScaleSets gets a list of scale sets and instanceIDs. -func (m *AzureManager) listScaleSets(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []Asg, err error) { +func (m *AzureManager) listScaleSets(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []cloudprovider.NodeGroup, err error) { result, err := m.azClient.virtualMachineScaleSetsClient.List(m.config.ResourceGroup) if err != nil { glog.Errorf("VirtualMachineScaleSetsClient.List for %v failed: %v", m.config.ResourceGroup, err) @@ -381,7 +385,7 @@ func (m *AzureManager) listScaleSets(filter []cloudprovider.LabelAutoDiscoveryCo // listAgentPools gets a list of agent pools and instanceIDs. // Note: filter won't take effect for agent pools. 
-func (m *AzureManager) listAgentPools(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []Asg, err error) { +func (m *AzureManager) listAgentPools(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []cloudprovider.NodeGroup, err error) { deploy, err := m.azClient.deploymentsClient.Get(m.config.ResourceGroup, m.config.Deployment) if err != nil { glog.Errorf("deploymentsClient.Get(%s, %s) failed: %v", m.config.ResourceGroup, m.config.Deployment, err) diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go index 6d0f31eb1b..2d5cb8370b 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go @@ -19,6 +19,8 @@ package azure import ( "fmt" "strings" + "sync" + "time" "github.com/Azure/azure-sdk-for-go/arm/compute" "github.com/golang/glog" @@ -36,6 +38,10 @@ type ScaleSet struct { minSize int maxSize int + + mutex sync.Mutex + lastRefresh time.Time + curSize int64 } // NewScaleSet creates a new NewScaleSet. @@ -47,6 +53,7 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager) (*ScaleSet, erro minSize: spec.MinSize, maxSize: spec.MaxSize, manager: az, + curSize: -1, } return scaleSet, nil @@ -84,8 +91,14 @@ func (scaleSet *ScaleSet) MaxSize() int { return scaleSet.maxSize } -// GetScaleSetSize gets Scale Set size. 
-func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) { +func (scaleSet *ScaleSet) getCurSize() (int64, error) { + scaleSet.mutex.Lock() + defer scaleSet.mutex.Unlock() + + if scaleSet.lastRefresh.Add(15 * time.Second).After(time.Now()) { + return scaleSet.curSize, nil + } + glog.V(5).Infof("Get scale set size for %q", scaleSet.Name) resourceGroup := scaleSet.manager.config.ResourceGroup set, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(resourceGroup, scaleSet.Name) @@ -93,11 +106,22 @@ func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) { return -1, err } glog.V(5).Infof("Returning scale set (%q) capacity: %d\n", scaleSet.Name, *set.Sku.Capacity) - return *set.Sku.Capacity, nil + + scaleSet.curSize = *set.Sku.Capacity + scaleSet.lastRefresh = time.Now() + return scaleSet.curSize, nil +} + +// GetScaleSetSize gets Scale Set size. +func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) { + return scaleSet.getCurSize() } // SetScaleSetSize sets ScaleSet size. func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error { + scaleSet.mutex.Lock() + defer scaleSet.mutex.Unlock() + resourceGroup := scaleSet.manager.config.ResourceGroup op, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(resourceGroup, scaleSet.Name) if err != nil { @@ -107,9 +131,16 @@ func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error { op.Sku.Capacity = &size op.VirtualMachineScaleSetProperties.ProvisioningState = nil cancel := make(chan struct{}) - _, errChan := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdate(resourceGroup, scaleSet.Name, op, cancel) - return <-errChan + err = <-errChan + if err != nil { + glog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, err) + return err + } + + scaleSet.curSize = size + scaleSet.lastRefresh = time.Now() + return nil } // TargetSize returns the current TARGET size of the node group. 
It is possible that the @@ -217,6 +248,8 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error { return nil } + glog.V(3).Infof("Deleting vmss instances %q", instances) + commonAsg, err := scaleSet.manager.GetAsgForInstance(instances[0]) if err != nil { return err @@ -230,10 +263,10 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error { } if asg != commonAsg { - return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.GetKey(), commonAsg) + return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg) } - instanceID, err := getLastSegment(instance.GetKey()) + instanceID, err := getLastSegment(instance.Name) if err != nil { glog.Errorf("getLastSegment failed with error: %v", err) return err @@ -253,7 +286,7 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error { // DeleteNodes deletes the nodes from the group. func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { - glog.V(8).Infof("Delete nodes requested: %v\n", nodes) + glog.V(8).Infof("Delete nodes requested: %q\n", nodes) size, err := scaleSet.GetScaleSetSize() if err != nil { return err @@ -307,6 +340,9 @@ func (scaleSet *ScaleSet) Nodes() ([]string, error) { result := make([]string, len(vms)) for i := range vms { + if len(*vms[i].ID) == 0 { + continue + } // To keep consistent with providerID from kubernetes cloud provider, do not convert ID to lower case. name := "azure://" + *vms[i].ID result = append(result, name) @@ -332,8 +368,3 @@ func (scaleSet *ScaleSet) getInstanceIDs() (map[azureRef]string, error) { return result, nil } - -// GetAzureRef gets AzureRef fot the scale set. 
-func (scaleSet *ScaleSet) getAzureRef() azureRef { - return scaleSet.azureRef -} From 18827c1571b6937596ca517c837f57897922724c Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Tue, 16 Jan 2018 16:53:05 +0800 Subject: [PATCH 3/3] Reduce API calls for Azure VMAS --- .../cloudprovider/azure/azure_agent_pool.go | 61 +++++++++++++------ .../cloudprovider/azure/azure_cache.go | 2 +- .../cloudprovider/azure/azure_util.go | 8 +++ 3 files changed, 51 insertions(+), 20 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go b/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go index eb91f19599..6e37865267 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go @@ -47,9 +47,9 @@ type AgentPool struct { template map[string]interface{} parameters map[string]interface{} - mutex sync.Mutex - targetSize int - provisioning bool + mutex sync.Mutex + lastRefresh time.Time + curSize int64 } // NewAgentPool creates a new AgentPool. @@ -58,10 +58,9 @@ func NewAgentPool(spec *dynamic.NodeGroupSpec, az *AzureManager) (*AgentPool, er azureRef: azureRef{ Name: spec.Name, }, - minSize: spec.MinSize, - maxSize: spec.MaxSize, - targetSize: -1, - manager: az, + minSize: spec.MinSize, + maxSize: spec.MaxSize, + manager: az, } if err := as.initialize(); err != nil { @@ -167,23 +166,35 @@ func (as *AgentPool) GetVMIndexes() ([]int, map[int]string, error) { return sortedIndexes, indexToVM, nil } -// TargetSize returns the current TARGET size of the node group. It is possible that the -// number is different from the number of nodes registered in Kubernetes. 
-func (as *AgentPool) TargetSize() (int, error) { +func (as *AgentPool) getCurSize() (int64, error) { as.mutex.Lock() defer as.mutex.Unlock() - if as.targetSize != -1 { - return as.targetSize, nil + if as.lastRefresh.Add(15 * time.Second).After(time.Now()) { + return as.curSize, nil } + glog.V(5).Infof("Get agent pool size for %q", as.Name) indexes, _, err := as.GetVMIndexes() if err != nil { return 0, err } + glog.V(5).Infof("Returning agent pool (%q) size: %d\n", as.Name, len(indexes)) - as.targetSize = len(indexes) - return as.targetSize, nil + as.curSize = int64(len(indexes)) + as.lastRefresh = time.Now() + return as.curSize, nil +} + +// TargetSize returns the current TARGET size of the node group. It is possible that the +// number is different from the number of nodes registered in Kubernetes. +func (as *AgentPool) TargetSize() (int, error) { + size, err := as.getCurSize() + if err != nil { + return -1, err + } + + return int(size), nil } // IncreaseSize increases agent pool size @@ -206,8 +217,8 @@ func (as *AgentPool) IncreaseSize(delta int) error { } highestUsedIndex := indexes[len(indexes)-1] - countForTemplate := curSize + delta - as.targetSize = countForTemplate + expectedSize := curSize + delta + countForTemplate := expectedSize if highestUsedIndex != 0 { countForTemplate += highestUsedIndex + 1 - curSize } @@ -225,7 +236,14 @@ func (as *AgentPool) IncreaseSize(delta int) error { } _, errChan := as.manager.azClient.deploymentsClient.CreateOrUpdate(as.manager.config.ResourceGroup, newDeploymentName, newDeployment, cancel) glog.V(3).Infof("Waiting for deploymentsClient.CreateOrUpdate(%s, %s, %s)", as.manager.config.ResourceGroup, newDeploymentName, newDeployment) - return <-errChan + err = <-errChan + if err != nil { + return err + } + + as.curSize = int64(expectedSize) + as.lastRefresh = time.Now() + return err } // GetVirtualMachines returns list of nodes for the given agent pool. 
@@ -280,13 +298,14 @@ func (as *AgentPool) DecreaseTargetSize(delta int) error { return err } - curTargetSize := as.targetSize + curTargetSize := int(as.curSize) if curTargetSize+delta < len(nodes) { return fmt.Errorf("attempt to delete existing nodes targetSize:%d delta:%d existingNodes: %d", curTargetSize, delta, len(nodes)) } - as.targetSize = curTargetSize + delta + as.curSize = int64(curTargetSize + delta) + as.lastRefresh = time.Now() return nil } @@ -406,6 +425,10 @@ func (as *AgentPool) Nodes() ([]string, error) { nodes := make([]string, 0, len(instances)) for _, instance := range instances { + if len(*instance.ID) == 0 { + continue + } + // To keep consistent with providerID from kubernetes cloud provider, do not convert ID to lower case. name := "azure://" + *instance.ID nodes = append(nodes, name) diff --git a/cluster-autoscaler/cloudprovider/azure/azure_cache.go b/cluster-autoscaler/cloudprovider/azure/azure_cache.go index f6bfe54864..8353c95f6e 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_cache.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_cache.go @@ -135,7 +135,7 @@ func (m *asgCache) FindForInstance(instance *azureRef, vmType string) (cloudprov if vmType == vmTypeStandard { // Omit virtual machines with providerID not in Azure resource ID format. 
- if ok := virtualMachineRE.Match([]byte(instance.Name)); ok { + if ok := virtualMachineRE.Match([]byte(instance.Name)); !ok { glog.V(3).Infof("Instance %q is not in Azure resource ID format, omit it in autoscaler", instance.Name) m.notInRegisteredAsg[*instance] = true return nil, nil diff --git a/cluster-autoscaler/cloudprovider/azure/azure_util.go b/cluster-autoscaler/cloudprovider/azure/azure_util.go index b4a6a6af1f..38827a834f 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_util.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_util.go @@ -350,7 +350,15 @@ func GetVMNameIndex(osType compute.OperatingSystemTypes, vmName string) (int, er } func matchDiscoveryConfig(labels map[string]*string, configs []cloudprovider.LabelAutoDiscoveryConfig) bool { + if len(configs) == 0 { + return false + } + for _, c := range configs { + if len(c.Selector) == 0 { + return false + } + for k, v := range c.Selector { value, ok := labels[k] if !ok {