mirror of https://github.com/kubernetes/kops.git
Add experimental kube-discovery
To split off master discovery from protokube.
This commit is contained in:
parent
fcc904f468
commit
b8b426b800
30
WORKSPACE
30
WORKSPACE
|
|
@ -1,8 +1,38 @@
|
|||
#=============================================================================
|
||||
# Go rules
|
||||
|
||||
http_archive(
|
||||
name = "io_bazel_rules_go",
|
||||
url = "https://github.com/bazelbuild/rules_go/releases/download/0.8.1/rules_go-0.8.1.tar.gz",
|
||||
sha256 = "90bb270d0a92ed5c83558b2797346917c46547f6f7103e648941ecdb6b9d0e72",
|
||||
)
|
||||
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_rules_dependencies", "go_register_toolchains")
|
||||
go_rules_dependencies()
|
||||
go_register_toolchains()
|
||||
|
||||
#=============================================================================
|
||||
# Docker rules
|
||||
|
||||
git_repository(
|
||||
name = "io_bazel_rules_docker",
|
||||
remote = "https://github.com/bazelbuild/rules_docker.git",
|
||||
tag = "v0.3.0",
|
||||
)
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_docker//container:container.bzl",
|
||||
"container_pull",
|
||||
container_repositories = "repositories",
|
||||
)
|
||||
|
||||
container_repositories()
|
||||
|
||||
container_pull(
|
||||
name = "debian_hyperkube_base_amd64",
|
||||
registry = "gcr.io",
|
||||
repository = "google_containers/debian-hyperkube-base-amd64",
|
||||
# 'tag' is also supported, but digest is encouraged for reproducibility.
|
||||
digest = "sha256:fc1b461367730660ac5a40c1eb2d1b23221829acf8a892981c12361383b3742b",
|
||||
tag = "0.8",
|
||||
)
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ k8s.io/kops/federation
|
|||
k8s.io/kops/federation/model
|
||||
k8s.io/kops/federation/targets/kubernetestarget
|
||||
k8s.io/kops/federation/tasks
|
||||
k8s.io/kops/kube-discovery/cmd/kube-discovery
|
||||
k8s.io/kops/nodeup/pkg/bootstrap
|
||||
k8s.io/kops/nodeup/pkg/distros
|
||||
k8s.io/kops/nodeup/pkg/model
|
||||
|
|
|
|||
|
|
@ -0,0 +1,32 @@
|
|||
## kube-discovery
|
||||
|
||||
Status: experimental
|
||||
|
||||
kube-discovery does master discovery, currently on bare-metal. The intention is to split this functionality
|
||||
out of protokube, to make it reusable and modular.
|
||||
|
||||
Discovery methods:
|
||||
|
||||
* mDNS/DNS-SD (aka bonjour / zeroconf). Looks for services of type `_kubernetes._tcp`, with a name of clustername.
|
||||
|
||||
|
||||
## mDNS
|
||||
|
||||
Example avahi configuration
|
||||
|
||||
`/etc/avahi/services/kubernetes.service`
|
||||
|
||||
```
|
||||
<?xml version="1.0" standalone='no'?>
|
||||
<!DOCTYPE service-group SYSTEM "avahi-service.dtd">
|
||||
|
||||
<service-group>
|
||||
<name replace-wildcards="yes">example.k8s.local</name>
|
||||
|
||||
<service>
|
||||
<type>_kubernetes._tcp</type>
|
||||
<port>443</port>
|
||||
</service>
|
||||
|
||||
</service-group>
|
||||
```
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["main.go"],
|
||||
importpath = "k8s.io/kops/kube-discovery/cmd/kube-discovery",
|
||||
visibility = ["//visibility:private"],
|
||||
deps = [
|
||||
"//protokube/pkg/gossip/dns/hosts:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/github.com/miekg/dns:go_default_library",
|
||||
"//vendor/github.com/spf13/pflag:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_binary(
|
||||
name = "kube-discovery",
|
||||
importpath = "k8s.io/kops/kube-discovery/cmd/kube-discovery",
|
||||
library = ":go_default_library",
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
|
|
@ -0,0 +1,257 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/miekg/dns"
|
||||
"github.com/spf13/pflag"
|
||||
"k8s.io/kops/protokube/pkg/gossip/dns/hosts"
|
||||
)
|
||||
|
||||
var (
|
||||
flags = pflag.NewFlagSet("", pflag.ExitOnError)
|
||||
// BuildVersion is overwritten during build. This can be used to resolve issues.
|
||||
BuildVersion = "0.1"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Printf("kube-discovery version %s\n", BuildVersion)
|
||||
|
||||
if err := run(); err != nil {
|
||||
glog.Errorf("unexpected error: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
Containerized bool
|
||||
ClusterID string
|
||||
DnsDiscoveryTimeout time.Duration
|
||||
Interval time.Duration
|
||||
Prefixes []string
|
||||
}
|
||||
|
||||
func (o *Options) InitDefaults() {
|
||||
o.DnsDiscoveryTimeout = 5 * time.Second
|
||||
o.Interval = 60 * time.Second
|
||||
o.Prefixes = []string{"api.internal."}
|
||||
}
|
||||
|
||||
// run is responsible for running the protokube service controller
|
||||
func run() error {
|
||||
var o Options
|
||||
o.InitDefaults()
|
||||
|
||||
flags.BoolVar(&o.Containerized, "containerized", o.Containerized, "Set if we are running containerized.")
|
||||
flags.StringVar(&o.ClusterID, "cluster-id", o.ClusterID, "Cluster ID")
|
||||
|
||||
// Trick to avoid 'logging before flag.Parse' warning
|
||||
flag.CommandLine.Parse([]string{})
|
||||
|
||||
flag.Set("logtostderr", "true")
|
||||
flags.AddGoFlagSet(flag.CommandLine)
|
||||
flags.Parse(os.Args)
|
||||
|
||||
if o.ClusterID == "" {
|
||||
glog.Infof("updating records for all discovered clusters")
|
||||
} else {
|
||||
glog.Infof("updating records for cluster %q", o.ClusterID)
|
||||
}
|
||||
|
||||
c := &DiscoveryController{
|
||||
Options: o,
|
||||
}
|
||||
return c.Run()
|
||||
}
|
||||
|
||||
type DiscoveryController struct {
|
||||
Options Options
|
||||
}
|
||||
|
||||
func (c *DiscoveryController) Run() error {
|
||||
for {
|
||||
err := c.runOnce()
|
||||
if err != nil {
|
||||
glog.Warningf("error updating records: %v", err)
|
||||
}
|
||||
time.Sleep(c.Options.Interval)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *DiscoveryController) runOnce() error {
|
||||
o := &c.Options
|
||||
|
||||
rootfs := "/"
|
||||
if o.Containerized {
|
||||
rootfs = "/rootfs/"
|
||||
}
|
||||
|
||||
clusters, err := discoverKubernetesClusters(o.DnsDiscoveryTimeout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error from dns resolve: %v", err)
|
||||
}
|
||||
|
||||
glog.Infof("clusters: %v", clusters)
|
||||
|
||||
hostsPath := filepath.Join(rootfs, "etc/hosts")
|
||||
|
||||
addrToHosts := make(map[string][]string)
|
||||
for k, addrs := range clusters {
|
||||
if o.ClusterID != "" {
|
||||
if k != o.ClusterID {
|
||||
glog.V(2).Infof("skipping discovered cluster %q as does not match configured %q", k, o.ClusterID)
|
||||
continue
|
||||
}
|
||||
}
|
||||
for _, addr := range addrs {
|
||||
addrString := addr.String()
|
||||
for _, prefix := range o.Prefixes {
|
||||
addrToHosts[addrString] = append(addrToHosts[addrString], prefix+k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(addrToHosts) == 0 {
|
||||
// We don't update if there are no records remaining, just in case it is a transient blip
|
||||
glog.Warningf("no records found; skipping update")
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: Combined with previously discovered records (with a sliding window?)
|
||||
// TODO: Support an iptables / ipvs backend?
|
||||
// TODO: Verify resolved records against certificates?
|
||||
if err := hosts.UpdateHostsFileWithRecords(hostsPath, addrToHosts); err != nil {
|
||||
return fmt.Errorf("error updating hosts file: %v", err)
|
||||
} else {
|
||||
glog.Infof("updated %s", hostsPath)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func discoverKubernetesClusters(timeout time.Duration) (map[string][]net.IP, error) {
|
||||
addr := &net.UDPAddr{IP: net.IPv4(224, 0, 0, 251), Port: 5353}
|
||||
connection, err := net.ListenMulticastUDP("udp4", nil, addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error lisening for multicast: %v", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err := connection.Close()
|
||||
if err != nil {
|
||||
glog.Warningf("error closing multicast connection: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
serviceName := "_kubernetes._tcp.local."
|
||||
|
||||
{
|
||||
m := new(dns.Msg)
|
||||
m.SetQuestion(serviceName, dns.TypePTR)
|
||||
m.RecursionDesired = false
|
||||
buf, err := m.Pack()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building DNS query: %v", err)
|
||||
}
|
||||
if _, err := connection.WriteToUDP(buf, addr); err != nil {
|
||||
return nil, fmt.Errorf("error sending DNS query: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
stopAt := time.Now().Add(timeout)
|
||||
|
||||
if err := connection.SetReadDeadline(stopAt); err != nil {
|
||||
return nil, fmt.Errorf("error setting socket read deadline: %v", err)
|
||||
}
|
||||
|
||||
ptrs := make(map[string][]*dns.PTR)
|
||||
srvs := make(map[string][]*dns.SRV)
|
||||
txts := make(map[string][]*dns.TXT)
|
||||
aaaas := make(map[string][]*dns.AAAA)
|
||||
as := make(map[string][]*dns.A)
|
||||
|
||||
buf := make([]byte, 65536)
|
||||
for {
|
||||
n, err := connection.Read(buf)
|
||||
if err != nil {
|
||||
if err, ok := err.(net.Error); ok && err.Timeout() {
|
||||
break
|
||||
}
|
||||
return nil, fmt.Errorf("error reading UDP response: %v", err)
|
||||
}
|
||||
msg := new(dns.Msg)
|
||||
if err := msg.Unpack(buf[:n]); err != nil {
|
||||
glog.Warningf("got unparseable DNS packet: %v", err)
|
||||
continue
|
||||
}
|
||||
glog.V(4).Infof("got response: %v", msg)
|
||||
|
||||
for _, rr := range msg.Answer {
|
||||
switch rr := rr.(type) {
|
||||
case *dns.PTR:
|
||||
glog.V(4).Infof("PTR %v", rr)
|
||||
ptrs[rr.Hdr.Name] = append(ptrs[rr.Hdr.Name], rr)
|
||||
case *dns.TXT:
|
||||
glog.V(4).Infof("TXT %v", rr)
|
||||
txts[rr.Hdr.Name] = append(txts[rr.Hdr.Name], rr)
|
||||
case *dns.SRV:
|
||||
glog.V(4).Infof("SRV %v", rr)
|
||||
srvs[rr.Hdr.Name] = append(srvs[rr.Hdr.Name], rr)
|
||||
case *dns.AAAA:
|
||||
glog.V(4).Infof("AAAA %v", rr)
|
||||
aaaas[rr.Hdr.Name] = append(aaaas[rr.Hdr.Name], rr)
|
||||
case *dns.A:
|
||||
glog.V(4).Infof("A %v", rr)
|
||||
as[rr.Hdr.Name] = append(as[rr.Hdr.Name], rr)
|
||||
default:
|
||||
glog.V(2).Infof("ignoring answer of unknown type %T: %v", rr, rr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
addrs := make(map[string][]net.IP)
|
||||
|
||||
for _, ptr := range ptrs[serviceName] {
|
||||
instance := strings.TrimSuffix(ptr.Ptr, serviceName)
|
||||
instance = strings.TrimSuffix(instance, ".")
|
||||
|
||||
// Dots in the instance name are escaped
|
||||
instance = strings.Replace(instance, "\\.", ".", -1)
|
||||
|
||||
for _, srv := range srvs[ptr.Ptr] {
|
||||
// TODO: Ignore if port is not 443?
|
||||
for _, a := range as[srv.Target] {
|
||||
addrs[instance] = append(addrs[instance], a.A)
|
||||
}
|
||||
for _, aaaa := range aaaas[srv.Target] {
|
||||
addrs[instance] = append(addrs[instance], aaaa.AAAA)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return addrs, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_docker//container:container.bzl",
|
||||
"container_image",
|
||||
)
|
||||
|
||||
container_image(
|
||||
name = "kube-discovery",
|
||||
base = "@debian_hyperkube_base_amd64//image",
|
||||
cmd = ["/usr/bin/kube-discovery"],
|
||||
directory = "/usr/bin/",
|
||||
files = [
|
||||
"//kube-discovery/cmd/kube-discovery",
|
||||
],
|
||||
)
|
||||
Loading…
Reference in New Issue