diff --git a/protokube/.gitignore b/protokube/.gitignore
new file mode 100644
index 0000000000..5cc02d3794
--- /dev/null
+++ b/protokube/.gitignore
@@ -0,0 +1,3 @@
+.build/
+vendor/
+
diff --git a/protokube/Makefile b/protokube/Makefile
new file mode 100644
index 0000000000..e90ac3eee6
--- /dev/null
+++ b/protokube/Makefile
@@ -0,0 +1,23 @@
+gocode: godeps
+	go install k8s.io/kube-deploy/protokube/cmd/...
+
+godeps:
+	# I think strip-vendor is the workaround for 25572
+	glide install --strip-vendor --strip-vcs
+
+tar: gocode
+	rm -rf .build/tar
+	mkdir -p .build/tar/protokube/root
+	cp ${GOPATH}/bin/protokube .build/tar/protokube/root
+	tar czvf .build/protokube.tar.gz -C .build/tar/ .
+	tar tvf .build/protokube.tar.gz
+	(sha1sum .build/protokube.tar.gz | cut -d' ' -f1) > .build/protokube.tar.gz.sha1
+
+upload: tar
+	rm -rf .build/s3
+	mkdir -p .build/s3/protokube
+	cp .build/protokube.tar.gz .build/s3/protokube/
+	cp .build/protokube.tar.gz.sha1 .build/s3/protokube/
+	aws s3 sync .build/s3/ s3://kubeupv2/
+	aws s3api put-object-acl --bucket kubeupv2 --key protokube/protokube.tar.gz --acl public-read
+	aws s3api put-object-acl --bucket kubeupv2 --key protokube/protokube.tar.gz.sha1 --acl public-read
diff --git a/protokube/cmd/protokube/main.go b/protokube/cmd/protokube/main.go
new file mode 100644
index 0000000000..06859ce0f3
--- /dev/null
+++ b/protokube/cmd/protokube/main.go
@@ -0,0 +1,57 @@
+package main
+
+import (
+	"flag"
+	"github.com/golang/glog"
+	"k8s.io/kube-deploy/protokube/pkg/protokube"
+	"os"
+)
+
+func main() {
+	//flagModel := "model"
+	//flag.StringVar(&flagModel, "model", flagModel, "directory to use as model for desired configuration")
+	//var flagConf string
+	//flag.StringVar(&flagConf, "conf", "node.yaml", "configuration location")
+	//var flagAssetDir string
+	//flag.StringVar(&flagAssetDir, "assets", "/var/cache/nodeup", "the location for the local asset cache")
+	//
+	//dryrun := false
+	//flag.BoolVar(&dryrun, "dryrun", false, "Don't create cloud resources; just show what would be done")
+	//target := "direct"
+	//flag.StringVar(&target, "target", target, "Target - direct, cloudinit")
+
+	//if dryrun {
+	//	target = "dryrun"
+	//}
+
+	flag.Set("logtostderr", "true")
+	flag.Parse()
+
+	volumes, err := protokube.NewAWSVolumes()
+	if err != nil {
+		glog.Errorf("Error initializing AWS: %q", err)
+		os.Exit(1)
+	}
+
+	//if flagConf == "" {
+	//	glog.Exitf("--conf is required")
+	//}
+
+	kubeboot := protokube.NewKubeBoot(volumes)
+	err = kubeboot.Bootstrap()
+	if err != nil {
+		glog.Errorf("Error during bootstrap: %q", err)
+		os.Exit(1)
+	}
+
+	glog.Infof("Bootstrap complete; starting kubelet")
+
+	err = kubeboot.RunBootstrapTasks()
+	if err != nil {
+		glog.Errorf("Error running bootstrap tasks: %q", err)
+		os.Exit(1)
+	}
+
+	glog.Infof("Unexpected exit from kubelet run")
+	os.Exit(1)
+}
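For local testing without AWS credentials, a stub implementation of the Volumes interface (defined in volumes.go at the end of this series) could stand in for NewAWSVolumes in main.go above. This is a hypothetical sketch, not part of the commit; the mount step still needs a real block device, so this only exercises the discovery/attach logic:

    // fakeVolumes is a hypothetical in-memory Volumes implementation
    // (in package protokube), useful for exercising KubeBoot without EC2.
    type fakeVolumes struct {
        attached []*Volume
    }

    func (f *fakeVolumes) AttachVolume(v *Volume) (string, error) {
        // Pretend the volume attached at the default device path.
        v.Device = "/dev/xvdb"
        f.attached = append(f.attached, v)
        return v.Device, nil
    }

    func (f *fakeVolumes) FindMountedVolumes() ([]*Volume, error) {
        return f.attached, nil
    }

    func (f *fakeVolumes) FindMountableVolumes() ([]*Volume, error) {
        // "vol-0example" is a made-up volume ID.
        return []*Volume{{Name: "vol-0example", Available: true}}, nil
    }
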
diff --git a/protokube/glide.lock b/protokube/glide.lock
new file mode 100644
index 0000000000..e526bedb4a
--- /dev/null
+++ b/protokube/glide.lock
@@ -0,0 +1,39 @@
+hash: d51b01457dd5499bc488e3a367f00fd9e935cdf125296e9451bbf39458fe255a
+updated: 2016-05-29T08:36:17.761287445-04:00
+imports:
+- name: github.com/aws/aws-sdk-go
+  version: c924893c38ecc04b18d7aab8a7aa561cb8b4c4cc
+  subpackages:
+  - aws
+  - aws/ec2metadata
+  - aws/request
+  - aws/session
+  - service/ec2
+  - aws/awserr
+  - aws/credentials
+  - aws/client
+  - aws/client/metadata
+  - aws/awsutil
+  - aws/corehandlers
+  - aws/defaults
+  - private/endpoints
+  - private/protocol
+  - private/protocol/ec2query
+  - private/signer/v4
+  - private/waiter
+  - aws/credentials/ec2rolecreds
+  - private/protocol/query/queryutil
+  - private/protocol/xml/xmlutil
+  - private/protocol/rest
+- name: github.com/go-ini/ini
+  version: 2e44421e256d82ebbf3d4d4fcabe8930b905eff3
+- name: github.com/golang/glog
+  version: 23def4e6c14b4da8ac2ed8007337bc5eb5007998
+- name: github.com/jmespath/go-jmespath
+  version: 3433f3ea46d9f8019119e7dd41274e112a2359a9
+- name: k8s.io/kubernetes
+  version: a99e4ca79334cefd5fbdd0fd7f78a6b2595f2fde
+  subpackages:
+  - pkg/util/exec
+  - pkg/util/mount
+devImports: []
diff --git a/protokube/glide.yaml b/protokube/glide.yaml
new file mode 100644
index 0000000000..e8d280f8d6
--- /dev/null
+++ b/protokube/glide.yaml
@@ -0,0 +1,14 @@
+package: k8s.io/kube-deploy/protokube
+import:
+- package: github.com/aws/aws-sdk-go
+  subpackages:
+  - aws
+  - aws/ec2metadata
+  - aws/request
+  - aws/session
+  - service/ec2
+- package: github.com/golang/glog
+- package: k8s.io/kubernetes
+  subpackages:
+  - pkg/util/exec
+  - pkg/util/mount
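The discovery code in aws_volume.go below matches EBS volumes by two tags: KubernetesCluster (the cluster name) and k8s.io/role/master; the role tag is matched with a tag-key filter, so only its presence matters. As a hedged illustration — the cluster name and tag value here are made up — a master volume could be labelled with the vendored aws-sdk-go like this (the volume must also live in the instance's availability zone for FindMountableVolumes to see it):

    // Sketch: tag an EBS volume so FindMountableVolumes will discover it.
    // "mycluster" and the value "1" are example values, not part of this commit.
    func tagMasterVolume(svc *ec2.EC2, volumeID string) error {
        _, err := svc.CreateTags(&ec2.CreateTagsInput{
            Resources: []*string{aws.String(volumeID)},
            Tags: []*ec2.Tag{
                {Key: aws.String("KubernetesCluster"), Value: aws.String("mycluster")},
                {Key: aws.String("k8s.io/role/master"), Value: aws.String("1")},
            },
        })
        return err
    }
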
diff --git a/protokube/pkg/protokube/aws_volume.go b/protokube/pkg/protokube/aws_volume.go
new file mode 100644
index 0000000000..79099cf045
--- /dev/null
+++ b/protokube/pkg/protokube/aws_volume.go
@@ -0,0 +1,257 @@
+package protokube
+
+import (
+	"fmt"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/ec2metadata"
+	"github.com/aws/aws-sdk-go/aws/request"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/ec2"
+	"github.com/golang/glog"
+	"time"
+)
+
+// The tag name we use to differentiate multiple logically independent clusters running in the same region
+const TagNameKubernetesCluster = "KubernetesCluster"
+
+// The tag name we use for specifying that something is in the master role
+const TagNameRoleMaster = "k8s.io/role/master"
+
+const DefaultAttachDevice = "/dev/xvdb"
+
+type AWSVolumes struct {
+	ec2      *ec2.EC2
+	metadata *ec2metadata.EC2Metadata
+
+	zone       string
+	clusterTag string
+	instanceId string
+}
+
+var _ Volumes = &AWSVolumes{}
+
+func NewAWSVolumes() (*AWSVolumes, error) {
+	a := &AWSVolumes{}
+
+	s := session.New()
+	s.Handlers.Send.PushFront(func(r *request.Request) {
+		// Log requests
+		glog.V(4).Infof("AWS API Request: %s/%s", r.ClientInfo.ServiceName, r.Operation.Name)
+	})
+
+	config := aws.NewConfig()
+	a.metadata = ec2metadata.New(s, config)
+
+	region, err := a.metadata.Region()
+	if err != nil {
+		return nil, fmt.Errorf("error querying ec2 metadata service (for az/region): %v", err)
+	}
+
+	a.zone, err = a.metadata.GetMetadata("placement/availability-zone")
+	if err != nil {
+		return nil, fmt.Errorf("error querying ec2 metadata service (for az): %v", err)
+	}
+
+	a.instanceId, err = a.metadata.GetMetadata("instance-id")
+	if err != nil {
+		return nil, fmt.Errorf("error querying ec2 metadata service (for instance-id): %v", err)
+	}
+
+	a.ec2 = ec2.New(s, config.WithRegion(region))
+
+	err = a.discoverTags()
+	if err != nil {
+		return nil, err
+	}
+
+	return a, nil
+}
+
+func (a *AWSVolumes) discoverTags() error {
+	instance, err := a.describeInstance()
+	if err != nil {
+		return err
+	}
+
+	tagMap := make(map[string]string)
+	for _, tag := range instance.Tags {
+		tagMap[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value)
+	}
+
+	clusterID := tagMap[TagNameKubernetesCluster]
+	if clusterID == "" {
+		return fmt.Errorf("Cluster tag %q not found on this instance (%q)", TagNameKubernetesCluster, a.instanceId)
+	}
+
+	a.clusterTag = clusterID
+
+	return nil
+}
+
+func (a *AWSVolumes) describeInstance() (*ec2.Instance, error) {
+	request := &ec2.DescribeInstancesInput{}
+	request.InstanceIds = []*string{&a.instanceId}
+
+	var instances []*ec2.Instance
+	err := a.ec2.DescribeInstancesPages(request, func(p *ec2.DescribeInstancesOutput, lastPage bool) (shouldContinue bool) {
+		for _, r := range p.Reservations {
+			instances = append(instances, r.Instances...)
+		}
+		return true
+	})
+
+	if err != nil {
+		return nil, fmt.Errorf("error querying for EC2 instance %q: %v", a.instanceId, err)
+	}
+
+	if len(instances) != 1 {
+		return nil, fmt.Errorf("unexpected number of instances found with id %q: %d", a.instanceId, len(instances))
+	}
+
+	return instances[0], nil
+}
+
+func newEc2Filter(name string, value string) *ec2.Filter {
+	filter := &ec2.Filter{
+		Name: aws.String(name),
+		Values: []*string{
+			aws.String(value),
+		},
+	}
+	return filter
+}
+
+func (a *AWSVolumes) FindMountedVolumes() ([]*Volume, error) {
+	request := &ec2.DescribeVolumesInput{}
+	request.Filters = []*ec2.Filter{
+		newEc2Filter("tag:"+TagNameKubernetesCluster, a.clusterTag),
+		newEc2Filter("tag-key", TagNameRoleMaster),
+		newEc2Filter("attachment.instance-id", a.instanceId),
+	}
+
+	var volumes []*Volume
+	err := a.ec2.DescribeVolumesPages(request, func(p *ec2.DescribeVolumesOutput, lastPage bool) (shouldContinue bool) {
+		for _, v := range p.Volumes {
+			vol := &Volume{
+				Name:      aws.StringValue(v.VolumeId),
+				Available: false,
+			}
+
+			var myAttachment *ec2.VolumeAttachment
+
+			for _, attachment := range v.Attachments {
+				if aws.StringValue(attachment.InstanceId) == a.instanceId {
+					myAttachment = attachment
+				}
+			}
+
+			if myAttachment == nil {
+				glog.Warningf("Requested volumes attached to this instance (%q), but volume %q was returned that was not attached", a.instanceId, vol.Name)
+				continue
+			}
+
+			vol.Device = aws.StringValue(myAttachment.Device)
+			volumes = append(volumes, vol)
+		}
+		return true
+	})
+
+	if err != nil {
+		return nil, fmt.Errorf("error querying for EC2 volumes: %v", err)
+	}
+	return volumes, nil
+}
+
+func (a *AWSVolumes) FindMountableVolumes() ([]*Volume, error) {
+	request := &ec2.DescribeVolumesInput{}
+	request.Filters = []*ec2.Filter{
+		newEc2Filter("tag:"+TagNameKubernetesCluster, a.clusterTag),
+		newEc2Filter("tag-key", TagNameRoleMaster),
+		newEc2Filter("availability-zone", a.zone),
+	}
+
+	var volumes []*Volume
+	err := a.ec2.DescribeVolumesPages(request, func(p *ec2.DescribeVolumesOutput, lastPage bool) (shouldContinue bool) {
+		for _, v := range p.Volumes {
+			vol := &Volume{
+				Name: aws.StringValue(v.VolumeId),
+			}
+			state := aws.StringValue(v.State)
+
+			switch state {
+			case "available":
+				vol.Available = true
+			}
+
+			volumes = append(volumes, vol)
+		}
+		return true
+	})
+
+	if err != nil {
+		return nil, fmt.Errorf("error querying for EC2 volumes: %v", err)
+	}
+	return volumes, nil
+}
+
+// AttachVolume attaches the specified volume to this instance, returning the device path & nil if successful
+func (a *AWSVolumes) AttachVolume(volume *Volume) (string, error) {
+	volumeID := volume.Name
+
+	device := volume.Device
+	if device == "" {
+		device = DefaultAttachDevice
+
+		request := &ec2.AttachVolumeInput{
+			Device:     aws.String(device),
+			InstanceId: aws.String(a.instanceId),
+			VolumeId:   aws.String(volumeID),
+		}
+
+		attachResponse, err := a.ec2.AttachVolume(request)
+		if err != nil {
+			return "", fmt.Errorf("Error attaching EBS volume %q: %v", volumeID, err)
+		}
+
+		glog.V(2).Infof("AttachVolume request returned %v", attachResponse)
+	}
+
+	// Wait (forever) for volume to attach or reach a failure-to-attach condition
+	for {
+		time.Sleep(10 * time.Second)
+
+		request := &ec2.DescribeVolumesInput{
+			VolumeIds: []*string{&volumeID},
+		}
+
+		response, err := a.ec2.DescribeVolumes(request)
+		if err != nil {
+			return "", fmt.Errorf("Error describing EBS volume %q: %v", volumeID, err)
+		}
+
+		attachmentState := ""
+		for _, v := range response.Volumes {
+			for _, a := range v.Attachments {
+				attachmentState = aws.StringValue(a.State)
+			}
+		}
+
+		if attachmentState == "" {
+			// TODO: retry?
+			// Not attached
+			return "", fmt.Errorf("Attach was requested, but volume %q was not seen as attaching", volumeID)
+		}
+
+		switch attachmentState {
+		case "attached":
+			return device, nil
+
+		case "attaching":
+			glog.V(2).Infof("Waiting for volume %q to be attached (currently %q)", volumeID, attachmentState)
+			// continue looping
+
+		default:
+			return "", fmt.Errorf("Observed unexpected volume state %q", attachmentState)
+		}
+	}
+}
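Taken together, the three methods above form the attach flow that kube_boot_volumes.go (later in this series) drives: prefer a volume that is already attached, otherwise claim an available one. A condensed sketch of that calling sequence, for orientation only — attachMasterVolume below is the real implementation:

    // Sketch of how the Volumes API is consumed (in package protokube).
    func findDevice(volumes Volumes) (string, error) {
        mounted, err := volumes.FindMountedVolumes()
        if err != nil {
            return "", err
        }
        if len(mounted) > 0 {
            // Already attached; AttachVolume returns the existing device path.
            return volumes.AttachVolume(mounted[0])
        }

        mountable, err := volumes.FindMountableVolumes()
        if err != nil {
            return "", err
        }
        for _, v := range mountable {
            if v.Available {
                // Attaches at /dev/xvdb and polls until "attached".
                return volumes.AttachVolume(v)
            }
        }
        return "", nil // nothing to claim yet; caller retries
    }
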
diff --git a/protokube/pkg/protokube/kube_boot.go b/protokube/pkg/protokube/kube_boot.go
new file mode 100644
index 0000000000..ceb72b13e8
--- /dev/null
+++ b/protokube/pkg/protokube/kube_boot.go
@@ -0,0 +1,51 @@
+package protokube
+
+import (
+	"github.com/golang/glog"
+	"time"
+)
+
+type KubeBoot struct {
+	volumes Volumes
+}
+
+func NewKubeBoot(volumes Volumes) *KubeBoot {
+	k := &KubeBoot{
+		volumes: volumes,
+	}
+	return k
+}
+
+func (k *KubeBoot) Bootstrap() error {
+	for {
+		done, err := k.tryBootstrap()
+		if err != nil {
+			glog.Warningf("error during attempt to acquire master volume (will sleep and retry): %v", err)
+		} else if done {
+			break
+		} else {
+			glog.Infof("unable to acquire master volume; will sleep and retry")
+		}
+
+		time.Sleep(1 * time.Minute)
+	}
+
+	return nil
+}
+
+func (k *KubeBoot) tryBootstrap() (bool, error) {
+	mountpoint, err := k.mountMasterVolume()
+	if err != nil {
+		return false, err
+	}
+
+	if mountpoint == "" {
+		// No master volume was available to mount; keep retrying
+		return false, nil
+	}
+
+	glog.Infof("mounted master on %s", mountpoint)
+	// TODO: Should we set up symlinks here?
+
+	return true, nil
+}
diff --git a/protokube/pkg/protokube/kube_boot_task.go b/protokube/pkg/protokube/kube_boot_task.go
new file mode 100644
index 0000000000..a63174ff30
--- /dev/null
+++ b/protokube/pkg/protokube/kube_boot_task.go
@@ -0,0 +1,121 @@
+package protokube
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/golang/glog"
+	"io"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"path"
+	"sync"
+)
+
+const BootstrapDir = "/etc/kubernetes/bootstrap"
+
+type BootstrapTask struct {
+	Command []string `json:"command"`
+}
+
+// RunBootstrapTasks runs the bootstrap tasks, and watches them until they exit
+// Currently only one task is supported / will work properly
+func (k *KubeBoot) RunBootstrapTasks() error {
+	dirs, err := ioutil.ReadDir(BootstrapDir)
+	if err != nil {
+		return fmt.Errorf("error listing %q: %v", BootstrapDir, err)
+	}
+
+	for _, dir := range dirs {
+		if !dir.IsDir() {
+			continue
+		}
+
+		p := path.Join(BootstrapDir, dir.Name())
+		files, err := ioutil.ReadDir(p)
+		if err != nil {
+			return fmt.Errorf("error listing %q: %v", p, err)
+		}
+
+		if len(files) == 0 {
+			glog.Infof("No files in %q; ignoring", p)
+			continue
+		}
+
+		// TODO: Support more than one bootstrap task?
+
+		// TODO: Have multiple proto-kubelet configurations to support recovery?
+		// i.e. launch newest version that stays up?
+
+		fp := path.Join(p, files[0].Name())
+		err = k.runBootstrapTask(fp)
+		if err != nil {
+			return fmt.Errorf("error running bootstrap task %q: %v", fp, err)
+		}
+	}
+	return nil
+}
+
+// runBootstrapTask runs a single bootstrap task and watches it until it exits
+func (k *KubeBoot) runBootstrapTask(path string) error {
+	// TODO: Use a file lock or similar to only start proto-kubelet if real-kubelet is not running?
+
+	data, err := ioutil.ReadFile(path)
+	if err != nil {
+		return fmt.Errorf("error reading task %q: %v", path, err)
+	}
+
+	task := &BootstrapTask{}
+
+	err = json.Unmarshal(data, task)
+	if err != nil {
+		return fmt.Errorf("error parsing task %q: %v", path, err)
+	}
+
+	if len(task.Command) == 0 {
+		return fmt.Errorf("task %q did not specify a command", path)
+	}
+
+	name := task.Command[0]
+	args := task.Command[1:]
+
+	cmd := exec.Command(name, args...)
+
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return fmt.Errorf("error building stdout pipe: %v", err)
+	}
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		return fmt.Errorf("error building stderr pipe: %v", err)
+	}
+
+	wg := new(sync.WaitGroup)
+	wg.Add(2)
+
+	err = cmd.Start()
+	if err != nil {
+		return fmt.Errorf("error starting command %q: %v", task.Command, err)
+	}
+
+	go copyStream(os.Stdout, stdout, wg)
+	go copyStream(os.Stderr, stderr, wg)
+
+	wg.Wait()
+
+	err = cmd.Wait()
+	if err != nil {
+		return fmt.Errorf("error from command %q: %v", task.Command, err)
+	}
+
+	return nil
+}
+
+func copyStream(dst io.Writer, src io.ReadCloser, waitGroup *sync.WaitGroup) {
+	_, err := io.Copy(dst, src)
+	if err != nil {
+		// Not entirely sure if we need to do something special in this case?
+		glog.Warningf("error copying stream: %v", err)
+	}
+	waitGroup.Done()
+}
diff --git a/protokube/pkg/protokube/kube_boot_volumes.go b/protokube/pkg/protokube/kube_boot_volumes.go
new file mode 100644
index 0000000000..2055684f3f
--- /dev/null
+++ b/protokube/pkg/protokube/kube_boot_volumes.go
@@ -0,0 +1,166 @@
+package protokube
+
+import (
+	"fmt"
+	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/util/exec"
+	"k8s.io/kubernetes/pkg/util/mount"
+	"os"
+	"time"
+)
+
+const MasterMountpoint = "/master-pd"
+
+func (k *KubeBoot) mountMasterVolume() (string, error) {
+	// TODO: mount ephemeral volumes (particularly on AWS)?
+
+	// Mount a master volume
+	device, err := k.attachMasterVolume()
+	if err != nil {
+		return "", fmt.Errorf("unable to attach master volume: %q", err)
+	}
+
+	if device == "" {
+		return "", nil
+	}
+
+	glog.V(2).Infof("Master volume is attached at %q", device)
+
+	fstype := ""
+	err = k.safeFormatAndMount(device, MasterMountpoint, fstype)
+	if err != nil {
+		return "", fmt.Errorf("unable to mount master volume: %q", err)
+	}
+
+	return MasterMountpoint, nil
+}
+
+func (k *KubeBoot) safeFormatAndMount(device string, mountpoint string, fstype string) error {
+	// Wait for the device to show up
+	for {
+		_, err := os.Stat(device)
+		if err == nil {
+			break
+		}
+		if !os.IsNotExist(err) {
+			return fmt.Errorf("error checking for device %q: %v", device, err)
+		}
+		glog.Infof("Waiting for device %q to be attached", device)
+		time.Sleep(1 * time.Second)
+	}
+	glog.Infof("Found device %q", device)
+
+	// Mount the device
+
+	mounter := &mount.SafeFormatAndMount{Interface: mount.New(), Runner: exec.New()}
+
+	// Only mount the PD globally once.
+	notMnt, err := mounter.IsLikelyNotMountPoint(mountpoint)
+	if err != nil {
+		if os.IsNotExist(err) {
+			glog.Infof("Creating mount directory %q", mountpoint)
+			if err := os.MkdirAll(mountpoint, 0750); err != nil {
+				return err
+			}
+			notMnt = true
+		} else {
+			return err
+		}
+	}
+
+	options := []string{}
+	//if readOnly {
+	//	options = append(options, "ro")
+	//}
+	if notMnt {
+		glog.Infof("Mounting device %q on %q", device, mountpoint)
+
+		err = mounter.FormatAndMount(device, mountpoint, fstype, options)
+		if err != nil {
+			//os.Remove(mountpoint)
+			return fmt.Errorf("error formatting and mounting disk %q on %q: %v", device, mountpoint, err)
+		}
+	} else {
+		glog.Infof("Device already mounted at %q; verifying it is our device", mountpoint)
+
+		mounts, err := mounter.List()
+		if err != nil {
+			return fmt.Errorf("error listing existing mounts: %v", err)
+		}
+
+		var existing []*mount.MountPoint
+		for i := range mounts {
+			m := &mounts[i]
+			if m.Path == mountpoint {
+				existing = append(existing, m)
+			}
+		}
+
+		if len(existing) != 1 {
+			glog.Infof("Unexpected existing mounts:")
+
+			for i := range mounts {
+				m := &mounts[i]
+				glog.Infof("%s\t%s", m.Device, m.Path)
+			}
+		}
+
+		if len(existing) == 0 {
+			return fmt.Errorf("Unable to find existing mount of %q at %q", device, mountpoint)
+		} else if len(existing) != 1 {
+			return fmt.Errorf("Found multiple existing mounts of %q at %q", device, mountpoint)
+		} else {
+			glog.Infof("Found existing mount of %q at %q", device, mountpoint)
+		}
+
+	}
+	return nil
+}
+
+func (k *KubeBoot) attachMasterVolume() (string, error) {
+	volumes, err := k.volumes.FindMountedVolumes()
+	if err != nil {
+		return "", err
+	}
+
+	if len(volumes) != 0 {
+		if len(volumes) != 1 {
+			// TODO: unmount?
+			glog.Warningf("Found multiple master volumes: %v", volumes)
+		}
+
+		glog.V(2).Infof("Found master volume already attached: %q", volumes[0].Name)
+
+		device, err := k.volumes.AttachVolume(volumes[0])
+		if err != nil {
+			return "", fmt.Errorf("Error attaching volume %q: %v", volumes[0].Name, err)
+		}
+		return device, nil
+	}
+
+	volumes, err = k.volumes.FindMountableVolumes()
+	if err != nil {
+		return "", err
+	}
+
+	if len(volumes) == 0 {
+		glog.Infof("No available master volumes")
+		return "", nil
+	}
+
+	for _, volume := range volumes {
+		if !volume.Available {
+			continue
+		}
+
+		glog.V(2).Infof("Trying to attach master volume: %q", volume.Name)
+
+		device, err := k.volumes.AttachVolume(volume)
+		if err != nil {
+			return "", fmt.Errorf("Error attaching volume %q: %v", volume.Name, err)
+		}
+		return device, nil
+	}
+
+	return "", nil
+}
diff --git a/protokube/pkg/protokube/volumes.go b/protokube/pkg/protokube/volumes.go
new file mode 100644
index 0000000000..8a2aa0175b
--- /dev/null
+++ b/protokube/pkg/protokube/volumes.go
@@ -0,0 +1,13 @@
+package protokube
+
+type Volumes interface {
+	AttachVolume(volume *Volume) (string, error)
+	FindMountedVolumes() ([]*Volume, error)
+	FindMountableVolumes() ([]*Volume, error)
+}
+
+type Volume struct {
+	Name      string
+	Device    string
+	Available bool
+}
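RunBootstrapTasks expects one directory per task under /etc/kubernetes/bootstrap, each containing a single JSON file whose "command" array is exec'd directly (no shell). A hypothetical on-disk layout — the directory name, file name, and kubelet flags below are illustrative, not part of this commit:

    /etc/kubernetes/bootstrap/kubelet/task.json:

    {
      "command": [
        "/usr/local/bin/kubelet",
        "--config=/etc/kubernetes/manifests"
      ]
    }

Protokube runs the command, mirrors its stdout/stderr to its own, and exits non-zero once the command terminates; run under a process supervisor that restarts protokube, this keeps the bootstrap kubelet running.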