Add function to get etcd status

This will allow us to permit changing the etcd configuration only if
etcd has not yet been configured.
This commit is contained in:
Justin Santa Barbara 2017-08-05 12:28:03 -04:00
parent 6a238539e0
commit e730e043ab
24 changed files with 646 additions and 76 deletions

View File

@ -236,8 +236,15 @@ func RunEditCluster(f *util.Factory, cmd *cobra.Command, args []string, out io.W
return preservedFile(err, file, out)
}
// Retrieve the current status of the cluster. This will eventually be part of the cluster object.
statusDiscovery := &cloudDiscoveryStatusStore{}
status, err := statusDiscovery.FindClusterStatus(oldCluster)
if err != nil {
return err
}
// Note we perform as much validation as we can, before writing a bad config
_, err = clientset.UpdateCluster(newCluster)
_, err = clientset.UpdateCluster(newCluster, status)
if err != nil {
return preservedFile(err, file, out)
}

View File

@ -115,10 +115,19 @@ func RunReplace(f *util.Factory, cmd *cobra.Command, out io.Writer, c *replaceOp
}
case *kopsapi.Cluster:
_, err = clientset.UpdateCluster(v)
{
// Retrieve the current status of the cluster. This will eventually be part of the cluster object.
statusDiscovery := &cloudDiscoveryStatusStore{}
status, err := statusDiscovery.FindClusterStatus(v)
if err != nil {
return err
}
_, err = clientset.UpdateCluster(v, status)
if err != nil {
return fmt.Errorf("error replacing cluster: %v", err)
}
}
case *kopsapi.InstanceGroup:
clusterName := v.ObjectMeta.Labels[kopsapi.LabelClusterName]

View File

@ -68,3 +68,21 @@ func (s *cloudDiscoveryStatusStore) GetApiIngressStatus(cluster *kops.Cluster) (
return nil, fmt.Errorf("API Ingress Status not implemented for %T", cloud)
}
// FindClusterStatus discovers the status of the cluster, by inspecting the cloud objects
func (s *cloudDiscoveryStatusStore) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
cloud, err := cloudup.BuildCloud(cluster)
if err != nil {
return nil, err
}
if gceCloud, ok := cloud.(gce.GCECloud); ok {
return gceCloud.FindClusterStatus(cluster)
}
if awsCloud, ok := cloud.(awsup.AWSCloud); ok {
return awsCloud.FindClusterStatus(cluster)
}
return nil, fmt.Errorf("Etcd Status not implemented for %T", cloud)
}

View File

@ -300,8 +300,15 @@ func (c *UpgradeClusterCmd) Run(args []string) error {
return err
}
// Retrieve the current status of the cluster. This will eventually be part of the cluster object.
statusDiscovery := &cloudDiscoveryStatusStore{}
status, err := statusDiscovery.FindClusterStatus(cluster)
if err != nil {
return err
}
// Note we perform as much validation as we can, before writing a bad config
_, err = clientset.UpdateCluster(cluster)
_, err = clientset.UpdateCluster(cluster, status)
if err != nil {
return err
}

View File

@ -86,6 +86,7 @@ k8s.io/kops/pkg/util/stringorslice
k8s.io/kops/pkg/util/templater
k8s.io/kops/pkg/validation
k8s.io/kops/protokube/cmd/protokube
k8s.io/kops/protokube/pkg/etcd
k8s.io/kops/protokube/pkg/gossip
k8s.io/kops/protokube/pkg/gossip/aws
k8s.io/kops/protokube/pkg/gossip/dns

View File

