Merge pull request #5101 from Kamatera/kamatera-cloudprovider

cluster-autoscaler: add cloudprovider - Kamatera
This commit is contained in:
Kubernetes Prow Robot 2022-08-22 06:41:43 -07:00 committed by GitHub
commit 4511ce0c36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 3172 additions and 0 deletions

View File

@ -34,6 +34,7 @@ You should also take a look at the notes and "gotchas" for your specific cloud p
* [TencentCloud](./cloudprovider/tencentcloud/README.md)
* [Scaleway](./cloudprovider/scaleway/README.md)
* [Rancher](./cloudprovider/rancher/README.md)
* [Kamatera](./cloudprovider/kamatera/README.md)
# Releases
@ -182,3 +183,4 @@ Supported cloud providers:
* BaiduCloud https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/baiducloud/README.md
* Huawei Cloud https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/huaweicloud/README.md
* Rancher https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/rancher/README.md
* Kamatera https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/kamatera/README.md

View File

@ -38,6 +38,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/hetzner"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/huaweicloud"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/ionoscloud"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/kamatera"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/linode"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/magnum"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci"
@ -69,6 +70,7 @@ var AvailableCloudProviders = []string{
cloudprovider.OVHcloudProviderName,
cloudprovider.ClusterAPIProviderName,
cloudprovider.IonoscloudProviderName,
cloudprovider.KamateraProviderName,
cloudprovider.LinodeProviderName,
cloudprovider.BizflyCloudProviderName,
cloudprovider.BrightboxProviderName,
@ -123,6 +125,8 @@ func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGro
return clusterapi.BuildClusterAPI(opts, do, rl)
case cloudprovider.IonoscloudProviderName:
return ionoscloud.BuildIonosCloud(opts, do, rl)
case cloudprovider.KamateraProviderName:
return kamatera.BuildKamatera(opts, do, rl)
case cloudprovider.LinodeProviderName:
return linode.BuildLinode(opts, do, rl)
case cloudprovider.OracleCloudProviderName:

View File

@ -0,0 +1,43 @@
//go:build kamatera
// +build kamatera
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package builder
import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/kamatera"
"k8s.io/autoscaler/cluster-autoscaler/config"
)
// AvailableCloudProviders supported by the Kamaterea cloud provider builder.
var AvailableCloudProviders = []string{
cloudprovider.KamateraProviderName,
}
// DefaultCloudProvider is Kamatera.
const DefaultCloudProvider = cloudprovider.KamateraProviderName
func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
switch opts.CloudProviderName {
case cloudprovider.KamateraProviderName:
return kamatera.BuildKamatera(opts, do, rl)
}
return nil
}

View File

