diff --git a/protokube/cmd/protokube/main.go b/protokube/cmd/protokube/main.go index a94444002d..4c93afb134 100644 --- a/protokube/cmd/protokube/main.go +++ b/protokube/cmd/protokube/main.go @@ -19,7 +19,6 @@ package main import ( "flag" "fmt" - "net" "os" "path" "strings" @@ -92,79 +91,62 @@ func run() error { flags.AddGoFlagSet(flag.CommandLine) flags.Parse(os.Args) - var volumes protokube.Volumes - var internalIP net.IP - + var cloudProvider protokube.CloudProvider if cloud == "aws" { - awsVolumes, err := protokube.NewAWSVolumes() + awsCloudProvider, err := protokube.NewAWSCloudProvider() if err != nil { klog.Errorf("Error initializing AWS: %q", err) os.Exit(1) } - volumes = awsVolumes - internalIP = awsVolumes.InternalIP() + cloudProvider = awsCloudProvider } else if cloud == "digitalocean" { - doVolumes, err := protokube.NewDOVolumes() + doCloudProvider, err := protokube.NewDOCloudProvider() if err != nil { klog.Errorf("Error initializing DigitalOcean: %q", err) os.Exit(1) } - volumes = doVolumes - internalIP, err = protokube.GetDropletInternalIP() - if err != nil { - klog.Errorf("Error getting droplet internal IP: %s", err) - os.Exit(1) - } + cloudProvider = doCloudProvider } else if cloud == "hetzner" { - hetznerVolumes, err := protokube.NewHetznerVolumes() + hetznerCloudProvider, err := protokube.NewHetznerCloudProvider() if err != nil { klog.Errorf("error initializing Hetzner Cloud: %q", err) os.Exit(1) } - volumes = hetznerVolumes - internalIP, err = hetznerVolumes.InternalIP() - if err != nil { - klog.Errorf("error getting server internal IP: %s", err) - os.Exit(1) - } + cloudProvider = hetznerCloudProvider } else if cloud == "gce" { - gceVolumes, err := protokube.NewGCEVolumes() + gceCloudProvider, err := protokube.NewGCECloudProvider() if err != nil { klog.Errorf("Error initializing GCE: %q", err) os.Exit(1) } - volumes = gceVolumes - internalIP = gceVolumes.InternalIP() + cloudProvider = gceCloudProvider } else if cloud == "openstack" { - klog.Info("Initializing openstack volumes") - osVolumes, err := protokube.NewOpenstackVolumes() + osCloudProvider, err := protokube.NewOpenStackCloudProvider() if err != nil { - klog.Errorf("Error initializing openstack: %q", err) + klog.Errorf("Error initializing OpenStack: %q", err) os.Exit(1) } - volumes = osVolumes - internalIP = osVolumes.InternalIP() + cloudProvider = osCloudProvider } else if cloud == "azure" { - klog.Info("Initializing Azure volumes") - azureVolumes, err := protokube.NewAzureVolumes() + azureVolumes, err := protokube.NewAzureCloudProvider() if err != nil { klog.Errorf("Error initializing Azure: %q", err) os.Exit(1) } - volumes = azureVolumes - internalIP = azureVolumes.InternalIP() + cloudProvider = azureVolumes } else { klog.Errorf("Unknown cloud %q", cloud) os.Exit(1) } + internalIP := cloudProvider.InstanceInternalIP() if internalIP == nil { klog.Errorf("Cannot determine internal IP") os.Exit(1) @@ -196,71 +178,31 @@ func run() error { Path: path.Join(rootfs, "etc/hosts"), } - var gossipSeeds gossiputils.SeedProvider - var err error - var gossipName string - if cloud == "aws" { - gossipSeeds, err = volumes.(*protokube.AWSVolumes).GossipSeeds() - if err != nil { - return err - } - gossipName = volumes.(*protokube.AWSVolumes).InstanceID() - } else if cloud == "gce" { - gossipSeeds, err = volumes.(*protokube.GCEVolumes).GossipSeeds() - if err != nil { - return err - } - gossipName = volumes.(*protokube.GCEVolumes).InstanceName() - } else if cloud == "openstack" { - gossipSeeds, err = volumes.(*protokube.OpenstackVolumes).GossipSeeds() - if err != nil { - return err - } - gossipName = volumes.(*protokube.OpenstackVolumes).InstanceName() - } else if cloud == "digitalocean" { - gossipSeeds, err = volumes.(*protokube.DOVolumes).GossipSeeds() - if err != nil { - return err - } - gossipName = volumes.(*protokube.DOVolumes).InstanceName() - } else if cloud == "hetzner" { - gossipSeeds, err = volumes.(*protokube.HetznerVolumes).GossipSeeds() - if err != nil { - return err - } - gossipName = volumes.(*protokube.HetznerVolumes).InstanceID() - } else if cloud == "azure" { - gossipSeeds, err = volumes.(*protokube.AzureVolumes).GossipSeeds() - if err != nil { - return err - } - gossipName = volumes.(*protokube.AzureVolumes).InstanceID() - } else { - klog.Fatalf("seed provider for %q not yet implemented", cloud) + gossipName := cloudProvider.InstanceID() + gossipSeeds, err := cloudProvider.GossipSeeds() + if err != nil { + klog.Errorf("error finding gossip seeds: %w", err) } channelName := "dns" - var gossipState gossiputils.GossipState - - gossipState, err = gossiputils.GetGossipState(gossipProtocol, gossipListen, channelName, gossipName, []byte(gossipSecret), gossipSeeds) + gossipState, err := gossiputils.GetGossipState(gossipProtocol, gossipListen, channelName, gossipName, []byte(gossipSecret), gossipSeeds) if err != nil { - klog.Errorf("Error initializing gossip: %v", err) + klog.Errorf("error initializing gossip: %w", err) os.Exit(1) } if gossipProtocolSecondary != "" { - secondaryGossipState, err := gossiputils.GetGossipState(gossipProtocolSecondary, gossipListenSecondary, channelName, gossipName, []byte(gossipSecretSecondary), gossipSeeds) if err != nil { - klog.Errorf("Error initializing secondary gossip: %v", err) + klog.Errorf("error initializing secondary gossip: %w", err) os.Exit(1) } - gossipState = &gossiputils.MultiGossipState{ Primary: gossipState, Secondary: secondaryGossipState, } } + go func() { err := gossipState.Start() if err != nil { diff --git a/protokube/pkg/protokube/aws_volume.go b/protokube/pkg/protokube/aws_volume.go index d7865f5654..f8358adc7e 100644 --- a/protokube/pkg/protokube/aws_volume.go +++ b/protokube/pkg/protokube/aws_volume.go @@ -19,11 +19,7 @@ package protokube import ( "fmt" "net" - "os" - "path/filepath" - "strings" "sync" - "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/ec2metadata" @@ -31,8 +27,6 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" "k8s.io/klog/v2" - - "k8s.io/kops/protokube/pkg/etcd" "k8s.io/kops/protokube/pkg/gossip" gossipaws "k8s.io/kops/protokube/pkg/gossip/aws" "k8s.io/kops/upup/pkg/fi/cloudup/awsup" @@ -40,8 +34,8 @@ import ( var devices = []string{"/dev/xvdu", "/dev/xvdv", "/dev/xvdx", "/dev/xvdx", "/dev/xvdy", "/dev/xvdz"} -// AWSVolumes defines the aws volume implementation -type AWSVolumes struct { +// AWSCloudProvider defines the AWS cloud provider implementation +type AWSCloudProvider struct { mutex sync.Mutex clusterTag string @@ -53,11 +47,11 @@ type AWSVolumes struct { zone string } -var _ Volumes = &AWSVolumes{} +var _ CloudProvider = &AWSCloudProvider{} -// NewAWSVolumes returns a new aws volume provider -func NewAWSVolumes() (*AWSVolumes, error) { - a := &AWSVolumes{ +// NewAWSCloudProvider returns a new aws volume provider +func NewAWSCloudProvider() (*AWSCloudProvider, error) { + a := &AWSCloudProvider{ deviceMap: make(map[string]string), } @@ -100,11 +94,11 @@ func NewAWSVolumes() (*AWSVolumes, error) { return a, nil } -func (a *AWSVolumes) InternalIP() net.IP { +func (a *AWSCloudProvider) InstanceInternalIP() net.IP { return a.internalIP } -func (a *AWSVolumes) discoverTags() error { +func (a *AWSCloudProvider) discoverTags() error { instance, err := a.describeInstance() if err != nil { return err @@ -133,7 +127,7 @@ func (a *AWSVolumes) discoverTags() error { return nil } -func (a *AWSVolumes) describeInstance() (*ec2.Instance, error) { +func (a *AWSCloudProvider) describeInstance() (*ec2.Instance, error) { request := &ec2.DescribeInstancesInput{} request.InstanceIds = []*string{&a.instanceId} @@ -155,298 +149,13 @@ func (a *AWSVolumes) describeInstance() (*ec2.Instance, error) { return instances[0], nil } -func newEc2Filter(name string, value string) *ec2.Filter { - filter := &ec2.Filter{ - Name: aws.String(name), - Values: []*string{ - aws.String(value), - }, - } - return filter -} - -func (a *AWSVolumes) findVolumes(request *ec2.DescribeVolumesInput) ([]*Volume, error) { - var volumes []*Volume - err := a.ec2.DescribeVolumesPages(request, func(p *ec2.DescribeVolumesOutput, lastPage bool) (shouldContinue bool) { - for _, v := range p.Volumes { - volumeID := aws.StringValue(v.VolumeId) - vol := &Volume{ - ID: volumeID, - Info: VolumeInfo{ - Description: volumeID, - }, - } - state := aws.StringValue(v.State) - - vol.Status = state - - for _, attachment := range v.Attachments { - vol.AttachedTo = aws.StringValue(attachment.InstanceId) - if aws.StringValue(attachment.InstanceId) == a.instanceId { - vol.LocalDevice = aws.StringValue(attachment.Device) - } - } - - // never mount root volumes - // these are volumes that aws sets aside for root volumes mount points - if vol.LocalDevice == "/dev/sda1" || vol.LocalDevice == "/dev/xvda" { - klog.Warningf("Not mounting: %q, since it is a root volume", vol.LocalDevice) - continue - } - - skipVolume := false - - for _, tag := range v.Tags { - k := aws.StringValue(tag.Key) - v := aws.StringValue(tag.Value) - - switch k { - case awsup.TagClusterName, "Name": - { - // Ignore - } - // case TagNameMasterId: - // id, err := strconv.Atoi(v) - // if err != nil { - // klog.Warningf("error parsing master-id tag on volume %q %s=%s; skipping volume", volumeID, k, v) - // skipVolume = true - // } else { - // vol.Info.MasterID = id - // } - default: - if strings.HasPrefix(k, awsup.TagNameEtcdClusterPrefix) { - etcdClusterName := strings.TrimPrefix(k, awsup.TagNameEtcdClusterPrefix) - spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, v) - if err != nil { - // Fail safe - klog.Warningf("error parsing etcd cluster tag %q on volume %q; skipping volume: %v", v, volumeID, err) - skipVolume = true - } - vol.Info.EtcdClusters = append(vol.Info.EtcdClusters, spec) - } else if strings.HasPrefix(k, awsup.TagNameRolePrefix) { - // Ignore - } else if strings.HasPrefix(k, awsup.TagNameClusterOwnershipPrefix) { - // Ignore - } else { - klog.Warningf("unknown tag on volume %q: %s=%s", volumeID, k, v) - } - } - } - - if !skipVolume { - volumes = append(volumes, vol) - } - } - return true - }) - if err != nil { - return nil, fmt.Errorf("error querying for EC2 volumes: %v", err) - } - return volumes, nil -} - -//func (a *AWSVolumes) FindMountedVolumes() ([]*Volume, error) { -// request := &ec2.DescribeVolumesInput{} -// request.Filters = []*ec2.Filter{ -// newEc2Filter("tag:"+TagNameKubernetesCluster, a.clusterTag), -// newEc2Filter("tag-key", TagNameRoleMaster), -// newEc2Filter("attachment.instance-id", a.instanceId), -// } -// -// return a.findVolumes(request) -//} -// -//func (a *AWSVolumes) FindMountableVolumes() ([]*Volume, error) { -// request := &ec2.DescribeVolumesInput{} -// request.Filters = []*ec2.Filter{ -// newEc2Filter("tag:"+TagNameKubernetesCluster, a.clusterTag), -// newEc2Filter("tag-key", TagNameRoleMaster), -// newEc2Filter("availability-zone", a.zone), -// } -// -// return a.findVolumes(request) -//} - -func (a *AWSVolumes) FindVolumes() ([]*Volume, error) { - request := &ec2.DescribeVolumesInput{} - request.Filters = []*ec2.Filter{ - newEc2Filter("tag:"+awsup.TagClusterName, a.clusterTag), - newEc2Filter("tag-key", awsup.TagNameRolePrefix+awsup.TagRoleMaster), - newEc2Filter("availability-zone", a.zone), - } - - return a.findVolumes(request) -} - -// FindMountedVolume implements Volumes::FindMountedVolume -func (v *AWSVolumes) FindMountedVolume(volume *Volume) (string, error) { - device := volume.LocalDevice - - _, err := os.Stat(pathFor(device)) - if err == nil { - return device, nil - } - if !os.IsNotExist(err) { - return "", fmt.Errorf("error checking for device %q: %v", device, err) - } - - if volume.ID != "" { - expected := volume.ID - expected = "nvme-Amazon_Elastic_Block_Store_" + strings.Replace(expected, "-", "", -1) - - // Look for nvme devices - // On AWS, nvme volumes are not mounted on a device path, but are instead mounted on an nvme device - // We must identify the correct volume by matching the nvme info - device, err := findNvmeVolume(expected) - if err != nil { - return "", fmt.Errorf("error checking for nvme volume %q: %v", expected, err) - } - if device != "" { - klog.Infof("found nvme volume %q at %q", expected, device) - return device, nil - } - } - - return "", nil -} - -func findNvmeVolume(findName string) (device string, err error) { - p := pathFor(filepath.Join("/dev/disk/by-id", findName)) - stat, err := os.Lstat(p) - if err != nil { - if os.IsNotExist(err) { - klog.V(4).Infof("nvme path not found %q", p) - return "", nil - } - return "", fmt.Errorf("error getting stat of %q: %v", p, err) - } - - if stat.Mode()&os.ModeSymlink != os.ModeSymlink { - klog.Warningf("nvme file %q found, but was not a symlink", p) - return "", nil - } - - resolved, err := filepath.EvalSymlinks(p) - if err != nil { - return "", fmt.Errorf("error reading target of symlink %q: %v", p, err) - } - - // Reverse pathFor - devPath := pathFor("/dev") - if strings.HasPrefix(resolved, devPath) { - resolved = strings.Replace(resolved, devPath, "/dev", 1) - } - - if !strings.HasPrefix(resolved, "/dev") { - return "", fmt.Errorf("resolved symlink for %q was unexpected: %q", p, resolved) - } - - return resolved, nil -} - -// assignDevice picks a hopefully unused device and reserves it for the volume attachment -func (a *AWSVolumes) assignDevice(volumeID string) (string, error) { - a.mutex.Lock() - defer a.mutex.Unlock() - - // TODO: Check for actual devices in use (like cloudprovider does) - for _, d := range devices { - if a.deviceMap[d] == "" { - a.deviceMap[d] = volumeID - return d, nil - } - } - return "", fmt.Errorf("All devices in use") -} - -// releaseDevice releases the volume mapping lock; used when an attach was known to fail -func (a *AWSVolumes) releaseDevice(d string, volumeID string) { - a.mutex.Lock() - defer a.mutex.Unlock() - - if a.deviceMap[d] != volumeID { - klog.Fatalf("deviceMap logic error: %q -> %q, not %q", d, a.deviceMap[d], volumeID) - } - a.deviceMap[d] = "" -} - -// AttachVolume attaches the specified volume to this instance, returning the mountpoint & nil if successful -func (a *AWSVolumes) AttachVolume(volume *Volume) error { - volumeID := volume.ID - - device := volume.LocalDevice - if device == "" { - d, err := a.assignDevice(volumeID) - if err != nil { - return err - } - device = d - - request := &ec2.AttachVolumeInput{ - Device: aws.String(device), - InstanceId: aws.String(a.instanceId), - VolumeId: aws.String(volumeID), - } - - attachResponse, err := a.ec2.AttachVolume(request) - if err != nil { - return fmt.Errorf("Error attaching EBS volume %q: %v", volumeID, err) - } - - klog.V(2).Infof("AttachVolume request returned %v", attachResponse) - } - - // Wait (forever) for volume to attach or reach a failure-to-attach condition - for { - request := &ec2.DescribeVolumesInput{ - VolumeIds: []*string{&volumeID}, - } - - volumes, err := a.findVolumes(request) - if err != nil { - return fmt.Errorf("Error describing EBS volume %q: %v", volumeID, err) - } - - if len(volumes) == 0 { - return fmt.Errorf("EBS volume %q disappeared during attach", volumeID) - } - if len(volumes) != 1 { - return fmt.Errorf("Multiple volumes found with id %q", volumeID) - } - - v := volumes[0] - if v.AttachedTo != "" { - if v.AttachedTo == a.instanceId { - // TODO: Wait for device to appear? - - volume.LocalDevice = device - return nil - } - a.releaseDevice(device, volumeID) - - return fmt.Errorf("Unable to attach volume %q, was attached to %q", volumeID, v.AttachedTo) - } - - switch v.Status { - case "attaching": - klog.V(2).Infof("Waiting for volume %q to be attached (currently %q)", volumeID, v.Status) - // continue looping - - default: - return fmt.Errorf("Observed unexpected volume state %q", v.Status) - } - - time.Sleep(10 * time.Second) - } -} - -func (a *AWSVolumes) GossipSeeds() (gossip.SeedProvider, error) { +func (a *AWSCloudProvider) GossipSeeds() (gossip.SeedProvider, error) { tags := make(map[string]string) tags[awsup.TagClusterName] = a.clusterTag return gossipaws.NewSeedProvider(a.ec2, tags) } -func (a *AWSVolumes) InstanceID() string { +func (a *AWSCloudProvider) InstanceID() string { return a.instanceId } diff --git a/protokube/pkg/protokube/azure_volume.go b/protokube/pkg/protokube/azure_volume.go index e0da179614..7b0e9667f4 100644 --- a/protokube/pkg/protokube/azure_volume.go +++ b/protokube/pkg/protokube/azure_volume.go @@ -38,8 +38,8 @@ type client interface { var _ client = &gossipazure.Client{} -// AzureVolumes implements the Volumes interface for Azure. -type AzureVolumes struct { +// AzureCloudProvider implements the CloudProvider interface for Azure. +type AzureCloudProvider struct { client client clusterTag string @@ -47,10 +47,10 @@ type AzureVolumes struct { internalIP net.IP } -var _ Volumes = &AzureVolumes{} +var _ CloudProvider = &AzureCloudProvider{} -// NewAzureVolumes returns a new AzureVolumes. -func NewAzureVolumes() (*AzureVolumes, error) { +// NewAzureCloudProvider returns a new AzureCloudProvider. +func NewAzureCloudProvider() (*AzureCloudProvider, error) { client, err := gossipazure.NewClient() if err != nil { return nil, fmt.Errorf("error creating a new Azure client: %s", err) @@ -72,7 +72,7 @@ func NewAzureVolumes() (*AzureVolumes, error) { if internalIP == nil { return nil, fmt.Errorf("error querying internal IP") } - return &AzureVolumes{ + return &AzureCloudProvider{ client: client, clusterTag: clusterTag, instanceID: instanceID, @@ -80,37 +80,20 @@ func NewAzureVolumes() (*AzureVolumes, error) { }, nil } -// InstanceID implements Volumes InstanceID. -func (a *AzureVolumes) InstanceID() string { +// InstanceID implements CloudProvider InstanceID. +func (a *AzureCloudProvider) InstanceID() string { return a.instanceID } -// InternalIP implements Volumes InternalIP. -func (a *AzureVolumes) InternalIP() net.IP { +// InstanceInternalIP implements CloudProvider InstanceInternalIP. +func (a *AzureCloudProvider) InstanceInternalIP() net.IP { return a.internalIP } -func (a *AzureVolumes) GossipSeeds() (gossip.SeedProvider, error) { +// GossipSeeds implements CloudProvider GossipSeeds. +func (a *AzureCloudProvider) GossipSeeds() (gossip.SeedProvider, error) { tags := map[string]string{ azure.TagClusterName: a.clusterTag, } return gossipazure.NewSeedProvider(a.client, tags) } - -func (a *AzureVolumes) AttachVolume(volume *Volume) error { - // TODO(kenji): Implement this. We currently don't need to implement this - // as we let etcd-manager manage volumes, not protokube. - return nil -} - -func (a *AzureVolumes) FindVolumes() ([]*Volume, error) { - // TODO(kenji): Implement this. We currently don't need to implement this - // as we let etcd-manager manage volumes, not protokube. - return nil, nil -} - -func (a *AzureVolumes) FindMountedVolume(volume *Volume) (string, error) { - // TODO(kenji): Implement this. We currently don't need to implement this - // as we let etcd-manager manage volumes, not protokube. - return "", nil -} diff --git a/protokube/pkg/protokube/do_volume.go b/protokube/pkg/protokube/do_volume.go index 02fc27dcd5..51e401cdd8 100644 --- a/protokube/pkg/protokube/do_volume.go +++ b/protokube/pkg/protokube/do_volume.go @@ -26,7 +26,6 @@ import ( "os" "strconv" "strings" - "time" "github.com/digitalocean/godo" "golang.org/x/oauth2" @@ -50,17 +49,18 @@ type TokenSource struct { AccessToken string } -type DOVolumes struct { +type DOCloudProvider struct { ClusterID string godoClient *godo.Client region string dropletName string dropletID int + dropletIP net.IP dropletTags []string } -var _ Volumes = &DOVolumes{} +var _ CloudProvider = &DOCloudProvider{} func GetClusterID() (string, error) { clusterID := "" @@ -88,22 +88,27 @@ func GetClusterID() (string, error) { return clusterID, fmt.Errorf("failed to get droplet clusterID") } -func NewDOVolumes() (*DOVolumes, error) { +func NewDOCloudProvider() (*DOCloudProvider, error) { region, err := getMetadataRegion() if err != nil { return nil, fmt.Errorf("failed to get droplet region: %s", err) } - dropletID, err := getMetadataDropletID() + dropletIDStr, err := getMetadataDropletID() if err != nil { return nil, fmt.Errorf("failed to get droplet id: %s", err) } - - dropletIDInt, err := strconv.Atoi(dropletID) + dropletID, err := strconv.Atoi(dropletIDStr) if err != nil { return nil, fmt.Errorf("failed to convert droplet ID to int: %s", err) } + dropletIPStr, err := getMetadata(dropletInternalIPMetadataURL) + if err != nil { + return nil, fmt.Errorf("failed to get droplet ip: %s", err) + } + dropletIP := net.ParseIP(dropletIPStr) + dropletName, err := getMetadataDropletName() if err != nil { return nil, fmt.Errorf("failed to get droplet name: %s", err) @@ -124,10 +129,11 @@ func NewDOVolumes() (*DOVolumes, error) { return nil, fmt.Errorf("failed to get clusterID: %s", err) } - return &DOVolumes{ + return &DOCloudProvider{ godoClient: godoClient, ClusterID: clusterID, - dropletID: dropletIDInt, + dropletID: dropletID, + dropletIP: dropletIP, dropletName: dropletName, region: region, dropletTags: dropletTags, @@ -158,130 +164,13 @@ func NewDOCloud() (*godo.Client, error) { return client, nil } -func (d *DOVolumes) AttachVolume(volume *Volume) error { - for { - action, _, err := d.godoClient.StorageActions.Attach(context.TODO(), volume.ID, d.dropletID) - if err != nil { - return fmt.Errorf("error attaching volume: %s", err) - } - - if action.Status != godo.ActionInProgress && action.Status != godo.ActionCompleted { - return fmt.Errorf("invalid status for digitalocean volume: %s", volume.ID) - } - - doVolume, err := d.getVolumeByID(volume.ID) - if err != nil { - return fmt.Errorf("error getting volume status: %s", err) - } - - if len(doVolume.DropletIDs) == 1 { - if doVolume.DropletIDs[0] != d.dropletID { - return fmt.Errorf("digitalocean volume %s is attached to another droplet", doVolume.ID) - } - - volume.LocalDevice = getLocalDeviceName(doVolume) - return nil - } - - time.Sleep(10 * time.Second) - } -} - -func (d *DOVolumes) FindVolumes() ([]*Volume, error) { - doVolumes, err := getAllVolumesByRegion(d.godoClient, d.region) - if err != nil { - return nil, fmt.Errorf("failed to list volumes: %s", err) - } - - var volumes []*Volume - for _, doVolume := range doVolumes { - // determine if this volume belongs to this cluster - // check for string d.ClusterID but with strings "." replaced with "-" - if !strings.Contains(doVolume.Name, strings.Replace(d.ClusterID, ".", "-", -1)) { - continue - } - - vol := &Volume{ - ID: doVolume.ID, - Info: VolumeInfo{ - Description: doVolume.Description, - }, - } - - if len(doVolume.DropletIDs) == 1 { - vol.AttachedTo = strconv.Itoa(doVolume.DropletIDs[0]) - vol.LocalDevice = getLocalDeviceName(&doVolume) - } - - etcdClusterSpec, err := d.getEtcdClusterSpec(doVolume) - if err != nil { - return nil, fmt.Errorf("failed to get etcd cluster spec: %s", err) - } - - vol.Info.EtcdClusters = append(vol.Info.EtcdClusters, etcdClusterSpec) - volumes = append(volumes, vol) - } - - return volumes, nil -} - -func getAllVolumesByRegion(godoClient *godo.Client, region string) ([]godo.Volume, error) { - allVolumes := []godo.Volume{} - - opt := &godo.ListOptions{} - for { - volumes, resp, err := godoClient.Storage.ListVolumes(context.TODO(), &godo.ListVolumeParams{ - Region: region, - ListOptions: opt, - }) - if err != nil { - return nil, err - } - - allVolumes = append(allVolumes, volumes...) - - if resp.Links == nil || resp.Links.IsLastPage() { - break - } - - page, err := resp.Links.CurrentPage() - if err != nil { - return nil, err - } - - opt.Page = page + 1 - } - - return allVolumes, nil -} - -func (d *DOVolumes) FindMountedVolume(volume *Volume) (string, error) { - device := volume.LocalDevice - - _, err := os.Stat(pathFor(device)) - if err == nil { - return device, nil - } - - if !os.IsNotExist(err) { - return "", fmt.Errorf("error checking for device %q: %v", device, err) - } - - return "", nil -} - -func (d *DOVolumes) getVolumeByID(id string) (*godo.Volume, error) { - vol, _, err := d.godoClient.Storage.GetVolume(context.TODO(), id) - return vol, err -} - // getEtcdClusterSpec returns etcd.EtcdClusterSpec which holds // necessary information required for starting an etcd server. // DigitalOcean support on kops only supports single master setup for now // but in the future when it supports multiple masters this method be // updated to handle that case. // TODO: use tags once it's supported for volumes -func (d *DOVolumes) getEtcdClusterSpec(vol godo.Volume) (*etcd.EtcdClusterSpec, error) { +func (d *DOCloudProvider) getEtcdClusterSpec(vol godo.Volume) (*etcd.EtcdClusterSpec, error) { nodeName := d.dropletName var clusterKey string @@ -300,11 +189,7 @@ func (d *DOVolumes) getEtcdClusterSpec(vol godo.Volume) (*etcd.EtcdClusterSpec, }, nil } -func getLocalDeviceName(vol *godo.Volume) string { - return localDevicePrefix + vol.Name -} - -func (d *DOVolumes) GossipSeeds() (gossip.SeedProvider, error) { +func (d *DOCloudProvider) GossipSeeds() (gossip.SeedProvider, error) { for _, dropletTag := range d.dropletTags { if strings.Contains(dropletTag, strings.Replace(d.ClusterID, ".", "-", -1)) { return gossipdo.NewSeedProvider(d.godoClient, dropletTag) @@ -314,19 +199,12 @@ func (d *DOVolumes) GossipSeeds() (gossip.SeedProvider, error) { return nil, fmt.Errorf("could not determine a matching droplet tag for gossip seeding") } -func (d *DOVolumes) InstanceName() string { +func (d *DOCloudProvider) InstanceID() string { return d.dropletName } -// GetDropletInternalIP gets the private IP of the droplet running this program -// This function is exported so it can be called from protokube -func GetDropletInternalIP() (net.IP, error) { - addr, err := getMetadata(dropletInternalIPMetadataURL) - if err != nil { - return nil, err - } - - return net.ParseIP(addr), nil +func (d *DOCloudProvider) InstanceInternalIP() net.IP { + return d.dropletIP } func getMetadataRegion() (string, error) { diff --git a/protokube/pkg/protokube/gce_volume.go b/protokube/pkg/protokube/gce_volume.go index f00f41c956..d9c5287976 100644 --- a/protokube/pkg/protokube/gce_volume.go +++ b/protokube/pkg/protokube/gce_volume.go @@ -17,23 +17,19 @@ limitations under the License. package protokube import ( - "context" "fmt" "net" - "os" "strings" "cloud.google.com/go/compute/metadata" compute "google.golang.org/api/compute/v1" "k8s.io/klog/v2" - "k8s.io/kops/protokube/pkg/etcd" "k8s.io/kops/protokube/pkg/gossip" - "k8s.io/kops/upup/pkg/fi/cloudup/gce" "k8s.io/kops/upup/pkg/fi/cloudup/gce/gcediscovery" ) -// GCEVolumes is the Volumes implementation for GCE -type GCEVolumes struct { +// GCECloudProvider is the CloudProvider implementation for GCE +type GCECloudProvider struct { compute *compute.Service discovery *gcediscovery.Discovery @@ -45,16 +41,16 @@ type GCEVolumes struct { internalIP net.IP } -var _ Volumes = &GCEVolumes{} +var _ CloudProvider = &GCECloudProvider{} -// NewGCEVolumes builds a GCEVolumes -func NewGCEVolumes() (*GCEVolumes, error) { +// NewGCECloudProvider builds a GCECloudProvider +func NewGCECloudProvider() (*GCECloudProvider, error) { discovery, err := gcediscovery.New() if err != nil { return nil, err } - a := &GCEVolumes{ + a := &GCECloudProvider{ discovery: discovery, compute: discovery.Compute(), } @@ -68,16 +64,16 @@ func NewGCEVolumes() (*GCEVolumes, error) { } // Project returns the current GCE project -func (a *GCEVolumes) Project() string { +func (a *GCECloudProvider) Project() string { return a.project } -// InternalIP implements Volumes InternalIP -func (a *GCEVolumes) InternalIP() net.IP { +// InstanceInternalIP implements CloudProvider InstanceInternalIP +func (a *GCECloudProvider) InstanceInternalIP() net.IP { return a.internalIP } -func (a *GCEVolumes) discoverTags() error { +func (a *GCECloudProvider) discoverTags() error { // Cluster Name { a.clusterName = a.discovery.ClusterName() @@ -139,207 +135,10 @@ func (a *GCEVolumes) discoverTags() error { return nil } -func (v *GCEVolumes) buildGCEVolume(d *compute.Disk) (*Volume, error) { - volumeName := d.Name - vol := &Volume{ - ID: volumeName, - Info: VolumeInfo{ - Description: volumeName, - }, - } - - vol.Status = d.Status - - for _, attachedTo := range d.Users { - u, err := gce.ParseGoogleCloudURL(attachedTo) - if err != nil { - return nil, fmt.Errorf("error parsing disk attachment url %q: %v", attachedTo, err) - } - - vol.AttachedTo = u.Name - - if u.Project == v.project && u.Zone == v.zone && u.Name == v.instanceName { - devicePath := "/dev/disk/by-id/google-" + volumeName - vol.LocalDevice = devicePath - klog.V(2).Infof("volume %q is attached to this instance at %s", d.Name, devicePath) - } else { - klog.V(2).Infof("volume %q is attached to another instance %q", d.Name, attachedTo) - } - } - - for k, v := range d.Labels { - switch k { - case gce.GceLabelNameKubernetesCluster: - { - // Ignore - } - - default: - if strings.HasPrefix(k, gce.GceLabelNameEtcdClusterPrefix) { - etcdClusterName := k[len(gce.GceLabelNameEtcdClusterPrefix):] - - value, err := gce.DecodeGCELabel(v) - if err != nil { - return nil, fmt.Errorf("Error decoding GCE label: %s=%q", k, v) - } - spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, value) - if err != nil { - return nil, fmt.Errorf("error parsing etcd cluster label %q on volume %q: %v", value, volumeName, err) - } - vol.Info.EtcdClusters = append(vol.Info.EtcdClusters, spec) - } else if strings.HasPrefix(k, gce.GceLabelNameRolePrefix) { - // Ignore - } else { - klog.Warningf("unknown label on volume %q: %s=%s", volumeName, k, v) - } - } - } - - return vol, nil -} - -func (v *GCEVolumes) FindVolumes() ([]*Volume, error) { - var volumes []*Volume - - klog.V(2).Infof("Listing GCE disks in %s/%s", v.project, v.zone) - - // TODO: Apply filters - ctx := context.Background() - err := v.compute.Disks.List(v.project, v.zone).Pages(ctx, func(page *compute.DiskList) error { - for _, d := range page.Items { - klog.V(4).Infof("Found disk %q with labels %v", d.Name, d.Labels) - - diskClusterName := d.Labels[gce.GceLabelNameKubernetesCluster] - if diskClusterName == "" { - klog.V(4).Infof("Skipping disk %q with no cluster name", d.Name) - continue - } - // Note that the cluster name is _not_ encoded with EncodeGCELabel - // this is because it is also used by k8s itself, e.g. in the route controller, - // and that is not encoded (issue #28436) - // Instead we use the much simpler SafeClusterName sanitizer - findClusterName := gce.SafeClusterName(v.clusterName) - if diskClusterName != findClusterName { - klog.V(2).Infof("Skipping disk %q with cluster name that does not match: %s=%s (looking for %s)", d.Name, gce.GceLabelNameKubernetesCluster, diskClusterName, findClusterName) - continue - } - - roles := make(map[string]string) - for k, v := range d.Labels { - if strings.HasPrefix(k, gce.GceLabelNameRolePrefix) { - roleName := strings.TrimPrefix(k, gce.GceLabelNameRolePrefix) - - value, err := gce.DecodeGCELabel(v) - if err != nil { - klog.Warningf("error decoding GCE role label: %s=%s", k, v) - continue - } - roles[roleName] = value - } - } - - _, isMaster := roles["master"] - if !isMaster { - klog.V(2).Infof("Skipping disk %q - no master role", d.Name) - continue - } - - vol, err := v.buildGCEVolume(d) - if err != nil { - // Fail safe - klog.Warningf("skipping malformed volume %q: %v", d.Name, err) - continue - } - volumes = append(volumes, vol) - } - - return nil - }) - if err != nil { - return nil, fmt.Errorf("error querying GCE disks: %v", err) - } - - //instance, err := v.compute.Instances.Get(v.project, v.zone, v.instanceName).Do() - //for _, d := range instance.Disks { - // var found *Volume - // source := d.Source - // for _, v := range volumes { - // if v.ID == source { - // if found != nil { - // return nil, fmt.Errorf("Found multiple volumes with name %q", v.ID) - // } - // found = v - // } - // } - // - // if found != nil { - // if d.DeviceName == "" { - // return fmt.Errorf("DeviceName for mounted disk %q was unexpected empty", d.Source) - // } - // found.LocalDevice = d.DeviceName - // } - //} - - return volumes, nil -} - -// FindMountedVolume implements Volumes::FindMountedVolume -func (v *GCEVolumes) FindMountedVolume(volume *Volume) (string, error) { - device := volume.LocalDevice - - _, err := os.Stat(pathFor(device)) - if err == nil { - return device, nil - } - if os.IsNotExist(err) { - return "", nil - } - return "", fmt.Errorf("error checking for device %q: %v", device, err) -} - -// AttachVolume attaches the specified volume to this instance, returning the mountpoint & nil if successful -func (v *GCEVolumes) AttachVolume(volume *Volume) error { - volumeName := volume.ID - - volumeURL := gce.GoogleCloudURL{ - Project: v.project, - Zone: v.zone, - Name: volumeName, - Type: "disks", - } - - attachedDisk := &compute.AttachedDisk{ - DeviceName: volumeName, - // TODO: The k8s GCE provider sets Kind, but this seems wrong. Open an issue? - // Kind: disk.Kind, - Mode: "READ_WRITE", - Source: volumeURL.BuildURL(), - Type: "PERSISTENT", - } - - attachOp, err := v.compute.Instances.AttachDisk(v.project, v.zone, v.instanceName, attachedDisk).Do() - if err != nil { - return fmt.Errorf("error attach disk %q: %v", volumeName, err) - } - - err = gce.WaitForOp(v.compute, attachOp) - if err != nil { - return fmt.Errorf("error waiting for disk attach to complete %q: %v", volumeName, err) - } - - devicePath := "/dev/disk/by-id/google-" + volumeName - - // TODO: Wait for device to appear? - - volume.LocalDevice = devicePath - - return nil -} - -func (g *GCEVolumes) GossipSeeds() (gossip.SeedProvider, error) { +func (g *GCECloudProvider) GossipSeeds() (gossip.SeedProvider, error) { return g.discovery, nil } -func (g *GCEVolumes) InstanceName() string { +func (g *GCECloudProvider) InstanceID() string { return g.instanceName } diff --git a/protokube/pkg/protokube/hetzner_volume.go b/protokube/pkg/protokube/hetzner_volume.go index 3491986546..347862681b 100644 --- a/protokube/pkg/protokube/hetzner_volume.go +++ b/protokube/pkg/protokube/hetzner_volume.go @@ -21,7 +21,6 @@ import ( "fmt" "net" "os" - "strconv" "github.com/hetznercloud/hcloud-go/hcloud" "github.com/hetznercloud/hcloud-go/hcloud/metadata" @@ -31,31 +30,17 @@ import ( "k8s.io/kops/upup/pkg/fi/cloudup/hetzner" ) -// HetznerVolumes defines the Hetzner Cloud volume implementation. -type HetznerVolumes struct { +// HetznerCloudProvider defines the Hetzner Cloud volume implementation. +type HetznerCloudProvider struct { hcloudClient *hcloud.Client server *hcloud.Server + serverIP net.IP } -func (h HetznerVolumes) AttachVolume(volume *Volume) error { - // TODO(hakman): no longer needed, remove from interface - panic("implement me") -} +var _ CloudProvider = &HetznerCloudProvider{} -func (h HetznerVolumes) FindVolumes() ([]*Volume, error) { - // TODO(hakman): no longer needed, remove from interface - panic("implement me") -} - -func (h HetznerVolumes) FindMountedVolume(volume *Volume) (device string, err error) { - // TODO(hakman): no longer needed, remove from interface - panic("implement me") -} - -var _ Volumes = &HetznerVolumes{} - -// NewHetznerVolumes returns a new Hetzner Cloud volume provider. -func NewHetznerVolumes() (*HetznerVolumes, error) { +// NewHetznerCloudProvider returns a new Hetzner Cloud provider. +func NewHetznerCloudProvider() (*HetznerCloudProvider, error) { serverID, err := metadata.NewClient().InstanceID() if err != nil { return nil, fmt.Errorf("failed to retrieve server id: %s", err) @@ -89,25 +74,20 @@ func NewHetznerVolumes() (*HetznerVolumes, error) { return nil, fmt.Errorf("failed to find private net of the running server") } - h := &HetznerVolumes{ + h := &HetznerCloudProvider{ hcloudClient: hcloudClient, server: server, + serverIP: server.PrivateNet[0].IP, } return h, nil } -func (h HetznerVolumes) InternalIP() (ip net.IP, err error) { - if len(h.server.PrivateNet) == 0 { - return nil, fmt.Errorf("failed to find server private ip address") - } - - klog.V(4).Infof("Found first private IP of the running server: %s", h.server.PrivateNet[0].IP.String()) - - return h.server.PrivateNet[0].IP, nil +func (h HetznerCloudProvider) InstanceInternalIP() net.IP { + return h.serverIP } -func (h *HetznerVolumes) GossipSeeds() (gossip.SeedProvider, error) { +func (h *HetznerCloudProvider) GossipSeeds() (gossip.SeedProvider, error) { clusterName, ok := h.server.Labels[hetzner.TagKubernetesClusterName] if !ok { return nil, fmt.Errorf("failed to find cluster name label for running server: %v", h.server.Labels) @@ -115,6 +95,6 @@ func (h *HetznerVolumes) GossipSeeds() (gossip.SeedProvider, error) { return gossiphetzner.NewSeedProvider(h.hcloudClient, clusterName) } -func (h *HetznerVolumes) InstanceID() string { - return strconv.Itoa(h.server.ID) +func (h *HetznerCloudProvider) InstanceID() string { + return fmt.Sprintf("%s-%d", h.server.Name, h.server.ID) } diff --git a/protokube/pkg/protokube/kube_boot.go b/protokube/pkg/protokube/kube_boot.go index 2fdad35eac..7c9b1f1591 100644 --- a/protokube/pkg/protokube/kube_boot.go +++ b/protokube/pkg/protokube/kube_boot.go @@ -98,14 +98,3 @@ func (k *KubeBoot) syncOnce(ctx context.Context) error { return nil } - -func pathFor(hostPath string) string { - if hostPath[0] != '/' { - klog.Fatalf("path was not absolute: %q", hostPath) - } - return RootFS + hostPath[1:] -} - -func (k *KubeBoot) String() string { - return DebugString(k) -} diff --git a/protokube/pkg/protokube/kube_boot_task.go b/protokube/pkg/protokube/kube_boot_task.go deleted file mode 100644 index c93465a9b1..0000000000 --- a/protokube/pkg/protokube/kube_boot_task.go +++ /dev/null @@ -1,157 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package protokube - -// -//import ( -// "encoding/json" -// "fmt" -// "k8s.io/klog/v2" -// "io" -// "io/ioutil" -// "os" -// "os/exec" -// "path" -// "strings" -// "sync" -// "time" -//) -// -//const BootstrapDir = "/etc/kubernetes/bootstrap" -// -//type BootstrapTask struct { -// Command []string `json:"command"` -//} -// -//func (b *BootstrapTask) String() string { -// return DebugString(b) -//} -// -//// RunKubelet runs the bootstrap tasks, and watches them until they exit -//// Currently only one task is supported / will work properly -//func (k *KubeBoot) RunBootstrapTasks() error { -// bootstrapDir := pathFor(BootstrapDir) -// -// var dirs []os.FileInfo -// var err error -// -// for { -// dirs, err = ioutil.ReadDir(bootstrapDir) -// if err != nil { -// if os.IsNotExist(err) { -// dirs = nil -// } else { -// return fmt.Errorf("error listing %q: %v", bootstrapDir, err) -// } -// } -// if len(dirs) != 0 { -// break -// } -// klog.Infof("No entries found in %q", BootstrapDir) -// time.Sleep(10 * time.Second) -// } -// -// for _, dir := range dirs { -// if !dir.IsDir() { -// continue -// } -// -// p := path.Join(bootstrapDir, dir.Name()) -// files, err := ioutil.ReadDir(p) -// if err != nil { -// return fmt.Errorf("error listing %q: %v", p, err) -// } -// -// if len(files) == 0 { -// klog.Infof("No files in %q; ignoring", p) -// continue -// } -// -// // TODO: Support more than one bootstrap task? -// -// // TODO: Have multiple proto-kubelet configurations to support recovery? -// // i.e. launch newest version that stays up? -// -// fp := path.Join(p, files[0].Name()) -// err = k.runBootstrapTask(fp) -// if err != nil { -// return fmt.Errorf("error running bootstrap task %q: %v", fp, err) -// } -// } -// -// return nil -//} -// -//// RunKubelet runs a bootstrap task and watches it until it exits -//func (k *KubeBoot) runBootstrapTask(path string) error { -// // TODO: Use a file lock or similar to only start proto-kubelet if real-kubelet is not running? -// -// data, err := ioutil.ReadFile(path) -// if err != nil { -// return fmt.Errorf("error reading task %q: %v", path, err) -// } -// -// task := &BootstrapTask{} -// -// err = json.Unmarshal(data, task) -// if err != nil { -// return fmt.Errorf("error parsing task %q: %v", path, err) -// } -// -// name := task.Command[0] -// args := task.Command[1:] -// -// cmd := exec.Command(name, args...) -// -// stdout, err := cmd.StdoutPipe() -// if err != nil { -// return fmt.Errorf("error building stdout pipe: %v", err) -// } -// stderr, err := cmd.StderrPipe() -// if err != nil { -// return fmt.Errorf("error building stderr pipe: %v", err) -// } -// -// wg := new(sync.WaitGroup) -// wg.Add(2) -// -// err = cmd.Start() -// if err != nil { -// return fmt.Errorf("error starting command %q: %v", strings.Join(task.Command, " "), err) -// } -// -// go copyStream(os.Stdout, stdout, wg) -// go copyStream(os.Stderr, stderr, wg) -// -// wg.Wait() -// -// err = cmd.Wait() -// if err != nil { -// return fmt.Errorf("error from command %q: %v", task.Command, err) -// } -// -// return nil -//} -// -//func copyStream(dst io.Writer, src io.ReadCloser, waitGroup *sync.WaitGroup) { -// _, err := io.Copy(dst, src) -// if err != nil { -// // Not entirely sure if we need to do something special in this case? -// klog.Warningf("error copying stream: %v", err) -// } -// waitGroup.Done() -//} diff --git a/protokube/pkg/protokube/openstack_volume.go b/protokube/pkg/protokube/openstack_volume.go index 7d19793a4c..c95caa4eea 100644 --- a/protokube/pkg/protokube/openstack_volume.go +++ b/protokube/pkg/protokube/openstack_volume.go @@ -22,13 +22,9 @@ import ( "io" "net" "net/http" - "os" "strings" - cinderv3 "github.com/gophercloud/gophercloud/openstack/blockstorage/v3/volumes" - "github.com/gophercloud/gophercloud/openstack/compute/v2/extensions/volumeattach" "k8s.io/klog/v2" - "k8s.io/kops/protokube/pkg/etcd" "k8s.io/kops/protokube/pkg/gossip" gossipos "k8s.io/kops/protokube/pkg/gossip/openstack" "k8s.io/kops/upup/pkg/fi/cloudup/openstack" @@ -50,8 +46,8 @@ type InstanceMetadata struct { ServerID string `json:"uuid"` } -// GCEVolumes is the Volumes implementation for GCE -type OpenstackVolumes struct { +// OpenStackCloudProvider is the CloudProvider implementation for OpenStack +type OpenStackCloudProvider struct { cloud openstack.OpenstackCloud meta *InstanceMetadata @@ -63,7 +59,7 @@ type OpenstackVolumes struct { storageZone string } -var _ Volumes = &OpenstackVolumes{} +var _ CloudProvider = &OpenStackCloudProvider{} func getLocalMetadata() (*InstanceMetadata, error) { var meta InstanceMetadata @@ -88,8 +84,8 @@ func getLocalMetadata() (*InstanceMetadata, error) { return nil, err } -// NewOpenstackVolumes builds a OpenstackVolume -func NewOpenstackVolumes() (*OpenstackVolumes, error) { +// NewOpenStackCloudProvider builds a OpenstackVolume +func NewOpenStackCloudProvider() (*OpenStackCloudProvider, error) { metadata, err := getLocalMetadata() if err != nil { return nil, fmt.Errorf("Failed to get server metadata: %v", err) @@ -101,10 +97,10 @@ func NewOpenstackVolumes() (*OpenstackVolumes, error) { oscloud, err := openstack.NewOpenstackCloud(tags, nil, "protokube") if err != nil { - return nil, fmt.Errorf("Failed to initialize OpenstackVolumes: %v", err) + return nil, fmt.Errorf("Failed to initialize OpenStackCloudProvider: %v", err) } - a := &OpenstackVolumes{ + a := &OpenStackCloudProvider{ cloud: oscloud, meta: metadata, } @@ -117,17 +113,17 @@ func NewOpenstackVolumes() (*OpenstackVolumes, error) { return a, nil } -// Project returns the current GCE project -func (a *OpenstackVolumes) Project() string { +// Project returns the current OpenStack project +func (a *OpenStackCloudProvider) Project() string { return a.meta.ProjectID } -// InternalIP implements Volumes InternalIP -func (a *OpenstackVolumes) InternalIP() net.IP { +// InstanceInternalIP implements CloudProvider InstanceInternalIP +func (a *OpenStackCloudProvider) InstanceInternalIP() net.IP { return a.internalIP } -func (a *OpenstackVolumes) discoverTags() error { +func (a *OpenStackCloudProvider) discoverTags() error { // Cluster Name { a.clusterName = strings.TrimSpace(string(a.meta.UserMeta.ClusterName)) @@ -185,99 +181,10 @@ func (a *OpenstackVolumes) discoverTags() error { return nil } -func (v *OpenstackVolumes) buildOpenstackVolume(d *cinderv3.Volume) (*Volume, error) { - volumeName := d.Name - vol := &Volume{ - ID: d.ID, - Info: VolumeInfo{ - Description: volumeName, - }, - } - - vol.Status = d.Status - - for _, attachedTo := range d.Attachments { - vol.AttachedTo = attachedTo.HostName - if attachedTo.ServerID == v.meta.ServerID { - vol.LocalDevice = attachedTo.Device - } - } - - // FIXME: Zone matters, broken in my env - - for k, v := range d.Metadata { - if strings.HasPrefix(k, openstack.TagNameEtcdClusterPrefix) { - etcdClusterName := k[len(openstack.TagNameEtcdClusterPrefix):] - spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, v) - if err != nil { - return nil, fmt.Errorf("error parsing etcd cluster meta %q on volume %q: %v", v, d.Name, err) - } - vol.Info.EtcdClusters = append(vol.Info.EtcdClusters, spec) - } - } - - return vol, nil -} - -func (v *OpenstackVolumes) FindVolumes() ([]*Volume, error) { - var volumes []*Volume - - klog.V(2).Infof("Listing Openstack disks in %s/%s", v.project, v.meta.AvailabilityZone) - - vols, err := v.cloud.ListVolumes(cinderv3.ListOpts{ - TenantID: v.project, - }) - if err != nil { - return volumes, fmt.Errorf("FindVolumes: Failed to list volume.") - } - - for _, volume := range vols { - if clusterName, ok := volume.Metadata[openstack.TagClusterName]; ok && clusterName == v.clusterName { - if _, isMasterRole := volume.Metadata[openstack.TagNameRolePrefix+"master"]; isMasterRole { - vol, err := v.buildOpenstackVolume(&volume) - if err != nil { - klog.Errorf("FindVolumes: Failed to build openstack volume %s: %v", volume.Name, err) - continue - } - volumes = append(volumes, vol) - } - } - } - - return volumes, nil -} - -// FindMountedVolume implements Volumes::FindMountedVolume -func (v *OpenstackVolumes) FindMountedVolume(volume *Volume) (string, error) { - device := volume.LocalDevice - - _, err := os.Stat(pathFor(device)) - if err == nil { - return device, nil - } - if os.IsNotExist(err) { - return "", nil - } - return "", fmt.Errorf("error checking for device %q: %v", device, err) -} - -// AttachVolume attaches the specified volume to this instance, returning the mountpoint & nil if successful -func (v *OpenstackVolumes) AttachVolume(volume *Volume) error { - opts := volumeattach.CreateOpts{ - VolumeID: volume.ID, - } - attachment, err := v.cloud.AttachVolume(v.meta.ServerID, opts) - if err != nil { - return fmt.Errorf("AttachVolume: failed to attach volume: %s", err) - } - volume.LocalDevice = attachment.Device - return nil -} - -func (g *OpenstackVolumes) GossipSeeds() (gossip.SeedProvider, error) { +func (g *OpenStackCloudProvider) GossipSeeds() (gossip.SeedProvider, error) { return gossipos.NewSeedProvider(g.cloud.ComputeClient(), g.clusterName, g.project) } -func (g *OpenstackVolumes) InstanceName() string { +func (g *OpenStackCloudProvider) InstanceID() string { return g.instanceName } diff --git a/protokube/pkg/protokube/utils.go b/protokube/pkg/protokube/utils.go deleted file mode 100644 index 2978bd6918..0000000000 --- a/protokube/pkg/protokube/utils.go +++ /dev/null @@ -1,30 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package protokube - -import ( - "encoding/json" - "fmt" -) - -func DebugString(o interface{}) string { - b, err := json.Marshal(o) - if err != nil { - return fmt.Sprintf("error marshaling %T: %v", o, err) - } - return string(b) -} diff --git a/protokube/pkg/protokube/volumes.go b/protokube/pkg/protokube/volumes.go index adc47bee46..81771792fd 100644 --- a/protokube/pkg/protokube/volumes.go +++ b/protokube/pkg/protokube/volumes.go @@ -17,50 +17,13 @@ limitations under the License. package protokube import ( - "k8s.io/kops/protokube/pkg/etcd" + "net" + + "k8s.io/kops/protokube/pkg/gossip" ) -type Volumes interface { - AttachVolume(volume *Volume) error - FindVolumes() ([]*Volume, error) - - // FindMountedVolume returns the device (e.g. /dev/sda) where the volume is mounted - // If not found, it returns "", nil - // On error, it returns "", err - FindMountedVolume(volume *Volume) (device string, err error) -} - -type Volume struct { - // ID is the cloud-provider identifier for the volume - ID string - - // LocalDevice is set if the volume is attached to the local machine - LocalDevice string - - // AttachedTo is set to the ID of the machine the volume is attached to, or "" if not attached - AttachedTo string - - // Mountpoint is the path on which the volume is mounted, if mounted - // It will likely be "/mnt/master-" + ID - Mountpoint string - - // Status is a volume provider specific Status string; it makes it easier for the volume provider - Status string - - Info VolumeInfo -} - -func (v *Volume) String() string { - return DebugString(v) -} - -type VolumeInfo struct { - Description string - // MasterID int - // TODO: Maybe the events cluster can just be a PetSet - do we need it for boot? - EtcdClusters []*etcd.EtcdClusterSpec -} - -func (v *VolumeInfo) String() string { - return DebugString(v) +type CloudProvider interface { + InstanceID() string + InstanceInternalIP() net.IP + GossipSeeds() (gossip.SeedProvider, error) }