Merge pull request #14933 from Mia-Cross/scaleway_load_balancer

scaleway: load-balancer support
This commit is contained in:
Kubernetes Prow Robot 2023-01-30 00:12:51 -08:00 committed by GitHub
commit 254fd0ca39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 9228 additions and 10 deletions

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awstasks"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
"k8s.io/kops/upup/pkg/fi/cloudup/scaleway"
"github.com/blang/semver/v4"
utilnet "k8s.io/apimachinery/pkg/util/net"
@ -234,6 +235,14 @@ func (b *KopsModelContext) CloudTags(name string, shared bool) map[string]string
tags[k] = v
}
}
case kops.CloudProviderScaleway:
for k, v := range b.Cluster.Spec.CloudLabels {
if k == scaleway.TagClusterName && shared == true {
klog.V(4).Infof("Skipping %q tag for shared resource", scaleway.TagClusterName)
continue
}
tags[k] = v
}
}
return tags
}

View File

@ -0,0 +1,110 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scalewaymodel
import (
"fmt"
"github.com/scaleway/scaleway-sdk-go/api/lb/v1"
"k8s.io/klog/v2"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/dns"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/scaleway"
"k8s.io/kops/upup/pkg/fi/cloudup/scalewaytasks"
)
// APILoadBalancerModelBuilder builds a load-balancer for accessing the API
type APILoadBalancerModelBuilder struct {
*ScwModelContext
Lifecycle fi.Lifecycle
}
var _ fi.CloudupModelBuilder = &APILoadBalancerModelBuilder{}
func (b *APILoadBalancerModelBuilder) Build(c *fi.CloudupModelBuilderContext) error {
// Configuration where a load balancer fronts the API
if !b.UseLoadBalancerForAPI() {
return nil
}
lbSpec := b.Cluster.Spec.API.LoadBalancer
switch lbSpec.Type {
case kops.LoadBalancerTypePublic:
klog.V(8).Infof("Using public load-balancer")
case kops.LoadBalancerTypeInternal:
return fmt.Errorf("Scaleway clusters don't have a VPC yet, so internal load-balancers are not supported at the time")
default:
return fmt.Errorf("unhandled load-balancer type %q", lbSpec.Type)
}
zone, err := scaleway.ParseZoneFromClusterSpec(b.Cluster.Spec)
if err != nil {
return fmt.Errorf("building load-balancer task: %w", err)
}
lbTags := []string(nil)
for k, v := range b.CloudTags(b.ClusterName(), false) {
lbTags = append(lbTags, fmt.Sprintf("%s=%s", k, v))
}
lbTags = append(lbTags, fmt.Sprintf("%s=%s", scaleway.TagNameRolePrefix, scaleway.TagRoleControlPlane))
loadBalancerName := "api." + b.ClusterName()
loadBalancer := &scalewaytasks.LoadBalancer{
Name: fi.PtrTo(loadBalancerName),
Zone: fi.PtrTo(string(zone)),
Lifecycle: b.Lifecycle,
Tags: lbTags,
Description: "Load-balancer for kops cluster " + b.ClusterName(),
SslCompatibilityLevel: string(lb.SSLCompatibilityLevelSslCompatibilityLevelUnknown),
}
c.AddTask(loadBalancer)
lbBackend := &scalewaytasks.LBBackend{
Name: fi.PtrTo("lb-backend"),
Lifecycle: b.Lifecycle,
Zone: fi.PtrTo(string(zone)),
ForwardProtocol: fi.PtrTo(string(lb.ProtocolTCP)),
ForwardPort: fi.PtrTo(int32(443)),
ForwardPortAlgorithm: fi.PtrTo(string(lb.ForwardPortAlgorithmRoundrobin)),
StickySessions: fi.PtrTo(string(lb.StickySessionsTypeNone)),
ProxyProtocol: fi.PtrTo(string(lb.ProxyProtocolProxyProtocolUnknown)),
LoadBalancer: loadBalancer,
}
c.AddTask(lbBackend)
lbFrontend := &scalewaytasks.LBFrontend{
Name: fi.PtrTo("lb-frontend"),
Lifecycle: b.Lifecycle,
Zone: fi.PtrTo(string(zone)),
InboundPort: fi.PtrTo(int32(443)),
LoadBalancer: loadBalancer,
LBBackend: lbBackend,
}
c.AddTask(lbFrontend)
if dns.IsGossipClusterName(b.Cluster.Name) || b.Cluster.UsesPrivateDNS() || b.Cluster.UsesNoneDNS() {
// Ensure the LB hostname is included in the TLS certificate,
// if we're not going to use an alias for it
loadBalancer.ForAPIServer = true
}
return nil
}

View File

