Changes for handling kops update

This commit is contained in:
Srikanth 2020-05-06 13:48:16 +05:30
parent 5e4c48bbeb
commit 5d3f2447e4
3 changed files with 121 additions and 15 deletions

View File

@ -107,5 +107,9 @@ func (s *CloudDiscoveryStatusStore) FindClusterStatus(cluster *kops.Cluster) (*k
if osCloud, ok := cloud.(openstack.OpenstackCloud); ok {
return osCloud.FindClusterStatus(cluster)
}
if doCloud, ok := cloud.(*digitalocean.Cloud); ok {
return doCloud.FindClusterStatus(cluster)
}
return nil, fmt.Errorf("etcd Status not implemented for %T", cloud)
}

View File

@ -32,9 +32,13 @@ import (
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/resources/digitalocean/dns"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/upup/pkg/fi"
)
const TagKubernetesClusterIndex = "k8s-index"
const TagKubernetesClusterNamePrefix = "KubernetesCluster"
// TokenSource implements oauth2.TokenSource
type TokenSource struct {
AccessToken string
@ -175,3 +179,99 @@ func (c *Cloud) GetApiIngressStatus(cluster *kops.Cluster) ([]kops.ApiIngressSta
return nil, nil
}
// FindClusterStatus discovers the status of the cluster, by looking for the tagged etcd volumes
func (c *Cloud) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
etcdStatus, err := findEtcdStatus(c, cluster)
if err != nil {
return nil, err
}
status := &kops.ClusterStatus{
EtcdClusters: etcdStatus,
}
klog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status))
return status, nil
}
// findEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes
func findEtcdStatus(c *Cloud, cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) {
statusMap := make(map[string]*kops.EtcdClusterStatus)
klog.V(2).Infof("Querying DO for etcd volumes")
volumes, err := getAllVolumesByRegion(c, c.RegionName)
if err != nil {
return nil, fmt.Errorf("error describing volumes: %v", err)
}
for _, volume := range volumes {
volumeID := volume.ID
etcdClusterName := ""
var etcdClusterSpec *etcd.EtcdClusterSpec
for _, myTag := range volume.Tags {
klog.V(2).Infof("findEtcdStatus status (from cloud): %v", myTag)
// check if volume belongs to this cluster.
// tag will be in the format "KubernetesCluster:dev5-k8s-local" (where clusterName is dev5.k8s.local)
clusterName := strings.Replace(cluster.Name, ".", "-", -1)
if strings.Contains(myTag, fmt.Sprintf("%s, %s", TagKubernetesClusterNamePrefix, ":", clusterName)) {
klog.V(2).Infof("findEtcdStatus cluster comparison matched for tag: %v", myTag)
// this volume belongs to our cluster, add this to our etcdClusterSpec.
// loop through the tags again and
for _, volumeTag := range volume.Tags {
if strings.Contains(volumeTag, TagKubernetesClusterIndex) {
if len(strings.Split(volumeTag, ":")) < 2 {
return nil, fmt.Errorf("error splitting the volume tag %q on volume %q", volumeTag, volume)
}
dropletIndex := strings.Split(volumeTag, ":")[1]
etcdClusterSpec, err = c.getEtcdClusterSpec(volume.Name, dropletIndex)
klog.V(2).Infof("findEtcdStatus etcdClusterSpec: %v", fi.DebugAsJsonString(etcdClusterSpec))
if err != nil {
return nil, fmt.Errorf("error parsing etcd cluster tag %q on volume %q: %v", volumeTag, volumeID, err)
}
etcdClusterName = etcdClusterSpec.ClusterKey
status := statusMap[etcdClusterName]
if status == nil {
status = &kops.EtcdClusterStatus{
Name: etcdClusterName,
}
statusMap[etcdClusterName] = status
}
memberName := etcdClusterSpec.NodeName
status.Members = append(status.Members, &kops.EtcdMemberStatus{
Name: memberName,
VolumeId: volume.ID,
})
}
}
}
}
}
var status []kops.EtcdClusterStatus
for _, v := range statusMap {
status = append(status, *v)
}
return status, nil
}
func (c *Cloud) getEtcdClusterSpec(volumeName string, dropletName string) (*etcd.EtcdClusterSpec, error) {
var clusterKey string
if strings.Contains(volumeName, "etcd-main") {
clusterKey = "main"
} else if strings.Contains(volumeName, "etcd-events") {
clusterKey = "events"
} else {
return nil, fmt.Errorf("could not determine etcd cluster type for volume: %s", volumeName)
}
return &etcd.EtcdClusterSpec{
ClusterKey: clusterKey,
NodeName: dropletName,
NodeNames: []string{dropletName},
}, nil
}

View File

@ -124,25 +124,27 @@ func (_ *LoadBalancer) RenderDO(t *do.DOAPITarget, a, e, changes *LoadBalancer)
HealthyThreshold: 5,
}
klog.V(10).Infof("Creating load balancer for DO")
if a == nil {
klog.V(10).Infof("Creating load balancer for DO")
loadBalancerService := t.Cloud.LoadBalancers()
loadbalancer, _, err := loadBalancerService.Create(context.TODO(), &godo.LoadBalancerRequest{
Name: fi.StringValue(e.Name),
Region: fi.StringValue(e.Region),
Tag: fi.StringValue(e.DropletTag),
ForwardingRules: Rules,
HealthCheck: HealthCheck,
})
loadBalancerService := t.Cloud.LoadBalancers()
loadbalancer, _, err := loadBalancerService.Create(context.TODO(), &godo.LoadBalancerRequest{
Name: fi.StringValue(e.Name),
Region: fi.StringValue(e.Region),
Tag: fi.StringValue(e.DropletTag),
ForwardingRules: Rules,
HealthCheck: HealthCheck,
})
if err != nil {
klog.Errorf("Error creating load balancer with Name=%s, Error=%v", fi.StringValue(e.Name), err)
return err
if err != nil {
klog.Errorf("Error creating load balancer with Name=%s, Error=%v", fi.StringValue(e.Name), err)
return err
}
e.ID = fi.String(loadbalancer.ID)
e.IPAddress = fi.String(loadbalancer.IP) // This will be empty on create, but will be filled later on FindIPAddress invokation.
}
e.ID = fi.String(loadbalancer.ID)
e.IPAddress = fi.String(loadbalancer.IP) // This will be empty on create, but will be filled later on FindIPAddress invokation.
return nil
}