autoscaler/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/scaleway_kapsule_api.go

625 lines
18 KiB
Go

/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scalewaygo
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"k8s.io/autoscaler/cluster-autoscaler/version"
"k8s.io/klog/v2"
"net"
"net/http"
"net/url"
"time"
)
const (
// defaultApiURL is the API base URL NewClient falls back to when
// Config.ApiUrl is empty.
defaultApiURL string = "https://api.scaleway.com"
// defaultHTTPTimeout is the overall HTTP client timeout, expressed in
// seconds (NewClient multiplies it by time.Second).
defaultHTTPTimeout = 30
// pageSizeListPools is the `page_size` used when listing pools if the
// request does not carry an explicit PageSize; it is also the divisor used
// by ListPools to compute the number of pages to fetch.
pageSizeListPools uint32 = 100
// pageSizeListNodes is the `page_size` used when listing nodes if the
// request does not carry an explicit PageSize; it is also the divisor used
// by ListNodes to compute the number of pages to fetch.
pageSizeListNodes uint32 = 100
)
var (
// ErrMissingClusterID is returned when no cluster id has been found
// either in env variables or in config file.
// In this file it is returned by NewClient when Config.ClusterID is empty.
ErrMissingClusterID = errors.New("cluster ID is not provided")
// ErrMissingSecretKey is returned when no secret key has been found
// either in env variables or in config file.
// In this file it is returned by NewClient when Config.SecretKey is empty.
ErrMissingSecretKey = errors.New("scaleway secret key is not provided")
// ErrMissingRegion is returned when no region has been found
// either in env variables or in config file.
// In this file it is returned by NewClient when Config.Region is empty.
ErrMissingRegion = errors.New("region is not provided")
// ErrClientSide indicates an error on user side: it is wrapped (with %w)
// in the error returned for HTTP 4xx responses, so match it with errors.Is.
ErrClientSide = errors.New("400 error type")
// ErrServerSide indicates an error on server side: it is wrapped (with %w)
// in the error returned for HTTP 5xx responses, so match it with errors.Is.
ErrServerSide = errors.New("500 error type")
// ErrOther indicates a generic HTTP error: it is wrapped (with %w) in the
// error returned for any status that is neither 2xx, 4xx nor 5xx.
ErrOther = errors.New("generic error type")
)
// Config is used to deserialize config file passed with flag `cloud-config`.
// It is also the input of NewClient.
type Config struct {
// ClusterID is the ID of the cluster; NewClient rejects an empty value
// with ErrMissingClusterID and embeds it in the User-Agent string.
ClusterID string `json:"cluster_id"`
// SecretKey is the token sent in the `X-Auth-Token` header; NewClient
// rejects an empty value with ErrMissingSecretKey.
SecretKey string `json:"secret_key"`
// Region is the region used to build request paths; NewClient rejects an
// empty value with ErrMissingRegion.
Region string `json:"region"`
// ApiUrl is the base URL of the API; NewClient replaces an empty value
// with defaultApiURL.
ApiUrl string `json:"api_url"`
// UserAgent is the prefix of the User-Agent string built by NewClient.
// It carries no JSON tag.
UserAgent string
}
// NewClient builds a client able to talk to the Scaleway API from cfg.
//
// ClusterID, SecretKey and Region are mandatory: the first one found empty (in
// that order) yields ErrMissingClusterID, ErrMissingSecretKey or
// ErrMissingRegion respectively. An empty ApiUrl falls back to defaultApiURL.
func NewClient(cfg Config) (*client, error) {
	// Mandatory settings, checked in a fixed order.
	switch {
	case cfg.ClusterID == "":
		return nil, ErrMissingClusterID
	case cfg.SecretKey == "":
		return nil, ErrMissingSecretKey
	case cfg.Region == "":
		return nil, ErrMissingRegion
	}

	apiURL := cfg.ApiUrl
	if apiURL == "" {
		apiURL = defaultApiURL
	}

	// Dedicated transport with explicit dial, TLS and response-header timeouts.
	dialer := &net.Dialer{Timeout: 5 * time.Second}
	transport := &http.Transport{
		DialContext:           dialer.DialContext,
		TLSHandshakeTimeout:   5 * time.Second,
		ResponseHeaderTimeout: 30 * time.Second,
	}

	// The User-Agent identifies the caller, the autoscaler version and the cluster.
	userAgent := fmt.Sprintf("%s/%s cluster-id/%s", cfg.UserAgent, version.ClusterAutoscalerVersion, cfg.ClusterID)

	return &client{
		httpClient: &http.Client{
			Timeout:   defaultHTTPTimeout * time.Second,
			Transport: transport,
		},
		apiURL:    apiURL,
		token:     cfg.SecretKey,
		userAgent: userAgent,
		region:    cfg.Region,
	}, nil
}
// scalewayRequest contains all the contents related to performing a request on the Scaleway API.
type scalewayRequest struct {
// Method is the HTTP method of the request (e.g. "GET", "PATCH", "DELETE").
Method string
// Path is appended as-is to the API base URL by getURL.
Path string
// Query holds the query parameters; getURL encodes them into the URL.
Query url.Values
// Body is the optional request payload; when it is non-nil, do sets the
// `Content-Type: application/json` header.
Body io.Reader
}
// Client is used to talk to Scaleway Kapsule API.
//
// Listing queries default to `fetch all resources` if no `page` is provided
// as CA needs access to all nodes and pools.
type Client interface {
// GetPool returns the pool identified by req.PoolID.
GetPool(ctx context.Context, req *GetPoolRequest) (*Pool, error)
// ListPools returns the pools of the cluster identified by req.ClusterID.
ListPools(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error)
// UpdatePool resizes the pool identified by req.PoolID.
UpdatePool(ctx context.Context, req *UpdatePoolRequest) (*Pool, error)
// ListNodes returns the nodes of the cluster identified by req.ClusterID,
// optionally filtered by req.PoolID.
ListNodes(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error)
// DeleteNode deletes the node identified by req.NodeID.
DeleteNode(ctx context.Context, req *DeleteNodeRequest) (*Node, error)
}
// client contains necessary information to perform API calls.
// Instances are built by NewClient.
type client struct {
// httpClient executes the HTTP requests.
httpClient *http.Client
// apiURL is the base URL every request path is appended to.
apiURL string
// token is sent in the `X-Auth-Token` header of every request.
token string
// userAgent is sent in the `User-Agent` header of every request.
userAgent string
// region is interpolated in the `/k8s/v1/regions/<region>/...` request paths.
region string
}
// getURL builds the absolute URL of the request: req.Path is appended to
// apiURL and req.Query is attached as the encoded query string.
//
// When apiURL+req.Path cannot be parsed, the returned error wraps the parsing
// failure so callers can still inspect it with errors.Is / errors.As.
func (req *scalewayRequest) getURL(apiURL string) (*url.URL, error) {
	rawURL := apiURL + req.Path

	completeURL, err := url.Parse(rawURL)
	if err != nil {
		// %w (instead of %s) keeps the message identical while preserving the
		// error chain, consistently with the other errors built in this file.
		return nil, fmt.Errorf("invalid url %s: %w", rawURL, err)
	}

	completeURL.RawQuery = req.Query.Encode()
	return completeURL, nil
}
// ApiURL returns the base URL of the API this client sends its requests to.
func (c *client) ApiURL() string {
return c.apiURL
}
// Token returns the secret key this client sends in the `X-Auth-Token` header.
func (c *client) Token() string {
return c.token
}
// Region returns the region this client uses to build its request paths.
func (c *client) Region() string {
return c.region
}
// do performs a single HTTP request based on the generic request object and,
// on success, decodes the JSON response body into res.
//
// ctx bounds the whole exchange. req must be non-nil. res is expected to hold
// a pointer to the value receiving the decoded payload.
//
// Returned errors:
//   - any status outside 2xx yields an error prefixed with the status code,
//     method and URL, wrapping ErrClientSide (4xx), ErrServerSide (5xx) or
//     ErrOther (anything else). The status is classified before the body is
//     looked at, so the sentinel is reported even when the error payload is
//     not JSON or is malformed;
//   - a 2xx response whose Content-Type is not JSON, or whose body cannot be
//     decoded, yields a descriptive error;
//   - request construction and transport failures are wrapped with %w.
func (c *client) do(ctx context.Context, req *scalewayRequest, res interface{}) error {
	if req == nil {
		return errors.New("request must be non-nil")
	}

	// build URL
	completeURL, err := req.getURL(c.apiURL)
	if err != nil {
		return err
	}

	// build request, bound to ctx from the start
	httpRequest, err := http.NewRequestWithContext(ctx, req.Method, completeURL.String(), req.Body)
	if err != nil {
		return fmt.Errorf("could not create request: %w", err)
	}

	httpRequest.Header.Set("User-Agent", c.userAgent)
	httpRequest.Header.Set("X-Auth-Token", c.token)
	if req.Body != nil {
		httpRequest.Header.Set("Content-Type", "application/json")
	}

	// execute request
	httpResponse, err := c.httpClient.Do(httpRequest)
	if err != nil {
		return fmt.Errorf("error executing request: %w", err)
	}
	defer func() {
		if err := httpResponse.Body.Close(); err != nil {
			klog.Errorf("failed to close response body: %v", err)
		}
	}()

	// Classify the status before touching the body: an HTTP failure must be
	// reported as such whatever the shape of its payload.
	var statusErr error
	switch code := httpResponse.StatusCode; {
	case code >= 200 && code < 300:
		// success, the body is decoded below
	case code >= 400 && code < 500:
		statusErr = ErrClientSide
	case code >= 500 && code < 600:
		statusErr = ErrServerSide
	default:
		statusErr = ErrOther
	}
	if statusErr != nil {
		return fmt.Errorf("%d %v %v: %w", httpResponse.StatusCode, httpRequest.Method, httpRequest.URL, statusErr)
	}

	ct := httpResponse.Header.Get("Content-Type")
	if !isJSONContentType(ct) {
		return fmt.Errorf("unexpected content-type: %s with status: %s", ct, httpResponse.Status)
	}

	// res holds the pointer supplied by the caller; the payload is decoded
	// into the value it points to.
	if err := json.NewDecoder(httpResponse.Body).Decode(&res); err != nil {
		return fmt.Errorf("could not parse %s response body: %w", ct, err)
	}

	return nil
}

