From 50f57321ff93bb35f9ad3512e40b5442a30fad5d Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Fri, 8 Jul 2016 13:23:10 +0200 Subject: [PATCH] Cluster-autoscaler: use cloud provider interface in the code --- .../cloudprovider/cloud_provider.go | 4 - .../cloudprovider/gce/gce_cloud_provider.go | 43 +--- cluster-autoscaler/cluster_autoscaler.go | 78 +++--- cluster-autoscaler/config/migconfig.go | 115 --------- cluster-autoscaler/config/migconfig_test.go | 40 --- cluster-autoscaler/scale_down.go | 39 ++- cluster-autoscaler/scale_up.go | 64 +++-- cluster-autoscaler/utils.go | 61 ++--- cluster-autoscaler/utils/gce/gce.go | 236 ------------------ cluster-autoscaler/utils/gce_url/gce_url.go | 73 ------ .../utils/gce_url/gce_url_test.go | 43 ---- 11 files changed, 131 insertions(+), 665 deletions(-) delete mode 100644 cluster-autoscaler/config/migconfig.go delete mode 100644 cluster-autoscaler/config/migconfig_test.go delete mode 100644 cluster-autoscaler/utils/gce/gce.go delete mode 100644 cluster-autoscaler/utils/gce_url/gce_url.go delete mode 100644 cluster-autoscaler/utils/gce_url/gce_url_test.go diff --git a/cluster-autoscaler/cloudprovider/cloud_provider.go b/cluster-autoscaler/cloudprovider/cloud_provider.go index 1eeed55439..adad7517ea 100644 --- a/cluster-autoscaler/cloudprovider/cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/cloud_provider.go @@ -50,10 +50,6 @@ type NodeGroup interface { // removed nodes are deleted completely) TargetSize() (int, error) - // GetSampleNode returns a sample node that belongs to this node group, or error - // if occurred. - SampleNode() (*kube_api.Node, error) - // IncreaseSize increases the size of the node group. To delete a node you need // to explicitly name it and use DeleteNode. This function should wait until // node group size is updated. diff --git a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go index d0050b546e..e3d0f90206 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go @@ -25,22 +25,17 @@ import ( kube_api "k8s.io/kubernetes/pkg/api" ) -// NodeProvider is a function that provides a list of nodes. -type NodeProvider func() ([]*kube_api.Node, error) - // GceCloudProvider implements CloudProvider interface. type GceCloudProvider struct { - gceManager *GceManager - migs []*Mig - nodeProvider NodeProvider + gceManager *GceManager + migs []*Mig } // BuildGceCloudProvider builds CloudProvider implementation for GCE. -func BuildGceCloudProvider(gceManager *GceManager, nodeProvider NodeProvider, specs []string) (*GceCloudProvider, error) { +func BuildGceCloudProvider(gceManager *GceManager, specs []string) (*GceCloudProvider, error) { gce := &GceCloudProvider{ - gceManager: gceManager, - migs: make([]*Mig, 0), - nodeProvider: nodeProvider, + gceManager: gceManager, + migs: make([]*Mig, 0), } for _, spec := range specs { if err := gce.addNodeGroup(spec); err != nil { @@ -57,20 +52,6 @@ func (gce *GceCloudProvider) addNodeGroup(spec string) error { if err != nil { return err } - nodes, err := gce.nodeProvider() - if err != nil { - return err - } - // TODO: revisit how sample nodes are chosen. - for _, node := range nodes { - if belongs, err := mig.Belongs(node); err == nil && belongs { - mig.sampleNode = node - break - } - } - if mig.sampleNode == nil { - return fmt.Errorf("no sample node found for %s", mig.Id()) - } gce.migs = append(gce.migs, mig) gce.gceManager.RegisterMig(mig) return nil @@ -129,9 +110,8 @@ type Mig struct { gceManager *GceManager - minSize int - maxSize int - sampleNode *kube_api.Node + minSize int + maxSize int } // MaxSize returns maximum size of the node group. @@ -151,15 +131,6 @@ func (mig *Mig) TargetSize() (int, error) { return int(size), err } -// SampleNode returns a sample node for the mig. Assumes that mig definition doesn't change over time. -// The node may not exist anymore. -func (mig *Mig) SampleNode() (*kube_api.Node, error) { - if mig.sampleNode != nil { - return mig.sampleNode, nil - } - return nil, fmt.Errorf("no sample node available") -} - // IncreaseSize increases Mig size func (mig *Mig) IncreaseSize(delta int) error { if delta <= 0 { diff --git a/cluster-autoscaler/cluster_autoscaler.go b/cluster-autoscaler/cluster_autoscaler.go index e4e05ce34b..7204570c70 100644 --- a/cluster-autoscaler/cluster_autoscaler.go +++ b/cluster-autoscaler/cluster_autoscaler.go @@ -21,11 +21,13 @@ import ( "net/http" "net/url" "os" + "strings" "time" + "k8s.io/contrib/cluster-autoscaler/cloudprovider" + "k8s.io/contrib/cluster-autoscaler/cloudprovider/gce" "k8s.io/contrib/cluster-autoscaler/config" "k8s.io/contrib/cluster-autoscaler/simulator" - "k8s.io/contrib/cluster-autoscaler/utils/gce" kube_api "k8s.io/kubernetes/pkg/api" kube_record "k8s.io/kubernetes/pkg/client/record" kube_client "k8s.io/kubernetes/pkg/client/unversioned" @@ -34,8 +36,22 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// MultiStringFlag is a flag for passing multiple parameters using same flag +type MultiStringFlag []string + +// String returns string representation of the node groups. +func (flag *MultiStringFlag) String() string { + return "[" + strings.Join(*flag, " ") + "]" +} + +// Set adds a new configuration. +func (flag *MultiStringFlag) Set(value string) error { + *flag = append(*flag, value) + return nil +} + var ( - migConfigFlag config.MigConfigFlag + nodeGroupsFlag MultiStringFlag address = flag.String("address", ":8085", "The address to expose prometheus metrics.") kubernetes = flag.String("kubernetes", "", "Kuberentes master location. Leave blank for default") cloudConfig = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.") @@ -52,13 +68,15 @@ var ( scaleDownTrialInterval = flag.Duration("scale-down-trial-interval", 1*time.Minute, "How often scale down possiblity is check") scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down") + + cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce") ) func main() { glog.Infof("Cluster Autoscaler %s", ClusterAutoscalerVersion) - flag.Var(&migConfigFlag, "nodes", "sets min,max size and url of a MIG to be controlled by Cluster Autoscaler. "+ - "Can be used multiple times. Format: ::") + flag.Var(&nodeGroupsFlag, "nodes", "sets min,max size and other configuration data for a node group in a format accepted by cloud provider."+ + "Can be used multiple times. Format: ::") flag.Parse() go func() { @@ -77,27 +95,6 @@ func main() { if err != nil { glog.Fatalf("Failed to build Kuberentes client configuration: %v", err) } - migConfigs := make([]*config.MigConfig, 0, len(migConfigFlag)) - for i := range migConfigFlag { - migConfigs = append(migConfigs, &migConfigFlag[i]) - } - - // GCE Manager - var gceManager *gce.GceManager - var gceError error - if *cloudConfig != "" { - config, fileErr := os.Open(*cloudConfig) - if fileErr != nil { - glog.Fatalf("Couldn't open cloud provider configuration %s: %#v", *cloudConfig, err) - } - defer config.Close() - gceManager, gceError = gce.CreateGceManager(migConfigs, config) - } else { - gceManager, gceError = gce.CreateGceManager(migConfigs, nil) - } - if gceError != nil { - glog.Fatalf("Failed to create GCE Manager: %v", err) - } kubeClient := kube_client.NewOrDie(kubeConfig) @@ -118,6 +115,31 @@ func main() { eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) recorder := eventBroadcaster.NewRecorder(kube_api.EventSource{Component: "cluster-autoscaler"}) + var cloudProvider cloudprovider.CloudProvider + + if *cloudProviderFlag == "gce" { + // GCE Manager + var gceManager *gce.GceManager + var gceError error + if *cloudConfig != "" { + config, fileErr := os.Open(*cloudConfig) + if fileErr != nil { + glog.Fatalf("Couldn't open cloud provider configuration %s: %#v", *cloudConfig, err) + } + defer config.Close() + gceManager, gceError = gce.CreateGceManager(config) + } else { + gceManager, gceError = gce.CreateGceManager(nil) + } + if gceError != nil { + glog.Fatalf("Failed to create GCE Manager: %v", err) + } + cloudProvider, err = gce.BuildGceCloudProvider(gceManager, nodeGroupsFlag) + if err != nil { + glog.Fatalf("Failed to create GCE cloud provider: %v", err) + } + } + for { select { case <-time.After(*scanInterval): @@ -135,7 +157,7 @@ func main() { continue } - if err := CheckMigsAndNodes(nodes, gceManager); err != nil { + if err := CheckGroupsAndNodes(nodes, cloudProvider); err != nil { glog.Warningf("Cluster is not ready for autoscaling: %v", err) continue } @@ -188,7 +210,7 @@ func main() { } else { scaleUpStart := time.Now() updateLastTime("scaleup") - scaledUp, err := ScaleUp(unschedulablePodsToHelp, nodes, migConfigs, gceManager, kubeClient, predicateChecker, recorder) + scaledUp, err := ScaleUp(unschedulablePodsToHelp, nodes, cloudProvider, kubeClient, predicateChecker, recorder) updateDuration("scaleup", scaleUpStart) @@ -245,7 +267,7 @@ func main() { unneededNodes, *scaleDownUnneededTime, allScheduled, - gceManager, kubeClient, predicateChecker) + cloudProvider, kubeClient, predicateChecker) updateDuration("scaledown", scaleDownStart) diff --git a/cluster-autoscaler/config/migconfig.go b/cluster-autoscaler/config/migconfig.go deleted file mode 100644 index f1c679cfbd..0000000000 --- a/cluster-autoscaler/config/migconfig.go +++ /dev/null @@ -1,115 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -import ( - "fmt" - "strconv" - "strings" - - gceurl "k8s.io/contrib/cluster-autoscaler/utils/gce_url" - kube_api "k8s.io/kubernetes/pkg/api" -) - -// InstanceConfig contains instance configuration details. -type InstanceConfig struct { - Project string - Zone string - Name string -} - -// InstanceConfigFromProviderId creates InstanceConfig object -// from provider id which must be in format: -// gce://// -// TODO(piosz): add better check whether the id is correct -func InstanceConfigFromProviderId(id string) (*InstanceConfig, error) { - splitted := strings.Split(id[6:], "/") - if len(splitted) != 3 { - return nil, fmt.Errorf("Wrong id: expected format gce:////, got %v", id) - } - return &InstanceConfig{ - Project: splitted[0], - Zone: splitted[1], - Name: splitted[2], - }, nil -} - -// MigConfig contains managed instance group configuration details. -type MigConfig struct { - MinSize int - MaxSize int - Project string - Zone string - Name string -} - -// Url builds GCE url for the MIG. -func (migconfig *MigConfig) Url() string { - return gceurl.GenerateMigUrl(migconfig.Project, migconfig.Zone, migconfig.Name) -} - -// Node returns a template/dummy node for the mig. -func (migconfig *MigConfig) Node() *kube_api.Node { - //TODO(fgrzadkowski): Implement this. - return nil -} - -// MigConfigFlag is an array of MIG configuration details. Working as a multi-value flag. -type MigConfigFlag []MigConfig - -// String returns string representation of the MIG. -func (migconfigflag *MigConfigFlag) String() string { - configs := make([]string, len(*migconfigflag)) - for _, migconfig := range *migconfigflag { - configs = append(configs, fmt.Sprintf("%d:%d:%s:%s", migconfig.MinSize, migconfig.MaxSize, migconfig.Zone, migconfig.Name)) - } - return "[" + strings.Join(configs, " ") + "]" -} - -// Set adds a new configuration. -func (migconfigflag *MigConfigFlag) Set(value string) error { - tokens := strings.SplitN(value, ":", 3) - if len(tokens) != 3 { - return fmt.Errorf("wrong nodes configuration: %s", value) - } - migconfig := MigConfig{} - if size, err := strconv.Atoi(tokens[0]); err == nil { - if size <= 0 { - return fmt.Errorf("min size must be >= 1") - } - migconfig.MinSize = size - } else { - return fmt.Errorf("failed to set min size: %s, expected integer", tokens[0]) - } - - if size, err := strconv.Atoi(tokens[1]); err == nil { - if size < migconfig.MinSize { - return fmt.Errorf("max size must be greater or equal to min size") - } - migconfig.MaxSize = size - } else { - return fmt.Errorf("failed to set max size: %s, expected integer", tokens[1]) - } - - var err error - if migconfig.Project, migconfig.Zone, migconfig.Name, err = gceurl.ParseMigUrl(tokens[2]); err != nil { - return fmt.Errorf("failed to parse mig url: %s got error: %v", tokens[2], err) - } - - *migconfigflag = append(*migconfigflag, migconfig) - return nil -} diff --git a/cluster-autoscaler/config/migconfig_test.go b/cluster-autoscaler/config/migconfig_test.go deleted file mode 100644 index ed4bebe43a..0000000000 --- a/cluster-autoscaler/config/migconfig_test.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestSet(t *testing.T) { - migConfigFlag := MigConfigFlag{} - assert.Error(t, migConfigFlag.Set("a")) - assert.Error(t, migConfigFlag.Set("a:b:c")) - assert.Error(t, migConfigFlag.Set("1:2:x")) - assert.Error(t, migConfigFlag.Set("1:2:")) - assert.NoError(t, migConfigFlag.Set("111:222:https://content.googleapis.com/compute/v1/projects/test-project/zones/test-zone/instanceGroups/test-name")) - assert.Equal(t, 111, migConfigFlag[0].MinSize) - assert.Equal(t, 222, migConfigFlag[0].MaxSize) - assert.Equal(t, "test-zone", migConfigFlag[0].Zone) - assert.Equal(t, "test-name", migConfigFlag[0].Name) - assert.Contains(t, migConfigFlag.String(), "111") - assert.Contains(t, migConfigFlag.String(), "222") - assert.Contains(t, migConfigFlag.String(), "test-zone") - assert.Contains(t, migConfigFlag.String(), "test-name") -} diff --git a/cluster-autoscaler/scale_down.go b/cluster-autoscaler/scale_down.go index 5c18f964db..ea49289814 100644 --- a/cluster-autoscaler/scale_down.go +++ b/cluster-autoscaler/scale_down.go @@ -20,9 +20,8 @@ import ( "fmt" "time" - "k8s.io/contrib/cluster-autoscaler/config" + "k8s.io/contrib/cluster-autoscaler/cloudprovider" "k8s.io/contrib/cluster-autoscaler/simulator" - "k8s.io/contrib/cluster-autoscaler/utils/gce" kube_api "k8s.io/kubernetes/pkg/api" kube_client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" @@ -106,7 +105,7 @@ func ScaleDown( unneededNodes map[string]time.Time, unneededTime time.Duration, pods []*kube_api.Pod, - gceManager *gce.GceManager, + cloudProvider cloudprovider.CloudProvider, client *kube_client.Client, predicateChecker *simulator.PredicateChecker) (ScaleDownResult, error) { @@ -122,30 +121,24 @@ func ScaleDown( continue } - // Check mig size. - instance, err := config.InstanceConfigFromProviderId(node.Spec.ProviderID) + nodeGroup, err := cloudProvider.NodeGroupForNode(node) if err != nil { - glog.Errorf("Error while parsing providerid of %s: %v", node.Name, err) + glog.Errorf("Error while checking node group for %s: %v", node.Name, err) continue } - migConfig, err := gceManager.GetMigForInstance(instance) - if err != nil { - glog.Errorf("Error while checking mig config for instance %v: %v", instance, err) - continue - } - if migConfig == nil { - glog.V(4).Infof("Skipping %s - no mig config", node.Name) + if nodeGroup == nil { + glog.V(4).Infof("Skipping %s - no node group config", node.Name) continue } - size, err := gceManager.GetMigSize(migConfig) + size, err := nodeGroup.TargetSize() if err != nil { - glog.Errorf("Error while checking mig size for instance %v: %v", instance, err) + glog.Errorf("Error while checking node group size %s: %v", nodeGroup.Id(), err) continue } - if size <= int64(migConfig.MinSize) { - glog.V(1).Infof("Skipping %s - mig min size reached", node.Name) + if size <= nodeGroup.MinSize() { + glog.V(1).Infof("Skipping %s - node group min size reached", node.Name) continue } @@ -167,14 +160,18 @@ func ScaleDown( } nodeToRemove := nodesToRemove[0] glog.Infof("Removing %s", nodeToRemove.Name) - instanceConfig, err := config.InstanceConfigFromProviderId(nodeToRemove.Spec.ProviderID) + + nodeGroup, err := cloudProvider.NodeGroupForNode(nodeToRemove) if err != nil { - return ScaleDownError, fmt.Errorf("Failed to get instance config for %s: %v", nodeToRemove.Name, err) + return ScaleDownError, fmt.Errorf("failed to node group for %s: %v", nodeToRemove.Name, err) + } + if nodeGroup == nil { + return ScaleDownError, fmt.Errorf("picked node that doesn't belong to a node group: %s", nodeToRemove.Name) } - err = gceManager.DeleteInstances([]*config.InstanceConfig{instanceConfig}) + err = nodeGroup.DeleteNodes([]*kube_api.Node{nodeToRemove}) if err != nil { - return ScaleDownError, fmt.Errorf("Failed to delete %v: %v", instanceConfig, err) + return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", nodeToRemove.Name, err) } return ScaleDownNodeDeleted, nil diff --git a/cluster-autoscaler/scale_up.go b/cluster-autoscaler/scale_up.go index 824e249d1d..ba0c663ad3 100644 --- a/cluster-autoscaler/scale_up.go +++ b/cluster-autoscaler/scale_up.go @@ -19,10 +19,9 @@ package main import ( "fmt" - "k8s.io/contrib/cluster-autoscaler/config" + "k8s.io/contrib/cluster-autoscaler/cloudprovider" "k8s.io/contrib/cluster-autoscaler/estimator" "k8s.io/contrib/cluster-autoscaler/simulator" - "k8s.io/contrib/cluster-autoscaler/utils/gce" kube_api "k8s.io/kubernetes/pkg/api" kube_record "k8s.io/kubernetes/pkg/client/record" kube_client "k8s.io/kubernetes/pkg/client/unversioned" @@ -32,14 +31,13 @@ import ( // ExpansionOption describes an option to expand the cluster. type ExpansionOption struct { - migConfig *config.MigConfig + nodeGroup cloudprovider.NodeGroup estimator *estimator.BasicNodeEstimator } // ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size, // false if it didn't and error if an error occured. -func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, migConfigs []*config.MigConfig, - gceManager *gce.GceManager, kubeClient *kube_client.Client, +func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, cloudProvider cloudprovider.CloudProvider, kubeClient *kube_client.Client, predicateChecker *simulator.PredicateChecker, recorder kube_record.EventRecorder) (bool, error) { // From now on we only care about unschedulable pods that were marked after the newest @@ -54,48 +52,48 @@ func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, migConfi } expansionOptions := make([]ExpansionOption, 0) - nodeInfos, err := GetNodeInfosForMigs(nodes, gceManager, kubeClient) + nodeInfos, err := GetNodeInfosForGroups(nodes, cloudProvider, kubeClient) if err != nil { - return false, fmt.Errorf("failed to build node infors for migs: %v", err) + return false, fmt.Errorf("failed to build node infos for node groups: %v", err) } podsRemainUnshedulable := make(map[*kube_api.Pod]struct{}) - for _, migConfig := range migConfigs { + for _, nodeGroup := range cloudProvider.NodeGroups() { - currentSize, err := gceManager.GetMigSize(migConfig) + currentSize, err := nodeGroup.TargetSize() if err != nil { - glog.Errorf("Failed to get MIG size: %v", err) + glog.Errorf("Failed to get node group size: %v", err) continue } - if currentSize >= int64(migConfig.MaxSize) { - // skip this mig. - glog.V(4).Infof("Skipping MIG %s - max size reached", migConfig.Url()) + if currentSize >= nodeGroup.MaxSize() { + // skip this node group. + glog.V(4).Infof("Skipping node group %s - max size reached", nodeGroup.Id()) continue } option := ExpansionOption{ - migConfig: migConfig, + nodeGroup: nodeGroup, estimator: estimator.NewBasicNodeEstimator(), } - migHelpsSomePods := false + groupHelpsSomePods := false - nodeInfo, found := nodeInfos[migConfig.Url()] + nodeInfo, found := nodeInfos[nodeGroup.Id()] if !found { - glog.Errorf("No node info for: %s", migConfig.Url()) + glog.Errorf("No node info for: %s", nodeGroup.Id()) continue } for _, pod := range unschedulablePods { err = predicateChecker.CheckPredicates(pod, nodeInfo) if err == nil { - migHelpsSomePods = true + groupHelpsSomePods = true option.estimator.Add(pod) } else { glog.V(2).Infof("Scale-up predicate failed: %v", err) podsRemainUnshedulable[pod] = struct{}{} } } - if migHelpsSomePods { + if groupHelpsSomePods { expansionOptions = append(expansionOptions, option) } } @@ -103,36 +101,36 @@ func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, migConfi // Pick some expansion option. bestOption := BestExpansionOption(expansionOptions) if bestOption != nil && bestOption.estimator.GetCount() > 0 { - glog.V(1).Infof("Best option to resize: %s", bestOption.migConfig.Url()) - nodeInfo, found := nodeInfos[bestOption.migConfig.Url()] + glog.V(1).Infof("Best option to resize: %s", bestOption.nodeGroup.Id()) + nodeInfo, found := nodeInfos[bestOption.nodeGroup.Id()] if !found { - return false, fmt.Errorf("no sample node for: %s", bestOption.migConfig.Url()) + return false, fmt.Errorf("no sample node for: %s", bestOption.nodeGroup.Id()) } node := nodeInfo.Node() estimate, report := bestOption.estimator.Estimate(node) glog.V(1).Info(bestOption.estimator.GetDebug()) glog.V(1).Info(report) - glog.V(1).Infof("Estimated %d nodes needed in %s", estimate, bestOption.migConfig.Url()) + glog.V(1).Infof("Estimated %d nodes needed in %s", estimate, bestOption.nodeGroup.Id()) - currentSize, err := gceManager.GetMigSize(bestOption.migConfig) + currentSize, err := bestOption.nodeGroup.TargetSize() if err != nil { - return false, fmt.Errorf("failed to get MIG size: %v", err) + return false, fmt.Errorf("failed to get node group size: %v", err) } - newSize := currentSize + int64(estimate) - if newSize >= int64(bestOption.migConfig.MaxSize) { - glog.V(1).Infof("Capping size to MAX (%d)", bestOption.migConfig.MaxSize) - newSize = int64(bestOption.migConfig.MaxSize) + newSize := currentSize + estimate + if newSize >= bestOption.nodeGroup.MaxSize() { + glog.V(1).Infof("Capping size to MAX (%d)", bestOption.nodeGroup.MaxSize()) + newSize = bestOption.nodeGroup.MaxSize() } - glog.V(1).Infof("Setting %s size to %d", bestOption.migConfig.Url(), newSize) + glog.V(1).Infof("Setting %s size to %d", bestOption.nodeGroup.Id(), newSize) - if err := gceManager.SetMigSize(bestOption.migConfig, newSize); err != nil { - return false, fmt.Errorf("failed to set MIG size: %v", err) + if err := bestOption.nodeGroup.IncreaseSize(newSize - currentSize); err != nil { + return false, fmt.Errorf("failed to increase node group size: %v", err) } for pod := range bestOption.estimator.FittingPods { recorder.Eventf(pod, kube_api.EventTypeNormal, "TriggeredScaleUp", - "pod triggered scale-up, mig: %s, sizes (current/new): %d/%d", bestOption.migConfig.Name, currentSize, newSize) + "pod triggered scale-up, group: %s, sizes (current/new): %d/%d", bestOption.nodeGroup.Id(), currentSize, newSize) } return true, nil diff --git a/cluster-autoscaler/utils.go b/cluster-autoscaler/utils.go index 7b7214b050..a9b6764b6c 100644 --- a/cluster-autoscaler/utils.go +++ b/cluster-autoscaler/utils.go @@ -20,9 +20,8 @@ import ( "fmt" "time" - "k8s.io/contrib/cluster-autoscaler/config" + "k8s.io/contrib/cluster-autoscaler/cloudprovider" "k8s.io/contrib/cluster-autoscaler/simulator" - "k8s.io/contrib/cluster-autoscaler/utils/gce" kube_api "k8s.io/kubernetes/pkg/api" kube_api_unversioned "k8s.io/kubernetes/pkg/api/unversioned" @@ -227,66 +226,56 @@ func createNodeNameToInfoMap(pods []*kube_api.Pod, nodes []*kube_api.Node) map[s return nodeNameToNodeInfo } -// CheckMigsAndNodes checks if all migs have all required nodes. -func CheckMigsAndNodes(nodes []*kube_api.Node, gceManager *gce.GceManager) error { - migCount := make(map[string]int) - migs := make(map[string]*config.MigConfig) +// CheckGroupsAndNodes checks if all node groups have all required nodes. +func CheckGroupsAndNodes(nodes []*kube_api.Node, cloudProvider cloudprovider.CloudProvider) error { + groupCount := make(map[string]int) for _, node := range nodes { - instanceConfig, err := config.InstanceConfigFromProviderId(node.Spec.ProviderID) - if err != nil { - return err - } - migConfig, err := gceManager.GetMigForInstance(instanceConfig) + group, err := cloudProvider.NodeGroupForNode(node) if err != nil { return err } - if migConfig == nil { + if group == nil { continue } - url := migConfig.Url() - count, _ := migCount[url] - migCount[url] = count + 1 - migs[url] = migConfig + id := group.Id() + count, _ := groupCount[id] + groupCount[id] = count + 1 } - for url, mig := range migs { - size, err := gceManager.GetMigSize(mig) + for _, nodeGroup := range cloudProvider.NodeGroups() { + size, err := nodeGroup.TargetSize() if err != nil { return err } - count := migCount[url] - if size != int64(count) { - return fmt.Errorf("wrong number of nodes for mig: %s expected: %d actual: %d", url, size, count) + count := groupCount[nodeGroup.Id()] + if size != count { + return fmt.Errorf("wrong number of nodes for node group: %s expected: %d actual: %d", nodeGroup.Id(), size, count) } } return nil } -// GetNodeInfosForMigs finds NodeInfos for all migs used to manage the given nodes. It also returns a mig to sample node mapping. +// GetNodeInfosForGroups finds NodeInfos for all node groups used to manage the given nodes. It also returns a node group to sample node mapping. // TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key. -func GetNodeInfosForMigs(nodes []*kube_api.Node, gceManager *gce.GceManager, kubeClient *kube_client.Client) (map[string]*schedulercache.NodeInfo, error) { +func GetNodeInfosForGroups(nodes []*kube_api.Node, cloudProvider cloudprovider.CloudProvider, kubeClient *kube_client.Client) (map[string]*schedulercache.NodeInfo, error) { result := make(map[string]*schedulercache.NodeInfo) for _, node := range nodes { - instanceConfig, err := config.InstanceConfigFromProviderId(node.Spec.ProviderID) - if err != nil { - return map[string]*schedulercache.NodeInfo{}, err - } - migConfig, err := gceManager.GetMigForInstance(instanceConfig) + nodeGroup, err := cloudProvider.NodeGroupForNode(node) if err != nil { return map[string]*schedulercache.NodeInfo{}, err } - if migConfig == nil { + if nodeGroup == nil { continue } - url := migConfig.Url() - - nodeInfo, err := simulator.BuildNodeInfoForNode(node, kubeClient) - if err != nil { - return map[string]*schedulercache.NodeInfo{}, err + id := nodeGroup.Id() + if _, found := result[id]; !found { + nodeInfo, err := simulator.BuildNodeInfoForNode(node, kubeClient) + if err != nil { + return map[string]*schedulercache.NodeInfo{}, err + } + result[id] = nodeInfo } - - result[url] = nodeInfo } return result, nil } diff --git a/cluster-autoscaler/utils/gce/gce.go b/cluster-autoscaler/utils/gce/gce.go deleted file mode 100644 index a78f83c4ae..0000000000 --- a/cluster-autoscaler/utils/gce/gce.go +++ /dev/null @@ -1,236 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package gce - -import ( - "fmt" - "io" - "strings" - "sync" - "time" - - "gopkg.in/gcfg.v1" - - "k8s.io/contrib/cluster-autoscaler/config" - gceurl "k8s.io/contrib/cluster-autoscaler/utils/gce_url" - - "github.com/golang/glog" - "golang.org/x/oauth2" - "golang.org/x/oauth2/google" - gce "google.golang.org/api/compute/v1" - provider_gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" - "k8s.io/kubernetes/pkg/util/wait" -) - -const ( - operationWaitTimeout = 5 * time.Second - operationPollInterval = 100 * time.Millisecond -) - -type migInformation struct { - config *config.MigConfig - basename string -} - -// GceManager is handles gce communication and data caching. -type GceManager struct { - migs []*migInformation - service *gce.Service - migCache map[config.InstanceConfig]*config.MigConfig - cacheMutex sync.Mutex -} - -// CreateGceManager constructs gceManager object. -func CreateGceManager(migs []*config.MigConfig, configReader io.Reader) (*GceManager, error) { - // Create Google Compute Engine token. - tokenSource := google.ComputeTokenSource("") - if configReader != nil { - var cfg provider_gce.Config - if err := gcfg.ReadInto(&cfg, configReader); err != nil { - glog.Errorf("Couldn't read config: %v", err) - return nil, err - } - if cfg.Global.TokenURL == "" { - glog.Warning("Empty tokenUrl in cloud config") - } else { - glog.Infof("Using TokenSource from config %#v", tokenSource) - tokenSource = provider_gce.NewAltTokenSource(cfg.Global.TokenURL, cfg.Global.TokenBody) - } - } else { - glog.Infof("Using default TokenSource %#v", tokenSource) - } - - // Create Google Compute Engine service. - client := oauth2.NewClient(oauth2.NoContext, tokenSource) - gceService, err := gce.New(client) - if err != nil { - return nil, err - } - - migInfos := make([]*migInformation, 0, len(migs)) - for _, mig := range migs { - migInfos = append(migInfos, &migInformation{ - config: mig, - }) - } - - manager := &GceManager{ - migs: migInfos, - service: gceService, - migCache: map[config.InstanceConfig]*config.MigConfig{}, - } - - go wait.Forever(func() { manager.regenerateCacheIgnoreError() }, time.Hour) - - return manager, nil -} - -// GetMigSize gets MIG size. -func (m *GceManager) GetMigSize(migConf *config.MigConfig) (int64, error) { - mig, err := m.service.InstanceGroupManagers.Get(migConf.Project, migConf.Zone, migConf.Name).Do() - if err != nil { - return -1, err - } - return mig.TargetSize, nil -} - -// SetMigSize sets MIG size. -func (m *GceManager) SetMigSize(migConf *config.MigConfig, size int64) error { - op, err := m.service.InstanceGroupManagers.Resize(migConf.Project, migConf.Zone, migConf.Name, size).Do() - if err != nil { - return err - } - if err := m.waitForOp(op, migConf.Project, migConf.Zone); err != nil { - return err - } - return nil -} - -func (m *GceManager) waitForOp(operation *gce.Operation, project string, zone string) error { - for start := time.Now(); time.Since(start) < operationWaitTimeout; time.Sleep(operationPollInterval) { - glog.V(4).Infof("Waiting for operation %s %s %s", project, zone, operation.Name) - if op, err := m.service.ZoneOperations.Get(project, zone, operation.Name).Do(); err == nil { - glog.V(4).Infof("Operation %s %s %s status: %s", project, zone, operation.Name, op.Status) - if op.Status == "DONE" { - return nil - } - } else { - glog.Warningf("Error while getting operation %s on %s: %v", operation.Name, operation.TargetLink, err) - } - } - return fmt.Errorf("Timeout while waiting for operation %s on %s to complete.", operation.Name, operation.TargetLink) -} - -// DeleteInstances deletes the given instances. All instances must be controlled by the same MIG. -func (m *GceManager) DeleteInstances(instances []*config.InstanceConfig) error { - if len(instances) == 0 { - return nil - } - commonMig, err := m.GetMigForInstance(instances[0]) - if err != nil { - return err - } - for _, instance := range instances { - mig, err := m.GetMigForInstance(instance) - if err != nil { - return err - } - if mig != commonMig { - return fmt.Errorf("Connot delete instances which don't belong to the same MIG.") - } - } - - req := gce.InstanceGroupManagersDeleteInstancesRequest{ - Instances: []string{}, - } - for _, instance := range instances { - req.Instances = append(req.Instances, gceurl.GenerateInstanceUrl(instance.Project, instance.Zone, instance.Name)) - } - - op, err := m.service.InstanceGroupManagers.DeleteInstances(commonMig.Project, commonMig.Zone, commonMig.Name, &req).Do() - if err != nil { - return err - } - if err := m.waitForOp(op, commonMig.Project, commonMig.Zone); err != nil { - return err - } - return nil -} - -// GetMigForInstance returns MigConfig of the given Instance -func (m *GceManager) GetMigForInstance(instance *config.InstanceConfig) (*config.MigConfig, error) { - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() - if mig, found := m.migCache[*instance]; found { - return mig, nil - } - - for _, mig := range m.migs { - if mig.config.Project == instance.Project && - mig.config.Zone == instance.Zone && - strings.HasPrefix(instance.Name, mig.basename) { - if err := m.regenerateCache(); err != nil { - return nil, fmt.Errorf("Error while looking for MIG for instance %+v, error: %v", *instance, err) - } - if mig, found := m.migCache[*instance]; found { - return mig, nil - } - return nil, fmt.Errorf("Instance %+v does not belong to any configured MIG", *instance) - } - } - // Instance doesn't belong to any configured mig. - return nil, nil -} - -func (m *GceManager) regenerateCacheIgnoreError() { - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() - if err := m.regenerateCache(); err != nil { - glog.Errorf("Error while regenerating Mig cache: %v", err) - } -} - -func (m *GceManager) regenerateCache() error { - newMigCache := map[config.InstanceConfig]*config.MigConfig{} - - for _, migInfo := range m.migs { - mig := migInfo.config - glog.V(4).Infof("Regenerating MIG information for %s %s %s", mig.Project, mig.Zone, mig.Name) - - instanceGroupManager, err := m.service.InstanceGroupManagers.Get(mig.Project, mig.Zone, mig.Name).Do() - if err != nil { - return err - } - migInfo.basename = instanceGroupManager.BaseInstanceName - - instances, err := m.service.InstanceGroupManagers.ListManagedInstances(mig.Project, mig.Zone, mig.Name).Do() - if err != nil { - glog.V(4).Infof("Failed MIG info request for %s %s %s: %v", mig.Project, mig.Zone, mig.Name, err) - return err - } - for _, instance := range instances.ManagedInstances { - project, zone, name, err := gceurl.ParseInstanceUrl(instance.Instance) - if err != nil { - return err - } - newMigCache[config.InstanceConfig{Project: project, Zone: zone, Name: name}] = mig - } - } - - m.migCache = newMigCache - return nil -} diff --git a/cluster-autoscaler/utils/gce_url/gce_url.go b/cluster-autoscaler/utils/gce_url/gce_url.go deleted file mode 100644 index 32b08b35cc..0000000000 --- a/cluster-autoscaler/utils/gce_url/gce_url.go +++ /dev/null @@ -1,73 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package gceurl - -import ( - "fmt" - "strings" -) - -const ( - gceUrlSchema = "https" - gceDomainSufix = "googleapis.com/compute/v1/projects/" - gcePrefix = gceUrlSchema + "://content." + gceDomainSufix - instanceUrlTemplate = gcePrefix + "%s/zones/%s/instances/%s" - migUrlTemplate = gcePrefix + "%s/zones/%s/instanceGroups/%s" -) - -// ParseMigUrl expects url in format: -// https://content.googleapis.com/compute/v1/projects//zones//instanceGroups/ -func ParseMigUrl(url string) (project string, zone string, name string, err error) { - return parseGceUrl(url, "instanceGroups") -} - -// ParseInstanceUrl expects url in format: -// https://content.googleapis.com/compute/v1/projects//zones//instances/ -func ParseInstanceUrl(url string) (project string, zone string, name string, err error) { - return parseGceUrl(url, "instances") -} - -// GenerateInstanceUrl generates url for instance. -func GenerateInstanceUrl(project, zone, name string) string { - return fmt.Sprintf(instanceUrlTemplate, project, zone, name) -} - -// GenerateMigUrl generates url for instance. -func GenerateMigUrl(project, zone, name string) string { - return fmt.Sprintf(migUrlTemplate, project, zone, name) -} - -func parseGceUrl(url, expectedResource string) (project string, zone string, name string, err error) { - errMsg := fmt.Errorf("Wrong url: expected format https://content.googleapis.com/compute/v1/projects//zones//%s/, got %s", expectedResource, url) - if !strings.Contains(url, gceDomainSufix) { - return "", "", "", errMsg - } - if !strings.HasPrefix(url, gceUrlSchema) { - return "", "", "", errMsg - } - splitted := strings.Split(strings.Split(url, gceDomainSufix)[1], "/") - if len(splitted) != 5 || splitted[1] != "zones" { - return "", "", "", errMsg - } - if splitted[3] != expectedResource { - return "", "", "", fmt.Errorf("Wrong resource in url: expected %s, got %s", expectedResource, splitted[3]) - } - project = splitted[0] - zone = splitted[2] - name = splitted[4] - return project, zone, name, nil -} diff --git a/cluster-autoscaler/utils/gce_url/gce_url_test.go b/cluster-autoscaler/utils/gce_url/gce_url_test.go deleted file mode 100644 index 5540979a93..0000000000 --- a/cluster-autoscaler/utils/gce_url/gce_url_test.go +++ /dev/null @@ -1,43 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package gceurl - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestParseUrl(t *testing.T) { - proj, zone, name, err := parseGceUrl("https://www.googleapis.com/compute/v1/projects/mwielgus-proj/zones/us-central1-b/instanceGroups/kubernetes-minion-group", "instanceGroups") - assert.Nil(t, err) - assert.Equal(t, "mwielgus-proj", proj) - assert.Equal(t, "us-central1-b", zone) - assert.Equal(t, "kubernetes-minion-group", name) - - proj, zone, name, err = parseGceUrl("https://content.googleapis.com/compute/v1/projects/mwielgus-proj/zones/us-central1-b/instanceGroups/kubernetes-minion-group", "instanceGroups") - assert.Nil(t, err) - assert.Equal(t, "mwielgus-proj", proj) - assert.Equal(t, "us-central1-b", zone) - assert.Equal(t, "kubernetes-minion-group", name) - - proj, zone, name, err = parseGceUrl("www.onet.pl", "instanceGroups") - assert.NotNil(t, err) - - proj, zone, name, err = parseGceUrl("https://content.googleapis.com/compute/vabc/projects/mwielgus-proj/zones/us-central1-b/instanceGroups/kubernetes-minion-group", "instanceGroups") - assert.NotNil(t, err) -}