Rework Scaleway cloudprovider

Pablo RUTH 2025-09-18 13:53:40 +02:00
parent 64a815fe45
commit b6d1509b41
4 changed files with 185 additions and 617 deletions


@@ -19,9 +19,7 @@ package scaleway
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"time"
@@ -47,13 +45,17 @@ type scalewayCloudProvider struct {
// ClusterID is the cluster id where the Autoscaler is running.
clusterID string
// nodeGroups is an abstraction around the Pool object returned by the API
nodeGroups []*NodeGroup
nodeGroups map[string]*NodeGroup
// lastRefresh is the last time the nodes and node groups were refreshed from the API
lastRefresh time.Time
// refreshInterval is the minimum duration between refreshes
refreshInterval time.Duration
resourceLimiter *cloudprovider.ResourceLimiter
}
func readConf(config *scalewaygo.Config, configFile io.Reader) error {
body, err := ioutil.ReadAll(configFile)
body, err := io.ReadAll(configFile)
if err != nil {
return err
}
@@ -92,21 +94,18 @@ func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl
klog.Fatalf("failed to create scaleway cloud provider: %v", err)
}
klog.V(4).Infof("Scaleway Cloud Provider built; ClusterId=%s,SecretKey=%s-***,Region=%s,ApiURL=%s", cfg.ClusterID, client.Token()[:8], client.Region(), client.ApiURL())
klog.V(4).Infof("Scaleway Cloud Provider built; ClusterId=%s,Region=%s,ApiURL=%s", cfg.ClusterID, client.Region(), client.ApiURL())
return &scalewayCloudProvider{
client: client,
clusterID: cfg.ClusterID,
resourceLimiter: rl,
refreshInterval: 60 * time.Second,
}
}
// BuildScaleway returns CloudProvider implementation for Scaleway.
func BuildScaleway(
opts config.AutoscalingOptions,
do cloudprovider.NodeGroupDiscoveryOptions,
rl *cloudprovider.ResourceLimiter,
) cloudprovider.CloudProvider {
func BuildScaleway(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
var configFile io.Reader
if opts.CloudConfig != "" {
@@ -123,6 +122,7 @@ func BuildScaleway(
}()
}
}
return newScalewayCloudProvider(configFile, opts.UserAgent, rl)
}
@@ -134,23 +134,14 @@ func (*scalewayCloudProvider) Name() string {
// NodeGroups returns all node groups configured for this cluster.
// critical endpoint, make it fast
func (scw *scalewayCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
klog.V(4).Info("NodeGroups,ClusterID=", scw.clusterID)
nodeGroups := make([]cloudprovider.NodeGroup, len(scw.nodeGroups))
for i, ng := range scw.nodeGroups {
nodeGroups[i] = ng
}
return nodeGroups
}
func (scw *scalewayCloudProvider) nodeGroupForNode(node *apiv1.Node) (*NodeGroup, error) {
var nodeGroups []cloudprovider.NodeGroup
for _, ng := range scw.nodeGroups {
if _, ok := ng.nodes[node.Spec.ProviderID]; ok {
return ng, nil
}
nodeGroups = append(nodeGroups, ng)
}
return nil, nil
return nodeGroups
}
// NodeGroupForNode returns the node group for the given node, nil if the node
@@ -163,6 +154,16 @@ func (scw *scalewayCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovi
return scw.nodeGroupForNode(node)
}
func (scw *scalewayCloudProvider) nodeGroupForNode(node *apiv1.Node) (*NodeGroup, error) {
for _, ng := range scw.nodeGroups {
if _, ok := ng.nodes[node.Spec.ProviderID]; ok {
return ng, nil
}
}
return nil, nil
}
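
The hunk above relies on the new map-shaped state: node groups keyed by pool ID, each holding its nodes keyed by provider ID. A minimal, self-contained sketch of that lookup (simplified types, not the provider's actual structs):

package main

import "fmt"

type nodeGroup struct {
	id    string
	nodes map[string]struct{} // keyed by node provider ID
}

// groupForNode probes each group's node map instead of scanning node lists.
func groupForNode(groups map[string]*nodeGroup, providerID string) *nodeGroup {
	for _, ng := range groups {
		if _, ok := ng.nodes[providerID]; ok {
			return ng
		}
	}
	return nil
}

func main() {
	groups := map[string]*nodeGroup{
		"pool-a": {id: "pool-a", nodes: map[string]struct{}{
			"scaleway://instance/fr-par-1/abc": {},
		}},
	}
	fmt.Println(groupForNode(groups, "scaleway://instance/fr-par-1/abc").id) // pool-a
}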
// HasInstance returns whether a given node has a corresponding instance in this cloud provider
func (scw *scalewayCloudProvider) HasInstance(node *apiv1.Node) (bool, error) {
return true, cloudprovider.ErrNotImplemented
@@ -196,6 +197,8 @@ func (scw *scalewayCloudProvider) GetAvailableMachineTypes() ([]string, error) {
return []string{}, nil
}
// NewNodeGroup creates a new node group based on the provided definition.
// Not implemented
func (scw *scalewayCloudProvider) NewNodeGroup(
machineType string,
labels map[string]string,
@@ -244,36 +247,55 @@ func (scw *scalewayCloudProvider) Cleanup() error {
func (scw *scalewayCloudProvider) Refresh() error {
klog.V(4).Info("Refresh,ClusterID=", scw.clusterID)
ctx := context.Background()
resp, err := scw.client.ListPools(ctx, &scalewaygo.ListPoolsRequest{ClusterID: scw.clusterID})
// on the first call lastRefresh is the zero time, so time.Since exceeds refreshInterval and the refresh proceeds
if time.Since(scw.lastRefresh) < scw.refreshInterval {
klog.V(4).Infof("Refresh,ClusterID=%s,skipping refresh, last refresh was %s ago", scw.clusterID, time.Since(scw.lastRefresh))
return nil
}
pools, err := scw.client.ListPools(context.Background(), scw.clusterID)
if err != nil {
klog.Errorf("Refresh,failed to list pools for cluster %s: %s", scw.clusterID, err)
return err
}
var ng []*NodeGroup
nodes, err := scw.client.ListNodes(context.Background(), scw.clusterID)
if err != nil {
klog.Errorf("Refresh,failed to list nodes for cluster %s: %s", scw.clusterID, err)
return err
}
for _, p := range resp.Pools {
if p.Pool.Autoscaling == false {
// Build NodeGroups
nodeGroups := make(map[string]*NodeGroup)
for _, pool := range pools {
if !pool.Pool.Autoscaling {
continue
}
nodes, err := nodesFromPool(scw.client, p.Pool)
if err != nil {
return fmt.Errorf("Refresh,failed to list nodes for pool %s: %w", p.Pool.ID, err)
}
ng = append(ng, &NodeGroup{
nodeGroup := &NodeGroup{
Client: scw.client,
nodes: nodes,
specs: &p.Specs,
p: p.Pool,
})
}
klog.V(4).Infof("Refresh,ClusterID=%s,%d pools found", scw.clusterID, len(ng))
nodes: make(map[string]*scalewaygo.Node),
specs: pool.Specs,
pool: pool.Pool,
}
scw.nodeGroups = ng
nodeGroups[pool.Pool.ID] = nodeGroup
}
// Assign nodes to NodeGroups
for _, node := range nodes {
_, ok := nodeGroups[node.PoolID]
if !ok {
continue
}
nodeGroups[node.PoolID].nodes[node.ProviderID] = &node
}
klog.V(4).Infof("Refresh,ClusterID=%s,%d pools found", scw.clusterID, len(nodeGroups))
scw.nodeGroups = nodeGroups
scw.lastRefresh = time.Now()
return nil
}
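
The throttle added at the top of Refresh is worth seeing in isolation. A minimal sketch of the same pattern (not the provider's code): on the first call lastRefresh holds the zero time.Time, so time.Since returns a very large duration and the refresh proceeds without any special case.

package main

import (
	"fmt"
	"time"
)

type refresher struct {
	lastRefresh     time.Time
	refreshInterval time.Duration
}

// refresh reports whether a refresh actually ran.
func (r *refresher) refresh() bool {
	if time.Since(r.lastRefresh) < r.refreshInterval {
		return false // throttled
	}
	// ... list pools and nodes from the API here ...
	r.lastRefresh = time.Now()
	return true
}

func main() {
	r := &refresher{refreshInterval: 60 * time.Second}
	fmt.Println(r.refresh()) // true: zero lastRefresh, first call always refreshes
	fmt.Println(r.refresh()) // false: within the 60s interval
}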


@@ -18,7 +18,6 @@ package scaleway
import (
"context"
"errors"
"fmt"
"strings"
@@ -29,7 +28,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway/scalewaygo"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/klog/v2"
)
@@ -39,22 +37,20 @@ type NodeGroup struct {
scalewaygo.Client
nodes map[string]*scalewaygo.Node
specs *scalewaygo.GenericNodeSpecs
p *scalewaygo.Pool
specs scalewaygo.GenericNodeSpecs
pool scalewaygo.Pool
}
// MaxSize returns maximum size of the node group.
func (ng *NodeGroup) MaxSize() int {
klog.V(6).Info("MaxSize,called")
return int(ng.p.MaxSize)
return ng.pool.MaxSize
}
// MinSize returns minimum size of the node group.
func (ng *NodeGroup) MinSize() int {
klog.V(6).Info("MinSize,called")
return int(ng.p.MinSize)
return ng.pool.MinSize
}
// TargetSize returns the current target size of the node group. It is possible that the
@@ -63,42 +59,31 @@ func (ng *NodeGroup) MinSize() int {
// removed nodes are deleted completely).
func (ng *NodeGroup) TargetSize() (int, error) {
klog.V(6).Info("TargetSize,called")
return int(ng.p.Size), nil
return ng.pool.Size, nil
}
// IncreaseSize increases the size of the node group. To delete a node you need
// to explicitly name it and use DeleteNode. This function should wait until
// node group size is updated.
func (ng *NodeGroup) IncreaseSize(delta int) error {
klog.V(4).Infof("IncreaseSize,ClusterID=%s,delta=%d", ng.p.ClusterID, delta)
klog.V(4).Infof("IncreaseSize,ClusterID=%s,delta=%d", ng.pool.ClusterID, delta)
if delta <= 0 {
return fmt.Errorf("delta must be strictly positive, have: %d", delta)
}
targetSize := ng.p.Size + uint32(delta)
targetSize := ng.pool.Size + delta
if targetSize > uint32(ng.MaxSize()) {
return fmt.Errorf("size increase is too large. current: %d desired: %d max: %d",
ng.p.Size, targetSize, ng.MaxSize())
if targetSize > ng.MaxSize() {
return fmt.Errorf("size increase is too large. current: %d desired: %d max: %d", ng.pool.Size, targetSize, ng.MaxSize())
}
ctx := context.Background()
pool, err := ng.UpdatePool(ctx, &scalewaygo.UpdatePoolRequest{
PoolID: ng.p.ID,
Size: &targetSize,
})
_, err := ng.UpdatePool(context.Background(), ng.pool.ID, targetSize)
if err != nil {
return err
}
if pool.Size != targetSize {
return fmt.Errorf("couldn't increase size to %d. Current size is: %d",
targetSize, pool.Size)
}
ng.p.Size = targetSize
ng.pool.Size = targetSize
return nil
}
@@ -114,21 +99,18 @@ func (ng *NodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
ctx := context.Background()
klog.V(4).Info("DeleteNodes,", len(nodes), " nodes to reclaim")
for _, n := range nodes {
node, ok := ng.nodes[n.Spec.ProviderID]
if !ok {
klog.Errorf("DeleteNodes,ProviderID=%s,PoolID=%s,node marked for deletion not found in pool", n.Spec.ProviderID, ng.p.ID)
klog.Errorf("DeleteNodes,ProviderID=%s,PoolID=%s,node marked for deletion not found in pool", n.Spec.ProviderID, ng.pool.ID)
continue
}
updatedNode, err := ng.DeleteNode(ctx, &scalewaygo.DeleteNodeRequest{
NodeID: node.ID,
})
if err != nil || updatedNode.Status != scalewaygo.NodeStatusDeleting {
_, err := ng.DeleteNode(ctx, node.ID)
if err != nil {
return err
}
ng.p.Size--
ng.pool.Size--
ng.nodes[n.Spec.ProviderID].Status = scalewaygo.NodeStatusDeleting
}
@@ -146,54 +128,44 @@ func (ng *NodeGroup) ForceDeleteNodes(nodes []*apiv1.Node) error {
// It is assumed that cloud provider will not delete the existing nodes when there
// is an option to just decrease the target.
func (ng *NodeGroup) DecreaseTargetSize(delta int) error {
klog.V(4).Infof("DecreaseTargetSize,ClusterID=%s,delta=%d", ng.p.ClusterID, delta)
klog.V(4).Infof("DecreaseTargetSize,ClusterID=%s,delta=%d", ng.pool.ClusterID, delta)
if delta >= 0 {
return fmt.Errorf("delta must be strictly negative, have: %d", delta)
}
targetSize := ng.p.Size + uint32(delta)
targetSize := ng.pool.Size + delta
if int(targetSize) < ng.MinSize() {
return fmt.Errorf("size decrease is too large. current: %d desired: %d min: %d",
ng.p.Size, targetSize, ng.MinSize())
return fmt.Errorf("size decrease is too large. current: %d desired: %d min: %d", ng.pool.Size, targetSize, ng.MinSize())
}
ctx := context.Background()
pool, err := ng.UpdatePool(ctx, &scalewaygo.UpdatePoolRequest{
PoolID: ng.p.ID,
Size: &targetSize,
})
_, err := ng.UpdatePool(ctx, ng.pool.ID, targetSize)
if err != nil {
return err
}
if pool.Size != targetSize {
return fmt.Errorf("couldn't decrease size to %d. Current size is: %d",
targetSize, pool.Size)
}
ng.pool.Size = targetSize
ng.p.Size = targetSize
return nil
}
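
IncreaseSize and DecreaseTargetSize now perform symmetric bounds checks on plain ints. A hypothetical helper (not in this commit) capturing the shared logic:

package scaleway

import "fmt"

// checkResize validates a resize request against the pool's bounds, using
// plain ints since Pool sizes are no longer uint32.
func checkResize(current, delta, minSize, maxSize int) error {
	target := current + delta
	if target > maxSize {
		return fmt.Errorf("size increase is too large. current: %d desired: %d max: %d", current, target, maxSize)
	}
	if target < minSize {
		return fmt.Errorf("size decrease is too large. current: %d desired: %d min: %d", current, target, minSize)
	}
	return nil
}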
// Id returns an unique identifier of the node group.
func (ng *NodeGroup) Id() string {
return ng.p.ID
return ng.pool.ID
}
// Debug returns a string containing all information regarding this node group.
func (ng *NodeGroup) Debug() string {
klog.V(4).Info("Debug,called")
return fmt.Sprintf("id:%s,status:%s,version:%s,autoscaling:%t,size:%d,min_size:%d,max_size:%d", ng.Id(), ng.p.Status, ng.p.Version, ng.p.Autoscaling, ng.p.Size, ng.MinSize(), ng.MaxSize())
return fmt.Sprintf("id:%s,status:%s,version:%s,autoscaling:%t,size:%d,min_size:%d,max_size:%d", ng.Id(), ng.pool.Status, ng.pool.Version, ng.pool.Autoscaling, ng.pool.Size, ng.MinSize(), ng.MaxSize())
}
// Nodes returns a list of all nodes that belong to this node group.
func (ng *NodeGroup) Nodes() ([]cloudprovider.Instance, error) {
var nodes []cloudprovider.Instance
klog.V(4).Info("Nodes,PoolID=", ng.p.ID)
klog.V(4).Info("Nodes,PoolID=", ng.pool.ID)
nodes := make([]cloudprovider.Instance, 0, len(ng.nodes))
for _, node := range ng.nodes {
nodes = append(nodes, cloudprovider.Instance{
Id: node.ProviderID,
@@ -211,7 +183,7 @@ func (ng *NodeGroup) Nodes() ([]cloudprovider.Instance, error) {
// capacity and allocatable information as well as all pods that are started on
// the node by default, using manifest (most likely only kube-proxy).
func (ng *NodeGroup) TemplateNodeInfo() (*framework.NodeInfo, error) {
klog.V(4).Infof("TemplateNodeInfo,PoolID=%s", ng.p.ID)
klog.V(4).Infof("TemplateNodeInfo,PoolID=%s", ng.pool.ID)
node := apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: ng.specs.Labels[apiv1.LabelHostname],
@@ -222,34 +194,25 @@ func (ng *NodeGroup) TemplateNodeInfo() (*framework.NodeInfo, error) {
Allocatable: apiv1.ResourceList{},
},
}
node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(int64(ng.specs.CpuCapacity), resource.DecimalSI)
node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(int64(ng.specs.MemoryCapacity), resource.DecimalSI)
node.Status.Capacity[apiv1.ResourceEphemeralStorage] = *resource.NewQuantity(int64(ng.specs.LocalStorageCapacity), resource.DecimalSI)
node.Status.Capacity[apiv1.ResourcePods] = *resource.NewQuantity(int64(ng.specs.MaxPods), resource.DecimalSI)
node.Status.Allocatable[apiv1.ResourceCPU] = *resource.NewQuantity(int64(ng.specs.CpuAllocatable), resource.DecimalSI)
node.Status.Allocatable[apiv1.ResourceMemory] = *resource.NewQuantity(int64(ng.specs.MemoryAllocatable), resource.DecimalSI)
node.Status.Allocatable[apiv1.ResourceEphemeralStorage] = *resource.NewQuantity(int64(ng.specs.LocalStorageAllocatable), resource.DecimalSI)
node.Status.Allocatable[apiv1.ResourcePods] = *resource.NewQuantity(int64(ng.specs.MaxPods), resource.DecimalSI)
for capacityName, capacityValue := range ng.specs.Capacity {
node.Status.Capacity[apiv1.ResourceName(capacityName)] = *resource.NewQuantity(capacityValue, resource.DecimalSI)
}
if ng.specs.Gpu > 0 {
nbGpu := *resource.NewQuantity(int64(ng.specs.Gpu), resource.DecimalSI)
node.Status.Capacity[gpu.ResourceNvidiaGPU] = nbGpu
node.Status.Allocatable[gpu.ResourceNvidiaGPU] = nbGpu
for allocatableName, allocatableValue := range ng.specs.Allocatable {
node.Status.Allocatable[apiv1.ResourceName(allocatableName)] = *resource.NewQuantity(int64(allocatableValue), resource.DecimalSI)
}
node.Status.Conditions = cloudprovider.BuildReadyConditions()
node.Spec.Taints = parseTaints(ng.specs.Taints)
nodeInfo := framework.NewNodeInfo(&node, nil, &framework.PodInfo{Pod: cloudprovider.BuildKubeProxy(ng.p.Name)})
nodeInfo := framework.NewNodeInfo(&node, nil, &framework.PodInfo{Pod: cloudprovider.BuildKubeProxy(ng.pool.Name)})
return nodeInfo, nil
}
func parseTaints(taints map[string]string) []apiv1.Taint {
k8sTaints := make([]apiv1.Taint, 0, len(taints))
for key, valueEffect := range taints {
splittedValueEffect := strings.Split(valueEffect, ":")
var taint apiv1.Taint
@@ -270,23 +233,15 @@ func parseTaints(taints map[string]string) []apiv1.Taint {
k8sTaints = append(k8sTaints, taint)
}
return k8sTaints
}
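
For reference, the taint wire format parseTaints appears to consume is a map of taint key to "value:Effect", split on ":". The effect-only branch is elided in the hunk above, so treat this sketch as an assumption rather than the commit's exact semantics:

package main

import (
	"fmt"
	"strings"

	apiv1 "k8s.io/api/core/v1"
)

func main() {
	// assumed wire format: taint key -> "value:Effect"
	taints := map[string]string{"dedicated": "gpu:NoSchedule"}
	for key, valueEffect := range taints {
		parts := strings.Split(valueEffect, ":")
		t := apiv1.Taint{Key: key, Value: parts[0], Effect: apiv1.TaintEffect(parts[1])}
		fmt.Printf("%s=%s:%s\n", t.Key, t.Value, t.Effect) // dedicated=gpu:NoSchedule
	}
}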
// Exist checks if the node group really exists on the cloud provider side. Allows to tell the
// theoretical node group from the real one.
func (ng *NodeGroup) Exist() bool {
klog.V(4).Infof("Exist,PoolID=%s", ng.p.ID)
_, err := ng.GetPool(context.Background(), &scalewaygo.GetPoolRequest{
PoolID: ng.p.ID,
})
if err != nil && errors.Is(err, scalewaygo.ErrClientSide) {
return false
}
klog.V(4).Infof("Exist,PoolID=%s", ng.pool.ID)
return true
}
// Pool Autoprovision feature is not supported by Scaleway
@@ -311,25 +266,6 @@ func (ng *NodeGroup) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*c
return nil, cloudprovider.ErrNotImplemented
}
// nodesFromPool returns the nodes associated to a Scaleway Pool
func nodesFromPool(client scalewaygo.Client, p *scalewaygo.Pool) (map[string]*scalewaygo.Node, error) {
ctx := context.Background()
resp, err := client.ListNodes(ctx, &scalewaygo.ListNodesRequest{ClusterID: p.ClusterID, PoolID: &p.ID})
if err != nil {
return nil, err
}
nodes := make(map[string]*scalewaygo.Node)
for _, node := range resp.Nodes {
nodes[node.ProviderID] = node
}
klog.V(4).Infof("nodesFromPool,PoolID=%s,%d nodes found", p.ID, len(nodes))
return nodes, nil
}
func fromScwStatus(status scalewaygo.NodeStatus) *cloudprovider.InstanceStatus {
st := &cloudprovider.InstanceStatus{}
switch status {


@@ -1,238 +0,0 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scaleway
import (
"context"
"errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway/scalewaygo"
"testing"
)
func TestNodeGroup_TargetSize(t *testing.T) {
var nodesNb uint32 = 3
ng := &NodeGroup{
p: &scalewaygo.Pool{
Size: nodesNb,
},
}
size, err := ng.TargetSize()
assert.NoError(t, err)
assert.Equal(t, int(nodesNb), size, "target size is wrong")
}
func TestNodeGroup_IncreaseSize(t *testing.T) {
ctx := context.Background()
nodesNb := 3
delta := 2
client := &clientMock{}
ng := &NodeGroup{
Client: client,
p: &scalewaygo.Pool{
Size: uint32(nodesNb),
MinSize: 1,
MaxSize: 10,
Autoscaling: true,
},
}
newSize := uint32(nodesNb + delta)
client.On("UpdatePool",
ctx,
&scalewaygo.UpdatePoolRequest{
PoolID: ng.p.ID,
Size: &newSize,
}).Return(
&scalewaygo.Pool{
Size: newSize,
}, nil,
).Once()
err := ng.IncreaseSize(delta)
assert.NoError(t, err)
}
func TestNodeGroup_IncreaseNegativeDelta(t *testing.T) {
nodesNb := 3
delta := -2
client := &clientMock{}
ng := &NodeGroup{
Client: client,
p: &scalewaygo.Pool{
Size: uint32(nodesNb),
},
}
err := ng.IncreaseSize(delta)
assert.Error(t, err)
}
func TestNodeGroup_IncreaseAboveMaximum(t *testing.T) {
nodesNb := 3
delta := 10
client := &clientMock{}
ng := &NodeGroup{
Client: client,
p: &scalewaygo.Pool{
Size: uint32(nodesNb),
MaxSize: 10,
},
}
err := ng.IncreaseSize(delta)
assert.Error(t, err)
}
func TestNodeGroup_DecreaseTargetSize(t *testing.T) {
ctx := context.Background()
nodesNb := 5
delta := -4
client := &clientMock{}
ng := &NodeGroup{
Client: client,
p: &scalewaygo.Pool{
Size: uint32(nodesNb),
MinSize: 1,
MaxSize: 10,
Autoscaling: true,
},
}
newSize := uint32(nodesNb + delta)
client.On("UpdatePool",
ctx,
&scalewaygo.UpdatePoolRequest{
PoolID: ng.p.ID,
Size: &newSize,
}).Return(
&scalewaygo.Pool{
Size: newSize,
}, nil,
).Once()
err := ng.DecreaseTargetSize(delta)
assert.NoError(t, err)
}
func TestNodeGroup_DecreaseTargetSizePositiveDelta(t *testing.T) {
nodesNb := 3
delta := 2
client := &clientMock{}
ng := &NodeGroup{
Client: client,
p: &scalewaygo.Pool{
Size: uint32(nodesNb),
},
}
err := ng.DecreaseTargetSize(delta)
assert.Error(t, err)
}
func TestNodeGroup_DecreaseBelowMinimum(t *testing.T) {
nodesNb := 3
delta := -3
client := &clientMock{}
ng := &NodeGroup{
Client: client,
p: &scalewaygo.Pool{
Size: uint32(nodesNb),
MinSize: 1,
},
}
err := ng.DecreaseTargetSize(delta)
assert.Error(t, err)
}
func TestNodeGroup_DeleteNodes(t *testing.T) {
ctx := context.Background()
client := &clientMock{}
ng := &NodeGroup{
Client: client,
nodes: map[string]*scalewaygo.Node{
"scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15": {ID: "6852824b-e409-4c77-94df-819629d135b9", ProviderID: "scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15"},
"scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066": {ID: "84acb1a6-0e14-4j36-8b32-71bf7b328c22", ProviderID: "scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066"},
"scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529": {ID: "5c4d832a-d964-4c64-9d53-b9295c206cdd", ProviderID: "scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529"},
},
p: &scalewaygo.Pool{
Size: 3,
},
}
nodes := []*apiv1.Node{
{Spec: apiv1.NodeSpec{ProviderID: "scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15"}},
{Spec: apiv1.NodeSpec{ProviderID: "scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066"}},
{Spec: apiv1.NodeSpec{ProviderID: "scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529"}},
}
client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: ng.nodes["scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15"].ID}).Return(&scalewaygo.Node{Status: scalewaygo.NodeStatusDeleting}, nil).Once()
client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: ng.nodes["scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066"].ID}).Return(&scalewaygo.Node{Status: scalewaygo.NodeStatusDeleting}, nil).Once()
client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: ng.nodes["scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529"].ID}).Return(&scalewaygo.Node{Status: scalewaygo.NodeStatusDeleting}, nil).Once()
err := ng.DeleteNodes(nodes)
assert.NoError(t, err)
assert.Equal(t, uint32(0), ng.p.Size)
}
func TestNodeGroup_DeleteNodesErr(t *testing.T) {
ctx := context.Background()
client := &clientMock{}
ng := &NodeGroup{
Client: client,
nodes: map[string]*scalewaygo.Node{
"nonexistent-on-provider-side": {ID: "unknown"},
},
}
nodes := []*apiv1.Node{
{Spec: apiv1.NodeSpec{ProviderID: "nonexistent-on-provider-side"}},
}
client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: "unknown"}).Return(&scalewaygo.Node{}, errors.New("nonexistent")).Once()
err := ng.DeleteNodes(nodes)
assert.Error(t, err)
}
type clientMock struct {
mock.Mock
}
func (m *clientMock) GetPool(ctx context.Context, req *scalewaygo.GetPoolRequest) (*scalewaygo.Pool, error) {
args := m.Called(ctx, req)
return args.Get(0).(*scalewaygo.Pool), args.Error(1)
}
func (m *clientMock) ListPools(ctx context.Context, req *scalewaygo.ListPoolsRequest) (*scalewaygo.ListPoolsResponse, error) {
args := m.Called(ctx, req)
return args.Get(0).(*scalewaygo.ListPoolsResponse), args.Error(1)
}
func (m *clientMock) UpdatePool(ctx context.Context, req *scalewaygo.UpdatePoolRequest) (*scalewaygo.Pool, error) {
args := m.Called(ctx, req)
return args.Get(0).(*scalewaygo.Pool), args.Error(1)
}
func (m *clientMock) ListNodes(ctx context.Context, req *scalewaygo.ListNodesRequest) (*scalewaygo.ListNodesResponse, error) {
args := m.Called(ctx, req)
return args.Get(0).(*scalewaygo.ListNodesResponse), args.Error(1)
}
func (m *clientMock) DeleteNode(ctx context.Context, req *scalewaygo.DeleteNodeRequest) (*scalewaygo.Node, error) {
args := m.Called(ctx, req)
return args.Get(0).(*scalewaygo.Node), args.Error(1)
}


@@ -23,19 +23,21 @@ import (
"errors"
"fmt"
"io"
"k8s.io/autoscaler/cluster-autoscaler/version"
"k8s.io/klog/v2"
"net"
"net/http"
"net/url"
"strconv"
"time"
"k8s.io/autoscaler/cluster-autoscaler/version"
"k8s.io/klog/v2"
)
const (
defaultApiURL string = "https://api.scaleway.com"
defaultHTTPTimeout = 30
pageSizeListPools uint32 = 100
pageSizeListNodes uint32 = 100
pageSizeListPools int = 100
pageSizeListNodes int = 100
)
var (
@@ -57,6 +59,14 @@ var (
ErrOther = errors.New("generic error type")
)
// Client is used to talk to Scaleway Kapsule API
type Client interface {
ListPools(ctx context.Context, clusterID string) ([]PoolWithGenericNodeSpecs, error)
UpdatePool(ctx context.Context, poolID string, size int) (Pool, error)
ListNodes(ctx context.Context, clusterID string) ([]Node, error)
DeleteNode(ctx context.Context, nodeID string) (Node, error)
}
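
With the request/response structs gone from the signatures, a test double is small. A minimal fake (not part of the commit) satisfying the four methods shown; the real interface may carry more, such as the Region and ApiURL accessors used in the provider's log lines:

package scaleway

import (
	"context"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway/scalewaygo"
)

// fakeClient is a hypothetical test double for the reworked Client interface.
type fakeClient struct {
	pools []scalewaygo.PoolWithGenericNodeSpecs
	nodes []scalewaygo.Node
}

func (f *fakeClient) ListPools(ctx context.Context, clusterID string) ([]scalewaygo.PoolWithGenericNodeSpecs, error) {
	return f.pools, nil
}

func (f *fakeClient) UpdatePool(ctx context.Context, poolID string, size int) (scalewaygo.Pool, error) {
	return scalewaygo.Pool{ID: poolID, Size: size}, nil
}

func (f *fakeClient) ListNodes(ctx context.Context, clusterID string) ([]scalewaygo.Node, error) {
	return f.nodes, nil
}

func (f *fakeClient) DeleteNode(ctx context.Context, nodeID string) (scalewaygo.Node, error) {
	return scalewaygo.Node{ID: nodeID, Status: scalewaygo.NodeStatusDeleting}, nil
}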
// Config is used to deserialize config file passed with flag `cloud-config`
type Config struct {
ClusterID string `json:"cluster_id"`
@@ -107,18 +117,6 @@ type scalewayRequest struct {
Body io.Reader
}
// Listing queries default to `fetch all resources` if no `page` is provided
// as CA needs access to all nodes and pools
// Client is used to talk to Scaleway Kapsule API
type Client interface {
GetPool(ctx context.Context, req *GetPoolRequest) (*Pool, error)
ListPools(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error)
UpdatePool(ctx context.Context, req *UpdatePoolRequest) (*Pool, error)
ListNodes(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error)
DeleteNode(ctx context.Context, req *DeleteNodeRequest) (*Node, error)
}
// client contains necessary information to perform API calls
type client struct {
httpClient *http.Client
@@ -296,162 +294,79 @@ type Pool struct {
// Autoscaling: the enablement of the autoscaling feature for the pool
Autoscaling bool `json:"autoscaling"`
// Size: the size (number of nodes) of the pool
Size uint32 `json:"size"`
Size int `json:"size"`
// MinSize: the minimum size of the pool
MinSize uint32 `json:"min_size"`
MinSize int `json:"min_size"`
// MaxSize: the maximum size of the pool
MaxSize uint32 `json:"max_size"`
MaxSize int `json:"max_size"`
// Zone: the zone where the nodes will be spawn in
Zone string `json:"zone"`
}
// GetPoolRequest is passed to `GetPool` method
type GetPoolRequest struct {
// PoolID: the ID of the requested pool
PoolID string `json:"-"`
}
// GetPool is used to request a Pool by its id
func (c *client) GetPool(ctx context.Context, req *GetPoolRequest) (*Pool, error) {
var err error
klog.V(4).Info("GetPool,PoolID=", req.PoolID)
if fmt.Sprint(req.PoolID) == "" {
return nil, errors.New("field PoolID cannot be empty in request")
}
scwReq := &scalewayRequest{
Method: "GET",
Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/pools/" + fmt.Sprint(req.PoolID) + "",
}
var resp Pool
err = c.do(ctx, scwReq, &resp)
if err != nil {
return nil, err
}
return &resp, nil
}
// ListPoolsRequest is passed to `ListPools` method
// it can be used for optional pagination
type ListPoolsRequest struct {
// the ID of the cluster from which the pools will be listed from
ClusterID string `json:"-"`
// Page: the page number for the returned pools
Page *int32 `json:"-"`
// PageSize: the maximum number of pools per page
PageSize *uint32 `json:"-"`
}
// GenericNodeSpecs represents NodeType specs used for scale-up simulations.
// it is used to select the appropriate pool to scale-up.
type GenericNodeSpecs struct {
NodePricePerHour float32 `json:"node_price_per_hour"`
MaxPods uint32 `json:"max_pods"`
Gpu uint32 `json:"gpu"`
CpuCapacity uint32 `json:"cpu_capacity"`
CpuAllocatable uint32 `json:"cpu_allocatable"`
MemoryCapacity uint64 `json:"memory_capacity"`
MemoryAllocatable uint64 `json:"memory_allocatable"`
LocalStorageCapacity uint64 `json:"local_storage_capacity"`
LocalStorageAllocatable uint64 `json:"local_storage_allocatable"`
Labels map[string]string `json:"labels"`
Taints map[string]string `json:"taints"`
NodePricePerHour float32 `json:"node_price_per_hour"`
Capacity map[string]int64 `json:"capacity"`
Allocatable map[string]int64 `json:"allocatable"`
Labels map[string]string `json:"labels"`
Taints map[string]string `json:"taints"`
}
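
The per-resource fields (CpuCapacity, MemoryCapacity, Gpu, and friends) are replaced by free-form capacity and allocatable maps, so new resource types need no client change. A hedged illustration with invented payload values:

package main

import (
	"encoding/json"
	"fmt"
)

// GenericNodeSpecs mirrors the reworked struct above.
type GenericNodeSpecs struct {
	NodePricePerHour float32           `json:"node_price_per_hour"`
	Capacity         map[string]int64  `json:"capacity"`
	Allocatable      map[string]int64  `json:"allocatable"`
	Labels           map[string]string `json:"labels"`
	Taints           map[string]string `json:"taints"`
}

func main() {
	payload := `{
	  "node_price_per_hour": 0.5,
	  "capacity":    {"cpu": 4, "memory": 8589934592, "nvidia.com/gpu": 1},
	  "allocatable": {"cpu": 3, "memory": 8000000000, "nvidia.com/gpu": 1},
	  "labels": {"kubernetes.io/hostname": "node-template"},
	  "taints": {"dedicated": "gpu:NoSchedule"}
	}`
	var specs GenericNodeSpecs
	if err := json.Unmarshal([]byte(payload), &specs); err != nil {
		panic(err)
	}
	fmt.Println(specs.Capacity["nvidia.com/gpu"]) // 1: no dedicated Gpu field needed
}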
// PoolWithGenericNodeSpecs contains the requested `Pool` with additional `Specs` information
type PoolWithGenericNodeSpecs struct {
Pool *Pool `json:"pool"`
Pool Pool `json:"pool"`
Specs GenericNodeSpecs `json:"specs"`
}
// ListPoolsResponse is returned from `ListPools` method
type ListPoolsResponse struct {
// TotalCount: the total number of pools that exists for the cluster
TotalCount uint32 `json:"total_count"`
// Pools: the paginated returned pools
Pools []*PoolWithGenericNodeSpecs `json:"pools"`
Pools []PoolWithGenericNodeSpecs `json:"pools"`
}
// ListPools returns pools associated to a cluster id, pagination optional
func (c *client) ListPools(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error) {
klog.V(4).Info("ListPools,ClusterID=", req.ClusterID)
// ListPools returns pools associated to a cluster id
func (c *client) ListPools(ctx context.Context, clusterID string) ([]PoolWithGenericNodeSpecs, error) {
klog.V(4).Info("ListPools,ClusterID=", clusterID)
if req.Page != nil {
return c.listPoolsPaginated(ctx, req)
}
listPools := func(page int32) (*ListPoolsResponse, error) {
return c.listPoolsPaginated(ctx, &ListPoolsRequest{
ClusterID: req.ClusterID,
Page: &page,
})
}
page := int32(1)
resp, err := listPools(page)
if err != nil {
return nil, err
}
nbPages := (resp.TotalCount + pageSizeListPools - 1) / pageSizeListPools
for uint32(page) <= nbPages {
page++
r, err := listPools(page)
var currentPage = 1
var pools []PoolWithGenericNodeSpecs
for {
paginatedPools, err := c.listPoolsPaginated(ctx, clusterID, currentPage, pageSizeListPools)
if err != nil {
return nil, err
return []PoolWithGenericNodeSpecs{}, err
}
pools = append(pools, paginatedPools...)
if len(paginatedPools) < pageSizeListPools || len(paginatedPools) == 0 {
break
}
resp.Pools = append(resp.Pools, r.Pools...)
if r.TotalCount != resp.TotalCount {
// pools have changed on scaleway side, retrying
resp.TotalCount = r.TotalCount
resp.Pools = []*PoolWithGenericNodeSpecs{}
page = int32(1)
nbPages = (resp.TotalCount + pageSizeListPools - 1) / pageSizeListPools
}
currentPage++
}
return resp, nil
return pools, nil
}
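
Both listing methods now share the same strategy: request pages until a short or empty page comes back, rather than trusting a total_count that can change between requests. A standalone sketch with stand-in types (not the client's code):

package main

import "fmt"

// fetchAll accumulates pages until a page shorter than pageSize arrives,
// which means nothing is left to fetch.
func fetchAll(fetchPage func(page, pageSize int) ([]string, error), pageSize int) ([]string, error) {
	var all []string
	for page := 1; ; page++ {
		items, err := fetchPage(page, pageSize)
		if err != nil {
			return nil, err
		}
		all = append(all, items...)
		if len(items) < pageSize {
			break // short page: done
		}
	}
	return all, nil
}

func main() {
	data := []string{"a", "b", "c", "d", "e"}
	fetch := func(page, pageSize int) ([]string, error) {
		start := (page - 1) * pageSize
		if start >= len(data) {
			return nil, nil
		}
		end := start + pageSize
		if end > len(data) {
			end = len(data)
		}
		return data[start:end], nil
	}
	all, _ := fetchAll(fetch, 2)
	fmt.Println(all) // [a b c d e]
}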
func (c *client) listPoolsPaginated(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error) {
var err error
pageSize := pageSizeListPools
if req.PageSize == nil {
req.PageSize = &pageSize
func (c *client) listPoolsPaginated(ctx context.Context, clusterID string, page, pageSize int) ([]PoolWithGenericNodeSpecs, error) {
if len(clusterID) == 0 {
return nil, errors.New("clusterID cannot be empty in request")
}
query := url.Values{}
if req.Page != nil {
query.Set("page", fmt.Sprint(*req.Page))
}
query.Set("page_size", fmt.Sprint(*req.PageSize))
if fmt.Sprint(req.ClusterID) == "" {
return nil, errors.New("field ClusterID cannot be empty in request")
}
query.Set("page", strconv.Itoa(page))
query.Set("page_size", strconv.Itoa(pageSize))
scwReq := &scalewayRequest{
Method: "GET",
Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/clusters/" + fmt.Sprint(req.ClusterID) + "/pools-autoscaler",
Path: fmt.Sprintf("/k8s/v1/regions/%s/clusters/%s/pools-autoscaler", c.region, clusterID),
Query: query,
}
var resp ListPoolsResponse
err := c.do(ctx, scwReq, &resp)
err = c.do(ctx, scwReq, &resp)
if err != nil {
return nil, err
}
return &resp, nil
return resp.Pools, err
}
// UpdatePoolRequest is passed to `UpdatePool` method
@@ -459,166 +374,99 @@ type UpdatePoolRequest struct {
// PoolID: the ID of the pool to update
PoolID string `json:"-"`
// Size: the new size for the pool
Size *uint32 `json:"size"`
Size int `json:"size"`
}
// UpdatePool is used to resize a pool, to decrease pool size `DeleteNode` should be used instead
func (c *client) UpdatePool(ctx context.Context, req *UpdatePoolRequest) (*Pool, error) {
var err error
func (c *client) UpdatePool(ctx context.Context, poolID string, size int) (Pool, error) {
klog.V(4).Info("UpdatePool,PoolID=", poolID)
klog.V(4).Info("UpdatePool,PoolID=", req.PoolID)
if fmt.Sprint(req.PoolID) == "" {
return nil, errors.New("field PoolID cannot be empty in request")
if len(poolID) == 0 {
return Pool{}, errors.New("field PoolID cannot be empty in request")
}
scwReq := &scalewayRequest{
Method: "PATCH",
Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/pools/" + fmt.Sprint(req.PoolID) + "",
Path: fmt.Sprintf("/k8s/v1/regions/%s/pools/%s", c.region, poolID),
}
buf, err := json.Marshal(req)
buf, err := json.Marshal(UpdatePoolRequest{PoolID: poolID, Size: size})
if err != nil {
return nil, err
return Pool{}, err
}
scwReq.Body = bytes.NewReader(buf)
var resp Pool
err = c.do(ctx, scwReq, &resp)
if err != nil {
return nil, err
}
return &resp, nil
}
// ListNodesRequest is passed to `ListNodes` method
type ListNodesRequest struct {
// ClusterID: the cluster ID from which the nodes will be listed from
ClusterID string `json:"-"`
// PoolID: the pool ID on which to filter the returned nodes
PoolID *string `json:"-"`
// Page: the page number for the returned nodes
Page *int32 `json:"-"`
// PageSize: the maximum number of nodes per page
PageSize *uint32 `json:"-"`
return resp, err
}
// ListNodesResponse is returned from `ListNodes` method
type ListNodesResponse struct {
// TotalCount: the total number of nodes
TotalCount uint32 `json:"total_count"`
// Nodes: the paginated returned nodes
Nodes []*Node `json:"nodes"`
Nodes []Node `json:"nodes"`
}
// ListNodes returns the Nodes associated to a Cluster and/or a Pool
func (c *client) ListNodes(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error) {
klog.V(4).Info("ListNodes,ClusterID=", req.ClusterID)
// ListNodes returns the Nodes associated to a Cluster
func (c *client) ListNodes(ctx context.Context, clusterID string) ([]Node, error) {
klog.V(4).Info("ListNodes,ClusterID=", clusterID)
if req.Page != nil {
return c.listNodesPaginated(ctx, req)
}
listNodes := func(page int32) (*ListNodesResponse, error) {
ctx := context.Background()
return c.listNodesPaginated(ctx, &ListNodesRequest{
ClusterID: req.ClusterID,
PoolID: req.PoolID,
Page: &page,
})
}
page := int32(1)
resp, err := listNodes(page)
if err != nil {
return nil, err
}
nbPages := (resp.TotalCount + pageSizeListNodes - 1) / pageSizeListNodes
for uint32(page) <= nbPages {
page++
r, err := listNodes(page)
var currentPage = 1
var nodes []Node
for {
paginatedNodes, err := c.listNodesPaginated(ctx, clusterID, currentPage, pageSizeListNodes)
if err != nil {
return nil, err
return []Node{}, err
}
nodes = append(nodes, paginatedNodes...)
if len(paginatedNodes) < pageSizeListNodes || len(paginatedNodes) == 0 {
break
}
resp.Nodes = append(resp.Nodes, r.Nodes...)
if r.TotalCount != resp.TotalCount {
// nodes have changed on scaleway side, retrying
resp.TotalCount = r.TotalCount
resp.Nodes = []*Node{}
page = int32(1)
nbPages = (resp.TotalCount + pageSizeListNodes - 1) / pageSizeListNodes
}
currentPage++
}
return resp, nil
return nodes, nil
}
func (c *client) listNodesPaginated(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error) {
var err error
pageSize := pageSizeListNodes
if req.PageSize == nil {
req.PageSize = &pageSize
func (c *client) listNodesPaginated(ctx context.Context, clusterID string, page, pageSize int) ([]Node, error) {
if len(clusterID) == 0 {
return nil, errors.New("clusterID cannot be empty in request")
}
query := url.Values{}
if req.PoolID != nil {
query.Set("pool_id", fmt.Sprint(*req.PoolID))
}
if req.Page != nil {
query.Set("page", fmt.Sprint(*req.Page))
}
query.Set("page_size", fmt.Sprint(*req.PageSize))
if fmt.Sprint(req.ClusterID) == "" {
return nil, errors.New("field ClusterID cannot be empty in request")
}
query.Set("page", strconv.Itoa(page))
query.Set("page_size", strconv.Itoa(pageSize))
scwReq := &scalewayRequest{
Method: "GET",
Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/clusters/" + fmt.Sprint(req.ClusterID) + "/nodes",
Path: fmt.Sprintf("/k8s/v1/regions/%s/clusters/%s/nodes", c.region, clusterID),
Query: query,
}
var resp ListNodesResponse
err := c.do(ctx, scwReq, &resp)
err = c.do(ctx, scwReq, &resp)
if err != nil {
return nil, err
}
return &resp, nil
}
// DeleteNodeRequest is passed to `DeleteNode` method
type DeleteNodeRequest struct {
NodeID string `json:"-"`
return resp.Nodes, err
}
// DeleteNode asynchronously deletes a Node by its id
func (c *client) DeleteNode(ctx context.Context, req *DeleteNodeRequest) (*Node, error) {
var err error
func (c *client) DeleteNode(ctx context.Context, nodeID string) (Node, error) {
klog.V(4).Info("DeleteNode,NodeID=", nodeID)
klog.V(4).Info("DeleteNode,NodeID=", req.NodeID)
if fmt.Sprint(req.NodeID) == "" {
return nil, errors.New("field NodeID cannot be empty in request")
if len(nodeID) == 0 {
return Node{}, errors.New("field NodeID cannot be empty in request")
}
scwReq := &scalewayRequest{
Method: "DELETE",
Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/nodes/" + fmt.Sprint(req.NodeID) + "",
Path: fmt.Sprintf("/k8s/v1/regions/%s/nodes/%s", c.region, nodeID),
}
var resp Node
err := c.do(ctx, scwReq, &resp)
err = c.do(ctx, scwReq, &resp)
if err != nil {
return nil, err
}
return &resp, nil
return resp, err
}