Merge pull request #11227 from olemarkus/warm-roll

Give kOps CLI knowledge about ASG warm pools
This commit is contained in:
Kubernetes Prow Robot 2021-04-15 09:46:07 -07:00 committed by GitHub
commit 5aa8a31819
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 223 additions and 42 deletions

View File

@ -29,6 +29,7 @@ type MockAutoscaling struct {
mutex sync.Mutex mutex sync.Mutex
Groups map[string]*autoscaling.Group Groups map[string]*autoscaling.Group
WarmPoolInstances map[string][]*autoscaling.Instance
LaunchConfigurations map[string]*autoscaling.LaunchConfiguration LaunchConfigurations map[string]*autoscaling.LaunchConfiguration
} }

View File

@ -265,6 +265,15 @@ func (m *MockAutoscaling) TerminateInstanceInAutoScalingGroup(input *autoscaling
}, nil }, nil
} }
} }
wp := m.WarmPoolInstances[*group.AutoScalingGroupName]
for i := range wp {
if aws.StringValue(wp[i].InstanceId) == aws.StringValue(input.InstanceId) {
m.WarmPoolInstances[*group.AutoScalingGroupName] = append(wp[:i], wp[i+1:]...)
return &autoscaling.TerminateInstanceInAutoScalingGroupOutput{
Activity: nil, // TODO
}, nil
}
}
} }
return nil, fmt.Errorf("Instance not found") return nil, fmt.Errorf("Instance not found")

View File

@ -174,7 +174,11 @@ func instanceOutputTable(instances []*cloudinstances.CloudInstance, out io.Write
t.AddColumn("MACHINE-TYPE", func(i *cloudinstances.CloudInstance) string { t.AddColumn("MACHINE-TYPE", func(i *cloudinstances.CloudInstance) string {
return i.MachineType return i.MachineType
}) })
columns := []string{"ID", "NODE-NAME", "STATUS", "ROLES", "INTERNAL-IP", "INSTANCE-GROUP", "MACHINE-TYPE"} t.AddColumn("STATE", func(i *cloudinstances.CloudInstance) string {
return string(i.State)
})
columns := []string{"ID", "NODE-NAME", "STATUS", "ROLES", "STATE", "INTERNAL-IP", "INSTANCE-GROUP", "MACHINE-TYPE"}
return t.Render(instances, os.Stdout, columns...) return t.Render(instances, os.Stdout, columns...)
} }

View File