@ -16,13 +16,39 @@ limitations under the License.
package kops
import "github.com/golang/glog"
import (
"github.com/golang/glog"
)
// StatusStore abstracts the key status functions; and lets us introduce status gradually
type StatusStore interface {
// FindClusterStatus discovers the status of the cluster, by inspecting the cloud objects
FindClusterStatus(cluster *Cluster) (*ClusterStatus, error)
GetApiIngressStatus(cluster *Cluster) ([]ApiIngressStatus, error)
}
type ClusterStatus struct {
// EtcdClusters stores the status for each cluster
EtcdClusters []EtcdClusterStatus `json:"etcdClusters,omitempty"`
}
// EtcdStatus represents the status of etcd: because etcd only allows limited reconfiguration, we have to block changes once etcd has been initialized.
type EtcdClusterStatus struct {
// Name is the name of the etcd cluster (main, events etc)
Name string `json:"name,omitempty"`
// EtcdMember stores the configurations for each member of the cluster (including the data volume)
Members []*EtcdMemberStatus `json:"etcdMembers,omitempty"`
}
type EtcdMemberStatus struct {
// Name is the name of the member within the etcd cluster
Name string `json:"name,omitempty"`
// volumeId is the id of the cloud volume (e.g. the AWS volume id)
VolumeId string `json:"volumeId,omitempty"`
}
// ApiIngress represents the status of an ingress point:
// traffic intended for the service should be sent to an ingress point.
type ApiIngressStatus struct {
@ -44,6 +70,12 @@ type NoopStatusStore struct {
var _ StatusStore = &NoopStatusStore{}
// FindClusterStatus discovers the status of the cluster, by inspecting the cloud objects
func (s *NoopStatusStore) FindClusterStatus(cluster *Cluster) (*ClusterStatus, error) {
glog.Warningf("FindClusterStatus called on NoopStore")
return nil, nil
}
func (s *NoopStatusStore) GetApiIngressStatus(cluster *Cluster) ([]ApiIngressStatus, error) {
glog.Warningf("GetApiIngressStatus called on NoopStore")
return nil, nil

View File

@ -0,0 +1,139 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validation
import (
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/upup/pkg/fi"
)
func ValidateClusterUpdate(obj *kops.Cluster, status *kops.ClusterStatus, old *kops.Cluster) field.ErrorList {
allErrs := field.ErrorList{}
if err := ValidateCluster(obj, false); err != nil {
allErrs = append(allErrs, err)
}
// Validate etcd cluster changes
{
newClusters := make(map[string]*kops.EtcdClusterSpec)
for _, etcdCluster := range obj.Spec.EtcdClusters {
newClusters[etcdCluster.Name] = etcdCluster
}
oldClusters := make(map[string]*kops.EtcdClusterSpec)
for _, etcdCluster := range old.Spec.EtcdClusters {
oldClusters[etcdCluster.Name] = etcdCluster
}
for k, newCluster := range newClusters {
fp := field.NewPath("Spec", "EtcdClusters").Key(k)
oldCluster := oldClusters[k]
allErrs = append(allErrs, validateEtcdClusterUpdate(fp, newCluster, status, oldCluster)...)
}
for k := range oldClusters {
newCluster := newClusters[k]
if newCluster == nil {
fp := field.NewPath("Spec", "EtcdClusters").Key(k)
allErrs = append(allErrs, field.Forbidden(fp, "EtcdClusters cannot be removed"))
}
}
}
return allErrs
}
func validateEtcdClusterUpdate(fp *field.Path, obj *kops.EtcdClusterSpec, status *kops.ClusterStatus, old *kops.EtcdClusterSpec) field.ErrorList {
allErrs := field.ErrorList{}
if obj.Name != old.Name {
allErrs = append(allErrs, field.Forbidden(fp.Child("Name"), "Name cannot be changed"))
}
var etcdClusterStatus *kops.EtcdClusterStatus
if status != nil {
for i := range status.EtcdClusters {
etcdCluster := &status.EtcdClusters[i]
if etcdCluster.Name == obj.Name {
etcdClusterStatus = etcdCluster
}
}
}
// If the etcd cluster has been created (i.e. if we have status) then we can't support some changes
if etcdClusterStatus != nil {
newMembers := make(map[string]*kops.EtcdMemberSpec)
for _, member := range obj.Members {
newMembers[member.Name] = member
}
oldMembers := make(map[string]*kops.EtcdMemberSpec)
for _, member := range old.Members {
oldMembers[member.Name] = member
}
for k, newMember := range newMembers {
fp := fp.Child("Members").Key(k)
oldMember := oldMembers[k]
if oldMember == nil {
allErrs = append(allErrs, field.Forbidden(fp, "EtcdCluster members cannot be added"))
} else {
allErrs = append(allErrs, validateEtcdMemberUpdate(fp, newMember, etcdClusterStatus, oldMember)...)
}
}
for k := range oldMembers {
newCluster := newMembers[k]
if newCluster == nil {
fp := fp.Child("Members").Key(k)
allErrs = append(allErrs, field.Forbidden(fp, "EtcdCluster members cannot be removed"))
}
}
}
return allErrs
}
func validateEtcdMemberUpdate(fp *field.Path, obj *kops.EtcdMemberSpec, status *kops.EtcdClusterStatus, old *kops.EtcdMemberSpec) field.ErrorList {
allErrs := field.ErrorList{}
if obj.Name != old.Name {
allErrs = append(allErrs, field.Forbidden(fp.Child("Name"), "Name cannot be changed"))
}
if fi.StringValue(obj.InstanceGroup) != fi.StringValue(old.InstanceGroup) {
allErrs = append(allErrs, field.Forbidden(fp.Child("InstanceGroup"), "InstanceGroup cannot be changed"))
}
if fi.StringValue(obj.VolumeType) != fi.StringValue(old.VolumeType) {
allErrs = append(allErrs, field.Forbidden(fp.Child("VolumeType"), "VolumeType cannot be changed"))
}
if fi.Int32Value(obj.VolumeSize) != fi.Int32Value(old.VolumeSize) {
allErrs = append(allErrs, field.Forbidden(fp.Child("VolumeSize"), "VolumeSize cannot be changed"))
}
if fi.StringValue(obj.KmsKeyId) != fi.StringValue(old.KmsKeyId) {
allErrs = append(allErrs, field.Forbidden(fp.Child("KmsKeyId"), "KmsKeyId cannot be changed"))
}
if fi.BoolValue(obj.EncryptedVolume) != fi.BoolValue(old.EncryptedVolume) {
allErrs = append(allErrs, field.Forbidden(fp.Child("EncryptedVolume"), "EncryptedVolume cannot be changed"))
}
return allErrs
}

View File

@ -28,7 +28,9 @@ import (
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/names"
"github.com/golang/glog"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/apis/kops/validation"
)
type clusterStrategy struct {
@ -65,8 +67,9 @@ func (clusterStrategy) Canonicalize(obj runtime.Object) {
}
func (clusterStrategy) ValidateUpdate(ctx genericapirequest.Context, obj, old runtime.Object) field.ErrorList {
return field.ErrorList{}
// return validation.ValidateServiceInjectionUpdate(obj.(*serviceinjection.ServiceInjection), old.(*serviceinjection.ServiceInjection))
glog.Warningf("Performing cluster update without status validation")
var status *kops.ClusterStatus
return validation.ValidateClusterUpdate(obj.(*kops.Cluster), status, old.(*kops.Cluster))
}
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) {

View File

@ -17,8 +17,10 @@ limitations under the License.
package simple
import (
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/apis/kops/validation"
kopsinternalversion "k8s.io/kops/pkg/client/clientset_generated/clientset/typed/kops/internalversion"
"k8s.io/kops/util/pkg/vfs"
"net/url"
@ -33,7 +35,7 @@ type Clientset interface {
CreateCluster(cluster *kops.Cluster) (*kops.Cluster, error)
// UpdateCluster updates a cluster
UpdateCluster(cluster *kops.Cluster) (*kops.Cluster, error)
UpdateCluster(cluster *kops.Cluster, status *kops.ClusterStatus) (*kops.Cluster, error)
// ListClusters returns all clusters
ListClusters(options metav1.ListOptions) (*kops.ClusterList, error)
@ -73,7 +75,16 @@ func (c *RESTClientset) CreateCluster(cluster *kops.Cluster) (*kops.Cluster, err
}
// UpdateCluster implements the UpdateCluster method of Clientset for a kubernetes-API state store
func (c *RESTClientset) UpdateCluster(cluster *kops.Cluster) (*kops.Cluster, error) {
func (c *RESTClientset) UpdateCluster(cluster *kops.Cluster, status *kops.ClusterStatus) (*kops.Cluster, error) {
glog.Warningf("validating cluster update client side; needs to move to server")
old, err := c.GetCluster(cluster.Name)
if err != nil {
return nil, err
}
if err := validation.ValidateClusterUpdate(cluster, status, old).ToAggregate(); err != nil {
return nil, err
}
namespace := restNamespaceForClusterName(cluster.Name)
return c.KopsClient.Clusters(namespace).Update(cluster)
}

View File

@ -40,8 +40,8 @@ func (c *VFSClientset) GetCluster(name string) (*kops.Cluster, error) {
}
// UpdateCluster implements the UpdateCluster method of simple.Clientset for a VFS-backed state store
func (c *VFSClientset) UpdateCluster(cluster *kops.Cluster) (*kops.Cluster, error) {
return c.clusters().Update(cluster)
func (c *VFSClientset) UpdateCluster(cluster *kops.Cluster, status *kops.ClusterStatus) (*kops.Cluster, error) {
return c.clusters().Update(cluster, status)
}
// CreateCluster implements the CreateCluster method of simple.Clientset for a VFS-backed state store

View File

@ -111,15 +111,19 @@ func (r *ClusterVFS) Create(c *api.Cluster) (*api.Cluster, error) {
return c, nil
}
func (r *ClusterVFS) Update(c *api.Cluster) (*api.Cluster, error) {
err := validation.ValidateCluster(c, false)
func (r *ClusterVFS) Update(c *api.Cluster, status *api.ClusterStatus) (*api.Cluster, error) {
clusterName := c.ObjectMeta.Name
if clusterName == "" {
return nil, field.Required(field.NewPath("Name"), "clusterName is required")
}
old, err := r.Get(clusterName, metav1.GetOptions{})
if err != nil {
return nil, err
}
clusterName := c.ObjectMeta.Name
if clusterName == "" {
return nil, field.Required(field.NewPath("Name"), "clusterName is required")
if err := validation.ValidateClusterUpdate(c, status, old).ToAggregate(); err != nil {
return nil, err
}
if err := r.writeConfig(r.basePath.Join(clusterName, registry.PathCluster), c, vfs.WriteOptionOnlyIfExists); err != nil {

View File

@ -0,0 +1,68 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"fmt"
"strings"
)
// EtcdClusterSpec is configuration for the etcd cluster
type EtcdClusterSpec struct {
// ClusterKey is the initial cluster key
ClusterKey string `json:"clusterKey,omitempty"`
// NodeName is my nodename in the cluster
NodeName string `json:"nodeName,omitempty"`
// NodeNames is a collection of node members in the cluster
NodeNames []string `json:"nodeNames,omitempty"`
}
// String returns a string representation of the EtcdClusterSpec
func (e *EtcdClusterSpec) String() string {
return DebugString(e)
}
// ParseEtcdClusterSpec parses a tag on a volume that encodes an etcd cluster role
// The format is "<myname>/<allnames>", e.g. "node1/node1,node2,node3"
func ParseEtcdClusterSpec(clusterKey, v string) (*EtcdClusterSpec, error) {
v = strings.TrimSpace(v)
tokens := strings.Split(v, "/")
if len(tokens) != 2 {
return nil, fmt.Errorf("invalid EtcdClusterSpec (expected two tokens): %q", v)
}
nodeName := tokens[0]
nodeNames := strings.Split(tokens[1], ",")
found := false
for _, s := range nodeNames {
if s == nodeName {
found = true
}
}
if !found {
return nil, fmt.Errorf("invalid EtcdClusterSpec (member not found in all nodes): %q", v)
}
c := &EtcdClusterSpec{
ClusterKey: clusterKey,
NodeName: nodeName,
NodeNames: nodeNames,
}
return c, nil
}

View File

@ -0,0 +1,30 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"encoding/json"
"fmt"
)
func DebugString(o interface{}) string {
b, err := json.Marshal(o)
if err != nil {
return fmt.Sprintf("error marshaling %T: %v", o, err)
}
return string(b)
}

View File

@ -33,6 +33,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/golang/glog"
"k8s.io/kops/protokube/pkg/etcd"
)
var devices = []string{"/dev/xvdu", "/dev/xvdv", "/dev/xvdx", "/dev/xvdx", "/dev/xvdy", "/dev/xvdz"}
@ -215,7 +216,7 @@ func (a *AWSVolumes) findVolumes(request *ec2.DescribeVolumesInput) ([]*Volume,
default:
if strings.HasPrefix(k, awsup.TagNameEtcdClusterPrefix) {
etcdClusterName := strings.TrimPrefix(k, awsup.TagNameEtcdClusterPrefix)
spec, err := ParseEtcdClusterSpec(etcdClusterName, v)
spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, v)
if err != nil {
// Fail safe
glog.Warningf("error parsing etcd cluster tag %q on volume %q; skipping volume: %v", v, volumeID, err)

View File

@ -28,18 +28,9 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"github.com/golang/glog"
"k8s.io/kops/protokube/pkg/etcd"
)
// EtcdClusterSpec is configuration for the etcd cluster
type EtcdClusterSpec struct {
// ClusterKey is the initial cluster key
ClusterKey string `json:"clusterKey,omitempty"`
// NodeName is my nodename in the cluster
NodeName string `json:"nodeName,omitempty"`
// NodeNames is a collection of node members in the cluster
NodeNames []string `json:"nodeNames,omitempty"`
}
// EtcdCluster is the configuration for the etcd cluster
type EtcdCluster struct {
// ClientPort is the incoming ports for client
@ -67,7 +58,7 @@ type EtcdCluster struct {
// ProxyMode indicates we are running in proxy mode
ProxyMode bool
// Spec is the specification found from the volumes
Spec *EtcdClusterSpec
Spec *etcd.EtcdClusterSpec
// VolumeMountPath is the mount path
VolumeMountPath string
// TLSCA is the path to a client ca for etcd clients
@ -94,12 +85,12 @@ type EtcdNode struct {
type EtcdController struct {
kubeBoot *KubeBoot
volume *Volume
volumeSpec *EtcdClusterSpec
volumeSpec *etcd.EtcdClusterSpec
cluster *EtcdCluster
}
// newEtcdController creates and returns a new etcd controller
func newEtcdController(kubeBoot *KubeBoot, v *Volume, spec *EtcdClusterSpec) (*EtcdController, error) {
func newEtcdController(kubeBoot *KubeBoot, v *Volume, spec *etcd.EtcdClusterSpec) (*EtcdController, error) {
k := &EtcdController{
kubeBoot: kubeBoot,
}
@ -287,10 +278,6 @@ func (c *EtcdCluster) configure(k *KubeBoot) error {
return nil
}
func (e *EtcdClusterSpec) String() string {
return DebugString(e)
}
// isTLS indicates the etcd cluster should be configured to use tls
func (c *EtcdCluster) isTLS() bool {
return notEmpty(c.TLSCert) && notEmpty(c.TLSKey)

View File

@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
compute "google.golang.org/api/compute/v0.beta"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/protokube/pkg/gossip"
gossipgce "k8s.io/kops/protokube/pkg/gossip/gce"
"k8s.io/kops/upup/pkg/fi/cloudup/gce"
@ -208,7 +209,7 @@ func (v *GCEVolumes) buildGCEVolume(d *compute.Disk) (*Volume, error) {
if err != nil {
return nil, fmt.Errorf("Error decoding GCE label: %s=%q", k, v)
}
spec, err := ParseEtcdClusterSpec(etcdClusterName, value)
spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, value)
if err != nil {
return nil, fmt.Errorf("error parsing etcd cluster label %q on volume %q: %v", value, volumeName, err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package protokube
import (
"k8s.io/kops/protokube/pkg/etcd"
"sort"
"strings"
"testing"
@ -44,13 +45,13 @@ func Test_VolumeSort_ByEtcdClusterName(t *testing.T) {
t.Fatalf("Fail at sort 1: %v", getIDs(volumes))
}
v2.Info.EtcdClusters = append(v2.Info.EtcdClusters, &EtcdClusterSpec{ClusterKey: "events"})
v2.Info.EtcdClusters = append(v2.Info.EtcdClusters, &etcd.EtcdClusterSpec{ClusterKey: "events"})
sort.Stable(ByEtcdClusterName(volumes))
if getIDs(volumes) != "2,1,3" {
t.Fatalf("Fail at sort 2: %v", getIDs(volumes))
}
v3.Info.EtcdClusters = append(v3.Info.EtcdClusters, &EtcdClusterSpec{ClusterKey: "main"})
v3.Info.EtcdClusters = append(v3.Info.EtcdClusters, &etcd.EtcdClusterSpec{ClusterKey: "main"})
sort.Stable(ByEtcdClusterName(volumes))
if getIDs(volumes) != "3,2,1" {
t.Fatalf("Fail at sort 3: %v", getIDs(volumes))
@ -75,7 +76,7 @@ func Test_Mount_Volumes(t *testing.T) {
&Volume{
LocalDevice: "/dev/xvdb",
Info: VolumeInfo{
EtcdClusters: []*EtcdClusterSpec{
EtcdClusters: []*etcd.EtcdClusterSpec{
{
ClusterKey: "foo",
NodeName: "bar",

View File

@ -17,8 +17,7 @@ limitations under the License.
package protokube
import (
"fmt"
"strings"
"k8s.io/kops/protokube/pkg/etcd"
)
type Volumes interface {
@ -54,40 +53,9 @@ type VolumeInfo struct {
Description string
//MasterID int
// TODO: Maybe the events cluster can just be a PetSet - do we need it for boot?
EtcdClusters []*EtcdClusterSpec
EtcdClusters []*etcd.EtcdClusterSpec
}
func (v *VolumeInfo) String() string {
return DebugString(v)
}
// Parses a tag on a volume that encodes an etcd cluster role
// The format is "<myname>/<allnames>", e.g. "node1/node1,node2,node3"
func ParseEtcdClusterSpec(clusterKey, v string) (*EtcdClusterSpec, error) {
v = strings.TrimSpace(v)
tokens := strings.Split(v, "/")
if len(tokens) != 2 {
return nil, fmt.Errorf("invalid EtcdClusterSpec (expected two tokens): %q", v)
}
nodeName := tokens[0]
nodeNames := strings.Split(tokens[1], ",")
found := false
for _, s := range nodeNames {
if s == nodeName {
found = true
}
}
if !found {
return nil, fmt.Errorf("invalid EtcdClusterSpec (member not found in all nodes): %q", v)
}
c := &EtcdClusterSpec{
ClusterKey: clusterKey,
NodeName: nodeName,
NodeNames: nodeNames,
}
return c, nil
}

View File

@ -16,13 +16,14 @@ limitations under the License.
package protokube
// vspehre_volume houses vSphere volume and implements relevant interfaces.
// vsphere_volume houses vSphere volume and implements relevant interfaces.
import (
"errors"
"fmt"
"github.com/golang/glog"
"io/ioutil"
etcdmanager "k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/upup/pkg/fi/cloudup/vsphere"
"net"
"os/exec"
@ -78,7 +79,7 @@ func (v *VSphereVolumes) FindVolumes() ([]*Volume, error) {
},
}
etcdSpec := &EtcdClusterSpec{
etcdSpec := &etcdmanager.EtcdClusterSpec{
ClusterKey: etcd.EtcdClusterName,
NodeName: etcd.EtcdNodeName,
}
@ -88,7 +89,7 @@ func (v *VSphereVolumes) FindVolumes() ([]*Volume, error) {
nodeNames = append(nodeNames, member.Name)
}
etcdSpec.NodeNames = nodeNames
vol.Info.EtcdClusters = []*EtcdClusterSpec{etcdSpec}
vol.Info.EtcdClusters = []*etcdmanager.EtcdClusterSpec{etcdSpec}
volumes = append(volumes, vol)
}
glog.V(4).Infof("Found volumes: %v", volumes)

View File

@ -125,6 +125,9 @@ type AWSCloud interface {
// DefaultInstanceType determines a suitable instance type for the specified instance group
DefaultInstanceType(cluster *kops.Cluster, ig *kops.InstanceGroup) (string, error)
// FindClusterStatus gets the status of the cluster as it exists in AWS, inferred from volumes
FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error)
}
type awsCloudImplementation struct {

View File

@ -0,0 +1,122 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package awsup
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/golang/glog"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/upup/pkg/fi"
"strings"
)
// FindClusterStatus discovers the status of the cluster, by looking for the tagged etcd volumes
func (c *awsCloudImplementation) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
etcdStatus, err := findEtcdStatus(c, cluster)
if err != nil {
return nil, err
}
status := &kops.ClusterStatus{
EtcdClusters: etcdStatus,
}
glog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status))
return status, nil
}
// FindEtcdStatus discovers the status of the cluster, by looking for the tagged etcd volumes
func (c *MockAWSCloud) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
etcdStatus, err := findEtcdStatus(c, cluster)
if err != nil {
return nil, err
}
return &kops.ClusterStatus{
EtcdClusters: etcdStatus,
}, nil
}
// findEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes
func findEtcdStatus(c AWSCloud, cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) {
glog.V(2).Infof("Querying AWS for etcd volumes")
statusMap := make(map[string]*kops.EtcdClusterStatus)
tags := c.Tags()
request := &ec2.DescribeVolumesInput{}
for k, v := range tags {
request.Filters = append(request.Filters, NewEC2Filter("tag:"+k, v))
}
var volumes []*ec2.Volume
glog.V(2).Infof("Listing EC2 Volumes")
err := c.EC2().DescribeVolumesPages(request, func(p *ec2.DescribeVolumesOutput, lastPage bool) bool {
for _, volume := range p.Volumes {
volumes = append(volumes, volume)
}
return true
})
if err != nil {
return nil, fmt.Errorf("error describing volumes: %v", err)
}
for _, volume := range volumes {
volumeID := aws.StringValue(volume.VolumeId)
etcdClusterName := ""
var etcdClusterSpec *etcd.EtcdClusterSpec
master := false
for _, tag := range volume.Tags {
k := aws.StringValue(tag.Key)
v := aws.StringValue(tag.Value)
if strings.HasPrefix(k, TagNameEtcdClusterPrefix) {
etcdClusterName := strings.TrimPrefix(k, TagNameEtcdClusterPrefix)
etcdClusterSpec, err = etcd.ParseEtcdClusterSpec(etcdClusterName, v)
if err != nil {
return nil, fmt.Errorf("error parsing etcd cluster tag %q on volume %q: %v", v, volumeID, err)
}
} else if k == TagNameRolePrefix+TagRoleMaster {
master = true
}
}
if etcdClusterName == "" || etcdClusterSpec == nil || !master {
continue
}
status := statusMap[etcdClusterName]
if status == nil {
status = &kops.EtcdClusterStatus{
Name: etcdClusterName,
}
statusMap[etcdClusterName] = status
}
memberName := etcdClusterSpec.NodeName
status.Members = append(status.Members, &kops.EtcdMemberStatus{
Name: memberName,
VolumeId: aws.StringValue(volume.VolumeId),
})
}
var status []kops.EtcdClusterStatus
for _, v := range statusMap {
status = append(status, *v)
}
return status, nil
}

View File

@ -39,6 +39,9 @@ type GCECloud interface {
WaitForOp(op *compute.Operation) error
GetApiIngressStatus(cluster *kops.Cluster) ([]kops.ApiIngressStatus, error)
Labels() map[string]string
// FindClusterStatus gets the status of the cluster as it exists in GCE, inferred from volumes
FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error)
}
type gceCloudImplementation struct {

View File

@ -89,6 +89,11 @@ func (c *mockGCECloud) WaitForOp(op *compute.Operation) error {
return fmt.Errorf("mockGCECloud::WaitForOp not implemented")
}
// FindClusterStatus implements GCECloud::FindClusterStatus
func (c *mockGCECloud) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
return nil, fmt.Errorf("mockGCECloud::FindClusterStatus not implemented")
}
// GetApiIngressStatus implements GCECloud::GetApiIngressStatus
func (c *mockGCECloud) GetApiIngressStatus(cluster *kops.Cluster) ([]kops.ApiIngressStatus, error) {
return nil, fmt.Errorf("mockGCECloud::GetApiIngressStatus not implemented")

View File

@ -0,0 +1,149 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"context"
"fmt"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v0.beta"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/upup/pkg/fi"
"strings"
)
func (c *gceCloudImplementation) allZones() ([]string, error) {
var zones []string
// TODO: use PageToken to list all not just the first 500
ctx := context.Background()
err := c.compute.Zones.List(c.project).Pages(ctx, func(page *compute.ZoneList) error {
for _, zone := range page.Items {
regionName := LastComponent(zone.Region)
if regionName == c.region {
zones = append(zones, zone.Name)
}
}
return nil
})
if err != nil {
return nil, fmt.Errorf("error listing zones: %v", err)
}
return zones, nil
}
// FindClusterStatus discovers the status of the cluster, by inspecting the cloud objects
func (c *gceCloudImplementation) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
etcdClusters, err := c.findEtcdStatus(cluster)
if err != nil {
return nil, err
}
status := &kops.ClusterStatus{
EtcdClusters: etcdClusters,
}
glog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status))
return status, nil
}
// FindEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes
func (c *gceCloudImplementation) findEtcdStatus(cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) {
statusMap := make(map[string]*kops.EtcdClusterStatus)
labels := c.Labels()
zones, err := c.allZones()
if err != nil {
return nil, err
}
var disks []*compute.Disk
// TODO: Filter disks query by Label?
ctx := context.Background()
for _, zone := range zones {
err := c.compute.Disks.List(c.project, zone).Pages(ctx, func(page *compute.DiskList) error {
for _, d := range page.Items {
glog.V(4).Infof("Found disk %q with labels %v", d.Name, d.Labels)
match := true
for k, v := range labels {
if d.Labels[k] != v {
match = false
}
}
if match {
disks = append(disks, d)
}
}
return nil
})
if err != nil {
return nil, fmt.Errorf("error describing volumes: %v", err)
}
}
for _, disk := range disks {
etcdClusterName := ""
var etcdClusterSpec *etcd.EtcdClusterSpec
master := false
for k, v := range disk.Labels {
if strings.HasPrefix(k, GceLabelNameEtcdClusterPrefix) {
etcdClusterName = strings.TrimPrefix(k, GceLabelNameEtcdClusterPrefix)
value, err := DecodeGCELabel(v)
if err != nil {
return nil, fmt.Errorf("unexpected etcd label on volume %q: %s=%s", disk.Name, k, v)
}
spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, value)
if err != nil {
return nil, fmt.Errorf("error parsing etcd cluster label %q on volume %q: %v", value, disk.Name, err)
}
etcdClusterSpec = spec
} else if strings.HasPrefix(k, GceLabelNameRolePrefix) {
roleName := strings.TrimPrefix(k, GceLabelNameRolePrefix)
if roleName == "master" {
master = true
}
}
}
if etcdClusterName == "" || etcdClusterSpec == nil || !master {
continue
}
status := statusMap[etcdClusterName]
if status == nil {
status = &kops.EtcdClusterStatus{
Name: etcdClusterName,
}
statusMap[etcdClusterName] = status
}
status.Members = append(status.Members, &kops.EtcdMemberStatus{
Name: etcdClusterSpec.NodeName,
VolumeId: disk.Name,
})
}
var status []kops.EtcdClusterStatus
for _, v := range statusMap {
status = append(status, *v)
}
return status, nil
}