support gossip for AliCloud

This commit is contained in:
LilyFaFa 2019-01-09 21:30:48 +08:00 committed by Lily
parent ae6cda7bb1
commit 12d54b6a1f
11 changed files with 932 additions and 1 deletions

4
Gopkg.lock generated
View File

@ -290,12 +290,13 @@
[[projects]]
branch = "master"
digest = "1:c208e46a2c410d62b3e5c97f1f1db797d8d0862307b7800c4783dac5be1c83fa"
digest = "1:2beb133a93078255768c2445096e4a549a845c15c0ce7614f8c8aa35b43c0850"
name = "github.com/denverdino/aliyungo"
packages = [
"common",
"ecs",
"ess",
"metadata",
"oss",
"ram",
"slb",
@ -2217,6 +2218,7 @@
"github.com/denverdino/aliyungo/common",
"github.com/denverdino/aliyungo/ecs",
"github.com/denverdino/aliyungo/ess",
"github.com/denverdino/aliyungo/metadata",
"github.com/denverdino/aliyungo/oss",
"github.com/denverdino/aliyungo/ram",
"github.com/denverdino/aliyungo/slb",

View File

@ -138,6 +138,7 @@ k8s.io/kops/pkg/values
k8s.io/kops/protokube/cmd/protokube
k8s.io/kops/protokube/pkg/etcd
k8s.io/kops/protokube/pkg/gossip
k8s.io/kops/protokube/pkg/gossip/ali
k8s.io/kops/protokube/pkg/gossip/aws
k8s.io/kops/protokube/pkg/gossip/dns
k8s.io/kops/protokube/pkg/gossip/dns/hosts

View File

@ -165,6 +165,21 @@ func (i *Installation) buildSystemdJob() *nodetasks.Service {
buffer.WriteString("\" ")
}
if os.Getenv("OSS_REGION") != "" {
buffer.WriteString("\"OSS_REGION=")
buffer.WriteString(os.Getenv("OSS_REGION"))
buffer.WriteString("\" ")
}
if os.Getenv("ALIYUN_ACCESS_KEY_ID") != "" {
buffer.WriteString("\"ALIYUN_ACCESS_KEY_ID=")
buffer.WriteString(os.Getenv("ALIYUN_ACCESS_KEY_ID"))
buffer.WriteString("\" ")
buffer.WriteString("\"ALIYUN_ACCESS_KEY_SECRET=")
buffer.WriteString(os.Getenv("ALIYUN_ACCESS_KEY_SECRET"))
buffer.WriteString("\" ")
}
if buffer.String() != "" {
manifest.Set("Service", "Environment", buffer.String())
}

View File

@ -438,6 +438,25 @@ func (t *ProtokubeBuilder) ProtokubeEnvironmentVariables() string {
buffer.WriteString(" ")
}
if os.Getenv("OSS_REGION") != "" {
buffer.WriteString(" ")
buffer.WriteString("-e 'OSS_REGION=")
buffer.WriteString(os.Getenv("OSS_REGION"))
buffer.WriteString("'")
buffer.WriteString(" ")
}
if os.Getenv("ALIYUN_ACCESS_KEY_ID") != "" {
buffer.WriteString(" ")
buffer.WriteString("-e 'ALIYUN_ACCESS_KEY_ID=")
buffer.WriteString(os.Getenv("ALIYUN_ACCESS_KEY_ID"))
buffer.WriteString("'")
buffer.WriteString(" -e 'ALIYUN_ACCESS_KEY_SECRET=")
buffer.WriteString(os.Getenv("ALIYUN_ACCESS_KEY_SECRET"))
buffer.WriteString("'")
buffer.WriteString(" ")
}
t.writeProxyEnvVars(&buffer)
return buffer.String()

View File

@ -195,6 +195,21 @@ func run() error {
clusterID = osVolumes.ClusterID()
}
} else if cloud == "alicloud" {
glog.Info("Initializing AliCloud volumes")
aliVolumes, err := protokube.NewALIVolumes()
if err != nil {
glog.Errorf("Error initializing Aliyun: %q", err)
os.Exit(1)
}
volumes = aliVolumes
if clusterID == "" {
clusterID = aliVolumes.ClusterID()
}
if internalIP == nil {
internalIP = aliVolumes.InternalIP()
}
} else {
glog.Errorf("Unknown cloud %q", cloud)
os.Exit(1)
@ -257,6 +272,12 @@ func run() error {
return err
}
gossipName = volumes.(*protokube.OpenstackVolumes).InstanceName()
} else if cloud == "alicloud" {
gossipSeeds, err = volumes.(*protokube.ALIVolumes).GossipSeeds()
if err != nil {
return err
}
gossipName = volumes.(*protokube.ALIVolumes).InstanceID()
} else {
glog.Fatalf("seed provider for %q not yet implemented", cloud)
}

View File

@ -0,0 +1,13 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["seeds.go"],
importpath = "k8s.io/kops/protokube/pkg/gossip/ali",
visibility = ["//visibility:public"],
deps = [
"//protokube/pkg/gossip:go_default_library",
"//vendor/github.com/denverdino/aliyungo/common:go_default_library",
"//vendor/github.com/denverdino/aliyungo/ecs:go_default_library",
],
)

View File

@ -0,0 +1,79 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ali
import (
"github.com/denverdino/aliyungo/common"
"github.com/denverdino/aliyungo/ecs"
"k8s.io/kops/protokube/pkg/gossip"
)
type SeedProvider struct {
ecs *ecs.Client
region string
tag map[string]string
}
var _ gossip.SeedProvider = &SeedProvider{}
func (p *SeedProvider) GetSeeds() ([]string, error) {
var seeds []string
// We could query at most 50 instances at a time on Aliyun ECS
maxPageSize := 50
args := &ecs.DescribeInstancesArgs{
// TODO: pending? starting?
Status: ecs.Running,
RegionId: common.Region(p.region),
Pagination: common.Pagination{
PageNumber: 1,
PageSize: maxPageSize,
},
Tag: p.tag,
}
var instances []ecs.InstanceAttributesType
for {
resp, page, err := p.ecs.DescribeInstances(args)
if err != nil {
return nil, err
}
instances = append(instances, resp...)
if page.NextPage() == nil {
break
}
args.Pagination = *(page.NextPage())
}
for _, instance := range instances {
// TODO: Multiple IP addresses?
for _, ip := range instance.VpcAttributes.PrivateIpAddress.IpAddress {
seeds = append(seeds, ip)
}
}
return seeds, nil
}
func NewSeedProvider(c *ecs.Client, region string, tag map[string]string) (*SeedProvider, error) {
return &SeedProvider{
ecs: c,
region: region,
tag: tag,
}, nil
}

View File

@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"ali_volume.go",
"aws_volume.go",
"baremetal_volume.go",
"channels.go",
@ -35,10 +36,12 @@ go_library(
"//pkg/resources/digitalocean:go_default_library",
"//protokube/pkg/etcd:go_default_library",
"//protokube/pkg/gossip:go_default_library",
"//protokube/pkg/gossip/ali:go_default_library",
"//protokube/pkg/gossip/aws:go_default_library",
"//protokube/pkg/gossip/dns:go_default_library",
"//protokube/pkg/gossip/gce:go_default_library",
"//protokube/pkg/gossip/openstack:go_default_library",
"//upup/pkg/fi/cloudup/aliup:go_default_library",
"//upup/pkg/fi/cloudup/awsup:go_default_library",
"//upup/pkg/fi/cloudup/gce:go_default_library",
"//upup/pkg/fi/cloudup/openstack:go_default_library",
@ -50,6 +53,9 @@ go_library(
"//vendor/github.com/aws/aws-sdk-go/aws/request:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/session:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
"//vendor/github.com/denverdino/aliyungo/common:go_default_library",
"//vendor/github.com/denverdino/aliyungo/ecs:go_default_library",
"//vendor/github.com/denverdino/aliyungo/metadata:go_default_library",
"//vendor/github.com/digitalocean/godo:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/gophercloud/gophercloud/openstack/blockstorage/v2/volumes:go_default_library",

View File

@ -0,0 +1,322 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package protokube
import (
"fmt"
"net"
"net/http"
"os"
"strings"
"github.com/denverdino/aliyungo/common"
"github.com/denverdino/aliyungo/ecs"
"github.com/denverdino/aliyungo/metadata"
"github.com/golang/glog"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/protokube/pkg/gossip"
gossipali "k8s.io/kops/protokube/pkg/gossip/ali"
"k8s.io/kops/upup/pkg/fi/cloudup/aliup"
)
// ALIVolumes is the Volumes implementation for Aliyun ECS
type ALIVolumes struct {
client *ecs.Client
clusterTag string
region string
zone string
instanceId string
internalIP net.IP
}
var _ Volumes = &ALIVolumes{}
func NewALIVolumes() (*ALIVolumes, error) {
accessKeyId := os.Getenv("ALIYUN_ACCESS_KEY_ID")
if accessKeyId == "" {
return nil, fmt.Errorf("error initialing ALIVolumes: ALIYUN_ACCESS_KEY_ID cannot be empty")
}
accessKeySecret := os.Getenv("ALIYUN_ACCESS_KEY_SECRET")
if accessKeySecret == "" {
return nil, fmt.Errorf("error initialing ALIVolumes: ALIYUN_ACCESS_KEY_SECRET cannot be empty")
}
ecsEndpoint := os.Getenv("ALIYUN_ECS_ENDPOINT")
if ecsEndpoint == "" {
// TODO: shall we raise error here?
ecsEndpoint = ecs.ECSDefaultEndpoint
}
client := ecs.NewClientWithEndpoint(ecsEndpoint, accessKeyId, accessKeySecret)
a := &ALIVolumes{
client: client,
}
err := a.discoverTags()
if err != nil {
return nil, err
}
return a, nil
}
// ClusterID implements Volumes ClusterID
func (a *ALIVolumes) ClusterID() string {
return a.clusterTag
}
// InstanceID implements Volumes InstanceID
func (a *ALIVolumes) InstanceID() string {
return a.instanceId
}
// InternalIP implements Volumes InternalIP
func (a *ALIVolumes) InternalIP() net.IP {
return a.internalIP
}
func (a *ALIVolumes) discoverTags() error {
metadataClient := metadata.NewMetaData(&http.Client{})
// Region
{
region, err := metadataClient.Region()
if err != nil {
return fmt.Errorf("error reading region from Aliyun: %v", err)
}
a.region = region
if a.region == "" {
return fmt.Errorf("region metadata was empty")
}
glog.Infof("Found region=%q", a.region)
}
// Zone
{
zone, err := metadataClient.Zone()
if err != nil {
return fmt.Errorf("error reading zone from Aliyun: %v", err)
}
a.zone = zone
if a.zone == "" {
return fmt.Errorf("zone metadata was empty")
}
glog.Infof("Found zone=%q", a.zone)
}
// Instance Name
{
instanceId, err := metadataClient.InstanceID()
if err != nil {
return fmt.Errorf("error reading instance ID from Aliyun: %v", err)
}
a.instanceId = instanceId
if a.instanceId == "" {
return fmt.Errorf("instance ID metadata was empty")
}
glog.Infof("Found instanceId=%q", a.instanceId)
}
// Internal IP
{
internalIP, err := metadataClient.PrivateIPv4()
if err != nil {
return fmt.Errorf("error querying InternalIP from Aliyun: %v", err)
}
if internalIP == "" {
return fmt.Errorf("InternalIP from metadata was empty")
}
a.internalIP = net.ParseIP(internalIP)
if a.internalIP == nil {
return fmt.Errorf("InternalIP from metadata was not parseable(%q)", internalIP)
}
glog.Infof("Found internalIP=%q", a.internalIP)
}
// Cluster Tag
{
describeTagsArgs := &ecs.DescribeTagsArgs{
RegionId: common.Region(a.region),
ResourceType: ecs.TagResourceInstance,
ResourceId: a.instanceId,
}
result, _, err := a.client.DescribeTags(describeTagsArgs)
if err != nil {
return fmt.Errorf("error querying Aliyun instance tags: %v", err)
}
for _, tag := range result {
if tag.TagKey == aliup.TagClusterName {
a.clusterTag = tag.TagValue
}
}
if a.clusterTag == "" {
return fmt.Errorf("cluster tag metadata was empty")
}
}
return nil
}
// AttachVolume attaches the specified volume to this instance, returning the mountpoint & nil if successful
func (a *ALIVolumes) AttachVolume(volume *Volume) error {
// TODO: what if this volume has already been attached to another instance?
// Aliyun Disk can only be attached to one instance
if volume.LocalDevice == "" && volume.AttachedTo == "" {
attachDiskArgs := &ecs.AttachDiskArgs{
InstanceId: a.instanceId,
DiskId: volume.ID,
// TODO: DeleteWithInstance?
}
err := a.client.AttachDisk(attachDiskArgs)
if err != nil {
return fmt.Errorf("error attach disk %q: %v", volume.ID, err)
}
// TODO: Do we have to wait for attach to complete?
// retrieve device info
args := &ecs.DescribeDisksArgs{
RegionId: common.Region(a.region),
ZoneId: a.zone,
DiskIds: []string{volume.ID},
}
disks, _, err := a.client.DescribeDisks(args)
if err != nil || len(disks) == 0 {
return fmt.Errorf("error querying Aliyun disk %q: %v", volume.ID, err)
}
volume.LocalDevice = disks[0].Device
volume.AttachedTo = a.instanceId
} else if volume.AttachedTo != a.instanceId {
return fmt.Errorf("cannot reattach an attached disk without detaching it first")
}
return nil
}
func (a *ALIVolumes) FindVolumes() ([]*Volume, error) {
glog.V(2).Infof("Listing Aliyun disks in %s", a.zone)
var volumes []*Volume
var disks []ecs.DiskItemType
// We could query at most 50 disks at a time on Aliyun ECS
maxPageSize := 50
tags := make(map[string]string)
tags[aliup.TagClusterName] = a.clusterTag
tags[aliup.TagNameRolePrefix+"master"] = "1"
args := &ecs.DescribeDisksArgs{
RegionId: common.Region(a.region),
ZoneId: a.zone,
Tag: tags,
Pagination: common.Pagination{
PageNumber: 1,
PageSize: maxPageSize,
},
}
for {
resp, page, err := a.client.DescribeDisks(args)
if err != nil {
return nil, fmt.Errorf("error querying Aliyun disks: %v", err)
}
disks = append(disks, resp...)
if page.NextPage() == nil {
break
}
args.Pagination = *(page.NextPage())
}
for _, disk := range disks {
volume := &Volume{
ID: disk.DiskId,
Info: VolumeInfo{
Description: disk.Description,
},
Status: string(disk.Status),
AttachedTo: disk.InstanceId,
}
if volume.AttachedTo == a.instanceId {
volume.LocalDevice = disk.Device
}
describeTagsArgs := &ecs.DescribeTagsArgs{
RegionId: common.Region(a.region),
ResourceType: ecs.TagResourceDisk,
ResourceId: disk.DiskId,
}
result, _, err := a.client.DescribeTags(describeTagsArgs)
if err != nil {
return nil, fmt.Errorf("error querying Aliyun disk tags: %v", err)
}
skipVolume := false
for _, tag := range result {
switch tag.TagKey {
case aliup.TagClusterName:
{
// Ignore
}
default:
if strings.HasPrefix(tag.TagKey, aliup.TagNameEtcdClusterPrefix) {
etcdClusterName := strings.TrimPrefix(tag.TagKey, aliup.TagNameEtcdClusterPrefix)
spec, err := etcd.ParseEtcdClusterSpec(etcdClusterName, tag.TagValue)
if err != nil {
// Fail safe
glog.Warningf("error parsing etcd cluster tag %q on volume %q; skipping volume: %v", tag.TagValue, volume.ID, err)
skipVolume = true
}
volume.Info.EtcdClusters = append(volume.Info.EtcdClusters, spec)
} else if strings.HasPrefix(tag.TagKey, aliup.TagNameRolePrefix) {
// Ignore
} else {
glog.Warningf("unknown tag on volume %q: %s=%s", volume.ID, tag.TagKey, tag.TagValue)
}
}
}
if !skipVolume {
volumes = append(volumes, volume)
}
}
return volumes, nil
}
// FindMountedVolume implements Volumes::FindMountedVolume
func (a *ALIVolumes) FindMountedVolume(volume *Volume) (string, error) {
device := volume.LocalDevice
_, err := os.Stat(pathFor(device))
if err == nil {
return device, nil
}
if os.IsNotExist(err) {
if strings.HasPrefix(device, "/dev/xvd") {
device = "/dev/vd" + strings.TrimPrefix(device, "/dev/xvd")
_, err = os.Stat(pathFor(device))
return device, err
} else if strings.HasPrefix(device, "/dev/vd") {
device = "/dev/xvd" + strings.TrimPrefix(device, "/dev/vd")
_, err = os.Stat(pathFor(device))
return device, err
}
return "", nil
}
return "", fmt.Errorf("error checking for device %q: %v", device, err)
}
func (a *ALIVolumes) GossipSeeds() (gossip.SeedProvider, error) {
tags := make(map[string]string)
tags[aliup.TagClusterName] = a.clusterTag
return gossipali.NewSeedProvider(a.client, a.region, tags)
}

View File

@ -0,0 +1,10 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["client.go"],
importmap = "k8s.io/kops/vendor/github.com/denverdino/aliyungo/metadata",
importpath = "github.com/denverdino/aliyungo/metadata",
visibility = ["//visibility:public"],
deps = ["//vendor/github.com/denverdino/aliyungo/util:go_default_library"],
)

View File

@ -0,0 +1,443 @@
package metadata
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"time"
"encoding/json"
"reflect"
"github.com/denverdino/aliyungo/util"
)
const (
ENDPOINT = "http://100.100.100.200"
META_VERSION_LATEST = "latest"
RS_TYPE_META_DATA = "meta-data"
RS_TYPE_USER_DATA = "user-data"
DNS_NAMESERVERS = "dns-conf/nameservers"
EIPV4 = "eipv4"
HOSTNAME = "hostname"
IMAGE_ID = "image-id"
INSTANCE_ID = "instance-id"
MAC = "mac"
NETWORK_TYPE = "network-type"
NTP_CONF_SERVERS = "ntp-conf/ntp-servers"
OWNER_ACCOUNT_ID = "owner-account-id"
PRIVATE_IPV4 = "private-ipv4"
REGION = "region-id"
SERIAL_NUMBER = "serial-number"
SOURCE_ADDRESS = "source-address"
VPC_CIDR_BLOCK = "vpc-cidr-block"
VPC_ID = "vpc-id"
VSWITCH_CIDR_BLOCK = "vswitch-cidr-block"
VSWITCH_ID = "vswitch-id"
ZONE = "zone-id"
RAM_SECURITY = "Ram/security-credentials"
)
type IMetaDataRequest interface {
Version(version string) IMetaDataRequest
ResourceType(rtype string) IMetaDataRequest
Resource(resource string) IMetaDataRequest
SubResource(sub string) IMetaDataRequest
Url() (string, error)
Do(api interface{}) error
}
type MetaData struct {
// mock for unit test.
mock requestMock
client *http.Client
}
func NewMetaData(client *http.Client) *MetaData {
if client == nil {
client = &http.Client{}
}
return &MetaData{
client: client,
}
}
func NewMockMetaData(client *http.Client, sendRequest requestMock) *MetaData {
if client == nil {
client = &http.Client{}
}
return &MetaData{
client: client,
mock: sendRequest,
}
}
func (m *MetaData) New() *MetaDataRequest {
return &MetaDataRequest{
client: m.client,
sendRequest: m.mock,
}
}
func (m *MetaData) HostName() (string, error) {
var hostname ResultList
err := m.New().Resource(HOSTNAME).Do(&hostname)
if err != nil {
return "", err
}
return hostname.result[0], nil
}
func (m *MetaData) ImageID() (string, error) {
var image ResultList
err := m.New().Resource(IMAGE_ID).Do(&image)
if err != nil {
return "", err
}
return image.result[0], err
}
func (m *MetaData) InstanceID() (string, error) {
var instanceid ResultList
err := m.New().Resource(INSTANCE_ID).Do(&instanceid)
if err != nil {
return "", err
}
return instanceid.result[0], err
}
func (m *MetaData) Mac() (string, error) {
var mac ResultList
err := m.New().Resource(MAC).Do(&mac)
if err != nil {
return "", err
}
return mac.result[0], nil
}
func (m *MetaData) NetworkType() (string, error) {
var network ResultList
err := m.New().Resource(NETWORK_TYPE).Do(&network)
if err != nil {
return "", err
}
return network.result[0], nil
}
func (m *MetaData) OwnerAccountID() (string, error) {
var owner ResultList
err := m.New().Resource(OWNER_ACCOUNT_ID).Do(&owner)
if err != nil {
return "", err
}
return owner.result[0], nil
}
func (m *MetaData) PrivateIPv4() (string, error) {
var private ResultList
err := m.New().Resource(PRIVATE_IPV4).Do(&private)
if err != nil {
return "", err
}
return private.result[0], nil
}
func (m *MetaData) Region() (string, error) {
var region ResultList
err := m.New().Resource(REGION).Do(&region)
if err != nil {
return "", err
}
return region.result[0], nil
}
func (m *MetaData) SerialNumber() (string, error) {
var serial ResultList
err := m.New().Resource(SERIAL_NUMBER).Do(&serial)
if err != nil {
return "", err
}
return serial.result[0], nil
}
func (m *MetaData) SourceAddress() (string, error) {
var source ResultList
err := m.New().Resource(SOURCE_ADDRESS).Do(&source)
if err != nil {
return "", err
}
return source.result[0], nil
}
func (m *MetaData) VpcCIDRBlock() (string, error) {
var vpcCIDR ResultList
err := m.New().Resource(VPC_CIDR_BLOCK).Do(&vpcCIDR)
if err != nil {
return "", err
}
return vpcCIDR.result[0], err
}
func (m *MetaData) VpcID() (string, error) {
var vpcId ResultList
err := m.New().Resource(VPC_ID).Do(&vpcId)
if err != nil {
return "", err
}
return vpcId.result[0], err
}
func (m *MetaData) VswitchCIDRBlock() (string, error) {
var cidr ResultList
err := m.New().Resource(VSWITCH_CIDR_BLOCK).Do(&cidr)
if err != nil {
return "", err
}
return cidr.result[0], err
}
func (m *MetaData) VswitchID() (string, error) {
var vswithcid ResultList
err := m.New().Resource(VSWITCH_ID).Do(&vswithcid)
if err != nil {
return "", err
}
return vswithcid.result[0], err
}
func (m *MetaData) EIPv4() (string, error) {
var eip ResultList
err := m.New().Resource(EIPV4).Do(&eip)
if err != nil {
return "", err
}
return eip.result[0], nil
}
func (m *MetaData) DNSNameServers() ([]string, error) {
var data ResultList
err := m.New().Resource(DNS_NAMESERVERS).Do(&data)
if err != nil {
return []string{}, err
}
return data.result, nil
}
func (m *MetaData) NTPConfigServers() ([]string, error) {
var data ResultList
err := m.New().Resource(NTP_CONF_SERVERS).Do(&data)
if err != nil {
return []string{}, err
}
return data.result, nil
}
func (m *MetaData) Zone() (string, error) {
var zone ResultList
err := m.New().Resource(ZONE).Do(&zone)
if err != nil {
return "", err
}
return zone.result[0], nil
}
func (m *MetaData) RoleName() (string, error) {
var roleName ResultList
err := m.New().Resource("ram/security-credentials/").Do(&roleName)
if err != nil {
return "", err
}
return roleName.result[0], nil
}
func (m *MetaData) RamRoleToken(role string) (RoleAuth, error) {
var roleauth RoleAuth
err := m.New().Resource(RAM_SECURITY).SubResource(role).Do(&roleauth)
if err != nil {
return RoleAuth{}, err
}
return roleauth, nil
}
type requestMock func(resource string) (string, error)
//
type MetaDataRequest struct {
version string
resourceType string
resource string
subResource string
client *http.Client
sendRequest requestMock
}
func (vpc *MetaDataRequest) Version(version string) IMetaDataRequest {
vpc.version = version
return vpc
}
func (vpc *MetaDataRequest) ResourceType(rtype string) IMetaDataRequest {
vpc.resourceType = rtype
return vpc
}
func (vpc *MetaDataRequest) Resource(resource string) IMetaDataRequest {
vpc.resource = resource
return vpc
}
func (vpc *MetaDataRequest) SubResource(sub string) IMetaDataRequest {
vpc.subResource = sub
return vpc
}
var retry = util.AttemptStrategy{
Min: 5,
Total: 5 * time.Second,
Delay: 200 * time.Millisecond,
}
func (vpc *MetaDataRequest) Url() (string, error) {
if vpc.version == "" {
vpc.version = "latest"
}
if vpc.resourceType == "" {
vpc.resourceType = "meta-data"
}
if vpc.resource == "" {
return "", errors.New("the resource you want to visit must not be nil!")
}
r := fmt.Sprintf("%s/%s/%s/%s", ENDPOINT, vpc.version, vpc.resourceType, vpc.resource)
if vpc.subResource == "" {
return r, nil
}
return fmt.Sprintf("%s/%s", r, vpc.subResource), nil
}
func (vpc *MetaDataRequest) Do(api interface{}) (err error) {
var res = ""
for r := retry.Start(); r.Next(); {
if vpc.sendRequest != nil {
res, err = vpc.sendRequest(vpc.resource)
} else {
res, err = vpc.send()
}
if !shouldRetry(err) {
break
}
}
if err != nil {
return err
}
return vpc.Decode(res, api)
}
func (vpc *MetaDataRequest) Decode(data string, api interface{}) error {
if data == "" {
url, _ := vpc.Url()
return errors.New(fmt.Sprintf("metadata: alivpc decode data must not be nil. url=[%s]\n", url))
}
switch api.(type) {
case *ResultList:
api.(*ResultList).result = strings.Split(data, "\n")
return nil
case *RoleAuth:
return json.Unmarshal([]byte(data), api)
default:
return errors.New(fmt.Sprintf("metadata: unknow type to decode, type=%s\n", reflect.TypeOf(api)))
}
}
func (vpc *MetaDataRequest) send() (string, error) {
url, err := vpc.Url()
if err != nil {
return "", err
}
requ, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return "", err
}
resp, err := vpc.client.Do(requ)
if err != nil {
return "", err
}
if resp.StatusCode != 200 {
return "", fmt.Errorf("Aliyun Metadata API Error: Status Code: %d", resp.StatusCode)
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(data), nil
}
type TimeoutError interface {
error
Timeout() bool // Is the error a timeout?
}
func shouldRetry(err error) bool {
if err == nil {
return false
}
_, ok := err.(TimeoutError)
if ok {
return true
}
switch err {
case io.ErrUnexpectedEOF, io.EOF:
return true
}
switch e := err.(type) {
case *net.DNSError:
return true
case *net.OpError:
switch e.Op {
case "read", "write":
return true
}
case *url.Error:
// url.Error can be returned either by net/url if a URL cannot be
// parsed, or by net/http if the response is closed before the headers
// are received or parsed correctly. In that later case, e.Op is set to
// the HTTP method name with the first letter uppercased. We don't want
// to retry on POST operations, since those are not idempotent, all the
// other ones should be safe to retry.
switch e.Op {
case "Get", "Put", "Delete", "Head":
return shouldRetry(e.Err)
default:
return false
}
}
return false
}
type ResultList struct {
result []string
}
type RoleAuth struct {
AccessKeyId string
AccessKeySecret string
Expiration time.Time
SecurityToken string
LastUpdated time.Time
Code string
}