@ -27,6 +27,11 @@ const CloudInstanceStatusNeedsUpdate = "NeedsUpdate"
// CloudInstanceStatusReady means the instance has joined the cluster, is not detached, and is up to date. // CloudInstanceStatusReady means the instance has joined the cluster, is not detached, and is up to date.
const CloudInstanceStatusUpToDate = "UpToDate" const CloudInstanceStatusUpToDate = "UpToDate"
// State is the string type used for the State field of a CloudInstance.
type State string
// WarmPool means the instance is in the warm pool
const WarmPool State = "WarmPool"
// CloudInstance describes an instance in a CloudInstanceGroup group. // CloudInstance describes an instance in a CloudInstanceGroup group.
type CloudInstance struct { type CloudInstance struct {
// ID is a unique identifier for the instance, meaningful to the cloud // ID is a unique identifier for the instance, meaningful to the cloud
@ -43,4 +48,6 @@ type CloudInstance struct {
MachineType string MachineType string
// Private IP is the private ip address of the instance. // Private IP is the private ip address of the instance.
PrivateIP string PrivateIP string
// State is the state the instance is in
State State
} }

View File

@ -41,7 +41,7 @@ type CloudInstanceGroup struct {
} }
// NewCloudInstance creates a new CloudInstance // NewCloudInstance creates a new CloudInstance
func (c *CloudInstanceGroup) NewCloudInstance(instanceId string, status string, nodeMap map[string]*v1.Node) (*CloudInstance, error) { func (c *CloudInstanceGroup) NewCloudInstance(instanceId string, status string, node *v1.Node) (*CloudInstance, error) {
if instanceId == "" { if instanceId == "" {
return nil, fmt.Errorf("instance id for cloud instance member cannot be empty") return nil, fmt.Errorf("instance id for cloud instance member cannot be empty")
} }
@ -58,7 +58,6 @@ func (c *CloudInstanceGroup) NewCloudInstance(instanceId string, status string,
cm.Status = status cm.Status = status
node := nodeMap[instanceId]
if node != nil { if node != nil {
cm.Node = node cm.Node = node
} else { } else {

View File

@ -36,6 +36,7 @@ go_test(
srcs = [ srcs = [
"rollingupdate_os_test.go", "rollingupdate_os_test.go",
"rollingupdate_test.go", "rollingupdate_test.go",
"rollingupdate_warmpool_test.go",
"settings_test.go", "settings_test.go",
], ],
embed = [":go_default_library"], embed = [":go_default_library"],

View File

@ -124,6 +124,21 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(group *cloudinstances.
} }
} }
nonWarmPool := []*cloudinstances.CloudInstance{}
// Run through the warm pool and delete all instances directly
for _, instance := range update {
if instance.State == cloudinstances.WarmPool {
klog.Infof("deleting warm pool instance %q", instance.ID)
err := c.Cloud.DeleteInstance(instance)
if err != nil {
return fmt.Errorf("failed to delete warm pool instance %q: %w", instance.ID, err)
}
} else {
nonWarmPool = append(nonWarmPool, instance)
}
}
update = nonWarmPool
if c.Interactive { if c.Interactive {
if maxSurge > 1 { if maxSurge > 1 {
maxSurge = 1 maxSurge = 1

View File

@ -53,7 +53,9 @@ func getTestSetup() (*RollingUpdateCluster, *awsup.MockAWSCloud) {
k8sClient := fake.NewSimpleClientset() k8sClient := fake.NewSimpleClientset()
mockcloud := awsup.BuildMockAWSCloud("us-east-1", "abc") mockcloud := awsup.BuildMockAWSCloud("us-east-1", "abc")
mockAutoscaling := &mockautoscaling.MockAutoscaling{} mockAutoscaling := &mockautoscaling.MockAutoscaling{
WarmPoolInstances: make(map[string][]*autoscaling.Instance),
}
mockcloud.MockAutoscaling = mockAutoscaling mockcloud.MockAutoscaling = mockAutoscaling
mockcloud.MockEC2 = mockAutoscaling.GetEC2Shim(mockcloud.MockEC2) mockcloud.MockEC2 = mockAutoscaling.GetEC2Shim(mockcloud.MockEC2)
@ -135,7 +137,7 @@ func (v *assertNotCalledClusterValidator) Validate() (*validation.ValidationClus
func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient kubernetes.Interface, cloud awsup.AWSCloud, name string, role kopsapi.InstanceGroupRole, count int, needUpdate int) { func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient kubernetes.Interface, cloud awsup.AWSCloud, name string, role kopsapi.InstanceGroupRole, count int, needUpdate int) {
fakeClient := k8sClient.(*fake.Clientset) fakeClient := k8sClient.(*fake.Clientset)
groups[name] = &cloudinstances.CloudInstanceGroup{ group := &cloudinstances.CloudInstanceGroup{
HumanName: name, HumanName: name,
InstanceGroup: &kopsapi.InstanceGroup{ InstanceGroup: &kopsapi.InstanceGroup{
ObjectMeta: v1meta.ObjectMeta{ ObjectMeta: v1meta.ObjectMeta{
@ -147,6 +149,8 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k
}, },
Raw: &autoscaling.Group{AutoScalingGroupName: aws.String("asg-" + name)}, Raw: &autoscaling.Group{AutoScalingGroupName: aws.String("asg-" + name)},
} }
groups[name] = group
cloud.Autoscaling().CreateAutoScalingGroup(&autoscaling.CreateAutoScalingGroupInput{ cloud.Autoscaling().CreateAutoScalingGroup(&autoscaling.CreateAutoScalingGroupInput{
AutoScalingGroupName: aws.String(name), AutoScalingGroupName: aws.String(name),
DesiredCapacity: aws.Int64(int64(count)), DesiredCapacity: aws.Int64(int64(count)),
@ -164,16 +168,16 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k
} }
_ = fakeClient.Tracker().Add(node) _ = fakeClient.Tracker().Add(node)
} }
member := cloudinstances.CloudInstance{
ID: id, var status string
Node: node,
CloudInstanceGroup: groups[name],
}
if i < needUpdate { if i < needUpdate {
groups[name].NeedUpdate = append(groups[name].NeedUpdate, &member) status = cloudinstances.CloudInstanceStatusNeedsUpdate
} else { } else {
groups[name].Ready = append(groups[name].Ready, &member) status = cloudinstances.CloudInstanceStatusUpToDate
} }
group.NewCloudInstance(id, status, node)
instanceIds = append(instanceIds, aws.String(id)) instanceIds = append(instanceIds, aws.String(id))
} }
cloud.Autoscaling().AttachInstances(&autoscaling.AttachInstancesInput{ cloud.Autoscaling().AttachInstances(&autoscaling.AttachInstancesInput{

View File

@ -0,0 +1,116 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package instancegroups
import (
"testing"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"
"k8s.io/kops/cloudmock/aws/mockautoscaling"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
)
// TestRollingUpdateOnlyWarmPoolNodes covers a group with three nodes that are
// up to date, while three warm pool instances need updating.
// Only the initial cluster validation should be run.
func TestRollingUpdateOnlyWarmPoolNodes(t *testing.T) {
	cluster, mockCloud := getTestSetup()

	// Count validations so we can assert that only the initial one happened.
	counter := &countingValidator{}
	cluster.ClusterValidator = counter

	cloudGroups := map[string]*cloudinstances.CloudInstanceGroup{}
	makeGroupWithWarmPool(cloudGroups, cluster.K8sClient, mockCloud, "node-1", kops.InstanceGroupRoleNode, 3, 0, 3, 3)

	needUpdate := cloudGroups["node-1"].NeedUpdate
	assert.Equal(t, 3, len(needUpdate), "number of nodes needing update")

	err := cluster.RollingUpdate(cloudGroups, &kops.InstanceGroupList{})
	assert.NoError(t, err, "rolling update")
	assert.Equal(t, 1, counter.numValidations, "number of validations")
}
// TestRollingWarmPoolBeforeJoinedNodes rolls a group in which three joined
// nodes and three warm pool instances need updating, and asserts that six
// TerminateInstances calls were counted by the wrapping EC2 client.
func TestRollingWarmPoolBeforeJoinedNodes(t *testing.T) {
	cluster, mockCloud := getTestSetup()

	cloudGroups := map[string]*cloudinstances.CloudInstanceGroup{}
	makeGroupWithWarmPool(cloudGroups, cluster.K8sClient, mockCloud, "node-1", kops.InstanceGroupRoleNode, 3, 3, 3, 3)

	// Wrap the mock EC2 client so that every TerminateInstances call is counted.
	terminations := &warmPoolBeforeJoinedNodesTest{
		EC2API: mockCloud.MockEC2,
		t:      t,
	}
	mockCloud.MockEC2 = terminations

	err := cluster.RollingUpdate(cloudGroups, &kops.InstanceGroupList{})
	assert.NoError(t, err, "rolling update")
	assert.Equal(t, 6, terminations.numTerminations, "Number of terminations")
}
// countingValidator is a test validator that records how many times its
// Validate method has been called.
type countingValidator struct {
	// numValidations is incremented on every Validate call.
numValidations int
}
// Validate counts the invocation and returns an empty validation result with
// no error.
func (c *countingValidator) Validate() (*validation.ValidationCluster, error) {
	c.numValidations++

	result := &validation.ValidationCluster{}
	return result, nil
}
// makeGroupWithWarmPool creates the group through makeGroup and then attaches
// warmCount warm pool instances to it.
//
// The first warmNeedUpdate warm pool instances are registered with status
// NeedsUpdate and the remaining ones with status UpToDate, mirroring how
// makeGroup treats count and needUpdate. Every warm pool instance gets
// State WarmPool and is also stored in the mock's WarmPoolInstances under
// the group name.
//
// It panics if a warm pool instance cannot be registered with the group;
// that indicates a broken test setup rather than a condition under test.
func makeGroupWithWarmPool(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient kubernetes.Interface, cloud *awsup.MockAWSCloud, name string, role kops.InstanceGroupRole, count int, needUpdate int, warmCount int, warmNeedUpdate int) {
	makeGroup(groups, k8sClient, cloud, name, role, count, needUpdate)

	group := groups[name]

	wpInstances := []*autoscaling.Instance{}
	warmStoppedState := autoscaling.LifecycleStateWarmedStopped
	for i := 0; i < warmCount; i++ {
		id := name + "-wp-" + string(rune('a'+i))
		wpInstances = append(wpInstances, &autoscaling.Instance{
			InstanceId:     &id,
			LifecycleState: &warmStoppedState,
		})

		// Honour warmNeedUpdate the same way makeGroup honours needUpdate.
		status := cloudinstances.CloudInstanceStatusUpToDate
		if i < warmNeedUpdate {
			status = cloudinstances.CloudInstanceStatusNeedsUpdate
		}

		// Warm pool instances have not joined the cluster, so there is no node.
		cm, err := group.NewCloudInstance(id, status, nil)
		if err != nil {
			// Fail loudly here instead of dereferencing a nil instance below.
			panic(err)
		}
		cm.State = cloudinstances.WarmPool
	}

	// There is no API to write to warm pools, so we need to cheat.
	mockASG := cloud.MockAutoscaling.(*mockautoscaling.MockAutoscaling)
	mockASG.WarmPoolInstances[name] = wpInstances
}
// warmPoolBeforeJoinedNodesTest wraps an EC2 client and counts the
// TerminateInstances calls made through it.
type warmPoolBeforeJoinedNodesTest struct {
	// EC2API is the embedded client that TerminateInstances forwards to.
ec2iface.EC2API
	// t is the test handle supplied when the wrapper is constructed.
t *testing.T
	// numTerminations is the number of TerminateInstances calls seen so far.
numTerminations int
}
// TerminateInstances records the call and then forwards it unchanged to the
// embedded EC2 client.
func (w *warmPoolBeforeJoinedNodesTest) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) {
	w.numTerminations++

	output, err := w.EC2API.TerminateInstances(input)
	return output, err
}

View File

@ -447,7 +447,7 @@ func buildCloudInstanceGroup(c *Cloud, ig *kops.InstanceGroup, g DOInstanceGroup
for _, member := range g.Members { for _, member := range g.Members {
// TODO use a hash of the godo.DropletCreateRequest fields to calculate the second parameter. // TODO use a hash of the godo.DropletCreateRequest fields to calculate the second parameter.
_, err := cg.NewCloudInstance(member, cloudinstances.CloudInstanceStatusUpToDate, nodeMap) _, err := cg.NewCloudInstance(member, cloudinstances.CloudInstanceStatusUpToDate, nodeMap[member])
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err) return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
} }

View File

@ -383,7 +383,7 @@ func registerCloudInstances(instanceGroup *cloudinstances.CloudInstanceGroup, no
status = cloudinstances.CloudInstanceStatusNeedsUpdate status = cloudinstances.CloudInstanceStatusNeedsUpdate
} }
if _, err := instanceGroup.NewCloudInstance( if _, err := instanceGroup.NewCloudInstance(
instance.Id(), status, nodeMap); err != nil { instance.Id(), status, nodeMap[instance.Id()]); err != nil {
return fmt.Errorf("error creating cloud instance group member: %v", err) return fmt.Errorf("error creating cloud instance group member: %v", err)
} }
} }

View File

@ -307,6 +307,9 @@ func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances
// bastion nodes don't join the cluster // bastion nodes don't join the cluster
nodeExpectedToJoin = false nodeExpectedToJoin = false
} }
if member.State == cloudinstances.WarmPool {
nodeExpectedToJoin = false
}
if nodeExpectedToJoin { if nodeExpectedToJoin {
v.addError(&ValidationError{ v.addError(&ValidationError{

View File

@ -279,7 +279,7 @@ func buildCloudInstanceGroup(c ALICloud, ig *kops.InstanceGroup, g ess.ScalingGr
if newLaunchConfigName != i.ScalingConfigurationId { if newLaunchConfigName != i.ScalingConfigurationId {
status = cloudinstances.CloudInstanceStatusNeedsUpdate status = cloudinstances.CloudInstanceStatusNeedsUpdate
} }
_, err := cg.NewCloudInstance(instanceId, status, nodeMap) _, err := cg.NewCloudInstance(instanceId, status, nodeMap[instanceId])
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err) return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
} }

View File

@ -780,34 +780,24 @@ func awsBuildCloudInstanceGroup(c AWSCloud, cluster *kops.Cluster, ig *kops.Inst
} }
for _, i := range g.Instances { for _, i := range g.Instances {
id := aws.StringValue(i.InstanceId) err := buildCloudInstance(i, instances, instanceSeen, nodeMap, cg, newConfigName)
if id == "" {
klog.Warningf("ignoring instance with no instance id: %s in autoscaling group: %s", id, cg.HumanName)
continue
}
instanceSeen[id] = true
// @step: check if the instance is terminating
if aws.StringValue(i.LifecycleState) == autoscaling.LifecycleStateTerminating {
klog.Warningf("ignoring instance as it is terminating: %s in autoscaling group: %s", id, cg.HumanName)
continue
}
if instances[id] == nil {
continue
}
currentConfigName := findInstanceLaunchConfiguration(i)
status := cloudinstances.CloudInstanceStatusUpToDate
if newConfigName != currentConfigName {
status = cloudinstances.CloudInstanceStatusNeedsUpdate
}
cm, err := cg.NewCloudInstance(id, status, nodeMap)
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err) return nil, err
} }
addCloudInstanceData(cm, instances[id])
} }
result, err := c.Autoscaling().DescribeWarmPool(&autoscaling.DescribeWarmPoolInput{
AutoScalingGroupName: g.AutoScalingGroupName,
})
if err != nil {
return nil, err
}
for _, i := range result.Instances {
err := buildCloudInstance(i, instances, instanceSeen, nodeMap, cg, newConfigName)
if err != nil {
return nil, err
}
}
var detached []*string var detached []*string
for id, instance := range instances { for id, instance := range instances {
for _, tag := range instance.Tags { for _, tag := range instance.Tags {
@ -821,7 +811,7 @@ func awsBuildCloudInstanceGroup(c AWSCloud, cluster *kops.Cluster, ig *kops.Inst
} }
for _, id := range detached { for _, id := range detached {
if id != nil && *id != "" && !instanceSeen[*id] { if id != nil && *id != "" && !instanceSeen[*id] {
cm, err := cg.NewCloudInstance(*id, cloudinstances.CloudInstanceStatusDetached, nodeMap) cm, err := cg.NewCloudInstance(*id, cloudinstances.CloudInstanceStatusDetached, nodeMap[*id])
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err) return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
} }
@ -833,6 +823,38 @@ func awsBuildCloudInstanceGroup(c AWSCloud, cluster *kops.Cluster, ig *kops.Inst
return cg, nil return cg, nil
} }
// buildCloudInstance registers the autoscaling instance i as a member of cg.
//
// The instance is skipped (nil is returned and nothing is added to cg) when
// it has no instance id, when its lifecycle state is Terminating, or when it
// has no entry in instances. Any instance with a non-empty id is recorded in
// instanceSeen, including ones that are skipped afterwards.
//
// The member's status is UpToDate when the instance's launch configuration
// name equals newConfigName and NeedsUpdate otherwise. Instances whose
// lifecycle state starts with "Warmed" are marked with State WarmPool.
//
// An error is returned only when cg.NewCloudInstance fails.
func buildCloudInstance(i *autoscaling.Instance, instances map[string]*ec2.Instance, instanceSeen map[string]bool, nodeMap map[string]*v1.Node, cg *cloudinstances.CloudInstanceGroup, newConfigName string) error {
	id := aws.StringValue(i.InstanceId)
	if id == "" {
		klog.Warningf("ignoring instance with no instance id: %s in autoscaling group: %s", id, cg.HumanName)
		return nil
	}
	instanceSeen[id] = true

	// Read the lifecycle state once, nil-safely; it is needed both for the
	// terminating check and for the warm pool check below.
	lifecycleState := aws.StringValue(i.LifecycleState)

	// @step: check if the instance is terminating
	if lifecycleState == autoscaling.LifecycleStateTerminating {
		klog.Warningf("ignoring instance as it is terminating: %s in autoscaling group: %s", id, cg.HumanName)
		return nil
	}

	ec2Instance := instances[id]
	if ec2Instance == nil {
		return nil
	}

	currentConfigName := findInstanceLaunchConfiguration(i)
	status := cloudinstances.CloudInstanceStatusUpToDate
	if newConfigName != currentConfigName {
		status = cloudinstances.CloudInstanceStatusNeedsUpdate
	}

	cm, err := cg.NewCloudInstance(id, status, nodeMap[id])
	if err != nil {
		return fmt.Errorf("error creating cloud instance group member: %v", err)
	}

	// Previously this dereferenced i.LifecycleState directly, which panics
	// when the field is unset.
	if strings.HasPrefix(lifecycleState, "Warmed") {
		cm.State = cloudinstances.WarmPool
	}

	addCloudInstanceData(cm, ec2Instance)
	return nil
}
func addCloudInstanceData(cm *cloudinstances.CloudInstance, instance *ec2.Instance) { func addCloudInstanceData(cm *cloudinstances.CloudInstance, instance *ec2.Instance) {
cm.MachineType = aws.StringValue(instance.InstanceType) cm.MachineType = aws.StringValue(instance.InstanceType)
for _, tag := range instance.Tags { for _, tag := range instance.Tags {

View File

@ -183,7 +183,7 @@ func (c *azureCloudImplementation) buildCloudInstanceGroup(
// TODO(kenji): Set the status properly so that kops can // TODO(kenji): Set the status properly so that kops can
// tell whether a VM is up-to-date or not. // tell whether a VM is up-to-date or not.
status := cloudinstances.CloudInstanceStatusUpToDate status := cloudinstances.CloudInstanceStatusUpToDate
_, err := cg.NewCloudInstance(*vm.Name, status, nodeMap) _, err := cg.NewCloudInstance(*vm.Name, status, nodeMap[*vm.Name])
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %s", err) return nil, fmt.Errorf("error creating cloud instance group member: %s", err)
} }

View File

@ -137,7 +137,7 @@ func osBuildCloudInstanceGroup(c OpenstackCloud, cluster *kops.Cluster, ig *kops
if generationName != observedName { if generationName != observedName {
status = cloudinstances.CloudInstanceStatusNeedsUpdate status = cloudinstances.CloudInstanceStatusNeedsUpdate
} }
cm, err := cg.NewCloudInstance(instanceId, status, nodeMap) cm, err := cg.NewCloudInstance(instanceId, status, nodeMap[instanceId])
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err) return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
} }