// isJSONContentType reports whether the Content-Type header value ct
// designates JSON. Optional parameters (for instance "; charset=utf-8") are
// ignored and the media type is compared case-insensitively.
// It relies on the bytes package, which this file already imports.
func isJSONContentType(ct string) bool {
	mediaType := []byte(ct)
	if i := bytes.IndexByte(mediaType, ';'); i >= 0 {
		mediaType = mediaType[:i]
	}
	return bytes.EqualFold(bytes.TrimSpace(mediaType), []byte("application/json"))
}
// NodeStatus is the state in which a node might be.
// It is the type of the `status` field of Node.
type NodeStatus string
// Possible values of NodeStatus.
const (
// NodeStatusCreating indicates that node is provisioning the underlying instance/BM
NodeStatusCreating = NodeStatus("creating")
// NodeStatusStarting indicates that node is being configured and/or booting
NodeStatusStarting = NodeStatus("starting")
// NodeStatusRegistering indicates that underlying node has booted and k8s services are starting
NodeStatusRegistering = NodeStatus("registering")
// NodeStatusNotReady indicates that k8s has marked this node as `NotReady`
NodeStatusNotReady = NodeStatus("not_ready")
// NodeStatusReady indicates that node is ready for use
NodeStatusReady = NodeStatus("ready")
// NodeStatusDeleting indicates that node is being deleted
NodeStatusDeleting = NodeStatus("deleting")
// NodeStatusDeleted indicates that node is deleted
NodeStatusDeleted = NodeStatus("deleted")
// NodeStatusLocked indicates that node has been locked for legal reasons
NodeStatusLocked = NodeStatus("locked")
// NodeStatusRebooting indicates that node is rebooting
NodeStatusRebooting = NodeStatus("rebooting")
// NodeStatusCreationError indicates that node failed to create
NodeStatusCreationError = NodeStatus("creation_error")
// NodeStatusUpgrading indicates that this node CP is currently upgrading k8s version
// NOTE(review): "CP" is not defined in this file — presumably "control plane"; confirm.
NodeStatusUpgrading = NodeStatus("upgrading")
)
// Node represents an instance running in a scaleway pool.
// It is decoded from the JSON responses of the API; the JSON field names are
// given by the struct tags.
type Node struct {
// ID: the ID of the node
ID string `json:"id"`
// PoolID: the pool ID of the node
PoolID string `json:"pool_id"`
// ClusterID: the cluster ID of the node
ClusterID string `json:"cluster_id"`
// ProviderID: the underlying instance ID
ProviderID string `json:"provider_id"`
// Name: the name of the node
Name string `json:"name"`
// Status: the status of the node (see the NodeStatus constants)
Status NodeStatus `json:"status"`
// CreatedAt: the date at which the node was created
// (pointer: left nil when the JSON field is absent or null)
CreatedAt *time.Time `json:"created_at"`
// UpdatedAt: the date at which the node was last updated
// (pointer: left nil when the JSON field is absent or null)
UpdatedAt *time.Time `json:"updated_at"`
}
// PoolStatus is the state in which a pool might be (unused).
// It is the type of the `status` field of Pool; the constants below are not
// referenced elsewhere in this file.
type PoolStatus string
// These are possible statuses for a scaleway pool.
// NOTE(review): their exact meaning is not described in this file; the
// comments below only restate the wire value.
const (
// PoolStatusReady is the `ready` status.
PoolStatusReady = PoolStatus("ready")
// PoolStatusDeleting is the `deleting` status.
PoolStatusDeleting = PoolStatus("deleting")
// PoolStatusDeleted is the `deleted` status.
PoolStatusDeleted = PoolStatus("deleted")
// PoolStatusScaling is the `scaling` status.
PoolStatusScaling = PoolStatus("scaling")
// PoolStatusWarning is the `warning` status.
PoolStatusWarning = PoolStatus("warning")
// PoolStatusLocked is the `locked` status.
PoolStatusLocked = PoolStatus("locked")
// PoolStatusUpgrading is the `upgrading` status.
PoolStatusUpgrading = PoolStatus("upgrading")
)
// Pool is the abstraction used to gather nodes with the same specs.
// It is decoded from the JSON responses of the API; the JSON field names are
// given by the struct tags.
type Pool struct {
// ID: the ID of the pool
ID string `json:"id"`
// ClusterID: the cluster ID of the pool
ClusterID string `json:"cluster_id"`
// CreatedAt: the date at which the pool was created
// (pointer: left nil when the JSON field is absent or null)
CreatedAt *time.Time `json:"created_at"`
// UpdatedAt: the date at which the pool was last updated
// (pointer: left nil when the JSON field is absent or null)
UpdatedAt *time.Time `json:"updated_at"`
// Name: the name of the pool
Name string `json:"name"`
// Status: the status of the pool (see the PoolStatus constants)
Status PoolStatus `json:"status"`
// Version: the version of the pool
Version string `json:"version"`
// NodeType: the node type is the type of Scaleway Instance wanted for the pool
NodeType string `json:"node_type"`
// Autoscaling: the enablement of the autoscaling feature for the pool
Autoscaling bool `json:"autoscaling"`
// Size: the size (number of nodes) of the pool
Size uint32 `json:"size"`
// MinSize: the minimum size of the pool
MinSize uint32 `json:"min_size"`
// MaxSize: the maximum size of the pool
MaxSize uint32 `json:"max_size"`
// Zone: the zone where the nodes will be spawned
Zone string `json:"zone"`
}
// GetPoolRequest is passed to `GetPool` method
type GetPoolRequest struct {
// PoolID: the ID of the requested pool.
// It must not be empty; it is placed in the request path and is excluded
// from JSON serialization (`json:"-"`).
PoolID string `json:"-"`
}
// GetPool fetches a Pool by its id.
//
// It issues a GET on `/k8s/v1/regions/<region>/pools/<PoolID>` and returns the
// decoded Pool. An empty req.PoolID is rejected before any request is sent;
// any error reported by the HTTP layer is returned unchanged.
func (c *client) GetPool(ctx context.Context, req *GetPoolRequest) (*Pool, error) {
	klog.V(4).Info("GetPool,PoolID=", req.PoolID)

	if req.PoolID == "" {
		return nil, errors.New("field PoolID cannot be empty in request")
	}

	pool := new(Pool)
	request := &scalewayRequest{
		Method: "GET",
		Path:   fmt.Sprintf("/k8s/v1/regions/%s/pools/%s", c.region, req.PoolID),
	}
	if err := c.do(ctx, request, pool); err != nil {
		return nil, err
	}

	return pool, nil
}
// ListPoolsRequest is passed to `ListPools` method
// it can be used for optional pagination.
// All fields are excluded from JSON serialization (`json:"-"`): they are sent
// through the request path and query string.
type ListPoolsRequest struct {
// the ID of the cluster from which the pools will be listed from;
// it must not be empty
ClusterID string `json:"-"`
// Page: the page number for the returned pools.
// When nil, `ListPools` fetches every page (and does not forward PageSize).
Page *int32 `json:"-"`
// PageSize: the maximum number of pools per page.
// When nil, pageSizeListPools is used.
PageSize *uint32 `json:"-"`
}
// GenericNodeSpecs represents NodeType specs used for scale-up simulations.
// it is used to select the appropriate pool to scale-up.
//
// NOTE(review): the units of the numeric fields (currency, CPU unit, bytes…)
// and the encoding of Taints are not established anywhere in this file —
// confirm against the API documentation before relying on them.
type GenericNodeSpecs struct {
// NodePricePerHour: decoded from `node_price_per_hour`
NodePricePerHour float32 `json:"node_price_per_hour"`
// MaxPods: decoded from `max_pods`
MaxPods uint32 `json:"max_pods"`
// Gpu: decoded from `gpu`
Gpu uint32 `json:"gpu"`
// CpuCapacity: decoded from `cpu_capacity`
CpuCapacity uint32 `json:"cpu_capacity"`
// CpuAllocatable: decoded from `cpu_allocatable`
CpuAllocatable uint32 `json:"cpu_allocatable"`
// MemoryCapacity: decoded from `memory_capacity`
MemoryCapacity uint64 `json:"memory_capacity"`
// MemoryAllocatable: decoded from `memory_allocatable`
MemoryAllocatable uint64 `json:"memory_allocatable"`
// LocalStorageCapacity: decoded from `local_storage_capacity`
LocalStorageCapacity uint64 `json:"local_storage_capacity"`
// LocalStorageAllocatable: decoded from `local_storage_allocatable`
LocalStorageAllocatable uint64 `json:"local_storage_allocatable"`
// Labels: decoded from `labels`, a string-to-string map
Labels map[string]string `json:"labels"`
// Taints: decoded from `taints`, a string-to-string map
Taints map[string]string `json:"taints"`
}
// PoolWithGenericNodeSpecs contains the requested `Pool` with additional `Specs` information.
// It is the element type of ListPoolsResponse.Pools.
type PoolWithGenericNodeSpecs struct {
// Pool: the pool itself (pointer: left nil when the JSON field is absent or null)
Pool *Pool `json:"pool"`
// Specs: the node specs attached to the pool
Specs GenericNodeSpecs `json:"specs"`
}
// ListPoolsResponse is returned from `ListPools` method
type ListPoolsResponse struct {
// TotalCount: the total number of pools that exists for the cluster.
// `ListPools` uses it to compute how many pages must be fetched.
TotalCount uint32 `json:"total_count"`
// Pools: the paginated returned pools
Pools []*PoolWithGenericNodeSpecs `json:"pools"`
}
// ListPools returns pools associated to a cluster id, pagination optional.
//
// When req.Page is set, exactly that page is requested (req.PageSize is
// honoured). Otherwise every page is fetched, pageSizeListPools items at a
// time, and the pools are accumulated in a single response; req.PageSize is
// not used in that mode.
//
// While fetching all pages, a change of TotalCount between two pages means the
// set of pools changed on the API side: the accumulated result is discarded and
// the listing restarts from the first page.
//
// The first error returned by a page request aborts the listing.
func (c *client) ListPools(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error) {
	klog.V(4).Info("ListPools,ClusterID=", req.ClusterID)

	if req.Page != nil {
		return c.listPoolsPaginated(ctx, req)
	}

	fetchPage := func(page int32) (*ListPoolsResponse, error) {
		return c.listPoolsPaginated(ctx, &ListPoolsRequest{
			ClusterID: req.ClusterID,
			Page:      &page,
		})
	}

	page := int32(1)
	resp, err := fetchPage(page)
	if err != nil {
		return nil, err
	}

	nbPages := (resp.TotalCount + pageSizeListPools - 1) / pageSizeListPools

	// Strict comparison: page nbPages is the last one holding data, there is
	// no need to request the page after it.
	for uint32(page) < nbPages {
		page++
		r, err := fetchPage(page)
		if err != nil {
			return nil, err
		}

		if r.TotalCount != resp.TotalCount {
			// pools have changed on scaleway side, retrying.
			// page is reset to 0 so that the next iteration fetches page 1
			// again; resetting it to 1 would skip the first page.
			resp.TotalCount = r.TotalCount
			resp.Pools = []*PoolWithGenericNodeSpecs{}
			page = 0
			nbPages = (resp.TotalCount + pageSizeListPools - 1) / pageSizeListPools
			continue
		}

		resp.Pools = append(resp.Pools, r.Pools...)
	}

	return resp, nil
}
// listPoolsPaginated requests a single page of the pools of a cluster.
//
// It issues a GET on
// `/k8s/v1/regions/<region>/clusters/<ClusterID>/pools-autoscaler` with the
// `page` (when req.Page is set) and `page_size` query parameters. When
// req.PageSize is nil, pageSizeListPools is used; req itself is never modified.
//
// An empty req.ClusterID is rejected before any request is sent; any error
// reported by the HTTP layer is returned unchanged.
func (c *client) listPoolsPaginated(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error) {
	if req.ClusterID == "" {
		return nil, errors.New("field ClusterID cannot be empty in request")
	}

	// Resolve the page size locally rather than writing the default back into
	// the caller's request.
	pageSize := pageSizeListPools
	if req.PageSize != nil {
		pageSize = *req.PageSize
	}

	query := url.Values{}
	if req.Page != nil {
		query.Set("page", fmt.Sprint(*req.Page))
	}
	query.Set("page_size", fmt.Sprint(pageSize))

	scwReq := &scalewayRequest{
		Method: "GET",
		Path:   "/k8s/v1/regions/" + c.region + "/clusters/" + req.ClusterID + "/pools-autoscaler",
		Query:  query,
	}

	var resp ListPoolsResponse
	if err := c.do(ctx, scwReq, &resp); err != nil {
		return nil, err
	}

	return &resp, nil
}
// UpdatePoolRequest is passed to `UpdatePool` method.
// The whole struct is JSON-marshalled to form the request body.
type UpdatePoolRequest struct {
// PoolID: the ID of the pool to update.
// It must not be empty; it is placed in the request path and is excluded
// from the body (`json:"-"`).
PoolID string `json:"-"`
// Size: the new size for the pool.
// The tag has no `omitempty`, so a nil pointer is sent as `"size": null`.
Size *uint32 `json:"size"`
}
// UpdatePool is used to resize a pool, to decrease pool size `DeleteNode` should be used instead.
//
// It issues a PATCH on `/k8s/v1/regions/<region>/pools/<PoolID>` whose body is
// the JSON encoding of req, and returns the Pool decoded from the response.
// An empty req.PoolID is rejected before any request is sent; marshalling and
// HTTP layer errors are returned unchanged.
func (c *client) UpdatePool(ctx context.Context, req *UpdatePoolRequest) (*Pool, error) {
	klog.V(4).Info("UpdatePool,PoolID=", req.PoolID)

	if req.PoolID == "" {
		return nil, errors.New("field PoolID cannot be empty in request")
	}

	payload, err := json.Marshal(req)
	if err != nil {
		return nil, err
	}

	updated := new(Pool)
	request := &scalewayRequest{
		Method: "PATCH",
		Path:   fmt.Sprintf("/k8s/v1/regions/%s/pools/%s", c.region, req.PoolID),
		Body:   bytes.NewReader(payload),
	}
	if err := c.do(ctx, request, updated); err != nil {
		return nil, err
	}

	return updated, nil
}
// ListNodesRequest is passed to `ListNodes` method.
// All fields are excluded from JSON serialization (`json:"-"`): they are sent
// through the request path and query string.
type ListNodesRequest struct {
// ClusterID: the cluster ID from which the nodes will be listed from;
// it must not be empty
ClusterID string `json:"-"`
// PoolID: the pool ID on which to filter the returned nodes.
// When nil, no `pool_id` filter is sent.
PoolID *string `json:"-"`
// Page: the page number for the returned nodes.
// When nil, `ListNodes` fetches every page (and does not forward PageSize).
Page *int32 `json:"-"`
// PageSize: the maximum number of nodes per page.
// When nil, pageSizeListNodes is used.
PageSize *uint32 `json:"-"`
}
// ListNodesResponse is returned from `ListNodes` method
type ListNodesResponse struct {
// TotalCount: the total number of nodes.
// `ListNodes` uses it to compute how many pages must be fetched.
TotalCount uint32 `json:"total_count"`
// Nodes: the paginated returned nodes
Nodes []*Node `json:"nodes"`
}
// ListNodes returns the Nodes associated to a Cluster and/or a Pool.
//
// When req.Page is set, exactly that page is requested (req.PageSize is
// honoured). Otherwise every page is fetched, pageSizeListNodes items at a
// time, and the nodes are accumulated in a single response; req.PageSize is
// not used in that mode. req.PoolID, when set, filters every page request.
//
// While fetching all pages, a change of TotalCount between two pages means the
// set of nodes changed on the API side: the accumulated result is discarded and
// the listing restarts from the first page.
//
// Every page request is bound to ctx, so cancelling ctx aborts the listing.
// The first error returned by a page request aborts the listing.
func (c *client) ListNodes(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error) {
	klog.V(4).Info("ListNodes,ClusterID=", req.ClusterID)

	if req.Page != nil {
		return c.listNodesPaginated(ctx, req)
	}

	// The caller's ctx is forwarded as-is: replacing it with a background
	// context would make the page requests ignore cancellation and deadlines.
	fetchPage := func(page int32) (*ListNodesResponse, error) {
		return c.listNodesPaginated(ctx, &ListNodesRequest{
			ClusterID: req.ClusterID,
			PoolID:    req.PoolID,
			Page:      &page,
		})
	}

	page := int32(1)
	resp, err := fetchPage(page)
	if err != nil {
		return nil, err
	}

	nbPages := (resp.TotalCount + pageSizeListNodes - 1) / pageSizeListNodes

	// Strict comparison: page nbPages is the last one holding data, there is
	// no need to request the page after it.
	for uint32(page) < nbPages {
		page++
		r, err := fetchPage(page)
		if err != nil {
			return nil, err
		}

		if r.TotalCount != resp.TotalCount {
			// nodes have changed on scaleway side, retrying.
			// page is reset to 0 so that the next iteration fetches page 1
			// again; resetting it to 1 would skip the first page.
			resp.TotalCount = r.TotalCount
			resp.Nodes = []*Node{}
			page = 0
			nbPages = (resp.TotalCount + pageSizeListNodes - 1) / pageSizeListNodes
			continue
		}

		resp.Nodes = append(resp.Nodes, r.Nodes...)
	}

	return resp, nil
}
// listNodesPaginated requests a single page of the nodes of a cluster.
//
// It issues a GET on `/k8s/v1/regions/<region>/clusters/<ClusterID>/nodes`
// with the `pool_id` (when req.PoolID is set), `page` (when req.Page is set)
// and `page_size` query parameters. When req.PageSize is nil,
// pageSizeListNodes is used; req itself is never modified.
//
// An empty req.ClusterID is rejected before any request is sent; any error
// reported by the HTTP layer is returned unchanged.
func (c *client) listNodesPaginated(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error) {
	if req.ClusterID == "" {
		return nil, errors.New("field ClusterID cannot be empty in request")
	}

	// Resolve the page size locally rather than writing the default back into
	// the caller's request.
	pageSize := pageSizeListNodes
	if req.PageSize != nil {
		pageSize = *req.PageSize
	}

	query := url.Values{}
	if req.PoolID != nil {
		query.Set("pool_id", *req.PoolID)
	}
	if req.Page != nil {
		query.Set("page", fmt.Sprint(*req.Page))
	}
	query.Set("page_size", fmt.Sprint(pageSize))

	scwReq := &scalewayRequest{
		Method: "GET",
		Path:   "/k8s/v1/regions/" + c.region + "/clusters/" + req.ClusterID + "/nodes",
		Query:  query,
	}

	var resp ListNodesResponse
	if err := c.do(ctx, scwReq, &resp); err != nil {
		return nil, err
	}

	return &resp, nil
}
// DeleteNodeRequest is passed to `DeleteNode` method
type DeleteNodeRequest struct {
// NodeID: the ID of the node to delete.
// It must not be empty; it is placed in the request path and is excluded
// from JSON serialization (`json:"-"`).
NodeID string `json:"-"`
}
// DeleteNode asynchronously deletes a Node by its id.
//
// It issues a DELETE on `/k8s/v1/regions/<region>/nodes/<NodeID>` and returns
// the Node decoded from the response. An empty req.NodeID is rejected before
// any request is sent; any error reported by the HTTP layer is returned
// unchanged.
func (c *client) DeleteNode(ctx context.Context, req *DeleteNodeRequest) (*Node, error) {
	klog.V(4).Info("DeleteNode,NodeID=", req.NodeID)

	if req.NodeID == "" {
		return nil, errors.New("field NodeID cannot be empty in request")
	}

	node := new(Node)
	request := &scalewayRequest{
		Method: "DELETE",
		Path:   fmt.Sprintf("/k8s/v1/regions/%s/nodes/%s", c.region, req.NodeID),
	}
	if err := c.do(ctx, request, node); err != nil {
		return nil, err
	}

	return node, nil
}