Merge pull request #11592 from srikiz/DO-Use-Interfaces

[Digital Ocean] Code cleanup with no functional modifications
This commit is contained in:
Kubernetes Prow Robot 2021-06-01 07:18:27 -07:00 committed by GitHub
commit 91d8ffeea5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 793 additions and 642 deletions

View File

@ -13,7 +13,6 @@ go_library(
"//pkg/apis/kops:go_default_library",
"//pkg/dns:go_default_library",
"//pkg/model:go_default_library",
"//pkg/resources/digitalocean:go_default_library",
"//upup/pkg/fi:go_default_library",
"//upup/pkg/fi/cloudup/do:go_default_library",
"//upup/pkg/fi/cloudup/dotasks:go_default_library",

View File

@ -21,7 +21,6 @@ import (
"strings"
"k8s.io/kops/pkg/model"
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/do"
"k8s.io/kops/upup/pkg/fi/cloudup/dotasks"
@ -76,9 +75,9 @@ func (d *DropletBuilder) Build(c *fi.ModelBuilderContext) error {
clusterTagIndex := do.TagKubernetesClusterIndex + ":" + "etcd-" + strconv.Itoa(masterIndexCount)
droplet.Tags = append(droplet.Tags, clusterTagIndex)
droplet.Tags = append(droplet.Tags, clusterMasterTag)
droplet.Tags = append(droplet.Tags, digitalocean.TagKubernetesInstanceGroup+":"+ig.Name)
droplet.Tags = append(droplet.Tags, do.TagKubernetesInstanceGroup+":"+ig.Name)
} else {
droplet.Tags = append(droplet.Tags, digitalocean.TagKubernetesInstanceGroup+":"+ig.Name)
droplet.Tags = append(droplet.Tags, do.TagKubernetesInstanceGroup+":"+ig.Name)
}
userData, err := d.BootstrapScriptBuilder.ResourceNodeUp(c, ig)

View File

@ -2,24 +2,16 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"cloud.go",
"resources.go",
],
srcs = ["resources.go"],
importpath = "k8s.io/kops/pkg/resources/digitalocean",
visibility = ["//visibility:public"],
deps = [
"//dns-controller/pkg/dns:go_default_library",
"//dnsprovider/pkg/dnsprovider:go_default_library",
"//pkg/apis/kops:go_default_library",
"//pkg/cloudinstances:go_default_library",
"//pkg/resources:go_default_library",
"//pkg/resources/digitalocean/dns:go_default_library",
"//protokube/pkg/etcd:go_default_library",
"//upup/pkg/fi:go_default_library",
"//upup/pkg/fi/cloudup/do:go_default_library",
"//vendor/github.com/digitalocean/godo:go_default_library",
"//vendor/golang.org/x/oauth2:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)

View File

@ -1,457 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package digitalocean
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/digitalocean/godo"
"golang.org/x/oauth2"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/kops/dnsprovider/pkg/dnsprovider"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/resources/digitalocean/dns"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/upup/pkg/fi"
)
const TagKubernetesClusterIndex = "k8s-index"
const TagKubernetesClusterNamePrefix = "KubernetesCluster"
const TagKubernetesInstanceGroup = "kops-instancegroup"
// TokenSource implements oauth2.TokenSource
type TokenSource struct {
AccessToken string
}
// Token() returns oauth2.Token
func (t *TokenSource) Token() (*oauth2.Token, error) {
token := &oauth2.Token{
AccessToken: t.AccessToken,
}
return token, nil
}
// Cloud exposes all the interfaces required to operate on DigitalOcean resources
type Cloud struct {
Client *godo.Client
dns dnsprovider.Interface
// RegionName holds the region, renamed to avoid conflict with Region()
RegionName string
}
type DOInstanceGroup struct {
ClusterName string
InstanceGroupName string
GroupType string // will be either "master" or "worker"
Members []string // will store the droplet names that matches.
}
var _ fi.Cloud = &Cloud{}
// NewCloud returns a Cloud, expecting the env var DIGITALOCEAN_ACCESS_TOKEN
// NewCloud will return an err if DIGITALOCEAN_ACCESS_TOKEN is not defined
func NewCloud(region string) (*Cloud, error) {
accessToken := os.Getenv("DIGITALOCEAN_ACCESS_TOKEN")
if accessToken == "" {
return nil, errors.New("DIGITALOCEAN_ACCESS_TOKEN is required")
}
tokenSource := &TokenSource{
AccessToken: accessToken,
}
oauthClient := oauth2.NewClient(context.TODO(), tokenSource)
client := godo.NewClient(oauthClient)
return &Cloud{
Client: client,
dns: dns.NewProvider(client),
RegionName: region,
}, nil
}
func (c *Cloud) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
return getCloudGroups(c, cluster, instancegroups, warnUnmatched, nodes)
}
// DeleteGroup is not implemented yet, is a func that needs to delete a DO instance group.
func (c *Cloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
klog.V(8).Info("digitalocean cloud provider DeleteGroup not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support deleting cloud groups at this time")
}
func (c *Cloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
dropletID, err := strconv.Atoi(i.ID)
if err != nil {
return fmt.Errorf("failed to convert droplet ID to int: %s", err)
}
_, _, err = c.Client.DropletActions.Shutdown(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error stopping instance %q: %v", dropletID, err)
}
// Wait for 5 min to stop the instance
for i := 0; i < 5; i++ {
droplet, _, err := c.Client.Droplets.Get(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error describing instance %q: %v", dropletID, err)
}
klog.V(8).Infof("stopping DO instance %q, current Status: %q", droplet, droplet.Status)
if droplet.Status == "off" {
break
}
if i == 5 {
return fmt.Errorf("fail to stop DO instance %v in 5 mins", dropletID)
}
time.Sleep(time.Minute * 1)
}
_, err = c.Client.Droplets.Delete(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error stopping instance %q: %v", dropletID, err)
}
klog.V(8).Infof("deleted droplet instance %q", dropletID)
return nil
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("digitalocean cloud provider DetachInstance not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support surging")
}
// ProviderID returns the kops api identifier for DigitalOcean cloud provider
func (c *Cloud) ProviderID() kops.CloudProviderID {
return kops.CloudProviderDO
}
// Region returns the DO region we will target
func (c *Cloud) Region() string {
return c.RegionName
}
// DNS returns a DO implementation for dnsprovider.Interface
func (c *Cloud) DNS() (dnsprovider.Interface, error) {
return c.dns, nil
}
// Volumes returns an implementation of godo.StorageService
func (c *Cloud) Volumes() godo.StorageService {
return c.Client.Storage
}
// VolumeActions returns an implementation of godo.StorageActionsService
func (c *Cloud) VolumeActions() godo.StorageActionsService {
return c.Client.StorageActions
}
func (c *Cloud) Droplets() godo.DropletsService {
return c.Client.Droplets
}
func (c *Cloud) DropletActions() godo.DropletActionsService {
return c.Client.DropletActions
}
func (c *Cloud) LoadBalancers() godo.LoadBalancersService {
return c.Client.LoadBalancers
}
func (c *Cloud) GetAllLoadBalancers() ([]godo.LoadBalancer, error) {
return getAllLoadBalancers(c)
}
// FindVPCInfo is not implemented, it's only here to satisfy the fi.Cloud interface
func (c *Cloud) FindVPCInfo(id string) (*fi.VPCInfo, error) {
return nil, errors.New("not implemented")
}
func (c *Cloud) GetApiIngressStatus(cluster *kops.Cluster) ([]fi.ApiIngressStatus, error) {
var ingresses []fi.ApiIngressStatus
if cluster.Spec.MasterPublicName != "" {
// Note that this must match Digital Ocean's lb name
klog.V(2).Infof("Querying DO to find Loadbalancers for API (%q)", cluster.Name)
loadBalancers, err := getAllLoadBalancers(c)
if err != nil {
return nil, fmt.Errorf("LoadBalancers.List returned error: %v", err)
}
lbName := "api-" + strings.Replace(cluster.Name, ".", "-", -1)
for _, lb := range loadBalancers {
if lb.Name == lbName {
klog.V(10).Infof("Matching LB name found for API (%q)", cluster.Name)
if lb.Status != "active" {
return nil, fmt.Errorf("load-balancer is not yet active (current status: %s)", lb.Status)
}
address := lb.IP
ingresses = append(ingresses, fi.ApiIngressStatus{IP: address})
return ingresses, nil
}
}
}
return nil, nil
}
// FindClusterStatus discovers the status of the cluster, by looking for the tagged etcd volumes
func (c *Cloud) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
etcdStatus, err := findEtcdStatus(c, cluster)
if err != nil {
return nil, err
}
status := &kops.ClusterStatus{
EtcdClusters: etcdStatus,
}
klog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status))
return status, nil
}
// findEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes
func findEtcdStatus(c *Cloud, cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) {
statusMap := make(map[string]*kops.EtcdClusterStatus)
volumes, err := getAllVolumesByRegion(c, c.RegionName)
if err != nil {
return nil, fmt.Errorf("failed to get all volumes by region from %s: %v", c.RegionName, err)
}
for _, volume := range volumes {
volumeID := volume.ID
etcdClusterName := ""
var etcdClusterSpec *etcd.EtcdClusterSpec
for _, myTag := range volume.Tags {
klog.V(8).Infof("findEtcdStatus status (from cloud): checking if volume with tag %q belongs to cluster", myTag)
// check if volume belongs to this cluster.
// tag will be in the format "KubernetesCluster:dev5-k8s-local" (where clusterName is dev5.k8s.local)
clusterName := strings.Replace(cluster.Name, ".", "-", -1)
if strings.Contains(myTag, fmt.Sprintf("%s:%s", TagKubernetesClusterNamePrefix, clusterName)) {
klog.V(10).Infof("findEtcdStatus cluster comparison matched for tag: %v", myTag)
// this volume belongs to our cluster, add this to our etcdClusterSpec.
// loop through the tags again and
for _, volumeTag := range volume.Tags {
if strings.Contains(volumeTag, TagKubernetesClusterIndex) {
volumeTagParts := strings.Split(volumeTag, ":")
if len(volumeTagParts) < 2 {
return nil, fmt.Errorf("volume tag split failed, too few components for tag %q on volume %q", volumeTag, volume)
}
dropletIndex := volumeTagParts[1]
etcdClusterSpec, err = c.getEtcdClusterSpec(volume.Name, dropletIndex)
if err != nil {
return nil, fmt.Errorf("error parsing etcd cluster tag %q on volume %q: %v", volumeTag, volumeID, err)
}
klog.V(10).Infof("findEtcdStatus etcdClusterSpec: %v", fi.DebugAsJsonString(etcdClusterSpec))
etcdClusterName = etcdClusterSpec.ClusterKey
status := statusMap[etcdClusterName]
if status == nil {
status = &kops.EtcdClusterStatus{
Name: etcdClusterName,
}
statusMap[etcdClusterName] = status
}
memberName := etcdClusterSpec.NodeName
status.Members = append(status.Members, &kops.EtcdMemberStatus{
Name: memberName,
VolumeId: volume.ID,
})
}
}
}
}
}
status := make([]kops.EtcdClusterStatus, 0, len(statusMap))
for _, v := range statusMap {
status = append(status, *v)
}
return status, nil
}
func (c *Cloud) getEtcdClusterSpec(volumeName string, dropletName string) (*etcd.EtcdClusterSpec, error) {
var clusterKey string
if strings.Contains(volumeName, "etcd-main") {
clusterKey = "main"
} else if strings.Contains(volumeName, "etcd-events") {
clusterKey = "events"
} else {
return nil, fmt.Errorf("could not determine etcd cluster type for volume: %s", volumeName)
}
return &etcd.EtcdClusterSpec{
ClusterKey: clusterKey,
NodeName: dropletName,
NodeNames: []string{dropletName},
}, nil
}
func getCloudGroups(c *Cloud, cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
nodeMap := cloudinstances.GetNodeMap(nodes, cluster)
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
instanceGroups, err := FindInstanceGroups(c, cluster.ObjectMeta.Name)
if err != nil {
return nil, fmt.Errorf("unable to find autoscale groups: %v", err)
}
for _, doGroup := range instanceGroups {
name := doGroup.InstanceGroupName
instancegroup, err := matchInstanceGroup(name, cluster.ObjectMeta.Name, instancegroups)
if err != nil {
return nil, fmt.Errorf("error getting instance group for doGroup %q", name)
}
if instancegroup == nil {
if warnUnmatched {
klog.Warningf("Found doGroup with no corresponding instance group %q", name)
}
continue
}
groups[instancegroup.ObjectMeta.Name], err = buildCloudInstanceGroup(c, instancegroup, doGroup, nodeMap)
if err != nil {
return nil, fmt.Errorf("error getting cloud instance group %q: %v", instancegroup.ObjectMeta.Name, err)
}
}
klog.V(8).Infof("Cloud Instance Group Info = %v", groups)
return groups, nil
}
// FindInstanceGroups finds instance groups matching the specified tags
func FindInstanceGroups(c *Cloud, clusterName string) ([]DOInstanceGroup, error) {
var result []DOInstanceGroup
instanceGroupMap := make(map[string][]string) // map of instance group name with droplet ids
clusterTag := "KubernetesCluster:" + strings.Replace(clusterName, ".", "-", -1)
droplets, err := getAllDropletsByTag(c, clusterTag)
if err != nil {
return nil, fmt.Errorf("get all droplets for tag %s returned error. Error=%v", clusterTag, err)
}
instanceGroupName := ""
for _, droplet := range droplets {
doInstanceGroup, err := getDropletInstanceGroup(droplet.Tags)
if err != nil {
return nil, fmt.Errorf("get droplets Instance group for tags %v returned error. Error=%v", droplet.Tags, err)
}
instanceGroupName = fmt.Sprintf("%s-%s", clusterName, doInstanceGroup)
instanceGroupMap[instanceGroupName] = append(instanceGroupMap[instanceGroupName], strconv.Itoa(droplet.ID))
result = append(result, DOInstanceGroup{
InstanceGroupName: instanceGroupName,
GroupType: instanceGroupName,
ClusterName: clusterName,
Members: instanceGroupMap[instanceGroupName],
})
}
klog.V(8).Infof("InstanceGroup Info = %v", result)
return result, nil
}
func getDropletInstanceGroup(tags []string) (string, error) {
for _, tag := range tags {
klog.V(8).Infof("Check tag = %s", tag)
if strings.Contains(strings.ToLower(tag), TagKubernetesInstanceGroup) {
tagParts := strings.Split(tag, ":")
if len(tagParts) < 2 {
return "", fmt.Errorf("tag split failed, too few components for tag %q", tag)
}
return tagParts[1], nil
}
}
return "", fmt.Errorf("Didn't find k8s-instancegroup for tag %v", tags)
}
// matchInstanceGroup filters a list of instancegroups for recognized cloud groups
func matchInstanceGroup(name string, clusterName string, instancegroups []*kops.InstanceGroup) (*kops.InstanceGroup, error) {
var instancegroup *kops.InstanceGroup
for _, g := range instancegroups {
var groupName string
switch g.Spec.Role {
case kops.InstanceGroupRoleMaster, kops.InstanceGroupRoleNode:
groupName = clusterName + "-" + g.ObjectMeta.Name
default:
klog.Warningf("Ignoring InstanceGroup of unknown role %q", g.Spec.Role)
continue
}
if name == groupName {
if instancegroup != nil {
return nil, fmt.Errorf("found multiple instance groups matching servergrp %q", groupName)
}
instancegroup = g
}
}
return instancegroup, nil
}
func buildCloudInstanceGroup(c *Cloud, ig *kops.InstanceGroup, g DOInstanceGroup, nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error) {
cg := &cloudinstances.CloudInstanceGroup{
HumanName: g.InstanceGroupName,
InstanceGroup: ig,
Raw: g,
MinSize: int(fi.Int32Value(ig.Spec.MinSize)),
TargetSize: int(fi.Int32Value(ig.Spec.MinSize)),
MaxSize: int(fi.Int32Value(ig.Spec.MaxSize)),
}
for _, member := range g.Members {
// TODO use a hash of the godo.DropletCreateRequest fields to calculate the second parameter.
_, err := cg.NewCloudInstance(member, cloudinstances.CloudInstanceStatusUpToDate, nodeMap[member])
if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
}
}
return cg, nil
}

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/resources"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/do"
)
const (
@ -42,7 +43,7 @@ const (
type listFn func(fi.Cloud, string) ([]*resources.Resource, error)
func ListResources(cloud *Cloud, clusterName string) (map[string]*resources.Resource, error) {
func ListResources(cloud do.DOCloud, clusterName string) (map[string]*resources.Resource, error) {
resourceTrackers := make(map[string]*resources.Resource)
listFunctions := []listFn{
@ -66,12 +67,12 @@ func ListResources(cloud *Cloud, clusterName string) (map[string]*resources.Reso
}
func listDroplets(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(*Cloud)
c := cloud.(do.DOCloud)
var resourceTrackers []*resources.Resource
clusterTag := "KubernetesCluster:" + strings.Replace(clusterName, ".", "-", -1)
droplets, err := getAllDropletsByTag(c, clusterTag)
droplets, err := c.GetAllDropletsByTag(clusterTag)
if err != nil {
return nil, fmt.Errorf("failed to list droplets: %v", err)
}
@ -92,40 +93,13 @@ func listDroplets(cloud fi.Cloud, clusterName string) ([]*resources.Resource, er
return resourceTrackers, nil
}
func getAllDropletsByTag(cloud *Cloud, tag string) ([]godo.Droplet, error) {
allDroplets := []godo.Droplet{}
opt := &godo.ListOptions{}
for {
droplets, resp, err := cloud.Droplets().ListByTag(context.TODO(), tag, opt)
if err != nil {
return nil, err
}
allDroplets = append(allDroplets, droplets...)
if resp.Links == nil || resp.Links.IsLastPage() {
break
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}
opt.Page = page + 1
}
return allDroplets, nil
}
func listVolumes(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(*Cloud)
c := cloud.(do.DOCloud)
var resourceTrackers []*resources.Resource
volumeMatch := strings.Replace(clusterName, ".", "-", -1)
volumes, err := getAllVolumesByRegion(c, c.Region())
volumes, err := c.GetAllVolumesByRegion()
if err != nil {
return nil, fmt.Errorf("failed to list volumes: %s", err)
}
@ -153,41 +127,9 @@ func listVolumes(cloud fi.Cloud, clusterName string) ([]*resources.Resource, err
return resourceTrackers, nil
}
func getAllVolumesByRegion(cloud *Cloud, region string) ([]godo.Volume, error) {
allVolumes := []godo.Volume{}
opt := &godo.ListOptions{}
for {
volumes, resp, err := cloud.Volumes().ListVolumes(context.TODO(), &godo.ListVolumeParams{
Region: region,
ListOptions: opt,
})
if err != nil {
return nil, err
}
allVolumes = append(allVolumes, volumes...)
if resp.Links == nil || resp.Links.IsLastPage() {
break
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}
opt.Page = page + 1
}
return allVolumes, nil
}
func listDNS(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(*Cloud)
domains, _, err := c.Client.Domains.List(context.TODO(), &godo.ListOptions{})
c := cloud.(do.DOCloud)
domains, _, err := c.DomainService().List(context.TODO(), &godo.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to list domains: %s", err)
}
@ -242,12 +184,12 @@ func listDNS(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error)
return resourceTrackers, nil
}
func getAllRecordsByDomain(cloud *Cloud, domain string) ([]godo.DomainRecord, error) {
func getAllRecordsByDomain(cloud do.DOCloud, domain string) ([]godo.DomainRecord, error) {
allRecords := []godo.DomainRecord{}
opt := &godo.ListOptions{}
for {
records, resp, err := cloud.Client.Domains.Records(context.TODO(), domain, opt)
records, resp, err := cloud.DomainService().Records(context.TODO(), domain, opt)
if err != nil {
return nil, err
}
@ -270,12 +212,12 @@ func getAllRecordsByDomain(cloud *Cloud, domain string) ([]godo.DomainRecord, er
}
func listLoadBalancers(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(*Cloud)
c := cloud.(do.DOCloud)
var resourceTrackers []*resources.Resource
clusterTag := "KubernetesCluster-Master:" + strings.Replace(clusterName, ".", "-", -1)
lbs, err := getAllLoadBalancers(c)
lbs, err := c.GetAllLoadBalancers()
if err != nil {
return nil, fmt.Errorf("failed to list lbs: %v", err)
}
@ -303,42 +245,14 @@ func listLoadBalancers(cloud fi.Cloud, clusterName string) ([]*resources.Resourc
return resourceTrackers, nil
}
func getAllLoadBalancers(cloud *Cloud) ([]godo.LoadBalancer, error) {
allLoadBalancers := []godo.LoadBalancer{}
opt := &godo.ListOptions{}
for {
lbs, resp, err := cloud.LoadBalancers().List(context.TODO(), opt)
if err != nil {
return nil, err
}
allLoadBalancers = append(allLoadBalancers, lbs...)
if resp.Links == nil || resp.Links.IsLastPage() {
break
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}
opt.Page = page + 1
}
return allLoadBalancers, nil
}
func deleteDroplet(cloud fi.Cloud, t *resources.Resource) error {
c := cloud.(*Cloud)
c := cloud.(do.DOCloud)
dropletID, err := strconv.Atoi(t.ID)
if err != nil {
return fmt.Errorf("failed to convert droplet ID to int: %s", err)
}
_, err = c.Droplets().Delete(context.TODO(), dropletID)
_, err = c.DropletsService().Delete(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("failed to delete droplet: %d, err: %s", dropletID, err)
}
@ -347,11 +261,10 @@ func deleteDroplet(cloud fi.Cloud, t *resources.Resource) error {
}
func deleteVolume(cloud fi.Cloud, t *resources.Resource) error {
c := cloud.(*Cloud)
c := cloud.(do.DOCloud)
volume := t.Obj.(godo.Volume)
for _, dropletID := range volume.DropletIDs {
action, _, err := c.VolumeActions().DetachByDropletID(context.TODO(), volume.ID, dropletID)
action, _, err := c.VolumeActionService().DetachByDropletID(context.TODO(), volume.ID, dropletID)
if err != nil {
return fmt.Errorf("failed to detach volume: %s, err: %s", volume.ID, err)
}
@ -360,7 +273,7 @@ func deleteVolume(cloud fi.Cloud, t *resources.Resource) error {
}
}
_, err := c.Volumes().DeleteVolume(context.TODO(), t.ID)
_, err := c.VolumeService().DeleteVolume(context.TODO(), t.ID)
if err != nil {
return fmt.Errorf("failed to delete volume: %s, err: %s", t.ID, err)
}
@ -369,10 +282,10 @@ func deleteVolume(cloud fi.Cloud, t *resources.Resource) error {
}
func deleteRecord(cloud fi.Cloud, domain string, t *resources.Resource) error {
c := cloud.(*Cloud)
c := cloud.(do.DOCloud)
record := t.Obj.(godo.DomainRecord)
_, err := c.Client.Domains.DeleteRecord(context.TODO(), domain, record.ID)
_, err := c.DomainService().DeleteRecord(context.TODO(), domain, record.ID)
if err != nil {
return fmt.Errorf("failed to delete record for domain %s: %d", domain, record.ID)
}
@ -381,9 +294,9 @@ func deleteRecord(cloud fi.Cloud, domain string, t *resources.Resource) error {
}
func deleteLoadBalancer(cloud fi.Cloud, t *resources.Resource) error {
c := cloud.(*Cloud)
c := cloud.(do.DOCloud)
lb := t.Obj.(godo.LoadBalancer)
_, err := c.Client.LoadBalancers.Delete(context.TODO(), lb.ID)
_, err := c.LoadBalancersService().Delete(context.TODO(), lb.ID)
if err != nil {
return fmt.Errorf("failed to delete load balancer with name %s %v", lb.Name, err)
@ -392,7 +305,7 @@ func deleteLoadBalancer(cloud fi.Cloud, t *resources.Resource) error {
return nil
}
func waitForDetach(cloud *Cloud, action *godo.Action) error {
func waitForDetach(cloud do.DOCloud, action *godo.Action) error {
timeout := time.After(10 * time.Second)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
@ -401,7 +314,7 @@ func waitForDetach(cloud *Cloud, action *godo.Action) error {
case <-timeout:
return errors.New("timed out waiting for volume to detach")
case <-ticker.C:
updatedAction, _, err := cloud.Client.Actions.Get(context.TODO(), action.ID)
updatedAction, _, err := cloud.ActionsService().Get(context.TODO(), action.ID)
if err != nil {
return err
}

View File

@ -21,6 +21,7 @@ go_library(
"//upup/pkg/fi/cloudup/aliup:go_default_library",
"//upup/pkg/fi/cloudup/awsup:go_default_library",
"//upup/pkg/fi/cloudup/azure:go_default_library",
"//upup/pkg/fi/cloudup/do:go_default_library",
"//upup/pkg/fi/cloudup/gce:go_default_library",
"//upup/pkg/fi/cloudup/openstack:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",

View File

@ -31,6 +31,7 @@ import (
cloudali "k8s.io/kops/upup/pkg/fi/cloudup/aliup"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
cloudazure "k8s.io/kops/upup/pkg/fi/cloudup/azure"
clouddo "k8s.io/kops/upup/pkg/fi/cloudup/do"
cloudgce "k8s.io/kops/upup/pkg/fi/cloudup/gce"
cloudopenstack "k8s.io/kops/upup/pkg/fi/cloudup/openstack"
)
@ -42,7 +43,7 @@ func ListResources(cloud fi.Cloud, cluster *kops.Cluster, region string) (map[st
case kops.CloudProviderAWS:
return aws.ListResourcesAWS(cloud.(awsup.AWSCloud), clusterName)
case kops.CloudProviderDO:
return digitalocean.ListResources(cloud.(*digitalocean.Cloud), clusterName)
return digitalocean.ListResources(cloud.(clouddo.DOCloud), clusterName)
case kops.CloudProviderGCE:
return gce.ListResourcesGCE(cloud.(cloudgce.GCECloud), clusterName, region)
case kops.CloudProviderOpenstack:

View File

@ -6,8 +6,8 @@ go_library(
importpath = "k8s.io/kops/protokube/pkg/gossip/do",
visibility = ["//visibility:public"],
deps = [
"//pkg/resources/digitalocean:go_default_library",
"//protokube/pkg/gossip:go_default_library",
"//vendor/github.com/digitalocean/godo:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)

View File

@ -21,14 +21,14 @@ import (
"fmt"
"strings"
"github.com/digitalocean/godo"
"k8s.io/klog/v2"
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/protokube/pkg/gossip"
)
type SeedProvider struct {
cloud *digitalocean.Cloud
tag string
godoClient *godo.Client
tag string
}
var _ gossip.SeedProvider = &SeedProvider{}
@ -36,7 +36,7 @@ var _ gossip.SeedProvider = &SeedProvider{}
func (p *SeedProvider) GetSeeds() ([]string, error) {
var seeds []string
droplets, _, err := p.cloud.Droplets().List(context.TODO(), nil)
droplets, _, err := p.godoClient.Droplets.List(context.TODO(), nil)
if err != nil {
return nil, fmt.Errorf("Droplets.ListByTag returned error: %v", err)
@ -64,11 +64,11 @@ func (p *SeedProvider) GetSeeds() ([]string, error) {
return seeds, nil
}
func NewSeedProvider(cloud *digitalocean.Cloud, tag string) (*SeedProvider, error) {
func NewSeedProvider(godoClient *godo.Client, tag string) (*SeedProvider, error) {
klog.V(4).Infof("Trying new seed provider with cluster tag:%s", tag)
return &SeedProvider{
cloud: cloud,
tag: tag,
godoClient: godoClient,
tag: tag,
}, nil
}

View File

@ -32,7 +32,6 @@ go_library(
"//pkg/k8scodecs:go_default_library",
"//pkg/kubemanifest:go_default_library",
"//pkg/nodelabels:go_default_library",
"//pkg/resources/digitalocean:go_default_library",
"//protokube/pkg/etcd:go_default_library",
"//protokube/pkg/gossip:go_default_library",
"//protokube/pkg/gossip/ali:go_default_library",
@ -63,6 +62,7 @@ go_library(
"//vendor/github.com/digitalocean/godo:go_default_library",
"//vendor/github.com/gophercloud/gophercloud/openstack/blockstorage/v3/volumes:go_default_library",
"//vendor/github.com/gophercloud/gophercloud/openstack/compute/v2/extensions/volumeattach:go_default_library",
"//vendor/golang.org/x/oauth2:go_default_library",
"//vendor/google.golang.org/api/compute/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/rbac/v1beta1:go_default_library",

View File

@ -18,6 +18,7 @@ package protokube
import (
"context"
"errors"
"fmt"
"io/ioutil"
"net"
@ -28,8 +29,8 @@ import (
"time"
"github.com/digitalocean/godo"
"golang.org/x/oauth2"
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/protokube/pkg/gossip"
gossipdo "k8s.io/kops/protokube/pkg/gossip/do"
@ -44,9 +45,14 @@ const (
localDevicePrefix = "/dev/disk/by-id/scsi-0DO_Volume_"
)
// TokenSource implements oauth2.TokenSource
type TokenSource struct {
AccessToken string
}
type DOVolumes struct {
ClusterID string
Cloud *digitalocean.Cloud
ClusterID string
godoClient *godo.Client
region string
dropletName string
@ -103,7 +109,7 @@ func NewDOVolumes() (*DOVolumes, error) {
return nil, fmt.Errorf("failed to get droplet name: %s", err)
}
cloud, err := digitalocean.NewCloud(region)
godoClient, err := NewDOCloud()
if err != nil {
return nil, fmt.Errorf("failed to initialize digitalocean cloud: %s", err)
}
@ -119,7 +125,7 @@ func NewDOVolumes() (*DOVolumes, error) {
}
return &DOVolumes{
Cloud: cloud,
godoClient: godoClient,
ClusterID: clusterID,
dropletID: dropletIDInt,
dropletName: dropletName,
@ -128,9 +134,33 @@ func NewDOVolumes() (*DOVolumes, error) {
}, nil
}
// Token() returns oauth2.Token
func (t *TokenSource) Token() (*oauth2.Token, error) {
token := &oauth2.Token{
AccessToken: t.AccessToken,
}
return token, nil
}
func NewDOCloud() (*godo.Client, error) {
accessToken := os.Getenv("DIGITALOCEAN_ACCESS_TOKEN")
if accessToken == "" {
return nil, errors.New("DIGITALOCEAN_ACCESS_TOKEN is required")
}
tokenSource := &TokenSource{
AccessToken: accessToken,
}
oauthClient := oauth2.NewClient(context.TODO(), tokenSource)
client := godo.NewClient(oauthClient)
return client, nil
}
func (d *DOVolumes) AttachVolume(volume *Volume) error {
for {
action, _, err := d.Cloud.VolumeActions().Attach(context.TODO(), volume.ID, d.dropletID)
action, _, err := d.godoClient.StorageActions.Attach(context.TODO(), volume.ID, d.dropletID)
if err != nil {
return fmt.Errorf("error attaching volume: %s", err)
}
@ -158,7 +188,7 @@ func (d *DOVolumes) AttachVolume(volume *Volume) error {
}
func (d *DOVolumes) FindVolumes() ([]*Volume, error) {
doVolumes, err := getAllVolumesByRegion(d.Cloud, d.region)
doVolumes, err := getAllVolumesByRegion(d.godoClient, d.region)
if err != nil {
return nil, fmt.Errorf("failed to list volumes: %s", err)
}
@ -195,12 +225,12 @@ func (d *DOVolumes) FindVolumes() ([]*Volume, error) {
return volumes, nil
}
func getAllVolumesByRegion(cloud *digitalocean.Cloud, region string) ([]godo.Volume, error) {
func getAllVolumesByRegion(godoClient *godo.Client, region string) ([]godo.Volume, error) {
allVolumes := []godo.Volume{}
opt := &godo.ListOptions{}
for {
volumes, resp, err := cloud.Volumes().ListVolumes(context.TODO(), &godo.ListVolumeParams{
volumes, resp, err := godoClient.Storage.ListVolumes(context.TODO(), &godo.ListVolumeParams{
Region: region,
ListOptions: opt,
})
@ -243,7 +273,7 @@ func (d *DOVolumes) FindMountedVolume(volume *Volume) (string, error) {
}
func (d *DOVolumes) getVolumeByID(id string) (*godo.Volume, error) {
vol, _, err := d.Cloud.Volumes().GetVolume(context.TODO(), id)
vol, _, err := d.godoClient.Storage.GetVolume(context.TODO(), id)
return vol, err
}
@ -280,7 +310,7 @@ func getLocalDeviceName(vol *godo.Volume) string {
func (d *DOVolumes) GossipSeeds() (gossip.SeedProvider, error) {
for _, dropletTag := range d.dropletTags {
if strings.Contains(dropletTag, strings.Replace(d.ClusterID, ".", "-", -1)) {
return gossipdo.NewSeedProvider(d.Cloud, dropletTag)
return gossipdo.NewSeedProvider(d.godoClient, dropletTag)
}
}

View File

@ -53,7 +53,6 @@ go_library(
"//pkg/model/gcemodel:go_default_library",
"//pkg/model/iam:go_default_library",
"//pkg/model/openstackmodel:go_default_library",
"//pkg/resources/digitalocean:go_default_library",
"//pkg/resources/spotinst:go_default_library",
"//pkg/templates:go_default_library",
"//pkg/util/subnet:go_default_library",

View File

@ -54,7 +54,6 @@ import (
"k8s.io/kops/pkg/model/gcemodel"
"k8s.io/kops/pkg/model/iam"
"k8s.io/kops/pkg/model/openstackmodel"
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/pkg/templates"
"k8s.io/kops/pkg/wellknownports"
"k8s.io/kops/upup/models"
@ -683,7 +682,7 @@ func (c *ApplyClusterCmd) Run(ctx context.Context) error {
case kops.CloudProviderAWS:
target = awsup.NewAWSAPITarget(cloud.(awsup.AWSCloud))
case kops.CloudProviderDO:
target = do.NewDOAPITarget(cloud.(*digitalocean.Cloud))
target = do.NewDOAPITarget(cloud.(do.DOCloud))
case kops.CloudProviderOpenstack:
target = openstack.NewOpenstackAPITarget(cloud.(openstack.OpenstackCloud))
case kops.CloudProviderALI:

View File

@ -5,11 +5,21 @@ go_library(
srcs = [
"api_target.go",
"cloud.go",
"mock_do_cloud.go",
"utils.go",
],
importpath = "k8s.io/kops/upup/pkg/fi/cloudup/do",
visibility = ["//visibility:public"],
deps = [
"//pkg/resources/digitalocean:go_default_library",
"//dnsprovider/pkg/dnsprovider:go_default_library",
"//pkg/apis/kops:go_default_library",
"//pkg/cloudinstances:go_default_library",
"//pkg/resources/digitalocean/dns:go_default_library",
"//protokube/pkg/etcd:go_default_library",
"//upup/pkg/fi:go_default_library",
"//vendor/github.com/digitalocean/godo:go_default_library",
"//vendor/golang.org/x/oauth2:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)

View File

@ -17,17 +17,16 @@ limitations under the License.
package do
import (
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/upup/pkg/fi"
)
type DOAPITarget struct {
Cloud *digitalocean.Cloud
Cloud DOCloud
}
var _ fi.Target = &DOAPITarget{}
func NewDOAPITarget(cloud *digitalocean.Cloud) *DOAPITarget {
func NewDOAPITarget(cloud DOCloud) *DOAPITarget {
return &DOAPITarget{
Cloud: cloud,
}

View File

@ -17,9 +17,23 @@ limitations under the License.
package do
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"time"
"k8s.io/kops/pkg/resources/digitalocean"
"github.com/digitalocean/godo"
"golang.org/x/oauth2"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kops/dnsprovider/pkg/dnsprovider"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/resources/digitalocean/dns"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/upup/pkg/fi"
)
@ -28,13 +42,524 @@ const TagNameEtcdClusterPrefix = "etcdCluster-"
const TagNameRolePrefix = "k8s.io/role/"
const TagKubernetesClusterNamePrefix = "KubernetesCluster"
const TagKubernetesClusterMasterPrefix = "KubernetesCluster-Master"
const TagKubernetesInstanceGroup = "kops-instancegroup"
func SafeClusterName(clusterName string) string {
// DO does not support . in tags / names
safeClusterName := strings.Replace(clusterName, ".", "-", -1)
return safeClusterName
type DOInstanceGroup struct {
ClusterName string
InstanceGroupName string
GroupType string // will be either "master" or "worker"
Members []string // will store the droplet names that matches.
}
func NewDOCloud(region string) (fi.Cloud, error) {
return digitalocean.NewCloud(region)
// TokenSource implements oauth2.TokenSource
type TokenSource struct {
AccessToken string
}
// DOCloud exposes all the interfaces required to operate on DigitalOcean resources
type DOCloud interface {
fi.Cloud
DropletsService() godo.DropletsService
DropletActionService() godo.DropletActionsService
VolumeService() godo.StorageService
VolumeActionService() godo.StorageActionsService
LoadBalancersService() godo.LoadBalancersService
DomainService() godo.DomainsService
ActionsService() godo.ActionsService
FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error)
GetAllLoadBalancers() ([]godo.LoadBalancer, error)
GetAllDropletsByTag(tag string) ([]godo.Droplet, error)
GetAllVolumesByRegion() ([]godo.Volume, error)
}
// static compile time check to validate DOCloud's fi.Cloud Interface.
var _ fi.Cloud = &doCloudImplementation{}
// doCloudImplementation holds the godo client object to interact with DO resources.
type doCloudImplementation struct {
Client *godo.Client
dns dnsprovider.Interface
// region holds the DO region.
region string
}
// Token() returns oauth2.Token
func (t *TokenSource) Token() (*oauth2.Token, error) {
token := &oauth2.Token{
AccessToken: t.AccessToken,
}
return token, nil
}
// NewCloud returns a Cloud, expecting the env var DIGITALOCEAN_ACCESS_TOKEN
// NewCloud will return an err if DIGITALOCEAN_ACCESS_TOKEN is not defined
func NewDOCloud(region string) (DOCloud, error) {
accessToken := os.Getenv("DIGITALOCEAN_ACCESS_TOKEN")
if accessToken == "" {
return nil, errors.New("DIGITALOCEAN_ACCESS_TOKEN is required")
}
tokenSource := &TokenSource{
AccessToken: accessToken,
}
oauthClient := oauth2.NewClient(context.TODO(), tokenSource)
client := godo.NewClient(oauthClient)
return &doCloudImplementation{
Client: client,
dns: dns.NewProvider(client),
region: region,
}, nil
}
func (c *doCloudImplementation) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
return getCloudGroups(c, cluster, instancegroups, warnUnmatched, nodes)
}
// DeleteGroup is not implemented yet, is a func that needs to delete a DO instance group.
func (c *doCloudImplementation) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
klog.V(8).Info("digitalocean cloud provider DeleteGroup not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support deleting cloud groups at this time")
}
func (c *doCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstance) error {
dropletID, err := strconv.Atoi(i.ID)
if err != nil {
return fmt.Errorf("failed to convert droplet ID to int: %s", err)
}
_, _, err = c.Client.DropletActions.Shutdown(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error stopping instance %q: %v", dropletID, err)
}
// Wait for 5 min to stop the instance
for i := 0; i < 5; i++ {
droplet, _, err := c.Client.Droplets.Get(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error describing instance %q: %v", dropletID, err)
}
klog.V(8).Infof("stopping DO instance %q, current Status: %q", droplet, droplet.Status)
if droplet.Status == "off" {
break
}
if i == 5 {
return fmt.Errorf("fail to stop DO instance %v in 5 mins", dropletID)
}
time.Sleep(time.Minute * 1)
}
_, err = c.Client.Droplets.Delete(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("error stopping instance %q: %v", dropletID, err)
}
klog.V(8).Infof("deleted droplet instance %q", dropletID)
return nil
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *doCloudImplementation) DetachInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("digitalocean cloud provider DetachInstance not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support surging")
}
// ProviderID returns the kops api identifier for DigitalOcean cloud provider
func (c *doCloudImplementation) ProviderID() kops.CloudProviderID {
return kops.CloudProviderDO
}
// Region returns the DO region we will target
func (c *doCloudImplementation) Region() string {
return c.region
}
func (c *doCloudImplementation) DNS() (dnsprovider.Interface, error) {
return c.dns, nil
}
// Volumes returns an implementation of godo.StorageService
func (c *doCloudImplementation) VolumeService() godo.StorageService {
return c.Client.Storage
}
// VolumeActions returns an implementation of godo.StorageActionsService
func (c *doCloudImplementation) VolumeActionService() godo.StorageActionsService {
return c.Client.StorageActions
}
// DropletsService returns the droplets client interface.
func (c *doCloudImplementation) DropletsService() godo.DropletsService {
return c.Client.Droplets
}
func (c *doCloudImplementation) DropletActionService() godo.DropletActionsService {
return c.Client.DropletActions
}
func (c *doCloudImplementation) LoadBalancersService() godo.LoadBalancersService {
return c.Client.LoadBalancers
}
func (c *doCloudImplementation) DomainService() godo.DomainsService {
return c.Client.Domains
}
func (c *doCloudImplementation) ActionsService() godo.ActionsService {
return c.Client.Actions
}
// FindVPCInfo is not implemented, it's only here to satisfy the fi.Cloud interface
func (c *doCloudImplementation) FindVPCInfo(id string) (*fi.VPCInfo, error) {
return nil, errors.New("not implemented")
}
func (c *doCloudImplementation) GetApiIngressStatus(cluster *kops.Cluster) ([]fi.ApiIngressStatus, error) {
var ingresses []fi.ApiIngressStatus
if cluster.Spec.MasterPublicName != "" {
// Note that this must match Digital Ocean's lb name
klog.V(2).Infof("Querying DO to find Loadbalancers for API (%q)", cluster.Name)
loadBalancers, err := c.GetAllLoadBalancers()
if err != nil {
return nil, fmt.Errorf("LoadBalancers.List returned error: %v", err)
}
lbName := "api-" + strings.Replace(cluster.Name, ".", "-", -1)
for _, lb := range loadBalancers {
if lb.Name == lbName {
klog.V(10).Infof("Matching LB name found for API (%q)", cluster.Name)
if lb.Status != "active" {
return nil, fmt.Errorf("load-balancer is not yet active (current status: %s)", lb.Status)
}
address := lb.IP
ingresses = append(ingresses, fi.ApiIngressStatus{IP: address})
return ingresses, nil
}
}
}
return nil, nil
}
// FindClusterStatus discovers the status of the cluster, by looking for the tagged etcd volumes
func (c *doCloudImplementation) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
etcdStatus, err := findEtcdStatus(c, cluster)
if err != nil {
return nil, err
}
status := &kops.ClusterStatus{
EtcdClusters: etcdStatus,
}
klog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status))
return status, nil
}
// findEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes
func findEtcdStatus(c *doCloudImplementation, cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) {
statusMap := make(map[string]*kops.EtcdClusterStatus)
volumes, err := c.GetAllVolumesByRegion()
if err != nil {
return nil, fmt.Errorf("failed to get all volumes by region from %s: %v", c.Region(), err)
}
for _, volume := range volumes {
volumeID := volume.ID
etcdClusterName := ""
var etcdClusterSpec *etcd.EtcdClusterSpec
for _, myTag := range volume.Tags {
klog.V(8).Infof("findEtcdStatus status (from cloud): checking if volume with tag %q belongs to cluster", myTag)
// check if volume belongs to this cluster.
// tag will be in the format "KubernetesCluster:dev5-k8s-local" (where clusterName is dev5.k8s.local)
clusterName := strings.Replace(cluster.Name, ".", "-", -1)
if strings.Contains(myTag, fmt.Sprintf("%s:%s", TagKubernetesClusterNamePrefix, clusterName)) {
klog.V(10).Infof("findEtcdStatus cluster comparison matched for tag: %v", myTag)
// this volume belongs to our cluster, add this to our etcdClusterSpec.
// loop through the tags again and
for _, volumeTag := range volume.Tags {
if strings.Contains(volumeTag, TagKubernetesClusterIndex) {
volumeTagParts := strings.Split(volumeTag, ":")
if len(volumeTagParts) < 2 {
return nil, fmt.Errorf("volume tag split failed, too few components for tag %q on volume %q", volumeTag, volume)
}
dropletIndex := volumeTagParts[1]
etcdClusterSpec, err = c.getEtcdClusterSpec(volume.Name, dropletIndex)
if err != nil {
return nil, fmt.Errorf("error parsing etcd cluster tag %q on volume %q: %v", volumeTag, volumeID, err)
}
klog.V(10).Infof("findEtcdStatus etcdClusterSpec: %v", fi.DebugAsJsonString(etcdClusterSpec))
etcdClusterName = etcdClusterSpec.ClusterKey
status := statusMap[etcdClusterName]
if status == nil {
status = &kops.EtcdClusterStatus{
Name: etcdClusterName,
}
statusMap[etcdClusterName] = status
}
memberName := etcdClusterSpec.NodeName
status.Members = append(status.Members, &kops.EtcdMemberStatus{
Name: memberName,
VolumeId: volume.ID,
})
}
}
}
}
}
status := make([]kops.EtcdClusterStatus, 0, len(statusMap))
for _, v := range statusMap {
status = append(status, *v)
}
return status, nil
}
func (c *doCloudImplementation) getEtcdClusterSpec(volumeName string, dropletName string) (*etcd.EtcdClusterSpec, error) {
var clusterKey string
if strings.Contains(volumeName, "etcd-main") {
clusterKey = "main"
} else if strings.Contains(volumeName, "etcd-events") {
clusterKey = "events"
} else {
return nil, fmt.Errorf("could not determine etcd cluster type for volume: %s", volumeName)
}
return &etcd.EtcdClusterSpec{
ClusterKey: clusterKey,
NodeName: dropletName,
NodeNames: []string{dropletName},
}, nil
}
func getCloudGroups(c *doCloudImplementation, cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
nodeMap := cloudinstances.GetNodeMap(nodes, cluster)
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
instanceGroups, err := findInstanceGroups(c, cluster.ObjectMeta.Name)
if err != nil {
return nil, fmt.Errorf("unable to find autoscale groups: %v", err)
}
for _, doGroup := range instanceGroups {
name := doGroup.InstanceGroupName
instancegroup, err := matchInstanceGroup(name, cluster.ObjectMeta.Name, instancegroups)
if err != nil {
return nil, fmt.Errorf("error getting instance group for doGroup %q", name)
}
if instancegroup == nil {
if warnUnmatched {
klog.Warningf("Found doGroup with no corresponding instance group %q", name)
}
continue
}
groups[instancegroup.ObjectMeta.Name], err = buildCloudInstanceGroup(c, instancegroup, doGroup, nodeMap)
if err != nil {
return nil, fmt.Errorf("error getting cloud instance group %q: %v", instancegroup.ObjectMeta.Name, err)
}
}
klog.V(8).Infof("Cloud Instance Group Info = %v", groups)
return groups, nil
}
// findInstanceGroups finds instance groups matching the specified tags
func findInstanceGroups(c *doCloudImplementation, clusterName string) ([]DOInstanceGroup, error) {
var result []DOInstanceGroup
instanceGroupMap := make(map[string][]string) // map of instance group name with droplet ids
clusterTag := "KubernetesCluster:" + strings.Replace(clusterName, ".", "-", -1)
droplets, err := c.GetAllDropletsByTag(clusterTag)
if err != nil {
return nil, fmt.Errorf("get all droplets for tag %s returned error. Error=%v", clusterTag, err)
}
instanceGroupName := ""
for _, droplet := range droplets {
doInstanceGroup, err := getDropletInstanceGroup(droplet.Tags)
if err != nil {
return nil, fmt.Errorf("get droplets Instance group for tags %v returned error. Error=%v", droplet.Tags, err)
}
instanceGroupName = fmt.Sprintf("%s-%s", clusterName, doInstanceGroup)
instanceGroupMap[instanceGroupName] = append(instanceGroupMap[instanceGroupName], strconv.Itoa(droplet.ID))
result = append(result, DOInstanceGroup{
InstanceGroupName: instanceGroupName,
GroupType: instanceGroupName,
ClusterName: clusterName,
Members: instanceGroupMap[instanceGroupName],
})
}
klog.V(8).Infof("InstanceGroup Info = %v", result)
return result, nil
}
func getDropletInstanceGroup(tags []string) (string, error) {
for _, tag := range tags {
klog.V(8).Infof("Check tag = %s", tag)
if strings.Contains(strings.ToLower(tag), TagKubernetesInstanceGroup) {
tagParts := strings.Split(tag, ":")
if len(tagParts) < 2 {
return "", fmt.Errorf("tag split failed, too few components for tag %q", tag)
}
return tagParts[1], nil
}
}
return "", fmt.Errorf("Didn't find k8s-instancegroup for tag %v", tags)
}
// matchInstanceGroup filters a list of instancegroups for recognized cloud groups
func matchInstanceGroup(name string, clusterName string, instancegroups []*kops.InstanceGroup) (*kops.InstanceGroup, error) {
var instancegroup *kops.InstanceGroup
for _, g := range instancegroups {
var groupName string
switch g.Spec.Role {
case kops.InstanceGroupRoleMaster, kops.InstanceGroupRoleNode:
groupName = clusterName + "-" + g.ObjectMeta.Name
default:
klog.Warningf("Ignoring InstanceGroup of unknown role %q", g.Spec.Role)
continue
}
if name == groupName {
if instancegroup != nil {
return nil, fmt.Errorf("found multiple instance groups matching servergrp %q", groupName)
}
instancegroup = g
}
}
return instancegroup, nil
}
func buildCloudInstanceGroup(c *doCloudImplementation, ig *kops.InstanceGroup, g DOInstanceGroup, nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error) {
cg := &cloudinstances.CloudInstanceGroup{
HumanName: g.InstanceGroupName,
InstanceGroup: ig,
Raw: g,
MinSize: int(fi.Int32Value(ig.Spec.MinSize)),
TargetSize: int(fi.Int32Value(ig.Spec.MinSize)),
MaxSize: int(fi.Int32Value(ig.Spec.MaxSize)),
}
for _, member := range g.Members {
// TODO use a hash of the godo.DropletCreateRequest fields to calculate the second parameter.
_, err := cg.NewCloudInstance(member, cloudinstances.CloudInstanceStatusUpToDate, nodeMap[member])
if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
}
}
return cg, nil
}
func (c *doCloudImplementation) GetAllLoadBalancers() ([]godo.LoadBalancer, error) {
allLoadBalancers := []godo.LoadBalancer{}
opt := &godo.ListOptions{}
for {
lbs, resp, err := c.LoadBalancersService().List(context.TODO(), opt)
if err != nil {
return nil, err
}
allLoadBalancers = append(allLoadBalancers, lbs...)
if resp.Links == nil || resp.Links.IsLastPage() {
break
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}
opt.Page = page + 1
}
return allLoadBalancers, nil
}
func (c *doCloudImplementation) GetAllDropletsByTag(tag string) ([]godo.Droplet, error) {
allDroplets := []godo.Droplet{}
opt := &godo.ListOptions{}
for {
droplets, resp, err := c.DropletsService().ListByTag(context.TODO(), tag, opt)
if err != nil {
return nil, err
}
allDroplets = append(allDroplets, droplets...)
if resp.Links == nil || resp.Links.IsLastPage() {
break
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}
opt.Page = page + 1
}
return allDroplets, nil
}
func (c *doCloudImplementation) GetAllVolumesByRegion() ([]godo.Volume, error) {
allVolumes := []godo.Volume{}
opt := &godo.ListOptions{}
for {
volumes, resp, err := c.VolumeService().ListVolumes(context.TODO(), &godo.ListVolumeParams{
Region: c.Region(),
ListOptions: opt,
})
if err != nil {
return nil, err
}
allVolumes = append(allVolumes, volumes...)
if resp.Links == nil || resp.Links.IsLastPage() {
break
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}
opt.Page = page + 1
}
return allVolumes, nil
}

View File

@ -0,0 +1,127 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package do
import (
"errors"
"fmt"
"github.com/digitalocean/godo"
v1 "k8s.io/api/core/v1"
"k8s.io/kops/dnsprovider/pkg/dnsprovider"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/resources/digitalocean/dns"
"k8s.io/kops/upup/pkg/fi"
)
var _ fi.Cloud = (*doCloudMockImplementation)(nil)
type doCloudMockImplementation struct {
Client *godo.Client
region string
}
func BuildMockDOCloud(region string) *doCloudMockImplementation {
return &doCloudMockImplementation{region: region, Client: godo.NewClient(nil)}
}
func (c *doCloudMockImplementation) ProviderID() kops.CloudProviderID {
return kops.CloudProviderDO
}
// Region returns the DO region we will target
func (c *doCloudMockImplementation) Region() string {
return c.region
}
func (c *doCloudMockImplementation) DNS() (dnsprovider.Interface, error) {
provider := dns.NewProvider(c.Client)
return provider, nil
}
// FindVPCInfo is not implemented, it's only here to satisfy the fi.Cloud interface
func (c *doCloudMockImplementation) FindVPCInfo(id string) (*fi.VPCInfo, error) {
return nil, errors.New("not implemented")
}
// DeleteGroup is not implemented yet, is a func that needs to delete a DO instance group.
func (c *doCloudMockImplementation) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
return fmt.Errorf("digital ocean cloud provider does not support deleting cloud groups at this time")
}
func (c *doCloudMockImplementation) DeleteInstance(instance *cloudinstances.CloudInstance) error {
return errors.New("not tested")
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *doCloudMockImplementation) DetachInstance(i *cloudinstances.CloudInstance) error {
return fmt.Errorf("digital ocean cloud provider does not support surging")
}
func (c *doCloudMockImplementation) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
return nil, errors.New("not tested")
}
// FindClusterStatus discovers the status of the cluster, by inspecting the cloud objects
func (c *doCloudMockImplementation) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
return nil, errors.New("not tested")
}
func (c *doCloudMockImplementation) GetApiIngressStatus(cluster *kops.Cluster) ([]fi.ApiIngressStatus, error) {
return nil, errors.New("not tested")
}
func (c *doCloudMockImplementation) DropletsService() godo.DropletsService {
return c.Client.Droplets
}
func (c *doCloudMockImplementation) DropletActionService() godo.DropletActionsService {
return c.Client.DropletActions
}
func (c *doCloudMockImplementation) VolumeService() godo.StorageService {
return c.Client.Storage
}
func (c *doCloudMockImplementation) VolumeActionService() godo.StorageActionsService {
return c.Client.StorageActions
}
func (c *doCloudMockImplementation) LoadBalancersService() godo.LoadBalancersService {
return c.Client.LoadBalancers
}
func (c *doCloudMockImplementation) DomainService() godo.DomainsService {
return c.Client.Domains
}
func (c *doCloudMockImplementation) ActionsService() godo.ActionsService {
return c.Client.Actions
}
func (c *doCloudMockImplementation) GetAllLoadBalancers() ([]godo.LoadBalancer, error) {
return nil, nil
}
func (c *doCloudMockImplementation) GetAllDropletsByTag(tag string) ([]godo.Droplet, error) {
return nil, nil
}
func (c *doCloudMockImplementation) GetAllVolumesByRegion() ([]godo.Volume, error) {
return nil, nil
}

View File

@ -0,0 +1,25 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package do
import "strings"
func SafeClusterName(clusterName string) string {
// DO does not support . in tags / names
safeClusterName := strings.Replace(clusterName, ".", "-", -1)
return safeClusterName
}

View File

@ -13,7 +13,6 @@ go_library(
importpath = "k8s.io/kops/upup/pkg/fi/cloudup/dotasks",
visibility = ["//visibility:public"],
deps = [
"//pkg/resources/digitalocean:go_default_library",
"//upup/pkg/fi:go_default_library",
"//upup/pkg/fi/cloudup/do:go_default_library",
"//upup/pkg/fi/cloudup/terraform:go_default_library",
@ -27,8 +26,8 @@ go_test(
srcs = ["volume_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/resources/digitalocean:go_default_library",
"//upup/pkg/fi:go_default_library",
"//upup/pkg/fi/cloudup/do:go_default_library",
"//vendor/github.com/digitalocean/godo:go_default_library",
],
)

View File

@ -23,7 +23,6 @@ import (
"github.com/digitalocean/godo"
"k8s.io/klog/v2"
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/do"
_ "k8s.io/kops/upup/pkg/fi/cloudup/terraform"
@ -53,7 +52,7 @@ func (d *Droplet) CompareWithID() *string {
}
func (d *Droplet) Find(c *fi.Context) (*Droplet, error) {
cloud := c.Cloud.(*digitalocean.Cloud)
cloud := c.Cloud.(do.DOCloud)
droplets, err := listDroplets(cloud)
if err != nil {
@ -88,12 +87,12 @@ func (d *Droplet) Find(c *fi.Context) (*Droplet, error) {
}, nil
}
func listDroplets(cloud *digitalocean.Cloud) ([]godo.Droplet, error) {
func listDroplets(cloud do.DOCloud) ([]godo.Droplet, error) {
allDroplets := []godo.Droplet{}
opt := &godo.ListOptions{}
for {
droplets, resp, err := cloud.Droplets().List(context.TODO(), opt)
droplets, resp, err := cloud.DropletsService().List(context.TODO(), opt)
if err != nil {
return nil, err
}
@ -145,7 +144,7 @@ func (_ *Droplet) RenderDO(t *do.DOAPITarget, a, e, changes *Droplet) error {
}
for i := 0; i < newDropletCount; i++ {
_, _, err = t.Cloud.Droplets().Create(context.TODO(), &godo.DropletCreateRequest{
_, _, err = t.Cloud.DropletsService().Create(context.TODO(), &godo.DropletCreateRequest{
Name: fi.StringValue(e.Name),
Region: fi.StringValue(e.Region),
Size: fi.StringValue(e.Size),

View File

@ -27,7 +27,6 @@ import (
"github.com/digitalocean/godo"
"k8s.io/klog/v2"
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/do"
)
@ -58,8 +57,8 @@ func (lb *LoadBalancer) Find(c *fi.Context) (*LoadBalancer, error) {
return nil, nil
}
cloud := c.Cloud.(*digitalocean.Cloud)
lbService := cloud.LoadBalancers()
cloud := c.Cloud.(do.DOCloud)
lbService := cloud.LoadBalancersService()
loadbalancer, _, err := lbService.Get(context.TODO(), fi.StringValue(lb.ID))
if err != nil {
@ -150,7 +149,7 @@ func (_ *LoadBalancer) RenderDO(t *do.DOAPITarget, a, e, changes *LoadBalancer)
// load balancer doesn't exist. Create one.
klog.V(10).Infof("Creating load balancer for DO")
loadBalancerService := t.Cloud.LoadBalancers()
loadBalancerService := t.Cloud.LoadBalancersService()
loadbalancer, _, err := loadBalancerService.Create(context.TODO(), &godo.LoadBalancerRequest{
Name: fi.StringValue(e.Name),
Region: fi.StringValue(e.Region),
@ -175,8 +174,8 @@ func (lb *LoadBalancer) IsForAPIServer() bool {
}
func (lb *LoadBalancer) FindIPAddress(c *fi.Context) (*string, error) {
cloud := c.Cloud.(*digitalocean.Cloud)
loadBalancerService := cloud.LoadBalancers()
cloud := c.Cloud.(do.DOCloud)
loadBalancerService := cloud.LoadBalancersService()
if len(fi.StringValue(lb.ID)) > 0 {
// able to retrieve ID.

View File

@ -23,7 +23,6 @@ import (
"github.com/digitalocean/godo"
"k8s.io/klog/v2"
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/do"
"k8s.io/kops/upup/pkg/fi/cloudup/terraform"
@ -47,8 +46,8 @@ func (v *Volume) CompareWithID() *string {
}
func (v *Volume) Find(c *fi.Context) (*Volume, error) {
cloud := c.Cloud.(*digitalocean.Cloud)
volService := cloud.Volumes()
cloud := c.Cloud.(do.DOCloud)
volService := cloud.VolumeService()
volumes, _, err := volService.ListVolumes(context.TODO(), &godo.ListVolumeParams{
Region: cloud.Region(),
@ -119,7 +118,7 @@ func (_ *Volume) RenderDO(t *do.DOAPITarget, a, e, changes *Volume) error {
tagArray = append(tagArray, fmt.Sprintf("%s:%s", k, v))
}
volService := t.Cloud.Volumes()
volService := t.Cloud.VolumeService()
_, _, err := volService.CreateVolume(context.TODO(), &godo.VolumeCreateRequest{
Name: fi.StringValue(e.Name),
Region: fi.StringValue(e.Region),

View File

@ -23,8 +23,8 @@ import (
"testing"
"github.com/digitalocean/godo"
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/do"
)
type fakeStorageClient struct {
@ -70,13 +70,6 @@ func (f fakeStorageClient) DeleteSnapshot(ctx context.Context, id string) (*godo
return f.deleteSnapshotFn(ctx, id)
}
func newCloud(client *godo.Client) *digitalocean.Cloud {
return &digitalocean.Cloud{
Client: client,
RegionName: "nyc1",
}
}
func newContext(cloud fi.Cloud) *fi.Context {
return &fi.Context{
Cloud: cloud,
@ -155,7 +148,7 @@ func Test_Find(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
cloud := newCloud(godo.NewClient(nil))
cloud := do.BuildMockDOCloud("nyc1")
cloud.Client.Storage = tc.storage
ctx := newContext(cloud)