/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

//go:generate go run azure_instance_types/gen.go

package azure

import (
	"fmt"
	"io"
	"strconv"
	"strings"
	"time"

	"github.com/Azure/go-autorest/autorest/azure"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
	kretry "k8s.io/client-go/util/retry"
	klog "k8s.io/klog/v2"
	providerazureconsts "sigs.k8s.io/cloud-provider-azure/pkg/consts"
	"sigs.k8s.io/cloud-provider-azure/pkg/retry"
)

const (
	azurePrefix = "azure://"

	scaleToZeroSupportedStandard = false
	scaleToZeroSupportedVMSS     = true
	refreshInterval              = 1 * time.Minute
)

// AzureManager handles Azure communication and data caching.
type AzureManager struct {
	config   *Config
	azClient *azClient
	env      azure.Environment

	// azureCache is used for caching Azure resources.
	// It keeps track of nodegroups and instances
	// (and of which nodegroup each instance belongs to).
	azureCache *azureCache
	// lastRefresh is the time azureCache was last refreshed.
	// Together with azureCache.refreshInterval it is used to decide whether
	// it is time to refresh the cache from Azure resources.
	//
	// Cache invalidation can also be requested via invalidateCache()
	// (used by both AzureManager and ScaleSet), which manipulates
	// lastRefresh to force a refresh on the next check.
	lastRefresh time.Time

	autoDiscoverySpecs   []labelAutoDiscoveryConfig
	explicitlyConfigured map[string]bool
}

// createAzureManagerInternal allows for a custom azClient to be passed in by tests.
func createAzureManagerInternal(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, azClient *azClient) (*AzureManager, error) {
	cfg, err := BuildAzureConfig(configReader)
	if err != nil {
		return nil, err
	}

	// Defaulting env to Azure Public Cloud.
	env := azure.PublicCloud
	if cfg.Cloud != "" {
		env, err = azure.EnvironmentFromName(cfg.Cloud)
		if err != nil {
			return nil, err
		}
	}

	klog.Infof("Starting azure manager with subscription ID %q", cfg.SubscriptionID)

	if azClient == nil {
		azClient, err = newAzClient(cfg, &env)
		if err != nil {
			return nil, err
		}
	}

	// Create azure manager.
	manager := &AzureManager{
		config:               cfg,
		env:                  env,
		azClient:             azClient,
		explicitlyConfigured: make(map[string]bool),
	}

	cacheTTL := refreshInterval
	if cfg.VmssCacheTTLInSeconds != 0 {
		cacheTTL = time.Duration(cfg.VmssCacheTTLInSeconds) * time.Second
	}
	cache, err := newAzureCache(azClient, cacheTTL, *cfg)
	if err != nil {
		return nil, err
	}
	manager.azureCache = cache

	if !manager.azureCache.HasVMSKUs() {
		klog.Warning("No VM SKU info loaded, using only static SKU list")
		cfg.EnableDynamicInstanceList = false
	}

	specs, err := ParseLabelAutoDiscoverySpecs(discoveryOpts)
	if err != nil {
		return nil, err
	}
	manager.autoDiscoverySpecs = specs

	if err := manager.fetchExplicitNodeGroups(discoveryOpts.NodeGroupSpecs); err != nil {
		return nil, err
	}
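
	// With this backoff (constant 2-minute base, up to 10% jitter, 6 steps,
	// capped at 10 minutes), kretry.OnError below retries forceRefresh on
	// retriable Azure errors with roughly two-minute waits between attempts,
	// so a persistently failing refresh can block manager creation for on
	// the order of ten minutes before the error is returned.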
	retryBackoff := wait.Backoff{
		Duration: 2 * time.Minute,
		Factor:   1.0,
		Jitter:   0.1,
		Steps:    6,
		Cap:      10 * time.Minute,
	}

	// skuCache will already be created at this step by newAzureCache()
	err = kretry.OnError(retryBackoff, retry.IsErrorRetriable, func() (err error) {
		return manager.forceRefresh()
	})
	if err != nil {
		return nil, err
	}

	return manager, nil
}

// CreateAzureManager creates an AzureManager to work with Azure.
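//
// A minimal call-site sketch (illustrative only; the config path and the
// "<min>:<max>:<name>" node group spec below are placeholders, not values
// defined by this package):
//
//	f, err := os.Open("/etc/kubernetes/azure.json")
//	if err != nil {
//		klog.Fatalf("cannot open cloud config: %v", err)
//	}
//	defer f.Close()
//	manager, err := CreateAzureManager(f, cloudprovider.NodeGroupDiscoveryOptions{
//		NodeGroupSpecs: []string{"1:10:my-vmss-pool"},
//	})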
func CreateAzureManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*AzureManager, error) {
	return createAzureManagerInternal(configReader, discoveryOpts, nil)
}
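
// fetchExplicitNodeGroups registers the node groups configured explicitly via
// "<min>:<max>:<name>" specs and invalidates the cache if the set of registered
// groups changed.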
func (m *AzureManager) fetchExplicitNodeGroups(specs []string) error {
	changed := false
	for _, spec := range specs {
		nodeGroup, err := m.buildNodeGroupFromSpec(spec)
		if err != nil {
			return fmt.Errorf("failed to parse node group spec: %v", err)
		}
		if m.RegisterNodeGroup(nodeGroup) {
			changed = true
		}
		m.explicitlyConfigured[nodeGroup.Id()] = true
	}

	if changed {
		m.invalidateCache()
	}
	return nil
}

// parseSKUAndVMsAgentpoolNameFromSpecName parses the spec name for a mixed-SKU VMs pool.
// The spec name should be in the format <agentpoolname>/<sku>, e.g. "mypool1/Standard_D2s_v3",
// if the agent pool is a VMs pool. It returns a boolean indicating whether the agent pool
// is a VMs pool, along with the agent pool name and SKU.
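// For example, "mypool1/Standard_D2s_v3" yields (true, "mypool1", "Standard_D2s_v3")
// when "mypool1" is a VMs pool known to the cache; any other shape, or an unknown
// agent pool name, yields (false, "", "").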
func (m *AzureManager) parseSKUAndVMsAgentpoolNameFromSpecName(name string) (bool, string, string) {
	parts := strings.Split(name, "/")
	if len(parts) == 2 {
		agentPoolName := parts[0]
		sku := parts[1]

		vmsPoolMap := m.azureCache.getVMsPoolMap()
		if _, ok := vmsPoolMap[agentPoolName]; ok {
			return true, agentPoolName, sku
		}
	}
	return false, "", ""
}

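// buildNodeGroupFromSpec builds a NodeGroup from a "<min>:<max>:<name>" spec
// string (the format parsed by dynamic.SpecFromString), dispatching on the
// pool kind: a VMs pool, a standard agent pool, or a VMSS.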
func (m *AzureManager) buildNodeGroupFromSpec(spec string) (cloudprovider.NodeGroup, error) {
	scaleToZeroSupported := scaleToZeroSupportedStandard
	if strings.EqualFold(m.config.VMType, providerazureconsts.VMTypeVMSS) {
		scaleToZeroSupported = scaleToZeroSupportedVMSS
	}
	s, err := dynamic.SpecFromString(spec, scaleToZeroSupported)
	if err != nil {
		return nil, fmt.Errorf("failed to parse node group spec: %v", err)
	}

	// Starting from release 1.30, a cluster may have both VMSS and VMs pools.
	// Therefore, we cannot solely rely on the VMType to determine the node group type.
	// Instead, we need to check the cache to determine if the agent pool is a VMs pool.
	isVMsPool, agentPoolName, sku := m.parseSKUAndVMsAgentpoolNameFromSpecName(s.Name)
	if isVMsPool {
		return NewVMPool(s, m, agentPoolName, sku)
	}

	switch m.config.VMType {
	case providerazureconsts.VMTypeStandard:
		return NewAgentPool(s, m)
	case providerazureconsts.VMTypeVMSS:
		return NewScaleSet(s, m, -1, false)
	default:
		return nil, fmt.Errorf("vmtype %s not supported", m.config.VMType)
	}
}

// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
func (m *AzureManager) Refresh() error {
	if m.lastRefresh.Add(m.azureCache.refreshInterval).After(time.Now()) {
		return nil
	}
	return m.forceRefresh()
}
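
// forceRefresh re-runs node group autodiscovery and regenerates the Azure
// cache regardless of lastRefresh. Autodiscovery failures are only logged;
// a cache regeneration failure is returned as an error.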
func (m *AzureManager) forceRefresh() error {
	if err := m.fetchAutoNodeGroups(); err != nil {
		klog.Errorf("Failed to fetch autodiscovered nodegroups: %v", err)
	}
	if err := m.azureCache.regenerate(); err != nil {
		klog.Errorf("Failed to regenerate Azure cache: %v", err)
		return err
	}
	m.lastRefresh = time.Now()
	klog.V(2).Infof("Refreshed Azure VM and VMSS list, next refresh after %v", m.lastRefresh.Add(m.azureCache.refreshInterval))
	return nil
}

// invalidateCache forces a cache reload on the next check
// by manipulating the lastRefresh timestamp.
func (m *AzureManager) invalidateCache() {
	m.lastRefresh = time.Now().Add(-1 * m.azureCache.refreshInterval)
	klog.V(2).Infof("Invalidated Azure cache")
}

// fetchAutoNodeGroups fetches automatically discovered NodeGroups. These
// NodeGroups should be unregistered if they no longer exist in Azure.
func (m *AzureManager) fetchAutoNodeGroups() error {
	groups, err := m.getFilteredNodeGroups(m.autoDiscoverySpecs)
	if err != nil {
		return fmt.Errorf("cannot autodiscover NodeGroups: %s", err)
	}

	changed := false
	exists := make(map[string]bool)
	for _, group := range groups {
		id := group.Id()
		exists[id] = true
		if m.explicitlyConfigured[id] {
			// This NodeGroup was explicitly configured, but would also be
			// autodiscovered. We want the explicitly configured min and max
			// nodes to take precedence.
			klog.V(3).Infof("Ignoring explicitly configured NodeGroup %s for autodiscovery.", group.Id())
			continue
		}
		if m.RegisterNodeGroup(group) {
			klog.V(3).Infof("Autodiscovered NodeGroup %s using tags %v", group.Id(), m.autoDiscoverySpecs)
			changed = true
		}
	}

	for _, nodeGroup := range m.getNodeGroups() {
		nodeGroupID := nodeGroup.Id()
		if !exists[nodeGroupID] && !m.explicitlyConfigured[nodeGroupID] {
			m.UnregisterNodeGroup(nodeGroup)
			changed = true
		}
	}

	if changed {
		m.invalidateCache()
	}

	return nil
}

func (m *AzureManager) getNodeGroups() []cloudprovider.NodeGroup {
	return m.azureCache.getRegisteredNodeGroups()
}

// RegisterNodeGroup registers a NodeGroup.
func (m *AzureManager) RegisterNodeGroup(nodeGroup cloudprovider.NodeGroup) bool {
	return m.azureCache.Register(nodeGroup)
}

// UnregisterNodeGroup unregisters a NodeGroup.
func (m *AzureManager) UnregisterNodeGroup(nodeGroup cloudprovider.NodeGroup) bool {
	return m.azureCache.Unregister(nodeGroup)
}

// GetNodeGroupForInstance returns the NodeGroup of the given Instance.
func (m *AzureManager) GetNodeGroupForInstance(instance *azureRef) (cloudprovider.NodeGroup, error) {
	return m.azureCache.FindForInstance(instance, m.config.VMType)
}

// GetScaleSetOptions parses options extracted from VMSS tags and merges them with the provided defaults.
func (m *AzureManager) GetScaleSetOptions(scaleSetName string, defaults config.NodeGroupAutoscalingOptions) *config.NodeGroupAutoscalingOptions {
	options := m.azureCache.getAutoscalingOptions(azureRef{Name: scaleSetName})
	if len(options) == 0 {
		return &defaults
	}

	if opt, ok := getFloat64Option(options, scaleSetName, config.DefaultScaleDownUtilizationThresholdKey); ok {
		defaults.ScaleDownUtilizationThreshold = opt
	}
	if opt, ok := getFloat64Option(options, scaleSetName, config.DefaultScaleDownGpuUtilizationThresholdKey); ok {
		defaults.ScaleDownGpuUtilizationThreshold = opt
	}
	if opt, ok := getDurationOption(options, scaleSetName, config.DefaultScaleDownUnneededTimeKey); ok {
		defaults.ScaleDownUnneededTime = opt
	}
	if opt, ok := getDurationOption(options, scaleSetName, config.DefaultScaleDownUnreadyTimeKey); ok {
		defaults.ScaleDownUnreadyTime = opt
	}

	return &defaults
}

// Cleanup the cache.
func (m *AzureManager) Cleanup() {
	m.azureCache.Cleanup()
}

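// getFilteredNodeGroups returns node groups matching the given autodiscovery
// configs; only the VMSS VM type supports autodiscovery.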
func (m *AzureManager) getFilteredNodeGroups(filter []labelAutoDiscoveryConfig) (nodeGroups []cloudprovider.NodeGroup, err error) {
	if len(filter) == 0 {
		return nil, nil
	}

	if m.config.VMType == providerazureconsts.VMTypeVMSS {
		return m.getFilteredScaleSets(filter)
	}

	return nil, fmt.Errorf("vmType %q does not support autodiscovery", m.config.VMType)
}

// getFilteredScaleSets builds node groups from the cached scale sets that match
// the given autodiscovery filters.
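// A scale set can also carry explicit "min" and "max" tags (e.g. min=1, max=10);
// when present they take precedence over the sizes from the discovery config.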
func (m *AzureManager) getFilteredScaleSets(filter []labelAutoDiscoveryConfig) ([]cloudprovider.NodeGroup, error) {
	vmssList := m.azureCache.getScaleSets()

	var nodeGroups []cloudprovider.NodeGroup
	for _, scaleSet := range vmssList {
		var cfgSizes *autoDiscoveryConfigSizes
		if len(filter) > 0 {
			if len(scaleSet.Tags) == 0 {
				continue
			}

			if cfgSizes = matchDiscoveryConfig(scaleSet.Tags, filter); cfgSizes == nil {
				continue
			}
		}
		spec := &dynamic.NodeGroupSpec{
			Name:               *scaleSet.Name,
			MinSize:            1,
			MaxSize:            -1,
			SupportScaleToZero: scaleToZeroSupportedVMSS,
		}

		if val, ok := scaleSet.Tags["min"]; ok {
			if minSize, err := strconv.Atoi(*val); err == nil {
				spec.MinSize = minSize
			} else {
				klog.Warningf("ignoring vmss %q because of invalid minimum size specified for vmss: %s", *scaleSet.Name, err)
				continue
			}
		} else if cfgSizes.Min >= 0 {
			spec.MinSize = cfgSizes.Min
		} else {
			klog.Warningf("ignoring vmss %q because no minimum size was specified for vmss", *scaleSet.Name)
			continue
		}
		if spec.MinSize < 0 {
			klog.Warningf("ignoring vmss %q because minimum size must be a non-negative number of nodes", *scaleSet.Name)
			continue
		}
		if val, ok := scaleSet.Tags["max"]; ok {
			if maxSize, err := strconv.Atoi(*val); err == nil {
				spec.MaxSize = maxSize
			} else {
				klog.Warningf("ignoring vmss %q because of invalid maximum size specified for vmss: %s", *scaleSet.Name, err)
				continue
			}
		} else if cfgSizes.Max >= 0 {
			spec.MaxSize = cfgSizes.Max
		} else {
			klog.Warningf("ignoring vmss %q because no maximum size was specified for vmss", *scaleSet.Name)
			continue
		}
		if spec.MaxSize < spec.MinSize {
			klog.Warningf("ignoring vmss %q because maximum size must be greater than or equal to minimum size: max=%d < min=%d", *scaleSet.Name, spec.MaxSize, spec.MinSize)
			continue
		}

		curSize := int64(-1)
		if scaleSet.Sku != nil && scaleSet.Sku.Capacity != nil {
			curSize = *scaleSet.Sku.Capacity
		}

		dedicatedHost := scaleSet.VirtualMachineScaleSetProperties != nil && scaleSet.VirtualMachineScaleSetProperties.HostGroup != nil

		vmss, err := NewScaleSet(spec, m, curSize, dedicatedHost)
		if err != nil {
			klog.Warningf("ignoring vmss %q: %v", *scaleSet.Name, err)
			continue
		}
		nodeGroups = append(nodeGroups, vmss)
	}

	return nodeGroups, nil
}