@ -56,6 +56,8 @@ const (
HetznerProviderName = "hetzner"
// MagnumProviderName gets the provider name of magnum
MagnumProviderName = "magnum"
// KamateraProviderName gets the provider name of kamatera
KamateraProviderName = "kamatera"
// KubemarkProviderName gets the provider name of kubemark
KubemarkProviderName = "kubemark"
// HuaweicloudProviderName gets the provider name of huaweicloud

View File

@ -0,0 +1,4 @@
approvers:
#- OriHoch
reviewers:
#- OriHoch

View File

@ -0,0 +1,174 @@
# Cluster Autoscaler for Kamatera
The cluster autoscaler for Kamatera scales nodes in a Kamatera cluster.
## Kamatera Kubernetes
[Kamatera](https://www.kamatera.com/express/compute/) supports Kubernetes clusters using our Rancher app
or by creating a self-managed cluster directly on Kamatera compute servers, the autoscaler supports
both methods.
## Cluster Autoscaler Node Groups
An autoscaler node group is composed of multiple Kamatera servers with the same server configuration.
All servers belonging to a node group are identified by Kamatera server tags `k8sca-CLUSTER_NAME`, `k8scang-NODEGROUP_NAME`.
The cluster and node groups must be specified in the autoscaler cloud configuration file.
## Deployment
Copy [examples/deployment.yaml](examples/deployment.yaml) and modify the configuration as needed, see below
regarding the required configuration values and format. When the configuraiont is ready, deploy it to your cluster
e.g. using `kubectl apply -f deployment.yaml`.
## Configuration
The cluster autoscaler only considers the cluster and node groups defined in the configuration file.
You can see an example of the cloud config file at [examples/deployment.yaml](examples/deployment.yaml),
**Important Note:** The cluster and node group names must be 15 characters or less.
it is an INI file with the following fields:
| Key | Value | Mandatory | Default |
|-----|-------|-----------|---------|
| global/kamatera-api-client-id | Kamatera API Client ID | yes | none |
| global/kamatera-api-secret | Kamatera API Secret | yes | none |
| global/cluster-name | **max 15 characters: english letters, numbers, dash, underscore, space, dot**: distinct string used to set the cluster server tag | yes | none |
| global/default-min-size | default minimum size of a node group (must be > 0) | no | 1 |
| global/default-max-size | default maximum size of a node group | no | 254 |
| global/default-<SERVER_CONFIG_KEY> | replace <SERVER_CONFIG_KEY> with the relevant configuration key | see below | see below |
| nodegroup \"name\" | **max 15 characters: english letters, numbers, dash, underscore, space, dot**: distinct string within the cluster used to set the node group server tag | yes | none |
| nodegroup \"name\"/min-size | minimum size for a specific node group | no | global/defaut-min-size |
| nodegroup \"name\"/max-size | maximum size for a specific node group | no | global/defaut-min-size |
| nodegroup \"name\"/<SERVER_CONFIG_KEY> | replace <SERVER_CONFIG_KEY> with the relevant configuration key | no | global/default-<SERVER_CONFIG_KEY> |
### Server configuration keys
Following are the supported server configuration keys:
| Key | Value | Mandatory | Default |
|-----|-------|-----------|---------|
| name-prefix | Prefix for all created server names | no | none |
| password | Server root password | no | none |
| ssh-key | Public SSH key to add to the server authorized keys | no | none |
| datacenter | Datacenter ID | yes | none |
| image | Image ID or name | yes | none |
| cpu | CPU type and size identifier | yes | none |
| ram | RAM size in MB | yes | none |
| disk | Disk specifications - see below for details | yes | none |
| dailybackup | boolean - set to true to enable daily backups | no | false |
| managed | boolean - set to true to enable managed services | no | false |
| network | Network specifications - see below for details | yes | none |
| billingcycle | \"hourly\" or \"monthly\" | no | \"hourly\" |
| monthlypackage | For monthly billing only - the monthly network package to use | no | none |
| script-base64 | base64 encoded server initialization script, must be provided to connect the server to the cluster, see below for details | no | none |
### Disk specifications
Server disks are specified using an array of strings which are the same as the cloudcli `--disk` argument
as specified in [cloudcli server create](https://github.com/cloudwm/cloudcli/blob/master/docs/cloudcli_server_create.md).
For multiple disks, include the configuration multiple times, example:
```
[global]
; default for all node groups: single 100gb disk
default-disk = "size=100"
[nodegroup "ng1"]
; this node group will use the default
[nodegroup "ng2"]
; override the default and use 2 disks
disk = "size=100"
disk = "size=200"
```
### Network specifications
Networks are specified using an array of strings which are the same as the cloudcli `--network` argument
as specified in [cloudcli server create](https://github.com/cloudwm/cloudcli/blob/master/docs/cloudcli_server_create.md).
For multiple networks, include the configuration multiple times, example:
```
[global]
; default for all node groups: single public network with auto-assigned ip
default-network = "name=wan,ip=auto"
[nodegroup "ng1"]
; this node group will use the default
[nodegroup "ng2"]
; override the default and attach 2 networks - 1 public and 1 private
network = "name=wan,ip=auto"
network = "name=lan-12345-abcde,ip=auto"
```
### Server Initialization Script
This script is required so that the server will connect to the relevant cluster. The specific script depends on
how you create and manage the cluster.
See below for some common configurations, but the exact script may need to be modified depending on your requirements
and server image.
The script needs to be provided as a base64 encoded string. You can encode your script using the following command:
`cat script.sh | base64 -w0`.
#### Kamatera Rancher Server Initialization Script
Using Kamatera Rancher you need to get the command to join a server to the cluster. This is available from the
following URL: `https://rancher.domain/v3/clusterregistrationtokens`. The relevant command is available under
`data[].nodeCommand`, if you have a single cluster, it will be the first one. If you have multiple cluster you
will have to locate the relevant cluster from the array using `clusterId`. The command will look like this:
```
sudo docker run -d --privileged --restart=unless-stopped --net=host -v /etc/kubernetes:/etc/kubernetes -v /var/run:/var/run rancher/rancher-agent:v2.6.4 --server https://rancher.domain --token aaa --ca-checksum bbb
```
You can replace this command in the example script at [examples/server-init-rancher.sh.txt](examples/server-init-rancher.sh.txt)
#### Kubeadm Initialization Script
The example script at [examples/server-init-kubeadm.sh.txt](examples/server-init-kubeadm.sh.txt) can be used as a base for
writing your own script to join the server to your cluster.
## Development
Make sure you are inside the `cluster-autoscaler` path of the [autoscaler repository](https://github.com/kubernetes/autoscaler).
Run tests:
```
go test -v k8s.io/autoscaler/cluster-autoscaler/cloudprovider/kamatera
```
Setup a Kamatera cluster, you can use [this guide](https://github.com/Kamatera/rancher-kubernetes/blob/main/README.md)
Get the cluster kubeconfig and set in local file and set in the `KUBECONFIG` environment variable.
Make sure you are connected to the cluster using `kubectl get nodes`.
Create a cloud config file according to the above documentation and set it's path in `CLOUD_CONFIG_FILE` env var.
Build the binary and run it:
```
make build &&\
./cluster-autoscaler-amd64 --cloud-config $CLOUD_CONFIG_FILE --cloud-provider kamatera --kubeconfig $KUBECONFIG -v2
```
Build the docker image:
```
make container
```
Tag and push it to a Docker registry
```
docker tag staging-k8s.gcr.io/cluster-autoscaler-amd64:dev ghcr.io/github_username_lowercase/cluster-autoscaler-amd64
docker push ghcr.io/github_username_lowercase/cluster-autoscaler-amd64
```
Make sure relevant clsuter has access to this registry/image.
Follow the documentation for deploying the image and using the autoscaler.

View File

@ -0,0 +1,191 @@
apiVersion: v1
kind: ServiceAccount
metadata:
labels:
k8s-addon: cluster-autoscaler.addons.k8s.io
k8s-app: cluster-autoscaler
name: cluster-autoscaler
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: cluster-autoscaler
labels:
k8s-addon: cluster-autoscaler.addons.k8s.io
k8s-app: cluster-autoscaler
rules:
- apiGroups: [""]
resources: ["events", "endpoints"]
verbs: ["create", "patch"]
- apiGroups: [""]
resources: ["pods/eviction"]
verbs: ["create"]
- apiGroups: [""]
resources: ["pods/status"]
verbs: ["update"]
- apiGroups: [""]
resources: ["endpoints"]
resourceNames: ["cluster-autoscaler"]
verbs: ["get", "update"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["watch", "list", "get", "update"]
- apiGroups: [""]
resources:
- "namespaces"
- "pods"
- "services"
- "replicationcontrollers"
- "persistentvolumeclaims"
- "persistentvolumes"
verbs: ["watch", "list", "get"]
- apiGroups: ["extensions"]
resources: ["replicasets", "daemonsets"]
verbs: ["watch", "list", "get"]
- apiGroups: ["policy"]
resources: ["poddisruptionbudgets"]
verbs: ["watch", "list"]
- apiGroups: ["apps"]
resources: ["statefulsets", "replicasets", "daemonsets"]
verbs: ["watch", "list", "get"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses", "csinodes", "csidrivers", "csistoragecapacities"]
verbs: ["watch", "list", "get"]
- apiGroups: ["batch", "extensions"]
resources: ["jobs"]
verbs: ["get", "list", "watch", "patch"]
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["create"]
- apiGroups: ["coordination.k8s.io"]
resourceNames: ["cluster-autoscaler"]
resources: ["leases"]
verbs: ["get", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: cluster-autoscaler
namespace: kube-system
labels:
k8s-addon: cluster-autoscaler.addons.k8s.io
k8s-app: cluster-autoscaler
rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["create","list","watch"]
- apiGroups: [""]
resources: ["configmaps"]
resourceNames:
- "cluster-autoscaler-status"
- "cluster-autoscaler-priority-expander"
verbs: ["delete", "get", "update", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: cluster-autoscaler
labels:
k8s-addon: cluster-autoscaler.addons.k8s.io
k8s-app: cluster-autoscaler
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-autoscaler
subjects:
- kind: ServiceAccount
name: cluster-autoscaler
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: cluster-autoscaler
namespace: kube-system
labels:
k8s-addon: cluster-autoscaler.addons.k8s.io
k8s-app: cluster-autoscaler
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: cluster-autoscaler
subjects:
- kind: ServiceAccount
name: cluster-autoscaler
namespace: kube-system
---
apiVersion: v1
kind: Secret
metadata:
name: cluster-autoscaler-kamatera
namespace: kube-system
type: Opaque
stringData:
cloud-config: |-
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=aaabbb
default-min-size=1
default-max-size=10
default-password=Aa123456789!
default-ssh-key=ssh-rsa AAAAB3N....Rw==
default-datacenter=IL
default-image=ubuntu_server_18.04_64-bit
default-cpu=1A
default-ram=1024
default-disk=size=50
default-dailybackup=no
default-managed=no
default-network=name=wan,ip=auto
default-billingcycle=hourly
default-monthlypackage=
default-script=<base64-encoded-initilaization-script>
[nodegroup "default"]
[nodegroup "highcpu"]
cpu=2D
[nodegroup "highram"]
ram=4096
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: cluster-autoscaler
namespace: kube-system
labels:
app: cluster-autoscaler
spec:
replicas: 1
selector:
matchLabels:
app: cluster-autoscaler
template:
metadata:
labels:
app: cluster-autoscaler
spec:
serviceAccountName: cluster-autoscaler
containers:
- image: k8s.gcr.io/cluster-autoscaler:{{ ca_version }}
name: cluster-autoscaler
command:
- ./cluster-autoscaler
- --cloud-provider=kamatera
- --cloud-config=/config/cloud-config
- --v=2
- --logtostderr=true
- --namespace=kube-system
imagePullPolicy: "Always"
volumeMounts:
- name: cloud-config
mountPath: /config
readOnly: true
volumes:
- name: cloud-config
secret:
secretName: cluster-autoscaler-kamatera

View File

@ -0,0 +1,35 @@
#!/bin/bash
export DEBIAN_FRONTEND=noninteractive
export HOME=/root/
apt-get update && apt-get install -y apt-transport-https ca-certificates curl software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add -
cat <<EOF >/etc/apt/sources.list.d/kubernetes.list
deb https://apt.kubernetes.io/ kubernetes-xenial main
EOF
add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
apt-get update
apt-get upgrade -y
apt-get install -y kubelet=1.20.1-00 kubeadm=1.20.1-00 kubectl=1.20.1-00
apt-mark hold kubelet kubeadm kubectl
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu bionic stable"
apt update
apt install -y docker-ce
cat <<EOF | tee /etc/default/kubelet
KUBELET_EXTRA_ARGS=--cloud-provider=external
EOF
cat > /etc/docker/daemon.json <<EOF
{
"exec-opts": ["native.cgroupdriver=systemd"],
"log-driver": "json-file",
"log-opts": {
"max-size": "100m"
},
"storage-driver": "overlay2"
}
EOF
mkdir -p /etc/systemd/system/docker.service.d
systemctl daemon-reload
systemctl restart docker
kubeadm join <master-ip> --token <token> --discovery-token-ca-cert-hash sha256:<hash>

View File

@ -0,0 +1,13 @@
#!/bin/bash
export DEBIAN_FRONTEND=noninteractive
export HOME=/root/
apt-get update && apt-get install -y apt-transport-https ca-certificates curl software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
apt-get update
apt install -y docker-ce
sudo docker run -d --privileged --restart=unless-stopped --net=host -v /etc/kubernetes:/etc/kubernetes -v /var/run:/var/run \
rancher/rancher-agent:v2.6.4 \
--server https://rancher.domain \
--token aaa \
--ca-checksum bbb

View File

@ -0,0 +1,34 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"context"
)
// kamateraAPIClient is the interface used to call kamatera API
type kamateraAPIClient interface {
ListServers(ctx context.Context, instances map[string]*Instance) ([]Server, error)
DeleteServer(ctx context.Context, name string) error
CreateServers(ctx context.Context, count int, config ServerConfig) ([]Server, error)
}
// buildKamateraAPIClient returns the struct ready to perform calls to kamatera API
func buildKamateraAPIClient(clientId string, secret string, url string) kamateraAPIClient {
client := NewKamateraApiClientRest(clientId, secret, url)
return &client
}

View File

@ -0,0 +1,270 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"context"
"encoding/hex"
"fmt"
"github.com/satori/go.uuid"
"k8s.io/autoscaler/cluster-autoscaler/version"
"k8s.io/klog/v2"
"strings"
)
const (
userAgent = "kubernetes/cluster-autoscaler/" + version.ClusterAutoscalerVersion
)
// NewKamateraApiClientRest factory to create new Rest API Client struct
func NewKamateraApiClientRest(clientId string, secret string, url string) (client KamateraApiClientRest) {
return KamateraApiClientRest{
userAgent: userAgent,
clientId: clientId,
secret: secret,
url: url,
}
}
// KamateraServerPostRequest struct for Kamatera server post request
type KamateraServerPostRequest struct {
ServerName string `json:"name"`
}
// KamateraServerTerminatePostRequest struct for Kamatera server terminate post request
type KamateraServerTerminatePostRequest struct {
ServerName string `json:"name"`
Force bool `json:"force"`
}
// KamateraServerCreatePostRequest struct for Kamatera server create post request
type KamateraServerCreatePostRequest struct {
Name string `json:"name"`
Password string `json:"password"`
PasswordValidate string `json:"passwordValidate"`
SshKey string `json:"ssh-key"`
Datacenter string `json:"datacenter"`
Image string `json:"image"`
Cpu string `json:"cpu"`
Ram string `json:"ram"`
Disk string `json:"disk"`
Dailybackup string `json:"dailybackup"`
Managed string `json:"managed"`
Network string `json:"network"`
Quantity int `json:"quantity"`
BillingCycle string `json:"billingCycle"`
MonthlyPackage string `json:"monthlypackage"`
Poweronaftercreate string `json:"poweronaftercreate"`
ScriptFile string `json:"script-file"`
UserdataFile string `json:"userdata-file"`
Tag string `json:"tag"`
}
// KamateraApiClientRest is the struct to perform API calls
type KamateraApiClientRest struct {
userAgent string
clientId string
secret string
url string
}
// ListServers returns a list of all servers in the relevant account and fetches their tags
func (c *KamateraApiClientRest) ListServers(ctx context.Context, instances map[string]*Instance) ([]Server, error) {
res, err := request(
ctx,
ProviderConfig{ApiUrl: c.url, ApiClientID: c.clientId, ApiSecret: c.secret},
"GET",
"/service/servers",
nil,
)
if err != nil {
return nil, err
}
var servers []Server
for _, server := range res.([]interface{}) {
server := server.(map[string]interface{})
serverName := server["name"].(string)
serverPowerOn := server["power"].(string) == "on"
serverTags, err := c.getServerTags(ctx, serverName, instances)
if err != nil {
return nil, err
}
servers = append(servers, Server{
Name: serverName,
Tags: serverTags,
PowerOn: serverPowerOn,
})
}
return servers, nil
}
// DeleteServer deletes a server according to the given name
func (c *KamateraApiClientRest) DeleteServer(ctx context.Context, name string) error {
res, err := request(
ctx,
ProviderConfig{ApiUrl: c.url, ApiClientID: c.clientId, ApiSecret: c.secret},
"POST",
"/service/server/poweroff",
KamateraServerPostRequest{ServerName: name},
)
if err == nil {
commandId := res.([]interface{})[0].(string)
_, err = waitCommand(
ctx,
ProviderConfig{ApiUrl: c.url, ApiClientID: c.clientId, ApiSecret: c.secret},
commandId,
)
if err != nil {
return err
}
} else {
klog.V(1).Infof("Failed to power off server but will attempt to terminate anyway %s: %v", name, err)
}
_, err = request(
ctx,
ProviderConfig{ApiUrl: c.url, ApiClientID: c.clientId, ApiSecret: c.secret},
"POST",
"/service/server/terminate",
KamateraServerTerminatePostRequest{ServerName: name, Force: true},
)
if err != nil {
return err
}
return nil
}
// CreateServers creates new servers according to the given configuration
func (c *KamateraApiClientRest) CreateServers(ctx context.Context, count int, config ServerConfig) ([]Server, error) {
var tags []string
for _, tag := range config.Tags {
tags = append(tags, tag)
}
Tag, err := kamateraRequestArray(tags)
if err != nil {
return nil, err
}
Network, err := kamateraRequestArray(config.Networks)
if err != nil {
return nil, err
}
Disk, err := kamateraRequestArray(config.Disks)
if err != nil {
return nil, err
}
serverNameCommandIds := make(map[string]string)
for i := 0; i < count; i++ {
serverName := kamateraServerName(config.NamePrefix)
res, err := request(
ctx,
ProviderConfig{ApiUrl: c.url, ApiClientID: c.clientId, ApiSecret: c.secret},
"POST",
"/service/server",
KamateraServerCreatePostRequest{
Name: serverName,
Password: config.Password,
PasswordValidate: config.Password,
SshKey: config.SshKey,
Datacenter: config.Datacenter,
Image: config.Image,
Cpu: config.Cpu,
Ram: config.Ram,
Disk: Disk,
Dailybackup: kamateraRequestBool(config.Dailybackup),
Managed: kamateraRequestBool(config.Managed),
Network: Network,
Quantity: 1,
BillingCycle: config.BillingCycle,
MonthlyPackage: config.MonthlyPackage,
Poweronaftercreate: "yes",
ScriptFile: config.ScriptFile,
UserdataFile: config.UserdataFile,
Tag: Tag,
},
)
if err != nil {
return nil, err
}
if config.Password == "__generate__" {
resData := res.(map[string]interface{})
klog.V(2).Infof("Generated password for server %s: %s", serverName, resData["password"].(string))
serverNameCommandIds[serverName] = resData["commandIds"].([]interface{})[0].(string)
} else {
serverNameCommandIds[serverName] = res.([]interface{})[0].(string)
}
}
results, err := waitCommands(
ctx,
ProviderConfig{ApiUrl: c.url, ApiClientID: c.clientId, ApiSecret: c.secret},
serverNameCommandIds,
)
if err != nil {
return nil, err
}
var servers []Server
for serverName := range results {
servers = append(servers, Server{
Name: serverName,
Tags: tags,
})
}
return servers, nil
}
func (c *KamateraApiClientRest) getServerTags(ctx context.Context, serverName string, instances map[string]*Instance) ([]string, error) {
if instances[serverName] == nil {
res, err := request(
ctx,
ProviderConfig{ApiUrl: c.url, ApiClientID: c.clientId, ApiSecret: c.secret},
"POST",
"/server/tags",
KamateraServerPostRequest{ServerName: serverName},
)
if err != nil {
return nil, err
}
var tags []string
for _, row := range res.([]interface{}) {
row := row.(map[string]interface{})
tags = append(tags, row["tag name"].(string))
}
return tags, nil
}
return instances[serverName].Tags, nil
}
func kamateraRequestBool(val bool) string {
if val {
return "yes"
}
return "no"
}
func kamateraRequestArray(val []string) (string, error) {
for _, v := range val {
if strings.Contains(v, " ") {
return "", fmt.Errorf("invalid Kamatera server configuration array value (\"%s\"): ,must not contain spaces", v)
}
}
return strings.Join(val, " "), nil
}
func kamateraServerName(namePrefix string) string {
if len(namePrefix) > 0 {
namePrefix = fmt.Sprintf("%s-", namePrefix)
}
return fmt.Sprintf("%s%s", namePrefix, hex.EncodeToString(uuid.NewV4().Bytes()))
}

View File

@ -0,0 +1,147 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"testing"
)
const (
mockKamateraClientId = "mock-client-id"
mockKamateraSecret = "mock-secret"
)
func TestApiClientRest_ListServers_NoServers(t *testing.T) {
server := NewHttpServerMock(MockFieldContentType, MockFieldResponse)
defer server.Close()
ctx := context.Background()
client := NewKamateraApiClientRest(mockKamateraClientId, mockKamateraSecret, server.URL)
server.On("handle", "/service/servers").Return(
"application/json",
`[]`,
).Once()
servers, err := client.ListServers(ctx, map[string]*Instance{})
assert.NoError(t, err)
assert.Equal(t, 0, len(servers))
mock.AssertExpectationsForObjects(t, server)
}
func TestApiClientRest_ListServers(t *testing.T) {
server := NewHttpServerMock(MockFieldContentType, MockFieldResponse)
defer server.Close()
ctx := context.Background()
client := NewKamateraApiClientRest(mockKamateraClientId, mockKamateraSecret, server.URL)
newServerName1 := mockKamateraServerName()
cachedServerName2 := mockKamateraServerName()
cachedServerName3 := mockKamateraServerName()
server.On("handle", "/service/servers").Return(
"application/json",
fmt.Sprintf(`[
{"name": "%s", "power": "on"},
{"name": "%s", "power": "on"},
{"name": "%s", "power": "off"}
]`, newServerName1, cachedServerName2, cachedServerName3),
).On("handle", "/server/tags").Return(
"application/json",
`[{"tag name": "test-tag"}, {"tag name": "other-test-tag"}]`,
)
servers, err := client.ListServers(ctx, map[string]*Instance{
cachedServerName2: {
Id: cachedServerName2,
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning},
PowerOn: true,
Tags: []string{"my-tag", "my-other-tag"},
},
cachedServerName3: {
Id: cachedServerName3,
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning},
PowerOn: true,
Tags: []string{"another-tag", "my-other-tag"},
},
})
assert.NoError(t, err)
assert.Equal(t, 3, len(servers))
assert.Equal(t, servers, []Server{
{
Name: newServerName1,
Tags: []string{"test-tag", "other-test-tag"},
PowerOn: true,
},
{
Name: cachedServerName2,
Tags: []string{"my-tag", "my-other-tag"},
PowerOn: true,
},
{
Name: cachedServerName3,
Tags: []string{"another-tag", "my-other-tag"},
PowerOn: false,
},
})
mock.AssertExpectationsForObjects(t, server)
}
func TestApiClientRest_DeleteServer(t *testing.T) {
server := NewHttpServerMock(MockFieldContentType, MockFieldResponse)
defer server.Close()
ctx := context.Background()
client := NewKamateraApiClientRest(mockKamateraClientId, mockKamateraSecret, server.URL)
serverName := mockKamateraServerName()
commandId := "mock-command-id"
server.On("handle", "/service/server/poweroff").Return(
"application/json",
fmt.Sprintf(`["%s"]`, commandId),
).Once().On("handle", "/service/queue").Return(
"application/json",
`[{"status": "complete"}]`,
).Once().On("handle", "/service/server/terminate").Return(
"application/json",
"{}",
).Once()
err := client.DeleteServer(ctx, serverName)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, server)
}
func TestApiClientRest_CreateServers(t *testing.T) {
server := NewHttpServerMock(MockFieldContentType, MockFieldResponse)
defer server.Close()
ctx := context.Background()
client := NewKamateraApiClientRest(mockKamateraClientId, mockKamateraSecret, server.URL)
commandId := "command"
server.On("handle", "/service/server").Return(
"application/json",
fmt.Sprintf(`["%s"]`, commandId),
).Twice().On("handle", "/service/queue").Return(
"application/json",
`[{"status": "complete"}]`,
).Twice()
servers, err := client.CreateServers(ctx, 2, mockServerConfig("test", []string{"foo", "bar"}))
assert.NoError(t, err)
assert.Equal(t, 2, len(servers))
assert.Less(t, 10, len(servers[0].Name))
assert.Less(t, 10, len(servers[1].Name))
assert.Equal(t, servers[0].Tags, []string{"foo", "bar"})
assert.Equal(t, servers[1].Tags, []string{"foo", "bar"})
mock.AssertExpectationsForObjects(t, server)
}

View File

@ -0,0 +1,281 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"fmt"
"io"
"strconv"
"gopkg.in/gcfg.v1"
)
const (
defaultMinSize int = 1
defaultMaxSize int = 254
defaultApiUrl string = "https://cloudcli.cloudwm.com"
)
// nodeGroupConfig is the configuration for a specific node group.
type nodeGroupConfig struct {
minSize int
maxSize int
NamePrefix string
Password string
SshKey string
Datacenter string
Image string
Cpu string
Ram string
Disks []string
Dailybackup bool
Managed bool
Networks []string
BillingCycle string
MonthlyPackage string
ScriptBase64 string
}
// kamateraConfig holds the configuration for the Kamatera provider.
type kamateraConfig struct {
apiClientId string
apiSecret string
apiUrl string
clusterName string
defaultMinSize int
defaultMaxSize int
nodeGroupCfg map[string]*nodeGroupConfig // key is the node group name
}
// GcfgGlobalConfig is the gcfg representation of the global section in the cloud config file for Kamatera.
type GcfgGlobalConfig struct {
KamateraApiClientId string `gcfg:"kamatera-api-client-id"`
KamateraApiSecret string `gcfg:"kamatera-api-secret"`
KamateraApiUrl string `gcfg:"kamatera-api-url"`
ClusterName string `gcfg:"cluster-name"`
DefaultMinSize string `gcfg:"default-min-size"`
DefaultMaxSize string `gcfg:"default-max-size"`
DefaultNamePrefix string `gcfg:"default-name-prefix"`
DefaultPassword string `gcfg:"default-password"`
DefaultSshKey string `gcfg:"default-ssh-key"`
DefaultDatacenter string `gcfg:"default-datacenter"`
DefaultImage string `gcfg:"default-image"`
DefaultCpu string `gcfg:"default-cpu"`
DefaultRam string `gcfg:"default-ram"`
DefaultDisks []string `gcfg:"default-disk"`
DefaultDailybackup bool `gcfg:"default-dailybackup"`
DefaultManaged bool `gcfg:"default-managed"`
DefaultNetworks []string `gcfg:"default-network"`
DefaultBillingCycle string `gcfg:"default-billingcycle"`
DefaultMonthlyPackage string `gcfg:"default-monthlypackage"`
DefaultScriptBase64 string `gcfg:"default-script-base64"`
}
// GcfgNodeGroupConfig is the gcfg representation of the section in the cloud config file to change defaults for a node group.
type GcfgNodeGroupConfig struct {
MinSize string `gcfg:"min-size"`
MaxSize string `gcfg:"max-size"`
NamePrefix string `gcfg:"name-prefix"`
Password string `gcfg:"password"`
SshKey string `gcfg:"ssh-key"`
Datacenter string `gcfg:"datacenter"`
Image string `gcfg:"image"`
Cpu string `gcfg:"cpu"`
Ram string `gcfg:"ram"`
Disks []string `gcfg:"disk"`
Dailybackup bool `gcfg:"dailybackup"`
Managed bool `gcfg:"managed"`
Networks []string `gcfg:"network"`
BillingCycle string `gcfg:"billingcycle"`
MonthlyPackage string `gcfg:"monthlypackage"`
ScriptBase64 string `gcfg:"script-base64"`
}
// gcfgCloudConfig is the gcfg representation of the cloud config file for Kamatera.
type gcfgCloudConfig struct {
Global GcfgGlobalConfig `gcfg:"global"`
NodeGroups map[string]*GcfgNodeGroupConfig `gcfg:"nodegroup"` // key is the node group name
}
// buildCloudConfig creates the configuration struct for the provider.
func buildCloudConfig(config io.Reader) (*kamateraConfig, error) {
// read the config and get the gcfg struct
var gcfgCloudConfig gcfgCloudConfig
if err := gcfg.ReadInto(&gcfgCloudConfig, config); err != nil {
return nil, err
}
// get the clusterName and Kamatera tokens
clusterName := gcfgCloudConfig.Global.ClusterName
if len(clusterName) == 0 {
return nil, fmt.Errorf("cluster name is not set")
}
apiClientId := gcfgCloudConfig.Global.KamateraApiClientId
if len(apiClientId) == 0 {
return nil, fmt.Errorf("kamatera api client id is not set")
}
apiSecret := gcfgCloudConfig.Global.KamateraApiSecret
if len(apiSecret) == 0 {
return nil, fmt.Errorf("kamatera api secret is not set")
}
apiUrl := gcfgCloudConfig.Global.KamateraApiUrl
if len(apiUrl) == 0 {
apiUrl = defaultApiUrl
}
// Cluster name must be max 15 characters due to limitation of Kamatera server tags
if len(clusterName) > 15 {
return nil, fmt.Errorf("cluster name must be at most 15 characters long")
}
// get the default min and max size as defined in the global section of the config file
defaultMinSize, defaultMaxSize, err := getSizeLimits(
gcfgCloudConfig.Global.DefaultMinSize,
gcfgCloudConfig.Global.DefaultMaxSize,
defaultMinSize,
defaultMaxSize)
if err != nil {
return nil, fmt.Errorf("cannot get default size values in global section: %v", err)
}
// get the specific configuration of a node group
nodeGroupCfg := make(map[string]*nodeGroupConfig)
for nodeGroupName, gcfgNodeGroup := range gcfgCloudConfig.NodeGroups {
// node group name must be max 15 characters due to limitation of Kamatera server tags
if len(nodeGroupName) > 15 {
return nil, fmt.Errorf("node group name must be at most 15 characters long")
}
minSize, maxSize, err := getSizeLimits(gcfgNodeGroup.MinSize, gcfgNodeGroup.MaxSize, defaultMinSize, defaultMaxSize)
if err != nil {
return nil, fmt.Errorf("cannot get size values for node group %s: %v", nodeGroupName, err)
}
namePrefix := gcfgCloudConfig.Global.DefaultNamePrefix
if len(gcfgNodeGroup.NamePrefix) > 0 {
namePrefix = gcfgNodeGroup.NamePrefix
}
password := gcfgCloudConfig.Global.DefaultPassword
if len(gcfgNodeGroup.Password) > 0 {
password = gcfgNodeGroup.Password
}
sshKey := gcfgCloudConfig.Global.DefaultSshKey
if len(gcfgNodeGroup.SshKey) > 0 {
sshKey = gcfgNodeGroup.SshKey
}
datacenter := gcfgCloudConfig.Global.DefaultDatacenter
if len(gcfgNodeGroup.Datacenter) > 0 {
datacenter = gcfgNodeGroup.Datacenter
}
image := gcfgCloudConfig.Global.DefaultImage
if len(gcfgNodeGroup.Image) > 0 {
image = gcfgNodeGroup.Image
}
cpu := gcfgCloudConfig.Global.DefaultCpu
if len(gcfgNodeGroup.Cpu) > 0 {
cpu = gcfgNodeGroup.Cpu
}
ram := gcfgCloudConfig.Global.DefaultRam
if len(gcfgNodeGroup.Ram) > 0 {
ram = gcfgNodeGroup.Ram
}
disks := gcfgCloudConfig.Global.DefaultDisks
if gcfgNodeGroup.Disks != nil {
disks = gcfgNodeGroup.Disks
}
dailybackup := gcfgCloudConfig.Global.DefaultDailybackup
if gcfgNodeGroup.Dailybackup {
dailybackup = gcfgNodeGroup.Dailybackup
}
managed := gcfgCloudConfig.Global.DefaultManaged
if gcfgNodeGroup.Managed {
managed = gcfgNodeGroup.Managed
}
networks := gcfgCloudConfig.Global.DefaultNetworks
if gcfgNodeGroup.Networks != nil {
networks = gcfgNodeGroup.Networks
}
billingCycle := gcfgCloudConfig.Global.DefaultBillingCycle
if len(gcfgNodeGroup.BillingCycle) > 0 {
billingCycle = gcfgNodeGroup.BillingCycle
}
monthlyPackage := gcfgCloudConfig.Global.DefaultMonthlyPackage
if len(gcfgNodeGroup.MonthlyPackage) > 0 {
monthlyPackage = gcfgNodeGroup.MonthlyPackage
}
scriptBase64 := gcfgCloudConfig.Global.DefaultScriptBase64
if len(gcfgNodeGroup.ScriptBase64) > 0 {
scriptBase64 = gcfgNodeGroup.ScriptBase64
}
ngc := &nodeGroupConfig{
maxSize: maxSize,
minSize: minSize,
NamePrefix: namePrefix,
Password: password,
SshKey: sshKey,
Datacenter: datacenter,
Image: image,
Cpu: cpu,
Ram: ram,
Disks: disks,
Dailybackup: dailybackup,
Managed: managed,
Networks: networks,
BillingCycle: billingCycle,
MonthlyPackage: monthlyPackage,
ScriptBase64: scriptBase64,
}
nodeGroupCfg[nodeGroupName] = ngc
}
return &kamateraConfig{
clusterName: clusterName,
apiClientId: apiClientId,
apiSecret: apiSecret,
apiUrl: apiUrl,
defaultMinSize: defaultMinSize,
defaultMaxSize: defaultMaxSize,
nodeGroupCfg: nodeGroupCfg,
}, nil
}
// getSizeLimits takes the max, min size of a node group as strings (empty if no values are provided)
// and default sizes, validates them and returns them as integer, or an error if such occurred
func getSizeLimits(minStr string, maxStr string, defaultMin int, defaultMax int) (int, int, error) {
var err error
min := defaultMin
if len(minStr) != 0 {
min, err = strconv.Atoi(minStr)
if err != nil {
return 0, 0, fmt.Errorf("could not parse min size for node group: %v", err)
}
}
if min < 0 {
return 0, 0, fmt.Errorf("min size for node group cannot be < 0")
}
max := defaultMax
if len(maxStr) != 0 {
max, err = strconv.Atoi(maxStr)
if err != nil {
return 0, 0, fmt.Errorf("could not parse max size for node group: %v", err)
}
}
if min > max {
return 0, 0, fmt.Errorf("min size for a node group must be less than its max size (got min: %d, max: %d)",
min, max)
}
return min, max, nil
}

View File

@ -0,0 +1,253 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestCloudConfig_getSizeLimits(t *testing.T) {
_, _, err := getSizeLimits("3", "2", 1, 2)
assert.Error(t, err, "no errors on minSize > maxSize")
_, _, err = getSizeLimits("4", "", 2, 3)
assert.Error(t, err, "no errors on minSize > maxSize using defaults")
_, _, err = getSizeLimits("", "4", 5, 10)
assert.Error(t, err, "no errors on minSize > maxSize using defaults")
_, _, err = getSizeLimits("-1", "4", 5, 10)
assert.Error(t, err, "no errors on minSize <= 0")
_, _, err = getSizeLimits("1", "4a", 5, 10)
assert.Error(t, err, "no error on malformed integer string")
_, _, err = getSizeLimits("1.0", "4", 5, 10)
assert.Error(t, err, "no error on malformed integer string")
min, max, err := getSizeLimits("", "", 1, 2)
assert.Equal(t, 1, min)
assert.Equal(t, 2, max)
min, max, err = getSizeLimits("", "3", 1, 2)
assert.Equal(t, 1, min)
assert.Equal(t, 3, max)
min, max, err = getSizeLimits("6", "8", 1, 2)
assert.Equal(t, 6, min)
assert.Equal(t, 8, max)
}
func TestCloudConfig_buildCloudConfig(t *testing.T) {
cfg := strings.NewReader(`
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=aaabbb
default-min-size=1
default-max-size=10
default-name-prefix=test
default-password=Aa123456!
default-ssh-key=ssh-rsa AAAA...
default-datacenter=IL
default-image=ubuntu-1604-x64-server-2016-03-01
default-cpu=1a
default-ram=1024
default-disk=size=10
default-disk=size=20
default-network=name=wan,ip=auto
default-billingcycle=hourly
default-monthlypackage=t5000
default-script-base64=ZGVmYXVsdAo=
[nodegroup "default"]
[nodegroup "highcpu"]
min-size=3
name-prefix=highcpu
password=Bb654321!
ssh-key=ssh-rsa BBBB...
datacenter=US
image=ubuntu-2204
cpu=2a
ram=2048
disk=size=50
dailybackup=true
managed=true
network=name=wan,ip=auto
network=name=lan-12345-lan,ip=auto
billingcycle=monthly
monthlypackage=t10000
script-base64=aGlnaGJwdQo=
[nodegroup "highram"]
max-size=2
`)
config, err := buildCloudConfig(cfg)
assert.NoError(t, err)
assert.Equal(t, "1a222bbb3ccc44d5555e6ff77g88hh9i", config.apiClientId)
assert.Equal(t, "9ii88h7g6f55555ee4444444dd33eee2", config.apiSecret)
assert.Equal(t, "aaabbb", config.clusterName)
assert.Equal(t, 1, config.defaultMinSize)
assert.Equal(t, 10, config.defaultMaxSize)
assert.Equal(t, 3, len(config.nodeGroupCfg))
assert.Equal(t, 1, config.nodeGroupCfg["default"].minSize)
assert.Equal(t, 10, config.nodeGroupCfg["default"].maxSize)
assert.Equal(t, 3, config.nodeGroupCfg["highcpu"].minSize)
assert.Equal(t, 10, config.nodeGroupCfg["highcpu"].maxSize)
assert.Equal(t, 1, config.nodeGroupCfg["highram"].minSize)
assert.Equal(t, 2, config.nodeGroupCfg["highram"].maxSize)
// default server configurations
assert.Equal(t, "test", config.nodeGroupCfg["default"].NamePrefix)
assert.Equal(t, "Aa123456!", config.nodeGroupCfg["default"].Password)
assert.Equal(t, "ssh-rsa AAAA...", config.nodeGroupCfg["default"].SshKey)
assert.Equal(t, "IL", config.nodeGroupCfg["default"].Datacenter)
assert.Equal(t, "ubuntu-1604-x64-server-2016-03-01", config.nodeGroupCfg["default"].Image)
assert.Equal(t, "1a", config.nodeGroupCfg["default"].Cpu)
assert.Equal(t, "1024", config.nodeGroupCfg["default"].Ram)
assert.Equal(t, []string{"size=10", "size=20"}, config.nodeGroupCfg["default"].Disks)
assert.False(t, config.nodeGroupCfg["default"].Dailybackup)
assert.False(t, config.nodeGroupCfg["default"].Managed)
assert.Equal(t, []string{"name=wan,ip=auto"}, config.nodeGroupCfg["default"].Networks)
assert.Equal(t, "hourly", config.nodeGroupCfg["default"].BillingCycle)
assert.Equal(t, "t5000", config.nodeGroupCfg["default"].MonthlyPackage)
assert.Equal(t, "ZGVmYXVsdAo=", config.nodeGroupCfg["default"].ScriptBase64)
// highcpu server configurations
assert.Equal(t, "highcpu", config.nodeGroupCfg["highcpu"].NamePrefix)
assert.Equal(t, "Bb654321!", config.nodeGroupCfg["highcpu"].Password)
assert.Equal(t, "ssh-rsa BBBB...", config.nodeGroupCfg["highcpu"].SshKey)
assert.Equal(t, "US", config.nodeGroupCfg["highcpu"].Datacenter)
assert.Equal(t, "ubuntu-2204", config.nodeGroupCfg["highcpu"].Image)
assert.Equal(t, "2a", config.nodeGroupCfg["highcpu"].Cpu)
assert.Equal(t, "2048", config.nodeGroupCfg["highcpu"].Ram)
assert.Equal(t, []string{"size=50"}, config.nodeGroupCfg["highcpu"].Disks)
assert.True(t, config.nodeGroupCfg["highcpu"].Dailybackup)
assert.True(t, config.nodeGroupCfg["highcpu"].Managed)
assert.Equal(t, []string{"name=wan,ip=auto", "name=lan-12345-lan,ip=auto"}, config.nodeGroupCfg["highcpu"].Networks)
assert.Equal(t, "monthly", config.nodeGroupCfg["highcpu"].BillingCycle)
assert.Equal(t, "t10000", config.nodeGroupCfg["highcpu"].MonthlyPackage)
assert.Equal(t, "aGlnaGJwdQo=", config.nodeGroupCfg["highcpu"].ScriptBase64)
cfg = strings.NewReader(`
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=aaabbb
default-min-size=1
default-max-size=10
[nodegroup "default"]
[nodegroup "highcpu"]
min-size=3
[nodegroup "highram"]
max-size=2a
`)
config, err = buildCloudConfig(cfg)
assert.Error(t, err, "no error on size of a specific node group is not an integer string")
cfg = strings.NewReader(`
[global]
cluster-name=aaabbb
default-min-size=1
default-max-size=10
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
[nodegroup "default"]
[nodegroup "highcpu"]
min-size=3
[nodegroup "highram"]
max-size=2
`)
config, err = buildCloudConfig(cfg)
assert.Error(t, err, "no error on missing kamatera api secret")
assert.Contains(t, err.Error(), "kamatera api secret is not set")
cfg = strings.NewReader(`
[global]
cluster-name=aaabbb
default-min-size=1
default-max-size=10
[nodegroup "default"]
[nodegroup "highcpu"]
min-size=3
[nodegroup "highram"]
max-size=2
`)
config, err = buildCloudConfig(cfg)
assert.Error(t, err, "no error on missing kamatera api client id")
assert.Contains(t, err.Error(), "kamatera api client id is not set")
cfg = strings.NewReader(`
[gglobal]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=aaabbb
default-min-size=1
default-max-size=10
[nodegroup "default"]
[nodegroup "highcpu"]
min-size=3
[nodegroup "highram"]
max-size=2
`)
config, err = buildCloudConfig(cfg)
assert.Error(t, err, "no error when config has no global section")
assert.Contains(t, err.Error(), "can't store data at section \"gglobal\"")
cfg = strings.NewReader(`
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=aaabbb
default-min-size=1
default-max-size=10
[nodegroup "1234567890123456"]
`)
config, err = buildCloudConfig(cfg)
assert.Error(t, err, "no error when nodegroup name is more then 15 characters")
assert.Contains(t, err.Error(), "node group name must be at most 15 characters long")
cfg = strings.NewReader(`
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=1234567890123456
default-min-size=1
default-max-size=10
[nodegroup "default"]
`)
config, err = buildCloudConfig(cfg)
assert.Error(t, err, "no error when cluster name is more then 15 characters")
assert.Contains(t, err.Error(), "cluster name must be at most 15 characters long")
}

View File

@ -0,0 +1,177 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"fmt"
"io"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"os"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
klog "k8s.io/klog/v2"
)
// kamateraCloudProvider implements cloudprovider.CloudProvider interface.
type kamateraCloudProvider struct {
manager *manager
resourceLimiter *cloudprovider.ResourceLimiter
}
// Name returns name of the cloud provider.
func (k *kamateraCloudProvider) Name() string {
return cloudprovider.KamateraProviderName
}
// NodeGroups returns all node groups configured for this cloud provider.
func (k *kamateraCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
nodeGroups := make([]cloudprovider.NodeGroup, len(k.manager.nodeGroups))
i := 0
for _, ng := range k.manager.nodeGroups {
nodeGroups[i] = ng
i++
}
return nodeGroups
}
// NodeGroupForNode returns the node group for the given node, nil if the node
// should not be processed by cluster autoscaler, or non-nil error if such
// occurred. Must be implemented.
func (k *kamateraCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.NodeGroup, error) {
for _, ng := range k.manager.nodeGroups {
instance, err := ng.findInstanceForNode(node)
if err != nil {
return nil, err
}
if instance != nil {
return ng, nil
}
}
return nil, nil
}
// Pricing returns pricing model for this cloud provider or error if not available.
// Implementation optional.
func (k *kamateraCloudProvider) Pricing() (cloudprovider.PricingModel, errors.AutoscalerError) {
return nil, cloudprovider.ErrNotImplemented
}
// GetAvailableMachineTypes get all machine types that can be requested from the cloud provider.
// Implementation optional.
func (k *kamateraCloudProvider) GetAvailableMachineTypes() ([]string, error) {
return []string{}, cloudprovider.ErrNotImplemented
}
// NewNodeGroup builds a theoretical node group based on the node definition provided. The node group is not automatically
// created on the cloud provider side. The node group is not returned by NodeGroups() until it is created.
// Implementation optional.
func (k *kamateraCloudProvider) NewNodeGroup(machineType string, labels map[string]string, systemLabels map[string]string,
taints []apiv1.Taint, extraResources map[string]resource.Quantity) (cloudprovider.NodeGroup, error) {
return nil, cloudprovider.ErrNotImplemented
}
// GetResourceLimiter returns struct containing limits (max, min) for resources (cores, memory etc.).
func (k *kamateraCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) {
return k.resourceLimiter, nil
}
// GPULabel returns the label added to nodes with GPU resource.
func (k *kamateraCloudProvider) GPULabel() string {
return ""
}
// GetAvailableGPUTypes return all available GPU types cloud provider supports.
func (k *kamateraCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
return nil
}
// Cleanup cleans up open resources before the cloud provider is destroyed, i.e. go routines etc.
func (k *kamateraCloudProvider) Cleanup() error {
return nil
}
// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
func (k *kamateraCloudProvider) Refresh() error {
return k.manager.refresh()
}
// BuildKamatera builds the Kamatera cloud provider.
func BuildKamatera(
opts config.AutoscalingOptions,
do cloudprovider.NodeGroupDiscoveryOptions,
rl *cloudprovider.ResourceLimiter,
) cloudprovider.CloudProvider {
if opts.CloudConfig == "" {
klog.Fatalf("No config file provided, please specify it via the --cloud-config flag")
}
configFile, err := os.Open(opts.CloudConfig)
if err != nil {
klog.Fatalf("Could not open cloud provider configuration file %q, error: %v", opts.CloudConfig, err)
}
defer configFile.Close()
kcp, err := newKamateraCloudProvider(configFile, rl, createKubeClient(opts))
if err != nil {
klog.Fatalf("Could not create kamatera cloud provider: %v", err)
}
return kcp
}
func newKamateraCloudProvider(config io.Reader, rl *cloudprovider.ResourceLimiter, kubeClient kubernetes.Interface) (cloudprovider.CloudProvider, error) {
m, err := newManager(config, kubeClient)
if err != nil {
return nil, fmt.Errorf("could not create kamatera manager: %v", err)
}
err = m.refresh()
if err != nil {
klog.V(1).Infof("Error on first import of Kamatera node groups: %v", err)
}
klog.V(1).Infof("First import of existing Kamatera node groups ended")
if len(m.nodeGroups) == 0 {
klog.V(1).Infof("Could not import any Kamatera node groups")
} else {
klog.V(1).Infof("imported Kamatera node groups:")
for _, ng := range m.nodeGroups {
klog.V(1).Infof("%s", ng.extendedDebug())
}
}
return &kamateraCloudProvider{
manager: m,
resourceLimiter: rl,
}, nil
}
func getKubeConfig(opts config.AutoscalingOptions) *rest.Config {
klog.V(1).Infof("Using kubeconfig file: %s", opts.KubeConfigPath)
kubeConfig, err := clientcmd.BuildConfigFromFlags("", opts.KubeConfigPath)
if err != nil {
klog.Fatalf("Failed to build kubeConfig: %v", err)
}
return kubeConfig
}
func createKubeClient(opts config.AutoscalingOptions) kubernetes.Interface {
return kubernetes.NewForConfigOrDie(getKubeConfig(opts))
}

View File

@ -0,0 +1,167 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
func TestCloudProvider_newKamateraCloudProvider(t *testing.T) {
// test ok on correctly creating a Kamatera provider
server := NewHttpServerMock(MockFieldContentType, MockFieldResponse)
defer server.Close()
rl := &cloudprovider.ResourceLimiter{}
cfg := strings.NewReader(fmt.Sprintf(`
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
kamatera-api-url=%s
cluster-name=aaabbb
`, server.URL))
server.On("handle", "/service/servers").Return(
"application/json", `[]`).Once()
_, err := newKamateraCloudProvider(cfg, rl, nil)
assert.NoError(t, err)
// test error on creating a Kamatera provider when config is bad
cfg = strings.NewReader(`
[global]
kamatera-api-client-id=
kamatera-api-secret=
cluster-name=
`)
_, err = newKamateraCloudProvider(cfg, rl, nil)
assert.Error(t, err)
assert.Equal(t, "could not create kamatera manager: failed to parse config: cluster name is not set", err.Error())
}
func TestCloudProvider_NodeGroups(t *testing.T) {
// test ok on getting the correct nodes when calling NodeGroups()
cfg := strings.NewReader(`
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=aaabbb
`)
m, _ := newManager(cfg, nil)
m.nodeGroups = map[string]*NodeGroup{
"ng1": {id: "ng1"},
"ng2": {id: "ng2"},
}
kcp := &kamateraCloudProvider{manager: m}
ng := kcp.NodeGroups()
assert.Equal(t, 2, len(ng))
assert.Contains(t, ng, m.nodeGroups["ng1"])
assert.Contains(t, ng, m.nodeGroups["ng2"])
}
func TestCloudProvider_NodeGroupForNode(t *testing.T) {
cfg := strings.NewReader(`
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=aaabbb
`)
m, _ := newManager(cfg, nil)
kamateraServerName1 := mockKamateraServerName()
kamateraServerName2 := mockKamateraServerName()
kamateraServerName3 := mockKamateraServerName()
kamateraServerName4 := mockKamateraServerName()
ng1 := &NodeGroup{
id: "ng1",
instances: map[string]*Instance{
kamateraServerName1: {Id: kamateraServerName1},
kamateraServerName2: {Id: kamateraServerName2},
},
}
ng2 := &NodeGroup{
id: "ng2",
instances: map[string]*Instance{
kamateraServerName3: {Id: kamateraServerName3},
kamateraServerName4: {Id: kamateraServerName4},
},
}
m.nodeGroups = map[string]*NodeGroup{
"ng1": ng1,
"ng2": ng2,
}
kcp := &kamateraCloudProvider{manager: m}
// test ok on getting the right node group for an apiv1.Node
node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: kamateraServerName1,
},
}
ng, err := kcp.NodeGroupForNode(node)
assert.NoError(t, err)
assert.Equal(t, ng1, ng)
// test ok on getting the right node group for an apiv1.Node
node = &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: kamateraServerName4,
},
}
ng, err = kcp.NodeGroupForNode(node)
assert.NoError(t, err)
assert.Equal(t, ng2, ng)
// test ok on getting nil when looking for a apiv1.Node we do not manage
node = &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: mockKamateraServerName(),
},
}
ng, err = kcp.NodeGroupForNode(node)
assert.NoError(t, err)
assert.Nil(t, ng)
}
func TestCloudProvider_others(t *testing.T) {
cfg := strings.NewReader(`
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=aaabbb
`)
m, _ := newManager(cfg, nil)
resourceLimiter := &cloudprovider.ResourceLimiter{}
kcp := &kamateraCloudProvider{
manager: m,
resourceLimiter: resourceLimiter,
}
assert.Equal(t, cloudprovider.KamateraProviderName, kcp.Name())
_, err := kcp.GetAvailableMachineTypes()
assert.Error(t, err)
_, err = kcp.NewNodeGroup("", nil, nil, nil, nil)
assert.Error(t, err)
rl, err := kcp.GetResourceLimiter()
assert.Equal(t, resourceLimiter, rl)
assert.Equal(t, "", kcp.GPULabel())
assert.Nil(t, kcp.GetAvailableGPUTypes())
assert.Nil(t, kcp.Cleanup())
_, err2 := kcp.Pricing()
assert.Error(t, err2)
}

View File

@ -0,0 +1,52 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"context"
"fmt"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)
// Instance implements cloudprovider.Instance interface. Instance contains
// configuration info and functions to control a single Kamatera server instance.
type Instance struct {
// Id is the Kamatera server Name.
Id string
// Status represents status of the node. (Optional)
Status *cloudprovider.InstanceStatus
// Kamatera specific fields
PowerOn bool
Tags []string
}
func (i *Instance) delete(client kamateraAPIClient) error {
i.Status.State = cloudprovider.InstanceDeleting
return client.DeleteServer(context.Background(), i.Id)
}
func (i *Instance) extendedDebug() string {
state := ""
if i.Status.State == cloudprovider.InstanceRunning {
state = "Running"
} else if i.Status.State == cloudprovider.InstanceCreating {
state = "Creating"
} else if i.Status.State == cloudprovider.InstanceDeleting {
state = "Deleting"
}
return fmt.Sprintf("instance ID: %s state: %s powerOn: %v", i.Id, state, i.PowerOn)
}

View File

@ -0,0 +1,240 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"context"
"encoding/base64"
"fmt"
"io"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/client-go/kubernetes"
"regexp"
klog "k8s.io/klog/v2"
)
const (
clusterServerTagPrefix string = "k8sca-"
nodeGroupTagPrefix string = "k8scang-"
)
// manager handles Kamatera communication and holds information about
// the node groups
type manager struct {
client kamateraAPIClient
config *kamateraConfig
nodeGroups map[string]*NodeGroup // key: NodeGroup.id
instances map[string]*Instance // key: Instance.id (which is also the Kamatera server name)
kubeClient kubernetes.Interface
}
func newManager(config io.Reader, kubeClient kubernetes.Interface) (*manager, error) {
cfg, err := buildCloudConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to parse config: %v", err)
}
client := buildKamateraAPIClient(cfg.apiClientId, cfg.apiSecret, cfg.apiUrl)
m := &manager{
client: client,
config: cfg,
nodeGroups: make(map[string]*NodeGroup),
instances: make(map[string]*Instance),
kubeClient: kubeClient,
}
return m, nil
}
func (m *manager) refresh() error {
servers, err := m.client.ListServers(
context.Background(),
m.instances,
)
if err != nil {
return fmt.Errorf("failed to get list of Kamatera servers from Kamatera API: %v", err)
}
nodeGroups := make(map[string]*NodeGroup)
for nodeGroupName, nodeGroupCfg := range m.config.nodeGroupCfg {
nodeGroup, err := m.buildNodeGroup(nodeGroupName, nodeGroupCfg, servers)
if err != nil {
return fmt.Errorf("failed to build node group %s: %v", nodeGroupName, err)
}
nodeGroups[nodeGroupName] = nodeGroup
}
// show some debug info
klog.V(2).Infof("Kamatera node groups after refresh:")
for _, ng := range nodeGroups {
klog.V(2).Infof("%s", ng.extendedDebug())
}
m.nodeGroups = nodeGroups
return nil
}
func (m *manager) buildNodeGroup(name string, cfg *nodeGroupConfig, servers []Server) (*NodeGroup, error) {
// TODO: do validation of server args with Kamatera api
instances, err := m.getNodeGroupInstances(name, servers)
if err != nil {
return nil, fmt.Errorf("failed to get instances for node group %s: %v", name, err)
}
password := cfg.Password
if len(password) == 0 {
password = "__generate__"
}
scriptBytes, err := base64.StdEncoding.DecodeString(cfg.ScriptBase64)
if err != nil {
return nil, fmt.Errorf("failed to decode script for node group %s: %v", name, err)
}
script := string(scriptBytes)
if len(script) < 1 {
return nil, fmt.Errorf("script for node group %s is empty", name)
}
if len(cfg.Datacenter) < 1 {
return nil, fmt.Errorf("datacenter for node group %s is empty", name)
}
if len(cfg.Image) < 1 {
return nil, fmt.Errorf("image for node group %s is empty", name)
}
if len(cfg.Cpu) < 1 {
return nil, fmt.Errorf("cpu for node group %s is empty", name)
}
if len(cfg.Ram) < 1 {
return nil, fmt.Errorf("ram for node group %s is empty", name)
}
if len(cfg.Disks) < 1 {
return nil, fmt.Errorf("no disks for node group %s", name)
}
if len(cfg.Networks) < 1 {
return nil, fmt.Errorf("no networks for node group %s", name)
}
billingCycle := cfg.BillingCycle
if billingCycle == "" {
billingCycle = "hourly"
} else if billingCycle != "hourly" && billingCycle != "monthly" {
return nil, fmt.Errorf("billing cycle for node group %s is invalid", name)
}
tags := []string{
fmt.Sprintf("%s%s", clusterServerTagPrefix, m.config.clusterName),
fmt.Sprintf("%s%s", nodeGroupTagPrefix, name),
}
for _, tag := range tags {
if len(tag) < 3 {
return nil, fmt.Errorf("tag %s is too short, must be at least 3 characters", tag)
}
if len(tag) > 24 {
return nil, fmt.Errorf("tag %s is too long, must be at most 24 characters", tag)
}
tagRegexp := `^[a-zA-Z0-9\-_\s\.]{3,24}$`
matched, err := regexp.MatchString(tagRegexp, tag)
if err != nil {
return nil, fmt.Errorf("failed to validate tag %s: %v", tag, err)
}
if !matched {
return nil, fmt.Errorf("tag %s is invalid, must contain only English letters, numbers, dash, underscore, space, dot", tag)
}
}
serverConfig := ServerConfig{
NamePrefix: cfg.NamePrefix,
Password: password,
SshKey: cfg.SshKey,
Datacenter: cfg.Datacenter,
Image: cfg.Image,
Cpu: cfg.Cpu,
Ram: cfg.Ram,
Disks: cfg.Disks,
Dailybackup: cfg.Dailybackup,
Managed: cfg.Managed,
Networks: cfg.Networks,
BillingCycle: billingCycle,
MonthlyPackage: cfg.MonthlyPackage,
ScriptFile: script,
UserdataFile: "",
Tags: tags,
}
ng := &NodeGroup{
id: name,
manager: m,
minSize: cfg.minSize,
maxSize: cfg.maxSize,
instances: instances,
serverConfig: serverConfig,
}
return ng, nil
}
func (m *manager) getNodeGroupInstances(name string, servers []Server) (map[string]*Instance, error) {
clusterTag := fmt.Sprintf("%s%s", clusterServerTagPrefix, m.config.clusterName)
nodeGroupTag := fmt.Sprintf("%s%s", nodeGroupTagPrefix, name)
instances := make(map[string]*Instance)
for _, server := range servers {
hasClusterTag := false
hasNodeGroupTag := false
for _, tag := range server.Tags {
if tag == nodeGroupTag {
hasNodeGroupTag = true
} else if tag == clusterTag {
hasClusterTag = true
}
}
if m.instances[server.Name] == nil {
var state cloudprovider.InstanceState
if server.PowerOn {
state = cloudprovider.InstanceRunning
} else {
// for new servers that are stopped we assume they were deleted previously and deletion failed
state = cloudprovider.InstanceDeleting
}
m.instances[server.Name] = &Instance{
Id: server.Name,
Status: &cloudprovider.InstanceStatus{State: state},
PowerOn: server.PowerOn,
Tags: server.Tags,
}
} else {
if server.PowerOn {
m.instances[server.Name].Status.State = cloudprovider.InstanceRunning
} else {
// we can only make assumption about server state being powered on
// for other conditions we can't know why server is powered off, so we can't update state
}
m.instances[server.Name] = m.instances[server.Name]
m.instances[server.Name].PowerOn = server.PowerOn
m.instances[server.Name].Tags = server.Tags
}
if hasClusterTag && hasNodeGroupTag {
instances[server.Name] = m.instances[server.Name]
}
}
return instances, nil
}
func (m *manager) addInstance(server Server, state cloudprovider.InstanceState) (*Instance, error) {
if m.instances[server.Name] == nil {
m.instances[server.Name] = &Instance{
Id: server.Name,
Status: &cloudprovider.InstanceStatus{State: state},
PowerOn: server.PowerOn,
Tags: server.Tags,
}
} else {
m.instances[server.Name].Status.State = state
m.instances[server.Name].PowerOn = server.PowerOn
m.instances[server.Name].Tags = server.Tags
}
return m.instances[server.Name], nil
}

View File

@ -0,0 +1,160 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"context"
"fmt"
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestManager_newManager(t *testing.T) {
cfg := strings.NewReader(`
[globalxxx]
`)
_, err := newManager(cfg, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "can't store data at section \"globalxxx\"")
cfg = strings.NewReader(`
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=aaabbb
`)
_, err = newManager(cfg, nil)
assert.NoError(t, err)
}
func TestManager_refresh(t *testing.T) {
cfg := strings.NewReader(fmt.Sprintf(`
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=aaabbb
default-datacenter=IL
default-image=ubuntu
default-cpu=1a
default-ram=1024
default-disk=size=10
default-network=name=wan,ip=auto
default-script-base64=ZGVmYXVsdAo=
[nodegroup "ng1"]
min-size=1
max-size=2
[nodegroup "ng2"]
min-size=4
max-size=5
`))
m, err := newManager(cfg, nil)
assert.NoError(t, err)
client := kamateraClientMock{}
m.client = &client
ctx := context.Background()
serverName1 := mockKamateraServerName()
serverName2 := mockKamateraServerName()
serverName3 := mockKamateraServerName()
serverName4 := mockKamateraServerName()
client.On(
"ListServers", ctx, m.instances,
).Return(
[]Server{
{Name: serverName1, Tags: []string{fmt.Sprintf("%s%s", clusterServerTagPrefix, "aaabbb"), fmt.Sprintf("%s%s", nodeGroupTagPrefix, "ng1")}},
{Name: serverName2, Tags: []string{fmt.Sprintf("%s%s", nodeGroupTagPrefix, "ng1"), fmt.Sprintf("%s%s", clusterServerTagPrefix, "aaabbb")}},
{Name: serverName3, Tags: []string{fmt.Sprintf("%s%s", nodeGroupTagPrefix, "ng1"), fmt.Sprintf("%s%s", clusterServerTagPrefix, "aaabbb")}},
{Name: serverName4, Tags: []string{fmt.Sprintf("%s%s", nodeGroupTagPrefix, "ng2"), fmt.Sprintf("%s%s", clusterServerTagPrefix, "aaabbb")}},
},
nil,
).Once()
err = m.refresh()
assert.NoError(t, err)
assert.Equal(t, 2, len(m.nodeGroups))
assert.Equal(t, 3, len(m.nodeGroups["ng1"].instances))
assert.Equal(t, 1, len(m.nodeGroups["ng2"].instances))
// test api error
client.On(
"ListServers", ctx, m.instances,
).Return(
[]Server{},
fmt.Errorf("error on API call"),
).Once()
err = m.refresh()
assert.Error(t, err)
assert.Equal(t, "failed to get list of Kamatera servers from Kamatera API: error on API call", err.Error())
}
func TestManager_refreshInvalidServerConfiguration(t *testing.T) {
cfgString := ""
assertRefreshServerConfigError(t, cfgString, "script for node group ng1 is empty")
cfgString = "default-script-base64=invalid"
assertRefreshServerConfigError(t, cfgString, "failed to decode script for node group ng1: illegal base64 data")
cfgString = "default-script-base64=ZGVmYXVsdAo="
assertRefreshServerConfigError(t, cfgString, "datacenter for node group ng1 is empty")
cfgString += "\ndefault-datacenter=IL"
assertRefreshServerConfigError(t, cfgString, "image for node group ng1 is empty")
cfgString += "\ndefault-image=ubuntu"
assertRefreshServerConfigError(t, cfgString, "cpu for node group ng1 is empty")
cfgString += "\ndefault-cpu=1a"
assertRefreshServerConfigError(t, cfgString, "ram for node group ng1 is empty")
cfgString += "\ndefault-ram=1024"
assertRefreshServerConfigError(t, cfgString, "no disks for node group ng1")
cfgString += "\ndefault-disk=size=10"
assertRefreshServerConfigError(t, cfgString, "no networks for node group ng1")
cfgString += "\ndefault-network=name=wan,ip=auto"
assertRefreshServerConfigError(t, cfgString, "")
}
func assertRefreshServerConfigError(t *testing.T, cfgString string, expectedError string) {
cfg := strings.NewReader(fmt.Sprintf(`
[global]
kamatera-api-client-id=1a222bbb3ccc44d5555e6ff77g88hh9i
kamatera-api-secret=9ii88h7g6f55555ee4444444dd33eee2
cluster-name=aaabbb
%s
[nodegroup "ng1"]
`, cfgString))
m, err := newManager(cfg, nil)
assert.NoError(t, err)
client := kamateraClientMock{}
m.client = &client
ctx := context.Background()
serverName1 := mockKamateraServerName()
client.On(
"ListServers", ctx, m.instances,
).Return(
[]Server{
{Name: serverName1, Tags: []string{fmt.Sprintf("%s%s", clusterServerTagPrefix, "aaabbb"), fmt.Sprintf("%s%s", nodeGroupTagPrefix, "ng1")}},
},
nil,
).Once()
err = m.refresh()
if expectedError == "" {
assert.NoError(t, err)
} else {
assert.Error(t, err)
assert.Contains(t, err.Error(), expectedError)
}
}

View File

@ -0,0 +1,316 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"context"
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"strconv"
"strings"
)
// NodeGroup implements cloudprovider.NodeGroup interface. NodeGroup contains
// configuration info and functions to control a set of nodes that have the
// same capacity and set of labels.
type NodeGroup struct {
id string
manager *manager
minSize int
maxSize int
instances map[string]*Instance // key is the instance ID
serverConfig ServerConfig
}
// MaxSize returns maximum size of the node group.
func (n *NodeGroup) MaxSize() int {
return n.maxSize
}
// MinSize returns minimum size of the node group.
func (n *NodeGroup) MinSize() int {
return n.minSize
}
// TargetSize returns the current target size of the node group. It is possible that the
// number of nodes in Kubernetes is different at the moment but should be equal
// to Size() once everything stabilizes (new nodes finish startup and registration or
// removed nodes are deleted completely). Implementation required.
func (n *NodeGroup) TargetSize() (int, error) {
return len(n.instances), nil
}
// IncreaseSize increases the size of the node group. To delete a node you need
// to explicitly name it and use DeleteNode. This function should wait until
// node group size is updated. Implementation required.
func (n *NodeGroup) IncreaseSize(delta int) error {
if delta <= 0 {
return fmt.Errorf("delta must be positive, have: %d", delta)
}
currentSize := len(n.instances)
targetSize := currentSize + delta
if targetSize > n.MaxSize() {
return fmt.Errorf("size increase is too large. current: %d desired: %d max: %d",
currentSize, targetSize, n.MaxSize())
}
err := n.createInstances(delta)
if err != nil {
return err
}
return nil
}
// DeleteNodes deletes nodes from this node group. Error is returned either on
// failure or if the given node doesn't belong to this node group. This function
// should wait until node group size is updated. Implementation required.
func (n *NodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
for _, node := range nodes {
instance, err := n.findInstanceForNode(node)
if err != nil {
return err
}
if instance == nil {
return fmt.Errorf("Failed to delete node %q with provider ID %q: cannot find this node in the node group",
node.Name, node.Spec.ProviderID)
}
err = n.deleteInstance(instance)
if err != nil {
return fmt.Errorf("Failed to delete node %q with provider ID %q: %v",
node.Name, node.Spec.ProviderID, err)
}
}
return nil
}
// DecreaseTargetSize decreases the target size of the node group. This function
// doesn't permit to delete any existing node and can be used only to reduce the
// request for new nodes that have not been yet fulfilled. Delta should be negative.
// It is assumed that cloud provider will not delete the existing nodes when there
// is an option to just decrease the target. Implementation required.
func (n *NodeGroup) DecreaseTargetSize(delta int) error {
// requests for new nodes are always fulfilled so we cannot
// decrease the size without actually deleting nodes
return cloudprovider.ErrNotImplemented
}
// Id returns an unique identifier of the node group.
func (n *NodeGroup) Id() string {
return n.id
}
// Debug returns a string containing all information regarding this node group.
func (n *NodeGroup) Debug() string {
return fmt.Sprintf("node group ID: %s (min:%d max:%d)", n.Id(), n.MinSize(), n.MaxSize())
}
// Nodes returns a list of all nodes that belong to this node group.
// It is required that Instance objects returned by this method have Id field set.
// Other fields are optional.
// This list should include also instances that might have not become a kubernetes node yet.
func (n *NodeGroup) Nodes() ([]cloudprovider.Instance, error) {
var instances []cloudprovider.Instance
for _, instance := range n.instances {
instances = append(instances, cloudprovider.Instance{
Id: instance.Id,
Status: instance.Status,
})
}
return instances, nil
}
// TemplateNodeInfo returns a schedulerframework.NodeInfo structure of an empty
// (as if just started) node. This will be used in scale-up simulations to
// predict what would a new node look like if a node group was expanded. The returned
// NodeInfo is expected to have a fully populated Node object, with all of the labels,
// capacity and allocatable information as well as all pods that are started on
// the node by default, using manifest (most likely only kube-proxy). Implementation optional.
func (n *NodeGroup) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) {
resourceList, err := n.getResourceList()
if err != nil {
return nil, fmt.Errorf("failed to create resource list for node group %s error: %v", n.id, err)
}
node := apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: kamateraServerName(""),
Labels: map[string]string{},
},
Status: apiv1.NodeStatus{
Capacity: resourceList,
Conditions: cloudprovider.BuildReadyConditions(),
},
}
node.Status.Allocatable = node.Status.Capacity
node.Status.Conditions = cloudprovider.BuildReadyConditions()
nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(n.id))
nodeInfo.SetNode(&node)
return nodeInfo, nil
}
// Exist checks if the node group really exists on the cloud provider side. Allows to tell the
// theoretical node group from the real one. Implementation required.
func (n *NodeGroup) Exist() bool {
return true
}
// Create creates the node group on the cloud provider side. Implementation optional.
func (n *NodeGroup) Create() (cloudprovider.NodeGroup, error) {
return nil, cloudprovider.ErrNotImplemented
}
// Delete deletes the node group on the cloud provider side.
// This will be executed only for autoprovisioned node groups, once their size drops to 0.
// Implementation optional.
func (n *NodeGroup) Delete() error {
return cloudprovider.ErrNotImplemented
}
// Autoprovisioned returns true if the node group is autoprovisioned. An autoprovisioned group
// was created by CA and can be deleted when scaled to 0.
func (n *NodeGroup) Autoprovisioned() bool {
return false
}
// GetOptions returns NodeGroupAutoscalingOptions that should be used for this particular
// NodeGroup. Returning a nil will result in using default options.
// Implementation optional.
func (n *NodeGroup) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) {
return nil, cloudprovider.ErrNotImplemented
}
func (n *NodeGroup) findInstanceForNode(node *apiv1.Node) (*Instance, error) {
for _, instance := range n.instances {
if instance.Id == node.Spec.ProviderID {
klog.V(2).Infof("findInstanceForNode(%s): found based on node ProviderID", node.Name)
return instance, nil
} else if node.Spec.ProviderID == "" && instance.Id == node.Name {
klog.V(2).Infof("findInstanceForNode(%s): found based on node Id", node.Name)
// Rancher does not set providerID for nodes, so we use node name as providerID
// We also set the ProviderID as some autoscaler code expects it to be set
node.Spec.ProviderID = instance.Id
err := setNodeProviderID(n.manager.kubeClient, node.Name, instance.Id)
if err != nil {
// this is not a critical error, the autoscaler can continue functioning in this condition
// as the same node object is used in later code the ProviderID change will be picked up
klog.Warningf("failed to set node ProviderID for node name %s: %v", instance.Id, err)
}
return instance, nil
}
}
return nil, nil
}
func (n *NodeGroup) deleteInstance(instance *Instance) error {
err := instance.delete(n.manager.client)
if err != nil {
return err
}
instances := make(map[string]*Instance)
for _, i := range n.instances {
if i.Id != instance.Id {
instances[i.Id] = i
}
}
n.instances = instances
return nil
}
func (n *NodeGroup) createInstances(count int) error {
servers, err := n.manager.client.CreateServers(context.Background(), count, n.serverConfig)
if err != nil {
return err
}
for _, server := range servers {
instance, err := n.manager.addInstance(server, cloudprovider.InstanceCreating)
if err != nil {
return err
}
n.instances[server.Name] = instance
}
return nil
}
func (n *NodeGroup) extendedDebug() string {
// TODO: provide extended debug information regarding this node group
msgs := []string{n.Debug()}
for _, instance := range n.instances {
msgs = append(msgs, instance.extendedDebug())
}
return strings.Join(msgs, "\n")
}
func (n *NodeGroup) getResourceList() (apiv1.ResourceList, error) {
ramMb, err := strconv.Atoi(n.serverConfig.Ram)
if err != nil {
return nil, fmt.Errorf("failed to parse server config ram %s: %v", n.serverConfig.Ram, err)
}
// TODO: handle CPU types
if len(n.serverConfig.Cpu) < 2 {
return nil, fmt.Errorf("failed to parse server config cpu %s", n.serverConfig.Cpu)
}
cpuCores, err := strconv.Atoi(n.serverConfig.Cpu[0 : len(n.serverConfig.Cpu)-1])
if err != nil {
return nil, fmt.Errorf("failed to parse server config cpu %s: %v", n.serverConfig.Cpu, err)
}
// TODO: handle additional disks
firstDiskSizeGb := 0
if len(n.serverConfig.Disks) > 0 {
firstDiskSpec := n.serverConfig.Disks[0]
for _, attr := range strings.Split(firstDiskSpec, ",") {
if strings.HasPrefix(attr, "size=") {
firstDiskSizeGb, err = strconv.Atoi(strings.Split(attr, "=")[1])
if err != nil {
return nil, err
}
}
}
}
return apiv1.ResourceList{
// TODO somehow determine the actual pods that will be running
apiv1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
apiv1.ResourceCPU: *resource.NewQuantity(int64(cpuCores), resource.DecimalSI),
apiv1.ResourceMemory: *resource.NewQuantity(int64(ramMb*1024*1024*1024), resource.DecimalSI),
apiv1.ResourceStorage: *resource.NewQuantity(int64(firstDiskSizeGb*1024*1024*1024), resource.DecimalSI),
}, nil
}
func setNodeProviderID(kubeClient kubernetes.Interface, nodeName string, value string) error {
node, err := kubeClient.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
if err != nil {
klog.Errorf("failed to get node to update provider ID %s %+v", nodeName, err)
return err
}
node.Spec.ProviderID = value
_, err = kubeClient.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to update node's provider ID %s %+v", nodeName, err)
return err
}
klog.V(2).Infof("updated provider ID on node: %s", nodeName)
return nil
}

View File

@ -0,0 +1,297 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"context"
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)
func TestNodeGroup_IncreaseSize(t *testing.T) {
client := kamateraClientMock{}
ctx := context.Background()
mgr := manager{
client: &client,
instances: make(map[string]*Instance),
}
serverName1 := mockKamateraServerName()
serverName2 := mockKamateraServerName()
serverName3 := mockKamateraServerName()
serverConfig := mockServerConfig("test", []string{})
ng := NodeGroup{
id: "ng1",
manager: &mgr,
minSize: 1,
maxSize: 7,
instances: map[string]*Instance{
serverName1: {Id: serverName1, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
serverName2: {Id: serverName2, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
serverName3: {Id: serverName3, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
},
serverConfig: serverConfig,
}
// test error on bad delta values
err := ng.IncreaseSize(0)
assert.Error(t, err)
assert.Equal(t, "delta must be positive, have: 0", err.Error())
err = ng.IncreaseSize(-1)
assert.Error(t, err)
assert.Equal(t, "delta must be positive, have: -1", err.Error())
// test error on a too large increase of nodes
err = ng.IncreaseSize(5)
assert.Error(t, err)
assert.Equal(t, "size increase is too large. current: 3 desired: 8 max: 7", err.Error())
// test ok to add a node
client.On(
"CreateServers", ctx, 1, serverConfig,
).Return(
[]Server{{Name: mockKamateraServerName()}}, nil,
).Once()
err = ng.IncreaseSize(1)
assert.NoError(t, err)
assert.Equal(t, 4, len(ng.instances))
// test ok to add multiple nodes
client.On(
"CreateServers", ctx, 2, serverConfig,
).Return(
[]Server{
{Name: mockKamateraServerName()},
{Name: mockKamateraServerName()},
}, nil,
).Once()
err = ng.IncreaseSize(2)
assert.NoError(t, err)
assert.Equal(t, 6, len(ng.instances))
// test error on linode API call error
client.On(
"CreateServers", ctx, 1, serverConfig,
).Return(
[]Server{}, fmt.Errorf("error on API call"),
).Once()
err = ng.IncreaseSize(1)
assert.Error(t, err, "no error on injected API call error")
assert.Equal(t, "error on API call", err.Error())
}
func TestNodeGroup_DecreaseTargetSize(t *testing.T) {
ng := &NodeGroup{}
err := ng.DecreaseTargetSize(-1)
assert.Error(t, err)
assert.Equal(t, "Not implemented", err.Error())
}
func TestNodeGroup_DeleteNodes(t *testing.T) {
client := kamateraClientMock{}
ctx := context.Background()
mgr := manager{
client: &client,
instances: make(map[string]*Instance),
}
serverName1 := mockKamateraServerName()
serverName2 := mockKamateraServerName()
serverName3 := mockKamateraServerName()
serverName4 := mockKamateraServerName()
serverName5 := mockKamateraServerName()
serverName6 := mockKamateraServerName()
ng := NodeGroup{
id: "ng1",
minSize: 1,
maxSize: 6,
instances: map[string]*Instance{
serverName1: {Id: serverName1, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
serverName2: {Id: serverName2, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
serverName3: {Id: serverName3, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
serverName4: {Id: serverName4, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
serverName5: {Id: serverName5, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
serverName6: {Id: serverName6, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
},
manager: &mgr,
}
// test of deleting nodes
client.On(
"DeleteServer", ctx, serverName1,
).Return(nil).Once().On(
"DeleteServer", ctx, serverName2,
).Return(nil).Once().On(
"DeleteServer", ctx, serverName6,
).Return(nil).Once()
err := ng.DeleteNodes([]*apiv1.Node{
{Spec: apiv1.NodeSpec{ProviderID: serverName1}},
{Spec: apiv1.NodeSpec{ProviderID: serverName2}},
{Spec: apiv1.NodeSpec{ProviderID: serverName6}},
})
assert.NoError(t, err)
assert.Equal(t, 3, len(ng.instances))
assert.Equal(t, serverName3, ng.instances[serverName3].Id)
assert.Equal(t, serverName4, ng.instances[serverName4].Id)
assert.Equal(t, serverName5, ng.instances[serverName5].Id)
// test error on deleting a node we are not managing
err = ng.DeleteNodes([]*apiv1.Node{{Spec: apiv1.NodeSpec{ProviderID: mockKamateraServerName()}}})
assert.Error(t, err)
assert.Contains(t, err.Error(), "cannot find this node in the node group")
// test error on deleting a node when the linode API call fails
client.On(
"DeleteServer", ctx, serverName4,
).Return(fmt.Errorf("error on API call")).Once()
err = ng.DeleteNodes([]*apiv1.Node{
{Spec: apiv1.NodeSpec{ProviderID: serverName4}},
})
assert.Error(t, err)
assert.Contains(t, err.Error(), "error on API call")
}
func TestNodeGroup_Nodes(t *testing.T) {
client := kamateraClientMock{}
mgr := manager{
client: &client,
instances: make(map[string]*Instance),
}
serverName1 := mockKamateraServerName()
serverName2 := mockKamateraServerName()
serverName3 := mockKamateraServerName()
ng := NodeGroup{
id: "ng1",
minSize: 1,
maxSize: 6,
instances: map[string]*Instance{
serverName1: {Id: serverName1, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
serverName2: {Id: serverName2, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
serverName3: {Id: serverName3, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
},
manager: &mgr,
}
// test nodes returned from Nodes() are only the ones we are expecting
instancesList, err := ng.Nodes()
assert.NoError(t, err)
assert.Equal(t, 3, len(instancesList))
var serverIds []string
for _, instance := range instancesList {
serverIds = append(serverIds, instance.Id)
}
assert.Equal(t, 3, len(serverIds))
assert.Contains(t, serverIds, serverName1)
assert.Contains(t, serverIds, serverName2)
assert.Contains(t, serverIds, serverName3)
}
func TestNodeGroup_getResourceList(t *testing.T) {
ng := &NodeGroup{}
_, err := ng.getResourceList()
assert.ErrorContains(t, err, "failed to parse server config ram")
ng.serverConfig.Ram = "1024mb"
_, err = ng.getResourceList()
assert.ErrorContains(t, err, "failed to parse server config ram")
ng.serverConfig.Ram = "1024"
_, err = ng.getResourceList()
assert.ErrorContains(t, err, "failed to parse server config cpu")
ng.serverConfig.Cpu = "55PX"
_, err = ng.getResourceList()
assert.ErrorContains(t, err, "failed to parse server config cpu")
ng.serverConfig.Cpu = "55P"
rl, err := ng.getResourceList()
assert.NoError(t, err)
assert.Equal(t, apiv1.ResourceList{
apiv1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
apiv1.ResourceCPU: *resource.NewQuantity(int64(55), resource.DecimalSI),
apiv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*1024*1024), resource.DecimalSI),
apiv1.ResourceStorage: *resource.NewQuantity(int64(0*1024*1024*1024), resource.DecimalSI),
}, rl)
ng.serverConfig.Disks = []string{"size=50"}
rl, err = ng.getResourceList()
assert.NoError(t, err)
assert.Equal(t, apiv1.ResourceList{
apiv1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
apiv1.ResourceCPU: *resource.NewQuantity(int64(55), resource.DecimalSI),
apiv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*1024*1024), resource.DecimalSI),
apiv1.ResourceStorage: *resource.NewQuantity(int64(50*1024*1024*1024), resource.DecimalSI),
}, rl)
}
func TestNodeGroup_TemplateNodeInfo(t *testing.T) {
ng := &NodeGroup{
serverConfig: ServerConfig{
Ram: "1024",
Cpu: "5D",
Disks: []string{"size=50"},
},
}
nodeInfo, err := ng.TemplateNodeInfo()
assert.NoError(t, err)
assert.Equal(t, nodeInfo.Node().Status.Capacity, apiv1.ResourceList{
apiv1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
apiv1.ResourceCPU: *resource.NewQuantity(int64(5), resource.DecimalSI),
apiv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*1024*1024), resource.DecimalSI),
apiv1.ResourceStorage: *resource.NewQuantity(int64(50*1024*1024*1024), resource.DecimalSI),
})
}
func TestNodeGroup_Others(t *testing.T) {
client := kamateraClientMock{}
mgr := manager{
client: &client,
instances: make(map[string]*Instance),
}
serverName1 := mockKamateraServerName()
serverName2 := mockKamateraServerName()
serverName3 := mockKamateraServerName()
ng := NodeGroup{
id: "ng1",
minSize: 1,
maxSize: 7,
instances: map[string]*Instance{
serverName1: {Id: serverName1, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
serverName2: {Id: serverName2, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
serverName3: {Id: serverName3, Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}},
},
manager: &mgr,
}
assert.Equal(t, 1, ng.MinSize())
assert.Equal(t, 7, ng.MaxSize())
ts, err := ng.TargetSize()
assert.NoError(t, err)
assert.Equal(t, 3, ts)
assert.Equal(t, "ng1", ng.Id())
assert.Equal(t, "node group ID: ng1 (min:1 max:7)", ng.Debug())
assert.Equal(t, fmt.Sprintf(`node group ID: ng1 (min:1 max:7)
instance ID: %s state: Running powerOn: false
instance ID: %s state: Running powerOn: false
instance ID: %s state: Running powerOn: false`, serverName1, serverName2, serverName3), ng.extendedDebug())
assert.Equal(t, true, ng.Exist())
assert.Equal(t, false, ng.Autoprovisioned())
_, err = ng.Create()
assert.Error(t, err)
assert.Equal(t, "Not implemented", err.Error())
err = ng.Delete()
assert.Error(t, err)
assert.Equal(t, "Not implemented", err.Error())
}

View File

@ -0,0 +1,175 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Functions to handle Kamatera API calls
Copied from the Kamatera terraform provider:
https://github.com/Kamatera/terraform-provider-kamatera/blob/master/kamatera/request.go
*/
package kamatera
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"k8s.io/klog/v2"
"net/http"
"strings"
"time"
)
// ProviderConfig is the configuration for the Kamatera cloud provider
type ProviderConfig struct {
ApiUrl string
ApiClientID string
ApiSecret string
}
func request(ctx context.Context, provider ProviderConfig, method string, path string, body interface{}) (interface{}, error) {
buf := new(bytes.Buffer)
if body != nil {
if err := json.NewEncoder(buf).Encode(body); err != nil {
return nil, err
}
}
path = strings.TrimPrefix(path, "/")
url := fmt.Sprintf("%s/%s", provider.ApiUrl, path)
klog.V(2).Infof("kamatera request: %s %s %s", method, url, buf.String())
req, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s/%s", provider.ApiUrl, path), buf)
if err != nil {
return nil, err
}
req.Header.Add("AuthClientId", provider.ApiClientID)
req.Header.Add("AuthSecret", provider.ApiSecret)
req.Header.Add("Accept", "application/json")
req.Header.Add("Content-Type", "application/json")
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
var result interface{}
err = json.NewDecoder(res.Body).Decode(&result)
if err != nil {
if res.StatusCode != 200 {
return nil, fmt.Errorf("bad status code from Kamatera API: %d", res.StatusCode)
}
return nil, fmt.Errorf("invalid response from Kamatera API: %+v", result)
}
if res.StatusCode != 200 {
return nil, fmt.Errorf("error response from Kamatera API (%d): %+v", res.StatusCode, result)
}
return result, nil
}
func waitCommand(ctx context.Context, provider ProviderConfig, commandID string) (map[string]interface{}, error) {
startTime := time.Now()
time.Sleep(2 * time.Second)
for {
if startTime.Add(40*time.Minute).Sub(time.Now()) < 0 {
return nil, errors.New("timeout waiting for Kamatera command to complete")
}
time.Sleep(2 * time.Second)
result, e := request(ctx, provider, "GET", fmt.Sprintf("/service/queue?id=%s", commandID), nil)
if e != nil {
return nil, e
}
commands := result.([]interface{})
if len(commands) != 1 {
return nil, errors.New("invalid response from Kamatera queue API: invalid number of command responses")
}
command := commands[0].(map[string]interface{})
status, hasStatus := command["status"]
if hasStatus {
switch status.(string) {
case "complete":
return command, nil
case "error":
log, hasLog := command["log"]
if hasLog {
return nil, fmt.Errorf("kamatera command failed: %s", log)
}
return nil, fmt.Errorf("kamatera command failed: %v", command)
}
}
}
}
func waitCommands(ctx context.Context, provider ProviderConfig, commandIds map[string]string) (map[string]interface{}, error) {
startTime := time.Now()
time.Sleep(2 * time.Second)
commandIdsResults := make(map[string]interface{})
for id := range commandIds {
commandIdsResults[id] = nil
}
for {
if startTime.Add((40)*time.Minute).Sub(time.Now()) < 0 {
return nil, errors.New("timeout waiting for Kamatera commands to complete")
}
time.Sleep(2 * time.Second)
for id, result := range commandIdsResults {
if result == nil {
commandId := commandIds[id]
result, e := request(ctx, provider, "GET", fmt.Sprintf("/service/queue?id=%s", commandId), nil)
if e != nil {
return nil, e
}
commands := result.([]interface{})
if len(commands) != 1 {
return nil, errors.New("invalid response from Kamatera queue API: invalid number of command responses")
}
command := commands[0].(map[string]interface{})
status, hasStatus := command["status"]
if hasStatus {
switch status.(string) {
case "complete":
commandIdsResults[id] = command
break
case "error":
log, hasLog := command["log"]
if hasLog {
return nil, fmt.Errorf("kamatera command failed: %s", log)
}
return nil, fmt.Errorf("kamatera command failed: %v", command)
}
}
}
}
numComplete := 0
for _, result := range commandIdsResults {
if result != nil {
numComplete++
}
}
if numComplete == len(commandIds) {
return commandIdsResults, nil
}
}
}

View File

@ -0,0 +1,25 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
// Server contains information about a Kamatera server, as fetched from the API
// It is used by the manager to keep track of the servers in the cluster.
type Server struct {
Name string
Tags []string
PowerOn bool
}

View File

@ -0,0 +1,37 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
// ServerConfig struct for Kamatera server config
type ServerConfig struct {
NamePrefix string
Password string
SshKey string
Datacenter string
Image string
Cpu string
Ram string
Disks []string
Dailybackup bool
Managed bool
Networks []string
BillingCycle string
MonthlyPackage string
ScriptFile string
UserdataFile string
Tags []string
}

View File

@ -0,0 +1,73 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kamatera
import (
"context"
"encoding/hex"
"fmt"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/mock"
)
func mockKamateraServerName() string {
return fmt.Sprintf("%s", hex.EncodeToString(uuid.NewV4().Bytes()))
}
func mockServerConfig(namePrefix string, tags []string) ServerConfig {
return ServerConfig{
NamePrefix: namePrefix,
Password: "",
SshKey: "",
Datacenter: "IL",
Image: "ubuntu_server_18.04_64-bit",
Cpu: "1A",
Ram: "1024",
Disks: []string{"size=10"},
Dailybackup: false,
Managed: false,
Networks: []string{"name=wan,ip=auto"},
BillingCycle: "hourly",
MonthlyPackage: "",
ScriptFile: "#!/bin/bash",
UserdataFile: "",
Tags: tags,
}
}
type kamateraClientMock struct {
mock.Mock
}
func (c *kamateraClientMock) SetBaseURL(baseURL string) {
c.Called(baseURL)
}
func (c *kamateraClientMock) ListServers(ctx context.Context, instances map[string]*Instance) ([]Server, error) {
args := c.Called(ctx, instances)
return args.Get(0).([]Server), args.Error(1)
}
func (c *kamateraClientMock) CreateServers(ctx context.Context, count int, config ServerConfig) ([]Server, error) {
args := c.Called(ctx, count, config)
return args.Get(0).([]Server), args.Error(1)
}
func (c *kamateraClientMock) DeleteServer(ctx context.Context, id string) error {
args := c.Called(ctx, id)
return args.Error(0)
}