Cluster-autoscaler: use cloud provider interface in the code

This commit is contained in:
Marcin Wielgus 2016-07-08 13:23:10 +02:00
parent 08fa35c5e8
commit 50f57321ff
11 changed files with 131 additions and 665 deletions

View File

@ -50,10 +50,6 @@ type NodeGroup interface {
// removed nodes are deleted completely)
TargetSize() (int, error)
// GetSampleNode returns a sample node that belongs to this node group, or error
// if occurred.
SampleNode() (*kube_api.Node, error)
// IncreaseSize increases the size of the node group. To delete a node you need
// to explicitly name it and use DeleteNode. This function should wait until
// node group size is updated.

View File

@ -25,22 +25,17 @@ import (
kube_api "k8s.io/kubernetes/pkg/api"
)
// NodeProvider is a function that provides a list of nodes.
type NodeProvider func() ([]*kube_api.Node, error)
// GceCloudProvider implements CloudProvider interface.
type GceCloudProvider struct {
gceManager *GceManager
migs []*Mig
nodeProvider NodeProvider
gceManager *GceManager
migs []*Mig
}
// BuildGceCloudProvider builds CloudProvider implementation for GCE.
func BuildGceCloudProvider(gceManager *GceManager, nodeProvider NodeProvider, specs []string) (*GceCloudProvider, error) {
func BuildGceCloudProvider(gceManager *GceManager, specs []string) (*GceCloudProvider, error) {
gce := &GceCloudProvider{
gceManager: gceManager,
migs: make([]*Mig, 0),
nodeProvider: nodeProvider,
gceManager: gceManager,
migs: make([]*Mig, 0),
}
for _, spec := range specs {
if err := gce.addNodeGroup(spec); err != nil {
@ -57,20 +52,6 @@ func (gce *GceCloudProvider) addNodeGroup(spec string) error {
if err != nil {
return err
}
nodes, err := gce.nodeProvider()
if err != nil {
return err
}
// TODO: revisit how sample nodes are chosen.
for _, node := range nodes {
if belongs, err := mig.Belongs(node); err == nil && belongs {
mig.sampleNode = node
break
}
}
if mig.sampleNode == nil {
return fmt.Errorf("no sample node found for %s", mig.Id())
}
gce.migs = append(gce.migs, mig)
gce.gceManager.RegisterMig(mig)
return nil
@ -129,9 +110,8 @@ type Mig struct {
gceManager *GceManager
minSize int
maxSize int
sampleNode *kube_api.Node
minSize int
maxSize int
}
// MaxSize returns maximum size of the node group.
@ -151,15 +131,6 @@ func (mig *Mig) TargetSize() (int, error) {
return int(size), err
}
// SampleNode returns a sample node for the mig. Assumes that mig definition doesn't change over time.
// The node may not exist anymore.
func (mig *Mig) SampleNode() (*kube_api.Node, error) {
if mig.sampleNode != nil {
return mig.sampleNode, nil
}
return nil, fmt.Errorf("no sample node available")
}
// IncreaseSize increases Mig size
func (mig *Mig) IncreaseSize(delta int) error {
if delta <= 0 {

View File

@ -21,11 +21,13 @@ import (
"net/http"
"net/url"
"os"
"strings"
"time"
"k8s.io/contrib/cluster-autoscaler/cloudprovider"
"k8s.io/contrib/cluster-autoscaler/cloudprovider/gce"
"k8s.io/contrib/cluster-autoscaler/config"
"k8s.io/contrib/cluster-autoscaler/simulator"
"k8s.io/contrib/cluster-autoscaler/utils/gce"
kube_api "k8s.io/kubernetes/pkg/api"
kube_record "k8s.io/kubernetes/pkg/client/record"
kube_client "k8s.io/kubernetes/pkg/client/unversioned"
@ -34,8 +36,22 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
// MultiStringFlag collects every occurrence of a repeatable command-line
// flag into a single []string value. It implements the flag.Value interface.
type MultiStringFlag []string

// String renders the collected values as a bracketed, space-separated list.
func (msf *MultiStringFlag) String() string {
	joined := strings.Join(*msf, " ")
	return "[" + joined + "]"
}

// Set records one more value for the flag; it never fails.
func (msf *MultiStringFlag) Set(value string) error {
	*msf = append(*msf, value)
	return nil
}
var (
migConfigFlag config.MigConfigFlag
nodeGroupsFlag MultiStringFlag
address = flag.String("address", ":8085", "The address to expose prometheus metrics.")
kubernetes = flag.String("kubernetes", "", "Kuberentes master location. Leave blank for default")
cloudConfig = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
@ -52,13 +68,15 @@ var (
scaleDownTrialInterval = flag.Duration("scale-down-trial-interval", 1*time.Minute,
"How often scale down possiblity is check")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce")
)
func main() {
glog.Infof("Cluster Autoscaler %s", ClusterAutoscalerVersion)
flag.Var(&migConfigFlag, "nodes", "sets min,max size and url of a MIG to be controlled by Cluster Autoscaler. "+
"Can be used multiple times. Format: <min>:<max>:<migurl>")
flag.Var(&nodeGroupsFlag, "nodes", "sets min,max size and other configuration data for a node group in a format accepted by cloud provider."+
"Can be used multiple times. Format: <min>:<max>:<other...>")
flag.Parse()
go func() {
@ -77,27 +95,6 @@ func main() {
if err != nil {
glog.Fatalf("Failed to build Kuberentes client configuration: %v", err)
}
migConfigs := make([]*config.MigConfig, 0, len(migConfigFlag))
for i := range migConfigFlag {
migConfigs = append(migConfigs, &migConfigFlag[i])
}
// GCE Manager
var gceManager *gce.GceManager
var gceError error
if *cloudConfig != "" {
config, fileErr := os.Open(*cloudConfig)
if fileErr != nil {
glog.Fatalf("Couldn't open cloud provider configuration %s: %#v", *cloudConfig, err)
}
defer config.Close()
gceManager, gceError = gce.CreateGceManager(migConfigs, config)
} else {
gceManager, gceError = gce.CreateGceManager(migConfigs, nil)
}
if gceError != nil {
glog.Fatalf("Failed to create GCE Manager: %v", err)
}
kubeClient := kube_client.NewOrDie(kubeConfig)
@ -118,6 +115,31 @@ func main() {
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
recorder := eventBroadcaster.NewRecorder(kube_api.EventSource{Component: "cluster-autoscaler"})
var cloudProvider cloudprovider.CloudProvider
if *cloudProviderFlag == "gce" {
// GCE Manager
var gceManager *gce.GceManager
var gceError error
if *cloudConfig != "" {
config, fileErr := os.Open(*cloudConfig)
if fileErr != nil {
glog.Fatalf("Couldn't open cloud provider configuration %s: %#v", *cloudConfig, err)
}
defer config.Close()
gceManager, gceError = gce.CreateGceManager(config)
} else {
gceManager, gceError = gce.CreateGceManager(nil)
}
if gceError != nil {
glog.Fatalf("Failed to create GCE Manager: %v", err)
}
cloudProvider, err = gce.BuildGceCloudProvider(gceManager, nodeGroupsFlag)
if err != nil {
glog.Fatalf("Failed to create GCE cloud provider: %v", err)
}
}
for {
select {
case <-time.After(*scanInterval):
@ -135,7 +157,7 @@ func main() {
continue
}
if err := CheckMigsAndNodes(nodes, gceManager); err != nil {
if err := CheckGroupsAndNodes(nodes, cloudProvider); err != nil {
glog.Warningf("Cluster is not ready for autoscaling: %v", err)
continue
}
@ -188,7 +210,7 @@ func main() {
} else {
scaleUpStart := time.Now()
updateLastTime("scaleup")
scaledUp, err := ScaleUp(unschedulablePodsToHelp, nodes, migConfigs, gceManager, kubeClient, predicateChecker, recorder)
scaledUp, err := ScaleUp(unschedulablePodsToHelp, nodes, cloudProvider, kubeClient, predicateChecker, recorder)
updateDuration("scaleup", scaleUpStart)
@ -245,7 +267,7 @@ func main() {
unneededNodes,
*scaleDownUnneededTime,
allScheduled,
gceManager, kubeClient, predicateChecker)
cloudProvider, kubeClient, predicateChecker)
updateDuration("scaledown", scaleDownStart)

View File

@ -1,115 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"fmt"
"strconv"
"strings"
gceurl "k8s.io/contrib/cluster-autoscaler/utils/gce_url"
kube_api "k8s.io/kubernetes/pkg/api"
)
// InstanceConfig contains instance configuration details.
type InstanceConfig struct {
	Project string // GCP project the instance belongs to
	Zone    string // GCE zone the instance runs in
	Name    string // instance name
}

// InstanceConfigFromProviderId creates InstanceConfig object
// from provider id which must be in format:
// gce://<project-id>/<zone>/<name>
func InstanceConfigFromProviderId(id string) (*InstanceConfig, error) {
	const prefix = "gce://"
	// Validate the scheme explicitly instead of blindly slicing id[6:],
	// which panicked on ids shorter than 6 characters and silently accepted
	// ids with a wrong scheme.
	if !strings.HasPrefix(id, prefix) {
		return nil, fmt.Errorf("Wrong id: expected format gce://<project-id>/<zone>/<name>, got %v", id)
	}
	splitted := strings.Split(id[len(prefix):], "/")
	if len(splitted) != 3 {
		return nil, fmt.Errorf("Wrong id: expected format gce://<project-id>/<zone>/<name>, got %v", id)
	}
	return &InstanceConfig{
		Project: splitted[0],
		Zone:    splitted[1],
		Name:    splitted[2],
	}, nil
}
// MigConfig contains managed instance group configuration details.
type MigConfig struct {
	MinSize int    // lower bound on the MIG size enforced by the autoscaler
	MaxSize int    // upper bound on the MIG size enforced by the autoscaler
	Project string // GCP project the MIG belongs to
	Zone    string // GCE zone the MIG lives in
	Name    string // MIG name
}

// Url builds GCE url for the MIG.
func (migconfig *MigConfig) Url() string {
	return gceurl.GenerateMigUrl(migconfig.Project, migconfig.Zone, migconfig.Name)
}

// Node returns a template/dummy node for the mig.
// NOTE(review): currently always returns nil (unimplemented); callers must
// tolerate a nil node.
func (migconfig *MigConfig) Node() *kube_api.Node {
	//TODO(fgrzadkowski): Implement this.
	return nil
}
// MigConfigFlag is an array of MIG configuration details. Working as a multi-value flag.
type MigConfigFlag []MigConfig

// String returns string representation of the MIG.
func (migconfigflag *MigConfigFlag) String() string {
	// Allocate with zero length and full capacity: the previous
	// make([]string, len(...)) combined with append produced len(...)
	// spurious empty strings at the front of the output.
	configs := make([]string, 0, len(*migconfigflag))
	for _, migconfig := range *migconfigflag {
		configs = append(configs, fmt.Sprintf("%d:%d:%s:%s", migconfig.MinSize, migconfig.MaxSize, migconfig.Zone, migconfig.Name))
	}
	return "[" + strings.Join(configs, " ") + "]"
}
// Set adds a new configuration.
// It parses value in the form <min>:<max>:<mig-url>, validates the sizes,
// and appends the resulting MigConfig. Invoked by flag.Var once per
// occurrence of the flag on the command line.
func (migconfigflag *MigConfigFlag) Set(value string) error {
	// SplitN(3) keeps the ':' characters inside the URL token intact.
	tokens := strings.SplitN(value, ":", 3)
	if len(tokens) != 3 {
		return fmt.Errorf("wrong nodes configuration: %s", value)
	}
	migconfig := MigConfig{}
	if size, err := strconv.Atoi(tokens[0]); err == nil {
		if size <= 0 {
			return fmt.Errorf("min size must be >= 1")
		}
		migconfig.MinSize = size
	} else {
		return fmt.Errorf("failed to set min size: %s, expected integer", tokens[0])
	}
	if size, err := strconv.Atoi(tokens[1]); err == nil {
		// Max is validated against the min parsed just above.
		if size < migconfig.MinSize {
			return fmt.Errorf("max size must be greater or equal to min size")
		}
		migconfig.MaxSize = size
	} else {
		return fmt.Errorf("failed to set max size: %s, expected integer", tokens[1])
	}
	var err error
	// The third token is a full GCE MIG url; parsing extracts project/zone/name.
	if migconfig.Project, migconfig.Zone, migconfig.Name, err = gceurl.ParseMigUrl(tokens[2]); err != nil {
		return fmt.Errorf("failed to parse mig url: %s got error: %v", tokens[2], err)
	}
	*migconfigflag = append(*migconfigflag, migconfig)
	return nil
}

View File

@ -1,40 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestSet exercises MigConfigFlag.Set with malformed specs (which must be
// rejected) and one well-formed <min>:<max>:<url> spec whose parsed fields
// are then checked.
func TestSet(t *testing.T) {
	migConfigFlag := MigConfigFlag{}
	// Too few tokens / non-numeric sizes / unparsable url.
	assert.Error(t, migConfigFlag.Set("a"))
	assert.Error(t, migConfigFlag.Set("a:b:c"))
	assert.Error(t, migConfigFlag.Set("1:2:x"))
	assert.Error(t, migConfigFlag.Set("1:2:"))
	assert.NoError(t, migConfigFlag.Set("111:222:https://content.googleapis.com/compute/v1/projects/test-project/zones/test-zone/instanceGroups/test-name"))
	assert.Equal(t, 111, migConfigFlag[0].MinSize)
	assert.Equal(t, 222, migConfigFlag[0].MaxSize)
	assert.Equal(t, "test-zone", migConfigFlag[0].Zone)
	assert.Equal(t, "test-name", migConfigFlag[0].Name)
	assert.Contains(t, migConfigFlag.String(), "111")
	assert.Contains(t, migConfigFlag.String(), "222")
	assert.Contains(t, migConfigFlag.String(), "test-zone")
	assert.Contains(t, migConfigFlag.String(), "test-name")
}

View File

@ -20,9 +20,8 @@ import (
"fmt"
"time"
"k8s.io/contrib/cluster-autoscaler/config"
"k8s.io/contrib/cluster-autoscaler/cloudprovider"
"k8s.io/contrib/cluster-autoscaler/simulator"
"k8s.io/contrib/cluster-autoscaler/utils/gce"
kube_api "k8s.io/kubernetes/pkg/api"
kube_client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@ -106,7 +105,7 @@ func ScaleDown(
unneededNodes map[string]time.Time,
unneededTime time.Duration,
pods []*kube_api.Pod,
gceManager *gce.GceManager,
cloudProvider cloudprovider.CloudProvider,
client *kube_client.Client,
predicateChecker *simulator.PredicateChecker) (ScaleDownResult, error) {
@ -122,30 +121,24 @@ func ScaleDown(
continue
}
// Check mig size.
instance, err := config.InstanceConfigFromProviderId(node.Spec.ProviderID)
nodeGroup, err := cloudProvider.NodeGroupForNode(node)
if err != nil {
glog.Errorf("Error while parsing providerid of %s: %v", node.Name, err)
glog.Errorf("Error while checking node group for %s: %v", node.Name, err)
continue
}
migConfig, err := gceManager.GetMigForInstance(instance)
if err != nil {
glog.Errorf("Error while checking mig config for instance %v: %v", instance, err)
continue
}
if migConfig == nil {
glog.V(4).Infof("Skipping %s - no mig config", node.Name)
if nodeGroup == nil {
glog.V(4).Infof("Skipping %s - no node group config", node.Name)
continue
}
size, err := gceManager.GetMigSize(migConfig)
size, err := nodeGroup.TargetSize()
if err != nil {
glog.Errorf("Error while checking mig size for instance %v: %v", instance, err)
glog.Errorf("Error while checking node group size %s: %v", nodeGroup.Id(), err)
continue
}
if size <= int64(migConfig.MinSize) {
glog.V(1).Infof("Skipping %s - mig min size reached", node.Name)
if size <= nodeGroup.MinSize() {
glog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
continue
}
@ -167,14 +160,18 @@ func ScaleDown(
}
nodeToRemove := nodesToRemove[0]
glog.Infof("Removing %s", nodeToRemove.Name)
instanceConfig, err := config.InstanceConfigFromProviderId(nodeToRemove.Spec.ProviderID)
nodeGroup, err := cloudProvider.NodeGroupForNode(nodeToRemove)
if err != nil {
return ScaleDownError, fmt.Errorf("Failed to get instance config for %s: %v", nodeToRemove.Name, err)
return ScaleDownError, fmt.Errorf("failed to node group for %s: %v", nodeToRemove.Name, err)
}
if nodeGroup == nil {
return ScaleDownError, fmt.Errorf("picked node that doesn't belong to a node group: %s", nodeToRemove.Name)
}
err = gceManager.DeleteInstances([]*config.InstanceConfig{instanceConfig})
err = nodeGroup.DeleteNodes([]*kube_api.Node{nodeToRemove})
if err != nil {
return ScaleDownError, fmt.Errorf("Failed to delete %v: %v", instanceConfig, err)
return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", nodeToRemove.Name, err)
}
return ScaleDownNodeDeleted, nil

View File

@ -19,10 +19,9 @@ package main
import (
"fmt"
"k8s.io/contrib/cluster-autoscaler/config"
"k8s.io/contrib/cluster-autoscaler/cloudprovider"
"k8s.io/contrib/cluster-autoscaler/estimator"
"k8s.io/contrib/cluster-autoscaler/simulator"
"k8s.io/contrib/cluster-autoscaler/utils/gce"
kube_api "k8s.io/kubernetes/pkg/api"
kube_record "k8s.io/kubernetes/pkg/client/record"
kube_client "k8s.io/kubernetes/pkg/client/unversioned"
@ -32,14 +31,13 @@ import (
// ExpansionOption describes an option to expand the cluster.
type ExpansionOption struct {
migConfig *config.MigConfig
nodeGroup cloudprovider.NodeGroup
estimator *estimator.BasicNodeEstimator
}
// ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size,
// false if it didn't and error if an error occured.
func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, migConfigs []*config.MigConfig,
gceManager *gce.GceManager, kubeClient *kube_client.Client,
func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, cloudProvider cloudprovider.CloudProvider, kubeClient *kube_client.Client,
predicateChecker *simulator.PredicateChecker, recorder kube_record.EventRecorder) (bool, error) {
// From now on we only care about unschedulable pods that were marked after the newest
@ -54,48 +52,48 @@ func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, migConfi
}
expansionOptions := make([]ExpansionOption, 0)
nodeInfos, err := GetNodeInfosForMigs(nodes, gceManager, kubeClient)
nodeInfos, err := GetNodeInfosForGroups(nodes, cloudProvider, kubeClient)
if err != nil {
return false, fmt.Errorf("failed to build node infors for migs: %v", err)
return false, fmt.Errorf("failed to build node infos for node groups: %v", err)
}
podsRemainUnshedulable := make(map[*kube_api.Pod]struct{})
for _, migConfig := range migConfigs {
for _, nodeGroup := range cloudProvider.NodeGroups() {
currentSize, err := gceManager.GetMigSize(migConfig)
currentSize, err := nodeGroup.TargetSize()
if err != nil {
glog.Errorf("Failed to get MIG size: %v", err)
glog.Errorf("Failed to get node group size: %v", err)
continue
}
if currentSize >= int64(migConfig.MaxSize) {
// skip this mig.
glog.V(4).Infof("Skipping MIG %s - max size reached", migConfig.Url())
if currentSize >= nodeGroup.MaxSize() {
// skip this node group.
glog.V(4).Infof("Skipping node group %s - max size reached", nodeGroup.Id())
continue
}
option := ExpansionOption{
migConfig: migConfig,
nodeGroup: nodeGroup,
estimator: estimator.NewBasicNodeEstimator(),
}
migHelpsSomePods := false
groupHelpsSomePods := false
nodeInfo, found := nodeInfos[migConfig.Url()]
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
glog.Errorf("No node info for: %s", migConfig.Url())
glog.Errorf("No node info for: %s", nodeGroup.Id())
continue
}
for _, pod := range unschedulablePods {
err = predicateChecker.CheckPredicates(pod, nodeInfo)
if err == nil {
migHelpsSomePods = true
groupHelpsSomePods = true
option.estimator.Add(pod)
} else {
glog.V(2).Infof("Scale-up predicate failed: %v", err)
podsRemainUnshedulable[pod] = struct{}{}
}
}
if migHelpsSomePods {
if groupHelpsSomePods {
expansionOptions = append(expansionOptions, option)
}
}
@ -103,36 +101,36 @@ func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, migConfi
// Pick some expansion option.
bestOption := BestExpansionOption(expansionOptions)
if bestOption != nil && bestOption.estimator.GetCount() > 0 {
glog.V(1).Infof("Best option to resize: %s", bestOption.migConfig.Url())
nodeInfo, found := nodeInfos[bestOption.migConfig.Url()]
glog.V(1).Infof("Best option to resize: %s", bestOption.nodeGroup.Id())
nodeInfo, found := nodeInfos[bestOption.nodeGroup.Id()]
if !found {
return false, fmt.Errorf("no sample node for: %s", bestOption.migConfig.Url())
return false, fmt.Errorf("no sample node for: %s", bestOption.nodeGroup.Id())
}
node := nodeInfo.Node()
estimate, report := bestOption.estimator.Estimate(node)
glog.V(1).Info(bestOption.estimator.GetDebug())
glog.V(1).Info(report)
glog.V(1).Infof("Estimated %d nodes needed in %s", estimate, bestOption.migConfig.Url())
glog.V(1).Infof("Estimated %d nodes needed in %s", estimate, bestOption.nodeGroup.Id())
currentSize, err := gceManager.GetMigSize(bestOption.migConfig)
currentSize, err := bestOption.nodeGroup.TargetSize()
if err != nil {
return false, fmt.Errorf("failed to get MIG size: %v", err)
return false, fmt.Errorf("failed to get node group size: %v", err)
}
newSize := currentSize + int64(estimate)
if newSize >= int64(bestOption.migConfig.MaxSize) {
glog.V(1).Infof("Capping size to MAX (%d)", bestOption.migConfig.MaxSize)
newSize = int64(bestOption.migConfig.MaxSize)
newSize := currentSize + estimate
if newSize >= bestOption.nodeGroup.MaxSize() {
glog.V(1).Infof("Capping size to MAX (%d)", bestOption.nodeGroup.MaxSize())
newSize = bestOption.nodeGroup.MaxSize()
}
glog.V(1).Infof("Setting %s size to %d", bestOption.migConfig.Url(), newSize)
glog.V(1).Infof("Setting %s size to %d", bestOption.nodeGroup.Id(), newSize)
if err := gceManager.SetMigSize(bestOption.migConfig, newSize); err != nil {
return false, fmt.Errorf("failed to set MIG size: %v", err)
if err := bestOption.nodeGroup.IncreaseSize(newSize - currentSize); err != nil {
return false, fmt.Errorf("failed to increase node group size: %v", err)
}
for pod := range bestOption.estimator.FittingPods {
recorder.Eventf(pod, kube_api.EventTypeNormal, "TriggeredScaleUp",
"pod triggered scale-up, mig: %s, sizes (current/new): %d/%d", bestOption.migConfig.Name, currentSize, newSize)
"pod triggered scale-up, group: %s, sizes (current/new): %d/%d", bestOption.nodeGroup.Id(), currentSize, newSize)
}
return true, nil

View File

@ -20,9 +20,8 @@ import (
"fmt"
"time"
"k8s.io/contrib/cluster-autoscaler/config"
"k8s.io/contrib/cluster-autoscaler/cloudprovider"
"k8s.io/contrib/cluster-autoscaler/simulator"
"k8s.io/contrib/cluster-autoscaler/utils/gce"
kube_api "k8s.io/kubernetes/pkg/api"
kube_api_unversioned "k8s.io/kubernetes/pkg/api/unversioned"
@ -227,66 +226,56 @@ func createNodeNameToInfoMap(pods []*kube_api.Pod, nodes []*kube_api.Node) map[s
return nodeNameToNodeInfo
}
// CheckMigsAndNodes checks if all migs have all required nodes.
func CheckMigsAndNodes(nodes []*kube_api.Node, gceManager *gce.GceManager) error {
migCount := make(map[string]int)
migs := make(map[string]*config.MigConfig)
// CheckGroupsAndNodes checks if all node groups have all required nodes.
func CheckGroupsAndNodes(nodes []*kube_api.Node, cloudProvider cloudprovider.CloudProvider) error {
groupCount := make(map[string]int)
for _, node := range nodes {
instanceConfig, err := config.InstanceConfigFromProviderId(node.Spec.ProviderID)
if err != nil {
return err
}
migConfig, err := gceManager.GetMigForInstance(instanceConfig)
group, err := cloudProvider.NodeGroupForNode(node)
if err != nil {
return err
}
if migConfig == nil {
if group == nil {
continue
}
url := migConfig.Url()
count, _ := migCount[url]
migCount[url] = count + 1
migs[url] = migConfig
id := group.Id()
count, _ := groupCount[id]
groupCount[id] = count + 1
}
for url, mig := range migs {
size, err := gceManager.GetMigSize(mig)
for _, nodeGroup := range cloudProvider.NodeGroups() {
size, err := nodeGroup.TargetSize()
if err != nil {
return err
}
count := migCount[url]
if size != int64(count) {
return fmt.Errorf("wrong number of nodes for mig: %s expected: %d actual: %d", url, size, count)
count := groupCount[nodeGroup.Id()]
if size != count {
return fmt.Errorf("wrong number of nodes for node group: %s expected: %d actual: %d", nodeGroup.Id(), size, count)
}
}
return nil
}
// GetNodeInfosForMigs finds NodeInfos for all migs used to manage the given nodes. It also returns a mig to sample node mapping.
// GetNodeInfosForGroups finds NodeInfos for all node groups used to manage the given nodes. It also returns a node group to sample node mapping.
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
func GetNodeInfosForMigs(nodes []*kube_api.Node, gceManager *gce.GceManager, kubeClient *kube_client.Client) (map[string]*schedulercache.NodeInfo, error) {
func GetNodeInfosForGroups(nodes []*kube_api.Node, cloudProvider cloudprovider.CloudProvider, kubeClient *kube_client.Client) (map[string]*schedulercache.NodeInfo, error) {
result := make(map[string]*schedulercache.NodeInfo)
for _, node := range nodes {
instanceConfig, err := config.InstanceConfigFromProviderId(node.Spec.ProviderID)
if err != nil {
return map[string]*schedulercache.NodeInfo{}, err
}
migConfig, err := gceManager.GetMigForInstance(instanceConfig)
nodeGroup, err := cloudProvider.NodeGroupForNode(node)
if err != nil {
return map[string]*schedulercache.NodeInfo{}, err
}
if migConfig == nil {
if nodeGroup == nil {
continue
}
url := migConfig.Url()
nodeInfo, err := simulator.BuildNodeInfoForNode(node, kubeClient)
if err != nil {
return map[string]*schedulercache.NodeInfo{}, err
id := nodeGroup.Id()
if _, found := result[id]; !found {
nodeInfo, err := simulator.BuildNodeInfoForNode(node, kubeClient)
if err != nil {
return map[string]*schedulercache.NodeInfo{}, err
}
result[id] = nodeInfo
}
result[url] = nodeInfo
}
return result, nil
}

View File

@ -1,236 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"fmt"
"io"
"strings"
"sync"
"time"
"gopkg.in/gcfg.v1"
"k8s.io/contrib/cluster-autoscaler/config"
gceurl "k8s.io/contrib/cluster-autoscaler/utils/gce_url"
"github.com/golang/glog"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
gce "google.golang.org/api/compute/v1"
provider_gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/util/wait"
)
const (
	// operationWaitTimeout bounds how long waitForOp polls a GCE zone
	// operation before giving up.
	operationWaitTimeout = 5 * time.Second
	// operationPollInterval is the delay between consecutive status polls.
	operationPollInterval = 100 * time.Millisecond
)
// migInformation pairs a configured MIG with runtime data discovered from
// the GCE API (the base instance name, used to match instances by prefix).
type migInformation struct {
	config   *config.MigConfig
	basename string
}

// GceManager handles gce communication and data caching.
type GceManager struct {
	migs    []*migInformation
	service *gce.Service
	// migCache maps a concrete instance to the MIG that manages it;
	// guarded by cacheMutex and rebuilt periodically by regenerateCache.
	migCache   map[config.InstanceConfig]*config.MigConfig
	cacheMutex sync.Mutex
}
// CreateGceManager constructs gceManager object.
// configReader, when non-nil, supplies a GCE cloud config used to set up an
// alternative OAuth token source; otherwise the default compute token source
// is used. A background goroutine refreshes the instance->MIG cache every
// hour for the lifetime of the process.
func CreateGceManager(migs []*config.MigConfig, configReader io.Reader) (*GceManager, error) {
	// Create Google Compute Engine token.
	tokenSource := google.ComputeTokenSource("")
	if configReader != nil {
		var cfg provider_gce.Config
		if err := gcfg.ReadInto(&cfg, configReader); err != nil {
			glog.Errorf("Couldn't read config: %v", err)
			return nil, err
		}
		if cfg.Global.TokenURL == "" {
			// Missing tokenUrl is tolerated; fall through to the default source.
			glog.Warning("Empty tokenUrl in cloud config")
		} else {
			glog.Infof("Using TokenSource from config %#v", tokenSource)
			tokenSource = provider_gce.NewAltTokenSource(cfg.Global.TokenURL, cfg.Global.TokenBody)
		}
	} else {
		glog.Infof("Using default TokenSource %#v", tokenSource)
	}
	// Create Google Compute Engine service.
	client := oauth2.NewClient(oauth2.NoContext, tokenSource)
	gceService, err := gce.New(client)
	if err != nil {
		return nil, err
	}
	// Wrap each configured MIG; basenames are filled in lazily by
	// regenerateCache.
	migInfos := make([]*migInformation, 0, len(migs))
	for _, mig := range migs {
		migInfos = append(migInfos, &migInformation{
			config: mig,
		})
	}
	manager := &GceManager{
		migs:     migInfos,
		service:  gceService,
		migCache: map[config.InstanceConfig]*config.MigConfig{},
	}
	go wait.Forever(func() { manager.regenerateCacheIgnoreError() }, time.Hour)
	return manager, nil
}
// GetMigSize gets MIG size.
// Performs a live InstanceGroupManagers.Get request and returns the MIG's
// TargetSize as reported by the GCE API; returns -1 on error.
func (m *GceManager) GetMigSize(migConf *config.MigConfig) (int64, error) {
	mig, err := m.service.InstanceGroupManagers.Get(migConf.Project, migConf.Zone, migConf.Name).Do()
	if err != nil {
		return -1, err
	}
	return mig.TargetSize, nil
}
// SetMigSize sets MIG size.
// Issues a resize request and then blocks until the resulting zone operation
// completes (or waitForOp times out).
func (m *GceManager) SetMigSize(migConf *config.MigConfig, size int64) error {
	op, err := m.service.InstanceGroupManagers.Resize(migConf.Project, migConf.Zone, migConf.Name, size).Do()
	if err != nil {
		return err
	}
	if err := m.waitForOp(op, migConf.Project, migConf.Zone); err != nil {
		return err
	}
	return nil
}
// waitForOp polls the status of a zone operation every operationPollInterval
// until it reports DONE, returning an error if operationWaitTimeout elapses
// first.
func (m *GceManager) waitForOp(operation *gce.Operation, project string, zone string) error {
	for start := time.Now(); time.Since(start) < operationWaitTimeout; time.Sleep(operationPollInterval) {
		glog.V(4).Infof("Waiting for operation %s %s %s", project, zone, operation.Name)
		if op, err := m.service.ZoneOperations.Get(project, zone, operation.Name).Do(); err == nil {
			glog.V(4).Infof("Operation %s %s %s status: %s", project, zone, operation.Name, op.Status)
			if op.Status == "DONE" {
				return nil
			}
		} else {
			// Polling errors are not fatal; keep retrying until the timeout.
			glog.Warningf("Error while getting operation %s on %s: %v", operation.Name, operation.TargetLink, err)
		}
	}
	return fmt.Errorf("Timeout while waiting for operation %s on %s to complete.", operation.Name, operation.TargetLink)
}
// DeleteInstances deletes the given instances. All instances must be controlled by the same MIG.
// An empty input is a no-op. The call blocks until the GCE delete operation
// completes (or waitForOp times out).
func (m *GceManager) DeleteInstances(instances []*config.InstanceConfig) error {
	if len(instances) == 0 {
		return nil
	}
	commonMig, err := m.GetMigForInstance(instances[0])
	if err != nil {
		return err
	}
	// Reject mixed batches: the delete request below targets a single MIG.
	for _, instance := range instances {
		mig, err := m.GetMigForInstance(instance)
		if err != nil {
			return err
		}
		if mig != commonMig {
			// Fixed misspelling in the user-facing error ("Connot" -> "Cannot").
			return fmt.Errorf("Cannot delete instances which don't belong to the same MIG.")
		}
	}
	req := gce.InstanceGroupManagersDeleteInstancesRequest{
		Instances: []string{},
	}
	for _, instance := range instances {
		req.Instances = append(req.Instances, gceurl.GenerateInstanceUrl(instance.Project, instance.Zone, instance.Name))
	}
	op, err := m.service.InstanceGroupManagers.DeleteInstances(commonMig.Project, commonMig.Zone, commonMig.Name, &req).Do()
	if err != nil {
		return err
	}
	if err := m.waitForOp(op, commonMig.Project, commonMig.Zone); err != nil {
		return err
	}
	return nil
}
// GetMigForInstance returns MigConfig of the given Instance
// A (nil, nil) return means the instance is not managed by any configured
// MIG. Acquires cacheMutex; regenerateCache is called with the lock held,
// as that function expects.
func (m *GceManager) GetMigForInstance(instance *config.InstanceConfig) (*config.MigConfig, error) {
	m.cacheMutex.Lock()
	defer m.cacheMutex.Unlock()
	if mig, found := m.migCache[*instance]; found {
		return mig, nil
	}
	// Cache miss: if the instance name matches a known MIG's project, zone
	// and base-name prefix, refresh the cache once and retry before
	// declaring the instance unknown.
	for _, mig := range m.migs {
		if mig.config.Project == instance.Project &&
			mig.config.Zone == instance.Zone &&
			strings.HasPrefix(instance.Name, mig.basename) {
			if err := m.regenerateCache(); err != nil {
				return nil, fmt.Errorf("Error while looking for MIG for instance %+v, error: %v", *instance, err)
			}
			if mig, found := m.migCache[*instance]; found {
				return mig, nil
			}
			return nil, fmt.Errorf("Instance %+v does not belong to any configured MIG", *instance)
		}
	}
	// Instance doesn't belong to any configured mig.
	return nil, nil
}
// regenerateCacheIgnoreError refreshes the instance->MIG cache, logging
// (rather than propagating) any error. Runs periodically from the goroutine
// started in CreateGceManager.
func (m *GceManager) regenerateCacheIgnoreError() {
	m.cacheMutex.Lock()
	defer m.cacheMutex.Unlock()
	if err := m.regenerateCache(); err != nil {
		glog.Errorf("Error while regenerating Mig cache: %v", err)
	}
}
// regenerateCache rebuilds the instance->MIG mapping by listing the managed
// instances of every configured MIG. Callers must hold cacheMutex (see
// GetMigForInstance and regenerateCacheIgnoreError).
func (m *GceManager) regenerateCache() error {
	newMigCache := map[config.InstanceConfig]*config.MigConfig{}
	for _, migInfo := range m.migs {
		mig := migInfo.config
		glog.V(4).Infof("Regenerating MIG information for %s %s %s", mig.Project, mig.Zone, mig.Name)
		// The base instance name is needed by GetMigForInstance to match
		// instances to this MIG by name prefix.
		instanceGroupManager, err := m.service.InstanceGroupManagers.Get(mig.Project, mig.Zone, mig.Name).Do()
		if err != nil {
			return err
		}
		migInfo.basename = instanceGroupManager.BaseInstanceName
		instances, err := m.service.InstanceGroupManagers.ListManagedInstances(mig.Project, mig.Zone, mig.Name).Do()
		if err != nil {
			glog.V(4).Infof("Failed MIG info request for %s %s %s: %v", mig.Project, mig.Zone, mig.Name, err)
			return err
		}
		for _, instance := range instances.ManagedInstances {
			project, zone, name, err := gceurl.ParseInstanceUrl(instance.Instance)
			if err != nil {
				return err
			}
			newMigCache[config.InstanceConfig{Project: project, Zone: zone, Name: name}] = mig
		}
	}
	// Swap in the fresh map only after every MIG was processed successfully;
	// a mid-loop failure leaves the old cache intact.
	m.migCache = newMigCache
	return nil
}

View File

@ -1,73 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gceurl
import (
"fmt"
"strings"
)
const (
	gceUrlSchema        = "https"
	gceDomainSufix      = "googleapis.com/compute/v1/projects/"
	gcePrefix           = gceUrlSchema + "://content." + gceDomainSufix
	instanceUrlTemplate = gcePrefix + "%s/zones/%s/instances/%s"
	migUrlTemplate      = gcePrefix + "%s/zones/%s/instanceGroups/%s"
)

// ParseMigUrl expects url in format:
// https://content.googleapis.com/compute/v1/projects/<project-id>/zones/<zone>/instanceGroups/<name>
func ParseMigUrl(url string) (project string, zone string, name string, err error) {
	return parseGceUrl(url, "instanceGroups")
}

// ParseInstanceUrl expects url in format:
// https://content.googleapis.com/compute/v1/projects/<project-id>/zones/<zone>/instances/<name>
func ParseInstanceUrl(url string) (project string, zone string, name string, err error) {
	return parseGceUrl(url, "instances")
}

// GenerateInstanceUrl builds the canonical GCE url for an instance.
func GenerateInstanceUrl(project, zone, name string) string {
	return fmt.Sprintf(instanceUrlTemplate, project, zone, name)
}

// GenerateMigUrl builds the canonical GCE url for a managed instance group.
func GenerateMigUrl(project, zone, name string) string {
	return fmt.Sprintf(migUrlTemplate, project, zone, name)
}

// parseGceUrl splits a GCE resource url of the given kind (expectedResource,
// e.g. "instances" or "instanceGroups") into project, zone and name.
func parseGceUrl(url, expectedResource string) (project string, zone string, name string, err error) {
	errMsg := fmt.Errorf("Wrong url: expected format https://content.googleapis.com/compute/v1/projects/<project-id>/zones/<zone>/%s/<name>, got %s", expectedResource, url)
	if !strings.Contains(url, gceDomainSufix) || !strings.HasPrefix(url, gceUrlSchema) {
		return "", "", "", errMsg
	}
	fields := strings.Split(strings.Split(url, gceDomainSufix)[1], "/")
	switch {
	case len(fields) != 5, fields[1] != "zones":
		return "", "", "", errMsg
	case fields[3] != expectedResource:
		return "", "", "", fmt.Errorf("Wrong resource in url: expected %s, got %s", expectedResource, fields[3])
	}
	return fields[0], fields[2], fields[4], nil
}

View File

@ -1,43 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gceurl
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestParseUrl verifies parseGceUrl against both the www. and content.
// hostname variants of a valid MIG url, plus two malformed inputs.
func TestParseUrl(t *testing.T) {
	proj, zone, name, err := parseGceUrl("https://www.googleapis.com/compute/v1/projects/mwielgus-proj/zones/us-central1-b/instanceGroups/kubernetes-minion-group", "instanceGroups")
	assert.Nil(t, err)
	assert.Equal(t, "mwielgus-proj", proj)
	assert.Equal(t, "us-central1-b", zone)
	assert.Equal(t, "kubernetes-minion-group", name)
	proj, zone, name, err = parseGceUrl("https://content.googleapis.com/compute/v1/projects/mwielgus-proj/zones/us-central1-b/instanceGroups/kubernetes-minion-group", "instanceGroups")
	assert.Nil(t, err)
	assert.Equal(t, "mwielgus-proj", proj)
	assert.Equal(t, "us-central1-b", zone)
	assert.Equal(t, "kubernetes-minion-group", name)
	// Not a GCE url at all.
	proj, zone, name, err = parseGceUrl("www.onet.pl", "instanceGroups")
	assert.NotNil(t, err)
	// Wrong API version segment ("vabc") breaks the expected domain suffix.
	proj, zone, name, err = parseGceUrl("https://content.googleapis.com/compute/vabc/projects/mwielgus-proj/zones/us-central1-b/instanceGroups/kubernetes-minion-group", "instanceGroups")
	assert.NotNil(t, err)
}