Merge pull request #5245 from justinsb/gc_launchconfigurations

AWS: Delete old LaunchConfigurations
This commit is contained in:
k8s-ci-robot 2018-06-20 10:33:08 -07:00 committed by GitHub
commit 792f02a5c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 331 additions and 64 deletions

View File

@ -95,6 +95,9 @@ func (m *MockAutoscaling) CreateLaunchConfiguration(request *autoscaling.CreateL
if m.LaunchConfigurations == nil {
m.LaunchConfigurations = make(map[string]*autoscaling.LaunchConfiguration)
}
if m.LaunchConfigurations[*lc.LaunchConfigurationName] != nil {
return nil, fmt.Errorf("duplicate LaunchConfigurationName %s", *lc.LaunchConfigurationName)
}
m.LaunchConfigurations[*lc.LaunchConfigurationName] = lc
return &autoscaling.CreateLaunchConfigurationOutput{}, nil

View File

@ -226,7 +226,7 @@ func runTest(t *testing.T, h *testutils.IntegrationTestHarness, clusterName stri
options.InitDefaults()
options.Target = "terraform"
options.OutDir = path.Join(h.TempDir, "out")
options.MaxTaskDuration = 30 * time.Second
options.RunTasksOptions.MaxTaskDuration = 30 * time.Second
if phase != nil {
options.Phase = string(*phase)
}
@ -514,7 +514,7 @@ func runTestCloudformation(t *testing.T, clusterName string, srcDir string, vers
options.InitDefaults()
options.Target = "cloudformation"
options.OutDir = path.Join(h.TempDir, "out")
options.MaxTaskDuration = 30 * time.Second
options.RunTasksOptions.MaxTaskDuration = 30 * time.Second
// We don't test it here, and it adds a dependency on kubectl
options.CreateKubecfg = false

View File

@ -144,7 +144,7 @@ func runLifecycleTest(h *testutils.IntegrationTestHarness, o *LifecycleTestOptio
{
options := &UpdateClusterOptions{}
options.InitDefaults()
options.MaxTaskDuration = 10 * time.Second
options.RunTasksOptions.MaxTaskDuration = 10 * time.Second
options.Yes = true
// We don't test it here, and it adds a dependency on kubectl
@ -160,7 +160,7 @@ func runLifecycleTest(h *testutils.IntegrationTestHarness, o *LifecycleTestOptio
options := &UpdateClusterOptions{}
options.InitDefaults()
options.Target = cloudup.TargetDryRun
options.MaxTaskDuration = 10 * time.Second
options.RunTasksOptions.MaxTaskDuration = 10 * time.Second
// We don't test it here, and it adds a dependency on kubectl
options.CreateKubecfg = false

View File

@ -23,7 +23,6 @@ import (
"io/ioutil"
"path/filepath"
"strings"
"time"
"github.com/golang/glog"
"github.com/spf13/cobra"
@ -63,7 +62,7 @@ type UpdateClusterOptions struct {
Models string
OutDir string
SSHPublicKey string
MaxTaskDuration time.Duration
RunTasksOptions fi.RunTasksOptions
CreateKubecfg bool
Phase string
@ -79,8 +78,8 @@ func (o *UpdateClusterOptions) InitDefaults() {
o.Models = strings.Join(cloudup.CloudupModels, ",")
o.SSHPublicKey = ""
o.OutDir = ""
o.MaxTaskDuration = cloudup.DefaultMaxTaskDuration
o.CreateKubecfg = true
o.RunTasksOptions.InitDefaults()
}
func NewCmdUpdateCluster(f *util.Factory, out io.Writer) *cobra.Command {
@ -246,7 +245,7 @@ func RunUpdateCluster(f *util.Factory, clusterName string, out io.Writer, c *Upd
Cluster: cluster,
DryRun: isDryrun,
InstanceGroups: instanceGroups,
MaxTaskDuration: c.MaxTaskDuration,
RunTasksOptions: &c.RunTasksOptions,
Models: strings.Split(c.Models, ","),
OutDir: c.OutDir,
Phase: phase,

View File

@ -104,11 +104,12 @@ func main() {
command = append(command, s)
}
i := bootstrap.Installation{
MaxTaskDuration: 5 * time.Minute,
CacheDir: flagCacheDir,
Command: command,
FSRoot: flagRootFS,
CacheDir: flagCacheDir,
Command: command,
FSRoot: flagRootFS,
}
i.RunTasksOptions.InitDefaults()
i.RunTasksOptions.MaxTaskDuration = 5 * time.Minute
err = i.Run()
if err == nil {
fmt.Printf("service installed")

View File

@ -0,0 +1,18 @@
## Release notes for kops 1.10 series
# Significant changes
* Old LaunchConfigurations are now deleted on AWS. By default the 3 most recent LaunchConfigurations for each InstanceGroup are kept, and older ones are automatically removed. To keep the existing behaviour set the KeepLaunchConfigurations feature flag i.e. `export KOPS_FEATURE_FLAGS=KeepLaunchConfigurations`
# Required Actions
None known at this time
# Highlighted changes
(to follow)
# Full change list
(to follow)

View File

@ -21,7 +21,6 @@ import (
"fmt"
"os"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/sets"
@ -36,7 +35,7 @@ import (
type Installation struct {
FSRoot string
CacheDir string
MaxTaskDuration time.Duration
RunTasksOptions fi.RunTasksOptions
Command []string
}
@ -86,7 +85,7 @@ func (i *Installation) Run() error {
}
defer context.Close()
err = context.RunTasks(i.MaxTaskDuration)
err = context.RunTasks(i.RunTasksOptions)
if err != nil {
return fmt.Errorf("error running tasks: %v", err)
}

View File

@ -36,6 +36,9 @@ func Bool(b bool) *bool {
return &b
}
// KeepLaunchConfigurations can be set to prevent garbage collection of old launch configurations
var KeepLaunchConfigurations = New("KeepLaunchConfigurations", Bool(false))
// DNSPreCreate controls whether we pre-create DNS records.
var DNSPreCreate = New("DNSPreCreate", Bool(true))

View File

@ -62,7 +62,8 @@ var _ fi.Target = &MockTarget{}
func TestKeypairUpgrade(t *testing.T) {
lifecycle := fi.LifecycleSync
defaultDeadline := 2 * time.Second
runTasksOptions := fi.RunTasksOptions{}
runTasksOptions.MaxTaskDuration = 2 * time.Second
target := &MockTarget{}
@ -119,7 +120,7 @@ func TestKeypairUpgrade(t *testing.T) {
t.Fatalf("error building context: %v", err)
}
if err := context.RunTasks(defaultDeadline); err != nil {
if err := context.RunTasks(runTasksOptions); err != nil {
t.Fatalf("unexpected error during Run: %v", err)
}
}
@ -160,7 +161,7 @@ func TestKeypairUpgrade(t *testing.T) {
t.Fatalf("error building context: %v", err)
}
if err := context.RunTasks(defaultDeadline); err != nil {
if err := context.RunTasks(runTasksOptions); err != nil {
t.Fatalf("unexpected error during Run: %v", err)
}
}
@ -180,7 +181,7 @@ func TestKeypairUpgrade(t *testing.T) {
t.Fatalf("error building context: %v", err)
}
if err := context.RunTasks(defaultDeadline); err != nil {
if err := context.RunTasks(runTasksOptions); err != nil {
t.Fatalf("unexpected error during Run: %v", err)
}
}

View File

@ -22,7 +22,6 @@ import (
"os"
"path"
"strings"
"time"
"github.com/blang/semver"
"github.com/golang/glog"
@ -71,8 +70,7 @@ import (
)
const (
DefaultMaxTaskDuration = 10 * time.Minute
starline = "*********************************************************************************\n"
starline = "*********************************************************************************\n"
)
var (
@ -124,7 +122,8 @@ type ApplyClusterCmd struct {
// DryRun is true if this is only a dry run
DryRun bool
MaxTaskDuration time.Duration
// RunTasksOptions defines parameters for task execution, e.g. retry interval
RunTasksOptions *fi.RunTasksOptions
// The channel we are using
channel *kops.Channel
@ -142,10 +141,6 @@ type ApplyClusterCmd struct {
}
func (c *ApplyClusterCmd) Run() error {
if c.MaxTaskDuration == 0 {
c.MaxTaskDuration = DefaultMaxTaskDuration
}
if c.InstanceGroups == nil {
list, err := c.Clientset.InstanceGroupsFor(c.Cluster).List(metav1.ListOptions{})
if err != nil {
@ -806,7 +801,14 @@ func (c *ApplyClusterCmd) Run() error {
}
defer context.Close()
err = context.RunTasks(c.MaxTaskDuration)
var options fi.RunTasksOptions
if c.RunTasksOptions != nil {
options = *c.RunTasksOptions
} else {
options.InitDefaults()
}
err = context.RunTasks(options)
if err != nil {
return fmt.Errorf("error running tasks: %v", err)
}

View File

@ -96,12 +96,14 @@ go_test(
"ebsvolume_test.go",
"elastic_ip_test.go",
"internetgateway_test.go",
"launchconfiguration_test.go",
"securitygroup_test.go",
"subnet_test.go",
"vpc_test.go",
],
embed = [":go_default_library"],
deps = [
"//cloudmock/aws/mockautoscaling:go_default_library",
"//cloudmock/aws/mockec2:go_default_library",
"//pkg/apis/kops:go_default_library",
"//pkg/assets:go_default_library",

View File

@ -32,7 +32,10 @@ import (
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
)
const defaultDeadline = 2 * time.Second
var testRunTasksOptions = fi.RunTasksOptions{
MaxTaskDuration: 2 * time.Second,
WaitAfterAllTasksFailed: 500 * time.Millisecond,
}
func TestElasticIPCreate(t *testing.T) {
cloud := awsup.BuildMockAWSCloud("us-east-1", "abc")
@ -77,7 +80,7 @@ func TestElasticIPCreate(t *testing.T) {
t.Fatalf("error building context: %v", err)
}
if err := context.RunTasks(defaultDeadline); err != nil {
if err := context.RunTasks(testRunTasksOptions); err != nil {
t.Fatalf("unexpected error during Run: %v", err)
}
@ -119,7 +122,7 @@ func checkNoChanges(t *testing.T, cloud fi.Cloud, allTasks map[string]fi.Task) {
t.Fatalf("error building context: %v", err)
}
if err := context.RunTasks(defaultDeadline); err != nil {
if err := context.RunTasks(testRunTasksOptions); err != nil {
t.Fatalf("unexpected error during Run: %v", err)
}

View File

@ -111,7 +111,7 @@ func TestSharedInternetGatewayDoesNotRename(t *testing.T) {
t.Fatalf("error building context: %v", err)
}
if err := context.RunTasks(defaultDeadline); err != nil {
if err := context.RunTasks(testRunTasksOptions); err != nil {
t.Fatalf("unexpected error during Run: %v", err)
}

View File

@ -19,6 +19,7 @@ package awstasks
import (
"encoding/base64"
"fmt"
"math"
"sort"
"strings"
"time"
@ -27,12 +28,25 @@ import (
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kops/pkg/featureflag"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
"k8s.io/kops/upup/pkg/fi/cloudup/cloudformation"
"k8s.io/kops/upup/pkg/fi/cloudup/terraform"
)
// defaultRetainLaunchConfigurationCount is the number of launch configurations (matching the name prefix) that we should
// keep, we delete older ones
var defaultRetainLaunchConfigurationCount = 3
// RetainLaunchConfigurationCount returns the number of launch configurations to keep
func RetainLaunchConfigurationCount() int {
if featureflag.KeepLaunchConfigurations.Enabled() {
return math.MaxInt32
}
return defaultRetainLaunchConfigurationCount
}
//go:generate fitask -type=LaunchConfiguration
type LaunchConfiguration struct {
Name *string
@ -68,24 +82,26 @@ type LaunchConfiguration struct {
var _ fi.CompareWithID = &LaunchConfiguration{}
var _ fi.ProducesDeletions = &LaunchConfiguration{}
func (e *LaunchConfiguration) CompareWithID() *string {
return e.ID
}
func (e *LaunchConfiguration) Find(c *fi.Context) (*LaunchConfiguration, error) {
// findLaunchConfigurations returns matching LaunchConfigurations, sorted by CreatedTime (ascending)
func (e *LaunchConfiguration) findLaunchConfigurations(c *fi.Context) ([]*autoscaling.LaunchConfiguration, error) {
cloud := c.Cloud.(awsup.AWSCloud)
request := &autoscaling.DescribeLaunchConfigurationsInput{}
prefix := *e.Name + "-"
configurations := map[string]*autoscaling.LaunchConfiguration{}
var configurations []*autoscaling.LaunchConfiguration
err := cloud.Autoscaling().DescribeLaunchConfigurationsPages(request, func(page *autoscaling.DescribeLaunchConfigurationsOutput, lastPage bool) bool {
for _, l := range page.LaunchConfigurations {
name := aws.StringValue(l.LaunchConfigurationName)
if strings.HasPrefix(name, prefix) {
suffix := name[len(prefix):]
configurations[suffix] = l
configurations = append(configurations, l)
}
}
return true
@ -94,21 +110,36 @@ func (e *LaunchConfiguration) Find(c *fi.Context) (*LaunchConfiguration, error)
return nil, fmt.Errorf("error listing AutoscalingLaunchConfigurations: %v", err)
}
sort.Slice(configurations, func(i, j int) bool {
ti := configurations[i].CreatedTime
tj := configurations[j].CreatedTime
if tj == nil {
return true
}
if ti == nil {
return false
}
return ti.UnixNano() < tj.UnixNano()
})
return configurations, nil
}
func (e *LaunchConfiguration) Find(c *fi.Context) (*LaunchConfiguration, error) {
cloud := c.Cloud.(awsup.AWSCloud)
configurations, err := e.findLaunchConfigurations(c)
if err != nil {
return nil, err
}
if len(configurations) == 0 {
return nil, nil
}
var newest *autoscaling.LaunchConfiguration
var newestTime int64
for _, lc := range configurations {
t := lc.CreatedTime.UnixNano()
if t > newestTime {
newestTime = t
newest = lc
}
}
lc := newest
// We pick up the latest launch configuration
// (TODO: this might not actually be attached to the AutoScalingGroup, if something went wrong previously)
lc := configurations[len(configurations)-1]
glog.V(2).Infof("found existing AutoscalingLaunchConfiguration: %q", *lc.LaunchConfigurationName)
@ -117,15 +148,21 @@ func (e *LaunchConfiguration) Find(c *fi.Context) (*LaunchConfiguration, error)
ID: lc.LaunchConfigurationName,
ImageID: lc.ImageId,
InstanceType: lc.InstanceType,
SSHKey: &SSHKey{Name: lc.KeyName},
AssociatePublicIP: lc.AssociatePublicIpAddress,
IAMInstanceProfile: &IAMInstanceProfile{Name: lc.IamInstanceProfile},
InstanceMonitoring: lc.InstanceMonitoring.Enabled,
SpotPrice: aws.StringValue(lc.SpotPrice),
Tenancy: lc.PlacementTenancy,
RootVolumeOptimization: lc.EbsOptimized,
}
if lc.KeyName != nil {
actual.SSHKey = &SSHKey{Name: lc.KeyName}
}
if lc.IamInstanceProfile != nil {
actual.IAMInstanceProfile = &IAMInstanceProfile{Name: lc.IamInstanceProfile}
}
securityGroups := []*SecurityGroup{}
for _, sgID := range lc.SecurityGroups {
securityGroups = append(securityGroups, &SecurityGroup{ID: sgID})
@ -145,11 +182,13 @@ func (e *LaunchConfiguration) Find(c *fi.Context) (*LaunchConfiguration, error)
actual.RootVolumeIops = b.Ebs.Iops
}
userData, err := base64.StdEncoding.DecodeString(aws.StringValue(lc.UserData))
if err != nil {
return nil, fmt.Errorf("error decoding UserData: %v", err)
if lc.UserData != nil {
userData, err := base64.StdEncoding.DecodeString(aws.StringValue(lc.UserData))
if err != nil {
return nil, fmt.Errorf("error decoding UserData: %v", err)
}
actual.UserData = fi.WrapResource(fi.NewStringResource(string(userData)))
}
actual.UserData = fi.WrapResource(fi.NewStringResource(string(userData)))
// Avoid spurious changes on ImageId
if e.ImageID != nil && actual.ImageID != nil && *actual.ImageID != *e.ImageID {
@ -617,3 +656,68 @@ func (_ *LaunchConfiguration) RenderCloudformation(t *cloudformation.Cloudformat
func (e *LaunchConfiguration) CloudformationLink() *cloudformation.Literal {
return cloudformation.Ref("AWS::AutoScaling::LaunchConfiguration", *e.Name)
}
// deleteLaunchConfiguration tracks a LaunchConfiguration that we're going to delete
// It implements fi.Deletion
type deleteLaunchConfiguration struct {
lc *autoscaling.LaunchConfiguration
}
var _ fi.Deletion = &deleteLaunchConfiguration{}
func (d *deleteLaunchConfiguration) TaskName() string {
return "LaunchConfiguration"
}
func (d *deleteLaunchConfiguration) Item() string {
return aws.StringValue(d.lc.LaunchConfigurationName)
}
func (d *deleteLaunchConfiguration) Delete(t fi.Target) error {
glog.V(2).Infof("deleting launch configuration %v", d)
awsTarget, ok := t.(*awsup.AWSAPITarget)
if !ok {
return fmt.Errorf("unexpected target type for deletion: %T", t)
}
request := &autoscaling.DeleteLaunchConfigurationInput{
LaunchConfigurationName: d.lc.LaunchConfigurationName,
}
name := aws.StringValue(request.LaunchConfigurationName)
glog.V(2).Infof("Calling autoscaling DeleteLaunchConfiguration for %s", name)
_, err := awsTarget.Cloud.Autoscaling().DeleteLaunchConfiguration(request)
if err != nil {
return fmt.Errorf("error deleting autoscaling LaunchConfiguration %s: %v", name, err)
}
return nil
}
func (d *deleteLaunchConfiguration) String() string {
return d.TaskName() + "-" + d.Item()
}
func (e *LaunchConfiguration) FindDeletions(c *fi.Context) ([]fi.Deletion, error) {
var removals []fi.Deletion
configurations, err := e.findLaunchConfigurations(c)
if err != nil {
return nil, err
}
if len(configurations) <= RetainLaunchConfigurationCount() {
return nil, nil
}
configurations = configurations[:len(configurations)-RetainLaunchConfigurationCount()]
for _, configuration := range configurations {
removals = append(removals, &deleteLaunchConfiguration{lc: configuration})
}
glog.V(2).Infof("will delete launch configurations: %v", removals)
return removals, nil
}

View File

@ -0,0 +1,117 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package awstasks
import (
"strconv"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"k8s.io/kops/cloudmock/aws/mockautoscaling"
"k8s.io/kops/cloudmock/aws/mockec2"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
)
func TestLaunchConfigurationGarbageCollection(t *testing.T) {
cloud := awsup.BuildMockAWSCloud("us-east-1", "abc")
mockEC2 := &mockec2.MockEC2{}
cloud.MockEC2 = mockEC2
as := &mockautoscaling.MockAutoscaling{}
cloud.MockAutoscaling = as
mockEC2.Images = append(mockEC2.Images, &ec2.Image{
CreationDate: aws.String("2016-10-21T20:07:19.000Z"),
ImageId: aws.String("ami-12345678"),
Name: aws.String("k8s-1.4-debian-jessie-amd64-hvm-ebs-2016-10-21"),
OwnerId: aws.String(awsup.WellKnownAccountKopeio),
RootDeviceName: aws.String("/dev/xvda"),
})
// We define a function so we can rebuild the tasks, because we modify in-place when running
buildTasks := func(spotPrice string) map[string]fi.Task {
lc := &LaunchConfiguration{
Name: s("lc1"),
SpotPrice: spotPrice,
ImageID: s("ami-12345678"),
InstanceType: s("m3.medium"),
SecurityGroups: []*SecurityGroup{},
}
return map[string]fi.Task{
"lc1": lc,
}
}
// We change the launch configuration 5 times, verifying that new launch configurations are created,
// and that older ones are eventually GCed
for i := 0; i < 5; i++ {
spotPrice := strconv.Itoa(i + 1)
{
allTasks := buildTasks(spotPrice)
lc1 := allTasks["lc1"].(*LaunchConfiguration)
target := &awsup.AWSAPITarget{
Cloud: cloud,
}
context, err := fi.NewContext(target, nil, cloud, nil, nil, nil, true, allTasks)
if err != nil {
t.Fatalf("error building context: %v", err)
}
// We use a longer deadline because we know we often need to
// retry here, because we create different versions of
// launchconfigurations using the timestamp, but only to
// per-second granularity. This normally works out because we
// retry for O(minutes), so after a few retries the clock has
// advanced. But if we use too short a deadline in our tests we
// don't get this behaviour.
options := testRunTasksOptions
options.MaxTaskDuration = 5 * time.Second
if err := context.RunTasks(options); err != nil {
t.Fatalf("unexpected error during Run: %v", err)
}
if fi.StringValue(lc1.ID) == "" {
t.Fatalf("ID not set after create")
}
expectedCount := i + 1
if expectedCount > RetainLaunchConfigurationCount() {
expectedCount = RetainLaunchConfigurationCount()
}
if len(as.LaunchConfigurations) != expectedCount {
t.Fatalf("Expected exactly %d LaunchConfigurations; found %v", expectedCount, as.LaunchConfigurations)
}
// TODO: verify that we retained the N latest
actual := as.LaunchConfigurations[*lc1.ID]
if aws.StringValue(actual.SpotPrice) != spotPrice {
t.Fatalf("Unexpected spotPrice: expected=%v actual=%v", spotPrice, aws.StringValue(actual.SpotPrice))
}
}
{
allTasks := buildTasks(spotPrice)
checkNoChanges(t, cloud, allTasks)
}
}
}

View File

@ -132,7 +132,7 @@ func TestSecurityGroupCreate(t *testing.T) {
t.Fatalf("error building context: %v", err)
}
if err := context.RunTasks(defaultDeadline); err != nil {
if err := context.RunTasks(testRunTasksOptions); err != nil {
t.Fatalf("unexpected error during Run: %v", err)
}

View File

@ -101,7 +101,7 @@ func TestSubnetCreate(t *testing.T) {
t.Fatalf("error building context: %v", err)
}
if err := context.RunTasks(defaultDeadline); err != nil {
if err := context.RunTasks(testRunTasksOptions); err != nil {
t.Fatalf("unexpected error during Run: %v", err)
}
@ -219,7 +219,7 @@ func TestSharedSubnetCreateDoesNotCreateNew(t *testing.T) {
t.Fatalf("error building context: %v", err)
}
if err := context.RunTasks(defaultDeadline); err != nil {
if err := context.RunTasks(testRunTasksOptions); err != nil {
t.Fatalf("unexpected error during Run: %v", err)
}

View File

@ -57,7 +57,7 @@ func TestVPCCreate(t *testing.T) {
t.Fatalf("error building context: %v", err)
}
if err := context.RunTasks(defaultDeadline); err != nil {
if err := context.RunTasks(testRunTasksOptions); err != nil {
t.Fatalf("unexpected error during Run: %v", err)
}

View File

@ -23,7 +23,6 @@ import (
"os"
"reflect"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kops/dnsprovider/pkg/dnsprovider"
@ -80,11 +79,12 @@ func (c *Context) AllTasks() map[string]Task {
return c.tasks
}
func (c *Context) RunTasks(maxTaskDuration time.Duration) error {
func (c *Context) RunTasks(options RunTasksOptions) error {
e := &executor{
context: c,
options: options,
}
return e.RunTasks(c.tasks, maxTaskDuration)
return e.RunTasks(c.tasks)
}
func (c *Context) Close() {

View File

@ -27,6 +27,8 @@ import (
type executor struct {
context *Context
options RunTasksOptions
}
type taskState struct {
@ -38,9 +40,19 @@ type taskState struct {
dependencies []*taskState
}
type RunTasksOptions struct {
MaxTaskDuration time.Duration
WaitAfterAllTasksFailed time.Duration
}
func (o *RunTasksOptions) InitDefaults() {
o.MaxTaskDuration = 10 * time.Minute
o.WaitAfterAllTasksFailed = 10 * time.Second
}
// RunTasks executes all the tasks, considering their dependencies
// It will perform some re-execution on error, retrying as long as progress is still being made
func (e *executor) RunTasks(taskMap map[string]Task, maxTaskDuration time.Duration) error {
func (e *executor) RunTasks(taskMap map[string]Task) error {
dependencies := FindTaskDependencies(taskMap)
taskStates := make(map[string]*taskState)
@ -80,7 +92,7 @@ func (e *executor) RunTasks(taskMap map[string]Task, maxTaskDuration time.Durati
}
if ready {
if ts.deadline.IsZero() {
ts.deadline = time.Now().Add(maxTaskDuration)
ts.deadline = time.Now().Add(e.options.MaxTaskDuration)
} else if time.Now().After(ts.deadline) {
return fmt.Errorf("deadline exceeded executing task %v. Example error: %v", ts.key, ts.lastError)
}
@ -131,7 +143,7 @@ func (e *executor) RunTasks(taskMap map[string]Task, maxTaskDuration time.Durati
panic("did not make progress executing tasks; but no errors reported")
}
glog.Infof("No progress made, sleeping before retrying %d failed task(s)", len(errors))
time.Sleep(10 * time.Second)
time.Sleep(e.options.WaitAfterAllTasksFailed)
}
}

View File

@ -291,7 +291,10 @@ func (c *NodeUpCommand) Run(out io.Writer) error {
}
defer context.Close()
err = context.RunTasks(MaxTaskDuration)
var options fi.RunTasksOptions
options.InitDefaults()
err = context.RunTasks(options)
if err != nil {
glog.Exitf("error running tasks: %v", err)
}