From 51a5ad58c031f4a6230e662b9df22f46fccf2bb6 Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Mon, 28 Aug 2017 20:50:02 +0200 Subject: [PATCH] GKE NodePool support for NAP - get NP/Migs via api - part 1 --- .../builder/cloud_provider_builder.go | 15 +- .../cloudprovider/gce/gce_cloud_provider.go | 1 + .../cloudprovider/gce/gce_manager.go | 150 ++++++++++++++---- .../core/autoscaling_context.go | 4 +- cluster-autoscaler/main.go | 2 + 5 files changed, 140 insertions(+), 32 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go b/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go index 200744d1a3..8df7a12f93 100644 --- a/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go +++ b/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go @@ -37,13 +37,15 @@ import ( type CloudProviderBuilder struct { cloudProviderFlag string cloudConfig string + clusterName string } // NewCloudProviderBuilder builds a new builder from static settings -func NewCloudProviderBuilder(cloudProviderFlag string, cloudConfig string) CloudProviderBuilder { +func NewCloudProviderBuilder(cloudProviderFlag string, cloudConfig string, clusterName string) CloudProviderBuilder { return CloudProviderBuilder{ cloudProviderFlag: cloudProviderFlag, cloudConfig: cloudConfig, + clusterName: clusterName, } } @@ -54,19 +56,24 @@ func (b CloudProviderBuilder) Build(discoveryOpts cloudprovider.NodeGroupDiscove nodeGroupsFlag := discoveryOpts.NodeGroupSpecs - if b.cloudProviderFlag == "gce" { + if b.cloudProviderFlag == "gce" || b.cloudProviderFlag == "gke" { // GCE Manager var gceManager *gce.GceManager var gceError error + mode := gce.ModeGCE + if b.cloudProviderFlag == "gke" { + mode = gce.ModeGKE + } + if b.cloudConfig != "" { config, fileErr := os.Open(b.cloudConfig) if fileErr != nil { glog.Fatalf("Couldn't open cloud provider configuration %s: %#v", b.cloudConfig, err) } defer config.Close() - gceManager, gceError = gce.CreateGceManager(config) + gceManager, gceError = gce.CreateGceManager(config, mode, b.clusterName) } else { - gceManager, gceError = gce.CreateGceManager(nil) + gceManager, gceError = gce.CreateGceManager(nil, mode, b.clusterName) } if gceError != nil { glog.Fatalf("Failed to create GCE Manager: %v", err) diff --git a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go index 063c3bb7e2..8f0e1bb78f 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go @@ -168,6 +168,7 @@ type Mig struct { maxSize int autoprovisioned bool exist bool + nodePoolName string spec *autoprovisioningSpec } diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager.go b/cluster-autoscaler/cloudprovider/gce/gce_manager.go index fa9d63c749..419c7471f4 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_manager.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_manager.go @@ -30,13 +30,26 @@ import ( "golang.org/x/oauth2" "golang.org/x/oauth2/google" gce "google.golang.org/api/compute/v1" + gke "google.golang.org/api/container/v1" "k8s.io/apimachinery/pkg/util/wait" provider_gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" ) +// GcpCloudProviderMode allows to pass information whether the cluster is GCE or GKE. +type GcpCloudProviderMode string + const ( - operationWaitTimeout = 5 * time.Second - operationPollInterval = 100 * time.Millisecond + // ModeGCE means that the cluster is running on gce (or using the legacy gke setup). + ModeGCE GcpCloudProviderMode = "gce" + + // ModeGKE means that the cluster is running + ModeGKE GcpCloudProviderMode = "gke" +) + +const ( + operationWaitTimeout = 5 * time.Second + operationPollInterval = 100 * time.Millisecond + nodeAutoprovisioningPrefix = "nodeautoprovisioning" ) type migInformation struct { @@ -49,15 +62,19 @@ type GceManager struct { migs []*migInformation migCache map[GceRef]*Mig - service *gce.Service - cacheMutex sync.Mutex - zone string - projectId string - templates *templateBuilder + gceService *gce.Service + gkeService *gke.Service + + cacheMutex sync.Mutex + zone string + projectId string + clusterName string + mode GcpCloudProviderMode + templates *templateBuilder } // CreateGceManager constructs gceManager object. -func CreateGceManager(configReader io.Reader) (*GceManager, error) { +func CreateGceManager(configReader io.Reader, mode GcpCloudProviderMode, clusterName string) (*GceManager, error) { // Create Google Compute Engine token. tokenSource := google.ComputeTokenSource("") if configReader != nil { @@ -88,17 +105,32 @@ func CreateGceManager(configReader io.Reader) (*GceManager, error) { return nil, err } manager := &GceManager{ - migs: make([]*migInformation, 0), - service: gceService, - migCache: make(map[GceRef]*Mig), - zone: zone, - projectId: projectId, + migs: make([]*migInformation, 0), + gceService: gceService, + migCache: make(map[GceRef]*Mig), + zone: zone, + projectId: projectId, + clusterName: clusterName, + mode: mode, templates: &templateBuilder{ projectId: projectId, zone: zone, service: gceService, }, } + + if mode == ModeGKE { + gkeService, err := gke.New(client) + if err != nil { + return nil, err + } + manager.gkeService = gkeService + err = manager.fetchAllNodePools() + if err != nil { + glog.Errorf("Failed to fech node pools: %v", err) + } + } + go wait.Forever(func() { manager.cacheMutex.Lock() defer manager.cacheMutex.Unlock() @@ -106,33 +138,97 @@ func CreateGceManager(configReader io.Reader) (*GceManager, error) { glog.Errorf("Error while regenerating Mig cache: %v", err) } }, time.Hour) + return manager, nil } -// RegisterMig registers mig in Gce Manager. -func (m *GceManager) RegisterMig(mig *Mig) { +func (m *GceManager) assertGKE() { + if m.mode != ModeGKE { + panic(fmt.Errorf("This should run only in GKE mode")) + } +} + +// Gets all registered node pools +func (m *GceManager) fetchAllNodePools() error { + m.assertGKE() + + nodePoolsResponse, err := m.gkeService.Projects.Zones.Clusters.NodePools.List(m.projectId, m.zone, m.clusterName).Do() + if err != nil { + return err + } + for _, nodePool := range nodePoolsResponse.NodePools { + autoprovisioned := strings.Contains("name", nodeAutoprovisioningPrefix) + autoscaled := nodePool.Autoscaling != nil && nodePool.Autoscaling.Enabled + if !autoprovisioned && !autoscaled { + continue + } + // format is + // "https://www.googleapis.com/compute/v1/projects/mwielgus-proj/zones/europe-west1-b/instanceGroupManagers/gke-cluster-1-default-pool-ba78a787-grp" + for _, igurl := range nodePool.InstanceGroupUrls { + project, zone, name, err := parseGceUrl(igurl, "instanceGroupManagers") + if err != nil { + return err + } + mig := &Mig{ + GceRef: GceRef{ + Name: name, + Zone: zone, + Project: project, + }, + gceManager: m, + exist: true, + autoprovisioned: autoprovisioned, + } + if autoscaled { + mig.minSize = int(nodePool.Autoscaling.MinNodeCount) + mig.maxSize = int(nodePool.Autoscaling.MaxNodeCount) + } else if autoprovisioned { + mig.minSize = minAutoprovisionedSize + mig.maxSize = maxAutoprovisionedSize + } + m.RegisterMig(mig) + } + // TODO - unregister migs + } + return nil +} + +// RegisterMig registers mig in Gce Manager. Returns true if the node group didn't exist before. +func (m *GceManager) RegisterMig(mig *Mig) bool { m.cacheMutex.Lock() defer m.cacheMutex.Unlock() - m.migs = append(m.migs, &migInformation{ - config: mig, - }) + updated := false + for i := range m.migs { + if m.migs[i].config.GceRef == mig.GceRef { + m.migs[i].config = mig + glog.V(8).Infof("Updated Mig %s/%s/%s", mig.GceRef.Project, mig.GceRef.Zone, mig.GceRef.Name) + updated = true + } + } + + if !updated { + glog.V(1).Infof("Registering %s/%s/%s", mig.GceRef.Project, mig.GceRef.Zone, mig.GceRef.Name) + m.migs = append(m.migs, &migInformation{ + config: mig, + }) + } template, err := m.templates.getMigTemplate(mig) if err != nil { glog.Errorf("Failed to build template for %s", mig.Name) } - node, err := m.templates.buildNodeFromTemplate(mig, template) + _, err = m.templates.buildNodeFromTemplate(mig, template) if err != nil { glog.Errorf("Failed to build template for %s", mig.Name) } - glog.V(4).Infof("Node template for mig %s - %#v", mig.Name, node) + return !updated } // GetMigSize gets MIG size. func (m *GceManager) GetMigSize(mig *Mig) (int64, error) { - igm, err := m.service.InstanceGroupManagers.Get(mig.Project, mig.Zone, mig.Name).Do() + igm, err := m.gceService.InstanceGroupManagers.Get(mig.Project, mig.Zone, mig.Name).Do() if err != nil { return -1, err } @@ -142,7 +238,7 @@ func (m *GceManager) GetMigSize(mig *Mig) (int64, error) { // SetMigSize sets MIG size. func (m *GceManager) SetMigSize(mig *Mig, size int64) error { glog.V(0).Infof("Setting mig size %s to %d", mig.Id(), size) - op, err := m.service.InstanceGroupManagers.Resize(mig.Project, mig.Zone, mig.Name, size).Do() + op, err := m.gceService.InstanceGroupManagers.Resize(mig.Project, mig.Zone, mig.Name, size).Do() if err != nil { return err } @@ -155,7 +251,7 @@ func (m *GceManager) SetMigSize(mig *Mig, size int64) error { func (m *GceManager) waitForOp(operation *gce.Operation, project string, zone string) error { for start := time.Now(); time.Since(start) < operationWaitTimeout; time.Sleep(operationPollInterval) { glog.V(4).Infof("Waiting for operation %s %s %s", project, zone, operation.Name) - if op, err := m.service.ZoneOperations.Get(project, zone, operation.Name).Do(); err == nil { + if op, err := m.gceService.ZoneOperations.Get(project, zone, operation.Name).Do(); err == nil { glog.V(4).Infof("Operation %s %s %s status: %s", project, zone, operation.Name, op.Status) if op.Status == "DONE" { return nil @@ -193,7 +289,7 @@ func (m *GceManager) DeleteInstances(instances []*GceRef) error { req.Instances = append(req.Instances, GenerateInstanceUrl(instance.Project, instance.Zone, instance.Name)) } - op, err := m.service.InstanceGroupManagers.DeleteInstances(commonMig.Project, commonMig.Zone, commonMig.Name, &req).Do() + op, err := m.gceService.InstanceGroupManagers.DeleteInstances(commonMig.Project, commonMig.Zone, commonMig.Name, &req).Do() if err != nil { return err } @@ -235,13 +331,13 @@ func (m *GceManager) regenerateCache() error { mig := migInfo.config glog.V(4).Infof("Regenerating MIG information for %s %s %s", mig.Project, mig.Zone, mig.Name) - instanceGroupManager, err := m.service.InstanceGroupManagers.Get(mig.Project, mig.Zone, mig.Name).Do() + instanceGroupManager, err := m.gceService.InstanceGroupManagers.Get(mig.Project, mig.Zone, mig.Name).Do() if err != nil { return err } migInfo.basename = instanceGroupManager.BaseInstanceName - instances, err := m.service.InstanceGroupManagers.ListManagedInstances(mig.Project, mig.Zone, mig.Name).Do() + instances, err := m.gceService.InstanceGroupManagers.ListManagedInstances(mig.Project, mig.Zone, mig.Name).Do() if err != nil { glog.V(4).Infof("Failed MIG info request for %s %s %s: %v", mig.Project, mig.Zone, mig.Name, err) return err @@ -261,7 +357,7 @@ func (m *GceManager) regenerateCache() error { // GetMigNodes returns mig nodes. func (m *GceManager) GetMigNodes(mig *Mig) ([]string, error) { - instances, err := m.service.InstanceGroupManagers.ListManagedInstances(mig.Project, mig.Zone, mig.Name).Do() + instances, err := m.gceService.InstanceGroupManagers.ListManagedInstances(mig.Project, mig.Zone, mig.Name).Do() if err != nil { return []string{}, err } diff --git a/cluster-autoscaler/core/autoscaling_context.go b/cluster-autoscaler/core/autoscaling_context.go index ba9937d846..a9fe36c01b 100644 --- a/cluster-autoscaler/core/autoscaling_context.go +++ b/cluster-autoscaler/core/autoscaling_context.go @@ -102,6 +102,8 @@ type AutoscalingOptions struct { BalanceSimilarNodeGroups bool // ConfigNamespace is the namesapce cluster-autoscaler is running in and all related configmaps live in ConfigNamespace string + // ClusterName if available + ClusterName string } // NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments @@ -109,7 +111,7 @@ func NewAutoscalingContext(options AutoscalingOptions, predicateChecker *simulat kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, logEventRecorder *utils.LogEventRecorder, listerRegistry kube_util.ListerRegistry) (*AutoscalingContext, errors.AutoscalerError) { - cloudProviderBuilder := builder.NewCloudProviderBuilder(options.CloudProviderName, options.CloudConfig) + cloudProviderBuilder := builder.NewCloudProviderBuilder(options.CloudProviderName, options.CloudConfig, options.ClusterName) cloudProvider := cloudProviderBuilder.Build(cloudprovider.NodeGroupDiscoveryOptions{ NodeGroupSpecs: options.NodeGroups, NodeGroupAutoDiscoverySpec: options.NodeGroupAutoDiscovery, diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 249ffc7f12..4d8e3605f1 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -63,6 +63,7 @@ func (flag *MultiStringFlag) Set(value string) error { var ( nodeGroupsFlag MultiStringFlag + clusterName = flag.String("clusterName", "", "Autoscaled cluster name, if available") address = flag.String("address", ":8085", "The address to expose prometheus metrics.") kubernetes = flag.String("kubernetes", "", "Kubernetes master location. Leave blank for default") cloudConfig = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.") @@ -126,6 +127,7 @@ func createAutoscalerOptions() core.AutoscalerOptions { WriteStatusConfigMap: *writeStatusConfigMapFlag, BalanceSimilarNodeGroups: *balanceSimilarNodeGroupsFlag, ConfigNamespace: *namespace, + ClusterName: *clusterName, } configFetcherOpts := dynamic.ConfigFetcherOptions{