autoscaler/cluster-autoscaler/cloudprovider/azure/azure_manager.go

577 lines
17 KiB
Go

/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
//go:generate go run azure_instance_types/gen.go
package azure
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
"time"
"github.com/Azure/go-autorest/autorest/azure"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/klog"
providerazure "k8s.io/legacy-cloud-providers/azure"
)
const (
// Supported values of Config.VMType; they select which node group
// implementation (scale set, agent pool or AKS agent pool) is used.
vmTypeVMSS = "vmss"
vmTypeStandard = "standard"
vmTypeAKS = "aks"
// Whether node groups of the given VM type may be scaled down to zero nodes.
// The values are passed to dynamic.SpecFromString and
// dynamic.NodeGroupSpec.SupportScaleToZero.
scaleToZeroSupportedStandard = false
scaleToZeroSupportedVMSS = true
// refreshInterval is the minimum time between two auto-discovery refreshes
// triggered through AzureManager.Refresh.
refreshInterval = 1 * time.Minute
// The path of deployment parameters for standard vm.
deploymentParametersPath = "/var/lib/azure/azuredeploy.parameters.json"
// Names of the scale set tags carrying the minimum and maximum node count.
// NOTE(review): listScaleSets reads the same tag names via string literals.
vmssTagMin = "min"
vmssTagMax = "max"
// autoDiscovererTypeLabel is the only discoverer type accepted by
// parseLabelAutoDiscoverySpec.
autoDiscovererTypeLabel = "label"
// Keys joined into validLabelAutoDiscovererKeys.
labelAutoDiscovererKeyMinNodes = "min"
labelAutoDiscovererKeyMaxNodes = "max"
// metadataURL is the instance metadata endpoint queried for the
// subscription ID when ARM_SUBSCRIPTION_ID is not set.
metadataURL = "http://169.254.169.254/metadata/instance"
)
// validLabelAutoDiscovererKeys is the comma-separated list ("min, max") of the
// label auto-discoverer keys declared above.
// NOTE(review): not referenced in the visible part of this file; presumably
// used in error messages elsewhere — confirm.
var validLabelAutoDiscovererKeys = strings.Join([]string{
labelAutoDiscovererKeyMinNodes,
labelAutoDiscovererKeyMaxNodes,
}, ", ")
// A labelAutoDiscoveryConfig specifies how to autodiscover Azure scale sets.
// It is produced by parseLabelAutoDiscoverySpec from a
// "label:key=value,key=value" spec string.
type labelAutoDiscoveryConfig struct {
// Key-values to match on. Keys and values are always non-empty when the
// config comes from parseLabelAutoDiscoverySpec.
Selector map[string]string
}
// AzureManager handles Azure communication and data caching.
type AzureManager struct {
// config is the effective configuration assembled by CreateAzureManager.
config *Config
// azClient is the Azure API client created by newAzClient.
azClient *azClient
// env is the Azure environment resolved from config.Cloud
// (Azure Public Cloud when config.Cloud is empty).
env azure.Environment
// asgCache holds the registered node groups.
asgCache *asgCache
// lastRefresh is the time of the last successful forceRefresh; Refresh uses
// it to limit auto-discovery to once per refreshInterval.
lastRefresh time.Time
// asgAutoDiscoverySpecs are the parsed node group auto-discovery specs.
asgAutoDiscoverySpecs []labelAutoDiscoveryConfig
// explicitlyConfigured is the set of node group IDs registered from explicit
// node group specs; auto-discovery never overrides or unregisters them.
explicitlyConfigured map[string]bool
}
// Config holds the configuration parsed from the --cloud-config flag.
// When no config reader is supplied, CreateAzureManager fills the same fields
// from ARM_* / AZURE_* environment variables instead.
type Config struct {
// Cloud is the Azure environment name, resolved with
// azure.EnvironmentFromName; empty means Azure Public Cloud.
Cloud string `json:"cloud" yaml:"cloud"`
TenantID string `json:"tenantId" yaml:"tenantId"`
SubscriptionID string `json:"subscriptionId" yaml:"subscriptionId"`
// ResourceGroup is the resource group queried for scale sets and deployments.
ResourceGroup string `json:"resourceGroup" yaml:"resourceGroup"`
// VMType is one of "vmss", "standard" or "aks"; empty defaults to "vmss".
VMType string `json:"vmType" yaml:"vmType"`
// Azure AD credential settings.
// NOTE(review): presumably consumed by newAzClient when building the
// client — confirm there.
AADClientID string `json:"aadClientId" yaml:"aadClientId"`
AADClientSecret string `json:"aadClientSecret" yaml:"aadClientSecret"`
AADClientCertPath string `json:"aadClientCertPath" yaml:"aadClientCertPath"`
AADClientCertPassword string `json:"aadClientCertPassword" yaml:"aadClientCertPassword"`
UseManagedIdentityExtension bool `json:"useManagedIdentityExtension" yaml:"useManagedIdentityExtension"`
UserAssignedIdentityID string `json:"userAssignedIdentityID" yaml:"userAssignedIdentityID"`
// Configs only for standard vmType (agent pools).
// Deployment is the deployment name looked up in ResourceGroup.
// DeploymentParameters is read from deploymentParametersPath when left empty.
Deployment string `json:"deployment" yaml:"deployment"`
DeploymentParameters map[string]interface{} `json:"deploymentParameters" yaml:"deploymentParameters"`
// Configs only for AKS.
ClusterName string `json:"clusterName" yaml:"clusterName"`
// Config only for AKS.
NodeResourceGroup string `json:"nodeResourceGroup" yaml:"nodeResourceGroup"`
// ASG cache TTL in seconds; 0 is replaced by defaultAsgCacheTTL.
AsgCacheTTL int64 `json:"asgCacheTTL" yaml:"asgCacheTTL"`
// number of latest deployments that will not be deleted; 0 is replaced by
// defaultMaxDeploymentsCount.
MaxDeploymentsCount int64 `json:"maxDeploymentsCount" yaml:"maxDeploymentsCount"`
}
// TrimSpace removes all leading and trailing white space from every string
// field of the config, in place.
//
// Non-string fields (UseManagedIdentityExtension, DeploymentParameters,
// AsgCacheTTL, MaxDeploymentsCount) are left untouched.
func (c *Config) TrimSpace() {
	c.Cloud = strings.TrimSpace(c.Cloud)
	c.TenantID = strings.TrimSpace(c.TenantID)
	c.SubscriptionID = strings.TrimSpace(c.SubscriptionID)
	c.ResourceGroup = strings.TrimSpace(c.ResourceGroup)
	c.VMType = strings.TrimSpace(c.VMType)
	c.AADClientID = strings.TrimSpace(c.AADClientID)
	c.AADClientSecret = strings.TrimSpace(c.AADClientSecret)
	c.AADClientCertPath = strings.TrimSpace(c.AADClientCertPath)
	c.AADClientCertPassword = strings.TrimSpace(c.AADClientCertPassword)
	// The identity ID is user-supplied like every other string here (JSON or
	// ARM_USER_ASSIGNED_IDENTITY_ID), so stray whitespace must be stripped too.
	c.UserAssignedIdentityID = strings.TrimSpace(c.UserAssignedIdentityID)
	c.Deployment = strings.TrimSpace(c.Deployment)
	c.ClusterName = strings.TrimSpace(c.ClusterName)
	c.NodeResourceGroup = strings.TrimSpace(c.NodeResourceGroup)
}
// CreateAzureManager creates Azure Manager object to work with Azure.
//
// Configuration comes from configReader (a JSON document unmarshalled into
// Config) when it is non-nil; otherwise it is assembled from ARM_* / AZURE_*
// environment variables. After trimming and defaulting, the config is
// validated, the Azure client and the ASG cache are created, the explicitly
// configured node groups from discoveryOpts.NodeGroupSpecs are registered and
// an initial auto-discovery refresh is forced.
//
// Any failure along the way aborts creation: a nil manager is returned
// together with the error.
func CreateAzureManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*AzureManager, error) {
var err error
cfg := &Config{}
if configReader != nil {
// Note: this err (declared with :=) shadows the function-level err
// for the rest of this branch.
body, err := ioutil.ReadAll(configReader)
if err != nil {
return nil, fmt.Errorf("failed to read config: %v", err)
}
err = json.Unmarshal(body, cfg)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal config body: %v", err)
}
} else {
// No config file: build the configuration from the environment.
cfg.Cloud = os.Getenv("ARM_CLOUD")
cfg.ResourceGroup = os.Getenv("ARM_RESOURCE_GROUP")
cfg.TenantID = os.Getenv("ARM_TENANT_ID")
cfg.AADClientID = os.Getenv("ARM_CLIENT_ID")
cfg.AADClientSecret = os.Getenv("ARM_CLIENT_SECRET")
// Only the environment path lower-cases the VM type; a value read from
// the JSON config is used as written.
cfg.VMType = strings.ToLower(os.Getenv("ARM_VM_TYPE"))
cfg.AADClientCertPath = os.Getenv("ARM_CLIENT_CERT_PATH")
cfg.AADClientCertPassword = os.Getenv("ARM_CLIENT_CERT_PASSWORD")
cfg.Deployment = os.Getenv("ARM_DEPLOYMENT")
cfg.ClusterName = os.Getenv("AZURE_CLUSTER_NAME")
cfg.NodeResourceGroup = os.Getenv("AZURE_NODE_RESOURCE_GROUP")
// Uses ARM_SUBSCRIPTION_ID when set, otherwise the instance metadata
// service. This err also shadows the function-level err in this branch.
subscriptionID, err := getSubscriptionIdFromInstanceMetadata()
if err != nil {
return nil, err
}
cfg.SubscriptionID = subscriptionID
// Optional settings: only parsed when the variable is non-empty.
useManagedIdentityExtensionFromEnv := os.Getenv("ARM_USE_MANAGED_IDENTITY_EXTENSION")
if len(useManagedIdentityExtensionFromEnv) > 0 {
cfg.UseManagedIdentityExtension, err = strconv.ParseBool(useManagedIdentityExtensionFromEnv)
if err != nil {
return nil, err
}
}
userAssignedIdentityIDFromEnv := os.Getenv("ARM_USER_ASSIGNED_IDENTITY_ID")
if userAssignedIdentityIDFromEnv != "" {
cfg.UserAssignedIdentityID = userAssignedIdentityIDFromEnv
}
if asgCacheTTL := os.Getenv("AZURE_ASG_CACHE_TTL"); asgCacheTTL != "" {
// bitSize 0 makes ParseInt reject values that do not fit in an int.
cfg.AsgCacheTTL, err = strconv.ParseInt(asgCacheTTL, 10, 0)
if err != nil {
return nil, fmt.Errorf("failed to parse AZURE_ASG_CACHE_TTL %q: %v", asgCacheTTL, err)
}
}
if threshold := os.Getenv("AZURE_MAX_DEPLOYMENT_COUNT"); threshold != "" {
cfg.MaxDeploymentsCount, err = strconv.ParseInt(threshold, 10, 0)
if err != nil {
return nil, fmt.Errorf("failed to parse AZURE_MAX_DEPLOYMENT_COUNT %q: %v", threshold, err)
}
}
}
// Normalize whitespace before any value is compared or defaulted.
cfg.TrimSpace()
// Defaulting vmType to vmss.
if cfg.VMType == "" {
cfg.VMType = vmTypeVMSS
}
// Read parameters from deploymentParametersPath if it is not set.
if cfg.VMType == vmTypeStandard && len(cfg.DeploymentParameters) == 0 {
parameters, err := readDeploymentParameters(deploymentParametersPath)
if err != nil {
klog.Errorf("readDeploymentParameters failed with error: %v", err)
return nil, err
}
cfg.DeploymentParameters = parameters
}
// A zero value means "not configured": fall back to the package defaults.
if cfg.AsgCacheTTL == 0 {
cfg.AsgCacheTTL = int64(defaultAsgCacheTTL)
}
if cfg.MaxDeploymentsCount == 0 {
cfg.MaxDeploymentsCount = int64(defaultMaxDeploymentsCount)
}
// Defaulting env to Azure Public Cloud.
env := azure.PublicCloud
if cfg.Cloud != "" {
env, err = azure.EnvironmentFromName(cfg.Cloud)
if err != nil {
return nil, err
}
}
// Validation runs on the fully defaulted configuration.
if err := validateConfig(cfg); err != nil {
return nil, err
}
klog.Infof("Starting azure manager with subscription ID %q", cfg.SubscriptionID)
azClient, err := newAzClient(cfg, &env)
if err != nil {
return nil, err
}
// Create azure manager.
manager := &AzureManager{
config: cfg,
env: env,
azClient: azClient,
explicitlyConfigured: make(map[string]bool),
}
cache, err := newAsgCache(cfg.AsgCacheTTL)
if err != nil {
return nil, err
}
manager.asgCache = cache
specs, err := parseLabelAutoDiscoverySpecs(discoveryOpts)
if err != nil {
return nil, err
}
manager.asgAutoDiscoverySpecs = specs
// Explicit node groups are registered before the first auto-discovery so
// that fetchAutoAsgs can give them precedence.
if err := manager.fetchExplicitAsgs(discoveryOpts.NodeGroupSpecs); err != nil {
return nil, err
}
// Initial auto-discovery; bypasses the refreshInterval rate limit.
if err := manager.forceRefresh(); err != nil {
return nil, err
}
return manager, nil
}
// fetchExplicitAsgs builds a node group from each explicitly configured spec,
// registers it in the ASG cache and records its ID in m.explicitlyConfigured.
// The cache is regenerated once at the end if any registration reported a
// change. Processing stops at the first spec that cannot be turned into a
// node group.
func (m *AzureManager) fetchExplicitAsgs(specs []string) error {
	cacheDirty := false
	for i := range specs {
		group, err := m.buildAsgFromSpec(specs[i])
		if err != nil {
			return fmt.Errorf("failed to parse node group spec: %v", err)
		}
		// RegisterAsg is the left operand so it is always invoked.
		cacheDirty = m.RegisterAsg(group) || cacheDirty
		m.explicitlyConfigured[group.Id()] = true
	}
	if !cacheDirty {
		return nil
	}
	return m.regenerateCache()
}
// buildAsgFromSpec parses a node group spec string and wraps the result in
// the node group implementation matching the configured VM type.
//
// Scale-to-zero is allowed only when the VM type equals "vmss"
// (case-insensitively). The implementation itself is chosen by an exact match
// on the VM type; any other value yields an error.
func (m *AzureManager) buildAsgFromSpec(spec string) (cloudprovider.NodeGroup, error) {
	vmType := m.config.VMType

	var allowScaleToZero bool
	if strings.EqualFold(vmType, vmTypeVMSS) {
		allowScaleToZero = scaleToZeroSupportedVMSS
	} else {
		allowScaleToZero = scaleToZeroSupportedStandard
	}

	parsed, err := dynamic.SpecFromString(spec, allowScaleToZero)
	if err != nil {
		return nil, fmt.Errorf("failed to parse node group spec: %v", err)
	}

	if vmType == vmTypeStandard {
		return NewAgentPool(parsed, m)
	}
	if vmType == vmTypeVMSS {
		return NewScaleSet(parsed, m)
	}
	if vmType == vmTypeAKS {
		return NewAKSAgentPool(parsed, m)
	}
	return nil, fmt.Errorf("vmtype %s not supported", vmType)
}
// Refresh is called before every main loop and can be used to dynamically
// update cloud provider state. In particular the list of node groups returned
// by NodeGroups can change as a result of CloudProvider.Refresh().
//
// The call is a no-op until refreshInterval has elapsed since the last
// successful refresh.
func (m *AzureManager) Refresh() error {
	nextRefresh := m.lastRefresh.Add(refreshInterval)
	if time.Now().Before(nextRefresh) {
		return nil
	}
	return m.forceRefresh()
}
// forceRefresh runs node group auto-discovery immediately, regardless of
// refreshInterval. lastRefresh is advanced only when discovery succeeds, so a
// failed attempt is retried on the next Refresh call.
func (m *AzureManager) forceRefresh() error {
	err := m.fetchAutoAsgs()
	if err != nil {
		klog.Errorf("Failed to fetch ASGs: %v", err)
		return err
	}

	refreshedAt := time.Now()
	m.lastRefresh = refreshedAt
	klog.V(2).Infof("Refreshed ASG list, next refresh after %v", refreshedAt.Add(refreshInterval))
	return nil
}
// fetchAutoAsgs synchronizes the ASG cache with the automatically discovered
// node groups: newly discovered groups are registered, and registered groups
// that are neither discovered any more nor explicitly configured are
// unregistered. The cache is regenerated when either step changed something.
func (m *AzureManager) fetchAutoAsgs() error {
	discovered, err := m.getFilteredAutoscalingGroups(m.asgAutoDiscoverySpecs)
	if err != nil {
		return fmt.Errorf("cannot autodiscover ASGs: %s", err)
	}

	cacheDirty := false
	seen := make(map[string]bool)
	for _, group := range discovered {
		id := group.Id()
		seen[id] = true

		// An explicitly configured group that is also discoverable keeps
		// its explicitly configured min and max nodes.
		if m.explicitlyConfigured[id] {
			klog.V(3).Infof("Ignoring explicitly configured ASG %s for autodiscovery.", id)
			continue
		}

		if m.RegisterAsg(group) {
			klog.V(3).Infof("Autodiscovered ASG %s using tags %v", id, m.asgAutoDiscoverySpecs)
			cacheDirty = true
		}
	}

	// Drop groups that were auto-discovered earlier but no longer show up.
	for _, group := range m.getAsgs() {
		id := group.Id()
		if seen[id] || m.explicitlyConfigured[id] {
			continue
		}
		m.UnregisterAsg(group)
		cacheDirty = true
	}

	if !cacheDirty {
		return nil
	}
	return m.regenerateCache()
}
// getAsgs returns the node groups currently held by the ASG cache.
func (m *AzureManager) getAsgs() []cloudprovider.NodeGroup {
return m.asgCache.get()
}
// RegisterAsg registers an ASG in the ASG cache and returns the cache's
// result. Callers in this file treat true as "the cache changed and must be
// regenerated".
func (m *AzureManager) RegisterAsg(asg cloudprovider.NodeGroup) bool {
return m.asgCache.Register(asg)
}
// UnregisterAsg unregisters an ASG from the ASG cache and returns the cache's
// result.
// NOTE(review): presumably true means the ASG was present and got removed —
// confirm against asgCache.Unregister.
func (m *AzureManager) UnregisterAsg(asg cloudprovider.NodeGroup) bool {
return m.asgCache.Unregister(asg)
}
// GetAsgForInstance returns the node group that the given instance belongs
// to, as resolved by the ASG cache for the configured VM type.
func (m *AzureManager) GetAsgForInstance(instance *azureRef) (cloudprovider.NodeGroup, error) {
return m.asgCache.FindForInstance(instance, m.config.VMType)
}
// regenerateCache rebuilds the ASG cache while holding the cache mutex for
// the whole duration of the regeneration.
func (m *AzureManager) regenerateCache() error {
m.asgCache.mutex.Lock()
defer m.asgCache.mutex.Unlock()
return m.asgCache.regenerate()
}
// Cleanup delegates to the ASG cache's Cleanup method.
func (m *AzureManager) Cleanup() {
m.asgCache.Cleanup()
}
// getFilteredAutoscalingGroups returns the node groups discovered for the
// configured VM type using the given auto-discovery filter.
//
// With an empty filter, or for the AKS VM type, nothing is discovered and
// (nil, nil) is returned. An unknown VM type is an error. When the underlying
// listing fails, the error is returned with a nil slice.
func (m *AzureManager) getFilteredAutoscalingGroups(filter []labelAutoDiscoveryConfig) (asgs []cloudprovider.NodeGroup, err error) {
	if len(filter) == 0 {
		return nil, nil
	}

	vmType := m.config.VMType
	if vmType == vmTypeAKS {
		return nil, nil
	}

	// Pick the listing routine for the VM type, then invoke it once below.
	var list func([]labelAutoDiscoveryConfig) ([]cloudprovider.NodeGroup, error)
	switch vmType {
	case vmTypeVMSS:
		list = m.listScaleSets
	case vmTypeStandard:
		list = m.listAgentPools
	default:
		return nil, fmt.Errorf("vmType %q not supported", vmType)
	}

	found, listErr := list(filter)
	if listErr != nil {
		return nil, listErr
	}
	return found, nil
}
// listScaleSets lists the virtual machine scale sets of the configured
// resource group and converts every scale set selected by filter into a node
// group.
//
// When filter is non-empty, scale sets without tags or whose tags do not
// match the filter are skipped. Every selected scale set must carry the
// vmssTagMin and vmssTagMax tags holding integers with 0 <= min, 1 <= max and
// min <= max; otherwise the whole listing fails.
//
// On any error the returned slice is nil: partial results are never handed
// back together with an error.
func (m *AzureManager) listScaleSets(filter []labelAutoDiscoveryConfig) (asgs []cloudprovider.NodeGroup, err error) {
	ctx, cancel := getContextWithCancel()
	defer cancel()

	result, err := m.azClient.virtualMachineScaleSetsClient.List(ctx, m.config.ResourceGroup)
	if err != nil {
		klog.Errorf("VirtualMachineScaleSetsClient.List for %v failed: %v", m.config.ResourceGroup, err)
		return nil, err
	}

	filtering := len(filter) > 0
	for _, scaleSet := range result {
		if filtering {
			// len of a nil map is 0, so this also covers missing tags.
			if len(scaleSet.Tags) == 0 {
				continue
			}
			if !matchDiscoveryConfig(scaleSet.Tags, filter) {
				continue
			}
		}

		// Name is a pointer in the API model; guard against a nil dereference.
		if scaleSet.Name == nil {
			return nil, fmt.Errorf("scale set without a name found in resource group %s", m.config.ResourceGroup)
		}
		name := *scaleSet.Name

		// Tag values are pointers as well; a nil value counts as missing.
		minTag, ok := scaleSet.Tags[vmssTagMin]
		if !ok || minTag == nil {
			return nil, fmt.Errorf("no minimum size specified for vmss: %s", name)
		}
		minSize, err := strconv.Atoi(*minTag)
		if err != nil {
			return nil, fmt.Errorf("invalid minimum size specified for vmss: %s", err)
		}
		if minSize < 0 {
			return nil, fmt.Errorf("minimum size must be a non-negative number of nodes")
		}

		maxTag, ok := scaleSet.Tags[vmssTagMax]
		if !ok || maxTag == nil {
			return nil, fmt.Errorf("no maximum size specified for vmss: %s", name)
		}
		maxSize, err := strconv.Atoi(*maxTag)
		if err != nil {
			return nil, fmt.Errorf("invalid maximum size specified for vmss: %s", err)
		}
		if maxSize < 1 {
			return nil, fmt.Errorf("maximum size must be greater than 1 node")
		}
		if maxSize < minSize {
			return nil, fmt.Errorf("maximum size must be greater than minimum size")
		}

		spec := &dynamic.NodeGroupSpec{
			Name:               name,
			MinSize:            minSize,
			MaxSize:            maxSize,
			SupportScaleToZero: scaleToZeroSupportedVMSS,
		}

		// Do not append on failure: a nil *ScaleSet stored in the interface
		// slice would be a non-nil interface that panics on first use.
		asg, err := NewScaleSet(spec, m)
		if err != nil {
			return nil, fmt.Errorf("failed to create scale set %s: %v", name, err)
		}
		asgs = append(asgs, asg)
	}

	return asgs, nil
}
// listAgentPools derives the agent pools from the parameters of the
// configured deployment: every parameter named "<pool>VMSize", except
// "masterVMSize", yields a node group called "<pool>".
//
// Note: filter won't take effect for agent pools.
//
// An error is returned (with a nil slice) when the deployment cannot be
// fetched, when it carries no usable parameter map, or when an agent pool
// cannot be created. Parameters are stored in a map, so the order of the
// returned node groups is not deterministic.
func (m *AzureManager) listAgentPools(filter []labelAutoDiscoveryConfig) (asgs []cloudprovider.NodeGroup, err error) {
	ctx, cancel := getContextWithCancel()
	defer cancel()

	deploy, err := m.azClient.deploymentsClient.Get(ctx, m.config.ResourceGroup, m.config.Deployment)
	if err != nil {
		klog.Errorf("deploymentsClient.Get(%s, %s) failed: %v", m.config.ResourceGroup, m.config.Deployment, err)
		return nil, err
	}

	// Properties is a pointer and Parameters an untyped value in the API
	// model; check both instead of panicking on unexpected responses.
	if deploy.Properties == nil {
		return nil, fmt.Errorf("deployment %s in resource group %s has no properties", m.config.Deployment, m.config.ResourceGroup)
	}
	parameters, ok := deploy.Properties.Parameters.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("deployment %s in resource group %s has parameters of unexpected type %T", m.config.Deployment, m.config.ResourceGroup, deploy.Properties.Parameters)
	}

	const vmSizeSuffix = "VMSize"
	for k := range parameters {
		if k == "masterVMSize" || !strings.HasSuffix(k, vmSizeSuffix) {
			continue
		}

		// TrimSuffix removes exactly one trailing "VMSize". TrimRight would
		// treat the argument as a character set and also eat pool-name
		// characters such as a trailing 'e', 'i' or 'z'.
		poolName := strings.TrimSuffix(k, vmSizeSuffix)
		spec := &dynamic.NodeGroupSpec{
			Name:               poolName,
			MinSize:            1,
			MaxSize:            -1,
			SupportScaleToZero: scaleToZeroSupportedStandard,
		}

		// Do not append on failure: a nil *AgentPool stored in the interface
		// slice would be a non-nil interface that panics on first use.
		asg, err := NewAgentPool(spec, m)
		if err != nil {
			return nil, fmt.Errorf("failed to create agent pool %s: %v", poolName, err)
		}
		asgs = append(asgs, asg)
	}

	return asgs, nil
}
// parseLabelAutoDiscoverySpecs converts every entry of
// o.NodeGroupAutoDiscoverySpecs into a labelAutoDiscoveryConfig, preserving
// order. The first spec that fails to parse aborts the conversion and its
// error is returned with a nil slice.
func parseLabelAutoDiscoverySpecs(o cloudprovider.NodeGroupDiscoveryOptions) ([]labelAutoDiscoveryConfig, error) {
	rawSpecs := o.NodeGroupAutoDiscoverySpecs
	parsed := make([]labelAutoDiscoveryConfig, 0, len(rawSpecs))
	for _, raw := range rawSpecs {
		cfg, err := parseLabelAutoDiscoverySpec(raw)
		if err != nil {
			return nil, err
		}
		parsed = append(parsed, cfg)
	}
	return parsed, nil
}
// parseLabelAutoDiscoverySpec parses a single auto-discovery spec of the form
// "label:key=value,key=value" and returns the corresponding selector config.
//
// The spec must contain exactly one ':' separating the discoverer type (only
// "label" is supported) from a comma-separated list of key=value pairs with
// non-empty keys and values. On error the config built so far is returned
// alongside the error.
func parseLabelAutoDiscoverySpec(spec string) (labelAutoDiscoveryConfig, error) {
	result := labelAutoDiscoveryConfig{Selector: map[string]string{}}

	// Exactly one separator means exactly two tokens.
	if strings.Count(spec, ":") != 1 {
		return result, fmt.Errorf("spec \"%s\" should be discoverer:key=value,key=value", spec)
	}
	sep := strings.Index(spec, ":")
	kind, args := spec[:sep], spec[sep+1:]

	if kind != autoDiscovererTypeLabel {
		return result, fmt.Errorf("unsupported discoverer specified: %s", kind)
	}

	for _, pair := range strings.Split(args, ",") {
		parts := strings.Split(pair, "=")
		if len(parts) != 2 {
			return result, fmt.Errorf("invalid key=value pair %s", parts)
		}
		if parts[0] == "" || parts[1] == "" {
			return result, fmt.Errorf("empty value not allowed in key=value tag pairs")
		}
		result.Selector[parts[0]] = parts[1]
	}
	return result, nil
}
// getSubscriptionIdFromInstanceMetadata returns the Azure subscription ID.
//
// The ARM_SUBSCRIPTION_ID environment variable wins whenever it is set, even
// to an empty string. Only when it is unset is the instance metadata service
// at metadataURL queried for the compute subscription ID.
func getSubscriptionIdFromInstanceMetadata() (string, error) {
	if fromEnv, ok := os.LookupEnv("ARM_SUBSCRIPTION_ID"); ok {
		return fromEnv, nil
	}

	svc, err := providerazure.NewInstanceMetadataService(metadataURL)
	if err != nil {
		return "", err
	}

	md, err := svc.GetMetadata(0)
	if err != nil {
		return "", err
	}
	return md.Compute.SubscriptionID, nil
}