// kops/upup/pkg/kutil/import_cluster.go (848 lines, 24 KiB, Go)

/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kutil
import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"fmt"
	"io"
	"sort"
	"strconv"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/ec2"
	"github.com/golang/glog"

	"k8s.io/kops/pkg/apis/kops"
	"k8s.io/kops/pkg/apis/kops/registry"
	"k8s.io/kops/pkg/client/simple"
	"k8s.io/kops/pkg/pki"
	"k8s.io/kops/pkg/resources"
	"k8s.io/kops/upup/pkg/fi"
	"k8s.io/kops/upup/pkg/fi/cloudup"
	"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
)
// ImportCluster tries to reverse engineer an existing k8s cluster, adding it to the cluster registry
type ImportCluster struct {
	// ClusterName is the name of the cluster to import; must be non-empty.
	ClusterName string
	// Cloud is the cloud implementation; ImportAWSCluster asserts it to awsup.AWSCloud.
	Cloud fi.Cloud
	// Clientset is used to build the config base and to write the imported
	// cluster and instance-group configuration.
	Clientset simple.Clientset
}
// ImportAWSCluster reverse engineers an existing AWS-hosted Kubernetes
// cluster and registers the resulting configuration with the clientset.
//
// It discovers the cluster's EC2 instances (locating the master via its
// "Role" tag), the subnets and VPC they live in, parses the master's
// user-data for settings such as the service cluster IP range and DNS
// domain, sizes the node instance group from the autoscaling groups,
// imports the CA certificate into the keystore, and finally writes the
// Cluster and InstanceGroup objects to the registry.
func (x *ImportCluster) ImportAWSCluster() error {
	awsCloud := x.Cloud.(awsup.AWSCloud)
	clusterName := x.ClusterName

	if clusterName == "" {
		return fmt.Errorf("ClusterName must be specified")
	}

	var instanceGroups []*kops.InstanceGroup

	cluster := &kops.Cluster{}
	cluster.ObjectMeta.Annotations = make(map[string]string)

	// This annotation relaxes some validation (e.g. cluster name as full-dns name)
	cluster.ObjectMeta.Annotations[kops.AnnotationNameManagement] = kops.AnnotationValueManagementImported

	cluster.Spec.CloudProvider = string(kops.CloudProviderAWS)
	cluster.ObjectMeta.Name = clusterName

	cluster.Spec.KubeControllerManager = &kops.KubeControllerManagerConfig{}
	cluster.Spec.Channel = kops.DefaultChannel

	// Imported clusters start wide open; users are expected to tighten access later.
	cluster.Spec.KubernetesAPIAccess = []string{"0.0.0.0/0"}
	cluster.Spec.SSHAccess = []string{"0.0.0.0/0"}

	configBase, err := x.Clientset.ConfigBaseFor(cluster)
	if err != nil {
		return fmt.Errorf("error building ConfigBase for cluster: %v", err)
	}
	cluster.Spec.ConfigBase = configBase.Path()

	channel, err := kops.LoadChannel(cluster.Spec.Channel)
	if err != nil {
		return err
	}

	instances, err := findInstances(awsCloud)
	if err != nil {
		return fmt.Errorf("error finding instances: %v", err)
	}

	var masterInstance *ec2.Instance
	subnets := make(map[string]*kops.ClusterSubnetSpec)

	for _, instance := range instances {
		instanceState := aws.StringValue(instance.State.Name)

		// Record the subnet/zone of every live instance.
		if instanceState != "terminated" && instance.Placement != nil {
			zoneName := aws.StringValue(instance.Placement.AvailabilityZone)
			// We name the subnet after the zone
			subnetName := zoneName

			subnet := subnets[subnetName]
			if subnet == nil {
				subnet = &kops.ClusterSubnetSpec{
					Name: subnetName,
					Zone: zoneName,
					Type: kops.SubnetTypePublic,
				}
				subnets[subnetName] = subnet
			}

			subnetID := aws.StringValue(instance.SubnetId)
			if subnetID != "" {
				subnet.ProviderID = subnetID
			}
		}

		role, _ := awsup.FindEC2Tag(instance.Tags, "Role")
		if role == clusterName+"-master" {
			if masterInstance != nil {
				masterState := aws.StringValue(masterInstance.State.Name)

				glog.Infof("Found multiple masters: %s and %s", masterState, instanceState)

				if masterState == "terminated" && instanceState != "terminated" {
					// OK - prefer the live instance (assigned below)
				} else if instanceState == "terminated" && masterState != "terminated" {
					// Ignore this one
					continue
				} else {
					return fmt.Errorf("found multiple masters")
				}
			}
			masterInstance = instance
		}
	}
	if masterInstance == nil {
		return fmt.Errorf("could not find master node")
	}
	masterInstanceID := aws.StringValue(masterInstance.InstanceId)
	glog.Infof("Found master: %q", masterInstanceID)

	masterGroup := &kops.InstanceGroup{}
	masterGroup.Spec.Role = kops.InstanceGroupRoleMaster
	masterGroup.Spec.MinSize = fi.Int32(1)
	masterGroup.Spec.MaxSize = fi.Int32(1)

	masterGroup.Spec.MachineType = aws.StringValue(masterInstance.InstanceType)

	masterInstanceGroups := []*kops.InstanceGroup{masterGroup}
	instanceGroups = append(instanceGroups, masterGroup)

	awsSubnets, err := resources.DescribeSubnets(x.Cloud)
	if err != nil {
		return fmt.Errorf("error finding subnets: %v", err)
	}

	// Fill in the CIDR of each subnet we discovered via instances.
	for _, s := range awsSubnets {
		subnetID := aws.StringValue(s.SubnetId)

		found := false
		for _, subnet := range subnets {
			if subnet.ProviderID == subnetID {
				subnet.CIDR = aws.StringValue(s.CidrBlock)
				found = true
			}
		}

		if !found {
			glog.Warningf("Ignoring subnet %q in which no instances were found", subnetID)
		}
	}

	for k, subnet := range subnets {
		if subnet.ProviderID == "" {
			return fmt.Errorf("cannot find subnet %q. Please report this issue", k)
		}
		if subnet.CIDR == "" {
			return fmt.Errorf("cannot find subnet %q. If you used an existing subnet, please tag it with %s=%s and retry the import", subnet.ProviderID, awsup.TagClusterName, clusterName)
		}
	}

	vpcID := aws.StringValue(masterInstance.VpcId)
	var vpc *ec2.Vpc
	{
		vpc, err = awsCloud.DescribeVPC(vpcID)
		if err != nil {
			return err
		}
		if vpc == nil {
			return fmt.Errorf("cannot find vpc %q", vpcID)
		}
	}

	cluster.Spec.NetworkID = vpcID
	cluster.Spec.NetworkCIDR = aws.StringValue(vpc.CidrBlock)

	// Sort subnet names so the generated configuration is deterministic;
	// previously we ranged directly over the map, whose iteration order is
	// randomized in Go, so repeated imports could emit subnets (and node
	// instance-group subnets below) in different orders.
	var subnetNames []string
	for k := range subnets {
		subnetNames = append(subnetNames, k)
	}
	sort.Strings(subnetNames)

	for _, k := range subnetNames {
		cluster.Spec.Subnets = append(cluster.Spec.Subnets, *subnets[k])
	}

	// NOTE(review): if the surviving master is terminated, Placement may be
	// nil here and this would panic; assumed unreachable in practice - confirm.
	masterSubnet := subnets[aws.StringValue(masterInstance.Placement.AvailabilityZone)]
	if masterSubnet == nil {
		return fmt.Errorf("cannot find subnet %q for master. Please report this issue", aws.StringValue(masterInstance.Placement.AvailabilityZone))
	}
	masterGroup.Spec.Subnets = []string{masterSubnet.Name}
	masterGroup.ObjectMeta.Name = "master-" + masterSubnet.Name

	userData, err := GetInstanceUserData(awsCloud, aws.StringValue(masterInstance.InstanceId))
	if err != nil {
		return fmt.Errorf("error getting master user-data: %v", err)
	}

	conf, err := ParseUserDataConfiguration(userData)
	if err != nil {
		return fmt.Errorf("error parsing master user-data: %v", err)
	}

	//master := &NodeSSH{
	//	Hostname: c.Master,
	//}
	//err := master.AddSSHIdentity(c.SSHIdentity)
	//if err != nil {
	//	return err
	//}
	//
	//
	//fmt.Printf("Connecting to node on %s\n", c.Node)
	//
	//node := &NodeSSH{
	//	Hostname: c.Node,
	//}
	//err = node.AddSSHIdentity(c.SSHIdentity)
	//if err != nil {
	//	return err
	//}

	instancePrefix := conf.Settings["INSTANCE_PREFIX"]
	if instancePrefix == "" {
		return fmt.Errorf("cannot determine INSTANCE_PREFIX")
	}
	if instancePrefix != clusterName {
		return fmt.Errorf("INSTANCE_PREFIX %q did not match cluster name %q", instancePrefix, clusterName)
	}

	//k8s.NodeMachineType, err = InstanceType(node)
	//if err != nil {
	//	return fmt.Errorf("cannot determine node instance type: %v", err)
	//}

	// We want to upgrade!
	// k8s.ImageId = ""

	//clusterConfig.ClusterIPRange = conf.Settings["CLUSTER_IP_RANGE"]
	cluster.Spec.KubeControllerManager.AllocateNodeCIDRs = conf.ParseBool("ALLOCATE_NODE_CIDRS")
	//clusterConfig.KubeUser = conf.Settings["KUBE_USER"]
	cluster.Spec.ServiceClusterIPRange = conf.Settings["SERVICE_CLUSTER_IP_RANGE"]
	cluster.Spec.NonMasqueradeCIDR = conf.Settings["NON_MASQUERADE_CIDR"]
	//clusterConfig.EnableClusterMonitoring = conf.Settings["ENABLE_CLUSTER_MONITORING"]
	//clusterConfig.EnableClusterLogging = conf.ParseBool("ENABLE_CLUSTER_LOGGING")
	//clusterConfig.EnableNodeLogging = conf.ParseBool("ENABLE_NODE_LOGGING")
	//clusterConfig.LoggingDestination = conf.Settings["LOGGING_DESTINATION"]
	//clusterConfig.ElasticsearchLoggingReplicas, err = parseInt(conf.Settings["ELASTICSEARCH_LOGGING_REPLICAS"])
	//if err != nil {
	//	return fmt.Errorf("cannot parse ELASTICSEARCH_LOGGING_REPLICAS=%q: %v", conf.Settings["ELASTICSEARCH_LOGGING_REPLICAS"], err)
	//}
	//clusterConfig.EnableClusterDNS = conf.ParseBool("ENABLE_CLUSTER_DNS")
	//clusterConfig.EnableClusterUI = conf.ParseBool("ENABLE_CLUSTER_UI")
	//clusterConfig.DNSReplicas, err = parseInt(conf.Settings["DNS_REPLICAS"])
	//if err != nil {
	//	return fmt.Errorf("cannot parse DNS_REPLICAS=%q: %v", conf.Settings["DNS_REPLICAS"], err)
	//}
	//clusterConfig.DNSServerIP = conf.Settings["DNS_SERVER_IP"]
	cluster.Spec.ClusterDNSDomain = conf.Settings["DNS_DOMAIN"]
	//clusterConfig.AdmissionControl = conf.Settings["ADMISSION_CONTROL"]
	//clusterConfig.MasterIPRange = conf.Settings["MASTER_IP_RANGE"]
	//clusterConfig.DNSServerIP = conf.Settings["DNS_SERVER_IP"]
	//clusterConfig.DockerStorage = conf.Settings["DOCKER_STORAGE"]
	//k8s.MasterExtraSans = conf.Settings["MASTER_EXTRA_SANS"] // Not user set

	nodeGroup := &kops.InstanceGroup{}
	nodeGroup.Spec.Role = kops.InstanceGroupRoleNode
	nodeGroup.ObjectMeta.Name = "nodes"
	for _, k := range subnetNames {
		nodeGroup.Spec.Subnets = append(nodeGroup.Spec.Subnets, subnets[k].Name)
	}
	instanceGroups = append(instanceGroups, nodeGroup)

	//primaryNodeSet.Spec.MinSize, err = conf.ParseInt("NUM_MINIONS")
	//if err != nil {
	//	return fmt.Errorf("cannot parse NUM_MINIONS=%q: %v", conf.Settings["NUM_MINIONS"], err)
	//}

	{
		groups, err := awsup.FindAutoscalingGroups(awsCloud, awsCloud.Tags())
		if err != nil {
			return fmt.Errorf("error listing autoscaling groups: %v", err)
		}

		if len(groups) == 0 {
			glog.Warningf("No Autoscaling group found")
		}
		// BUGFIX: this previously tested len(groups) == 1, warning about
		// "multiple" groups when exactly one was found; more than one is
		// the anomalous case.
		if len(groups) > 1 {
			glog.Warningf("Multiple Autoscaling groups found")
		}

		// The node group's size is the sum across all discovered groups.
		minSize := int32(0)
		maxSize := int32(0)
		for _, group := range groups {
			minSize += int32(aws.Int64Value(group.MinSize))
			maxSize += int32(aws.Int64Value(group.MaxSize))
		}
		if minSize != 0 {
			nodeGroup.Spec.MinSize = fi.Int32(minSize)
		}
		if maxSize != 0 {
			nodeGroup.Spec.MaxSize = fi.Int32(maxSize)
		}

		// Determine the machine type from the first resolvable LaunchConfiguration.
		for _, group := range groups {
			name := aws.StringValue(group.LaunchConfigurationName)
			launchConfiguration, err := resources.FindAutoscalingLaunchConfiguration(awsCloud, name)
			if err != nil {
				return fmt.Errorf("error finding autoscaling LaunchConfiguration %q: %v", name, err)
			}
			if launchConfiguration == nil {
				glog.Warningf("LaunchConfiguration %q not found; ignoring", name)
				continue
			}
			nodeGroup.Spec.MachineType = aws.StringValue(launchConfiguration.InstanceType)
			break
		}
	}

	if conf.Version == "1.1" {
		// If users went with defaults on some things, clear them out so they get the new defaults
		//if clusterConfig.AdmissionControl == "NamespaceLifecycle,LimitRanger,SecurityContextDeny,ServiceAccount,ResourceQuota" {
		//	// More admission controllers in 1.2
		//	clusterConfig.AdmissionControl = ""
		//}
		if masterGroup.Spec.MachineType == "t2.micro" {
			// Different defaults in 1.2
			masterGroup.Spec.MachineType = ""
		}
		if nodeGroup.Spec.MachineType == "t2.micro" {
			// Encourage users to pick something better...
			nodeGroup.Spec.MachineType = ""
		}
	}
	if conf.Version == "1.2" {
		// If users went with defaults on some things, clear them out so they get the new defaults
		//if clusterConfig.AdmissionControl == "NamespaceLifecycle,LimitRanger,ServiceAccount,PersistentVolumeLabel,ResourceQuota" {
		//	// More admission controllers in 1.2
		//	clusterConfig.AdmissionControl = ""
		//}
	}

	// Every master instance group hosts a member of both etcd clusters.
	for _, etcdClusterName := range []string{"main", "events"} {
		etcdCluster := &kops.EtcdClusterSpec{
			Name: etcdClusterName,
		}

		for _, ig := range masterInstanceGroups {
			member := &kops.EtcdMemberSpec{
				InstanceGroup: fi.String(ig.ObjectMeta.Name),
			}

			name := ig.ObjectMeta.Name
			// We expect the IG to have a `master-` prefix, but this is both superfluous
			// and not how we named things previously
			name = strings.TrimPrefix(name, "master-")
			member.Name = name

			etcdCluster.Members = append(etcdCluster.Members, member)
		}

		cluster.Spec.EtcdClusters = append(cluster.Spec.EtcdClusters, etcdCluster)
	}

	//if masterInstance.PublicIpAddress != nil {
	//	eip, err := findElasticIP(cloud, *masterInstance.PublicIpAddress)
	//	if err != nil {
	//		return err
	//	}
	//	if eip != nil {
	//		k8s.MasterElasticIP = masterInstance.PublicIpAddress
	//	}
	//}
	//
	//vpc, err := cloud.DescribeVPC(*k8s.VPCID)
	//if err != nil {
	//	return err
	//}
	//k8s.DHCPOptionsID = vpc.DhcpOptionsId
	//
	//igw, err := findInternetGateway(cloud, *k8s.VPCID)
	//if err != nil {
	//	return err
	//}
	//if igw == nil {
	//	return fmt.Errorf("unable to find internet gateway for VPC %q", k8s.VPCID)
	//}
	//k8s.InternetGatewayID = igw.InternetGatewayId
	//
	//rt, err := findRouteTable(cloud, *k8s.SubnetID)
	//if err != nil {
	//	return err
	//}
	//if rt == nil {
	//	return fmt.Errorf("unable to find route table for Subnet %q", k8s.SubnetID)
	//}
	//k8s.RouteTableID = rt.RouteTableId

	//b.Context = "aws_" + instancePrefix

	keyStore, err := x.Clientset.KeyStore(cluster)
	if err != nil {
		return err
	}

	// Import the CA certificate recovered from the master's user-data.
	//caCert, err := masterSSH.Join("ca.crt").ReadFile()
	caCert, err := conf.ParseCert("CA_CERT")
	if err != nil {
		return err
	}
	err = keyStore.AddCert(fi.CertificateId_CA, caCert)
	if err != nil {
		return err
	}

	////masterKey, err := masterSSH.Join("server.key").ReadFile()
	//masterKey, err := conf.ParseKey("MASTER_KEY")
	//if err != nil {
	//	return err
	//}
	////masterCert, err := masterSSH.Join("server.cert").ReadFile()
	//masterCert, err := conf.ParseCert("MASTER_CERT")
	//if err != nil {
	//	return err
	//}
	//err = keyStore.ImportKeypair("master", masterKey, masterCert)
	//if err != nil {
	//	return err
	//}
	//
	////kubeletKey, err := kubeletSSH.Join("kubelet.key").ReadFile()
	//kubeletKey, err := conf.ParseKey("KUBELET_KEY")
	//if err != nil {
	//	return err
	//}
	////kubeletCert, err := kubeletSSH.Join("kubelet.cert").ReadFile()
	//kubeletCert, err := conf.ParseCert("KUBELET_CERT")
	//if err != nil {
	//	return err
	//}
	//err = keyStore.ImportKeypair("kubelet", kubeletKey, kubeletCert)
	//if err != nil {
	//	return err
	//}

	// We don't store the kubecfg key
	//kubecfgKey, err := masterSSH.Join("kubecfg.key").ReadFile()
	//if err != nil {
	//	return err
	//}
	//kubecfgCert, err := masterSSH.Join("kubecfg.cert").ReadFile()
	//if err != nil {
	//	return err
	//}
	//err = keyStore.ImportKeypair("kubecfg", kubecfgKey, kubecfgCert)
	//if err != nil {
	//	return err
	//}

	//// We will generate new tokens, but some of these are in existing API objects
	//secretStore := x.StateStore.Secrets()
	//kubePassword := conf.Settings["KUBE_PASSWORD"]
	//kubeletToken = conf.Settings["KUBELET_TOKEN"]
	//kubeProxyToken = conf.Settings["KUBE_PROXY_TOKEN"]

	var fullInstanceGroups []*kops.InstanceGroup
	for _, ig := range instanceGroups {
		full, err := cloudup.PopulateInstanceGroupSpec(cluster, ig, channel)
		if err != nil {
			return err
		}
		fullInstanceGroups = append(fullInstanceGroups, full)
	}

	err = registry.CreateClusterConfig(x.Clientset, cluster, fullInstanceGroups)
	if err != nil {
		return err
	}

	// Note - we can't PopulateClusterSpec & WriteCompletedConfig, because the cluster doesn't have a valid DNS Name
	//fullCluster, err := cloudup.PopulateClusterSpec(cluster, x.ClusterRegistry)
	//if err != nil {
	//	return err
	//}
	//
	//err = x.ClusterRegistry.WriteCompletedConfig(fullCluster)
	//if err != nil {
	//	return fmt.Errorf("error writing completed cluster spec: %v", err)
	//}

	return nil
}
// parseInt converts a decimal string to an int; an empty string is
// treated as zero (unset).
func parseInt(s string) (int, error) {
	// Empty means the setting was absent.
	if len(s) == 0 {
		return 0, nil
	}
	parsed, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, err
	}
	return int(parsed), nil
}
//func writeConf(p string, k8s *cloudup.CloudConfig) error {
// jsonBytes, err := json.Marshal(k8s)
// if err != nil {
// return fmt.Errorf("error serializing configuration (json write phase): %v", err)
// }
//
// var confObj interface{}
// err = yaml.Unmarshal(jsonBytes, &confObj)
// if err != nil {
// return fmt.Errorf("error serializing configuration (yaml read phase): %v", err)
// }
//
// m := confObj.(map[interface{}]interface{})
//
// for k, v := range m {
// if v == nil {
// delete(m, k)
// }
// s, ok := v.(string)
// if ok && s == "" {
// delete(m, k)
// }
// //glog.Infof("%v=%v", k, v)
// }
//
// yaml, err := yaml.Marshal(confObj)
// if err != nil {
// return fmt.Errorf("error serializing configuration (yaml write phase): %v", err)
// }
//
// err = ioutil.WriteFile(p, yaml, 0600)
// if err != nil {
// return fmt.Errorf("error writing configuration to file %q: %v", p, err)
// }
//
// return nil
//}
//
//func findInternetGateway(cloud awsup.AWSCloud, vpcID string) (*ec2.InternetGateway, error) {
// request := &ec2.DescribeInternetGatewaysInput{
// Filters: []*ec2.Filter{fi.NewEC2Filter("attachment.vpc-id", vpcID)},
// }
//
// response, err := cloud.EC2.DescribeInternetGateways(request)
// if err != nil {
// return nil, fmt.Errorf("error listing InternetGateways: %v", err)
// }
// if response == nil || len(response.InternetGateways) == 0 {
// return nil, nil
// }
//
// if len(response.InternetGateways) != 1 {
// return nil, fmt.Errorf("found multiple InternetGatewayAttachments to VPC")
// }
// igw := response.InternetGateways[0]
// return igw, nil
//}
//func findRouteTable(cloud awsup.AWSCloud, subnetID string) (*ec2.RouteTable, error) {
// request := &ec2.DescribeRouteTablesInput{
// Filters: []*ec2.Filter{fi.NewEC2Filter("association.subnet-id", subnetID)},
// }
//
// response, err := cloud.EC2.DescribeRouteTables(request)
// if err != nil {
// return nil, fmt.Errorf("error listing RouteTables: %v", err)
// }
// if response == nil || len(response.RouteTables) == 0 {
// return nil, nil
// }
//
// if len(response.RouteTables) != 1 {
// return nil, fmt.Errorf("found multiple RouteTables matching tags")
// }
// rt := response.RouteTables[0]
// return rt, nil
//}
//
//func findElasticIP(cloud awsup.AWSCloud, publicIP string) (*ec2.Address, error) {
// request := &ec2.DescribeAddressesInput{
// PublicIps: []*string{&publicIP},
// }
//
// response, err := cloud.EC2.DescribeAddresses(request)
// if err != nil {
// if awsErr, ok := err.(awserr.Error); ok {
// if awsErr.Code() == "InvalidAddress.NotFound" {
// return nil, nil
// }
// }
// return nil, fmt.Errorf("error listing Addresses: %v", err)
// }
// if response == nil || len(response.Addresses) == 0 {
// return nil, nil
// }
//
// if len(response.Addresses) != 1 {
// return nil, fmt.Errorf("found multiple Addresses matching IP %q", publicIP)
// }
// return response.Addresses[0], nil
//}
// findInstances lists all EC2 instances matching this cluster's tag filters.
func findInstances(c awsup.AWSCloud) ([]*ec2.Instance, error) {
	request := &ec2.DescribeInstancesInput{
		Filters: resources.BuildEC2Filters(c),
	}

	glog.V(2).Infof("Querying EC2 instances")

	var found []*ec2.Instance
	err := c.EC2().DescribeInstancesPages(request, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool {
		for _, reservation := range page.Reservations {
			found = append(found, reservation.Instances...)
		}
		return true
	})
	if err != nil {
		return nil, fmt.Errorf("error describing instances: %v", err)
	}

	return found, nil
}
//func GetMetadata(t *NodeSSH, key string) (string, error) {
// b, err := t.exec("curl -s http://169.254.169.254/latest/meta-data/" + key)
// if err != nil {
// return "", fmt.Errorf("error querying for metadata %q: %v", key, err)
// }
// return string(b), nil
//}
//
//func InstanceType(t *NodeSSH) (string, error) {
// return GetMetadata(t, "instance-type")
//}
//
//func GetMetadataList(t *NodeSSH, key string) ([]string, error) {
// d, err := GetMetadata(t, key)
// if err != nil {
// return nil, err
// }
// var macs []string
// for _, line := range strings.Split(d, "\n") {
// mac := line
// mac = strings.Trim(mac, "/")
// mac = strings.TrimSpace(mac)
// if mac == "" {
// continue
// }
// macs = append(macs, mac)
// }
//
// return macs, nil
//}
// GetInstanceUserData fetches the (base64-decoded) user-data of an EC2
// instance; it returns nil when the instance has no user-data.
func GetInstanceUserData(cloud awsup.AWSCloud, instanceID string) ([]byte, error) {
	request := &ec2.DescribeInstanceAttributeInput{
		InstanceId: aws.String(instanceID),
		Attribute:  aws.String("userData"),
	}

	response, err := cloud.EC2().DescribeInstanceAttribute(request)
	if err != nil {
		return nil, fmt.Errorf("error querying EC2 for user metadata for instance %q: %v", instanceID, err)
	}
	if response.UserData == nil {
		return nil, nil
	}

	// EC2 returns the attribute base64-encoded.
	decoded, err := base64.StdEncoding.DecodeString(aws.StringValue(response.UserData.Value))
	if err != nil {
		return nil, fmt.Errorf("error decoding EC2 UserData: %v", err)
	}
	return decoded, nil
}
// UserDataConfiguration holds settings recovered from an instance's
// kube-up style user-data script.
type UserDataConfiguration struct {
	// Version is the detected user-data format ("1.1" for salt-based
	// scripts, otherwise "1.2").
	Version string
	// Settings maps configuration keys (e.g. INSTANCE_PREFIX) to their values.
	Settings map[string]string
}
// ParseBool interprets the setting under key as a boolean; it returns nil
// when the setting is absent or empty.
func (u *UserDataConfiguration) ParseBool(key string) *bool {
	value := strings.ToLower(u.Settings[key])
	switch value {
	case "":
		// Absent/empty means "unset".
		return nil
	case "true", "1", "y", "yes", "t":
		return fi.Bool(true)
	default:
		return fi.Bool(false)
	}
}
// ParseCert decodes the base64-encoded PEM certificate stored under key;
// it returns (nil, nil) when the setting is absent or empty.
func (u *UserDataConfiguration) ParseCert(key string) (*pki.Certificate, error) {
	encoded := u.Settings[key]
	if encoded == "" {
		return nil, nil
	}

	pemData, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return nil, fmt.Errorf("error decoding base64 certificate %q: %v", key, err)
	}

	cert, err := pki.ParsePEMCertificate(pemData)
	if err != nil {
		return nil, fmt.Errorf("error parsing certificate %q: %v", key, err)
	}
	return cert, nil
}
// ParseKey decodes the base64-encoded PEM private key stored under key;
// it returns (nil, nil) when the setting is absent or empty.
func (u *UserDataConfiguration) ParseKey(key string) (*pki.PrivateKey, error) {
	encoded := u.Settings[key]
	if encoded == "" {
		return nil, nil
	}

	pemData, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return nil, fmt.Errorf("error decoding base64 private key %q: %v", key, err)
	}

	privateKey, err := pki.ParsePEMPrivateKey(pemData)
	if err != nil {
		return nil, fmt.Errorf("error parsing private key %q: %v", key, err)
	}
	return privateKey, nil
}
// ParseUserDataConfiguration extracts key/value settings from raw instance
// user-data, detecting whether it is a 1.1-era salt script ("readonly K=V"
// lines) or the later 1.2-style "K: V" format.
func ParseUserDataConfiguration(raw []byte) (*UserDataConfiguration, error) {
	userData, err := UserDataToString(raw)
	if err != nil {
		return nil, err
	}

	// Salt-based scripts identify the 1.1 release series.
	version := "1.2"
	if strings.Contains(userData, "install-salt master") || strings.Contains(userData, "dpkg -s salt-minion") {
		version = "1.1"
	}

	settings := make(map[string]string)
	for _, line := range strings.Split(userData, "\n") {
		var k, v string
		if version == "1.1" {
			// 1.1 format: `readonly KEY='value'`
			if !strings.HasPrefix(line, "readonly ") {
				continue
			}
			trimmed := strings.TrimPrefix(line, "readonly ")
			if sep := strings.Index(trimmed, "="); sep != -1 {
				k = trimmed[:sep]
				v = trimmed[sep+1:]
			}
			if k == "" {
				glog.V(4).Infof("Unknown line: %s", trimmed)
			}
			// Strip single quotes.
			if len(v) >= 2 && v[0] == '\'' && v[len(v)-1] == '\'' {
				v = v[1 : len(v)-1]
			}
		} else {
			// 1.2 format: `KEY: "value"` (or single-quoted)
			if sep := strings.Index(line, ": "); sep != -1 {
				k = line[:sep]
				v = line[sep+2:]
			}
			if k == "" {
				glog.V(4).Infof("Unknown line: %s", line)
			}
			// Strip matching double or single quotes.
			if len(v) >= 2 && ((v[0] == '"' && v[len(v)-1] == '"') || (v[0] == '\'' && v[len(v)-1] == '\'')) {
				v = v[1 : len(v)-1]
			}
		}
		settings[k] = v
	}

	return &UserDataConfiguration{
		Version:  version,
		Settings: settings,
	}, nil
}
// UserDataToString converts raw instance user-data to a string,
// transparently decompressing it when it is gzip-encoded.
func UserDataToString(userData []byte) (string, error) {
	// Gzip streams begin with the magic bytes 0x1f 0x8b.
	if len(userData) > 2 && userData[0] == 31 && userData[1] == 139 {
		glog.V(2).Infof("gzip data detected; will decompress")
		decompressed, err := gunzipBytes(userData)
		if err != nil {
			return "", fmt.Errorf("error decompressing user data: %v", err)
		}
		return string(decompressed), nil
	}
	return string(userData), nil
}

// gunzipBytes decompresses a gzip-encoded byte slice.
func gunzipBytes(d []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(d))
	if err != nil {
		return nil, fmt.Errorf("error building gunzip reader: %v", err)
	}
	defer r.Close()

	var out bytes.Buffer
	if _, err := io.Copy(&out, r); err != nil {
		return nil, fmt.Errorf("error decompressing data: %v", err)
	}
	return out.Bytes(), nil
}