Merge pull request #4194 from justinsb/kubedisco

Add experimental kube-discovery
This commit is contained in:
k8s-ci-robot 2018-01-04 18:08:12 -08:00 committed by GitHub
commit 48365c3883
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 357 additions and 0 deletions

View File

@ -1,8 +1,38 @@
#=============================================================================
# Go rules
http_archive(
name = "io_bazel_rules_go",
url = "https://github.com/bazelbuild/rules_go/releases/download/0.8.1/rules_go-0.8.1.tar.gz",
sha256 = "90bb270d0a92ed5c83558b2797346917c46547f6f7103e648941ecdb6b9d0e72",
)
load("@io_bazel_rules_go//go:def.bzl", "go_rules_dependencies", "go_register_toolchains")
go_rules_dependencies()
go_register_toolchains()
#=============================================================================
# Docker rules
git_repository(
name = "io_bazel_rules_docker",
remote = "https://github.com/bazelbuild/rules_docker.git",
tag = "v0.3.0",
)
load(
"@io_bazel_rules_docker//container:container.bzl",
"container_pull",
container_repositories = "repositories",
)
container_repositories()
container_pull(
name = "debian_hyperkube_base_amd64",
registry = "gcr.io",
repository = "google_containers/debian-hyperkube-base-amd64",
# 'tag' is also supported, but digest is encouraged for reproducibility.
digest = "sha256:fc1b461367730660ac5a40c1eb2d1b23221829acf8a892981c12361383b3742b",
tag = "0.8",
)

View File

@ -31,6 +31,7 @@ k8s.io/kops/federation
k8s.io/kops/federation/model
k8s.io/kops/federation/targets/kubernetestarget
k8s.io/kops/federation/tasks
k8s.io/kops/kube-discovery/cmd/kube-discovery
k8s.io/kops/nodeup/pkg/bootstrap
k8s.io/kops/nodeup/pkg/distros
k8s.io/kops/nodeup/pkg/model

32
kube-discovery/README.md Normal file
View File

@ -0,0 +1,32 @@
## kube-discovery
Status: experimental
kube-discovery does master discovery, currently on bare-metal. The intention is to split this functionality
out of protokube, to make it reusable and modular.
Discovery methods:
* mDNS/DNS-SD (aka bonjour / zeroconf). Looks for services of type `_kubernetes._tcp`, with a name of clustername.
## mDNS
Example avahi configuration
`/etc/avahi/services/kubernetes.service`
```
<?xml version="1.0" standalone='no'?>
<!DOCTYPE service-group SYSTEM "avahi-service.dtd">
<service-group>
<name replace-wildcards="yes">example.k8s.local</name>
<service>
<type>_kubernetes._tcp</type>
<port>443</port>
</service>
</service-group>
```

View File

