Tweaks to rolling-update CLI

This commit is contained in:
Justin Santa Barbara 2016-07-10 12:21:41 -04:00
parent 1d59f2aa80
commit c4f2fbfcaf
2 changed files with 113 additions and 85 deletions

View File

@ -2,19 +2,17 @@ package main
import ( import (
"fmt" "fmt"
"os"
"strconv"
"bytes"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup" "k8s.io/kops/upup/pkg/fi/cloudup"
"k8s.io/kops/upup/pkg/kutil" "k8s.io/kops/upup/pkg/kutil"
"os"
"text/tabwriter"
) )
type RollingUpdateClusterCmd struct { type RollingUpdateClusterCmd struct {
Yes bool Yes bool
Region string
cobraCommand *cobra.Command cobraCommand *cobra.Command
} }
@ -33,8 +31,6 @@ func init() {
cmd.Flags().BoolVar(&rollingupdateCluster.Yes, "yes", false, "Rollingupdate without confirmation") cmd.Flags().BoolVar(&rollingupdateCluster.Yes, "yes", false, "Rollingupdate without confirmation")
cmd.Flags().StringVar(&rollingupdateCluster.Region, "region", "", "region")
cmd.Run = func(cmd *cobra.Command, args []string) { cmd.Run = func(cmd *cobra.Command, args []string) {
err := rollingupdateCluster.Run() err := rollingupdateCluster.Run()
if err != nil { if err != nil {
@ -44,73 +40,77 @@ func init() {
} }
func (c *RollingUpdateClusterCmd) Run() error { func (c *RollingUpdateClusterCmd) Run() error {
if c.Region == "" { _, cluster, err := rootCommand.Cluster()
return fmt.Errorf("--region is required") if err != nil {
} return err
clusterName := rootCommand.ClusterName()
if clusterName == "" {
return fmt.Errorf("--name is required")
} }
tags := map[string]string{"KubernetesCluster": clusterName} instanceGroupRegistry, err := rootCommand.InstanceGroupRegistry()
cloud, err := awsup.NewAWSCloud(c.Region, tags)
if err != nil { if err != nil {
return fmt.Errorf("error initializing AWS client: %v", err) return err
}
instancegroups, err := instanceGroupRegistry.ReadAll()
if err != nil {
return err
}
cloud, err := cloudup.BuildCloud(cluster)
if err != nil {
return err
} }
d := &kutil.RollingUpdateCluster{} d := &kutil.RollingUpdateCluster{}
d.ClusterName = clusterName d.Cluster = cluster
d.Region = c.Region
d.Cloud = cloud d.Cloud = cloud
nodesets, err := d.ListNodesets() groups, err := d.ListInstanceGroups(instancegroups)
if err != nil { if err != nil {
return err return err
} }
err = c.printNodesets(nodesets) {
t := &Table{}
t.AddColumn("NAME", func(r *kutil.CloudInstanceGroup) string {
return r.InstanceGroup.Name
})
t.AddColumn("STATUS", func(r *kutil.CloudInstanceGroup) string {
return r.Status
})
t.AddColumn("NEEDUPDATE", func(r *kutil.CloudInstanceGroup) string {
return strconv.Itoa(len(r.NeedUpdate))
})
t.AddColumn("READY", func(r *kutil.CloudInstanceGroup) string {
return strconv.Itoa(len(r.Ready))
})
var l []*kutil.CloudInstanceGroup
for _, v := range groups {
l = append(l, v)
}
err := t.Render(l, os.Stdout, "NAME", "STATUS", "NEEDUPDATE", "READY")
if err != nil { if err != nil {
return err return err
} }
}
needUpdate := false
for _, group := range groups {
if len(group.NeedUpdate) != 0 {
needUpdate = true
}
}
if !needUpdate {
// TODO: Allow --force option to force even if not needed?
fmt.Printf("\nNo rolling-update required\n")
return nil
}
if !c.Yes { if !c.Yes {
return fmt.Errorf("Must specify --yes to rolling-update") return fmt.Errorf("Must specify --yes to rolling-update")
} }
return d.RollingUpdateNodesets(nodesets) return d.RollingUpdate(groups)
}
func (c *RollingUpdateClusterCmd) printNodesets(nodesets map[string]*kutil.Nodeset) error {
w := new(tabwriter.Writer)
var b bytes.Buffer
// Format in tab-separated columns with a tab stop of 8.
w.Init(os.Stdout, 0, 8, 0, '\t', tabwriter.StripEscape)
for _, n := range nodesets {
b.WriteByte(tabwriter.Escape)
b.WriteString(n.Name)
b.WriteByte(tabwriter.Escape)
b.WriteByte('\t')
b.WriteByte(tabwriter.Escape)
b.WriteString(n.Status)
b.WriteByte(tabwriter.Escape)
b.WriteByte('\t')
b.WriteByte(tabwriter.Escape)
b.WriteString(fmt.Sprintf("%d", len(n.NeedUpdate)))
b.WriteByte(tabwriter.Escape)
b.WriteByte('\t')
b.WriteByte(tabwriter.Escape)
b.WriteString(fmt.Sprintf("%d", len(n.Ready)))
b.WriteByte(tabwriter.Escape)
b.WriteByte('\n')
_, err := w.Write(b.Bytes())
if err != nil {
return fmt.Errorf("error writing to output: %v", err)
}
b.Reset()
}
return w.Flush()
} }

View File

@ -6,6 +6,7 @@ import (
"github.com/aws/aws-sdk-go/service/autoscaling" "github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kops/upup/pkg/api"
"k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup" "k8s.io/kops/upup/pkg/fi/cloudup/awsup"
"sync" "sync"
@ -14,17 +15,16 @@ import (
// RollingUpdateCluster restarts cluster nodes // RollingUpdateCluster restarts cluster nodes
type RollingUpdateCluster struct { type RollingUpdateCluster struct {
ClusterName string Cluster *api.Cluster
Region string
Cloud fi.Cloud Cloud fi.Cloud
} }
func (c *RollingUpdateCluster) ListNodesets() (map[string]*Nodeset, error) { func (c *RollingUpdateCluster) ListInstanceGroups(instancegroups []*api.InstanceGroup) (map[string]*CloudInstanceGroup, error) {
cloud := c.Cloud.(*awsup.AWSCloud) cloud := c.Cloud.(*awsup.AWSCloud)
nodesets := make(map[string]*Nodeset) groups := make(map[string]*CloudInstanceGroup)
tags := cloud.BuildTags(nil) tags := cloud.Tags()
asgs, err := findAutoscalingGroups(cloud, tags) asgs, err := findAutoscalingGroups(cloud, tags)
if err != nil { if err != nil {
@ -32,15 +32,40 @@ func (c *RollingUpdateCluster) ListNodesets() (map[string]*Nodeset, error) {
} }
for _, asg := range asgs { for _, asg := range asgs {
nodeset := buildNodeset(asg) name := aws.StringValue(asg.AutoScalingGroupName)
nodesets[nodeset.Name] = nodeset var instancegroup *api.InstanceGroup
for _, g := range instancegroups {
var asgName string
switch g.Spec.Role {
case api.InstanceGroupRoleMaster:
asgName = g.Name + ".masters." + c.Cluster.Name
case api.InstanceGroupRoleNode:
asgName = g.Name + "." + c.Cluster.Name
default:
glog.Warningf("Ignoring InstanceGroup of unknown role %q", g.Spec.Role)
continue
} }
return nodesets, nil if name == asgName {
if instancegroup != nil {
return nil, fmt.Errorf("Found multiple instance groups matching ASG %q", asgName)
}
instancegroup = g
}
}
if instancegroup == nil {
glog.Warningf("Found ASG with no corresponding instance group: %q", name)
continue
}
group := buildCloudInstanceGroup(instancegroup, asg)
groups[instancegroup.Name] = group
} }
func (c *RollingUpdateCluster) RollingUpdateNodesets(nodesets map[string]*Nodeset) error { return groups, nil
if len(nodesets) == 0 { }
func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*CloudInstanceGroup) error {
if len(groups) == 0 {
return nil return nil
} }
@ -48,20 +73,20 @@ func (c *RollingUpdateCluster) RollingUpdateNodesets(nodesets map[string]*Nodese
var resultsMutex sync.Mutex var resultsMutex sync.Mutex
results := make(map[string]error) results := make(map[string]error)
for k, nodeset := range nodesets { for k, group := range groups {
wg.Add(1) wg.Add(1)
go func(k string, nodeset *Nodeset) { go func(k string, group *CloudInstanceGroup) {
resultsMutex.Lock() resultsMutex.Lock()
results[k] = fmt.Errorf("function panic") results[k] = fmt.Errorf("function panic")
resultsMutex.Unlock() resultsMutex.Unlock()
defer wg.Done() defer wg.Done()
err := nodeset.RollingUpdate(c.Cloud) err := group.RollingUpdate(c.Cloud)
resultsMutex.Lock() resultsMutex.Lock()
results[k] = err results[k] = err
resultsMutex.Unlock() resultsMutex.Unlock()
}(k, nodeset) }(k, group)
} }
wg.Wait() wg.Wait()
@ -75,16 +100,19 @@ func (c *RollingUpdateCluster) RollingUpdateNodesets(nodesets map[string]*Nodese
return nil return nil
} }
type Nodeset struct { // CloudInstanceGroup is the AWS ASG backing an InstanceGroup
Name string type CloudInstanceGroup struct {
InstanceGroup *api.InstanceGroup
ASGName string
Status string Status string
Ready []*autoscaling.Instance Ready []*autoscaling.Instance
NeedUpdate []*autoscaling.Instance NeedUpdate []*autoscaling.Instance
} }
func buildNodeset(g *autoscaling.Group) *Nodeset { func buildCloudInstanceGroup(ig *api.InstanceGroup, g *autoscaling.Group) *CloudInstanceGroup {
n := &Nodeset{ n := &CloudInstanceGroup{
Name: aws.StringValue(g.AutoScalingGroupName), ASGName: aws.StringValue(g.AutoScalingGroupName),
InstanceGroup: ig,
} }
findLaunchConfigurationName := aws.StringValue(g.LaunchConfigurationName) findLaunchConfigurationName := aws.StringValue(g.LaunchConfigurationName)
@ -106,11 +134,11 @@ func buildNodeset(g *autoscaling.Group) *Nodeset {
return n return n
} }
func (n *Nodeset) RollingUpdate(cloud fi.Cloud) error { func (n *CloudInstanceGroup) RollingUpdate(cloud fi.Cloud) error {
c := cloud.(*awsup.AWSCloud) c := cloud.(*awsup.AWSCloud)
for _, i := range n.NeedUpdate { for _, i := range n.NeedUpdate {
glog.Infof("Stopping instance %q in nodeset %q", *i.InstanceId, n.Name) glog.Infof("Stopping instance %q in AWS ASG %q", *i.InstanceId, n.ASGName)
// TODO: Evacuate through k8s first? // TODO: Evacuate through k8s first?
@ -135,6 +163,6 @@ func (n *Nodeset) RollingUpdate(cloud fi.Cloud) error {
return nil return nil
} }
func (n *Nodeset) String() string { func (n *CloudInstanceGroup) String() string {
return "nodeset:" + n.Name return "CloudInstanceGroup:" + n.ASGName
} }