mirror of https://github.com/kubernetes/kops.git
Merge pull request #3893 from zengchen1024/cinder_volume_task
Automatic merge from submit-queue. Implement volume task for Openstack platform Implement volume task to create volume for ETCD cluster. Which issue this PR fixes: #3886
This commit is contained in:
commit
ae94d14e54
|
@ -123,6 +123,7 @@ k8s.io/kops/upup/pkg/fi/cloudup/dotasks
|
|||
k8s.io/kops/upup/pkg/fi/cloudup/gce
|
||||
k8s.io/kops/upup/pkg/fi/cloudup/gcetasks
|
||||
k8s.io/kops/upup/pkg/fi/cloudup/openstack
|
||||
k8s.io/kops/upup/pkg/fi/cloudup/openstacktasks
|
||||
k8s.io/kops/upup/pkg/fi/cloudup/terraform
|
||||
k8s.io/kops/upup/pkg/fi/cloudup/vsphere
|
||||
k8s.io/kops/upup/pkg/fi/cloudup/vspheretasks
|
||||
|
|
|
@ -30,6 +30,8 @@ import (
|
|||
"k8s.io/kops/upup/pkg/fi/cloudup/dotasks"
|
||||
"k8s.io/kops/upup/pkg/fi/cloudup/gce"
|
||||
"k8s.io/kops/upup/pkg/fi/cloudup/gcetasks"
|
||||
"k8s.io/kops/upup/pkg/fi/cloudup/openstack"
|
||||
"k8s.io/kops/upup/pkg/fi/cloudup/openstacktasks"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -95,6 +97,11 @@ func (b *MasterVolumeBuilder) Build(c *fi.ModelBuilderContext) error {
|
|||
b.addVSphereVolume(c, name, volumeSize, zone, etcd, m, allMembers)
|
||||
case kops.CloudProviderBareMetal:
|
||||
glog.Fatalf("BareMetal not implemented")
|
||||
case kops.CloudProviderOpenstack:
|
||||
err = b.addOpenstackVolume(c, name, volumeSize, zone, etcd, m, allMembers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unknown cloudprovider %q", b.Cluster.Spec.CloudProvider)
|
||||
}
|
||||
|
@ -205,3 +212,33 @@ func (b *MasterVolumeBuilder) addGCEVolume(c *fi.ModelBuilderContext, name strin
|
|||
func (b *MasterVolumeBuilder) addVSphereVolume(c *fi.ModelBuilderContext, name string, volumeSize int32, zone string, etcd *kops.EtcdClusterSpec, m *kops.EtcdMemberSpec, allMembers []string) {
|
||||
fmt.Print("addVSphereVolume to be implemented")
|
||||
}
|
||||
|
||||
func (b *MasterVolumeBuilder) addOpenstackVolume(c *fi.ModelBuilderContext, name string, volumeSize int32, zone string, etcd *kops.EtcdClusterSpec, m *kops.EtcdMemberSpec, allMembers []string) error {
|
||||
volumeType := fi.StringValue(m.VolumeType)
|
||||
if volumeType == "" {
|
||||
return fmt.Errorf("must set ETCDMemberSpec.VolumeType on Openstack platform")
|
||||
}
|
||||
|
||||
// The tags are how protokube knows to mount the volume and use it for etcd
|
||||
tags := make(map[string]string)
|
||||
// Apply all user defined labels on the volumes
|
||||
for k, v := range b.Cluster.Spec.CloudLabels {
|
||||
tags[k] = v
|
||||
}
|
||||
// This is the configuration of the etcd cluster
|
||||
tags[openstack.TagNameEtcdClusterPrefix+etcd.Name] = m.Name + "/" + strings.Join(allMembers, ",")
|
||||
// This says "only mount on a master"
|
||||
tags[openstack.TagNameRolePrefix+"master"] = "1"
|
||||
|
||||
t := &openstacktasks.Volume{
|
||||
Name: s(name),
|
||||
AvailabilityZone: s(zone),
|
||||
VolumeType: s(volumeType),
|
||||
SizeGB: fi.Int64(int64(volumeSize)),
|
||||
Tags: tags,
|
||||
Lifecycle: b.Lifecycle,
|
||||
}
|
||||
c.AddTask(t)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -18,25 +18,90 @@ package openstack
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/gophercloud/gophercloud"
|
||||
os "github.com/gophercloud/gophercloud/openstack"
|
||||
cinder "github.com/gophercloud/gophercloud/openstack/blockstorage/v2/volumes"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/kops/pkg/apis/kops"
|
||||
"k8s.io/kops/pkg/cloudinstances"
|
||||
"k8s.io/kops/upup/pkg/fi"
|
||||
"k8s.io/kops/util/pkg/vfs"
|
||||
"k8s.io/kubernetes/federation/pkg/dnsprovider"
|
||||
)
|
||||
|
||||
const TagNameEtcdClusterPrefix = "k8s.io/etcd/"
|
||||
const TagNameRolePrefix = "k8s.io/role/"
|
||||
const TagClusterName = "KubernetesCluster"
|
||||
|
||||
// readBackoff is the backoff strategy for openstack read retries.
|
||||
var readBackoff = wait.Backoff{
|
||||
Duration: time.Second,
|
||||
Factor: 1.5,
|
||||
Jitter: 0.1,
|
||||
Steps: 4,
|
||||
}
|
||||
|
||||
// writeBackoff is the backoff strategy for openstack write retries.
|
||||
var writeBackoff = wait.Backoff{
|
||||
Duration: time.Second,
|
||||
Factor: 1.5,
|
||||
Jitter: 0.1,
|
||||
Steps: 5,
|
||||
}
|
||||
|
||||
type OpenstackCloud interface {
|
||||
fi.Cloud
|
||||
|
||||
// SetVolumeTags will set the tags for the Cinder volume
|
||||
SetVolumeTags(id string, tags map[string]string) error
|
||||
|
||||
// GetCloudTags will return the tags attached on cloud
|
||||
GetCloudTags() map[string]string
|
||||
|
||||
// ListVolumes will return the Cinder volumes which match the options
|
||||
ListVolumes(opt cinder.ListOpts) ([]cinder.Volume, error)
|
||||
|
||||
// CreateVolume will create a new Cinder Volume
|
||||
CreateVolume(opt cinder.CreateOpts) (*cinder.Volume, error)
|
||||
}
|
||||
|
||||
type openstackCloud struct {
|
||||
cinderClient *gophercloud.ServiceClient
|
||||
tags map[string]string
|
||||
}
|
||||
|
||||
var _ fi.Cloud = &openstackCloud{}
|
||||
|
||||
func NewOpenstackCloud() (OpenstackCloud, error) {
|
||||
return &openstackCloud{}, nil
|
||||
func NewOpenstackCloud(tags map[string]string) (OpenstackCloud, error) {
|
||||
config := vfs.OpenstackConfig{}
|
||||
|
||||
authOption, err := config.GetCredential()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
provider, err := os.AuthenticatedClient(authOption)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building openstack authenticated client: %v", err)
|
||||
}
|
||||
|
||||
endpointOpt, err := config.GetServiceConfig("Cinder")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cinderClient, err := os.NewBlockStorageV2(provider, endpointOpt)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building swift client: %v", err)
|
||||
}
|
||||
|
||||
c := &openstackCloud{
|
||||
cinderClient: cinderClient,
|
||||
tags: tags,
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *openstackCloud) ProviderID() kops.CloudProviderID {
|
||||
|
@ -62,3 +127,78 @@ func (c *openstackCloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error
|
|||
func (c *openstackCloud) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
|
||||
return nil, fmt.Errorf("openstackCloud::GetCloudGroups not implemented")
|
||||
}
|
||||
|
||||
func (c *openstackCloud) SetVolumeTags(id string, tags map[string]string) error {
|
||||
if len(tags) == 0 {
|
||||
return nil
|
||||
}
|
||||
if id == "" {
|
||||
return fmt.Errorf("error setting tags to unknown volume")
|
||||
}
|
||||
glog.V(4).Infof("setting tags to cinder volume %q: %v", id, tags)
|
||||
|
||||
opt := cinder.UpdateOpts{Metadata: tags}
|
||||
done, err := vfs.RetryWithBackoff(writeBackoff, func() (bool, error) {
|
||||
_, err := cinder.Update(c.cinderClient, id, opt).Extract()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error setting tags to cinder volume %q: %v", id, err)
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
} else if done {
|
||||
return nil
|
||||
} else {
|
||||
return wait.ErrWaitTimeout
|
||||
}
|
||||
}
|
||||
|
||||
func (c *openstackCloud) GetCloudTags() map[string]string {
|
||||
return c.tags
|
||||
}
|
||||
|
||||
func (c *openstackCloud) ListVolumes(opt cinder.ListOpts) ([]cinder.Volume, error) {
|
||||
var volumes []cinder.Volume
|
||||
|
||||
done, err := vfs.RetryWithBackoff(readBackoff, func() (bool, error) {
|
||||
allPages, err := cinder.List(c.cinderClient, opt).AllPages()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error listing volumes %v: %v", opt, err)
|
||||
}
|
||||
|
||||
vs, err := cinder.ExtractVolumes(allPages)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error extracting volumes from pages: %v", err)
|
||||
}
|
||||
volumes = vs
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return volumes, err
|
||||
} else if done {
|
||||
return volumes, nil
|
||||
} else {
|
||||
return volumes, wait.ErrWaitTimeout
|
||||
}
|
||||
}
|
||||
|
||||
func (c *openstackCloud) CreateVolume(opt cinder.CreateOpts) (*cinder.Volume, error) {
|
||||
var volume *cinder.Volume
|
||||
|
||||
done, err := vfs.RetryWithBackoff(writeBackoff, func() (bool, error) {
|
||||
v, err := cinder.Create(c.cinderClient, opt).Extract()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error creating volume %v: %v", opt, err)
|
||||
}
|
||||
volume = v
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return volume, err
|
||||
} else if done {
|
||||
return volume, nil
|
||||
} else {
|
||||
return volume, wait.ErrWaitTimeout
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,145 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package openstacktasks
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
cinder "github.com/gophercloud/gophercloud/openstack/blockstorage/v2/volumes"
|
||||
"k8s.io/kops/upup/pkg/fi"
|
||||
"k8s.io/kops/upup/pkg/fi/cloudup/openstack"
|
||||
)
|
||||
|
||||
type Volume struct {
|
||||
ID *string
|
||||
Name *string
|
||||
AvailabilityZone *string
|
||||
VolumeType *string
|
||||
SizeGB *int64
|
||||
Tags map[string]string
|
||||
Lifecycle *fi.Lifecycle
|
||||
}
|
||||
|
||||
var _ fi.CompareWithID = &Volume{}
|
||||
|
||||
func (c *Volume) CompareWithID() *string {
|
||||
return c.ID
|
||||
}
|
||||
|
||||
func (c *Volume) Find(context *fi.Context) (*Volume, error) {
|
||||
cloud := context.Cloud.(openstack.OpenstackCloud)
|
||||
opt := cinder.ListOpts{
|
||||
Name: fi.StringValue(c.Name),
|
||||
Metadata: cloud.GetCloudTags(),
|
||||
}
|
||||
volumes, err := cloud.ListVolumes(opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
n := len(volumes)
|
||||
if n == 0 {
|
||||
return nil, nil
|
||||
} else if n != 1 {
|
||||
return nil, fmt.Errorf("found multiple Volumes with name: %s", fi.StringValue(c.Name))
|
||||
}
|
||||
v := volumes[0]
|
||||
actual := &Volume{
|
||||
ID: fi.String(v.ID),
|
||||
Name: fi.String(v.Name),
|
||||
AvailabilityZone: fi.String(v.AvailabilityZone),
|
||||
VolumeType: fi.String(v.VolumeType),
|
||||
SizeGB: fi.Int64(int64(v.Size)),
|
||||
Tags: v.Metadata,
|
||||
Lifecycle: c.Lifecycle,
|
||||
}
|
||||
return actual, nil
|
||||
}
|
||||
|
||||
func (c *Volume) Run(context *fi.Context) error {
|
||||
cloud := context.Cloud.(openstack.OpenstackCloud)
|
||||
for k, v := range cloud.GetCloudTags() {
|
||||
c.Tags[k] = v
|
||||
}
|
||||
|
||||
return fi.DefaultDeltaRunMethod(c, context)
|
||||
}
|
||||
|
||||
func (_ *Volume) CheckChanges(a, e, changes *Volume) error {
|
||||
if a == nil {
|
||||
if e.Name == nil {
|
||||
return fi.RequiredField("Name")
|
||||
}
|
||||
if e.AvailabilityZone == nil {
|
||||
return fi.RequiredField("AvailabilityZone")
|
||||
}
|
||||
if e.VolumeType == nil {
|
||||
return fi.RequiredField("VolumeType")
|
||||
}
|
||||
if e.SizeGB == nil {
|
||||
return fi.RequiredField("SizeGB")
|
||||
}
|
||||
} else {
|
||||
if changes.ID != nil {
|
||||
return fi.CannotChangeField("ID")
|
||||
}
|
||||
if changes.AvailabilityZone != nil {
|
||||
return fi.CannotChangeField("AvailabilityZone")
|
||||
}
|
||||
if changes.VolumeType != nil {
|
||||
return fi.CannotChangeField("VolumeType")
|
||||
}
|
||||
if changes.SizeGB != nil {
|
||||
return fi.CannotChangeField("SizeGB")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (_ *Volume) RenderOpenstack(t *openstack.OpenstackAPITarget, a, e, changes *Volume) error {
|
||||
if a == nil {
|
||||
glog.V(2).Infof("Creating PersistentVolume with Name:%q", fi.StringValue(e.Name))
|
||||
|
||||
opt := cinder.CreateOpts{
|
||||
Size: int(*e.SizeGB),
|
||||
AvailabilityZone: fi.StringValue(e.AvailabilityZone),
|
||||
Metadata: e.Tags,
|
||||
Name: fi.StringValue(e.Name),
|
||||
VolumeType: fi.StringValue(e.VolumeType),
|
||||
}
|
||||
|
||||
v, err := t.Cloud.CreateVolume(opt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating PersistentVolume: %v", err)
|
||||
}
|
||||
|
||||
e.ID = fi.String(v.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
if changes != nil && changes.Tags != nil {
|
||||
glog.V(2).Infof("Update the tags on volume %q: %v, the differences are %v", fi.StringValue(e.ID), e.Tags, changes.Tags)
|
||||
|
||||
err := t.Cloud.SetVolumeTags(fi.StringValue(e.ID), e.Tags)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error updating the tags on volume %q: %v", fi.StringValue(e.ID), err)
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Openstack task Volume::RenderOpenstack did nothing")
|
||||
return nil
|
||||
}
|
|
@ -133,7 +133,8 @@ func BuildCloud(cluster *kops.Cluster) (fi.Cloud, error) {
|
|||
}
|
||||
case kops.CloudProviderOpenstack:
|
||||
{
|
||||
osc, err := openstack.NewOpenstackCloud()
|
||||
cloudTags := map[string]string{openstack.TagClusterName: cluster.ObjectMeta.Name}
|
||||
osc, err := openstack.NewOpenstackCloud(cloudTags)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue