Move cloud.do from pkg/resources/digitalocean/ckoud.go to upup/pkg/fi/cloudup/do directory

This commit is contained in:
srikiz 2021-05-15 01:57:41 +05:30
parent a0f2d62f57
commit 4cecc64f67
6 changed files with 462 additions and 590 deletions

View File

@ -21,7 +21,6 @@ import (
"strings" "strings"
"k8s.io/kops/pkg/model" "k8s.io/kops/pkg/model"
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/do" "k8s.io/kops/upup/pkg/fi/cloudup/do"
"k8s.io/kops/upup/pkg/fi/cloudup/dotasks" "k8s.io/kops/upup/pkg/fi/cloudup/dotasks"
@ -76,9 +75,9 @@ func (d *DropletBuilder) Build(c *fi.ModelBuilderContext) error {
clusterTagIndex := do.TagKubernetesClusterIndex + ":" + "etcd-" + strconv.Itoa(masterIndexCount) clusterTagIndex := do.TagKubernetesClusterIndex + ":" + "etcd-" + strconv.Itoa(masterIndexCount)
droplet.Tags = append(droplet.Tags, clusterTagIndex) droplet.Tags = append(droplet.Tags, clusterTagIndex)
droplet.Tags = append(droplet.Tags, clusterMasterTag) droplet.Tags = append(droplet.Tags, clusterMasterTag)
droplet.Tags = append(droplet.Tags, digitalocean.TagKubernetesInstanceGroup+":"+ig.Name) droplet.Tags = append(droplet.Tags, do.TagKubernetesInstanceGroup+":"+ig.Name)
} else { } else {
droplet.Tags = append(droplet.Tags, digitalocean.TagKubernetesInstanceGroup+":"+ig.Name) droplet.Tags = append(droplet.Tags, do.TagKubernetesInstanceGroup+":"+ig.Name)
} }
userData, err := d.BootstrapScriptBuilder.ResourceNodeUp(c, ig) userData, err := d.BootstrapScriptBuilder.ResourceNodeUp(c, ig)

View File

