diff --git a/protokube/Makefile b/protokube/Makefile index e90ac3eee6..cb9aaf5ad2 100644 --- a/protokube/Makefile +++ b/protokube/Makefile @@ -1,5 +1,5 @@ gocode: godeps - go install k8s.io/kube-deploy/protokube/cmd/... + go install k8s.io/kube-deploy/protokube/cmd/protokube godeps: # I think strip-vendor is the workaround for 25572 @@ -9,6 +9,8 @@ tar: gocode rm -rf .build/tar mkdir -p .build/tar/protokube/root cp ${GOPATH}/bin/protokube .build/tar/protokube/root + cp -r model/ .build/tar/protokube/root/model/ + cp -r templates/ .build/tar/protokube/root/templates/ tar czvf .build/protokube.tar.gz -C .build/tar/ . tar tvf .build/protokube.tar.gz (sha1sum .build/protokube.tar.gz | cut -d' ' -f1) > .build/protokube.tar.gz.sha1 @@ -21,3 +23,23 @@ upload: tar aws s3 sync .build/s3/ s3://kubeupv2/ aws s3api put-object-acl --bucket kubeupv2 --key protokube/protokube.tar.gz --acl public-read aws s3api put-object-acl --bucket kubeupv2 --key protokube/protokube.tar.gz.sha1 --acl public-read + +ssh-push: tar + scp .build/protokube.tar.gz ${TARGET}:/tmp/ + ssh ${TARGET} sudo tar zxf /tmp/protokube.tar.gz -C /opt/ + +gofmt: + gofmt -w -s cmd/ + gofmt -w -s pkg/ + +builder-image: + docker build -f images/builder/Dockerfile -t builder . + +build-in-docker: builder-image + docker run -it -v `pwd`:/src builder /onbuild.sh + +image: build-in-docker + docker build -t kope/protokube -f images/protokube/Dockerfile . + +push: image + docker push kope/protokube:latest diff --git a/protokube/cmd/protokube/main.go b/protokube/cmd/protokube/main.go index 91f7f3c65f..dce2bbaf86 100644 --- a/protokube/cmd/protokube/main.go +++ b/protokube/cmd/protokube/main.go @@ -2,30 +2,29 @@ package main import ( "flag" + "fmt" "github.com/golang/glog" "k8s.io/kube-deploy/protokube/pkg/protokube" + "net" "os" + "strings" ) func main() { - //flagModel := "model" - //flag.StringVar(&flagModel, "model", flagModel, "directory to use as model for desired configuration") - //var flagConf string - //flag.StringVar(&flagConf, "conf", "node.yaml", "configuration location") - //var flagAssetDir string - //flag.StringVar(&flagAssetDir, "assets", "/var/cache/nodeup", "the location for the local asset cache") - // - //dryrun := false - //flag.BoolVar(&dryrun, "dryrun", false, "Don't create cloud resources; just show what would be done") - //target := "direct" - //flag.StringVar(&target, "target", target, "Target - direct, cloudinit") - - //if dryrun { - // target = "dryrun" - //} - master := false - flag.BoolVar(&master, "master", false, "Act as master") + flag.BoolVar(&master, "master", master, "Act as master") + + containerized := false + flag.BoolVar(&containerized, "containerized", containerized, "Set if we are running containerized.") + + dnsZoneName := "" + flag.StringVar(&dnsZoneName, "dns-zone-name", dnsZoneName, "Name of zone to use for DNS") + + dnsInternalSuffix := "" + flag.StringVar(&dnsInternalSuffix, "dns-internal-suffix", dnsInternalSuffix, "DNS suffix for internal domain names") + + clusterID := "" + flag.StringVar(&clusterID, "cluster-id", clusterID, "Cluster ID") flag.Set("logtostderr", "true") flag.Parse() @@ -36,20 +35,80 @@ func main() { os.Exit(1) } - //if flagConf == "" { - // glog.Exitf("--conf is required") - //} + if clusterID == "" { + clusterID = volumes.ClusterID() + if clusterID == "" { + glog.Errorf("cluster-id is required (cannot be determined from cloud)") + os.Exit(1) + } else { + glog.Infof("Setting cluster-id from cloud: %s", clusterID) + } + } - kubeboot := protokube.NewKubeBoot(master, volumes) - err = 
kubeboot.Bootstrap() + if dnsInternalSuffix == "" { + // TODO: Maybe only master needs DNS? + dnsInternalSuffix = ".internal." + clusterID + glog.Infof("Setting dns-internal-suffix to %q", dnsInternalSuffix) + } + + // Make sure it's actually a suffix (starts with .) + if !strings.HasPrefix(dnsInternalSuffix, ".") { + dnsInternalSuffix = "." + dnsInternalSuffix + } + + if dnsZoneName == "" { + tokens := strings.Split(dnsInternalSuffix, ".") + dnsZoneName = strings.Join(tokens[len(tokens)-2:], ".") + } + + // Get internal IP from cloud, to avoid problems if we're in a container + // TODO: Just run with --net=host ?? + //internalIP, err := findInternalIP() + //if err != nil { + // glog.Errorf("Error finding internal IP: %q", err) + // os.Exit(1) + //} + internalIP := volumes.InternalIP() + + dns, err := protokube.NewRoute53DNSProvider(dnsZoneName) + if err != nil { + glog.Errorf("Error initializing DNS: %q", err) + os.Exit(1) + } + + rootfs := "/" + if containerized { + rootfs = "/rootfs/" + } + k := &protokube.KubeBoot{ + Containerized: containerized, + RootFS: rootfs, + + Master: master, + InternalDNSSuffix: dnsInternalSuffix, + InternalIP: internalIP, + //MasterID : fromVolume + //EtcdClusters : fromVolume + + Volumes: volumes, + DNS: dns, + } + + err = k.Bootstrap() if err != nil { glog.Errorf("Error during bootstrap: %q", err) os.Exit(1) } - glog.Infof("Bootstrap complete; starting kubelet") + glog.Infof("Bootstrap complete; applying configuration") + err = k.ApplyModel() + if err != nil { + glog.Errorf("Error during configuration: %q", err) + os.Exit(1) + } - err = kubeboot.RunBootstrapTasks() + glog.Infof("Bootstrap complete; starting kubelet") + err = k.RunBootstrapTasks() if err != nil { glog.Errorf("Error during bootstrap: %q", err) os.Exit(1) @@ -58,3 +117,66 @@ func main() { glog.Infof("Unexpected exited from kubelet run") os.Exit(1) } + +// TODO: run with --net=host ?? +func findInternalIP() (net.IP, error) { + var ips []net.IP + + networkInterfaces, err := net.Interfaces() + if err != nil { + return nil, fmt.Errorf("error querying interfaces to determine internal ip: %v", err) + } + + for i := range networkInterfaces { + networkInterface := &networkInterfaces[i] + flags := networkInterface.Flags + name := networkInterface.Name + + if (flags & net.FlagLoopback) != 0 { + glog.V(2).Infof("Ignoring interface %s - loopback", name) + continue + } + + // Not a lot else to go on... 
+ if !strings.HasPrefix(name, "eth") { + glog.V(2).Infof("Ignoring interface %s - name does not look like ethernet device", name) + continue + } + + addrs, err := networkInterface.Addrs() + if err != nil { + return nil, fmt.Errorf("error querying network interface %s for IP adddresses: %v", name, err) + } + + for _, addr := range addrs { + ip, _, err := net.ParseCIDR(addr.String()) + if err != nil { + return nil, fmt.Errorf("error parsing address %s on network interface %s: %v", addr.String(), name, err) + } + + if ip.IsLoopback() { + glog.V(2).Infof("Ignoring address %s (loopback)", ip) + continue + } + + if ip.IsLinkLocalMulticast() || ip.IsLinkLocalUnicast() { + glog.V(2).Infof("Ignoring address %s (link-local)", ip) + continue + } + + ips = append(ips, ip) + } + } + + if len(ips) == 0 { + return nil, fmt.Errorf("unable to determine internal ip (no adddresses found)") + } + + if len(ips) != 1 { + glog.Warningf("Found multiple internal IPs; making arbitrary choice") + for _, ip := range ips { + glog.Warningf("\tip: %s", ip.String()) + } + } + return ips[0], nil +} diff --git a/protokube/glide.lock b/protokube/glide.lock index e526bedb4a..e6ba164bbe 100644 --- a/protokube/glide.lock +++ b/protokube/glide.lock @@ -1,5 +1,5 @@ -hash: d51b01457dd5499bc488e3a367f00fd9e935cdf125296e9451bbf39458fe255a -updated: 2016-05-29T08:36:17.761287445-04:00 +hash: 212c116624840fbcaaf42e4f11b9f159e3426b8dffbb8b7747a1615ea68e7fb5 +updated: 2016-06-05T15:03:24.971484029-04:00 imports: - name: github.com/aws/aws-sdk-go version: c924893c38ecc04b18d7aab8a7aa561cb8b4c4cc @@ -9,6 +9,7 @@ imports: - aws/request - aws/session - service/ec2 + - service/route53 - aws/awserr - aws/credentials - aws/client @@ -21,10 +22,16 @@ imports: - private/protocol/ec2query - private/signer/v4 - private/waiter + - private/protocol/restxml - aws/credentials/ec2rolecreds - private/protocol/query/queryutil - private/protocol/xml/xmlutil - private/protocol/rest + - private/protocol/query +- name: github.com/cloudfoundry-incubator/candiedyaml + version: 99c3df83b51532e3615f851d8c2dbb638f5313bf +- name: github.com/ghodss/yaml + version: aa0c862057666179de291b67d9f093d12b5a8473 - name: github.com/go-ini/ini version: 2e44421e256d82ebbf3d4d4fcabe8930b905eff3 - name: github.com/golang/glog @@ -32,7 +39,7 @@ imports: - name: github.com/jmespath/go-jmespath version: 3433f3ea46d9f8019119e7dd41274e112a2359a9 - name: k8s.io/kubernetes - version: a99e4ca79334cefd5fbdd0fd7f78a6b2595f2fde + version: 56af9acd6f0a36f974fe8a9d0bd06049b14d5d19 subpackages: - pkg/util/exec - pkg/util/mount diff --git a/protokube/glide.yaml b/protokube/glide.yaml index e8d280f8d6..35f2ec0780 100644 --- a/protokube/glide.yaml +++ b/protokube/glide.yaml @@ -12,3 +12,4 @@ import: subpackages: - pkg/util/exec - pkg/util/mount +- package: github.com/ghodss/yaml diff --git a/protokube/images/builder/Dockerfile b/protokube/images/builder/Dockerfile new file mode 100644 index 0000000000..e8471317a4 --- /dev/null +++ b/protokube/images/builder/Dockerfile @@ -0,0 +1,14 @@ +FROM debian:jessie + +# Install packages: +# curl (to download glide & golang) +# git, mercurial (for go get) +RUN apt-get update && apt-get install --yes curl mercurial git gcc make + +# Install golang +RUN curl -L https://storage.googleapis.com/golang/go1.6.2.linux-amd64.tar.gz | tar zx -C /usr/local +ENV PATH $PATH:/usr/local/go/bin +# Install glide +RUN curl -L https://github.com/Masterminds/glide/releases/download/0.10.2/glide-0.10.2-linux-amd64.tar.gz | tar zx --strip-components 1 -C /usr/bin + +COPY 
images/builder/onbuild.sh /onbuild.sh diff --git a/protokube/images/builder/onbuild.sh b/protokube/images/builder/onbuild.sh new file mode 100755 index 0000000000..908e8a1cfe --- /dev/null +++ b/protokube/images/builder/onbuild.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +mkdir -p /go +export GOPATH=/go + +mkdir -p /go/src/k8s.io/kube-deploy +ln -s /src/ /go/src/k8s.io/kube-deploy/protokube + +ls -lR /go/src/k8s.io/kube-deploy/protokube/cmd/ + +cd /go/src/k8s.io/kube-deploy/protokube/ +make gocode + +mkdir -p /src/.build/artifacts/ +cp /go/bin/protokube /src/.build/artifacts/ diff --git a/protokube/images/protokube/Dockerfile b/protokube/images/protokube/Dockerfile new file mode 100644 index 0000000000..e9474b3e84 --- /dev/null +++ b/protokube/images/protokube/Dockerfile @@ -0,0 +1,12 @@ +FROM debian:jessie + +# ca-certificates: Needed to talk to EC2 API +# e2fsprogs: Needed to mount / format ext4 filesytems +RUN apt-get update && apt-get install --yes ca-certificates e2fsprogs + +COPY model/ /model/ +COPY templates/ /templates/ +COPY /.build/artifacts/protokube /usr/bin/protokube + +CMD /usr/bin/protokube + diff --git a/protokube/model/etcd/events.config b/protokube/model/etcd/events.config new file mode 100644 index 0000000000..226c35cdcc --- /dev/null +++ b/protokube/model/etcd/events.config @@ -0,0 +1,5 @@ +ClusterName: etcd-events +ClientPort: 4002 +PeerPort: 2381 +DataDirName: data-events +PodName: etcd-server-events diff --git a/protokube/model/etcd/main.config b/protokube/model/etcd/main.config new file mode 100644 index 0000000000..34151875b7 --- /dev/null +++ b/protokube/model/etcd/main.config @@ -0,0 +1,5 @@ +ClusterName: etcd +ClientPort: 4001 +PeerPort: 2380 +DataDirName: data +PodName: etcd-server diff --git a/protokube/pkg/protokube/aws_dns.go b/protokube/pkg/protokube/aws_dns.go new file mode 100644 index 0000000000..bfc83c1f9f --- /dev/null +++ b/protokube/pkg/protokube/aws_dns.go @@ -0,0 +1,117 @@ +package protokube + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/route53" + "github.com/golang/glog" + "strings" + "time" +) + +type Route53DNSProvider struct { + client *route53.Route53 + + zoneName string + zone *route53.HostedZone +} + +func NewRoute53DNSProvider(zoneName string) (*Route53DNSProvider, error) { + if zoneName == "" { + return nil, fmt.Errorf("zone name is required") + } + + p := &Route53DNSProvider{ + zoneName: zoneName, + } + + s := session.New() + s.Handlers.Send.PushFront(func(r *request.Request) { + // Log requests + glog.V(4).Infof("AWS API Request: %s/%s", r.ClientInfo.ServiceName, r.Operation.Name) + }) + + config := aws.NewConfig() + + p.client = route53.New(s, config) + + return p, nil +} + +func (p *Route53DNSProvider) getZone() (*route53.HostedZone, error) { + if p.zone != nil { + return p.zone, nil + } + + findZone := p.zoneName + if !strings.HasSuffix(findZone, ".") { + findZone += "." 
+ } + request := &route53.ListHostedZonesByNameInput{ + DNSName: aws.String(findZone), + } + + response, err := p.client.ListHostedZonesByName(request) + if err != nil { + return nil, fmt.Errorf("error querying for DNS HostedZones %q: %v", findZone, err) + } + + var zones []*route53.HostedZone + for _, zone := range response.HostedZones { + if aws.StringValue(zone.Name) == findZone { + zones = append(zones, zone) + } + } + if len(zones) == 0 { + return nil, nil + } + if len(zones) != 1 { + return nil, fmt.Errorf("found multiple hosted zones matched name %q", findZone) + } + + p.zone = zones[0] + + return p.zone, nil +} + +func (p *Route53DNSProvider) Set(fqdn string, recordType string, value string, ttl time.Duration) error { + zone, err := p.getZone() + if err != nil { + return err + } + + rrs := &route53.ResourceRecordSet{ + Name: aws.String(fqdn), + Type: aws.String(recordType), + TTL: aws.Int64(int64(ttl.Seconds())), + ResourceRecords: []*route53.ResourceRecord{ + {Value: aws.String(value)}, + }, + } + + change := &route53.Change{ + Action: aws.String("UPSERT"), + ResourceRecordSet: rrs, + } + + changeBatch := &route53.ChangeBatch{} + changeBatch.Changes = []*route53.Change{change} + + request := &route53.ChangeResourceRecordSetsInput{} + request.HostedZoneId = zone.Id + request.ChangeBatch = changeBatch + + glog.V(2).Infof("Updating DNS record %q", fqdn) + glog.V(4).Infof("route53 request: %s", DebugString(request)) + + response, err := p.client.ChangeResourceRecordSets(request) + if err != nil { + return fmt.Errorf("error creating ResourceRecordSets: %v", err) + } + + glog.V(2).Infof("Change id is %q", aws.StringValue(response.ChangeInfo.Id)) + + return nil +} diff --git a/protokube/pkg/protokube/aws_volume.go b/protokube/pkg/protokube/aws_volume.go index 79099cf045..8fdc7bde28 100644 --- a/protokube/pkg/protokube/aws_volume.go +++ b/protokube/pkg/protokube/aws_volume.go @@ -8,6 +8,9 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" "github.com/golang/glog" + "net" + "strconv" + "strings" "time" ) @@ -17,6 +20,10 @@ const TagNameKubernetesCluster = "KubernetesCluster" // The tag name we use for specifying that something is in the master role const TagNameRoleMaster = "k8s.io/role/master" +const TagNameEtcdClusterPrefix = "k8s.io/etcd/" + +const TagNameMasterId = "k8s.io/master/id" + const DefaultAttachDevice = "/dev/xvdb" type AWSVolumes struct { @@ -26,6 +33,7 @@ type AWSVolumes struct { zone string clusterTag string instanceId string + internalIP net.IP } var _ Volumes = &AWSVolumes{} @@ -36,7 +44,7 @@ func NewAWSVolumes() (*AWSVolumes, error) { s := session.New() s.Handlers.Send.PushFront(func(r *request.Request) { // Log requests - glog.V(4).Infof("AWS API Request: %s/%s", r.ClientInfo.ServiceName, r.Operation) + glog.V(4).Infof("AWS API Request: %s/%s", r.ClientInfo.ServiceName, r.Operation.Name) }) config := aws.NewConfig() @@ -67,6 +75,14 @@ func NewAWSVolumes() (*AWSVolumes, error) { return a, nil } +func (a *AWSVolumes) ClusterID() string { + return a.clusterTag +} + +func (a *AWSVolumes) InternalIP() net.IP { + return a.internalIP +} + func (a *AWSVolumes) discoverTags() error { instance, err := a.describeInstance() if err != nil { @@ -85,6 +101,11 @@ func (a *AWSVolumes) discoverTags() error { a.clusterTag = clusterID + a.internalIP = net.ParseIP(aws.StringValue(instance.PrivateIpAddress)) + if a.internalIP == nil { + return fmt.Errorf("Internal IP not found on this instance (%q)", a.instanceId) + } + return nil } @@ -121,20 
+142,23 @@ func newEc2Filter(name string, value string) *ec2.Filter { return filter } -func (a *AWSVolumes) FindMountedVolumes() ([]*Volume, error) { - request := &ec2.DescribeVolumesInput{} - request.Filters = []*ec2.Filter{ - newEc2Filter("tag:"+TagNameKubernetesCluster, a.clusterTag), - newEc2Filter("tag-key", TagNameRoleMaster), - newEc2Filter("attachment.instance-id", a.instanceId), - } - +func (a *AWSVolumes) findVolumes(request *ec2.DescribeVolumesInput) ([]*Volume, error) { var volumes []*Volume err := a.ec2.DescribeVolumesPages(request, func(p *ec2.DescribeVolumesOutput, lastPage bool) (shouldContinue bool) { for _, v := range p.Volumes { + name := aws.StringValue(v.VolumeId) vol := &Volume{ - Name: aws.StringValue(v.VolumeId), - Available: false, + Name: name, + Info: VolumeInfo{ + Name: name, + }, + } + state := aws.StringValue(v.State) + + switch state { + case "available": + vol.Available = true + break } var myAttachment *ec2.VolumeAttachment @@ -145,13 +169,46 @@ func (a *AWSVolumes) FindMountedVolumes() ([]*Volume, error) { } } - if myAttachment == nil { - glog.Warningf("Requested volumes attached to this instance, but volume %q was returned that was not attached", a.instanceId) - continue + if myAttachment != nil { + vol.Device = aws.StringValue(myAttachment.Device) } - vol.Device = aws.StringValue(myAttachment.Device) - volumes = append(volumes, vol) + skipVolume := false + + for _, tag := range v.Tags { + k := aws.StringValue(tag.Key) + v := aws.StringValue(tag.Value) + + switch k { + case TagNameKubernetesCluster, TagNameRoleMaster, "Name": + // Ignore + case TagNameMasterId: + id, err := strconv.Atoi(v) + if err != nil { + glog.Warningf("error parsing master-id tag on volume %q %s=%s; skipping volume", name, k, v) + skipVolume = true + } else { + vol.Info.MasterID = id + } + default: + if strings.HasPrefix(k, TagNameEtcdClusterPrefix) { + etcdClusterName := k[len(TagNameEtcdClusterPrefix):] + spec, err := ParseEtcdClusterSpec(etcdClusterName, v) + if err != nil { + // Fail safe + glog.Warningf("error parsing etcd cluster tag %q on volume %q; skipping volume: %v", v, name, err) + skipVolume = true + } + vol.Info.EtcdClusters = append(vol.Info.EtcdClusters, spec) + } else { + glog.Warningf("unknown tag on volume %q: %s=%s", name, k, v) + } + } + } + + if !skipVolume { + volumes = append(volumes, vol) + } } return true }) @@ -162,6 +219,17 @@ func (a *AWSVolumes) FindMountedVolumes() ([]*Volume, error) { return volumes, nil } +func (a *AWSVolumes) FindMountedVolumes() ([]*Volume, error) { + request := &ec2.DescribeVolumesInput{} + request.Filters = []*ec2.Filter{ + newEc2Filter("tag:"+TagNameKubernetesCluster, a.clusterTag), + newEc2Filter("tag-key", TagNameRoleMaster), + newEc2Filter("attachment.instance-id", a.instanceId), + } + + return a.findVolumes(request) +} + func (a *AWSVolumes) FindMountableVolumes() ([]*Volume, error) { request := &ec2.DescribeVolumesInput{} request.Filters = []*ec2.Filter{ @@ -170,29 +238,7 @@ func (a *AWSVolumes) FindMountableVolumes() ([]*Volume, error) { newEc2Filter("availability-zone", a.zone), } - var volumes []*Volume - err := a.ec2.DescribeVolumesPages(request, func(p *ec2.DescribeVolumesOutput, lastPage bool) (shouldContinue bool) { - for _, v := range p.Volumes { - vol := &Volume{ - Name: aws.StringValue(v.VolumeId), - } - state := aws.StringValue(v.State) - - switch state { - case "available": - vol.Available = true - break - } - - volumes = append(volumes, vol) - } - return true - }) - - if err != nil { - return nil, 
fmt.Errorf("error querying for EC2 volumes: %v", err) - } - return volumes, nil + return a.findVolumes(request) } // AttachVolume attaches the specified volume to this instance, returning the mountpoint & nil if successful @@ -219,8 +265,6 @@ func (a *AWSVolumes) AttachVolume(volume *Volume) (string, error) { // Wait (forever) for volume to attach or reach a failure-to-attach condition for { - time.Sleep(10 * time.Second) - request := &ec2.DescribeVolumesInput{ VolumeIds: []*string{&volumeID}, } @@ -254,5 +298,7 @@ func (a *AWSVolumes) AttachVolume(volume *Volume) (string, error) { default: return "", fmt.Errorf("Observed unexpected volume state %q", attachmentState) } + + time.Sleep(10 * time.Second) } } diff --git a/protokube/pkg/protokube/etcd_cluster.go b/protokube/pkg/protokube/etcd_cluster.go new file mode 100644 index 0000000000..438f52947d --- /dev/null +++ b/protokube/pkg/protokube/etcd_cluster.go @@ -0,0 +1,153 @@ +package protokube + +import ( + "fmt" + "github.com/ghodss/yaml" + "io/ioutil" + "os" + "path" + "strings" +) + +type EtcdCluster struct { + PeerPort int + ClientPort int + LogFile string + DataDirName string + ClusterName string + ClusterToken string + Me *EtcdNode + Nodes []*EtcdNode + PodName string + + Spec *EtcdClusterSpec +} + +func (e *EtcdCluster) String() string { + return DebugString(e) +} + +type EtcdNode struct { + Name string + InternalName string +} + +func (e *EtcdNode) String() string { + return DebugString(e) +} + +func (k *KubeBoot) BuildEtcdClusters(modelDir string) ([]*EtcdCluster, error) { + var clusters []*EtcdCluster + + for _, spec := range k.EtcdClusters { + modelTemplatePath := path.Join(modelDir, spec.ClusterKey+".config") + modelTemplate, err := ioutil.ReadFile(modelTemplatePath) + if err != nil { + return nil, fmt.Errorf("error reading model template %q: %v", modelTemplatePath, err) + } + + cluster := &EtcdCluster{} + cluster.Spec = spec + + model, err := ExecuteTemplate("model-etcd-"+spec.ClusterKey, string(modelTemplate), cluster) + if err != nil { + return nil, fmt.Errorf("error executing etcd model template %q: %v", modelTemplatePath, err) + } + + err = yaml.Unmarshal([]byte(model), cluster) + if err != nil { + return nil, fmt.Errorf("error parsing etcd model template %q: %v", modelTemplatePath, err) + } + + clusters = append(clusters, cluster) + } + + return clusters, nil +} + +func (c *EtcdCluster) configure(k *KubeBoot) error { + name := c.ClusterName + if !strings.HasPrefix(name, "etcd") { + // For sanity, and to avoid collisions in directories / dns + return fmt.Errorf("unexpected name for etcd cluster (must start with etcd): %q", name) + } + if c.LogFile == "" { + c.LogFile = "/var/log/" + name + ".log" + } + + if c.PodName == "" { + c.PodName = c.ClusterName + } + + err := touchFile(k.PathFor(c.LogFile)) + if err != nil { + return fmt.Errorf("error touching log-file %q: %v", c.LogFile, err) + } + + if c.ClusterToken == "" { + c.ClusterToken = "etcd-cluster-token-" + name + } + + for _, nodeName := range c.Spec.NodeNames { + name := name + "-" + nodeName + fqdn := k.BuildInternalDNSName(name) + + node := &EtcdNode{ + Name: name, + InternalName: fqdn, + } + c.Nodes = append(c.Nodes, node) + + if nodeName == c.Spec.NodeName { + c.Me = node + + err := k.MapInternalDNSName(fqdn) + if err != nil { + return fmt.Errorf("error mapping internal dns name for %q: %v", name, err) + } + } + } + + if c.Me == nil { + return fmt.Errorf("my node name %s not found in cluster %v", c.Spec.NodeName, strings.Join(c.Spec.NodeNames, ",")) + } + + 
manifestTemplatePath := "templates/etcd/manifest.template" + manifestTemplate, err := ioutil.ReadFile(manifestTemplatePath) + if err != nil { + return fmt.Errorf("error reading etcd manifest template %q: %v", manifestTemplatePath, err) + } + manifest, err := ExecuteTemplate("etcd-manifest", string(manifestTemplate), c) + if err != nil { + return fmt.Errorf("error executing etcd manifest template: %v", err) + } + + manifestPath := "/etc/kubernetes/manifests/" + name + ".manifest" + err = ioutil.WriteFile(k.PathFor(manifestPath), []byte(manifest), 0644) + if err != nil { + return fmt.Errorf("error writing etcd manifest %q: %v", manifestPath, err) + } + + return nil +} + +func touchFile(p string) error { + _, err := os.Lstat(p) + if err == nil { + return nil + } + + if !os.IsNotExist(err) { + return fmt.Errorf("error getting state of file %q: %v", p, err) + } + + f, err := os.Create(p) + if err != nil { + return fmt.Errorf("error touching file %q: %v", p, err) + } + err = f.Close() + if err != nil { + return fmt.Errorf("error closing touched file %q: %v", p, err) + } + return nil +} diff --git a/protokube/pkg/protokube/kube_boot.go b/protokube/pkg/protokube/kube_boot.go index c44970bd2f..dcef79bc45 100644 --- a/protokube/pkg/protokube/kube_boot.go +++ b/protokube/pkg/protokube/kube_boot.go @@ -2,20 +2,33 @@ package protokube import ( "github.com/golang/glog" + "net" "time" ) type KubeBoot struct { - master bool - volumes Volumes + Containerized bool + RootFS string + + Master bool + InternalDNSSuffix string + InternalIP net.IP + MasterID int + EtcdClusters []*EtcdClusterSpec + + Volumes Volumes + DNS DNSProvider } -func NewKubeBoot(master bool, volumes Volumes) *KubeBoot { - k := &KubeBoot{ - master: master, - volumes: volumes, +func (k *KubeBoot) PathFor(hostPath string) string { + if hostPath[0] != '/' { + glog.Fatalf("path was not absolute: %q", hostPath) } - return k + return k.RootFS + hostPath[1:] +} + +func (k *KubeBoot) String() string { + return DebugString(k) } func (k *KubeBoot) Bootstrap() error { @@ -36,8 +49,8 @@ func (k *KubeBoot) Bootstrap() error { } func (k *KubeBoot) tryBootstrap() (bool, error) { - if k.master { - mountpoint, err := k.mountMasterVolume() + if k.Master { + volumeInfo, mountpoint, err := k.mountMasterVolume() if err != nil { return false, err } @@ -47,7 +60,16 @@ func (k *KubeBoot) tryBootstrap() (bool, error) { return false, nil } - glog.Infof("mounted master on %s", mountpoint) + glog.Infof("mounted master volume %q on %s", volumeInfo.Name, mountpoint) + + // Copy roles from volume + k.EtcdClusters = volumeInfo.EtcdClusters + for _, etcdClusterSpec := range volumeInfo.EtcdClusters { + glog.Infof("Found etcd cluster spec on volume: %v", etcdClusterSpec) + } + + k.MasterID = volumeInfo.MasterID + // TODO: Should we set up symlinks here? 
} diff --git a/protokube/pkg/protokube/kube_boot_task.go b/protokube/pkg/protokube/kube_boot_task.go index a63174ff30..852510d57b 100644 --- a/protokube/pkg/protokube/kube_boot_task.go +++ b/protokube/pkg/protokube/kube_boot_task.go @@ -9,7 +9,9 @@ import ( "os" "os/exec" "path" + "strings" "sync" + "time" ) const BootstrapDir = "/etc/kubernetes/bootstrap" @@ -18,12 +20,32 @@ type BootstrapTask struct { Command []string `json:"command"` } +func (b *BootstrapTask) String() string { + return DebugString(b) +} + // RunKubelet runs the bootstrap tasks, and watches them until they exit // Currently only one task is supported / will work properly func (k *KubeBoot) RunBootstrapTasks() error { - dirs, err := ioutil.ReadDir(BootstrapDir) - if err != nil { - return fmt.Errorf("error listing %q: %v", BootstrapDir, err) + bootstrapDir := k.PathFor(BootstrapDir) + + var dirs []os.FileInfo + var err error + + for { + dirs, err = ioutil.ReadDir(bootstrapDir) + if err != nil { + if os.IsNotExist(err) { + dirs = nil + } else { + return fmt.Errorf("error listing %q: %v", bootstrapDir, err) + } + } + if len(dirs) != 0 { + break + } + glog.Infof("No entries found in %q", BootstrapDir) + time.Sleep(10 * time.Second) } for _, dir := range dirs { @@ -31,7 +53,7 @@ func (k *KubeBoot) RunBootstrapTasks() error { continue } - p := path.Join(BootstrapDir, dir.Name()) + p := path.Join(bootstrapDir, dir.Name()) files, err := ioutil.ReadDir(p) if err != nil { return fmt.Errorf("error listing %q: %v", p, err) @@ -53,6 +75,7 @@ func (k *KubeBoot) RunBootstrapTasks() error { return fmt.Errorf("error running bootstrap task %q: %v", fp, err) } } + return nil } @@ -91,7 +114,7 @@ func (k *KubeBoot) runBootstrapTask(path string) error { err = cmd.Start() if err != nil { - return fmt.Errorf("error starting command %q: %v", task.Command, err) + return fmt.Errorf("error starting command %q: %v", strings.Join(task.Command, " "), err) } go copyStream(os.Stdout, stdout, wg) diff --git a/protokube/pkg/protokube/kube_boot_volumes.go b/protokube/pkg/protokube/kube_boot_volumes.go index 2055684f3f..502a1f7cc4 100644 --- a/protokube/pkg/protokube/kube_boot_volumes.go +++ b/protokube/pkg/protokube/kube_boot_volumes.go @@ -9,36 +9,38 @@ import ( "time" ) -const MasterMountpoint = "/master-pd" +const MasterMountpoint = "/mnt/master-pd" -func (k *KubeBoot) mountMasterVolume() (string, error) { +func (k *KubeBoot) mountMasterVolume() (*VolumeInfo, string, error) { // TODO: mount ephemeral volumes (particular on AWS)? 
// Mount a master volume - device, err := k.attachMasterVolume() + volume, device, err := k.attachMasterVolume() if err != nil { - return "", fmt.Errorf("unable to attach master volume: %q", err) + return nil, "", fmt.Errorf("unable to attach master volume: %q", err) } if device == "" { - return "", nil + return nil, "", nil } - glog.V(2).Infof("Master volume is attached at %q", device) + glog.V(2).Infof("Master volume %q is attached at %q", volume.Name, device) + glog.Infof("Doing safe-format-and-mount of %s to %s", device, MasterMountpoint) fstype := "" err = k.safeFormatAndMount(device, MasterMountpoint, fstype) if err != nil { - return "", fmt.Errorf("unable to mount master volume: %q", err) + return nil, "", fmt.Errorf("unable to mount master volume: %q", err) } - return MasterMountpoint, nil + return volume, MasterMountpoint, nil } func (k *KubeBoot) safeFormatAndMount(device string, mountpoint string, fstype string) error { // Wait for the device to show up + for { - _, err := os.Stat(device) + _, err := os.Stat(k.PathFor(device)) if err == nil { break } @@ -50,21 +52,35 @@ func (k *KubeBoot) safeFormatAndMount(device string, mountpoint string, fstype s } glog.Infof("Found device %q", device) - // Mount the device + //// Mount the device + //var mounter mount.Interface + //runner := exec.New() + //if k.Containerized { + // mounter = mount.NewNsenterMounter() + // runner = NewChrootRunner(runner, "/rootfs") + //} else { + // mounter = mount.New() + //} - mounter := &mount.SafeFormatAndMount{Interface: mount.New(), Runner: exec.New()} + // If we are containerized, we still first SafeFormatAndMount in our namespace + // This is because SafeFormatAndMount doesn't seem to work in a container + safeFormatAndMount := &mount.SafeFormatAndMount{Interface: mount.New(), Runner: exec.New()} - // Only mount the PD globally once. 
- notMnt, err := mounter.IsLikelyNotMountPoint(mountpoint) + // Check if it is already mounted + mounts, err := safeFormatAndMount.List() if err != nil { - if os.IsNotExist(err) { - glog.Infof("Creating mount directory %q", mountpoint) - if err := os.MkdirAll(mountpoint, 0750); err != nil { - return err - } - notMnt = true - } else { - return err + return fmt.Errorf("error listing existing mounts: %v", err) + } + + // Note: IsLikelyNotMountPoint is not containerized + + findMountpoint := k.PathFor(mountpoint) + var existing []*mount.MountPoint + for i := range mounts { + m := &mounts[i] + glog.V(2).Infof("found existing mount: %v", m) + if m.Path == findMountpoint { + existing = append(existing, m) } } @@ -72,30 +88,32 @@ func (k *KubeBoot) safeFormatAndMount(device string, mountpoint string, fstype s //if readOnly { // options = append(options, "ro") //} - if notMnt { - glog.Infof("Mounting device %q on %q", device, mountpoint) + if len(existing) == 0 { + glog.Infof("Creating mount directory %q", k.PathFor(mountpoint)) + if err := os.MkdirAll(k.PathFor(mountpoint), 0750); err != nil { + return err + } - err = mounter.FormatAndMount(device, mountpoint, fstype, options) + glog.Infof("Mounting device %q on %q", k.PathFor(device), k.PathFor(mountpoint)) + + err = safeFormatAndMount.FormatAndMount(k.PathFor(device), k.PathFor(mountpoint), fstype, options) if err != nil { //os.Remove(mountpoint) - return fmt.Errorf("error formatting and mounting disk %q on %q: %v", device, mountpoint, err) + return fmt.Errorf("error formatting and mounting disk %q on %q: %v", k.PathFor(device), k.PathFor(mountpoint), err) + } + + // If we are containerized, we then also mount it into the host + if k.Containerized { + hostMounter := mount.NewNsenterMounter() + err = hostMounter.Mount(device, mountpoint, fstype, options) + if err != nil { + //os.Remove(mountpoint) + return fmt.Errorf("error formatting and mounting disk %q on %q in host: %v", device, mountpoint, err) + } } } else { glog.Infof("Device already mounted on : %q, verifying it is our device", mountpoint) - mounts, err := mounter.List() - if err != nil { - return fmt.Errorf("error listing existing mounts: %v", err) - } - - var existing []*mount.MountPoint - for i := range mounts { - m := &mounts[i] - if m.Path == mountpoint { - existing = append(existing, m) - } - } - if len(existing) != 1 { glog.Infof("Existing mounts unexpected") @@ -103,11 +121,7 @@ func (k *KubeBoot) safeFormatAndMount(device string, mountpoint string, fstype s m := &mounts[i] glog.Infof("%s\t%s", m.Device, m.Path) } - } - if len(existing) == 0 { - return fmt.Errorf("Unable to find existing mount of %q at %q", device, mountpoint) - } else if len(existing) != 1 { return fmt.Errorf("Found multiple existing mounts of %q at %q", device, mountpoint) } else { glog.Infof("Found existing mount of %q and %q", device, mountpoint) @@ -117,10 +131,10 @@ func (k *KubeBoot) safeFormatAndMount(device string, mountpoint string, fstype s return nil } -func (k *KubeBoot) attachMasterVolume() (string, error) { - volumes, err := k.volumes.FindMountedVolumes() +func (k *KubeBoot) attachMasterVolume() (*VolumeInfo, string, error) { + volumes, err := k.Volumes.FindMountedVolumes() if err != nil { - return "", err + return nil, "", err } if len(volumes) != 0 { @@ -129,23 +143,25 @@ func (k *KubeBoot) attachMasterVolume() (string, error) { glog.Warningf("Found multiple master volumes: %v", volumes) } - glog.V(2).Infof("Found master volume already attached: %q", volumes[0].Name) + volume := volumes[0] - 
device, err := k.volumes.AttachVolume(volumes[0]) + glog.V(2).Infof("Found master volume already attached: %q", volume.Name) + + device, err := k.Volumes.AttachVolume(volume) if err != nil { - return "", fmt.Errorf("Error attaching volume %q: %v", volumes[0].Name, err) + return nil, "", fmt.Errorf("Error attaching volume %q: %v", volume.Name, err) } - return device, nil + return &volume.Info, device, nil } - volumes, err = k.volumes.FindMountableVolumes() + volumes, err = k.Volumes.FindMountableVolumes() if err != nil { - return "", err + return nil, "", err } if len(volumes) == 0 { glog.Infof("No available master volumes") - return "", nil + return nil, "", nil } for _, volume := range volumes { @@ -155,12 +171,12 @@ func (k *KubeBoot) attachMasterVolume() (string, error) { glog.V(2).Infof("Trying to mount master volume: %q", volume.Name) - device, err := k.volumes.AttachVolume(volume) + device, err := k.Volumes.AttachVolume(volume) if err != nil { - return "", fmt.Errorf("Error attaching volume %q: %v", volume.Name, err) + return nil, "", fmt.Errorf("Error attaching volume %q: %v", volume.Name, err) } - return device, nil + return &volume.Info, device, nil } - return "", nil + return nil, "", nil } diff --git a/protokube/pkg/protokube/kube_dns.go b/protokube/pkg/protokube/kube_dns.go new file mode 100644 index 0000000000..ccc8cf547a --- /dev/null +++ b/protokube/pkg/protokube/kube_dns.go @@ -0,0 +1,27 @@ +package protokube + +import ( + "fmt" + "time" +) + +const defaultTTL = time.Minute + +type DNSProvider interface { + Set(fqdn string, recordType string, value string, ttl time.Duration) error +} + +// MapInternalName maps a FQDN to the internal IP address of the current machine +func (k *KubeBoot) MapInternalDNSName(fqdn string) error { + err := k.DNS.Set(fqdn, "A", k.InternalIP.String(), defaultTTL) + if err != nil { + return fmt.Errorf("error configuring DNS name %q: %v", fqdn, err) + } + return nil +} + +// BuildInternalDNSName builds a DNS name for use inside the cluster, adding our internal DNS suffix to the key, +func (k *KubeBoot) BuildInternalDNSName(key string) string { + fqdn := key + k.InternalDNSSuffix + return fqdn +} diff --git a/protokube/pkg/protokube/kube_model.go b/protokube/pkg/protokube/kube_model.go new file mode 100644 index 0000000000..1b1c57bcf2 --- /dev/null +++ b/protokube/pkg/protokube/kube_model.go @@ -0,0 +1,26 @@ +package protokube + +import ( + "fmt" + "github.com/golang/glog" +) + +// ApplyModel applies the configuration as specified in the model +func (k *KubeBoot) ApplyModel() error { + modelDir := "model/etcd" + + etcdClusters, err := k.BuildEtcdClusters(modelDir) + if err != nil { + return fmt.Errorf("error building etcd models: %v", err) + } + + for _, etcdCluster := range etcdClusters { + glog.Infof("configuring etcd cluster %s", etcdCluster.ClusterName) + err := etcdCluster.configure(k) + if err != nil { + return fmt.Errorf("error applying etcd model: %v", err) + } + } + + return nil +} diff --git a/protokube/pkg/protokube/models.go b/protokube/pkg/protokube/models.go new file mode 100644 index 0000000000..d1e666e7b4 --- /dev/null +++ b/protokube/pkg/protokube/models.go @@ -0,0 +1,29 @@ +package protokube + +import ( + "bytes" + "fmt" + "text/template" +) + +func ExecuteTemplate(key string, contents string, model interface{}) (string, error) { + t := template.New(key) + + //funcMap := make(template.FuncMap) + //t.Funcs(funcMap) + + _, err := t.Parse(contents) + if err != nil { + return "", fmt.Errorf("error parsing template %q: %v", key, err) + } 
+
+	t.Option("missingkey=zero")
+
+	var buffer bytes.Buffer
+	err = t.ExecuteTemplate(&buffer, key, model)
+	if err != nil {
+		return "", fmt.Errorf("error executing template %q: %v", key, err)
+	}
+
+	return buffer.String(), nil
+}
diff --git a/protokube/pkg/protokube/utils.go b/protokube/pkg/protokube/utils.go
new file mode 100644
index 0000000000..09fc7a33b0
--- /dev/null
+++ b/protokube/pkg/protokube/utils.go
@@ -0,0 +1,14 @@
+package protokube
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+func DebugString(o interface{}) string {
+	b, err := json.Marshal(o)
+	if err != nil {
+		return fmt.Sprintf("error marshaling %T: %v", o, err)
+	}
+	return string(b)
+}
diff --git a/protokube/pkg/protokube/volumes.go b/protokube/pkg/protokube/volumes.go
index 8a2aa0175b..05f2a4c152 100644
--- a/protokube/pkg/protokube/volumes.go
+++ b/protokube/pkg/protokube/volumes.go
@@ -1,5 +1,10 @@
 package protokube
 
+import (
+	"fmt"
+	"strings"
+)
+
 type Volumes interface {
 	AttachVolume(volume *Volume) (string, error)
 	FindMountedVolumes() ([]*Volume, error)
@@ -10,4 +15,63 @@ type Volume struct {
 	Name      string
 	Device    string
 	Available bool
+
+	Info VolumeInfo
+}
+
+func (v *Volume) String() string {
+	return DebugString(v)
+}
+
+type VolumeInfo struct {
+	Name     string
+	MasterID int
+	// TODO: Maybe the events cluster can just be a PetSet - do we need it for boot?
+	EtcdClusters []*EtcdClusterSpec
+}
+
+func (v *VolumeInfo) String() string {
+	return DebugString(v)
+}
+
+type EtcdClusterSpec struct {
+	ClusterKey string
+
+	NodeName  string
+	NodeNames []string
+}
+
+func (e *EtcdClusterSpec) String() string {
+	return DebugString(e)
+}
+
+// ParseEtcdClusterSpec parses a tag on a volume that encodes an etcd cluster role.
+// The format is "<nodeName>/<nodeName1>,<nodeName2>,...", e.g. "node1/node1,node2,node3"
+func ParseEtcdClusterSpec(clusterKey, v string) (*EtcdClusterSpec, error) {
+	v = strings.TrimSpace(v)
+
+	tokens := strings.Split(v, "/")
+	if len(tokens) != 2 {
+		return nil, fmt.Errorf("invalid EtcdClusterSpec (expected two tokens): %q", v)
+	}
+
+	nodeName := tokens[0]
+	nodeNames := strings.Split(tokens[1], ",")
+
+	found := false
+	for _, s := range nodeNames {
+		if s == nodeName {
+			found = true
+		}
+	}
+	if !found {
+		return nil, fmt.Errorf("invalid EtcdClusterSpec (member not found in all nodes): %q", v)
+	}
+
+	c := &EtcdClusterSpec{
+		ClusterKey: clusterKey,
+		NodeName:   nodeName,
+		NodeNames:  nodeNames,
+	}
+	return c, nil
 }
diff --git a/protokube/templates/etcd/manifest.template b/protokube/templates/etcd/manifest.template
new file mode 100644
index 0000000000..aa361caa66
--- /dev/null
+++ b/protokube/templates/etcd/manifest.template
@@ -0,0 +1,69 @@
+# etcd podspec
+apiVersion: v1
+kind: Pod
+metadata:
+  name: {{ .PodName }}
+  namespace: kube-system
+spec:
+  hostNetwork: true
+  containers:
+  - name: etcd-container
+    image: gcr.io/google_containers/etcd:2.2.1
+    resources:
+      requests:
+        cpu: 200m
+    command:
+    - /bin/sh
+    - -c
+    - /usr/local/bin/etcd 1>>/var/log/etcd.log 2>&1
+    env:
+    - name: ETCD_NAME
+      value: {{ .Me.Name }}
+    - name: ETCD_DATA_DIR
+      value: /var/etcd/{{ .DataDirName}}
+    - name: ETCD_LISTEN_PEER_URLS
+      value: http://0.0.0.0:{{ .PeerPort }}
+    - name: ETCD_LISTEN_CLIENT_URLS
+      value: http://0.0.0.0:{{ .ClientPort }}
+    - name: ETCD_ADVERTISE_CLIENT_URLS
+      value: http://{{ .Me.InternalName }}:{{ .ClientPort }}
+    - name: ETCD_INITIAL_ADVERTISE_PEER_URLS
+      value: http://{{ .Me.InternalName }}:{{ .PeerPort }}
+    - name: ETCD_INITIAL_CLUSTER_STATE
+      value: new
+    - name: ETCD_INITIAL_CLUSTER_TOKEN
+      value: {{ .ClusterToken }}
+    - name: ETCD_INITIAL_CLUSTER
+      value: {{ range $index, $node := .Nodes -}}
+             {{- if $index }},{{ end -}}
+             {{ $node.Name }}=http://{{ $node.InternalName }}:{{ $.PeerPort }}
+             {{- end }}
+    livenessProbe:
+      httpGet:
+        host: 127.0.0.1
+        port: {{ .ClientPort }}
+        path: /health
+      initialDelaySeconds: 15
+      timeoutSeconds: 15
+    ports:
+    - name: serverport
+      containerPort: {{ .PeerPort }}
+      hostPort: {{ .PeerPort }}
+    - name: clientport
+      containerPort: {{ .ClientPort }}
+      hostPort: {{ .ClientPort }}
+    volumeMounts:
+    - mountPath: /var/etcd
+      name: varetcd
+      readOnly: false
+    - mountPath: /var/log/etcd.log
+      name: varlogetcd
+      readOnly: false
+  volumes:
+  - name: varetcd
+    hostPath:
+      path: /mnt/master-pd/var/{{ .DataDirName }}
+  - name: varlogetcd
+    hostPath:
+      path: {{ .LogFile }}
+
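
A note on the ETCD_INITIAL_CLUSTER value above: because of the "-" trim markers, the range collapses into a single comma-separated line in the rendered manifest. The standalone sketch below runs the same range expression through text/template with hypothetical member names (etcd-node1..3 under an assumed ".internal.mycluster.example.com" suffix) to show the shape of the output:

package main

import (
	"log"
	"os"
	"text/template"
)

type etcdNode struct{ Name, InternalName string }

type etcdCluster struct {
	PeerPort int
	Nodes    []*etcdNode
}

// The same expression used for ETCD_INITIAL_CLUSTER in templates/etcd/manifest.template.
const initialCluster = `{{ range $index, $node := .Nodes -}}
{{- if $index }},{{ end -}}
{{ $node.Name }}=http://{{ $node.InternalName }}:{{ $.PeerPort }}
{{- end }}`

func main() {
	c := &etcdCluster{
		PeerPort: 2380,
		Nodes: []*etcdNode{
			{Name: "etcd-node1", InternalName: "etcd-node1.internal.mycluster.example.com"},
			{Name: "etcd-node2", InternalName: "etcd-node2.internal.mycluster.example.com"},
			{Name: "etcd-node3", InternalName: "etcd-node3.internal.mycluster.example.com"},
		},
	}

	t := template.Must(template.New("initial-cluster").Parse(initialCluster))
	if err := t.Execute(os.Stdout, c); err != nil {
		log.Fatal(err)
	}
	// Prints one line:
	// etcd-node1=http://etcd-node1.internal.mycluster.example.com:2380,etcd-node2=...:2380,etcd-node3=...:2380
}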
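
The membership that feeds .Nodes and .Me comes from tags on the master volume: each tag key of the form k8s.io/etcd/<clusterKey> (TagNameEtcdClusterPrefix) carries a value naming this node plus the full member list, and aws_volume.go passes that value to ParseEtcdClusterSpec. A minimal usage sketch, with a made-up tag value:

package main

import (
	"fmt"
	"log"

	"k8s.io/kube-deploy/protokube/pkg/protokube"
)

func main() {
	// "main" is the cluster key taken from a tag such as "k8s.io/etcd/main";
	// the value says this volume belongs to node1 of a three-member cluster.
	spec, err := protokube.ParseEtcdClusterSpec("main", "node1/node1,node2,node3")
	if err != nil {
		log.Fatalf("parse failed: %v", err)
	}

	fmt.Println(spec.ClusterKey) // main
	fmt.Println(spec.NodeName)   // node1
	fmt.Println(spec.NodeNames)  // [node1 node2 node3]
}

The ClusterKey is also what selects the model file (model/etcd/<ClusterKey>.config) in BuildEtcdClusters, so "main" and "events" line up with the two config files added above.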
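
The InternalName FQDNs come from the internal DNS suffix, which main.go derives from --cluster-id when --dns-internal-suffix is not given; the Route53 zone name is then the last two labels of that suffix. A sketch of that defaulting, assuming a hypothetical cluster-id of "mycluster.example.com":

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical cluster ID; protokube normally reads it from the
	// KubernetesCluster tag on the instance (volumes.ClusterID()).
	clusterID := "mycluster.example.com"

	// Same defaulting as cmd/protokube/main.go.
	dnsInternalSuffix := ".internal." + clusterID
	if !strings.HasPrefix(dnsInternalSuffix, ".") {
		dnsInternalSuffix = "." + dnsInternalSuffix
	}

	tokens := strings.Split(dnsInternalSuffix, ".")
	dnsZoneName := strings.Join(tokens[len(tokens)-2:], ".")

	fmt.Println(dnsInternalSuffix) // .internal.mycluster.example.com
	fmt.Println(dnsZoneName)       // example.com

	// BuildInternalDNSName appends the suffix to a key:
	fmt.Println("etcd-node1" + dnsInternalSuffix) // etcd-node1.internal.mycluster.example.com
}

MapInternalDNSName then UPSERTs an A record for that FQDN, pointing at the instance's private IP, in the hosted zone matching dnsZoneName.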
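
Finally, because protokube can now run containerized, host paths are passed through KubeBoot.PathFor, which rewrites them under RootFS. A small sketch, assuming the host filesystem is bind-mounted at /rootfs/ inside the container (the convention main.go sets up for --containerized):

package main

import (
	"fmt"

	"k8s.io/kube-deploy/protokube/pkg/protokube"
)

func main() {
	host := &protokube.KubeBoot{RootFS: "/"}
	containerized := &protokube.KubeBoot{Containerized: true, RootFS: "/rootfs/"}

	// Host-absolute paths are rewritten under the container's view of the host root.
	fmt.Println(host.PathFor("/etc/kubernetes/manifests/etcd.manifest"))
	// /etc/kubernetes/manifests/etcd.manifest
	fmt.Println(containerized.PathFor("/etc/kubernetes/manifests/etcd.manifest"))
	// /rootfs/etc/kubernetes/manifests/etcd.manifest
}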