@ -23,12 +23,14 @@ import (
iam "github.com/scaleway/scaleway-sdk-go/api/iam/v1alpha1"
"github.com/scaleway/scaleway-sdk-go/api/instance/v1"
"github.com/scaleway/scaleway-sdk-go/api/lb/v1"
)
const (
resourceTypeServer = "server"
resourceTypeSSHKey = "ssh-key"
resourceTypeVolume = "volume"
resourceTypeLoadBalancer = "load-balancer"
resourceTypeServer = "server"
resourceTypeSSHKey = "ssh-key"
resourceTypeVolume = "volume"
)
type listFn func(fi.Cloud, string) ([]*resources.Resource, error)
@ -38,6 +40,7 @@ func ListResources(cloud scaleway.ScwCloud, clusterInfo resources.ClusterInfo) (
clusterName := clusterInfo.Name
listFunctions := []listFn{
listLoadBalancers,
listServers,
listSSHKeys,
listVolumes,
@ -56,6 +59,30 @@ func ListResources(cloud scaleway.ScwCloud, clusterInfo resources.ClusterInfo) (
return resourceTrackers, nil
}
func listLoadBalancers(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(scaleway.ScwCloud)
lbs, err := c.GetClusterLoadBalancers(clusterName)
if err != nil {
return nil, err
}
resourceTrackers := []*resources.Resource(nil)
for _, loadBalancer := range lbs {
resourceTracker := &resources.Resource{
Name: loadBalancer.Name,
ID: loadBalancer.ID,
Type: resourceTypeLoadBalancer,
Deleter: func(cloud fi.Cloud, tracker *resources.Resource) error {
return deleteLoadBalancer(cloud, tracker)
},
Obj: loadBalancer,
}
resourceTrackers = append(resourceTrackers, resourceTracker)
}
return resourceTrackers, nil
}
func listServers(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) {
c := cloud.(scaleway.ScwCloud)
servers, err := c.GetClusterServers(clusterName, nil)
@ -131,6 +158,13 @@ func listVolumes(cloud fi.Cloud, clusterName string) ([]*resources.Resource, err
return resourceTrackers, nil
}
func deleteLoadBalancer(cloud fi.Cloud, tracker *resources.Resource) error {
c := cloud.(scaleway.ScwCloud)
loadBalancer := tracker.Obj.(*lb.LB)
return c.DeleteLoadBalancer(loadBalancer)
}
func deleteServer(cloud fi.Cloud, tracker *resources.Resource) error {
c := cloud.(scaleway.ScwCloud)
server := tracker.Obj.(*instance.Server)

View File

@ -688,6 +688,7 @@ func (c *ApplyClusterCmd) Run(ctx context.Context) error {
KopsModelContext: modelContext,
}
l.Builders = append(l.Builders,
&scalewaymodel.APILoadBalancerModelBuilder{ScwModelContext: scwModelContext, Lifecycle: networkLifecycle},
&scalewaymodel.InstanceModelBuilder{ScwModelContext: scwModelContext, BootstrapScriptBuilder: bootstrapScriptBuilder, Lifecycle: clusterLifecycle},
&scalewaymodel.SSHKeyModelBuilder{ScwModelContext: scwModelContext, Lifecycle: securityLifecycle},
)

View File

@ -23,6 +23,7 @@ import (
iam "github.com/scaleway/scaleway-sdk-go/api/iam/v1alpha1"
"github.com/scaleway/scaleway-sdk-go/api/instance/v1"
"github.com/scaleway/scaleway-sdk-go/api/lb/v1"
"github.com/scaleway/scaleway-sdk-go/scw"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/errors"
@ -56,6 +57,7 @@ type ScwCloud interface {
IamService() *iam.API
InstanceService() *instance.API
LBService() *lb.ZonedAPI
DeleteGroup(group *cloudinstances.CloudInstanceGroup) error
DeleteInstance(i *cloudinstances.CloudInstance) error
@ -66,10 +68,12 @@ type ScwCloud interface {
GetApiIngressStatus(cluster *kops.Cluster) ([]fi.ApiIngressStatus, error)
GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error)
GetClusterLoadBalancers(clusterName string) ([]*lb.LB, error)
GetClusterServers(clusterName string, serverName *string) ([]*instance.Server, error)
GetClusterSSHKeys(clusterName string) ([]*iam.SSHKey, error)
GetClusterVolumes(clusterName string) ([]*instance.Volume, error)
DeleteLoadBalancer(loadBalancer *lb.LB) error
DeleteServer(server *instance.Server) error
DeleteSSHKey(sshkey *iam.SSHKey) error
DeleteVolume(volume *instance.Volume) error
@ -87,6 +91,7 @@ type scwCloudImplementation struct {
iamAPI *iam.API
instanceAPI *instance.API
lbAPI *lb.ZonedAPI
}
// NewScwCloud returns a Cloud with a Scaleway Client using the env vars SCW_ACCESS_KEY, SCW_SECRET_KEY and SCW_DEFAULT_PROJECT_ID
@ -135,6 +140,7 @@ func NewScwCloud(tags map[string]string) (ScwCloud, error) {
tags: tags,
iamAPI: iam.NewAPI(scwClient),
instanceAPI: instance.NewAPI(scwClient),
lbAPI: lb.NewZonedAPI(scwClient),
}, nil
}
@ -172,6 +178,10 @@ func (s *scwCloudImplementation) InstanceService() *instance.API {
return s.instanceAPI
}
func (s *scwCloudImplementation) LBService() *lb.ZonedAPI {
return s.lbAPI
}
func (s *scwCloudImplementation) DeleteGroup(group *cloudinstances.CloudInstanceGroup) error {
toDelete := append(group.NeedUpdate, group.Ready...)
for _, cloudInstance := range toDelete {
@ -193,20 +203,56 @@ func (s *scwCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstance)
klog.V(4).Infof("error deleting cloud instance %s of group %s : instance was already deleted", i.ID, i.CloudInstanceGroup.HumanName)
return nil
}
return fmt.Errorf("error deleting cloud instance %s of group %s: %w", i.ID, i.CloudInstanceGroup.HumanName, err)
return fmt.Errorf("deleting cloud instance %s of group %s: %w", i.ID, i.CloudInstanceGroup.HumanName, err)
}
err = s.DeleteServer(server.Server)
if err != nil {
return fmt.Errorf("error deleting cloud instance %s of group %s: %w", i.ID, i.CloudInstanceGroup.HumanName, err)
return fmt.Errorf("deleting cloud instance %s of group %s: %w", i.ID, i.CloudInstanceGroup.HumanName, err)
}
return nil
}
func (s *scwCloudImplementation) DeregisterInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Infof("Scaleway DeregisterInstance is not implemented yet")
return fmt.Errorf("DeregisterInstance is not implemented yet for Scaleway")
server, err := s.instanceAPI.GetServer(&instance.GetServerRequest{
Zone: s.zone,
ServerID: i.ID,
})
if err != nil {
return fmt.Errorf("deregistering cloud instance %s of group %q: %w", i.ID, i.CloudInstanceGroup.HumanName, err)
}
// We remove the instance's IP from load-balancers
lbs, err := s.GetClusterLoadBalancers(s.ClusterName(server.Server.Tags))
if err != nil {
return fmt.Errorf("deregistering cloud instance %s of group %q: %w", i.ID, i.CloudInstanceGroup.HumanName, err)
}
for _, loadBalancer := range lbs {
backEnds, err := s.lbAPI.ListBackends(&lb.ZonedAPIListBackendsRequest{
Zone: s.zone,
LBID: loadBalancer.ID,
}, scw.WithAllPages())
if err != nil {
return fmt.Errorf("deregistering cloud instance %s of group %q: listing load-balancer's back-ends for instance creation: %w", i.ID, i.CloudInstanceGroup.HumanName, err)
}
for _, backEnd := range backEnds.Backends {
for _, serverIP := range backEnd.Pool {
if serverIP == fi.ValueOf(server.Server.PrivateIP) {
_, err := s.lbAPI.RemoveBackendServers(&lb.ZonedAPIRemoveBackendServersRequest{
Zone: s.zone,
BackendID: backEnd.ID,
ServerIP: []string{serverIP},
})
if err != nil {
return fmt.Errorf("deregistering cloud instance %s of group %q: removing IP from lb: %w", i.ID, i.CloudInstanceGroup.HumanName, err)
}
}
}
}
}
return nil
}
func (s *scwCloudImplementation) DetachInstance(i *cloudinstances.CloudInstance) error {
@ -227,8 +273,31 @@ func (s *scwCloudImplementation) FindVPCInfo(id string) (*fi.VPCInfo, error) {
}
func (s *scwCloudImplementation) GetApiIngressStatus(cluster *kops.Cluster) ([]fi.ApiIngressStatus, error) {
klog.V(8).Info("Scaleway clusters don't have load-balancers yet so GetApiIngressStatus is not implemented")
return nil, nil
var ingresses []fi.ApiIngressStatus
name := "api." + cluster.Name
responseLoadBalancers, err := s.lbAPI.ListLBs(&lb.ZonedAPIListLBsRequest{
Zone: s.zone,
Name: &name,
}, scw.WithAllPages())
if err != nil {
return nil, fmt.Errorf("finding load-balancers: %w", err)
}
if len(responseLoadBalancers.LBs) == 0 {
klog.V(8).Infof("Could not find any load-balancers for cluster %s", cluster.Name)
return nil, nil
}
if len(responseLoadBalancers.LBs) > 1 {
klog.V(4).Infof("More than 1 load-balancer with the name %s was found", name)
}
for _, loadBalancer := range responseLoadBalancers.LBs {
for _, lbIP := range loadBalancer.IP {
ingresses = append(ingresses, fi.ApiIngressStatus{IP: lbIP.IPAddress})
}
}
return ingresses, nil
}
func (s *scwCloudImplementation) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
@ -311,6 +380,18 @@ func buildCloudGroup(ig *kops.InstanceGroup, sg []*instance.Server, nodeMap map[
return cloudInstanceGroup, nil
}
func (s *scwCloudImplementation) GetClusterLoadBalancers(clusterName string) ([]*lb.LB, error) {
loadBalancerName := "api." + clusterName
lbs, err := s.lbAPI.ListLBs(&lb.ZonedAPIListLBsRequest{
Zone: s.zone,
Name: &loadBalancerName,
}, scw.WithAllPages())
if err != nil {
return nil, fmt.Errorf("listing cluster load-balancers: %w", err)
}
return lbs.LBs, nil
}
func (s *scwCloudImplementation) GetClusterServers(clusterName string, serverName *string) ([]*instance.Server, error) {
request := &instance.ListServersRequest{
Zone: s.zone,
@ -352,6 +433,45 @@ func (s *scwCloudImplementation) GetClusterVolumes(clusterName string) ([]*insta
return volumes.Volumes, nil
}
func (s *scwCloudImplementation) DeleteLoadBalancer(loadBalancer *lb.LB) error {
ipsToRelease := loadBalancer.IP
// We delete the load-balancer once it's in a stable state
_, err := s.lbAPI.WaitForLb(&lb.ZonedAPIWaitForLBRequest{
LBID: loadBalancer.ID,
Zone: s.zone,
})
if err != nil {
return fmt.Errorf("waiting for load-balancer: %w", err)
}
err = s.lbAPI.DeleteLB(&lb.ZonedAPIDeleteLBRequest{
Zone: s.zone,
LBID: loadBalancer.ID,
})
if err != nil {
return fmt.Errorf("deleting load-balancer %s: %w", loadBalancer.ID, err)
}
// We wait for the load-balancer to be deleted, then we detach its IPs
_, err = s.lbAPI.WaitForLb(&lb.ZonedAPIWaitForLBRequest{
LBID: loadBalancer.ID,
Zone: s.zone,
})
if !is404Error(err) {
return fmt.Errorf("waiting for load-balancer %s after deletion: %w", loadBalancer.ID, err)
}
for _, ip := range ipsToRelease {
err := s.lbAPI.ReleaseIP(&lb.ZonedAPIReleaseIPRequest{
Zone: s.zone,
IPID: ip.ID,
})
if err != nil {
return fmt.Errorf("deleting load-balancer IP: %w", err)
}
}
return nil
}
func (s *scwCloudImplementation) DeleteServer(server *instance.Server) error {
srv, err := s.instanceAPI.GetServer(&instance.GetServerRequest{
Zone: s.zone,

View File

@ -21,6 +21,7 @@ import (
"fmt"
"github.com/scaleway/scaleway-sdk-go/api/instance/v1"
"github.com/scaleway/scaleway-sdk-go/api/lb/v1"
"github.com/scaleway/scaleway-sdk-go/scw"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/scaleway"
@ -37,7 +38,9 @@ type Instance struct {
Image *string
Tags []string
Count int
UserData *fi.Resource
UserData *fi.Resource
LoadBalancer *LoadBalancer
}
var _ fi.CloudupTask = &Instance{}
@ -118,6 +121,7 @@ func (_ *Instance) RenderScw(c *fi.CloudupContext, actual, expected, changes *In
cloud := c.T.Cloud.(scaleway.ScwCloud)
instanceService := cloud.InstanceService()
zone := scw.Zone(fi.ValueOf(expected.Zone))
controlPlanePrivateIPs := []string(nil)
userData, err := fi.ResourceAsBytes(*expected.UserData)
if err != nil {
@ -185,6 +189,20 @@ func (_ *Instance) RenderScw(c *fi.CloudupContext, actual, expected, changes *In
if err != nil {
return fmt.Errorf("error waiting for instance %s of group %q: %w", srv.Server.ID, fi.ValueOf(expected.Name), err)
}
// If instance has control-plane role, we add its private IP to the list to add it to the lb's backend
if fi.ValueOf(expected.Role) == scaleway.TagRoleControlPlane {
// We update the server's infos (to get its IP)
server, err := instanceService.GetServer(&instance.GetServerRequest{
Zone: zone,
ServerID: srv.Server.ID,
})
if err != nil {
return fmt.Errorf("getting server %s: %s", srv.Server.ID, err)
}
controlPlanePrivateIPs = append(controlPlanePrivateIPs, *server.Server.PrivateIP)
}
}
// If newInstanceCount < 0, we need to delete instances of this group
@ -197,6 +215,11 @@ func (_ *Instance) RenderScw(c *fi.CloudupContext, actual, expected, changes *In
for i := 0; i > newInstanceCount; i-- {
toDelete := igInstances[i*-1]
if fi.ValueOf(actual.Role) == scaleway.TagRoleControlPlane {
controlPlanePrivateIPs = append(controlPlanePrivateIPs, *toDelete.PrivateIP)
}
err = cloud.DeleteServer(toDelete)
if err != nil {
return fmt.Errorf("error deleting instance of group %s: %w", toDelete.Name, err)
@ -204,5 +227,63 @@ func (_ *Instance) RenderScw(c *fi.CloudupContext, actual, expected, changes *In
}
}
// If IG is control-plane, we need to update the load-balancer's back-end
if len(controlPlanePrivateIPs) > 0 {
lbService := cloud.LBService()
zone := scw.Zone(cloud.Zone())
lbs, err := cloud.GetClusterLoadBalancers(cloud.ClusterName(expected.Tags))
if err != nil {
return fmt.Errorf("listing load-balancers for instance creation: %w", err)
}
for _, loadBalancer := range lbs {
backEnds, err := lbService.ListBackends(&lb.ZonedAPIListBackendsRequest{
Zone: zone,
LBID: loadBalancer.ID,
})
if err != nil {
return fmt.Errorf("listing load-balancer's back-ends for instance creation: %w", err)
}
if backEnds.TotalCount > 1 {
return fmt.Errorf("cannot have multiple back-ends for load-balancer %s", loadBalancer.Name)
} else if backEnds.TotalCount < 1 {
return fmt.Errorf("load-balancer %s should have 1 back-end, got 0", loadBalancer.Name)
}
backEnd := backEnds.Backends[0]
// If we are adding instances, we also need to add them to the load-balancer's backend
if newInstanceCount > 0 {
_, err = lbService.AddBackendServers(&lb.ZonedAPIAddBackendServersRequest{
Zone: zone,
BackendID: backEnd.ID,
ServerIP: controlPlanePrivateIPs,
})
if err != nil {
return fmt.Errorf("adding servers' IPs to load-balancer's back-end: %w", err)
}
} else {
// If we are deleting instances, we also need to delete them from the load-balancer's backend
_, err = lbService.RemoveBackendServers(&lb.ZonedAPIRemoveBackendServersRequest{
Zone: zone,
BackendID: backEnd.ID,
ServerIP: controlPlanePrivateIPs,
})
if err != nil {
return fmt.Errorf("removing servers' IPs from load-balancer's back-end: %w", err)
}
}
_, err = lbService.WaitForLb(&lb.ZonedAPIWaitForLBRequest{
LBID: loadBalancer.ID,
Zone: zone,
})
if err != nil {
return fmt.Errorf("waiting for load-balancer %s: %w", loadBalancer.ID, err)
}
}
}
return nil
}

View File

@ -0,0 +1,165 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scalewaytasks
import (
"fmt"
"github.com/scaleway/scaleway-sdk-go/api/lb/v1"
"github.com/scaleway/scaleway-sdk-go/scw"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/scaleway"
)
// +kops:fitask
type LBBackend struct {
Name *string
Lifecycle fi.Lifecycle
ID *string
Zone *string
ForwardProtocol *string
ForwardPort *int32
ForwardPortAlgorithm *string
StickySessions *string
ProxyProtocol *string
LoadBalancer *LoadBalancer
}
var _ fi.CloudupTask = &LBBackend{}
var _ fi.CompareWithID = &LBBackend{}
func (l *LBBackend) CompareWithID() *string {
return l.ID
}
func (l *LBBackend) Find(context *fi.CloudupContext) (*LBBackend, error) {
cloud := context.T.Cloud.(scaleway.ScwCloud)
lbService := cloud.LBService()
backendResponse, err := lbService.ListBackends(&lb.ZonedAPIListBackendsRequest{
Zone: scw.Zone(cloud.Zone()),
LBID: fi.ValueOf(l.LoadBalancer.LBID),
Name: l.Name,
})
if err != nil {
return nil, fmt.Errorf("listing back-ends for load-balancer %s: %w", fi.ValueOf(l.LoadBalancer.LBID), err)
}
if backendResponse.TotalCount != 1 {
return nil, nil
}
backend := backendResponse.Backends[0]
return &LBBackend{
Name: fi.PtrTo(backend.Name),
Lifecycle: l.Lifecycle,
ID: fi.PtrTo(backend.ID),
Zone: fi.PtrTo(string(backend.LB.Zone)),
ForwardProtocol: fi.PtrTo(string(backend.ForwardProtocol)),
ForwardPort: fi.PtrTo(backend.ForwardPort),
ForwardPortAlgorithm: fi.PtrTo(string(backend.ForwardPortAlgorithm)),
StickySessions: fi.PtrTo(string(backend.StickySessions)),
ProxyProtocol: fi.PtrTo(string(backend.ProxyProtocol)),
LoadBalancer: &LoadBalancer{
Name: fi.PtrTo(backend.LB.Name),
},
}, nil
}
func (l *LBBackend) Run(context *fi.CloudupContext) error {
return fi.CloudupDefaultDeltaRunMethod(l, context)
}
func (_ *LBBackend) CheckChanges(actual, expected, changes *LBBackend) error {
if actual != nil {
if changes.Name != nil {
return fi.CannotChangeField("Name")
}
if changes.ID != nil {
return fi.CannotChangeField("ID")
}
if changes.Zone != nil {
return fi.CannotChangeField("Zone")
}
} else {
if expected.Name == nil {
return fi.RequiredField("Name")
}
if expected.Zone == nil {
return fi.RequiredField("Zone")
}
}
return nil
}
func (l *LBBackend) RenderScw(t *scaleway.ScwAPITarget, actual, expected, changes *LBBackend) error {
lbService := t.Cloud.LBService()
if actual != nil {
_, err := lbService.UpdateBackend(&lb.ZonedAPIUpdateBackendRequest{
Zone: scw.Zone(fi.ValueOf(actual.Zone)),
BackendID: fi.ValueOf(actual.ID),
Name: fi.ValueOf(actual.Name),
ForwardProtocol: lb.Protocol(fi.ValueOf(expected.ForwardProtocol)),
ForwardPort: fi.ValueOf(expected.ForwardPort),
ForwardPortAlgorithm: lb.ForwardPortAlgorithm(fi.ValueOf(expected.ForwardPortAlgorithm)),
StickySessions: lb.StickySessionsType(fi.ValueOf(expected.StickySessions)),
ProxyProtocol: lb.ProxyProtocol(fi.ValueOf(expected.ProxyProtocol)),
})
if err != nil {
return fmt.Errorf("updating back-end for load-balancer %s: %w", fi.ValueOf(actual.LoadBalancer.Name), err)
}
} else {
backendCreated, err := lbService.CreateBackend(&lb.ZonedAPICreateBackendRequest{
Zone: scw.Zone(fi.ValueOf(expected.Zone)),
LBID: fi.ValueOf(expected.LoadBalancer.LBID), // try expected instead of l
Name: fi.ValueOf(expected.Name),
ForwardProtocol: lb.Protocol(fi.ValueOf(expected.ForwardProtocol)),
ForwardPort: fi.ValueOf(expected.ForwardPort),
ForwardPortAlgorithm: lb.ForwardPortAlgorithm(fi.ValueOf(expected.ForwardPortAlgorithm)),
StickySessions: lb.StickySessionsType(fi.ValueOf(expected.StickySessions)),
HealthCheck: &lb.HealthCheck{
CheckMaxRetries: 5,
TCPConfig: &lb.HealthCheckTCPConfig{},
Port: fi.ValueOf(expected.ForwardPort),
CheckTimeout: scw.TimeDurationPtr(3000),
CheckDelay: scw.TimeDurationPtr(1001),
},
ProxyProtocol: lb.ProxyProtocol(fi.ValueOf(expected.ProxyProtocol)),
})
if err != nil {
return fmt.Errorf("creating back-end for load-balancer %s: %w", fi.ValueOf(expected.LoadBalancer.Name), err)
}
expected.ID = &backendCreated.ID
}
_, err := lbService.WaitForLb(&lb.ZonedAPIWaitForLBRequest{
LBID: fi.ValueOf(expected.LoadBalancer.LBID),
Zone: scw.Zone(fi.ValueOf(expected.Zone)),
})
if err != nil {
return fmt.Errorf("waiting for load-balancer %s: %w", fi.ValueOf(expected.LoadBalancer.Name), err)
}
return nil
}

View File

@ -0,0 +1,149 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scalewaytasks
import (
"fmt"
"github.com/scaleway/scaleway-sdk-go/api/lb/v1"
"github.com/scaleway/scaleway-sdk-go/scw"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/scaleway"
)
// +kops:fitask
type LBFrontend struct {
Name *string
Lifecycle fi.Lifecycle
ID *string
Zone *string
InboundPort *int32
LoadBalancer *LoadBalancer
LBBackend *LBBackend
}
var _ fi.CloudupTask = &LBFrontend{}
var _ fi.CompareWithID = &LBFrontend{}
func (l *LBFrontend) CompareWithID() *string {
return l.ID
}
func (l *LBFrontend) Find(context *fi.CloudupContext) (*LBFrontend, error) {
cloud := context.T.Cloud.(scaleway.ScwCloud)
lbService := cloud.LBService()
frontendResponse, err := lbService.ListFrontends(&lb.ZonedAPIListFrontendsRequest{
Zone: scw.Zone(cloud.Zone()),
LBID: fi.ValueOf(l.LoadBalancer.LBID),
Name: l.Name,
})
if err != nil {
return nil, fmt.Errorf("listing front-ends for load-balancer %s: %w", fi.ValueOf(l.LoadBalancer.LBID), err)
}
if frontendResponse.TotalCount != 1 {
return nil, nil
}
frontend := frontendResponse.Frontends[0]
return &LBFrontend{
Name: fi.PtrTo(frontend.Name),
Lifecycle: l.Lifecycle,
ID: fi.PtrTo(frontend.ID),
Zone: fi.PtrTo(string(frontend.LB.Zone)),
InboundPort: fi.PtrTo(frontend.InboundPort),
LoadBalancer: &LoadBalancer{
Name: fi.PtrTo(frontend.LB.Name),
},
LBBackend: &LBBackend{
Name: fi.PtrTo(frontend.Backend.Name),
ID: fi.PtrTo(frontend.Backend.ID),
},
}, nil
}
func (l *LBFrontend) Run(context *fi.CloudupContext) error {
return fi.CloudupDefaultDeltaRunMethod(l, context)
}
func (_ *LBFrontend) CheckChanges(actual, expected, changes *LBFrontend) error {
if actual != nil {
if changes.Name != nil {
return fi.CannotChangeField("Name")
}
if changes.ID != nil {
return fi.CannotChangeField("ID")
}
if changes.Zone != nil {
return fi.CannotChangeField("Zone")
}
} else {
if expected.Name == nil {
return fi.RequiredField("Name")
}
if expected.Zone == nil {
return fi.RequiredField("Zone")
}
}
return nil
}
func (l *LBFrontend) RenderScw(t *scaleway.ScwAPITarget, actual, expected, changes *LBFrontend) error {
lbService := t.Cloud.LBService()
if actual != nil {
_, err := lbService.UpdateFrontend(&lb.ZonedAPIUpdateFrontendRequest{
Zone: scw.Zone(fi.ValueOf(actual.Zone)),
FrontendID: fi.ValueOf(actual.ID),
Name: fi.ValueOf(actual.Name),
InboundPort: fi.ValueOf(expected.InboundPort),
BackendID: fi.ValueOf(actual.LBBackend.ID),
})
if err != nil {
return fmt.Errorf("updating front-end for load-balancer %s: %w", fi.ValueOf(actual.LoadBalancer.Name), err)
}
} else {
frontendCreated, err := lbService.CreateFrontend(&lb.ZonedAPICreateFrontendRequest{
Zone: scw.Zone(fi.ValueOf(expected.Zone)),
LBID: fi.ValueOf(expected.LoadBalancer.LBID), // try expected instead of l
Name: fi.ValueOf(expected.Name),
InboundPort: fi.ValueOf(expected.InboundPort),
BackendID: fi.ValueOf(expected.LBBackend.ID), // try expected instead of l
})
if err != nil {
return fmt.Errorf("creating front-end for load-balancer %s: %w", fi.ValueOf(expected.LoadBalancer.Name), err)
}
expected.ID = &frontendCreated.ID
}
_, err := lbService.WaitForLb(&lb.ZonedAPIWaitForLBRequest{
LBID: fi.ValueOf(expected.LoadBalancer.LBID),
Zone: scw.Zone(fi.ValueOf(expected.Zone)),
})
if err != nil {
return fmt.Errorf("waiting for load-balancer %s: %w", fi.ValueOf(expected.LoadBalancer.Name), err)
}
return nil
}

View File

@ -0,0 +1,52 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by fitask. DO NOT EDIT.
package scalewaytasks
import (
"k8s.io/kops/upup/pkg/fi"
)
// LBBackend
var _ fi.HasLifecycle = &LBBackend{}
// GetLifecycle returns the Lifecycle of the object, implementing fi.HasLifecycle
func (o *LBBackend) GetLifecycle() fi.Lifecycle {
return o.Lifecycle
}
// SetLifecycle sets the Lifecycle of the object, implementing fi.SetLifecycle
func (o *LBBackend) SetLifecycle(lifecycle fi.Lifecycle) {
o.Lifecycle = lifecycle
}
var _ fi.HasName = &LBBackend{}
// GetName returns the Name of the object, implementing fi.HasName
func (o *LBBackend) GetName() *string {
return o.Name
}
// String is the stringer function for the task, producing readable output using fi.TaskAsString
func (o *LBBackend) String() string {
return fi.CloudupTaskAsString(o)
}

View File

@ -0,0 +1,52 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by fitask. DO NOT EDIT.
package scalewaytasks
import (
"k8s.io/kops/upup/pkg/fi"
)
// LBFrontend
var _ fi.HasLifecycle = &LBFrontend{}
// GetLifecycle returns the Lifecycle of the object, implementing fi.HasLifecycle
func (o *LBFrontend) GetLifecycle() fi.Lifecycle {
return o.Lifecycle
}
// SetLifecycle sets the Lifecycle of the object, implementing fi.SetLifecycle
func (o *LBFrontend) SetLifecycle(lifecycle fi.Lifecycle) {
o.Lifecycle = lifecycle
}
var _ fi.HasName = &LBFrontend{}
// GetName returns the Name of the object, implementing fi.HasName
func (o *LBFrontend) GetName() *string {
return o.Name
}
// String is the stringer function for the task, producing readable output using fi.TaskAsString
func (o *LBFrontend) String() string {
return fi.CloudupTaskAsString(o)
}

View File

@ -0,0 +1,193 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scalewaytasks
import (
"fmt"
"k8s.io/klog/v2"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/scaleway"
"github.com/scaleway/scaleway-sdk-go/api/lb/v1"
"github.com/scaleway/scaleway-sdk-go/scw"
)
// +kops:fitask
type LoadBalancer struct {
Name *string
Lifecycle fi.Lifecycle
Zone *string
LBID *string
LBAddresses []string
Tags []string
Description string
SslCompatibilityLevel string
ForAPIServer bool
}
var _ fi.CompareWithID = &LoadBalancer{}
var _ fi.HasAddress = &LoadBalancer{}
func (l *LoadBalancer) CompareWithID() *string {
return l.LBID
}
func (l *LoadBalancer) IsForAPIServer() bool {
return l.ForAPIServer
}
func (l *LoadBalancer) Find(context *fi.CloudupContext) (*LoadBalancer, error) {
cloud := context.T.Cloud.(scaleway.ScwCloud)
lbService := cloud.LBService()
lbResponse, err := lbService.ListLBs(&lb.ZonedAPIListLBsRequest{
Zone: scw.Zone(cloud.Zone()),
Name: l.Name,
}, scw.WithAllPages())
if err != nil {
return nil, fmt.Errorf("getting load-balancer %s: %w", fi.ValueOf(l.LBID), err)
}
if lbResponse.TotalCount != 1 {
return nil, nil
}
loadBalancer := lbResponse.LBs[0]
lbIPs := []string(nil)
for _, IP := range loadBalancer.IP {
lbIPs = append(lbIPs, IP.IPAddress)
}
return &LoadBalancer{
Name: fi.PtrTo(loadBalancer.Name),
LBID: fi.PtrTo(loadBalancer.ID),
Zone: fi.PtrTo(string(loadBalancer.Zone)),
LBAddresses: lbIPs,
Tags: loadBalancer.Tags,
Lifecycle: l.Lifecycle,
ForAPIServer: l.ForAPIServer,
}, nil
}
func (l *LoadBalancer) FindAddresses(context *fi.CloudupContext) ([]string, error) {
cloud := context.T.Cloud.(scaleway.ScwCloud)
lbService := cloud.LBService()
if l.LBID == nil {
return nil, nil
}
loadBalancer, err := lbService.GetLB(&lb.ZonedAPIGetLBRequest{
Zone: scw.Zone(cloud.Zone()),
LBID: fi.ValueOf(l.LBID),
})
if err != nil {
return nil, err
}
addresses := []string(nil)
for _, address := range loadBalancer.IP {
addresses = append(addresses, address.IPAddress)
}
return addresses, nil
}
func (l *LoadBalancer) Run(context *fi.CloudupContext) error {
return fi.CloudupDefaultDeltaRunMethod(l, context)
}
func (_ *LoadBalancer) CheckChanges(actual, expected, changes *LoadBalancer) error {
if actual != nil {
if changes.Name != nil {
return fi.CannotChangeField("Name")
}
if changes.LBID != nil {
return fi.CannotChangeField("ID")
}
if changes.Zone != nil {
return fi.CannotChangeField("Zone")
}
} else {
if expected.Name == nil {
return fi.RequiredField("Name")
}
if expected.Zone == nil {
return fi.RequiredField("Zone")
}
}
return nil
}
func (l *LoadBalancer) RenderScw(t *scaleway.ScwAPITarget, actual, expected, changes *LoadBalancer) error {
lbService := t.Cloud.LBService()
if actual != nil {
klog.Infof("Updating existing load-balancer with name %q", expected.Name)
// We update the tags
if changes != nil || len(actual.Tags) != len(expected.Tags) {
_, err := lbService.UpdateLB(&lb.ZonedAPIUpdateLBRequest{
Zone: scw.Zone(fi.ValueOf(actual.Zone)),
LBID: fi.ValueOf(actual.LBID),
Name: fi.ValueOf(actual.Name),
Description: expected.Description,
SslCompatibilityLevel: lb.SSLCompatibilityLevel(expected.SslCompatibilityLevel),
Tags: expected.Tags,
})
if err != nil {
return fmt.Errorf("updatings tags for load-balancer %q: %w", fi.ValueOf(expected.Name), err)
}
}
expected.LBID = actual.LBID
expected.LBAddresses = actual.LBAddresses
} else {
klog.Infof("Creating new load-balancer with name %q", expected.Name)
lbCreated, err := lbService.CreateLB(&lb.ZonedAPICreateLBRequest{
Zone: scw.Zone(fi.ValueOf(expected.Zone)),
Name: fi.ValueOf(expected.Name),
Tags: expected.Tags,
})
if err != nil {
return fmt.Errorf("creating load-balancer: %w", err)
}
_, err = lbService.WaitForLb(&lb.ZonedAPIWaitForLBRequest{
LBID: lbCreated.ID,
Zone: scw.Zone(fi.ValueOf(expected.Zone)),
})
if err != nil {
return fmt.Errorf("waiting for load-balancer %s: %w", lbCreated.ID, err)
}
lbIPs := []string(nil)
for _, ip := range lbCreated.IP {
lbIPs = append(lbIPs, ip.IPAddress)
}
expected.LBID = &lbCreated.ID
expected.LBAddresses = lbIPs
}
return nil
}

View File

@ -0,0 +1,52 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by fitask. DO NOT EDIT.
package scalewaytasks
import (
"k8s.io/kops/upup/pkg/fi"
)
// LoadBalancer
var _ fi.HasLifecycle = &LoadBalancer{}
// GetLifecycle returns the Lifecycle of the object, implementing fi.HasLifecycle
func (o *LoadBalancer) GetLifecycle() fi.Lifecycle {
return o.Lifecycle
}
// SetLifecycle sets the Lifecycle of the object, implementing fi.SetLifecycle
func (o *LoadBalancer) SetLifecycle(lifecycle fi.Lifecycle) {
o.Lifecycle = lifecycle
}
var _ fi.HasName = &LoadBalancer{}
// GetName returns the Name of the object, implementing fi.HasName
func (o *LoadBalancer) GetName() *string {
return o.Name
}
// String is the stringer function for the task, producing readable output using fi.TaskAsString
func (o *LoadBalancer) String() string {
return fi.CloudupTaskAsString(o)
}

View File

@ -177,6 +177,9 @@ func BuildCloud(cluster *kops.Cluster) (fi.Cloud, error) {
"zone": string(zone),
"region": string(region),
}
for k, v := range cluster.Spec.CloudLabels {
cloudTags[k] = v
}
scwCloud, err := scaleway.NewScwCloud(cloudTags)
if err != nil {

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,277 @@
package lb
import (
"time"
"github.com/scaleway/scaleway-sdk-go/internal/async"
"github.com/scaleway/scaleway-sdk-go/internal/errors"
"github.com/scaleway/scaleway-sdk-go/scw"
)
const (
defaultRetryInterval = 2 * time.Second
defaultTimeout = 5 * time.Minute
)
// WaitForLBRequest is used by WaitForLb method.
type WaitForLBRequest struct {
LBID string
Region scw.Region
Timeout *time.Duration
RetryInterval *time.Duration
}
// WaitForLb waits for the lb to be in a "terminal state" before returning.
// This function can be used to wait for a lb to be ready for example.
func (s *API) WaitForLb(req *WaitForLBRequest, opts ...scw.RequestOption) (*LB, error) {
return waitForLb(req.Timeout, req.RetryInterval, func() (*LB, error) {
return s.GetLB(&GetLBRequest{
Region: req.Region,
LBID: req.LBID,
}, opts...)
})
}
// ZonedAPIWaitForLBRequest is used by WaitForLb method.
type ZonedAPIWaitForLBRequest struct {
LBID string
Zone scw.Zone
Timeout *time.Duration
RetryInterval *time.Duration
}
// WaitForLb waits for the lb to be in a "terminal state" before returning.
// This function can be used to wait for a lb to be ready for example.
func (s *ZonedAPI) WaitForLb(req *ZonedAPIWaitForLBRequest, opts ...scw.RequestOption) (*LB, error) {
return waitForLb(req.Timeout, req.RetryInterval, func() (*LB, error) {
return s.GetLB(&ZonedAPIGetLBRequest{
Zone: req.Zone,
LBID: req.LBID,
}, opts...)
})
}
func waitForLb(timeout *time.Duration, retryInterval *time.Duration, getLB func() (*LB, error)) (*LB, error) {
t := defaultTimeout
if timeout != nil {
t = *timeout
}
r := defaultRetryInterval
if retryInterval != nil {
r = *retryInterval
}
terminalStatus := map[LBStatus]struct{}{
LBStatusReady: {},
LBStatusStopped: {},
LBStatusError: {},
LBStatusLocked: {},
}
lb, err := async.WaitSync(&async.WaitSyncConfig{
Get: func() (interface{}, bool, error) {
res, err := getLB()
if err != nil {
return nil, false, err
}
_, isTerminal := terminalStatus[res.Status]
return res, isTerminal, nil
},
Timeout: t,
IntervalStrategy: async.LinearIntervalStrategy(r),
})
if err != nil {
return nil, errors.Wrap(err, "waiting for lb failed")
}
return lb.(*LB), nil
}
// ZonedAPIWaitForLBInstancesRequest is used by WaitForLb method.
type ZonedAPIWaitForLBInstancesRequest struct {
LBID string
Zone scw.Zone
Timeout *time.Duration
RetryInterval *time.Duration
}
// WaitForLbInstances waits for the lb to be in a "terminal state" and the attached instances before returning.
func (s *ZonedAPI) WaitForLbInstances(req *ZonedAPIWaitForLBInstancesRequest, opts ...scw.RequestOption) (*LB, error) {
return waitForLbInstances(req.Timeout, req.RetryInterval, func() (*LB, error) {
return s.GetLB(&ZonedAPIGetLBRequest{
Zone: req.Zone,
LBID: req.LBID,
}, opts...)
})
}
func waitForLbInstances(timeout *time.Duration, retryInterval *time.Duration, getLB func() (*LB, error)) (*LB, error) {
t := defaultTimeout
if timeout != nil {
t = *timeout
}
r := defaultRetryInterval
if retryInterval != nil {
r = *retryInterval
}
terminalLBStatus := map[LBStatus]struct{}{
LBStatusReady: {},
LBStatusStopped: {},
LBStatusError: {},
LBStatusLocked: {},
}
terminalInstanceStatus := map[InstanceStatus]struct{}{
InstanceStatusReady: {},
InstanceStatusStopped: {},
InstanceStatusError: {},
InstanceStatusLocked: {},
}
lb, err := async.WaitSync(&async.WaitSyncConfig{
Get: func() (interface{}, bool, error) {
res, err := getLB()
if err != nil {
return nil, false, err
}
_, isTerminal := terminalLBStatus[res.Status]
for _, i := range res.Instances {
_, isInstanceTerminal := terminalInstanceStatus[i.Status]
if !isInstanceTerminal {
return res, isTerminal, nil
}
}
return res, isTerminal, nil
},
Timeout: t,
IntervalStrategy: async.LinearIntervalStrategy(r),
})
if err != nil {
return nil, errors.Wrap(err, "waiting for lb failed")
}
return lb.(*LB), nil
}
// ZonedAPIWaitForLBPNRequest is used by WaitForLBPN method.
type ZonedAPIWaitForLBPNRequest struct {
LBID string
Zone scw.Zone
Timeout *time.Duration
RetryInterval *time.Duration
}
func waitForPNLb(timeout *time.Duration, retryInterval *time.Duration, getPNs func() ([]*PrivateNetwork, error)) ([]*PrivateNetwork, error) {
t := defaultTimeout
if timeout != nil {
t = *timeout
}
r := defaultRetryInterval
if retryInterval != nil {
r = *retryInterval
}
terminalStatus := map[PrivateNetworkStatus]struct{}{
PrivateNetworkStatusReady: {},
PrivateNetworkStatusError: {},
}
pn, err := async.WaitSync(&async.WaitSyncConfig{
Get: func() (interface{}, bool, error) {
pns, err := getPNs()
for _, pn := range pns {
if err != nil {
return nil, false, err
}
//wait at the first not terminal state
_, isTerminal := terminalStatus[pn.Status]
if !isTerminal {
return pns, isTerminal, nil
}
}
return pns, true, nil
},
Timeout: t,
IntervalStrategy: async.LinearIntervalStrategy(r),
})
if err != nil {
return nil, errors.Wrap(err, "waiting for attachment failed")
}
return pn.([]*PrivateNetwork), nil
}
// WaitForLBPN waits for the private_network attached status on a load balancer
// to be in a "terminal state" before returning.
// This function can be used to wait for an attached private_network to be ready for example.
func (s *ZonedAPI) WaitForLBPN(req *ZonedAPIWaitForLBPNRequest, opts ...scw.RequestOption) ([]*PrivateNetwork, error) {
return waitForPNLb(req.Timeout, req.RetryInterval, func() ([]*PrivateNetwork, error) {
lbPNs, err := s.ListLBPrivateNetworks(&ZonedAPIListLBPrivateNetworksRequest{
Zone: req.Zone,
LBID: req.LBID,
}, opts...)
if err != nil {
return nil, err
}
return lbPNs.PrivateNetwork, nil
})
}
// ZonedAPIWaitForLBCertificateRequest is used by WaitForLbCertificate method.
type ZonedAPIWaitForLBCertificateRequest struct {
CertID string
Zone scw.Zone
Timeout *time.Duration
RetryInterval *time.Duration
}
// WaitForLBCertificate waits for the certificate to be in a "terminal state"
func (s *ZonedAPI) WaitForLBCertificate(req *ZonedAPIWaitForLBCertificateRequest, opts ...scw.RequestOption) (*Certificate, error) {
return waitForLBCertificate(req.Timeout, req.RetryInterval, func() (*Certificate, error) {
return s.GetCertificate(&ZonedAPIGetCertificateRequest{
Zone: req.Zone,
CertificateID: req.CertID,
}, opts...)
})
}
func waitForLBCertificate(timeout *time.Duration, retryInterval *time.Duration, getCertificate func() (*Certificate, error)) (*Certificate, error) {
t := defaultTimeout
if timeout != nil {
t = *timeout
}
r := defaultRetryInterval
if retryInterval != nil {
r = *retryInterval
}
terminalStatus := map[CertificateStatus]struct{}{
CertificateStatusError: {},
CertificateStatusReady: {},
}
crt, err := async.WaitSync(&async.WaitSyncConfig{
Get: func() (interface{}, bool, error) {
res, err := getCertificate()
if err != nil {
return nil, false, err
}
_, isTerminal := terminalStatus[res.Status]
return res, isTerminal, nil
},
Timeout: t,
IntervalStrategy: async.LinearIntervalStrategy(r),
})
if err != nil {
return nil, errors.Wrap(err, "waiting for lb failed")
}
return crt.(*Certificate), nil
}

1
vendor/modules.txt generated vendored
View File

@ -734,6 +734,7 @@ github.com/sahilm/fuzzy
## explicit; go 1.17
github.com/scaleway/scaleway-sdk-go/api/iam/v1alpha1
github.com/scaleway/scaleway-sdk-go/api/instance/v1
github.com/scaleway/scaleway-sdk-go/api/lb/v1
github.com/scaleway/scaleway-sdk-go/api/marketplace/v1
github.com/scaleway/scaleway-sdk-go/internal/async
github.com/scaleway/scaleway-sdk-go/internal/auth