diff --git a/cmd/kops/edit_cluster.go b/cmd/kops/edit_cluster.go index efe584b736..72e5ae79d0 100644 --- a/cmd/kops/edit_cluster.go +++ b/cmd/kops/edit_cluster.go @@ -236,8 +236,15 @@ func RunEditCluster(f *util.Factory, cmd *cobra.Command, args []string, out io.W return preservedFile(err, file, out) } + // Retrieve the current status of the cluster. This will eventually be part of the cluster object. + statusDiscovery := &cloudDiscoveryStatusStore{} + status, err := statusDiscovery.FindClusterStatus(oldCluster) + if err != nil { + return err + } + // Note we perform as much validation as we can, before writing a bad config - _, err = clientset.UpdateCluster(newCluster) + _, err = clientset.UpdateCluster(newCluster, status) if err != nil { return preservedFile(err, file, out) } diff --git a/cmd/kops/replace.go b/cmd/kops/replace.go index 59e4eba286..0dda103951 100644 --- a/cmd/kops/replace.go +++ b/cmd/kops/replace.go @@ -115,9 +115,18 @@ func RunReplace(f *util.Factory, cmd *cobra.Command, out io.Writer, c *replaceOp } case *kopsapi.Cluster: - _, err = clientset.UpdateCluster(v) - if err != nil { - return fmt.Errorf("error replacing cluster: %v", err) + { + // Retrieve the current status of the cluster. This will eventually be part of the cluster object. 
+ statusDiscovery := &cloudDiscoveryStatusStore{} + status, err := statusDiscovery.FindClusterStatus(v) + if err != nil { + return err + } + + _, err = clientset.UpdateCluster(v, status) + if err != nil { + return fmt.Errorf("error replacing cluster: %v", err) + } } case *kopsapi.InstanceGroup: diff --git a/cmd/kops/status_discovery.go b/cmd/kops/status_discovery.go index f698359158..ca70a93efc 100644 --- a/cmd/kops/status_discovery.go +++ b/cmd/kops/status_discovery.go @@ -68,3 +68,21 @@ func (s *cloudDiscoveryStatusStore) GetApiIngressStatus(cluster *kops.Cluster) ( return nil, fmt.Errorf("API Ingress Status not implemented for %T", cloud) } + +// FindClusterStatus discovers the status of the cluster, by inspecting the cloud objects +func (s *cloudDiscoveryStatusStore) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) { + cloud, err := cloudup.BuildCloud(cluster) + if err != nil { + return nil, err + } + + if gceCloud, ok := cloud.(gce.GCECloud); ok { + return gceCloud.FindClusterStatus(cluster) + } + + if awsCloud, ok := cloud.(awsup.AWSCloud); ok { + return awsCloud.FindClusterStatus(cluster) + } + + return nil, fmt.Errorf("Etcd Status not implemented for %T", cloud) +} diff --git a/cmd/kops/upgrade_cluster.go b/cmd/kops/upgrade_cluster.go index 349ea82839..e08654a58d 100644 --- a/cmd/kops/upgrade_cluster.go +++ b/cmd/kops/upgrade_cluster.go @@ -300,8 +300,15 @@ func (c *UpgradeClusterCmd) Run(args []string) error { return err } + // Retrieve the current status of the cluster. This will eventually be part of the cluster object. 
+ statusDiscovery := &cloudDiscoveryStatusStore{} + status, err := statusDiscovery.FindClusterStatus(cluster) + if err != nil { + return err + } + // Note we perform as much validation as we can, before writing a bad config - _, err = clientset.UpdateCluster(cluster) + _, err = clientset.UpdateCluster(cluster, status) if err != nil { return err } diff --git a/hack/.packages b/hack/.packages index 01c1a738f7..3546cecffb 100644 --- a/hack/.packages +++ b/hack/.packages @@ -86,6 +86,7 @@ k8s.io/kops/pkg/util/stringorslice k8s.io/kops/pkg/util/templater k8s.io/kops/pkg/validation k8s.io/kops/protokube/cmd/protokube +k8s.io/kops/protokube/pkg/etcd k8s.io/kops/protokube/pkg/gossip k8s.io/kops/protokube/pkg/gossip/aws k8s.io/kops/protokube/pkg/gossip/dns diff --git a/pkg/apis/kops/status.go b/pkg/apis/kops/status.go index 11d9acd642..813bbbb095 100644 --- a/pkg/apis/kops/status.go +++ b/pkg/apis/kops/status.go @@ -16,13 +16,39 @@ limitations under the License. package kops -import "github.com/golang/glog" +import ( + "github.com/golang/glog" +) // StatusStore abstracts the key status functions; and lets us introduce status gradually type StatusStore interface { + // FindClusterStatus discovers the status of the cluster, by inspecting the cloud objects + FindClusterStatus(cluster *Cluster) (*ClusterStatus, error) + GetApiIngressStatus(cluster *Cluster) ([]ApiIngressStatus, error) } +type ClusterStatus struct { + // EtcdClusters stores the status for each cluster + EtcdClusters []EtcdClusterStatus `json:"etcdClusters,omitempty"` +} + +// EtcdStatus represents the status of etcd: because etcd only allows limited reconfiguration, we have to block changes once etcd has been initialized. 
+type EtcdClusterStatus struct { + // Name is the name of the etcd cluster (main, events etc) + Name string `json:"name,omitempty"` + // EtcdMember stores the configurations for each member of the cluster (including the data volume) + Members []*EtcdMemberStatus `json:"etcdMembers,omitempty"` +} + +type EtcdMemberStatus struct { + // Name is the name of the member within the etcd cluster + Name string `json:"name,omitempty"` + + // volumeId is the id of the cloud volume (e.g. the AWS volume id) + VolumeId string `json:"volumeId,omitempty"` +} + // ApiIngress represents the status of an ingress point: // traffic intended for the service should be sent to an ingress point. type ApiIngressStatus struct { @@ -44,6 +70,12 @@ type NoopStatusStore struct { var _ StatusStore = &NoopStatusStore{} +// FindClusterStatus discovers the status of the cluster, by inspecting the cloud objects +func (s *NoopStatusStore) FindClusterStatus(cluster *Cluster) (*ClusterStatus, error) { + glog.Warningf("FindClusterStatus called on NoopStore") + return nil, nil +} + func (s *NoopStatusStore) GetApiIngressStatus(cluster *Cluster) ([]ApiIngressStatus, error) { glog.Warningf("GetApiIngressStatus called on NoopStore") return nil, nil diff --git a/pkg/apis/kops/validation/cluster.go b/pkg/apis/kops/validation/cluster.go new file mode 100644 index 0000000000..0cbc004ffa --- /dev/null +++ b/pkg/apis/kops/validation/cluster.go @@ -0,0 +1,139 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package validation + +import ( + "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/kops/pkg/apis/kops" + "k8s.io/kops/upup/pkg/fi" +) + +func ValidateClusterUpdate(obj *kops.Cluster, status *kops.ClusterStatus, old *kops.Cluster) field.ErrorList { + allErrs := field.ErrorList{} + + if err := ValidateCluster(obj, false); err != nil { + allErrs = append(allErrs, err) + } + + // Validate etcd cluster changes + { + newClusters := make(map[string]*kops.EtcdClusterSpec) + for _, etcdCluster := range obj.Spec.EtcdClusters { + newClusters[etcdCluster.Name] = etcdCluster + } + oldClusters := make(map[string]*kops.EtcdClusterSpec) + for _, etcdCluster := range old.Spec.EtcdClusters { + oldClusters[etcdCluster.Name] = etcdCluster + } + + for k, newCluster := range newClusters { + fp := field.NewPath("Spec", "EtcdClusters").Key(k) + if oldCluster := oldClusters[k]; oldCluster != nil { // a newly added etcd cluster has no old spec to diff against + allErrs = append(allErrs, validateEtcdClusterUpdate(fp, newCluster, status, oldCluster)...) + } + } + for k := range oldClusters { + newCluster := newClusters[k] + if newCluster == nil { + fp := field.NewPath("Spec", "EtcdClusters").Key(k) + allErrs = append(allErrs, field.Forbidden(fp, "EtcdClusters cannot be removed")) + } + } + } + + return allErrs +} + +func validateEtcdClusterUpdate(fp *field.Path, obj *kops.EtcdClusterSpec, status *kops.ClusterStatus, old *kops.EtcdClusterSpec) field.ErrorList { + allErrs := field.ErrorList{} + + if obj.Name != old.Name { + allErrs = append(allErrs, field.Forbidden(fp.Child("Name"), "Name cannot be changed")) + } + + var etcdClusterStatus *kops.EtcdClusterStatus + if status != nil { + for i := range status.EtcdClusters { + etcdCluster := &status.EtcdClusters[i] + if etcdCluster.Name == obj.Name { + etcdClusterStatus = etcdCluster + } + } + } + + // If the etcd cluster has been created (i.e. 
if we have status) then we can't support some changes + if etcdClusterStatus != nil { + newMembers := make(map[string]*kops.EtcdMemberSpec) + for _, member := range obj.Members { + newMembers[member.Name] = member + } + oldMembers := make(map[string]*kops.EtcdMemberSpec) + for _, member := range old.Members { + oldMembers[member.Name] = member + } + + for k, newMember := range newMembers { + fp := fp.Child("Members").Key(k) + + oldMember := oldMembers[k] + if oldMember == nil { + allErrs = append(allErrs, field.Forbidden(fp, "EtcdCluster members cannot be added")) + } else { + allErrs = append(allErrs, validateEtcdMemberUpdate(fp, newMember, etcdClusterStatus, oldMember)...) + } + } + for k := range oldMembers { + newCluster := newMembers[k] + if newCluster == nil { + fp := fp.Child("Members").Key(k) + allErrs = append(allErrs, field.Forbidden(fp, "EtcdCluster members cannot be removed")) + } + } + } + + return allErrs +} + +func validateEtcdMemberUpdate(fp *field.Path, obj *kops.EtcdMemberSpec, status *kops.EtcdClusterStatus, old *kops.EtcdMemberSpec) field.ErrorList { + allErrs := field.ErrorList{} + + if obj.Name != old.Name { + allErrs = append(allErrs, field.Forbidden(fp.Child("Name"), "Name cannot be changed")) + } + + if fi.StringValue(obj.InstanceGroup) != fi.StringValue(old.InstanceGroup) { + allErrs = append(allErrs, field.Forbidden(fp.Child("InstanceGroup"), "InstanceGroup cannot be changed")) + } + + if fi.StringValue(obj.VolumeType) != fi.StringValue(old.VolumeType) { + allErrs = append(allErrs, field.Forbidden(fp.Child("VolumeType"), "VolumeType cannot be changed")) + } + + if fi.Int32Value(obj.VolumeSize) != fi.Int32Value(old.VolumeSize) { + allErrs = append(allErrs, field.Forbidden(fp.Child("VolumeSize"), "VolumeSize cannot be changed")) + } + + if fi.StringValue(obj.KmsKeyId) != fi.StringValue(old.KmsKeyId) { + allErrs = append(allErrs, field.Forbidden(fp.Child("KmsKeyId"), "KmsKeyId cannot be changed")) + } + + if fi.BoolValue(obj.EncryptedVolume) 
!= fi.BoolValue(old.EncryptedVolume) { + allErrs = append(allErrs, field.Forbidden(fp.Child("EncryptedVolume"), "EncryptedVolume cannot be changed")) + } + + return allErrs +} diff --git a/pkg/apiserver/registry/cluster/strategy.go b/pkg/apiserver/registry/cluster/strategy.go index 0b1994f94f..7bb2bd29ad 100644 --- a/pkg/apiserver/registry/cluster/strategy.go +++ b/pkg/apiserver/registry/cluster/strategy.go @@ -28,7 +28,9 @@ import ( "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" + "github.com/golang/glog" "k8s.io/kops/pkg/apis/kops" + "k8s.io/kops/pkg/apis/kops/validation" ) type clusterStrategy struct { @@ -65,8 +67,9 @@ func (clusterStrategy) Canonicalize(obj runtime.Object) { } func (clusterStrategy) ValidateUpdate(ctx genericapirequest.Context, obj, old runtime.Object) field.ErrorList { - return field.ErrorList{} - // return validation.ValidateServiceInjectionUpdate(obj.(*serviceinjection.ServiceInjection), old.(*serviceinjection.ServiceInjection)) + glog.Warningf("Performing cluster update without status validation") + var status *kops.ClusterStatus + return validation.ValidateClusterUpdate(obj.(*kops.Cluster), status, old.(*kops.Cluster)) } func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { diff --git a/pkg/client/simple/clientset.go b/pkg/client/simple/clientset.go index aa294ed766..c097ea692f 100644 --- a/pkg/client/simple/clientset.go +++ b/pkg/client/simple/clientset.go @@ -17,8 +17,10 @@ limitations under the License. 
package simple import ( + "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kops/pkg/apis/kops" + "k8s.io/kops/pkg/apis/kops/validation" kopsinternalversion "k8s.io/kops/pkg/client/clientset_generated/clientset/typed/kops/internalversion" "k8s.io/kops/util/pkg/vfs" "net/url" @@ -33,7 +35,7 @@ type Clientset interface { CreateCluster(cluster *kops.Cluster) (*kops.Cluster, error) // UpdateCluster updates a cluster - UpdateCluster(cluster *kops.Cluster) (*kops.Cluster, error) + UpdateCluster(cluster *kops.Cluster, status *kops.ClusterStatus) (*kops.Cluster, error) // ListClusters returns all clusters ListClusters(options metav1.ListOptions) (*kops.ClusterList, error) @@ -73,7 +75,16 @@ func (c *RESTClientset) CreateCluster(cluster *kops.Cluster) (*kops.Cluster, err } // UpdateCluster implements the UpdateCluster method of Clientset for a kubernetes-API state store -func (c *RESTClientset) UpdateCluster(cluster *kops.Cluster) (*kops.Cluster, error) { +func (c *RESTClientset) UpdateCluster(cluster *kops.Cluster, status *kops.ClusterStatus) (*kops.Cluster, error) { + glog.Warningf("validating cluster update client side; needs to move to server") + old, err := c.GetCluster(cluster.Name) + if err != nil { + return nil, err + } + if err := validation.ValidateClusterUpdate(cluster, status, old).ToAggregate(); err != nil { + return nil, err + } + namespace := restNamespaceForClusterName(cluster.Name) return c.KopsClient.Clusters(namespace).Update(cluster) } diff --git a/pkg/client/simple/vfsclientset/clientset.go b/pkg/client/simple/vfsclientset/clientset.go index b1877f3f6e..0512daa5c0 100644 --- a/pkg/client/simple/vfsclientset/clientset.go +++ b/pkg/client/simple/vfsclientset/clientset.go @@ -40,8 +40,8 @@ func (c *VFSClientset) GetCluster(name string) (*kops.Cluster, error) { } // UpdateCluster implements the UpdateCluster method of simple.Clientset for a VFS-backed state store -func (c *VFSClientset) UpdateCluster(cluster *kops.Cluster) 
(*kops.Cluster, error) { - return c.clusters().Update(cluster) +func (c *VFSClientset) UpdateCluster(cluster *kops.Cluster, status *kops.ClusterStatus) (*kops.Cluster, error) { + return c.clusters().Update(cluster, status) } // CreateCluster implements the CreateCluster method of simple.Clientset for a VFS-backed state store diff --git a/pkg/client/simple/vfsclientset/cluster.go b/pkg/client/simple/vfsclientset/cluster.go index 04249858f9..f9327a61f0 100644 --- a/pkg/client/simple/vfsclientset/cluster.go +++ b/pkg/client/simple/vfsclientset/cluster.go @@ -111,15 +111,19 @@ func (r *ClusterVFS) Create(c *api.Cluster) (*api.Cluster, error) { return c, nil } -func (r *ClusterVFS) Update(c *api.Cluster) (*api.Cluster, error) { - err := validation.ValidateCluster(c, false) +func (r *ClusterVFS) Update(c *api.Cluster, status *api.ClusterStatus) (*api.Cluster, error) { + clusterName := c.ObjectMeta.Name + if clusterName == "" { + return nil, field.Required(field.NewPath("Name"), "clusterName is required") + } + + old, err := r.Get(clusterName, metav1.GetOptions{}) if err != nil { return nil, err } - clusterName := c.ObjectMeta.Name - if clusterName == "" { - return nil, field.Required(field.NewPath("Name"), "clusterName is required") + if err := validation.ValidateClusterUpdate(c, status, old).ToAggregate(); err != nil { + return nil, err } if err := r.writeConfig(r.basePath.Join(clusterName, registry.PathCluster), c, vfs.WriteOptionOnlyIfExists); err != nil { diff --git a/protokube/pkg/etcd/cluster_spec.go b/protokube/pkg/etcd/cluster_spec.go new file mode 100644 index 0000000000..f351525867 --- /dev/null +++ b/protokube/pkg/etcd/cluster_spec.go @@ -0,0 +1,68 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "fmt" + "strings" +) + +// EtcdClusterSpec is configuration for the etcd cluster +type EtcdClusterSpec struct { + // ClusterKey is the initial cluster key + ClusterKey string `json:"clusterKey,omitempty"` + // NodeName is my nodename in the cluster + NodeName string `json:"nodeName,omitempty"` + // NodeNames is a collection of node members in the cluster + NodeNames []string `json:"nodeNames,omitempty"` +} + +// String returns a string representation of the EtcdClusterSpec +func (e *EtcdClusterSpec) String() string { + return DebugString(e) +} + +// ParseEtcdClusterSpec parses a tag on a volume that encodes an etcd cluster role +// The format is "<nodeName>/<comma-separated nodeNames>", e.g. 
"node1/node1,node2,node3" +func ParseEtcdClusterSpec(clusterKey, v string) (*EtcdClusterSpec, error) { + v = strings.TrimSpace(v) + + tokens := strings.Split(v, "/") + if len(tokens) != 2 { + return nil, fmt.Errorf("invalid EtcdClusterSpec (expected two tokens): %q", v) + } + + nodeName := tokens[0] + nodeNames := strings.Split(tokens[1], ",") + + found := false + for _, s := range nodeNames { + if s == nodeName { + found = true + } + } + if !found { + return nil, fmt.Errorf("invalid EtcdClusterSpec (member not found in all nodes): %q", v) + } + + c := &EtcdClusterSpec{ + ClusterKey: clusterKey, + NodeName: nodeName, + NodeNames: nodeNames, + } + return c, nil +} diff --git a/protokube/pkg/etcd/utils.go b/protokube/pkg/etcd/utils.go new file mode 100644 index 0000000000..a6648c464b --- /dev/null +++ b/protokube/pkg/etcd/utils.go @@ -0,0 +1,30 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package etcd + +import ( + "encoding/json" + "fmt" +) + +func DebugString(o interface{}) string { + b, err := json.Marshal(o) + if err != nil { + return fmt.Sprintf("error marshaling %T: %v", o, err) + } + return string(b) +} diff --git a/protokube/pkg/protokube/aws_volume.go b/protokube/pkg/protokube/aws_volume.go index 2937c0a445..0554be256f 100644 --- a/protokube/pkg/protokube/aws_volume.go +++ b/protokube/pkg/protokube/aws_volume.go @@ -33,6 +33,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" "github.com/golang/glog" + "k8s.io/kops/protokube/pkg/etcd" ) var devices = []string{"/dev/xvdu", "/dev/xvdv", "/dev/xvdx", "/dev/xvdx", "/dev/xvdy", "/dev/xvdz"} @@ -215,7 +216,7 @@ func (a *AWSVolumes) findVolumes(request *ec2.DescribeVolumesInput) ([]*Volume, default: if strings.HasPrefix(k, awsup.TagNameEtcdClusterPrefix) { etcdClusterName := strings.TrimPrefix(k, awsup.TagNameEtcdClusterPrefix) - spec, err := ParseEtcdClusterSpec(etcdClusterName, v) + spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, v) if err != nil { // Fail safe glog.Warningf("error parsing etcd cluster tag %q on volume %q; skipping volume: %v", v, volumeID, err) diff --git a/protokube/pkg/protokube/etcd_cluster.go b/protokube/pkg/protokube/etcd_cluster.go index 5565854e53..44508baede 100644 --- a/protokube/pkg/protokube/etcd_cluster.go +++ b/protokube/pkg/protokube/etcd_cluster.go @@ -28,18 +28,9 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "github.com/golang/glog" + "k8s.io/kops/protokube/pkg/etcd" ) -// EtcdClusterSpec is configuration for the etcd cluster -type EtcdClusterSpec struct { - // ClusterKey is the initial cluster key - ClusterKey string `json:"clusterKey,omitempty"` - // NodeName is my nodename in the cluster - NodeName string `json:"nodeName,omitempty"` - // NodeNames is a collection of node members in the cluster - NodeNames []string `json:"nodeNames,omitempty"` -} - // EtcdCluster is the configuration for the etcd 
cluster type EtcdCluster struct { // ClientPort is the incoming ports for client @@ -67,7 +58,7 @@ type EtcdCluster struct { // ProxyMode indicates we are running in proxy mode ProxyMode bool // Spec is the specification found from the volumes - Spec *EtcdClusterSpec + Spec *etcd.EtcdClusterSpec // VolumeMountPath is the mount path VolumeMountPath string // TLSCA is the path to a client ca for etcd clients @@ -94,12 +85,12 @@ type EtcdNode struct { type EtcdController struct { kubeBoot *KubeBoot volume *Volume - volumeSpec *EtcdClusterSpec + volumeSpec *etcd.EtcdClusterSpec cluster *EtcdCluster } // newEtcdController creates and returns a new etcd controller -func newEtcdController(kubeBoot *KubeBoot, v *Volume, spec *EtcdClusterSpec) (*EtcdController, error) { +func newEtcdController(kubeBoot *KubeBoot, v *Volume, spec *etcd.EtcdClusterSpec) (*EtcdController, error) { k := &EtcdController{ kubeBoot: kubeBoot, } @@ -287,10 +278,6 @@ func (c *EtcdCluster) configure(k *KubeBoot) error { return nil } -func (e *EtcdClusterSpec) String() string { - return DebugString(e) -} - // isTLS indicates the etcd cluster should be configured to use tls func (c *EtcdCluster) isTLS() bool { return notEmpty(c.TLSCert) && notEmpty(c.TLSKey) diff --git a/protokube/pkg/protokube/gce_volume.go b/protokube/pkg/protokube/gce_volume.go index f65d4e0225..f39c57aaeb 100644 --- a/protokube/pkg/protokube/gce_volume.go +++ b/protokube/pkg/protokube/gce_volume.go @@ -26,6 +26,7 @@ import ( "golang.org/x/net/context" "golang.org/x/oauth2/google" compute "google.golang.org/api/compute/v0.beta" + "k8s.io/kops/protokube/pkg/etcd" "k8s.io/kops/protokube/pkg/gossip" gossipgce "k8s.io/kops/protokube/pkg/gossip/gce" "k8s.io/kops/upup/pkg/fi/cloudup/gce" @@ -208,7 +209,7 @@ func (v *GCEVolumes) buildGCEVolume(d *compute.Disk) (*Volume, error) { if err != nil { return nil, fmt.Errorf("Error decoding GCE label: %s=%q", k, v) } - spec, err := ParseEtcdClusterSpec(etcdClusterName, value) + spec, err := 
etcd.ParseEtcdClusterSpec(etcdClusterName, value) if err != nil { return nil, fmt.Errorf("error parsing etcd cluster label %q on volume %q: %v", value, volumeName, err) } diff --git a/protokube/pkg/protokube/volume_mounter_test.go b/protokube/pkg/protokube/volume_mounter_test.go index 5d2ff8bd33..d61f9e874a 100644 --- a/protokube/pkg/protokube/volume_mounter_test.go +++ b/protokube/pkg/protokube/volume_mounter_test.go @@ -17,6 +17,7 @@ limitations under the License. package protokube import ( + "k8s.io/kops/protokube/pkg/etcd" "sort" "strings" "testing" @@ -44,13 +45,13 @@ func Test_VolumeSort_ByEtcdClusterName(t *testing.T) { t.Fatalf("Fail at sort 1: %v", getIDs(volumes)) } - v2.Info.EtcdClusters = append(v2.Info.EtcdClusters, &EtcdClusterSpec{ClusterKey: "events"}) + v2.Info.EtcdClusters = append(v2.Info.EtcdClusters, &etcd.EtcdClusterSpec{ClusterKey: "events"}) sort.Stable(ByEtcdClusterName(volumes)) if getIDs(volumes) != "2,1,3" { t.Fatalf("Fail at sort 2: %v", getIDs(volumes)) } - v3.Info.EtcdClusters = append(v3.Info.EtcdClusters, &EtcdClusterSpec{ClusterKey: "main"}) + v3.Info.EtcdClusters = append(v3.Info.EtcdClusters, &etcd.EtcdClusterSpec{ClusterKey: "main"}) sort.Stable(ByEtcdClusterName(volumes)) if getIDs(volumes) != "3,2,1" { t.Fatalf("Fail at sort 3: %v", getIDs(volumes)) @@ -75,7 +76,7 @@ func Test_Mount_Volumes(t *testing.T) { &Volume{ LocalDevice: "/dev/xvdb", Info: VolumeInfo{ - EtcdClusters: []*EtcdClusterSpec{ + EtcdClusters: []*etcd.EtcdClusterSpec{ { ClusterKey: "foo", NodeName: "bar", diff --git a/protokube/pkg/protokube/volumes.go b/protokube/pkg/protokube/volumes.go index 332dc90eb1..d1b16b24c5 100644 --- a/protokube/pkg/protokube/volumes.go +++ b/protokube/pkg/protokube/volumes.go @@ -17,8 +17,7 @@ limitations under the License. 
package protokube import ( - "fmt" - "strings" + "k8s.io/kops/protokube/pkg/etcd" ) type Volumes interface { @@ -54,40 +53,9 @@ type VolumeInfo struct { Description string //MasterID int // TODO: Maybe the events cluster can just be a PetSet - do we need it for boot? - EtcdClusters []*EtcdClusterSpec + EtcdClusters []*etcd.EtcdClusterSpec } func (v *VolumeInfo) String() string { return DebugString(v) } - -// Parses a tag on a volume that encodes an etcd cluster role -// The format is "/", e.g. "node1/node1,node2,node3" -func ParseEtcdClusterSpec(clusterKey, v string) (*EtcdClusterSpec, error) { - v = strings.TrimSpace(v) - - tokens := strings.Split(v, "/") - if len(tokens) != 2 { - return nil, fmt.Errorf("invalid EtcdClusterSpec (expected two tokens): %q", v) - } - - nodeName := tokens[0] - nodeNames := strings.Split(tokens[1], ",") - - found := false - for _, s := range nodeNames { - if s == nodeName { - found = true - } - } - if !found { - return nil, fmt.Errorf("invalid EtcdClusterSpec (member not found in all nodes): %q", v) - } - - c := &EtcdClusterSpec{ - ClusterKey: clusterKey, - NodeName: nodeName, - NodeNames: nodeNames, - } - return c, nil -} diff --git a/protokube/pkg/protokube/vsphere_volume.go b/protokube/pkg/protokube/vsphere_volume.go index 0103199c75..e2379d2317 100644 --- a/protokube/pkg/protokube/vsphere_volume.go +++ b/protokube/pkg/protokube/vsphere_volume.go @@ -16,13 +16,14 @@ limitations under the License. package protokube -// vspehre_volume houses vSphere volume and implements relevant interfaces. +// vsphere_volume houses vSphere volume and implements relevant interfaces. 
import ( "errors" "fmt" "github.com/golang/glog" "io/ioutil" + etcdmanager "k8s.io/kops/protokube/pkg/etcd" "k8s.io/kops/upup/pkg/fi/cloudup/vsphere" "net" "os/exec" @@ -78,7 +79,7 @@ func (v *VSphereVolumes) FindVolumes() ([]*Volume, error) { }, } - etcdSpec := &EtcdClusterSpec{ + etcdSpec := &etcdmanager.EtcdClusterSpec{ ClusterKey: etcd.EtcdClusterName, NodeName: etcd.EtcdNodeName, } @@ -88,7 +89,7 @@ func (v *VSphereVolumes) FindVolumes() ([]*Volume, error) { nodeNames = append(nodeNames, member.Name) } etcdSpec.NodeNames = nodeNames - vol.Info.EtcdClusters = []*EtcdClusterSpec{etcdSpec} + vol.Info.EtcdClusters = []*etcdmanager.EtcdClusterSpec{etcdSpec} volumes = append(volumes, vol) } glog.V(4).Infof("Found volumes: %v", volumes) diff --git a/upup/pkg/fi/cloudup/awsup/aws_cloud.go b/upup/pkg/fi/cloudup/awsup/aws_cloud.go index 19036090c9..0462b8a8ff 100644 --- a/upup/pkg/fi/cloudup/awsup/aws_cloud.go +++ b/upup/pkg/fi/cloudup/awsup/aws_cloud.go @@ -125,6 +125,9 @@ type AWSCloud interface { // DefaultInstanceType determines a suitable instance type for the specified instance group DefaultInstanceType(cluster *kops.Cluster, ig *kops.InstanceGroup) (string, error) + + // FindClusterStatus gets the status of the cluster as it exists in AWS, inferred from volumes + FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) } type awsCloudImplementation struct { diff --git a/upup/pkg/fi/cloudup/awsup/status.go b/upup/pkg/fi/cloudup/awsup/status.go new file mode 100644 index 0000000000..a785959a1c --- /dev/null +++ b/upup/pkg/fi/cloudup/awsup/status.go @@ -0,0 +1,122 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package awsup + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/golang/glog" + "k8s.io/kops/pkg/apis/kops" + "k8s.io/kops/protokube/pkg/etcd" + "k8s.io/kops/upup/pkg/fi" + "strings" +) + +// FindClusterStatus discovers the status of the cluster, by looking for the tagged etcd volumes +func (c *awsCloudImplementation) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) { + etcdStatus, err := findEtcdStatus(c, cluster) + if err != nil { + return nil, err + } + status := &kops.ClusterStatus{ + EtcdClusters: etcdStatus, + } + glog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status)) + return status, nil +} + +// FindClusterStatus discovers the status of the cluster, by looking for the tagged etcd volumes +func (c *MockAWSCloud) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) { + etcdStatus, err := findEtcdStatus(c, cluster) + if err != nil { + return nil, err + } + return &kops.ClusterStatus{ + EtcdClusters: etcdStatus, + }, nil +} + +// findEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes +func findEtcdStatus(c AWSCloud, cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) { + glog.V(2).Infof("Querying AWS for etcd volumes") + statusMap := make(map[string]*kops.EtcdClusterStatus) + + tags := c.Tags() + + request := &ec2.DescribeVolumesInput{} + for k, v := range tags { + request.Filters = append(request.Filters, NewEC2Filter("tag:"+k, v)) + } + + var volumes []*ec2.Volume + glog.V(2).Infof("Listing EC2 Volumes") + 
err := c.EC2().DescribeVolumesPages(request, func(p *ec2.DescribeVolumesOutput, lastPage bool) bool { + for _, volume := range p.Volumes { + volumes = append(volumes, volume) + } + return true + }) + if err != nil { + return nil, fmt.Errorf("error describing volumes: %v", err) + } + + for _, volume := range volumes { + volumeID := aws.StringValue(volume.VolumeId) + + etcdClusterName := "" + var etcdClusterSpec *etcd.EtcdClusterSpec + master := false + for _, tag := range volume.Tags { + k := aws.StringValue(tag.Key) + v := aws.StringValue(tag.Value) + + if strings.HasPrefix(k, TagNameEtcdClusterPrefix) { + etcdClusterName = strings.TrimPrefix(k, TagNameEtcdClusterPrefix) // plain assignment: := here would shadow the outer variable and every volume would be skipped below + etcdClusterSpec, err = etcd.ParseEtcdClusterSpec(etcdClusterName, v) + if err != nil { + return nil, fmt.Errorf("error parsing etcd cluster tag %q on volume %q: %v", v, volumeID, err) + } + } else if k == TagNameRolePrefix+TagRoleMaster { + master = true + } + } + if etcdClusterName == "" || etcdClusterSpec == nil || !master { + continue + } + + status := statusMap[etcdClusterName] + if status == nil { + status = &kops.EtcdClusterStatus{ + Name: etcdClusterName, + } + statusMap[etcdClusterName] = status + } + + memberName := etcdClusterSpec.NodeName + status.Members = append(status.Members, &kops.EtcdMemberStatus{ + Name: memberName, + VolumeId: aws.StringValue(volume.VolumeId), + }) + } + + var status []kops.EtcdClusterStatus + for _, v := range statusMap { + status = append(status, *v) + } + return status, nil +} diff --git a/upup/pkg/fi/cloudup/gce/gce_cloud.go b/upup/pkg/fi/cloudup/gce/gce_cloud.go index b081ea423b..045c139003 100644 --- a/upup/pkg/fi/cloudup/gce/gce_cloud.go +++ b/upup/pkg/fi/cloudup/gce/gce_cloud.go @@ -39,6 +39,9 @@ type GCECloud interface { WaitForOp(op *compute.Operation) error GetApiIngressStatus(cluster *kops.Cluster) ([]kops.ApiIngressStatus, error) Labels() map[string]string + + // FindClusterStatus gets the status of the cluster as it exists in GCE, inferred from 
volumes + FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) } type gceCloudImplementation struct { diff --git a/upup/pkg/fi/cloudup/gce/mock_gce_cloud.go b/upup/pkg/fi/cloudup/gce/mock_gce_cloud.go index 702dece269..f0c124c5e0 100644 --- a/upup/pkg/fi/cloudup/gce/mock_gce_cloud.go +++ b/upup/pkg/fi/cloudup/gce/mock_gce_cloud.go @@ -89,6 +89,11 @@ func (c *mockGCECloud) WaitForOp(op *compute.Operation) error { return fmt.Errorf("mockGCECloud::WaitForOp not implemented") } +// FindClusterStatus implements GCECloud::FindClusterStatus +func (c *mockGCECloud) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) { + return nil, fmt.Errorf("mockGCECloud::FindClusterStatus not implemented") +} + // GetApiIngressStatus implements GCECloud::GetApiIngressStatus func (c *mockGCECloud) GetApiIngressStatus(cluster *kops.Cluster) ([]kops.ApiIngressStatus, error) { return nil, fmt.Errorf("mockGCECloud::GetApiIngressStatus not implemented") diff --git a/upup/pkg/fi/cloudup/gce/status.go b/upup/pkg/fi/cloudup/gce/status.go new file mode 100644 index 0000000000..dfc1ea69b1 --- /dev/null +++ b/upup/pkg/fi/cloudup/gce/status.go @@ -0,0 +1,149 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package gce + +import ( + "context" + "fmt" + "github.com/golang/glog" + compute "google.golang.org/api/compute/v0.beta" + "k8s.io/kops/pkg/apis/kops" + "k8s.io/kops/protokube/pkg/etcd" + "k8s.io/kops/upup/pkg/fi" + "strings" +) + +func (c *gceCloudImplementation) allZones() ([]string, error) { + var zones []string + + // TODO: use PageToken to list all not just the first 500 + ctx := context.Background() + err := c.compute.Zones.List(c.project).Pages(ctx, func(page *compute.ZoneList) error { + for _, zone := range page.Items { + regionName := LastComponent(zone.Region) + if regionName == c.region { + zones = append(zones, zone.Name) + } + } + + return nil + }) + if err != nil { + return nil, fmt.Errorf("error listing zones: %v", err) + } + + return zones, nil +} + +// FindClusterStatus discovers the status of the cluster, by inspecting the cloud objects +func (c *gceCloudImplementation) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) { + etcdClusters, err := c.findEtcdStatus(cluster) + if err != nil { + return nil, err + } + + status := &kops.ClusterStatus{ + EtcdClusters: etcdClusters, + } + glog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status)) + return status, nil +} + +// FindEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes +func (c *gceCloudImplementation) findEtcdStatus(cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) { + statusMap := make(map[string]*kops.EtcdClusterStatus) + + labels := c.Labels() + + zones, err := c.allZones() + if err != nil { + return nil, err + } + + var disks []*compute.Disk + + // TODO: Filter disks query by Label? 
+ ctx := context.Background() + for _, zone := range zones { + err := c.compute.Disks.List(c.project, zone).Pages(ctx, func(page *compute.DiskList) error { + for _, d := range page.Items { + glog.V(4).Infof("Found disk %q with labels %v", d.Name, d.Labels) + + match := true + for k, v := range labels { + if d.Labels[k] != v { + match = false + } + } + if match { + disks = append(disks, d) + } + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("error describing volumes: %v", err) + } + } + + for _, disk := range disks { + etcdClusterName := "" + var etcdClusterSpec *etcd.EtcdClusterSpec + master := false + for k, v := range disk.Labels { + if strings.HasPrefix(k, GceLabelNameEtcdClusterPrefix) { + etcdClusterName = strings.TrimPrefix(k, GceLabelNameEtcdClusterPrefix) + value, err := DecodeGCELabel(v) + if err != nil { + return nil, fmt.Errorf("unexpected etcd label on volume %q: %s=%s", disk.Name, k, v) + } + spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, value) + if err != nil { + return nil, fmt.Errorf("error parsing etcd cluster label %q on volume %q: %v", value, disk.Name, err) + } + etcdClusterSpec = spec + } else if strings.HasPrefix(k, GceLabelNameRolePrefix) { + roleName := strings.TrimPrefix(k, GceLabelNameRolePrefix) + if roleName == "master" { + master = true + } + } + } + if etcdClusterName == "" || etcdClusterSpec == nil || !master { + continue + } + + status := statusMap[etcdClusterName] + if status == nil { + status = &kops.EtcdClusterStatus{ + Name: etcdClusterName, + } + statusMap[etcdClusterName] = status + } + + status.Members = append(status.Members, &kops.EtcdMemberStatus{ + Name: etcdClusterSpec.NodeName, + VolumeId: disk.Name, + }) + } + + var status []kops.EtcdClusterStatus + for _, v := range statusMap { + status = append(status, *v) + } + return status, nil +}