@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
go_library(
name = "go_default_library",
srcs = ["main.go"],
importpath = "k8s.io/kops/kube-discovery/cmd/kube-discovery",
visibility = ["//visibility:private"],
deps = [
"//protokube/pkg/gossip/dns/hosts:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/miekg/dns:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
],
)
go_binary(
name = "kube-discovery",
importpath = "k8s.io/kops/kube-discovery/cmd/kube-discovery",
library = ":go_default_library",
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,257 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"fmt"
"net"
"os"
"path/filepath"
"strings"
"time"
"github.com/golang/glog"
"github.com/miekg/dns"
"github.com/spf13/pflag"
"k8s.io/kops/protokube/pkg/gossip/dns/hosts"
)
var (
flags = pflag.NewFlagSet("", pflag.ExitOnError)
// BuildVersion is overwritten during build. This can be used to resolve issues.
BuildVersion = "0.1"
)
func main() {
fmt.Printf("kube-discovery version %s\n", BuildVersion)
if err := run(); err != nil {
glog.Errorf("unexpected error: %v", err)
os.Exit(1)
}
os.Exit(0)
}
type Options struct {
Containerized bool
ClusterID string
DnsDiscoveryTimeout time.Duration
Interval time.Duration
Prefixes []string
}
func (o *Options) InitDefaults() {
o.DnsDiscoveryTimeout = 5 * time.Second
o.Interval = 60 * time.Second
o.Prefixes = []string{"api.internal."}
}
// run is responsible for running the protokube service controller
func run() error {
var o Options
o.InitDefaults()
flags.BoolVar(&o.Containerized, "containerized", o.Containerized, "Set if we are running containerized.")
flags.StringVar(&o.ClusterID, "cluster-id", o.ClusterID, "Cluster ID")
// Trick to avoid 'logging before flag.Parse' warning
flag.CommandLine.Parse([]string{})
flag.Set("logtostderr", "true")
flags.AddGoFlagSet(flag.CommandLine)
flags.Parse(os.Args)
if o.ClusterID == "" {
glog.Infof("updating records for all discovered clusters")
} else {
glog.Infof("updating records for cluster %q", o.ClusterID)
}
c := &DiscoveryController{
Options: o,
}
return c.Run()
}
type DiscoveryController struct {
Options Options
}
func (c *DiscoveryController) Run() error {
for {
err := c.runOnce()
if err != nil {
glog.Warningf("error updating records: %v", err)
}
time.Sleep(c.Options.Interval)
}
}
func (c *DiscoveryController) runOnce() error {
o := &c.Options
rootfs := "/"
if o.Containerized {
rootfs = "/rootfs/"
}
clusters, err := discoverKubernetesClusters(o.DnsDiscoveryTimeout)
if err != nil {
return fmt.Errorf("error from dns resolve: %v", err)
}
glog.Infof("clusters: %v", clusters)
hostsPath := filepath.Join(rootfs, "etc/hosts")
addrToHosts := make(map[string][]string)
for k, addrs := range clusters {
if o.ClusterID != "" {
if k != o.ClusterID {
glog.V(2).Infof("skipping discovered cluster %q as does not match configured %q", k, o.ClusterID)
continue
}
}
for _, addr := range addrs {
addrString := addr.String()
for _, prefix := range o.Prefixes {
addrToHosts[addrString] = append(addrToHosts[addrString], prefix+k)
}
}
}
if len(addrToHosts) == 0 {
// We don't update if there are no records remaining, just in case it is a transient blip
glog.Warningf("no records found; skipping update")
return nil
}
// TODO: Combined with previously discovered records (with a sliding window?)
// TODO: Support an iptables / ipvs backend?
// TODO: Verify resolved records against certificates?
if err := hosts.UpdateHostsFileWithRecords(hostsPath, addrToHosts); err != nil {
return fmt.Errorf("error updating hosts file: %v", err)
} else {
glog.Infof("updated %s", hostsPath)
}
return nil
}
func discoverKubernetesClusters(timeout time.Duration) (map[string][]net.IP, error) {
addr := &net.UDPAddr{IP: net.IPv4(224, 0, 0, 251), Port: 5353}
connection, err := net.ListenMulticastUDP("udp4", nil, addr)
if err != nil {
return nil, fmt.Errorf("error lisening for multicast: %v", err)
}
defer func() {
err := connection.Close()
if err != nil {
glog.Warningf("error closing multicast connection: %v", err)
}
}()
serviceName := "_kubernetes._tcp.local."
{
m := new(dns.Msg)
m.SetQuestion(serviceName, dns.TypePTR)
m.RecursionDesired = false
buf, err := m.Pack()
if err != nil {
return nil, fmt.Errorf("error building DNS query: %v", err)
}
if _, err := connection.WriteToUDP(buf, addr); err != nil {
return nil, fmt.Errorf("error sending DNS query: %v", err)
}
}
stopAt := time.Now().Add(timeout)
if err := connection.SetReadDeadline(stopAt); err != nil {
return nil, fmt.Errorf("error setting socket read deadline: %v", err)
}
ptrs := make(map[string][]*dns.PTR)
srvs := make(map[string][]*dns.SRV)
txts := make(map[string][]*dns.TXT)
aaaas := make(map[string][]*dns.AAAA)
as := make(map[string][]*dns.A)
buf := make([]byte, 65536)
for {
n, err := connection.Read(buf)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
break
}
return nil, fmt.Errorf("error reading UDP response: %v", err)
}
msg := new(dns.Msg)
if err := msg.Unpack(buf[:n]); err != nil {
glog.Warningf("got unparseable DNS packet: %v", err)
continue
}
glog.V(4).Infof("got response: %v", msg)
for _, rr := range msg.Answer {
switch rr := rr.(type) {
case *dns.PTR:
glog.V(4).Infof("PTR %v", rr)
ptrs[rr.Hdr.Name] = append(ptrs[rr.Hdr.Name], rr)
case *dns.TXT:
glog.V(4).Infof("TXT %v", rr)
txts[rr.Hdr.Name] = append(txts[rr.Hdr.Name], rr)
case *dns.SRV:
glog.V(4).Infof("SRV %v", rr)
srvs[rr.Hdr.Name] = append(srvs[rr.Hdr.Name], rr)
case *dns.AAAA:
glog.V(4).Infof("AAAA %v", rr)
aaaas[rr.Hdr.Name] = append(aaaas[rr.Hdr.Name], rr)
case *dns.A:
glog.V(4).Infof("A %v", rr)
as[rr.Hdr.Name] = append(as[rr.Hdr.Name], rr)
default:
glog.V(2).Infof("ignoring answer of unknown type %T: %v", rr, rr)
}
}
}
addrs := make(map[string][]net.IP)
for _, ptr := range ptrs[serviceName] {
instance := strings.TrimSuffix(ptr.Ptr, serviceName)
instance = strings.TrimSuffix(instance, ".")
// Dots in the instance name are escaped
instance = strings.Replace(instance, "\\.", ".", -1)
for _, srv := range srvs[ptr.Ptr] {
// TODO: Ignore if port is not 443?
for _, a := range as[srv.Target] {
addrs[instance] = append(addrs[instance], a.A)
}
for _, aaaa := range aaaas[srv.Target] {
addrs[instance] = append(addrs[instance], aaaa.AAAA)
}
}
}
return addrs, nil
}

View File

@ -0,0 +1,16 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_docker//container:container.bzl",
"container_image",
)
container_image(
name = "kube-discovery",
base = "@debian_hyperkube_base_amd64//image",
cmd = ["/usr/bin/kube-discovery"],
directory = "/usr/bin/",
files = [
"//kube-discovery/cmd/kube-discovery",
],
)