Refactor cloud providers and remove unused code from Protokube

This commit is contained in:
Ciprian Hacman 2022-05-28 11:27:31 +03:00
parent c049e0a03b
commit a656804c8b
11 changed files with 112 additions and 1149 deletions

View File

@ -19,7 +19,6 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"net"
"os" "os"
"path" "path"
"strings" "strings"
@ -92,79 +91,62 @@ func run() error {
flags.AddGoFlagSet(flag.CommandLine) flags.AddGoFlagSet(flag.CommandLine)
flags.Parse(os.Args) flags.Parse(os.Args)
var volumes protokube.Volumes var cloudProvider protokube.CloudProvider
var internalIP net.IP
if cloud == "aws" { if cloud == "aws" {
awsVolumes, err := protokube.NewAWSVolumes() awsCloudProvider, err := protokube.NewAWSCloudProvider()
if err != nil { if err != nil {
klog.Errorf("Error initializing AWS: %q", err) klog.Errorf("Error initializing AWS: %q", err)
os.Exit(1) os.Exit(1)
} }
volumes = awsVolumes cloudProvider = awsCloudProvider
internalIP = awsVolumes.InternalIP()
} else if cloud == "digitalocean" { } else if cloud == "digitalocean" {
doVolumes, err := protokube.NewDOVolumes() doCloudProvider, err := protokube.NewDOCloudProvider()
if err != nil { if err != nil {
klog.Errorf("Error initializing DigitalOcean: %q", err) klog.Errorf("Error initializing DigitalOcean: %q", err)
os.Exit(1) os.Exit(1)
} }
volumes = doVolumes cloudProvider = doCloudProvider
internalIP, err = protokube.GetDropletInternalIP()
if err != nil {
klog.Errorf("Error getting droplet internal IP: %s", err)
os.Exit(1)
}
} else if cloud == "hetzner" { } else if cloud == "hetzner" {
hetznerVolumes, err := protokube.NewHetznerVolumes() hetznerCloudProvider, err := protokube.NewHetznerCloudProvider()
if err != nil { if err != nil {
klog.Errorf("error initializing Hetzner Cloud: %q", err) klog.Errorf("error initializing Hetzner Cloud: %q", err)
os.Exit(1) os.Exit(1)
} }
volumes = hetznerVolumes cloudProvider = hetznerCloudProvider
internalIP, err = hetznerVolumes.InternalIP()
if err != nil {
klog.Errorf("error getting server internal IP: %s", err)
os.Exit(1)
}
} else if cloud == "gce" { } else if cloud == "gce" {
gceVolumes, err := protokube.NewGCEVolumes() gceCloudProvider, err := protokube.NewGCECloudProvider()
if err != nil { if err != nil {
klog.Errorf("Error initializing GCE: %q", err) klog.Errorf("Error initializing GCE: %q", err)
os.Exit(1) os.Exit(1)
} }
volumes = gceVolumes cloudProvider = gceCloudProvider
internalIP = gceVolumes.InternalIP()
} else if cloud == "openstack" { } else if cloud == "openstack" {
klog.Info("Initializing openstack volumes") osCloudProvider, err := protokube.NewOpenStackCloudProvider()
osVolumes, err := protokube.NewOpenstackVolumes()
if err != nil { if err != nil {
klog.Errorf("Error initializing openstack: %q", err) klog.Errorf("Error initializing OpenStack: %q", err)
os.Exit(1) os.Exit(1)
} }
volumes = osVolumes cloudProvider = osCloudProvider
internalIP = osVolumes.InternalIP()
} else if cloud == "azure" { } else if cloud == "azure" {
klog.Info("Initializing Azure volumes") azureVolumes, err := protokube.NewAzureCloudProvider()
azureVolumes, err := protokube.NewAzureVolumes()
if err != nil { if err != nil {
klog.Errorf("Error initializing Azure: %q", err) klog.Errorf("Error initializing Azure: %q", err)
os.Exit(1) os.Exit(1)
} }
volumes = azureVolumes cloudProvider = azureVolumes
internalIP = azureVolumes.InternalIP()
} else { } else {
klog.Errorf("Unknown cloud %q", cloud) klog.Errorf("Unknown cloud %q", cloud)
os.Exit(1) os.Exit(1)
} }
internalIP := cloudProvider.InstanceInternalIP()
if internalIP == nil { if internalIP == nil {
klog.Errorf("Cannot determine internal IP") klog.Errorf("Cannot determine internal IP")
os.Exit(1) os.Exit(1)
@ -196,71 +178,31 @@ func run() error {
Path: path.Join(rootfs, "etc/hosts"), Path: path.Join(rootfs, "etc/hosts"),
} }
var gossipSeeds gossiputils.SeedProvider gossipName := cloudProvider.InstanceID()
var err error gossipSeeds, err := cloudProvider.GossipSeeds()
var gossipName string
if cloud == "aws" {
gossipSeeds, err = volumes.(*protokube.AWSVolumes).GossipSeeds()
if err != nil { if err != nil {
return err klog.Errorf("error finding gossip seeds: %w", err)
}
gossipName = volumes.(*protokube.AWSVolumes).InstanceID()
} else if cloud == "gce" {
gossipSeeds, err = volumes.(*protokube.GCEVolumes).GossipSeeds()
if err != nil {
return err
}
gossipName = volumes.(*protokube.GCEVolumes).InstanceName()
} else if cloud == "openstack" {
gossipSeeds, err = volumes.(*protokube.OpenstackVolumes).GossipSeeds()
if err != nil {
return err
}
gossipName = volumes.(*protokube.OpenstackVolumes).InstanceName()
} else if cloud == "digitalocean" {
gossipSeeds, err = volumes.(*protokube.DOVolumes).GossipSeeds()
if err != nil {
return err
}
gossipName = volumes.(*protokube.DOVolumes).InstanceName()
} else if cloud == "hetzner" {
gossipSeeds, err = volumes.(*protokube.HetznerVolumes).GossipSeeds()
if err != nil {
return err
}
gossipName = volumes.(*protokube.HetznerVolumes).InstanceID()
} else if cloud == "azure" {
gossipSeeds, err = volumes.(*protokube.AzureVolumes).GossipSeeds()
if err != nil {
return err
}
gossipName = volumes.(*protokube.AzureVolumes).InstanceID()
} else {
klog.Fatalf("seed provider for %q not yet implemented", cloud)
} }
channelName := "dns" channelName := "dns"
var gossipState gossiputils.GossipState gossipState, err := gossiputils.GetGossipState(gossipProtocol, gossipListen, channelName, gossipName, []byte(gossipSecret), gossipSeeds)
gossipState, err = gossiputils.GetGossipState(gossipProtocol, gossipListen, channelName, gossipName, []byte(gossipSecret), gossipSeeds)
if err != nil { if err != nil {
klog.Errorf("Error initializing gossip: %v", err) klog.Errorf("error initializing gossip: %w", err)
os.Exit(1) os.Exit(1)
} }
if gossipProtocolSecondary != "" { if gossipProtocolSecondary != "" {
secondaryGossipState, err := gossiputils.GetGossipState(gossipProtocolSecondary, gossipListenSecondary, channelName, gossipName, []byte(gossipSecretSecondary), gossipSeeds) secondaryGossipState, err := gossiputils.GetGossipState(gossipProtocolSecondary, gossipListenSecondary, channelName, gossipName, []byte(gossipSecretSecondary), gossipSeeds)
if err != nil { if err != nil {
klog.Errorf("Error initializing secondary gossip: %v", err) klog.Errorf("error initializing secondary gossip: %w", err)
os.Exit(1) os.Exit(1)
} }
gossipState = &gossiputils.MultiGossipState{ gossipState = &gossiputils.MultiGossipState{
Primary: gossipState, Primary: gossipState,
Secondary: secondaryGossipState, Secondary: secondaryGossipState,
} }
} }
go func() { go func() {
err := gossipState.Start() err := gossipState.Start()
if err != nil { if err != nil {

View File

@ -19,11 +19,7 @@ package protokube
import ( import (
"fmt" "fmt"
"net" "net"
"os"
"path/filepath"
"strings"
"sync" "sync"
"time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata" "github.com/aws/aws-sdk-go/aws/ec2metadata"
@ -31,8 +27,6 @@ import (
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/protokube/pkg/gossip" "k8s.io/kops/protokube/pkg/gossip"
gossipaws "k8s.io/kops/protokube/pkg/gossip/aws" gossipaws "k8s.io/kops/protokube/pkg/gossip/aws"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup" "k8s.io/kops/upup/pkg/fi/cloudup/awsup"
@ -40,8 +34,8 @@ import (
var devices = []string{"/dev/xvdu", "/dev/xvdv", "/dev/xvdx", "/dev/xvdx", "/dev/xvdy", "/dev/xvdz"} var devices = []string{"/dev/xvdu", "/dev/xvdv", "/dev/xvdx", "/dev/xvdx", "/dev/xvdy", "/dev/xvdz"}
// AWSVolumes defines the aws volume implementation // AWSCloudProvider defines the AWS cloud provider implementation
type AWSVolumes struct { type AWSCloudProvider struct {
mutex sync.Mutex mutex sync.Mutex
clusterTag string clusterTag string
@ -53,11 +47,11 @@ type AWSVolumes struct {
zone string zone string
} }
var _ Volumes = &AWSVolumes{} var _ CloudProvider = &AWSCloudProvider{}
// NewAWSVolumes returns a new aws volume provider // NewAWSCloudProvider returns a new aws volume provider
func NewAWSVolumes() (*AWSVolumes, error) { func NewAWSCloudProvider() (*AWSCloudProvider, error) {
a := &AWSVolumes{ a := &AWSCloudProvider{
deviceMap: make(map[string]string), deviceMap: make(map[string]string),
} }
@ -100,11 +94,11 @@ func NewAWSVolumes() (*AWSVolumes, error) {
return a, nil return a, nil
} }
func (a *AWSVolumes) InternalIP() net.IP { func (a *AWSCloudProvider) InstanceInternalIP() net.IP {
return a.internalIP return a.internalIP
} }
func (a *AWSVolumes) discoverTags() error { func (a *AWSCloudProvider) discoverTags() error {
instance, err := a.describeInstance() instance, err := a.describeInstance()
if err != nil { if err != nil {
return err return err
@ -133,7 +127,7 @@ func (a *AWSVolumes) discoverTags() error {
return nil return nil
} }
func (a *AWSVolumes) describeInstance() (*ec2.Instance, error) { func (a *AWSCloudProvider) describeInstance() (*ec2.Instance, error) {
request := &ec2.DescribeInstancesInput{} request := &ec2.DescribeInstancesInput{}
request.InstanceIds = []*string{&a.instanceId} request.InstanceIds = []*string{&a.instanceId}
@ -155,298 +149,13 @@ func (a *AWSVolumes) describeInstance() (*ec2.Instance, error) {
return instances[0], nil return instances[0], nil
} }
func newEc2Filter(name string, value string) *ec2.Filter { func (a *AWSCloudProvider) GossipSeeds() (gossip.SeedProvider, error) {
filter := &ec2.Filter{
Name: aws.String(name),
Values: []*string{
aws.String(value),
},
}
return filter
}
func (a *AWSVolumes) findVolumes(request *ec2.DescribeVolumesInput) ([]*Volume, error) {
var volumes []*Volume
err := a.ec2.DescribeVolumesPages(request, func(p *ec2.DescribeVolumesOutput, lastPage bool) (shouldContinue bool) {
for _, v := range p.Volumes {
volumeID := aws.StringValue(v.VolumeId)
vol := &Volume{
ID: volumeID,
Info: VolumeInfo{
Description: volumeID,
},
}
state := aws.StringValue(v.State)
vol.Status = state
for _, attachment := range v.Attachments {
vol.AttachedTo = aws.StringValue(attachment.InstanceId)
if aws.StringValue(attachment.InstanceId) == a.instanceId {
vol.LocalDevice = aws.StringValue(attachment.Device)
}
}
// never mount root volumes
// these are volumes that aws sets aside for root volumes mount points
if vol.LocalDevice == "/dev/sda1" || vol.LocalDevice == "/dev/xvda" {
klog.Warningf("Not mounting: %q, since it is a root volume", vol.LocalDevice)
continue
}
skipVolume := false
for _, tag := range v.Tags {
k := aws.StringValue(tag.Key)
v := aws.StringValue(tag.Value)
switch k {
case awsup.TagClusterName, "Name":
{
// Ignore
}
// case TagNameMasterId:
// id, err := strconv.Atoi(v)
// if err != nil {
// klog.Warningf("error parsing master-id tag on volume %q %s=%s; skipping volume", volumeID, k, v)
// skipVolume = true
// } else {
// vol.Info.MasterID = id
// }
default:
if strings.HasPrefix(k, awsup.TagNameEtcdClusterPrefix) {
etcdClusterName := strings.TrimPrefix(k, awsup.TagNameEtcdClusterPrefix)
spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, v)
if err != nil {
// Fail safe
klog.Warningf("error parsing etcd cluster tag %q on volume %q; skipping volume: %v", v, volumeID, err)
skipVolume = true
}
vol.Info.EtcdClusters = append(vol.Info.EtcdClusters, spec)
} else if strings.HasPrefix(k, awsup.TagNameRolePrefix) {
// Ignore
} else if strings.HasPrefix(k, awsup.TagNameClusterOwnershipPrefix) {
// Ignore
} else {
klog.Warningf("unknown tag on volume %q: %s=%s", volumeID, k, v)
}
}
}
if !skipVolume {
volumes = append(volumes, vol)
}
}
return true
})
if err != nil {
return nil, fmt.Errorf("error querying for EC2 volumes: %v", err)
}
return volumes, nil
}
//func (a *AWSVolumes) FindMountedVolumes() ([]*Volume, error) {
// request := &ec2.DescribeVolumesInput{}
// request.Filters = []*ec2.Filter{
// newEc2Filter("tag:"+TagNameKubernetesCluster, a.clusterTag),
// newEc2Filter("tag-key", TagNameRoleMaster),
// newEc2Filter("attachment.instance-id", a.instanceId),
// }
//
// return a.findVolumes(request)
//}
//
//func (a *AWSVolumes) FindMountableVolumes() ([]*Volume, error) {
// request := &ec2.DescribeVolumesInput{}
// request.Filters = []*ec2.Filter{
// newEc2Filter("tag:"+TagNameKubernetesCluster, a.clusterTag),
// newEc2Filter("tag-key", TagNameRoleMaster),
// newEc2Filter("availability-zone", a.zone),
// }
//
// return a.findVolumes(request)
//}
func (a *AWSVolumes) FindVolumes() ([]*Volume, error) {
request := &ec2.DescribeVolumesInput{}
request.Filters = []*ec2.Filter{
newEc2Filter("tag:"+awsup.TagClusterName, a.clusterTag),
newEc2Filter("tag-key", awsup.TagNameRolePrefix+awsup.TagRoleMaster),
newEc2Filter("availability-zone", a.zone),
}
return a.findVolumes(request)
}
// FindMountedVolume implements Volumes::FindMountedVolume
func (v *AWSVolumes) FindMountedVolume(volume *Volume) (string, error) {
device := volume.LocalDevice
_, err := os.Stat(pathFor(device))
if err == nil {
return device, nil
}
if !os.IsNotExist(err) {
return "", fmt.Errorf("error checking for device %q: %v", device, err)
}
if volume.ID != "" {
expected := volume.ID
expected = "nvme-Amazon_Elastic_Block_Store_" + strings.Replace(expected, "-", "", -1)
// Look for nvme devices
// On AWS, nvme volumes are not mounted on a device path, but are instead mounted on an nvme device
// We must identify the correct volume by matching the nvme info
device, err := findNvmeVolume(expected)
if err != nil {
return "", fmt.Errorf("error checking for nvme volume %q: %v", expected, err)
}
if device != "" {
klog.Infof("found nvme volume %q at %q", expected, device)
return device, nil
}
}
return "", nil
}
func findNvmeVolume(findName string) (device string, err error) {
p := pathFor(filepath.Join("/dev/disk/by-id", findName))
stat, err := os.Lstat(p)
if err != nil {
if os.IsNotExist(err) {
klog.V(4).Infof("nvme path not found %q", p)
return "", nil
}
return "", fmt.Errorf("error getting stat of %q: %v", p, err)
}
if stat.Mode()&os.ModeSymlink != os.ModeSymlink {
klog.Warningf("nvme file %q found, but was not a symlink", p)
return "", nil
}
resolved, err := filepath.EvalSymlinks(p)
if err != nil {
return "", fmt.Errorf("error reading target of symlink %q: %v", p, err)
}
// Reverse pathFor
devPath := pathFor("/dev")
if strings.HasPrefix(resolved, devPath) {
resolved = strings.Replace(resolved, devPath, "/dev", 1)
}
if !strings.HasPrefix(resolved, "/dev") {
return "", fmt.Errorf("resolved symlink for %q was unexpected: %q", p, resolved)
}
return resolved, nil
}
// assignDevice picks a hopefully unused device and reserves it for the volume attachment
func (a *AWSVolumes) assignDevice(volumeID string) (string, error) {
a.mutex.Lock()
defer a.mutex.Unlock()
// TODO: Check for actual devices in use (like cloudprovider does)
for _, d := range devices {
if a.deviceMap[d] == "" {
a.deviceMap[d] = volumeID
return d, nil
}
}
return "", fmt.Errorf("All devices in use")
}
// releaseDevice releases the volume mapping lock; used when an attach was known to fail
func (a *AWSVolumes) releaseDevice(d string, volumeID string) {
a.mutex.Lock()
defer a.mutex.Unlock()
if a.deviceMap[d] != volumeID {
klog.Fatalf("deviceMap logic error: %q -> %q, not %q", d, a.deviceMap[d], volumeID)
}
a.deviceMap[d] = ""
}
// AttachVolume attaches the specified volume to this instance, returning the mountpoint & nil if successful
func (a *AWSVolumes) AttachVolume(volume *Volume) error {
volumeID := volume.ID
device := volume.LocalDevice
if device == "" {
d, err := a.assignDevice(volumeID)
if err != nil {
return err
}
device = d
request := &ec2.AttachVolumeInput{
Device: aws.String(device),
InstanceId: aws.String(a.instanceId),
VolumeId: aws.String(volumeID),
}
attachResponse, err := a.ec2.AttachVolume(request)
if err != nil {
return fmt.Errorf("Error attaching EBS volume %q: %v", volumeID, err)
}
klog.V(2).Infof("AttachVolume request returned %v", attachResponse)
}
// Wait (forever) for volume to attach or reach a failure-to-attach condition
for {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{&volumeID},
}
volumes, err := a.findVolumes(request)
if err != nil {
return fmt.Errorf("Error describing EBS volume %q: %v", volumeID, err)
}
if len(volumes) == 0 {
return fmt.Errorf("EBS volume %q disappeared during attach", volumeID)
}
if len(volumes) != 1 {
return fmt.Errorf("Multiple volumes found with id %q", volumeID)
}
v := volumes[0]
if v.AttachedTo != "" {
if v.AttachedTo == a.instanceId {
// TODO: Wait for device to appear?
volume.LocalDevice = device
return nil
}
a.releaseDevice(device, volumeID)
return fmt.Errorf("Unable to attach volume %q, was attached to %q", volumeID, v.AttachedTo)
}
switch v.Status {
case "attaching":
klog.V(2).Infof("Waiting for volume %q to be attached (currently %q)", volumeID, v.Status)
// continue looping
default:
return fmt.Errorf("Observed unexpected volume state %q", v.Status)
}
time.Sleep(10 * time.Second)
}
}
func (a *AWSVolumes) GossipSeeds() (gossip.SeedProvider, error) {
tags := make(map[string]string) tags := make(map[string]string)
tags[awsup.TagClusterName] = a.clusterTag tags[awsup.TagClusterName] = a.clusterTag
return gossipaws.NewSeedProvider(a.ec2, tags) return gossipaws.NewSeedProvider(a.ec2, tags)
} }
func (a *AWSVolumes) InstanceID() string { func (a *AWSCloudProvider) InstanceID() string {
return a.instanceId return a.instanceId
} }

View File

@ -38,8 +38,8 @@ type client interface {
var _ client = &gossipazure.Client{} var _ client = &gossipazure.Client{}
// AzureVolumes implements the Volumes interface for Azure. // AzureCloudProvider implements the CloudProvider interface for Azure.
type AzureVolumes struct { type AzureCloudProvider struct {
client client client client
clusterTag string clusterTag string
@ -47,10 +47,10 @@ type AzureVolumes struct {
internalIP net.IP internalIP net.IP
} }
var _ Volumes = &AzureVolumes{} var _ CloudProvider = &AzureCloudProvider{}
// NewAzureVolumes returns a new AzureVolumes. // NewAzureCloudProvider returns a new AzureCloudProvider.
func NewAzureVolumes() (*AzureVolumes, error) { func NewAzureCloudProvider() (*AzureCloudProvider, error) {
client, err := gossipazure.NewClient() client, err := gossipazure.NewClient()
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating a new Azure client: %s", err) return nil, fmt.Errorf("error creating a new Azure client: %s", err)
@ -72,7 +72,7 @@ func NewAzureVolumes() (*AzureVolumes, error) {
if internalIP == nil { if internalIP == nil {
return nil, fmt.Errorf("error querying internal IP") return nil, fmt.Errorf("error querying internal IP")
} }
return &AzureVolumes{ return &AzureCloudProvider{
client: client, client: client,
clusterTag: clusterTag, clusterTag: clusterTag,
instanceID: instanceID, instanceID: instanceID,
@ -80,37 +80,20 @@ func NewAzureVolumes() (*AzureVolumes, error) {
}, nil }, nil
} }
// InstanceID implements Volumes InstanceID. // InstanceID implements CloudProvider InstanceID.
func (a *AzureVolumes) InstanceID() string { func (a *AzureCloudProvider) InstanceID() string {
return a.instanceID return a.instanceID
} }
// InternalIP implements Volumes InternalIP. // InstanceInternalIP implements CloudProvider InstanceInternalIP.
func (a *AzureVolumes) InternalIP() net.IP { func (a *AzureCloudProvider) InstanceInternalIP() net.IP {
return a.internalIP return a.internalIP
} }
func (a *AzureVolumes) GossipSeeds() (gossip.SeedProvider, error) { // GossipSeeds implements CloudProvider GossipSeeds.
func (a *AzureCloudProvider) GossipSeeds() (gossip.SeedProvider, error) {
tags := map[string]string{ tags := map[string]string{
azure.TagClusterName: a.clusterTag, azure.TagClusterName: a.clusterTag,
} }
return gossipazure.NewSeedProvider(a.client, tags) return gossipazure.NewSeedProvider(a.client, tags)
} }
func (a *AzureVolumes) AttachVolume(volume *Volume) error {
// TODO(kenji): Implement this. We currently don't need to implement this
// as we let etcd-manager manage volumes, not protokube.
return nil
}
func (a *AzureVolumes) FindVolumes() ([]*Volume, error) {
// TODO(kenji): Implement this. We currently don't need to implement this
// as we let etcd-manager manage volumes, not protokube.
return nil, nil
}
func (a *AzureVolumes) FindMountedVolume(volume *Volume) (string, error) {
// TODO(kenji): Implement this. We currently don't need to implement this
// as we let etcd-manager manage volumes, not protokube.
return "", nil
}

View File

@ -26,7 +26,6 @@ import (
"os" "os"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/digitalocean/godo" "github.com/digitalocean/godo"
"golang.org/x/oauth2" "golang.org/x/oauth2"
@ -50,17 +49,18 @@ type TokenSource struct {
AccessToken string AccessToken string
} }
type DOVolumes struct { type DOCloudProvider struct {
ClusterID string ClusterID string
godoClient *godo.Client godoClient *godo.Client
region string region string
dropletName string dropletName string
dropletID int dropletID int
dropletIP net.IP
dropletTags []string dropletTags []string
} }
var _ Volumes = &DOVolumes{} var _ CloudProvider = &DOCloudProvider{}
func GetClusterID() (string, error) { func GetClusterID() (string, error) {
clusterID := "" clusterID := ""
@ -88,22 +88,27 @@ func GetClusterID() (string, error) {
return clusterID, fmt.Errorf("failed to get droplet clusterID") return clusterID, fmt.Errorf("failed to get droplet clusterID")
} }
func NewDOVolumes() (*DOVolumes, error) { func NewDOCloudProvider() (*DOCloudProvider, error) {
region, err := getMetadataRegion() region, err := getMetadataRegion()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get droplet region: %s", err) return nil, fmt.Errorf("failed to get droplet region: %s", err)
} }
dropletID, err := getMetadataDropletID() dropletIDStr, err := getMetadataDropletID()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get droplet id: %s", err) return nil, fmt.Errorf("failed to get droplet id: %s", err)
} }
dropletID, err := strconv.Atoi(dropletIDStr)
dropletIDInt, err := strconv.Atoi(dropletID)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to convert droplet ID to int: %s", err) return nil, fmt.Errorf("failed to convert droplet ID to int: %s", err)
} }
dropletIPStr, err := getMetadata(dropletInternalIPMetadataURL)
if err != nil {
return nil, fmt.Errorf("failed to get droplet ip: %s", err)
}
dropletIP := net.ParseIP(dropletIPStr)
dropletName, err := getMetadataDropletName() dropletName, err := getMetadataDropletName()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get droplet name: %s", err) return nil, fmt.Errorf("failed to get droplet name: %s", err)
@ -124,10 +129,11 @@ func NewDOVolumes() (*DOVolumes, error) {
return nil, fmt.Errorf("failed to get clusterID: %s", err) return nil, fmt.Errorf("failed to get clusterID: %s", err)
} }
return &DOVolumes{ return &DOCloudProvider{
godoClient: godoClient, godoClient: godoClient,
ClusterID: clusterID, ClusterID: clusterID,
dropletID: dropletIDInt, dropletID: dropletID,
dropletIP: dropletIP,
dropletName: dropletName, dropletName: dropletName,
region: region, region: region,
dropletTags: dropletTags, dropletTags: dropletTags,
@ -158,130 +164,13 @@ func NewDOCloud() (*godo.Client, error) {
return client, nil return client, nil
} }
func (d *DOVolumes) AttachVolume(volume *Volume) error {
for {
action, _, err := d.godoClient.StorageActions.Attach(context.TODO(), volume.ID, d.dropletID)
if err != nil {
return fmt.Errorf("error attaching volume: %s", err)
}
if action.Status != godo.ActionInProgress && action.Status != godo.ActionCompleted {
return fmt.Errorf("invalid status for digitalocean volume: %s", volume.ID)
}
doVolume, err := d.getVolumeByID(volume.ID)
if err != nil {
return fmt.Errorf("error getting volume status: %s", err)
}
if len(doVolume.DropletIDs) == 1 {
if doVolume.DropletIDs[0] != d.dropletID {
return fmt.Errorf("digitalocean volume %s is attached to another droplet", doVolume.ID)
}
volume.LocalDevice = getLocalDeviceName(doVolume)
return nil
}
time.Sleep(10 * time.Second)
}
}
func (d *DOVolumes) FindVolumes() ([]*Volume, error) {
doVolumes, err := getAllVolumesByRegion(d.godoClient, d.region)
if err != nil {
return nil, fmt.Errorf("failed to list volumes: %s", err)
}
var volumes []*Volume
for _, doVolume := range doVolumes {
// determine if this volume belongs to this cluster
// check for string d.ClusterID but with strings "." replaced with "-"
if !strings.Contains(doVolume.Name, strings.Replace(d.ClusterID, ".", "-", -1)) {
continue
}
vol := &Volume{
ID: doVolume.ID,
Info: VolumeInfo{
Description: doVolume.Description,
},
}
if len(doVolume.DropletIDs) == 1 {
vol.AttachedTo = strconv.Itoa(doVolume.DropletIDs[0])
vol.LocalDevice = getLocalDeviceName(&doVolume)
}
etcdClusterSpec, err := d.getEtcdClusterSpec(doVolume)
if err != nil {
return nil, fmt.Errorf("failed to get etcd cluster spec: %s", err)
}
vol.Info.EtcdClusters = append(vol.Info.EtcdClusters, etcdClusterSpec)
volumes = append(volumes, vol)
}
return volumes, nil
}
func getAllVolumesByRegion(godoClient *godo.Client, region string) ([]godo.Volume, error) {
allVolumes := []godo.Volume{}
opt := &godo.ListOptions{}
for {
volumes, resp, err := godoClient.Storage.ListVolumes(context.TODO(), &godo.ListVolumeParams{
Region: region,
ListOptions: opt,
})
if err != nil {
return nil, err
}
allVolumes = append(allVolumes, volumes...)
if resp.Links == nil || resp.Links.IsLastPage() {
break
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}
opt.Page = page + 1
}
return allVolumes, nil
}
func (d *DOVolumes) FindMountedVolume(volume *Volume) (string, error) {
device := volume.LocalDevice
_, err := os.Stat(pathFor(device))
if err == nil {
return device, nil
}
if !os.IsNotExist(err) {
return "", fmt.Errorf("error checking for device %q: %v", device, err)
}
return "", nil
}
func (d *DOVolumes) getVolumeByID(id string) (*godo.Volume, error) {
vol, _, err := d.godoClient.Storage.GetVolume(context.TODO(), id)
return vol, err
}
// getEtcdClusterSpec returns etcd.EtcdClusterSpec which holds // getEtcdClusterSpec returns etcd.EtcdClusterSpec which holds
// necessary information required for starting an etcd server. // necessary information required for starting an etcd server.
// DigitalOcean support on kops only supports single master setup for now // DigitalOcean support on kops only supports single master setup for now
// but in the future when it supports multiple masters this method be // but in the future when it supports multiple masters this method be
// updated to handle that case. // updated to handle that case.
// TODO: use tags once it's supported for volumes // TODO: use tags once it's supported for volumes
func (d *DOVolumes) getEtcdClusterSpec(vol godo.Volume) (*etcd.EtcdClusterSpec, error) { func (d *DOCloudProvider) getEtcdClusterSpec(vol godo.Volume) (*etcd.EtcdClusterSpec, error) {
nodeName := d.dropletName nodeName := d.dropletName
var clusterKey string var clusterKey string
@ -300,11 +189,7 @@ func (d *DOVolumes) getEtcdClusterSpec(vol godo.Volume) (*etcd.EtcdClusterSpec,
}, nil }, nil
} }
func getLocalDeviceName(vol *godo.Volume) string { func (d *DOCloudProvider) GossipSeeds() (gossip.SeedProvider, error) {
return localDevicePrefix + vol.Name
}
func (d *DOVolumes) GossipSeeds() (gossip.SeedProvider, error) {
for _, dropletTag := range d.dropletTags { for _, dropletTag := range d.dropletTags {
if strings.Contains(dropletTag, strings.Replace(d.ClusterID, ".", "-", -1)) { if strings.Contains(dropletTag, strings.Replace(d.ClusterID, ".", "-", -1)) {
return gossipdo.NewSeedProvider(d.godoClient, dropletTag) return gossipdo.NewSeedProvider(d.godoClient, dropletTag)
@ -314,19 +199,12 @@ func (d *DOVolumes) GossipSeeds() (gossip.SeedProvider, error) {
return nil, fmt.Errorf("could not determine a matching droplet tag for gossip seeding") return nil, fmt.Errorf("could not determine a matching droplet tag for gossip seeding")
} }
func (d *DOVolumes) InstanceName() string { func (d *DOCloudProvider) InstanceID() string {
return d.dropletName return d.dropletName
} }
// GetDropletInternalIP gets the private IP of the droplet running this program func (d *DOCloudProvider) InstanceInternalIP() net.IP {
// This function is exported so it can be called from protokube return d.dropletIP
func GetDropletInternalIP() (net.IP, error) {
addr, err := getMetadata(dropletInternalIPMetadataURL)
if err != nil {
return nil, err
}
return net.ParseIP(addr), nil
} }
func getMetadataRegion() (string, error) { func getMetadataRegion() (string, error) {

View File

@ -17,23 +17,19 @@ limitations under the License.
package protokube package protokube
import ( import (
"context"
"fmt" "fmt"
"net" "net"
"os"
"strings" "strings"
"cloud.google.com/go/compute/metadata" "cloud.google.com/go/compute/metadata"
compute "google.golang.org/api/compute/v1" compute "google.golang.org/api/compute/v1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/protokube/pkg/gossip" "k8s.io/kops/protokube/pkg/gossip"
"k8s.io/kops/upup/pkg/fi/cloudup/gce"
"k8s.io/kops/upup/pkg/fi/cloudup/gce/gcediscovery" "k8s.io/kops/upup/pkg/fi/cloudup/gce/gcediscovery"
) )
// GCEVolumes is the Volumes implementation for GCE // GCECloudProvider is the CloudProvider implementation for GCE
type GCEVolumes struct { type GCECloudProvider struct {
compute *compute.Service compute *compute.Service
discovery *gcediscovery.Discovery discovery *gcediscovery.Discovery
@ -45,16 +41,16 @@ type GCEVolumes struct {
internalIP net.IP internalIP net.IP
} }
var _ Volumes = &GCEVolumes{} var _ CloudProvider = &GCECloudProvider{}
// NewGCEVolumes builds a GCEVolumes // NewGCECloudProvider builds a GCECloudProvider
func NewGCEVolumes() (*GCEVolumes, error) { func NewGCECloudProvider() (*GCECloudProvider, error) {
discovery, err := gcediscovery.New() discovery, err := gcediscovery.New()
if err != nil { if err != nil {
return nil, err return nil, err
} }
a := &GCEVolumes{ a := &GCECloudProvider{
discovery: discovery, discovery: discovery,
compute: discovery.Compute(), compute: discovery.Compute(),
} }
@ -68,16 +64,16 @@ func NewGCEVolumes() (*GCEVolumes, error) {
} }
// Project returns the current GCE project // Project returns the current GCE project
func (a *GCEVolumes) Project() string { func (a *GCECloudProvider) Project() string {
return a.project return a.project
} }
// InternalIP implements Volumes InternalIP // InstanceInternalIP implements CloudProvider InstanceInternalIP
func (a *GCEVolumes) InternalIP() net.IP { func (a *GCECloudProvider) InstanceInternalIP() net.IP {
return a.internalIP return a.internalIP
} }
func (a *GCEVolumes) discoverTags() error { func (a *GCECloudProvider) discoverTags() error {
// Cluster Name // Cluster Name
{ {
a.clusterName = a.discovery.ClusterName() a.clusterName = a.discovery.ClusterName()
@ -139,207 +135,10 @@ func (a *GCEVolumes) discoverTags() error {
return nil return nil
} }
func (v *GCEVolumes) buildGCEVolume(d *compute.Disk) (*Volume, error) { func (g *GCECloudProvider) GossipSeeds() (gossip.SeedProvider, error) {
volumeName := d.Name
vol := &Volume{
ID: volumeName,
Info: VolumeInfo{
Description: volumeName,
},
}
vol.Status = d.Status
for _, attachedTo := range d.Users {
u, err := gce.ParseGoogleCloudURL(attachedTo)
if err != nil {
return nil, fmt.Errorf("error parsing disk attachment url %q: %v", attachedTo, err)
}
vol.AttachedTo = u.Name
if u.Project == v.project && u.Zone == v.zone && u.Name == v.instanceName {
devicePath := "/dev/disk/by-id/google-" + volumeName
vol.LocalDevice = devicePath
klog.V(2).Infof("volume %q is attached to this instance at %s", d.Name, devicePath)
} else {
klog.V(2).Infof("volume %q is attached to another instance %q", d.Name, attachedTo)
}
}
for k, v := range d.Labels {
switch k {
case gce.GceLabelNameKubernetesCluster:
{
// Ignore
}
default:
if strings.HasPrefix(k, gce.GceLabelNameEtcdClusterPrefix) {
etcdClusterName := k[len(gce.GceLabelNameEtcdClusterPrefix):]
value, err := gce.DecodeGCELabel(v)
if err != nil {
return nil, fmt.Errorf("Error decoding GCE label: %s=%q", k, v)
}
spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, value)
if err != nil {
return nil, fmt.Errorf("error parsing etcd cluster label %q on volume %q: %v", value, volumeName, err)
}
vol.Info.EtcdClusters = append(vol.Info.EtcdClusters, spec)
} else if strings.HasPrefix(k, gce.GceLabelNameRolePrefix) {
// Ignore
} else {
klog.Warningf("unknown label on volume %q: %s=%s", volumeName, k, v)
}
}
}
return vol, nil
}
func (v *GCEVolumes) FindVolumes() ([]*Volume, error) {
var volumes []*Volume
klog.V(2).Infof("Listing GCE disks in %s/%s", v.project, v.zone)
// TODO: Apply filters
ctx := context.Background()
err := v.compute.Disks.List(v.project, v.zone).Pages(ctx, func(page *compute.DiskList) error {
for _, d := range page.Items {
klog.V(4).Infof("Found disk %q with labels %v", d.Name, d.Labels)
diskClusterName := d.Labels[gce.GceLabelNameKubernetesCluster]
if diskClusterName == "" {
klog.V(4).Infof("Skipping disk %q with no cluster name", d.Name)
continue
}
// Note that the cluster name is _not_ encoded with EncodeGCELabel
// this is because it is also used by k8s itself, e.g. in the route controller,
// and that is not encoded (issue #28436)
// Instead we use the much simpler SafeClusterName sanitizer
findClusterName := gce.SafeClusterName(v.clusterName)
if diskClusterName != findClusterName {
klog.V(2).Infof("Skipping disk %q with cluster name that does not match: %s=%s (looking for %s)", d.Name, gce.GceLabelNameKubernetesCluster, diskClusterName, findClusterName)
continue
}
roles := make(map[string]string)
for k, v := range d.Labels {
if strings.HasPrefix(k, gce.GceLabelNameRolePrefix) {
roleName := strings.TrimPrefix(k, gce.GceLabelNameRolePrefix)
value, err := gce.DecodeGCELabel(v)
if err != nil {
klog.Warningf("error decoding GCE role label: %s=%s", k, v)
continue
}
roles[roleName] = value
}
}
_, isMaster := roles["master"]
if !isMaster {
klog.V(2).Infof("Skipping disk %q - no master role", d.Name)
continue
}
vol, err := v.buildGCEVolume(d)
if err != nil {
// Fail safe
klog.Warningf("skipping malformed volume %q: %v", d.Name, err)
continue
}
volumes = append(volumes, vol)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("error querying GCE disks: %v", err)
}
//instance, err := v.compute.Instances.Get(v.project, v.zone, v.instanceName).Do()
//for _, d := range instance.Disks {
// var found *Volume
// source := d.Source
// for _, v := range volumes {
// if v.ID == source {
// if found != nil {
// return nil, fmt.Errorf("Found multiple volumes with name %q", v.ID)
// }
// found = v
// }
// }
//
// if found != nil {
// if d.DeviceName == "" {
// return fmt.Errorf("DeviceName for mounted disk %q was unexpected empty", d.Source)
// }
// found.LocalDevice = d.DeviceName
// }
//}
return volumes, nil
}
// FindMountedVolume implements Volumes::FindMountedVolume
func (v *GCEVolumes) FindMountedVolume(volume *Volume) (string, error) {
device := volume.LocalDevice
_, err := os.Stat(pathFor(device))
if err == nil {
return device, nil
}
if os.IsNotExist(err) {
return "", nil
}
return "", fmt.Errorf("error checking for device %q: %v", device, err)
}
// AttachVolume attaches the specified volume to this instance, returning the mountpoint & nil if successful
func (v *GCEVolumes) AttachVolume(volume *Volume) error {
volumeName := volume.ID
volumeURL := gce.GoogleCloudURL{
Project: v.project,
Zone: v.zone,
Name: volumeName,
Type: "disks",
}
attachedDisk := &compute.AttachedDisk{
DeviceName: volumeName,
// TODO: The k8s GCE provider sets Kind, but this seems wrong. Open an issue?
// Kind: disk.Kind,
Mode: "READ_WRITE",
Source: volumeURL.BuildURL(),
Type: "PERSISTENT",
}
attachOp, err := v.compute.Instances.AttachDisk(v.project, v.zone, v.instanceName, attachedDisk).Do()
if err != nil {
return fmt.Errorf("error attach disk %q: %v", volumeName, err)
}
err = gce.WaitForOp(v.compute, attachOp)
if err != nil {
return fmt.Errorf("error waiting for disk attach to complete %q: %v", volumeName, err)
}
devicePath := "/dev/disk/by-id/google-" + volumeName
// TODO: Wait for device to appear?
volume.LocalDevice = devicePath
return nil
}
func (g *GCEVolumes) GossipSeeds() (gossip.SeedProvider, error) {
return g.discovery, nil return g.discovery, nil
} }
func (g *GCEVolumes) InstanceName() string { func (g *GCECloudProvider) InstanceID() string {
return g.instanceName return g.instanceName
} }

View File

@ -21,7 +21,6 @@ import (
"fmt" "fmt"
"net" "net"
"os" "os"
"strconv"
"github.com/hetznercloud/hcloud-go/hcloud" "github.com/hetznercloud/hcloud-go/hcloud"
"github.com/hetznercloud/hcloud-go/hcloud/metadata" "github.com/hetznercloud/hcloud-go/hcloud/metadata"
@ -31,31 +30,17 @@ import (
"k8s.io/kops/upup/pkg/fi/cloudup/hetzner" "k8s.io/kops/upup/pkg/fi/cloudup/hetzner"
) )
// HetznerVolumes defines the Hetzner Cloud volume implementation. // HetznerCloudProvider defines the Hetzner Cloud volume implementation.
type HetznerVolumes struct { type HetznerCloudProvider struct {
hcloudClient *hcloud.Client hcloudClient *hcloud.Client
server *hcloud.Server server *hcloud.Server
serverIP net.IP
} }
func (h HetznerVolumes) AttachVolume(volume *Volume) error { var _ CloudProvider = &HetznerCloudProvider{}
// TODO(hakman): no longer needed, remove from interface
panic("implement me")
}
func (h HetznerVolumes) FindVolumes() ([]*Volume, error) { // NewHetznerCloudProvider returns a new Hetzner Cloud provider.
// TODO(hakman): no longer needed, remove from interface func NewHetznerCloudProvider() (*HetznerCloudProvider, error) {
panic("implement me")
}
func (h HetznerVolumes) FindMountedVolume(volume *Volume) (device string, err error) {
// TODO(hakman): no longer needed, remove from interface
panic("implement me")
}
var _ Volumes = &HetznerVolumes{}
// NewHetznerVolumes returns a new Hetzner Cloud volume provider.
func NewHetznerVolumes() (*HetznerVolumes, error) {
serverID, err := metadata.NewClient().InstanceID() serverID, err := metadata.NewClient().InstanceID()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to retrieve server id: %s", err) return nil, fmt.Errorf("failed to retrieve server id: %s", err)
@ -89,25 +74,20 @@ func NewHetznerVolumes() (*HetznerVolumes, error) {
return nil, fmt.Errorf("failed to find private net of the running server") return nil, fmt.Errorf("failed to find private net of the running server")
} }
h := &HetznerVolumes{ h := &HetznerCloudProvider{
hcloudClient: hcloudClient, hcloudClient: hcloudClient,
server: server, server: server,
serverIP: server.PrivateNet[0].IP,
} }
return h, nil return h, nil
} }
func (h HetznerVolumes) InternalIP() (ip net.IP, err error) { func (h HetznerCloudProvider) InstanceInternalIP() net.IP {
if len(h.server.PrivateNet) == 0 { return h.serverIP
return nil, fmt.Errorf("failed to find server private ip address")
}
klog.V(4).Infof("Found first private IP of the running server: %s", h.server.PrivateNet[0].IP.String())
return h.server.PrivateNet[0].IP, nil
} }
func (h *HetznerVolumes) GossipSeeds() (gossip.SeedProvider, error) { func (h *HetznerCloudProvider) GossipSeeds() (gossip.SeedProvider, error) {
clusterName, ok := h.server.Labels[hetzner.TagKubernetesClusterName] clusterName, ok := h.server.Labels[hetzner.TagKubernetesClusterName]
if !ok { if !ok {
return nil, fmt.Errorf("failed to find cluster name label for running server: %v", h.server.Labels) return nil, fmt.Errorf("failed to find cluster name label for running server: %v", h.server.Labels)
@ -115,6 +95,6 @@ func (h *HetznerVolumes) GossipSeeds() (gossip.SeedProvider, error) {
return gossiphetzner.NewSeedProvider(h.hcloudClient, clusterName) return gossiphetzner.NewSeedProvider(h.hcloudClient, clusterName)
} }
func (h *HetznerVolumes) InstanceID() string { func (h *HetznerCloudProvider) InstanceID() string {
return strconv.Itoa(h.server.ID) return fmt.Sprintf("%s-%d", h.server.Name, h.server.ID)
} }

View File

@ -98,14 +98,3 @@ func (k *KubeBoot) syncOnce(ctx context.Context) error {
return nil return nil
} }
func pathFor(hostPath string) string {
if hostPath[0] != '/' {
klog.Fatalf("path was not absolute: %q", hostPath)
}
return RootFS + hostPath[1:]
}
func (k *KubeBoot) String() string {
return DebugString(k)
}

View File

@ -1,157 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package protokube
//
//import (
// "encoding/json"
// "fmt"
// "k8s.io/klog/v2"
// "io"
// "io/ioutil"
// "os"
// "os/exec"
// "path"
// "strings"
// "sync"
// "time"
//)
//
//const BootstrapDir = "/etc/kubernetes/bootstrap"
//
//type BootstrapTask struct {
// Command []string `json:"command"`
//}
//
//func (b *BootstrapTask) String() string {
// return DebugString(b)
//}
//
//// RunKubelet runs the bootstrap tasks, and watches them until they exit
//// Currently only one task is supported / will work properly
//func (k *KubeBoot) RunBootstrapTasks() error {
// bootstrapDir := pathFor(BootstrapDir)
//
// var dirs []os.FileInfo
// var err error
//
// for {
// dirs, err = ioutil.ReadDir(bootstrapDir)
// if err != nil {
// if os.IsNotExist(err) {
// dirs = nil
// } else {
// return fmt.Errorf("error listing %q: %v", bootstrapDir, err)
// }
// }
// if len(dirs) != 0 {
// break
// }
// klog.Infof("No entries found in %q", BootstrapDir)
// time.Sleep(10 * time.Second)
// }
//
// for _, dir := range dirs {
// if !dir.IsDir() {
// continue
// }
//
// p := path.Join(bootstrapDir, dir.Name())
// files, err := ioutil.ReadDir(p)
// if err != nil {
// return fmt.Errorf("error listing %q: %v", p, err)
// }
//
// if len(files) == 0 {
// klog.Infof("No files in %q; ignoring", p)
// continue
// }
//
// // TODO: Support more than one bootstrap task?
//
// // TODO: Have multiple proto-kubelet configurations to support recovery?
// // i.e. launch newest version that stays up?
//
// fp := path.Join(p, files[0].Name())
// err = k.runBootstrapTask(fp)
// if err != nil {
// return fmt.Errorf("error running bootstrap task %q: %v", fp, err)
// }
// }
//
// return nil
//}
//
//// RunKubelet runs a bootstrap task and watches it until it exits
//func (k *KubeBoot) runBootstrapTask(path string) error {
// // TODO: Use a file lock or similar to only start proto-kubelet if real-kubelet is not running?
//
// data, err := ioutil.ReadFile(path)
// if err != nil {
// return fmt.Errorf("error reading task %q: %v", path, err)
// }
//
// task := &BootstrapTask{}
//
// err = json.Unmarshal(data, task)
// if err != nil {
// return fmt.Errorf("error parsing task %q: %v", path, err)
// }
//
// name := task.Command[0]
// args := task.Command[1:]
//
// cmd := exec.Command(name, args...)
//
// stdout, err := cmd.StdoutPipe()
// if err != nil {
// return fmt.Errorf("error building stdout pipe: %v", err)
// }
// stderr, err := cmd.StderrPipe()
// if err != nil {
// return fmt.Errorf("error building stderr pipe: %v", err)
// }
//
// wg := new(sync.WaitGroup)
// wg.Add(2)
//
// err = cmd.Start()
// if err != nil {
// return fmt.Errorf("error starting command %q: %v", strings.Join(task.Command, " "), err)
// }
//
// go copyStream(os.Stdout, stdout, wg)
// go copyStream(os.Stderr, stderr, wg)
//
// wg.Wait()
//
// err = cmd.Wait()
// if err != nil {
// return fmt.Errorf("error from command %q: %v", task.Command, err)
// }
//
// return nil
//}
//
//func copyStream(dst io.Writer, src io.ReadCloser, waitGroup *sync.WaitGroup) {
// _, err := io.Copy(dst, src)
// if err != nil {
// // Not entirely sure if we need to do something special in this case?
// klog.Warningf("error copying stream: %v", err)
// }
// waitGroup.Done()
//}

View File

@ -22,13 +22,9 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"os"
"strings" "strings"
cinderv3 "github.com/gophercloud/gophercloud/openstack/blockstorage/v3/volumes"
"github.com/gophercloud/gophercloud/openstack/compute/v2/extensions/volumeattach"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/protokube/pkg/gossip" "k8s.io/kops/protokube/pkg/gossip"
gossipos "k8s.io/kops/protokube/pkg/gossip/openstack" gossipos "k8s.io/kops/protokube/pkg/gossip/openstack"
"k8s.io/kops/upup/pkg/fi/cloudup/openstack" "k8s.io/kops/upup/pkg/fi/cloudup/openstack"
@ -50,8 +46,8 @@ type InstanceMetadata struct {
ServerID string `json:"uuid"` ServerID string `json:"uuid"`
} }
// GCEVolumes is the Volumes implementation for GCE // OpenStackCloudProvider is the CloudProvider implementation for OpenStack
type OpenstackVolumes struct { type OpenStackCloudProvider struct {
cloud openstack.OpenstackCloud cloud openstack.OpenstackCloud
meta *InstanceMetadata meta *InstanceMetadata
@ -63,7 +59,7 @@ type OpenstackVolumes struct {
storageZone string storageZone string
} }
var _ Volumes = &OpenstackVolumes{} var _ CloudProvider = &OpenStackCloudProvider{}
func getLocalMetadata() (*InstanceMetadata, error) { func getLocalMetadata() (*InstanceMetadata, error) {
var meta InstanceMetadata var meta InstanceMetadata
@ -88,8 +84,8 @@ func getLocalMetadata() (*InstanceMetadata, error) {
return nil, err return nil, err
} }
// NewOpenstackVolumes builds a OpenstackVolume // NewOpenStackCloudProvider builds a OpenstackVolume
func NewOpenstackVolumes() (*OpenstackVolumes, error) { func NewOpenStackCloudProvider() (*OpenStackCloudProvider, error) {
metadata, err := getLocalMetadata() metadata, err := getLocalMetadata()
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to get server metadata: %v", err) return nil, fmt.Errorf("Failed to get server metadata: %v", err)
@ -101,10 +97,10 @@ func NewOpenstackVolumes() (*OpenstackVolumes, error) {
oscloud, err := openstack.NewOpenstackCloud(tags, nil, "protokube") oscloud, err := openstack.NewOpenstackCloud(tags, nil, "protokube")
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to initialize OpenstackVolumes: %v", err) return nil, fmt.Errorf("Failed to initialize OpenStackCloudProvider: %v", err)
} }
a := &OpenstackVolumes{ a := &OpenStackCloudProvider{
cloud: oscloud, cloud: oscloud,
meta: metadata, meta: metadata,
} }
@ -117,17 +113,17 @@ func NewOpenstackVolumes() (*OpenstackVolumes, error) {
return a, nil return a, nil
} }
// Project returns the current GCE project // Project returns the current OpenStack project
func (a *OpenstackVolumes) Project() string { func (a *OpenStackCloudProvider) Project() string {
return a.meta.ProjectID return a.meta.ProjectID
} }
// InternalIP implements Volumes InternalIP // InstanceInternalIP implements CloudProvider InstanceInternalIP
func (a *OpenstackVolumes) InternalIP() net.IP { func (a *OpenStackCloudProvider) InstanceInternalIP() net.IP {
return a.internalIP return a.internalIP
} }
func (a *OpenstackVolumes) discoverTags() error { func (a *OpenStackCloudProvider) discoverTags() error {
// Cluster Name // Cluster Name
{ {
a.clusterName = strings.TrimSpace(string(a.meta.UserMeta.ClusterName)) a.clusterName = strings.TrimSpace(string(a.meta.UserMeta.ClusterName))
@ -185,99 +181,10 @@ func (a *OpenstackVolumes) discoverTags() error {
return nil return nil
} }
func (v *OpenstackVolumes) buildOpenstackVolume(d *cinderv3.Volume) (*Volume, error) { func (g *OpenStackCloudProvider) GossipSeeds() (gossip.SeedProvider, error) {
volumeName := d.Name
vol := &Volume{
ID: d.ID,
Info: VolumeInfo{
Description: volumeName,
},
}
vol.Status = d.Status
for _, attachedTo := range d.Attachments {
vol.AttachedTo = attachedTo.HostName
if attachedTo.ServerID == v.meta.ServerID {
vol.LocalDevice = attachedTo.Device
}
}
// FIXME: Zone matters, broken in my env
for k, v := range d.Metadata {
if strings.HasPrefix(k, openstack.TagNameEtcdClusterPrefix) {
etcdClusterName := k[len(openstack.TagNameEtcdClusterPrefix):]
spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, v)
if err != nil {
return nil, fmt.Errorf("error parsing etcd cluster meta %q on volume %q: %v", v, d.Name, err)
}
vol.Info.EtcdClusters = append(vol.Info.EtcdClusters, spec)
}
}
return vol, nil
}
func (v *OpenstackVolumes) FindVolumes() ([]*Volume, error) {
var volumes []*Volume
klog.V(2).Infof("Listing Openstack disks in %s/%s", v.project, v.meta.AvailabilityZone)
vols, err := v.cloud.ListVolumes(cinderv3.ListOpts{
TenantID: v.project,
})
if err != nil {
return volumes, fmt.Errorf("FindVolumes: Failed to list volume.")
}
for _, volume := range vols {
if clusterName, ok := volume.Metadata[openstack.TagClusterName]; ok && clusterName == v.clusterName {
if _, isMasterRole := volume.Metadata[openstack.TagNameRolePrefix+"master"]; isMasterRole {
vol, err := v.buildOpenstackVolume(&volume)
if err != nil {
klog.Errorf("FindVolumes: Failed to build openstack volume %s: %v", volume.Name, err)
continue
}
volumes = append(volumes, vol)
}
}
}
return volumes, nil
}
// FindMountedVolume implements Volumes::FindMountedVolume
func (v *OpenstackVolumes) FindMountedVolume(volume *Volume) (string, error) {
device := volume.LocalDevice
_, err := os.Stat(pathFor(device))
if err == nil {
return device, nil
}
if os.IsNotExist(err) {
return "", nil
}
return "", fmt.Errorf("error checking for device %q: %v", device, err)
}
// AttachVolume attaches the specified volume to this instance, returning the mountpoint & nil if successful
func (v *OpenstackVolumes) AttachVolume(volume *Volume) error {
opts := volumeattach.CreateOpts{
VolumeID: volume.ID,
}
attachment, err := v.cloud.AttachVolume(v.meta.ServerID, opts)
if err != nil {
return fmt.Errorf("AttachVolume: failed to attach volume: %s", err)
}
volume.LocalDevice = attachment.Device
return nil
}
func (g *OpenstackVolumes) GossipSeeds() (gossip.SeedProvider, error) {
return gossipos.NewSeedProvider(g.cloud.ComputeClient(), g.clusterName, g.project) return gossipos.NewSeedProvider(g.cloud.ComputeClient(), g.clusterName, g.project)
} }
func (g *OpenstackVolumes) InstanceName() string { func (g *OpenStackCloudProvider) InstanceID() string {
return g.instanceName return g.instanceName
} }

View File

@ -1,30 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package protokube
import (
"encoding/json"
"fmt"
)
func DebugString(o interface{}) string {
b, err := json.Marshal(o)
if err != nil {
return fmt.Sprintf("error marshaling %T: %v", o, err)
}
return string(b)
}

View File

@ -17,50 +17,13 @@ limitations under the License.
package protokube package protokube
import ( import (
"k8s.io/kops/protokube/pkg/etcd" "net"
"k8s.io/kops/protokube/pkg/gossip"
) )
type Volumes interface { type CloudProvider interface {
AttachVolume(volume *Volume) error InstanceID() string
FindVolumes() ([]*Volume, error) InstanceInternalIP() net.IP
GossipSeeds() (gossip.SeedProvider, error)
// FindMountedVolume returns the device (e.g. /dev/sda) where the volume is mounted
// If not found, it returns "", nil
// On error, it returns "", err
FindMountedVolume(volume *Volume) (device string, err error)
}
type Volume struct {
// ID is the cloud-provider identifier for the volume
ID string
// LocalDevice is set if the volume is attached to the local machine
LocalDevice string
// AttachedTo is set to the ID of the machine the volume is attached to, or "" if not attached
AttachedTo string
// Mountpoint is the path on which the volume is mounted, if mounted
// It will likely be "/mnt/master-" + ID
Mountpoint string
// Status is a volume provider specific Status string; it makes it easier for the volume provider
Status string
Info VolumeInfo
}
func (v *Volume) String() string {
return DebugString(v)
}
type VolumeInfo struct {
Description string
// MasterID int
// TODO: Maybe the events cluster can just be a PetSet - do we need it for boot?
EtcdClusters []*etcd.EtcdClusterSpec
}
func (v *VolumeInfo) String() string {
return DebugString(v)
} }