GKE NodePool support for NAP - get NP/Migs via api - part 1

This commit is contained in:
Marcin Wielgus 2017-08-28 20:50:02 +02:00
parent 0e191ef68d
commit 51a5ad58c0
5 changed files with 140 additions and 32 deletions

View File

@ -37,13 +37,15 @@ import (
type CloudProviderBuilder struct { type CloudProviderBuilder struct {
cloudProviderFlag string cloudProviderFlag string
cloudConfig string cloudConfig string
clusterName string
} }
// NewCloudProviderBuilder builds a new builder from static settings // NewCloudProviderBuilder builds a new builder from static settings
func NewCloudProviderBuilder(cloudProviderFlag string, cloudConfig string) CloudProviderBuilder { func NewCloudProviderBuilder(cloudProviderFlag string, cloudConfig string, clusterName string) CloudProviderBuilder {
return CloudProviderBuilder{ return CloudProviderBuilder{
cloudProviderFlag: cloudProviderFlag, cloudProviderFlag: cloudProviderFlag,
cloudConfig: cloudConfig, cloudConfig: cloudConfig,
clusterName: clusterName,
} }
} }
@ -54,19 +56,24 @@ func (b CloudProviderBuilder) Build(discoveryOpts cloudprovider.NodeGroupDiscove
nodeGroupsFlag := discoveryOpts.NodeGroupSpecs nodeGroupsFlag := discoveryOpts.NodeGroupSpecs
if b.cloudProviderFlag == "gce" { if b.cloudProviderFlag == "gce" || b.cloudProviderFlag == "gke" {
// GCE Manager // GCE Manager
var gceManager *gce.GceManager var gceManager *gce.GceManager
var gceError error var gceError error
mode := gce.ModeGCE
if b.cloudProviderFlag == "gke" {
mode = gce.ModeGKE
}
if b.cloudConfig != "" { if b.cloudConfig != "" {
config, fileErr := os.Open(b.cloudConfig) config, fileErr := os.Open(b.cloudConfig)
if fileErr != nil { if fileErr != nil {
glog.Fatalf("Couldn't open cloud provider configuration %s: %#v", b.cloudConfig, err) glog.Fatalf("Couldn't open cloud provider configuration %s: %#v", b.cloudConfig, err)
} }
defer config.Close() defer config.Close()
gceManager, gceError = gce.CreateGceManager(config) gceManager, gceError = gce.CreateGceManager(config, mode, b.clusterName)
} else { } else {
gceManager, gceError = gce.CreateGceManager(nil) gceManager, gceError = gce.CreateGceManager(nil, mode, b.clusterName)
} }
if gceError != nil { if gceError != nil {
glog.Fatalf("Failed to create GCE Manager: %v", err) glog.Fatalf("Failed to create GCE Manager: %v", err)

View File

@ -168,6 +168,7 @@ type Mig struct {
maxSize int maxSize int
autoprovisioned bool autoprovisioned bool
exist bool exist bool
nodePoolName string
spec *autoprovisioningSpec spec *autoprovisioningSpec
} }

View File

@ -30,13 +30,26 @@ import (
"golang.org/x/oauth2" "golang.org/x/oauth2"
"golang.org/x/oauth2/google" "golang.org/x/oauth2/google"
gce "google.golang.org/api/compute/v1" gce "google.golang.org/api/compute/v1"
gke "google.golang.org/api/container/v1"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
provider_gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" provider_gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
) )
// GcpCloudProviderMode allows to pass information whether the cluster is GCE or GKE.
type GcpCloudProviderMode string
const ( const (
operationWaitTimeout = 5 * time.Second // ModeGCE means that the cluster is running on gce (or using the legacy gke setup).
operationPollInterval = 100 * time.Millisecond ModeGCE GcpCloudProviderMode = "gce"
// ModeGKE means that the cluster is running
ModeGKE GcpCloudProviderMode = "gke"
)
const (
operationWaitTimeout = 5 * time.Second
operationPollInterval = 100 * time.Millisecond
nodeAutoprovisioningPrefix = "nodeautoprovisioning"
) )
type migInformation struct { type migInformation struct {
@ -49,15 +62,19 @@ type GceManager struct {
migs []*migInformation migs []*migInformation
migCache map[GceRef]*Mig migCache map[GceRef]*Mig
service *gce.Service gceService *gce.Service
cacheMutex sync.Mutex gkeService *gke.Service
zone string
projectId string cacheMutex sync.Mutex
templates *templateBuilder zone string
projectId string
clusterName string
mode GcpCloudProviderMode
templates *templateBuilder
} }
// CreateGceManager constructs gceManager object. // CreateGceManager constructs gceManager object.
func CreateGceManager(configReader io.Reader) (*GceManager, error) { func CreateGceManager(configReader io.Reader, mode GcpCloudProviderMode, clusterName string) (*GceManager, error) {
// Create Google Compute Engine token. // Create Google Compute Engine token.
tokenSource := google.ComputeTokenSource("") tokenSource := google.ComputeTokenSource("")
if configReader != nil { if configReader != nil {
@ -88,17 +105,32 @@ func CreateGceManager(configReader io.Reader) (*GceManager, error) {
return nil, err return nil, err
} }
manager := &GceManager{ manager := &GceManager{
migs: make([]*migInformation, 0), migs: make([]*migInformation, 0),
service: gceService, gceService: gceService,
migCache: make(map[GceRef]*Mig), migCache: make(map[GceRef]*Mig),
zone: zone, zone: zone,
projectId: projectId, projectId: projectId,
clusterName: clusterName,
mode: mode,
templates: &templateBuilder{ templates: &templateBuilder{
projectId: projectId, projectId: projectId,
zone: zone, zone: zone,
service: gceService, service: gceService,
}, },
} }
if mode == ModeGKE {
gkeService, err := gke.New(client)
if err != nil {
return nil, err
}
manager.gkeService = gkeService
err = manager.fetchAllNodePools()
if err != nil {
glog.Errorf("Failed to fech node pools: %v", err)
}
}
go wait.Forever(func() { go wait.Forever(func() {
manager.cacheMutex.Lock() manager.cacheMutex.Lock()
defer manager.cacheMutex.Unlock() defer manager.cacheMutex.Unlock()
@ -106,33 +138,97 @@ func CreateGceManager(configReader io.Reader) (*GceManager, error) {
glog.Errorf("Error while regenerating Mig cache: %v", err) glog.Errorf("Error while regenerating Mig cache: %v", err)
} }
}, time.Hour) }, time.Hour)
return manager, nil return manager, nil
} }
// RegisterMig registers mig in Gce Manager. func (m *GceManager) assertGKE() {
func (m *GceManager) RegisterMig(mig *Mig) { if m.mode != ModeGKE {
panic(fmt.Errorf("This should run only in GKE mode"))
}
}
// Gets all registered node pools
func (m *GceManager) fetchAllNodePools() error {
m.assertGKE()
nodePoolsResponse, err := m.gkeService.Projects.Zones.Clusters.NodePools.List(m.projectId, m.zone, m.clusterName).Do()
if err != nil {
return err
}
for _, nodePool := range nodePoolsResponse.NodePools {
autoprovisioned := strings.Contains("name", nodeAutoprovisioningPrefix)
autoscaled := nodePool.Autoscaling != nil && nodePool.Autoscaling.Enabled
if !autoprovisioned && !autoscaled {
continue
}
// format is
// "https://www.googleapis.com/compute/v1/projects/mwielgus-proj/zones/europe-west1-b/instanceGroupManagers/gke-cluster-1-default-pool-ba78a787-grp"
for _, igurl := range nodePool.InstanceGroupUrls {
project, zone, name, err := parseGceUrl(igurl, "instanceGroupManagers")
if err != nil {
return err
}
mig := &Mig{
GceRef: GceRef{
Name: name,
Zone: zone,
Project: project,
},
gceManager: m,
exist: true,
autoprovisioned: autoprovisioned,
}
if autoscaled {
mig.minSize = int(nodePool.Autoscaling.MinNodeCount)
mig.maxSize = int(nodePool.Autoscaling.MaxNodeCount)
} else if autoprovisioned {
mig.minSize = minAutoprovisionedSize
mig.maxSize = maxAutoprovisionedSize
}
m.RegisterMig(mig)
}
// TODO - unregister migs
}
return nil
}
// RegisterMig registers mig in Gce Manager. Returns true if the node group didn't exist before.
func (m *GceManager) RegisterMig(mig *Mig) bool {
m.cacheMutex.Lock() m.cacheMutex.Lock()
defer m.cacheMutex.Unlock() defer m.cacheMutex.Unlock()
m.migs = append(m.migs, &migInformation{ updated := false
config: mig, for i := range m.migs {
}) if m.migs[i].config.GceRef == mig.GceRef {
m.migs[i].config = mig
glog.V(8).Infof("Updated Mig %s/%s/%s", mig.GceRef.Project, mig.GceRef.Zone, mig.GceRef.Name)
updated = true
}
}
if !updated {
glog.V(1).Infof("Registering %s/%s/%s", mig.GceRef.Project, mig.GceRef.Zone, mig.GceRef.Name)
m.migs = append(m.migs, &migInformation{
config: mig,
})
}
template, err := m.templates.getMigTemplate(mig) template, err := m.templates.getMigTemplate(mig)
if err != nil { if err != nil {
glog.Errorf("Failed to build template for %s", mig.Name) glog.Errorf("Failed to build template for %s", mig.Name)
} }
node, err := m.templates.buildNodeFromTemplate(mig, template) _, err = m.templates.buildNodeFromTemplate(mig, template)
if err != nil { if err != nil {
glog.Errorf("Failed to build template for %s", mig.Name) glog.Errorf("Failed to build template for %s", mig.Name)
} }
glog.V(4).Infof("Node template for mig %s - %#v", mig.Name, node) return !updated
} }
// GetMigSize gets MIG size. // GetMigSize gets MIG size.
func (m *GceManager) GetMigSize(mig *Mig) (int64, error) { func (m *GceManager) GetMigSize(mig *Mig) (int64, error) {
igm, err := m.service.InstanceGroupManagers.Get(mig.Project, mig.Zone, mig.Name).Do() igm, err := m.gceService.InstanceGroupManagers.Get(mig.Project, mig.Zone, mig.Name).Do()
if err != nil { if err != nil {
return -1, err return -1, err
} }
@ -142,7 +238,7 @@ func (m *GceManager) GetMigSize(mig *Mig) (int64, error) {
// SetMigSize sets MIG size. // SetMigSize sets MIG size.
func (m *GceManager) SetMigSize(mig *Mig, size int64) error { func (m *GceManager) SetMigSize(mig *Mig, size int64) error {
glog.V(0).Infof("Setting mig size %s to %d", mig.Id(), size) glog.V(0).Infof("Setting mig size %s to %d", mig.Id(), size)
op, err := m.service.InstanceGroupManagers.Resize(mig.Project, mig.Zone, mig.Name, size).Do() op, err := m.gceService.InstanceGroupManagers.Resize(mig.Project, mig.Zone, mig.Name, size).Do()
if err != nil { if err != nil {
return err return err
} }
@ -155,7 +251,7 @@ func (m *GceManager) SetMigSize(mig *Mig, size int64) error {
func (m *GceManager) waitForOp(operation *gce.Operation, project string, zone string) error { func (m *GceManager) waitForOp(operation *gce.Operation, project string, zone string) error {
for start := time.Now(); time.Since(start) < operationWaitTimeout; time.Sleep(operationPollInterval) { for start := time.Now(); time.Since(start) < operationWaitTimeout; time.Sleep(operationPollInterval) {
glog.V(4).Infof("Waiting for operation %s %s %s", project, zone, operation.Name) glog.V(4).Infof("Waiting for operation %s %s %s", project, zone, operation.Name)
if op, err := m.service.ZoneOperations.Get(project, zone, operation.Name).Do(); err == nil { if op, err := m.gceService.ZoneOperations.Get(project, zone, operation.Name).Do(); err == nil {
glog.V(4).Infof("Operation %s %s %s status: %s", project, zone, operation.Name, op.Status) glog.V(4).Infof("Operation %s %s %s status: %s", project, zone, operation.Name, op.Status)
if op.Status == "DONE" { if op.Status == "DONE" {
return nil return nil
@ -193,7 +289,7 @@ func (m *GceManager) DeleteInstances(instances []*GceRef) error {
req.Instances = append(req.Instances, GenerateInstanceUrl(instance.Project, instance.Zone, instance.Name)) req.Instances = append(req.Instances, GenerateInstanceUrl(instance.Project, instance.Zone, instance.Name))
} }
op, err := m.service.InstanceGroupManagers.DeleteInstances(commonMig.Project, commonMig.Zone, commonMig.Name, &req).Do() op, err := m.gceService.InstanceGroupManagers.DeleteInstances(commonMig.Project, commonMig.Zone, commonMig.Name, &req).Do()
if err != nil { if err != nil {
return err return err
} }
@ -235,13 +331,13 @@ func (m *GceManager) regenerateCache() error {
mig := migInfo.config mig := migInfo.config
glog.V(4).Infof("Regenerating MIG information for %s %s %s", mig.Project, mig.Zone, mig.Name) glog.V(4).Infof("Regenerating MIG information for %s %s %s", mig.Project, mig.Zone, mig.Name)
instanceGroupManager, err := m.service.InstanceGroupManagers.Get(mig.Project, mig.Zone, mig.Name).Do() instanceGroupManager, err := m.gceService.InstanceGroupManagers.Get(mig.Project, mig.Zone, mig.Name).Do()
if err != nil { if err != nil {
return err return err
} }
migInfo.basename = instanceGroupManager.BaseInstanceName migInfo.basename = instanceGroupManager.BaseInstanceName
instances, err := m.service.InstanceGroupManagers.ListManagedInstances(mig.Project, mig.Zone, mig.Name).Do() instances, err := m.gceService.InstanceGroupManagers.ListManagedInstances(mig.Project, mig.Zone, mig.Name).Do()
if err != nil { if err != nil {
glog.V(4).Infof("Failed MIG info request for %s %s %s: %v", mig.Project, mig.Zone, mig.Name, err) glog.V(4).Infof("Failed MIG info request for %s %s %s: %v", mig.Project, mig.Zone, mig.Name, err)
return err return err
@ -261,7 +357,7 @@ func (m *GceManager) regenerateCache() error {
// GetMigNodes returns mig nodes. // GetMigNodes returns mig nodes.
func (m *GceManager) GetMigNodes(mig *Mig) ([]string, error) { func (m *GceManager) GetMigNodes(mig *Mig) ([]string, error) {
instances, err := m.service.InstanceGroupManagers.ListManagedInstances(mig.Project, mig.Zone, mig.Name).Do() instances, err := m.gceService.InstanceGroupManagers.ListManagedInstances(mig.Project, mig.Zone, mig.Name).Do()
if err != nil { if err != nil {
return []string{}, err return []string{}, err
} }

View File

@ -102,6 +102,8 @@ type AutoscalingOptions struct {
BalanceSimilarNodeGroups bool BalanceSimilarNodeGroups bool
// ConfigNamespace is the namesapce cluster-autoscaler is running in and all related configmaps live in // ConfigNamespace is the namesapce cluster-autoscaler is running in and all related configmaps live in
ConfigNamespace string ConfigNamespace string
// ClusterName if available
ClusterName string
} }
// NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments // NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments
@ -109,7 +111,7 @@ func NewAutoscalingContext(options AutoscalingOptions, predicateChecker *simulat
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder,
logEventRecorder *utils.LogEventRecorder, listerRegistry kube_util.ListerRegistry) (*AutoscalingContext, errors.AutoscalerError) { logEventRecorder *utils.LogEventRecorder, listerRegistry kube_util.ListerRegistry) (*AutoscalingContext, errors.AutoscalerError) {
cloudProviderBuilder := builder.NewCloudProviderBuilder(options.CloudProviderName, options.CloudConfig) cloudProviderBuilder := builder.NewCloudProviderBuilder(options.CloudProviderName, options.CloudConfig, options.ClusterName)
cloudProvider := cloudProviderBuilder.Build(cloudprovider.NodeGroupDiscoveryOptions{ cloudProvider := cloudProviderBuilder.Build(cloudprovider.NodeGroupDiscoveryOptions{
NodeGroupSpecs: options.NodeGroups, NodeGroupSpecs: options.NodeGroups,
NodeGroupAutoDiscoverySpec: options.NodeGroupAutoDiscovery, NodeGroupAutoDiscoverySpec: options.NodeGroupAutoDiscovery,

View File

@ -63,6 +63,7 @@ func (flag *MultiStringFlag) Set(value string) error {
var ( var (
nodeGroupsFlag MultiStringFlag nodeGroupsFlag MultiStringFlag
clusterName = flag.String("clusterName", "", "Autoscaled cluster name, if available")
address = flag.String("address", ":8085", "The address to expose prometheus metrics.") address = flag.String("address", ":8085", "The address to expose prometheus metrics.")
kubernetes = flag.String("kubernetes", "", "Kubernetes master location. Leave blank for default") kubernetes = flag.String("kubernetes", "", "Kubernetes master location. Leave blank for default")
cloudConfig = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.") cloudConfig = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
@ -126,6 +127,7 @@ func createAutoscalerOptions() core.AutoscalerOptions {
WriteStatusConfigMap: *writeStatusConfigMapFlag, WriteStatusConfigMap: *writeStatusConfigMapFlag,
BalanceSimilarNodeGroups: *balanceSimilarNodeGroupsFlag, BalanceSimilarNodeGroups: *balanceSimilarNodeGroupsFlag,
ConfigNamespace: *namespace, ConfigNamespace: *namespace,
ClusterName: *clusterName,
} }
configFetcherOpts := dynamic.ConfigFetcherOptions{ configFetcherOpts := dynamic.ConfigFetcherOptions{