@ -1,457 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package digitalocean
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/digitalocean/godo"
"golang.org/x/oauth2"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/kops/dnsprovider/pkg/dnsprovider"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/resources/digitalocean/dns"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/upup/pkg/fi"
)
const TagKubernetesClusterIndex = "k8s-index"
const TagKubernetesClusterNamePrefix = "KubernetesCluster"
const TagKubernetesInstanceGroup = "kops-instancegroup"
// TokenSource implements oauth2.TokenSource
type TokenSource struct {
AccessToken string
}
// Token() returns oauth2.Token
func (t *TokenSource) Token() (*oauth2.Token, error) {
token := &oauth2.Token{
AccessToken: t.AccessToken,
}
return token, nil
}
// Cloud exposes all the interfaces required to operate on DigitalOcean resources
type Cloud struct {
Client *godo.Client
dns dnsprovider.Interface
// RegionName holds the region, renamed to avoid conflict with Region()
RegionName string
}
type DOInstanceGroup struct {
ClusterName string
InstanceGroupName string
GroupType string // will be either "master" or "worker"
Members []string // will store the droplet names that matches.
}
var _ fi.Cloud = &Cloud{}
// NewCloud returns a Cloud, expecting the env var DIGITALOCEAN_ACCESS_TOKEN
// NewCloud will return an err if DIGITALOCEAN_ACCESS_TOKEN is not defined
func NewCloud(region string) (*Cloud, error) {
accessToken := os.Getenv("DIGITALOCEAN_ACCESS_TOKEN")
if accessToken == "" {
return nil, errors.New("DIGITALOCEAN_ACCESS_TOKEN is required")
}
tokenSource := &TokenSource{
AccessToken: accessToken,
}
oauthClient := oauth2.NewClient(context.TODO(), tokenSource)
client := godo.NewClient(oauthClient)
return &Cloud{
Client: client,
dns: dns.NewProvider(client),
RegionName: region,
}, nil
}
func (c *Cloud) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
return getCloudGroups(c, cluster, instancegroups, warnUnmatched, nodes)
}
// DeleteGroup is not implemented yet, is a func that needs to delete a DO instance group.
func (c *Cloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
klog.V(8).Info("digitalocean cloud provider DeleteGroup not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support deleting cloud groups at this time")
}
func (c *Cloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
dropletID, err := strconv.Atoi(i.ID)
if err != nil {
return fmt.Errorf("failed to convert droplet ID to int: %s", err)
}
_, _, err = c.Client.DropletActions.Shutdown(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error stopping instance %q: %v", dropletID, err)
}
// Wait for 5 min to stop the instance
for i := 0; i < 5; i++ {
droplet, _, err := c.Client.Droplets.Get(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error describing instance %q: %v", dropletID, err)
}
klog.V(8).Infof("stopping DO instance %q, current Status: %q", droplet, droplet.Status)
if droplet.Status == "off" {
break
}
if i == 5 {
return fmt.Errorf("fail to stop DO instance %v in 5 mins", dropletID)
}
time.Sleep(time.Minute * 1)
}
_, err = c.Client.Droplets.Delete(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error stopping instance %q: %v", dropletID, err)
}
klog.V(8).Infof("deleted droplet instance %q", dropletID)
return nil
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("digitalocean cloud provider DetachInstance not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support surging")
}
// ProviderID returns the kops api identifier for DigitalOcean cloud provider
func (c *Cloud) ProviderID() kops.CloudProviderID {
return kops.CloudProviderDO
}
// Region returns the DO region we will target
func (c *Cloud) Region() string {
return c.RegionName
}
// DNS returns a DO implementation for dnsprovider.Interface
func (c *Cloud) DNS() (dnsprovider.Interface, error) {
return c.dns, nil
}
// Volumes returns an implementation of godo.StorageService
func (c *Cloud) Volumes() godo.StorageService {
return c.Client.Storage
}
// VolumeActions returns an implementation of godo.StorageActionsService
func (c *Cloud) VolumeActions() godo.StorageActionsService {
return c.Client.StorageActions
}
func (c *Cloud) Droplets() godo.DropletsService {
return c.Client.Droplets
}
func (c *Cloud) DropletActions() godo.DropletActionsService {
return c.Client.DropletActions
}
func (c *Cloud) LoadBalancers() godo.LoadBalancersService {
return c.Client.LoadBalancers
}
func (c *Cloud) GetAllLoadBalancers() ([]godo.LoadBalancer, error) {
return getAllLoadBalancers(c)
}
// FindVPCInfo is not implemented, it's only here to satisfy the fi.Cloud interface
func (c *Cloud) FindVPCInfo(id string) (*fi.VPCInfo, error) {
return nil, errors.New("not implemented")
}
func (c *Cloud) GetApiIngressStatus(cluster *kops.Cluster) ([]fi.ApiIngressStatus, error) {
var ingresses []fi.ApiIngressStatus
if cluster.Spec.MasterPublicName != "" {
// Note that this must match Digital Ocean's lb name
klog.V(2).Infof("Querying DO to find Loadbalancers for API (%q)", cluster.Name)
loadBalancers, err := getAllLoadBalancers(c)
if err != nil {
return nil, fmt.Errorf("LoadBalancers.List returned error: %v", err)
}
lbName := "api-" + strings.Replace(cluster.Name, ".", "-", -1)
for _, lb := range loadBalancers {
if lb.Name == lbName {
klog.V(10).Infof("Matching LB name found for API (%q)", cluster.Name)
if lb.Status != "active" {
return nil, fmt.Errorf("load-balancer is not yet active (current status: %s)", lb.Status)
}
address := lb.IP
ingresses = append(ingresses, fi.ApiIngressStatus{IP: address})
return ingresses, nil
}
}
}
return nil, nil
}
// FindClusterStatus discovers the status of the cluster, by looking for the tagged etcd volumes
func (c *Cloud) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
etcdStatus, err := findEtcdStatus(c, cluster)
if err != nil {
return nil, err
}
status := &kops.ClusterStatus{
EtcdClusters: etcdStatus,
}
klog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status))
return status, nil
}
// findEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes
func findEtcdStatus(c *Cloud, cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) {
statusMap := make(map[string]*kops.EtcdClusterStatus)
volumes, err := getAllVolumesByRegion(c, c.RegionName)
if err != nil {
return nil, fmt.Errorf("failed to get all volumes by region from %s: %v", c.RegionName, err)
}
for _, volume := range volumes {
volumeID := volume.ID
etcdClusterName := ""
var etcdClusterSpec *etcd.EtcdClusterSpec
for _, myTag := range volume.Tags {
klog.V(8).Infof("findEtcdStatus status (from cloud): checking if volume with tag %q belongs to cluster", myTag)
// check if volume belongs to this cluster.
// tag will be in the format "KubernetesCluster:dev5-k8s-local" (where clusterName is dev5.k8s.local)
clusterName := strings.Replace(cluster.Name, ".", "-", -1)
if strings.Contains(myTag, fmt.Sprintf("%s:%s", TagKubernetesClusterNamePrefix, clusterName)) {
klog.V(10).Infof("findEtcdStatus cluster comparison matched for tag: %v", myTag)
// this volume belongs to our cluster, add this to our etcdClusterSpec.
// loop through the tags again and
for _, volumeTag := range volume.Tags {
if strings.Contains(volumeTag, TagKubernetesClusterIndex) {
volumeTagParts := strings.Split(volumeTag, ":")
if len(volumeTagParts) < 2 {
return nil, fmt.Errorf("volume tag split failed, too few components for tag %q on volume %q", volumeTag, volume)
}
dropletIndex := volumeTagParts[1]
etcdClusterSpec, err = c.getEtcdClusterSpec(volume.Name, dropletIndex)
if err != nil {
return nil, fmt.Errorf("error parsing etcd cluster tag %q on volume %q: %v", volumeTag, volumeID, err)
}
klog.V(10).Infof("findEtcdStatus etcdClusterSpec: %v", fi.DebugAsJsonString(etcdClusterSpec))
etcdClusterName = etcdClusterSpec.ClusterKey
status := statusMap[etcdClusterName]
if status == nil {
status = &kops.EtcdClusterStatus{
Name: etcdClusterName,
}
statusMap[etcdClusterName] = status
}
memberName := etcdClusterSpec.NodeName
status.Members = append(status.Members, &kops.EtcdMemberStatus{
Name: memberName,
VolumeId: volume.ID,
})
}
}
}
}
}
status := make([]kops.EtcdClusterStatus, 0, len(statusMap))
for _, v := range statusMap {
status = append(status, *v)
}
return status, nil
}
func (c *Cloud) getEtcdClusterSpec(volumeName string, dropletName string) (*etcd.EtcdClusterSpec, error) {
var clusterKey string
if strings.Contains(volumeName, "etcd-main") {
clusterKey = "main"
} else if strings.Contains(volumeName, "etcd-events") {
clusterKey = "events"
} else {
return nil, fmt.Errorf("could not determine etcd cluster type for volume: %s", volumeName)
}
return &etcd.EtcdClusterSpec{
ClusterKey: clusterKey,
NodeName: dropletName,
NodeNames: []string{dropletName},
}, nil
}
func getCloudGroups(c *Cloud, cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
nodeMap := cloudinstances.GetNodeMap(nodes, cluster)
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
instanceGroups, err := FindInstanceGroups(c, cluster.ObjectMeta.Name)
if err != nil {
return nil, fmt.Errorf("unable to find autoscale groups: %v", err)
}
for _, doGroup := range instanceGroups {
name := doGroup.InstanceGroupName
instancegroup, err := matchInstanceGroup(name, cluster.ObjectMeta.Name, instancegroups)
if err != nil {
return nil, fmt.Errorf("error getting instance group for doGroup %q", name)
}
if instancegroup == nil {
if warnUnmatched {
klog.Warningf("Found doGroup with no corresponding instance group %q", name)
}
continue
}
groups[instancegroup.ObjectMeta.Name], err = buildCloudInstanceGroup(c, instancegroup, doGroup, nodeMap)
if err != nil {
return nil, fmt.Errorf("error getting cloud instance group %q: %v", instancegroup.ObjectMeta.Name, err)
}
}
klog.V(8).Infof("Cloud Instance Group Info = %v", groups)
return groups, nil
}
// FindInstanceGroups finds instance groups matching the specified tags
func FindInstanceGroups(c *Cloud, clusterName string) ([]DOInstanceGroup, error) {
var result []DOInstanceGroup
instanceGroupMap := make(map[string][]string) // map of instance group name with droplet ids
clusterTag := "KubernetesCluster:" + strings.Replace(clusterName, ".", "-", -1)
droplets, err := getAllDropletsByTag(c, clusterTag)
if err != nil {
return nil, fmt.Errorf("get all droplets for tag %s returned error. Error=%v", clusterTag, err)
}
instanceGroupName := ""
for _, droplet := range droplets {
doInstanceGroup, err := getDropletInstanceGroup(droplet.Tags)
if err != nil {
return nil, fmt.Errorf("get droplets Instance group for tags %v returned error. Error=%v", droplet.Tags, err)
}
instanceGroupName = fmt.Sprintf("%s-%s", clusterName, doInstanceGroup)
instanceGroupMap[instanceGroupName] = append(instanceGroupMap[instanceGroupName], strconv.Itoa(droplet.ID))
result = append(result, DOInstanceGroup{
InstanceGroupName: instanceGroupName,
GroupType: instanceGroupName,
ClusterName: clusterName,
Members: instanceGroupMap[instanceGroupName],
})
}
klog.V(8).Infof("InstanceGroup Info = %v", result)
return result, nil
}
func getDropletInstanceGroup(tags []string) (string, error) {
for _, tag := range tags {
klog.V(8).Infof("Check tag = %s", tag)
if strings.Contains(strings.ToLower(tag), TagKubernetesInstanceGroup) {
tagParts := strings.Split(tag, ":")
if len(tagParts) < 2 {
return "", fmt.Errorf("tag split failed, too few components for tag %q", tag)
}
return tagParts[1], nil
}
}
return "", fmt.Errorf("Didn't find k8s-instancegroup for tag %v", tags)
}
// matchInstanceGroup filters a list of instancegroups for recognized cloud groups
func matchInstanceGroup(name string, clusterName string, instancegroups []*kops.InstanceGroup) (*kops.InstanceGroup, error) {
var instancegroup *kops.InstanceGroup
for _, g := range instancegroups {
var groupName string
switch g.Spec.Role {
case kops.InstanceGroupRoleMaster, kops.InstanceGroupRoleNode:
groupName = clusterName + "-" + g.ObjectMeta.Name
default:
klog.Warningf("Ignoring InstanceGroup of unknown role %q", g.Spec.Role)
continue
}
if name == groupName {
if instancegroup != nil {
return nil, fmt.Errorf("found multiple instance groups matching servergrp %q", groupName)
}
instancegroup = g
}
}
return instancegroup, nil
}
func buildCloudInstanceGroup(c *Cloud, ig *kops.InstanceGroup, g DOInstanceGroup, nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error) {
cg := &cloudinstances.CloudInstanceGroup{
HumanName: g.InstanceGroupName,
InstanceGroup: ig,
Raw: g,
MinSize: int(fi.Int32Value(ig.Spec.MinSize)),
TargetSize: int(fi.Int32Value(ig.Spec.MinSize)),
MaxSize: int(fi.Int32Value(ig.Spec.MaxSize)),
}
for _, member := range g.Members {
// TODO use a hash of the godo.DropletCreateRequest fields to calculate the second parameter.
_, err := cg.NewCloudInstance(member, cloudinstances.CloudInstanceStatusUpToDate, nodeMap[member])
if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
}
}
return cg, nil
}

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/resources" "k8s.io/kops/pkg/resources"
"k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/do"
) )
const ( const (
@ -42,7 +43,7 @@ const (
type listFn func(fi.Cloud, string) ([]*resources.Resource, error) type listFn func(fi.Cloud, string) ([]*resources.Resource, error)
func ListResources(cloud *Cloud, clusterName string) (map[string]*resources.Resource, error) { func ListResources(cloud do.DOCloud, clusterName string) (map[string]*resources.Resource, error) {
resourceTrackers := make(map[string]*resources.Resource) resourceTrackers := make(map[string]*resources.Resource)
listFunctions := []listFn{ listFunctions := []listFn{
@ -65,13 +66,12 @@ func ListResources(cloud *Cloud, clusterName string) (map[string]*resources.Reso
return resourceTrackers, nil return resourceTrackers, nil
} }
func listDroplets(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) { func listDroplets(cloud do.DOCloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(*Cloud)
var resourceTrackers []*resources.Resource var resourceTrackers []*resources.Resource
clusterTag := "KubernetesCluster:" + strings.Replace(clusterName, ".", "-", -1) clusterTag := "KubernetesCluster:" + strings.Replace(clusterName, ".", "-", -1)
droplets, err := getAllDropletsByTag(c, clusterTag) droplets, err := cloud.GetAllDropletsByTag(clusterTag)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to list droplets: %v", err) return nil, fmt.Errorf("failed to list droplets: %v", err)
} }
@ -92,40 +92,12 @@ func listDroplets(cloud fi.Cloud, clusterName string) ([]*resources.Resource, er
return resourceTrackers, nil return resourceTrackers, nil
} }
func getAllDropletsByTag(cloud *Cloud, tag string) ([]godo.Droplet, error) { func listVolumes(cloud do.DOCloud, clusterName string) ([]*resources.Resource, error) {
allDroplets := []godo.Droplet{}
opt := &godo.ListOptions{}
for {
droplets, resp, err := cloud.Droplets().ListByTag(context.TODO(), tag, opt)
if err != nil {
return nil, err
}
allDroplets = append(allDroplets, droplets...)
if resp.Links == nil || resp.Links.IsLastPage() {
break
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}
opt.Page = page + 1
}
return allDroplets, nil
}
func listVolumes(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(*Cloud)
var resourceTrackers []*resources.Resource var resourceTrackers []*resources.Resource
volumeMatch := strings.Replace(clusterName, ".", "-", -1) volumeMatch := strings.Replace(clusterName, ".", "-", -1)
volumes, err := getAllVolumesByRegion(c, c.Region()) volumes, err := cloud.GetAllVolumesByRegion()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to list volumes: %s", err) return nil, fmt.Errorf("failed to list volumes: %s", err)
} }
@ -153,41 +125,8 @@ func listVolumes(cloud fi.Cloud, clusterName string) ([]*resources.Resource, err
return resourceTrackers, nil return resourceTrackers, nil
} }
func getAllVolumesByRegion(cloud *Cloud, region string) ([]godo.Volume, error) { func listDNS(cloud do.DOCloud, clusterName string) ([]*resources.Resource, error) {
allVolumes := []godo.Volume{} domains, _, err := cloud.DomainService.List(context.TODO(), &godo.ListOptions{})
opt := &godo.ListOptions{}
for {
volumes, resp, err := cloud.Volumes().ListVolumes(context.TODO(), &godo.ListVolumeParams{
Region: region,
ListOptions: opt,
})
if err != nil {
return nil, err
}
allVolumes = append(allVolumes, volumes...)
if resp.Links == nil || resp.Links.IsLastPage() {
break
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}
opt.Page = page + 1
}
return allVolumes, nil
}
func listDNS(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(*Cloud)
domains, _, err := c.Client.Domains.List(context.TODO(), &godo.ListOptions{})
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to list domains: %s", err) return nil, fmt.Errorf("failed to list domains: %s", err)
} }
@ -242,12 +181,12 @@ func listDNS(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error)
return resourceTrackers, nil return resourceTrackers, nil
} }
func getAllRecordsByDomain(cloud *Cloud, domain string) ([]godo.DomainRecord, error) { func getAllRecordsByDomain(cloud do.DOCloud, domain string) ([]godo.DomainRecord, error) {
allRecords := []godo.DomainRecord{} allRecords := []godo.DomainRecord{}
opt := &godo.ListOptions{} opt := &godo.ListOptions{}
for { for {
records, resp, err := cloud.Client.Domains.Records(context.TODO(), domain, opt) records, resp, err := cloud.DomainService.Records(context.TODO(), domain, opt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -269,13 +208,12 @@ func getAllRecordsByDomain(cloud *Cloud, domain string) ([]godo.DomainRecord, er
return allRecords, nil return allRecords, nil
} }
func listLoadBalancers(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) { func listLoadBalancers(cloud do.DOCloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(*Cloud)
var resourceTrackers []*resources.Resource var resourceTrackers []*resources.Resource
clusterTag := "KubernetesCluster-Master:" + strings.Replace(clusterName, ".", "-", -1) clusterTag := "KubernetesCluster-Master:" + strings.Replace(clusterName, ".", "-", -1)
lbs, err := getAllLoadBalancers(c) lbs, err := cloud.GetAllLoadBalancers()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to list lbs: %v", err) return nil, fmt.Errorf("failed to list lbs: %v", err)
} }
@ -303,42 +241,13 @@ func listLoadBalancers(cloud fi.Cloud, clusterName string) ([]*resources.Resourc
return resourceTrackers, nil return resourceTrackers, nil
} }
func getAllLoadBalancers(cloud *Cloud) ([]godo.LoadBalancer, error) { func deleteDroplet(cloud do.DOCloud, t *resources.Resource) error {
allLoadBalancers := []godo.LoadBalancer{}
opt := &godo.ListOptions{}
for {
lbs, resp, err := cloud.LoadBalancers().List(context.TODO(), opt)
if err != nil {
return nil, err
}
allLoadBalancers = append(allLoadBalancers, lbs...)
if resp.Links == nil || resp.Links.IsLastPage() {
break
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}
opt.Page = page + 1
}
return allLoadBalancers, nil
}
func deleteDroplet(cloud fi.Cloud, t *resources.Resource) error {
c := cloud.(*Cloud)
dropletID, err := strconv.Atoi(t.ID) dropletID, err := strconv.Atoi(t.ID)
if err != nil { if err != nil {
return fmt.Errorf("failed to convert droplet ID to int: %s", err) return fmt.Errorf("failed to convert droplet ID to int: %s", err)
} }
_, err = c.Droplets().Delete(context.TODO(), dropletID) _, err = cloud.DropletsService().Delete(context.TODO(), dropletID)
if err != nil { if err != nil {
return fmt.Errorf("failed to delete droplet: %d, err: %s", dropletID, err) return fmt.Errorf("failed to delete droplet: %d, err: %s", dropletID, err)
} }
@ -346,12 +255,10 @@ func deleteDroplet(cloud fi.Cloud, t *resources.Resource) error {
return nil return nil
} }
func deleteVolume(cloud fi.Cloud, t *resources.Resource) error { func deleteVolume(cloud do.DOCloud, t *resources.Resource) error {
c := cloud.(*Cloud)
volume := t.Obj.(godo.Volume) volume := t.Obj.(godo.Volume)
for _, dropletID := range volume.DropletIDs { for _, dropletID := range volume.DropletIDs {
action, _, err := c.VolumeActions().DetachByDropletID(context.TODO(), volume.ID, dropletID) action, _, err := cloud.VolumeActionService().DetachByDropletID(context.TODO(), volume.ID, dropletID)
if err != nil { if err != nil {
return fmt.Errorf("failed to detach volume: %s, err: %s", volume.ID, err) return fmt.Errorf("failed to detach volume: %s, err: %s", volume.ID, err)
} }
@ -360,7 +267,7 @@ func deleteVolume(cloud fi.Cloud, t *resources.Resource) error {
} }
} }
_, err := c.Volumes().DeleteVolume(context.TODO(), t.ID) _, err := cloud.VolumeService().DeleteVolume(context.TODO(), t.ID)
if err != nil { if err != nil {
return fmt.Errorf("failed to delete volume: %s, err: %s", t.ID, err) return fmt.Errorf("failed to delete volume: %s, err: %s", t.ID, err)
} }
@ -368,11 +275,10 @@ func deleteVolume(cloud fi.Cloud, t *resources.Resource) error {
return nil return nil
} }
func deleteRecord(cloud fi.Cloud, domain string, t *resources.Resource) error { func deleteRecord(cloud do.DOCloud, domain string, t *resources.Resource) error {
c := cloud.(*Cloud)
record := t.Obj.(godo.DomainRecord) record := t.Obj.(godo.DomainRecord)
_, err := c.Client.Domains.DeleteRecord(context.TODO(), domain, record.ID) _, err := cloud.DomainService.DeleteRecord(context.TODO(), domain, record.ID)
if err != nil { if err != nil {
return fmt.Errorf("failed to delete record for domain %s: %d", domain, record.ID) return fmt.Errorf("failed to delete record for domain %s: %d", domain, record.ID)
} }
@ -380,10 +286,9 @@ func deleteRecord(cloud fi.Cloud, domain string, t *resources.Resource) error {
return nil return nil
} }
func deleteLoadBalancer(cloud fi.Cloud, t *resources.Resource) error { func deleteLoadBalancer(cloud do.DOCloud, t *resources.Resource) error {
c := cloud.(*Cloud)
lb := t.Obj.(godo.LoadBalancer) lb := t.Obj.(godo.LoadBalancer)
_, err := c.Client.LoadBalancers.Delete(context.TODO(), lb.ID) _, err := cloud.LoadBalancersService.Delete(context.TODO(), lb.ID)
if err != nil { if err != nil {
return fmt.Errorf("failed to delete load balancer with name %s %v", lb.Name, err) return fmt.Errorf("failed to delete load balancer with name %s %v", lb.Name, err)

View File

@ -17,17 +17,16 @@ limitations under the License.
package do package do
import ( import (
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi"
) )
type DOAPITarget struct { type DOAPITarget struct {
Cloud *digitalocean.Cloud Cloud DOCloud
} }
var _ fi.Target = &DOAPITarget{} var _ fi.Target = &DOAPITarget{}
func NewDOAPITarget(cloud *digitalocean.Cloud) *DOAPITarget { func NewDOAPITarget(cloud DOCloud) *DOAPITarget {
return &DOAPITarget{ return &DOAPITarget{
Cloud: cloud, Cloud: cloud,
} }

View File

@ -14,27 +14,444 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package do package digitalocean
import ( import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings" "strings"
"time"
"k8s.io/kops/pkg/resources/digitalocean" "github.com/digitalocean/godo"
"golang.org/x/oauth2"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/kops/dnsprovider/pkg/dnsprovider"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/resources/digitalocean/dns"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi"
) )
const TagKubernetesClusterIndex = "k8s-index" const TagKubernetesClusterIndex = "k8s-index"
const TagNameEtcdClusterPrefix = "etcdCluster-"
const TagNameRolePrefix = "k8s.io/role/"
const TagKubernetesClusterNamePrefix = "KubernetesCluster" const TagKubernetesClusterNamePrefix = "KubernetesCluster"
const TagKubernetesClusterMasterPrefix = "KubernetesCluster-Master" const TagKubernetesInstanceGroup = "kops-instancegroup"
func SafeClusterName(clusterName string) string { // TokenSource implements oauth2.TokenSource
// DO does not support . in tags / names type TokenSource struct {
safeClusterName := strings.Replace(clusterName, ".", "-", -1) AccessToken string
return safeClusterName
} }
func NewDOCloud(region string) (fi.Cloud, error) { // Token() returns oauth2.Token
return digitalocean.NewCloud(region) func (t *TokenSource) Token() (*oauth2.Token, error) {
token := &oauth2.Token{
AccessToken: t.AccessToken,
}
return token, nil
}
// Cloud exposes all the interfaces required to operate on DigitalOcean resources
type Cloud struct {
Client *godo.Client
dns dnsprovider.Interface
// RegionName holds the region, renamed to avoid conflict with Region()
RegionName string
}
type DOInstanceGroup struct {
ClusterName string
InstanceGroupName string
GroupType string // will be either "master" or "worker"
Members []string // will store the droplet names that matches.
}
var _ fi.Cloud = &Cloud{}
// NewCloud returns a Cloud, expecting the env var DIGITALOCEAN_ACCESS_TOKEN
// NewCloud will return an err if DIGITALOCEAN_ACCESS_TOKEN is not defined
func NewCloud(region string) (*Cloud, error) {
accessToken := os.Getenv("DIGITALOCEAN_ACCESS_TOKEN")
if accessToken == "" {
return nil, errors.New("DIGITALOCEAN_ACCESS_TOKEN is required")
}
tokenSource := &TokenSource{
AccessToken: accessToken,
}
oauthClient := oauth2.NewClient(context.TODO(), tokenSource)
client := godo.NewClient(oauthClient)
return &Cloud{
Client: client,
dns: dns.NewProvider(client),
RegionName: region,
}, nil
}
func (c *Cloud) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
return getCloudGroups(c, cluster, instancegroups, warnUnmatched, nodes)
}
// DeleteGroup is not implemented yet, is a func that needs to delete a DO instance group.
func (c *Cloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
klog.V(8).Info("digitalocean cloud provider DeleteGroup not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support deleting cloud groups at this time")
}
func (c *Cloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
dropletID, err := strconv.Atoi(i.ID)
if err != nil {
return fmt.Errorf("failed to convert droplet ID to int: %s", err)
}
_, _, err = c.Client.DropletActions.Shutdown(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error stopping instance %q: %v", dropletID, err)
}
// Wait for 5 min to stop the instance
for i := 0; i < 5; i++ {
droplet, _, err := c.Client.Droplets.Get(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error describing instance %q: %v", dropletID, err)
}
klog.V(8).Infof("stopping DO instance %q, current Status: %q", droplet, droplet.Status)
if droplet.Status == "off" {
break
}
if i == 5 {
return fmt.Errorf("fail to stop DO instance %v in 5 mins", dropletID)
}
time.Sleep(time.Minute * 1)
}
_, err = c.Client.Droplets.Delete(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error stopping instance %q: %v", dropletID, err)
}
klog.V(8).Infof("deleted droplet instance %q", dropletID)
return nil
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("digitalocean cloud provider DetachInstance not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support surging")
}
// ProviderID returns the kops api identifier for DigitalOcean cloud provider
func (c *Cloud) ProviderID() kops.CloudProviderID {
return kops.CloudProviderDO
}
// Region returns the DO region we will target
func (c *Cloud) Region() string {
return c.RegionName
}
// DNS returns a DO implementation for dnsprovider.Interface
func (c *Cloud) DNS() (dnsprovider.Interface, error) {
return c.dns, nil
}
// Volumes returns an implementation of godo.StorageService
func (c *Cloud) Volumes() godo.StorageService {
return c.Client.Storage
}
// VolumeActions returns an implementation of godo.StorageActionsService
func (c *Cloud) VolumeActions() godo.StorageActionsService {
return c.Client.StorageActions
}
func (c *Cloud) Droplets() godo.DropletsService {
return c.Client.Droplets
}
func (c *Cloud) DropletActions() godo.DropletActionsService {
return c.Client.DropletActions
}
func (c *Cloud) LoadBalancers() godo.LoadBalancersService {
return c.Client.LoadBalancers
}
func (c *Cloud) GetAllLoadBalancers() ([]godo.LoadBalancer, error) {
return getAllLoadBalancers(c)
}
// FindVPCInfo is not implemented, it's only here to satisfy the fi.Cloud interface
func (c *Cloud) FindVPCInfo(id string) (*fi.VPCInfo, error) {
return nil, errors.New("not implemented")
}
func (c *Cloud) GetApiIngressStatus(cluster *kops.Cluster) ([]fi.ApiIngressStatus, error) {
var ingresses []fi.ApiIngressStatus
if cluster.Spec.MasterPublicName != "" {
// Note that this must match Digital Ocean's lb name
klog.V(2).Infof("Querying DO to find Loadbalancers for API (%q)", cluster.Name)
loadBalancers, err := getAllLoadBalancers(c)
if err != nil {
return nil, fmt.Errorf("LoadBalancers.List returned error: %v", err)
}
lbName := "api-" + strings.Replace(cluster.Name, ".", "-", -1)
for _, lb := range loadBalancers {
if lb.Name == lbName {
klog.V(10).Infof("Matching LB name found for API (%q)", cluster.Name)
if lb.Status != "active" {
return nil, fmt.Errorf("load-balancer is not yet active (current status: %s)", lb.Status)
}
address := lb.IP
ingresses = append(ingresses, fi.ApiIngressStatus{IP: address})
return ingresses, nil
}
}
}
return nil, nil
}
// FindClusterStatus discovers the status of the cluster, by looking for the tagged etcd volumes
func (c *Cloud) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
etcdStatus, err := findEtcdStatus(c, cluster)
if err != nil {
return nil, err
}
status := &kops.ClusterStatus{
EtcdClusters: etcdStatus,
}
klog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status))
return status, nil
}
// findEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes
func findEtcdStatus(c *Cloud, cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) {
statusMap := make(map[string]*kops.EtcdClusterStatus)
volumes, err := getAllVolumesByRegion(c, c.RegionName)
if err != nil {
return nil, fmt.Errorf("failed to get all volumes by region from %s: %v", c.RegionName, err)
}
for _, volume := range volumes {
volumeID := volume.ID
etcdClusterName := ""
var etcdClusterSpec *etcd.EtcdClusterSpec
for _, myTag := range volume.Tags {
klog.V(8).Infof("findEtcdStatus status (from cloud): checking if volume with tag %q belongs to cluster", myTag)
// check if volume belongs to this cluster.
// tag will be in the format "KubernetesCluster:dev5-k8s-local" (where clusterName is dev5.k8s.local)
clusterName := strings.Replace(cluster.Name, ".", "-", -1)
if strings.Contains(myTag, fmt.Sprintf("%s:%s", TagKubernetesClusterNamePrefix, clusterName)) {
klog.V(10).Infof("findEtcdStatus cluster comparison matched for tag: %v", myTag)
// this volume belongs to our cluster, add this to our etcdClusterSpec.
// loop through the tags again and
for _, volumeTag := range volume.Tags {
if strings.Contains(volumeTag, TagKubernetesClusterIndex) {
volumeTagParts := strings.Split(volumeTag, ":")
if len(volumeTagParts) < 2 {
return nil, fmt.Errorf("volume tag split failed, too few components for tag %q on volume %q", volumeTag, volume)
}
dropletIndex := volumeTagParts[1]
etcdClusterSpec, err = c.getEtcdClusterSpec(volume.Name, dropletIndex)
if err != nil {
return nil, fmt.Errorf("error parsing etcd cluster tag %q on volume %q: %v", volumeTag, volumeID, err)
}
klog.V(10).Infof("findEtcdStatus etcdClusterSpec: %v", fi.DebugAsJsonString(etcdClusterSpec))
etcdClusterName = etcdClusterSpec.ClusterKey
status := statusMap[etcdClusterName]
if status == nil {
status = &kops.EtcdClusterStatus{
Name: etcdClusterName,
}
statusMap[etcdClusterName] = status
}
memberName := etcdClusterSpec.NodeName
status.Members = append(status.Members, &kops.EtcdMemberStatus{
Name: memberName,
VolumeId: volume.ID,
})
}
}
}
}
}
status := make([]kops.EtcdClusterStatus, 0, len(statusMap))
for _, v := range statusMap {
status = append(status, *v)
}
return status, nil
}
func (c *Cloud) getEtcdClusterSpec(volumeName string, dropletName string) (*etcd.EtcdClusterSpec, error) {
var clusterKey string
if strings.Contains(volumeName, "etcd-main") {
clusterKey = "main"
} else if strings.Contains(volumeName, "etcd-events") {
clusterKey = "events"
} else {
return nil, fmt.Errorf("could not determine etcd cluster type for volume: %s", volumeName)
}
return &etcd.EtcdClusterSpec{
ClusterKey: clusterKey,
NodeName: dropletName,
NodeNames: []string{dropletName},
}, nil
}
func getCloudGroups(c *Cloud, cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
nodeMap := cloudinstances.GetNodeMap(nodes, cluster)
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
instanceGroups, err := FindInstanceGroups(c, cluster.ObjectMeta.Name)
if err != nil {
return nil, fmt.Errorf("unable to find autoscale groups: %v", err)
}
for _, doGroup := range instanceGroups {
name := doGroup.InstanceGroupName
instancegroup, err := matchInstanceGroup(name, cluster.ObjectMeta.Name, instancegroups)
if err != nil {
return nil, fmt.Errorf("error getting instance group for doGroup %q", name)
}
if instancegroup == nil {
if warnUnmatched {
klog.Warningf("Found doGroup with no corresponding instance group %q", name)
}
continue
}
groups[instancegroup.ObjectMeta.Name], err = buildCloudInstanceGroup(c, instancegroup, doGroup, nodeMap)
if err != nil {
return nil, fmt.Errorf("error getting cloud instance group %q: %v", instancegroup.ObjectMeta.Name, err)
}
}
klog.V(8).Infof("Cloud Instance Group Info = %v", groups)
return groups, nil
}
// FindInstanceGroups finds instance groups matching the specified tags
func FindInstanceGroups(c *Cloud, clusterName string) ([]DOInstanceGroup, error) {
var result []DOInstanceGroup
instanceGroupMap := make(map[string][]string) // map of instance group name with droplet ids
clusterTag := "KubernetesCluster:" + strings.Replace(clusterName, ".", "-", -1)
droplets, err := getAllDropletsByTag(c, clusterTag)
if err != nil {
return nil, fmt.Errorf("get all droplets for tag %s returned error. Error=%v", clusterTag, err)
}
instanceGroupName := ""
for _, droplet := range droplets {
doInstanceGroup, err := getDropletInstanceGroup(droplet.Tags)
if err != nil {
return nil, fmt.Errorf("get droplets Instance group for tags %v returned error. Error=%v", droplet.Tags, err)
}
instanceGroupName = fmt.Sprintf("%s-%s", clusterName, doInstanceGroup)
instanceGroupMap[instanceGroupName] = append(instanceGroupMap[instanceGroupName], strconv.Itoa(droplet.ID))
result = append(result, DOInstanceGroup{
InstanceGroupName: instanceGroupName,
GroupType: instanceGroupName,
ClusterName: clusterName,
Members: instanceGroupMap[instanceGroupName],
})
}
klog.V(8).Infof("InstanceGroup Info = %v", result)
return result, nil
}
func getDropletInstanceGroup(tags []string) (string, error) {
for _, tag := range tags {
klog.V(8).Infof("Check tag = %s", tag)
if strings.Contains(strings.ToLower(tag), TagKubernetesInstanceGroup) {
tagParts := strings.Split(tag, ":")
if len(tagParts) < 2 {
return "", fmt.Errorf("tag split failed, too few components for tag %q", tag)
}
return tagParts[1], nil
}
}
return "", fmt.Errorf("Didn't find k8s-instancegroup for tag %v", tags)
}
// matchInstanceGroup filters a list of instancegroups for recognized cloud groups
func matchInstanceGroup(name string, clusterName string, instancegroups []*kops.InstanceGroup) (*kops.InstanceGroup, error) {
var instancegroup *kops.InstanceGroup
for _, g := range instancegroups {
var groupName string
switch g.Spec.Role {
case kops.InstanceGroupRoleMaster, kops.InstanceGroupRoleNode:
groupName = clusterName + "-" + g.ObjectMeta.Name
default:
klog.Warningf("Ignoring InstanceGroup of unknown role %q", g.Spec.Role)
continue
}
if name == groupName {
if instancegroup != nil {
return nil, fmt.Errorf("found multiple instance groups matching servergrp %q", groupName)
}
instancegroup = g
}
}
return instancegroup, nil
}
func buildCloudInstanceGroup(c *Cloud, ig *kops.InstanceGroup, g DOInstanceGroup, nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error) {
cg := &cloudinstances.CloudInstanceGroup{
HumanName: g.InstanceGroupName,
InstanceGroup: ig,
Raw: g,
MinSize: int(fi.Int32Value(ig.Spec.MinSize)),
TargetSize: int(fi.Int32Value(ig.Spec.MinSize)),
MaxSize: int(fi.Int32Value(ig.Spec.MaxSize)),
}
for _, member := range g.Members {
// TODO use a hash of the godo.DropletCreateRequest fields to calculate the second parameter.
_, err := cg.NewCloudInstance(member, cloudinstances.CloudInstanceStatusUpToDate, nodeMap[member])
if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
}
}
return cg, nil
} }

View File

@ -0,0 +1,9 @@
package do
import "strings"
func SafeClusterName(clusterName string) string {
// DO does not support . in tags / names
safeClusterName := strings.Replace(clusterName, ".", "-", -1)
return safeClusterName
}