Merge pull request #2645 from andrewsykim/refactor-cluster-resources

Generalize Cluster Resources
This commit is contained in:
Chris Love 2017-05-31 11:22:01 -06:00 committed by GitHub
commit ab36953f68
7 changed files with 301 additions and 310 deletions

View File

@ -18,7 +18,6 @@ package main
import (
"fmt"
"io"
"github.com/golang/glog"
@ -142,8 +141,7 @@ func RunDeleteCluster(f *util.Factory, out io.Writer, options *DeleteClusterOpti
}
}
// TODO: make this smart enough to detect the cloud and switch on the ClusterResources interface
d := &resources.AwsCluster{}
d := &resources.ClusterResources{}
d.ClusterName = clusterName
d.Cloud = cloud

View File

@ -108,7 +108,7 @@ func RunToolboxDump(f *util.Factory, out io.Writer, options *ToolboxDumpOptions)
}
// TODO: make this smart enough to detect the cloud and switch on the ClusterResources interface
d := &resources.AwsCluster{}
d := &resources.ClusterResources{}
d.ClusterName = options.ClusterName
d.Cloud = cloud

View File

@ -21,6 +21,9 @@ import (
"bytes"
"compress/gzip"
"fmt"
"io"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/cloudformation"
@ -29,13 +32,9 @@ import (
"github.com/aws/aws-sdk-go/service/iam"
"github.com/aws/aws-sdk-go/service/route53"
"github.com/golang/glog"
"io"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
"strings"
"sync"
"time"
)
const (
@ -45,26 +44,133 @@ const (
TypeLoadBalancer = "load-balancer"
)
// ResourceTracker describes a single discovered cloud resource together with
// the information needed to order and perform its deletion.
type ResourceTracker struct {
// Name is a display name for the resource.
Name string
// Type and ID together identify the resource; the deletion code builds
// its map key as Type + ":" + ID.
Type string
ID string
// blocks lists the keys of resources that cannot be deleted until this
// resource has been deleted.
blocks []string
// blocked lists the keys of resources that must be deleted before this
// resource can be deleted.
blocked []string
// done marks a resource that is treated as already deleted.
done bool
// deleter deletes this single resource; it is used when groupDeleter is nil.
deleter func(cloud fi.Cloud, tracker *ResourceTracker) error
// groupKey batches resources: trackers that share a non-empty groupKey
// are handed to groupDeleter together in one call.
groupKey string
// groupDeleter deletes a whole batch of trackers at once.
groupDeleter func(cloud fi.Cloud, trackers []*ResourceTracker) error
// Dumper returns a representation of the resource.
// NOTE(review): presumably used by the toolbox dump command - confirm against callers.
Dumper func(r *ResourceTracker) (interface{}, error)
// obj is an opaque payload attached to the tracker.
// NOTE(review): presumably the underlying cloud API object - confirm against the list functions.
obj interface{}
}
// listFn is the signature shared by the resource listing functions: it is
// called with the cloud and the cluster name and returns the trackers found.
type listFn func(fi.Cloud, string) ([]*ResourceTracker, error)
// listResourcesAWS discovers the AWS resources that belong to the cluster and
// returns them keyed by "<type>:<id>".
//
// Discovery runs in stages, in this order:
//  1. the tag-based list functions,
//  2. internet gateways attached to a discovered VPC (they may be untagged),
//  3. launch configurations bound to a discovered security group,
//  4. untagged route tables (addUntaggedRouteTables),
//  5. NAT gateways linked to a discovered route table.
//
// Trackers already marked done are dropped before returning. The first error
// from any stage aborts discovery and is returned as-is. c.Cloud must be an
// awsup.AWSCloud; the type assertion panics otherwise.
func (c *ClusterResources) listResourcesAWS() (map[string]*ResourceTracker, error) {
	awsCloud := c.Cloud.(awsup.AWSCloud)

	found := make(map[string]*ResourceTracker)

	// track records each tracker under its "<type>:<id>" key, replacing any
	// earlier entry that has the same key.
	track := func(trackers []*ResourceTracker) {
		for _, tracker := range trackers {
			found[tracker.Type+":"+tracker.ID] = tracker
		}
	}

	// idsOfType returns the IDs of every resource of the given type that has
	// been discovered so far.
	idsOfType := func(resourceType string) sets.String {
		prefix := resourceType + ":"
		ids := sets.NewString()
		for key := range found {
			if strings.HasPrefix(key, prefix) {
				ids.Insert(key[len(prefix):])
			}
		}
		return ids
	}

	// Stage 1: look up cluster resources by their tags.
	taggedListers := []listFn{
		// CloudFormation
		//ListCloudFormationStacks,
		// EC2
		ListInstances,
		ListKeypairs,
		ListSecurityGroups,
		ListVolumes,
		// EC2 VPC
		ListDhcpOptions,
		ListInternetGateways,
		ListRouteTables,
		ListSubnets,
		ListVPCs,
		// ELBs
		ListELBs,
		// ASG
		ListAutoScalingGroups,
		// Route 53
		ListRoute53Records,
		// IAM
		ListIAMInstanceProfiles,
		ListIAMRoles,
	}
	for _, list := range taggedListers {
		trackers, err := list(awsCloud, c.ClusterName)
		if err != nil {
			return nil, err
		}
		track(trackers)
	}

	// Stage 2: gateways weren't tagged in kube-up. When the VPC is being
	// deleted, delete its attached gateway as well (easy to recreate, holds
	// no real state).
	gateways, err := DescribeInternetGatewaysIgnoreTags(awsCloud)
	if err != nil {
		return nil, err
	}
	for _, igw := range gateways {
		igwID := aws.StringValue(igw.InternetGatewayId)
		if igwID == "" {
			continue
		}
		igwKey := "internet-gateway:" + igwID
		for _, attachment := range igw.Attachments {
			vpcID := aws.StringValue(attachment.VpcId)
			if vpcID == "" {
				continue
			}
			// Only add gateways of VPCs we own, and never replace a
			// gateway that was already discovered.
			if found["vpc:"+vpcID] == nil || found[igwKey] != nil {
				continue
			}
			found[igwKey] = &ResourceTracker{
				Name:    FindName(igw.Tags),
				ID:      igwID,
				Type:    "internet-gateway",
				deleter: DeleteInternetGateway,
			}
		}
	}

	// Stage 3: a launch configuration is deleted if it is bound to one of the
	// tagged security groups.
	launchConfigs, err := FindAutoScalingLaunchConfigurations(awsCloud, idsOfType("security-group"))
	if err != nil {
		return nil, err
	}
	track(launchConfigs)

	// Stage 4: route tables that carry no tags.
	if err := addUntaggedRouteTables(awsCloud, c.ClusterName, found); err != nil {
		return nil, err
	}

	// Stage 5: a NAT gateway is deleted if it is linked to one of our route tables.
	natGateways, err := FindNatGateways(awsCloud, idsOfType(ec2.ResourceTypeRouteTable))
	if err != nil {
		return nil, err
	}
	track(natGateways)

	// Drop everything that is already marked as done.
	for key, tracker := range found {
		if tracker.done {
			delete(found, key)
		}
	}
	return found, nil
}
func gunzipBytes(d []byte) ([]byte, error) {
var out bytes.Buffer
in := bytes.NewReader(d)
@ -139,155 +245,6 @@ func addUntaggedRouteTables(cloud awsup.AWSCloud, clusterName string, resources
return nil
}
// DeleteResources deletes every tracker in resources, honouring the ordering
// constraints expressed by each tracker's blocks / blocked lists.
//
// Deletion proceeds in passes. Within a pass, resources whose dependencies are
// all done are deleted concurrently (one goroutine per group), and this repeats
// until no further resource is ready. If resources remain after a pass, the
// function sleeps 10 seconds and starts another pass.
//
// It returns nil once every resource is done, and an error after more than 42
// consecutive passes in which no deletion succeeded.
func (c *AwsCluster) DeleteResources(resources map[string]*ResourceTracker) error {
// depMap maps a resource key to the keys that must be deleted before it.
depMap := make(map[string][]string)
// done holds the resources that are deleted (or were marked done up front).
done := make(map[string]*ResourceTracker)
// mutex guards done, failed and iterationsWithNoProgress while the deletion
// goroutines are running.
var mutex sync.Mutex
for k, t := range resources {
// t blocks "block": "block" may only be deleted after k.
for _, block := range t.blocks {
depMap[block] = append(depMap[block], k)
}
// t is blocked by "blocked": k may only be deleted after it.
for _, blocked := range t.blocked {
depMap[k] = append(depMap[k], blocked)
}
if t.done {
done[k] = t
}
}
glog.V(2).Infof("Dependencies")
for k, v := range depMap {
glog.V(2).Infof("\t%s\t%v", k, v)
}
iterationsWithNoProgress := 0
for {
// TODO: Some form of default ordering based on types?
// failed collects the resources already attempted in this pass, so that
// each one is tried at most once per pass.
failed := make(map[string]*ResourceTracker)
for {
// phase is the set of resources that are ready to be deleted right now.
phase := make(map[string]*ResourceTracker)
for k, r := range resources {
if _, d := done[k]; d {
continue
}
if _, d := failed[k]; d {
// Only attempt each resource once per pass
continue
}
ready := true
// NOTE(review): a dependency key that never ends up in done (for
// example one that is not present in resources) keeps k from ever
// becoming ready - confirm that callers only reference known keys.
for _, dep := range depMap[k] {
if _, d := done[dep]; !d {
glog.V(4).Infof("dependency %q of %q not deleted; skipping", dep, k)
ready = false
}
}
if !ready {
continue
}
phase[k] = r
}
if len(phase) == 0 {
break
}
// Batch the phase by groupKey; a resource without a groupKey gets a
// group of its own, keyed by "_" + its resource key.
groups := make(map[string][]*ResourceTracker)
for k, t := range phase {
groupKey := t.groupKey
if groupKey == "" {
groupKey = "_" + k
}
groups[groupKey] = append(groups[groupKey], t)
}
var wg sync.WaitGroup
for _, trackers := range groups {
wg.Add(1)
go func(trackers []*ResourceTracker) {
// Mark the whole group as attempted before doing any work; the
// entries are removed again below if the deletion succeeds.
mutex.Lock()
for _, t := range trackers {
k := t.Type + ":" + t.ID
failed[k] = t
}
mutex.Unlock()
defer wg.Done()
// human is the label used in progress output; it names the first
// tracker of the group.
human := trackers[0].Type + ":" + trackers[0].ID
var err error
if trackers[0].groupDeleter != nil {
err = trackers[0].groupDeleter(c.Cloud, trackers)
} else {
// Without a groupDeleter the group must hold exactly one tracker.
if len(trackers) != 1 {
glog.Fatalf("found group without groupKey")
}
err = trackers[0].deleter(c.Cloud, trackers[0])
}
if err != nil {
mutex.Lock()
if IsDependencyViolation(err) {
fmt.Printf("%s\tstill has dependencies, will retry\n", human)
glog.V(4).Infof("API call made when had dependency: %s", human)
} else {
fmt.Printf("%s\terror deleting resource, will retry: %v\n", human, err)
}
for _, t := range trackers {
k := t.Type + ":" + t.ID
failed[k] = t
}
mutex.Unlock()
} else {
mutex.Lock()
fmt.Printf("%s\tok\n", human)
// Any successful deletion counts as progress.
iterationsWithNoProgress = 0
for _, t := range trackers {
k := t.Type + ":" + t.ID
delete(failed, k)
done[k] = t
}
mutex.Unlock()
}
}(trackers)
}
// Wait for the whole phase before computing the next one.
wg.Wait()
}
if len(resources) == len(done) {
return nil
}
fmt.Printf("Not all resources deleted; waiting before reattempting deletion\n")
for k := range resources {
if _, d := done[k]; d {
continue
}
fmt.Printf("\t%s\n", k)
}
// The counter is reset to zero by any successful deletion above, so it
// counts consecutive passes in which nothing was deleted.
iterationsWithNoProgress++
if iterationsWithNoProgress > 42 {
return fmt.Errorf("Not making progress deleting resources; giving up")
}
time.Sleep(10 * time.Second)
}
}
func matchesAsgTags(tags map[string]string, actual []*autoscaling.TagDescription) bool {
for k, v := range tags {
found := false

View File

@ -18,38 +18,50 @@ package resources
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"k8s.io/apimachinery/pkg/util/sets"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
"strings"
)
// Todo
// After this is stable, we should pull this out of kutil so we can start mapping out the implementation
// for each cloud. Just taking the first steps now into separating these out, but we probably have some
// bigger concerns as this grows.. Just starting to draw some boundaries.
type ResourceTracker struct {
Name string
Type string
ID string
// ClusterResources is a representation of a cluster with abilities to ListResources and DeleteResources
type ClusterResources interface {
blocks []string
blocked []string
done bool
deleter func(cloud fi.Cloud, tracker *ResourceTracker) error
groupKey string
groupDeleter func(cloud fi.Cloud, trackers []*ResourceTracker) error
Dumper func(r *ResourceTracker) (interface{}, error)
obj interface{}
}
var _ Resources = &ClusterResources{}
// Resources is a representation of a cluster with abilities to ListResources and DeleteResources
type Resources interface {
// ListResources discovers the resources of the cluster and returns a
// tracker for each of them, keyed by a string.
ListResources() (map[string]*ResourceTracker, error)
// DeleteResources deletes the given trackers and returns an error if
// they could not all be deleted.
DeleteResources(resources map[string]*ResourceTracker) error
}
// AwsCluster is an implementation of ClusterResources
// ClusterResources is an implementation of Resources
// The algorithm is pretty simple: it discovers all the resources it can (primarily using tags)
// There are a few tweaks to that approach, like choosing a default ordering, but it is not much
// smarter.
// There are a few tweaks to that approach, like choosing a default ordering, but it is not much smarter.
// Some dependencies are invisible (e.g. ELB dependencies).
//
type AwsCluster struct {
type ClusterResources struct {
ClusterName string
Cloud fi.Cloud
Region string
}
func (c *AwsCluster) ListResources() (map[string]*ResourceTracker, error) {
func (c *ClusterResources) ListResources() (map[string]*ResourceTracker, error) {
switch c.Cloud.ProviderID() {
case fi.CloudProviderAWS:
return c.listResourcesAWS()
@ -62,127 +74,151 @@ func (c *AwsCluster) ListResources() (map[string]*ResourceTracker, error) {
}
}
func (c *AwsCluster) listResourcesAWS() (map[string]*ResourceTracker, error) {
cloud := c.Cloud.(awsup.AWSCloud)
func (c *ClusterResources) DeleteResources(resources map[string]*ResourceTracker) error {
depMap := make(map[string][]string)
resources := make(map[string]*ResourceTracker)
done := make(map[string]*ResourceTracker)
// These are the functions that are used for looking up
// cluster resources by their tags.
listFunctions := []listFn{
// CloudFormation
//ListCloudFormationStacks,
// EC2
ListInstances,
ListKeypairs,
ListSecurityGroups,
ListVolumes,
// EC2 VPC
ListDhcpOptions,
ListInternetGateways,
ListRouteTables,
ListSubnets,
ListVPCs,
// ELBs
ListELBs,
// ASG
ListAutoScalingGroups,
// Route 53
ListRoute53Records,
// IAM
ListIAMInstanceProfiles,
ListIAMRoles,
}
for _, fn := range listFunctions {
trackers, err := fn(cloud, c.ClusterName)
if err != nil {
return nil, err
}
for _, t := range trackers {
resources[t.Type+":"+t.ID] = t
}
}
{
// Gateways weren't tagged in kube-up
// If we are deleting the VPC, we should delete the attached gateway
// (no real reason not to; easy to recreate; no real state etc)
gateways, err := DescribeInternetGatewaysIgnoreTags(cloud)
if err != nil {
return nil, err
}
for _, igw := range gateways {
for _, attachment := range igw.Attachments {
vpcID := aws.StringValue(attachment.VpcId)
igwID := aws.StringValue(igw.InternetGatewayId)
if vpcID == "" || igwID == "" {
continue
}
if resources["vpc:"+vpcID] != nil && resources["internet-gateway:"+igwID] == nil {
resources["internet-gateway:"+igwID] = &ResourceTracker{
Name: FindName(igw.Tags),
ID: igwID,
Type: "internet-gateway",
deleter: DeleteInternetGateway,
}
}
}
}
}
{
// We delete a launch configuration if it is bound to one of the tagged security groups
securityGroups := sets.NewString()
for k := range resources {
if !strings.HasPrefix(k, "security-group:") {
continue
}
id := strings.TrimPrefix(k, "security-group:")
securityGroups.Insert(id)
}
lcs, err := FindAutoScalingLaunchConfigurations(cloud, securityGroups)
if err != nil {
return nil, err
}
for _, t := range lcs {
resources[t.Type+":"+t.ID] = t
}
}
if err := addUntaggedRouteTables(cloud, c.ClusterName, resources); err != nil {
return nil, err
}
{
// We delete a NAT gateway if it is linked to our route table
routeTableIds := sets.NewString()
for k := range resources {
if !strings.HasPrefix(k, ec2.ResourceTypeRouteTable+":") {
continue
}
id := strings.TrimPrefix(k, ec2.ResourceTypeRouteTable+":")
routeTableIds.Insert(id)
}
natGateways, err := FindNatGateways(cloud, routeTableIds)
if err != nil {
return nil, err
}
for _, t := range natGateways {
resources[t.Type+":"+t.ID] = t
}
}
var mutex sync.Mutex
for k, t := range resources {
for _, block := range t.blocks {
depMap[block] = append(depMap[block], k)
}
for _, blocked := range t.blocked {
depMap[k] = append(depMap[k], blocked)
}
if t.done {
delete(resources, k)
done[k] = t
}
}
return resources, nil
glog.V(2).Infof("Dependencies")
for k, v := range depMap {
glog.V(2).Infof("\t%s\t%v", k, v)
}
iterationsWithNoProgress := 0
for {
// TODO: Some form of default ordering based on types?
failed := make(map[string]*ResourceTracker)
for {
phase := make(map[string]*ResourceTracker)
for k, r := range resources {
if _, d := done[k]; d {
continue
}
if _, d := failed[k]; d {
// Only attempt each resource once per pass
continue
}
ready := true
for _, dep := range depMap[k] {
if _, d := done[dep]; !d {
glog.V(4).Infof("dependency %q of %q not deleted; skipping", dep, k)
ready = false
}
}
if !ready {
continue
}
phase[k] = r
}
if len(phase) == 0 {
break
}
groups := make(map[string][]*ResourceTracker)
for k, t := range phase {
groupKey := t.groupKey
if groupKey == "" {
groupKey = "_" + k
}
groups[groupKey] = append(groups[groupKey], t)
}
var wg sync.WaitGroup
for _, trackers := range groups {
wg.Add(1)
go func(trackers []*ResourceTracker) {
mutex.Lock()
for _, t := range trackers {
k := t.Type + ":" + t.ID
failed[k] = t
}
mutex.Unlock()
defer wg.Done()
human := trackers[0].Type + ":" + trackers[0].ID
var err error
if trackers[0].groupDeleter != nil {
err = trackers[0].groupDeleter(c.Cloud, trackers)
} else {
if len(trackers) != 1 {
glog.Fatalf("found group without groupKey")
}
err = trackers[0].deleter(c.Cloud, trackers[0])
}
if err != nil {
mutex.Lock()
if IsDependencyViolation(err) {
fmt.Printf("%s\tstill has dependencies, will retry\n", human)
glog.V(4).Infof("API call made when had dependency: %s", human)
} else {
fmt.Printf("%s\terror deleting resource, will retry: %v\n", human, err)
}
for _, t := range trackers {
k := t.Type + ":" + t.ID
failed[k] = t
}
mutex.Unlock()
} else {
mutex.Lock()
fmt.Printf("%s\tok\n", human)
iterationsWithNoProgress = 0
for _, t := range trackers {
k := t.Type + ":" + t.ID
delete(failed, k)
done[k] = t
}
mutex.Unlock()
}
}(trackers)
}
wg.Wait()
}
if len(resources) == len(done) {
return nil
}
fmt.Printf("Not all resources deleted; waiting before reattempting deletion\n")
for k := range resources {
if _, d := done[k]; d {
continue
}
fmt.Printf("\t%s\n", k)
}
iterationsWithNoProgress++
if iterationsWithNoProgress > 42 {
return fmt.Errorf("Not making progress deleting resources; giving up")
}
time.Sleep(10 * time.Second)
}
}

View File

@ -44,7 +44,7 @@ const (
typeRoute = "Route"
)
func (c *AwsCluster) listResourcesGCE() (map[string]*ResourceTracker, error) {
func (c *ClusterResources) listResourcesGCE() (map[string]*ResourceTracker, error) {
gceCloud := c.Cloud.(*gce.GCECloud)
if c.Region == "" {
c.Region = gceCloud.Region

View File

@ -37,7 +37,7 @@ type clusterDiscoveryVSphere struct {
type vsphereListFn func() ([]*ResourceTracker, error)
func (c *AwsCluster) listResourcesVSphere() (map[string]*ResourceTracker, error) {
func (c *ClusterResources) listResourcesVSphere() (map[string]*ResourceTracker, error) {
vsphereCloud := c.Cloud.(*vsphere.VSphereCloud)
resources := make(map[string]*ResourceTracker)