Merge pull request #8237 from srikiz/DO-AddLoadBalancer

[DigitalOcean] Add load balancer support for master HA
This commit is contained in:
Kubernetes Prow Robot 2020-02-02 21:09:20 -08:00 committed by GitHub
commit 4c6b87494a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 480 additions and 5 deletions

View File

@ -18,6 +18,7 @@ go_library(
"//pkg/assets:go_default_library",
"//pkg/client/simple:go_default_library",
"//pkg/featureflag:go_default_library",
"//pkg/resources/digitalocean:go_default_library",
"//upup/pkg/fi/cloudup:go_default_library",
"//upup/pkg/fi/cloudup/aliup:go_default_library",
"//upup/pkg/fi/cloudup/awstasks:go_default_library",

View File

@ -21,6 +21,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/upup/pkg/fi/cloudup"
"k8s.io/kops/upup/pkg/fi/cloudup/aliup"
"k8s.io/kops/upup/pkg/fi/cloudup/awstasks"
@ -77,6 +78,10 @@ func (s *CloudDiscoveryStatusStore) GetApiIngressStatus(cluster *kops.Cluster) (
return osCloud.GetApiIngressStatus(cluster)
}
if doCloud, ok := cloud.(*digitalocean.Cloud); ok {
return doCloud.GetApiIngressStatus(cluster)
}
return nil, fmt.Errorf("API Ingress Status not implemented for %T", cloud)
}

View File

@ -3,15 +3,19 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"api_loadbalancer.go",
"context.go",
"droplets.go",
],
importpath = "k8s.io/kops/pkg/model/domodel",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/kops:go_default_library",
"//pkg/dns:go_default_library",
"//pkg/model:go_default_library",
"//upup/pkg/fi:go_default_library",
"//upup/pkg/fi/cloudup/do:go_default_library",
"//upup/pkg/fi/cloudup/dotasks:go_default_library",
"//upup/pkg/fi/fitasks:go_default_library",
],
)

View File

@ -0,0 +1,89 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package domodel
import (
"errors"
"fmt"
"strings"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/dns"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/do"
"k8s.io/kops/upup/pkg/fi/cloudup/dotasks"
"k8s.io/kops/upup/pkg/fi/fitasks"
)
// APILoadBalancerModelBuilder builds a LoadBalancer for accessing the API
type APILoadBalancerModelBuilder struct {
*DOModelContext
Lifecycle *fi.Lifecycle
}
var _ fi.ModelBuilder = &APILoadBalancerModelBuilder{}
func (b *APILoadBalancerModelBuilder) Build(c *fi.ModelBuilderContext) error {
// Configuration where a load balancer fronts the API
if !b.UseLoadBalancerForAPI() {
return nil
}
lbSpec := b.Cluster.Spec.API.LoadBalancer
if lbSpec == nil {
// Skipping API LB creation; not requested in Spec
return nil
}
switch lbSpec.Type {
case kops.LoadBalancerTypeInternal:
// OK
case kops.LoadBalancerTypePublic:
// OK
default:
return fmt.Errorf("unhandled LoadBalancer type %q", lbSpec.Type)
}
clusterName := strings.Replace(b.ClusterName(), ".", "-", -1)
loadbalancerName := "api-" + clusterName
clusterMasterTag := do.TagKubernetesClusterMasterPrefix + ":" + clusterName
// Create LoadBalancer for API LB
loadbalancer := &dotasks.LoadBalancer{
Name: fi.String(loadbalancerName),
Region: fi.String(b.Cluster.Spec.Subnets[0].Region),
DropletTag: fi.String(clusterMasterTag),
Lifecycle: b.Lifecycle,
}
c.AddTask(loadbalancer)
// Temporarily do not know the role of the following function
if dns.IsGossipHostname(b.Cluster.Name) || b.UsePrivateDNS() {
// Ensure the LB hostname is included in the TLS certificate,
// if we're not going to use an alias for it
// TODO: I don't love this technique for finding the task by name & modifying it
masterKeypairTask, found := c.Tasks["Keypair/master"]
if !found {
return errors.New("keypair/master task not found")
}
masterKeypair := masterKeypairTask.(*fitasks.Keypair)
masterKeypair.AlternateNameTasks = append(masterKeypair.AlternateNameTasks, loadbalancer)
}
return nil
}

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"os"
"strings"
"github.com/digitalocean/godo"
"golang.org/x/oauth2"
@ -128,7 +129,43 @@ func (c *Cloud) Droplets() godo.DropletsService {
return c.Client.Droplets
}
func (c *Cloud) LoadBalancers() godo.LoadBalancersService {
return c.Client.LoadBalancers
}
// FindVPCInfo is not implemented, it's only here to satisfy the fi.Cloud interface
func (c *Cloud) FindVPCInfo(id string) (*fi.VPCInfo, error) {
return nil, errors.New("not implemented")
}
func (c *Cloud) GetApiIngressStatus(cluster *kops.Cluster) ([]kops.ApiIngressStatus, error) {
var ingresses []kops.ApiIngressStatus
if cluster.Spec.MasterPublicName != "" {
// Note that this must match Digital Ocean's lb name
klog.V(2).Infof("Querying DO to find Loadbalancers for API (%q)", cluster.Name)
loadBalancers, err := getAllLoadBalancers(c)
if err != nil {
return nil, fmt.Errorf("LoadBalancers.List returned error: %v", err)
}
lbName := "api-" + strings.Replace(cluster.Name, ".", "-", -1)
for _, lb := range loadBalancers {
if lb.Name == lbName {
klog.V(10).Infof("Matching LB name found for API (%q)", cluster.Name)
if lb.Status != "active" {
return nil, fmt.Errorf("load-balancer is not yet active (current status: %s)", lb.Status)
}
address := lb.IP
ingresses = append(ingresses, kops.ApiIngressStatus{IP: address})
return ingresses, nil
}
}
}
return nil, nil
}

