Protokube: prototyping the 'missing' kubelet pieces

Working towards self-hosting of k8s, we will likely have to add some
features to the kubelet, such as independent mounting of disks or copying
of resources from S3.  protokube lets us develop those features before
moving them into the kubelet.

In particular, today we need to mount an EBS volume on the master before
starting the kubelet if we want to run the master in an ASG.

protokube is a service that runs on boot and tries to mount the master
volume.  Once it has mounted the master volume, it runs the kubelet.
Currently it runs the kubelet by looking at the directory
/etc/kubernetes/bootstrap; the intention is that we could have multiple
versions of the kubelet (or other services) in there, and then
automatically roll back from a failed update.
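
As a sketch of how the bootstrap directory is meant to be used: each subdirectory of /etc/kubernetes/bootstrap holds a small JSON task file whose "command" array protokube execs and supervises (see the BootstrapTask struct later in this commit). The subdirectory name, file name, and kubelet flags below are illustrative assumptions, not part of this commit.

	package main

	import (
		"encoding/json"
		"io/ioutil"
		"log"
		"os"
		"path"
	)

	// BootstrapTask mirrors the struct that protokube parses from each task file.
	type BootstrapTask struct {
		Command []string `json:"command"`
	}

	func main() {
		// Hypothetical task directory; protokube only requires that each
		// subdirectory of /etc/kubernetes/bootstrap contain a JSON file
		// with a "command" array.
		dir := path.Join("/etc/kubernetes/bootstrap", "kubelet-v1.2.4")
		if err := os.MkdirAll(dir, 0755); err != nil {
			log.Fatal(err)
		}

		// The kubelet path and flags here are purely illustrative.
		task := BootstrapTask{
			Command: []string{
				"/usr/local/bin/kubelet",
				"--config=/etc/kubernetes/manifests",
				"--v=2",
			},
		}

		data, err := json.MarshalIndent(task, "", "  ")
		if err != nil {
			log.Fatal(err)
		}

		// Produces e.g. {"command": ["/usr/local/bin/kubelet", "--config=...", "--v=2"]}
		if err := ioutil.WriteFile(path.Join(dir, "task.json"), data, 0644); err != nil {
			log.Fatal(err)
		}
	}

With a file like this in place, RunBootstrapTasks picks the first file in each subdirectory, parses the command array, and runs the command, streaming its stdout/stderr until it exits.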
Commit b11ad36f94 (parent 212224b3cc)
Justin Santa Barbara, 2016-05-30 17:58:57 -04:00
10 changed files with 736 additions and 0 deletions

protokube/.gitignore (new file)
@@ -0,0 +1,3 @@
.build/
vendor/

protokube/Makefile (new file)
@@ -0,0 +1,23 @@
gocode: godeps
	go install k8s.io/kube-deploy/protokube/cmd/...

godeps:
	# I think strip-vendor is the workaround for 25572
	glide install --strip-vendor --strip-vcs

tar: gocode
	rm -rf .build/tar
	mkdir -p .build/tar/protokube/root
	cp ${GOPATH}/bin/protokube .build/tar/protokube/root
	tar czvf .build/protokube.tar.gz -C .build/tar/ .
	tar tvf .build/protokube.tar.gz
	(sha1sum .build/protokube.tar.gz | cut -d' ' -f1) > .build/protokube.tar.gz.sha1

upload: tar
	rm -rf .build/s3
	mkdir -p .build/s3/protokube
	cp .build/protokube.tar.gz .build/s3/protokube/
	cp .build/protokube.tar.gz.sha1 .build/s3/protokube/
	aws s3 sync .build/s3/ s3://kubeupv2/
	aws s3api put-object-acl --bucket kubeupv2 --key protokube/protokube.tar.gz --acl public-read
	aws s3api put-object-acl --bucket kubeupv2 --key protokube/protokube.tar.gz.sha1 --acl public-read
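
The upload target publishes protokube.tar.gz and a .sha1 digest next to it, both public-read. A rough sketch of how a node might fetch and verify the artifact follows; the https URL form for the kubeupv2 bucket is an assumption, and nothing in this commit consumes the tarball this way.

	package main

	import (
		"crypto/sha1"
		"fmt"
		"io/ioutil"
		"log"
		"net/http"
		"strings"
	)

	// Assumed public URL form for the bucket/key used by the Makefile's upload target.
	const baseURL = "https://kubeupv2.s3.amazonaws.com/protokube"

	func fetch(url string) ([]byte, error) {
		resp, err := http.Get(url)
		if err != nil {
			return nil, err
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			return nil, fmt.Errorf("unexpected status %s fetching %s", resp.Status, url)
		}
		return ioutil.ReadAll(resp.Body)
	}

	func main() {
		tarball, err := fetch(baseURL + "/protokube.tar.gz")
		if err != nil {
			log.Fatal(err)
		}
		sum, err := fetch(baseURL + "/protokube.tar.gz.sha1")
		if err != nil {
			log.Fatal(err)
		}

		// The .sha1 file holds just the hex digest (the Makefile strips the filename with cut)
		expected := strings.TrimSpace(string(sum))
		actual := fmt.Sprintf("%x", sha1.Sum(tarball))
		if actual != expected {
			log.Fatalf("sha1 mismatch for protokube.tar.gz: got %s, want %s", actual, expected)
		}

		if err := ioutil.WriteFile("protokube.tar.gz", tarball, 0644); err != nil {
			log.Fatal(err)
		}
		fmt.Println("downloaded and verified protokube.tar.gz")
	}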

@@ -0,0 +1,57 @@
package main

import (
	"flag"
	"os"

	"github.com/golang/glog"

	"k8s.io/kube-deploy/protokube/pkg/protokube"
)

func main() {
	//flagModel := "model"
	//flag.StringVar(&flagModel, "model", flagModel, "directory to use as model for desired configuration")
	//var flagConf string
	//flag.StringVar(&flagConf, "conf", "node.yaml", "configuration location")
	//var flagAssetDir string
	//flag.StringVar(&flagAssetDir, "assets", "/var/cache/nodeup", "the location for the local asset cache")
	//
	//dryrun := false
	//flag.BoolVar(&dryrun, "dryrun", false, "Don't create cloud resources; just show what would be done")
	//target := "direct"
	//flag.StringVar(&target, "target", target, "Target - direct, cloudinit")
	//if dryrun {
	//	target = "dryrun"
	//}

	flag.Set("logtostderr", "true")
	flag.Parse()

	volumes, err := protokube.NewAWSVolumes()
	if err != nil {
		glog.Errorf("Error initializing AWS: %q", err)
		os.Exit(1)
	}

	//if flagConf == "" {
	//	glog.Exitf("--conf is required")
	//}

	kubeboot := protokube.NewKubeBoot(volumes)
	err = kubeboot.Bootstrap()
	if err != nil {
		glog.Errorf("Error during bootstrap: %q", err)
		os.Exit(1)
	}
	glog.Infof("Bootstrap complete; starting kubelet")

	err = kubeboot.RunBootstrapTasks()
	if err != nil {
		glog.Errorf("Error running bootstrap tasks: %q", err)
		os.Exit(1)
	}

	glog.Infof("Unexpectedly exited from kubelet run")
	os.Exit(1)
}

protokube/glide.lock (generated, new file)
@@ -0,0 +1,39 @@
hash: d51b01457dd5499bc488e3a367f00fd9e935cdf125296e9451bbf39458fe255a
updated: 2016-05-29T08:36:17.761287445-04:00
imports:
- name: github.com/aws/aws-sdk-go
  version: c924893c38ecc04b18d7aab8a7aa561cb8b4c4cc
  subpackages:
  - aws
  - aws/ec2metadata
  - aws/request
  - aws/session
  - service/ec2
  - aws/awserr
  - aws/credentials
  - aws/client
  - aws/client/metadata
  - aws/awsutil
  - aws/corehandlers
  - aws/defaults
  - private/endpoints
  - private/protocol
  - private/protocol/ec2query
  - private/signer/v4
  - private/waiter
  - aws/credentials/ec2rolecreds
  - private/protocol/query/queryutil
  - private/protocol/xml/xmlutil
  - private/protocol/rest
- name: github.com/go-ini/ini
  version: 2e44421e256d82ebbf3d4d4fcabe8930b905eff3
- name: github.com/golang/glog
  version: 23def4e6c14b4da8ac2ed8007337bc5eb5007998
- name: github.com/jmespath/go-jmespath
  version: 3433f3ea46d9f8019119e7dd41274e112a2359a9
- name: k8s.io/kubernetes
  version: a99e4ca79334cefd5fbdd0fd7f78a6b2595f2fde
  subpackages:
  - pkg/util/exec
  - pkg/util/mount
devImports: []

protokube/glide.yaml (new file)
@@ -0,0 +1,14 @@
package: k8s.io/kube-deploy/protokube
import:
- package: github.com/aws/aws-sdk-go
  subpackages:
  - aws
  - aws/ec2metadata
  - aws/request
  - aws/session
  - service/ec2
- package: github.com/golang/glog
- package: k8s.io/kubernetes
  subpackages:
  - pkg/util/exec
  - pkg/util/mount

@@ -0,0 +1,258 @@
package protokube

import (
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/ec2metadata"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ec2"
	"github.com/golang/glog"
)

// The tag name we use to differentiate multiple logically independent clusters running in the same region
const TagNameKubernetesCluster = "KubernetesCluster"

// The tag name we use for specifying that something is in the master role
const TagNameRoleMaster = "k8s.io/role/master"

const DefaultAttachDevice = "/dev/xvdb"

type AWSVolumes struct {
	ec2      *ec2.EC2
	metadata *ec2metadata.EC2Metadata

	zone       string
	clusterTag string
	instanceId string
}

var _ Volumes = &AWSVolumes{}

func NewAWSVolumes() (*AWSVolumes, error) {
	a := &AWSVolumes{}

	s := session.New()
	s.Handlers.Send.PushFront(func(r *request.Request) {
		// Log requests
		glog.V(4).Infof("AWS API Request: %s/%s", r.ClientInfo.ServiceName, r.Operation)
	})

	config := aws.NewConfig()
	a.metadata = ec2metadata.New(s, config)

	region, err := a.metadata.Region()
	if err != nil {
		return nil, fmt.Errorf("error querying ec2 metadata service (for az/region): %v", err)
	}

	a.zone, err = a.metadata.GetMetadata("placement/availability-zone")
	if err != nil {
		return nil, fmt.Errorf("error querying ec2 metadata service (for az): %v", err)
	}

	a.instanceId, err = a.metadata.GetMetadata("instance-id")
	if err != nil {
		return nil, fmt.Errorf("error querying ec2 metadata service (for instance-id): %v", err)
	}

	a.ec2 = ec2.New(s, config.WithRegion(region))

	err = a.discoverTags()
	if err != nil {
		return nil, err
	}

	return a, nil
}

func (a *AWSVolumes) discoverTags() error {
	instance, err := a.describeInstance()
	if err != nil {
		return err
	}

	tagMap := make(map[string]string)
	for _, tag := range instance.Tags {
		tagMap[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value)
	}

	clusterID := tagMap[TagNameKubernetesCluster]
	if clusterID == "" {
		return fmt.Errorf("Cluster tag %q not found on this instance (%q)", TagNameKubernetesCluster, a.instanceId)
	}

	a.clusterTag = clusterID

	return nil
}

func (a *AWSVolumes) describeInstance() (*ec2.Instance, error) {
	request := &ec2.DescribeInstancesInput{}
	request.InstanceIds = []*string{&a.instanceId}

	var instances []*ec2.Instance
	err := a.ec2.DescribeInstancesPages(request, func(p *ec2.DescribeInstancesOutput, lastPage bool) (shouldContinue bool) {
		for _, r := range p.Reservations {
			instances = append(instances, r.Instances...)
		}
		return true
	})
	if err != nil {
		return nil, fmt.Errorf("error querying for EC2 instance %q: %v", a.instanceId, err)
	}

	if len(instances) != 1 {
		return nil, fmt.Errorf("unexpected number of instances found with id %q: %d", a.instanceId, len(instances))
	}

	return instances[0], nil
}

func newEc2Filter(name string, value string) *ec2.Filter {
	filter := &ec2.Filter{
		Name: aws.String(name),
		Values: []*string{
			aws.String(value),
		},
	}
	return filter
}
func (a *AWSVolumes) FindMountedVolumes() ([]*Volume, error) {
	request := &ec2.DescribeVolumesInput{}
	request.Filters = []*ec2.Filter{
		newEc2Filter("tag:"+TagNameKubernetesCluster, a.clusterTag),
		newEc2Filter("tag-key", TagNameRoleMaster),
		newEc2Filter("attachment.instance-id", a.instanceId),
	}

	var volumes []*Volume
	err := a.ec2.DescribeVolumesPages(request, func(p *ec2.DescribeVolumesOutput, lastPage bool) (shouldContinue bool) {
		for _, v := range p.Volumes {
			vol := &Volume{
				Name:      aws.StringValue(v.VolumeId),
				Available: false,
			}

			var myAttachment *ec2.VolumeAttachment
			for _, attachment := range v.Attachments {
				if aws.StringValue(attachment.InstanceId) == a.instanceId {
					myAttachment = attachment
				}
			}

			if myAttachment == nil {
				glog.Warningf("Requested volumes attached to instance %q, but volume %q was returned that was not attached", a.instanceId, vol.Name)
				continue
			}

			vol.Device = aws.StringValue(myAttachment.Device)
			volumes = append(volumes, vol)
		}
		return true
	})
	if err != nil {
		return nil, fmt.Errorf("error querying for EC2 volumes: %v", err)
	}
	return volumes, nil
}
func (a *AWSVolumes) FindMountableVolumes() ([]*Volume, error) {
	request := &ec2.DescribeVolumesInput{}
	request.Filters = []*ec2.Filter{
		newEc2Filter("tag:"+TagNameKubernetesCluster, a.clusterTag),
		newEc2Filter("tag-key", TagNameRoleMaster),
		newEc2Filter("availability-zone", a.zone),
	}

	var volumes []*Volume
	err := a.ec2.DescribeVolumesPages(request, func(p *ec2.DescribeVolumesOutput, lastPage bool) (shouldContinue bool) {
		for _, v := range p.Volumes {
			vol := &Volume{
				Name: aws.StringValue(v.VolumeId),
			}

			state := aws.StringValue(v.State)
			switch state {
			case "available":
				vol.Available = true
			}

			volumes = append(volumes, vol)
		}
		return true
	})
	if err != nil {
		return nil, fmt.Errorf("error querying for EC2 volumes: %v", err)
	}
	return volumes, nil
}
// AttachVolume attaches the specified volume to this instance, returning the device path & nil if successful
func (a *AWSVolumes) AttachVolume(volume *Volume) (string, error) {
	volumeID := volume.Name

	device := volume.Device
	if device == "" {
		device = DefaultAttachDevice

		request := &ec2.AttachVolumeInput{
			Device:     aws.String(device),
			InstanceId: aws.String(a.instanceId),
			VolumeId:   aws.String(volumeID),
		}

		attachResponse, err := a.ec2.AttachVolume(request)
		if err != nil {
			return "", fmt.Errorf("Error attaching EBS volume %q: %v", volumeID, err)
		}

		glog.V(2).Infof("AttachVolume request returned %v", attachResponse)
	}

	// Wait (forever) for the volume to attach or reach a failure-to-attach condition
	for {
		time.Sleep(10 * time.Second)

		request := &ec2.DescribeVolumesInput{
			VolumeIds: []*string{&volumeID},
		}

		response, err := a.ec2.DescribeVolumes(request)
		if err != nil {
			return "", fmt.Errorf("Error describing EBS volume %q: %v", volumeID, err)
		}

		attachmentState := ""
		for _, v := range response.Volumes {
			for _, attachment := range v.Attachments {
				attachmentState = aws.StringValue(attachment.State)
			}
		}

		if attachmentState == "" {
			// TODO: retry?
			// Not attached
			return "", fmt.Errorf("Attach was requested, but volume %q was not seen as attaching", volumeID)
		}

		switch attachmentState {
		case "attached":
			return device, nil
		case "attaching":
			glog.V(2).Infof("Waiting for volume %q to be attached (currently %q)", volumeID, attachmentState)
			// continue looping
		default:
			return "", fmt.Errorf("Observed unexpected volume state %q", attachmentState)
		}
	}
}

@@ -0,0 +1,46 @@
package protokube

import (
	"time"

	"github.com/golang/glog"
)

type KubeBoot struct {
	volumes Volumes
}

func NewKubeBoot(volumes Volumes) *KubeBoot {
	k := &KubeBoot{
		volumes: volumes,
	}
	return k
}

func (k *KubeBoot) Bootstrap() error {
	for {
		done, err := k.tryBootstrap()
		if err != nil {
			glog.Warningf("error during attempt to acquire master volume (will sleep and retry): %v", err)
		} else if done {
			break
		} else {
			glog.Infof("unable to acquire master volume; will sleep and retry")
		}

		time.Sleep(1 * time.Minute)
	}

	return nil
}

func (k *KubeBoot) tryBootstrap() (bool, error) {
	mountpoint, err := k.mountMasterVolume()
	if err != nil {
		return false, err
	}
	if mountpoint == "" {
		// No master volume could be acquired yet; signal the caller to retry
		return false, nil
	}

	glog.Infof("mounted master on %s", mountpoint)

	// TODO: Should we set up symlinks here?

	return true, nil
}

@@ -0,0 +1,117 @@
package protokube

import (
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"os/exec"
	"path"
	"sync"

	"github.com/golang/glog"
)

const BootstrapDir = "/etc/kubernetes/bootstrap"

type BootstrapTask struct {
	Command []string `json:"command"`
}

// RunBootstrapTasks runs the bootstrap tasks, and watches them until they exit
// Currently only one task is supported / will work properly
func (k *KubeBoot) RunBootstrapTasks() error {
	dirs, err := ioutil.ReadDir(BootstrapDir)
	if err != nil {
		return fmt.Errorf("error listing %q: %v", BootstrapDir, err)
	}

	for _, dir := range dirs {
		if !dir.IsDir() {
			continue
		}

		p := path.Join(BootstrapDir, dir.Name())
		files, err := ioutil.ReadDir(p)
		if err != nil {
			return fmt.Errorf("error listing %q: %v", p, err)
		}

		if len(files) == 0 {
			glog.Infof("No files in %q; ignoring", p)
			continue
		}

		// TODO: Support more than one bootstrap task?
		// TODO: Have multiple proto-kubelet configurations to support recovery?
		// i.e. launch newest version that stays up?
		fp := path.Join(p, files[0].Name())

		err = k.runBootstrapTask(fp)
		if err != nil {
			return fmt.Errorf("error running bootstrap task %q: %v", fp, err)
		}
	}

	return nil
}
// runBootstrapTask runs a single bootstrap task and watches it until it exits
func (k *KubeBoot) runBootstrapTask(path string) error {
	// TODO: Use a file lock or similar to only start proto-kubelet if real-kubelet is not running?

	data, err := ioutil.ReadFile(path)
	if err != nil {
		return fmt.Errorf("error reading task %q: %v", path, err)
	}

	task := &BootstrapTask{}
	err = json.Unmarshal(data, task)
	if err != nil {
		return fmt.Errorf("error parsing task %q: %v", path, err)
	}

	if len(task.Command) == 0 {
		return fmt.Errorf("task %q did not specify a command", path)
	}

	name := task.Command[0]
	args := task.Command[1:]

	cmd := exec.Command(name, args...)

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("error building stdout pipe: %v", err)
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		return fmt.Errorf("error building stderr pipe: %v", err)
	}

	wg := new(sync.WaitGroup)
	wg.Add(2)

	err = cmd.Start()
	if err != nil {
		return fmt.Errorf("error starting command %q: %v", task.Command, err)
	}

	go copyStream(os.Stdout, stdout, wg)
	go copyStream(os.Stderr, stderr, wg)

	wg.Wait()

	err = cmd.Wait()
	if err != nil {
		return fmt.Errorf("error from command %q: %v", task.Command, err)
	}

	return nil
}

func copyStream(dst io.Writer, src io.ReadCloser, waitGroup *sync.WaitGroup) {
	_, err := io.Copy(dst, src)
	if err != nil {
		// Not entirely sure if we need to do something special in this case?
		glog.Warningf("error copying stream: %v", err)
	}
	waitGroup.Done()
}

@@ -0,0 +1,166 @@
package protokube

import (
	"fmt"
	"os"
	"time"

	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/util/exec"
	"k8s.io/kubernetes/pkg/util/mount"
)

const MasterMountpoint = "/master-pd"

func (k *KubeBoot) mountMasterVolume() (string, error) {
	// TODO: mount ephemeral volumes (particularly on AWS)?

	// Mount a master volume
	device, err := k.attachMasterVolume()
	if err != nil {
		return "", fmt.Errorf("unable to attach master volume: %v", err)
	}

	if device == "" {
		return "", nil
	}

	glog.V(2).Infof("Master volume is attached at %q", device)

	fstype := ""
	err = k.safeFormatAndMount(device, MasterMountpoint, fstype)
	if err != nil {
		return "", fmt.Errorf("unable to mount master volume: %v", err)
	}

	return MasterMountpoint, nil
}
func (k *KubeBoot) safeFormatAndMount(device string, mountpoint string, fstype string) error {
	// Wait for the device to show up
	for {
		_, err := os.Stat(device)
		if err == nil {
			break
		}
		if !os.IsNotExist(err) {
			return fmt.Errorf("error checking for device %q: %v", device, err)
		}
		glog.Infof("Waiting for device %q to be attached", device)
		time.Sleep(1 * time.Second)
	}
	glog.Infof("Found device %q", device)

	// Mount the device
	mounter := &mount.SafeFormatAndMount{Interface: mount.New(), Runner: exec.New()}

	// Only mount the PD globally once.
	notMnt, err := mounter.IsLikelyNotMountPoint(mountpoint)
	if err != nil {
		if os.IsNotExist(err) {
			glog.Infof("Creating mount directory %q", mountpoint)
			if err := os.MkdirAll(mountpoint, 0750); err != nil {
				return err
			}
			notMnt = true
		} else {
			return err
		}
	}

	options := []string{}
	//if readOnly {
	//	options = append(options, "ro")
	//}

	if notMnt {
		glog.Infof("Mounting device %q on %q", device, mountpoint)

		err = mounter.FormatAndMount(device, mountpoint, fstype, options)
		if err != nil {
			//os.Remove(mountpoint)
			return fmt.Errorf("error formatting and mounting disk %q on %q: %v", device, mountpoint, err)
		}
	} else {
		glog.Infof("Device already mounted at %q; verifying it is our device", mountpoint)

		mounts, err := mounter.List()
		if err != nil {
			return fmt.Errorf("error listing existing mounts: %v", err)
		}

		var existing []*mount.MountPoint
		for i := range mounts {
			m := &mounts[i]
			if m.Path == mountpoint {
				existing = append(existing, m)
			}
		}

		if len(existing) != 1 {
			glog.Infof("Unexpected set of existing mounts:")
			for i := range mounts {
				m := &mounts[i]
				glog.Infof("%s\t%s", m.Device, m.Path)
			}
		}

		if len(existing) == 0 {
			return fmt.Errorf("Unable to find existing mount of %q at %q", device, mountpoint)
		} else if len(existing) != 1 {
			return fmt.Errorf("Found multiple existing mounts of %q at %q", device, mountpoint)
		} else {
			glog.Infof("Found existing mount of %q at %q", device, mountpoint)
		}
	}
	return nil
}
func (k *KubeBoot) attachMasterVolume() (string, error) {
	volumes, err := k.volumes.FindMountedVolumes()
	if err != nil {
		return "", err
	}

	if len(volumes) != 0 {
		if len(volumes) != 1 {
			// TODO: unmount?
			glog.Warningf("Found multiple master volumes: %v", volumes)
		}

		glog.V(2).Infof("Found master volume already attached: %q", volumes[0].Name)

		device, err := k.volumes.AttachVolume(volumes[0])
		if err != nil {
			return "", fmt.Errorf("Error attaching volume %q: %v", volumes[0].Name, err)
		}
		return device, nil
	}

	volumes, err = k.volumes.FindMountableVolumes()
	if err != nil {
		return "", err
	}

	if len(volumes) == 0 {
		glog.Infof("No available master volumes")
		return "", nil
	}

	for _, volume := range volumes {
		if !volume.Available {
			continue
		}

		glog.V(2).Infof("Trying to mount master volume: %q", volume.Name)

		device, err := k.volumes.AttachVolume(volume)
		if err != nil {
			return "", fmt.Errorf("Error attaching volume %q: %v", volume.Name, err)
		}
		return device, nil
	}

	return "", nil
}

@@ -0,0 +1,13 @@
package protokube

type Volumes interface {
	AttachVolume(volume *Volume) (string, error)
	FindMountedVolumes() ([]*Volume, error)
	FindMountableVolumes() ([]*Volume, error)
}

type Volume struct {
	Name      string
	Device    string
	Available bool
}
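
The Volumes interface above is the seam that keeps KubeBoot independent of AWS. A minimal in-memory sketch of an alternate implementation, useful for exercising the Bootstrap retry loop without cloud credentials, might look like the following; the type and field names are illustrative and not part of this commit.

	package protokube

	// fakeVolumes is an illustrative in-memory Volumes implementation,
	// useful for exercising KubeBoot's attach/retry logic without AWS.
	type fakeVolumes struct {
		attached  []*Volume // volumes already attached to this "instance"
		available []*Volume // volumes that could be attached
	}

	var _ Volumes = &fakeVolumes{}

	func (f *fakeVolumes) FindMountedVolumes() ([]*Volume, error) {
		return f.attached, nil
	}

	func (f *fakeVolumes) FindMountableVolumes() ([]*Volume, error) {
		return f.available, nil
	}

	func (f *fakeVolumes) AttachVolume(volume *Volume) (string, error) {
		// Pretend the attach succeeded at the default device path
		if volume.Device == "" {
			volume.Device = "/dev/xvdb"
		}
		volume.Available = false
		f.attached = append(f.attached, volume)
		return volume.Device, nil
	}

Plugging something like this into NewKubeBoot would let the Bootstrap loop and RunBootstrapTasks be exercised on a developer machine.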