Cluster-autoscaler: check node/mig state

Marcin Wielgus 2016-04-22 16:18:08 +02:00
parent 2467111dc5
commit cb9f8b493e
5 changed files with 84 additions and 25 deletions

View File

@@ -22,23 +22,22 @@ import (
"time"
"k8s.io/contrib/cluster-autoscaler/config"
"k8s.io/contrib/cluster-autoscaler/utils/gce"
kube_client "k8s.io/kubernetes/pkg/client/unversioned"
"github.com/golang/glog"
)
var (
migConfig config.MigConfigFlag
kubernetes = flag.String("kubernetes", "", "Kubernetes master location. Leave blank for default")
migConfigFlag config.MigConfigFlag
kubernetes = flag.String("kubernetes", "", "Kubernetes master location. Leave blank for default")
)
func main() {
flag.Var(&migConfig, "nodes", "sets min,max size and url of a MIG to be controlled by Cluster Autoscaler. "+
flag.Var(&migConfigFlag, "nodes", "sets min,max size and url of a MIG to be controlled by Cluster Autoscaler. "+
"Can be used multiple times. Format: <min>:<max>:<migurl>")
flag.Parse()
glog.Infof("MIG: %s\n", migConfig.String())
url, err := url.Parse(*kubernetes)
if err != nil {
glog.Fatalf("Failed to parse Kuberentes url: %v", err)
@@ -52,6 +51,12 @@ func main() {
unscheduledPodLister := NewUnscheduledPodLister(kubeClient)
nodeLister := NewNodeLister(kubeClient)
migConfigs := make([]*config.MigConfig, 0, len(migConfigFlag))
for i := range migConfigFlag {
migConfigs = append(migConfigs, &migConfigFlag[i])
}
gceManager, err := gce.CreateGceManager(migConfigs)
if err != nil {
glog.Fatalf("Failed to create GCE Manager %v", err)
}
for {
select {
case <-time.After(time.Minute):
@@ -80,7 +85,10 @@ func main() {
continue
}
// TODO: Checking if all nodes are present.
if err := CheckMigsAndNodes(nodes, gceManager); err != nil {
glog.Warningf("Cluster is not ready for autoscaling: %v", err)
continue
}
// Checks if the scheduler tried to schedule the pods after the newest node was added.
newestNode := GetNewestNode(nodes)
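For reference, the --nodes flag registered in main above is a repeated flag in the <min>:<max>:<migurl> format. A hypothetical invocation, with made-up binary path, master address, project, zone and MIG name (the instanceGroupManagers url segment is assumed from the GCE url format used elsewhere in this PR):

./cluster-autoscaler --kubernetes=http://127.0.0.1:8080 --nodes=1:10:https://content.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b/instanceGroupManagers/my-mig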

View File

@@ -56,9 +56,15 @@ type MigConfig struct {
Name string
}
// Url builds GCE url for the MIG.
func (migconfig *MigConfig) Url() string {
return gceurl.GenerateMigUrl(migconfig.Project, migconfig.Zone, migconfig.Name)
}
// MigConfigFlag is an array of MIG configuration details. Working as a multi-value flag.
type MigConfigFlag []MigConfig
// String returns a string representation of the MigConfigFlag.
func (migconfigflag *MigConfigFlag) String() string {
configs := make([]string, 0, len(*migconfigflag))
for _, migconfig := range *migconfigflag {
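The rest of String is truncated in this view. For orientation, a multi-value flag like MigConfigFlag has to satisfy Go's flag.Value interface, which means a Set method alongside String. Set is not part of this diff; the sketch below is illustrative only, assuming the fmt, strconv and strings imports, the gceurl import already used by Url(), a gceurl.ParseMigUrl helper, and MinSize/MaxSize fields inferred from the <min>:<max>:<migurl> flag format:

// Illustrative sketch, not the repository's code.
func (migconfigflag *MigConfigFlag) Set(value string) error {
	// SplitN with a limit of 3 keeps the colons inside the url intact.
	parts := strings.SplitN(value, ":", 3)
	if len(parts) != 3 {
		return fmt.Errorf("expected <min>:<max>:<migurl>, got %q", value)
	}
	min, err := strconv.Atoi(parts[0])
	if err != nil {
		return err
	}
	max, err := strconv.Atoi(parts[1])
	if err != nil {
		return err
	}
	// ParseMigUrl is assumed here; parseGceUrl in utils/gce/url.go suggests such a helper.
	project, zone, name, err := gceurl.ParseMigUrl(parts[2])
	if err != nil {
		return err
	}
	*migconfigflag = append(*migconfigflag, MigConfig{MinSize: min, MaxSize: max, Project: project, Zone: zone, Name: name})
	return nil
}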

View File

@@ -17,8 +17,12 @@ limitations under the License.
package main
import (
"fmt"
"time"
"k8s.io/contrib/cluster-autoscaler/config"
"k8s.io/contrib/cluster-autoscaler/utils/gce"
kube_api "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kube_client "k8s.io/kubernetes/pkg/client/unversioned"
@@ -58,16 +62,16 @@ type ReadyNodeLister struct {
}
// List returns ready nodes.
func (readyNodeLister *ReadyNodeLister) List() ([]kube_api.Node, error) {
func (readyNodeLister *ReadyNodeLister) List() ([]*kube_api.Node, error) {
nodes, err := readyNodeLister.nodeLister.List()
if err != nil {
return []kube_api.Node{}, err
return []*kube_api.Node{}, err
}
readyNodes := make([]kube_api.Node, 0, len(nodes.Items))
for _, node := range nodes.Items {
readyNodes := make([]*kube_api.Node, 0, len(nodes.Items))
for i, node := range nodes.Items {
for _, condition := range node.Status.Conditions {
if condition.Type == kube_api.NodeReady && condition.Status == kube_api.ConditionTrue {
readyNodes = append(readyNodes, node)
readyNodes = append(readyNodes, &nodes.Items[i])
break
}
}
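Indexing into nodes.Items here, rather than taking the address of the range variable, sidesteps a classic Go pitfall: before Go 1.22 the range variable is a single variable reused across iterations, so appending &node would leave every pointer aliasing the last element. A self-contained demonstration, not from this commit:

package main

import "fmt"

func main() {
	items := []string{"node-a", "node-b"}
	var bad, good []*string
	for i, v := range items {
		bad = append(bad, &v)          // aliases the single loop variable
		good = append(good, &items[i]) // points into the slice's backing array
	}
	fmt.Println(*bad[0], *bad[1])   // "node-b node-b" before Go 1.22
	fmt.Println(*good[0], *good[1]) // "node-a node-b"
}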
@@ -87,11 +91,11 @@ func NewNodeLister(kubeClient *kube_client.Client) *ReadyNodeLister {
}
// GetNewestNode returns the newest node from the given list.
func GetNewestNode(nodes []kube_api.Node) *kube_api.Node {
func GetNewestNode(nodes []*kube_api.Node) *kube_api.Node {
var result *kube_api.Node
for i, node := range nodes {
for _, node := range nodes {
if result == nil || node.CreationTimestamp.After(result.CreationTimestamp.Time) {
result = &(nodes[i])
result = node
}
}
return result
@@ -105,3 +109,35 @@ func GetOldestFailedSchedulingTrail(pods []*kube_api.Pod) *time.Time {
now := time.Now()
return &now
}
// CheckMigsAndNodes checks that every MIG has as many registered nodes as its target size.
func CheckMigsAndNodes(nodes []*kube_api.Node, gceManager *gce.GceManager) error {
migCount := make(map[string]int)
migs := make(map[string]*config.MigConfig)
for _, node := range nodes {
instanceConfig, err := config.InstanceConfigFromProviderId(node.Spec.ProviderID)
if err != nil {
return err
}
migConfig, err := gceManager.GetMigForInstance(instanceConfig)
if err != nil {
return err
}
url := migConfig.Url()
migCount[url]++
migs[url] = migConfig
}
for url, mig := range migs {
size, err := gceManager.GetMigSize(mig)
if err != nil {
return err
}
count := migCount[url]
if size != int64(count) {
return fmt.Errorf("wrong number of nodes for mig: %s expected: %d actual: %d", url, size, count)
}
}
return nil
}
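The new check boils down to count-and-compare: map each node to its MIG via node.Spec.ProviderID, count nodes per MIG url, and require each count to equal that MIG's target size. A self-contained sketch of that core logic, with made-up urls and sizes:

package main

import "fmt"

func main() {
	// Made-up data: the MIG url each registered node maps to,
	// and each MIG's target size as reported by GCE.
	nodeMigUrls := []string{"mig-a", "mig-a", "mig-b"}
	targetSizes := map[string]int64{"mig-a": 3, "mig-b": 1}

	migCount := make(map[string]int)
	for _, url := range nodeMigUrls {
		migCount[url]++
	}
	for url, size := range targetSizes {
		if size != int64(migCount[url]) {
			fmt.Printf("wrong number of nodes for mig: %s expected: %d actual: %d\n", url, size, migCount[url])
		}
	}
	// Output: wrong number of nodes for mig: mig-a expected: 3 actual: 2
}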

View File

@@ -36,7 +36,8 @@ const (
operationPollInterval = 100 * time.Millisecond
)
type gceManager struct {
// GceManager handles GCE communication and data caching.
type GceManager struct {
migs []*config.MigConfig
service *gce.Service
migCache map[config.InstanceConfig]*config.MigConfig
@@ -44,7 +45,7 @@ type gceManager struct {
}
// CreateGceManager constructs a GceManager object.
func CreateGceManager(migs []*config.MigConfig) (*gceManager, error) {
func CreateGceManager(migs []*config.MigConfig) (*GceManager, error) {
// Create Google Compute Engine service.
client := oauth2.NewClient(oauth2.NoContext, google.ComputeTokenSource(""))
gceService, err := gce.New(client)
@@ -52,7 +53,7 @@ func CreateGceManager(migs []*config.MigConfig) (*gceManager, error) {
return nil, err
}
manager := &gceManager{
manager := &GceManager{
migs: migs,
service: gceService,
migCache: map[config.InstanceConfig]*config.MigConfig{},
@@ -63,7 +64,8 @@ func CreateGceManager(migs []*config.MigConfig) (*gceManager, error) {
return manager, nil
}
func (m *gceManager) GetMigSize(migConf *config.MigConfig) (int64, error) {
// GetMigSize returns the target size of the MIG.
func (m *GceManager) GetMigSize(migConf *config.MigConfig) (int64, error) {
mig, err := m.service.InstanceGroupManagers.Get(migConf.Project, migConf.Zone, migConf.Name).Do()
if err != nil {
return -1, err
@@ -71,7 +73,8 @@ func (m *gceManager) GetMigSize(migConf *config.MigConfig) (int64, error) {
return mig.TargetSize, nil
}
func (m *gceManager) SetMigSize(migConf *config.MigConfig, size int64) error {
// SetMigSize sets the target size of the MIG.
func (m *GceManager) SetMigSize(migConf *config.MigConfig, size int64) error {
op, err := m.service.InstanceGroupManagers.Resize(migConf.Project, migConf.Zone, migConf.Name, size).Do()
if err != nil {
return err
@@ -82,7 +85,7 @@ func (m *gceManager) SetMigSize(migConf *config.MigConfig, size int64) error {
return nil
}
func (m *gceManager) waitForOp(operation *gce.Operation, project string) error {
func (m *GceManager) waitForOp(operation *gce.Operation, project string) error {
for start := time.Now(); time.Since(start) < operationWaitTimeout; time.Sleep(operationPollInterval) {
if op, err := m.service.ZoneOperations.Get(project, operation.Zone, operation.Name).Do(); err == nil {
if op.Status == "DONE" {
@@ -95,8 +98,8 @@ func (m *gceManager) waitForOp(operation *gce.Operation, project string) error {
return fmt.Errorf("Timeout while waiting for operation %s on %s to complete.", operation.Name, operation.TargetLink)
}
// All instances must be controlled by the same MIG.
func (m *gceManager) DeleteInstances(instances []*config.InstanceConfig) error {
// DeleteInstances deletes the given instances. All instances must be controlled by the same MIG.
func (m *GceManager) DeleteInstances(instances []*config.InstanceConfig) error {
if len(instances) == 0 {
return nil
}
@@ -131,7 +134,8 @@ func (m *gceManager) DeleteInstances(instances []*config.InstanceConfig) error {
return nil
}
func (m *gceManager) GetMigForInstance(instance *config.InstanceConfig) (*config.MigConfig, error) {
// GetMigForInstance returns the MigConfig of the given instance.
func (m *GceManager) GetMigForInstance(instance *config.InstanceConfig) (*config.MigConfig, error) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
if mig, found := m.migCache[*instance]; found {
@@ -146,7 +150,7 @@ func (m *gceManager) GetMigForInstance(instance *config.InstanceConfig) (*config
return nil, fmt.Errorf("Instance %+v does not belong to any known MIG", *instance)
}
func (m *gceManager) regenerateCacheIgnoreError() {
func (m *GceManager) regenerateCacheIgnoreError() {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
if err := m.regenerateCache(); err != nil {
@@ -154,7 +158,7 @@ func (m *gceManager) regenerateCacheIgnoreError() {
}
}
func (m *gceManager) regenerateCache() error {
func (m *GceManager) regenerateCache() error {
newMigCache := map[config.InstanceConfig]*config.MigConfig{}
for _, mig := range m.migs {
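With the type now exported, callers outside the package can drive resizing directly. A hypothetical helper sketching the API in use; only methods shown in this diff are called, and the MaxSize field is an assumption inferred from the <min>:<max>:<migurl> flag format:

import (
	"k8s.io/contrib/cluster-autoscaler/config"
	"k8s.io/contrib/cluster-autoscaler/utils/gce"
)

// growByOne is a hypothetical caller, not part of this commit.
func growByOne(manager *gce.GceManager, migConfig *config.MigConfig) error {
	size, err := manager.GetMigSize(migConfig)
	if err != nil {
		return err
	}
	// MaxSize is assumed to exist on MigConfig (from the min:max flag format).
	if size < int64(migConfig.MaxSize) {
		return manager.SetMigSize(migConfig, size+1)
	}
	return nil
}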

View File

@@ -44,6 +44,11 @@ func GenerateInstanceUrl(project, zone, name string) string {
return fmt.Sprintf(instanceUrlTemplate, project, zone, name)
}
// GenerateMigUrl generates url for the MIG.
func GenerateMigUrl(project, zone, name string) string {
return fmt.Sprintf(migUrlTemplate, project, zone, name)
}
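For a concrete sense of the generated value: with made-up inputs, GenerateMigUrl("my-project", "us-central1-b", "my-mig") should produce a url matching the format in parseGceUrl's error message below, i.e. https://content.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b/instanceGroupManagers/my-mig (the instanceGroupManagers segment is an assumption, since migUrlTemplate itself is not shown in this diff).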
func parseGceUrl(url, expectedResource string) (project string, zone string, name string, err error) {
errMsg := fmt.Errorf("Wrong url: expected format https://content.googleapis.com/compute/v1/projects/<project-id>/zones/<zone>/%s/<name>, got %s", expectedResource, url)
if !strings.HasPrefix(url, gcePrefix) {