Merge pull request #4916 from andrewsykim/digitalocean-resources

digitalocean: list/delete resources
This commit is contained in:
k8s-ci-robot 2018-04-05 05:50:08 -07:00 committed by GitHub
commit f30a8dbcce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 180 additions and 64 deletions

View File

@ -4,7 +4,6 @@ go_library(
name = "go_default_library",
srcs = [
"cloud.go",
"do.go",
"resources.go",
],
importpath = "k8s.io/kops/pkg/resources/digitalocean",

View File

@ -6,6 +6,7 @@ go_library(
importpath = "k8s.io/kops/pkg/resources/digitalocean/dns",
visibility = ["//visibility:public"],
deps = [
"//dns-controller/pkg/dns:go_default_library",
"//dnsprovider/pkg/dnsprovider:go_default_library",
"//dnsprovider/pkg/dnsprovider/rrstype:go_default_library",
"//vendor/github.com/digitalocean/godo:go_default_library",

View File

@ -30,6 +30,7 @@ import (
"golang.org/x/oauth2"
"k8s.io/kops/dns-controller/pkg/dns"
"k8s.io/kops/dnsprovider/pkg/dnsprovider"
"k8s.io/kops/dnsprovider/pkg/dnsprovider/rrstype"
)
@ -188,8 +189,11 @@ func (r *resourceRecordSets) List() ([]dnsprovider.ResourceRecordSet, error) {
var rrset *resourceRecordSet
var rrsets []dnsprovider.ResourceRecordSet
for _, record := range records {
// digitalocean API returns the record without the zone
// but the consumers of this interface expect the zone to be included
recordName := dns.EnsureDotSuffix(record.Name) + r.Zone().Name()
rrset = &resourceRecordSet{
name: record.Name,
name: recordName,
data: record.Data,
ttl: record.TTL,
recordType: rrstype.RrsType(record.Type),
@ -318,26 +322,6 @@ func (r *resourceRecordChangeset) Apply() error {
return nil
}
if len(r.removals) > 0 {
records, err := getRecords(r.client, r.zone.Name())
if err != nil {
return err
}
for _, record := range r.removals {
for _, domainRecord := range records {
if domainRecord.Name == record.Name() {
err := deleteRecord(r.client, r.zone.Name(), domainRecord.ID)
if err != nil {
return fmt.Errorf("failed to delete record: %v", err)
}
}
}
}
glog.V(2).Infof("record change set removals complete")
}
if len(r.additions) > 0 {
for _, rrset := range r.additions {
err := r.applyResourceRecordSet(rrset)
@ -360,6 +344,26 @@ func (r *resourceRecordChangeset) Apply() error {
glog.V(2).Infof("record change set upserts complete")
}
if len(r.removals) > 0 {
records, err := getRecords(r.client, r.zone.Name())
if err != nil {
return err
}
for _, record := range r.removals {
for _, domainRecord := range records {
if domainRecord.Name == record.Name() {
err := deleteRecord(r.client, r.zone.Name(), domainRecord.ID)
if err != nil {
return fmt.Errorf("failed to delete record: %v", err)
}
}
}
}
glog.V(2).Infof("record change set removals complete")
}
glog.V(2).Infof("record change sets successfully applied")
return nil
}

View File

@ -338,7 +338,7 @@ func TestNewResourceRecordSet(t *testing.T) {
t.Errorf("unexpected number of records: %d", len(rrsets))
}
records, err := rrset.Get("test")
records, err := rrset.Get("test.example.com")
if err != nil {
t.Errorf("unexpected error getting resource record set: %v", err)
}
@ -347,7 +347,7 @@ func TestNewResourceRecordSet(t *testing.T) {
t.Errorf("unexpected records from resource record set: %d, expected 1 record", len(records))
}
if records[0].Name() != "test" {
if records[0].Name() != "test.example.com" {
t.Errorf("unexpected record name: %s, expected 'test'", records[0].Name())
}

View File

@ -1,31 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package digitalocean
import (
"k8s.io/kops/pkg/resources"
"k8s.io/kops/upup/pkg/fi"
)
func ListResources(cloud fi.Cloud, clusterName string) (map[string]*resources.Resource, error) {
r := Resources{
Cloud: cloud,
ClusterName: clusterName,
}
return r.ListResources()
}

View File

@ -17,21 +17,164 @@ limitations under the License.
package digitalocean
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/digitalocean/godo"
"k8s.io/kops/pkg/resources"
"k8s.io/kops/upup/pkg/fi"
)
type Resources struct {
Cloud fi.Cloud
ClusterName string
const (
resourceTypeDroplet = "droplet"
resourceTypeVolume = "volume"
)
type listFn func(fi.Cloud, string) ([]*resources.Resource, error)
func ListResources(cloud *Cloud, clusterName string) (map[string]*resources.Resource, error) {
resourceTrackers := make(map[string]*resources.Resource)
listFunctions := []listFn{
listVolumes,
listDroplets,
}
for _, fn := range listFunctions {
rt, err := fn(cloud, clusterName)
if err != nil {
return nil, err
}
for _, t := range rt {
resourceTrackers[t.Type+":"+t.ID] = t
}
}
return resourceTrackers, nil
}
// ListResources fetches all digitalocean resources into tracker.Resources
func (r *Resources) ListResources() (map[string]*resources.Resource, error) {
return nil, nil
func listDroplets(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(*Cloud)
var resourceTrackers []*resources.Resource
clusterTag := "KubernetesCluster:" + strings.Replace(clusterName, ".", "-", -1)
droplets, _, err := c.Droplets().ListByTag(context.TODO(), clusterTag, &godo.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to list droplets: %v", err)
}
for _, droplet := range droplets {
resourceTracker := &resources.Resource{
Name: droplet.Name,
ID: strconv.Itoa(droplet.ID),
Type: resourceTypeDroplet,
Deleter: deleteDroplet,
Obj: droplet,
}
resourceTrackers = append(resourceTrackers, resourceTracker)
}
return resourceTrackers, nil
}
// DeleteResources deletes all resources passed in the form in tracker.Resources
func (r *Resources) DeleteResources(resources map[string]*resources.Resource) error {
func listVolumes(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(*Cloud)
var resourceTrackers []*resources.Resource
volumeMatch := strings.Replace(clusterName, ".", "-", -1)
volumes, _, err := c.Volumes().ListVolumes(context.TODO(), &godo.ListVolumeParams{
Region: c.Region,
})
if err != nil {
return nil, fmt.Errorf("failed to list volumes: %s", err)
}
for _, volume := range volumes {
if strings.Contains(volume.Name, volumeMatch) {
resourceTracker := &resources.Resource{
Name: volume.Name,
ID: volume.ID,
Type: resourceTypeVolume,
Deleter: deleteVolume,
Obj: volume,
}
var blocks []string
for _, dropletID := range volume.DropletIDs {
blocks = append(blocks, "droplet:"+strconv.Itoa(dropletID))
}
resourceTracker.Blocks = blocks
resourceTrackers = append(resourceTrackers, resourceTracker)
}
}
return resourceTrackers, nil
}
func deleteDroplet(cloud fi.Cloud, t *resources.Resource) error {
c := cloud.(*Cloud)
dropletID, err := strconv.Atoi(t.ID)
if err != nil {
return fmt.Errorf("failed to convert droplet ID to int: %s", err)
}
_, err = c.Droplets().Delete(context.TODO(), dropletID)
if err != nil {
return fmt.Errorf("failed to delete droplet: %d, err: %s", dropletID, err)
}
return nil
}
func deleteVolume(cloud fi.Cloud, t *resources.Resource) error {
c := cloud.(*Cloud)
volume := t.Obj.(godo.Volume)
for _, dropletID := range volume.DropletIDs {
action, _, err := c.VolumeActions().DetachByDropletID(context.TODO(), volume.ID, dropletID)
if err != nil {
return fmt.Errorf("failed to detach volume: %s, err: %s", volume.ID, err)
}
if err := waitForDetach(c, action); err != nil {
return fmt.Errorf("error while waiting for volume %s to detach: %s", volume.ID, err)
}
}
_, err := c.Volumes().DeleteVolume(context.TODO(), t.ID)
if err != nil {
return fmt.Errorf("failed to delete volume: %s, err: %s", t.ID, err)
}
return nil
}
func waitForDetach(cloud *Cloud, action *godo.Action) error {
timeout := time.After(10 * time.Second)
tick := time.Tick(500 * time.Millisecond)
for {
select {
case <-timeout:
return errors.New("timed out waiting for volume to detach")
case <-tick:
updatedAction, _, err := cloud.Client.Actions.Get(context.TODO(), action.ID)
if err != nil {
return err
}
if updatedAction.Status == godo.ActionCompleted {
return nil
}
}
}
}

View File

@ -36,7 +36,7 @@ func ListResources(cloud fi.Cloud, clusterName string, region string) (map[strin
case kops.CloudProviderAWS:
return aws.ListResourcesAWS(cloud.(awsup.AWSCloud), clusterName)
case kops.CloudProviderDO:
return digitalocean.ListResources(cloud, clusterName)
return digitalocean.ListResources(cloud.(*digitalocean.Cloud), clusterName)
case kops.CloudProviderGCE:
return gce.ListResourcesGCE(cloud.(cloudgce.GCECloud), clusterName, region)
case kops.CloudProviderVSphere: