Merge pull request #9799 from olemarkus/cloudinstances-refactor

Cloudinstances refactor
This commit is contained in:
Kubernetes Prow Robot 2020-08-31 23:23:50 -07:00 committed by GitHub
commit e11146c0df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 214 additions and 126 deletions

View File

@ -267,12 +267,12 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
return d.UpdateSingleInstance(ctx, cloudMember, options.Surge)
}
func deleteNodeMatch(cloudMember *cloudinstances.CloudInstanceGroupMember, options *deleteInstanceOptions) bool {
func deleteNodeMatch(cloudMember *cloudinstances.CloudInstance, options *deleteInstanceOptions) bool {
return cloudMember.ID == options.InstanceID ||
(!options.CloudOnly && cloudMember.Node != nil && cloudMember.Node.Name == options.InstanceID)
}
func findDeletionNode(groups map[string]*cloudinstances.CloudInstanceGroup, options *deleteInstanceOptions) *cloudinstances.CloudInstanceGroupMember {
func findDeletionNode(groups map[string]*cloudinstances.CloudInstanceGroup, options *deleteInstanceOptions) *cloudinstances.CloudInstance {
for _, group := range groups {
for _, r := range group.Ready {
if deleteNodeMatch(r, options) {

View File

@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["cloud_instance_group.go"],
srcs = [
"cloud_instance.go",
"cloud_instance_group.go",
],
importpath = "k8s.io/kops/pkg/cloudinstances",
visibility = ["//visibility:public"],
deps = [

View File

@ -0,0 +1,46 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudinstances
import v1 "k8s.io/api/core/v1"
// Status values recorded in CloudInstance.Status.
const (
	// CloudInstanceStatusDetached means the instance needs update and has been detached.
	CloudInstanceStatusDetached = "Detached"
	// CloudInstanceStatusNeedsUpdate means the instance has joined the cluster, is not detached, and needs to be updated.
	CloudInstanceStatusNeedsUpdate = "NeedsUpdate"
	// CloudInstanceStatusUpToDate means the instance has joined the cluster, is not detached, and is up to date.
	CloudInstanceStatusUpToDate = "UpToDate"
)
// CloudInstance describes a single instance that belongs to a CloudInstanceGroup.
type CloudInstance struct {
// ID is a unique identifier for the instance, meaningful to the cloud.
ID string
// Node is the associated k8s node, if it is known; nil otherwise.
Node *v1.Node
// CloudInstanceGroup is the managing CloudInstanceGroup.
CloudInstanceGroup *CloudInstanceGroup
// Status indicates if the instance has joined the cluster and if it needs any updates.
// It holds one of the CloudInstanceStatus* values declared in this package.
Status string
// Roles are the roles the instance has.
Roles []string
// MachineType is the hardware resource class of the instance.
MachineType string
// PrivateIP is the private IP address of the instance.
PrivateIP string
}

View File

@ -30,8 +30,8 @@ type CloudInstanceGroup struct {
// HumanName is a user-friendly name for the group
HumanName string
InstanceGroup *kopsapi.InstanceGroup
Ready []*CloudInstanceGroupMember
NeedUpdate []*CloudInstanceGroupMember
Ready []*CloudInstance
NeedUpdate []*CloudInstance
MinSize int
TargetSize int
MaxSize int
@ -40,63 +40,31 @@ type CloudInstanceGroup struct {
Raw interface{}
}
// CloudInstanceGroupMember describes an instance in a CloudInstanceGroup group.
type CloudInstanceGroupMember struct {
// ID is a unique identifier for the instance, meaningful to the cloud
ID string
// Node is the associated k8s instance, if it is known
Node *v1.Node
// CloudInstanceGroup is the managing CloudInstanceGroup
CloudInstanceGroup *CloudInstanceGroup
// Detached is whether fi.Cloud.DetachInstance has been successfully called on the instance.
Detached bool
}
// NewCloudInstanceGroupMember creates a new CloudInstanceGroupMember
func (c *CloudInstanceGroup) NewCloudInstanceGroupMember(instanceId string, newGroupName string, currentGroupName string, nodeMap map[string]*v1.Node) error {
// NewCloudInstance creates a new CloudInstance
func (c *CloudInstanceGroup) NewCloudInstance(instanceId string, status string, nodeMap map[string]*v1.Node) (*CloudInstance, error) {
if instanceId == "" {
return fmt.Errorf("instance id for cloud instance member cannot be empty")
return nil, fmt.Errorf("instance id for cloud instance member cannot be empty")
}
cm := &CloudInstanceGroupMember{
cm := &CloudInstance{
ID: instanceId,
CloudInstanceGroup: c,
}
node := nodeMap[instanceId]
if node != nil {
cm.Node = node
} else {
klog.V(8).Infof("unable to find node for instance: %s", instanceId)
}
if newGroupName == currentGroupName {
if status == CloudInstanceStatusUpToDate {
c.Ready = append(c.Ready, cm)
} else {
c.NeedUpdate = append(c.NeedUpdate, cm)
}
return nil
}
cm.Status = status
// NewDetachedCloudInstanceGroupMember creates a new CloudInstanceGroupMember for a detached instance
func (c *CloudInstanceGroup) NewDetachedCloudInstanceGroupMember(instanceId string, nodeMap map[string]*v1.Node) error {
if instanceId == "" {
return fmt.Errorf("instance id for cloud instance member cannot be empty")
}
cm := &CloudInstanceGroupMember{
ID: instanceId,
CloudInstanceGroup: c,
Detached: true,
}
node := nodeMap[instanceId]
if node != nil {
cm.Node = node
} else {
klog.V(8).Infof("unable to find node for instance: %s", instanceId)
}
c.NeedUpdate = append(c.NeedUpdate, cm)
return nil
return cm, nil
}
// Status returns a human-readable Status indicating whether an update is needed

View File

@ -132,7 +132,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, c
if maxSurge > 0 && !c.CloudOnly {
for numSurge := 1; numSurge <= maxSurge; numSurge++ {
u := update[len(update)-numSurge]
if !u.Detached {
if u.Status != cloudinstances.CloudInstanceStatusDetached {
if err := c.detachInstance(u); err != nil {
return err
}
@ -161,7 +161,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, c
terminateChan := make(chan error, maxConcurrency)
for uIdx, u := range update {
go func(m *cloudinstances.CloudInstanceGroupMember) {
go func(m *cloudinstances.CloudInstance) {
terminateChan <- c.drainTerminateAndWait(ctx, m, sleepAfterTerminate)
}(u)
runningDrains++
@ -233,15 +233,15 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, c
return nil
}
func prioritizeUpdate(update []*cloudinstances.CloudInstanceGroupMember) []*cloudinstances.CloudInstanceGroupMember {
func prioritizeUpdate(update []*cloudinstances.CloudInstance) []*cloudinstances.CloudInstance {
// The priorities are, in order:
// attached before detached
// TODO unhealthy before healthy
// NeedUpdate before Ready (preserve original order)
result := make([]*cloudinstances.CloudInstanceGroupMember, 0, len(update))
var detached []*cloudinstances.CloudInstanceGroupMember
result := make([]*cloudinstances.CloudInstance, 0, len(update))
var detached []*cloudinstances.CloudInstance
for _, u := range update {
if u.Detached {
if u.Status == cloudinstances.CloudInstanceStatusDetached {
detached = append(detached, u)
} else {
result = append(result, u)
@ -260,7 +260,7 @@ func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan er
return err
}
func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstanceGroupMember) error {
func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstance) error {
var toTaint []*corev1.Node
for _, u := range update {
if u.Node != nil && !u.Node.Spec.Unschedulable {
@ -321,7 +321,7 @@ func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node
return err
}
func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstanceGroupMember, sleepAfterTerminate time.Duration) error {
func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstance, sleepAfterTerminate time.Duration) error {
instanceID := u.ID
nodeName := ""
@ -454,7 +454,7 @@ func (c *RollingUpdateCluster) validateClusterWithTimeout(validateCount int) err
}
// detachInstance detaches a Cloud Instance
func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstanceGroupMember) error {
func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstance) error {
id := u.ID
nodeName := ""
if u.Node != nil {
@ -477,7 +477,7 @@ func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstanceGro
}
// deleteInstance deletes a Cloud Instance.
func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstanceGroupMember) error {
func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstance) error {
id := u.ID
nodeName := ""
if u.Node != nil {
@ -500,7 +500,7 @@ func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstanceGro
}
// drainNode drains a K8s node.
func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstanceGroupMember) error {
func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error {
if c.K8sClient == nil {
return fmt.Errorf("K8sClient not set")
}
@ -566,7 +566,7 @@ func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node
}
// UpdateSingleInstance performs a rolling update on a single instance
func (c *RollingUpdateCluster) UpdateSingleInstance(ctx context.Context, cloudMember *cloudinstances.CloudInstanceGroupMember, detach bool) error {
func (c *RollingUpdateCluster) UpdateSingleInstance(ctx context.Context, cloudMember *cloudinstances.CloudInstance, detach bool) error {
if detach {
if cloudMember.CloudInstanceGroup.InstanceGroup.IsMaster() {
klog.Warning("cannot detach master instances. Assuming --surge=false")

View File

@ -78,7 +78,7 @@ type RollingUpdateCluster struct {
func (c *RollingUpdateCluster) AdjustNeedUpdate(groups map[string]*cloudinstances.CloudInstanceGroup, cluster *api.Cluster, instanceGroups *api.InstanceGroupList) error {
for _, group := range groups {
if group.Ready != nil {
var newReady []*cloudinstances.CloudInstanceGroupMember
var newReady []*cloudinstances.CloudInstance
for _, member := range group.Ready {
makeNotReady := false
if member.Node != nil && member.Node.Annotations != nil {
@ -89,6 +89,7 @@ func (c *RollingUpdateCluster) AdjustNeedUpdate(groups map[string]*cloudinstance
if makeNotReady {
group.NeedUpdate = append(group.NeedUpdate, member)
member.Status = cloudinstances.CloudInstanceStatusNeedsUpdate
} else {
newReady = append(newReady, member)
}

View File

@ -144,7 +144,7 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k
}
_ = fakeClient.Tracker().Add(node)
}
member := cloudinstances.CloudInstanceGroupMember{
member := cloudinstances.CloudInstance{
ID: id,
Node: node,
CloudInstanceGroup: groups[name],
@ -1371,7 +1371,7 @@ func TestRollingUpdateMaxSurgeAllNeedUpdateOneAlreadyDetached(t *testing.T) {
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 4, 4)
alreadyDetachedTest.detached[groups["node-1"].NeedUpdate[3].ID] = true
groups["node-1"].NeedUpdate[3].Detached = true
groups["node-1"].NeedUpdate[3].Status = cloudinstances.CloudInstanceStatusDetached
err := c.RollingUpdate(ctx, groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")
@ -1397,8 +1397,13 @@ func TestRollingUpdateMaxSurgeAllNeedUpdateMaxAlreadyDetached(t *testing.T) {
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 7, 7)
groups["node-1"].NeedUpdate[1].Detached = true
groups["node-1"].NeedUpdate[3].Detached = true
groups["node-1"].NeedUpdate[0].Status = cloudinstances.CloudInstanceStatusNeedsUpdate
groups["node-1"].NeedUpdate[1].Status = cloudinstances.CloudInstanceStatusDetached
groups["node-1"].NeedUpdate[2].Status = cloudinstances.CloudInstanceStatusNeedsUpdate
groups["node-1"].NeedUpdate[3].Status = cloudinstances.CloudInstanceStatusDetached
groups["node-1"].NeedUpdate[4].Status = cloudinstances.CloudInstanceStatusNeedsUpdate
groups["node-1"].NeedUpdate[5].Status = cloudinstances.CloudInstanceStatusNeedsUpdate
groups["node-1"].NeedUpdate[6].Status = cloudinstances.CloudInstanceStatusNeedsUpdate
// TODO verify those are the last two instances terminated
err := c.RollingUpdate(ctx, groups, cluster, &kopsapi.InstanceGroupList{})

View File

@ -107,13 +107,13 @@ func (c *Cloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
}
// DeleteInstance is not implemented yet; this func needs to delete a DO instance.
func (c *Cloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *Cloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("digitalocean cloud provider DeleteInstance not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support deleting cloud instances at this time")
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("digitalocean cloud provider DetachInstance not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support surging")
}
@ -407,8 +407,8 @@ func buildCloudInstanceGroup(c *Cloud, ig *kops.InstanceGroup, g DOInstanceGroup
for _, member := range g.Members {
// TODO use a hash of the godo.DropletCreateRequest fields for second and third parameters.
err := cg.NewCloudInstanceGroupMember(member, g.GroupType, g.GroupType, nodeMap)
// TODO use a hash of the godo.DropletCreateRequest fields to calculate the second parameter.
_, err := cg.NewCloudInstance(member, cloudinstances.CloudInstanceStatusUpToDate, nodeMap)
if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
}

View File

@ -166,7 +166,7 @@ func DeleteInstanceGroup(cloud Cloud, group *cloudinstances.CloudInstanceGroup)
}
// DeleteInstance removes an instance from its instance group.
func DeleteInstance(cloud Cloud, instance *cloudinstances.CloudInstanceGroupMember) error {
func DeleteInstance(cloud Cloud, instance *cloudinstances.CloudInstance) error {
klog.V(2).Infof("Detaching instance %q from instance group: %q",
instance.ID, instance.CloudInstanceGroup.HumanName)
@ -194,7 +194,7 @@ func DeleteInstance(cloud Cloud, instance *cloudinstances.CloudInstanceGroupMemb
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func DetachInstance(cloud Cloud, instance *cloudinstances.CloudInstanceGroupMember) error {
func DetachInstance(cloud Cloud, instance *cloudinstances.CloudInstance) error {
return fmt.Errorf("spotinst does not support surging")
}
@ -299,7 +299,7 @@ func buildCloudInstanceGroupFromInstanceGroup(cloud Cloud, ig *kops.InstanceGrou
}
// Register all instances as group members.
if err := registerCloudInstanceGroupMembers(instanceGroup, nodeMap,
if err := registerCloudInstances(instanceGroup, nodeMap,
instances, group.Name(), group.UpdatedAt()); err != nil {
return nil, err
}
@ -332,7 +332,7 @@ func buildCloudInstanceGroupFromLaunchSpec(cloud Cloud, ig *kops.InstanceGroup,
}
// Register all instances as group members.
if err := registerCloudInstanceGroupMembers(instanceGroup, nodeMap,
if err := registerCloudInstances(instanceGroup, nodeMap,
instances, spec.Name(), spec.UpdatedAt()); err != nil {
return nil, err
}
@ -340,7 +340,7 @@ func buildCloudInstanceGroupFromLaunchSpec(cloud Cloud, ig *kops.InstanceGroup,
return instanceGroup, nil
}
func registerCloudInstanceGroupMembers(instanceGroup *cloudinstances.CloudInstanceGroup, nodeMap map[string]*v1.Node,
func registerCloudInstances(instanceGroup *cloudinstances.CloudInstanceGroup, nodeMap map[string]*v1.Node,
instances []Instance, currentInstanceGroupName string, instanceGroupUpdatedAt time.Time) error {
// The instance registration below registers all active instances with
@ -378,8 +378,12 @@ func registerCloudInstanceGroupMembers(instanceGroup *cloudinstances.CloudInstan
instance.Id(), instance.CreatedAt().Format(time.RFC3339),
currentInstanceGroupName, instanceGroupUpdatedAt.Format(time.RFC3339))
if err := instanceGroup.NewCloudInstanceGroupMember(
instance.Id(), newInstanceGroupName, currentInstanceGroupName, nodeMap); err != nil {
status := cloudinstances.CloudInstanceStatusUpToDate
if newInstanceGroupName != currentInstanceGroupName {
status = cloudinstances.CloudInstanceStatusNeedsUpdate
}
if _, err := instanceGroup.NewCloudInstance(
instance.Id(), status, nodeMap); err != nil {
return fmt.Errorf("error creating cloud instance group member: %v", err)
}
}

View File

@ -291,14 +291,14 @@ func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances
groupsSeen := map[string]bool{}
for _, cloudGroup := range cloudGroups {
var allMembers []*cloudinstances.CloudInstanceGroupMember
var allMembers []*cloudinstances.CloudInstance
allMembers = append(allMembers, cloudGroup.Ready...)
allMembers = append(allMembers, cloudGroup.NeedUpdate...)
groupsSeen[cloudGroup.InstanceGroup.Name] = true
numNodes := 0
for _, m := range allMembers {
if !m.Detached {
if m.Status != cloudinstances.CloudInstanceStatusDetached {
numNodes++
}
}

View File

@ -85,7 +85,7 @@ func testValidate(t *testing.T, groups map[string]*cloudinstances.CloudInstanceG
},
},
MinSize: 1,
Ready: []*cloudinstances.CloudInstanceGroupMember{
Ready: []*cloudinstances.CloudInstance{
{
ID: "i-00001",
Node: &v1.Node{
@ -172,7 +172,7 @@ func Test_ValidateNodesNotEnough(t *testing.T) {
},
MinSize: 2,
TargetSize: 3,
Ready: []*cloudinstances.CloudInstanceGroupMember{
Ready: []*cloudinstances.CloudInstance{
{
ID: "i-00001",
Node: &v1.Node{
@ -185,7 +185,7 @@ func Test_ValidateNodesNotEnough(t *testing.T) {
},
},
},
NeedUpdate: []*cloudinstances.CloudInstanceGroupMember{
NeedUpdate: []*cloudinstances.CloudInstance{
{
ID: "i-00002",
Node: &v1.Node{
@ -225,7 +225,7 @@ func Test_ValidateDetachedNodesDontCount(t *testing.T) {
},
MinSize: 2,
TargetSize: 2,
Ready: []*cloudinstances.CloudInstanceGroupMember{
Ready: []*cloudinstances.CloudInstance{
{
ID: "i-00001",
Node: &v1.Node{
@ -238,7 +238,7 @@ func Test_ValidateDetachedNodesDontCount(t *testing.T) {
},
},
},
NeedUpdate: []*cloudinstances.CloudInstanceGroupMember{
NeedUpdate: []*cloudinstances.CloudInstance{
{
ID: "i-00002",
Node: &v1.Node{
@ -249,7 +249,7 @@ func Test_ValidateDetachedNodesDontCount(t *testing.T) {
},
},
},
Detached: true,
Status: cloudinstances.CloudInstanceStatusDetached,
},
},
}
@ -279,7 +279,7 @@ func Test_ValidateNodeNotReady(t *testing.T) {
},
MinSize: 2,
TargetSize: 2,
Ready: []*cloudinstances.CloudInstanceGroupMember{
Ready: []*cloudinstances.CloudInstance{
{
ID: "i-00001",
Node: &v1.Node{
@ -292,7 +292,7 @@ func Test_ValidateNodeNotReady(t *testing.T) {
},
},
},
NeedUpdate: []*cloudinstances.CloudInstanceGroupMember{
NeedUpdate: []*cloudinstances.CloudInstance{
{
ID: "i-00002",
Node: &v1.Node{
@ -332,7 +332,7 @@ func Test_ValidateMastersNotEnough(t *testing.T) {
},
MinSize: 2,
TargetSize: 3,
Ready: []*cloudinstances.CloudInstanceGroupMember{
Ready: []*cloudinstances.CloudInstance{
{
ID: "i-00001",
Node: &v1.Node{
@ -345,7 +345,7 @@ func Test_ValidateMastersNotEnough(t *testing.T) {
},
},
},
NeedUpdate: []*cloudinstances.CloudInstanceGroupMember{
NeedUpdate: []*cloudinstances.CloudInstance{
{
ID: "i-00002",
Node: &v1.Node{
@ -385,7 +385,7 @@ func Test_ValidateMasterNotReady(t *testing.T) {
},
MinSize: 2,
TargetSize: 2,
Ready: []*cloudinstances.CloudInstanceGroupMember{
Ready: []*cloudinstances.CloudInstance{
{
ID: "i-00001",
Node: &v1.Node{
@ -398,7 +398,7 @@ func Test_ValidateMasterNotReady(t *testing.T) {
},
},
},
NeedUpdate: []*cloudinstances.CloudInstanceGroupMember{
NeedUpdate: []*cloudinstances.CloudInstance{
{
ID: "i-00002",
Node: &v1.Node{
@ -438,7 +438,7 @@ func Test_ValidateMasterStaticPods(t *testing.T) {
},
MinSize: 1,
TargetSize: 1,
Ready: []*cloudinstances.CloudInstanceGroupMember{
Ready: []*cloudinstances.CloudInstance{
{
ID: "i-00001",
Node: &v1.Node{
@ -459,7 +459,7 @@ func Test_ValidateMasterStaticPods(t *testing.T) {
},
},
},
NeedUpdate: []*cloudinstances.CloudInstanceGroupMember{
NeedUpdate: []*cloudinstances.CloudInstance{
{
ID: "i-00002",
Node: &v1.Node{
@ -770,7 +770,7 @@ func Test_ValidateBastionNodes(t *testing.T) {
Name: "ig1",
},
},
Ready: []*cloudinstances.CloudInstanceGroupMember{
Ready: []*cloudinstances.CloudInstance{
{
ID: "i-00001",
Node: nil,

View File

@ -32,13 +32,13 @@ type Cloud interface {
FindVPCInfo(id string) (*VPCInfo, error)
// DeleteInstance deletes a cloud instance.
DeleteInstance(instance *cloudinstances.CloudInstanceGroupMember) error
DeleteInstance(instance *cloudinstances.CloudInstance) error
// DeleteGroup deletes the cloud resources that make up a CloudInstanceGroup, including the instances.
DeleteGroup(group *cloudinstances.CloudInstanceGroup) error
// DetachInstance causes a cloud instance to no longer be counted against the group's size limits.
DetachInstance(instance *cloudinstances.CloudInstanceGroupMember) error
DetachInstance(instance *cloudinstances.CloudInstance) error
// GetCloudGroups returns a map of cloud instances that back a kops cluster.
// Detached instances must be returned in the NeedUpdate slice.

View File

@ -168,10 +168,10 @@ func (c *aliCloudImplementation) DeleteGroup(g *cloudinstances.CloudInstanceGrou
return errors.New("DeleteGroup not implemented on aliCloud")
}
func (c *aliCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *aliCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstance) error {
id := i.ID
if id == "" {
return fmt.Errorf("id was not set on CloudInstanceGroupMember: %v", i)
return fmt.Errorf("id was not set on CloudInstance: %v", i)
}
if err := c.EcsClient().StopInstance(id, false); err != nil {
@ -206,7 +206,7 @@ func (c *aliCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstanceG
return nil
}
func (c *aliCloudImplementation) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *aliCloudImplementation) DetachInstance(i *cloudinstances.CloudInstance) error {
return errors.New("aliCloud cloud provider does not support surging")
}

View File

@ -275,7 +275,11 @@ func buildCloudInstanceGroup(c ALICloud, ig *kops.InstanceGroup, g ess.ScalingGr
klog.Warningf("ignoring instance with no instance id: %s", i)
continue
}
err := cg.NewCloudInstanceGroupMember(instanceId, newLaunchConfigName, i.ScalingConfigurationId, nodeMap)
status := cloudinstances.CloudInstanceStatusUpToDate
if newLaunchConfigName != i.ScalingConfigurationId {
status = cloudinstances.CloudInstanceStatusNeedsUpdate
}
_, err := cg.NewCloudInstance(instanceId, status, nodeMap)
if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
}

View File

@ -24,6 +24,7 @@ go_library(
"//pkg/apis/kops/model:go_default_library",
"//pkg/cloudinstances:go_default_library",
"//pkg/featureflag:go_default_library",
"//pkg/nodeidentity/aws:go_default_library",
"//pkg/resources/spotinst:go_default_library",
"//protokube/pkg/etcd:go_default_library",
"//upup/pkg/fi:go_default_library",

View File

@ -50,6 +50,7 @@ import (
"k8s.io/kops/pkg/apis/kops/model"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/featureflag"
identity_aws "k8s.io/kops/pkg/nodeidentity/aws"
"k8s.io/kops/pkg/resources/spotinst"
"k8s.io/kops/upup/pkg/fi"
k8s_aws "k8s.io/legacy-cloud-providers/aws"
@ -442,7 +443,7 @@ func deleteGroup(c AWSCloud, g *cloudinstances.CloudInstanceGroup) error {
}
// DeleteInstance deletes an aws instance
func (c *awsCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *awsCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstance) error {
if c.spotinst != nil {
if featureflag.SpotinstHybrid.Enabled() {
if _, ok := i.CloudInstanceGroup.Raw.(*autoscaling.Group); ok {
@ -456,10 +457,10 @@ func (c *awsCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstanceG
return deleteInstance(c, i)
}
func deleteInstance(c AWSCloud, i *cloudinstances.CloudInstanceGroupMember) error {
func deleteInstance(c AWSCloud, i *cloudinstances.CloudInstance) error {
id := i.ID
if id == "" {
return fmt.Errorf("id was not set on CloudInstanceGroupMember: %v", i)
return fmt.Errorf("id was not set on CloudInstance: %v", i)
}
request := &ec2.TerminateInstancesInput{
@ -476,8 +477,8 @@ func deleteInstance(c AWSCloud, i *cloudinstances.CloudInstanceGroupMember) erro
}
// DetachInstance causes an aws instance to no longer be counted against the ASG's size limits.
func (c *awsCloudImplementation) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
if i.Detached {
func (c *awsCloudImplementation) DetachInstance(i *cloudinstances.CloudInstance) error {
if i.Status == cloudinstances.CloudInstanceStatusDetached {
return nil
}
if c.spotinst != nil {
@ -487,10 +488,10 @@ func (c *awsCloudImplementation) DetachInstance(i *cloudinstances.CloudInstanceG
return detachInstance(c, i)
}
func detachInstance(c AWSCloud, i *cloudinstances.CloudInstanceGroupMember) error {
func detachInstance(c AWSCloud, i *cloudinstances.CloudInstance) error {
id := i.ID
if id == "" {
return fmt.Errorf("id was not set on CloudInstanceGroupMember: %v", i)
return fmt.Errorf("id was not set on CloudInstance: %v", i)
}
asg := i.CloudInstanceGroup.Raw.(*autoscaling.Group)
@ -751,6 +752,10 @@ func awsBuildCloudInstanceGroup(c AWSCloud, cluster *kops.Cluster, ig *kops.Inst
}
instanceSeen := map[string]bool{}
instances, err := findInstances(c, ig)
if err != nil {
return nil, fmt.Errorf("failed to fetch instances: %v", err)
}
cg := &cloudinstances.CloudInstanceGroup{
HumanName: aws.StringValue(g.AutoScalingGroupName),
@ -774,19 +779,44 @@ func awsBuildCloudInstanceGroup(c AWSCloud, cluster *kops.Cluster, ig *kops.Inst
continue
}
currentConfigName := findInstanceLaunchConfiguration(i)
if err := cg.NewCloudInstanceGroupMember(id, newConfigName, currentConfigName, nodeMap); err != nil {
status := cloudinstances.CloudInstanceStatusUpToDate
if newConfigName != currentConfigName {
status = cloudinstances.CloudInstanceStatusNeedsUpdate
}
cm, err := cg.NewCloudInstance(id, status, nodeMap)
if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
}
cm.MachineType = aws.StringValue(i.InstanceType)
instance := instances[id]
for _, tag := range instance.Tags {
key := aws.StringValue(tag.Key)
if !strings.HasPrefix(key, TagNameRolePrefix) {
continue
}
role := strings.TrimPrefix(key, TagNameRolePrefix)
cm.Roles = append(cm.Roles, role)
cm.PrivateIP = aws.StringValue(instance.PrivateIpAddress)
}
detached, err := findDetachedInstances(c, g)
}
var detached []*string
for id, instance := range instances {
for _, tag := range instance.Tags {
if aws.StringValue(tag.Key) == tagNameDetachedInstance {
detached = append(detached, aws.String(id))
}
}
}
if err != nil {
return nil, fmt.Errorf("error searching for detached instances: %v", err)
}
for _, id := range detached {
if id != nil && *id != "" && !instanceSeen[*id] {
if err := cg.NewDetachedCloudInstanceGroupMember(*id, nodeMap); err != nil {
_, err := cg.NewCloudInstance(*id, cloudinstances.CloudInstanceStatusDetached, nodeMap)
if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
}
instanceSeen[*id] = true
@ -796,10 +826,10 @@ func awsBuildCloudInstanceGroup(c AWSCloud, cluster *kops.Cluster, ig *kops.Inst
return cg, nil
}
func findDetachedInstances(c AWSCloud, g *autoscaling.Group) ([]*string, error) {
func findInstances(c AWSCloud, ig *kops.InstanceGroup) (map[string]*ec2.Instance, error) {
req := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
NewEC2Filter("tag:"+tagNameDetachedInstance, aws.StringValue(g.AutoScalingGroupName)),
NewEC2Filter("tag:"+identity_aws.CloudTagInstanceGroupName, ig.ObjectMeta.Name),
NewEC2Filter("instance-state-name", "pending", "running", "stopping", "stopped"),
},
}
@ -809,6 +839,28 @@ func findDetachedInstances(c AWSCloud, g *autoscaling.Group) ([]*string, error)
return nil, err
}
instances := make(map[string]*ec2.Instance)
for _, r := range result.Reservations {
for _, i := range r.Instances {
id := aws.StringValue(i.InstanceId)
instances[id] = i
}
}
return instances, nil
}
func findDetachedInstances(c AWSCloud, g *autoscaling.Group) ([]*string, error) {
req := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
NewEC2Filter("tag:"+tagNameDetachedInstance, aws.StringValue(g.AutoScalingGroupName)),
NewEC2Filter("instance-state-name", "pending", "running", "stopping", "stopped"),
},
}
result, err := c.EC2().DescribeInstances(req)
if err != nil {
return nil, err
}
var detached []*string
for _, r := range result.Reservations {
for _, i := range r.Instances {

View File

@ -86,11 +86,11 @@ func (c *MockAWSCloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
return deleteGroup(c, g)
}
func (c *MockAWSCloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *MockAWSCloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
return deleteInstance(c, i)
}
func (c *MockAWSCloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *MockAWSCloud) DetachInstance(i *cloudinstances.CloudInstance) error {
return detachInstance(c, i)
}

View File

@ -52,29 +52,29 @@ func (c *mockGCECloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
}
// DeleteInstance deletes a GCE instance
func (c *gceCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember) error {
return recreateCloudInstanceGroupMember(c, i)
func (c *gceCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstance) error {
return recreateCloudInstance(c, i)
}
// DeleteInstance deletes a GCE instance
func (c *mockGCECloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember) error {
return recreateCloudInstanceGroupMember(c, i)
func (c *mockGCECloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
return recreateCloudInstance(c, i)
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *gceCloudImplementation) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *gceCloudImplementation) DetachInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("gce cloud provider DetachInstance not implemented yet")
return fmt.Errorf("gce cloud provider does not support surging")
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *mockGCECloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *mockGCECloud) DetachInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("gce cloud provider DetachInstance not implemented yet")
return fmt.Errorf("gce cloud provider does not support surging")
}
// recreateCloudInstanceGroupMember recreates the specified instances, managed by an InstanceGroupManager
func recreateCloudInstanceGroupMember(c GCECloud, i *cloudinstances.CloudInstanceGroupMember) error {
// recreateCloudInstance recreates the specified instances, managed by an InstanceGroupManager
func recreateCloudInstance(c GCECloud, i *cloudinstances.CloudInstance) error {
mig := i.CloudInstanceGroup.Raw.(*compute.InstanceGroupManager)
klog.V(2).Infof("Recreating GCE Instance %s in MIG %s", i.ID, mig.Name)
@ -184,7 +184,7 @@ func getCloudGroups(c GCECloud, cluster *kops.Cluster, instancegroups []*kops.In
for _, i := range instances {
id := i.Instance
cm := &cloudinstances.CloudInstanceGroupMember{
cm := &cloudinstances.CloudInstance{
ID: id,
CloudInstanceGroup: g,
}

View File

@ -112,11 +112,11 @@ func listServerFloatingIPs(c OpenstackCloud, instanceID string, floatingEnabled
return result, nil
}
func (c *openstackCloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *openstackCloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
return deleteInstance(c, i)
}
func deleteInstance(c OpenstackCloud, i *cloudinstances.CloudInstanceGroupMember) error {
func deleteInstance(c OpenstackCloud, i *cloudinstances.CloudInstance) error {
klog.Warning("This does not work without running kops update cluster --yes in another terminal")
return deleteInstanceWithID(c, i.ID)
}
@ -130,11 +130,11 @@ func deleteInstanceWithID(c OpenstackCloud, instanceID string) error {
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *openstackCloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *openstackCloud) DetachInstance(i *cloudinstances.CloudInstance) error {
return detachInstance(c, i)
}
func detachInstance(c OpenstackCloud, i *cloudinstances.CloudInstanceGroupMember) error {
func detachInstance(c OpenstackCloud, i *cloudinstances.CloudInstance) error {
klog.V(8).Info("openstack cloud provider DetachInstance not implemented yet")
return fmt.Errorf("openstack cloud provider does not support surging")
}

View File

@ -126,11 +126,11 @@ func (c *MockCloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
return deleteGroup(c, g)
}
func (c *MockCloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *MockCloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
return deleteInstance(c, i)
}
func (c *MockCloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *MockCloud) DetachInstance(i *cloudinstances.CloudInstance) error {
return detachInstance(c, i)
}

View File

@ -133,7 +133,11 @@ func osBuildCloudInstanceGroup(c OpenstackCloud, cluster *kops.Cluster, ig *kops
observedName := fmt.Sprintf("%s-%s", clusterObservedGeneration, igObservedGeneration)
generationName := fmt.Sprintf("%d-%d", cluster.GetGeneration(), ig.Generation)
err = cg.NewCloudInstanceGroupMember(instanceId, generationName, observedName, nodeMap)
status := cloudinstances.CloudInstanceStatusUpToDate
if generationName != observedName {
status = cloudinstances.CloudInstanceStatusNeedsUpdate
}
_, err = cg.NewCloudInstance(instanceId, status, nodeMap)
if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
}