mirror of https://github.com/kubernetes/kops.git
Implement MaxUnavailable
parent 0952374027
commit 0c3651c9c8
@@ -44,6 +44,7 @@ go_test(
        "//upup/pkg/fi/cloudup/awsup:go_default_library",
        "//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
        "//vendor/github.com/aws/aws-sdk-go/service/autoscaling:go_default_library",
        "//vendor/github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface:go_default_library",
        "//vendor/github.com/stretchr/testify/assert:go_default_library",
        "//vendor/k8s.io/api/core/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -103,7 +103,6 @@ func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting b

// TODO: Temporarily increase size of ASG?
// TODO: Remove from ASG first so status is immediately updated?
// TODO: Batch termination, like a rolling-update

// RollingUpdate performs a rolling update on a list of ec2 instances.
func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpdateCluster, cluster *api.Cluster, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) {
@@ -118,6 +117,8 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
		return fmt.Errorf("rollingUpdate is missing a k8s client")
	}

	noneReady := len(r.CloudGroup.Ready) == 0
	numInstances := len(r.CloudGroup.Ready) + len(r.CloudGroup.NeedUpdate)
	update := r.CloudGroup.NeedUpdate
	if rollingUpdateData.Force {
		update = append(update, r.CloudGroup.Ready...)
@@ -148,15 +149,40 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
		}
	}

	for _, u := range update {
		err = r.drainTerminateAndWait(u, rollingUpdateData, isBastion, sleepAfterTerminate)
	settings := resolveSettings(cluster, r.CloudGroup.InstanceGroup, numInstances)

	concurrency := 0
	maxConcurrency := 1

	if r.CloudGroup.InstanceGroup.Spec.Role == api.InstanceGroupRoleNode && !rollingUpdateData.Interactive {
		maxConcurrency = settings.MaxUnavailable.IntValue()
		if maxConcurrency == 0 {
			klog.Infof("Rolling updates for InstanceGroup %s are disabled", r.CloudGroup.InstanceGroup.Name)
			return nil
		}
	}

	terminateChan := make(chan error, maxConcurrency)

	for uIdx, u := range update {
		go r.drainTerminateAndWait(u, rollingUpdateData, terminateChan, isBastion, sleepAfterTerminate)
		concurrency++

		// Wait until after one node is deleted and its replacement validates before the concurrent draining
		// in case the current spec does not result in usable nodes.
		if concurrency < maxConcurrency && (!noneReady || uIdx > 0) {
			continue
		}

		err = <-terminateChan
		concurrency--
		if err != nil {
			return err
			return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
		}

		err = r.maybeValidate(rollingUpdateData, validationTimeout)
		if err != nil {
			return err
			return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
		}

		if rollingUpdateData.Interactive {
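Note on the hunk above: the synchronous per-node loop is replaced by a bounded fan-out whose width comes from settings.MaxUnavailable. resolveSettings itself is not part of this excerpt; since MaxUnavailable is a Kubernetes intstr.IntOrString, a percentage value would presumably be scaled against the instance group size, roughly as in the hedged sketch below (resolveMaxUnavailable, its default, and its rounding choice are illustrative assumptions, not kops code).

// Illustrative only: how an IntOrString MaxUnavailable could be resolved
// against the group size; the real resolveSettings is not shown in this diff.
// Uses k8s.io/apimachinery/pkg/util/intstr.
func resolveMaxUnavailable(maxUnavailable *intstr.IntOrString, numInstances int) (int, error) {
	if maxUnavailable == nil {
		// assumed default: update one node at a time
		return 1, nil
	}
	// "2" stays 2; "25%" of a 7-node group rounds down to 1 here (rounding down is an assumption)
	return intstr.GetValueFromIntOrPercent(maxUnavailable, numInstances, false)
}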
@@ -174,11 +200,47 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
				rollingUpdateData.Interactive = false
			}
		}

	sweep:
		for concurrency > 0 {
			select {
			case err = <-terminateChan:
				concurrency--
				if err != nil {
					return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
				}
			default:
				break sweep
			}
		}
	}

	if concurrency > 0 {
		for concurrency > 0 {
			err = <-terminateChan
			concurrency--
			if err != nil {
				return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
			}
		}

		err = r.maybeValidate(rollingUpdateData, validationTimeout)
		if err != nil {
			return err
		}
	}

	return nil
}

func waitForPendingBeforeReturningError(concurrency int, terminateChan chan error, err error) error {
	for concurrency > 0 {
		<-terminateChan
		concurrency--
	}
	return err
}

func (r *RollingUpdateInstanceGroup) taintAllNeedUpdate(update []*cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster) error {
	var toTaint []*corev1.Node
	for _, u := range update {
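The loop above is the core of the change: at most maxConcurrency drain-and-terminate goroutines run at once, each sends exactly one value on the buffered terminateChan, and waitForPendingBeforeReturningError waits for the outstanding ones before surfacing an error, so RollingUpdate never returns while terminations are still in flight. A minimal standalone sketch of the same pattern, with illustrative names and without the validation step:

// rollAll runs work on every item with at most maxConcurrency in flight,
// mirroring the bounded fan-out above: each worker sends exactly one result,
// and outstanding workers are waited for before an error is returned.
func rollAll(items []string, maxConcurrency int, work func(string) error) error {
	results := make(chan error, maxConcurrency)
	inFlight := 0

	waitThenReturn := func(err error) error {
		for inFlight > 0 {
			<-results
			inFlight--
		}
		return err
	}

	for _, item := range items {
		item := item // capture loop variable for the goroutine
		go func() { results <- work(item) }()
		inFlight++
		if inFlight < maxConcurrency {
			continue
		}
		inFlight--
		if err := <-results; err != nil {
			return waitThenReturn(err)
		}
	}

	// Collect results from any workers still running after the loop.
	for inFlight > 0 {
		inFlight--
		if err := <-results; err != nil {
			return waitThenReturn(err)
		}
	}
	return nil
}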
@@ -237,7 +299,7 @@ func (r *RollingUpdateInstanceGroup) patchTaint(rollingUpdateData *RollingUpdate
	return err
}

func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster, isBastion bool, sleepAfterTerminate time.Duration) error {
func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster, terminateChan chan error, isBastion bool, sleepAfterTerminate time.Duration) {
	instanceId := u.ID

	nodeName := ""
@@ -258,7 +320,8 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo

			if err := r.DrainNode(u, rollingUpdateData); err != nil {
				if rollingUpdateData.FailOnDrainError {
					return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
					terminateChan <- fmt.Errorf("failed to drain node %q: %v", nodeName, err)
					return
				}
				klog.Infof("Ignoring error draining node %q: %v", nodeName, err)
			}
@@ -275,21 +338,23 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
		} else {
			klog.Infof("deleting node %q from kubernetes", nodeName)
			if err := r.deleteNode(u.Node, rollingUpdateData); err != nil {
				return fmt.Errorf("error deleting node %q: %v", nodeName, err)
				terminateChan <- fmt.Errorf("error deleting node %q: %v", nodeName, err)
				return
			}
		}
	}

	if err := r.DeleteInstance(u); err != nil {
		klog.Errorf("error deleting instance %q, node %q: %v", instanceId, nodeName, err)
		return err
		terminateChan <- err
		return
	}

	// Wait for the minimum interval
	klog.Infof("waiting for %v after terminating instance", sleepAfterTerminate)
	time.Sleep(sleepAfterTerminate)

	return nil
	terminateChan <- nil
}

func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration) error {
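As the hunks above show, drainTerminateAndWait no longer returns an error; it is launched as a goroutine and reports through terminateChan, sending the error on every failure path and nil at the end of the success path, so each call performs exactly one send and the caller's concurrency count stays balanced. The general shape of that transformation, with made-up names (doWork/doWorkAsync are not kops functions):

// Before: a synchronous helper returning its error.
func doWork() error {
	return nil // stand-in for drain + terminate + sleep
}

// After: the same helper run as a goroutine, reporting its single result on a
// channel. The invariant that matters to the caller is exactly one send per call.
func doWorkAsync(result chan<- error) {
	if err := doWork(); err != nil {
		result <- err
		return
	}
	result <- nil
}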
@@ -19,14 +19,17 @@ package instancegroups
import (
	"errors"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/autoscaling"
	"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
	"github.com/stretchr/testify/assert"
	v1 "k8s.io/api/core/v1"
	v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	testingclient "k8s.io/client-go/testing"
@@ -42,7 +45,7 @@ const (
	taintPatch = "{\"spec\":{\"taints\":[{\"effect\":\"PreferNoSchedule\",\"key\":\"kops.k8s.io/scheduled-for-update\"}]}}"
)

func getTestSetup() (*RollingUpdateCluster, awsup.AWSCloud, *kopsapi.Cluster) {
func getTestSetup() (*RollingUpdateCluster, *awsup.MockAWSCloud, *kopsapi.Cluster) {
	k8sClient := fake.NewSimpleClientset()

	mockcloud := awsup.BuildMockAWSCloud("us-east-1", "abc")
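Returning the concrete *awsup.MockAWSCloud instead of the awsup.AWSCloud interface lets a test replace cloud.MockAutoscaling with its own autoscaling implementation, which the concurrent tests below rely on. The underlying Go idiom is interface embedding with a selective override; a hedged, self-contained illustration (countingASG is hypothetical and not part of this commit):

// countingASG embeds the mock's AutoScalingAPI and overrides only the one call
// it cares about, delegating everything else to the embedded implementation.
type countingASG struct {
	autoscalingiface.AutoScalingAPI
	terminations int
}

func (c *countingASG) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) {
	c.terminations++
	return c.AutoScalingAPI.TerminateInstanceInAutoScalingGroup(input)
}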
@@ -602,6 +605,262 @@ func TestRollingUpdateTaintAllButOneNeedUpdate(t *testing.T) {
	assertGroupInstanceCount(t, cloud, "node-1", 1)
}

func TestRollingUpdateSettingsIgnoredForMaster(t *testing.T) {
	c, cloud, cluster := getTestSetup()

	two := intstr.FromInt(2)
	cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
		MaxUnavailable: &two,
	}

	groups := make(map[string]*cloudinstances.CloudInstanceGroup)
	makeGroup(groups, c.K8sClient, cloud, "master-1", kopsapi.InstanceGroupRoleMaster, 3, 2)
	err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
	assert.NoError(t, err, "rolling update")

	cordoned := ""
	tainted := map[string]bool{}
	deleted := map[string]bool{}
	for _, action := range c.K8sClient.(*fake.Clientset).Actions() {
		switch a := action.(type) {
		case testingclient.PatchAction:
			if string(a.GetPatch()) == cordonPatch {
				assertCordon(t, a)
				assert.Equal(t, "", cordoned, "at most one node cordoned at a time")
				assert.True(t, tainted[a.GetName()], "node", a.GetName(), "tainted")
				cordoned = a.GetName()
			} else {
				assertTaint(t, a)
				assert.Equal(t, "", cordoned, "not tainting while node cordoned")
				assert.False(t, tainted[a.GetName()], "node", a.GetName(), "already tainted")
				tainted[a.GetName()] = true
			}
		case testingclient.DeleteAction:
			assert.Equal(t, "nodes", a.GetResource().Resource)
			assert.Equal(t, cordoned, a.GetName(), "node was cordoned before delete")
			assert.False(t, deleted[a.GetName()], "node", a.GetName(), "already deleted")
			deleted[a.GetName()] = true
			cordoned = ""
		case testingclient.ListAction:
			// Don't care
		default:
			t.Errorf("unexpected action %v", a)
		}
	}

	assertGroupInstanceCount(t, cloud, "master-1", 1)
}

func TestRollingUpdateDisabled(t *testing.T) {
	c, cloud, cluster := getTestSetup()

	zero := intstr.FromInt(0)
	cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
		MaxUnavailable: &zero,
	}

	groups := getGroupsAllNeedUpdate(c.K8sClient, cloud)
	err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
	assert.NoError(t, err, "rolling update")

	assertGroupInstanceCount(t, cloud, "node-1", 3)
	assertGroupInstanceCount(t, cloud, "node-2", 3)
	assertGroupInstanceCount(t, cloud, "master-1", 0)
	assertGroupInstanceCount(t, cloud, "bastion-1", 0)
}

func TestRollingUpdateDisabledCloudonly(t *testing.T) {
	c, cloud, cluster := getTestSetup()
	c.CloudOnly = true

	zero := intstr.FromInt(0)
	cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
		MaxUnavailable: &zero,
	}

	groups := getGroupsAllNeedUpdate(c.K8sClient, cloud)
	err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
	assert.NoError(t, err, "rolling update")

	assertGroupInstanceCount(t, cloud, "node-1", 3)
	assertGroupInstanceCount(t, cloud, "node-2", 3)
	assertGroupInstanceCount(t, cloud, "master-1", 0)
	assertGroupInstanceCount(t, cloud, "bastion-1", 0)
}

// The concurrent update tests attempt to induce the following expected update sequence:
//
// (Only for "all need update" tests, to verify the toe-dipping behavior)
// Request validate (7)            -->
//                                 <-- validated
// Request terminate 1 node (7)    -->
//                                 <-- 1 node terminated, 6 left
// (end only for "all need update" tests)
// Request validate (6)            -->
//                                 <-- validated
// Request terminate 2 nodes (6,5) -->
//                                 <-- 1 node terminated (5), 5 left
// Request validate (4)            -->
//                                 <-- 1 node terminated (6), 4 left
//                                 <-- validated
// Request terminate 2 nodes (4,3) -->
//                                 <-- 1 node terminated (3), 3 left
// Request validate (2)            -->
//                                 <-- validated
// Request terminate 1 node (2)    -->
//                                 <-- 1 node terminated (2), 2 left
// Request validate (1)            -->
//                                 <-- 1 node terminated (4), 1 left
//                                 <-- validated
// Request terminate 1 node (1)    -->
//                                 <-- 1 node terminated, 0 left
// Request validate (0)            -->
//                                 <-- validated
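The concurrentTest mock below produces this interleaving by parking one side on an unbuffered channel until the other side reaches a known point, always guarded by a timeout so a regression fails the test rather than deadlocking it. Reduced to its essentials, the rendezvous looks roughly like this (illustrative names, not the mock itself):

// One goroutine signals that it reached a checkpoint; the other waits for that
// signal, but gives up after a second rather than hanging the whole test run.
func waitForCheckpoint(t *testing.T, checkpoint <-chan bool) {
	select {
	case <-checkpoint:
	case <-time.After(1 * time.Second):
		t.Error("timed out waiting for checkpoint")
	}
}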

type concurrentTest struct {
	autoscalingiface.AutoScalingAPI
	t                       *testing.T
	mutex                   sync.Mutex
	terminationRequestsLeft int
	previousValidation      int
	validationChan          chan bool
	terminationChan         chan bool
}

func (c *concurrentTest) Validate() (*validation.ValidationCluster, error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	terminationRequestsLeft := c.terminationRequestsLeft
	switch terminationRequestsLeft {
	case 7, 6, 0:
		assert.Equal(c.t, terminationRequestsLeft+1, c.previousValidation, "previous validation")
	case 5, 3:
		c.t.Errorf("unexpected call to Validate with %d termination requests left", terminationRequestsLeft)
	case 4:
		assert.Equal(c.t, 6, c.previousValidation, "previous validation")
		c.terminationChan <- true
		c.mutex.Unlock()
		select {
		case <-c.validationChan:
		case <-time.After(1 * time.Second):
			c.t.Error("timed out reading from validationChan")
		}
		c.mutex.Lock()
	case 2:
		assert.Equal(c.t, 4, c.previousValidation, "previous validation")
	case 1:
		assert.Equal(c.t, 2, c.previousValidation, "previous validation")
		c.terminationChan <- true
		c.mutex.Unlock()
		select {
		case <-c.validationChan:
		case <-time.After(1 * time.Second):
			c.t.Error("timed out reading from validationChan")
		}
		c.mutex.Lock()
	}
	c.previousValidation = terminationRequestsLeft

	return &validation.ValidationCluster{}, nil
}

func (c *concurrentTest) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	terminationRequestsLeft := c.terminationRequestsLeft
	c.terminationRequestsLeft--
	switch terminationRequestsLeft {
	case 7, 2, 1:
		assert.Equal(c.t, terminationRequestsLeft, c.previousValidation, "previous validation")
	case 6, 4:
		assert.Equal(c.t, terminationRequestsLeft, c.previousValidation, "previous validation")
		c.mutex.Unlock()
		select {
		case <-c.terminationChan:
		case <-time.After(1 * time.Second):
			c.t.Error("timed out reading from terminationChan")
		}
		c.mutex.Lock()
		go c.delayThenWakeValidation()
	case 5, 3:
		assert.Equal(c.t, terminationRequestsLeft+1, c.previousValidation, "previous validation")
	}
	return c.AutoScalingAPI.TerminateInstanceInAutoScalingGroup(input)
}

func (c *concurrentTest) delayThenWakeValidation() {
	time.Sleep(2 * time.Millisecond) // NodeInterval plus some
	c.validationChan <- true
}

func (c *concurrentTest) AssertComplete() {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	assert.Equal(c.t, 0, c.previousValidation, "last validation")
}

func newConcurrentTest(t *testing.T, cloud *awsup.MockAWSCloud, allNeedUpdate bool) *concurrentTest {
	test := concurrentTest{
		AutoScalingAPI:          cloud.MockAutoscaling,
		t:                       t,
		terminationRequestsLeft: 6,
		validationChan:          make(chan bool),
		terminationChan:         make(chan bool),
	}
	if allNeedUpdate {
		test.terminationRequestsLeft = 7
	}
	test.previousValidation = test.terminationRequestsLeft + 1
	return &test
}

func TestRollingUpdateMaxUnavailableAllNeedUpdate(t *testing.T) {
	c, cloud, cluster := getTestSetup()

	concurrentTest := newConcurrentTest(t, cloud, true)
	c.ValidateSuccessDuration = 0
	c.ClusterValidator = concurrentTest
	cloud.MockAutoscaling = concurrentTest

	two := intstr.FromInt(2)
	cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
		MaxUnavailable: &two,
	}

	groups := make(map[string]*cloudinstances.CloudInstanceGroup)
	makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 7, 7)

	err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
	assert.NoError(t, err, "rolling update")

	assertGroupInstanceCount(t, cloud, "node-1", 0)
	concurrentTest.AssertComplete()
}

func TestRollingUpdateMaxUnavailableAllButOneNeedUpdate(t *testing.T) {
	c, cloud, cluster := getTestSetup()

	concurrentTest := newConcurrentTest(t, cloud, false)
	c.ValidateSuccessDuration = 0
	c.ClusterValidator = concurrentTest
	cloud.MockAutoscaling = concurrentTest

	two := intstr.FromInt(2)
	cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
		MaxUnavailable: &two,
	}

	groups := make(map[string]*cloudinstances.CloudInstanceGroup)
	makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 7, 6)
	err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
	assert.NoError(t, err, "rolling update")

	assertGroupInstanceCount(t, cloud, "node-1", 1)
	concurrentTest.AssertComplete()
}

func assertCordon(t *testing.T, action testingclient.PatchAction) {
	assert.Equal(t, "nodes", action.GetResource().Resource)
	assert.Equal(t, cordonPatch, string(action.GetPatch()))
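The new tests only exercise integer MaxUnavailable values. Because the field is an intstr.IntOrString, a percentage form may also be expressible through the same API types, but that depends on resolveSettings scaling it, which is not visible in this excerpt; treat the following as an assumption:

// Hypothetical configuration using a percentage instead of an absolute count.
maxUnavailable := intstr.FromString("25%")
cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
	MaxUnavailable: &maxUnavailable,
}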