diff --git a/upup/pkg/fi/cloudup/awsup/aws_cloud.go b/upup/pkg/fi/cloudup/awsup/aws_cloud.go
index 148ddbc688..4c5d934dea 100644
--- a/upup/pkg/fi/cloudup/awsup/aws_cloud.go
+++ b/upup/pkg/fi/cloudup/awsup/aws_cloud.go
@@ -16,8 +16,8 @@ import (
 	"time"
 )
 
-const MaxDescribeTagsAttempts = 30
-const MaxCreateTagsAttempts = 30
+const MaxDescribeTagsAttempts = 60
+const MaxCreateTagsAttempts = 60
 
 type AWSCloud struct {
 	EC2 *ec2.EC2
diff --git a/upup/pkg/fi/executor.go b/upup/pkg/fi/executor.go
index 03bcae9c5e..2356b258ae 100644
--- a/upup/pkg/fi/executor.go
+++ b/upup/pkg/fi/executor.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"github.com/golang/glog"
 	"strings"
+	"sync"
 )
 
 type executor struct {
@@ -65,12 +66,15 @@ func (e *executor) RunTasks(taskMap map[string]Task) error {
 		}
 
 		progress := false
-		var errors []error
 
-		// TODO: Fork/join execution here
+		var tasks []*taskState
 		for _, ts := range canRun {
-			glog.V(2).Infof("Executing task %q: %v\n", ts.key, ts.task)
-			err := ts.task.Run(e.context)
+			tasks = append(tasks, ts)
+		}
+
+		errors := e.forkJoin(tasks)
+		for i, err := range errors {
+			ts := tasks[i]
 			if err != nil {
 				glog.Warningf("error running task %q: %v", ts.key, err)
 				errors = append(errors, err)
@@ -104,3 +108,27 @@ func (e *executor) RunTasks(taskMap map[string]Task) error {
 
 	return nil
 }
+
+type runnable func() error
+
+func (e *executor) forkJoin(tasks []*taskState) []error {
+	if len(tasks) == 0 {
+		return nil
+	}
+
+	var wg sync.WaitGroup
+	results := make([]error, len(tasks))
+	for i := 0; i < len(tasks); i++ {
+		wg.Add(1)
+		go func(ts *taskState, index int) {
+			results[index] = fmt.Errorf("function panic")
+			defer wg.Done()
+			glog.V(2).Infof("Executing task %q: %v\n", ts.key, ts.task)
+			results[index] = ts.task.Run(e.context)
+		}(tasks[i], i)
+	}
+
+	wg.Wait()
+
+	return results
+}
diff --git a/upup/pkg/kutil/delete_cluster.go b/upup/pkg/kutil/delete_cluster.go
index 572433532a..699e92b6f6 100644
--- a/upup/pkg/kutil/delete_cluster.go
+++ b/upup/pkg/kutil/delete_cluster.go
@@ -12,6 +12,7 @@ import (
 	"k8s.io/kube-deploy/upup/pkg/fi"
 	"k8s.io/kube-deploy/upup/pkg/fi/cloudup/awsup"
 	"strings"
+	"sync"
 	"time"
 )
 
@@ -195,6 +196,8 @@ func (c *DeleteCluster) DeleteResources(resources map[string]DeletableResource)
 
 	done := make(map[string]DeletableResource)
 
+	var mutex sync.Mutex
+
 	// Initial pass to check that resources actually exist
 	for k, r := range resources {
 		hs, ok := r.(HasStatus)
@@ -260,22 +263,39 @@ func (c *DeleteCluster) DeleteResources(resources map[string]DeletableResource)
 			break
 		}
 
-		// TODO: Parallel delete?
+		var wg sync.WaitGroup
 		for k, r := range phase {
-			fmt.Printf("Deleting resource %s: ", k)
-			err := r.Delete(c.Cloud)
-			if err != nil {
-				if IsDependencyViolation(err) {
-					fmt.Printf("still has dependencies, will retry\n")
-				} else {
-					fmt.Printf("error deleting resource, will retry: %v\n", err)
-				}
+			wg.Add(1)
+
+			go func(k string, r DeletableResource) {
+				mutex.Lock()
 				failed[k] = r
-			} else {
-				fmt.Printf(" ok\n")
-				done[k] = r
-			}
+				mutex.Unlock()
+
+				defer wg.Done()
+				glog.V(4).Infof("Deleting resource %s: ", k)
+				err := r.Delete(c.Cloud)
+				if err != nil {
+					mutex.Lock()
+					if IsDependencyViolation(err) {
+						fmt.Printf("%s\tstill has dependencies, will retry\n", k)
+						glog.V(4).Infof("API call made when had dependency %s", k)
+					} else {
+						fmt.Printf("%s\terror deleting resource, will retry: %v\n", k, err)
+					}
+					failed[k] = r
+					mutex.Unlock()
+				} else {
+					mutex.Lock()
+					fmt.Printf("%s\tok\n", k)
+
+					delete(failed, k)
+					done[k] = r
+					mutex.Unlock()
+				}
+			}(k, r)
 		}
+		wg.Wait()
 	}
 
 	if len(resources) == len(done) {
@@ -696,6 +716,9 @@ func (r *DeletableInternetGateway) Delete(cloud fi.Cloud) error {
 		}
 		_, err := c.EC2.DetachInternetGateway(request)
 		if err != nil {
+			if IsDependencyViolation(err) {
+				return err
+			}
 			return fmt.Errorf("error detaching InternetGateway %q: %v", r.ID, err)
 		}
 	}
@@ -716,6 +739,7 @@ func (r *DeletableInternetGateway) Delete(cloud fi.Cloud) error {
 	return nil
 }
 
+
 func (r *DeletableInternetGateway) String() string {
 	return "InternetGateway:" + r.ID
 }
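Note: the forkJoin helper added to executor.go follows the standard sync.WaitGroup fan-out pattern, where each goroutine writes only to its own index of a pre-sized result slice, so the slice itself needs no locking. A minimal standalone sketch of that pattern (the forkJoin name and sample closures here are illustrative only, not the PR's taskState/Task types):

	package main

	import (
		"fmt"
		"sync"
	)

	// forkJoin runs every function concurrently and returns one error slot
	// per function; each goroutine writes only to its own index.
	func forkJoin(fns []func() error) []error {
		if len(fns) == 0 {
			return nil
		}

		var wg sync.WaitGroup
		results := make([]error, len(fns))
		for i := 0; i < len(fns); i++ {
			wg.Add(1)
			go func(index int) {
				defer wg.Done()
				results[index] = fns[index]()
			}(i)
		}
		wg.Wait()
		return results
	}

	func main() {
		fns := []func() error{
			func() error { return nil },
			func() error { return fmt.Errorf("simulated failure") },
		}
		for i, err := range forkJoin(fns) {
			fmt.Printf("task %d: err=%v\n", i, err)
		}
	}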
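Similarly, the parallel delete in delete_cluster.go guards the shared done and failed maps with a sync.Mutex while the Delete calls themselves run outside the lock. A self-contained sketch of that pattern, assuming hypothetical resource names and a simulated dependency error in place of real AWS calls:

	package main

	import (
		"fmt"
		"strings"
		"sync"
	)

	func main() {
		// Hypothetical resources for one delete phase.
		phase := map[string]string{"igw-1234": "InternetGateway", "sg-1234": "SecurityGroup"}

		done := make(map[string]string)
		failed := make(map[string]string)

		var mutex sync.Mutex
		var wg sync.WaitGroup
		for k, r := range phase {
			wg.Add(1)
			go func(k, r string) {
				defer wg.Done()

				// Simulated delete call; pretend security groups still have dependents.
				var err error
				if strings.HasPrefix(k, "sg-") {
					err = fmt.Errorf("DependencyViolation")
				}

				// done and failed are shared between goroutines, so updates
				// happen under the mutex; the (slow) delete call does not.
				mutex.Lock()
				defer mutex.Unlock()
				if err != nil {
					failed[k] = r
				} else {
					done[k] = r
				}
			}(k, r)
		}
		wg.Wait()

		fmt.Println("done:", done)
		fmt.Println("failed:", failed)
	}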