Merge pull request #63 from justinsb/upup_parallel

upup: run create & delete in parallel
This commit is contained in:
Justin Santa Barbara 2016-06-07 15:34:06 -04:00
commit f43ba79a72
3 changed files with 71 additions and 19 deletions

View File

@ -16,8 +16,8 @@ import (
"time"
)
const MaxDescribeTagsAttempts = 30
const MaxCreateTagsAttempts = 30
const MaxDescribeTagsAttempts = 60
const MaxCreateTagsAttempts = 60
type AWSCloud struct {
EC2 *ec2.EC2

View File

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/golang/glog"
"strings"
"sync"
)
type executor struct {
@ -65,12 +66,15 @@ func (e *executor) RunTasks(taskMap map[string]Task) error {
}
progress := false
var errors []error
// TODO: Fork/join execution here
var tasks []*taskState
for _, ts := range canRun {
glog.V(2).Infof("Executing task %q: %v\n", ts.key, ts.task)
err := ts.task.Run(e.context)
tasks = append(tasks, ts)
}
errors := e.forkJoin(tasks)
for i, err := range errors {
ts := tasks[i]
if err != nil {
glog.Warningf("error running task %q: %v", ts.key, err)
errors = append(errors, err)
@ -104,3 +108,27 @@ func (e *executor) RunTasks(taskMap map[string]Task) error {
return nil
}
type runnable func() error
func (e *executor) forkJoin(tasks []*taskState) []error {
if len(tasks) == 0 {
return nil
}
var wg sync.WaitGroup
results := make([]error, len(tasks))
for i := 0; i < len(tasks); i++ {
wg.Add(1)
go func(ts *taskState, index int) {
results[index] = fmt.Errorf("function panic")
defer wg.Done()
glog.V(2).Infof("Executing task %q: %v\n", ts.key, ts.task)
results[index] = ts.task.Run(e.context)
}(tasks[i], i)
}
wg.Wait()
return results
}

View File

@ -12,6 +12,7 @@ import (
"k8s.io/kube-deploy/upup/pkg/fi"
"k8s.io/kube-deploy/upup/pkg/fi/cloudup/awsup"
"strings"
"sync"
"time"
)
@ -195,6 +196,8 @@ func (c *DeleteCluster) DeleteResources(resources map[string]DeletableResource)
done := make(map[string]DeletableResource)
var mutex sync.Mutex
// Initial pass to check that resources actually exist
for k, r := range resources {
hs, ok := r.(HasStatus)
@ -260,22 +263,39 @@ func (c *DeleteCluster) DeleteResources(resources map[string]DeletableResource)
break
}
// TODO: Parallel delete?
var wg sync.WaitGroup
for k, r := range phase {
fmt.Printf("Deleting resource %s: ", k)
err := r.Delete(c.Cloud)
if err != nil {
if IsDependencyViolation(err) {
fmt.Printf("still has dependencies, will retry\n")
} else {
fmt.Printf("error deleting resource, will retry: %v\n", err)
}
wg.Add(1)
go func(k string, r DeletableResource) {
mutex.Lock()
failed[k] = r
} else {
fmt.Printf(" ok\n")
done[k] = r
}
mutex.Unlock()
defer wg.Done()
glog.V(4).Infof("Deleting resource %s: ", k)
err := r.Delete(c.Cloud)
if err != nil {
mutex.Lock()
if IsDependencyViolation(err) {
fmt.Printf("%s\tstill has dependencies, will retry\n", k)
glog.V(4).Infof("API call made when had dependency %s", k)
} else {
fmt.Printf("%s\terror deleting resource, will retry: %v\n", k, err)
}
failed[k] = r
mutex.Unlock()
} else {
mutex.Lock()
fmt.Printf("%s\tok\n", k)
delete(failed, k)
done[k] = r
mutex.Unlock()
}
}(k, r)
}
wg.Wait()
}
if len(resources) == len(done) {
@ -696,6 +716,9 @@ func (r *DeletableInternetGateway) Delete(cloud fi.Cloud) error {
}
_, err := c.EC2.DetachInternetGateway(request)
if err != nil {
if IsDependencyViolation(err) {
return err
}
return fmt.Errorf("error detaching InternetGateway %q: %v", r.ID, err)
}
}
@ -716,6 +739,7 @@ func (r *DeletableInternetGateway) Delete(cloud fi.Cloud) error {
return nil
}
func (r *DeletableInternetGateway) String() string {
return "InternetGateway:" + r.ID
}