495 lines
16 KiB
Go
495 lines
16 KiB
Go
/*
|
|
Copyright 2018 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package azure
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"reflect"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute"
|
|
"github.com/Azure/go-autorest/autorest/to"
|
|
"github.com/Azure/skewer"
|
|
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
|
providerazureconsts "sigs.k8s.io/cloud-provider-azure/pkg/consts"
|
|
|
|
"k8s.io/klog/v2"
|
|
)
|
|
|
|
var (
	// virtualMachineRE matches provider IDs of the form
	// azure://<anything>/providers/Microsoft.Compute/virtualMachines/<name>
	// and captures <name> as its only submatch. The dot in "Microsoft.Compute"
	// is escaped so that it only matches a literal dot rather than any character.
	virtualMachineRE = regexp.MustCompile(`^azure://(?:.*)/providers/Microsoft\.Compute/virtualMachines/(.+)$`)
)
|
|
|
|
// azureCache is used for caching cluster resources state.
|
|
//
|
|
// It is needed to:
|
|
// - keep track of node groups (VM and VMSS types) in the cluster,
|
|
// - keep track of instances and which node group they belong to,
|
|
// (for VMSS it only keeps track of instanceid-to-nodegroup mapping)
|
|
// - limit repetitive Azure API calls.
|
|
//
|
|
// It backs efficient responds to
|
|
// - cloudprovider.NodeGroups() (= registeredNodeGroups)
|
|
// - cloudprovider.NodeGroupForNode (via azureManager.GetNodeGroupForInstance => FindForInstance,
|
|
// using instanceToNodeGroup and unownedInstances)
|
|
//
|
|
// CloudProvider.Refresh, called before every autoscaler loop (every 10s by defaul),
|
|
// is implemented by AzureManager.Refresh which makes the cache refresh decision,
|
|
// based on AzureManager.lastRefresh and azureCache.refreshInterval.
|
|
type azureCache struct {
|
|
mutex sync.Mutex
|
|
interrupt chan struct{}
|
|
azClient *azClient
|
|
|
|
// refreshInterval specifies how often azureCache needs to be refreshed.
|
|
// The value comes from AZURE_VMSS_CACHE_TTL env var (or 1min if not specified),
|
|
// and is also used by some other caches. Together with AzureManager.lastRefresh,
|
|
// it is uses to decide whether a refresh is needed.
|
|
refreshInterval time.Duration
|
|
|
|
// Cache content.
|
|
|
|
// resourceGroup specifies the name of the resource group that this cache tracks
|
|
resourceGroup string
|
|
|
|
// vmType can be one of vmTypeVMSS (default), vmTypeStandard
|
|
vmType string
|
|
|
|
vmsPoolSet map[string]struct{} // track the nodepools that're vms pool
|
|
|
|
// scaleSets keeps the set of all known scalesets in the resource group, populated/refreshed via VMSS.List() call.
|
|
// It is only used/populated if vmType is vmTypeVMSS (default).
|
|
scaleSets map[string]compute.VirtualMachineScaleSet
|
|
// virtualMachines keeps the set of all VMs in the resource group.
|
|
// It is only used/populated if vmType is vmTypeStandard.
|
|
virtualMachines map[string][]compute.VirtualMachine
|
|
|
|
// registeredNodeGroups represents all known NodeGroups.
|
|
registeredNodeGroups []cloudprovider.NodeGroup
|
|
|
|
// instanceToNodeGroup maintains a mapping from instance Ids to nodegroups.
|
|
// It is populated from the results of calling Nodes() on each nodegroup.
|
|
// It is used (together with unownedInstances) when looking up the nodegroup
|
|
// for a given instance id (see FindForInstance).
|
|
instanceToNodeGroup map[azureRef]cloudprovider.NodeGroup
|
|
|
|
// unownedInstance maintains a set of instance ids not belonging to any nodegroup.
|
|
// It is used (together with instanceToNodeGroup) when looking up the nodegroup for a given instance id.
|
|
// It is reset by invalidateUnownedInstanceCache().
|
|
unownedInstances map[azureRef]bool
|
|
|
|
autoscalingOptions map[azureRef]map[string]string
|
|
skus *skewer.Cache
|
|
}
|
|
|
|
func newAzureCache(client *azClient, cacheTTL time.Duration, config Config) (*azureCache, error) {
|
|
cache := &azureCache{
|
|
interrupt: make(chan struct{}),
|
|
azClient: client,
|
|
refreshInterval: cacheTTL,
|
|
resourceGroup: config.ResourceGroup,
|
|
vmType: config.VMType,
|
|
vmsPoolSet: make(map[string]struct{}),
|
|
scaleSets: make(map[string]compute.VirtualMachineScaleSet),
|
|
virtualMachines: make(map[string][]compute.VirtualMachine),
|
|
registeredNodeGroups: make([]cloudprovider.NodeGroup, 0),
|
|
instanceToNodeGroup: make(map[azureRef]cloudprovider.NodeGroup),
|
|
unownedInstances: make(map[azureRef]bool),
|
|
autoscalingOptions: make(map[azureRef]map[string]string),
|
|
skus: &skewer.Cache{}, // populated iff config.EnableDynamicInstanceList
|
|
}
|
|
|
|
if err := cache.regenerate(); err != nil {
|
|
klog.Errorf("Error while regenerating Azure cache: %v", err)
|
|
}
|
|
|
|
if config.EnableDynamicInstanceList {
|
|
if err := cache.fetchSKUCache(config.Location); err != nil {
|
|
klog.Errorf("Error while populating SKU list: %v", err)
|
|
}
|
|
}
|
|
|
|
return cache, nil
|
|
}
|
|
|
|
func (m *azureCache) getVMsPoolSet() map[string]struct{} {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
return m.vmsPoolSet
|
|
}
|
|
|
|
func (m *azureCache) getVirtualMachines() map[string][]compute.VirtualMachine {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
return m.virtualMachines
|
|
}
|
|
|
|
func (m *azureCache) getScaleSets() map[string]compute.VirtualMachineScaleSet {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
return m.scaleSets
|
|
}
|
|
|
|
// Cleanup closes the channel to signal the go routine to stop that is handling the cache
|
|
func (m *azureCache) Cleanup() {
|
|
close(m.interrupt)
|
|
}
|
|
|
|
func (m *azureCache) regenerate() error {
|
|
err := m.fetchAzureResources()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Regenerate instance to node groups mapping.
|
|
newInstanceToNodeGroupCache := make(map[azureRef]cloudprovider.NodeGroup)
|
|
for _, ng := range m.registeredNodeGroups {
|
|
klog.V(4).Infof("regenerate: finding nodes for node group %s", ng.Id())
|
|
instances, err := ng.Nodes()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
klog.V(4).Infof("regenerate: found %d nodes for node group %s: %+v", len(instances), ng.Id(), instances)
|
|
|
|
for _, instance := range instances {
|
|
ref := azureRef{Name: instance.Id}
|
|
newInstanceToNodeGroupCache[ref] = ng
|
|
}
|
|
}
|
|
|
|
// Regenerate VMSS to autoscaling options mapping.
|
|
newAutoscalingOptions := make(map[azureRef]map[string]string)
|
|
for _, vmss := range m.scaleSets {
|
|
ref := azureRef{Name: *vmss.Name}
|
|
options := extractAutoscalingOptionsFromScaleSetTags(vmss.Tags)
|
|
if !reflect.DeepEqual(m.getAutoscalingOptions(ref), options) {
|
|
klog.V(4).Infof("Extracted autoscaling options from %q ScaleSet tags: %v", *vmss.Name, options)
|
|
}
|
|
newAutoscalingOptions[ref] = options
|
|
}
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
m.instanceToNodeGroup = newInstanceToNodeGroupCache
|
|
m.autoscalingOptions = newAutoscalingOptions
|
|
|
|
// Reset unowned instances cache.
|
|
m.unownedInstances = make(map[azureRef]bool)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *azureCache) fetchSKUCache(location string) error {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
cache, err := m.fetchSKUs(context.Background(), location)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.skus = cache
|
|
return nil
|
|
}
|
|
|
|
// fetchAzureResources retrieves and updates the cached Azure resources.
|
|
//
|
|
// This function performs the following:
|
|
// - Fetches and updates the list of Virtual Machine Scale Sets (VMSS) in the specified resource group.
|
|
// - Fetches and updates the list of Virtual Machines (VMs) and identifies the node pools they belong to.
|
|
// - Maintains a set of VMs pools and VMSS resources which helps the Cluster Autoscaler (CAS) operate on mixed node pools.
|
|
//
|
|
// Returns an error if any of the Azure API calls fail.
|
|
func (m *azureCache) fetchAzureResources() error {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
// NOTE: this lists virtual machine scale sets, not virtual machine
|
|
// scale set instances
|
|
vmssResult, err := m.fetchScaleSets()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.scaleSets = vmssResult
|
|
vmResult, vmsPoolSet, err := m.fetchVirtualMachines()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// we fetch both sets of resources since CAS may operate on mixed nodepools
|
|
m.virtualMachines = vmResult
|
|
m.vmsPoolSet = vmsPoolSet
|
|
|
|
return nil
|
|
}
|
|
|
|
// Tag names and values read from virtual machine tags by fetchVirtualMachines.
const (
	// legacyAgentpoolNameTag is the fallback tag carrying the node pool name,
	// consulted when agentpoolNameTag is absent.
	legacyAgentpoolNameTag = "poolName"
	// agentpoolNameTag is the preferred tag carrying the node pool name.
	agentpoolNameTag = "aks-managed-poolName"
	// agentpoolTypeTag is the tag carrying the node pool type.
	agentpoolTypeTag = "aks-managed-agentpool-type"
	// vmsPoolType is the agentpoolTypeTag value that marks a VMs pool.
	vmsPoolType = "VirtualMachines"
)
|
|
|
|
// fetchVirtualMachines returns the updated list of virtual machines in the config resource group using the Azure API.
|
|
func (m *azureCache) fetchVirtualMachines() (map[string][]compute.VirtualMachine, map[string]struct{}, error) {
|
|
ctx, cancel := getContextWithCancel()
|
|
defer cancel()
|
|
|
|
result, err := m.azClient.virtualMachinesClient.List(ctx, m.resourceGroup)
|
|
if err != nil {
|
|
klog.Errorf("VirtualMachinesClient.List in resource group %q failed: %v", m.resourceGroup, err)
|
|
return nil, nil, err.Error()
|
|
}
|
|
|
|
instances := make(map[string][]compute.VirtualMachine)
|
|
// track the nodepools that're vms pools
|
|
vmsPoolSet := make(map[string]struct{})
|
|
for _, instance := range result {
|
|
if instance.Tags == nil {
|
|
continue
|
|
}
|
|
|
|
tags := instance.Tags
|
|
vmPoolName := tags[agentpoolNameTag]
|
|
// fall back to legacy tag name if not found
|
|
if vmPoolName == nil {
|
|
vmPoolName = tags[legacyAgentpoolNameTag]
|
|
}
|
|
if vmPoolName == nil {
|
|
continue
|
|
}
|
|
|
|
instances[to.String(vmPoolName)] = append(instances[to.String(vmPoolName)], instance)
|
|
|
|
// if the nodepool is already in the map, skip it
|
|
if _, ok := vmsPoolSet[to.String(vmPoolName)]; ok {
|
|
continue
|
|
}
|
|
|
|
// nodes from vms pool will have tag "aks-managed-agentpool-type" set to "VirtualMachines"
|
|
if agentpoolType := tags[agentpoolTypeTag]; agentpoolType != nil {
|
|
if strings.EqualFold(to.String(agentpoolType), vmsPoolType) {
|
|
vmsPoolSet[to.String(vmPoolName)] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
return instances, vmsPoolSet, nil
|
|
}
|
|
|
|
// fetchScaleSets returns the updated list of scale sets in the config resource group using the Azure API.
|
|
func (m *azureCache) fetchScaleSets() (map[string]compute.VirtualMachineScaleSet, error) {
|
|
ctx, cancel := getContextWithTimeout(vmssContextTimeout)
|
|
defer cancel()
|
|
|
|
result, err := m.azClient.virtualMachineScaleSetsClient.List(ctx, m.resourceGroup)
|
|
if err != nil {
|
|
klog.Errorf("VirtualMachineScaleSetsClient.List in resource group %q failed: %v", m.resourceGroup, err)
|
|
return nil, err.Error()
|
|
}
|
|
|
|
sets := make(map[string]compute.VirtualMachineScaleSet)
|
|
for _, vmss := range result {
|
|
sets[*vmss.Name] = vmss
|
|
}
|
|
return sets, nil
|
|
}
|
|
|
|
// Register registers a node group if it hasn't been registered.
|
|
func (m *azureCache) Register(nodeGroup cloudprovider.NodeGroup) bool {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
for i := range m.registeredNodeGroups {
|
|
if existing := m.registeredNodeGroups[i]; strings.EqualFold(existing.Id(), nodeGroup.Id()) {
|
|
if existing.MinSize() == nodeGroup.MinSize() && existing.MaxSize() == nodeGroup.MaxSize() {
|
|
// Node group is already registered and min/max size haven't changed, no action required.
|
|
return false
|
|
}
|
|
m.registeredNodeGroups[i] = nodeGroup
|
|
klog.V(4).Infof("Node group %q updated", nodeGroup.Id())
|
|
m.invalidateUnownedInstanceCache()
|
|
return true
|
|
}
|
|
}
|
|
|
|
klog.V(4).Infof("Registering Node Group %q", nodeGroup.Id())
|
|
|
|
m.registeredNodeGroups = append(m.registeredNodeGroups, nodeGroup)
|
|
m.invalidateUnownedInstanceCache()
|
|
return true
|
|
}
|
|
|
|
func (m *azureCache) invalidateUnownedInstanceCache() {
|
|
klog.V(4).Info("Invalidating unowned instance cache")
|
|
m.unownedInstances = make(map[azureRef]bool)
|
|
}
|
|
|
|
// Unregister node group. Returns true if the node group was unregistered.
|
|
func (m *azureCache) Unregister(nodeGroup cloudprovider.NodeGroup) bool {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
updated := make([]cloudprovider.NodeGroup, 0, len(m.registeredNodeGroups))
|
|
changed := false
|
|
for _, existing := range m.registeredNodeGroups {
|
|
if strings.EqualFold(existing.Id(), nodeGroup.Id()) {
|
|
klog.V(1).Infof("Unregistered node group %s", nodeGroup.Id())
|
|
changed = true
|
|
continue
|
|
}
|
|
updated = append(updated, existing)
|
|
}
|
|
m.registeredNodeGroups = updated
|
|
return changed
|
|
}
|
|
|
|
func (m *azureCache) fetchSKUs(ctx context.Context, location string) (*skewer.Cache, error) {
|
|
if location == "" {
|
|
return nil, errors.New("location not specified")
|
|
}
|
|
|
|
return skewer.NewCache(ctx,
|
|
skewer.WithLocation(location),
|
|
skewer.WithResourceClient(m.azClient.skuClient),
|
|
)
|
|
}
|
|
|
|
// HasVMSKUS returns true if the cache has any VM SKUs. Can be used to judge success of loading.
|
|
func (m *azureCache) HasVMSKUs() bool {
|
|
// not nil or empty (using the only exposed semi-efficient way to check)
|
|
return !(m.skus == nil || m.skus.Equal(&skewer.Cache{}))
|
|
}
|
|
|
|
func (m *azureCache) GetSKU(ctx context.Context, skuName, location string) (skewer.SKU, error) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
cache := m.skus
|
|
return cache.Get(ctx, skuName, skewer.VirtualMachines, location)
|
|
}
|
|
|
|
func (m *azureCache) getRegisteredNodeGroups() []cloudprovider.NodeGroup {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
return m.registeredNodeGroups
|
|
}
|
|
|
|
func (m *azureCache) getAutoscalingOptions(ref azureRef) map[string]string {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
return m.autoscalingOptions[ref]
|
|
}
|
|
|
|
// HasInstance returns if a given instance exists in the azure cache
|
|
func (m *azureCache) HasInstance(providerID string) (bool, error) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
resourceID, err := convertResourceGroupNameToLower(providerID)
|
|
if err != nil {
|
|
// Most likely an invalid resource id, we should return an error
|
|
// most of these shouldn't make it here do to higher level
|
|
// validation in the HasInstance azure.cloudprovider function
|
|
return false, err
|
|
}
|
|
|
|
if m.getInstanceFromCache(resourceID) != nil {
|
|
return true, nil
|
|
}
|
|
// couldn't find instance in the cache, assume it's deleted
|
|
return false, cloudprovider.ErrNotImplemented
|
|
}
|
|
|
|
// FindForInstance returns node group of the given Instance
|
|
func (m *azureCache) FindForInstance(instance *azureRef, vmType string) (cloudprovider.NodeGroup, error) {
|
|
vmsPoolSet := m.getVMsPoolSet()
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
klog.V(4).Infof("FindForInstance: starts, ref: %s", instance.Name)
|
|
resourceID, err := convertResourceGroupNameToLower(instance.Name)
|
|
klog.V(4).Infof("FindForInstance: resourceID: %s", resourceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
inst := azureRef{Name: resourceID}
|
|
if m.unownedInstances[inst] {
|
|
// We already know we don't own this instance. Return early and avoid
|
|
// additional calls.
|
|
klog.V(4).Infof("FindForInstance: Couldn't find NodeGroup of instance %q", inst)
|
|
return nil, nil
|
|
}
|
|
|
|
// cluster with vmss pool only
|
|
if vmType == providerazureconsts.VMTypeVMSS && len(vmsPoolSet) == 0 {
|
|
if m.areAllScaleSetsUniform() {
|
|
// Omit virtual machines not managed by vmss only in case of uniform scale set.
|
|
if ok := virtualMachineRE.Match([]byte(inst.Name)); ok {
|
|
klog.V(3).Infof("Instance %q is not managed by vmss, omit it in autoscaler", instance.Name)
|
|
m.unownedInstances[inst] = true
|
|
return nil, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
if vmType == providerazureconsts.VMTypeStandard {
|
|
// Omit virtual machines with providerID not in Azure resource ID format.
|
|
if ok := virtualMachineRE.Match([]byte(inst.Name)); !ok {
|
|
klog.V(3).Infof("Instance %q is not in Azure resource ID format, omit it in autoscaler", instance.Name)
|
|
m.unownedInstances[inst] = true
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
// Look up caches for the instance.
|
|
klog.V(6).Infof("FindForInstance: attempting to retrieve instance %v from cache", m.instanceToNodeGroup)
|
|
if nodeGroup := m.getInstanceFromCache(inst.Name); nodeGroup != nil {
|
|
klog.V(4).Infof("FindForInstance: found node group %q in cache", nodeGroup.Id())
|
|
return nodeGroup, nil
|
|
}
|
|
klog.V(4).Infof("FindForInstance: Couldn't find node group of instance %q", inst)
|
|
return nil, nil
|
|
}
|
|
|
|
// isAllScaleSetsAreUniform determines if all the scale set autoscaler is monitoring are Uniform or not.
|
|
func (m *azureCache) areAllScaleSetsUniform() bool {
|
|
for _, scaleSet := range m.scaleSets {
|
|
if scaleSet.VirtualMachineScaleSetProperties.OrchestrationMode == compute.Flexible {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// getInstanceFromCache gets the node group from cache. Returns nil if not found.
|
|
// Should be called with lock.
|
|
func (m *azureCache) getInstanceFromCache(providerID string) cloudprovider.NodeGroup {
|
|
for instanceID, nodeGroup := range m.instanceToNodeGroup {
|
|
if strings.EqualFold(instanceID.GetKey(), providerID) {
|
|
return nodeGroup
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|