mirror of https://github.com/kubernetes/kops.git
				
				
				
			
		
			
				
	
	
		
			457 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			457 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
| Copyright 2019 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package spotinst
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	v1 "k8s.io/api/core/v1"
 | |
| 	"k8s.io/klog/v2"
 | |
| 	"k8s.io/kops/pkg/apis/kops"
 | |
| 	"k8s.io/kops/pkg/cloudinstances"
 | |
| 	"k8s.io/kops/pkg/resources"
 | |
| 	"k8s.io/kops/upup/pkg/fi"
 | |
| )
 | |
| 
 | |
| // ListResources returns a list of all resources.
 | |
| func ListResources(cloud Cloud, clusterName string) ([]*resources.Resource, error) {
 | |
| 	klog.V(2).Info("Listing all resources")
 | |
| 
 | |
| 	fns := []func(Cloud, string) ([]*resources.Resource, error){
 | |
| 		ListElastigroupResources,
 | |
| 		ListOceanResources,
 | |
| 	}
 | |
| 
 | |
| 	var resourceTrackers []*resources.Resource
 | |
| 	for _, fn := range fns {
 | |
| 		resources, err := fn(cloud, clusterName)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("spotinst: error listing resources: %v", err)
 | |
| 		}
 | |
| 
 | |
| 		resourceTrackers = append(resourceTrackers, resources...)
 | |
| 	}
 | |
| 
 | |
| 	return resourceTrackers, nil
 | |
| }
 | |
| 
 | |
| // ListElastigroupResources returns a list of all Elastigroup resources.
 | |
| func ListElastigroupResources(cloud Cloud, clusterName string) ([]*resources.Resource, error) {
 | |
| 	klog.V(2).Info("Listing all Elastigroup resources")
 | |
| 
 | |
| 	// List all Elastigroup instance groups.
 | |
| 	groups, err := listInstanceGroups(cloud.Elastigroup(), clusterName)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return groups, nil
 | |
| }
 | |
| 
 | |
| // ListOceanResources returns a list of all Ocean resources.
 | |
| func ListOceanResources(cloud Cloud, clusterName string) ([]*resources.Resource, error) {
 | |
| 	klog.V(2).Info("Listing all Ocean resources")
 | |
| 	var resourceTrackers []*resources.Resource
 | |
| 
 | |
| 	// List all Ocean instance groups.
 | |
| 	oceans, err := listInstanceGroups(cloud.Ocean(), clusterName)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	resourceTrackers = append(resourceTrackers, oceans...)
 | |
| 
 | |
| 	// List all Ocean launch specs.
 | |
| 	for _, ocean := range oceans {
 | |
| 		specs, err := listLaunchSpecs(cloud.LaunchSpec(), ocean.ID)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		resourceTrackers = append(resourceTrackers, specs...)
 | |
| 	}
 | |
| 
 | |
| 	return resourceTrackers, nil
 | |
| }
 | |
| 
 | |
| // listInstanceGroups returns a list of all instance groups.
 | |