View File

@ -33,9 +33,10 @@ import (
)
const (
resourceTypeDroplet = "droplet"
resourceTypeVolume = "volume"
resourceTypeDNSRecord = "dns-record"
resourceTypeDroplet = "droplet"
resourceTypeVolume = "volume"
resourceTypeDNSRecord = "dns-record"
resourceTypeLoadBalancer = "loadbalancer"
)
type listFn func(fi.Cloud, string) ([]*resources.Resource, error)
@ -47,6 +48,7 @@ func ListResources(cloud *Cloud, clusterName string) (map[string]*resources.Reso
listVolumes,
listDroplets,
listDNS,
listLoadBalancers,
}
for _, fn := range listFunctions {
@ -265,6 +267,67 @@ func getAllRecordsByDomain(cloud *Cloud, domain string) ([]godo.DomainRecord, er
return allRecords, nil
}
func listLoadBalancers(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(*Cloud)
var resourceTrackers []*resources.Resource
clusterTag := "KubernetesCluster-Master:" + strings.Replace(clusterName, ".", "-", -1)
lbs, err := getAllLoadBalancers(c)
if err != nil {
return nil, fmt.Errorf("failed to list lbs: %v", err)
}
for _, lb := range lbs {
if strings.Contains(lb.Tag, clusterTag) {
resourceTracker := &resources.Resource{
Name: lb.Name,
ID: lb.ID,
Type: resourceTypeLoadBalancer,
Deleter: deleteLoadBalancer,
Obj: lb,
}
var blocks []string
for _, dropletID := range lb.DropletIDs {
blocks = append(blocks, "droplet:"+strconv.Itoa(dropletID))
}
resourceTracker.Blocks = blocks
resourceTrackers = append(resourceTrackers, resourceTracker)
}
}
return resourceTrackers, nil
}
func getAllLoadBalancers(cloud *Cloud) ([]godo.LoadBalancer, error) {
allLoadBalancers := []godo.LoadBalancer{}
opt := &godo.ListOptions{}
for {
lbs, resp, err := cloud.LoadBalancers().List(context.TODO(), opt)
if err != nil {
return nil, err
}
allLoadBalancers = append(allLoadBalancers, lbs...)
if resp.Links == nil || resp.Links.IsLastPage() {
break
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}
opt.Page = page + 1
}
return allLoadBalancers, nil
}
func deleteDroplet(cloud fi.Cloud, t *resources.Resource) error {
c := cloud.(*Cloud)
@ -315,6 +378,18 @@ func deleteRecord(cloud fi.Cloud, domain string, t *resources.Resource) error {
return nil
}
func deleteLoadBalancer(cloud fi.Cloud, t *resources.Resource) error {
c := cloud.(*Cloud)
lb := t.Obj.(godo.LoadBalancer)
_, err := c.Client.LoadBalancers.Delete(context.TODO(), lb.ID)
if err != nil {
return fmt.Errorf("failed to delete load balancer with name %s %v", lb.Name, err)
}
return nil
}
func waitForDetach(cloud *Cloud, action *godo.Action) error {
timeout := time.After(10 * time.Second)
ticker := time.NewTicker(500 * time.Millisecond)

View File

@ -396,8 +396,9 @@ func (c *ApplyClusterCmd) Run() error {
modelContext.SSHPublicKeys = sshPublicKeys
l.AddTypes(map[string]interface{}{
"volume": &dotasks.Volume{},
"droplet": &dotasks.Droplet{},
"volume": &dotasks.Volume{},
"droplet": &dotasks.Droplet{},
"loadbalancer": &dotasks.LoadBalancer{},
})
}
case kops.CloudProviderAWS:
@ -643,8 +644,12 @@ func (c *ApplyClusterCmd) Run() error {
&model.IAMModelBuilder{KopsModelContext: modelContext, Lifecycle: &securityLifecycle},
)
case kops.CloudProviderDO:
doModelContext := &domodel.DOModelContext{
KopsModelContext: modelContext,
}
l.Builders = append(l.Builders,
&model.MasterVolumeBuilder{KopsModelContext: modelContext, Lifecycle: &clusterLifecycle},
&domodel.APILoadBalancerModelBuilder{DOModelContext: doModelContext, Lifecycle: &securityLifecycle},
)
case kops.CloudProviderGCE:

View File

@ -5,6 +5,8 @@ go_library(
srcs = [
"droplet.go",
"droplet_fitask.go",
"loadbalancer.go",
"loadbalancer_fitask.go",
"volume.go",
"volume_fitask.go",
],

View File

@ -0,0 +1,182 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dotasks
import (
"context"
"errors"
"fmt"
"net"
"time"
"github.com/digitalocean/godo"
"k8s.io/klog"
"k8s.io/kops/pkg/resources/digitalocean"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/do"
)
//go:generate fitask -type=LoadBalancer
type LoadBalancer struct {
Name *string
ID *string
Lifecycle *fi.Lifecycle
Region *string
DropletTag *string
IPAddress *string
}
var _ fi.CompareWithID = &LoadBalancer{}
func (lb *LoadBalancer) CompareWithID() *string {
return lb.ID
}
func (lb *LoadBalancer) Find(c *fi.Context) (*LoadBalancer, error) {
if fi.StringValue(lb.ID) == "" {
// Loadbalancer = nil if not found
return nil, nil
}
cloud := c.Cloud.(*digitalocean.Cloud)
lbService := cloud.LoadBalancers()
loadbalancer, _, err := lbService.Get(context.TODO(), fi.StringValue(lb.ID))
if err != nil {
return nil, fmt.Errorf("load balancer service get request returned error %v", err)
}
return &LoadBalancer{
Name: fi.String(loadbalancer.Name),
ID: fi.String(loadbalancer.ID),
Lifecycle: lb.Lifecycle,
Region: fi.String(loadbalancer.Region.Slug),
}, nil
}
func (lb *LoadBalancer) Run(c *fi.Context) error {
return fi.DefaultDeltaRunMethod(lb, c)
}
func (_ *LoadBalancer) CheckChanges(a, e, changes *LoadBalancer) error {
if a != nil {
if changes.Name != nil {
return fi.CannotChangeField("Name")
}
if changes.ID != nil {
return fi.CannotChangeField("ID")
}
if changes.Region != nil {
return fi.CannotChangeField("Region")
}
} else {
if e.Name == nil {
return fi.RequiredField("Name")
}
if e.Region == nil {
return fi.RequiredField("Region")
}
}
return nil
}
func (_ *LoadBalancer) RenderDO(t *do.DOAPITarget, a, e, changes *LoadBalancer) error {
Rules := []godo.ForwardingRule{
{
EntryProtocol: "https",
EntryPort: 443,
TargetProtocol: "https",
TargetPort: 443,
TlsPassthrough: true,
},
{
EntryProtocol: "http",
EntryPort: 80,
TargetProtocol: "http",
TargetPort: 80,
},
}
HealthCheck := &godo.HealthCheck{
Protocol: "tcp",
Port: 443,
Path: "",
CheckIntervalSeconds: 60,
ResponseTimeoutSeconds: 5,
UnhealthyThreshold: 3,
HealthyThreshold: 5,
}
klog.V(10).Infof("Creating load balancer for DO")
loadBalancerService := t.Cloud.LoadBalancers()
loadbalancer, _, err := loadBalancerService.Create(context.TODO(), &godo.LoadBalancerRequest{
Name: fi.StringValue(e.Name),
Region: fi.StringValue(e.Region),
Tag: fi.StringValue(e.DropletTag),
ForwardingRules: Rules,
HealthCheck: HealthCheck,
})
if err != nil {
klog.Errorf("Error creating load balancer with Name=%s, Error=%v", fi.StringValue(e.Name), err)
return err
}
e.ID = fi.String(loadbalancer.ID)
e.IPAddress = fi.String(loadbalancer.IP) // This will be empty on create, but will be filled later on FindIPAddress invokation.
return nil
}
func (lb *LoadBalancer) FindIPAddress(c *fi.Context) (*string, error) {
cloud := c.Cloud.(*digitalocean.Cloud)
loadBalancerService := cloud.LoadBalancers()
klog.V(10).Infof("Find IP address for load balancer ID=%s", fi.StringValue(lb.ID))
loadBalancer, _, err := loadBalancerService.Get(context.TODO(), fi.StringValue(lb.ID))
if err != nil {
klog.Errorf("Error fetching load balancer with Name=%s", fi.StringValue(lb.Name))
return nil, err
}
address := loadBalancer.IP
if isIPv4(address) {
klog.V(10).Infof("load balancer address=%s", address)
return &address, nil
}
const lbWaitTime = 10 * time.Second
klog.Warningf("IP address for LB %s not yet available -- sleeping %s", fi.StringValue(lb.Name), lbWaitTime)
time.Sleep(lbWaitTime)
return nil, errors.New("IP Address is still empty.")
}
func isIPv4(host string) bool {
ip := net.ParseIP(host)
if ip == nil {
return false
}
return ip.To4() != nil
}

View File

@ -0,0 +1,75 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by ""fitask" -type=LoadBalancer"; DO NOT EDIT
package dotasks
import (
"encoding/json"
"k8s.io/kops/upup/pkg/fi"
)
// LoadBalancer
// JSON marshaling boilerplate
type realLoadBalancer LoadBalancer
// UnmarshalJSON implements conversion to JSON, supporting an alternate specification of the object as a string
func (o *LoadBalancer) UnmarshalJSON(data []byte) error {
var jsonName string
if err := json.Unmarshal(data, &jsonName); err == nil {
o.Name = &jsonName
return nil
}
var r realLoadBalancer
if err := json.Unmarshal(data, &r); err != nil {
return err
}
*o = LoadBalancer(r)
return nil
}
var _ fi.HasLifecycle = &LoadBalancer{}
// GetLifecycle returns the Lifecycle of the object, implementing fi.HasLifecycle
func (o *LoadBalancer) GetLifecycle() *fi.Lifecycle {
return o.Lifecycle
}
// SetLifecycle sets the Lifecycle of the object, implementing fi.SetLifecycle
func (o *LoadBalancer) SetLifecycle(lifecycle fi.Lifecycle) {
o.Lifecycle = &lifecycle
}
var _ fi.HasName = &LoadBalancer{}
// GetName returns the Name of the object, implementing fi.HasName
func (o *LoadBalancer) GetName() *string {
return o.Name
}
// SetName sets the Name of the object, implementing fi.SetName
func (o *LoadBalancer) SetName(name string) {
o.Name = &name
}
// String is the stringer function for the task, producing readable output using fi.TaskAsString
func (o *LoadBalancer) String() string {
return fi.TaskAsString(o)
}