| func listInstanceGroups(svc InstanceGroupService, clusterName string) ([]*resources.Resource, error) {
 | |
| 	groups, err := svc.List(context.Background())
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	var resourceTrackers []*resources.Resource
 | |
| 	for _, group := range groups {
 | |
| 		if strings.HasSuffix(group.Name(), fmt.Sprintf(".%s", clusterName)) &&
 | |
| 			!strings.HasPrefix(strings.ToLower(group.Name()), "spotinst::ocean::") {
 | |
| 			resource := &resources.Resource{
 | |
| 				ID:      group.Id(),
 | |
| 				Name:    group.Name(),
 | |
| 				Type:    string(ResourceTypeInstanceGroup),
 | |
| 				Obj:     group,
 | |
| 				Deleter: instanceGroupDeleter(svc, group),
 | |
| 				Dumper:  dumper,
 | |
| 			}
 | |
| 			resourceTrackers = append(resourceTrackers, resource)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return resourceTrackers, nil
 | |
| }
 | |
| 
 | |
| // listLaunchSpecs returns a list of all launch specs.
 | |
| func listLaunchSpecs(svc LaunchSpecService, oceanID string) ([]*resources.Resource, error) {
 | |
| 	specs, err := svc.List(context.Background(), oceanID)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	var resourceTrackers []*resources.Resource
 | |
| 	for _, spec := range specs {
 | |
| 		resource := &resources.Resource{
 | |
| 			ID:      spec.Id(),
 | |
| 			Name:    spec.Name(),
 | |
| 			Type:    string(ResourceTypeLaunchSpec),
 | |
| 			Obj:     spec,
 | |
| 			Deleter: launchSpecDeleter(svc, spec),
 | |
| 			Dumper:  dumper,
 | |
| 		}
 | |
| 		resourceTrackers = append(resourceTrackers, resource)
 | |
| 	}
 | |
| 
 | |
| 	return resourceTrackers, nil
 | |
| }
 | |
| 
 | |
| // DeleteInstanceGroup deletes an existing InstanceGroup.
 | |
| func DeleteInstanceGroup(cloud Cloud, group *cloudinstances.CloudInstanceGroup) error {
 | |
| 	klog.V(2).Infof("Deleting instance group: %q", group.HumanName)
 | |
| 
 | |
| 	switch obj := group.Raw.(type) {
 | |
| 	case InstanceGroup:
 | |
| 		{
 | |
| 			var svc InstanceGroupService
 | |
| 			switch obj.Type() {
 | |
| 			case InstanceGroupElastigroup:
 | |
| 				svc = cloud.Elastigroup()
 | |
| 			case InstanceGroupOcean:
 | |
| 				svc = cloud.Ocean()
 | |
| 			}
 | |
| 
 | |
| 			return svc.Delete(context.Background(), obj.Id())
 | |
| 		}
 | |
| 	case LaunchSpec:
 | |
| 		{
 | |
| 			return cloud.LaunchSpec().Delete(context.Background(), obj.Id())
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return fmt.Errorf("spotinst: unexpected instance group type, got: %T", group.Raw)
 | |
| }
 | |
| 
 | |
| // DeleteInstance removes an instance from its instance group.
 | |
| func DeleteInstance(cloud Cloud, instance *cloudinstances.CloudInstance) error {
 | |
| 	klog.V(2).Infof("Detaching instance %q from instance group: %q",
 | |
| 		instance.ID, instance.CloudInstanceGroup.HumanName)
 | |
| 
 | |
| 	group := instance.CloudInstanceGroup
 | |
| 	switch obj := group.Raw.(type) {
 | |
| 	case InstanceGroup:
 | |
| 		{
 | |
| 			var svc InstanceGroupService
 | |
| 			switch obj.Type() {
 | |
| 			case InstanceGroupElastigroup:
 | |
| 				svc = cloud.Elastigroup()
 | |
| 			case InstanceGroupOcean:
 | |
| 				svc = cloud.Ocean()
 | |
| 			}
 | |
| 
 | |
| 			return svc.Detach(context.Background(), obj.Id(), []string{instance.ID})
 | |
| 		}
 | |
| 	case LaunchSpec:
 | |
| 		{
 | |
| 			return cloud.Ocean().Detach(context.Background(), obj.OceanId(), []string{instance.ID})
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return fmt.Errorf("spotinst: unexpected instance group type, got: %T", group.Raw)
 | |
| }
 | |
| 
 | |
| // DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
 | |
| func DetachInstance(cloud Cloud, instance *cloudinstances.CloudInstance) error {
 | |
| 	return fmt.Errorf("spotinst does not support surging")
 | |
| }
 | |
| 
 | |
| // GetCloudGroups returns a list of InstanceGroups as CloudInstanceGroup objects.
 | |
| func GetCloudGroups(cloud Cloud, cluster *kops.Cluster, instanceGroups []*kops.InstanceGroup,
 | |
| 	warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error,
 | |
| ) {
 | |
| 	cloudInstanceGroups := make(map[string]*cloudinstances.CloudInstanceGroup)
 | |
| 	nodeMap := cloudinstances.GetNodeMap(nodes, cluster)
 | |
| 
 | |
| 	// List all resources.
 | |
| 	resources, err := ListResources(cloud, cluster.Name)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Build all cloud instance groups.
 | |
| 	for _, resource := range resources {
 | |
| 
 | |
| 		// Filter out the Ocean resources (they're not needed for now since
 | |
| 		// we fetch the instances from the launch specs).
 | |
| 		if ResourceType(resource.Type) == ResourceTypeInstanceGroup {
 | |
| 			if resource.Obj.(InstanceGroup).Type() == InstanceGroupOcean {
 | |
| 				continue
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// Build cloud instance group.
 | |
| 		ig, err := buildCloudInstanceGroupFromResource(cloud, cluster, instanceGroups, resource, nodeMap)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("spotinst: error building cloud instance group: %v", err)
 | |
| 		}
 | |
| 		if ig == nil {
 | |
| 			if warnUnmatched {
 | |
| 				klog.V(2).Infof("Found group with no corresponding instance group: %q", resource.Name)
 | |
| 			}
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		cloudInstanceGroups[resource.Name] = ig
 | |
| 	}
 | |
| 
 | |
| 	return cloudInstanceGroups, nil
 | |
| }
 | |
| 
 | |
| func buildCloudInstanceGroupFromResource(cloud Cloud, cluster *kops.Cluster,
 | |
| 	instanceGroups []*kops.InstanceGroup, resource *resources.Resource,
 | |
| 	nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error,
 | |
| ) {
 | |
| 	// Find corresponding instance group.
 | |
| 	ig, err := findInstanceGroupFromResource(cluster, instanceGroups, resource)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to find instance group of resource %q: %v", resource.Name, err)
 | |
| 	}
 | |
| 	if ig == nil {
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 
 | |
| 	switch ResourceType(resource.Type) {
 | |
| 	case ResourceTypeInstanceGroup:
 | |
| 		{
 | |
| 			if group, ok := resource.Obj.(InstanceGroup); ok {
 | |
| 				return buildCloudInstanceGroupFromInstanceGroup(cloud, ig, group, nodeMap)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 	case ResourceTypeLaunchSpec:
 | |
| 		{
 | |
| 			if spec, ok := resource.Obj.(LaunchSpec); ok {
 | |
| 				return buildCloudInstanceGroupFromLaunchSpec(cloud, ig, spec, nodeMap)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil, fmt.Errorf("unexpected resource type: %s", resource.Type)
 | |
| }
 | |
| 
 | |
| func buildCloudInstanceGroupFromInstanceGroup(cloud Cloud, ig *kops.InstanceGroup, group InstanceGroup,
 | |
| 	nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error,
 | |
| ) {
 | |
| 	instanceGroup := &cloudinstances.CloudInstanceGroup{
 | |
| 		HumanName:     group.Name(),
 | |
| 		InstanceGroup: ig,
 | |
| 		MinSize:       group.MinSize(),
 | |
| 		TargetSize:    group.MinSize(), // TODO: Retrieve the target size from the cloud provider
 | |
| 		MaxSize:       group.MaxSize(),
 | |
| 		Raw:           group,
 | |
| 	}
 | |
| 
 | |
| 	var svc InstanceGroupService
 | |
| 	switch group.Type() {
 | |
| 	case InstanceGroupElastigroup:
 | |
| 		svc = cloud.Elastigroup()
 | |
| 	case InstanceGroupOcean:
 | |
| 		svc = cloud.Ocean()
 | |
| 	}
 | |
| 
 | |
| 	klog.V(2).Infof("Attempting to fetch all instances of instance group: %q (id: %q)", group.Name(), group.Id())
 | |
| 	instances, err := svc.Instances(context.Background(), group.Id())
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Register all instances as group members.
 | |
| 	if err := registerCloudInstances(instanceGroup, nodeMap,
 | |
| 		instances, group.Name(), group.UpdatedAt()); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return instanceGroup, nil
 | |
| }
 | |
| 
 | |
| // TODO(liran): We should fetch Ocean's instances using a query param of `?launchSpecId=foo`,
 | |
| // but, since we do not support it at the moment, we should fetch all instances only once.
 | |
| var fetchOceanInstances sync.Once
 | |
| 
 | |
| func buildCloudInstanceGroupFromLaunchSpec(cloud Cloud, ig *kops.InstanceGroup, spec LaunchSpec,
 | |
| 	nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error,
 | |
| ) {
 | |
| 	instanceGroup := &cloudinstances.CloudInstanceGroup{
 | |
| 		HumanName:     spec.Name(),
 | |
| 		InstanceGroup: ig,
 | |
| 		Raw:           spec,
 | |
| 	}
 | |
| 
 | |
| 	var instances []Instance
 | |
| 	var err error
 | |
| 
 | |
| 	fetchOceanInstances.Do(func() {
 | |
| 		klog.V(2).Infof("Attempting to fetch all instances of instance group: %q (id: %q)", spec.Name(), spec.Id())
 | |
| 		instances, err = cloud.Ocean().Instances(context.Background(), spec.OceanId())
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Register all instances as group members.
 | |
| 	if err := registerCloudInstances(instanceGroup, nodeMap,
 | |
| 		instances, spec.Name(), spec.UpdatedAt()); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return instanceGroup, nil
 | |
| }
 | |
| 
 | |
| func registerCloudInstances(instanceGroup *cloudinstances.CloudInstanceGroup, nodeMap map[string]*v1.Node,
 | |
| 	instances []Instance, currentInstanceGroupName string, instanceGroupUpdatedAt time.Time) error {
 | |
| 	// The instance registration below registers all active instances with
 | |
| 	// their instance group. In addition, it looks for outdated instances by
 | |
| 	// comparing each instance creation timestamp against the modification
 | |
| 	// timestamp of its instance group.
 | |
| 	//
 | |
| 	// In a rolling-update operation, one or more detach operations are
 | |
| 	// performed to replace existing instances. This is done by updating the
 | |
| 	// instance group and results in updating the modification timestamp to the
 | |
| 	// current time.
 | |
| 	//
 | |
| 	// The update of the modification timestamp occurs only after the detach
 | |
| 	// operation is completed, meaning that new instances have already been
 | |
| 	// created, so our comparison may be incorrect.
 | |
| 	//
 | |
| 	// In order to work around this issue, we assume that the detach operation
 | |
| 	// will take up to two minutes, and therefore we subtract this duration from
 | |
| 	// the modification timestamp of the instance group before the comparison.
 | |
| 	instanceGroupUpdatedAt = instanceGroupUpdatedAt.Add(-2 * time.Minute)
 | |
| 
 | |
| 	for _, instance := range instances {
 | |
| 		if instance.Id() == "" {
 | |
| 			klog.Warningf("Ignoring instance with no ID: %v", instance)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// If the instance was created before the last update, mark it as `NeedUpdate`.
 | |
| 		newInstanceGroupName := currentInstanceGroupName
 | |
| 		if instance.CreatedAt().Before(instanceGroupUpdatedAt) {
 | |
| 			newInstanceGroupName = fmt.Sprintf("%s:%d", currentInstanceGroupName, time.Now().Nanosecond())
 | |
| 		}
 | |
| 
 | |
| 		klog.V(2).Infof("Adding instance %q (created at: %s) to instance group: %q (updated at: %s)",
 | |
| 			instance.Id(), instance.CreatedAt().Format(time.RFC3339),
 | |
| 			currentInstanceGroupName, instanceGroupUpdatedAt.Format(time.RFC3339))
 | |
| 
 | |
| 		status := cloudinstances.CloudInstanceStatusUpToDate
 | |
| 		if newInstanceGroupName != currentInstanceGroupName {
 | |
| 			status = cloudinstances.CloudInstanceStatusNeedsUpdate
 | |
| 		}
 | |
| 		if _, err := instanceGroup.NewCloudInstance(
 | |
| 			instance.Id(), status, nodeMap[instance.Id()]); err != nil {
 | |
| 			return fmt.Errorf("error creating cloud instance group member: %v", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func findInstanceGroupFromResource(cluster *kops.Cluster, instanceGroups []*kops.InstanceGroup,
 | |
| 	resource *resources.Resource) (*kops.InstanceGroup, error,
 | |
| ) {
 | |
| 	var instanceGroup *kops.InstanceGroup
 | |
| 	for _, ig := range instanceGroups {
 | |
| 		name := getGroupNameByRole(cluster, ig)
 | |
| 		if name == "" {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if name == resource.Name {
 | |
| 			if instanceGroup != nil {
 | |
| 				return nil, fmt.Errorf("found multiple instance groups matching group: %q", name)
 | |
| 			}
 | |
| 
 | |
| 			klog.V(2).Infof("Found group with corresponding instance group: %q", name)
 | |
| 			instanceGroup = ig
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return instanceGroup, nil
 | |
| }
 | |
| 
 | |
| func getGroupNameByRole(cluster *kops.Cluster, ig *kops.InstanceGroup) string {
 | |
| 	var groupName string
 | |
| 
 | |
| 	switch ig.Spec.Role {
 | |
| 	case kops.InstanceGroupRoleControlPlane:
 | |
| 		groupName = ig.ObjectMeta.Name + ".masters." + cluster.ObjectMeta.Name
 | |
| 	case kops.InstanceGroupRoleNode:
 | |
| 		groupName = ig.ObjectMeta.Name + "." + cluster.ObjectMeta.Name
 | |
| 	case kops.InstanceGroupRoleBastion:
 | |
| 		groupName = ig.ObjectMeta.Name + "." + cluster.ObjectMeta.Name
 | |
| 	default:
 | |
| 		klog.Warningf("Ignoring InstanceGroup of unknown role %q", ig.Spec.Role)
 | |
| 	}
 | |
| 
 | |
| 	return groupName
 | |
| }
 | |
| 
 | |
| func instanceGroupDeleter(svc InstanceGroupService, group InstanceGroup) func(fi.Cloud, *resources.Resource) error {
 | |
| 	return func(cloud fi.Cloud, resource *resources.Resource) error {
 | |
| 		klog.V(2).Infof("Deleting instance group: %q", group.Id())
 | |
| 		return svc.Delete(context.Background(), group.Id())
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func launchSpecDeleter(svc LaunchSpecService, spec LaunchSpec) func(fi.Cloud, *resources.Resource) error {
 | |
| 	return func(cloud fi.Cloud, resource *resources.Resource) error {
 | |
| 		klog.V(2).Infof("Deleting launch spec: %q", spec.Id())
 | |
| 		return svc.Delete(context.Background(), spec.Id())
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func dumper(op *resources.DumpOperation, resource *resources.Resource) error {
 | |
| 	data := make(map[string]interface{})
 | |
| 
 | |
| 	data["id"] = resource.ID
 | |
| 	data["type"] = resource.Type
 | |
| 	data["raw"] = resource.Obj
 | |
| 
 | |
| 	op.Dump.Resources = append(op.Dump.Resources, data)
 | |
| 	return nil
 | |
| }
 |